P99 CONF Agenda Now Online

P99 CONF is the conference for low-latency, high-performance distributed systems. An event by engineers, for engineers, P99 CONF brings together speakers from across the tech landscape, covering everything from architecture and design, to the latest techniques in operating systems and development languages, to databases and streaming architectures, to real-time operations and observability.

P99 CONF is a free online virtual event scheduled for Wednesday and Thursday, October 6th and 7th, from 8:25 AM to 1:00 PM Pacific Daylight Time (PDT).

We recently published the full agenda, and wanted to share the highlights with you. You will find sessions with speakers from companies like Twitter, Netflix, WarnerMedia, Mozilla, Confluent, Red Hat, VMWare, Splunk, Datadog, Dynatrace, ScyllaDB, Couchbase, Redis Labs and more.

REGISTER NOW FOR P99 CONF

Keynote Speakers

  • Brian Martin, Twitter Site Reliability Engineer, will talk about reimplementing Pelikan, the unified cache project, in Whoops! I Wrote it in Rust!
  • Avi Kivity, ScyllaDB CTO, will speak on Keeping Latency Low and Throughput High with Application-level Priority Management
  • Simon Ritter, Azul Deputy CTO, presents Thursday on how to Get Lower Latency and Higher Throughput for Java Applications
  • Steve Rostedt, VMware Open Source Engineer, will dig deep into ftrace in his talk on New Ways to Find Latency in Linux Using Tracing

General Sessions

Our general session tracks will run in two virtual stages, with each session running a brisk 20 minutes.

Wednesday, October 6th

  • Peter Zaitsev, Percona CEO, will describe USE, RED and Golden Signals methods in his talk Performance Analysis and Troubleshooting Methodologies for Databases.
  • Abel Gordon, Lightbits Labs Chief System Architect, will show how his team is Vanquishing Latency Outliers in the Lightbits LightOS Software Defined Storage System.
  • Karthik Ramaswamy, Splunk Senior Director of Engineering, will show how data, including logs and metrics, can be processed at scale and speed in his talk Scaling Apache Pulsar to 10 Petabytes/Day.
  • Denis Rystsov, Vectorized Staff Engineer, will discuss how you can tune for performance without discarding distributed transaction guarantees in his session Is it Faster to Go With Redpanda Transactions than Without Them?
  • Heinrich Hartmann, Zalando Site Reliability Engineer, will go into the deeper maths and technical considerations in his level-setting session, How to Measure Latency.
  • Thomas Dullien, Optimyze.cloud CEO, will uncover all the hidden places where you can recover your wasted CPU resources in his session Where Did All These Cycles Go?
  • Daniel Bristot de Oliveira, Red Hat Principal Software Engineer, will look into operating system noise in his talk OSNoise Tracer: Who is Stealing My CPU Time?
  • Marc Richards, Talawah Solutions, will show how he was able to optimize utilization in his talk Extreme HTTP Performance Tuning: 1.2M API req/s on a 4 vCPU EC2 Instance.
  • Sam Just, Red Hat Principal Software Engineer, will describe the underlying architecture of their next-generation distributed filesystem to take advantage of emerging storage technologies in his talk Seastore: Next Generation Backing Store for Ceph.
  • Orit Wasserman, OpenShift Data Foundation Architect, will talk about implementing Seastar, a highly asynchronous engine, as a new foundation for the Ceph distributed storage system in her talk Crimson: Ceph for the Age of NVMe and Persistent Memory
  • Pavel Emelyanov, ScyllaDB Developer, will show how, with the latest generation of high-performance options, storage is no longer the same bottleneck it once was in his session What We Need to Unlearn about Persistent Storage
  • Glauber Costa, Datadog Staff Software Engineer, will address the performance concern on the top of developer minds in his session Rust Is Safe. But Is It Fast?
  • Jim Blandy, Mozilla Staff Software Engineer, will continue the dialogue on using Rust for I/O-bound tasks in his talk Wait, What is Rust Async Actually Good For?
  • Bryan Cantrill, Oxide Computer Company CTO, will look ahead to the coming decade of development in his session on Rust, Wright’s Law, and the Future of Low-Latency Systems
  • Felix Geisendörfer, Datadog Staff Engineer, will uncover the unique aspects of the Go runtime and interoperability with tools like Linux perf and bpftrace in his session Continuous Go Profiling & Observability

Thursday, October 7th

  • Tanel Poder will introduce a new eBPF script for Continuous Monitoring of IO Latency Reasons and Outliers
  • Bryan McCoid, Couchbase Senior Software Engineer, will speak on the latest tools in the Linux kernel in his talk High-Performance Networking Using eBPF, XDP, and io_uring
  • Yarden Shafir, CrowdStrike Software Engineer, will present on I/O Rings and You — Optimizing I/O on Windows
  • Shlomi Livne, ScyllaDB VP of Research & Development, will engage audiences on How to Meet your P99 Goal While Overcommitting Another Workload
  • Miles Ward, SADA CTO, will share his insights on building and maintaining Multi-cloud State for Kubernetes (K8s): Anthos and ScyllaDB
  • Andreas Grabner, Dynatrace DevOps Activist, will also be talking about Kubernetes in his session on Using SLOs for Continuous Performance Optimizations of Your k8s Workloads
  • Tejas Chopra, Netflix Senior Software Engineer, will describe how Netflix reaches petabyte scale in his talk on Object Compaction in Cloud for High Yield
  • Felipe Oliveira, Redis Labs Performance Engineer, will speak on Data Structures for High Resolution and Real Time Telemetry at Scale
  • Gunnar Morling, Red Hat Principal Software Engineer, will show off the capabilities of the Java JDK Flight Recorder in his session Continuous Performance Regression Testing with JfrUnit
  • Pere Urbón-Bayes, Confluent Senior Solution Architect, will show audiences how to measure, evaluate and optimize performance in his talk on Understanding Apache Kafka P99 Latency at Scale
  • Waldek Kozaczuk, OSv Committer, will describe building components of CNN's video supply chain in his talk OSv Unikernel — Optimizing Guest OS to Run Stateless and Serverless Apps in the Cloud
  • Felipe Huici, NEC Laboratories Europe, will showcase the utility and design of unikernels in his talk Unikraft: Fast, Specialized Unikernels the Easy Way
  • Roman Shaposhnik, VP Product and Strategy, and Kathy Giori, Ecosystem Engagement Lead, both of Zededa will speak on RISC-V on Edge: Porting EVE and Alpine Linux to RISC-V
  • Konstantine Osipov, ScyllaDB Director of Engineering, will address the tradeoffs between hash or range-based sharding in his talk on Avoiding Data Hotspots at Scale

Register Now

While P99 CONF is totally free, open and online, make sure you register now to keep the dates booked on your calendar. We’re just six weeks out from the event and are looking forward to seeing you all there!

REGISTER FOR P99 CONF NOW!

The post P99 CONF Agenda Now Online appeared first on ScyllaDB.

Apache Cassandra 4.0 vs. Scylla 4.4: Comparing Performance

This is part two of a two-part blog series on the relative performance of the recently released Apache Cassandra 4.0. In part one, we compared Cassandra 4.0 vs. Cassandra 3.11. In part two we will compare Apache Cassandra 4.0 and 3.11 with the performance of Scylla Open Source 4.4.

On July 27, 2021, after almost six years of work, the engineers behind Apache Cassandra bumped its major revision number from 3 to 4. Over almost the same period of time, Scylla emerged from its earliest beta (October 2015), proceeded through four major releases, and is currently at minor release 4.4.

In the fast-paced world of big data many other advances have occurred:  there are new JVMs, new system kernels, new hardware, new libraries and even new algorithms. Progress in all those areas presented Cassandra with some unprecedented opportunities to achieve new levels of performance. Similarly, Scylla did not stand still over this period, as we consistently improved our NoSQL database engine with new features and optimizations.

Let’s compare the performance of the latest release of Scylla Open Source 4.4 against Cassandra 4.0 and Cassandra 3.11. We measured the latencies and throughputs at different loads, as well as the speed of common administrative operations like adding/replacing a node or running major compactions.

TL;DR Scylla Open Source 4.4 vs. Cassandra 4.0 Results

The detailed results and the fully optimized setup instructions are shared below. We compared two deployment options in the AWS EC2 environment:

  1. The first is an apples-to-apples comparison of 3-node clusters.
  2. The second is a larger-scale setup where we used node sizes optimal for each database. Scylla can utilize very large nodes, so we compared a setup of 4 i3.metal machines (288 vCPUs in total) vs. 40 (!) i3.4xlarge Cassandra machines (640 vCPUs in total, almost 2.5x Scylla’s resources).

Key findings:

  • Cassandra 4.0 delivers up to 100x better P99 latency than Cassandra 3.11!
  • Cassandra 4.0 speeds up admin operations by up to 34% compared to Cassandra 3.11
  • Scylla has 2x-5x better throughput than Cassandra 4.0 on the same 3-node cluster
  • Scylla has 3x-8x better throughput than Cassandra 4.0 on the same 3-node cluster while keeping P99 latency under 10 ms
  • Scylla adds a node 3x faster than Cassandra 4.0
  • Scylla replaces a node 4x faster than Cassandra 4.0
  • Scylla doubles a 3-node cluster capacity 2.5x faster than Cassandra 4.0
  • A 40 TB cluster is 2.5x cheaper with Scylla while providing 42% more throughput under P99 latency of 10 ms
  • Scylla adds 25% capacity to a 40 TB optimized cluster 11x faster than Cassandra 4.0.
  • Scylla finishes compaction 32x faster than Cassandra 4.0
  • Cassandra 4.0 can achieve a better latency with 40 i3.4xlarge nodes than 4 i3.metal Scylla nodes when the throughput is low and the cluster is being underutilized. Explanation below.

A peek into the results: the 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Both Cassandras quickly become functionally nonoperational, serving requests with tail latencies that exceed 1 second.

A peek into the results: the 99-percentile (P99) latencies in different scenarios, as measured on 3 x i3.4xlarge machines (48 vCPUs in total) under load that puts Cassandra 4.0 at halfway to saturation. Scylla excels at response times: Apache Cassandra 4.0 P99 latencies are anywhere between 80% to 2,200% greater than Scylla 4.4.

A peek into the results: the maximum throughput (measured in operations per second) achieved on 3 x i3.4xlarge machines (48 vCPUs). Scylla leads the pack, processing from 2x to 5x more requests than either of the Cassandras.

A peek into the results: the time taken by replacing a 1 TB node, measured under Size-Tiered Compaction Strategy (STCS) and Leveled Compaction Strategy (LCS). By default (STCS) Scylla is almost 4x faster than Cassandra 4.0.

A peek into the results: latencies of SELECT query, as measured on 40 TB cluster on uneven hardware — 4 nodes (288 vCPUs) for Scylla and 40 nodes (640 vCPUs) for Apache Cassandra.

Limitations of Our Testing

It’s important to note that this basic Apache Cassandra 4.0 performance analysis does not cover all factors in deciding whether to stay put on Cassandra 3.x, upgrade to Cassandra 4.0, or to migrate to Scylla Open Source 4.4. Users may be wondering if the new features of Cassandra 4.0 are compelling enough, or how the implementations of shared features compare between Cassandra and Scylla. For instance, you can read more about the difference in CDC implementations here, and how Scylla’s Lightweight Transactions (LWT) differ from Cassandra’s here. Apart from the comparison of basic administrative tasks like adding one or more nodes, which is covered below, benchmarking specific features is beyond the scope of this analysis.

Plus there are issues of risk aversion based on stability and maturity for any new software release. For example, the ZGC garbage collector we used requires Java 16, which Cassandra can run on but does not yet consider production-ready; newer JVMs are not officially supported by Cassandra yet.

Cluster of Three i3.4xlarge Nodes

3-Node Test Setup

The purpose of this test was to compare the performance of Scylla vs. both versions of Cassandra on the exact same hardware. We wanted to use relatively typical current generation servers on AWS so that others could replicate our tests, and reflect a real-world setup.

                   Cassandra/Scylla                   Loaders
EC2 Instance type  i3.4xlarge                         c5n.9xlarge
Cluster size       3                                  3
vCPUs (total)      16 (48)                            36 (108)
RAM (total)        122 (366) GiB                      96 (288) GiB
Storage (total)    2x 1.9 TB NVMe in RAID0 (3.8 TB)   Not important for a loader (EBS-only)
Network            Up to 10 Gbps                      50 Gbps

We set up our cluster on Amazon EC2, in a single Availability Zone within us-east-2. Database cluster servers were initialized with clean machine images (AMIs), running CentOS 7.9 with Scylla Open Source 4.4 and Ubuntu 20.04 with Cassandra 4.0 or Cassandra 3.11 (which we’ll refer to as “C*4” and “C*3”, respectively).

Apart from the cluster, three loader machines were employed to run cassandra-stress in order to insert data and, later, provide background load to mess with the administrative operations.

Once up and running, the databases were loaded by cassandra-stress with random data organized into the default schema at RF=3. Loading continued until the cluster’s total disk usage reached approximately 3 TB (or 1 TB per node). The exact disk occupancy would, of course, depend on running compactions and the size of other related files (commitlogs, etc.). Based on the size of the payload, this translated to ~3.43 billion partitions. Then we flushed the data and waited until the compactions finished, so we could start the actual NoSQL database benchmarking.
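A minimal sketch of that final settling step, run against each node in turn (both databases accept the same nodetool subcommands here):

$ nodetool flush
$ nodetool compactionstats    # repeat until it reports "pending tasks: 0"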

Throughput and Latencies

The actual benchmarking is a series of simple invocations of cassandra-stress with CL=QUORUM. For 30 minutes we keep firing 10,000 requests per second and monitor the latencies. Then we increase the request rate by another 10,000 for another 30 minutes, and so on (by 20,000 at higher throughputs). The procedure repeats until the DB is no longer capable of withstanding the traffic, i.e. until cassandra-stress cannot achieve the desired throughput or until the 90-percentile latencies exceed 1 second.

Note: This approach means that throughput numbers are presented with 10k/s granularity (in some cases 20k/s).
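For reference, a single step of that rate ladder looks roughly like the invocation below (a sketch assembled from the parameters listed in the Supplementary Information; the contact point and row count are placeholders, and in the actual runs the target rate was split across the three loader machines):

$ cassandra-stress mixed 'ratio(write=1,read=1)' duration=30m cl=QUORUM \
    -pop 'dist=UNIFORM(1..3430000000)' \
    -mode native cql3 \
    -rate "threads=500 fixed=10000/s" \
    -node 10.0.0.10
# Repeat with fixed=20000/s, 30000/s, ... until the target rate cannot be met
# or the P90 latency exceeds 1 second.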

We have tested our databases with the following distributions of data:

  1. “Real-life” (Gaussian) distribution, with sensible cache-hit ratios of 30-60%
  2. Uniform distribution, with a close-to-zero cache hit ratio
  3. “In-memory” distribution, expected to yield almost 100% cache hits

Within these scenarios we ran the following workloads:

  • 100% writes
  • 100% reads
  • 50% writes and 50% reads

“Real-life” (Gaussian) Distribution

In this scenario we issue queries that touch partitions randomly drawn from a narrow Gaussian distribution. We make an Ansatz about the bell curve: we assume that its six-sigma spans the RAM of the cluster (corrected for the replication factor). The purpose of this experiment is to model a realistic workload, with a substantial cache hit ratio but less than 100%, because most of our users observe the figures of 60-90%. We can expect Cassandra to perform well in this scenario because its key cache is denser than Scylla’s, i.e. it efficiently stores data in RAM, though it relies on SSTables stored in the OS page cache which can be heavyweight to look up. By comparison, Scylla uses a row-based cache mechanism. This Gaussian distribution test should indicate which database uses the more efficient caching mechanism for reads.
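To make the Ansatz concrete, the population option handed to cassandra-stress takes the form below (a sketch; the concrete numbers are placeholders chosen per the six-sigma rule, not the exact figures used in the tests):

-pop 'dist=GAUSSIAN(1..{ROW_COUNT},{GAUSS_CENTER},{GAUSS_SIGMA})'
# e.g. GAUSSIAN(1..3430000000,1715000000,60000000): the center is ROW_COUNT/2, and sigma is
# picked so that +/- 3 sigma covers roughly the number of partitions that fit in the
# cluster's RAM divided by the replication factor.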

Mixed Workload – 50% reads and 50% writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload consists of 50% reads and 50% writes, randomly targeting a “realistic” Gaussian distribution. C*3 quickly becomes nonoperational, C*4 is a little better, meanwhile Scylla maintains low and consistent write latencies in the entire range.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3  Scylla vs. C*4
Maximum throughput                      80k/s         40k/s          30k/s           1.33x        2x
Max throughput with P90 latency <10 ms  80k/s         30k/s          10k/s           3x           2.66x
Max throughput with P99 latency <10 ms  80k/s         30k/s          –               –            2.66x

The 90- and 99-percentile latencies of SELECT queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload consists of 50% reads and 50% writes, randomly targeting a “realistic” Gaussian distribution. C*3 quickly becomes nonoperational, C*4 is a little better; meanwhile Scylla maintains low and consistent response times in the entire range.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3  Scylla vs. C*4
Maximum throughput                      90k/s         40k/s          40k/s           1x           2.25x
Max throughput with P90 latency <10 ms  80k/s         30k/s          10k/s           3x           2.66x
Max throughput with P99 latency <10 ms  70k/s         10k/s          –               –            7x

Uniform Distribution (disk-intensive, low cache hit ratio)

In this scenario we issue queries that touch random partitions of the entire dataset. In our setup this should result in high disk traffic and/or negligible cache hit rates, i.e. just a few percent.

Writes Workload – Only Writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed, i.e. every partition in the 1 TB dataset has an equal chance of being updated. C*3 quickly becomes nonoperational, C*4 is a little better; meanwhile Scylla maintains low and consistent write latencies up until 170,000-180,000 ops/s.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3  Scylla vs. C*4
Maximum throughput                      180k/s        50k/s          40k/s           1.25x        3.6x
Max throughput with P90 latency <10 ms  180k/s        40k/s          20k/s           2x           3.5x
Max throughput with P99 latency <10 ms  170k/s        30k/s          –               –            5.66x

Reads Workload – Only Reads

The 90- and 99-percentile latencies of SELECT queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed, i.e. every partition in the 1 TB dataset has an equal chance of being selected. Scylla serves 90% of queries in under 5 ms until the load reaches 70,000 ops/s. Please note that almost all reads are served from disk.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3  Scylla vs. C*4
Maximum throughput                      80k/s         40k/s          30k/s           1.25x        2x
Max throughput with P90 latency <10 ms  70k/s         40k/s          30k/s           1.25x        1.75x
Max throughput with P99 latency <10 ms  60k/s         20k/s          –               –            3x

Mixed Workload – 50% reads and 50% writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed, i.e. every partition in the 1 TB dataset has an equal chance of being selected/updated. At 80,000 ops/s Scylla maintains the latencies of 99% of queries in a single-figure regime (in milliseconds).

Metric                                  Scylla 4.4.3  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3  Scylla vs. C*4
Maximum throughput                      90k/s         40k/s          40k/s           1x           2.25x
Max throughput with P90 latency <10 ms  80k/s         40k/s          20k/s           2x           2x
Max throughput with P99 latency <10 ms  80k/s         30k/s          –               –            2.66x

The 90- and 99-percentile latencies of SELECT queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed, i.e. every partition in the 1 TB dataset has an equal chance of being selected/updated. Under such conditions Scylla can handle over 2x more traffic and offers highly predictable response times.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3  Scylla vs. C*4
Maximum throughput                      90k/s         40k/s          40k/s           1x           2.25x
Max throughput with P90 latency <10 ms  80k/s         30k/s          20k/s           1.5x         2.66x
Max throughput with P99 latency <10 ms  60k/s         20k/s          –               –            3x

Uniform Distribution (memory-intensive, high cache hit ratio)

In this scenario we issue queries touching random partitions from a small subset of the dataset, specifically: one that fits into RAM. To be sure that our subset resides in cache and thus no disk IO is triggered, we choose it to be safely small, at an arbitrarily picked value of 60 GB. The goal here is to evaluate both databases at the other extreme end: where they both serve as pure in-memory datastores.

Writes Workload – Only Writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed over 60 GB of data, so that every partition resides in cache and has an equal chance of being updated. Cassandras instantly become nonoperational; Scylla withstands over 5x higher load and maintains low and consistent write latencies over the entire range.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3  Scylla vs. C*4
Maximum throughput                      200k/s        40k/s          40k/s           1x           5x
Max throughput with P90 latency <10 ms  200k/s        40k/s          20k/s           2x           5x
Max throughput with P99 latency <10 ms  200k/s        40k/s          –               –            5x

Reads Workload – Only Reads

The 90- and 99-percentile latencies of SELECT queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed over 60 GB of data, so that every partition resides in cache and has an equal chance of being selected. Scylla withstands over 3x higher load than C*4 and 4x greater than C*3.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3  Scylla vs. C*4
Maximum throughput                      300k/s        80k/s          60k/s           1.33x        3.75x
Max throughput with P90 latency <10 ms  260k/s        60k/s          40k/s           1.5x         4.33x
Max throughput with P99 latency <10 ms  240k/s        40k/s          –               –            6x

Mixed Workload – 50% reads and 50% writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed over 60 GB of data, so that every partition resides in cache and has an equal chance of being selected/updated. Scylla withstands over 3x higher load than any of the Cassandras.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3  Scylla vs. C*4
Maximum throughput                      180k/s        40k/s          40k/s           1x           4.5x
Max throughput with P90 latency <10 ms  160k/s        40k/s          20k/s           2x           4x
Max throughput with P99 latency <10 ms  160k/s        40k/s          –               –            4x

The 90- and 99-percentile latencies of SELECT queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed over 60 GB of data, so that every partition resides in cache and has an equal chance of being selected/updated. Scylla withstands over 3x higher load than any of the Cassandras.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3  Scylla vs. C*4
Maximum throughput                      180k/s        40k/s          40k/s           1x           4.5x
Max throughput with P90 latency <10 ms  160k/s        40k/s          20k/s           2x           4x
Max throughput with P99 latency <10 ms  160k/s        20k/s          –               –            8x

Adding Nodes

The timeline of adding 3 nodes to an already existing 3-node cluster (ending up with six i3.4xlarge machines). Total time for Scylla 4.4 to double the cluster size was 94 minutes 57 seconds. For Cassandra 4.0, it took 238 minutes 21 seconds (just shy of 4 hours); Cassandra 3.11 took 270 minutes (4.5 hours). While Cassandra 4.0 noted a 12% improvement over Cassandra 3.11, Scylla completes the entire operation even before either version of Cassandra bootstraps its first new node.

One New Node

In this benchmark, we measured how long it takes to add a new node to the cluster. The reported times are the intervals between starting a Scylla/Cassandra node and having it fully finished bootstrapping (CQL port open).
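A sketch of how that interval can be observed from the outside (the address below is a placeholder for the joining node):

$ nodetool status                                # the new node appears as UJ (joining), then flips to UN
$ nc -z 10.0.0.14 9042 && echo "CQL port open"   # the native port answers only once bootstrap has finished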

Cassandra 4.0 is equipped with a new feature, Zero Copy Streaming (ZCS), which allows for efficient streaming of entire SSTables. An SSTable is eligible for ZCS if all of its partitions need to be transferred, which can be the case when LeveledCompactionStrategy (LCS) is enabled. To demonstrate this feature, we ran the next benchmarks with both the usual SizeTieredCompactionStrategy (STCS) and LCS.
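For the LCS runs, the benchmark table's compaction strategy can be switched with a single statement; a minimal sketch against the default cassandra-stress schema (keyspace1.standard1), with a placeholder contact point:

$ cqlsh 10.0.0.10 \
    -e "ALTER TABLE keyspace1.standard1 WITH compaction = {'class': 'LeveledCompactionStrategy'};"

In the actual runs the strategy was set at schema creation time through cassandra-stress's -schema "compaction(strategy=...)" option, as listed in the Supplementary Information.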

The time needed to add a node to an already existing 3-node cluster (ending up with 4 i3.4xlarge machines). Cluster is initially loaded with 1 TB of data at RF=3. Cassandra 4.0 showed an improvement over Cassandra 3.11, but Scylla still wins by a huge margin.

Strategy  Scylla 4.4.3           Cassandra 4.0                 Cassandra 3.11
STCS      36 minutes 56 seconds  1 hour 47 minutes 1 second    2 hours 6 minutes
LCS       44 minutes 11 seconds  1 hour 39 minutes 45 seconds  2 hours 23 minutes 10 seconds

Strategy  Cassandra 4.0 vs Cassandra 3.11  Scylla 4.4.3 vs Cassandra 4.0
STCS      -15%                             -65%
LCS       -30%                             -55%

Doubling Cluster Size

In this benchmark, we measured how long it takes to double the cluster node count, going from 3 nodes to 6 nodes. Three new nodes are added sequentially, i.e. waiting for the previous one to fully bootstrap before starting the next one. The reported time spans from the instant the startup of the first new node is initiated, all the way until the bootstrap of the third new node finishes.

The time needed to add 3 nodes to an already existing 3-node cluster of i3.4xlarge machines, preloaded with 1 TB of data at RF=3. Cassandra 4.0 performed moderately better than Cassandra 3.11, but Scylla still leads the pack.

Strategy  Scylla 4.4.3                  Cassandra 4.0                  Cassandra 3.11
STCS      1 hour 34 minutes 57 seconds  3 hours 58 minutes 21 seconds  4 hours 30 minutes 7 seconds
LCS       2 hours 2 minutes 37 seconds  3 hours 44 minutes 6 seconds   4 hours 44 minutes 46 seconds

Strategy  Cassandra 4.0 vs Cassandra 3.11  Scylla 4.4.3 vs Cassandra 4.0
STCS      -11%                             -60%
LCS       -21%                             -45%

Replace node

In this benchmark, we measured how long it took to replace a single node. One of the nodes is brought down and another one is started in its place. Throughout this process the cluster is agitated by a mixed read/write background load of 25,000 ops/s at CL=QUORUM.
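A hedged sketch of the Cassandra side of such a replacement (the dead node's address is a placeholder; Scylla uses an analogous replace-on-first-boot option in its own configuration):

# On the fresh replacement node, before its first start, append to cassandra-env.sh:
JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address_first_boot=10.0.0.12"
# Then start the service and watch the data stream in from the surviving replicas:
$ nodetool netstats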

The time needed to replace a node in a 3-node cluster of i3.4xlarge machines, preloaded with 1 TB of data at RF=3. Cassandra 4.0 noted an improvement over Cassandra 3.11, but Scylla is still the clear winner, taking around an hour to do what took Cassandra 4.0 over 3 hours.

Strategy  Scylla 4.4.3                 Cassandra 4.0                  Cassandra 3.11
STCS      54 minutes 19 seconds        3 hours 28 minutes 46 seconds  4 hours 35 minutes 56 seconds
LCS       1 hour 9 minutes 18 seconds  3 hours 19 minutes 17 seconds  5 hours 4 minutes 9 seconds

Strategy  Cassandra 4.0 vs Cassandra 3.11  Scylla 4.4.3 vs Cassandra 4.0
STCS      -24%                             -73%
LCS       -34%                             -65%

Major Compaction

In this benchmark, we measured how long it takes to perform a major compaction on a single node loaded with roughly 1 TB of data. Thanks to its sharded architecture, Scylla can perform major compactions on each shard concurrently, while Cassandra is bound to a single thread. The result of a major compaction is the same in both Scylla and Cassandra: a read is served by a single SSTable. In a later section of this blog post we also measure the speed of a major compaction in a case where there are many small Cassandra nodes (which get higher parallelism). We observed worse major compaction performance in Cassandra 4.0.0 with the default num_tokens: 16 parameter.
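A minimal sketch of how such a run can be triggered by hand on a Cassandra node, assuming the default cassandra-stress schema (keyspace1.standard1):

$ nodetool setcompactionthroughput 0      # 0 disables throttling, matching the note in the Supplementary Information
$ nodetool compact keyspace1 standard1    # kick off a major compaction of the benchmark table
$ nodetool compactionstats                # monitor progress until no compactions remain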

Major compaction of 1 TB of data at RF=1 on i3.4xlarge machine. Scylla demonstrates the power of sharded architecture by compacting on all cores concurrently. In our case Scylla is up to 60x faster and this figure should continue to scale linearly with the number of cores.

                                    Scylla 4.4.3                      Cassandra 4.0                                     Cassandra 3.11
Major Compaction (num_tokens: 16)   num_tokens: 16 not recommended    21 hours, 47 minutes, 34 seconds (78,454 seconds)   24 hours, 50 minutes, 42 seconds (89,442 seconds)
Major Compaction (num_tokens: 256)  36 minutes, 8 seconds (2,168 seconds)   37 hours, 56 minutes, 32 seconds (136,592 seconds)   23 hours, 48 minutes, 56 seconds (85,736 seconds)

“4 vs. 40” Benchmark

Now let us compare both databases installed on different hardware, where Scylla gets four powerful 72-core servers, while Cassandra gets the same i3.4xlarge servers as before, just… forty of them. Why would anyone ever consider such a test? After all, we’re comparing 4 machines to 40 very different machines. In terms of CPU count, RAM volume or cluster topology the two are like apples and oranges, no?

Not really.

Due to its sharded architecture and custom memory management Scylla can utilize really big hunks of hardware. And by that we mean the-biggest-one-can-get. Meanwhile, Cassandra and its JVM’s garbage collectors excel when they go heavily distributed, with many smaller nodes on the team. So, the true purpose of this test is to show that both CQL solutions can perform similarly in a pretty fair duel, yet Cassandra requires about 2.5x more hardware, for 2.5x the cost. What’s really at stake now is a 10x reduction in the administrative burden: your DBA has either 40 servers to maintain… or just 4. And, as you’ll see, the advantage can go even further than 10x.

4 vs. 40 Node Setup

We set up clusters on Amazon EC2 in a single Availability Zone within the us-east-2 region, but this time the Scylla cluster consisted of 4 i3.metal VMs, while the competing Cassandra cluster consisted of 40 i3.4xlarge VMs. Servers were initialized with clean machine images (AMIs) of Ubuntu 20.04 (Cassandra 4.0) or CentOS 7.9 (Scylla 4.4).

Apart from the cluster, fifteen loader machines were used to run cassandra-stress to insert data, and — later — to provide background load at CL=QUORUM to mess with the administrative operations.

                   Scylla                              Cassandra                           Loaders
EC2 Instance type  i3.metal                            i3.4xlarge                          c5n.9xlarge
Cluster size       4                                   40                                  15
Storage (total)    8x 1.9 TB NVMe in RAID0 (60.8 TB)   2x 1.9 TB NVMe in RAID0 (152 TB)    Not important for a loader (EBS-only)
Network            25 Gbps                             Up to 10 Gbps                       50 Gbps
vCPUs (total)      72 (288)                            16 (640)                            36 (540)
RAM (total)        512 (2048) GiB                      122 (4880) GiB                      96 (1440) GiB

Once up and running, both databases were loaded with random data at RF=3 until the cluster’s total disk usage reached approximately 40 TB. This translated to 1 TB of data per Cassandra node and 10 TB of data per Scylla node. After loading was done, we flushed the data and waited until the compactions finished, so we could start the actual benchmarking.

A Scylla cluster can be 10x smaller in node count and 2.5x cheaper, yet maintain performance equivalent to Cassandra 4.0.
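As a rough sanity check of the cost claim, assuming us-east on-demand list prices of about $1.248/hour for i3.4xlarge and $4.992/hour for i3.metal (figures that vary by region, pricing model, and over time):

40 x $1.248/h = $49.92/h   (Cassandra, 40 x i3.4xlarge)
 4 x $4.992/h = $19.97/h   (Scylla, 4 x i3.metal)
$49.92 / $19.97 ≈ 2.5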

Throughput and Latencies

Mixed Workload – 50% reads and 50% writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on:

  • 4-node Scylla cluster (4 x i3.metal, 288 vCPUs in total)
  • 40-node Cassandra cluster (40 x i3.4xlarge, 640 vCPUs in total).

Workload is uniformly distributed, i.e. every partition in the multi-TB dataset has an equal chance of being selected/updated. Under low load, Cassandra slightly outperforms Scylla. The reason is that Scylla automatically runs more compaction when it is idle, and its default scheduler tick of 0.5 ms hurts P99 latency. There is a parameter that controls this, but we wanted to provide out-of-the-box results with zero custom tuning or configuration.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Scylla 4.4.3 vs. Cassandra 4.0
Maximum throughput                      600k/s        600k/s         1x
Max throughput with P99 latency <10 ms  600k/s        450k/s         1.33x

The 90- and 99-percentile latencies of SELECT queries, as measured on:

  • 4-node Scylla cluster (4 x i3.metal, 288 vCPUs in total)
  • 40-node Cassandra cluster (40 x i3.4xlarge, 640 vCPUs in total).

Workload is uniformly distributed, i.e. every partition in the multi-TB dataset has an equal chance of being selected/updated. Under low load Cassandra slightly outperforms Scylla.

Metric                                  Scylla 4.4.3  Cassandra 4.0  Scylla 4.4.3 vs. Cassandra 4.0
Maximum throughput                      600k/s        600k/s         1x
Max throughput with P99 latency <10 ms  500k/s        350k/s         1.42x

Scaling the cluster up by 25%

In this benchmark, we increase the capacity of the cluster by 25%:

  • By adding a single Scylla node to the cluster (from 4 nodes to 5)
  • By adding 10 Cassandra nodes to the cluster (from 40 nodes to 50 nodes)

                  Scylla 4.4.3        Cassandra 4.0         Scylla 4.4 vs. Cassandra 4.0
Add 25% capacity  1 hour, 29 minutes  16 hours, 54 minutes  11x faster

Major Compaction

In this benchmark we measure the throughput of a major compaction. To compensate for Cassandra having 10 times more nodes (each having 1/10th of the data), this benchmark measures throughput of a single Scylla node performing major compaction and the collective throughput of 10 Cassandra nodes performing major compactions concurrently.

Throughput of a major compaction at RF=1 (more is better). Scylla runs on a single i3.metal machine (72 vCPUs) and competes with a 10-node cluster of Cassandra 4 (10x i3.4xlarge machines; 160 vCPUs in total). Scylla can split this problem across CPU cores, which Cassandra cannot do, so – effectively – Scylla performs 32x better in this case.

                  Scylla 4.4.3  Cassandra 4.0  Scylla 4.4 vs. Cassandra 4.0
Major Compaction  1,868 MB/s    56 MB/s        32x faster

Summary

On identical hardware, Scylla Open Source 4.4.3 withstood up to 5x greater traffic and in almost every tested scenario offered lower latencies than Apache Cassandra 4.0.

We also demonstrated a specific use-case where choosing Scylla over Cassandra 4 would result in $170,000 annual savings in the hardware costs alone, not to mention the ease of administration or environmental impact.

Nonetheless, Cassandra 4 is undeniably far better than Cassandra 3.11. It improved query latencies in almost all tested scenarios and sped up all the processes that involve streaming. Even if you choose not to take advantage of Scylla for its superior performance, upgrading from Cassandra 3.11 to Cassandra 4.0 is a wise idea.

Yet if you are determined to take the effort of an upgrade, then why not aim higher and get even more performance? Or at least keep the same performance and pocket the difference via savings?

While the benchmarks speak for themselves, we also hope that you don’t just take our word for Scylla’s superior performance. That’s why we provided everything that’s needed to re-run them yourself.

Beyond performance benchmarks, there are even more reasons to run Scylla: the feature set is bigger. For example, our CDC implementation is easier to manage and consume, implemented as standard CQL-readable tables. Also, Scylla’s Lightweight Transactions (LWT) are more efficient than Cassandra’s. Scylla provides observability through Scylla Monitoring Stack to watch over your clusters using Grafana and Prometheus. All of that you get with Scylla Open Source. With Scylla Enterprise NoSQL on top of it, you also get unique features like our Incremental Compaction Strategy (ICS) for additional storage efficiency, workload prioritization and more.

Whether you’re a CTO, systems architect, lead engineer, SRE or DBA — your time to consider Scylla is right now and your organization is unlikely to regret it.

Supplementary Information

Here you can check out the detailed results of the latency/throughput benchmarks, the JVM settings and cassandra.yaml from Cassandra 3 and Cassandra 4, as well as the cassandra-stress invocations used to run the benchmarks. Scylla used its default configuration.

Cassandra 3.11 configuration

JVM settings (JVM version: OpenJDK 8):

-Xms48G
-Xmx48G
-XX:+UseG1GC
-XX:G1RSetUpdatingPauseTimePercent=5
-XX:MaxGCPauseMillis=500
-XX:InitiatingHeapOccupancyPercent=70
-XX:ParallelGCThreads=16

cassandra.yaml (only settings changed from the default configuration are listed here):

disk_access_mode: mmap_index_only
row_cache_size_in_mb: 10240
concurrent_writes: 128
file_cache_size_in_mb: 2048
buffer_pool_use_heap_if_exhausted: true
disk_optimization_strategy: ssd
memtable_flush_writers: 4
trickle_fsync: true
concurrent_compactors: 16
compaction_throughput_mb_per_sec: 960
stream_throughput_outbound_megabits_per_sec: 7000

Cassandra 4.0 configuration

JVM settings (JVM version: OpenJDK 16):

-Xms70G
-Xmx70G
-XX:+UseZGC
-XX:ConcGCThreads=16
-XX:ParallelGCThreads=16
-XX:+UseTransparentHugePages
-verbose:gc
-Djdk.attach.allowAttachSelf=true
-Dio.netty.tryReflectionSetAccessible=true

cassandra.yaml (only settings changed from the default configuration are listed here):

disk_access_mode: mmap_index_only
row_cache_size_in_mb: 10240
concurrent_writes: 128
file_cache_size_in_mb: 2048
buffer_pool_use_heap_if_exhausted: true
disk_optimization_strategy: ssd
memtable_flush_writers: 4
trickle_fsync: true
concurrent_compactors: 16
compaction_throughput_mb_per_sec: 960
stream_throughput_outbound_megabits_per_sec: 7000

In major compaction benchmarks, the parameter compaction_throughput_mb_per_sec was set to 0 to make sure the compaction was not throttled.

Cassandra-stress parameters

Only the important facts and options are mentioned below.

  • Scylla’s Shard-aware Java driver was used.
  • Background loads were executed in a loop (so duration=5m is not a problem).
  • REPLICATION_FACTOR is 3 (except for major compaction benchmark).
  • COMPACTION_STRATEGY is SizeTieredCompactionStrategy unless stated otherwise.
  • loadgenerator_count is the number of generator machines (3 for “3 vs 3” benchmarks, 15 for “4 vs 40”).
  • BACKGROUND_LOAD_OPS is 1000 in major compaction, 25000 in other benchmarks.
  • DURATION_MINUTES is 10 for in-memory benchmarks, 30 for other benchmarks.
Inserting data:
write cl=QUORUM
-schema "replication(strategy=SimpleStrategy,replication_factor={REPLICATION_FACTOR})" "compaction(strategy={COMPACTION_STRATEGY})"
-mode native cql3
(The threads and throttle parameters were chosen for each DB separately, to ensure 3 TB were inserted quickly, yet also to provide headroom for minor compactions and avoid timeouts/large latencies. In the “4 vs 40” benchmarks the additional parameter maxPending=1024 was used.)

Background load for replace node:
mixed ratio(write=1,read=1)
duration=5m
cl=QUORUM
-pop dist=UNIFORM(1..{ROW_COUNT})
-mode native cql3
-rate "threads=700 throttle={BACKGROUND_LOAD_OPS // loadgenerator_count}/s"

Background load for new nodes / major compaction:
mixed ratio(write=1,read=1)
duration=5m
cl=QUORUM
-pop dist=UNIFORM(1..{ROW_COUNT})
-mode native cql3
-rate "threads=700 fixed={BACKGROUND_LOAD_OPS // loadgenerator_count}/s"

Cache warmup in Gaussian latency / throughput:
mixed ratio(write=0,read=1)
duration=180m
cl=QUORUM
-pop dist=GAUSSIAN(1..{ROW_COUNT},{GAUSS_CENTER},{GAUSS_SIGMA})
-mode native cql3
-rate "threads=500 throttle=35000/s"
-node {cluster_string}

Latency / throughput – Gaussian:
duration={DURATION_MINUTES}m
cl=QUORUM
-pop dist=GAUSSIAN(1..{ROW_COUNT},{GAUSS_CENTER},{GAUSS_SIGMA})
-mode native cql3
-rate "threads=500 fixed={rate // loadgenerator_count}/s"

Latency / throughput – uniform / in-memory:
duration={DURATION_MINUTES}m
cl=QUORUM
-pop dist=UNIFORM(1..{ROW_COUNT})
-mode native cql3
-rate "threads=500 fixed={rate // loadgenerator_count}/s"
(In the “4 vs 40” benchmarks the additional parameter maxPending=1024 was used.)

 

The post Apache Cassandra 4.0 vs. Scylla 4.4: Comparing Performance appeared first on ScyllaDB.

Full Query Logging With Apache Cassandra 4.0

The release of Apache Cassandra 4.0 comes with a bunch of new features to improve stability and performance. It also comes with a number of valuable new tools for operators to get more out of their Cassandra deployment. In this blog post, we’ll take a brief look at full query logging, one of the new features that comes with the Cassandra 4.0 release.

What Are Full Query Logs?

First off, we need to understand what counts as a full query log (FQL) in Cassandra. Full query logs record all successful Cassandra Query Language (CQL) requests. Audit logs (also a new feature of Cassandra 4.0), on the other hand, contain both successful and unsuccessful CQL requests. (To learn about the different forms of logging and diagnostic events in Cassandra 4.0, check out this blog by Instaclustr Co-Founder and CTO Ben Bromhead.)

The FQL framework was implemented to be lightweight from the very beginning, so there is no need to worry about performance. This is achieved with a library called Chronicle Queue, which is designed for low-latency, high-performance messaging in critical applications.

Use Cases for Full Query Logs

There are a number of exciting use cases for full query logs in Cassandra. Full query logs allow you to log, replay, and compare CQL requests live without affecting the performance of your production environment. This allows you to:

  • Examine traffic to individual nodes to help with debugging if you notice a performance issue in your environment 
  • Compare performance between two different versions of Cassandra in different environments
  • Compare performance between nodes with different settings to help with optimizing your cluster
  • Audit logs for security or compliance purposes

Configuring Full Query Logs in Cassandra

The settings for full query logs are adjustable either in the Cassandra configuration file, cassandra.yaml, or with nodetool.

See the following example from the Cassandra documentation for one way to approach configuration settings:

# default options for full query logging - these can be overridden from command line
# when executing nodetool enablefullquerylog
#full_query_logging_options:
   # log_dir:
   # roll_cycle: HOURLY
   # block: true
   # max_queue_weight: 268435456 # 256 MiB
   # max_log_size: 17179869184 # 16 GiB
   # archive command is "/path/to/script.sh %path" where %path is replaced with the file being rolled:
   # archive_command:
   # max_archive_retries: 10

In this case, you would just need to set log_dir to an existing directory with read, write, and execute permissions. The log segments here are rolled hourly, but they can also be set to roll daily or minutely.

The max_queue_weight option, which sets the maximum weight of in-memory queue records waiting to be written before blocking or dropping, is set here to 268435456 bytes (equivalent to 256 MiB). This is also the default value.

The max_log_size option, which sets the maximum size of rolled files that can be retained on disk before the oldest file is deleted, is set here to 17179869184 bytes (equivalent to 16 GiB).

After configuring the settings for your full query logs in cassandra.yaml, you can execute this command using nodetool to enable full query logging:

$ nodetool enablefullquerylog --path /tmp/cassandrafullquerylog

You must do this on a node-by-node basis for each node where you want to have full Cassandra query logs. 

If you prefer to, you can also set the configuration details for the full query logs within the syntax of the nodetool enablefullquerylog. To learn how to do so, check out the Cassandra documentation. 
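The same options can also be supplied directly on the command line; a sketch (the values are examples, and the exact option names should be checked against nodetool help enablefullquerylog for your version):

$ nodetool enablefullquerylog \
    --path /tmp/cassandrafullquerylog \
    --roll-cycle DAILY \
    --max-log-size 17179869184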

With the great power of the FQL framework to log all your queries, you might wonder what happens when you log a statement that contains some sensitive information in it. For example, if an operator creates a new role and specifies a password for the role, will the password be visible in the log? This seems like it would be a sensitive security issue. 

The answer is that, no, there will not be any passwords visible. The Cassandra implementation is quite aggressive about obfuscation: when it finds a password in a statement, it obfuscates the remainder of that statement. It also obfuscates passwords when a query containing them is not successful, which might happen when an operator makes a mistake in the CQL syntax.

From an observability point of view, you can get the status of FQL with the getfullquerylog nodetool subcommand, and you can disable FQL at runtime with the disablefullquerylog subcommand. You can achieve the same by calling the respective JMX methods on the StorageService MBean.

How to View Full Query Logs

Now that you have your full Cassandra query logs configured correctly and enabled for your chosen nodes, you probably want to view them sometimes. That’s where the fqltool command comes in. fqltool dump allows you to view logs (converted from binary to a format understandable to us humans), fqltool replay will replay logs, and fqltool compare outputs any differences between your full query logs.

Together, fqltool replay and fqltool compare let you revisit different sets of production traffic to help you analyze performance between different configurations or different nodes, or to help with debugging issues.
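A hedged sketch of that workflow (directories, host addresses, and the replay/compare options shown here are illustrative assumptions; consult fqltool help for the exact option names in your build):

$ fqltool dump /tmp/cassandrafullquerylog
$ fqltool replay --target 10.0.0.2 --results /tmp/replay-a --store-queries /tmp/queries -- /tmp/cassandrafullquerylog
$ fqltool replay --target 10.0.0.3 --results /tmp/replay-b -- /tmp/cassandrafullquerylog
$ fqltool compare --queries /tmp/queries /tmp/replay-a /tmp/replay-b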

Conclusion

With enhancements for stability and performance, along with cool new features like live full query logging, Cassandra 4.0 is a big step forward for the Cassandra community. To start using Cassandra 4.0, sign up for your free trial of Instaclustr Managed Cassandra and select the preview release of Cassandra 4.0 as your software version. 

The post Full Query Logging With Apache Cassandra 4.0 appeared first on Instaclustr.

Cassandra 4.0 vs. Cassandra 3.11: Performance Comparison


This is part one of a two-part blog series on the relative performance of the recently released Apache Cassandra 4.0. In this post we’ll compare Cassandra 4.0 versus Cassandra 3.11. In part two of this series, we’ll compare both of these Cassandra releases with the performance of Scylla Open Source 4.4.

Apache Cassandra 3.0 was originally released in November of 2015. Its last minor release, Cassandra 3.11, was introduced in June of 2017. Since then users have awaited a major upgrade to this popular wide column NoSQL database. On July 27, 2021, Apache Cassandra 4.0 was finally released. For the open source NoSQL community, this long-awaited upgrade is a significant milestone. Kudos to everyone involved in its development and testing!


Apache Cassandra has consistently been ranked amongst the most popular databases in the world, as per the DB-engines.com ranking, often sitting in the top 10.

TL;DR Cassandra 4.0 vs. Cassandra 3.11 Results

As the emphasis of the Cassandra 4.0 release was on stability, the key performance gain comes from a major upgrade of the JVM (OpenJDK 8 → OpenJDK 16) and the use of ZGC instead of G1GC. As you can quickly observe, the latencies under maximum throughput were drastically improved! You can read more about the new Java garbage collectors (and their various performance test results) in this article.
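In practice the switch boils down to a handful of JVM options; a sketch mirroring the settings listed in the Supplementary Information of the companion post:

# Cassandra 3.11 on OpenJDK 8 (JVM options file):
-XX:+UseG1GC
-XX:MaxGCPauseMillis=500
# Cassandra 4.0 on OpenJDK 16:
-XX:+UseZGC
-XX:ConcGCThreads=16
-XX:+UseTransparentHugePages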

P99 latencies at one half (50%) of the maximum throughput of Cassandra 4.0. Cassandra 4.0 reduced these long-tail latencies by 80% to 99% compared to Cassandra 3.11.

Maximum throughput for Cassandra 4.0 vs. Cassandra 3.11, measured in 10k ops increments, before latencies become unacceptably high. While many cases produced no significant gains for Cassandra 4.0, some access patterns saw Cassandra 4.0 capable of 25% – 33% greater throughput over Cassandra 3.11.

In our test setup, which we will describe in greater detail below, Cassandra 4.0 showed a 25% improvement for a write-only disk-intensive workload and 33% improvements for cases of read-only with either a low or high cache hit rate. Otherwise max throughput between the two Cassandra releases was relatively similar.

This doesn’t tell the full story, as most workloads aren’t run at maximum utilization, and tail latency at maximum utilization is usually poor. In our tests, we marked throughput performance at an SLA of under 10 ms for P90 and P99 latency. At this service level, Cassandra 4.0, powered by the new JVM/GC, can deliver twice the throughput of Cassandra 3.11.

Outside of sheer performance, we tested a wide range of administrative operations, from adding nodes, doubling a cluster, node removal, and compaction, all of them under emulated production load. Cassandra 4.0 improves these admin operation times up to 42%.

For users looking for throughput improvements in other use cases, Cassandra 4.0’s gains may be slight or negligible.

Test Setup

We wanted to use relatively typical current generation servers on AWS so that others could replicate our tests, and reflect a real-world setup.

                   Cassandra 4.0/3.11        Loaders
EC2 Instance Type  i3.4xlarge                c5n.9xlarge
Cluster Size       3                         3
vCPUs              16                        36
Memory (GiB)       122                       96
Storage            2x 1.9 TB NVMe in RAID0   Not important for a loader (EBS-only)
Network            Up to 10 Gbps             50 Gbps

We set up our cluster on Amazon EC2, in a single Availability Zone within us-east-2. Database cluster servers were initialized with clean machine images (AMIs), running Cassandra 4.0 (which we’ll refer to as “C*4” below) and Cassandra 3.11 (“C*3”) on Ubuntu 20.04.

Apart from the cluster, three loader machines were employed to run cassandra-stress in order to insert data and, later, provide background load to mess with the administrative operations.

Once up and running, the databases were loaded by cassandra-stress with 3 TB of random data organized into the default schema. At a replication factor of 3, this meant approximately 1 TB of data per node. The exact disk occupancy would, of course, depend on running compactions and the size of other related files (commitlogs, etc.). Based on the size of the payload, this translated to ~3.43 billion partitions. Then we flushed the data and waited until the compactions finished, so we could start the actual benchmarking.

Limitations of Our Cassandra 4.0 Testing

It’s important to note that this basic performance analysis does not cover all factors in deciding whether to stay put on Cassandra 3.x, upgrade to Cassandra 4.0, or to migrate to a new solution. Users may be wondering if the new Cassandra 4.0 features are compelling enough. Plus there are issues of risk aversion based on stability and maturity for any new software release. For example, the ZGC garbage collector we used requires Java 16, which Cassandra can run on but does not yet consider production-ready; newer JVMs are not officially supported by Cassandra yet.

Throughputs and Latencies

The actual benchmarking is a series of simple invocations of cassandra-stress with CL=QUORUM. For 30 minutes we keep firing 10,000 requests per second and monitor the latencies. Then we increase the request rate by another 10,000 for another 30 minutes, and so on (by 20,000 at higher throughputs). The procedure repeats until the DB is no longer capable of withstanding the traffic, i.e. until cassandra-stress cannot achieve the desired throughput or until the 90-percentile latencies exceed 1 second.

Note: This approach means that throughput numbers are presented with 10k ops/s granularity (in some cases 20k ops/s).

We tested Cassandra 4.0 and 3.11 with the following distributions of data:

  • “Real-life” (Gaussian) distribution, with sensible cache-hit ratios of 30-60%
  • Uniform distribution, with a close-to-zero cache hit ratio, which we’ll call “disk-intensive”
  • “In-memory” distribution, expected to yield almost 100% cache hits, which we’ll call “memory-intensive”

Within these scenarios we ran the following workloads:

  • 100% writes
  • 100% reads
  • 50% writes and 50% reads

“Real-life” (Gaussian) Distribution

In this scenario we issue queries that touch partitions randomly drawn from a narrow Gaussian distribution. We make an Ansatz about the bell curve: we assume that its six-sigma spans the RAM of the cluster (corrected for the replication factor). The purpose of this experiment is to model a realistic workload, with a substantial cache hit ratio but less than 100%, because most of our users observe the figures of 60-90%. We expect Cassandra to perform well in this scenario because its key cache is dense, i.e. it efficiently stores data in RAM, though it relies on SSTables stored in the OS page cache which can be heavyweight to look up.

Mixed Workload – 50% reads and 50% writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload consists of 50% reads and 50% writes, randomly targeting a “realistic” Gaussian distribution. C*3 quickly becomes nonoperational, C*4 is a little better but doesn’t achieve more than 40k ops/s.

Metric                                  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3
Maximum throughput                      40k/s          30k/s           1.33x
Max throughput with P90 latency <10 ms  30k/s          10k/s           3x
Max throughput with P99 latency <10 ms  30k/s          –               –

The 90- and 99-percentile latencies of SELECT queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload consists of 50% reads and 50% writes, randomly targeting a “realistic” Gaussian distribution. C*3 quickly becomes nonoperational, C*4 is a little better but doesn’t achieve more than 40k ops/s.

Metric                                  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3
Maximum throughput                      40k/s          40k/s           1x
Max throughput with P90 latency <10 ms  30k/s          10k/s           3x
Max throughput with P99 latency <10 ms  10k/s          –               –

Uniform Distribution (low cache hit ratio)

In this scenario we issue queries that touch random partitions of the entire dataset. In our setup this should result in negligible cache hit rates, i.e. just a few percent.

Writes Workload – Only Writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed, i.e. every partition in the 1 TB dataset has an equal chance of being updated. C*3 quickly becomes nonoperational, C*4 is a little better, achieving up to 50k ops/s.

Metric                                  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3
Maximum throughput                      50k/s          40k/s           1.25x
Max throughput with P90 latency <10 ms  40k/s          20k/s           2x
Max throughput with P99 latency <10 ms  30k/s          –               –

Reads Workload – Only Reads

The 90- and 99-percentile latencies of SELECT queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed, i.e. every partition in the 1 TB dataset has an equal chance of being selected. C*4 serves 90% of queries in a <10 ms time until the load reaches 40k ops. Please note that almost all reads are served from disk.

Metric                                  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3
Maximum throughput                      40k/s          30k/s           1.25x
Max throughput with P90 latency <10 ms  40k/s          30k/s           1.25x
Max throughput with P99 latency <10 ms  20k/s          –               –

Mixed Workload – 50% reads and 50% writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed, i.e. every partition in the 1 TB dataset has an equal chance of being selected/updated. Both C*4 and C*3 reached throughputs up to 40k ops/s, but the contrast was significant: C*4’s P90 latencies were nearly single-digit milliseconds, while C*3’s P90s were over 500 ms and its P99s were longer than a second.

Metric                                  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3
Maximum throughput                      40k/s          40k/s           1x
Max throughput with P90 latency <10 ms  40k/s          20k/s           2x
Max throughput with P99 latency <10 ms  30k/s          –               –

The 90- and 99-percentile latencies of SELECT queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed, i.e. every partition in the 1 TB dataset has an equal chance of being selected/updated. C*3 can barely maintain sub-second P90s at 40k ops/s, and not P99s. C*4 almost achieved single-digit latencies in the P90 range, and had P99s in the low hundreds of milliseconds.

Metric                                  Cassandra 4.0  Cassandra 3.11  C*4 vs. C*3
Maximum throughput                      40k/s          40k/s           1x
Max throughput with P90 latency <10 ms  30k/s          20k/s           1.5x
Max throughput with P99 latency <10 ms  20k/s          –               –

In-Memory Distribution (high cache hit ratio)

In this scenario we issue queries touching random partitions from a small subset of the dataset, specifically: one that fits into RAM. To be sure that our subset resides in cache and thus no disk IO is triggered, we choose it to be… safely small, at an arbitrarily picked value of 60 GB. The goal here is to evaluate both DBs at the other extreme end: where they both serve as pure in-memory datastores.

Writes Workload – Only Writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed over 60 GB of data, so that every partition resides in cache and has an equal chance of being updated. Both versions of Cassandra quickly become nonoperational beyond 40k ops, though C*4 maintains single-digit latencies up to that threshold. C*3 can only maintain single-digit P90 latencies at half that throughput — 20k ops.

Metric | Cassandra 4.0 | Cassandra 3.11 | Cassandra 4.0 vs Cassandra 3.11
Maximum throughput | 40k/s | 40k/s | 1x
Maximum throughput with 90% latency < 10 ms | 40k/s | 20k/s | 2x
Maximum throughput with 99% latency < 10 ms | 40k/s

Reads Workload – Only Reads

The 90- and 99-percentile latencies of SELECT queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed over 60 GB of data, so that every partition resides in cache and has an equal chance of being selected. C*4 can achieve 80k ops before becoming functionally non-performant, whereas C*3 can only achieve 60k ops. C*4 can also maintain single digit millisecond latencies for P99s up to 40k ops, whereas C*3 quickly exceeds that latency threshold even at 20k ops.

Metric | Cassandra 4.0 | Cassandra 3.11 | Cassandra 4.0 vs Cassandra 3.11
Maximum throughput | 80k/s | 60k/s | 1.33x
Maximum throughput with 90% latency < 10 ms | 60k/s | 40k/s | 1.5x
Maximum throughput with 99% latency < 10 ms | 40k/s

Mixed Workload – 50% reads and 50% writes

The 90- and 99-percentile latencies of UPDATE queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed over 60 GB of data, so that every partition resides in cache and has an equal chance of being selected/updated. C*4 can maintain single-digit long-tail latencies up to 40k ops. C*3 can only maintain single-digit P90 latencies at half that rate (20k ops) and quickly rises into hundreds of milliseconds for P90/P99 latencies at 40k ops. Both C*4 and C*3 fail to achieve reasonable latencies beyond those ranges.

Metric | Cassandra 4.0 | Cassandra 3.11 | Cassandra 4.0 vs Cassandra 3.11
Maximum throughput | 40k/s | 40k/s | 1x
Maximum throughput with 90% latency < 10 ms | 40k/s | 20k/s | 2x
Maximum throughput with 99% latency < 10 ms | 40k/s

The 90- and 99-percentile latencies of SELECT queries, as measured on three i3.4xlarge machines (48 vCPUs in total) in a range of load rates. Workload is uniformly distributed over 60 GB of data, so that every partition resides in cache and has an equal chance of being selected/updated. C*4 and C*3 can only maintain single-digit millisecond long-tail latencies at 20k ops throughput (and C*3 only for P90; its P99s are already in the hundreds of milliseconds even at 20k ops). C*4 can achieve single digit P90 latencies at 40k ops, but P99 latencies rise into double-digit milliseconds.

Metric | Cassandra 4.0 | Cassandra 3.11 | Cassandra 4.0 vs Cassandra 3.11
Maximum throughput | 40k/s | 40k/s | 1x
Maximum throughput with 90% latency < 10 ms | 40k/s | 20k/s | 2x
Maximum throughput with 99% latency < 10 ms | 20k/s

Administrative Operations

Beyond raw performance, users have day-to-day administrative operations they need to perform, such as adding a node to a growing cluster or replacing a node that has died. The following tests benchmarked the performance of these administrative tasks.

Adding Nodes

The timeline of adding 3 nodes to an already existing 3-node cluster (ending up with six i3.4xlarge machines), doubling the size of the cluster. Cassandra 4 exhibited a 12% speed improvement over Cassandra 3.

One New Node

In this benchmark, we measured how long it took to add a new node to the cluster. The reported times are the intervals between starting a Cassandra node and having it fully finished bootstrapping (CQL port open).

Cassandra 4.0 is equipped with a new feature, Zero Copy Streaming (ZCS), which allows efficient streaming of entire SSTables. An SSTable is eligible for ZCS if all of its partitions need to be transferred, which can be the case when LeveledCompactionStrategy (LCS) is enabled. To demonstrate this feature, we ran the next benchmarks with both the usual SizeTieredCompactionStrategy (STCS) and LCS, since the former cannot benefit from zero copy streaming.
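The compaction strategy is a per-table setting; as a minimal CQL sketch (the keyspace and table names here are hypothetical), switching a table to LCS looks like this:

ALTER TABLE ks.events WITH compaction = {'class': 'LeveledCompactionStrategy'};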

The time needed to add a node to an already existing 3-node cluster (ending up with 4 i3.4xlarge machines). The cluster is initially loaded with 1 TB of data at RF=3. C*4 showed a 15% speed improvement over C*3 using STCS, and was 30% faster than C*3 when using LCS.

Strategy | Cassandra 4.0 | Cassandra 3.11 | Cassandra 4.0 vs. Cassandra 3.11
STCS | 1 hour 47 minutes 1 second | 2 hours 6 minutes | 15% faster
LCS | 1 hour 39 minutes 45 seconds | 2 hours 23 minutes 10 seconds | 30% faster

Doubling the Cluster Size

In this benchmark, we measured how long it took to double the cluster node count: we go from 3 nodes to 6 nodes. Three new nodes were added sequentially, i.e. waiting for the previous one to fully bootstrap before starting the next one. The reported time spans from the instant the startup of the first new node is initiated, all the way until the bootstrap of the third new node finishes.

The time needed to add 3 nodes to an already existing 3-node cluster of i3.4xlarge machines, preloaded with 1 TB of data at RF=3. C* 4 was 12% faster than C*3 using STCS, and 21% faster when using LCS.

Strategy | Cassandra 4.0 | Cassandra 3.11 | Cassandra 4.0 vs. Cassandra 3.11
STCS | 3 hours 58 minutes 21 seconds | 4 hours 30 minutes 7 seconds | 12% faster
LCS | 3 hours 44 minutes 6 seconds | 4 hours 44 minutes 46 seconds | 21% faster

Replace Node

In this benchmark, we measured how long it took to replace a single node. One of the nodes is brought down and another one is started in its place.

The time needed to replace a node in a 3-node cluster of i3.4xlarge machines, preloaded with 1 TB of data at RF=3. Cassandra 4.0 noted significant improvement over Cassandra 3.11.

Strategy | Cassandra 4.0 | Cassandra 3.11 | Cassandra 4.0 vs. Cassandra 3.11
STCS | 3 hours 28 minutes 46 seconds | 4 hours 35 minutes 56 seconds | 24% faster
LCS | 3 hours 19 minutes 17 seconds | 5 hours 4 minutes 9 seconds | 34% faster

Summary

Apache Cassandra 4.0 performance is undeniably better than Apache Cassandra 3.11. It improved latencies under almost all conditions and could often sustain noticeably higher throughputs. It also sped up streaming, which is very useful for administrative operations.

Key findings:

  • Cassandra 4.0 has better P99 latency than Cassandra 3.11 by up to 100x!
  • Cassandra 4.0 throughputs can be up to 33% greater than Cassandra 3.11’s, but more importantly, under an SLA of < 10 ms P99 latency, Cassandra 4.0 can deliver 2x to 3x the throughput.
  • Cassandra 4.0 streaming is up to 34% faster than Cassandra 3.11’s.

Dive Deeper

We just released Part 2 of our Cassandra 4.0 benchmark analysis, in which we compare the performance of Apache Cassandra, both 3.11 and 4.0, against Scylla Open Source 4.4.

COMPARE SCYLLA AND CASSANDRA PERFORMANCE

Appendix

Cassandra 3.11 configuration

JVM settings (JVM version: OpenJDK 8):
-Xms48G
-Xmx48G
-XX:+UseG1GC
-XX:G1RSetUpdatingPauseTimePercent=5
-XX:MaxGCPauseMillis=500
-XX:InitiatingHeapOccupancyPercent=70
-XX:ParallelGCThreads=16
cassandra.yaml (only settings changed from the default configuration are listed below):

disk_access_mode: mmap_index_only
row_cache_size_in_mb: 10240
concurrent_writes: 128
file_cache_size_in_mb: 2048
buffer_pool_use_heap_if_exhausted: true
disk_optimization_strategy: ssd
memtable_flush_writers: 4
trickle_fsync: true
concurrent_compactors: 16
compaction_throughput_mb_per_sec: 960
stream_throughput_outbound_megabits_per_sec: 7000

Cassandra 4.0 configuration

JVM settings (JVM version: OpenJDK 16):

-Xms70G
-Xmx70G
-XX:+UseZGC
-XX:ConcGCThreads=16
-XX:ParallelGCThreads=16
-XX:+UseTransparentHugePages
-verbose:gc
-Djdk.attach.allowAttachSelf=true
-Dio.netty.tryReflectionSetAccessible=true

cassandra.yaml (only settings changed from the default configuration are listed below):

disk_access_mode: mmap_index_only
row_cache_size_in_mb: 10240
concurrent_writes: 128
file_cache_size_in_mb: 2048
buffer_pool_use_heap_if_exhausted: true
disk_optimization_strategy: ssd
memtable_flush_writers: 4
trickle_fsync: true
concurrent_compactors: 16
compaction_throughput_mb_per_sec: 960
stream_throughput_outbound_megabits_per_sec: 7000

Cassandra-stress parameters

  • Background loads were executed in a loop (so duration=5m is not a problem).
  • REPLICATION_FACTOR is 3.
  • COMPACTION_STRATEGY is SizeTieredCompactionStrategy unless stated otherwise.
  • loadgenerator_count is the number of generator machines (3 for these benchmarks).
  • DURATION_MINUTES is 10 for in-memory benchmarks.
Inserting data:
write cl=QUORUM
-schema "replication(strategy=SimpleStrategy,replication_factor={REPLICATION_FACTOR})" "compaction(strategy={COMPACTION_STRATEGY})"
-mode native cql3
(Threads and throttle parameters were chosen for each DB separately, to ensure 3TB were inserted quickly, yet also to provide headroom for minor compactions and avoid timeouts/large latencies.)

Cache warmup in Gaussian latency / throughput:
mixed ratio(write=0,read=1)
duration=180m
cl=QUORUM
-pop dist=GAUSSIAN(1..{ROW_COUNT},{GAUSS_CENTER},{GAUSS_SIGMA})
-mode native cql3
-rate "threads=500 throttle=35000/s"
-node {cluster_string}

Latency / throughput – Gaussian:
duration={DURATION_MINUTES}m
cl=QUORUM
-pop dist=GAUSSIAN(1..{ROW_COUNT},{GAUSS_CENTER},{GAUSS_SIGMA})
-mode native cql3
-rate "threads=500 fixed={rate // loadgenerator_count}/s"

Latency / throughput – uniform / in-memory:
duration={DURATION_MINUTES}m
cl=QUORUM
-pop dist=UNIFORM(1..{ROW_COUNT})
-mode native cql3
-rate "threads=500 fixed={rate // loadgenerator_count}/s"

 

The post Cassandra 4.0 vs. Cassandra 3.11: Performance Comparison appeared first on ScyllaDB.

Guide to Apache Cassandra Data and Disaster Recovery

Apache Cassandra is a distributed database management system where data is replicated amongst multiple nodes and can span across multiple data centers. Cassandra can run without service interruption even when one or more nodes are down. It is because of this architecture that Cassandra is highly available and highly fault tolerant. 

However, there are a number of common scenarios we encounter where nodes or data needs to be recovered in a Cassandra cluster, including:

  • Cloud instance retirement
  • Availability zone outages
  • Errors made from client applications and accidental deletions
  • Corrupted data/SSTables
  • Disk failures
  • Catastrophic failures that require an entire cluster rebuild

Depending on the nature of the issue, the options available for recovery include the following:

Single Dead Node

Causes for this scenario include cloud instance retirement or physical server failure. 

For Instaclustr Managed Cassandra clusters, this is detected and handled automatically by our Support team. Outage or data loss is highly unlikely assuming Instaclustr’s recommended replication factor (RF) of three is being used. The node will simply be replaced and data will be streamed from other replicas in the cluster. Recovery time is dependent on the amount of data.

A Node That Was Unavailable for a Short Period of Time

This could be due to rolling restart, cloud availability zone outage, or a number of other general errors.

For this scenario:

  • It is handled automatically by Cassandra
  • Recovery time is dependent on downtime in this case
  • There is no potential for data loss and no setup required

If a node in Cassandra is not available for a short period of time, the data to be replicated on the node is stored on a peer node. This data is called hints. Once the original node becomes available, the hints are transferred to the node, and the node is caught up with the missed data. This process is known as hinted handoffs within Cassandra. 

There are time and storage restrictions for hints. If a node is not available for a longer duration than configured (i.e. hinted handoff window), no hints are saved for it. In such a case, Instaclustr will make sure repairs are run following best practices to ensure data remains consistent.
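The hinted handoff window is a cassandra.yaml setting. The values below are the Apache Cassandra defaults, shown only as a sketch (Instaclustr managed clusters may use different values):

hinted_handoff_enabled: true
max_hint_window_in_ms: 10800000    # 3 hours; a node down longer than this accumulates no further hints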

Restore Keyspace, Corrupted Tables, Column Family, Accidentally Deleted Rows, Transaction Logs, and Archived Data

In this case, if you are a Managed Cassandra customer:

  • Email Instaclustr
  • Recovery time dependent on amount of data
  • Anything after the last backup will be lost
  • Setup involves backups, which is handled by Instaclustr
  • Causes include general error, human error, testing, and even malware or ransomware

Note:

  • Corrupted Tables/Column Family: This assumes corruption is at the global cluster level and not at the individual node level
  • Accidentally deleted rows: This is not as straightforward as you would potentially need to restore elsewhere (either to another table/keyspace or cluster) and read from there. Instaclustr can help with this.
  • Transaction logs: Not a common restore method
  • Archived data: Backups only go back 7 days, so another method to handle this must be used

Rebuild an Entire Cluster With the Same Settings

You may need to rebuild a cluster from backup due to accidental deletion or any other total data loss (e.g. human error). Another common reason for using restore-from-backup is to create a clone of a cluster for testing purposes. 

All Instaclustr-managed clusters have backups enabled, and we provide a self-service restore feature. Complex restore scenarios can be achieved by contacting our 24×7 support team.

When restoring a cluster, backups can be used to restore data from a point-in-time to a new cluster via the Instaclustr Console or Provisioning API. See the following from Instaclustr’s Cassandra Documentation:

“When restoring a cluster, you may choose to:

  • Restore all data or only selected tables
    • If restoring a subset of tables, the cluster schema will still be restored in its entirety and therefore the schemas for any non-restored keyspaces and tables will still be applied. However, only backup data for the selected tables will be restored.
  • Restore to a point-in-time
    • The restore process will use the latest backup data for each node, up to the specified point-in-time. If restoring a cluster with Continuous Backup enabled, then commit logs will also be restored.

Once the Restore parameters are submitted, a new cluster will be provisioned under the account with a topology that matches the designated point-in-time.”

For clusters with basic data backups enabled (once per node per 24 hour period), anything written after the last backup could potentially be lost. If the “continuous backup” feature is enabled, then this recovery point is reduced to five minutes.

Recovery time is dependent on the amount of data to be restored. The following chart aims to detail the restore download speeds for different instances provided in the Instaclustr managed service offering:

Instance | Download Speed* | Commit Log Replay Speed | Startup Speed (No Commit Logs)
i3.2xlarge | 312 MB/s | 10GB (320 commit logs): 12 seconds; 20GB (640 commit logs): 28 seconds; 30GB (960 commit logs): 1 minute 7 seconds | 82 seconds per node

*The download speed is the approximate rate that data is transferred from cloud provider storage service to the instance. It may vary depending on region, time of day, etc.

Application Effect

This really is a question about quorum, consistency level, and fault tolerance. Assuming a replication factor of three and LOCAL_QUORUM consistency is being used (as per Instaclustr’s Cassandra best practices), your application will not see any outage.
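The consistency level is chosen by the client per session or per statement. For example, in cqlsh it can be inspected and changed with the CONSISTENCY command (drivers expose an equivalent setting on the session or statement):

CONSISTENCY LOCAL_QUORUM;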

Instaclustr Managed Cassandra Backup and Restore Options

The following are some of the different Cassandra backup and restore options available for Instaclustr’s customers.

Snapshot Backup

By default, each node takes a full backup once per 24 hours and uploads the data to an S3 bucket in the same region. Backup times are staggered at random hours throughout the day. The purpose of these backups is to recover the cluster data in case of data corruption or developer error (the most common example being accidentally deleting data). Backups are kept for seven days (or according to the lifecycle policy set on the bucket). For clusters that are hosted in Instaclustr’s AWS account, we do not replicate data across regions.

Continuous Backup

Continuous Backup can optionally be enabled to perform backups more frequently. Enabling Continuous Backup for a cluster will increase the frequency of snapshot backups to once every three hours, and additionally, enable commit log backups once every five minutes. This option provides a reduced window of potential data loss (i.e. a lower RPO). This feature can be enabled on any cluster as part of the enterprise add-ons package, for an additional 20% of cluster monthly price, or as per your contract.

Restore (Same Region)

Cluster backups may be used to restore data from a point-in-time to a new cluster, via the Console or using the Provisioning API. Data will be restored to a new cluster in the same region. This feature is fully automated and self-serve. 

Customized network configurations created outside the Instaclustr Console are not automatically restored, including peering.

For more information, read our documentation on Cassandra cluster restore operations. 

Next up, we’ll review some common failure scenarios and how they are resolved in greater detail.

Scenario 1: AZ Failure

For maximum fault tolerance, a Cassandra cluster should be architected using three (3) racks, which are each mapped to AWS Availability Zones (AZs). This configuration allows the loss of one AZ (rack) and QUORUM queries will still be successful.
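That topology is reflected in the keyspace definition: NetworkTopologyStrategy with three replicas per data center. A minimal CQL sketch (the keyspace and data center names are hypothetical and must match the data center name configured for your cluster):

CREATE KEYSPACE app_data
  WITH replication = {'class': 'NetworkTopologyStrategy', 'AWS_VPC_US_EAST_1': 3};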

Testing AZ Failure 

Instaclustr Technical Operations can assist with testing a disaster recovery (DR) situation by simulating the loss of one or more AZs. This is achieved by stopping the Cassandra service on all nodes in one rack (AZ). 

Below are two tests to consider:

First test: Stop Cassandra on all nodes in one rack

  • Rack = AZ
  • Simulates a case where some or all nodes become unreachable in an AZ, but not necessarily failed
  • Measures impact to the application (we assume you’ll run some traffic through during the simulated outage)

Second test: Terminate all instances in one rack

  • Simulate a complete AZ failure
  • Instaclustr will replace each instance until the cluster is recovered
  • Measures recovery time

Our process in the case of an instance failure is:

  • Alerts would be triggered to PagerDuty detecting nodes down
  • Rostered engineer would investigate by:
    • Attempting to ssh to the instance
    • Checking AWS instance status via the CLI
  • Replace instance if it is unreachable (uses restore from backup).
  • Notify the customer of actions

Scenario 2: Region Failure

Failover DC in Another Region

In this option, another data center (DC) is added to the cluster in a secondary region. All data, and new writes, are continuously replicated to the additional DC. Reads are serviced by the primary DC. This is essentially a “hot standby” configuration.

Advantages:

  • Full cross-region redundancy
  • Very low Recovery Point Objective (RPO) and Recovery Time Objective (RTO). 
    1. RPO: Frequency of backups
    2. RTO: Amount of tolerable downtime for business
  • Simple to set up
  • Guaranteed AWS capacity in secondary region should the primary region become unavailable
  • Each DC is backed up to S3 in the same region
  • Protects against data corruption (at SSTable level)

Disadvantages

  • Does not protect against accidental deletes or malicious queries (all mutations are replicated to the secondary DC)

Cost

  • There is a potential to run the secondary DC with fewer or smaller instance types. At the very minimum it will need to service the full write load of the primary DC, as well as having sufficient storage for the data set. 
  • Cost includes additional AWS infrastructure cost, as well as Instaclustr management fees

Testing Region Failure 

Similar to simulating an AZ failure, a region outage can be simulated by:

First test: Stop Cassandra on all nodes in primary region

  • Quickest simulation, allows you to test how your application will respond (we assume you’ll run some traffic through during the simulated outage).

Second test: Terminate all instances in one region

  • Simulate a complete region failure (particularly where ephemeral instances are used)
  • Allows you to test how your application will respond (we assume you’ll run some traffic through during the simulated outage).
  • Measures recovery time for primary region after it becomes available again
    • All instances replaced or rebuilt from secondary DC
    • A repair would (likely) be required to catch up on writes missed during the outage.  

Scenario 3: Accidental/Malicious Delete, Data Corruption, etc.

For these scenarios, typically a partial data restore is required.  The recovery method, as well as the amount of data that can be recovered, will invariably depend on the nature of the data loss. You should contact support@instaclustr.com as soon as possible for expert advice. 

We have extensive experience in complex data recovery scenarios.

Case Study: Accidentally Dropped Table

Problem: Human error caused a production table to be accidentally dropped. 

Solution: Instaclustr TechOps was able to quickly reinstate the table data from the snapshots stored on disk. 

Recovery time: Less than one hour. 

Case Study: Data Corruption Caused by Cluster Overload

Problem: Unthrottled data loads caused SSTable corruption on disk and cluster outage. 

Solution: Combination of online and offline SSTable scrubs. Some data was also able to be recovered from recent backups.  A standalone instance was used to complete some scrubs and data recovery, in order to parallelise the work and reduce impact to live prod clusters.

Recovery time: Approximately 90% of the data was available within the same business day. Several weeks to recover the remainder. A small percentage of data was not able to be recovered.

Conclusion

In this post, we discussed common failure scenarios within Apache Cassandra, their respective causes and recovery solutions, as well as other nuances involved. Instaclustr has invested the time and effort to gain industry-leading, expert knowledge on the best practices for Cassandra. We are the open source experts when it comes to architecting the right solution to protecting and managing your data within Cassandra. 

Have more questions on Cassandra disaster recovery and other best practices? Reach out to schedule a consultation with one of our experts. Or, sign up for a free trial to get started with Cassandra right away.

The post Guide to Apache Cassandra Data and Disaster Recovery appeared first on Instaclustr.

Understanding Apache Cassandra Memory Usage

This article describes Apache Cassandra components that contribute to memory usage and provides some basic advice on tuning.

Cassandra memory usage is split into JVM heap and offheap.

Heap is managed by the JVM’s garbage collector.

Offheap is manually managed memory, which is used for:

  • Bloom filters: Used to quickly test if an SSTable contains a partition
  • Index summary: A search lookup of index positions
  • Compression metadata
  • Key cache (key_cache_size_in_mb): Used to store SSTable positions for a given partition key (so you can skip Index summary and Index scanning for data position)
  • File cache (file_cache_size_in_mb): 32MB of this is reserved for pooling buffers; the remainder serves as a cache holding uncompressed SSTable chunks.
  • Row cache (row_cache_size_in_mb)
  • Counter cache (counter_cache_size_in_mb)
  • Offheap memtables (memtable_allocation_type)
  • Direct ByteBuffer (ByteBuffer.allocateDirect)
  • Memory mapped files (default disk_access_mode is mmap): Reading Data.db and Index.db files will use memory mapped files, which is where the operating system loads parts of the file into memory pages

Bloom Filters

Bloom filters are a data structure that lets you determine if a specific element is present in a set. Bloom filters let you look at data in Cassandra and determine, for a given partition, one of two possibilities: 

1. It definitely does not exist in the given file, or:

2. It probably does exist in the file

If you want to make your bloom filters more accurate, configure them to consume more RAM. You can adjust this behavior for your bloom filters by changing bloom_filter_fp_chance to a float between 0 and 1. This parameter defaults to 0.1 for tables using LeveledCompactionStrategy, and 0.01 otherwise.

Bloom filters are stored offheap in RAM. As the bloom_filter_fp_chance gets closer to 0, memory usage increases, but does not increase in a linear fashion. 

Values for bloom_filter_fp_chance are usually between 0.01 (a 1% false positive chance) and 0.1 (10%).

You should adjust bloom_filter_fp_chance depending on your use case. If you need to avoid excess IO operations, set bloom_filter_fp_chance to a low number like 0.01. If you want to save RAM and care less about IO operations, you can use a higher bloom_filter_fp_chance. If you rarely read, or read by performing range slices over all the data (as Apache Spark does when scanning a whole table), an even higher number may be optimal. 
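The setting is applied per table. A minimal CQL sketch (the keyspace and table names are hypothetical) that trades more RAM for fewer false positives:

ALTER TABLE ks.events WITH bloom_filter_fp_chance = 0.01;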

Index Summary 

Cassandra stores the index summary offsets and entries in offheap memory.

Compression Metadata

Cassandra stores the compression chunk offsets in offheap memory.

Key Cache

The key cache saves Cassandra from having to seek for the position of a partition. It saves a good deal of time relative to how little memory it uses, so it is generally worth enabling. The global limit for the key cache is controlled in cassandra.yaml by setting key_cache_size_in_mb.

There is also a per-table setting defined in the schema, in the property caching under keys, with the default set to ALL.

Row Cache

Compared to the key cache, the row cache saves more time but takes up more space. So the row cache should only be used for static rows or hot rows. The global limit for row cache is controlled in cassandra.yaml by setting row_cache_size_in_mb.

There is also a per-table setting defined in the schema, in the property caching under key rows_per_partition, with the default set to NONE.
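A minimal sketch of the two places these caches are tuned; the table name and the sizes shown are illustrative, not recommendations:

# cassandra.yaml (global limits)
key_cache_size_in_mb: 100
row_cache_size_in_mb: 0          # 0 disables the row cache

-- per-table schema setting (CQL)
ALTER TABLE ks.hot_table WITH caching = {'keys': 'ALL', 'rows_per_partition': '100'};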

Counter Cache

The counter cache helps cut down on contention for counter locks on hot counter cells. Only the local (clock, count) tuple of a counter cell is cached, not the whole counter, so it is relatively cheap. You can adjust the global limit for the counter cache in cassandra.yaml by setting counter_cache_size_in_mb.

Offheap Memtables

Since Cassandra 2.1, offheap memory can be used for memtables.

This is set via memtable_allocation_type in cassandra.yaml. If you want to have the least impact on reads, use offheap_buffers to move the cell name and value to DirectBuffer objects. 

With offheap_objects, the cell itself is moved offheap, and only a pointer to the offheap data is kept on heap. 
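A minimal cassandra.yaml sketch (the offheap space limit is illustrative; by default Cassandra derives it from the heap size):

memtable_allocation_type: offheap_objects
memtable_offheap_space_in_mb: 2048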

Direct ByteBuffer

There are a few miscellaneous places where Cassandra allocates offheap, such as HintsBuffer, and certain compressors such as LZ4 will also use offheap when file cache is exhausted.

Memory Mapped Files

By default, Cassandra uses memory mapped files. If the operating system is unable to allocate memory to map the file to, you will see a message such as:

Native memory allocation (mmap) failed to map 12288 bytes for committing reserved memory.

If this occurs, you will need to reduce offheap usage, resize to hardware with more available memory, or enable swap.
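One way to reduce mmap-related offheap usage is to map only index files rather than both data and index files. This is done with disk_access_mode, which does not appear in the default cassandra.yaml but is recognized by Cassandra:

disk_access_mode: mmap_index_only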

Maximum Memory Usage Reached

This is a common log message about memory that often causes concern:

INFO o.a.c.utils.memory.BufferPool Maximum memory usage reached (536870912), cannot allocate chunk of 1048576.

Cassandra has a cache that is used to store decompressed SSTable chunks in offheap memory. In effect, this cache performs a similar job to the OS page cache, except the data doesn’t need to be decompressed every time it is fetched.

This log message just means that the cache is full. When this is full, Cassandra will allocate a ByteBuffer outside the cache, which can be a degradation of performance (since it has to allocate memory). This is why this message is only at INFO level and not WARN.

The default chunk cache size is 512MB. It can be modified by altering file_cache_size_in_mb in cassandra.yaml.

Examining Memory Usage

Running nodetool info will provide heap and offheap memory usage.
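For example, a quick check might look like this (the numbers are illustrative and the output is abridged; exact field names and formatting vary by Cassandra version):

$ nodetool info
...
Heap Memory (MB)       : 3052.45 / 8192.00
Off Heap Memory (MB)   : 1024.32
...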

However, comparing this with actual memory usage will usually show a discrepancy. 

For example: with Cassandra running in Docker, Cassandra was using 16.8GB according to docker stats, while nodetool info reported 8GB heap and 4GB offheap. This leaves 4.8GB not accounted for. Where does the memory go?

This happens because offheap usage reported by nodetool info only includes:

  • Memtable offheap
  • Bloom filter offheap
  • Index summary offheap
  • Compression metadata offheap

Other sources of offheap usage are not included, such as file cache, key cache, and other direct offheap allocations.

To get started with Apache Cassandra, sign up for a free trial of Instaclustr Managed Cassandra today. Or, connect with one of our experts to get advice on optimizing Apache Cassandra memory usage for your unique environment.

The post Understanding Apache Cassandra Memory Usage appeared first on Instaclustr.

Change Data Capture (CDC) With Kafka Connect and the Debezium Cassandra Connector (Part 2)

In Part 1 of this two-part blog series we discovered that Debezium is not a new metallic element (but the name was inspired by the periodic table), but a distributed open source CDC technology, and that CDC (in this context) stands for Change Data Capture (not Centers for Disease Control or the Control Data Corporation). 

We introduced the Debezium architecture and its use of Kafka Connect and explored how the Debezium Cassandra Connector (on the source side of the CDC pipeline) emits change events to Kafka for different database operations. 

In the second part of this blog series, we examine how Kafka sink connectors can use the change data, discover that Debezium also propagates database schema changes (in different ways), and summarize our experiences with the Debezium Cassandra Connector used for customer deployment. 

1. How Does a Kafka Connect Sink Connector Process the Debezium Change Data?

At the end of Part 1, we had a stream of Cassandra change data available in a Kafka topic, so the next question is how is this processed and applied to a target sink system? From reading the Debezium documentation, the most common deployment is by means of Kafka Connect, using Kafka Connect sink connectors to propagate the changes into sink systems:

But Debezium only comes with source connectors, not sink connectors; the sink connectors are not directly part of the Debezium ecosystem. Where do you get sink connectors that can work with the Debezium change data from?

Based on my previous experiences with Kafka Connect Connectors, I wondered which open source sink connectors are possibly suitable for this task, given that:

  • The Debezium change data format is a complex JSON structure
  • It isn’t based on a standard
  • Each database produces a different format/content, and
  • The actions required on the sink system side will depend on:
    • The sink system,
    • The operation type, and 
    • The data values 

What’s perhaps not so obvious from the example so far is that Debezium also detects and propagates schema changes. In the above example, the schema is just sent explicitly in each message as the Kafka key, which can change. For example, if columns are added to the database table that is being monitored, then the schema is updated to include the new columns. So the sink connector will also have to be able to interpret and act on schema changes. 

The Debezium documentation does address at least one part of this puzzle (the complexity of the JSON change data structure), by suggesting that:

“… you might need to configure Debezium’s new record state extraction transformation. This Kafka Connect SMT (Single Message Transform) propagates the after structure from Debezium’s change event to the sink connector. This is in place of the verbose change event record that is propagated by default.”

Another relevant feature (in incubation) is Debezium Event Deserialization:

“Debezium generates data change events in the form of a complex message structure. This message is later on serialized by the configured Kafka Connect converter and it is the responsibility of the consumer to deserialize it into a logical message. For this purpose, Kafka uses the so-called SerDes. Debezium provides SerDes (io.debezium.serde.DebeziumSerdes) to simplify the deserialization for the consumer either being it Kafka Streams pipeline or plain Kafka consumer. The JSON SerDe deserializes JSON encoded change events and transforms it into a Java class.”

I had a look at Kafka Streams Serdes in a previous blog, so the Debezium Deserializer looks useful for Kafka Streams processing or custom Kafka consumers, but not so much so for off-the-shelf Kafka sink connectors.

2. Single Message Transforms

Let’s have a look at a simple SMT example from this blog.

As I discovered in my last blog, Kafka Connect JDBC Sink Connectors make different assumptions about the format of the Kafka messages. Some assume a flat record structure containing just the new names/values, others require a full JSON Schema and Payload, others allow custom JSON sub-structure extraction, and others (e.g. the one I ended up using after customizing the IBM sink connector) allow for arbitrary JSON objects to be passed through (e.g. for inserting into a PostgreSQL JSONB data type column).

These differences are due to the fact that they were designed for use in conjunction with different source systems and source connectors. All of the connectors I looked at only allowed for insert or upsert (insert if the row doesn’t exist, else update) operations, but not deletes. Some customization would therefore be required to cope with the full range of operations emitted by Debezium source connectors. 

For a simple use case where you only want to insert/upsert new records, Debezium provides a bridge between the complex change message format and simpler formats expected by JDBC sink connectors, in the form of a “UnwrapFromEnvelope” single message transform. This can be used in either the source or sink connector side by adding these two lines to the connector configuration:

"transforms": "unwrap",

"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"

Note that by default this SMT will filter out delete events. There is an alternative, more recent SMT, io.debezium.transforms.ExtractNewRecordState, which also allows optional metadata to be included.
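As a sketch, an ExtractNewRecordState configuration might look like the following. The option names are taken from the Debezium documentation, and the values shown (keeping delete records as rewritten events and adding op and ts_ms as metadata fields) are illustrative and version-dependent:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,ts_ms"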

3. Data Type Mappings

The Debezium Cassandra connector represents changes with events that are structured like the table in which the row exists, and each event contains field values and Cassandra data types. However, this implies that we need a mapping between the Cassandra data types and the sink system data types. There is a table in the documentation that describes the mapping from Cassandra data types to literal types (schema type) and semantic types (schema name), but currently not to “logical types”, Java data types, or potential sink system data types (e.g. to PostgreSQL, Elasticsearch, etc.). Some extra research is likely to be needed to determine the complete end-to-end Cassandra to sink system data type mappings. 

4. Duplicates and Ordering

The modern periodic table was invented by Mendeleev in 1869 to organize the elements, ordered by atomic weight. Mendeleev’s table predicted many undiscovered elements, including Germanium (featured in Part 1), which he predicted had an atomic weight of 70, but which wasn’t discovered until 1886.

The Debezium Cassandra Connector documentation highlights some other limitations of the current implementation. In particular, both duplicate and out-of-order events are possible. This implies that the Kafka sink connector will need to understand and handle both of these issues, and the behavior will also depend on the ability of the sink system to cope with them.

If the target system is sensitive to either of these issues, then it’s possible to use Kafka Streams to deduplicate events, and to re-order (or wait for out-of-order) events within a window.

Germanium arrived “out of order” but luckily Mendeleev had left a gap for it, so he didn’t have to redo his table from scratch.

The missing element, Germanium (actual atomic weight 72.6), a shiny metalloid.
(Source: Shutterstock)

5. Scalability Mismatch

Another thing you will need to watch out for is a potential mismatch of scalability between the source and sink systems. Cassandra is typically used for high-throughput write workloads and is linearly scalable with more nodes added to the cluster. Kafka is also fast and scalable and is well suited for delivering massive amounts of events from multiple sources to multiple sinks with low latency. It can also act as a buffer to absorb unexpected load spikes.

However, as I discovered in my previous pipeline experiments, you have to monitor, tune, and scale the Kafka Connect pipeline to keep the events flowing smoothly end-to-end. Even then, it’s relatively easy to overload the sink systems and end up with a backlog of events and increasing lag.

So if you are streaming Debezium change events from a high-throughput Cassandra source database you may have to tune and scale the target systems, optimize the performance of the Kafka Connect sink connectors and number of connector tasks running, only capture change events for a subset of the Cassandra tables and event types (you can filter out events that you are not interested in), or even use Kafka Streams processing to emit only significant business events, in order for the sink systems to keep up! And as we discovered (see below), the Debezium Cassandra Connector is actually on the sink-side of Cassandra as well.


Germanium transistors ran “hot” and even with a heat sink couldn’t match the speed of the new silicon transistors which were used in the CDC 6600. But Germanium has made a comeback: “Germanium Can Take Transistors Where Silicon Can’t”.

6. Our Experiences With the Debezium Cassandra Connector

As mentioned in Part 1, the Debezium Cassandra Connector is “special” — Cassandra is different from the other Debezium connectors since it is not implemented on top of the Kafka Connect framework. Instead, it is a single JVM process that is intended to reside on each Cassandra node and publish events to a Kafka cluster via a Kafka producer.

The Debezium Cassandra Connector has multiple components as follows:

  • Schema processor, which handles updates to schemas (and which had a bug, see below)
  • Commit log processor, which reads and queues the commit logs (and which had a throughput mismatch issue)
  • Snapshot processor, which handles the initial snapshot on start up (potentially producing lots of data)
  • Queue processor, a Kafka producer that emits the change data to Kafka

We found and fixed some bugs (e.g. schema changes not being detected correctly or fast enough, see Report 1 and Report 2), and configured and optimized it for the customer use case. 

The Debezium Cassandra connector eventually achieved good performance, but it did need some effort to fix and configure it to work consistently and fast (i.e. changes consistently detected in under 1s). Some of these were already flagged in the Debezium documentation (“How the Cassandra Connector Works” and “When Things Go Wrong”), and relate to what happens when a Connector first starts up on a table producing an initial “snapshot” (potentially too much data), and the limitations of Cassandra commit logs (e.g. commit logs are per node, not per cluster; there is a delay between event logging and event capture, and they don’t record schema changes). 

We also had to ensure that the scalability mismatch flagged above between Cassandra and downstream systems wasn’t also an issue on the source side (the Debezium Cassandra Connector side). If the Debezium connector can’t process the Cassandra commit logs in a timely manner, then the Cassandra CDC directory fills up, and Cassandra will start rejecting writes, which is not a desirable outcome. To reduce the risk of the directory filling up under load, we changed how frequently the offsets were committed (at the cost of getting more duplicates if Debezium goes down on a node and has to reprocess a commit log) so that the Debezium Cassandra Connector could achieve better throughput and keep up. Just to be on the safe side, we also set up an alert to notify our technical operations team when the directory is getting too full.

Finally, in contrast to the examples in Part 1, which use the default Debezium human-readable JSON encoding for the schema and values, we used the more efficient binary encoding using Apache Avro. This means that you also have to have a Kafka Schema Registry running (which we provide as a Kafka Managed Service add-on). The Debezium Cassandra Connector encodes both the schema and the change data as Avro, but only puts a very short identifier to the schema in the Kafka key rather than the entire explicit schema, and sends schema changes to the Schema Registry. The Kafka Connect sink connector has to decode the Kafka record key and value from Avro, detect any changes in the schema, and get the new schema from the registry service. This reduces both the message size and Kafka cluster storage requirements significantly and is the recommended approach for high-throughput production environments.

There’s also another approach. For other connectors (e.g. MySQL), the schema changes can be propagated using a different mechanism, via a Kafka schema change topic.

7. Conclusions

The code for our forked Debezium Cassandra Connector can be found here, but we also contributed the code for the new schema evolution approach back to the main project.

Contact us if you are interested in running the Debezium Cassandra (or other) connectors, and check out our managed platform if you are interested in Apache Cassandra, Kafka, Kafka Connect, and Kafka Schema Registry.

What did we learn in this blog series? Debezium and Kafka Connect are a great combination for change data capture (CDC) across multiple heterogeneous systems, and can fill in the complete CDC picture – just as Trapeziums can be used to tile a 2D plane (in Part 1 we discovered that “Trapezium”, but not Debezium, is the only official Scrabble word ending in “ezium”). M.C. Escher was well known for his clever Tessellations (the trick of covering an area with repeated shapes without gaps). Here’s a well-known example, “Day and Night” (which is now held by the National Gallery of Victoria in Melbourne, Australia, see this link for a detailed picture).

“Day and Night” at the Escher Museum, The Hague. (Source: https://commons.wikimedia.org/wiki/File:Gevel_Escher_in_Het_Paleis_300_dpi.jpg)

Acknowledgements

Many thanks to the Instaclustr Debezium Cassandra project team who helped me out with information for this blog and/or worked on the project, including:

  • Chris Wilcox
  • Wesam Hasan
  • Stefan Miklosovic
  • Will Massey

The post Change Data Capture (CDC) With Kafka Connect and the Debezium Cassandra Connector (Part 2) appeared first on Instaclustr.

Change Data Capture (CDC) With Kafka Connect and the Debezium Cassandra Connector (Part 1)

It’s Quiz time! What does CDC stand for?

  1. Centers for Disease Control and Prevention
  2. Control Data Corporation
  3. Change Data Capture

(1) In the last year the U.S. Centers for Disease Control and Prevention (cdc.gov) has certainly been in the forefront of the news, and their Apache Kafka Covid-19 pipeline was the inspiration for my ongoing Kafka Connect streaming data pipeline series. So when colleagues at work recently mentioned CDC for databases I was initially perplexed.

(2) The second CDC that came to mind was the Control Data Corporation. This CDC was a now-defunct 1960s mainframe computer company, perhaps most famous for the series of supercomputers built by Seymour Cray before he started his own company. The CDC 6600 was the first computer to hit 1 megaFLOPS (arguably making it the first “supercomputer”), which it achieved by replacing the older and hotter-running Germanium transistors (see section 5.5 below) with the new silicon transistor.

CDC 6600 https://www.computerhistory.org/revolution/supercomputers/10/33/57

(3) But soon another word was introduced into the conversation which clarified the definition of CDC, Debezium! Although it sounds like a newly discovered metallic element, it’s actually a new open source distributed platform for Change Data Capture from multiple different databases. Like germanium, most metallic elements end in “ium”, and surprisingly the latest “ium” element, moscovium, was only officially named in 2016. The name Debezium (DBs + “ium”) was indeed inspired by the periodic table of elements.

1. What is Change Data Capture?

Change Data Capture has been around for almost as long as databases (I even wrote one for Oracle in the 1990s!) and is an approach used to capture changes in one database to propagate to other systems to reuse (often other databases, but not exclusively). 

Some basic approaches rely on SQL to detect rows with changes (e.g. using timestamps and/or monotonically increasing values), or internal database triggers. Oracle, for example, produces a “change table” that can be used by external applications to get changes of interest to them. These approaches typically put extra load on the database and were therefore often only run as batch jobs (e.g. at night when the normal load was low or even paused), so the change data was only available to downstream systems once every 24 hours—far from real time!

More practical CDC solutions involve the use of custom applications (or sometimes native functionality) on the database to access the commit logs and make changes available externally, so called log-based CDC. Debezium uses this approach to get the change data, and then uses Kafka and Kafka Connect to make it available scalably and reliably to multiple downstream systems.

2. CDC Use Cases

The Debezium Github has a good introduction to Debezium Change Data Capture use cases, and I’ve thought of a few more (including some enabled by Kafka Streams):

  1. Cache invalidation, updating or rebuilding Indexes, etc.
  2. Simplifying and decoupling applications. For example:
    1. Instead of an application having to write to multiple systems, it can write to just one, and then:
    2. Other applications can receive change data from the one system, enabling multiple applications to share a single database
  3. Data integration across multiple heterogeneous systems
  4. For triggering real-time event-driven applications
  5. To use database changes to drive streaming queries
  6. To recreate state in Kafka Streams/Tables
  7. To build aggregate objects using Kafka Streams

The last two use cases (or possibly design patterns) use the power of Kafka Streams. There is also a recent enhancement for Kafka Streams which assists in relational to streams mapping using foreign key joins.

3. Kafka Connect JDBC Source Connectors

Recently I’ve been experimenting with Kafka Connect JDBC and PostgreSQL sink connectors for extensions to my pipeline blogs. But what I hadn’t taken much notice of was that there were also some JDBC source connectors available. For example, the Aiven open source jdbc connector comes in sink and source flavors. It supports multiple database types and has configurations for setting the poll interval and the query modes (e.g. batch and various incremental modes). However, it will put extra load onto the database, and the change data will be out of date for as long as the poll interval, and it only works for SQL databases, not for NoSQL databases such as Cassandra. 

There is at least one open source Kafka Connect Cassandra source connector, which we mention on our support pages, but it is not suitable for production environments.

So how does Debezium work for production CDC scenarios?

4. How does Debezium Work? Connectors + Kafka Connect

Debezium has “Connectors” (to capture change data events) for the following list of databases currently:

  • MySQL
  • MongoDB
  • PostgreSQL
  • SQL Server
  • Oracle (preview)
  • Cassandra (preview)

Debezium requires custom database side applications/plugins to be installed for some of these (PostgreSQL and Cassandra), which means that you may need the cooperation of your cloud provider if you are running them in the cloud.

Debezium “Connectors” always write the change data events to a Kafka cluster, and most of them are in fact Kafka Connect source connectors, so you will most likely need a Kafka Connect cluster as well, which brings other advantages. Here’s an example of the Debezium architecture for a PostgreSQL source, which uses both Kafka Connect source and sink connectors.

Using Kafka and Kafka Connect clusters to capture and deliver change data capture messages has many potential benefits. Kafka is very fast and supports high message throughput. It works well as a buffer to absorb load spikes to ensure that the downstream systems are not overloaded and that no messages are lost. It also supports multiple producers, enabling change data capture from multiple sources. 

Moreover, multiple sinks can subscribe to the same messages, allowing for sharing and routing of the CDC data among heterogeneous target systems. The loose coupling between consumers and producers is beneficial for the CDC use case, as the consumers can consume from multiple heterogeneous data sources, and transform, filter, and act on the change types depending on the target systems. 

One of my favorite Kafka “super powers” is its ability to “replay” events. This is useful for the CDC use case for scenarios involving rebuilding the state of downstream systems (e.g. reindexing) without putting extra load on the upstream databases (essentially using Kafka as a cache). An earlier (pre-Kafka) heterogeneous open source CDC system, DataBus (from LinkedIn), also identified this as a vital feature, which they called “look-back”.

“Looking back” at event streams comes with Kafka by default

5. Change Data Capture With the Debezium Cassandra Connector

Instaclustr recently built, deployed, configured, monitored, secured, ran, tested, fixed, and tuned the first Debezium Cassandra Connector on our managed platform for a customer. The customer use case was to replicate some tables from Cassandra into another analytical database, in close to real time, and reliably at scale. The rest of this blog series looks at the Debezium Cassandra Connector in more detail, explores how the change data could be used by downstream applications, and summarises our experiences from this project. 

Because the Cassandra connector is “incubating” it’s important to understand that the following changes will be ignored:

  • TTL on collection-type columns
  • Range deletes
  • Static columns
  • Triggers
  • Materialized views
  • Secondary indices
  • Lightweight transactions

5.1 What “Change Data” is Produced by the Debezium Cassandra “Connector”?

Most Debezium connectors are implemented as Kafka Connect source connectors, except for the Debezium Cassandra Connector, which just uses a Kafka producer to send the change data to Kafka as shown in this diagram: 

From the documentation for the Debezium Cassandra “Connector”, I discovered that the Debezium Cassandra Connector:

  1. Writes events for all Cassandra insert, update, and delete operations
  2. On a single table
  3. To a single Kafka topic
  4. All change events have a key and value
  5. The Kafka key is a JSON object that contains:
    1. A “key schema”, an array of fields including the name, type and possibly a “logicalType” (which is not really explained anywhere), and
    2. A payload (name value pairs);
    3. For each column in the primary key of the table
  6. The Kafka value is a bit more complicated, see below

The value of a change event message is more complicated; it is JSON and contains the following fields:

  • op: The type of operation, i = insert, u = update, d = delete
  • after: The state of the row after the operation occurred
  • source: Source metadata for the event 
  • ts_ms: The time at which Debezium processed the event

Unlike some other Debezium connectors, there is no “before” field (as the Cassandra commit log doesn’t contain this information), and some fields are optional (after, and ts_ms). Note that the value of “after” fields is null if their state is unchanged, except for primary key fields which always have a value.

Here are some simplified examples, for keeping web page metrics.

First, an insert (where the table primary key is page, and there are also visits and conversions):

INSERT INTO analytics (page, visits, conversions) VALUES ('/the-power-of-kafka-partitions/', 4533, 55);

Results in this (simplified, the source fields are omitted for clarity) message value:

{
 "op": "c",
 "ts_ms": 1562202942832,
 "after": {
  "page": {
    "value": "/the-power-of-kafka-partitions/",
    "deletion_ts": null,
    "set": true
  },
  "visits": {
    "value": 4533,
    "deletion_ts": null,
    "set": true
  },
  "conversions": {
    "value": 55,
    "deletion_ts": null,
    "set": true
  }
 },
 "source": {
  ...
 }
}

Note that each of the after fields has a new value as the record has been created.

Now let’s see what happens with an UPDATE operation:

UPDATE analytics
SET visits=4534
WHERE page='/the-power-of-kafka-partitions/';

An UPDATE results in a change message with this value:

{
 "op": "u",
 "ts_ms": 1562202944319,
 "after": {
  "page": {
   "value": "/the-power-of-kafka-partitions/",
   "deletion_ts": null,
   "set": true
  },
  "visits": {
   "value": 4534,
   "deletion_ts": null,
   "set": true
  },
  "conversions": null
 },
 "source": {
  ...
 }
}

Note that for this UPDATE change event, the op field is now “u”, the after visits field has the new value, the after conversions field is null as it didn’t change, but the after page value is included as it’s the primary key (even though it didn’t change).

And finally a DELETE operation:

DELETE FROM analytics
WHERE page='/the-power-of-kafka-partitions/';

Results in this message value:

{
 "op": "d",
 "ts_ms": 1562202946329,
 "after": {
  "page": {
   "value": "/the-power-of-kafka-partitions/",
   "deletion_ts": 1562202947401,
   "set": true
  },
  "visits": null,
  "conversions": null
 },
 "source": {
  ...
 }
}

The op is now “d”, the after field only contains a value for page (the primary key) and deletion_ts now has a value, and other field values are null.

To summarize:

  • For all operation types the primary key after fields have values
  • For updates, non-primary key after fields have values if the value changed, otherwise null
  • For deletes, non-primary key after fields have null values

And now it’s time for a brief etymological interlude, brought to you by the suffix “ezium”:

Oddly enough, there is only one word permitted in Scrabble which ends in “ezium”: trapezium! Sadly, Debezium hasn’t made it into the Scrabble dictionary yet. A trapezium (in most of the world) is a quadrilateral with at least one pair of parallel sides. But note that in the U.S. this is called a trapezoid, and a “trapezium” has no parallel sides. Here’s an “Australian” Trapezium = U.S. Trapezoid:

Read Next: Change Data Capture (CDC) With Kafka Connect and the Debezium Cassandra Connector (Part 2)

The post Change Data Capture (CDC) With Kafka Connect and the Debezium Cassandra Connector (Part 1) appeared first on Instaclustr.

The CAP Theorem With Apache Cassandra and MongoDB

MongoDB and Apache Cassandra are both popular NoSQL distributed database systems. In this article, I will review how the CAP and PACELC theorems classify these systems. I will then show how both systems can be configured to deviate from their classifications in production environments. 

The CAP Theorem

The CAP theorem states that a distributed system can provide only two of three desired properties: consistency, availability, and partition tolerance. 

Consistency (C): Every client sees the same data. Every read receives the data from the most recent write. 

Availability (A): Every request gets a non-error response, but the response may not contain the most recent data.

Partition Tolerance (P): The system continues to operate despite one or more breaks in inter-node communication caused by a network or node failure. 

Because a distributed system must be partition tolerant, the only choice is between availability and consistency. If part of a cluster becomes unavailable, a system will either: 

  • Safeguard data consistency by canceling the request even if it decreases the availability of the system. Such systems are called CP systems. 
  • Provide availability even though inconsistent data may be returned. These systems are AP distributed systems. 

According to the CAP theorem, MongoDB is a CP system and Cassandra is an AP system.

The CAP theorem provides an overly simplified view of today’s distributed systems such as MongoDB and Cassandra. Under normal operations, availability and consistency are adjustable and can be configured to meet specific requirements. However, in keeping with CAP, increasing one decreases the other. Hence, it is more accurate to say that CP and AP describe the default behavior of MongoDB and Cassandra, respectively. This is discussed in more detail below.

PACELC Theorem

The PACELC theorem was proposed by Daniel J. Abadi in 2010 to address two major oversights of CAP:

  1. It considers the behavior of a distributed system only during a failure condition (the network partition)
  2. It fails to consider that in normal operations, there is always a tradeoff between consistency and latency

PACELC is summarized as follows: in the event of a partition failure (P), a distributed system must choose between availability (A) and consistency (C); else (E), when running normally, it must choose between latency (L) and consistency (C).

MongoDB is classified as a PC+EC system: during normal operations and during partition failures, it emphasizes consistency. Cassandra is a PA+EL system: during a partition failure it favors availability, and under normal operations it gives up consistency for lower latency. However, like CAP, PACELC describes a default behavior.

(As an aside, no distributed system can be CA; that category describes stand-alone, ACID-compliant relational database management systems.)

Apache Cassandra vs. MongoDB Architectures

MongoDB is a NoSQL document database. It is a single-master distributed system that uses asynchronous replication to distribute multiple copies of the data for high availability. A MongoDB cluster is a group of instances running mongod and maintaining the same data. The MongoDB documentation refers to this grouping as a replica set, but for simplicity I will use the term MongoDB cluster. 

A MongoDB cluster is composed of two types of data-bearing members: 

Primary: The primary is the master node and receives all write operations. 

Secondaries: The secondaries receive replicated data from the primary to maintain an identical data set. 

By default, the primary member handles all reads and writes. Optionally, a MongoDB client can route some or all reads to the secondary members. Writes must be sent to the primary. 

If the primary member fails, all writes are suspended until a new primary is selected from one of the secondary members. According to the MongoDB documentation, this process requires up to 12 seconds to complete. 

To increase availability, a cluster can be distributed across geographically distinct data centers. The maximum size of a MongoDB replica set is 50 members. 

A MongoDB cluster.

A Cassandra cluster is a collection of instances, called nodes, connected in a peer-to-peer, shared-nothing distributed architecture. There is no master node; every node can perform all database operations and serve client requests. Data is partitioned across nodes based on a consistent hash of its partitioning key. A partition has one or many rows, and each node may have one or more partitions. However, a partition can reside on only one node.  

Data has a replication factor that determines the number of copies (replicas) that should be made. The replicas are automatically stored on different nodes. 

The node that first receives a request from a client is the coordinator. It is the job of the coordinator to forward the request to the nodes holding the data for that request and to send the results back to the client. Any node in the cluster can act as a coordinator. 

Cassandra cluster showing coordinator. 

Consistency

By default, MongoDB is a strongly consistent system. Once a write completes, any subsequent read will return the most recent value.

Cassandra, by default, is an eventually consistent system. Once a write completes, the latest data eventually becomes available provided no subsequent changes are made.

It is important to keep in mind that MongoDB becomes an eventually consistent system when read operations are done on the secondary members. This happens because of replication lag (the delay between when data is written to the primary and when that data is available on a secondary). The larger the lag, the greater the chance that reads will return inconsistent data. 

Tunable Consistency

Both MongoDB and Cassandra have “tunable consistency.” That is, the levels of consistency and availability are adjustable to meet certain requirements. Individual read and write operations define the number of members or replicas that must acknowledge a request in order for that request to succeed. In MongoDB, this level is called read concern or write concern. In Cassandra, the level of acknowledgment is the consistency level of the operation. The definitions of each are shown in Tables 1 and 2. 

Read concerns:

  • Local: Returns data from the instance without guaranteeing the data has been written to a majority of the instances. This is equivalent to read uncommitted isolation in a relational database.
  • Available: Same as local.
  • Majority: Guarantees that a majority of the cluster members acknowledged the request. A majority read returns only committed data.

Write concerns:

  • 0: Does not require an acknowledgment of the write.
  • 1: Requires acknowledgment from the primary member only.
  • <number>: Requires acknowledgment that the operation has replicated to the specified number of instances.
  • Majority: Requires acknowledgment that the operation has propagated to a majority of the members.

Table 1: MongoDB Read and Write Concerns
Consistency levels:

  • ONE, TWO, THREE: The specified number of replicas must respond to the read or write request.
  • LOCAL_ONE: One replica in the same data center as the coordinator must successfully respond to the read or write request. Provides low latency at the expense of consistency.
  • LOCAL_QUORUM: A quorum (majority) of the replica nodes in the same data center as the coordinator must respond to the read or write request. Avoids the latency of inter-data-center communication.
  • QUORUM: A quorum (majority) of the replicas in the cluster must respond to the read or write request for it to succeed. Used to maintain strong consistency across the entire cluster.
  • EACH_QUORUM: The read or write must succeed on a quorum of replica nodes in each data center. Used only for writes to a multi-data-center cluster to maintain the same level of consistency across data centers.
  • ALL: The request must succeed on all replicas. Provides the highest consistency and the lowest availability of any level.
  • ANY: A write must succeed on at least one node or, if all replicas are down, a hinted handoff must have been written. Guarantees that a write never fails, at the cost of the lowest consistency and the highest availability.

Table 2: Cassandra Consistency Levels

One important limitation is that no combination of read and write concerns can make MongoDB strongly consistent once reads are permitted on the secondary members. 
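
As a rough illustration of how these levels are applied, the sketch below uses PyMongo with hypothetical connection details. Keeping reads on the primary with majority read and write concerns preserves strong consistency; switching the read preference to a secondary trades that away:

from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern

# Hypothetical connection string and replica set name.
client = MongoClient("mongodb://host1,host2,host3/?replicaSet=rs0")

# Majority read and write concerns, applied at the collection level.
analytics = client.get_database("web").get_collection(
    "analytics",
    read_concern=ReadConcern("majority"),
    write_concern=WriteConcern(w="majority"),
    read_preference=ReadPreference.PRIMARY,  # reads stay on the primary, so they stay strongly consistent
)

analytics.insert_one({"page": "/example/", "visits": 1})
print(analytics.find_one({"page": "/example/"}))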

In Cassandra, reads can be made strongly consistent if operations follow this formula, where R is the number of replicas that must acknowledge a read, W is the number that must acknowledge a write, and N is the replication factor:

R + W > N
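
Concretely, with a replication factor of 3, QUORUM reads and QUORUM writes each touch 2 replicas, and 2 + 2 > 3, so every read overlaps at least one replica that saw the latest write. A minimal sketch with the DataStax Python driver follows; the contact points, keyspace, and table are hypothetical:

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from cassandra import ConsistencyLevel

cluster = Cluster(["10.0.0.1", "10.0.0.2", "10.0.0.3"])  # hypothetical contact points
session = cluster.connect("web")                         # hypothetical keyspace

# With RF = 3, QUORUM writes (2 replicas) and QUORUM reads (2 replicas) overlap: 2 + 2 > 3.
write = SimpleStatement(
    "UPDATE analytics SET visits = 4534 WHERE page = '/the-power-of-kafka-partitions/'",
    consistency_level=ConsistencyLevel.QUORUM,
)
session.execute(write)

read = SimpleStatement(
    "SELECT visits FROM analytics WHERE page = '/the-power-of-kafka-partitions/'",
    consistency_level=ConsistencyLevel.QUORUM,
)
print(session.execute(read).one())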

Read and write consistency can be adjusted to optimize a specific operation. For example:

  • Optimize for writes: read at ALL, write at ONE
  • Optimize for reads: read at ONE, write at ALL
  • Balanced: read at QUORUM, write at QUORUM
  • No optimization (high latency with low availability): read at ALL, write at ALL

Table 3: Achieving Strong Consistency in Cassandra

The last setting is an extreme example of how you can get very strong consistency but lose all fault tolerance: if even one replica becomes unavailable, the query fails. 

On the other hand, MongoDB could be configured with read and write concerns so low that only eventual consistency is possible. 

The majority read/write concern differs from Cassandra’s QUORUM consistency level: majority returns only committed data from a majority of the nodes, whereas a Cassandra read at QUORUM can return uncommitted data. 

Both systems can submit statements with linearizable consistency (data must be read and written in sequential order across all processes) with some restrictions. 

MongoDB provides linearizable consistency when you combine “majority” write concern with “linearizable” read concern. However, to use the linearizable read concern you must read data from the primary. 

Cassandra uses a modified Paxos consensus protocol to implement lightweight transactions. Lightweight transactions use a consistency level of SERIAL or LOCAL_SERIAL (equivalent to QUORUM and LOCAL_QUORUM). 
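
As a rough illustration, here is how a lightweight transaction and its serial consistency level can be set with the DataStax Python driver; the contact point, keyspace, table, and values are hypothetical:

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from cassandra import ConsistencyLevel

session = Cluster(["10.0.0.1"]).connect("web")  # hypothetical contact point and keyspace

# Lightweight transaction: the IF clause triggers the Paxos round.
stmt = SimpleStatement(
    "INSERT INTO analytics (page, visits) VALUES ('/new-post/', 0) IF NOT EXISTS",
    consistency_level=ConsistencyLevel.QUORUM,         # consistency of the final read/write
    serial_consistency_level=ConsistencyLevel.SERIAL,  # consistency of the Paxos phase
)
result = session.execute(stmt).one()
print("applied:", result[0])  # the first column, [applied], is False if the row already existed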

Losing Consistency During Normal Operations

Replicating multiple copies of data is how Cassandra and MongoDB increase availability. Data in these copies can become inconsistent during normal operations. However, as we shall see, Cassandra has background processes to resolve these inconsistencies.

When MongoDB secondary members become inconsistent with the primary due to replication lag, the only solution is waiting for the secondaries to catch up. MongoDB 4.2 introduced flow control, which throttles writes on the primary whenever the lag exceeds a configurable threshold.

However, if the data in the secondaries becomes too stale, the only solution is to manually synchronize the member: bootstrap it after deleting all of its data, copy a recent data directory from another member in the cluster, or restore a snapshot backup.

Cassandra has two background processes to synchronize inconsistent data across replicas without the need for bootstrapping or restoring data: read repairs and hints. 

If Cassandra detects that replicas return inconsistent data to a read request, a background process called read repair imposes consistency by selecting the last written data to return to the client. 

If a Cassandra node goes offline, the coordinator attempting to write to the unavailable replica temporarily stores the failed writes as hints on its local filesystem. If the hints have not expired (three hours by default), they are written to the replica when it comes back online, a process known as hinted handoff. 

Anti-entropy repair (a.k.a. repair) is a manual process to synchronize data among the replicas. Running repairs is part of the routine maintenance for a Cassandra cluster. 

Loss of Consistency During a Failure

If the primary member fails, MongoDB preserves consistency by suspending writes until a new primary is elected. Any writes to the failed primary that have not been replicated are rolled back when it returns to the cluster as a secondary. MongoDB 4.0 and later also create rollback files during rollbacks.

When a Cassandra node becomes unavailable, processing continues and failed writes are temporarily saved as hints on the coordinator. If the hints have not expired, they are applied to the node when it becomes available. 

Availability

Both MongoDB and Cassandra achieve high availability by replicating multiple copies of the data. The more copies, the higher the availability. Clusters can be distributed across geographically distinct data centers to further enhance availability. 

The maximum size of a MongoDB cluster is 50 members, with no more than seven voting members. There is no hard limit to the number of nodes in a Cassandra cluster, but there can be performance and storage penalties for setting the replication factor too high. A typical replication factor is three for most clusters, or five when there are very high availability requirements. Conversely, when data availability is less critical, say with data that can easily be recreated, the replication factor can be lowered to save space and improve performance.

Fault Tolerance

If the primary member of a MongoDB cluster becomes unavailable for longer than electionTimeoutMillis (10 seconds by default), the secondary members hold an election to determine a new primary as long as a majority of members are reachable.

During the election, which can take up to 12 seconds, a MongoDB cluster is only partially available: 

  1. No writes are allowed until a new primary is elected
  2. Data can be read from the secondary if a majority of the secondary members are online and reads from the secondary members have been enabled

Because all Cassandra nodes are peers, a cluster can tolerate the loss of multiple replicas provided the consistency level of the operation is met. Even if a majority of replicas are lost, it is possible to continue operations by reverting to the default consistency level of ONE or LOCAL_ONE.

Fault Tolerance With Multiple Data Centers

Both MongoDB and Cassandra clusters can span geographically distinct data centers in order to increase high availability. If one data center fails, the application can rely on the survivors to continue operations. 

How MongoDB responds to the loss of a data center depends upon the number and placement of the members among the data centers. If the data center containing the primary is lost, writes are unavailable until a new primary is elected. 

The data can still be available for reads if it is distributed over multiple data centers even if one of the data centers fails. If the data center with a minority of the members goes down, the cluster can serve read and write operations; if the data center with the majority goes down, it can only perform read operations.

With three data centers, if any data center goes down, the cluster remains writeable as the remaining members can hold an election.

The fault tolerance of a Cassandra cluster depends on the number of data centers, the replication factor, and how much consistency you are willing to sacrifice. As long as the consistency level can be met, operations can continue.

Number of Failed Nodes Without Compromising High Availability

RF¹ | Consistency Level | 1 DC | 2 DC | 3 DC
2 | ONE/LOCAL_ONE | 1 node | 3 nodes total | 5 nodes total
2 | LOCAL_QUORUM | 0 nodes | 0 nodes | 0 nodes
2 | QUORUM | 0 nodes | 1 node total | 2 nodes total
2 | EACH_QUORUM | 0 nodes | 0 nodes | 0 nodes
3 | LOCAL_ONE | 2 nodes | 2 nodes from each DC | 2 nodes from each DC
3 | ONE | 2 nodes | 5 nodes total | 8 nodes total
3 | LOCAL_QUORUM | 1 node | 2 nodes total | 1 node from each DC or 2 nodes total
3 | QUORUM | 1 node | 2 nodes total | 4 nodes total
3 | EACH_QUORUM | 2 nodes | 1 node each DC | 1 node each DC
5 | ONE | 4 nodes | 9 nodes | 14 nodes
5 | LOCAL_ONE | 4 nodes | 4 nodes from each DC | 4 nodes from each DC
5 | LOCAL_QUORUM | 2 nodes | 2 nodes from each DC | 2 nodes from each DC
5 | QUORUM | 2 nodes | 4 nodes total | 7 nodes total
5 | LOCAL_QUORUM | 2 nodes | Could survive loss of a DC | Could survive loss of 2 DCs
5 | EACH_QUORUM | 2 nodes from each | 2 nodes from each | 2 nodes from each

Table 4: Fault Tolerance of Cassandra

¹ Assumes the replication factor for each DC

Conclusions on Cassandra vs. MongoDB

With the CAP or PACELC theorems, MongoDB is categorized as a distributed system that guarantees consistency over availability, while Cassandra is classified as a system that favors availability. However, these classifications only describe the default behavior of both systems. MongoDB remains strongly consistent only as long as all reads are directed to the primary member. Even when reads are limited to the primary member, consistency can be loosened by read and write concerns. Cassandra can also be made strongly consistent through adjustments in consistency levels, as long as a loss of availability and increased latency are acceptable. 

Want to learn more about how to identify the best technology for your data layer? Contact us to schedule a time with our experts.

The post The CAP Theorem With Apache Cassandra and MongoDB appeared first on Instaclustr.

Why AWS PrivateLink Is Not Recommended With Kafka or Cassandra

AWS PrivateLink (also known as a VPC endpoint) is a technology that allows the user to securely access services using a private IP address. We do not recommend configuring an AWS PrivateLink connection with Apache Kafka or Apache Cassandra, mainly because of the single entry point problem: PrivateLink exposes only a single IP to the user and requires a load balancer between the user and the service. Realistically, the user would need an individual VPC endpoint per node, which is expensive and still may not work with drivers that expect direct access to individual nodes. 

Using PrivateLink, it is impossible by design to contact specific IPs within a VPC in the same way you can with VPC peering. VPC peering allows a connection between two VPCs, while PrivateLink publishes an endpoint that others can connect to from their own VPC.

Cassandra

The load-balancing policy for queries is set in the Cassandra driver (e.g. DCAwareRoundRobinPolicy or TokenAwarePolicy). This determines which node is selected as the coordinator, and the driver connects directly to that node when making the query. AWS PrivateLink exposes only a single endpoint (IP), so all access to nodes goes through that load balancer and the load-balancing policy in the driver is circumvented. For DCAwareRoundRobinPolicy this is not a real issue, as the load balancer can distribute requests effectively anyway. For TokenAwarePolicy it is a problem, because the query is no longer routed to a node holding the data, so a different node must coordinate it, adding processing overhead and another network hop.
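
For reference, this is roughly how those policies are configured in the DataStax Python driver (the contact point and data center name are hypothetical); with PrivateLink in between, the token-aware routing below has no effect because every connection terminates at the same endpoint:

from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy

# Token-aware routing wrapped around DC-aware round robin (hypothetical DC name).
profile = ExecutionProfile(
    load_balancing_policy=TokenAwarePolicy(
        DCAwareRoundRobinPolicy(local_dc="AWS_VPC_US_EAST_1")
    )
)

# The driver opens connections to the individual nodes it discovers; a single
# PrivateLink endpoint hides those nodes behind one IP, so requests can no
# longer be sent straight to the replica that owns the data.
cluster = Cluster(
    contact_points=["10.0.0.1"],  # hypothetical
    execution_profiles={EXEC_PROFILE_DEFAULT: profile},
)
session = cluster.connect()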

Kafka Connection Without AWS PrivateLink

Kafka Connection With AWS PrivateLink

Kafka uses partitions that can have copies (depending on the replication factor) to increase durability and availability, allowing Kafka to fail over to a broker holding a replica of a partition if its leader fails. Each partition has one leader that the client needs to connect to directly for writes. When using AWS PrivateLink with Kafka, ports must be configured to route traffic from the load balancer to each broker. 

In the graphic above, AWS PrivateLink cuts off the individual connections from the application to the brokers. Kafka clients expect to connect directly to each broker; a load balancer breaks this unless every broker is individually exposed through PrivateLink, which is tedious to configure, and the failover it would provide is already built into Kafka. AWS PrivateLink defeats the purpose of these direct connections between the application and Kafka and introduces a single point of failure. While it is possible to connect Kafka through PrivateLink using Confluent, this comes with a lot of limitations. Simply using VPC peering instead of PrivateLink makes it much easier to establish connections in our case.
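
The underlying reason is how Kafka clients connect: the bootstrap servers are only used to fetch cluster metadata, after which the client connects directly to each broker’s advertised address. A minimal producer sketch (with a hypothetical broker address and topic) shows why a single PrivateLink endpoint is not enough:

from kafka import KafkaProducer  # pip install kafka-python

# The bootstrap list is only the first hop: the client fetches metadata from it,
# then opens direct connections to each partition leader's advertised address.
# If those advertised addresses are unreachable (e.g. hidden behind a single
# PrivateLink endpoint), produce requests cannot reach the right broker.
producer = KafkaProducer(bootstrap_servers=["10.0.0.10:9092"])  # hypothetical

producer.send("page-visits", b'{"page": "/example/", "visits": 1}')
producer.flush()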

Conclusion 

AWS PrivateLink is intended to provide a secure, private connection between different accounts and VPCs and to simplify the structure of your network. But when it is used to connect to technologies like Apache Kafka and Apache Cassandra, PrivateLink makes things more complex. Funneling every connection through a single endpoint works against the way these technologies route requests, which is why using PrivateLink is not recommended.

Have more questions on AWS PrivateLink or other best practices for Apache Cassandra and Apache Kafka? Reach out to schedule a consultation with one of our experts.

The post Why AWS PrivateLink Is Not Recommended With Kafka or Cassandra appeared first on Instaclustr.

Analyzing Cassandra Data using GPUs, Part 2

Analyzing Cassandra Data using GPUs, Part 1

Why You Shouldn’t Run Nodetool Removenode

There comes a time when your Apache Cassandra deployment will require scaling down or general maintenance (for example, after a significant drop in disk usage or in the number of reads and writes per second). Removing a node, whether it is working or not, falls under this scaling and maintenance. When scaling down, it is necessary to have the tools to remove a dead or live node. When such a case arises, the nodetool utility provides us with the following options: nodetool decommission, nodetool removenode, and nodetool assassinate.

Decommission streams data from the decommissioned node. This guarantees consistency before, during, and after the operation.

nodetool decommission

  • Data owned by this node will be streamed to the other nodes
  • Cassandra process will still be running on this node but outside the cluster
  • This process can be killed or the instance can be shut down safely at this point
  • Don’t decommission a node without good reason

Removenode alters the token allocation and streams data from the remaining replicas, which risks violating consistency and losing data. Removenode should only be used as a last resort where decommission cannot be used. Note that removenode will trigger streaming to other nodes in the cluster; on an already overloaded cluster, this places additional strain, leading to further instability.

nodetool removenode

  • Data owned by this node will be streamed from live replicas within the cluster to the nodes taking over its token ranges. Upon completion, the node will be removed.

Assassinate should be viewed as an absolute last resort option when forcing a node out of a cluster, and you should fully understand the implications. There is a significant chance of data loss in this case, especially when replication factors and consistency levels are not in line with best practices. This is due to the fact that the assassinate command will simply drop the node without streaming any data from other replicas or the leaving node itself.

We have seen numerous cases of removenode and/or assassinate being used during an outage situation with disastrous results. Removenode is a particularly bad idea in this situation as it places additional streaming load on the remaining UN (up/normal) nodes, compounding the loss of capacity from having a down node. Also, when a cluster is unstable, as during an outage, you shouldn’t be making token changes, because messing with token ownership can result in data loss or data unavailability on all nodes. Again, we have been involved in too many incidents where a customer has found themselves in this situation. Had they persisted with recovering the down node, they would have been in a much better position for recovery and the overall outage time would have been greatly reduced. 

What you should do during a node outage is fix the node or follow the replace procedure if you can’t fix the down node. Never use removenode to recover a node that has failed.

Downsizing 

A valid use for removenode is for permanent removal of a node, i.e. downsizing. 

However, this is still not recommended, and you should use nodetool decommission instead, as decommission is consistent and removenode isn’t. Decommission streams data from the leaving node, so any data held only by that node ends up on other nodes, reducing the chance of data loss. Removenode streams from any replica but not the leaving node, so it can miss data that resides only on the leaving node, or, in the case of quorum writes, break quorum guarantees.

Replacing a Node 

  1. Starting a new node by indicating to Cassandra that it is a replacement

Note: Make sure that the original node is stopped (DN) before beginning the replace operation. If replacing an existing node, everything in the Cassandra data directories needs to be deleted, and auto_bootstrap needs to be set to true so the node will stream from other nodes within the cluster.

  • Similar procedure to adding a node
  • Configure the new node similarly to the old node (cassandra.yaml, cassandra-rackdc.properties, etc.)
  • Use the same Cassandra version
  • Edit cassandra-env.sh or jvm.options as appropriate with:
    • JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=<old_ip_address>"
  • Once the node is UN, remove the JVM_OPTS line
  • Adjust the seeds as needed

If you need to decommission:

nodetool decommission

Check the joining status from another node in the cluster with: nodetool status

Check the streaming status from another node in the cluster with: nodetool netstats

You should see some streaming at this point.

Have more questions on configuring node types or other best practices for Apache Cassandra? Reach out to schedule a consultation with one of our experts.

The post Why You Shouldn’t Run Nodetool Removenode appeared first on Instaclustr.

Exploring Data @ Netflix

By Gim Mahasintunan on behalf of Data Platform Engineering.

Supporting a rapidly growing base of engineers of varied backgrounds using different data stores can be challenging in any organization. Netflix’s internal teams strive to provide leverage by investing in easy-to-use tooling that streamlines the user experience and incorporates best practices.

In this blog post, we are thrilled to share that we are open-sourcing one such tool: the Netflix Data Explorer. The Data Explorer gives our engineers fast, safe access to their data stored in Cassandra and Dynomite/Redis data stores.

Netflix Data Explorer on GitHub

History

We began this project several years ago when we were onboarding many new Dynomite customers. Dynomite is a high-speed in-memory database, providing highly available cross-datacenter replication while preserving Redis-like semantics. We wanted to lower the barrier for adoption so users didn’t need to know datastore-specific CLI commands, could avoid mistakenly running commands that might negatively impact performance, and could easily access the clusters they used every day.

As the project took off, we saw a similar need for our other datastores. Cassandra, our most significant footprint in the fleet, seemed like a great candidate. Users frequently had questions on how they should set up replication, create tables using an appropriate compaction strategy, and craft CQL queries. We knew we could give our users an elevated experience, and at the same time, eliminate many of the common questions on our support channels.

We’ll explore some of the Data Explorer features, and along the way, we’ll highlight some of the ways we enabled the OSS community while still handling some of the unique Netflix-specific use cases.

Multi-Cluster Access

By simply directing users to a single web portal for all of their data stores, we can gain a considerable increase in user productivity. Furthermore, in production environments with hundreds of clusters, we can reduce the available data stores to those authorized for access; this can be supported in OSS environments by implementing a Cluster Access Control Provider responsible for fetching ownership information.

Browsing your accessible clusters in different environments and regions

Schema Designer

Writing CREATE TABLE statements can be an intimidating experience for new Cassandra users. So to help lower the intimidation factor, we built a schema designer that lets users drag and drop their way to a new table.

The schema designer allows you to create a new table using any primitive or collection data type, then designate your partition key and clustering columns. It also provides tools to view the storage layout on disk, lets you browse the supported sample queries (to help design efficient point queries), and guides you through choosing a compaction strategy and many other advanced settings.

Dragging and dropping your way to a new Cassandra table

Explore Your Data

You can quickly execute point queries against your cluster in Explore mode. The Explore mode supports full CRUD of records and allows you to export result sets to CSV or download them as CQL insert statements. The exported CQL can be a handy tool for quickly replicating data from a PROD environment to your TEST environment.

Explore mode gives you quick access to table data

Support for Binary Data

Binary data is another popular feature used by many of our engineers. The Data Explorer won’t fetch binary value data by default (as the persisted data might be sizable). Users can opt-in to retrieve these fields with their choice of encoding.

Choosing how you want to decode blob data

Query IDE

Efficient point queries are available in the Explore mode, but you may have users that still require the flexibility of CQL. Enter the Query mode, which includes a powerful CQL IDE with features like autocomplete and helpful snippets.

Example of free-form Cassandra queries with autocomplete assistance

There are also guardrails in place to help prevent users from making mistakes. For instance, if a user tries to perform a “DROP TABLE…” command, we redirect them to a bespoke workflow for deleting a table, ensuring the operation is done safely with additional validation. (See our integration with Metrics later in this article.)

As you submit queries, they will be saved in the Recent Queries view as well — handy when you are trying to remember that WHERE clause you had crafted before the long weekend.

Dynomite and Redis Features

While C* is feature-rich and might have a more extensive install base, we have plenty of good stuff for Dynomite and Redis users too. Note, the terms Dynomite and Redis are used interchangeably unless explicitly distinguished.

Key Scanning

Since Redis is an in-memory data store, we need to avoid operations that inadvertently load all the keys into memory. We perform SCAN operations across all nodes in the cluster, ensuring we don’t strain the cluster.

Scanning for keys on a Dynomite cluster

Dynomite Collection Support

In addition to simple String keys, Dynomite supports a rich collection of data types, including Lists, Hashes, and sorted and unsorted Sets. The UI supports creating and manipulating these collection types as well.

Editing a Redis hash value

Supporting OSS

As we were building the Data Explorer, we started getting some strong signals that the ease-of-use and productivity gains that we’d seen internally would benefit folks outside of Netflix as well. We tried to balance codifying some hard-learned best practices that would be generally applicable while maintaining the flexibility to support various OSS environments. To that end, we’ve built several adapter layers into the product where you can provide custom implementations as needed.

The application was architected to enable OSS by introducing seams where users could provide their implementations for discovery, access control, and data store-specific connection settings. Users can choose one of the built-in service providers or supply a custom provider.

The diagram below shows the server-side architecture. The server is a Node.js Express application written in TypeScript, and the client is a Single Page App written in Vue.js.

Data Explorer architecture and service adapter layers

Demo Environment

Deploying a new tool in any real-world environment is a time commitment. We get it, and to help you with that initial setup, we have included a dockerized demo environment. It can build the app, pull down images for Cassandra and Redis, and run everything in Docker containers so you can dive right in. Note, the demo environment is not intended for production use.

Overridable Configuration

The Data Explorer ships with many default behaviors, but since no two production environments are alike, we provide a mechanism to override the defaults and specify your custom values for various settings. These can range from which port numbers to use to which features should be disabled in a production environment. (For example, the ability to drop a Cassandra table.)

CLI Setup Tool

To further improve the experience of creating your configuration file, we have built a CLI tool that provides a series of prompts for you to follow. The CLI tool is the recommended approach for building your configuration file, and you can re-run the tool at any point to create a new configuration.

The CLI allows you to create a custom configuration

You can also generate multiple configuration files and easily switch between them when working with different environments. We have instructions on GitHub on working with more than one configuration file.

Service Adapters

It’s no secret that Netflix is a big proponent of microservices: we have discovery services for identifying Cassandra and Dynomite clusters in the environment; access-control services that identify who owns a data store and who can access it; and LDAP services to find out information about the logged-in user. There’s a good chance you have similar services in your environment too.

To help enable such environments, we have several pre-canned configurations with overridable values and adapter layers in place.

Discovery

The first example of this adapter layer in action is how the application finds Discovery information — these are the names and IP addresses of the clusters you want to access. The CLI allows you to choose from a few simple options. For instance, if you have a process that can update a JSON file on disk, you can select “file system.” If instead, you have a REST-based microservice that provides this information, then you can choose “custom” and write a few lines of code necessary to fetch it.

Choosing to discover our data store clusters by reading a local file

Metrics

Another example of this service adapter layer is integration with an external metrics service. By implementing a metrics service adapter, we progressively enhance the UI with keyspace and table metrics. These metrics provide insight into which tables are being used at a glance and help our customers make an informed decision when dropping a table.

Without metrics support
With optional metrics support

OSS users can enable the optional Metrics support via the CLI. You then just need to write the custom code to fetch the metrics.

CLI enabling customization of advanced features

i18n Support

While internationalization wasn’t an explicit goal, we discovered that providing Netflix-specific messages in some instances yielded additional value to our internal users. Fundamentally, this is similar to how resource bundles handle different locales.

We are making en-NFLX.ts available internally and en-US.ts available externally. Enterprise customers can enhance their user’s experience by creating custom resource bundles (en-ACME.ts) that link to other tools or enhance default messages. Only a small percentage of the UI and server-side exceptions use these message bundles currently — most commonly to augment messages somehow (e.g., provide links to internal slack channels).

Final Thoughts

We invite you to check out the project and let us know how it works for you. By sharing the Netflix Data Explorer with the OSS community, we hope to help you explore your data and inspire some new ideas.


Exploring Data @ Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Adelphi: Apache Cassandra™ testing goes Cloud Native

Instaclustr’s Support for Cassandra 2.1, 2.2, 3.0, and 3.11

As the official release of Cassandra 4.0 approaches, some of our customers who are still using Cassandra 2.1 and 2.2 have asked us where we stand on support. The Apache Cassandra project will end support for the following versions on the following dates:

  • Apache Cassandra 3.11 will be fully maintained until April 30, 2022, and maintained with critical fixes only until April 30, 2023
  • Apache Cassandra 3.0 will be maintained until April 30, 2022 with critical fixes only
  • Apache Cassandra 2.2 is not supported as of April 30, 2021
  • Apache Cassandra 2.1 is not supported as of April 30, 2021

Instaclustr will continue to support these versions for our customers for 12 months beyond these dates. During this period of extended support, coverage will be limited to critical issues only. This means Instaclustr customers will receive support to the following dates:

  • Apache Cassandra 3.11 will be maintained until April 30, 2024 with critical fixes only
  • Apache Cassandra 3.0 will be maintained until April 30, 2023 with critical fixes only
  • Apache Cassandra 2.2 is not supported as of April 30, 2021
  • Apache Cassandra 2.1 is not supported as of April 30, 2021

The Apache Cassandra project has put in an impressive effort to support these versions thus far, and we’d like to take the opportunity to thank all the contributors and maintainers of these older versions.

Our extended support is provided to enable customers to plan their migrations with confidence. We encourage those of you considering upgrades to explore the significant advantages of Cassandra 4.0, which is currently available as a beta preview on our managed platform and is expected to reach full general availability by June 2021.

If you have any further concerns or wish to discuss migration, please get in touch with your Customer Success representative.

The post Instaclustr’s Support for Cassandra 2.1, 2.2, 3.0, and 3.11 appeared first on Instaclustr.