Going Head-to-Head: Scylla vs Amazon DynamoDB

“And now for our main event! Ladies and gentlemen, in this corner, weighing in at 34% of the cloud infrastructure market, the reigning champion and leader of the public cloud…. Amazon!” Amazon has unparalleled expertise at maximizing scalability and availability for a vast array of customers using a plethora of software products. While Amazon offers software products like DynamoDB, its database-as-a-service is only one of its many offerings.

“In the other corner is today’s challenger — young, lightning quick and boasting low-level Big Data expertise… ScyllaDB!” Unlike Amazon, our company focuses exclusively on creating the best database for distributed data solutions.

A head-to-head database battle between Scylla and DynamoDB is a real David versus Goliath situation. It’s Rocky Balboa versus Apollo Creed. Is it possible Scylla could deliver an unexpected knockout punch against DynamoDB? [SPOILER ALERT: Our results will show Scylla has 1/4th the latencies and is only 1/7th the cost of DynamoDB — and this is in the most optimized case for Dynamo. Watch closely as things go south for Dynamo in Round 6. Please keep reading to see how diligent we were in creating a fair test case and other surprise outcomes from our benchmark battle royale.]

To be clear, Scylla is not a competitor to AWS at all. Many of our customers deploy Scylla to AWS, we ourselves find it to be an outstanding platform, and on more than one occasion we’ve blogged about its unique bare metal instances. Here’s further validation — our Scylla Cloud service runs on top of AWS. But we do think we might know a bit more about building a real-time big data database, so we limited the scope of this competitive challenge solely to Scylla versus DynamoDB, database-to-database.

Scylla is a drop-in replacement for Cassandra, implemented from scratch in C++. Cassandra itself was a reimplementation of concepts from the Dynamo paper. So, in a way, Scylla is the “granddaughter” of Dynamo. That means this is a family fight, where a younger generation rises to challenge an older one. It was inevitable for us to compare ourselves against our “grandfather,” and perfectly in keeping with the traditions of Greek mythology behind our name.

If you compare Scylla and Dynamo, each has pros and cons, but they share a common class of NoSQL database: Column family with wide rows and tunable consistency. Dynamo and its Google counterpart, Bigtable, were the first movers in this market and opened up the field of massively scalable services — very impressive by all means.

Scylla is a much younger opponent, just 4.5 years old. Though Scylla is modeled on Cassandra, Cassandra was never our end goal, only a starting point. While we stand on the shoulders of giants in terms of existing design, our proven systems programming abilities have come heavily into play and led to performance at the level of a million operations per second per server. We recently announced feature parity (minus transactions) with Cassandra, and also our own database-as-a-service offering, Scylla Cloud.

But for now we’ll focus on the question of the day: Can we take on DynamoDB?

Rules of the Game

With our open source roots, our culture forces us to be as fair as possible. So we picked a reasonable benchmark scenario that’s supposed to mimic the requirements of a real application, and we will judge the two databases from the user perspective. For the benchmark we used Yahoo! Cloud Serving Benchmark (YCSB) since it’s a cross-platform tool and an industry standard. The goal was to meet a Service Level Agreement of 120K operations per second with a 50:50 read/write split (YCSB’s workload A) with a latency under 10ms at the 99th percentile. Each database would provision the minimal amount of resources/money to meet this goal. Each DB should be populated first with 1 billion rows using the default, 10 column schema of YCSB.

We conducted our tests using Amazon DynamoDB and Amazon Web Services EC2 instances as loaders. Scylla also used Amazon Web Services EC2 instances for servers, monitoring tools and the loaders.

These tests were conducted on Scylla Open Source 2.1, which is the code base for Scylla Enterprise 2018.1. Thus performance results for these tests will hold true across both Open Source and Enterprise. However, we use Scylla Enterprise for comparing Total Cost of Ownership.

DynamoDB is known to be tricky when the data distribution isn’t uniform, so we selected uniform distribution to test Dynamo within its sweet spot. We set up 3 i3.8xlarge nodes for Scylla, with a replication factor of 3 and quorum consistency level, and loaded the 1 TB dataset (replicated 3 times). After 2.5 hours the load was complete and the cluster was waiting for the test to begin.

Scylla Enterprise (Scylla cluster):
  • i3.8xlarge | 32 vCPU | 244 GiB | 4 x 1.9TB NVMe
  • 3-node cluster on single DC | RF=3
  • Dataset: ~1.1TB (1B partitions / size: ~1.1KB)
  • Total used storage: ~3.3TB

Amazon DynamoDB (provisioned capacity):
  • 160K write | 80K read (strong consistency)
  • Dataset: ~1.1TB (1B partitions / size: ~1.1KB)
  • Storage size: ~1.1 TB (DynamoDB table metrics)

Both setups:
  • Workload-A: 90 min, using 8 YCSB clients; every client runs on its own data range (125M partitions)
  • Loaders: 4 x m4.2xlarge (8 vCPU | 32 GiB RAM), 2 loaders per machine
  • Scylla workloads run with Consistency Level = QUORUM for writes and reads.
  • Scylla starts with a cold cache in all workloads.
  • DynamoDB workloads ran with dynamodb.consistentReads = true
  • Sadly for DynamoDB, each item weighed ~1.1KB (the YCSB default schema), so each write resulted in two accesses
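
As a rough illustration of the last point above, the sketch below estimates how many DynamoDB capacity units this workload would consume. It assumes the usual DynamoDB accounting (one write capacity unit per 1 KB written and one read capacity unit per strongly consistent read of up to 4 KB, rounded up per item). The function names and the even 60K/60K split of the 120K ops/sec target are ours, chosen only for illustration.

```python
import math

WRITE_UNIT_KB = 1.0  # assumption: 1 WCU covers a write of up to 1 KB
READ_UNIT_KB = 4.0   # assumption: 1 RCU covers a strongly consistent read of up to 4 KB


def wcu_per_write(item_kb: float) -> int:
    """Write capacity units consumed by writing one item of the given size."""
    return math.ceil(item_kb / WRITE_UNIT_KB)


def rcu_per_read(item_kb: float) -> int:
    """Read capacity units consumed by one strongly consistent read."""
    return math.ceil(item_kb / READ_UNIT_KB)


ITEM_KB = 1.1            # YCSB default schema, as stated in the setup
writes_per_sec = 60_000  # hypothetical: half of the 120K ops/sec target
reads_per_sec = 60_000   # hypothetical: the other half

needed_wcu = writes_per_sec * wcu_per_write(ITEM_KB)
needed_rcu = reads_per_sec * rcu_per_read(ITEM_KB)

print(f"WCU per write: {wcu_per_write(ITEM_KB)}")
print(f"Needed: {needed_wcu} WCU of 160000, {needed_rcu} RCU of 80000")
print(f"Utilization: WR {needed_wcu / 160_000:.0%} | RD {needed_rcu / 80_000:.0%}")
```

Under these assumptions, a 1.1KB item costs two write units, so the 50:50 workload would need about 75% of both the 160K write and 80K read provisioned capacities.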

Let the Games Begin!

We started to populate Dynamo with the dataset. However, not so fast...

High Rate of InternalServerError

Turns out the population stage is hard on DynamoDB. We had to slow down the population rate time and again, despite it being well within the reserved IOPS. Sometimes we managed to populate up to 0.5 billion rows before we started to receive the errors again.

Each time we had to start over to make sure the entire dataset was saved. We believe DynamoDB needs to split its 10GB partitions during the population and cannot do so in parallel with additional load without errors. The gory details:

  • Started population with Provisioned capacity: 180K WR | 120K RD.
    • ⚠ We hit errors on ~50% of the YCSB threads causing them to die when using ≥50% of write provisioned capacity.
    • For example, it happened when we ran with the following throughputs:
      • 55 threads per YCSB client = ~140K throughput (78% used capacity)
      • 45 threads per YCSB client = ~130K throughput (72% used capacity)
      • 35 threads per YCSB client = ~96K throughput (54% used capacity)

After multiple attempts with various provisioned capacities and throughputs, eventually a streaming rate was found that permitted a complete database population. Here are the results of the population stage:

YCSB workload: 100% Write, 1B partitions (~1.1KB)

Scylla Open Source 2.1 (3x i3.8xlarge), 8 YCSB clients:
  • Overall Throughput (ops/sec): 104K
  • Avg Load (scylla-server): ~85%
  • INSERT operations (Avg): 125M
  • Avg. 95th Percentile Latency (ms): 8.4
  • Avg. 99th Percentile Latency (ms): 11.3

DynamoDB (160K WR | 80K RD), 8 YCSB clients:
  • Overall Throughput (ops/sec): 51.7K
  • Max Consumed capacity: WR 75%
  • INSERT operations (Avg): 125M
  • Avg. 95th Percentile Latency (ms): 7.5
  • Avg. 99th Percentile Latency (ms): 11.6

Scylla completed the population at twice the speed but more importantly, worked out of the box without any errors or pitfalls.

YCSB Workload A, Uniform Distribution

Finally, we began the main test, the one that gauges our potential user workload with an SLA of 120,000 operations per second. This scenario is supposed to be DynamoDB’s sweet spot. The partitions are well balanced and the load isn’t too high for DynamoDB to handle. Let’s see the results:

YCSB workload: Workload A, 50% Read / 50% Write, 1B partitions (~1.1KB), Duration: 90 min.

Scylla Open Source 2.1 (3x i3.8xlarge), 8 YCSB clients:
  • Overall Throughput (ops/sec): 119.1K
  • Avg Load (scylla-server): ~58%
  • READ operations (Avg): ~39.93M
    • Avg. 95th Percentile Latency (ms): 5.0
    • Avg. 99th Percentile Latency (ms): 7.2
  • UPDATE operations (Avg): ~39.93M
    • Avg. 95th Percentile Latency (ms): 3.4
    • Avg. 99th Percentile Latency (ms): 5.6

DynamoDB (160K WR | 80K RD), 8 YCSB clients:
  • Overall Throughput (ops/sec): 120.1K
  • Consumed capacity: ~WR 76% | RD 76%
  • READ operations (Avg): ~40.53M
    • Avg. 95th Percentile Latency (ms): 12.0
    • Avg. 99th Percentile Latency (ms): 18.6
  • UPDATE operations (Avg): ~40.53M
    • Avg. 95th Percentile Latency (ms): 13.2
    • Avg. 99th Percentile Latency (ms): 20.2

After all the effort of loading the data, DynamoDB was finally able to demonstrate its value. DynamoDB met the throughput SLA (120k OPS). However, it failed to meet the latency SLA of 10ms for 99%, but after the population difficulties we were happy to get to this point.

Scylla, on the other hand, easily met the throughput SLA with only 58% load, and its latency was 3x-4x better than DynamoDB’s and well below our requested SLA. (Also, what you don’t see here is the huge cost difference, but we’ll get to that in a bit.)

We won’t let DynamoDB off easy, however. Now that we’ve seen how DynamoDB performs with its ideal uniform distribution, let’s have a look at how it behaves with a real life use-case.

Real Life Use-case: Zipfian Distribution

A good schema design goal is to have the perfect, uniform distribution of your primary keys. However, in real life, some keys are accessed more than others. For example, it’s common practice to use UUID for the customer or the product ID and to look them up. Some of the customers will be more active than others and some products will be more popular than others, so the differences in access times can go up to 10x-1000x. Developers cannot improve the situation in the general case since if you add an additional column to the primary key in order to improve the distribution, you may improve the specific access but at the cost of complexity when you retrieve the full information about the product/customer. 

Keep in mind what you store in a database. It’s data such as how many people use Quora or how many likes NBA teams have:

With that in mind, let’s see how ScyllaDB and DynamoDB behave given a Zipfian distribution access pattern. We went back to the test case of 1 billion keys spanning 1TB of pre-replicated dataset and queried it again using YCSB Zipfian accesses. It is possible to define the hot set of partitions in terms of volume (how much data is in it) and to define the percentage of accesses that go to this hot set, as a part of the overall 1TB set.

We set a variety of parameters for the hot set and the results were pretty consistent – DynamoDB could not meet the SLA for Zipfian distribution. It performed well below its reserved capacity — only 42% utilization — but it could not execute 120k OPS. In fact, it could do only 65k OPS. The YCSB client experienced multiple, recurring ProvisionedThroughputExceededException (code: 400) errors, and throttling was imposed by DynamoDB.

YCSB workload: Workload A, 50% Read / 50% Write, 1B partitions, Distribution: Zipfian, Duration: 90 min., Hot set: 10K partitions, Hot set access: 90%

Scylla 2.1 (3x i3.8xlarge), 8 YCSB clients:
  • Overall Throughput (ops/sec): 120.2K
  • Avg Load (scylla-server): ~55%
  • READ operations (Avg): ~40.56M
    • Avg. 95th Percentile Latency (ms): 6.1
    • Avg. 99th Percentile Latency (ms): 8.6
  • UPDATE operations (Avg): ~40.56M
    • Avg. 95th Percentile Latency (ms): 4.4
    • Avg. 99th Percentile Latency (ms): 6.6

DynamoDB (160K WR | 80K RD), 8 YCSB clients:
  • Overall Throughput (ops/sec): 65K
  • Consumed capacity: ~WR 42% | RD 42%
  • READ operations (Avg): ~21.95M
    • Avg. 95th Percentile Latency (ms): 6.0
    • Avg. 99th Percentile Latency (ms): 9.2
  • UPDATE operations (Avg): ~21.95M
    • Avg. 95th Percentile Latency (ms): 7.3
    • Avg. 99th Percentile Latency (ms): 10.8

Why can’t DynamoDB meet the SLA in this case? The answer lies within the Dynamo model. The global reservation is divided among multiple partitions, each no more than 10GB in size.

DynamoDB partition equations

Thus, when such a partition is accessed more often, it may reach its throttling cap even though overall you’re well within your global reservation. In the example above, when reserving 200 writes/s, each of the 10 partitions cannot serve more than 20 writes/s.
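
To make that arithmetic concrete, here is a minimal sketch of the effect. It assumes the provisioned throughput is split evenly across partitions, as in the example (200 writes/s over 10 partitions), and it ignores any burst or adaptive capacity. The function name and the skewed load are hypothetical.

```python
def served_and_throttled(provisioned, demand_per_partition):
    """Split `provisioned` evenly across partitions and cap each partition's demand.

    Returns (per-partition cap, total served, total throttled), all in ops/sec.
    """
    cap = provisioned / len(demand_per_partition)
    served = [min(d, cap) for d in demand_per_partition]
    throttled = [d - s for d, s in zip(demand_per_partition, served)]
    return cap, sum(served), sum(throttled)


# Both loads ask for 100 writes/s in total, i.e. 50% of the 200 writes/s reserved.
uniform = [10] * 10        # evenly spread over the 10 partitions
skewed = [55] + [5] * 9    # hypothetical: one hot partition gets 55 writes/s

for name, demand in (("uniform", uniform), ("skewed", skewed)):
    cap, served, throttled = served_and_throttled(200, demand)
    print(f"{name}: cap {cap:.0f}/partition, served {served:.0f}, throttled {throttled:.0f}")
```

With the skewed load, the hot partition is capped at 20 writes/s, so only 65 of the 100 requested writes/s are served, even though the table as a whole is at half of its reservation.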

The Dress that Broke DynamoDB

If you asked yourself, “Hmmm, is 42% utilization the worst I’d see from DynamoDB?” we’re afraid we have some bad news for you. Remember the dress that broke the internet? What if you have an item in your database that becomes extremely hot? To explore this, we tested a single hot partition access and compared it.

The Dress that Broke the Internet

We ran a single YCSB client, working on a single partition in a 110MB dataset (100K partitions). During our tests, we observed a DynamoDB limitation when a specific partition key exceeded 3000 read capacity units (RCU) and/or 1000 write capacity units (WCU).

Even when using only ~0.6% of the provisioned capacity (857 OPS), the YCSB client experienced ProvisionedThroughputExceededException (code: 400) errors, and throttling was imposed by DynamoDB (see screenshots below).

It’s not that we recommend not planning for the best data model. However, there will always be cases when your plan is far from reality. In the Scylla case, a single partition still performed reasonably well: 20,200 OPS with good 99% latency.

Scylla vs DynamoDB – Single (Hot) Partition

YCSB workload: Workload A, 50% Read / 50% Write, Single partition (~1.1KB), Distribution: Uniform, Duration: 90 min.

Scylla 2.1 (3x i3.8xlarge), 8 YCSB clients:
  • Overall Throughput (ops/sec): 20.2K
  • Avg Load (scylla-server): ~5%
  • READ operations (Avg): ~50M
    • Avg. 95th Percentile Latency (ms): 7.3
    • Avg. 99th Percentile Latency (ms): 9.4
  • UPDATE operations (Avg): ~50M
    • Avg. 95th Percentile Latency (ms): 2.7
    • Avg. 99th Percentile Latency (ms): 4.5

DynamoDB (160K WR | 80K RD), 8 YCSB clients:
  • Overall Throughput (ops/sec): 857
  • Consumed capacity: ~WR 0.6% | RD 0.6%
  • READ operations (Avg): ~2.3M
    • Avg. 95th Percentile Latency (ms): 5.4
    • Avg. 99th Percentile Latency (ms): 10.7
  • UPDATE operations (Avg): ~2.3M
    • Avg. 95th Percentile Latency (ms): 7.7
    • Avg. 99th Percentile Latency (ms): 607.8

Screenshot 1: Single partition. Consumed capacity: ~0.6% -> Throttling imposed by DynamoDB

Additional Factors

Cross-region Replication and Global Tables

We compared the replication speed between datacenters: DynamoDB replicated to a remote DC in 370ms on average, while Scylla’s average was 82ms. Since the DynamoDB cross-region replication is built on its streaming API, we believe that when congestion happens, the gap will grow much further into a multi-second gap, though we haven’t yet tested it.

Beyond replication propagation, there is a more burning functional difference — Scylla can easily add regions on demand at any point in the process with a single command:

ALTER KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor': '3', '<existing_dc>' : 3, '<new_dc>' : 4};

In DynamoDB, on the other hand, you must define your global tables ahead of time. This imposes a serious usability issue and a major cost issue, as you may need to grow the number of deployed datacenters over time.

Why start with global Tables..? (quote)

Explicit Caching is Expensive and Bad for You

DynamoDB performance can improve and its high cost can be reduced in some cases when using DAX. However, Scylla has a much smarter and more efficient embedded cache (the database nodes have memory, don’t they?) and the outcome is far better for various reasons we described in a recent blog post.


This is another major advantage of Scylla: DynamoDB locks you to the AWS cloud, significantly decreasing your chances of ever moving out. Data gravity is significant. No wonder they’re going after Oracle!

Scylla is an open source database. You have the freedom to choose between our community version, an Enterprise version and our new fully managed service. Scylla runs on all major cloud providers and opens the opportunity for you to run some datacenters on one provider and others on another provider within the same cluster. One of our telco customers is a great example of the hybrid model — they chose to run some of their datacenters on-premise and some on AWS.

Our approach for “locking-in” users is quite different — we do it solely by the means of delivering quality and value such that you won’t want to move away from us. As of today, we have experienced exactly zero customer churn.

No Limits

DynamoDB imposes various limits, such as on the size of each item: only 400KB. In Scylla you can effectively store megabytes. One of our customers built a distributed storage system using Scylla, keeping large blobs in Scylla with single-digit millisecond latency for them too.

Another problematic limit concerns the sort key: DynamoDB cannot hold more than 10GB of items under a single partition key. While this isn’t a recommended pattern in Scylla either, we have customers who keep 130GB of items in a single partition. The effect of these higher limits is more freedom in data modeling and fewer reasons to worry.

Total Cost of Ownership (TCO)

We’re confident the judges would award every round of this battle to Scylla so far, and we haven’t even gotten to comparing the total cost of ownership. The DynamoDB setup, which didn’t even meet the required SLA and which caused us to struggle multiple times to even get working, costs 7 times more than the comparable Scylla setup.

Scylla Enterprise (3 x i3.8xlarge + Scylla Enterprise license):
  • Year-term Estimated Cost: ~$71K

Amazon DynamoDB (160K write | 80K read + Business-level Support):
  • Year-term Estimated Cost: ~$524K
    • DynamoDB 1-year term: ~$288K
    • Monthly fee: ~$19.7K/month (~$236K annual)
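
For readers who want to check the totals, the following short sketch redoes the arithmetic from the figures above. The variable names are ours, and the inputs are the rounded numbers quoted in this post rather than an official price list.

```python
# Rounded figures quoted above, in US dollars.
scylla_annual = 71_000            # 3 x i3.8xlarge + Scylla Enterprise license
dynamodb_one_year_term = 288_000  # DynamoDB 1-year term
dynamodb_monthly_fee = 19_700     # monthly fee

dynamodb_annual = dynamodb_one_year_term + 12 * dynamodb_monthly_fee
ratio = dynamodb_annual / scylla_annual

print(f"DynamoDB: ~${dynamodb_annual / 1000:.0f}K per year")
print(f"Scylla:   ~${scylla_annual / 1000:.0f}K per year")
print(f"Cost ratio: ~{ratio:.1f}x")
```

The sum of the two DynamoDB components comes to roughly $524K per year, a little over seven times the Scylla estimate.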

Note that only 3 machines were needed for Scylla; not much of a challenge in terms of administration. And, as we mentioned earlier, you can offload all your database administration with our new fully managed cloud service, Scylla Cloud. (By the way, Scylla Cloud comes in at 4-6x less expensive than DynamoDB, depending on the plan.)

Final Decision: A Knockout!

Charts: Uniform 99% Latency (ms); Zipfian Distribution Throughput
  • DynamoDB failed to achieve the required SLA multiple times, especially during the population phase.
  • DynamoDB has 3x-4x the latency of Scylla, even under ideal conditions
  • DynamoDB is 7x more expensive than Scylla
  • Dynamo was extremely inefficient in a real-life Zipfian distribution. You’d have to buy 3x your capacity, making it 20x more expensive than Scylla
  • Scylla demonstrated up to 20x better throughput in the hot-partition test with better latency numbers
  • Last but not least, Scylla provides you freedom of choice with no cloud vendor lock-in (as Scylla can be run on various cloud vendors, or even on-premises).

Still not convinced? Listen to what our users have to say.

If you’d like to try your own comparison, remember that our product is open source. Feel free to download now. We’d love to hear from you if you have any questions about how we stack up or if you’d like to share your own results. And we’ll end with a final reminder that our Scylla Cloud (now available in Early Access) is built on Scylla Enterprise, delivering similar price-performance advantages while eliminating administrative overhead.

The post Going Head-to-Head: Scylla vs Amazon DynamoDB appeared first on ScyllaDB.

Scylla Manager 1.3 Release Announcement

The Scylla Enterprise team is pleased to announce the release of Scylla Manager 1.3, a production-ready release of Scylla Manager for Scylla Enterprise customers.

Scylla Manager 1.3 adds a new Health Check, which works as follows. Scylla nodes already report their status through “nodetool status” and via Scylla Monitoring Stack dashboards, but in some cases this is not enough. A node might report an Up-Normal (UN) status, while in fact, it is slow or not responding to CQL requests. This might be a result of an internal problem in the node, or an external issue (for example, a blocked CQL port somewhere between the application and the Scylla node).

Scylla Manager’s new Health Check functionality helps identify such issues as soon as possible, playing a similar role to an application querying the CQL interface from outside the Scylla cluster.

Scylla Manager 1.3 automatically adds a new task to each newly managed cluster. This task is a health check which sends a CQL OPTIONS command to each Scylla node and measures the response time. If the node responds in less than 250ms, it is considered to be ‘up’. If there is no response or the response takes longer than 250ms, the node is considered to be ‘down’. The results are available using the “sctool status” command.
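
To illustrate the idea, here is a minimal Python sketch of such a probe. It is not Scylla Manager's implementation: the function names, the choice of native protocol version 4, the default port 9042, and the treatment of a response at exactly 250ms as ‘down’ are all assumptions made for the example.

```python
import socket
import struct
import time
from typing import Optional

TIMEOUT_MS = 250          # threshold quoted in the text
PROTOCOL_VERSION = 0x04   # assumption: CQL native protocol v4 request frame
OPCODE_OPTIONS = 0x05     # OPTIONS request opcode
OPCODE_SUPPORTED = 0x06   # SUPPORTED response opcode


def build_options_frame(stream_id: int = 0) -> bytes:
    """Build a 9-byte frame header: version, flags, stream, opcode, body length (0)."""
    return struct.pack(">BBhBI", PROTOCOL_VERSION, 0x00, stream_id, OPCODE_OPTIONS, 0)


def probe(host: str, port: int = 9042) -> Optional[float]:
    """Send OPTIONS and return the response time in ms, or None if there was no valid reply."""
    start = time.monotonic()
    try:
        with socket.create_connection((host, port), timeout=TIMEOUT_MS / 1000) as sock:
            sock.settimeout(TIMEOUT_MS / 1000)
            sock.sendall(build_options_frame())
            header = sock.recv(9)
    except OSError:  # refused, unreachable, or timed out
        return None
    if len(header) < 9 or header[4] != OPCODE_SUPPORTED:
        return None
    return (time.monotonic() - start) * 1000


def classify(elapsed_ms: Optional[float]) -> str:
    """'up' only when a reply arrived in under 250ms (the boundary choice is ours)."""
    if elapsed_ms is not None and elapsed_ms < TIMEOUT_MS:
        return "up"
    return "down"
```

A caller would run something like `classify(probe("10.0.0.1"))` for each node address (the address here is made up), which mirrors the up/down report described above.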

Scylla Manager 1.3 Architecture, including the Monitoring Stack, and the new CQL-based Health Check interface to Scylla nodes.

If you have enabled the Scylla Monitoring Stack, the Monitoring Stack 2.0 Manager dashboard includes the same cluster status report. A new alert was defined in Prometheus Alertmanager to report when a Scylla node health check fails and the node is considered ‘down’.

Example of Manager 1.3 Dashboard, including an active repair running, and Health Check reports of all nodes responding to CQL.

Upgrade to Scylla Manager 1.3

Read the upgrade guide carefully. In particular, you will need to redefine scheduled repairs. Please contact the Scylla Support team for help in installing and upgrading Scylla Manager.


Scylla Grafana Monitoring 2.0 now includes the Scylla Manager 1.3 dashboard

About Scylla Manager

Scylla Manager adds centralized cluster administration and recurrent task automation to Scylla Enterprise. Scylla Manager 1.x includes automation of periodic repair. Future releases will provide rolling upgrades, recurrent backup, and more. With time, Scylla Manager will become the focal point of Scylla Enterprise cluster management, including a GUI front end. Scylla Manager is available for all Scylla Enterprise customers. It can also be downloaded from scylladb.com for a 30-day trial.

The post Scylla Manager 1.3 Release Announcement appeared first on ScyllaDB.

Rolling Reboots with cstarpar

Welcome to the third post in our cstar series. So far, the first post gave an introduction to cstar, while the second post explained how to extend cstar with custom commands. In this post we will look at cstar’s cousin cstarpar. Both utilities deliver the same topology-aware orchestration, yet cstarpar executes commands locally, allowing operations cstar is not capable of.

Using ssh

cstarpar relies heavily on ssh working smoothly and without any user prompts. When we run a command with cstar, it will take the command, ssh into the remote host, and execute the command on our behalf. For example, we can run hostname on each node of a 3-node cluster:

$ cstar run --seed-host ${SEED_IP} --command hostname
$ cat ~/.cstar/jobs/8ff6811e-31e7-4975-bec4-260eae885ef6/ec2-*/out

If we switch to cstarpar, it will execute the hostname command locally and we will see something different:

$ cstarpar --seed-host ${SEED_IP} hostname
$ cat ~/.cstar/jobs/a1735406-ae58-4e44-829b-9e8d4a90fd06/ec2-*/out

To make cstarpar execute commands on remote machines we just need to make the command explicitly use ssh:

$ cstarpar --seed-host ${SEED_IP} "ssh {} hostname"
$ cat ~/.cstar/jobs/2c54f7a1-8982-4f2e-ada4-8b45cde4c4eb/ec2-*/out

Here we can see the hostname was executed on the remote hosts.


The true advantage of local execution is that there is no need for interaction with the remote host. This approach allows operations that would normally prevent that interaction, such as reboots. For example, the following command reboots the entire cluster in a topology-aware fashion, albeit very roughly because it gracelessly kills all processes, including Cassandra:

$ cstarpar --seed-host ${SEED_IP} -- "ssh {} sudo reboot &"

Note that this example used the sudo reboot & command. The reboot command on its own causes the reboot immediately. This is so drastic that it causes Python’s subprocess module to think an error occurred. Placing the & after the command, which directs the shell to run the command in the background, allows the shell execution to return to Python cleanly. Once the host is down, cstarpar will mark the host as such in the job status report.

It is important to ensure the hosts are configured to start the Cassandra process automatically after the reboot, because just like cstar, cstarpar will proceed with the next hosts only if all hosts are up, and will otherwise wait indefinitely for the rebooted host to come back.

Since cstarpar can execute local commands and scripts, it need not support complex commands in the same way cstar does. To run a complex command with cstarpar, we can use a script file. To illustrate this, the script below will add a graceful shutdown of Cassandra before executing the actual reboot:

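$ cat ~/gentle_reboot.sh
#!/bin/bash
# The target host is passed in as the first argument (the {} in the cstarpar command)
FQDN=$1

echo "Draining Cassandra"
ssh ${FQDN} nodetool drain && sleep 5

echo "Stopping Cassandra process"
ssh ${FQDN} sudo service cassandra stop && sleep 5

echo "Rebooting"
# Run in the background so the shell returns cleanly before the host goes down
ssh ${FQDN} sudo reboot &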

The reboot command then runs like this:

$ cstarpar --seed-host ${SEED_IP} -- "bash /absolute/path/to/gentle_reboot.sh {}"

Replication and Conclusion

For this post, I used a simple three-node cluster provisioned with tlp-cluster. cstarpar relies heavily on ssh working smoothly and without user prompts. Initially, I attempted the connection without any specific ssh configuration on my laptop or the AWS hosts, so the ssh calls looked like this:

$ cstarpar --seed-host ${SEED_IP} --ssh-identity-file=${PATH_TO_KEY}  --ssh-username ubuntu "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@{} hostname"

In the gentle_reboot.sh I also had to add some options:

ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i ${PATH_TO_KEY} ubuntu@${FQDN} sudo reboot &

Once configured, I was able to harness the full power of cstarpar, which supplements cstar functionality by executing commands locally. This was demonstrated to be useful for operations for which cstar’s mode of operation is not well suited, such as reboots. Importantly, to get the most value from cstarpar, it is critical to have ssh configured to run smoothly and without any user prompts.

Worry-Free Ingestion: Flow Control of Writes in Scylla

This blog post is based on a talk I gave last month at the third annual Scylla Summit in San Francisco. It explains how Scylla ensures that ingestion of data proceeds as quickly as possible, but not quicker. It looks into the existing flow-control mechanism for tables without materialized views, and into the new mechanism for tables with materialized views, which is introduced in the upcoming Scylla Open Source 3.0 release.


In this post we look into ingestion of data into a Scylla cluster. What happens when we make a large volume of update (write) requests?

We would like the ingestion to proceed as quickly as possible but without overwhelming the servers. An over-eager client may send write requests faster than the cluster can complete earlier requests. If this is only a short burst of requests, Scylla can absorb the excess requests in a queue or numerous queues distributed throughout the cluster (we’ll look at the details of these queues below). But had we allowed the client to continue writing at this excessive rate, the backlog of uncompleted writes would continue to grow until the servers run out of memory and possibly crash. So as the backlog grows, we need to find a way for the server to tell the client to slow down its request rate. If we can’t slow down the client, we have to start failing new requests.

Cassandra’s CQL protocol does not offer any explicit flow-control mechanisms for the server to slow down a client which is sending requests faster than the server can handle them. We only have two options to work with: delaying replies to the client’s requests, and failing them. How we can use these two options depends on what drives the workload: We consider two different workload models — a batch workload with bounded concurrency, and an interactive workload with unbounded concurrency:

  1. In a batch workload, a client application wishes to drive the server at 100% utilization for a long time, to complete some predefined amount of work. There is a fixed number of client threads, each running a request loop: preparing some data, making a write request, and waiting for its response. The server can fully control the request rate by rate-limiting (delaying) its replies: If the server only sends N replies per second, the client will only send N new requests per second. We call this rate-limiting of replies, or throttling.

  2. In an interactive workload, the client sends requests driven by some external events (e.g., activity of real users). These requests can come at any rate, which is unrelated to the rate at which the server completes previous requests. For such a workload, if the request rate is at or below the cluster’s capacity, everything is fine and the request backlog will be mostly empty. But if the request rate is above the cluster’s capacity, the server has no way of slowing down these requests and the backlog grows and grows. If we don’t want to crash the server (and of course, we don’t), we have no choice but to return failure for some of these requests.

    When we do fail requests, it’s also important how we fail: We should fail fresh, not-yet-handled client requests. It’s a bad idea to fail requests to which we had already devoted significant work: if the server spends valuable CPU time on requests which will end up being failed anyway, throughput will drop. We use the term admission control for a mechanism which fails a new request when it believes the server will not have the resources needed to handle the request to completion.

For these reasons Scylla utilizes both throttling and admission control. Both are necessary. Throttling is a necessary part of handling normal batch workloads, and admission control is needed for unexpected overload situations. In this post, we will focus on the throttling part.

We sometimes use the term backpressure to describe throttling, which metaphorically takes the memory “pressure” (growing queues) which the server is experiencing, and feeds it back to the client. However, this term may be confusing, as historically it was used for other forms of flow control, not for delaying replies as a mechanism to limit the request rate. In the rest of this document I’ll try to avoid the term “backpressure” in favor of other terms like throttling and flow control.

Above we defined two workload models: interactive and batch workloads. We can, of course, be faced with a combination of both. Moreover, even batch workloads may involve several independent batch clients, starting at different times and working with different concurrencies. The sum of several such batch workloads can be represented as one batch workload with a changing client concurrency. E.g., a workload can start with concurrency 100 for one minute, then go to concurrency 200 for another minute, etc. Our flow control algorithms need to reasonably handle this case as well, and react to a client’s changing concurrency. As an example, consider that the client doubled the number of threads. Since the total number of writes the server can handle per second remains the same, now each client thread will need to send requests at half the rate it sent earlier when there were just half the number of threads.
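
The batch model and the doubling example can be sketched with a toy closed-loop simulation. This is only an illustration of the reasoning above, not Scylla code: the function name, the FIFO reply order, and the figure of 10,000 replies per second are assumptions.

```python
from collections import deque


def closed_loop(threads: int, replies_per_sec: int, seconds: int = 10):
    """Simulate a batch client whose threads each keep exactly one request outstanding.

    Returns (total requests/sec, requests/sec per thread) after the initial burst.
    """
    waiting = deque(range(threads))  # threads awaiting a reply, oldest first
    sent = [1] * threads             # every thread sends its first request at t=0
    for _ in range(seconds):
        for _ in range(replies_per_sec):  # the server releases this many replies
            t = waiting.popleft()         # the oldest outstanding request is answered
            sent[t] += 1                  # its thread immediately sends the next one
            waiting.append(t)
    total_rate = (sum(sent) - threads) / seconds
    return total_rate, total_rate / threads


for n in (100, 200):
    total, per_thread = closed_loop(threads=n, replies_per_sec=10_000)
    print(f"{n} threads: {total:.0f} requests/s in total, {per_thread:.0f} per thread")
```

In this model the total request rate always equals the rate at which the server releases replies, so doubling the threads from 100 to 200 halves each thread's rate from 100 to 50 requests per second.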

The problem of background writes

Let’s first look at writes to regular Scylla tables which do not have materialized views. Later we can see how materialized views further complicate matters.

A client sends an update (a write request) to a coordinator node, which sends the update to RF replicas (RF is the replication factor, e.g., 3). The coordinator then waits for the first CL (consistency level, e.g., 2) of those writes to complete, at which point it sends a reply to the client, saying that the desired consistency level has been achieved. The remaining ongoing writes to replicas (RF-CL, i.e., one remaining write in the above example) will then continue “in the background”, i.e., after the response to the client, and without the client waiting for them to finish.

The problem with these background writes is that a batch workload, upon receiving the server’s reply, will send a new request before these background writes finish. So if new writes come in faster than we can finish background writes, the number of these background writes can grow without bound. But background writes take memory, so we cannot allow them to grow without bound. We need to apply some throttling to slow the workload down.

The slow node example

Before we explain how Scylla does this throttling, it is instructive to look at one concrete — and common — case where background writes pile up and throttling becomes necessary.

This is the case where one of the nodes happens to be, for some reason, consistently slower than the others. It doesn’t have to be much slower — even a tiny bit slower can cause problems:

Consider, for example, three nodes and a table with RF=3, i.e., all data is replicated on all three nodes, so all writes need to go to all three. Suppose that one node is just 1% slower: two of the nodes can complete 10,000 replica writes per second, while the third can only complete 9,900 replica writes per second. If we do CL=2 writes, then every second 10,000 of these writes can complete after nodes 1 and 2 have completed their work. But since node 3 can only finish 9,900 writes in this second, we will have added 100 new “background writes” waiting for the write to node 3 to complete. We will continue to accumulate 100 additional background writes each second and, for example, after 100 seconds we will have accumulated 10,000 background writes. And this will continue until we run out of memory, unless we slow down the client to only 9,900 writes per second (and in a moment, we’ll explain how).

It is possible to demonstrate this and similar situations in real-life Scylla clusters. But to make it easier to play with different scenarios and flow-control algorithms, we wrote a simple simulator. In the simulator we can exactly control the client’s concurrency and the rate at which each replica completes write requests, then graph the lengths of the various queues, the overall write performance, and so on, and investigate how those respond to different throttling algorithms.

In our simple “slow node” example, we see the following results from the simulator:

Simulator Results, Figure 1

Simulator Results 2

In the top graph, we see that a client with fixed concurrency (arbitrarily chosen as 50 threads) writing with CL=2 will, after a short burst, get 10,000 replies each second, i.e., the speed of the two fastest nodes. But while staying at that speed, we see in the bottom graph that the backlog of background writes grows continuously — 100 every second, as we suspected. We need to slow down the client to curb this growth.
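The growth described above is simple arithmetic, and a few lines of Python can reproduce it. This is only a minimal sketch, not the simulator used for the graphs; the function name and its parameters are made up for illustration, and it assumes the client is always answered at the pace of the two fast replicas.

```python
def background_backlog(fast_rate, slow_rate, seconds):
    """Backlog of background writes after each second, with no throttling.

    With CL=2 the client is answered at the pace of the two fast replicas,
    so the slow replica falls behind by (fast_rate - slow_rate) every second.
    """
    backlog = 0
    history = []
    for _ in range(seconds):
        backlog += fast_rate - slow_rate
        history.append(backlog)
    return history


history = background_backlog(fast_rate=10_000, slow_rate=9_900, seconds=100)
print(history[0], history[-1])  # backlog after 1 second and after 100 seconds
```

The first value is 100 and the last is 10,000, matching the numbers in the slow node example.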

It’s obvious from the description above that any consistent difference in node performance, even much smaller than 1%, will eventually cause throttling to be needed to avoid filling the entire memory with backlogged writes. In real-life such small performance differences do happen in clouds, e.g., because some of the VMs have busier “neighbors” than others.

Throttling to limit background writes

Scylla applies a simple, but effective, throttling mechanism: When the total amount of memory that background writes are currently using goes over some limit — currently 10% of the shard’s memory — the coordinator starts throttling the client by no longer moving writes from foreground to background mode. This means that the coordinator will only reply when all RF replica writes have completed, with no additional work left in the background. When this throttling is on, the backlog of background writes does not continue to grow, and replies are only sent at the rate we can complete all the work, so a batch workload will slow down its requests to the same rate.

It is worth noting that when throttling is needed, the queue of background writes will typically hover around its threshold size (e.g., 10% of memory). When a flow-control algorithm always keeps a full queue, it is said to suffer from the bufferbloat problem. The typical bufferbloat side-effect is increased latency, but happily in our case this is not an issue: The client does not wait for the background writes (since the coordinator has already returned a reply), so the client will experience low latency even when the queue of background writes is full. Nevertheless, the full queue does have downsides: it wastes memory and it prevents the queue from absorbing writes to a node that temporarily goes down.

Let’s return to our “slow node” simulation from above, and see how this throttling algorithm indeed helps to curb the growth of the backlog of background writes:

Simulator Results 3

Simulator Results 4

As before, we see in the top graph that the server starts by sending 10,000 replies per second, which is the speed of the two fastest nodes (remember we asked for CL=2). At that rate, the bottom graph shows we are accruing a backlog of 100 background writes per second, until at time 3, the backlog has grown to 300 items. In this simulation we chose 300 as the background write limit (representing the 10% of the shard’s memory in real Scylla). So at that point, as explained above, the client is throttled by having its writes wait for all three replica writes to complete. Those will only complete at a rate of 9,900 per second (the rate of the slowest node), so the client will slow down to this rate (top graph, starting from time 3), and the background write queue will stop growing (bottom graph). If the same workload continues, the background write queue will remain full (at the threshold of 300): if it temporarily goes below the threshold, throttling is disabled and the queue starts growing back to the threshold.
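The behavior in these two graphs can be approximated with a very small discrete-time model. The sketch below is an illustration under simplifying assumptions, not Scylla’s implementation or the simulator itself: time advances in ticks of 1/100 of a second, the backlog is counted in writes rather than bytes, and the function name `simulate_throttle` is hypothetical.

```python
def simulate_throttle(fast_per_tick, slow_per_tick, limit, ticks):
    """Return per-tick client replies and background-write backlog."""
    backlog = 0
    replies = []
    backlogs = []
    for _ in range(ticks):
        if backlog < limit:
            # Normal mode: reply after CL=2 acks; the third replica write
            # is left to finish in the background.
            sent = fast_per_tick
        else:
            # Throttled: reply only once all RF replica writes completed,
            # so the client is paced by the slowest replica.
            sent = slow_per_tick
        # The slow replica completes slow_per_tick writes in every tick.
        backlog += sent - slow_per_tick
        replies.append(sent)
        backlogs.append(backlog)
    return replies, backlogs


# 100 ticks per second: 10,000/s is 100 per tick, 9,900/s is 99 per tick.
replies, backlogs = simulate_throttle(100, 99, limit=300, ticks=1000)
throttle_start = replies.index(99) / 100  # seconds until throttling kicks in
print(throttle_start, max(backlogs))
```

In this model throttling starts at time 3.0 and the backlog never exceeds 300, which is the same shape as the simulated graphs.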

The problem of background view updates

After understanding how Scylla throttles writes to ordinary tables, let’s look at how Scylla throttles writes to materialized views. Materialized views were introduced in Scylla 2.0 as an experimental feature — please refer to this blog post if you are not familiar with them. They are officially supported in Scylla Open Source Release 3.0, which also introduces the throttling mechanism we describe now, to slow down ingestion to the rate at which Scylla can safely write the base table and all its materialized views.

As before, a client sends write requests to a coordinator, and the coordinator sends them to RF (e.g., 3) replica nodes, and waits for CL (e.g., 2) of them to complete, or for all of them to complete if the backlog of background writes has reached the limit. But when the table (also known as the base table) has associated materialized views, each of the base replicas now also sends updates to one or more paired view replicas, that is, other nodes holding the relevant rows of the materialized views.

The exact details of which updates we send, where, and why are beyond the scope of this post. But what is important to know here is that the sending of the view updates always happens asynchronously: the base replica doesn’t wait for it, and therefore the coordinator does not wait for it either. Only the completion of enough writes to the base replicas determines when the coordinator finally replies to the client.

The fact that the client does not wait for the view updates to complete has been a topic for heated debate ever since the materialized view feature was first designed for Cassandra. The problem is that if a base replica waits for updates to several view replicas to complete, this hurts high availability which is a cornerstone of Cassandra’s and Scylla’s design.

Because the client does not wait for outstanding view updates to complete, their number may grow without bound and use unbounded amounts of memory on the various nodes involved: the coordinator, the RF base replicas and all the view replicas involved in the write. As in the previous section, here too we need to slow down the client until the system completes background work at the same rate as new background work is generated.

To illustrate the problem Scylla needed to solve, let’s use our simulator again to look at a concrete example, continuing the same scenario we used above. Again we have three nodes, RF=3, client with 50 threads writing with CL=2. As before two nodes can complete 10,000 base writes per second, and the third only 9,900. But now we introduce a new constraint: the view updates add considerable work to each write, to the point that the cluster can now only complete 3,000 writes per second, down from the 9,900 it could complete without materialized views. The simulator shows us (top graph below) that, unsurprisingly, without a new flow-control mechanism for view writes the client is only slowed down to 9,900 requests per second, not to 3,000. The bottom graph shows that at this request rate, the memory devoted to incomplete view writes just grows and grows, by as many as 6,900 (=9,900-3,000) updates per second:

Simulator Results 5

Simulator Results 6

So, what we need now is to find a mechanism for the coordinator to slow down the client to exactly 3,000 requests per second. But how do we slow down the client, and how does the coordinator know that 3,000 is the right request rate?

Throttling to limit background view updates

Let us now explain how Scylla 3.0 throttles the client to limit the backlog of view updates. We begin with two key insights:

  1. To slow down a batch client (with bounded concurrency), we can add an artificial delay to every response. The longer the delay is, the lower the client’s request rate will become.
  2. The chosen delay influences the size of the view-update backlog: Picking a higher delay slows down the client and slows the growth of the view update backlog, or even starts reducing it. Picking a lower delay speeds up the client and increases the growth of the backlog.

Basically, our plan is to devise a controller, which changes the delay based on the current backlog, trying to keep the length of the backlog in a desired range. The simplest imaginable controller, a linear function, works amazingly well:

(1) delay = α ⋅ backlog

Here α is any constant. Why does this deceptively-simple controller work?

Remember that if delay is too small, backlog starts increasing, and if delay is too large, the backlog starts shrinking. So there is some “just right” delay, where the backlog size neither grows nor decreases. The linear controller converges on exactly this just-right delay:

  1. If delay is lower than the just-right one, the client is too fast, the backlog increases, so according to our formula (1), we will increase delay.
  2. If delay is higher than the just-right one, the client is too slow, the backlog shrinks, so according to (1), we will decrease delay.

Let’s add to our simulator the ability to delay responses by a given delay amount, and to vary this delay according to the view update backlog in the base replicas, using formula (1). The result of this simulation looks like this:

Simulator Results 7

Simulator Results 8

In the top graph, we see the client’s request rate gradually converging to exactly the request rate we expected: 3,000 requests per second. In the bottom graph, the backlog length settles on about 1600 updates. The backlog then stops growing any more — which was our goal.

But why did the backlog settle on 1600, and not on 100 or 1,000,000? Remember that the linear control function (1) works for any α. In the above simulation, we took α = 1.0 and the result was convergence on backlog=1600. If we change α, the delay to which we converge will still have to be the same, so (1) tells us that, for example, if we double α to 2.0, the converged backlog will halve, to 800. In this manner, if we gradually change α we can reach any desired backlog length. Here is an example, again from our simulator, where we gradually changed α with the goal of reaching a backlog length of 200:
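To make the convergence argument concrete, here is a minimal sketch of the linear controller (1) in a toy model. It is not the simulator used for the graphs above, and its units are assumptions: α is expressed in seconds of delay per queued view update, the client has 50 threads, each request takes 50/9,900 seconds before any added delay (so the unthrottled client runs at 9,900 requests per second), and view updates complete at 3,000 per second. Because the units differ, the converged backlog is not the 1600 seen above, but the qualitative behavior is the same.

```python
def simulate_linear_controller(alpha, concurrency=50, base_latency=50 / 9900,
                               view_rate=3000.0, seconds=30.0, dt=0.001):
    """Return (request_rate, backlog) after `seconds` of simulated time."""
    backlog = 0.0
    rate = 0.0
    for _ in range(round(seconds / dt)):
        delay = alpha * backlog  # formula (1)
        # A batch client with fixed concurrency sends one request per thread
        # per (latency + delay), so its rate drops as the delay grows.
        rate = concurrency / (base_latency + delay)
        # The backlog grows when requests arrive faster than view updates
        # complete, and shrinks (down to zero) otherwise.
        backlog = max(0.0, backlog + (rate - view_rate) * dt)
    return rate, backlog


rate1, backlog1 = simulate_linear_controller(alpha=1e-5)
rate2, backlog2 = simulate_linear_controller(alpha=2e-5)
print(round(rate1), round(backlog1), round(rate2), round(backlog2))
```

In both runs the request rate settles at 3,000 per second, and doubling α halves the converged backlog. In this toy model the just-right delay is 50/3,000 − 50/9,900 seconds, and the backlog settles at that delay divided by α.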

Simulator Results 9

Simulator Results 10

Indeed, we can see in the lower graph that after overshooting the desired queue length of 200 and reaching 700, the controller continues to increase α to decrease the backlog, until the backlog settles on exactly the desired length, 200. In the top graph we see that, as expected, the client is indeed slowed down to 3,000 requests per second. Interestingly, in this graph we also see a “dip”, a short period where the client was slowed down even further, to just 2,000 requests per second. The reason for this is easy to understand: the client starts too fast, and a backlog starts forming. At some point the backlog reaches 700. Because we want to decrease this backlog (to 200), we must have a period where the client sends fewer than 3,000 requests per second, so that the backlog shrinks.

In control-theory lingo, the controller with the changing α is said to have an integral term: the control function depends not just on the current value of the variable (the backlog) but also on the previous history of the controller.
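One possible way to change α gradually is sketched below. The post does not say which update rule its simulator uses, so the rule here is an assumption chosen for illustration: α is multiplied by a factor slightly above or below 1 depending on whether the backlog is above or below the target, which makes α depend on the accumulated history of the error. The toy client model, the `gain` parameter and all constants are hypothetical as well.

```python
def simulate_adaptive_alpha(target_backlog, gain=0.5, alpha=1e-5,
                            concurrency=50, base_latency=50 / 9900,
                            view_rate=3000.0, seconds=60.0, dt=0.001):
    """Return (request_rate, backlog, peak_backlog) at the end of the run."""
    backlog = 0.0
    peak = 0.0
    rate = 0.0
    for _ in range(round(seconds / dt)):
        delay = alpha * backlog  # formula (1), with a time-varying alpha
        rate = concurrency / (base_latency + delay)
        backlog = max(0.0, backlog + (rate - view_rate) * dt)
        peak = max(peak, backlog)
        # Assumed update rule: raise alpha while the backlog is above the
        # target and lower it while the backlog is below the target.
        error = (backlog - target_backlog) / target_backlog
        alpha *= 1.0 + gain * dt * error
    return rate, backlog, peak


rate, backlog, peak = simulate_adaptive_alpha(target_backlog=200)
print(round(rate), round(backlog), peak > 200)
```

With these assumed constants the backlog first overshoots the target and then settles at about 200, while the request rate ends at about 3,000 per second.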

In (1), we considered the simplest possible controller — a linear function. But the proof above that it converges on the correct solution did not rely on this linearity. The delay can be set to any other monotonically-increasing function of the backlog:

(2) delay = f(backlog / backlog0) ⋅ delay0

(where backlog0 is a constant with backlog units, and delay0 is a constant with time units).

In Scylla 3.0 we chose this function to be a polynomial, selected to allow relatively-high delays to be reached without requiring very long backlogs in the steady state. But we do plan to continue improving this controller in future releases.


A common theme in Scylla’s design, which we covered in many previous blog posts, is the autonomous database, a.k.a. zero configuration. In this post we covered another aspect of this theme: When a user unleashes a large writing job on Scylla, we don’t want him or her to need to configure the client to use a certain speed or risk overrunning Scylla. We also don’t want the user to need to configure Scylla to limit an over-eager client. Rather, we want everything to happen automatically: The write job should just run normally without any artificial limits, and Scylla should automatically slow it down to exactly the right pace: not so fast that we start piling up queues until we run out of memory, but also not so slow that we let available resources go to waste.

In this post, we explained how Scylla throttles (slows down) the client by delaying its responses, and how we arrive at exactly the right pace. We started with describing how throttling works for writes to ordinary tables — a feature that had been in Scylla for well over a year. We then described the more elaborate mechanisms we introduce in Scylla 3.0 for throttling writes to tables with materialized views. For demonstration purposes, we used a simulator for the different flow-control mechanisms to better illustrate how they work. However, these same algorithms have also been implemented in Scylla itself — so go ahead and ingest some data! Full steam ahead!

Flow Control Finish

The post Worry-Free Ingestion: Flow Control of Writes in Scylla appeared first on ScyllaDB.

Step by Step Monitoring Cassandra with Prometheus and Grafana

In this blog, I’m going to give a detailed guide on how to monitor a Cassandra cluster with Prometheus and Grafana.

For this, I’m using a new VM which I’m going to call “Monitor VM”. In this blog post, I’m going to work on how to install the tools. In a second one, I’m going to go through the details on how to use and configure Grafana dashboards to get the most out of your monitoring!

High level plan

Monitor VM

  1. Install Prometheus
  2. Configure Prometheus
  3. Install Grafana

Cassandra VMs

  1. Download Prometheus JMX-Exporter
  2. Configure JMX-Exporter
  3. Configure Cassandra
  4. Restart Cassandra

Detailed Plan

Monitor VM

Step 1. Install Prometheus

  $ wget https://github.com/prometheus/prometheus/releases/download/v2.3.1/prometheus-2.3.1.linux-amd64.tar.gz
  $ tar xvfz prometheus-*.tar.gz
  $ cd prometheus-*

Step 2. Configure Prometheus

  $ vim /etc/prometheus/prometheus.yaml
  global:
    scrape_interval: 15s

  scrape_configs:
    # Cassandra config
    - job_name: 'cassandra'
      scrape_interval: 15s
      static_configs:
        - targets: ['cassandra01:7070', 'cassandra02:7070', 'cassandra03:7070']

Step 3. Create storage and start Prometheus

  $ mkdir /data
  $ chown prometheus:prometheus /data
  $ prometheus --config.file=/etc/prometheus/prometheus.yaml --storage.tsdb.path=/data

Step 4. Install Grafana

  $ wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana_5.1.4_amd64.deb
  $ sudo apt-get install -y adduser libfontconfig
  $ sudo dpkg -i grafana_5.1.4_amd64.deb

Step 5. Start Grafana

  $ sudo service grafana-server start

Cassandra Nodes

Step 1. Download JMX-Exporter:

  $ mkdir /opt/jmx_prometheus
  $ wget -P /opt/jmx_prometheus https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.3.0/jmx_prometheus_javaagent-0.3.0.jar

Step 2. Configure JMX-Exporter

  $ vim /opt/jmx_prometheus/cassandra.yml
  lowercaseOutputName: true
  lowercaseOutputLabelNames: true
  whitelistObjectNames: [
  rules:
    - pattern: org.apache.cassandra.metrics<type=(Connection|Streaming), scope=(\S*), name=(\S*)><>(Count|Value)
      name: cassandra_$1_$3
      labels:
        address: "$2"
    - pattern: org.apache.cassandra.metrics<type=(ColumnFamily), name=(RangeLatency)><>(Mean)
      name: cassandra_$1_$2_$3
    - pattern: org.apache.cassandra.net<type=(FailureDetector)><>(DownEndpointCount)
      name: cassandra_$1_$2
    - pattern: org.apache.cassandra.metrics<type=(Keyspace), keyspace=(\S*), name=(\S*)><>(Count|Mean|95thPercentile)
      name: cassandra_$1_$3_$4
      labels:
        "$1": "$2"
    - pattern: org.apache.cassandra.metrics<type=(Table), keyspace=(\S*), scope=(\S*), name=(\S*)><>(Count|Mean|95thPercentile)
      name: cassandra_$1_$4_$5
      labels:
        "keyspace": "$2"
        "table": "$3"
    - pattern: org.apache.cassandra.metrics<type=(ClientRequest), scope=(\S*), name=(\S*)><>(Count|Mean|95thPercentile)
      name: cassandra_$1_$3_$4
      labels:
        "type": "$2"
    - pattern: org.apache.cassandra.metrics<type=(\S*)(?:, ((?!scope)\S*)=(\S*))?(?:, scope=(\S*))?,
      name: cassandra_$1_$5
      labels:
        "$1": "$4"
        "$2": "$3"

Step 3. Configure Cassandra

  echo 'JVM_OPTS="$JVM_OPTS -javaagent:/opt/jmx_prometheus/jmx_prometheus_javaagent-0.3.0.jar=7070:/opt/jmx_prometheus/cassandra.yml"' >> conf/cassandra-env.sh

Step 4. Restart Cassandra

  $ nodetool flush
  $ nodetool drain
  $ sudo service cassandra restart

And now, if you have no errors (and you shouldn’t!) your Prometheus is ingesting your Cassandra metrics!
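If you want to double-check that a node’s exporter is actually serving metrics, a small script can fetch and filter its output. The sketch below is only an illustration: the function names are made up, the sample metric line is hypothetical, and the parser assumes each sample line has the form `name{labels} value` with no trailing timestamp. The host name and port 7070 are the ones used in the configuration above.

```python
from urllib.request import urlopen


def parse_metrics(text, prefix="cassandra_"):
    """Parse Prometheus text-format samples into {series: value}.

    Blank lines and comment lines (# HELP / # TYPE) are skipped.
    Assumes no timestamp follows the value.
    """
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        series, _, value = line.rpartition(" ")
        if series.startswith(prefix):
            samples[series] = float(value)
    return samples


def scrape(host, port=7070):
    """Fetch and parse the metrics page of one Cassandra node."""
    with urlopen(f"http://{host}:{port}/metrics", timeout=5) as resp:
        return parse_metrics(resp.read().decode("utf-8"))


# Hypothetical exporter output, used here instead of a live node.
sample = """\
# HELP cassandra_clientrequest_latency_count hypothetical example metric
# TYPE cassandra_clientrequest_latency_count untyped
cassandra_clientrequest_latency_count{type="read",} 42.0
jvm_threads_current 57.0
"""
metrics = parse_metrics(sample)
print(metrics)
```

On a real cluster you would call something like `scrape("cassandra01")` from the Monitor VM and check that the returned dictionary is not empty.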

Wait for the next blog post where I will guide you through a good Grafana configuration!

Introducing Transient Replication

Transient Replication is a new experimental feature soon to be available in 4.0. When enabled, it allows for the creation of keyspaces where replication factor can be specified as a number of copies (full replicas) and temporary copies (transient replicas). Transient replicas retain the data they replicate only long enough for it to be propagated to full replicas, via incremental repair, at which point the data is deleted. Writing to transient replicas can be avoided almost entirely if monotonic reads are not required because it is possible to achieve a quorum of acknowledged writes without them.

This results in savings in disk space, CPU, and IO. By deleting data as soon as it is no longer needed, transient replicas require only a fraction of the disk space of a full replica. By not having to store the data indefinitely, the CPU and IO required for compaction are reduced, and read queries are faster as they have less data to process.

So what are the benefits of not actually keeping a full copy of the data? Well, for some installations and use cases, transient replicas can be almost free if monotonic reads are disabled. In future releases where monotonic reads are supported with Transient Replication, enabling monotonic reads would reduce the savings in CPU and IO, but even then they should still be significant.

Transient Replication is designed to be transparent to applications:

  • Consistency levels continue to produce the same results for queries.
  • The number of replicas that can be lost before data loss occurs is unchanged.
  • The number of replicas that can be unavailable before some queries start to time out or return unavailable is unchanged (with the exception of ONE).

With Transient Replication, you can go from 3 replicas to 5 replicas, two of which are transient, without adding any hardware.

If you are running an active-passive 2 DC setup with 3 replicas in each DC, you can make one replica in each DC transient and still have four full copies of the data in total.

Feature support

Transient Replication is not intended to fully replace Cassandra’s existing approach to replication. There are features that currently don’t work with transiently replicated keyspaces and features that are unlikely ever to work with them.

You can have keyspaces with and without Transient Replication enabled in the same cluster, so it is possible to use Transient Replication for just the use cases that are a good fit for the currently available functionality.

Currently unsupported but coming:

  • Monotonic reads
  • Batch log
  • LWT
  • Counters

Will never be supported:

  • Secondary indexes
  • Materialized views

How Transient Replication works


Transient replication extends Cassandra’s existing consistent hashing algorithm to designate some replicas of a point or range on the consistent hash ring as transient and some as full. The following image depicts a consistent hash ring with three replicas A, B, and C. The replicas are located at tokens 5, 10, 15 respectively. A key k hashes to token 3 on the ring.

A consistent hash ring without Transient Replication

Replicas are selected by walking the ring clockwise starting at the point on the ring the key hashes to. At RF=3, the replicas of key k are A, B, C. With Transient Replication, the last N replicas (where N is the configured number of transient replicas) found while walking the ring are designated as transient.

There are no nodes designated as transient replicas or full replicas. All nodes will fully replicate some ranges on the ring and transiently replicate others.

The following image depicts a consistent hash ring at RF=3/1 (three replicas, one of which is transient). The replicas of k are still A, B, and C, but C is now transiently replicating k.
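The ring walk described above can be sketched in a few lines of Python. This is a simplified illustration, not Cassandra’s implementation: it assumes one token per node (no vnodes), ignores racks and datacenters, and uses a hypothetical function name, `replicas_for`.

```python
from bisect import bisect_left


def replicas_for(token, ring, rf, transient):
    """Walk the ring clockwise from `token`.

    `ring` is a list of (token, node) pairs sorted by token. Returns the
    (full, transient) replica lists: the last `transient` nodes found
    while walking are the transient replicas.
    """
    tokens = [t for t, _ in ring]
    # First node whose token is >= the key's token, wrapping around.
    start = bisect_left(tokens, token) % len(ring)
    walked = [ring[(start + i) % len(ring)][1] for i in range(rf)]
    full_count = rf - transient
    return walked[:full_count], walked[full_count:]


ring = [(5, "A"), (10, "B"), (15, "C")]

# Key k hashes to token 3, as in the images above.
print(replicas_for(3, ring, rf=3, transient=1))
# A key hashing to token 12 starts its walk at C instead.
print(replicas_for(12, ring, rf=3, transient=1))
```

For token 3 the full replicas are A and B and the transient replica is C. For token 12 the full replicas are C and A and the transient replica is B, which illustrates that a node is transient for some ranges and full for others.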

A consistent hash ring with Transient Replication

Normally all replicas of a range receive all writes for that range, as depicted in the following image.

Normal write behavior

Transient replicas do not receive writes in the normal write path.

Transient write behavior

If sufficient full replicas are unavailable, transient replicas will receive writes.

Transient write with unavailable node

This optimization, which is possible with Transient Replication, is called Cheap Quorums. This minimizes the amount of work that transient replicas have to do at write time, and reduces the amount of background compaction they will have to do.

Cheap Quorums and monotonic reads: Cheap Quorums may end up being incompatible with an initial implementation of monotonic reads, and operators will be able to make a conscious trade off between performance and monotonic reads.

Rapid write protection

In keyspaces utilizing Transient Replication, writes are sent to every full replica and enough transient replicas to meet the requested consistency level (to make up for unavailable full replicas). In addition, enough transient replicas are selected to reach a quorum in every datacenter, though unless the consistency level requires it, the write will be acknowledged without ensuring all have been delivered.
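The core of this selection rule can be sketched as follows. It is a deliberately simplified illustration with hypothetical names: it only covers a single datacenter, treats `block_for` as the number of acknowledgements the consistency level requires, only contacts live replicas, and leaves out the additional per-datacenter quorum selection mentioned above.

```python
def select_write_targets(full, transient, alive, block_for):
    """Pick the replicas to send a write to.

    Every live full replica gets the write. Live transient replicas are
    added only to make up for unavailable full replicas.
    """
    live_full = [node for node in full if node in alive]
    live_transient = [node for node in transient if node in alive]
    missing = max(0, block_for - len(live_full))
    return live_full + live_transient[:missing]


full, transient = ["A", "B"], ["C"]

# All nodes up: QUORUM (2 of 3) is met by the full replicas alone.
print(select_write_targets(full, transient, alive={"A", "B", "C"}, block_for=2))
# B is down: the transient replica C receives the write instead.
print(select_write_targets(full, transient, alive={"A", "C"}, block_for=2))
```

The first call returns only A and B, which is the Cheap Quorums case. The second returns A and C.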

Because not all replicas are sent the write, it’s possible that insufficient replicas will respond, causing timeouts. To prevent this, we implement rapid write protection, similar to rapid read protection, that sends writes to additional replicas if sufficient acknowledgements to meet the consistency level are not received promptly.

The following animation shows rapid write protection in action.

Animation of rapid write protection preventing a write timeout

Rapid write protection is configured similarly to rapid read protection using the table option additional_write_policy. The policy determines how long to wait for acknowledgements before sending additional mutations. The default is to wait for P99 of the observed latency.

Incremental repair

Incremental repair is used to clean up transient data at transient replicas and propagate it to full replicas.

When incremental repair occurs transient replicas stream out transient data, but don’t receive any. Anti-compaction is used to separate transient and fully replicated data so that only fully replicated data is retained once incremental repair completes.

The result of running an incremental repair is that all full replicas for a range are synchronized and can be used interchangeably to retrieve the repaired data set for a query.

Read path

Reads must always include at least one full replica and can include as many replicas (transient or full) as necessary to achieve the desired consistency level. At least one full replica is required in order to provide the data not available at transient replicas, but it doesn’t matter which full replica is picked because incremental repair synchronizes the repaired data set across full replicas.

Reads at transient replicas are faster than reads at full replicas: when monotonic reads are disabled, transient replicas haven’t been receiving writes, so reads sent to them are unlikely to return any results.

Creating keyspaces with Transient Replication

Transient Replication is supported by SimpleStrategy and NetworkTopologyStrategy. When specifying the replication factor, you can specify the number of transient replicas in addition to the total number of replicas (including transient replicas). The syntax for a replication factor of 3 replicas total with one of them being transient would be “3/1”.

ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'DC1' : '3/1'};
ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : '3/1'};

Monotonic reads are not supported with Transient Replication in 4.0, so any existing tables in the keyspace must have monotonic reads disabled by setting read_repair = 'NONE'.

Once the keyspace has been altered, you will need to run incremental repair and then nodetool cleanup to ensure transient data is cleaned up.

Operational matters

Transient replication requires rolling incremental repair to be run regularly in order to move data from transient replicas to full replicas. By default transient replicas will receive 1% of writes for transiently replicated ranges due to rapid write protection. If a node is down for an extended period of time, its transient replicas will receive additional write load and that data should be cleaned up using incremental repair. Running incremental repair regularly will ensure that the size of each repair is small.

It’s also a good idea to run a small number of vnodes with transient replication so that when a node goes down the load is spread out over several other nodes that transiently replicate that range. Large numbers of vnodes are known to be problematic, so it’s best to start with a cluster that is already close to or at its maximum size so that a small number of vnodes will be sufficient. If you intend to grow the cluster in the future, you will need to be cognizant of how this will interact with the number of vnodes you select.

While the odds of losing some data when multiple nodes are permanently lost remain the same with transient replication, the magnitude of the potential data loss does not. With 3/1 transient replication the permanent loss of two nodes could result in the loss of the entirety of the repaired data set. If you are running a multi-DC setup with a high level of replication, such as 2 DCs with 3/1 replicas in each, then you will have 4 full copies in total and the added risk of transient replication is minimal.

Experimental features

Experimental features are a relatively new idea for Apache Cassandra. Although we recently voted to make materialized views an experimental feature retroactively, Transient Replication is the first experimental feature to be introduced as such.

The goal of introducing experimental features is to allow for incremental development across multiple releases. In the case of Transient Replication, we can avoid a giant code drop that heavily modifies the code base, and the associated risks with incorporating a new feature that way.

What it means for a feature to be experimental doesn’t have a set definition, but for Transient Replication it’s intended to set expectations. As of 4.0, Transient Replication’s intended audience is expert operators of Cassandra with the ability to write the book on how to safely deploy Transient Replication, debug any issues that result, and if necessary contribute code back to address problems as they are discovered.

It’s expected that the feature set for Transient Replication will not change in minor updates to 4.0, but eventually it should be ready for use by a wider audience.

Next steps for Transient Replication

If increasing availability or saving on capacity sounds good to you, then you can help make transient replication production-ready by testing it out or even deploying it. Experience and feedback from the community is one of the things that will drive transient replication bug fixing and development.

Getting started with GraphQL and Apache Cassandra

GraphQL positioning

Yet another API specification

Reaper 1.3 Released

Cassandra Reaper 1.3 was released a few weeks ago, and it’s time to cover its highlights.

Configurable pending compactions threshold

Reaper protects clusters from being overloaded by repairs by not submitting new segments that involve a replica with more than 20 pending compactions. This usually means that nodes aren’t keeping up with streaming triggered by repair and running more repair could end up seriously harming the cluster.

While the default value is a good one, there may be cases where one would want to tune the value to match their specific needs. Reaper 1.3.0 allows this through a new configuration setting that you can add to your YAML file:

maxPendingCompactions: 50

Use this setting with care and for specific cases only.

More nodes metrics in the UI

Following up on the effort to expand the capabilities of Reaper beyond the repair features, the following information is now available in the UI (and through the REST API) when clicking on a node in the cluster view.

Progress of incoming and outgoing streams is now displayed:
Streaming progress

Compactions can be tracked:
Streaming progress

And both thread pool stats and client latency metrics are displayed in tables:
Streaming progress

Streaming progress

What’s next for Reaper?

As shown in a previous blog post, we’re working on an integration with Spotify’s cstar in order to run and schedule topology aware commands on clusters managed by Reaper.

We are also looking to add a sidecar mode, which would colocate an instance of Reaper on each Cassandra node, allowing Reaper to be used on clusters where JMX access is restricted to localhost.

The upgrade to 1.3 is recommended for all Reaper users. The binaries are available from yum, apt-get, Maven Central, Docker Hub, and are also downloadable as tarball packages. Remember to back up your database before starting the upgrade.

All instructions to download, install, configure, and use Reaper 1.3 are available on the Reaper website.

Introduction to tlp-stress

If you’re a frequent reader of our blog, you may have noticed we’ve been spending a lot of time looking at performance tuning. We’ve looked at tuning Compression, Garbage Collection, and how you can use Flame Graphs to better understand Cassandra’s internals. To do any sort of reasonable performance tuning you need to be able to apply workloads to test clusters. With Cassandra, that means either writing a custom tool to mimic your data model or using cassandra-stress to try to put load on a cluster.

Unfortunately, in the wise words of Alex Dejanovski, modeling real life workloads with cassandra-stress is hard. Writing a stress profile that gives useful results can be very difficult, sometimes impossible. As Alex mentions in his post, cassandra-stress will do batches whether or not you’ve asked it to, and disabling that functionality is far from straightforward.

Outside of the world of Cassandra, I’ve done a fair bit of benchmarking and performance profiling. One of the tools I’ve been most impressed with is fio. The author, Jens Axboe, recognizes that when doing performance testing there’s a handful of patterns that come up frequently. With very little work, fio can be configured to benchmark reads, writes, mixed, and the operations can be random or sequential. The idea is that you start with a predefined idea of a job, and configure it via parameters to run the workload you’re interested in. Once you understand the tool, creating a new workload takes minutes, not hours, and is fairly straightforward. It can output the results as JSON, so when you’re done it’s easy to archive and process.

Benchmarking Cassandra is, of course, different than benchmarking a filesystem. Workloads vary significantly between applications, but ultimately we still see a lot of patterns. Time series and key value workloads are very common, for instance. We decided to create a stress tool that shipped with a variety of commonly run workloads, and allow the tool to tweak their behavior. If the desired workload could not be configured based on what we ship, it should be straightforward to create a new one.

Thus, tlp-stress was born.

tlp-stress is written in Kotlin, so it runs on the JVM. We use the DataStax Java Driver and follow best practices to maximize query throughput. Metrics are tracked using the same instrumentation as the Java driver itself, which will make exporting them to tools like Prometheus a no-brainer in the long run (see related issue). We chose Kotlin because it gives us access to stable Java libraries and runs on the JVM without a lot of the mental overhead of Java. We’ve used Kotlin internally for over a year at TLP and have found it to be as easy to write as Python while still providing static typing and great IDE support, without giving up the libraries we rely on and know well.

There’s documentation for tlp-stress, which includes examples generated by the tool itself.

Let’s take a look at a practical example. Let’s say we’re looking to understand how Cassandra will perform when we’ve got a key-value style workload that’s 90% reads. This isn’t the usual use case people talk about a lot - usually we’re discussing optimizing Time Series and similar write heavy workloads.

We can put together this workload rather trivially.

Note: For demonstration purposes I’m only running 10 million operations - for real testing we’d want to set this up to run over several days.

We’ll fire up a test using the run subcommand, specifying one of the available tests; this one is KeyValue. We’ll limit ourselves to 10,000 partition keys by specifying -p 10k (note the human-friendly inputs), set 90% reads with -r .9, and specify the compaction option:

$ tlp-stress run KeyValue -n 10M -p 10k -r .9 --compaction "{'class':'LeveledCompactionStrategy'}" 
Creating schema
Executing 10000000 operations
Creating tlp_stress: 
 IF NOT EXISTS tlp_stress
 WITH replication = {'class': 'SimpleStrategy', 'replication_factor':3 }

Creating Tables
                        key text PRIMARY KEY,
                        value text
                        ) WITH compaction = {'class':'LeveledCompactionStrategy'}
Preparing queries
Initializing metrics
1 threads prepared.
                  Writes                                    Reads                  Errors
  Count  Latency (p99)  5min (req/s) |   Count  Latency (p99)  5min (req/s) |   Count  5min (errors/s)
  13415          15.61             0 |  122038          25.13             0 |       0                0
  33941          15.02        5404.6 |  306897           16.6       48731.6 |       0                0
  54414          15.56        5404.6 |  490757          24.15       48731.6 |       0                0

Note that we didn’t have to write a single query or a schema. That’s because tlp-stress includes common workloads and features out of the box. We can find out what those workloads are by running the list command:

$ tlp-stress list 
Available Workloads:



What if we have a use case that this test is close to, but doesn’t match exactly? Perhaps we know our workload will be fairly heavy on each request. The next thing we can do is customize the data in specific fields. Note that the schema above has a table called keyvalue, which has a value field. Let’s suppose our workload is caching larger blobs of data, maybe 100,000 - 150,000 characters per request. Not a problem, we can tweak the field.

$ tlp-stress run KeyValue -n 10M -p 10k -r .9 --compaction "{'class':'LeveledCompactionStrategy'}" --field.keyvalue.value='random(100000,150000)' 
Creating schema
Executing 10000000 operations
Creating tlp_stress: 
 IF NOT EXISTS tlp_stress
 WITH replication = {'class': 'SimpleStrategy', 'replication_factor':3 }

Creating Tables
                        key text PRIMARY KEY,
                        value text
                        ) WITH compaction = {'class':'LeveledCompactionStrategy'}
keyvalue.value, random(100000,150000)
Preparing queries
Initializing metrics
1 threads prepared.
                  Writes                                    Reads                  Errors
  Count  Latency (p99)  5min (req/s) |   Count  Latency (p99)  5min (req/s) |   Count  5min (errors/s)
   1450          43.75             0 |   13648           40.7             0 |       0                0
   3100          35.29         512.8 |   28807          42.81        4733.8 |       0                0
   4655          49.32         512.8 |   42809          44.25        4733.8 |       0                0

Note that in the above example, --field.keyvalue.value corresponds to the keyvalue table and the value field, and we’re using random values between 100k and 150k characters long. We could have specified --field.keyvalue.value='cities()' to pick randomly from a list of world cities, or --field.keyvalue.value='firstname()' to pick randomly from a list of first names. This feature is mostly undocumented and will be receiving some attention soon to make it a lot more useful than the current options, but you should be able to see where we’re going with this.

In addition to our stress tool we’ve also begun work on another tool, cleverly named tlp-cluster, to launch test clusters to aid in development and diagnostics. To be clear, this tool is in its early infancy and we don’t expect it to be used by the general Cassandra population. It isn’t a substitute for provisioning tools like Salt and Ansible. With that warning out of the way, if you’re interested in checking it out, we’ve put the project on GitHub and we’re working on some documentation.

We’ll be building out these tools over the next few months to further help us understand existing Cassandra clusters as well as profile the latest and greatest features that get merged into trunk. By focusing our effort on what we know, tooling and profiling, our goal is to expose and share as much knowledge as possible. Check out the tlp-stress documentation, it’ll be the best source of information over time. Please let us know if you’re finding tlp-stress useful by giving us a shout on Twitter!

Audit Logging in Apache Cassandra 4.0

Database audit logging is an industry standard tool for enterprises to capture critical data change events including what data changed and who triggered the event. These captured records can then be reviewed later to ensure compliance with regulatory, security and operational policies.

Prior to Apache Cassandra 4.0, the open source community did not have a good way of tracking such critical database activity. With this goal in mind, Netflix implemented CASSANDRA-12151 so that users of Cassandra would have a simple yet powerful audit logging tool built into their database out of the box.

Why are Audit Logs Important?

Audit logging database activity is one of the key components for making a database truly ready for the enterprise. Audit logging is generally useful but enterprises frequently use it for:

  1. Regulatory compliance with laws such as SOX, PCI and GDPR et al. These types of compliance are crucial for companies that are traded on public stock exchanges, hold payment information such as credit cards, or retain private user information.
  2. Security compliance. Companies often have strict rules for what data can be accessed by which employees, both to protect the privacy of users and to limit the probability of a data breach.
  3. Debugging complex data corruption bugs such as those found in massively distributed microservice architectures like Netflix’s.

Why is Audit Logging Difficult?

Implementing a simple logger in the request (inbound/outbound) path sounds easy, but the devil is in the details. In particular, the “fast path” of a database, where audit logging must operate, strives to do as little as humanly possible so that users get the fastest and most scalable database system possible. While implementing Cassandra audit logging, we had to ensure that the audit log infrastructure does not take up excessive CPU or IO resources from the actual database execution itself. However, one cannot simply optimize only for performance because that may compromise the guarantees of the audit logging.

For example, if producing an audit record would block a thread, the record should be dropped to maintain maximum performance. However, most compliance requirements prohibit dropping records. Therefore, the key to implementing audit logging correctly lies in allowing users to achieve both performance and reliability or, where both cannot be achieved, to make an explicit trade-off through configuration.
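The trade-off described above can be made concrete with a small sketch. It is not Cassandra's audit log code: the class AuditQueueSketch, the FullPolicy enum, and the method names are hypothetical, and a bounded in-memory queue stands in for whatever buffer sits between the request path and the log writer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration only; not Cassandra's audit log implementation.
class AuditQueueSketch {
    enum FullPolicy { BLOCK, DROP }

    private final BlockingQueue<String> queue;
    private final FullPolicy policy;
    private long dropped = 0;

    AuditQueueSketch(int capacity, FullPolicy policy) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.policy = policy;
    }

    /** Returns true if the record was queued, false if it was not. */
    boolean submit(String record) {
        if (policy == FullPolicy.DROP) {
            // offer() never waits: the request thread stays fast, but a record may be lost
            boolean accepted = queue.offer(record);
            if (!accepted) dropped++;
            return accepted;
        }
        try {
            // put() waits for free space: no record is lost, but the request thread stalls
            queue.put(record);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    long droppedCount() { return dropped; }

    int queuedCount() { return queue.size(); }

    public static void main(String[] args) {
        AuditQueueSketch sketch = new AuditQueueSketch(2, FullPolicy.DROP);
        for (int i = 0; i < 5; i++) sketch.submit("record-" + i);
        System.out.println("queued=" + sketch.queuedCount() + " dropped=" + sketch.droppedCount());
    }
}
```

With nothing draining the queue, the DROP policy keeps the first two records and discards the rest, while BLOCK would stall the caller on the third record. Making the policy a constructor argument mirrors the idea of leaving the choice to configuration.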

Audit Logging Design Goals

The design goals of the Audit Log fall into three broad areas:

Performance: Considering the Audit Log injection points are live in the request path, performance is an important goal in every design decision.

Accuracy: Accuracy is required by compliance and is thus a critical goal. Audit Logging must be able to answer crucial auditor questions like “Is every write request to the database being audited?”. As such, accuracy cannot be compromised.

Usability & Extensibility: The diverse Cassandra ecosystem demands that any frequently used feature must be easily usable and pluggable (e.g., Compaction, Compression, SeedProvider etc...), so the Audit Log interface was designed with this context in mind from the start.


With these three design goals in mind, the OpenHFT libraries were an obvious choice due to their reliability and high performance. Earlier, in CASSANDRA-13983, the Chronicle Queue library of OpenHFT was introduced as a BinLog utility to the Apache Cassandra code base. The performance of Full Query Logging (FQL) was excellent, but it only instrumented mutation and read query paths. It was missing a lot of critical data such as when queries failed, where they came from, and which user issued the query. The FQL was also single-purpose, preferring to drop messages rather than delay the process (which makes sense for FQL but not for Audit Logging). Lastly, the FQL didn’t allow for pluggability, which would have made it harder to adopt in the codebase for this feature.

As shown in the architecture figure below, we were able to unify the FQL feature with the AuditLog functionality through the AuditLogManager and IAuditLogger abstractions. Using this architecture, we can support any output format: logs, files, databases, etc. By default, the BinAuditLogger implementation comes out of the box to maintain performance. Users can choose a custom audit logger implementation by dropping its jar file on the Cassandra classpath and customizing it with configuration options in the cassandra.yaml file.


Fig 1. AuditLog Architecture Figure.

What does it log

Each audit log implementation has access to the following attributes. For the default text-based logger, these fields are concatenated with | to yield the final message.

  • user: User name (if available)
  • host: Host IP where the command is being executed
  • source ip address: Source IP address from which the request originated
  • source port: Source port number from which the request originated
  • timestamp: Unix timestamp
  • type: Type of the request (SELECT, INSERT, etc.)
  • category: Category of the request (DDL, DML, etc.)
  • keyspace: Keyspace (if applicable) on which the request is targeted to be executed
  • scope: Table, aggregate, function, or trigger name, as applicable
  • operation: CQL command being executed

Example of Audit log messages

Type: AuditLog
LogMessage: user:anonymous|host:|source:/|port:53418|timestamp:1539978679457|type:SELECT|category:QUERY|ks:k1|scope:t1|operation:SELECT * from k1.t1 ;

Type: AuditLog
LogMessage: user:anonymous|host:|source:/|port:53418|timestamp:1539978692456|type:SELECT|category:QUERY|ks:system|scope:peers|operation:SELECT * from system.peers limit 1;

Type: AuditLog
LogMessage: user:anonymous|host:|source:/|port:53418|timestamp:1539980764310|type:SELECT|category:QUERY|ks:system_virtual_schema|scope:columns|operation:SELECT * from system_virtual_schema.columns ;
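Because the text-based messages above are plain key:value pairs joined with |, they are easy to post-process. The following is a minimal sketch of such a parser, assuming the layout shown in the sample messages; the class AuditLineParser is hypothetical and not part of Cassandra. It treats everything after |operation: as the CQL text, on the assumption that a statement may itself contain | characters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for post-processing text audit messages; not part of Cassandra.
class AuditLineParser {
    private static final String OPERATION_MARKER = "|operation:";

    static Map<String, String> parse(String message) {
        Map<String, String> fields = new LinkedHashMap<>();
        int opStart = message.indexOf(OPERATION_MARKER);
        // Only the part before "|operation:" is split on '|'
        String head = opStart >= 0 ? message.substring(0, opStart) : message;
        for (String part : head.split("\\|")) {
            int colon = part.indexOf(':');
            if (colon < 0) continue; // skip anything that is not key:value
            // Split at the first ':' only, so values containing ':' stay intact
            fields.put(part.substring(0, colon), part.substring(colon + 1));
        }
        if (opStart >= 0) {
            fields.put("operation", message.substring(opStart + OPERATION_MARKER.length()));
        }
        return fields;
    }

    public static void main(String[] args) {
        String sample = "user:anonymous|host:|source:/|port:53418|timestamp:1539978679457"
                + "|type:SELECT|category:QUERY|ks:k1|scope:t1|operation:SELECT * from k1.t1 ;";
        parse(sample).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Empty values, such as the host field in the samples, come back as empty strings rather than being dropped, which keeps the field set stable across messages.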

How to configure

Auditlog can be configured using cassandra.yaml. If you want to try Auditlog on one node, it can also be enabled and configured using nodetool.

cassandra.yaml configurations for AuditLog

  • enabled: This option enables/ disables audit log
  • logger: Class name of the logger/ custom logger.
  • audit_logs_dir: Auditlogs directory location, if not set, default to cassandra.logdir.audit or cassandra.logdir + /audit/
  • included_keyspaces: Comma separated list of keyspaces to be included in audit log, default - includes all keyspaces
  • excluded_keyspaces: Comma separated list of keyspaces to be excluded from audit log, default - excludes no keyspace
  • included_categories: Comma separated list of Audit Log Categories to be included in audit log, default - includes all categories
  • excluded_categories: Comma separated list of Audit Log Categories to be excluded from audit log, default - excludes no category
  • included_users: Comma separated list of users to be included in audit log, default - includes all users
  • excluded_users: Comma separated list of users to be excluded from audit log, default - excludes no user

Note: BinAuditLogger configurations can be tuned using cassandra.yaml properties as well.

The available categories are: QUERY, DML, DDL, DCL, OTHER, AUTH, ERROR, PREPARE
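To make the include and exclude options easier to reason about, here is a small sketch of how such comma-separated filters could be evaluated. The class AuditFilterSketch and its methods are hypothetical. The sketch assumes that an empty include list means "include everything" (as the defaults above state) and that an exclusion wins when a value appears in both lists; that precedence is an assumption, not something the text above specifies.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of include/exclude filtering; not Cassandra's implementation.
class AuditFilterSketch {
    /** Turns "ks1, ks2" into a set; null or blank input yields an empty set. */
    static Set<String> parseList(String csv) {
        if (csv == null || csv.trim().isEmpty()) return Collections.emptySet();
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    /** Assumption: exclusion takes precedence; an empty include set includes all values. */
    static boolean shouldAudit(String value, Set<String> included, Set<String> excluded) {
        if (excluded.contains(value)) return false;
        return included.isEmpty() || included.contains(value);
    }

    public static void main(String[] args) {
        Set<String> included = parseList("ks1, ks2");
        Set<String> excluded = parseList("ks2");
        for (String keyspace : new String[] {"ks1", "ks2", "ks3"}) {
            System.out.println(keyspace + " audited: " + shouldAudit(keyspace, included, excluded));
        }
    }
}
```

The same check would apply unchanged to keyspaces, categories, and users, since all three options share the same comma-separated format.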

NodeTool command to enable AuditLog

enableauditlog: Enables AuditLog with yaml defaults. yaml configurations can be overridden using options via nodetool command.

nodetool enableauditlog


--excluded-categories Comma separated list of Audit Log Categories to be excluded for audit log. If not set the value from cassandra.yaml will be used

--excluded-keyspaces Comma separated list of keyspaces to be excluded for audit log. If not set the value from cassandra.yaml will be used

--excluded-users Comma separated list of users to be excluded for audit log. If not set the value from cassandra.yaml will be used

--included-categories Comma separated list of Audit Log Categories to be included for audit log. If not set the value from cassandra.yaml will be used

--included-keyspaces Comma separated list of keyspaces to be included for audit log. If not set the value from cassandra.yaml will be used

--included-users Comma separated list of users to be included for audit log. If not set the value from cassandra.yaml will be used

--logger Logger name to be used for AuditLogging. Default BinAuditLogger. If not set the value from cassandra.yaml will be used

NodeTool command to disable AuditLog

disableauditlog: Disables AuditLog.

nodetool disableauditlog

NodeTool command to reload AuditLog filters

enableauditlog: NodeTool enableauditlog command can be used to reload auditlog filters when called with default or previous loggername and updated filters

nodetool enableauditlog --logger <Default/ existing loggerName> --included-keyspaces <New Filter values>


Now that Apache Cassandra ships with audit logging out of the box, users can easily capture data change events to a persistent record indicating what happened, when it happened, and where the event originated. This type of information remains critical to modern enterprises operating in a diverse regulatory environment. While audit logging represents one of many steps forward in the 4.0 release, we believe that it will uniquely enable enterprises to use the database in ways they could not previously.

The Last Pickle Is Hiring

The Last Pickle (TLP) intends to hire a team member in the United States to work directly with customers. You will be part of the TLP tech team, delivering high quality consulting services including expert advice, documentation and run books, diagnostics and troubleshooting, and proof-of-concept code.

Responsibilities Include

  • Delivering world-class consulting to The Last Pickle’s customers globally.
  • Ensuring timely response to customer requests.
  • Guaranteeing professional standards are maintained in both the content and delivery of consulting services to customers.
  • Continually developing skills and expertise to ensure The Last Pickle is able to deliver “Research-Driven Consultancy”.
  • Continued conference speaking engagements, authoring technical blog posts, and contributions to the Open Source community.

Skills and Experience

  • 3+ years of experience using and/or troubleshooting Apache Cassandra or DataStax Enterprise in production systems.
  • Code and/or other contributions to the Apache Cassandra community.
  • Experience speaking in public about Cassandra or related Big Data platforms.
  • Experience working remotely with a high-level of autonomy.

In return we offer:

  • Being part of a globally recognised team of experts.
  • Flexible workday and location.
  • Time to work on open source projects and support for public speaking.
  • As much or as little travel as you want.
  • No on-call roster.
  • A great experience helping companies big and small be successful.

If this sounds like the right job for you let us know by emailing careers@thelastpickle.com.

Finding Bugs in Cassandra’s Internals with Property-based Testing

As of September 1st, the Apache Cassandra community has shifted the focus of Cassandra 4.0 development from new feature work to testing, validation, and hardening, with the goal of releasing a stable 4.0 that every Cassandra user, from small deployments to large corporations, can deploy with confidence. There are several projects and methodologies that the community is undertaking to this end. One of these is the adoption of property-based testing, which was previously introduced here. This post will take a look at a specific use of this approach and how it found a bug in a new feature meant to ensure data integrity between the client and Cassandra.

Detecting Corruption is a Property

In this post, we demonstrate property-based testing in Cassandra through the integration of the QuickTheories library introduced as part of the work done for CASSANDRA-13304.

This ticket modifies the framing of Cassandra’s native client protocol to include checksums in addition to the existing, optional compression. Clients can opt-in to this new feature to retain data integrity across the many hops between themselves and Cassandra. This is meant to address cases where hardware and protocol level checksums fail (due to underlying hardware issues) — a case that has been seen in production. A description of the protocol changes can be found in the ticket but for the purposes of this discussion the salient part is that two checksums are added: one that covers the length(s) of the data (if compressed there are two lengths), and one for the data itself. Before merging this feature, property-based testing using QuickTheories was used to uncover a bug in the calculation of the checksum over the lengths. This bug could have led to silent corruption at worst or unexpected errors during deserialization at best.

The test used to find this bug is shown below. This example tests the property that when a frame is corrupted, that corruption should be caught by checksum comparison. The test is wrapped inside of a standard JUnit test case but, once called by JUnit, execution is handed over to QuickTheories to generate and execute hundreds of examples. These examples are dictated by the types of input that should be generated (the arguments to forAll). The execution of each individual example is done by checkAssert and its argument, the roundTripWithCorruption function.

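@Test
public void corruptionCausesFailure()
{
    // one generator per argument of roundTripWithCorruption
    qt().forAll(inputWithCorruptablePosition(),
                integers().between(0, Byte.MAX_VALUE).map(Integer::byteValue),
                compressors(),
                checksums())
        .checkAssert(this::roundTripWithCorruption);
}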

The roundTripWithCorruption function is a generalization of a unit test that worked similarly but for a single case. It is given an input to transform and a position in the transformed output to insert corruption, as well as what byte to write to the corrupted position. The additional arguments (the compressor and checksum type) are used to ensure coverage of Cassandra’s various compression and checksumming implementations.

private void roundTripWithCorruption(Pair<String, Integer> inputAndCorruptablePosition,
                                     byte corruptionValue,
                                     Compressor compressor,
                                     ChecksumType checksum) {
    String input = inputAndCorruptablePosition.left;
    ByteBuf expectedBuf = Unpooled.wrappedBuffer(input.getBytes());
    int byteToCorrupt = inputAndCorruptablePosition.right;
    ChecksummingTransformer transformer = new ChecksummingTransformer(checksum, DEFAULT_BLOCK_SIZE, compressor);
    ByteBuf outbound = transformer.transformOutbound(expectedBuf);

    // make sure we're actually expecting to produce some corruption
    if (outbound.getByte(byteToCorrupt) == corruptionValue)
        return;

    // skip positions that fall beyond the end of the output
    if (byteToCorrupt >= outbound.writerIndex())
        return;

    try {
        // overwrite the chosen byte, then restore the writer index
        int oldIndex = outbound.writerIndex();
        outbound.writerIndex(byteToCorrupt);
        outbound.writeByte(corruptionValue);
        outbound.writerIndex(oldIndex);
        ByteBuf inbound = transformer.transformInbound(outbound, FLAGS);

        // verify that the content was actually corrupted
        Assert.assertEquals(expectedBuf, inbound);
    } catch(ProtocolException e) {
        // the corruption was caught by the checksum comparison
        return;
    }
}

The remaining piece is how those arguments are generated — the arguments to forAll mentioned above. Each argument is a function that returns an input generator. For each example, an input is pulled from each generator and passed to roundTripWithCorruption. The compressors() and checksums() generators aren’t copied here. They can be found in the source and are based on built-in generator methods, provided by QuickTheories, that select a value from a list of values. The second argument, integers().between(0, Byte.MAX_VALUE).map(Integer::byteValue), generates non-negative numbers that fit into a single byte. These numbers will be passed as the corruptionValue argument.

The inputWithCorruptablePosition generator, copied below, generates strings to use as input to the transformation function and a position within the output byte stream to corrupt. Because compression prevents knowledge of the output size of the frame, the generator tries to choose a somewhat reasonable position to corrupt by limiting the choice to the size of the generated string (it’s uncommon for compression to generate a larger string and the implementation discards the compressed value if it does). It also avoids corrupting the first two bytes of the stream, which are not covered by a checksum and therefore can be corrupted without being caught. The function above ensures that corruption is actually introduced and that corrupting a position larger than the size of the output does not occur.

private Gen<Pair<String, Integer>> inputWithCorruptablePosition()
{
    // positions start at 2 to skip the two bytes not covered by a checksum
    return inputs().flatMap(s -> integers().between(2, s.length() + 2)
                   .map(i -> Pair.create(s, i)));
}

With all those pieces in place, if the test were run before the bug was fixed, it would fail with the following output.

java.lang.AssertionError: Property falsified after 2 example(s) 
Smallest found falsifying value(s) :-
{(c,3), 0, null, Adler32}

Cause was :-
java.lang.IndexOutOfBoundsException: readerIndex(10) + length(16711681) exceeds writerIndex(15): UnpooledHeapByteBuf(ridx: 10, widx: 15, cap: 54/54)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1401)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1388)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:870)
    at org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer.transformInbound(ChecksummingTransformer.java:289)
    at org.apache.cassandra.transport.frame.checksum.ChecksummingTransformerTest.roundTripWithCorruption(ChecksummingTransformerTest.java:106)
Other found falsifying value(s) :- 
{(c,3), 0, null, CRC32}
{(c,3), 1, null, CRC32}
{(c,3), 9, null, CRC32}
{(c,3), 11, null, CRC32}
{(c,3), 36, null, CRC32}
{(c,3), 50, null, CRC32}
{(c,3), 74, null, CRC32}
{(c,3), 99, null, CRC32}

Seed was 179207634899674

The output shows more than a single failing example. This is because QuickTheories, like most property-based testing libraries, comes with a shrinker, which performs the task of taking a failure and minimizing its inputs. This aids in debugging because there are multiple failing examples to look at, often with noise removed in the process. Additionally, a seed value is provided so the same series of tests and failures can be generated again, which is another useful feature when debugging. In this case, the library generated an example that contains a single byte of input, which will corrupt the fourth byte in the output stream by setting it to zero, using no compression, and using Adler32 for checksumming. It can be seen from the other failing examples that using CRC32 also fails. This is due to improper calculation of the checksum, regardless of the algorithm. In particular, the checksum was only calculated over the least significant byte of each length rather than all eight bytes. By corrupting the fourth byte of the output stream (the first length’s second-most significant byte, which was not covered by the calculation), an invalid length is read and later used.
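The class of bug described above is easy to reproduce in isolation. The sketch below is not the Cassandra code and does not claim to show how the original bug was written; the class LengthChecksumSketch and its methods are hypothetical. It relies on one documented JDK behavior: CRC32.update(int) consumes only the low eight bits of its argument. The corrupted value 16711681 is borrowed from the stack trace above purely as a convenient example of a length that shares its lowest byte with 1.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical reproduction of a "checksum covers only the low byte" mistake.
class LengthChecksumSketch {
    /** Buggy variant: CRC32.update(int) uses only the low eight bits of the length. */
    static long lowByteOnly(int length) {
        CRC32 crc = new CRC32();
        crc.update(length);
        return crc.getValue();
    }

    /** Fixed variant: all four bytes of the length are fed to the checksum. */
    static long allBytes(int length) {
        CRC32 crc = new CRC32();
        crc.update(ByteBuffer.allocate(Integer.BYTES).putInt(length).array());
        return crc.getValue();
    }

    public static void main(String[] args) {
        int original = 1;          // 0x00000001
        int corrupted = 16711681;  // 0x00FF0001: same low byte, different upper bytes

        System.out.println("low byte only detects corruption: "
                + (lowByteOnly(original) != lowByteOnly(corrupted)));
        System.out.println("all bytes detects corruption:     "
                + (allBytes(original) != allBytes(corrupted)));
    }
}
```

The buggy variant produces identical checksums for both lengths, so the corruption passes the comparison and the bogus length is used later, which matches the kind of IndexOutOfBoundsException seen in the failing example.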

Where to Find More

Property-based testing is a broad topic, much of which is not covered by this post. In addition to Cassandra, it has been used successfully in several places including car operating systems and suppliers’ products, GNOME Glib, distributed consensus, and other distributed databases. It can also be combined with other approaches such as fault-injection and memory leak detection. Stateful models can also be built to generate a series of commands instead of running each example on one generated set of inputs. Our goal is to evangelize this approach within the Cassandra developer community and encourage more testing of this kind as part of our work to deliver the most stable major release of Cassandra yet.

Custom commands in cstar

Welcome to the next part of the cstar post series. The previous post introduced cstar and showed how it can run simple shell commands using various execution strategies. In this post, we will teach you how to build more complex custom commands.

Basic Custom Commands

Out of the box, cstar comes with three commands:

$ cstar
usage: cstar [-h] {continue,cleanup-jobs,run} ...


positional arguments:
    continue            Continue a previously created job (*)
    cleanup-jobs        Cleanup old finished jobs and exit (*)
    run                 Run an arbitrary shell command

Custom commands allow extending these three with anything one might find useful. Adding a custom command to cstar is as easy as placing a file in ~/.cstar/commands or /etc/cstar/commands. For example, we can create ~/.cstar/commands/status that looks like this:

#!/usr/bin/env bash
nodetool status

With this file in place, cstar now features a brand new status command:

$ cstar
usage: cstar [-h] {continue,cleanup-jobs,run,status} ...


positional arguments:
    continue            Continue a previously created job (*)
    cleanup-jobs        Cleanup old finished jobs and exit (*)
    run                 Run an arbitrary shell command

A command like this allows us to stop using:

cstar run --command "nodetool status" --seed-host <host_ip>

And use a shorter version instead:

cstar status --seed-host <host_ip>

We can also declare the command description and default values for cstar’s options in the command file. We can do this by including commented lines with a special prefix. For example, we can include the following lines in our ~/.cstar/commands/status file:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: all
# C* description: Run nodetool status

nodetool status

Once we do this, the status will show up with a proper description in cstar’s help and running cstar status --seed-host <host_ip> will be equivalent to:

cstar status --seed-host <host_ip> --cluster-parallel --dc-parallel --strategy all
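The prefixed comment lines follow a simple key: value convention, so reading them is straightforward. The sketch below only illustrates that convention; it is not cstar's actual parser, and the class CommandHeaderSketch is hypothetical. It collects every line that starts with the "# C* " prefix and splits it at the first colon.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the "# C* key: value" convention; not cstar's parser.
class CommandHeaderSketch {
    private static final String PREFIX = "# C* ";

    static Map<String, String> parseHeaders(List<String> scriptLines) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (String line : scriptLines) {
            if (!line.startsWith(PREFIX)) continue;   // ordinary script line or shebang
            String body = line.substring(PREFIX.length());
            int colon = body.indexOf(':');
            if (colon < 0) continue;                  // prefixed line without a key: value pair
            headers.put(body.substring(0, colon).trim(), body.substring(colon + 1).trim());
        }
        return headers;
    }

    public static void main(String[] args) {
        List<String> script = Arrays.asList(
                "#!/usr/bin/env bash",
                "# C* cluster-parallel: true",
                "# C* dc-parallel: true",
                "# C* strategy: all",
                "# C* description: Run nodetool status",
                "",
                "nodetool status");
        parseHeaders(script).forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

Because the split happens at the first colon only, a value that itself contains colons is kept whole.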

When cstar begins the execution of a command, it will print a unique ID for the job being run. This ID is needed for resuming a job, but more on this later. We also need the job ID to examine the output of the commands. We can find the output in:

$ ~/.cstar/jobs/<job_id>/<hostname>/out

Parametrized Custom Commands

When creating custom commands, cstar allows declaring custom arguments as well. We will explain this feature by introducing a command that deletes snapshots older than a given number of days.

We will create a new file, ~/.cstar/commands/clear-snapshots, that will start like this:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: all
# C* description: Clear snapshots older than given number of days
# C* argument: {"option":"--days", "name":"DAYS", "description":"Snapshots older than this many days will be deleted", "default":"7", "required": false}

The new element here is the last line starting with # C* argument:. Upon seeing this prefix, cstar will parse the remainder of the line as a JSON payload describing the custom argument. In the case above, cstar will:

  • Use --days as the name of the argument.
  • Save the value of this argument into a variable named DAYS. We will see how to access this in a bit.
  • Associate a description with this argument.
  • Use 7 as a default value.
  • Not require this option.

With this file in place, cstar already features the command in its help:

$ cstar
usage: cstar [-h] {continue,cleanup-jobs,run,status,clear-snapshots} ...


positional arguments:
    continue            Continue a previously created job (*)
    cleanup-jobs        Cleanup old finished jobs and exit (*)
    run                 Run an arbitrary shell command
    status              Run nodetool status
    clear-snapshots     Clear snapshots older than given number of days

$ cstar clear-snapshots --help
usage: cstar clear-snapshots [-h] [--days DAYS]
                             [--seed-host [SEED_HOST [SEED_HOST ...]]]
                             <other default options omitted>

optional arguments:
 -h, --help            show this help message and exit
 --days DAYS
                       Snapshots older than this many days will be deleted
 --seed-host [SEED_HOST [SEED_HOST ...]]
                       One or more hosts to use as seeds for the cluster
 <other default options omitted>

Now we need to add the command which will actually clear the snapshots. This command needs to do three things:

  • Find the snapshots that are older than given number of days.
    • We will use the -mtime filter of the find utility:
    • find /var/lib/cassandra/data/*/*/snapshots/ -mtime +"$DAYS" -type d
    • Note we are using "$DAYS" to reference the value of the custom argument.
  • Extract the snapshot names from the findings.
    • We got absolute paths to the directories found. Snapshot names are the last portion of these paths. Also, we will make sure to keep each snapshot name only once:
    • sed -e 's#.*/##' | sort -u
  • Invoke nodetool clearsnapshot -t <snapshot_name> to clear each of the snapshots.
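
The name-extraction step can be tried on its own without touching a cluster. This is a small sketch; the function name snapshot_names and the sample paths are made up for illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read snapshot directory paths on standard input and
# print each distinct snapshot name (the last path component) once.
snapshot_names() {
    sed -e 's#.*/##' | sort -u
}

# Made-up sample paths: the same snapshot exists under two tables.
printf '%s\n' \
    /var/lib/cassandra/data/ks1/t1/snapshots/nightly \
    /var/lib/cassandra/data/ks1/t2/snapshots/nightly \
    /var/lib/cassandra/data/ks1/t1/snapshots/preupgrade |
    snapshot_names
```

The duplicate nightly entry collapses into a single line, so each snapshot name reaches nodetool clearsnapshot only once.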

Putting this all together, the clear-snapshots file will look like this:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: all
# C* description: Clear snapshots older than given number of days
# C* argument: {"option":"--days", "name":"DAYS", "description":"Snapshots older than this many days will be deleted", "default":"7", "required": false}

find /var/lib/cassandra/data/*/*/snapshots/ -mtime +"$DAYS" -type d |\
sed -e 's#.*/##' |\
sort -u |\
while read -r line; do nodetool clearsnapshot -t "${line}"; done

We can now run the clear-snapshots command like this:

$ cstar clear-snapshots --days 2 --seed-host <seed_host>

Complex Custom Commands

One of the main reasons we consider cstar so useful is that the custom commands can be arbitrary shell scripts, not just the one-liners we have seen so far. To illustrate this, we are going to share two relatively complicated commands.

Upgrading Cassandra Version

The first command will cover a rolling upgrade of the Cassandra version. Generally speaking, the upgrade should happen as quickly as possible and with as little downtime as possible. This is the ideal application of cstar’s topology strategy: it will execute the upgrade on as many nodes as possible while ensuring a quorum of replicas stays up at any moment. Then, the upgrade of a node should follow these steps:

  • Create snapshots to allow rollback if the need arises.
  • Upgrade the Cassandra installation.
  • Restart the Cassandra process.
  • Check the upgrade happened successfully.

Clearing the snapshots or upgrading SSTables should not be part of the upgrade itself. Snapshots, being just hardlinks, will not consume excessive space, and Cassandra (in most cases) can operate with older SSTable versions. Once all nodes are upgraded, these actions are easy enough to perform with dedicated cstar commands.

The ~/.cstar/commands/upgrade command might look like this:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: topology
# C* description: Upgrade Cassandra package to given target version
# C* argument: {"option":"--snapshot-name", "name":"SNAPSHOT_NAME", "description":"Name of pre-upgrade snapshot", "default":"preupgrade", "required": false}
# C* argument: {"option":"--target-version", "name":"VERSION", "description":"Target version", "required": true}

# -x prints the executed commands to standard output
# -e fails the entire script if any of the commands fails
# -u fails the script if any of the variables is not bound
# -o pipefail instructs the interpreter to return the right-most non-zero status of a piped command in case of failure
set -xeuo pipefail

# exit if a node is already on the target version
if [[ $(nodetool version) == *$VERSION ]]; then
  exit 0
fi

# create Cassandra snapshots to allow rollback in case of problems
nodetool clearsnapshot -t "$SNAPSHOT_NAME"
nodetool snapshot -t "$SNAPSHOT_NAME"

# upgrade Cassandra version
sudo apt-get install -y cassandra="$VERSION"

# gently stop the cassandra process
nodetool drain && sleep 5 && sudo service cassandra stop

# start the Cassandra process again
sudo service cassandra start

# wait up to 60 seconds for Cassandra to start answering JMX queries
for i in $(seq 60); do
    if nodetool version > /dev/null 2>&1; then
        break
    fi
    sleep 1s
done

# fail if the upgrade did not happen
if ! [[ $(nodetool version) == *$VERSION ]]; then
  exit 1
fi
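
The wait-for-JMX step is a generic "retry until it works" pattern and can be factored into a reusable function. The sketch below is one possible shape; the name wait_for and its argument order are hypothetical. Note that > /dev/null 2>&1 silences both output streams, whereas writing the redirections in the opposite order would only discard standard output.

```shell
#!/usr/bin/env bash
# Hypothetical helper: retry a command once per second until it succeeds or
# the given number of attempts is exhausted. Returns 0 on success, 1 otherwise.
wait_for() {
    local attempts="$1"
    shift
    local i=0
    while [ "$i" -lt "$attempts" ]; do
        if "$@" > /dev/null 2>&1; then
            return 0
        fi
        i=$((i + 1))
        sleep 1
    done
    return 1
}

# Intended use inside the upgrade script (commented out here because it
# needs a running Cassandra node):
# wait_for 60 nodetool version || exit 1
```

Returning a non-zero status on timeout lets the calling script decide whether to fail the node immediately instead of falling through to the version check.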

When running this command, we can be extra-safe and use the --stop-after option:

$ cstar upgrade --seed-host <host_name> --target-version 3.11.2 --stop-after 1

This will instruct cstar to upgrade only one node and exit the execution. Once that happens, we can take our time to inspect the node to see if the upgrade went smoothly. When we are confident enough, we can resume the command. Output of each cstar command starts with a highlighted job identifier, which we can use with the continue command:

$ cstar continue <job_id>

Changing Compaction Strategy

The second command we would like to share performs a compaction strategy change in a rolling fashion.

Compaction configuration is a table property. It needs an ALTER TABLE CQL statement execution to change. Running a CQL statement is effective immediately across the cluster. This means once we issue the statement, each node will react to the compaction change. The exact reaction depends on the change, but it generally translates to increased compaction activity. It is not always desirable to have this happen: compaction can be an intrusive process and affect the cluster performance.

Thanks to CASSANDRA-9965, there is a way of altering compaction configuration on a single node via JMX since Cassandra version 2.1.9. We can set CompactionParametersJson MBean value and change the compaction configuration the node uses. Once we know how to change one node, we can have cstar do the same but across the whole cluster.

Once we change the compaction settings, we should also manage the aftermath. Even though the change is effective immediately, it might take a very long time until each SSTable undergoes a newly configured compaction. The best way of doing this is to trigger a major compaction and wait for it to finish. After a major compaction, all SSTables are organised according to the new compaction settings and there should not be any unexpected compaction activity afterwards.

While cstar is excellent in checking which nodes are up or down, it does not check for other aspects of nodes health. It does not have the ability to monitor compaction activity. Therefore we should include the wait for major compaction in the command we are about to build. The command will then follow these steps:

  • Stop any compactions that are currently happening.
  • Set the CompactionParametersJson MBean to the new value.
    • We will use jmxterm for this and assume the JAR file is already present on the nodes.
  • Run a major compaction to force Cassandra to organise SSTables according to the new setting and make cstar wait for the compactions to finish.
    • This step is not mandatory. Cassandra would re-compact the SSTables eventually.
    • Doing a major compaction will cost extra resources and possibly impact the node’s performance. We do not recommend doing this at all nodes in parallel.
    • We are taking advantage of the topology strategy which will guarantee a quorum of replicas free from this load at any time.

The ~/.cstar/commands/change-compaction command might look like this:

#! /bin/bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: topology
# C* description: Switch compaction strategy using jmxterm and perform a major compaction on a specific table
# C* argument: {"option":"--keyspace-name", "name":"KEYSPACE", "description":"Keyspace containing the target table", "required": true}
# C* argument: {"option":"--table", "name":"TABLE", "description":"Table to switch the compaction strategy on", "required": true}
# C* argument: {"option":"--compaction-parameters-json", "name":"COMPACTION_PARAMETERS_JSON", "description":"New compaction parameters", "required": true}
# C* argument: {"option":"--major-compaction-flags", "name":"MAJOR_COMPACTION_FLAGS", "description":"Flags to add to the major compaction command", "default":"", "required": false}
# C* argument: {"option":"--jmxterm-jar-location", "name":"JMXTERM_JAR", "description":"jmxterm jar location on disk", "required": true}

set -xeuo pipefail

echo "Switching compaction strategy on $KEYSPACE.$TABLE"
echo "Stopping running compactions"
nodetool stop COMPACTION
echo "Altering compaction through JMX..."
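# The JMX address below assumes Cassandra's default JMX port (7199) on the local node; adjust it if yours differs
echo "set -b org.apache.cassandra.db:columnfamily=$TABLE,keyspace=$KEYSPACE,type=ColumnFamilies CompactionParametersJson $COMPACTION_PARAMETERS_JSON"  | java -jar "$JMXTERM_JAR" --url localhost:7199 -e

echo "Running a major compaction..."
# nodetool compact blocks until the major compaction finishes
nodetool compact "$KEYSPACE" "$TABLE" $MAJOR_COMPACTION_FLAGS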

The command requires options specifying which keyspace and table to apply the change on. The jmxterm location and the new value for the compaction parameters are another two required arguments. The command also allows passing in flags to the major compaction. This is useful for cases when we are switching to SizeTieredCompactionStrategy, where the -s flag will instruct Cassandra to produce several size-tiered files instead of a single big file.

Running the nodetool compact command will not return until the major compaction finishes. This will cause the execution on one node to not complete until this happens. Consequently, cstar will see this long execution and dutifully wait for it to complete before moving on to other nodes.

Here is an example of running this command:

$ cstar change-compaction --seed-host <host_name> --keyspace-name tlp_stress --table KeyValue --jmxterm-jar-location /usr/share/jmxterm-1.0.0-uber.jar --compaction-parameters-json "{\"class\":\"LeveledCompactionStrategy\",\"sstable_size_in_mb\":\"120\"}"

This command also benefits from the --stop-after option. Moreover, once all nodes are changed, we should not forget to persist the schema change by doing the actual ALTER TABLE command.
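
For the final ALTER TABLE, it can help to generate the statement from the same values that were passed to the rolling change. The helper below is a sketch only: the function name alter_compaction_cql, its parameters, and the keyspace and table names are hypothetical, and it covers just the class and sstable_size_in_mb options used in the example above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print an ALTER TABLE statement that persists a
# compaction change. It only prints text; nothing is sent to Cassandra.
alter_compaction_cql() {
    local keyspace="$1" table="$2" class="$3" sstable_size_mb="$4"
    printf "ALTER TABLE %s.%s WITH compaction = {'class': '%s', 'sstable_size_in_mb': '%s'};\n" \
        "$keyspace" "$table" "$class" "$sstable_size_mb"
}

alter_compaction_cql my_keyspace my_table LeveledCompactionStrategy 120
```

The printed statement can be reviewed first and then executed once from any node with a CQL client.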


In this post we talked about cstar and its feature of adding custom commands. We have seen:

  • How to add a simple command to execute nodetool status on all nodes at once.
  • How to define custom parameters for our commands, which allowed us to build a command for deleting old snapshots.
  • That the custom commands are essentially regular bash scripts and can include multiple statements. We used this feature to do a safe and fast Cassandra version upgrade.
  • That the custom commands can call external utilities such as jmxterm, which we used to change compaction strategy for a table in a rolling fashion.

In the next post, we are going to look into cstar’s cousin called cstarpar. cstarpar differs in the way commands are executed on remote nodes and allows for heavier operations such as rolling reboots.

How to Tweak the Number of num_tokens (vnodes) in Live Cassandra Cluster

Some clients have asked us to change the num_tokens setting as their requirements change.
For example, a lower num_tokens value is recommended when using DSE Search.
The most important thing during this process is that the cluster stays up, and is healthy and fast. Anything we do needs to be deliberate and safe, as we have production traffic flowing through.

The process includes adding a new DC with a changed num_tokens value, decommissioning the old DCs one by one, and letting Cassandra's automatic mechanisms distribute the existing data onto the new nodes.

The procedure below assumes that you have two datacenters, DC1 and DC2.


1. Run Repair To Keep Data Consistent Across Cluster

Make sure to run a full repair with nodetool repair. More detail about repairs can be found here. This ensures that all data is propagated from the datacenter which is being decommissioned.

2. Add New DC DC3 And Decommission Old Datacenter DC1

Step 1: Download and install the same Cassandra version as the other nodes in the cluster, but do not start it.

How-To stop Cassandra

Note: Don’t stop any node in DC1 until DC3 has been added.

If you used the Debian package, Cassandra starts automatically. You must stop the node and clear the data.
Stop the node:
Packaged installations: $ sudo service cassandra stop
Tarball installations: nodetool stopdaemon
If for some reason the previous command doesn’t work, find the Cassandra Java process ID (PID), and then kill the process using its PID number:
$ ps auwx | grep cassandra
$ sudo kill <pid>

Step 2: Clear the data from the default directories once the node is down.

sudo rm -rf /var/lib/cassandra/*
Step 3: Configure the node with settings matching the other nodes in the cluster.
Set the following properties by comparing them with the other nodes:
  • seeds: This should include nodes from the live DC, because the new nodes have to stream data from them.
  • endpoint_snitch: Keep it the same as on the nodes in the live DC.
  • cluster_name: The same as on the nodes in the other live DC.
  • num_tokens: Number of vnodes required.
  • initial_token: Make sure this is commented out.

Set the local parameters below:

  • auto_bootstrap: false
  • listen_address: Local to the node
  • rpc_address: Local to the node
  • data_file_directories: Local to the node
  • saved_caches_directory: Local to the node
  • commitlog_directory: Local to the node

Set the configuration files below, as needed:

cassandra-rackdc.properties: Set the parameters for the new datacenter and rack:

  • dc: “dc name”
  • rack: “rack name”

Step 4: Start Cassandra on each node, one by one.
Step 5: Now that all nodes are up and running, alter the keyspaces to set the replication factor (RF) for the new datacenter as well.
ALTER KEYSPACE Keyspace_name WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1' : 3, 'dc2' : 3, 'dc3' : 3};
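
Because the same statement has to be issued again later with a different set of datacenters, it can be generated rather than typed by hand. The following sketch prints the statement with plain ASCII quotes; the function name alter_keyspace_cql, the dc:rf argument format, and the keyspace name are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print an ALTER KEYSPACE statement for
# NetworkTopologyStrategy from a keyspace name and dc:rf pairs.
alter_keyspace_cql() {
    local keyspace="$1"
    shift
    local replication="'class' : 'NetworkTopologyStrategy'"
    local pair
    for pair in "$@"; do
        # ${pair%%:*} is the datacenter name, ${pair##*:} its replication factor
        replication="$replication, '${pair%%:*}' : ${pair##*:}"
    done
    printf 'ALTER KEYSPACE %s WITH REPLICATION = {%s};\n' "$keyspace" "$replication"
}

# Adding dc3 while dc1 and dc2 are still in place
alter_keyspace_cql my_keyspace dc1:3 dc2:3 dc3:3
# Later, dropping dc1 from the replication settings
alter_keyspace_cql my_keyspace dc2:3 dc3:3
```

The function only prints text, so the output can be checked before it is run against the cluster.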

Step 6: Finally, now that the nodes are up and empty, run nodetool rebuild on each new node to stream data from an existing datacenter.

nodetool rebuild <existing_dc_name>

Step 7: Once the whole process is complete, remove “auto_bootstrap: false” from each cassandra.yaml or set it to true.

auto_bootstrap: true

Decommission DC1:

Now that we have added DC3 into a cluster, it’s time to decommission DC1. However, before decommissioning the datacenter in a production environment, the first step should be to prevent the client from connecting to it and ensure reads or writes do not query this datacenter.
Step 1: Prevent clients from communicating with DC1
  • First of all, ensure that the clients point to an existing datacenter.
  • Configure DCAwareRoundRobinPolicy with a remaining datacenter as the local datacenter, so that no requests are routed to DC1.
  • Make sure to change the QUORUM consistency level to LOCAL_QUORUM and ONE to LOCAL_ONE.
Step 2: ALTER KEYSPACE to not have a replica in decommissioning DC.
ALTER KEYSPACE Keyspace_name WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc2' : 3, 'dc3' : 3};
Step 3: Decommission each node using nodetool decommission.
nodetool decommission
Step 4: Remove all data from data, saved caches, and commitlog directory after all nodes are decommissioned to reclaim disk space.
sudo rm -rf <data_directory> <saved_caches_directory> <commitlog_directory>
Step 5: Finally, stop Cassandra as described in Step 1.
Step 6: Decommission each node in DC2 by following the above procedure.

3. Add New DC DC4 And Decommission Old DC2

Hopefully, this blog post will help you to understand the procedure for changing the number of vnodes on a live Cluster. Keep in mind that bootstrapping/rebuilding/decommissioning process time depends upon data size.

Introduction to cstar

Spotify is a long time user of Apache Cassandra at very large scale. It is also a creative company which tries to open source most of the tools they build for internal needs. They released Cassandra Reaper a few years ago to give the community a reliable way of repairing clusters, which we now love and actively maintain. Their latest open sourced tool for Cassandra is cstar, a parallel-ssh equivalent (distributed shell) that is Cassandra topology aware. At TLP, we love it already and are sure you soon will too.

What is cstar?

Running distributed databases requires good automation, especially at scale. But even with small clusters, running the same command or roll restarting a cluster can quickly get tedious. Sure, you can use tools like dsh and pssh, but they run commands on all servers at the same time (or just a given number) and you need to keep a list of the nodes to connect to locally. Each time your cluster scales out/in or if nodes get replaced you need to update the list. If you forget to update you may run commands that won’t touch the whole cluster without noticing.

Not all commands can run on all nodes at the same time, either. For instance, upgrading sstables, running cleanup, major compaction or restarting nodes will have an impact on either latencies or availability and require more granularity of execution.

Cstar doesn’t suffer any of the above problems. It will discover the topology of the cluster dynamically and tune concurrency based on replication settings. In addition, cstar will run from a single machine (not necessarily within the cluster) that has SSH access to all nodes in the cluster, and perform operations through SSH and SFTP. It requires no dependency, other than nodetool, to be installed on the Cassandra nodes.

Installing cstar

You’ll need to have Python 3 and pip3 installed on your server/laptop and then follow the README instructions which will, in the simplest case, boil down to:

pip3 install cstar

Running cstar

Cstar is built with Python 3 and offers a straightforward way to run simple commands or complex scripts on an Apache Cassandra cluster using a single contact point.

The following command, for example, will perform a rolling restart of Cassandra in the cluster, one node at a time using the one strategy:

cstar run --command="sudo service cassandra restart" --seed-host=<contact_point_ip> --strategy=one

During the execution, cstar will update progress with a clear and pleasant output:

 +  Done, up      * Executing, up      !  Failed, up      . Waiting, up
 -  Done, down    / Executing, down    X  Failed, down    : Waiting, down
Cluster: Test Cluster
DC: dc1
DC: dc2
DC: dc3
2 done, 0 failed, 1 executing

If we want to perform cleanup with topology awareness, so that only one replica of each token range runs the command at a time (leaving a quorum of unaffected replicas at RF=3), we can use the default topology strategy:

cstar run --command="nodetool cleanup" --seed-host=<contact_point_ip> --strategy=topology

This way, we’ll have several nodes processing the command to minimize the overall time spent on the operation and still ensure low impact on latencies:

 +  Done, up      * Executing, up      !  Failed, up      . Waiting, up
 -  Done, down    / Executing, down    X  Failed, down    : Waiting, down
Cluster: Test Cluster
DC: dc1
DC: dc2
DC: dc3
5 done, 0 failed, 12 executing

Finally, if we want to run a command that doesn’t involve pressure on latencies and display the outputs locally, we can use strategy all and add the -v flag to display the command outputs:

cstar run --command="nodetool getcompactionthroughput" --seed-host=<contact_point_ip> --strategy=all -v

Which will give us the following output:

 +  Done, up      * Executing, up      !  Failed, up      . Waiting, up
 -  Done, down    / Executing, down    X  Failed, down    : Waiting, down
Cluster: Test Cluster
DC: dc1
DC: dc2
DC: dc3
0 done, 0 failed, 39 executing
Host node1.mycompany.com finished successfully
Current compaction throughput: 0 MB/s

Host node21.mycompany.com finished successfully
Current compaction throughput: 0 MB/s

Host node10.mycompany.com finished successfully
Current compaction throughput: 0 MB/s


Host node7.mycompany.com finished successfully
Current compaction throughput: 0 MB/s

Host node18.mycompany.com finished successfully
Current compaction throughput: 0 MB/s

 +  Done, up      * Executing, up      !  Failed, up      . Waiting, up
 -  Done, down    / Executing, down    X  Failed, down    : Waiting, down
Cluster: Test Cluster
DC: dc1
DC: dc2
DC: dc3
39 done, 0 failed, 0 executing
Job cff7f435-1b9a-416f-99e4-7185662b88b2 finished successfully

How cstar does its magic

When you run a cstar command it will first connect to the seed node you provided and run a set of nodetool commands through SSH.

First, nodetool ring will give it the cluster topology with the state of each node. By default, cstar will stop the execution if one node in the cluster is down or unresponsive. If you’re aware that nodes are down and want to run a command nonetheless, you can add the --ignore-down-nodes flag to bypass the check.

Then cstar will list the keyspaces using nodetool cfstats and build a map of the replicas for all token ranges for each of them. This will allow it to identify which nodes contain the same token ranges, using nodetool describering, and apply the topology strategy accordingly. As shown before, the topology strategy will not allow two nodes that are replicas for the same token to run the command at the same time. If the cluster does not use vnodes, the topology strategy will run the command on every RF-th node in the ring. If the cluster uses vnodes but is not using NetworkTopologyStrategy (NTS) for all keyspaces nor spreading across racks, chances are only one node will be able to run the command at once, even with the topology strategy. If both NTS and racks are in use, the topology strategy will run the command on a whole rack at a time.
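
To make the "every RF-th node" case concrete, here is a toy model of a ring without vnodes. It is an illustration only and not cstar's implementation: nodes are numbered by ring position, two nodes are assumed to share a token range when they are fewer than RF positions apart, and the function name ring_batches is made up.

```shell
#!/usr/bin/env bash
# Illustration only (not cstar's implementation): split node positions
# 0..N-1 of a single-token ring into batches in which any two nodes are at
# least RF positions apart, so they never replicate the same token range.
ring_batches() {
    local n="$1" rf="$2"
    local pending="" batch rest i j d ok
    i=0
    while [ "$i" -lt "$n" ]; do
        pending="$pending $i"
        i=$((i + 1))
    done
    while [ -n "$pending" ]; do
        batch=""
        rest=""
        for i in $pending; do
            ok=1
            for j in $batch; do
                # Positions are visited in increasing order, so i > j here
                d=$((i - j))
                # Use the shorter way around the ring as the distance
                if [ $((n - d)) -lt "$d" ]; then d=$((n - d)); fi
                if [ "$d" -lt "$rf" ]; then ok=0; fi
            done
            if [ "$ok" -eq 1 ]; then
                batch="$batch $i"
            else
                rest="$rest $i"
            fi
        done
        # Unquoted on purpose to drop the leading space
        echo $batch
        pending="$rest"
    done
}

ring_batches 6 3
```

For six nodes and RF=3 this prints three batches (0 3, then 1 4, then 2 5), so two nodes run at any time while the other replicas of every range stay untouched.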

By default, cstar will process the datacenters in parallel, so 2 nodes being replicas for the same tokens but residing in different datacenters can be processed at the same time.

Once the cluster has been fully mapped execution will start in token order. Cstar is very resilient because it uploads a script on each remote node through SFTP and runs it using nohup. Each execution will write output (std and err) files along with the exit code for cstar to check on regularly. If the command is interrupted on the server that runs cstar, it can be resumed safely as cstar will first check if the script is still running or has finished already on each node that hasn’t gone through yet.
Note that interrupting the command on the cstar host will not stop it on the remote nodes that are already running it.
Resuming an interrupted command is done simply by executing: cstar continue <job_id>

Each time a node finishes running the command cstar will check if the cluster health is still good and if the node is up. This way, if you perform a rolling restart and one of the nodes doesn’t come back up properly, although the exit code of the restart command is 0, cstar will wait indefinitely to protect the availability of the cluster. That is unless you specified a timeout on the job. In such a case, the job will fail. Once the node is up after the command has run, cstar will look for the next candidate node in the ring to run the command.

A few handy flags

Two steps execution

Some commands may be scary to run on the whole cluster and you may want to run them on a subset of the nodes first, check that they are in the expected state manually, and then continue the execution on the rest of the cluster. The --stop-after=<number-of-nodes> flag will do just that. Setting it to --stop-after=1 will run the command on a single node and exit. Once you’ve verified that you’re happy with the execution on that one node you can process the rest of the cluster using cstar continue <job_id>.

Retry failed nodes

Some commands might fail mid-course due to transient problems. By default, cstar continue <job_id> will halt if there is any failed execution in the history of the job. In order to resume the job and retry the execution on the failed nodes, add the --retry-failed flag.

Run the command on a specific datacenter

To process only a specific datacenter add the --dc-filter=<datacenter-name> flag. All other datacenters will be ignored by cstar.

Datacenter parallelism

By default, cstar will process the datacenters in parallel. If you want only one datacenter to process the command at a time, add the --dc-serial flag.

Specifying a maximum concurrency

You can forcefully limit the number of nodes running the command at the same time, regardless of topology, by adding the --max-concurrency=<number-of-nodes> flag.

Wait between each node

You may want to delay executions between nodes in order to give some room for the cluster to recover from the command. The --node-done-pause-time=<time-in-seconds> flag lets you specify a pause time that cstar will apply before looking for the next node to run the command on.

Run the command regardless of down nodes

If you want to run a command while nodes are down in the cluster add the --ignore-down-nodes flag to cstar.

Run on specific nodes only

If the command is meant to run on some specific nodes only you can use either the --host or the --host-file flags.

Control the verbosity of the output

By default, cstar will only display the progress of the execution as shown above in this post. To get the output of the remote commands, add the -v flag. If you want more verbosity on the executions and debug logging, use either -vv (very verbose) or -vvv (extra verbose).

You haven’t installed it already?

Cstar is the tool that all Apache Cassandra operators have been waiting for to manage clusters of all sizes. We were happy to collaborate closely with Spotify to help them open source it. It has been built and matured at one of the smartest and most successful start-ups in the world and was developed to manage hundreds of clusters of all sizes. It requires no dependency to be installed on the cluster and uses SSH exclusively. Thus, it will comply nicely with any security policy and you should be able to run it within minutes on any cluster of any size.

We love cstar so much we are already working on integrating it with Reaper as you can see in the following video :

We’ve seen in this blog post how to run simple one line commands with cstar, but there is much more than meets the eye. In an upcoming blog post we will introduce complex command scripts that perform operations like upgrading a Cassandra cluster, selectively clearing snapshots, or safely switching compaction strategies in a single cstar invocation.

Assassinate - A Command of Last Resort within Apache Cassandra

The nodetool assassinate command is meant specifically to remove cosmetic issues after nodetool decommission or nodetool removenode commands have been properly run and at least 72 hours have passed. It is not a command that should be run under most circumstances nor included in your regular toolbox. Rather the lengthier nodetool decommission process is preferred when removing nodes to ensure no data is lost. Note that you can also use the nodetool removenode command if cluster consistency is not the primary concern.

This blog post will explain:

  • How gossip works and why assassinate can disrupt it.
  • How to properly remove nodes.
  • When and how to assassinate nodes.
  • How to resolve issues when assassination attempts fail.

Gossip: Cassandra’s Decentralized Topology State

Since all topological changes happen within Cassandra’s gossip layer, before we discuss how to manipulate the gossip layer, let’s discuss the fundamentals of how gossip works.

From Wikipedia:

A gossip (communication) protocol is a procedure or process of computer-computer communication that is based on the way social networks disseminate information or how epidemics spread… Modern distributed systems often use gossip protocols to solve problems that might be difficult to solve in other ways, either because the underlying network has an inconvenient structure, is extremely large, or because gossip solutions are the most efficient ones available.

The gossip state within Cassandra is the decentralized, eventually consistent, agreed upon topological state of all nodes within a Cassandra cluster. Cassandra gossip heartbeats keep the topological gossip state updated, are emitted via each node in the cluster, and contain the following information:

  • What that node’s status is, and
  • What its neighbors’ statuses are.

When a node goes offline the gossip heartbeat will not be emitted and the node’s neighbors will detect that the node is offline (with help from an algorithm which is tuned by the phi_convict_threshold parameter as defined within the cassandra.yaml), and the neighbor will broadcast an updated status saying that the neighbor node is unavailable until further notice.

However, as soon as the node comes online, two things will happen:

  1. The revived node will:
    • Ask a neighbor node what the current gossip state is.
    • Modify the received gossip state to include its own status.
    • Assume the modified state as its own.
    • Broadcast the new gossip state across the network.
  2. A neighbor node will:
    • Discover the revived node is back online, either by:
      • First-hand discovery, or
      • Second-hand gossiping.
    • Update the received gossip state with the new information.
    • Modify the received gossip state to include its own status.
    • Assume the modified state as its own.
    • Broadcast the new gossip state across the network.

The above gossip protocol is responsible for the UN/DN, or Up|Down/Normal, statuses seen within nodetool status and is responsible for ensuring requests and replicas are properly routed to the available and responsible nodes, among other tasks.

Differences Between Assassination, Decommission, and Removenode

There are three main commands used to take a node offline: assassination, decommission, and removenode. Having the node be in the LEFT state ensures that each node’s gossip state will eventually be consistent and agree that:

  • The deprecated node has in fact been deprecated.
  • The deprecated node was deprecated after a given timestamp.
  • The deprecated token ranges are now owned by a new node.
  • Ideally, the deprecated node’s LEFT state will be automatically purged after 72 hours.

Underlying Actions of Decommission and Removenode on the Gossip Layer

When nodetool decommission and nodetool removenode commands are run, we are changing the state of the gossip layer to the LEFT state for the deprecated node.

Following the gossip protocol procedure in the previous section, the LEFT status will spread across the cluster as the new truth since the LEFT status has a more recent timestamp than the previous status.

As more nodes begin to assimilate the LEFT status, the cluster will ultimately reach consensus that the deprecated node has LEFT the cluster.
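
The "most recent status wins" rule can be pictured with a deliberately simplified sketch. It is not Cassandra's gossip code: each record is reduced to a made-up integer timestamp and a status, and the function name newer_state is hypothetical.

```shell
#!/usr/bin/env bash
# Simplified illustration (not Cassandra's gossip code): given two
# "<timestamp> <status>" records for the same node, keep the newer one.
newer_state() {
    local ts_a="${1%% *}" ts_b="${2%% *}"
    if [ "$ts_b" -gt "$ts_a" ]; then
        echo "$2"
    else
        echo "$1"
    fi
}

# A node that still believes the peer is NORMAL hears a newer LEFT record
newer_state "100 NORMAL" "250 LEFT"
```

Whichever order the two records arrive in, the LEFT record is kept because it carries the larger timestamp, which is why repeated exchanges converge on the same answer.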

Underlying Actions of Assassination

Unlike nodetool decommission and nodetool removenode above, when nodetool assassinate is used we are updating the gossip state to the LEFT state, then forcing an incrementation of the gossip generation number, and updating the application state to the LEFT state explicitly, which will then propagate as normal.

Removing Nodes: The “Proper” Way

When clusters grow large, an operator may need to remove a node, either due to hardware faults or horizontally scaling down the cluster. At that time, the operator will need to modify the topological gossip state with either a nodetool decommission command for online nodes or nodetool removenode for offline nodes.

Decommissioning a Node: While Saving All Replicas

The typical command to run on a live node that will be exiting the cluster is:

nodetool decommission

The nodetool decommission command will:

  • Extend certain token ranges within the gossip state.
  • Stream all of the decommissioned node’s data to the new replicas in a consistent manner (the opposite of bootstrap).
  • Report to the gossip state that the node has exited the ring.

While this command may take a while to complete, the extra time spent on this command will ensure that all owned replicas are streamed off the node and towards the new replica owners.
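To picture what "extending token ranges" means, here is a minimal sketch that assumes one token per node and looks only at primary ownership (no replication factor, no vnodes). The ring contents and the helper names `primary_owner` and `without_node` are illustrative assumptions, not Cassandra APIs.

```python
import bisect


def primary_owner(ring, token):
    """ring is a sorted list of (token, node); a node owns (previous token, own token]."""
    tokens = [t for t, _ in ring]
    index = bisect.bisect_left(tokens, token) % len(ring)  # wrap past the last token
    return ring[index][1]


def without_node(ring, node):
    """Return the ring as the survivors see it once `node` has LEFT."""
    return [(t, n) for t, n in ring if n != node]


ring = [(0, "node-a"), (100, "node-b"), (200, "node-c")]
before = primary_owner(ring, 150)                          # owned by node-c
after = primary_owner(without_node(ring, "node-c"), 150)   # now falls to node-a
print(before, "->", after)
```

Once node-c leaves, the next node on the ring absorbs its range. Decommission streams the matching data to the new owner before that happens.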

Removing a Node: And Losing Non-Replicated Replicas

Sometimes a node may be offline due to hardware issues and/or has been offline for longer than gc_grace_seconds within a cluster that ingests deletion mutations. In this case, the node needs to be removed from the cluster while remaining offline to prevent “zombie data” from propagating around the cluster due to already expired tombstones, as defined by the gc_grace_seconds window. In the case where the node will remain offline, the following command should be run on a neighbor node:

nodetool removenode $HOST_ID

The nodetool removenode command will:

  • Extend certain token ranges within the gossip state.
  • Report to the gossip state that the node has exited the ring.
  • Will NOT stream any of the decommissioned node’s data to the new replicas.

Increasing Consistency After Removing a Node

Typically a follow up repair is required in a rolling fashion around the data center to ensure each new replica has the required information:

nodetool repair -pr

Note that:

  • The above command will only repair replica consistencies if the replication factor is greater than 1 and one of the surviving nodes contains a replica of the data.
  • Running a rolling repair will generate disk, CPU, and network load proportional to the amount of data needing to be repaired.
  • Throttling a rolling repair by repairing only one node at a time may be ideal.
  • Using Reaper for Apache Cassandra can schedule, manage, and load balance the repair operations throughout the lifetime of the cluster.
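If Reaper is not available, the "one node at a time" idea can be approximated with a small driver script. The sketch below only builds and sequentially runs `nodetool -h <host> repair -pr` commands. The host addresses and keyspace are placeholders, and it assumes JMX on each node is reachable from wherever the script runs.

```python
import subprocess


def rolling_repair_plan(hosts, keyspace=None):
    """Build one `nodetool repair -pr` invocation per host, in order."""
    plan = []
    for host in hosts:
        command = ["nodetool", "-h", host, "repair", "-pr"]
        if keyspace:
            command.append(keyspace)
        plan.append(command)
    return plan


def run_rolling_repair(plan, runner=subprocess.run):
    """Run the commands one at a time and stop at the first failure."""
    for command in plan:
        runner(command, check=True)  # blocks until this node's repair returns


# Placeholder hosts; print the plan instead of running it.
plan = rolling_repair_plan(["10.0.0.1", "10.0.0.2"], keyspace="keyspace1")
print(plan)
```

Passing a different `runner` makes it easy to dry-run the plan before touching a real cluster.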

How We Can Detect Assassination is Needed

In either of the above cases, sometimes the gossip state will continue to be out of sync. There will be echoes of past statuses that claim not only the node is still part of the cluster, but it may still be alive. And then missing. Intermittently.

When the gossip truth is continuously inconsistent, nodetool assassinate will resolve these inconsistencies, but should only be run after nodetool decommission or nodetool removenode have been run and at least 72 hours have passed.

These issues are typically cosmetic and appear as similar lines within the system.log:

2014-09-26 01:26:41,901 DEBUG [Reconnection-1] Cluster - Failed reconnection to /172.x.y.zzz:9042 ([/172.x.y.zzz:9042] Cannot connect), scheduling retry in 600000 milliseconds

Or may appear as UNREACHABLE within the nodetool describecluster output:

Cluster Information:
        Name: Production Cluster
       Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
       Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
       Schema versions:
              65e78f0e-e81e-30d8-a631-a65dff93bf82: [172.x.y.z]
              UNREACHABLE: [172.x.y.zzz]

Sometimes you may find yourself looking even deeper and spot the deprecated node within nodetool gossipinfo months after removing the node:

TOKENS: not present

Note that the LEFT status should stick around for 72 hours to ensure all nodes come to the consensus that the node has been removed. So please don’t rush things if that’s the case. Again, it’s only cosmetic.

In all of these cases the truth may be slightly outdated and an operator may want to set the record straight with truth-based gossip states instead of cosmetic rumor-filled gossip states that include offline deprecated nodes.
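Spotting these leftovers can be automated. The following sketch scans text in the shape of the nodetool describecluster output shown earlier and returns any addresses listed as UNREACHABLE. The `unreachable_nodes` helper is a hypothetical name, and the parsing assumes the bracketed, comma-separated layout seen in that sample.

```python
import re


def unreachable_nodes(describecluster_output):
    """Return the addresses listed on UNREACHABLE lines."""
    nodes = []
    for line in describecluster_output.splitlines():
        match = re.match(r"\s*UNREACHABLE:\s*\[(.*)\]", line)
        if match:
            nodes.extend(a.strip() for a in match.group(1).split(",") if a.strip())
    return nodes


SAMPLE = """Cluster Information:
        Name: Production Cluster
        Schema versions:
              65e78f0e-e81e-30d8-a631-a65dff93bf82: [172.x.y.z]
              UNREACHABLE: [172.x.y.zzz]
"""
print(unreachable_nodes(SAMPLE))
```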

How to Run the Assassination Command

Pre-2.2.0, operators used to have to use Java MBeans to assassinate a token (see below). Post-2.2.0, operators can use the nodetool assassinate method.

From an online node, run the command:

nodetool assassinate $IP_ADDRESS

Internally, the nodetool assassinate command will execute the unsafeAssassinateEndpoint command over JMX on the Gossiper MBean.

Java MBeans Assassination

If using a version of Cassandra that does not yet have the nodetool assassinate command, we’ll have to rely on jmxterm.

You can use the following command to download jmxterm:

wget http://downloads.sourceforge.net/cyclops-group/jmxterm-1.0.0-uber.jar

Then we’ll want to use the Gossiper MBean and run the unsafeAssassinateEndpoint command:

echo "run -b org.apache.cassandra.net:type=Gossiper unsafeAssassinateEndpoint $IP_TO_ASSASSINATE" \
    | java -jar jmxterm-1.0.0-uber.jar -l $IP_OF_LIVE_NODE:7199

Both of the assassination commands will trigger the same MBean command over JMX; however, the nodetool assassinate command is preferred for its ease of use without additional dependencies.

Resolving Failed Assassination Attempts: And Why the First Attempts Failed

When clusters grow large enough, are geospatially distant enough, or are under intense load, the gossip state may become a bit out of sync with reality. Sometimes this causes assassination attempts to fail and while the solution may sound unnerving, it’s relatively simple once you consider how gossip states act and are maintained.

Because gossip states can be decentralized across high latency nodes, sometimes gossip state updates can be delayed and cause a variety of race conditions that may show offline nodes as still being online. Most times these race conditions will be corrected in relatively short time periods, as tuned by the phi_convict_threshold within the cassandra.yaml (between a value of 8 for hardware and 12 for virtualized instances). In almost all cases the gossip state will converge into a global truth.

However, because gossip state from nodes that are no longer participating in gossip heartbeat rounds does not have an explicit source and is instead fueled by rumors, dead nodes may sometimes continue to live within the gossip state even after calling the assassinate command.

To solve these issues, you must ensure all race conditions are eliminated.

If a gossip state will not forget a node that was removed from the cluster more than a week ago:

  • Log in to each node within the Cassandra cluster.
  • Download jmxterm on each node, if nodetool assassinate is not an option.
  • Run nodetool assassinate, or the unsafeAssassinateEndpoint command, multiple times in quick succession.
    • I typically recommend running the command 3-5 times within 2 seconds.
    • I understand that sometimes the command takes time to return, so the “2 seconds” suggestion is less of a requirement than it is a mindset.
    • Also, sometimes 3-5 times isn’t enough. In such cases, shoot for the moon and try 20 assassination attempts in quick succession.

What we are trying to do is to create a flood of messages requesting all nodes completely forget there used to be an entry within the gossip state for the given IP address. If each node can prune its own gossip state and broadcast that to the rest of the nodes, we should eliminate any race conditions that may exist where at least one node still remembers the given IP address.

As soon as all nodes come to agreement that they don’t remember the deprecated node, the cosmetic issue will no longer be a concern in any system.logs, nodetool describecluster commands, nor nodetool gossipinfo output.

Recap: How To Properly Remove Nodes Completely

Operators shouldn’t opt for the assassinate command as a first resort for taking a Cassandra node out since it is a sledgehammer and most of the time operators are dealing with a screw.

However, when operators follow best practices and perform a nodetool decommission for live nodes or nodetool removenode for offline nodes, sometimes lingering cosmetic issues may lead the operator to want to keep the gossip state consistent.

After at least a week of inconsistent gossip state, nodetool assassinate or the unsafeAssassinateEndpoint command may be used to remove deprecated nodes from the gossip state.

When a single assassination attempt does not work across an entire cluster, sometimes the assassination needs to be attempted multiple times on all nodes within the cluster simultaneously. Doing so will ensure that each node modifies its own gossip state to accurately reflect the deprecated node’s absence within the gossip state as well as ensuring no node will further broadcast rumors of a false gossip state.

Incremental Repair Improvements in Cassandra 4

In our previous post, “Should you use incremental repair?”, we recommended using subrange full repairs instead of incremental repair as CASSANDRA-9143 could generate some severe instabilities on a running cluster. As the 4.0 release approaches, let’s see how incremental repair was modified for the next major version of Apache Cassandra in order to become reliable in production.

Incremental Repair in Pre-4.0 Clusters

Since Apache Cassandra 2.1, incremental repair was performed as follows:

  • The repair coordinator will ask all replicas to build Merkle trees only using SSTables with a RepairedAt value of 0 (meaning they haven’t been part of a repair yet).
    Merkle trees are hash trees of the data they represent; they don’t store the original data.
  • Mismatching leaves of the Merkle trees will get streamed between replicas
  • When all streaming is done, anticompaction is performed on all SSTables that were part of the repair session

But during the whole process, SSTables could still get compacted away as part of the standard automatic compactions. If that happened, the SSTable would not get anticompacted and all the data it contains would not be marked as repaired. In the below diagram, SSTable 1 is compacted with 3, 4 and 5, creating SSTable 6 during the streaming phase. This happens before anticompaction is able to split apart the repaired and unrepaired data:

SSTable 1 gets compacted away before anticompaction could kick in.

If this happens on a single node, the next incremental repair run would find differences as the previously repaired data would be skipped on all replicas but one, which would lead potentially to a lot of overstreaming. This happens because Merkle trees only contain hashes of data, and in Cassandra, the height of the tree is bounded to prevent over allocation of memory. The more data we use to build our tree, the larger the tree would be. Limiting the height of the tree means the hashes in the leaves are responsible for bigger ranges of data.

Already repaired data in SSTable 6 will be part of the Merkle tree computation.
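The link between tree height and overstreaming can be illustrated with a toy model. The sketch below is not Cassandra's Merkle tree: it hashes only the leaf level over a made-up token space of 1024 tokens, and the names `leaf_hashes` and `rows_to_stream` are hypothetical. It shows how a single differing row forces the whole leaf range that contains it to be streamed, and how a shallower tree makes that range larger.

```python
import hashlib


def leaf_hashes(rows, depth, token_space=1024):
    """rows maps token -> value. Returns one hash per leaf range of the tree."""
    leaves = 2 ** depth
    width = token_space // leaves
    buckets = [hashlib.sha256() for _ in range(leaves)]
    for token in sorted(rows):
        buckets[token // width].update(f"{token}={rows[token]};".encode())
    return [bucket.hexdigest() for bucket in buckets]


def rows_to_stream(replica_a, replica_b, depth, token_space=1024):
    """Count rows of replica_a that sit in leaf ranges whose hashes differ."""
    leaves = 2 ** depth
    width = token_space // leaves
    hashes_a = leaf_hashes(replica_a, depth, token_space)
    hashes_b = leaf_hashes(replica_b, depth, token_space)
    mismatched = {i for i in range(leaves) if hashes_a[i] != hashes_b[i]}
    return sum(1 for token in replica_a if token // width in mismatched)


replica_a = {token: "v1" for token in range(1024)}
replica_b = dict(replica_a)
replica_b[5] = "stale"  # exactly one row differs between the replicas

print(rows_to_stream(replica_a, replica_b, depth=2))  # 4 leaves of 256 tokens each
print(rows_to_stream(replica_a, replica_b, depth=8))  # 256 leaves of 4 tokens each
```

With the shallow tree, one stale row drags 256 rows into the stream; with the deeper tree, only 4. Already repaired data that slips back into the computation widens the mismatch in the same way.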

If you wonder what troubles can be generated by this bug, I invite you to read my previous blog post on this topic.

Incremental repair in 4.0, the theory

The incremental repair process is now supervised by a transaction to guarantee its consistency. In the “Prepare phase”, anticompaction is performed before the Merkle trees are computed, and the candidate SSTables will be marked as pending a specific repair. Note that they are not marked as repaired just yet to avoid inconsistencies in case the repair session fails.

If a candidate SSTable is currently part of a running compaction, Cassandra will try to cancel that compaction and wait up to a minute. If the compaction successfully stops within that time, the SSTable will be locked for future anticompaction, otherwise the whole prepare phase and the repair session will fail.

Incremental repair in 4.0

SSTables marked as pending repair are only eligible to be compacted with other tables marked as pending.
SSTables in the pending repair pool are the only ones participating in both Merkle tree computations and streaming operations :

Incremental repair in 4.0

During repair, the pool of unrepaired SSTables receives newly flushed ones and compaction takes place as usual within it. SSTables that are being streamed in are part of the “pending repair” pool. This prevents two potential problems:

  • If the streamed SSTables were put in the unrepaired pool, they could get compacted away as part of normal compaction tasks and would never be marked as repaired.
  • If the streamed SSTables were put in the repaired pool and the repair session failed, we would have data that is marked as repaired on some nodes and not others, which would generate overstreaming during the next repair.

Once the repair succeeds, the coordinator sends a request to all replicas to mark the SSTables in pending state as repaired, by setting the RepairedAt timestamp (since anticompaction already took place, Cassandra just needs to set this timestamp).

Incremental repair in 4.0

If some nodes failed during the repair, the “pending repair” SSTables will be released and eligible for compaction (and repair) again. They will not be marked as repaired :

Incremental repair in 4.0
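The life cycle described above can be summarised as a small state machine. This is a sketch under simplifying assumptions: anticompaction (the splitting of SSTables during the prepare phase) is left out, and the `SSTable` class and the `prepare`, `finish` and `can_compact_together` helpers are hypothetical names rather than Cassandra internals.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SSTable:
    name: str
    repaired_at: int = 0                  # 0 means "never repaired"
    pending_repair: Optional[str] = None  # repair session id, if any

    @property
    def pool(self) -> str:
        if self.pending_repair is not None:
            return "pending"
        return "repaired" if self.repaired_at > 0 else "unrepaired"


def prepare(sstables, session_id):
    """Prepare phase: move unrepaired candidates into the pending pool."""
    for sstable in sstables:
        if sstable.pool == "unrepaired":
            sstable.pending_repair = session_id


def finish(sstables, session_id, success, now_ms):
    """On success mark the session's SSTables repaired; on failure just release them."""
    for sstable in sstables:
        if sstable.pending_repair == session_id:
            sstable.pending_repair = None
            if success:
                sstable.repaired_at = now_ms


def can_compact_together(a, b):
    """SSTables are only compacted with others from the same pool."""
    return a.pool == b.pool


table = SSTable("na-6-big")
prepare([table], "8e584410-ad23-11e8-ba2c-0feeb881768f")
print(table.pool)
```

A failed session leaves `repaired_at` at 0, so the same data is simply picked up again by the next repair.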

The practice

Let’s see how all of this process takes place by running a repair and observing the behavior of Cassandra.

To that end, I created a 5 node CCM cluster running locally on my laptop and used tlp-stress to load some data with a replication factor of 2 :

bin/tlp-stress run BasicTimeSeries -i 1M -p 1M -t 2 --rate 5000  --replication "{'class':'SimpleStrategy', 'replication_factor':2}"  --compaction "{'class': 'SizeTieredCompactionStrategy'}"  --host

Node was then stopped and I deleted all the SSTables from the tlp_stress.sensor_data table :

Datacenter: datacenter1
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns    Host ID                               Rack
UN  247,07 KiB  1            ?       dbccdd3e-f74a-4b7f-8cea-e8770bf995db  rack1
UN  44,08 MiB  1            ?       3ce4cca5-da75-4ede-94b7-a37e01d2c725  rack1
UN  44,07 MiB  1            ?       3b9fd30d-80c2-4fa6-b324-eaecc4f9564c  rack1
UN  43,98 MiB  1            ?       f34af1cb-4862-45e5-95cd-c36404142b9c  rack1
UN  44,05 MiB  1            ?       a5add584-2e00-4adb-8949-716b7ef35925  rack1

I ran a major compaction on all nodes to easily observe the anticompactions. On node, we then have a single SSTable on disk :

sensor_data-f4b94700ad1d11e8981cd5d05c109484 adejanovski$ ls -lrt *Data*
-rw-r--r--  1 adejanovski  staff  41110587 31 aoû 15:09 na-4-big-Data.db

The sstablemetadata tool gives us interesting information about this file :

sstablemetadata na-4-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-4-big
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Bloom Filter FP chance: 0.01
Minimum timestamp: 1535720482962762 (08/31/2018 15:01:22)
Maximum timestamp: 1535720601312716 (08/31/2018 15:03:21)
SSTable min local deletion time: 2147483647 (no tombstones)
SSTable max local deletion time: 2147483647 (no tombstones)
Compressor: org.apache.cassandra.io.compress.LZ4Compressor
Compression ratio: 0.8694195642299255
TTL min: 0
TTL max: 0
First token: -9223352583900436183 (001.0.1824322)
Last token: 9223317557999414559 (001.1.2601952)
minClusteringValues: [3ca8ce0d-ad1e-11e8-80a6-91cbb8e39b05]
maxClusteringValues: [f61aabc1-ad1d-11e8-80a6-91cbb8e39b05]
Estimated droppable tombstones: 0.0
SSTable Level: 0
Repaired at: 0
Pending repair: --
Replay positions covered: {CommitLogPosition(segmentId=1535719935055, position=7307)=CommitLogPosition(segmentId=1535719935056, position=20131708)}
totalColumnsSet: 231168
totalRows: 231168
Estimated tombstone drop times: 
   Drop Time | Count  (%)  Histogram 
   50th      0 
   75th      0 
   95th      0 
   98th      0 
   99th      0 
   Min       0 
   Max       0 
Partition Size: 
   Size (bytes) | Count  (%)  Histogram 
   179 (179 B)  | 56330 ( 24) OOOOOOOOOOOOOOOOOOo
   215 (215 B)  | 78726 ( 34) OOOOOOOOOOOOOOOOOOOOOOOOOO.
   310 (310 B)  |   158 (  0) 
   372 (372 B)  |  1166 (  0) .
   446 (446 B)  |  1691 (  0) .
   535 (535 B)  |   225 (  0) 
   642 (642 B)  |    23 (  0) 
   770 (770 B)  |     1 (  0) 
   50th      215 (215 B)
   75th      258 (258 B)
   95th      258 (258 B)
   98th      258 (258 B)
   99th      372 (372 B)
   Min       150 (150 B)
   Max       770 (770 B)
Column Count: 
   Columns | Count   (%)  Histogram 
   2       |   3230 (  1) .
   3       |     34 (  0) 
   50th      1
   75th      1
   95th      1
   98th      1
   99th      2
   Min       0
   Max       3
Estimated cardinality: 222877
EncodingStats minTTL: 0
EncodingStats minLocalDeletionTime: 1442880000 (09/22/2015 02:00:00)
EncodingStats minTimestamp: 1535720482962762 (08/31/2018 15:01:22)
KeyType: org.apache.cassandra.db.marshal.UTF8Type
ClusteringTypes: [org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimeUUIDType)]
RegularColumns: data:org.apache.cassandra.db.marshal.UTF8Type

It is worth noting the cool improvements sstablemetadata has gone through in 4.0, especially regarding the histograms rendering. So far, and as expected, our SSTable is not repaired and it is not pending a running repair.

Once the repair starts, the coordinator node executes the Prepare phase and anticompaction is performed :

sensor_data-f4b94700ad1d11e8981cd5d05c109484 adejanovski$ ls -lrt *Data*
-rw-r--r--  1 adejanovski  staff  20939890 31 aoû 15:41 na-6-big-Data.db
-rw-r--r--  1 adejanovski  staff  20863325 31 aoû 15:41 na-7-big-Data.db

SSTable na-6-big is marked as pending our repair :

sstablemetadata na-6-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-6-big
Repaired at: 0
Pending repair: 8e584410-ad23-11e8-ba2c-0feeb881768f
Replay positions covered: {CommitLogPosition(segmentId=1535719935055, position=7307)=CommitLogPosition(segmentId=1535719935056, position=21103491)}

na-7-big remains in the “unrepaired pool” (it contains tokens that are not being repaired in this session) :

sstablemetadata na-7-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-7-big
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Bloom Filter FP chance: 0.01
Repaired at: 0
Pending repair: --

Once repair finishes, another look at sstablemetadata on na-6-big shows us that it is now marked as repaired :

sstablemetadata na-6-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-6-big
Estimated droppable tombstones: 0.0
SSTable Level: 0
Repaired at: 1535722885852 (08/31/2018 15:41:25)
Pending repair: --

Again, I really appreciate not having to compute the repair date by myself thanks to an sstablemetadata output that is a lot more readable than it was before.
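When checking many SSTables, the two relevant fields can be pulled out of the sstablemetadata output with a short script. The sketch below assumes the "Repaired at:" and "Pending repair:" line formats shown above; `repair_state` is a hypothetical helper name, and the timestamp is converted to UTC rather than to local time as the tool does.

```python
from datetime import datetime, timezone


def repair_state(metadata_text):
    """Classify an SSTable as unrepaired, pending or repaired from its metadata."""
    repaired_at_ms = 0
    pending = None
    for line in metadata_text.splitlines():
        line = line.strip()
        if line.startswith("Repaired at:"):
            repaired_at_ms = int(line.split(":", 1)[1].split()[0])
        elif line.startswith("Pending repair:"):
            value = line.split(":", 1)[1].strip()
            pending = None if value == "--" else value
    if pending is not None:
        return ("pending", None)
    if repaired_at_ms > 0:
        # Integer division drops the milliseconds and avoids float rounding.
        when = datetime.fromtimestamp(repaired_at_ms // 1000, tz=timezone.utc)
        return ("repaired", when)
    return ("unrepaired", None)


sample = "Repaired at: 1535722885852 (08/31/2018 15:41:25)\nPending repair: --"
print(repair_state(sample))
```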

Reliable incremental repair

While Apache Cassandra 4.0 is being stabilized and there are still a few bugs to hunt down, incremental repair finally received the fix it deserved to make it production ready for all situations. The transaction that encloses the whole operation will shield Cassandra from inconsistencies and overstreaming, making cyclic repairs a fast and safe operation. Orchestration is still needed though as SSTables cannot be part of 2 distinct repair sessions that would run at the same time, and it is advised to use a topology aware tool to perform the operation without hurdles.
It is worth noting that full repair in 4.0 doesn’t involve anticompaction anymore and does not mark SSTables as repaired. This will bring full repair back to its 2.1 behavior and allow it to be run on several nodes at the same time without fearing conflicts between validation compactions and anticompactions.

So you have a broken Cassandra SSTable file?

Every few months I have a customer come to me with the following concern: my compactions for one of my Cassandra tables are stuck or my repairs fail when referencing one of the nodes in my Cassandra cluster. I take a look or just ask a couple of questions and it becomes apparent that the problem is a broken SSTable file. Occasionally, they will come to me in a panic and tell me that they have looked at their logs and discovered they have a broken SSTable file.

Don’t panic. A broken SSTable file is not a crisis.

A broken SSTable file does not represent lost data or an unusable database. Well, that’s true unless you are using a Replication Factor (RF) of ONE. The cluster is still going to operate, and queries should be working just fine. But… it does need to be fixed. There are four ways to fix the problem which I will explain in this post, one of which I freely admit is not one of the community’s recommended ways, but is often the simplest and quickest with minimal downside risk.

Before I begin to explain the ways to repair an SSTable, I will spend a few lines to explain what an SSTable file is, then I will walk you through the four options from easiest and safest to the most difficult and risky.

An SSTable file is not a file. It’s a set of eight files. One of those eight contains the actual data. The others contain metadata used by Cassandra to find specific partitions and rows in the data file. Here is a sample list of the files:

File name                   Contents
mc-131-big-CRC.db           Checksums of chunks of the data file.
mc-131-big-Data.db          The data file that contains all of the rows and columns.
mc-131-big-Digest.crc32     Single checksum of the data file.
mc-131-big-Filter.db        Bloom filter containing partial checksums of all partition and cluster keys.
mc-131-big-Index.db         A single level index of the partitions and cluster keys in the data file.
mc-131-big-Statistics.db    A bunch of metadata that Cassandra keeps about this file, including information about the columns, tombstones, etc.
mc-131-big-Summary.db       An index into the index file, making this a second level index.
mc-131-big-TOC.txt          This list of file names. No idea why it exists.

The “mc” is the SSTable file version. This changes whenever a new release of Cassandra changes anything in the way data is stored in any of the files listed in the table above.

The number 131 is the sequence number of the SSTable file. It increases for each new SSTable file written through memtable flush, compaction, or streaming from another node.

The word “big” was added to Cassandra SSTable files starting in Cassandra 2.2. I have no idea what its purpose is.

The rest of the file name parts are explained in the chart above.
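The naming scheme is regular enough to parse mechanically. The sketch below assumes the `<version>-<sequence>-<format>-<component>` layout seen in the listing above (older Cassandra releases used a different layout); the `SSTableComponent` type and `parse_sstable_filename` function are hypothetical names.

```python
import re
from typing import NamedTuple


class SSTableComponent(NamedTuple):
    version: str     # e.g. "mc"
    generation: int  # the sequence number, e.g. 131
    fmt: str         # e.g. "big"
    component: str   # e.g. "Data.db"


_PATTERN = re.compile(r"^([a-z]{2})-(\d+)-([a-z]+)-(.+)$")


def parse_sstable_filename(name):
    """Split an SSTable component file name into its four parts."""
    match = _PATTERN.match(name)
    if not match:
        raise ValueError(f"not an SSTable component file name: {name!r}")
    version, generation, fmt, component = match.groups()
    return SSTableComponent(version, int(generation), fmt, component)


print(parse_sstable_filename("mc-131-big-Data.db"))
```

Grouping files by `generation` is a quick way to find all the components that belong to one broken SSTable.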

When you get the dreaded error that an SSTable file is broken, it is almost always because an internal consistency check has failed, such as “column too long” or a checksum that failed to validate. This has relatively little effect on normal reads against the table except for the request where the failure took place. It has a serious effect on compactions and repairs, stopping them in their tracks.

Having repairs fail can result in long-term consistency issues between nodes and eventually the application returning incorrect results. Having compactions fail will degrade read performance in the short term and cause storage space problems in the long term.

So… what are the four options?

  1. Nodetool scrub command – Performed online with little difficulty. It usually has a low success rate in my own personal experience.
  2. Offline sstablescrub – Must be performed offline. The tool is in /usr/bin with a package install. Otherwise it’s in $CASSANDRAHOME/bin. Its effectiveness rate is significantly better than the Nodetool scrub, but it requires the node to be down to work. And it takes forever…
  3. rm -f – Performed offline. It must also be followed immediately with a Nodetool repair when you bring the node back up. This is the method I have successfully used most often, but it also has some consistency risks while the repairs complete.
  4. Bootstrap the node – This is kind of like number 3 but it has less theoretical impact on consistency.

Let us get into the details

It starts out like this. You are running a Nodetool repair and you get an error:
$ nodetool repair -full

[2018-08-09 17:00:51,663] Starting repair command #2 (4c820390-9c17-11e8-8e8f-fbc0ff4d2cb8), repairing keyspace keyspace1 with repair options (parallelism: parallel, primary range: false, incremental: false, job threads: 1, ColumnFamilies: [], dataCenters: [], hosts: [], # of ranges: 768, pull repair: false)

error: Repair job has failed with the error message: [2018-08-15 09:59:41,659] Some repair failed

-- StackTrace --

java.lang.RuntimeException: Repair job has failed with the error message: [2018-08-15 09:59:41,659] Some repair failed

You see the error. But it doesn’t tell you a whole lot. Just that the repair failed. The next step is to look at the Cassandra system.log file to see the errors:
$ grep -n -A10 ERROR /var/log/cassandra/system.log

ERROR [RepairJobTask:8] 2018-08-08 15:15:57,726 RepairRunnable.java:277 - Repair session 2c5f89e0-9b39-11e8-b5ee-bb8feee1767a for range [(-1377105920845202291,-1371711029446682941], (-8865445607623519086,-885162575564883…. 425683885]]] Sync failed between / and /

/var/log/cassandra/debug.log:ERROR [RepairJobTask:4] 2018-08-09 16:16:50,722 RepairSession.java:281 - [repair #25682740-9c11-11e8-8e8f-fbc0ff4d2cb8] Session completed with the following error

/var/log/cassandra/debug.log:ERROR [RepairJobTask:4] 2018-08-09 16:16:50,726 RepairRunnable.java:277 - Repair session 25682740-9c11-11e8-8e8f-fbc0ff4d2cb8…… 7115161941975432804,7115472305105341673], (5979423340500726528,5980417142425683885]]] Validation failed in /

/var/log/cassandra/system.log:ERROR [ValidationExecutor:2] 2018-08-09 16:16:50,707 Validator.java:268 - Failed creating a merkle tree for [repair #25682740-9c11-11e8-8e8f-fbc0ff4d2cb8 on keyspace1/standard1,

The first error message Sync Failed is misleading although sometimes it can be a clue. Looking further, you see Validation failed in / This tells us that the error occurred on which just happens to be the node we are on. Finally, we get the message showing the keyspace and table the error occurred on. Depending on the message, you might see the table file number mentioned. In this case it was not mentioned.
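On a busy node, picking the keyspace and table out of the log by eye gets tedious. The sketch below assumes the "Failed creating a merkle tree for [repair #… on keyspace/table" wording shown in the log excerpt above; the `failed_validations` helper and the `MERKLE_FAILURE` pattern are hypothetical and may need adjusting for other Cassandra versions.

```python
import re

MERKLE_FAILURE = re.compile(
    r"Failed creating a merkle tree for \[repair #(?P<session>[0-9a-f-]+) "
    r"on (?P<keyspace>\w+)/(?P<table>\w+)"
)


def failed_validations(log_lines):
    """Return (session, keyspace, table) for every failed Merkle tree build."""
    hits = []
    for line in log_lines:
        match = MERKLE_FAILURE.search(line)
        if match:
            hits.append((match["session"], match["keyspace"], match["table"]))
    return hits


sample_log = [
    "ERROR [ValidationExecutor:2] 2018-08-09 16:16:50,707 Validator.java:268 - "
    "Failed creating a merkle tree for [repair "
    "#25682740-9c11-11e8-8e8f-fbc0ff4d2cb8 on keyspace1/standard1,",
    "INFO  [main] unrelated line",
]
print(failed_validations(sample_log))
```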

Looking in the directory tree we see that we have the following SSTable files:

4,417,919,455 mc-30-big-Data.db
8,831,253,280 mc-45-big-Data.db
374,007,490 mc-49-big-Data.db
342,529,995 mc-55-big-Data.db
204,178,145 mc-57-big-Data.db
83,234,470 mc-59-big-Data.db
3,223,224,985 mc-61-big-Data.db
24,552,560 mc-62-big-Data.db
2,257,479,515 mc-63-big-Data.db
2,697,986,445 mc-66-big-Data.db
5,285 mc-67-big-Data.db

At this point we have our repair options. I’ll take them one at a time.

Online SSTable repair – Nodetool scrub

This command is easy to perform. It is also the option least likely to succeed.


  1. Find out which SSTable is broken.
  2. Run nodetool scrub keyspace tablename.
  3. Run nodetool repair.
  4. Run nodetool listsnapshots.
  5. Run nodetool clearsnapshot keyspacename -t snapshot name.

We did the whole “find out what table is broken” thing just above, so we aren’t going to do it again. We will start with step 2.

Scrub will take a snapshot and rebuild your table files. The one(s) that are corrupt will disappear. You will lose at least a few rows and possibly all the rows from the corrupted SSTable files. Hence the need to do a repair.

$ nodetool scrub keyspace1 standard1

After the scrub, we have fewer SSTable files and their names have all changed. There is also less space consumed and very likely some rows missing.

2,257,479,515 mc-68-big-Data.db
342,529,995 mc-70-big-Data.db
3,223,224,985 mc-71-big-Data.db
83,234,470 mc-72-big-Data.db
4,417,919,455 mc-73-big-Data.db
204,178,145 mc-75-big-Data.db
374,007,490 mc-76-big-Data.db
2,697,986,445 mc-77-big-Data.db
1,194,479,930 mc-80-big-Data.db
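To put a number on "less space consumed", the two listings can be summed. This is a small sketch that assumes the "size name" layout of the listings in this post, with commas as thousands separators; `total_bytes` is a hypothetical helper.

```python
def total_bytes(listing):
    """Sum the sizes in a listing of '<size with commas> <file name>' lines."""
    total = 0
    for line in listing.strip().splitlines():
        size, _name = line.split()
        total += int(size.replace(",", ""))
    return total


BEFORE = """
4,417,919,455 mc-30-big-Data.db
8,831,253,280 mc-45-big-Data.db
374,007,490 mc-49-big-Data.db
342,529,995 mc-55-big-Data.db
204,178,145 mc-57-big-Data.db
83,234,470 mc-59-big-Data.db
3,223,224,985 mc-61-big-Data.db
24,552,560 mc-62-big-Data.db
2,257,479,515 mc-63-big-Data.db
2,697,986,445 mc-66-big-Data.db
5,285 mc-67-big-Data.db
"""

AFTER = """
2,257,479,515 mc-68-big-Data.db
342,529,995 mc-70-big-Data.db
3,223,224,985 mc-71-big-Data.db
83,234,470 mc-72-big-Data.db
4,417,919,455 mc-73-big-Data.db
204,178,145 mc-75-big-Data.db
374,007,490 mc-76-big-Data.db
2,697,986,445 mc-77-big-Data.db
1,194,479,930 mc-80-big-Data.db
"""

dropped = total_bytes(BEFORE) - total_bytes(AFTER)
print(f"{dropped:,} bytes no longer on disk after the scrub")
```

For these two listings the difference comes to roughly 7.7 GB, which is data the follow-up repair has to pull back from the other replicas.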

So we do a repair.

$ nodetool repair -full

[2018-08-09 17:00:51,663] Starting repair command #2 (4c820390-9c17-11e8-8e8f-fbc0ff4d2cb8), repairing keyspace keyspace1 with repair options (parallelism: parallel, primary range: false, incremental: false, job threads: 1, ColumnFamilies: [], dataCenters: [], hosts: [], # of ranges: 768, pull repair: false)
[2018-08-09 18:14:09,799] Repair session 4cadf590-9c17-11e8-8e8f-fbc0ff4d2cb8 for range [(-1377105920845202291,…
[2018-08-09 18:14:10,130] Repair completed successfully
[2018-08-09 18:14:10,131] Repair command #2 finished in 1 hour 13 minutes 18 seconds

After the repair, we have almost twice as many SSTable files with data pulled in from other nodes to replace the corrupted data lost by the scrub process.

2,257,479,515 mc-68-big-Data.db
342,529,995 mc-70-big-Data.db
3,223,224,985 mc-71-big-Data.db
83,234,470 mc-72-big-Data.db
4,417,919,455 mc-73-big-Data.db
204,178,145 mc-75-big-Data.db
374,007,490 mc-76-big-Data.db
2,697,986,445 mc-77-big-Data.db
1,194,479,930 mc-80-big-Data.db
1,209,518,945 mc-88-big-Data.db
193,896,835 mc-89-big-Data.db
170,061,285 mc-91-big-Data.db
63,427,680 mc-93-big-Data.db
733,830,580 mc-95-big-Data.db
1,747,015,110 mc-96-big-Data.db
16,715,886,480 mc-98-big-Data.db
49,167,805 mc-99-big-Data.db

Once the scrub and repair are completed, you are almost done.

One of the side effects of the scrub is a snapshot called pre-scrub-<timestamp>. If you don’t want to run out of disk space, you are going to want to remove it, preferably with nodetool.

$ nodetool listsnapshots

Snapshot Details:
Snapshot name            Keyspace name  Column family name  True size  Size on disk
pre-scrub-1533897462847  keyspace1      standard1           35.93 GiB  35.93 GiB

$ nodetool clearsnapshot -t pre-scrub-1533897462847

Requested clearing snapshot(s) for [all keyspaces] with snapshot name [pre-scrub-1533897462847]

If the repair still fails to complete, we get to try one of the other methods.

Offline SSTable repair utility – sstablescrub

This option is a bit more complex to do but it often will work when the online version won’t work. Warning: it is very slow.


  1. Bring the node down.
  2. Run the sstablescrub command.
  3. Start the node back up.
  4. Run nodetool repair on the table.
  5. Run nodetool clearsnapshot to remove the pre-scrub snapshot.

If the node is not already down, bring it down. I usually do the following commands:

$ nodetool drain

$ pkill java

$ ps -ef |grep cassandra

root 18271 14813 0 20:39 pts/1 00:00:00 grep --color=auto cassandra

Then issue the sstablescrub command with the -n option unless you have the patience of a saint. Without the -n option, every column in every row in every SSTable file will be validated. Single threaded. It will take forever. In preparing for this blog post, I forgot to use the -n and found that it took 12 hours to scrub 500 megabytes of a 30 GB table. Not willing to wait 30 days for the scrub to complete, I stopped it and switched to the -n option completing the scrub in only… hang on for this, 6 days. So, um, maybe this isn’t going to be useful in most real-world situations unless you have really small tables.
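The 30 day figure is a straight-line extrapolation from the partial run. As a rough sketch, assuming the scrub rate stays constant over the whole table (which it may not), the arithmetic looks like this; `estimated_scrub_days` is a hypothetical helper.

```python
def estimated_scrub_days(table_gb, sample_gb, sample_hours):
    """Extrapolate total scrub time from a partial run, assuming a constant rate."""
    total_hours = table_gb / sample_gb * sample_hours
    return total_hours / 24


# 12 hours for the first 0.5 GB of a 30 GB table, without -n.
print(estimated_scrub_days(table_gb=30, sample_gb=0.5, sample_hours=12))
```

By the same measure, the 6 day run with -n works out to about 5 GB scrubbed per day.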

$ sstablescrub -n keyspace1 standard1

Pre-scrub sstables snapshotted into snapshot pre-scrub-1533861799166

Scrubbing BigTableReader(path='/home/cassandra/data/keyspace1/standard1-afd416808c7311e8a0c96796602809bc/mc-88-big-Data.db') (1.126GiB)…

Unfortunately, this took more time than I wanted to take for this blog post. Once you have the table scrubbed, you restart Cassandra, run nodetool repair on the table, and delete the pre-scrub snapshot.

Delete the file and do a Nodetool repair – rm

This option works every time. It is no more difficult to do than the offline sstablescrub command and its success rate is 100%. It’s usually much faster than the offline sstablescrub option. In my prep for the blog post, this approach took only two hours for my 30 GB table. The only drawback I can see is that for the time it takes to do the repair on the table after the delete is performed, there is an increased risk of consistency problems, especially if you are using CF=1, which should be a fairly uncommon use case.


  1. Stop the node.
  2. cd to the offending keyspace and sstable directory.
  3. If you know which sstable file is bad (if you learned about the problem from stalled compactions, you will know) just delete it. If not, delete all files in the directory.
  4. Restart the node.
  5. Run nodetool repair.
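For step 3, when the bad SSTable is known, it helps to list every component file of that one SSTable before deleting anything. The sketch below assumes the mc-N-big-Component naming shown in this post. The name sstable_components is a hypothetical helper, not a Cassandra tool.

```shell
# List every component file (Data, Index, Filter, ...) of one SSTable generation.
# $1 = table directory, $2 = generation number (the 88 in mc-88-big-Data.db)
sstable_components() {
  local dir="$1" gen="$2" f
  for f in "$dir"/*-"$gen"-big-*; do
    # When nothing matches, the glob stays literal and this test skips it.
    [ -f "$f" ] && printf '%s\n' "$f"
  done
  return 0
}
```

Run from inside the table directory, sstable_components . 88 lists only the files of generation 88. That is narrower than a *88* glob, which would also match generation 188. Review the list first, then remove those files.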

$ nodetool drain

$ pkill java

$ ps -ef |grep cassandra

root 18271 14813 0 20:39 pts/1 00:00:00 grep --color=auto cassandra

$ cd /var/lib/cassandra/data/keyspace1/standard1-afd416808c7311e8a0c96796602809bc/

$ pwd


If you know the SSTable file you want to delete, you can delete just that one with rm -f *nnn*. If not, as in this case, you do them all.

$ sudo rm -f *

rm: cannot remove ‘backups’: Is a directory

rm: cannot remove ‘snapshots’: Is a directory

$ ls

backups snapshots

$ systemctl start cassandra

$ nodetool status

Datacenter: datacenter1



|/ State=Normal/Leaving/Joining/Moving

-- Address Load Tokens Owns (effective) Host ID Rack

UN 1.35 MiB 256 100.0% c92d9374-cf3a-47f6-9bd1-81b827da0c1e rack1

UN 41.72 GiB 256 100.0% 3c9e61ae-8741-4a74-9e89-cfa47768ac60 rack1

UN 30.87 GiB 256 100.0% c36fecad-0f55-4945-a741-568f28a3cd8b rack1

$ nodetool repair keyspace1 standard1 -full

[2018-08-10 11:23:22,454] Starting repair command #1 (51713c00-9cb1-11e8-ba61-01c8f56621df), repairing keyspace keyspace1 with repair options (parallelism: parallel, primary range: false, incremental: false, job threads: 1, ColumnFamilies: [standard1], dataCenters: [], hosts: [], # of ranges: 768, pull repair: false)
[2018-08-10 13:02:36,097] Repair completed successfully
[2018-08-10 13:02:36,098] Repair command #1 finished in 1 hour 39 minutes 13 seconds

The SSTable file list now looks like this:

229,648,710 mc-10-big-Data.db
103,421,070 mc-11-big-Data.db
1,216,169,275 mc-12-big-Data.db
76,738,970 mc-13-big-Data.db
773,774,270 mc-14-big-Data.db
17,035,624,448 mc-15-big-Data.db
83,365,660 mc-16-big-Data.db
170,061,285 mc-17-big-Data.db
758,998,865 mc-18-big-Data.db
2,683,075,900 mc-19-big-Data.db
749,573,440 mc-1-big-Data.db
91,184,160 mc-20-big-Data.db
303,380,050 mc-21-big-Data.db
3,639,126,510 mc-22-big-Data.db
231,929,395 mc-23-big-Data.db
1,469,272,390 mc-24-big-Data.db
204,485,420 mc-25-big-Data.db
345,655,100 mc-26-big-Data.db
805,017,870 mc-27-big-Data.db
50,714,125 mc-28-big-Data.db
11,578,088,555 mc-2-big-Data.db
170,033,435 mc-3-big-Data.db
1,677,053,450 mc-4-big-Data.db
62,245,405 mc-5-big-Data.db
8,426,967,700 mc-6-big-Data.db
1,979,214,745 mc-7-big-Data.db
2,910,586,420 mc-8-big-Data.db
14,097,936,920 mc-9-big-Data.db

Bootstrap the node

If you are reading at consistency level ONE, or you are really concerned about consistency overall, use this approach instead of the rm -f approach. It will ensure that the node with missing data does not participate in any reads until all data is restored. Depending on how much data the node has to recover, it will often take longer than any of the other approaches, although since bootstrapping can stream in parallel, it may not.


  1. Shut down the node.
  2. Remove all of the files under $CASSANDRA_HOME, usually /var/lib/cassandra.
  3. Modify /etc/cassandra/conf/cassandra-env.sh to add the replace_address option.
  4. Start Cassandra. When the server starts with no files, it connects to one of its seeds, recreates the schema, and asks the other nodes to stream it the data it has lost. It will not select new token ranges unless you restart it with a different IP address than it had before.
  5. Modify the /etc/cassandra/conf/cassandra-env.sh file to remove the change in Step 3.
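Steps 3 and 5 are easy to get half right, for example by forgetting to take the option back out. The sketch below wraps both edits in two small functions. The function names are hypothetical, and the only Cassandra-specific piece is the -Dcassandra.replace_address option used in this post.

```shell
# Step 3: append the replace_address option to the env file.
# $1 = path to cassandra-env.sh, $2 = IP address of the node being replaced
add_replace_address() {
  printf 'JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=%s"\n' "$2" >> "$1"
}

# Step 5: remove the option again once the node has rejoined the cluster.
# $1 = path to cassandra-env.sh
remove_replace_address() {
  local tmp
  tmp=$(mktemp)
  # grep -v exits non-zero when no lines are left to print, which is fine here.
  grep -v -e '-Dcassandra\.replace_address=' "$1" > "$tmp" || true
  cat "$tmp" > "$1"   # rewrite in place so the file keeps its owner and mode
  rm -f "$tmp"
}
```

For example, add_replace_address /etc/cassandra/conf/cassandra-env.sh 10.0.0.12 before starting the node, where 10.0.0.12 is a made-up address, and remove_replace_address on the same file after the node reports UN.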

$ nodetool drain

$ sudo pkill java

$ ps -ef |grep java

$ vi /etc/cassandra/conf/cassandra-env.sh

Add this line at the end of the file, with the node’s IP address after the equals sign:

JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address="

$ systemctl start cassandra

Wait for the node to join the cluster

During the bootstrap we see messages like this in the log:

INFO [main] 2018-08-10 13:39:06,780 StreamResultFuture.java:90 - [Stream #47b382f0-9cc4-11e8-a010-51948a7598a1] Executing streaming plan for Bootstrap

INFO [StreamConnectionEstablisher:1] 2018-08-10 13:39:06,784 StreamSession.java:266 - [Stream #47b382f0-9cc4-11e8-a010-51948a7598a1] Starting streaming to /

Later on we see:

INFO [main] 2018-08-10 14:18:16,133 StorageService.java:1449 - JOINING: Finish joining ring

INFO [main] 2018-08-10 14:18:16,482 SecondaryIndexManager.java:509 - Executing pre-join post-bootstrap tasks for: CFS(Keyspace='keyspace1', ColumnFamily='standard1')

INFO [main] 2018-08-10 14:18:16,484 SecondaryIndexManager.java:509 - Executing pre-join post-bootstrap tasks for: CFS(Keyspace='keyspace1', ColumnFamily='counter1')

INFO [main] 2018-08-10 14:18:16,897 StorageService.java:2292 - Node / state jump to NORMAL

WARN [main] 2018-08-10 14:18:16,899 StorageService.java:2324 - Not updating token metadata for / because I am replacing it

When we do a nodetool status we see:

$ nodetool status

Datacenter: datacenter1



|/ State=Normal/Leaving/Joining/Moving

-- Address Load Tokens Owns (effective) Host ID Rack

UN 30.87 GiB 256 100.0% c92d9374-cf3a-47f6-9bd1-81b827da0c1e rack1

UN 41.72 GiB 256 100.0% 3c9e61ae-8741-4a74-9e89-cfa47768ac60 rack1

UN 30.87 GiB 256 100.0% c36fecad-0f55-4945-a741-568f28a3cd8b rack1

The node is up and running in less than one hour. Quicker than any of the other options. Makes you think about your choices, doesn’t it?

If you have a keyspace with RF=1 then options 3 and 4 are not viable. You will lose data. Although with RF=1 and a corrupted SSTable file you are going to lose some data anyway.

A final look at the list of SSTable files shows this:

773,774,270 mc-10-big-Data.db
17,148,617,040 mc-11-big-Data.db
749,573,440 mc-1-big-Data.db
170,033,435 mc-2-big-Data.db
1,677,053,450 mc-3-big-Data.db
62,245,405 mc-4-big-Data.db
8,426,967,700 mc-5-big-Data.db
229,648,710 mc-6-big-Data.db
103,421,070 mc-7-big-Data.db
1,216,169,275 mc-8-big-Data.db
76,738,970 mc-9-big-Data.db


If you run into corrupted SSTable files, don’t panic. It won’t have any impact on your operations in the short term unless you are using RF=1 or reading at consistency level ONE.

Find out which node has the broken SSTable file.

Then, because it’s the easiest and lowest-risk option, try the online nodetool scrub command.

If that does not work, then you have three choices. Offline Scrub works but is usually too slow to be useful. Rebuilding the whole node seems to be overkill but it will work, and it will maintain consistency on reads. If you have a lot of data and you want to solve the problem fairly quickly, just remove the offending SSTable file and do a repair.

All approaches have an impact on the other nodes in the cluster.

The first three require a repair, which computes Merkle trees and streams data to the node being fixed. The delete approach streams the most data, but its total recovery time was the shortest in my example. That may not always be the case. In the bootstrap example, the total time was very similar to the delete case because my test case had only one large table. If there were several large tables, the delete approach would have been the fastest to get the node back to normal.

Approach         Scrub phase   Repair phase   Total recovery time (h:mm)
Online Scrub     1:06          1:36           2:42
*Offline Scrub   144:35        1:37           146:22
Delete files     0:05          1:36           1:41
Bootstrap        0:05          1:45           1:50

All sample commands show the user in normal Linux user mode. That is because in my test environment the Cassandra cluster belonged to my user id. Most production Cassandra clusters run as the Cassandra Linux user. In that case, some amount of user id switching or sudo operations would be required to do the work.

* The offline scrub time was estimated. I did not want to wait for six days to see if it was really going to take that long.

All sample output provided here was from a three-node cluster running Cassandra 3.11.2 on Fedora 28, using a vanilla Cassandra install with pretty much everything in cassandra.env defaulted.

I corrupted the SSTable file using this command:

$ printf '\x31\xc0\xc3' | dd of=mc-8-big-Data.db bs=1 seek=0 count=100 conv=notrunc
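To see what that command does without risking a real SSTable, the same overwrite can be tried on a scratch file. This is only a sketch. The name overwrite_head is hypothetical, the bytes are written as octal escapes (\061\300\303 is the same as \x31\xc0\xc3), and count is left out because only three bytes arrive on the pipe anyway.

```shell
# Overwrite the first three bytes of a file in place.
# conv=notrunc tells dd to leave the rest of the file untouched.
# Octal escapes are used because not every printf understands \x.
# $1 = file to modify
overwrite_head() {
  printf '\061\300\303' | dd of="$1" bs=1 seek=0 conv=notrunc 2>/dev/null
}
```

The file keeps its original size. Only the leading bytes change, which is enough to make the SSTable unreadable.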

The Fine Print When Using Multiple Data Directories

One of the longest lived features in Cassandra is the ability to allow a node to store data on more than one directory or disk. This feature can help increase cluster capacity or prevent a node from running out of space if bootstrapping a new one will take too long to complete. Recently I was working on a cluster and saw how this feature has the potential to silently cause problems in a cluster. In this post we will go through some fine print when configuring Cassandra to use multiple disks.

Jay… what?

The feature which allows Cassandra to store data on multiple disks is commonly referred to as JBOD [pronounced jay-bod] which stands for “Just a Bunch Of Disks/Drives”. In Cassandra this feature is controlled by the data_file_directories setting in the cassandra.yaml file. In relation to this setting, Cassandra also allows its behaviour on disk failure to be controlled using the disk_failure_policy setting. For now I will leave the details of the setting alone, so we can focus exclusively on the data_file_directories setting.

Simple drives, simple pleasures

The data_file_directories feature is fairly straightforward in that it allows Cassandra to use multiple directories to store data. To use it, just specify the list of directories you want Cassandra to use for data storage. For example:

data_file_directories:
    - /var/lib/cassandra/data
    - /var/lib/cassandra/data-extra

The feature has been around from day one of Cassandra’s life and the way in which Cassandra uses multiple directories has mostly stayed the same. There are no special restrictions on the directories; they can be on the same volume/disk or a different volume/disk. As far as Cassandra is concerned, the paths specified in the setting are just the directories it has available to read and write data.

At a high level, the way the feature works is Cassandra tries to evenly split data into each of the directories specified in the data_file_directories setting. No two directories will ever have an identical SSTable file name in them. Below is an example of what you could expect to see if you inspected each data directory when using this feature. In this example the node is configured to use two directories: …/data0/ and …/data1/

$ ls .../data0/music/playlists-3b90f8a0a50b11e881a5ad31ff0de720/
backups                      mc-5-big-Digest.crc32  mc-5-big-Statistics.db
mc-5-big-CompressionInfo.db  mc-5-big-Filter.db     mc-5-big-Summary.db
mc-5-big-Data.db             mc-5-big-Index.db      mc-5-big-TOC.txt

$ ls .../data1/music/playlists-3b90f8a0a50b11e881a5ad31ff0de720/
backups                      mc-6-big-Digest.crc32  mc-6-big-Statistics.db
mc-6-big-CompressionInfo.db  mc-6-big-Filter.db     mc-6-big-Summary.db
mc-6-big-Data.db             mc-6-big-Index.db      mc-6-big-TOC.txt

Data resurrection

One notable change to how Cassandra uses the data_file_directories setting was CASSANDRA-6696, implemented in Cassandra version 3.2. To explain the problem and how it was fixed, consider a node with two data directories, on disks A and B. Prior to this change, the node could hold data for a specific token in one SSTable on disk A and a tombstone for that token in another SSTable on disk B. If gc_grace_seconds passed and no compaction had yet run to purge the data and its tombstone, a failure of disk B would cause a problem: the tombstone is lost while the data on disk A is still present! Running a repair at that point would resurrect the data by propagating it to the other replicas! To fix this issue, CASSANDRA-6696 changed Cassandra so that a given token range is always stored on a single disk.

This change did make Cassandra more robust when using the data_file_directories setting, however this change was no silver bullet and caution still needs to be taken when it is used. Most notably, consider the case where each data directory is mounted to a dedicated disk and the cluster schema results in wide partitions. In this scenario one of the disks could easily reach its maximum capacity due to the wide partitions while the other disk still has plenty of storage capacity.

How to lose a volume and influence a node

For a node running a Cassandra version earlier than 3.2 and using the data_file_directories setting, there are a number of vulnerabilities to watch out for. If each data directory is mounted on a dedicated disk, and one of the disks dies or its mount disappears, this can silently cause problems. To explain, consider the case where Cassandra is installed and its data is located in /var/lib/cassandra/data. Say we want to add another data directory, only this time on another volume. It makes sense to keep the data directories in the same location, so we create the directory /var/lib/cassandra/data-extra and mount our volume so that /var/lib/cassandra/data-extra points to it. If the disk backing /var/lib/cassandra/data-extra dies, or we forget to put the mount information in fstab and lose the mount on a restart, then we effectively lose whatever that directory held, potentially including system table data. Cassandra will still start, because the directory /var/lib/cassandra/data-extra exists; however, it will be empty.

Similarly, I have seen cases where a directory was manually added to a node that was managed by chef. In this case the node was running out of disk space and there was no time to wait for a new node to bootstrap. To avoid the node going down, an additional volume was attached and mounted, and the data_file_directories setting in cassandra.yaml was modified to include the new data directory. Some time later chef was executed on the node to deploy an update, and as a result it reset the cassandra.yaml configuration. Resetting cassandra.yaml removed the additional data directory listed under the data_file_directories setting. When the node was restarted, the Cassandra process never knew that there was another data directory it had to read from.

Either of these cases can lead to more problems in the cluster. Remember how earlier I mentioned that a complete SSTable file will always be stored in a single data directory when using the data_file_directories setting? This behaviour applies to all data stored by Cassandra, including its system data! That means that in the above two scenarios Cassandra could potentially lose system table data. This is a problem because the system tables store information about what data the node owns, what the schema is, and whether the node has bootstrapped. If the system table is lost and the node is restarted, the node will think it is a new node and take on a new identity and new token ranges. This results in a token range movement in the cluster. We have covered this topic in more detail in our auto bootstrapping blog post. The problem gets worse when a seed node loses its system table and comes back as a new node. This is because seed nodes never bootstrap, so the node does not stream in the data for its new token ranges; if a cleanup is then run cluster wide, data is lost.
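One way to catch the lost-mount case before Cassandra starts is to check that every configured data directory exists and is not empty. The sketch below does that by reading data_file_directories out of cassandra.yaml. Both function names are hypothetical, the parsing only handles the simple list layout shown earlier (no comments or blank lines inside the list, no spaces in paths), and a brand new node with legitimately empty directories would be flagged too. It does not catch the chef case, where the directory is gone from the configuration itself; for that, the output of list_data_dirs would have to be compared with the list you expect.

```shell
# Print the directories listed under data_file_directories in a cassandra.yaml.
# $1 = path to cassandra.yaml
list_data_dirs() {
  awk '
    /^data_file_directories:/ { in_list = 1; next }
    in_list && /^[ \t]*-[ \t]*/ { sub(/^[ \t]*-[ \t]*/, ""); print; next }
    in_list { in_list = 0 }
  ' "$1"
}

# Return non-zero if any configured data directory is missing or empty.
# $1 = path to cassandra.yaml
check_data_dirs() {
  local dir status=0
  for dir in $(list_data_dirs "$1"); do
    if [ ! -d "$dir" ] || [ -z "$(ls -A "$dir" 2>/dev/null)" ]; then
      echo "suspect data directory: $dir" >&2
      status=1
    fi
  done
  return "$status"
}
```

A start-up wrapper could call check_data_dirs /etc/cassandra/conf/cassandra.yaml and refuse to start Cassandra when it fails, so the node fails fast instead of coming up with part of its data missing.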

Testing the theory

We can test the above scenarios for different versions of Cassandra using ccm. I created the following script to set up a three node cluster in ccm with each node configured to be a seed node and to use two data directories. We use seed nodes to show the worst case scenario that can occur when a node using multiple data directories loses one of the directories.


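#!/bin/bash

# This script creates a three node CCM cluster to demo the data_file_directories
# feature in different versions of Cassandra.

set -e

# NOTE: the variable assignments and the 'ccm create' call were missing from
# this listing. Taking the name and version as arguments is an assumption.
CLUSTER_NAME="${1:?usage: $0 <cluster name> <cassandra version>}"
CLUSTER_VERSION="${2:?usage: $0 <cluster name> <cassandra version>}"
NUMBER_NODES="3"

echo "Cluster Name: ${CLUSTER_NAME}"
echo "Cluster Version: ${CLUSTER_VERSION}"
echo "Number nodes: ${NUMBER_NODES}"

ccm create ${CLUSTER_NAME} -v ${CLUSTER_VERSION}

# Modifies the configuration of a node in the CCM cluster.
function update_node_config {
  # The seeds entry is an assumption; it makes every node a seed node.
  CASSANDRA_YAML_SETTINGS="\
                          num_tokens:32 \
                          endpoint_snitch:GossipingPropertyFileSnitch \
                          seeds:127.0.0.1,127.0.0.2,127.0.0.3"

  for key_value_setting in ${CASSANDRA_YAML_SETTINGS}
  do
    setting_key=$(echo ${key_value_setting} | cut -d':' -f1)
    setting_val=$(echo ${key_value_setting} | cut -d':' -f2)
    sed -ie "s/${setting_key}\:\ .*/${setting_key}:\ ${setting_val}/g" \
      ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra.yaml
  done

  # Create and configure the additional data directory
  # (the data1 path is assumed from the directory listings in this post)
  extra_data_dir="${HOME}/.ccm/${CLUSTER_NAME}/node${1}/data1"

  sed -ie '/data_file_directories:/a\'$'\n'"- ${extra_data_dir}
    " ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra.yaml

  mkdir ${extra_data_dir}

  sed -ie "s/dc=.*/dc=datacenter1/g" \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-rackdc.properties
  sed -ie "s/rack=.*/rack=rack${1}/g" \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-rackdc.properties

  # Tune Cassandra memory usage so we can run multiple nodes on the one machine
  sed -ie 's/\#MAX_HEAP_SIZE=\"4G\"/MAX_HEAP_SIZE=\"500M\"/g' \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-env.sh
  sed -ie 's/\#HEAP_NEWSIZE=\"800M\"/HEAP_NEWSIZE=\"120M\"/g' \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-env.sh

  # Allow remote access to JMX without authentication. This is for
  # demo purposes only - Never do this in production
  sed -ie 's/LOCAL_JMX=yes/LOCAL_JMX=no/g' \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-env.sh
  sed -ie 's/com\.sun\.management\.jmxremote\.authenticate=true/com.sun.management.jmxremote.authenticate=false/g' \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-env.sh
}

for node_num in $(seq ${NUMBER_NODES})
do
  echo "Adding 'node${node_num}'"
  # The last flag was missing from this listing; -s (add the node as a seed)
  # is an assumption that matches the description above.
  ccm add node${node_num} \
    -i 127.0.0.${node_num} \
    -j 7${node_num}00 \
    -r 0 \
    -s

  update_node_config ${node_num}

  # Set localhost aliases - Mac only
  echo "ifconfig lo0 alias 127.0.0.${node_num} up"
  sudo ifconfig lo0 alias 127.0.0.${node_num} up
done

sed -ie 's/use_vnodes\:\ false/use_vnodes:\ true/g' \
  ~/.ccm/${CLUSTER_NAME}/cluster.conf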

I first tested Cassandra version 2.1.20 using the following process.

Run the script and check the nodes were created.

$ ccm status
Cluster: 'mutli-dir-test'
node1: DOWN (Not initialized)
node3: DOWN (Not initialized)
node2: DOWN (Not initialized)

Start the cluster.

$ for i in $(seq 1 3); do echo "Starting node${i}"; ccm node${i} start; sleep 10; done
Starting node1
Starting node2
Starting node3

Check the cluster is up and note the Host IDs.

$  ccm node1 nodetool status

Datacenter: datacenter1
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  47.3 KB    32      73.5%             4682088e-4a3c-4fbc-8874-054408121f0a  rack1
UN  80.35 KB   32      71.7%             b2411268-f168-485d-9abe-77874eef81ce  rack2
UN  64.33 KB   32      54.8%             8b55a1c6-f971-4e01-a34b-bb37dd55bb89  rack3

Insert some test data into the cluster.

$ ccm node1 cqlsh
Connected to TLP-578-2120 at
[cqlsh 5.0.1 | Cassandra 2.1.20 | CQL spec 3.2.1 | Native protocol v3]
Use HELP for help.
cqlsh> CREATE KEYSPACE music WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 };
cqlsh> CREATE TABLE music.playlists (
   ...  id uuid,
   ...  song_order int,
   ...  song_id uuid,
   ...  title text,
   ...  artist text,
   ...  PRIMARY KEY (id, song_id));
cqlsh> INSERT INTO music.playlists (id, song_order, song_id, artist, title)
   ...  VALUES (62c36092-82a1-3a00-93d1-46196ee77204, 1,
   ...  a3e64f8f-bd44-4f28-b8d9-6938726e34d4, 'Of Monsters and Men', 'Little Talks');
cqlsh> INSERT INTO music.playlists (id, song_order, song_id, artist, title)
   ...  VALUES (62c36092-82a1-3a00-93d1-46196ee77205, 2,
   ...  8a172618-b121-4136-bb10-f665cfc469eb, 'Birds of Tokyo', 'Plans');
cqlsh> INSERT INTO music.playlists (id, song_order, song_id, artist, title)
   ...  VALUES (62c36092-82a1-3a00-93d1-46196ee77206, 3,
   ...  2b09185b-fb5a-4734-9b56-49077de9edbf, 'Lorde', 'Royals');
cqlsh> exit

Write the data to disk by running nodetool flush on all the nodes.

$ for i in $(seq 1 3); do echo "Flushing node${i}"; ccm node${i} nodetool flush; done
Flushing node1
Flushing node2
Flushing node3

Check we can retrieve data from each node.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

 id          | song_order | song_id     | artist              | title
 62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
 62c36092... |          3 | 2b09185b... |               Lorde |       Royals
 62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

  id          | song_order | song_id     | artist              | title
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

  id          | song_order | song_id     | artist              | title
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

Look for a node that has all of the system.local SSTable files in a single directory. In this particular test, there were no SSTable files in data directory data0 of node1.

$ ls .../node1/data0/system/local-7ad54392bcdd35a684174e047860b377/
$ ls .../node1/data1/system/local-7ad54392bcdd35a684174e047860b377/
system-local-ka-5-CompressionInfo.db  system-local-ka-5-Summary.db          system-local-ka-6-Index.db
system-local-ka-5-Data.db             system-local-ka-5-TOC.txt             system-local-ka-6-Statistics.db
system-local-ka-5-Digest.sha1         system-local-ka-6-CompressionInfo.db  system-local-ka-6-Summary.db
system-local-ka-5-Filter.db           system-local-ka-6-Data.db             system-local-ka-6-TOC.txt
system-local-ka-5-Index.db            system-local-ka-6-Digest.sha1
system-local-ka-5-Statistics.db       system-local-ka-6-Filter.db

Stop node1 and simulate a disk or volume mount going missing by removing the data1 directory entry from the data_file_directories setting.

$ ccm node1 stop

Before the change the setting entry was:

- .../node1/data0
- .../node1/data1

After the change the setting entry was:

- .../node1/data0

Start node1 again and check the logs. From the logs we can see the messages where the node has generated a new Host ID and took ownership of new tokens.

WARN  [main] 2018-08-21 12:34:57,111 SystemKeyspace.java:765 - No host ID found, created c62c54bf-0b85-477d-bb06-1f5d696c7fef (Note: This should happen exactly once per node).
INFO  [main] 2018-08-21 12:34:57,241 StorageService.java:844 - This node will not auto bootstrap because it is configured to be a seed node.
INFO  [main] 2018-08-21 12:34:57,259 StorageService.java:959 - Generated random tokens. tokens are [659824738410799181, 501008491586443415, 4528158823720685640, 3784300856834360518, -5831879079690505989, 8070398544415493492, -2664141538712847743, -303308032601096386, -553368999545619698, 5062218903043253310, -8121235567420561418, 935133894667055035, -4956674896797302124, 5310003984496306717, -1155160853876320906, 3649796447443623633, 5380731976542355863, -3266423073206977005, 8935070979529248350, -4101583270850253496, -7026448307529793184, 1728717941810513773, -1920969318367938065, -8219407330606302354, -795338012034994277, -374574523137341910, 4551450772185963221, -1628731017981278455, -7164926827237876166, -5127513414993962202, -4267906379878550578, -619944134428784565]

Check the cluster status again. From the output we can see that the Host ID for node1 changed from 4682088e-4a3c-4fbc-8874-054408121f0a to c62c54bf-0b85-477d-bb06-1f5d696c7fef

$ ccm node2 nodetool status

Datacenter: datacenter1
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  89.87 KB   32      100.0%            c62c54bf-0b85-477d-bb06-1f5d696c7fef  rack1
UN  88.69 KB   32      100.0%            b2411268-f168-485d-9abe-77874eef81ce  rack2
UN  106.66 KB  32      100.0%            8b55a1c6-f971-4e01-a34b-bb37dd55bb89  rack3
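Spotting a changed Host ID by eye gets harder as clusters grow. The sketch below compares two saved copies of nodetool status output and prints any Host ID that is present in the second but not in the first. The function names and file names are hypothetical; the only assumption about nodetool is that Host IDs appear as lowercase UUIDs, as in the output above.

```shell
# Print the unique Host IDs (UUIDs) found in saved "nodetool status" output.
# $1 = file containing the saved output
host_ids() {
  grep -Eo '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}' "$1" | sort -u
}

# Print Host IDs that appear in the second snapshot but not in the first.
# $1 = snapshot taken before the change, $2 = snapshot taken after it
new_host_ids() {
  local before after
  before=$(mktemp)
  after=$(mktemp)
  host_ids "$1" > "$before"
  host_ids "$2" > "$after"
  comm -13 "$before" "$after"   # keep lines unique to the second file
  rm -f "$before" "$after"
}
```

For example, save ccm node2 nodetool status to before.txt ahead of the restart and to after.txt once node1 is back, then run new_host_ids before.txt after.txt. Any output means a node has come back with a new identity.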

Check we can retrieve data from each node again.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

Cassandra 2.1.20 Results

When we run the above test against a cluster using Apache Cassandra version 2.1.20 and remove the additional data directory data1 from node1, we can see that our cql statement fails when retrieving data from node1. The error produced shows that the song_order column is unknown to the node.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

<stdin>:1:InvalidRequest: code=2200 [Invalid query] message="Undefined name song_order in selection clause"

  id          | song_order | song_id     | artist              | title
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

  id          | song_order | song_id     | artist              | title
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

An interesting side note: if nodetool drain is run on node1 before it is shut down, then the above error never occurs. Instead the following output appears when we run our cql statement to retrieve data from the nodes. As we can see below, the query that failed now returns no rows of data.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

 id | song_order | song_id | artist | title

(0 rows)

 id          | song_order | song_id     | artist              | title
 62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
 62c36092... |          3 | 2b09185b... |               Lorde |       Royals
 62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

 id          | song_order | song_id     | artist              | title
 62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
 62c36092... |          3 | 2b09185b... |               Lorde |       Royals
 62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

Cassandra 2.2.13 Results

When we run the above test against a cluster using Apache Cassandra version 2.2.13 and remove the additional data directory data1 from node1, we can see that the cql statement fails when retrieving data from node1. The error produced is similar to that produced in version 2.1.20, except that here it is the id column name that is unknown.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

<stdin>:1:InvalidRequest: Error from server: code=2200 [Invalid query] message="Undefined name id in selection clause"

  id          | song_order | song_id     | artist              | title
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

  id          | song_order | song_id     | artist              | title
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

Unlike Cassandra version 2.1.20, node1 never generated a new Host ID or calculated new tokens. This is because it replayed the commitlog and recovered most of the writes that had gone missing.

INFO  [main] ... CommitLog.java:160 - Replaying .../node1/commitlogs/CommitLog-5-1534865605274.log, .../node1/commitlogs/CommitLog-5-1534865605275.log
WARN  [main] ... CommitLogReplayer.java:149 - Skipped 1 mutations from unknown (probably removed) CF with id 5bc52802-de25-35ed-aeab-188eecebb090
INFO  [main] ... StorageService.java:900 - Using saved tokens [-1986809544993962272, -2017257854152001541, -2774742649301489556, -5900361272205350008, -5936695922885734332, -6173514731003460783, -617557464401852062, -6189389450302492227, -6817507707445347788, -70447736800638133, -7273401985294399499, -728761291814198629, -7345403624129882802, -7886058735316403116, -8499251126507277693, -8617790371363874293, -9121351096630699623, 1551379122095324544, 1690042196927667551, 2403633816924000878, 337128813788730861, 3467690847534201577, 419697483451380975, 4497811278884749943, 4783163087653371572, 5213928983621160828, 5337698449614992094, 5502889505586834056, 6549477164138282393, 7486747913914976739, 8078241138082605830, 8729237452859546461]

Cassandra 3.0.15 Results

When we run the above test against a cluster using Apache Cassandra version 3.0.15 and remove the additional data directory data1 from node1, we can see that the cql statement returns no data from node1.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

 id | song_order | song_id | artist | title

(0 rows)

 id          | song_order | song_id     | artist              | title
 62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
 62c36092... |          3 | 2b09185b... |               Lorde |       Royals
 62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

 id          | song_order | song_id     | artist              | title
 62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
 62c36092... |          3 | 2b09185b... |               Lorde |       Royals
 62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

Cassandra 3.11.3 Results

When we run the above test against a cluster using Apache Cassandra version 3.11.3 and remove the additional data directory data1 from node1, the node fails to start and we can see the following error message in the logs.

ERROR [main] 2018-08-21 16:30:53,489 CassandraDaemon.java:708 - Exception encountered during startup
java.lang.RuntimeException: A node with address / already exists, cancelling join. Use cassandra.replace_address if you want to replace this node.
    at org.apache.cassandra.service.StorageService.checkForEndpointCollision(StorageService.java:558) ~[apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.StorageService.prepareToJoin(StorageService.java:804) ~[apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:664) ~[apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:613) ~[apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:379) [apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:602) [apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:691) [apache-cassandra-3.11.3.jar:3.11.3]

In this case, the cluster reports node1 as down and still shows its original Host ID.

$ ccm node2 nodetool status

Datacenter: datacenter1
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
DN  191.96 KiB  32      100.0%            35a3c8ff-fa20-4f10-81cd-7284caeb00bd  rack1
UN  191.82 KiB  32      100.0%            2ebe4f0b-dc8f-4f46-93cd-37c410174a49  rack2
UN  170.46 KiB  32      100.0%            0384793e-7f59-40aa-a487-97f410dded4b  rack3

After inspecting the SSTables in both data directories we can see that a new Host ID 7d910c98-f69b-41b4-988a-f432b2e54b38 has been assigned to the node even though it failed to start.

$ ./tools/bin/sstabledump .../node1/data0/system/local-7ad54392bcdd35a684174e047860b377/mc-12-big-Data.db | grep host_id
      { "name" : "host_id", "value" : "35a3c8ff-fa20-4f10-81cd-7284caeb00bd", "tstamp" : "2018-08-21T06:24:08.106Z" },

$ ./tools/bin/sstabledump .../node1/data1/system/local-7ad54392bcdd35a684174e047860b377/mc-10-big-Data.db | grep host_id
      { "name" : "host_id", "value" : "7d910c98-f69b-41b4-988a-f432b2e54b38" },

Take away messages

As we have seen from testing, there are potential dangers in using multiple data directories in Cassandra. Simply removing one of the data directories from the configuration can turn a node into a brand new node and affect the rest of the cluster. The JBOD feature can be useful in emergencies where disk space is urgently needed; however, its usage in this case should be temporary.

I feel that the use of multiple disks in a Cassandra node is better handled at the OS or hardware layer. Systems like LVM and RAID were designed to allow multiple disks to be used together to make up a volume. Using something like LVM or RAID rather than Cassandra’s JBOD feature reduces the complexity of the Cassandra configuration and the number of moving parts on the Cassandra side that can go wrong. Using the JBOD feature in Cassandra subtly increases operational complexity and reduces the node’s ability to fail fast. In most cases I feel it is more useful for a node to fail outright rather than limp on and potentially impact the cluster in a negative way.

As a final thought, I think one handy feature that could be added to Apache Cassandra to help prevent issues associated with JBOD is the ability to check if the data, commitlog, saved_caches and hints are all empty prior to bootstrapping. If they are empty, then the node proceeds as normal. If they contain data, then perhaps the node could fail to start and print an error message in the logs.
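The pre-bootstrap check suggested above could look something like the following sketch. It is written in Python purely for illustration (Cassandra itself is Java), the function names are hypothetical, and it assumes the caller passes in the configured data, commitlog, saved_caches and hints directories.

```python
import os
import tempfile


def non_empty_dirs(paths):
    """Return the subset of paths that exist and contain at least one entry."""
    found = []
    for path in paths:
        if os.path.isdir(path):
            with os.scandir(path) as entries:
                if next(entries, None) is not None:
                    found.append(path)
    return found


def check_before_bootstrap(paths):
    """Raise if any node directory already holds files; otherwise return None."""
    leftovers = non_empty_dirs(paths)
    if leftovers:
        raise RuntimeError(
            "refusing to bootstrap, directories not empty: " + ", ".join(leftovers)
        )


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        data0 = os.path.join(root, "data0")
        data1 = os.path.join(root, "data1")
        os.makedirs(data0)
        os.makedirs(data1)
        check_before_bootstrap([data0, data1])  # both empty: passes silently
        open(os.path.join(data0, "mc-12-big-Data.db"), "w").close()
        print(non_empty_dirs([data0, data1]))  # only data0 is listed
```

With a check like this, a node whose remaining data directory still holds SSTables would refuse to bootstrap as a new node instead of silently taking a new Host ID.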

Testing Apache Cassandra 4.0

With the goal of ensuring reliability and stability in Apache Cassandra 4.0, the project’s committers have voted to freeze new features on September 1 to concentrate on testing and validation before cutting a stable beta. Towards that goal, the community is investing in methodologies that can be performed at scale to exercise edge cases in the largest Cassandra clusters. The result, we hope, is to make Apache Cassandra 4.0 the best-tested and most reliable major release right out of the gate.

In the interests of communication (and hopefully more participation), here’s a look at some of the approaches being used to test Apache Cassandra 4.0:

Replay Testing

Workload Recording, Log Replay, and Comparison

Replay testing allows for side-by-side comparison of a workload using two versions of the same database. It is a black-box technique that answers the question, “did anything change that we didn’t expect?”

Replay testing is simple in concept: record a workload, then re-issue it against two clusters – one running a stable release and the second running a candidate build. Replay testing a stateful distributed system is more challenging. For a subset of workloads, we can achieve determinism in testing by grouping writes by CQL partition and ordering them via client-supplied timestamps. This also allows us to achieve parallelism, as recorded workloads can be distributed by partition across an arbitrarily-large fleet of writers. Though linearizing updates within a partition and comparing differences does not allow for validation of all possible workloads (e.g., CAS queries), this subset is very useful.
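As a rough illustration of the grouping and ordering described above, here is a minimal Python sketch. It is not the fqltool implementation; the record layout (partition key, client timestamp, statement) and the function name plan_replay are assumptions made for the example.

```python
import zlib
from collections import defaultdict


def plan_replay(writes, num_writers):
    """Group recorded writes by partition and order them by client timestamp.

    writes is an iterable of (partition_key, timestamp, statement) tuples.
    Returns one list per writer; each entry is (partition_key, ordered_writes),
    so every partition is replayed by exactly one writer, in timestamp order.
    """
    by_partition = defaultdict(list)
    for key, timestamp, statement in writes:
        by_partition[key].append((timestamp, statement))

    plan = [[] for _ in range(num_writers)]
    for key in sorted(by_partition):
        ordered = sorted(by_partition[key], key=lambda w: w[0])
        # A stable hash keeps the partition-to-writer mapping repeatable.
        writer = zlib.crc32(key.encode("utf-8")) % num_writers
        plan[writer].append((key, ordered))
    return plan
```

Because each partition lands on a single writer, the writers can run in parallel without reordering updates within a partition.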

The suite of Full Query Logging (“FQL”) tools in Apache Cassandra enables workload recording. CASSANDRA-14618 and CASSANDRA-14619 will add fqltool replay and fqltool compare, enabling log replay and comparison. Standard tools in the Apache ecosystem such as Apache Spark and Apache Mesos can also make parallelizing replay and comparison across large clusters of machines straightforward.

Fuzz Testing and Property-Based Testing

Dynamic Test Generation and Fuzzing

Fuzz testing dynamically generates input to be passed through a function for validation. We can make fuzz testing smarter in stateful systems like Apache Cassandra to assert that persisted data conforms to the database’s contracts: acknowledged writes are not lost, deleted data is not resurrected, and consistency levels are respected. Fuzz testing of storage systems to validate these properties requires maintaining a record of responses received from the system; the development of a model representing valid legal states of data within the database; and a validation pass to assert that responses reflect valid states according to that model.

Property-based testing combines fuzz testing and assertions to explore a state space using randomly-generated input. These tests provide dynamic input to the system and assert that its fundamental properties are not violated. These properties can range from generic (e.g., “I can write data and read it back”) to specific (“range tombstone bounds synthesized during short-read-protection reads are properly closed”); and from local to distributed (e.g., “replacing every single node in a cluster results in an identical database”). To simplify debugging, property-based testing libraries like QuickTheories also provide a “shrinker,” which attempts to generate the simplest possible failing case after detecting input or a sequence of actions that triggers a failure.

Unlike model checkers, property-based tests don’t exhaust the state space – but explore it until a threshold of examples is reached. This allows for the computation to be distributed across many machines to gain confidence in code and infrastructure that scales with the amount of computation applied to test it.
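To make the idea concrete, here is a small hand-rolled sketch in Python using only the standard library. It is not QuickTheories and not Cassandra test code: the toy stores, the dict-based model and the greedy shrinker are all hypothetical stand-ins. The property checked is that a read always matches the model, so acknowledged writes are not lost and deleted data is not resurrected.

```python
import random


class DictStore:
    """Toy stand-in for the system under test."""

    def __init__(self):
        self._data = {}

    def write(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)

    def read(self, key):
        return self._data.get(key)


class ResurrectingStore(DictStore):
    """Deliberately buggy store: deletes of key 0 are silently ignored."""

    def delete(self, key):
        if key != 0:
            super().delete(key)


def violates(store_factory, ops):
    """Replay ops against a fresh store and a dict model; True if they diverge."""
    store, model = store_factory(), {}
    for op, key, value in ops:
        if op == "write":
            store.write(key, value)
            model[key] = value
        else:
            store.delete(key)
            model.pop(key, None)
        if store.read(key) != model.get(key):
            return True
    return False


def shrink(store_factory, ops):
    """Greedy shrinker: drop one operation at a time while the failure persists."""
    changed = True
    while changed:
        changed = False
        for i in range(len(ops)):
            candidate = ops[:i] + ops[i + 1:]
            if violates(store_factory, candidate):
                ops, changed = candidate, True
                break
    return ops


def find_counterexample(store_factory, examples=200, steps=20, seed=0):
    """Try random operation sequences until a threshold of examples is reached."""
    rng = random.Random(seed)
    for _ in range(examples):
        ops = [(rng.choice(["write", "delete"]), rng.randrange(4), rng.randrange(100))
               for _ in range(steps)]
        if violates(store_factory, ops):
            return shrink(store_factory, ops)
    return None
```

Run against ResurrectingStore, the search shrinks any failing sequence down to a write of key 0 followed by a delete of key 0; run against DictStore, it finds nothing within the example budget.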

Distributed Tests and Fault-Injection Testing

Validating Behavior Under Fault Scenarios

All of the above techniques can be combined with fault injection testing to validate that the system maintains availability where expected in fault scenarios, that fundamental properties hold, and that reads and writes conform to the system’s contracts. By asserting a series of invariants under fault scenarios using different techniques, we gain the ability to exercise edge cases in the system that may reveal unexpected failures in extreme scenarios. Injected faults can take many forms – network partitions, process pauses, disk failures, and more.

Upgrade Testing

Ensuring a Safe Upgrade Path

Finally, it’s not enough to test one version of the database. Upgrade testing allows us to validate the upgrade path between major versions, ensuring that a rolling upgrade can be completed successfully, and that the contents of the resulting upgraded database are identical to the original. To perform upgrade tests, we begin by snapshotting a cluster and cloning it twice, resulting in two identical clusters. One of the clusters is then upgraded. Finally, we perform a row-by-row scan and comparison of all data in each partition to assert that all rows read are identical, logging any deltas for investigation. Like fault injection tests, upgrade tests can also be thought of as an operational scenario all other types of tests can be parameterized against.
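The row-by-row comparison step can be sketched as follows. This is an illustrative Python example only: it assumes each cluster's contents have already been read into a nested dictionary (partition key, then clustering key, then row), which a real comparison at scale would stream rather than hold in memory.

```python
def diff_clusters(original, upgraded):
    """Compare two clusters row by row and return a list of deltas.

    Each cluster is a dict: partition_key -> {clustering_key -> row}.
    A delta is (partition_key, clustering_key, original_row, upgraded_row);
    a row missing on one side is reported as None.
    """
    deltas = []
    for pk in sorted(set(original) | set(upgraded)):
        rows_a = original.get(pk, {})
        rows_b = upgraded.get(pk, {})
        for ck in sorted(set(rows_a) | set(rows_b)):
            a, b = rows_a.get(ck), rows_b.get(ck)
            if a != b:
                deltas.append((pk, ck, a, b))
    return deltas
```

An empty result means the upgraded clone matches the original; anything else is logged for investigation.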

Wrapping Up

The Apache Cassandra developer community is working hard to deliver Cassandra 4.0 as the most stable major release to date, bringing a variety of methodologies to bear on the problem. We invite you to join us in the effort, deploying these techniques within your infrastructure and testing the release on your workloads. Learn more about how to get involved here.

The more that join, the better the release we’ll ship together.

Proposal for a New Cassandra Cluster Key Compaction Strategy

Cassandra storage is generally described as a log-structured merge tree (LSM). In general, LSM storage provides great speed in performing writes, updates and deletes over reads. As a general rule, a write in Cassandra is an order of magnitude faster than a read. Not that reads are necessarily slow, but rather that the entire design of the server is to do writes very quickly and efficiently.

To manage data written to LSM storage, the files created by the fast writes need to be re-organized to help read efficacy and manage storage space. The process to perform this reorganization is called “compaction.” There are currently three generally available compaction strategies, each designed to optimize certain workloads.

Unfortunately, there are many workloads which don’t necessarily fit well into any of the current compaction strategies. What I hope to do here is present a convincing argument for a fourth compaction strategy which I think will fit the needs of many use cases which today are left out in the cold.

I am calling my proposed compaction strategy: Cluster Key Compaction Strategy (CKCS)

Existing strategies

Size Tiered

Size Tiered Compaction Strategy (STCS) is the default compaction strategy and it has worked for many workloads through the years Cassandra has been in existence. It is recognized as having a relatively low write amplification level and it can generally keep the total number of SSTable files reasonably low, limiting the number of SSTable files that need to be referenced to find all the parts of a partition required by a read. One of its largest drawbacks is the amount of disk space required for a compaction.

Leveled Compaction

Leveled Compaction Strategy (LCS) attempts to address the large amount of disk space required for compaction, and at the same time it also works to drastically limit the number of SSTable files required to fulfill a read from a partition to just one or two SSTable files. Its main drawback is the dramatic increase in write amplification for all data stored in LCS. With LCS, SSTable files are allowed to grow only to a predefined fixed size with the requirement that all columns of a specific partition exist in only one SSTable file at each level. When compacting from one level to the next, many SSTable files are both merged and distributed to many SSTable files.

Time Window

Time Window Compaction Strategy (TWCS) uses STCS inside of a set of predefined windows or buckets based on time to store data. It deliberately keeps partitions spread across many SSTable files. By the use of windows, the space required to perform a compaction can be reduced by up to the number of windows. For example, if the number of windows is 20, then the space required for any TWCS compaction will be no more than 1/20 of the space consumed by the table. It also results in the lowest write amplification of any of the compaction strategies. While this is not enforced, it is strongly recommended that TWCS be used only with data that is known to have a limited lifetime, preferably through the Time To Live (TTL) feature of Cassandra. TWCS was designed to store time series data where the data coming in is dividable into well-defined time chunks.

TWCS does not play well with hinted handoffs, read repairs or regular repairs, all of which can end up putting data which might belong in one window into a different window. This is not usually a problem if the data is short-lived or not of a critical nature. But that is not always the case in the real world.

Limits to existing strategies

As discussed above, each strategy has its strengths and weaknesses. Each needs to be carefully evaluated to decide which is best for your application.

STCS requires a large amount of space to perform compactions and may need many SSTable files read to find all parts of a specific row for a given partition, but it has fairly low write amplification.

LCS dramatically reduces the amount of space required for a compaction and greatly improves the likelihood that all the rows of a partition will be in the same place, but it can produce a huge number of SSTable files and it results in a massive increase in write amplification. It’s best used with workloads where reads are 90% or better of the workload.

TWCS is designed to work with time series data only. It is based on server time, having nothing at all to do with anything stored in the data itself. Like LCS, it greatly reduces the space required for compaction, and it has even better write amplification than STCS. It does not work well with Cassandra’s current anti-entropy mechanisms, which makes it unsuitable for some kinds of data which might otherwise fit.

Why a new strategy

In the last four years I have spent time consulting for different organizations which are using or planning to use Cassandra, and I keep finding workloads which would benefit from a compaction strategy that has features of both LCS and TWCS, and yet is still distinct from either one.

In fact, there are really two distinct use cases one could argue belong to separate strategies, but I think a single strategy could be created to fit both.

I would like to propose Cluster Key Compaction Strategy (CKCS). In CKCS, SSTable files are grouped together based on their cluster key values, in one of two ways: either into a set of moving windows, much like TWCS, where the data is expected to expire over time and so limit the total number of windows; or into a predefined set of partitions covering the entire key range. Because window selection is based on cluster key values, the windows are defined independently of current server time, which allows Cassandra’s anti-entropy tools to work, although this will increase write amplification and SSTable file counts compared with traditional TWCS. It will also allow data sets which are not time-based to benefit from the reduced compaction space and the spreading out of partitions that are in the nature of the current TWCS strategy.

Proposed CKCS Details

The proposed CKCS will use the first column of the cluster key to define buckets which will be used to designate groups of SSTable file sets used to store data. In order to make the definition simple, the data type of that first column will need to be fixed in width and the possible key values well understood. Small integer, integer, large integer and timestamps would be the simplest to use and not, in my opinion, an unnecessarily restricted list.

How the CKCS would work

When a table is created with CKCS one of two bucket definition parameter types will be used.

  1. Moving window variation. Two parameters are used: one defines the unit size, much like TWCS, and should be called unit. A unit can be a timeframe (seconds, minutes, hours, days) or it can be a number scale (ones, tens, hundreds, thousands, millions). The second parameter is the window size in units. With the moving window variation, it is assumed that all data written to the table will eventually expire and the number of windows will therefore be limited based on the lifetime of data stored in the table.
  2. Static window variation. One parameter is used: the number of windows or buckets. This variation assumes long-lived data which is to be spread into multiple windows or buckets based on the value of the cluster key column. The window size is not specified by the user; instead, Cassandra will compute the “size” by taking the maximum absolute value range of the column and dividing by the number of desired windows or buckets.
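The two bucket definitions above can be sketched as simple integer arithmetic. This Python example is only an illustration of the proposal, not existing Cassandra code; the function names and the choice of a 32-bit integer column in the usage note are assumptions.

```python
def moving_window_bucket(value, unit, window_size):
    """Bucket index for the moving window variation.

    unit is the size of one unit in the column's own scale (for example 3600
    for hours when the column holds seconds); window_size is the number of
    units per window.
    """
    return value // (unit * window_size)


def static_window_bucket(value, type_min, type_max, num_buckets):
    """Bucket index for the static window variation.

    The bucket size is the full value range of the column divided by the
    requested number of buckets, rounded up so every value maps to a bucket.
    """
    span = type_max - type_min + 1
    bucket_size = -(-span // num_buckets)  # ceiling division
    return (value - type_min) // bucket_size
```

For example, with hourly units and a window of 6 units, a timestamp of 21600 seconds falls into window 1, and with 16 static buckets over a signed 32-bit column the value 0 falls into bucket 8.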

In both approaches, flushing an SSTable to disk behaves as normal. Once enough SSTable files have been flushed, as defined by a compaction threshold variable, the data in those SSTable files is not compacted together; instead, it is distributed to a single SSTable file in each window or bucket. For normal operation of the moving window variation, this will look much like the first compaction in TWCS and probably result in an actual compaction. For the static window variation, this will cause the data in the tables to be distributed out, creating more rather than fewer SSTable files.

After data is distributed to a defined SSTable file window or bucket, compaction proceeds using Size Tiered compaction within that window or bucket. To allow efficient queries based on cluster key ranges, the range of cluster key values for a specific SSTable file will be stored as a part of the SSTable file’s metadata in its statistics file.
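A minimal Python sketch of the distribution step and the per-file cluster key range metadata is shown below. It is a simplification made for illustration: rows are plain tuples, the bucket function is supplied by the caller, and merging of duplicate cells or tombstones is ignored entirely.

```python
from collections import defaultdict


def redistribute(flushed_sstables, bucket_of):
    """Distribute rows from flushed SSTables into one SSTable per bucket.

    flushed_sstables is a list of SSTables, each a list of
    (partition_key, cluster_key, value) rows. bucket_of maps a cluster key
    to a bucket id. Each output SSTable records its cluster key range.
    """
    buckets = defaultdict(list)
    for sstable in flushed_sstables:
        for row in sstable:
            buckets[bucket_of(row[1])].append(row)

    result = {}
    for bucket, rows in buckets.items():
        rows.sort(key=lambda r: (r[0], r[1]))
        keys = [r[1] for r in rows]
        result[bucket] = {
            "rows": rows,
            "min_cluster_key": min(keys),
            "max_cluster_key": max(keys),
        }
    return result


def buckets_for_range(result, low, high):
    """Use the stored ranges to pick only buckets that can hold keys in [low, high]."""
    return sorted(
        bucket for bucket, sstable in result.items()
        if sstable["min_cluster_key"] <= high and sstable["max_cluster_key"] >= low
    )
```

The second function shows why storing the range helps: a query on a cluster key range can skip every SSTable file whose recorded range does not overlap it.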


This new compaction strategy will have benefits over TWCS and might likely succeed it as the primary time series compaction strategy, as it avoids many of the current issues with TWCS or its predecessor DTCS. In addition, this new strategy will bring some of the benefits of TWCS to database workloads which are not time series in nature.

Large partitions

Large partitions under both STCS and LCS cause significant extra work during compaction. By spreading the partition data out over a number of windows or buckets, partitions can become significantly larger before having the heap and CPU impact on Cassandra during compaction that large partitions do today.

Dealing with anti-entropy

Currently, Cassandra anti-entropy mechanisms tend to work counter-purpose to both TWCS and DTCS and often make it necessary to turn them off to avoid pushing data into the wrong windows. It is also impossible to reload existing data or add a new DC or even a new host without disrupting the windowing.

CKCS will ensure data gets put into the correct windows even with anti-entropy running. It will also allow maintenance activities, including data reloads and adding a new DC or a new host to an existing DC, to store data into the correct window.

What CKCS won’t be able to do is ensure a final window compaction since there is never a certain final point in time for a given window. A “final” compaction is still likely to be a good idea; it just won’t ensure that all data will be in a single SSTable file for the window.

Compaction space savings

For both modes, moving window and static window, the compaction space savings will be comparable to what can be accomplished with TWCS or DTCS.

Write amplification

Write amplification benefits should be similar to TWCS for the moving window mode as long as writes take place during the actual time windows and anti-entropy is not generating significant out-of-window writes. In static window mode, write amplification should be similar to standard STCS, but the number of compactions increases while their sizes decrease, making the overall I/O workload somewhat less spiky.



Hardware-bound Zero Copy Streaming in Apache Cassandra 4.0

Streaming in Apache Cassandra powers host replacement, range movements, and cluster expansions. Streaming plays a crucial role in the cluster, and as such its performance is key not only to the speed of the operations it’s used in but to the cluster’s health generally. In Apache Cassandra 4.0, we have introduced an improved streaming implementation that reduces GC pressure and increases throughput severalfold, so that it is now limited, in some cases, only by disk and network IO (See: CASSANDRA-14556).

Fig 1. Cassandra Streaming

To get an understanding of the impact of these changes, let’s first have a look at the current streaming code path. The diagram illustrates the stream session setup when a node attempts to stream data from a peer. Let’s say we have a 3 node cluster (Nodes A, B, C). Node C is being rebuilt and has to stream all data that it is responsible for from A & B. C sets up a streaming session with each of its peers (See: CASSANDRA-4560 for how Cassandra applies Ford Fulkerson to optimize streaming peers). It exchanges messages to request ranges and begins streaming data from the selected nodes.

During the streaming phase, A collects all SSTables that have partitions in the requested ranges. It streams each SSTable by serializing individual partitions. Upon receiving the partition, node C reifies the data in memory and then writes it to disk. This is necessary to accurately transfer partitions from all possible SSTables for the requested ranges. This streaming path generates garbage and could be avoided in scenarios where all partitions within the SSTable need to be transmitted. This is common when you’re using LeveledCompactionStrategy or have enabled partitioning SSTables by token range (See: CASSANDRA-6696), etc.

To solve this problem CASSANDRA-14556 adds a Zero Copy streaming path. This significantly speeds up the transfer of SSTables and reduces garbage and unnecessary object creation. It modifies the streaming path to add additional information into the streaming header and uses ZeroCopy APIs to transfer bytes to and from the network and disk. So now, an SSTable may be transferred using this strategy when Cassandra detects that a complete SSTable needs to be transferred.
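The decision of when a complete SSTable can be transferred might be pictured with the following Python sketch. This is a deliberate simplification and not the check Cassandra performs: it assumes each SSTable is summarized by its first and last token, that requested ranges are non-wrapping (left, right] pairs, and that coverage by a single range is required. All names are hypothetical.

```python
def covered_by_one_range(first_token, last_token, requested_ranges):
    """True if a single (left, right] range contains every token in the SSTable."""
    return any(left < first_token and last_token <= right
               for left, right in requested_ranges)


def plan_transfer(sstables, requested_ranges):
    """Split SSTables into whole-file and partition-by-partition transfers.

    sstables maps a name to its (first_token, last_token) pair. SSTables that
    do not overlap any requested range are left out of both lists.
    """
    whole, partial = [], []
    for name, (first, last) in sorted(sstables.items()):
        if covered_by_one_range(first, last, requested_ranges):
            whole.append(name)
        elif any(left < last and first <= right for left, right in requested_ranges):
            partial.append(name)
    return whole, partial
```

Only the SSTables in the first list would take the zero copy path; the rest would still be streamed by serializing individual partitions.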

How do I use this feature?

It just works. This feature is controlled using stream_entire_sstables in cassandra.yaml and is enabled by default. Even though this feature is enabled, it will respect the throttling limits as defined by stream_throughput_outbound_megabits_per_sec.


Cassandra can stream SSTables only bounded by the hardware limitations (Network and Disk IO). With this optimization, we hope to make Cassandra more performant and reliable.

Microbenchmarking this feature shows a marked improvement (higher is better). Block Stream Writers are the ZeroCopy writers and Partial Stream Writers are the existing writers.

Benchmark                                        Mode   Cnt     Score      Error  Units
ZeroCopyStreamingBenchmark.blockStreamReader     thrpt   10    20.119 ±    1.300  ops/s
ZeroCopyStreamingBenchmark.blockStreamWriter     thrpt   10  1339.672 ±  352.242  ops/s
ZeroCopyStreamingBenchmark.partialStreamReader   thrpt   10     0.590 ±    0.135  ops/s
ZeroCopyStreamingBenchmark.partialStreamWriter   thrpt   10    17.556 ±    0.323  ops/s
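To put the scores above in relative terms, the short Python snippet below divides each block (ZeroCopy) score by the matching partial score. It uses only the mean scores from the table and ignores the reported error margins, so the ratios should be read as rough indications.

```python
# Mean throughput in ops/s, copied from the benchmark table.
scores = {
    "blockStreamReader": 20.119,
    "blockStreamWriter": 1339.672,
    "partialStreamReader": 0.590,
    "partialStreamWriter": 17.556,
}


def speedup(block_name, partial_name):
    """Ratio of the ZeroCopy score to the existing-path score."""
    return scores[block_name] / scores[partial_name]


reader_speedup = speedup("blockStreamReader", "partialStreamReader")
writer_speedup = speedup("blockStreamWriter", "partialStreamWriter")
print(f"reader: {reader_speedup:.1f}x, writer: {writer_speedup:.1f}x")
```

On these numbers the block reader comes out at roughly 34 times the partial reader, and the block writer at roughly 76 times the partial writer.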


If you’re a Cassandra user, we would love to hear back from you. Please send us feedback via user Mailing List, Jira, or IRC (or any combination of the three).

How to migrate data from Cassandra to Elassandra in Docker containers

A client recently asked us to migrate a Cassandra cluster running in Docker containers to Elassandra, with the data directory persisted via a bind mount. Elassandra is a fork of Cassandra integrated closely with Elasticsearch, to allow for a highly scalable search infrastructure.

To prepare the maintenance plan, we tested some of the methods as shown below.

The following are the commands used if you would like to test the process locally. Docker commands are used on one node at a time throughout the process to execute test statements. The Cassandra container is named my_cassandra_container, and the test Elassandra container is called my_elassandra_container. Replace the local directory /Users/youruser below as appropriate.

Start the Cassandra Container

First, start a container with the latest Cassandra version (3.11.2), binding the data volume locally as datadir.

In our use case, variables such as data center were pre-determined, but note that Cassandra and Elassandra have different default values in the container startup scripts for some of the variables. In the example below, data center, rack, snitch, and token number are set explicitly via environment variable flags (-e), but you can alternatively adjust these in the configuration files before starting Elassandra.

It will take about 15 seconds for the container to start up before Cassandra is ready to accept the write statement that follows. If you’re following the logs, look for “Created default superuser role ‘cassandra’” before proceeding.

docker run --name my_cassandra_container -e CASSANDRA_DC=DC1 -e CASSANDRA_RACK=RAC1 -e CASSANDRA_ENDPOINT_SNITCH=SimpleSnitch  -e CASSANDRA_NUM_TOKENS=8 -v /Users/youruser/mytest/datadir:/var/lib/cassandra -d cassandra:latest

Copy Configuration Files

Copy the Cassandra configuration files to a local location for ease of editing.

docker cp my_cassandra_container:/etc/cassandra/ /Users/youruser/mytest/cassandra

Create and Validate Test Data

Next, create some data in Cassandra using cassandra-stress as a data generator.

docker exec -it my_cassandra_container cassandra-stress write n=20000 -pop seq=1..20000 -rate threads=4

For comparison later, do a simple validation of the data by executing count and sample queries.

docker exec -it my_cassandra_container cqlsh -e "select count(*) from keyspace1.standard1"
docker exec -it my_cassandra_container cqlsh -e "select * from keyspace1.standard1 limit 1"

Stop and Remove the Cassandra Container

To prepare for the migration, stop Cassandra and remove the container.

docker exec -it my_cassandra_container nodetool flush
docker stop my_cassandra_container
docker rm my_cassandra_container

Install Elassandra Container

On a new container, install the latest Elassandra version using the same local data and configuration file paths as above. Again, it will take 15 seconds or so before the next statement can be run. If you are following the logs, look for “Elassandra started.”

docker run --name my_elassandra_container -e CASSANDRA_DC=DC1 -e CASSANDRA_RACK=RAC1 -e CASSANDRA_ENDPOINT_SNITCH=SimpleSnitch  -e CASSANDRA_NUM_TOKENS=8 -v /Users/youruser/mytest/datadir:/var/lib/cassandra -d strapdata/elassandra:latest

Validate Data

Now that Elassandra is running, re-validate the data. Note that at this point, only the fork of Cassandra is running, not integrated yet with Elasticsearch.

docker exec -it my_elassandra_container cqlsh -e "select count(*) from keyspace1.standard1"
docker exec -it my_elassandra_container cqlsh -e "select * from keyspace1.standard1 limit 1"

Repeat the above steps on remaining nodes.

Enable Elasticsearch

To enable the Elasticsearch part of Elassandra, stop Cassandra on all nodes. A rolling update does not work for this step. Enable Elasticsearch by updating the elasticsearch.yml configuration file as below. (Note that you have linked it to your local filesystem via the cp statement, so edit it directly on your local machine.)

docker stop my_elassandra_container
docker cp my_elassandra_container:/opt/elassandra- /Users/youruser/mytest/cassandra

vi /Users/youruser/mytest/cassandra/elasticsearch.yml

cluster.name: Test Cluster  ## Name of cluster
network.host:  ## Listen address
http.port: 9200

Restart and Validate Elassandra

Finally, restart and test the Elassandra container.

docker start my_elassandra_container

docker exec -it my_elassandra_container curl -X GET http://localhost:9200/

Sample output:

Elassandra GET Output


Thank you to Valerie Parham-Thompson for assistance in testing.

Cassandra CQL Cheatsheet

Every now and then I find myself looking for a couple of commands I use often. For some other software and technologies we can find a thing called a “cheatsheet” that lists the most used (and some of the more obscure) commands of that software or technology.

I tried to find one for CQL, and since I didn’t find one… I created a CQL one! This is not an extensive, exhaustive list; it’s just the commands I tend to use the most and related ones.

Suggestions are accepted! Leave your suggestions in the comments below!

Also, Printable version coming soon!

DISCLAIMER: This is for the latest Cassandra version (3.11.2)

Without further conversation, here it is:

CQLSH Specific


Login

$ cqlsh [node_ip] -u username -p password

Use Color

$ cqlsh [node_ip] -C -u username -p password

Execute command

$ cqlsh -e 'describe cluster'

Execute from file

$ cqlsh -f cql_commands.cql

Set consistency

$cqlsh> CONSISTENCY QUORUM;


Run commands from file

$cqlsh> SOURCE '/home/cjrolo/cql_commands.cql' ;

Capture output to file

$cqlsh> CAPTURE '/home/cjrolo/cql_output.cql' ;

Enable Tracing

$cqlsh> TRACING ON;

Vertical Printing of Rows

$cqlsh> EXPAND ON;

Print tracing session

$cqlsh> SHOW SESSION 898de000-6d83-11e8-9960-3d86c0173a79;

Full Reference:



CQL Commands

Create Keyspace

CREATE KEYSPACE carlos WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};

Alter Keyspace

ALTER KEYSPACE carlos WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

Drop Keyspace

DROP KEYSPACE carlos;


Create Table

CREATE TABLE carlos.foobar (
  foo int PRIMARY KEY,
  bar int
);


Alter Table

ALTER TABLE carlos.foobar WITH compaction = { 'class' : 'LeveledCompactionStrategy'} AND read_repair_chance = 0;

Drop Table

DROP TABLE foobar;

Create Role

CREATE ROLE admins WITH LOGIN = true AND SUPERUSER = true;


Create User

CREATE USER carlos WITH PASSWORD 'some_password' NOSUPERUSER;


Assign Role

GRANT admins TO carlos;

Revoke Role

REVOKE admins FROM carlos;

List Roles

LIST ROLES;


Use Keyspace

USE carlos;


Insert

INSERT INTO foobar (foo, bar) VALUES (0, 1);

Insert with TTL

INSERT INTO foobar (foo, bar) VALUES (1, 2) USING TTL 3600;


Update

UPDATE foobar SET bar = 42 WHERE foo = 1;


Select

SELECT * FROM foobar WHERE foo=0;


Delete

DELETE FROM foobar WHERE foo = 1;

Full Reference