The Mutant Monitoring System Scylla Training Series

At ScyllaDB, we created our Mutant Monitoring System blog series as a fun and informative way to teach users the ropes. As a quick recap of the backstory, mutants have emerged from the shadows and are now wreaking havoc on the earth. Increased levels of malicious mutant behavior pose a threat to national security and the general public. To better protect the citizens and understand more about the mutants, the Government enacted the Mutant Registration Act. As required by the act, each mutant must wear a small device that reports on his/her actions every second. The overall mission is to help the Government keep the Mutants under control by building a Mutant Monitoring System (MMS).

The Mutant Monitoring series has been a great tool for training new and experienced Scylla users on key concepts such as setup, failover, compactions, multi-datacenter deployments, and integrations with third-party applications. The series is also a good way for developers to learn how to use Scylla in applications written in various programming languages.

In this post, I will go over each day of our training series and explain what you can get out of it.

Day 1: Setting up Scylla. On the first day, we explored the backstory of Division 3 and decided that Scylla is the best choice for the database backend. Using Docker, we set up a Scylla cluster and created the initial mutant catalog keyspace and table. After the tables were made, we added a few mutants to the catalog that serve as our main mutant characters for the entire series.

Day 2: Building the Tracking System. For day two, we picked up where we left off and built the tracking system, which is a time-series collection of mutant data such as timestamp, location, and attributes based on their abilities. We also discussed the schema design in depth and went over the compaction strategy, the clustering key, and how to insert data and run queries.

Day 3: Analyzing Data. With the tracking system set up, we were able to begin analyzing the data on day 3, and did this using Presto in Docker. Presto is a distributed SQL query engine for Big Data technologies like Scylla. With it, we showed how to run complex queries, such as full-text searches and value comparisons.

Day 4: Node Failure Scenarios. At Division 3, our mutant datacenters were experiencing more and more cyber attacks by evil mutants, and sometimes we experienced downtime and could not track our IoT sensors. By day 4, we realized that we needed to prepare for disaster scenarios so that we knew for sure we could survive an attack. In this exercise, we went through a node failure scenario, consistency levels, and how to add a node and repair the Scylla cluster.

Day 5: Visualizing Data with Apache Zeppelin. On day 5, we learned how to use Apache Zeppelin to visualize data from the Mutant Monitoring System. Apache Zeppelin is a Java Web-based solution that allows users to interact with a variety of data sources like MySQL, Spark, Hadoop, and Scylla. Once in Zeppelin, you can run CQL queries and view the output in a table format with the ability to save the results. Also, the query can be visualized in an array of different graphs.

Days 6 and 7: Multi-datacenter Scylla Deployment. Division 3 decided that they must prepare for disaster readiness by expanding the Scylla cluster across geographic regions in a multi-datacenter configuration. On day 6, we set up a new Scylla cluster in another datacenter, learned how to convert our existing keyspaces to be stored in both datacenters, and went over site failure scenarios. On day 7, we expanded on this topic and went over consistency levels for multi-datacenter Scylla deployments.

Day 8: Scylla Monitoring. For day 8, we explained how to set up the Scylla Monitoring Stack which runs in Docker and consists of Prometheus and Grafana containers. We chose to run the monitoring stack so we can examine important Scylla specific details such as performance, latency, node availability, and more from the cluster.

Day 9: Connecting to Scylla with Node.js. Division 3 wanted to teach their development team how to create applications that can interact with the Scylla cluster so they can build the next-generation tools for the Mutant Monitoring System. On day 9, we explored how to connect to a Scylla cluster using Node.js with the Cassandra driver, and also went over the available Cassandra APIs for other programming languages.

Day 10: Backup and Restore. On day 10, we were told that Division 3 implemented a new policy requiring Scylla administrators to learn how to back up and restore the mutant data in the cluster. Throughout the lesson, we explained how to simulate data loss and how to back up and restore data from Scylla.

Days 11 and 12: Using the Cassandra Java Driver. Division 3 decided that we must use more applications to connect to the mutant catalog and decided to hire Java developers to create powerful applications that can monitor the mutants. On days 11 and 12, we explored how to connect to a Scylla cluster using the Cassandra driver for Java with basic query statements, and then explained how to modify a Java program to use prepared statements.

Day 13: Materialized Views. The Mutant Monitoring System had been receiving a lot of data, and Division 3 wanted to find better ways to sort and store data so it could be quickly analyzed with applications. Luckily, they found an exciting feature in Scylla called Materialized Views and directed us to learn how to use it to help our application developers prevent further acts of terror. On day 13, we explained what Materialized Views are and how to use them with the Mutant Monitoring System.

Day 14: Using Apache Spark with Scylla. On day 14, Division 3 wanted to dive back into data analytics to learn how to prevent the attacks. For this training, we went over how to use Apache Spark, Hive, and Superset to analyze and visualize the data from the Mutant Monitoring system.

Day 15: Storing Binary Blobs in Scylla. Day 15 concluded our Java programming series by explaining how to store binary files in Scylla. With this ability, we were able to learn how to store images of the mutants in the catalog keyspace using the blob column type. With the images stored, Division 3 was able to see what each mutant looks like whenever they want.

Day 16: The Mutant Monitoring Web Console. Day 16 is the final post in the series for now and we explained how to create a Mutant Monitoring Web Console in Node.js. The web console has a central interface that displays photos of the mutants and their basic information, as well as tracking information such as heat, telepathy, speed, and current location.

We hope that the Mutant Monitoring System has been educational for Scylla users. Throughout this series, we discussed a variety of topics, from running and configuring Scylla, recovering from disasters, and expanding across multiple datacenters, to using Scylla with different programming languages and integrating Scylla with third-party applications like Spark and Presto. This series is done for now, but we hope that the practical knowledge it provides will live on for some time.

Next Steps

  • Scylla Summit 2018 is around the corner. Register now!
  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post The Mutant Monitoring System Scylla Training Series appeared first on ScyllaDB.

AdGear and ScyllaDB at the Big Data Montreal Meetup

AdGear and ScyllaDB

Last month, Mina Naguib, Director of Operations Engineering at AdGear, and Glauber Costa, VP of Field Engineering at ScyllaDB, teamed up at the Big Data Montreal meetup to discuss Real-time Big Data at Scale, examining how Scylla helped AdGear achieve their goal of 1 million queries per second with single-digit millisecond latencies.

Click the video below to see AdGear present their real-time advertising use case along with the results of switching from Cassandra to Scylla. Glauber explains how Scylla enabled AdGear to achieve high throughput at predictably low latencies, all while keeping TCOs under tight control.

AdGear is an online ad serving platform that compiles vast amounts of consumer data that’s used by online exchanges to bid on ad placements in real-time. AdGear has very little time to enter a bid; auctions often close in less than 100 milliseconds. Cassandra simply couldn’t keep up. While it performed about 1,000,000 bids per second, the latency and tuning complexity made it costly and difficult to keep up with online exchanges.

In this talk, Mina discusses how Scylla enables AdGear to hit their extreme throughput requirements, serving the same number of requests as Cassandra with half the hardware. Originally running a 31-node Cassandra cluster, AdGear easily downsized to a 16-node Scylla cluster in which each node serves more than twice as many queries as each Cassandra node did. At peak traffic, read latency fell from 21 milliseconds to less than 5 milliseconds on Scylla.

Glauber picks up and discusses common and familiar problems with Cassandra: unpredictable latency, tuning complexity, node sprawl, and garbage collection woes. As Glauber shows, Scylla users can accomplish more with less hardware, demonstrating that a single Scylla node can achieve 1 million OPS with 1 ms 99th-percentile latency, all while auto-tuning and scaling up and out efficiently and painlessly.

To learn more about Scylla’s close-to-the-hardware architecture and how it helped AdGear amp up their ad serving platform, check out the video above.


The post AdGear and ScyllaDB at the Big Data Montreal Meetup appeared first on ScyllaDB.

Apache Kafka “Kongo” 6.3 – Production Kafka Application Scaling on Instaclustr

The goal of this blog is to scale the Kongo IoT application on production Instaclustr Kafka clusters. We'll compare various approaches including scale out, scale up, and multiple clusters. There are two versions to the story. In the Blue Pill version, everything goes according to plan and scaling is easy. If you are interested in the journey to get there, read the Red Pill version, where we encounter and overcome some scalability challenges caused by a large number of consumers.


Let’s go scaling!


Even though the Congo River is only the second biggest river in the world, it does have the biggest rapids in the world! The Inga rapids occur where the river drops 100m, have an average flow of 43 thousand m³/s, and are perfect for a scalability challenge! (and yes, the coloured dots are kayaks):

It’s time to shoot the rapids and see how far Kongo will scale by increasing the Kafka cluster size (number of nodes and/or CPU cores per node) or changing the cluster architecture.


Kayaking on the Inga Rapids


This is your last chance. After this, there is no turning back. You take the blue pill—the story ends, you wake up in your bed and believe whatever you want to believe. You take the red pill—you stay in Wonderland, and I show you how deep the rabbit hole goes.


This blog has two “versions”. In the first version (The Blue Pill) everything goes according to plan and Kongo is easy to scale on Kafka. In the second version (The Red Pill) come with me on the journey that finally got us there!

1. Blue Pill – Scaling is Easy


In order to scale the Kongo client application I had to make a change to run it across multiple processes and instances. I split the application into multiple “districts” with a subset of the Goods, Warehouses and Trucks. Each process manages a district and uses dedicated sensor and rfid topics, but a common violations topic. This enabled the overall producer rate to be increased while retaining the correct ratio of primary producer, consumer and secondary consumer events.


The maximum throughput for each configuration was determined by increasing the primary producer rate until the consumer SLA was violated. The SLA was 1 second average end-to-end response time (total time from producer timestamp to the consumer receiving each record). A soak test was then run multiple times at the maximum load to get reliable results.
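The SLA check described above can be sketched as follows. This is an illustrative Python sketch, not the Kongo benchmark harness itself; the function names and the sample latencies are invented for the example.

```python
# Sketch of the end-to-end SLA check: each event carries the producer's
# timestamp, and the consumer computes the total time to receipt. The load is
# raised until the *average* end-to-end latency exceeds the 1 second SLA.

SLA_SECONDS = 1.0  # 1 s average end-to-end response time

def end_to_end_latencies(events, receive_times):
    """Latency of each event = consumer receive time - producer timestamp."""
    return [rx - ev["producer_ts"] for ev, rx in zip(events, receive_times)]

def sla_violated(latencies, sla=SLA_SECONDS):
    """True once the average end-to-end latency exceeds the SLA."""
    return sum(latencies) / len(latencies) > sla

# Three events produced at t=0.0, received 0.2s, 0.4s and 0.9s later
events = [{"producer_ts": 0.0}] * 3
latencies = end_to_end_latencies(events, [0.2, 0.4, 0.9])
print(sla_violated(latencies))  # average 0.5s -> within the 1s SLA, prints False
```

In the benchmark, the producer rate is ratcheted up until this check trips, and the last sustainable rate is reported as the maximum throughput.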


For simplicity, we’ll report the total system throughput achieved (total throughput event/s = primary producers + secondary producers + consumers). We’ll treat the Kafka cluster as a black box for the time being, but will monitor the Kafka cluster and the client instance(s) CPU utilisation. I’m also not doing any tuning of Kafka or the client side application so these results are indicative only and not the absolute best possible. Note that as the Kongo IoT application wasn’t specifically designed for benchmarking (it’s non-deterministic), and we’re benchmarking an asynchronous distributed messaging system, on Amazon cloud infrastructure (dedicated VMs but shared infrastructure), there is inevitably variation in results across runs. The results are the average of three runs.

1.1 Scaling Out: 3 to 6 node Kafka cluster

For the first attempt at scaling Kongo we’ll start with a 3 node x 2 CPU cores per node Kafka cluster similar to the one we used in the last blog (this is the smallest Professional size node offered by Instaclustr). We’ll use these results as the baseline to attempt to improve on. Note that this cluster has different settings to the previous test cluster (Broker to Broker encryption is turned on), and a change to the application code to improve scalability (see 2.3), so the results are not comparable. Here’s the baseline cluster specifications:


3 nodes x Production EBS: tiny 500, replication factor 3, node size: r4.large-500, 500 GB Disk, 15.25 GB RAM, 2 CPU cores.


The baseline result was 90,000 events/s (3×2 bar).

One way of scaling Kafka is to increase the number of broker nodes. Let’s compare the 3 node cluster with a 6 node cluster with the same size nodes. In theory, doubling the number of nodes will give double the throughput, and the observed result (176,000 events/s, the 6×2 bar) was very close to this:

The Kafka cluster was at 85% CPU Utilization for the 3×2 configuration, and 90-95% for the 6×2 result, and 14 and 23 CPU cores were used on the client side (running on two instances).

1.2 Scaling Up: 2 to 4 CPU core nodes

Another way to increase capacity is to scale up by increasing the size of the nodes. For the next experiment I created a bigger 3 node x 4 CPU cores per node cluster with the following specifications:


3 nodes x Production EBS: standard 750, replication factor 3, node size: r4.xlarge-750, 750 GB Disk, 30.5 GB RAM, 4 CPU cores.


This result was surprising, giving a throughput of 400,000 events/s (red bar), which is significantly more than double (green bar) the baseline result (blue bar).

What’s going on here? The Kafka cluster utilisation was 85%, and the number of client cores increased to 54 (4 processes running across 2 instances). I revisited the 3×2 baseline results by running the benchmark at even higher loads. It turns out that it is possible to achieve higher sustainable throughputs (consumers keeping up with producers, 200,000 events/s, pale blue bar), but the SLA was violated (i.e. average response times >> 1s). The 3×4 result (400,000 events/s) is twice this higher result:


See the Response Times section below (1.5) for further investigation.

1.3 An Architectural Alternative: Multiple clusters

A third way of scaling Kafka applications is to use multiple clusters. Multiple clusters is a flexible approach to scalability as it’s easier to diagnose if a cluster, workload, or topic is reaching maximum capacity, and different workloads/topics can be scaled independently. The Kongo application is suitable for scaling in this way because the secondary producers send data to the violations topic. I.e. it’s an output of the application and is not consumed by it. The violations topic is also responsible for up to 40% of the total events in and out of the system, so moving it to another cluster may take a significant load off the main cluster (the cluster left handling the primary producer and consumer loads). This diagram shows the division of work between the two Kafka clusters:


This experiment was tried with a combination of a larger 6 node x 2 cores per node cluster for the main cluster, and a smaller 3 node x 2 cores per node cluster for the secondary producer events and the violations topic.

This multi-cluster approach (grey bar) gave an 87% throughput improvement over the single 6 node cluster result, which exceeds the 50% increase in the number of cluster cores (from 12 to 18):


Kafka cluster CPU utilisation was 95% for the primary cluster and 80% for the secondary cluster, and 37 client instance cores were used (4 processes on 2 instances).

1.4 All Results

The following graph compares all the results, showing that the scale up (3×4 bar) configuration gave the best improvement over the baseline (3×2 bar) result (close to 4 times the throughput), followed by the two cluster configuration (grey bar, which gave a 3.6 times improvement), and the scale-out configuration (6×2 bar, close to twice the throughput):

1.5 Response Times

Because I was focussed on achieving maximum throughput for the SLA of 1 second, I initially missed a significant difference in the scale-up results. It turns out that there is a big difference in the minimum response times (measured as average response time at very low throughput). The 2 CPU core nodes have a higher latency (230 ms average) than the 4 CPU core nodes (20 ms average):

What does this mean? It explains why a higher throughput was achieved when ignoring the SLAs for the 2 CPU core node configuration, as the SLA is more easily violated (even with some headroom left) on 2 CPU core nodes. It also explains why the throughput of the scale-up configuration (4 cores per node) was more than expected.  If latencies matter it’s a good idea to use larger node sizes.

A difference is also confirmed by the Instaclustr Kafka Brokers Monitoring Metric, Synthetic Transaction Latency. On the 2 CPU core nodes the maximum latency is 120ms, but on the 4 CPU core nodes, it’s only 20ms (average 10 ms c.f. 3 ms).

Why are the 2 CPU core nodes slower? In theory, they use the same speed CPUs. One possible explanation is network performance. 2 core R4 instances have “up to 10 Gigabit” network performance, with 32 core instances having a full 10 Gigabit. It’s possible that a 2 core R4 actually only has 2/32 * 10 = 0.625 Gigabits (625 Megabits) available to it, which could result in increased latency. However, according to this theory, an 8 core instance should have even less latency (but after extra testing it has comparable latency to the 4 core instances).

1.6 Server vs. Client Side Resources

As I was collecting both server- and client-side instance CPU utilisation and load metrics, I was able to analyse this data and found a significant trend: the Kafka server side is more efficient, and the client side uses approximately double the number of cores. The actual ratio of client to server cores depends to some extent on the node sizes, so the following are averaged results. However, the relationship between load and client cores is consistent, which allows the graph to be used to predict the expected number of client cores for a target load (e.g. 100 client cores for 1M events/s, extrapolating from 400k to 1M events/s):

To achieve scalability the application must therefore be designed to scale out on multiple client instances, and have sufficient resources on the client side.

2. Red Pill – Scaling is Harder

Watch out for whirlpools


In the Red Pill version of the world, achieving linear scalability had some challenges.

Initial attempts to achieve linear scalability with Kongo failed. I could only get a 15% improvement with scale out and 50% with scale up. The fundamental problem turned out to be the large number of consumers. However, before finding the cause and managing to drastically reduce the number of consumers needed at higher loads, I tried to mitigate several issues I discovered running the benchmark with the large number of consumers. These insights may still be useful if your application really requires a large number (100s) of consumers, as scalability is harder to achieve with increasing numbers of consumers.

2.1 The Kafka “Key Parking Problem” (Hash Collisions)

At the end of the previous blog (6.2), I noted that during soak tests I was getting a “too many open files” exception, which I fixed. It’s odd that I didn’t get this error when running the autonomic ramp-up tests; in hindsight, there was obviously something suspicious going on. Checking the logs, I noticed that some of the consumers were actually timing out, resulting in fewer than the expected total number of consumers running concurrently:

SensorConsumer 1036 closing after timeout (...)

2018-06-15 05:27:16,666 INFO  [Thread-494] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [info:341] [Consumer clientId=consumer-494, groupId=KongoSensorConsumer] Revoking previously assigned partitions [kongo-sensor--115, kongo-sensor--114, kongo-sensor--116]

2018-06-15 05:27:16,667 INFO  [Thread-494] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [info:336] [Consumer clientId=consumer-494, groupId=KongoSensorConsumer] (Re-)joining group


To diagnose this problem I added code to Kongo to accurately keep track of the number of active consumer threads, and prevent an increase in the number of consumer threads if a decrease in threads due to timeouts is detected. But this doesn’t solve the underlying timeout problem. For some reason, some of the consumers were not receiving events.

How does consumer concurrency work? The number of partitions is important: for consumers in the same consumer group, the number of partitions must be greater than or equal to the number of consumers. I had sufficient partitions, so this wasn’t the problem. I wondered whether the data was being evenly balanced across the partitions, and whether some partitions had no data at all.
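The partitions-versus-consumers rule can be illustrated with a toy assignor. This is a simplified round-robin sketch in Python (Kafka's real assignment strategy is pluggable, and Kongo itself is Java); the topic and consumer names are invented.

```python
# Within one consumer group, each partition is read by exactly one consumer.
# So if there are more consumers than partitions, the surplus consumers are
# assigned nothing and sit idle.

def assign(partitions, consumers):
    """Round-robin partition assignment (illustrative only)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

partitions = [f"kongo-sensor-{i}" for i in range(3)]
consumers = [f"consumer-{i}" for i in range(5)]

for consumer, owned in assign(partitions, consumers).items():
    print(consumer, owned)
# consumer-3 and consumer-4 are assigned no partitions and receive no events
```

This is the static rule; as the rest of this section shows, even with enough partitions, the key distribution decides whether a partition actually receives any data.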

I tried using the kafka-consumer-groups console command to check if all the partitions had non-zero offsets. They did, so all partitions were “obviously” in use, although the offsets were not identical (would you expect them to be?). This gave me a valuable clue: given that I was running Kongo as a benchmark, the data created for each run was different to the previous runs. It was therefore possible that some partitions were being used in one run but not the next run.

The final clue was when I noticed that the problem was only occurring for one of the two consumer types, SensorConsumers. The difference between the RFID and Sensor consumers in the Kongo code was that the RFID topic didn’t use a key, but the Sensor topic did. The key was the location. For the benchmarking, I was using 100 warehouse locations + 200 trucks = 300 locations. This gives 300 keys for 300 partitions, which I assumed was sufficient. To check, I did some statistical analysis to see how many distinct values were produced using the Kafka default partitioner, which uses this function:


return Utils.abs(Utils.murmur2(record.key())) % numPartitions;


However, using a 300 value location key only gives on average 200 unique values, so only 200 partitions are actually receiving data on any run. Only 200 consumer threads receive data, and the rest time out. A small set of key values will result in hashing collisions, so some partitions will get more messages than others, and some partitions may not get any messages at all! A Kafka Key/Partition “Parking Problem”?
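The collision effect can be reproduced with a quick simulation. This sketch uses Python's zlib.crc32 as a stand-in for Kafka's murmur2 hash (the Kongo analysis used the real partitioner), and the location names are invented; the collision behaviour is the same in kind.

```python
# Simulates the "key parking problem": hashing a small set of key values onto
# the same number of partitions leaves many partitions with no data at all.
import zlib

NUM_PARTITIONS = 300

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Same shape as the default partitioner: hash(key) % numPartitions
    # (crc32 is already non-negative, so no abs() is needed here)
    return zlib.crc32(key.encode()) % num_partitions

# 300 location keys (100 warehouses + 200 trucks), as in the benchmark
locations = [f"warehouse-{i}" for i in range(100)] + [f"truck-{i}" for i in range(200)]
used = {partition_for(loc) for loc in locations}

print(f"{len(used)} of {NUM_PARTITIONS} partitions receive data")
# In expectation roughly 300 * (1 - 1/e) ≈ 190 partitions get data; the rest
# are empty, so the consumers assigned to them time out.
```

The exact count varies with the hash and the key strings, but with 300 keys over 300 partitions a large fraction of partitions will always be empty.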

Car park “hash” collision (a variation of “Knuth’s Parking Problem” [See Note 1]).


For Kafka keys to work correctly you have to ensure that:


number of key values >>>

number of partitions >=

number of consumers (in a group)


Some workarounds are to implement your own partitioner function, create a finer grained key, or not use a key.

In Kongo, a simple partitioner function could be a 1-1 mapper that transforms the location GUID to a single partition (e.g. by using a stateful hashmap with key=location and a value that is a counter incrementing from 0, modulo the number of partitions, although this would need some way to persist the state). A finer-grained location key (more values) can be created by concatenating the location with, say, the metric name; this works well, giving several thousand key values and data in most partitions most of the time. Not using a key is the easiest solution and ensures perfect load balancing across all brokers. This is good for benchmarking, but in production there may be good reasons for having a key. If event order matters, then you need keys and partitions. If you want to manually assign consumers to partitions, you also need a key. If records are being used in Kafka Streams or sent to Cassandra, then you need a key (or you can create a key just in time from something in the value). And if you want to use a topic cleanup policy of log compaction (which retains at least the latest value for each key), you need a key (the default policy is delete, which removes data based on time or size). In hindsight, a better key for the RFID topic in Kongo is Goods, as there are lots of Goods (10,000) and we want actions on Goods to be done in order (e.g. loads, unloads).
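The finer-grained key workaround can be checked with the same kind of statistical analysis. As before, zlib.crc32 stands in for murmur2, and the metric names here are invented for illustration:

```python
# Concatenating location with the metric name multiplies the key cardinality
# (300 locations x 10 metrics = 3000 keys), so nearly every partition gets data.
import zlib

NUM_PARTITIONS = 300
METRICS = ["temp", "humidity", "shock", "tilt", "light",
           "co2", "pressure", "battery", "door", "gps"]  # illustrative names

locations = [f"warehouse-{i}" for i in range(100)] + [f"truck-{i}" for i in range(200)]

coarse = {zlib.crc32(loc.encode()) % NUM_PARTITIONS for loc in locations}
fine = {zlib.crc32(f"{loc}:{m}".encode()) % NUM_PARTITIONS
        for loc in locations for m in METRICS}

print(f"location-only key:   {len(coarse)}/{NUM_PARTITIONS} partitions used")
print(f"location+metric key: {len(fine)}/{NUM_PARTITIONS} partitions used")
```

With 3000 keys, the expected number of empty partitions drops to roughly 300/e^10, i.e. essentially zero, which is why the finer-grained key "works well" above.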

2.2 Cloudy with a chance of Rebalancing Storms


To try to understand further what was limiting the scalability, I took a closer look at the benchmarking harness I had written for Kongo to check that it was working as intended. The code was designed to start the target number of consumers as fast as possible before starting the Kongo IoT simulation and running the target load. However, it was apparent that it was not reliably starting the target number of consumer threads every run. Sometimes there were exceptions and rebalancing “storms”, and some of the consumers did not receive events from the start of the soak test. The autonomic code worked without any problems (probably due to the relatively slow and incremental increase in consumers as the load ramped up).

One of the exceptions suggested increasing the max poll interval, which I tried without fixing the problem. See here for more details of how the max poll interval works.


The real cause of the problem turned out to be the way that Kafka balances consumers in a group across the partitions in topics. How does it work? Each consumer group has a single broker elected as the group coordinator.  The first consumer in the group becomes the group leader. The leader is responsible for managing the assignment of partitions to each new consumer in the group. I.e. there’s one broker and one consumer responsible for rebalancing a consumer group. Here’s an extract from the documentation:


When a consumer wants to join a consumer group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator (this will include all consumers that sent a heartbeat recently and are therefore considered alive) and it is responsible for assigning a subset of partitions to each consumer. It uses an implementation of PartitionAssignor interface to decide which partitions should be handled by which consumer. After deciding on the partition assignment, the consumer leader sends the list of assignments to the GroupCoordinator which sends this information to all the consumers. Each consumer only sees his own assignment – the leader is the only client process that has the full list of consumers in the group and their assignments. This process repeats every time a rebalance happens.


Not surprisingly this doesn’t scale, and for reliability, it’s important to wait until each consumer has been started and is polling (as it’s in the poll that everything related to the protocol happens) before trying to start another consumer.  See Note 2 below for more resources.

Once this fix was introduced, the consumers were consistently all ready to go by the time the soak test started. However, this didn’t make any appreciable difference to the throughput. The average consumer startup rate achieved was around 10 per second, an average of 100 ms per consumer; i.e. it takes a non-zero period of time to start a consumer thread. I wondered if there was a correlation between the time to start a new consumer and the number of consumers already in the group, but there wasn’t. However, the maximum time to start a new consumer was sometimes in excess of 20 seconds, due to particularly large rebalancing storms. This could be a downside of having a single consumer group with a very large number of consumers; better design options may be to use manual consumer partition assignment, or a larger number of topics and consumer groups with fewer consumers per group.
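The serialized-startup fix can be sketched as follows. This is an illustrative Python sketch, not the Java harness itself: a threading.Event stands in for the signal a real consumer would give once it has entered its poll loop.

```python
# Start consumers one at a time, waiting until each one reports that it has
# reached its poll loop before starting the next. This avoids piling group
# rebalances on top of each other during startup.
import threading
import time

def start_consumers_serially(num_consumers):
    started_order = []
    threads = []
    for i in range(num_consumers):
        ready = threading.Event()

        def consumer(consumer_id=i, ready=ready):
            started_order.append(consumer_id)
            ready.set()       # "first poll reached": safe to start the next one
            time.sleep(0.01)  # stand-in for the ongoing poll loop

        t = threading.Thread(target=consumer)
        t.start()
        threads.append(t)
        ready.wait()          # block until this consumer is polling
    for t in threads:
        t.join()
    return started_order

print(start_consumers_serially(5))  # consumers reach polling strictly in order
```

Because the next consumer is only started after the previous one signals readiness, each join triggers at most one rebalance involving an already-settled group.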


2.3 Watch out for delays in consumer threads

Whirlpools can cause serious delays in kayaking

It turned out that the main problem with achieving the expected scalability was a design flaw on the consumer side. I had originally used the Guava EventBus as a loosely coupled mechanism to deliver each event to many subscribed objects. I had assumed EventBus was an asynchronous mechanism, allowing the processing of each event (e.g. Sensor/Goods checks, and Goods/Trucks co-location events) to be carried out independently of the consumer thread, so the consumer could focus purely on getting events from Kafka and handing them over for processing as fast as possible. However, what I hadn’t realised was that the EventBus post method itself is actually synchronous, and waits until the event has been delivered to every subscriber before returning. It was simple enough to wrap the post call in a thread pool so that the consumers can immediately return to looping through the records and polling. This resulted in a significant reduction in the number of consumers needed to process events within the 1 s SLA at higher loads (from hundreds to tens) and solved the scalability problem.
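The shape of this fix can be shown in a minimal Python sketch (Kongo itself is Java and uses Guava's EventBus; the subscriber and timings here are invented): hand slow event processing to a thread pool so the consumer loop returns to polling immediately instead of blocking inside a synchronous dispatch.

```python
# Compare blocking dispatch (like a synchronous EventBus.post) with handing
# events to a thread pool and returning to the poll loop immediately.
import time
from concurrent.futures import ThreadPoolExecutor

def slow_subscriber(event):
    time.sleep(0.05)  # stand-in for Sensor/Goods checks etc.

events = range(10)

# Synchronous dispatch: the "consumer" is blocked for every subscriber call
start = time.monotonic()
for e in events:
    slow_subscriber(e)
sync_seconds = time.monotonic() - start

# Wrapped dispatch: submit the work and return to "polling" at once
pool = ThreadPoolExecutor(max_workers=10)
start = time.monotonic()
for e in events:
    pool.submit(slow_subscriber, e)
dispatch_seconds = time.monotonic() - start
pool.shutdown(wait=True)  # in a real consumer the pool would keep running

print(f"synchronous: {sync_seconds:.2f}s, async hand-off: {dispatch_seconds:.4f}s")
```

The consumer thread's time per event drops from the subscriber's processing time to the cost of a submit call, which is why far fewer consumers were needed to stay within the SLA.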

3. Kafka Scalability Conclusions

Which pill did you pick? In the Blue Pill world, Kafka is easy to scale with small numbers of consumers and sufficient client-side resources. In the Red Pill world, we encountered several scalability challenges related to having a large number of consumers. If you really need lots of consumers, make sure you have sufficient key values so that all the partitions can be used by the consumers, and watch out for rebalancing storms. Reducing the number of consumers needed (e.g. by reducing the time spent processing events in the consumer) resulted in the biggest improvement in scalability.

The Instaclustr managed Kafka service makes it really easy to create different sized Kafka clusters for benchmarking and for provisioning flexible alternatives for production, but I didn’t try all the permutations. There are bigger instance sizes available (up to 8 cores per node using r4.2xlarge-4500 nodes), and another option for scaling is to add nodes incrementally (under the “Add” tab in the Instaclustr console). Scaling out, scaling up, and multiple clusters are all useful strategies for scalability; however, larger instance sizes have lower latency and are a good choice for applications requiring low latency.


Note 1: Knuth’s Parking Problem (dates from 1962!)

“A certain one-way street has m parking spaces in a row numbered 1 to m. A man and his dozing wife drive by, and suddenly, she wakes up and orders him to park immediately. He dutifully parks at the first available space…”

D. E. Knuth. Notes on “open” addressing. Unpublished memorandum, 1963. (Memo dated July 22, 1963, with annotation “My first analysis of an algorithm, originally done during Summer 1962 in Madison.”)

Referenced in: Alfredo Viola. Distributional Analysis of the Parking Problem and Robin Hood Linear Probing Hashing with Buckets. Discrete Mathematics and Theoretical Computer Science, DMTCS, 2010, 12 (2), pp.307-332. <hal-00990469>


Note 2: Further links for Kafka Consumer Rebalancing

  1. Kafka Client-side Assignment Proposal
  2. Kafka Consumer rebalancing frequently
  3. Kafka Consumers: Reading Data from Kafka

The post Apache Kafka “Kongo” 6.3 – Production Kafka Application Scaling on Instaclustr appeared first on Instaclustr.

Re-Bootstrapping Without Bootstrapping

During a cluster’s lifespan, there will be scenarios where a node has been offline for longer than the gc_grace_seconds window or has entered an unrecoverable state. Due to CASSANDRA-6961’s introduction in Cassandra 2.0.7, the process for reviving nodes that have been offline for longer than gc_grace_seconds has been dramatically shortened in cases where the cluster does not ingest deletion mutations.

Before we visit the new workflows, let’s gain a firm understanding of what occurs when a node is offline.

If a node has been offline for longer than gc_grace_seconds a few things happen:

  • Some hints will never be delivered to the revived node.
  • Some tombstones will never be delivered to the revived node.
  • The revived node is not consistent since it has missed recent mutation events.

If the downed node is able to be revived, it may experience the following problems:

  • Read requests can serve deleted data that tombstones were meant to remove, but the tombstones may not yet have been delivered.
  • If a tombstone expired and was forgotten by the cluster by exceeding the gc_grace_seconds window, the matching data has no recollection of a deletion event and can continue to propagate around the cluster as “live data”. This previously-deleted data is called “zombie data”.
  • Read requests can serve stale data that has not been updated during the time the node has been offline.

The procedure for solving the problems listed above changed in Cassandra 2.0.7 and may not be common knowledge. This post will explain this new way to resolve the problems caused by a node being offline for a longer than ideal period, as well as the new streamlined solution for clusters that do not ingest deletions.

Post-1.2.17/2.0.9 Resolution

While the following instructions highlight an older procedure for restoring a node that has been dead for longer than gc_grace_seconds, this procedure also works for nodes within clusters that ingest deletions.

Stop the Node

This procedure assumes at least one of the following scenarios is true:

  • The node is unrecoverable.
  • The node has been offline for longer than gc_grace_seconds.
  • The formerly online node will be swapped out and has now been shut down.

Start a New Node

A new node should be launched with the same settings as the previous node, but not allowed to auto-start and join the existing Cassandra cluster. Once the node has been launched without auto-starting the Cassandra process, we’ll want to enable the replace_address_first_boot flag.

Note that CASSANDRA-7356 introduced a more friendly approach to the previous replace_address flag that was introduced via CASSANDRA-5916. The new replace_address_first_boot flag should always be preferred since it allows an operator to forget to remove the flag after the node starts up, without going through another replace_address process upon an unwitting restart.

Activate this replace_address_first_boot flag by adding the following line to the bottom of cassandra-env.sh on the new node:

JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address_first_boot=<dead_node_ip>"

With the flag, the Cassandra node will:

  • Join the gossip ring.
  • Report itself as the new owner of the previous IP’s token ranges thereby avoiding any token shuffling.
  • Begin a bootstrap process with those token ranges.
  • Move into the UN (Up/Normal) state as shown via nodetool status.
  • At this point it will begin accepting reads and writes.

Post-2.0.7 Resolution

Post-Cassandra 2.0.7 the new recommendation changes in cases where deletion mutations are not ingested by the cluster.

WARNING: This section is only relevant if the cluster does not ingest deletion mutations. If the cluster ingests deletions and a node has been offline and cannot complete a full repair before the gc_grace_seconds window expires, resurrecting zombie data is still a concern. If zombie data is a concern, please follow the Post-1.2.17/2.0.9 Resolution.

If zombie data is not a concern, but ensuring highly consistent nodes is a priority, following these instructions can ensure the node is fully consistent before allowing the node to respond to read requests.

Set the Node to Hibernate Mode

Add the following line to cassandra-env.sh:

JVM_OPTS="$JVM_OPTS -Dcassandra.join_ring=false"

The above line ensures that the node starts up in a hibernation state.

Start Cassandra

After the above JVM option has been added, start the node that will be revived. If Cassandra was installed using Debian or RHEL packages, use the command:

sudo service cassandra start

Cassandra will then start in hibernation mode. During this time the node will be able to communicate with the Cassandra cluster without accepting client requests. This is important because we will be able to perform maintenance operations while not serving stale data.

Repair Missing Data

The next step will be to repair any missing data on the revived node using:

nodetool repair

The repair process, similar to the bootstrap process, will:

  • Build Merkle trees on each of the nodes holding replicas.
  • Find all missing information required to make each Merkle leaf identical.
  • Stream all missing information into the revived node.
  • Compact all new SSTables as they are being written.

Because the repair process does not have to start from a 0-byte baseline, repairing a node should stream far less information than bootstrapping or using the replace_address_first_boot procedure on a clean node.

The status for the repair can be confirmed in a few ways:

  • The nodetool repair command will return when the repair is complete.
  • The system.log will output a statement upon repair completion.
  • nodetool compactionstats will show far fewer pending compactions.

Join the Repaired Node to the Cluster

Once the repair and subsequent pending compactions have been completed, have the revived node join the cluster using:

nodetool join

The above command is analogous to the last hidden step of the bootstrap process:

  • Announce to the cluster that an existing node has re-joined the cluster.

Follow-up Task: Don’t Go into Hibernate Mode Upon Restart

The only follow-up required with this procedure is to remove the following line from cassandra-env.sh:

JVM_OPTS="$JVM_OPTS -Dcassandra.join_ring=false"

Removing the above line will ensure that the node does not go into hibernation mode upon the next restart of the Cassandra process.


Thanks to CASSANDRA-6961’s introduction in Cassandra 2.0.7, the process for reviving nodes that have been offline for longer than gc_grace_seconds has been shortened dramatically in situations where the cluster does not ingest deletion mutations.

Instead of requiring the legacy process of:

  • Removing the downed node from the cluster.
  • Bootstrapping a clean node into the cluster.
  • Performing a rolling repair.
  • Performing a rolling cleanup.

Or using the newer process of:

  • Replacing a new, clean node by its IP address.

We can now follow the simplified process of:

  • Starting the downed node in hibernation mode.
  • Repairing all missing data.
  • Allowing the revived node to serve client requests.
  • Ensuring the node will not enter hibernation mode upon restart.

While the number of steps is roughly the same between the legacy process and the newest process, the newer process:

  • Avoids streaming an entire node’s worth of data by making use of previously held data.
  • Does not require repairing all nodes within the affected data center.
  • Does not require cleaning up all nodes within the affected data center.

Hopefully this guide proves useful in shortening an operator’s maintenance window by making use of previously held data, without shifting token ownership or creating time-consuming follow-up tasks.

Hooking up Spark and Scylla: Part 1


Welcome to part 1 of an in-depth series of posts revolving around the integration of Spark and Scylla. In this series, we will delve into many aspects of a Scylla/Spark solution: from the architectures and data models of the two products, through strategies to transfer data between them and up to optimization techniques and operational best practices.

The series will include many code samples which you are encouraged to run locally, modify and tinker with. The Github repo contains the docker-compose.yaml file which you can use to easily run everything locally.

In this post, we will introduce the main stars of our series:

  • Spark;
  • Scylla and its data model;
  • How to run Scylla locally in Docker;
  • A brief overview of CQL (the Cassandra Query Language);
  • and an overview of the Datastax Spark/Cassandra Connector.

Let’s get started!


So what is Spark, exactly? It is many things – but first and foremost, it is a platform for executing distributed data processing computations. Spark provides both the execution engine – which is to say that it distributes computations across different physical nodes – and the programming interface: a high-level set of APIs for many different tasks.

Spark includes several modules, all of which are built on top of the same abstraction: the Resilient Distributed Dataset (RDD). In this post, we will survey Spark’s architecture and introduce the RDD. Spark also includes several higher-level modules, including support for running SQL queries against datasets in Spark, a machine learning module, streaming support, and more; we’ll look into these modules in the following post.

Running Spark

We’ll run Spark using Docker, through the provided docker-compose.yaml file. Clone the scylla-code-samples repository, and once in the scylla-and-spark/introduction directory, run the following command:

docker-compose up -d spark-master spark-worker

After Docker pulls the images, Spark should be up and running, composed of a master process and a worker process. We’ll see what these are in the next section. To actually use Spark, we’ll launch the Spark shell inside the master container:
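The exact invocation depends on the service names in the provided docker-compose.yaml; assuming the master service is called spark-master, launching the shell inside it looks something like this:

```shell
docker-compose exec spark-master spark-shell --master spark://spark-master:7077
```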

It takes a short while to start, but eventually, you should see this prompt:

As the prompt line hints, this is actually a Scala REPL, preloaded with some helpful Spark objects. Aside from interacting with Spark, you can evaluate any Scala expression that uses the standard library data types.

Let’s start with a short Spark demonstration to check that everything is working properly. In the following Scala snippet, we distribute a list of Double numbers across the cluster and compute its average:
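The original code block didn’t survive extraction; here’s a minimal sketch of such a snippet (the variable names are illustrative). sc.parallelize distributes the list across the cluster as an RDD, and reduce sums the elements:

```scala
val numbers = sc.parallelize(List(1.0, 2.0, 3.0, 4.0, 5.0))
val average = numbers.reduce(_ + _) / numbers.count()
println(average) // prints 3.0
```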

You can copy the code line by line into the REPL, or use the  :paste command to paste it all in at once. Type Ctrl-D once you’re done pasting.

Spark’s primary API, by the way, is in Scala, so we’ll use that for the series. There do exist APIs for Java, Python and R – but there are two main reasons to use the Scala API: first, it is the most feature complete; second, its performance is unrivaled by the other languages. See the benchmarks in this post for a comparison.

Spark’s Architecture

Since Spark is an entire platform for distributed computations, it is important to understand its runtime architecture and components. In the previous snippet, we noted that the created RDD is distributed across the cluster nodes. Which nodes, exactly?

Here’s a diagram of a typical Spark application:

There are several moving pieces to that diagram. The master node and the worker nodes represent two physical machines. In our setup, they are represented by two containers. The master node runs the Cluster Manager JVM (also called the Spark Master), and the worker node runs the Worker JVM. These processes are independent of individual jobs running on the Spark cluster and typically outlive them.

The Driver JVM in the diagram represents the user code. In our case, the Spark REPL that we’ve launched is the driver. It is, in fact, a JVM that runs a Scala REPL with a preloaded sc: SparkContext object (and several other objects – we’ll see them in later posts in the series).

The SparkContext allows us to interact with Spark and load data into the cluster. It represents a session with the cluster manager. By opening such a session, the cluster manager will assign resources from the cluster’s available resources to our session and will cause the worker JVMs to spawn executors.

The data that we loaded (using sc.parallelize) was actually shipped to the executors; these JVMs actually do the heavy lifting for Spark applications. As the driver executes the user code, it distributes computation tasks to the executors. Before reading onwards, ask yourself: given Spark’s premise of parallel data processing, which parts of our snippet would it make sense to distribute?

Amusingly, in that snippet, there are exactly 5 characters of code that are directly running on the executors (3 if you don’t include whitespace!): _ + _ . The executors are only running the function closure passed to reduce.

To see this more clearly, let’s consider another snippet that does a different sort of computation:
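A sketch of such a snippet (reconstructed to match the description below; names are illustrative):

```scala
case class Person(name: String, age: Int)

val people = sc.parallelize(List(
  Person("Ann", 8), Person("Ben", 25), Person("Cleo", 42)))

// Only the bodies of the functions passed to filter and reduce
// run on the executors; everything else runs on the driver.
val oldest = people
  .filter(_.age > 10)
  .reduce((a, b) => if (a.age > b.age) a else b) // Cleo, aged 42
```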

In this snippet, which finds the Person with the maximum age out of those generated with age > 10, only the bodies of the functions passed to filter and reduce are executed on the executors. The rest of the program is executed on the driver.

As we’ve mentioned, the executors are also responsible for storing the data that the application is operating on. In Spark’s terminology, the executors store the RDD partitions – chunks of data that together comprise a single, logical dataset.

These partitions are the basic unit of parallelism in Spark; for example, the body of filter would execute, in parallel, on each partition (and consequently – on each executor). The unit of work in which a transformation is applied to a partition is called a Task. We will delve more deeply into tasks (and stages that are comprised of them) in the next post in the series.

Now that you know that executors store different parts of the dataset – is it really true that the function passed to reduce is only executed on the executors? In what way is reduce different, in terms of execution, from filter? Keep this in mind when we discuss actions and transformations in the next section.


Let’s discuss RDDs in further depth. As mentioned, RDDs represent a collection of data rows, divided into partitions and distributed across physical machines. RDDs can contain any data type (provided it is Serializable) – case classes, primitives, collection types, and so forth. This is a very powerful aspect as you can retain type safety whilst still distributing the dataset.

An important attribute of RDDs is that they are immutable: every transformation applied to them (such as map, filter, etc.) results in a new RDD. Here’s a short experiment to demonstrate this point; run this snippet, line-by-line, in the Spark shell:
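The original snippet was lost in extraction; the sketch below is reconstructed so that the line references in the following paragraphs (lines 4, 6 and 8) still hold:

```scala
val numbers = sc.parallelize(1 to 1000)

// filter returns a *new* RDD; `numbers` itself is unchanged
val evens = numbers.filter(_ % 2 == 0)

println(numbers.count())

println(evens.count())
```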

The original RDD is unaffected by the filter operation – line 6 prints out 1000, while line 8 prints out a different count.

Another extremely important aspect to Spark’s RDDs is laziness. If you’ve run the previous snippet, you no doubt have noted that line 4 is executed instantaneously, while the lines that count the RDDs are slower to execute. This happens because Spark aggressively defers the actual execution of RDD transformations, until an action, such as count is executed.

To use more formal terms, RDDs represent a reified computation graph: every transformation applied to them is translated to a node in the graph representing how the RDD can be computed. The computation is promoted to a first-class data type. This presents some interesting optimization opportunities: consecutive transformations can be fused together, for example. We will see more of this in later posts in the series.

The difference between actions and transformations is important to keep in mind. As you chain transformations on RDDs, a chain of dependencies will form between the different RDDs being created. Only once an action is executed will the transformations run. This chain of dependencies is referred to as the RDD’s lineage.

A good rule of thumb for differentiating between actions and transformations in the RDD API is the return type; transformations often result in a new RDD, while actions often result in types that are not RDDs.

For example, the zipWithIndex method, with the signature:

def zipWithIndex(): RDD[(T, Long)]

is a transformation that will assign an index to each element in the RDD.

On the other hand, the take method, with the signature:

def take(n: Int): Array[T]

is an action; it results in an array of elements that is available in the driver’s memory space.

This RDD lineage is not just a logical concept; Spark’s internal representation of RDDs actually uses this directed, acyclic graph of dependencies. We can use this opportunity to introduce the Spark UI, available at http://localhost:4040/jobs/ after you launch the Spark shell. If you’ve run one of the snippets, you should see a table such as this:

By clicking the job’s description, you can drill down into the job’s execution and see the DAG created for executing this job:

The Spark UI can be extremely useful when diagnosing performance issues in Spark applications. We’ll expand more on it in later posts in the series.

We can further divide transformations into narrow and wide transformations. To demonstrate the difference, consider what happens to the partitions of an RDD in a map:

map is a prime example of a narrow transformation: the elements can stay in their respective partitions; there is no reason to move them around. Contrast this with the groupBy method:

def groupBy[K](f: T => K): RDD[(K, Iterable[T])]

As Spark executes the provided f for every element in the RDD, two elements in different partitions might be assigned the same key. This will cause Spark to shuffle the data in the partitions and move the two elements into the same physical machine in order to group them into the Iterable.

Avoiding data shuffles is critical for coaxing high performance out of Spark. Moving data between machines over the network in order to perform a computation is an order of magnitude slower than computing data within a partition.

Lastly, it is important to note that RDDs contain a pluggable strategy for assigning elements to partitions. This strategy is called a Partitioner and it can be applied to an RDD using the partitionBy method.

To summarize, RDDs are Spark’s representation of a logical dataset distributed across separate physical machines. RDDs are immutable, and every transformation applied to them results in a new RDD and a new entry in the lineage graph. The computations applied to RDDs are deferred until an action occurs.

We’re taking a bottom-up approach in this series to introducing Spark. RDDs are the basic building block of Spark’s APIs, and are, in fact, quite low-level for regular usage. In the next post, we will introduce Spark’s DataFrame and SQL APIs which provide a higher-level experience.


Let’s move on to discuss Scylla. Scylla is an open-source NoSQL database designed to be a drop-in replacement for Apache Cassandra with superior performance. As such, it uses the same data model as Cassandra and supports Cassandra’s existing drivers, language bindings and connectors. In fact, Scylla is even compatible with Cassandra’s on-disk format.

This is where the similarities end, however; Scylla is designed for interoperability with the existing ecosystem, but is otherwise a ground-up redesign. For example, Scylla is written in C++ and is therefore free from nuisances such as stop-the-world pauses due to the JVM’s garbage collector. It also means you don’t have to spend time tuning that garbage collector (and we all know what black magic that is!).

Scylla’s Data Model

Scylla (and Cassandra) organize the stored data in tables (sometimes called column families in Cassandra literature). Tables contain rows of data, similar to a relational database. These rows, however, are divided into partitions based on their partition key; within each partition, the rows are sorted according to their clustering columns.

The diagram illustrates how rows are divided between the cluster nodes. Although it is convenient to visualize, note that in practice partition key values are not assigned to nodes in contiguous ranges; the values are hashed before being assigned to a partition.

The partition key and the clustering columns together define the table’s primary key. Scylla is particularly efficient when fetching rows using a primary key, as it can be used to find the specific partition and offset within the partition containing the requested rows.

Scylla also supports storing the usual set of data types in columns you would expect – integers, doubles, booleans, strings – and a few more exotic ones, such as UUIDs, IPs, collections and more. See the documentation for more details.

In contrast to relational databases, Scylla does not perform joins between tables. It instead offers rich data types that can be used to denormalize the data schema – lists, sets, and maps. These work best for small collections of items (that is, do not expect to store an entire table in a list!).

Moving upwards in the hierarchy, tables are organized into keyspaces; besides grouping tables together, keyspaces also define a replication strategy. See the documentation for CREATE KEYSPACE for more details.

Running Scylla Locally

To run Scylla locally for our experiments, we’ve added the following entry to our docker-compose.yaml file’s services section:
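The entry itself did not survive extraction; a sketch of what it looks like follows (the service name and image tag are illustrative — Scylla’s Docker image accepts --smp and --memory to cap resources):

```yaml
scylla-node1:
  image: scylladb/scylla
  command: --smp 1 --memory 256M
  volumes:
    - ./data/node1:/var/lib/scylla
```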

This will mount the ./data/node1 directory from the host machine’s current directory on /var/lib/scylla within the container, and limit Scylla’s resource usage to 1 processor and 256MB of memory. We’re being conservative here in order to run 3 nodes and make the setup interesting. Also, Spark’s much more of a memory hog, so we’re going easy on your computer.

NOTE: This setup is entirely unsuitable for a production environment. See ScyllaDB’s reference for best practices on running Scylla in Docker.

The docker-compose.yaml file provided in the sample repository contains 3 of these entries (with separate data volumes for each node), and to run the nodes, you can launch the stack:
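Launching the stack is the usual Compose one-liner:

```shell
docker-compose up -d
```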

After that is done, check on the nodes’ logs using docker-compose logs:

You should see similar log lines (among many other log lines!) on the other nodes.

There are two command-line tools at your disposal for interacting with Scylla: nodetool for administrative tasks, and cqlsh for applicative tasks. We’ll cover cqlsh and CQL in the next section. You can run both of them by executing them in the node containers.

For example, we can check the status of nodes in the cluster using nodetool status:
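Assuming the first node’s service is named scylla-node1 in the compose file, that looks like:

```shell
docker-compose exec scylla-node1 nodetool status
```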

The output you see might be slightly different, but should be broadly similar: you can see that the cluster consists of 3 nodes, their addresses, data on disk, and more. If all nodes show UN (Up/Normal) as their status, the cluster is healthy and ready for work.

A Brief Overview of CQL

To interact with Scylla’s data model, we can use CQL and cqlsh – the CQL Shell. As you’ve probably guessed, CQL stands for Cassandra Query Language; it is syntactically similar to SQL, but adapted for use with Cassandra. Scylla supports the CQL 3.3.1 specification.

CQL contains data definition commands and data manipulation commands. Data definition commands are used for creating and modifying keyspaces and tables, while data manipulation commands can be used to query and modify the tables’ contents.

For our running example, we will use cqlsh to create a table for storing stock price data and a keyspace for storing that table. As a first step, let’s launch it in the node container:
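Again assuming the node service is named scylla-node1, launching the shell looks like:

```shell
docker-compose exec scylla-node1 cqlsh
```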

We can use the DESC command to see the existing keyspaces:

The USE command will change the current keyspace; after we apply it, we can use DESC again to list the tables in the keyspace:

These are useful for exploring the currently available tables and keyspaces. If you’re wondering about the available commands, there’s always the HELP command available.

In any case, let’s create a keyspace for our stock data table:
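The original statement was lost in extraction; a sketch of it follows (the keyspace name quotes is illustrative):

```sql
CREATE KEYSPACE quotes
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
```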

As we’ve mentioned before, keyspaces define the replication strategy for the tables within them, and indeed in this command we are defining the replication strategy to use SimpleStrategy. This strategy is suitable for use with a single datacenter. The replication_factor setting determines how many copies of the data in the keyspace are kept; 1 copy means no redundancy.

With the keyspace created, we can create our table:
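A sketch of the table definition, matching the schema discussed below (table and column names are illustrative; the keyspace name quotes is assumed from the previous step):

```sql
CREATE TABLE quotes.stocks (
  symbol text,
  day    date,
  open   decimal,
  close  decimal,
  PRIMARY KEY (symbol, day)
);
```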

Our table will contain a row per symbol, per day. The query patterns for this table will most likely be driven by date ranges. Within those date ranges, we might be querying for all symbols, or for specific ones. It is unlikely that we will drive queries on other columns.

Therefore, we compose the primary key of symbol and day. This means that data rows will be partitioned between nodes according to their symbol, and sorted within the nodes by their day value.

We can insert some data into the table using the following INSERT statement:
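For example (one row shown; the symbol, date and prices are made up for illustration):

```sql
INSERT INTO quotes.stocks (symbol, day, open, close)
VALUES ('AAPL', '2010-01-04', 30.49, 30.57);
```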

Now, let’s consider the following query under that data model – the close prices in January 2010 for all symbols:
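A sketch of such a query (table names as before are illustrative; note that restricting a clustering column without a partition key requires ALLOW FILTERING):

```sql
SELECT symbol, day, close
FROM quotes.stocks
WHERE day >= '2010-01-01' AND day <= '2010-01-31'
ALLOW FILTERING;
```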

This would be executed as illustrated in the following diagram:

The partitioning structure allows for parallel fetching of the data from each partition. The rows are combined on the query coordinator before being returned to the client.

If this reminds you of how RDDs execute data processing tasks – it should! Partitioning data by an attribute between several partitions and operating on the partitions in parallel is a very effective way of handling large datasets. In this series, we will show how you can efficiently copy a Scylla partition into a Spark partition, in order to continue processing the data in Spark.

With the data in place, we can finally move on to processing the data stored in Scylla using Spark.

The Datastax Spark/Cassandra Connector

The Datastax Spark/Cassandra connector is an open-source project that allows us to import data stored in Cassandra into Spark RDDs, and write Spark RDDs back to Cassandra tables. It also supports Spark’s SQL and DataFrame APIs, which we will discuss in the next post.

Since Scylla is compatible with Cassandra’s protocol, we can seamlessly use the connector with it.

We’ll need to make sure the connector ends up on the Spark REPL’s classpath and configure it with Scylla’s hostname, so we’ll re-launch the shell in the Spark master’s container, with the addition of the --packages and --conf arguments:
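Something along these lines (the connector version and Scylla hostname here are assumptions; pick the connector build matching your Spark and Scala versions):

```shell
docker-compose exec spark-master spark-shell \
  --conf spark.cassandra.connection.host=scylla-node1 \
  --packages datastax:spark-cassandra-connector:2.3.1-s_2.11
```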

The shell should now download the required dependencies and make them available on the classpath. After the shell’s prompt shows up, test that everything worked correctly by making the required imports, loading the table as an RDD and running .count() on the RDD:
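A sketch of that test (the keyspace and table names quotes and stocks are illustrative):

```scala
import com.datastax.spark.connector._

val quotes = sc.cassandraTable("quotes", "stocks")
println(quotes.count())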

The call to count should result in 4 (or more, if you’ve inserted more data than listed in the example!).

The imports that we’ve added bring in syntax enrichments to the standard Spark data types, making the interaction with Scylla more ergonomic. You’ve already seen one of those enrichments: sc.cassandraTable is a method added to SparkContext for conveniently creating a specialized RDD backed by a Scylla table.

The type of that specialized RDD is CassandraTableScanRDD[CassandraRow]. As hinted by the type, it represents a scan of the underlying table. The connector exposes other types of RDDs; we’ll discuss them in the next post.

Under the hood, the call to .count() translates to the following query in Scylla:

The entire table is loaded into the Spark executors, and the rows are counted afterward by Spark. If this seems inefficient to you – it is! You can also use the .cassandraCount() method on the RDD, which will execute the count directly on Scylla.

The element contained in the RDD is of type CassandraRow. This is a wrapper class for a sequence of untyped data, with convenience getters for retrieving fields by name and casting to the required type.

Here’s a short example of interacting with those rows:
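A sketch of such interaction, assuming the illustrative quotes.stocks table from earlier:

```scala
val row = sc.cassandraTable("quotes", "stocks").first()

val symbol = row.getString("symbol")
val close  = row.getDouble("close")
// Casting to the wrong type, e.g. row.getInt("symbol"), throws at runtime.
```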

This will extract the first row from the RDD, and extract the symbol and the close fields’ values. Note that if we try to cast the value to a wrong type, we get an exception:

Again, as in the case of row counting, the call to first will first load the entire table into the Spark executors, and only then return the first row. To solve this inefficiency, the CassandraRDD class exposes several functions that allow finer-grained control over the queries executed.

We’ve already seen cassandraCount, which delegates the work of counting to Scylla. Similarly, we have the where method, which allows you to specify a CQL predicate to be appended to the query:
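For example (table names illustrative as before):

```scala
val january = sc.cassandraTable("quotes", "stocks")
  .where("day >= ? and day <= ?", "2010-01-01", "2010-01-31")
```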

The benefit of the where method compared to applying a filter transformation on an RDD can be drastic. In the case of the filter transformation, the Spark executors must read the entire table from Scylla, whereas when using where, only the matching data will be read. Compare the two queries that’ll be generated:
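Roughly, the two queries would look like this (reconstructed; table names illustrative):

```sql
-- with .where(...): the predicate is pushed down to Scylla
SELECT * FROM quotes.stocks
WHERE day >= '2010-01-01' AND day <= '2010-01-31' ALLOW FILTERING;

-- with a Spark-side filter(...): the entire table is read
SELECT * FROM quotes.stocks;
```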

Obviously, the first query is much more efficient!

Two more useful examples are select and limit. With select, you may specify exactly what data needs to be fetched from Scylla, and limit will only fetch the specified number of rows:
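For example (column and table names illustrative):

```scala
val closes = sc.cassandraTable("quotes", "stocks")
  .select("symbol", "close")
  .limit(3)
```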

The query generated by this example would be as follows:
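Roughly (reconstructed; names illustrative):

```sql
SELECT symbol, close FROM quotes.stocks LIMIT 3;
```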

These methods can be particularly beneficial when working with large amounts of data; it is much more efficient to fetch a small subset of data from Scylla, rather than project or limit it after the entire table has been fetched into the Spark executors.

Now, these operations should be enough for you to implement useful analytical workloads using Spark over data stored in Scylla. However, working with CassandraRow is not very idiomatic to Scala code; we’d much prefer to define data types as case classes and work with them.

For that purpose, the connector also supports converting the CassandraRow to a case class, provided the table contains columns with names matching the case class fields. To do so, we specify the required type when defining the RDD:
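A sketch of that, again using the illustrative quotes.stocks table — the case class fields must match column names:

```scala
case class Quote(symbol: String, close: Double)

// The connector maps matching columns onto the case class fields.
val typedQuotes = sc.cassandraTable[Quote]("quotes", "stocks")
println(typedQuotes.first().close)
```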

The connector will also helpfully translate columns written in snake case (e.g., first_name) to camel case, which means that you can name both your table columns and case class fields idiomatically.


This post has been an overview of Spark, Scylla, their architectures and the Spark/Cassandra connector. We’ve taken a broad-yet-shallow look at every component in an attempt to paint the overall picture. Over the next posts, we will dive into more specific topics. Stay tuned!

Next Steps

  • Scylla Summit 2018 is around the corner. Register now!
  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Hooking up Spark and Scylla: Part 1 appeared first on ScyllaDB.

New York Meetup Wrap Up – Cassandra & Kubernetes

On 18th July 2018, Instaclustr co-hosted the NoSQL meetup with JPMorgan Chase in New York City. There was a fantastic turnout — over 125 attendees from various technical backgrounds and industries, including finance, retail and adtech.


To start the evening, an introduction and overview of JPMC’s usage of Cassandra was given by Adam Carson, CTO of the Digital Group at JPMC.

The main presentation, Cassandra and Kubernetes, presented by Instaclustr’s SVP of Engineering and co-founder Adam Zegelin, was well received and prompted a lengthy Q&A session from the attendees, followed by additional one-on-one discussions to close out the evening.


Adam first gave a brief overview of Linux containers, discussing their strengths and benefits, how they compare with equivalent process-sandboxing technologies from other platforms, such as Solaris Zones and FreeBSD jails, and how they differ from full system virtualization technologies such as KVM or Xen.


Containers on Linux are a combination of various kernel-level APIs and user-space applications that provide an extensible and pluggable framework for process sandboxing. This flexibility makes containers extremely powerful, but also complex to utilise without proper tooling. Solutions such as Kubernetes, Docker, rkt and containerd strive to streamline and simplify the act of running containers on Linux, and bring containers in-line with the first-class sandboxing and virtualization technologies of other platforms.


Containers provide a separation of concerns. They allow you to package, install, and run all the libraries, dependencies, and userland tools for an application without interference or conflicts from other applications or components. This separation is comparable to that offered by virtual machines, but with additional flexibility and improved performance.


An introduction to Kubernetes followed, with Adam explaining the fundamentals, and also the merits and benefits of using Kubernetes as a Linux container orchestration and management platform.


The Kubernetes API has become the de facto container orchestration API. It’s supported in the cloud, with first-class offerings from all the major providers, including AWS EKS, Google Kubernetes Engine, and Azure Kubernetes Service. For on-premise installations, Kubernetes can be run directly, or via numerous third-party compatible implementations and extensions, including Pivotal Container Service (PKS) and Red Hat OpenShift.


Combining containers, Kubernetes and Cassandra, Adam then followed with an in-depth look into Instaclustr’s cassandra-operator, a work-in-progress open source project with a goal to ease the deployment and management of Apache Cassandra on Kubernetes clusters.


Kubernetes has no built-in understanding of the processes and procedures required to deploy, scale and manage a Cassandra cluster. The cassandra-operator extends the Kubernetes API via CustomResourceDefinitions to add first-class support for managing Cassandra clusters. At runtime, the operator coordinates with both the Kubernetes API and Cassandra (via JMX) to provide a seamless integration.
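As a rough illustration of the idea (the resource kind and field names below are hypothetical, not necessarily the operator's actual schema), a user declares the desired cluster and the operator drives Kubernetes and Cassandra to match it:

```yaml
# Hypothetical custom resource, for illustration only; the real
# cassandra-operator CRD schema may differ.
apiVersion: cassandraoperator.instaclustr.com/v1alpha1
kind: CassandraDataCenter
metadata:
  name: example-dc
spec:
  nodes: 3                       # desired cluster size; the operator adds
                                 # or decommissions Cassandra nodes to match
  cassandraImage: cassandra:3.11
```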


During an initial deployment, the cassandra-operator automatically manages configuration, Cassandra seed node allocation, and node placement (network topology). It also handles bi-directional scaling of Cassandra clusters, correctly adding or decommissioning nodes as required.


Current in-progress and future features include adding monitoring support (via direct integration with Prometheus), automatic backup and restore, scheduled repair, and more.


The post New York Meetup Wrap Up – Cassandra & Kubernetes appeared first on Instaclustr.

Exploring How the Scylla Data Cache Works

Scylla Data Cache

Scylla persists data on disk. Writes to Scylla are initially accumulated in RAM in memtables, which at some point get flushed to an sstable on disk and removed from RAM. This means that reads need to look into both active memtables and sstables for relevant information. Access to data on disk is slower than to RAM, so we use caching to speed up access to data on disk.

The cache is a read-through data source that represents data stored in all sstables on disk. This data may not fit in RAM, so the cache is capable of storing only a subset of it in memory.

The cache populates itself from disk during reads, if necessary, expanding into all available memory. It evicts the least recently used data when room must be made for current memory requests.

The diagram below illustrates the composition of data sources:

Cache Granularity in Scylla 1.7

Let’s start with a reminder of the structure of data in Scylla:

  • At the highest level, we have partitions, identified by a partition key
  • Partitions contain rows, identified by a clustering key
  • Rows contain cells, identified by column names

Below you can find an example of a table schema in CQL and how its columns map to different elements:
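As an illustrative example (this schema is a sketch in the spirit of the series' tracking table, not necessarily the exact one from the diagram), the mapping looks like this:

```sql
CREATE TABLE tracking.tracking_data (
    first_name text,       -- partition key: identifies the partition
    last_name  text,       -- partition key: second component
    timestamp  timestamp,  -- clustering key: identifies a row in the partition
    location   varchar,    -- regular column: a cell identified by its name
    PRIMARY KEY ((first_name, last_name), timestamp)
);
```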

Before Scylla 2.0, the cache was optimized for small partitions and any given partition was either cached completely or not at all. In other words, the cache had a granularity of a partition for both population and eviction.

However, this doesn’t work well for partitions with many rows. Reads that query only a subset of rows still have to read the whole partition from disk if it’s absent from the cache in order for the data to be populated. The resulting read amplification grows as partitions get larger, which directly translates to higher read latency. The diagram below depicts the situation:

In the example above, the read selects only a single row from a partition with 5 rows, but it will need to read the whole partition from disk to populate the cache.

Full partition population may also pollute the cache with cold rows, which are not likely to be needed soon. They take up space that could otherwise be used for hot rows. This reduces the effectiveness of caching.

Partition-granularity eviction also caused problems with large partitions. Evicting a large amount of data from memory at once makes the CPU unavailable for other tasks for a while, which adds latency to operations pending on that shard.

As a temporary measure to avoid those problems, our Scylla 1.7 release introduced a limit for the size of the partition that is cached (10 MiB by default, controlled by the max_cached_partition_size_in_bytes config option). When a populating read detects that the partition is larger than this, the population is abandoned and the partition key is marked as too large to cache. This read and subsequent ones will go directly to sstables, reading only a slice of data determined by sstable index granularity. This effectively disabled the cache for large partitions.

Row-granularity Population

In Scylla 2.0, the cache switched to row granularity for population. It could now cache a subset of a partition, which solves the problem of read amplification for partitions with many rows.

The cache doesn’t simply record which rows are present; it also records which ranges of rows are fully populated in the cache. This is especially important for multi-row range queries, which would otherwise never be sure whether the cache holds all of the relevant rows and would have to go to disk. For example, consider a query that selects the first N rows from a given partition in the following situation:

The cache records the information that the sstables don’t contain any rows before the one that is in cache, so the read doesn’t need to go to sstables.

In Scylla 2.0 and 2.1, the cache still has the problem of eviction granularity being at the partition level. When Scylla evicts a large partition, all of it has to go away. There are two issues with that:

  • It stalls the CPU for a duration roughly proportional to the size of the partition, causing latency spikes.
  • It will require subsequent reads to repopulate the partition, which slows them down.

Row-granularity Eviction

Scylla 2.2 switches eviction to row granularity, solving the problems mentioned earlier. The cache is capable of freeing individual rows to satisfy memory reclamation requests:

Rows are freed starting from the least recently used ones, with insertion counting as a use. For example, a time-series workload, which inserts new rows in the front of a partition, will cause eviction from the back of the partition. More recent data in the front of the partition will be kept in cache:

Row-granularity Merging of In-memory Partition Versions

Scylla 2.4 will come with yet another latency improvement related to the merging of in-memory versions of partition data.

Scylla aims to provide partition-level write isolation, which means that reads must never see only part of a write made to a given partition: they see either all of it or nothing. To support this, the cache and memtables use MVCC internally. Multiple versions of partition data, each holding incremental writes, may exist in memory and later get merged. That merging used to be performed as an indivisible task, blocking the CPU for its duration. If the versions to be merged are large, this noticeably stalls other tasks, which in turn impacts the perceived request latency.

One case in which we may get large versions is when a memtable is moved to cache after it’s flushed to disk. Data in a memtable for a given partition will become a new partition version on top of what’s in cache. If there are just a few partitions written to, that can easily generate large versions on a memtable flush. This is tracked under the following GitHub issue:

The graph below illustrates this effect. It was taken for a workload that appends rows to just a few partitions, hence there are few, but large, partitions in memtables. You can see that after a memtable is flushed (“memtable” group on the “Runtime” graph rises), the “memtable_to_cache” group on the “Runtime” graph rises, which correlates directly with latency spikes on the “Read latency” graph.

In Scylla 2.4, this is solved by making partition version merging preemptible. It hooks into the general preemption mechanism in the Seastar framework, which by default raises the preemption flag every 500 microseconds (the task quota), allowing currently running operations to yield. Partition version merging checks this flag after processing each row and yields when it is raised. This way, the impact on latency is limited to the duration of the task quota.

Below is a graph showing a similar workload on ee61660b (scheduled for release in 2.4). You can see that the impact of a memtable flush on read latency is no longer significant:

Test Results: Scylla 1.7 vs Master (pre-2.4)

Here’s an experiment that illustrates the improvements using a time-series like workload:

  • There are 10 clients concurrently prepending rows to 10 partitions, each client into a different partition. The ingestion rate is about 20k rows/s
  • There is one client querying for the head of one of the partitions, with limit 1, at a frequency of 10 reads per second

The purpose of the test is to see how the latency of reads looks when partitions grow too large to fit in cache and are therefore evicted.

I used a modified1 scylla-bench for load generation, with the following command lines:

  • ./scylla-bench -workload timeseries -mode write -concurrency 10 -partition-count 10 -clustering-row-count 1000000000 -max-rate 20000
  • ./scylla-bench -workload sequential -mode read -concurrency 1 -partition-count 1 -no-lower-bound -max-rate 10

The Scylla servers are started with one shard and 4 GiB of memory.

I compared Scylla 1.7.5 with the master branch commit ee61660b76 (scheduled for release in 2.4).

Behavior on Scylla 1.7.5

The graphs below show the time frame around the event of the cache getting forcefully invalidated using RESTful API2 during the test.

We can see that reads start to miss in the cache but do not populate it. See how “Cache Used Bytes” stays flat. We can see that reads start to hit the large partition markers, which is shown on the “reads from uncached large partitions” graph. This means reads will go directly to sstables. The latency of reads jumps to above 100ms after eviction because each read now has to read from sstables on disk.

We can also see periodic latency spikes before the cache gets evicted. Those are coming from cache updates after a memtable flush. This effect was described earlier in the section titled “Row-granularity merging of in-memory partition versions.”

Behavior on the Master Branch

On the graphs below, you can see the time frame around the event of the cache getting forcefully invalidated using RESTful API3 at 18:18:07:

The first thing to notice is the latency spike caused by forceful cache invalidation. This is an artifact of cache invalidation via RESTful API, which is done without yielding. Normal eviction doesn’t cause such spikes, as we will show later.

Other than that, we can observe that read latency is the same before and after the event of eviction. There is one read that misses in cache after eviction, which you can tell from the graph titled “Reads with misses.” This read will populate the cache with the head of the partition, and later reads that query that range will not miss.

You can see on the graphs titled “Partitions” and “Used Bytes” that the cache dropped all data, but after that, the cache gets populated on memtable flushes. This is possible because the range into which incoming writes fall is marked in cache as complete. The writes are to the head of the partition, before any existing row, and that range was marked as complete by the first read that populated it.

Another interesting event to look at is the time frame around the cache filling up all of the shard’s RAM. This is when the internal cache eviction kicks in:

Eviction happens when the cache saturates memory, at about 18:08:27 on those graphs. This is where the spike of “Partition Evictions” happens, as unused system tables get evicted first. After that we see only spikes of “Row Evictions,” because eviction happens from the large partitions populated by the test. Those are not evicted fully, hence no “Partition Evictions.” Because eviction starts from the least recently used rows, reads keep hitting in the cache, which is why the “Reads with misses” graph stays flat at 0.

We can also see that there are no extraordinary spikes of read/write latency related to eviction.

Test Results: Scylla Master (pre-2.4) vs Cassandra

We ran the same workload on GCE to compare the performance of Scylla against Cassandra.

A single-node cluster was running on n1-standard-32 and the loaders were running on n1-standard-4. Both Scylla and Cassandra were using default configurations.

Below you can find latency graphs during the test for both servers.

Cassandra 3.11.2:

Scylla Master:

As you can see, these latency tests show noticeable differences, even for a cacheable workload that benefits from Cassandra’s use of the Linux page cache. We will conduct a full time-series comparison later on.


1: The modifications are about making sure the “timeseries” write workload and “sequential” read workload use the same set of partition keys

2: curl -X POST http://localhost:10000/lsa/compact

3: curl -X POST http://localhost:10000/lsa/compact

The post Exploring How the Scylla Data Cache Works appeared first on ScyllaDB.

Q&A With DataStax Co-Founder Matt Pfeil

In celebration of Apache Cassandra’s 10th anniversary, our VP of Developer Relations, Patrick McFadin, sat down for a Q&A with DataStax Co-Founder Matt Pfeil.

Q: You and Jonathan Ellis decided to quit your jobs and start a company devoted to supporting Apache Cassandra. What was it about Cassandra that impressed you so much that you decided to make such a crazy move?

A: I came from a world where we ran MySQL in a very distributed format. Hundreds of machines, and we did all the sharding ourselves. Cassandra’s P2P architecture and resiliency were incredibly attractive. Also, we had worked with it pre-Riptano at Rackspace.

Q: In those early days, there were a lot of new and different databases arriving on the scene. Did you ever get worried about the Cassandra project being viable?

A: Definitely. During the 2009 to 2011 timeframe it felt like a new NoSQL project was launching weekly. It’s sort of like how Crypto is today. New coin every week. We spent a TON of time just making sure we had community users who were happy.

Q: What was one of the most important moments you saw in the Apache Cassandra community?

A: The Cassandra Summits we ran. They kept attracting bigger crowds with bigger company names with bigger use cases.

Q: You spent a lot of time trying to convince big companies that they should use Cassandra. What was the biggest challenge then and how does that contrast to today?

A: It was never a challenge of the theory of Cassandra. People would ask questions and the tech spoke for itself. The challenges in the early days were more about how early the software was. People are scared of new, pre-v1.0 things in production. As time progressed and big names started using it, the challenges shifted to implementation. It’s new, so people had to learn new things, and that takes time.

Q: What advice would you give to someone working on an early stage technology with breakout potential?

A: Get users. Do whatever you can to make them successful in production and get them talking about it to the world. Repeat.

Join the Apache Cassandra Celebration


Reaper 1.2 Released

We are happy to announce the release of Cassandra Reaper 1.2!

It’s been four months since the last minor release of Reaper, 1.1. In that period the community has delivered almost three times as many new features, improvements, and bug fixes as in the previous four months going back to Reaper 1.0.

Performance and Stability at Scale

With Reaper known to be deployed and tested on clusters of 500+ nodes, the stabilization efforts and performance improvements continue. Some highlights of these are:

  • Improved performance for long-running Reaper instances with lots of repair runs history,
  • Improved performance for Reaper instances in charge of many clusters and tables, especially when Cassandra-3+ is used for backend storage,
  • Improved and Expanded Continuous Integration for Travis and CircleCI,
  • Daily Rollover of logs, ensuring performance is stable day after day.

Reaper is built with the objectives of running as a distributed system while supporting repairs across multiple regions and clusters. This is no small feat and depends upon all our wonderful engaged users providing a wide variety of valuable feedback and contributions.

Improved CircleCI tests

Support for Listing Snapshots

Reaper can now visualize all the snapshots in your clusters. It can also take cluster-wide snapshots for you, either at the click of a button or from the command line.

Listing of snapshots is only supported against clusters of Cassandra-2.1+

Listing Snapshots

Ease of Scripting Clusters and Schedules

With the rise of containerization, and especially immutable containers, Reaper is easier than ever to configure to start up automatically with clusters and schedules registered and ready to go. From Mesos to Kubernetes, you can feel safe that no matter how many times you redeploy new instances of Reaper, no matter how many times you wipe Reaper’s backend storage, your Cassandra clusters will always get repaired according to your schedules without ever having to touch Reaper’s UI.

The typical use case we’ve seen so far is using spreaper in custom Docker images to register clusters and schedules.

This comes with the added benefit that Reaper’s backend storage can now be treated as volatile space. For example, when using the Cassandra backend, the reaper_db keyspace can be freely deleted (while Reaper is stopped). Beyond keeping a history of repair runs, there’s really no reason to back up this data. (Backups are still a must when upgrading Reaper.)

We’ve also added bash completion for spreaper. Debian systems should get it installed automatically; otherwise, it can be loaded manually with

. src/packaging/etc/bash_completion.d/spreaper

Web Authentication

Your Reaper instances can now be protected with users and passwords. Authentication (and authorization) is implemented using Apache Shiro.

For more information read the documentation.

Segment Coalescing and Repair Threads

Against Cassandra-2.2+ clusters, Reaper’s repairs will now group together all token ranges that share the same replicas into common segments. For clusters using the default vnodes setting of num_tokens: 256, this results in a rather dramatic decrease in repair duration. Note that The Last Pickle recommends that new clusters be configured with num_tokens: 32 (or 4 with Cassandra-3+). Also note that as token ranges get grouped together, Reaper’s back-pressure mechanisms (the intensity setting and checks against pending compactions) become less effective.

For clusters running Cassandra-2.2+, Reaper now has a new option, Repair Threads. This specifies how many threads can be used within a submitted repair where multiple token ranges exist. A single token range cannot be processed or split by multiple threads, so the repair threads option only has an effect where coalescing occurs.

The selection of segments has also been made simpler. Instead of selecting the total number of segments for a whole cluster, which requires contextual knowledge such as the number of nodes and vnodes, the segment count is now per node. That is, you choose how many segments you would like the repair on each node to be split into. If this number is less than the number of vnodes, token ranges can be expected to be coalesced; for example, with num_tokens: 256 and a per-node segment count of 64, each segment would coalesce roughly four token ranges, replica placement permitting.

Automated Purging of Completed Repairs

Repair run history can really accumulate over time. While we continue to address any burdens this imposes, from performance to UI clutter, we have also added the ability to routinely purge completed repair runs.

To use automatic purging configure the reaper yaml with the following options:

purgeRecordsAfterInDays: 0
numberOfRunsToKeepPerUnit: 50

Either or both of these settings can be set. The purgeRecordsAfterInDays setting determines how many days completed repair runs are kept, and the numberOfRunsToKeepPerUnit setting determines how many completed repair runs (of the same type) are kept. For each setting, a value of zero disables the feature.

What’s Next for Reaper

The Reaper community is growing, and we’re constantly seeing contributions coming in from new contributors. This is a wonderful sign of a maturing open source application, and we’d like to extend our thanks to Spotify for starting the Cassandra Reaper journey.

The next release of Reaper, and what it holds, is ultimately up to what old and new contributors offer. If you’d like a peek at what’s in progress, take a look at our current pull requests. And if you’d like to influence what 1.3 or beyond looks like, get involved!

Upgrade to 1.2

The upgrade to 1.2 is recommended for all Reaper users. The binaries are available from yum, apt-get, Maven Central, Docker Hub, and are also downloadable as tarball packages. Remember to backup your database before starting the upgrade.

All instructions to download, install, configure, and use Reaper 1.2 are available on the Reaper website.

Let’s meet at the Scylla Summit 2018!



We are excited to host our third annual Scylla Summit this year, and we would love to see you there. We had a very successful summit last year. Our growing community had the opportunity to hear firsthand from Scylla users about their success and also from our engineers about the underlying architecture that enables us to deliver predictable low latencies at high throughput out of the box. What’s coming on our product roadmap? We’ll talk about that too!

Just in case you’d like to see the kind of content we had at last year’s Summit, we’ve made the recordings available here. In one of our keynotes, we heard from mParticle about their experiences running Scylla to power their mission-critical environment. We also had talks about integrating Scylla with various products like Kafka, Spark, and KairosDB.

Our CTO, Avi Kivity, talked about our plans to move Scylla more and more toward row granularity, and this year we will hear about the advances we’ve made in this area in 2018. At the very beginning of last year’s conference, I talked about our adaptive controllers implementation, and this year we will show the advancements we’ve made in that area, such as the compaction controllers.

But there’s so much more than talks! At Scylla Summit, a great track is the hallway track, where you can freely learn about Scylla, share your experiences and discuss topics of interest with experts in the field.

We are already receiving many exciting proposals from our users, and the call for speakers is open until August 17th. If you are doing something interesting with Scylla, we would love to hear from you. And if you are just interested in coming and learning, the registration is already open. There’s still time to get the super early-bird discount!


The post Let’s meet at the Scylla Summit 2018! appeared first on ScyllaDB.

Measuring Performance Improvements in Scylla 2.2


When we released Scylla 2.2, we announced that it includes many performance (throughput, latency) improvements. We’re glad for the opportunity to quantify some of those improvements. In a recent post, we described a large partition use case with the improved query paging of Scylla 2.2. In this blog post, we will put Scylla 2.2 to the test against Scylla 2.1, comparing the two versions with read and write workloads. This post is a collaborative effort between Larisa Ustalov and myself, with the help of many others.

Highlights of Our Results:

  • Read benchmark: 42% reduction in 99th percentile latency with 1kB cells
  • Write benchmark: 18% throughput increase

Workloads Tested

We tested with two Scylla use cases: Write and Read. Details, including a description of the workload and results of each workload, are included below.

Read Workload – Latency Test

We tested the impact of Scylla 2.2 on read workload latencies, in particular the improvement from changing the row digest hash from md5 to xxHash (#2884). To isolate the latency change, the load throughput was fixed at 80K reads/s, resulting in a CPU load of ~50%. You can find complete details on the test setup in the appendix below.


Results – Extracted from cassandra-stress

Latencies (lower is better)

              Scylla 2.1    Scylla 2.2    Improvement
Mean latency  2.6 ms        1.8 ms        23%
95% latency   4.9 ms        3.3 ms        32%
99% latency   7.8 ms        4.5 ms        42%

Scylla 2.1 – 99% latency over time: (each line represents one node in the cluster)

Scylla 2.2 – 99% latency over time:

Summary: Running the same workload on Scylla 2.2 results in lower read latency than Scylla 2.1

Write Workload

The write workload was designed to test the effect of the new CPU controller in Scylla 2.2. The impact of the controller is greater when Scylla is fully loaded and needs to balance resources between background tasks, like compactions, and foreground tasks, like write requests. To test this, we injected the maximum throughput, writing 500GB of data sequentially. Complete details of the test setup are in the appendix below.


Average operations per second for the entire test.

      Scylla 2.1    Scylla 2.2    Improvement
Ops   ~354K         ~418K         +18%

Throughput over time:

Scylla 2.1

Scylla 2.2

The initial decline in throughput in the first ~15 minutes is expected. As more data accumulates on storage, compactions kick in and take resources away from real-time requests. The difference between the releases is the controller: in Scylla 2.2, it does a better job of stabilizing the system and provides more consistent throughput during compactions. This effect is more evident when looking at the number of concurrent compactions. Compared to Scylla 2.1, Scylla 2.2 runs a more consistent number of compactions, resulting in smoother performance.

Scylla 2.1 – Number of active compactions

Scylla 2.2 – Number of active compactions

Summary: Using the same setup, Scylla 2.2 can handle higher write throughput than Scylla 2.1


Our performance comparison of Scylla 2.2 and Scylla 2.1 demonstrates significant improvements in write throughput and read latency for two simple use cases. Stay tuned for additional benchmarks of Scylla 2.2 with future releases.


Appendix – Test Setup

  • Scylla Cluster
    • Nodes: 3
    • Instance type: i3.8xlarge
    • Scylla 2.1.6 AMI (, us-east-1)
    • Scylla 2.2rc3 AMI (ami-917521ee, us-east-1)
  • Loaders
    • Servers: 4
    • Instance type: c4.4xlarge
  • Workloads (all of them)
    • Replication Factor (RF) = 3
    • Consistency Level (CL) = QUORUM
    • Compaction Strategy: Size-Tiered

Read Workload

  • Data: 1,000,000,000 (1 billion) keys with 1,024 bytes each (raw data 1 TB)
  • 4 loaders, each running 150 threads, rate limited to 20,000 reads/s
  • cassandra-stress command used to populate the data:
    cassandra-stress write no-warmup cl=QUORUM n=1000000000 -schema 'replication(factor=3)' -port jmx=6868 -mode cql3 native -rate threads=200 -col 'size=FIXED(1024) n=FIXED(1)' -pop seq=1..1000000000
  • cassandra-stress command used to read the data:
    cassandra-stress read no-warmup cl=QUORUM duration=100m -schema keyspace=keyspace$2 'replication(factor=3)' -port jmx=6868 -mode cql3 native -rate 'threads=150 limit=20000/s' -errors ignore -col 'size=FIXED(1024) n=FIXED(1)' -pop 'dist=gauss(1..750000000,500000000,250000000)'

Write Workload

  • Data: 10^9 keys with 10^3 bytes each (raw data 1 TB)
  • 4 loaders, each running 1,000 threads
  • cassandra-stress command:
    cassandra-stress write no-warmup cl=QUORUM duration=120m -schema keyspace=keyspace$2 'replication(factor=3)' -port jmx=6868 -mode cql3 native -rate 'threads=1000' -errors ignore -pop 'seq=1..500000000'

The post Measuring Performance Improvements in Scylla 2.2 appeared first on ScyllaDB.

Data Autonomy: Why It’s No Longer a “Nice to Have”

Who do you trust most with your enterprise’s valuable data and where should you leave it?

It’s a question nearly all companies are mulling over today.

Data is your most valuable asset, and retaining control over its location and portability is imperative to remaining competitive. You can’t afford to have cloud lock-in, and yet, many enterprises find themselves exactly that: locked in.

Blind Trust to Wise Discretion

The public cloud was built on trust. You stored your business’s data on AWS, Microsoft Azure, Google Cloud Platform, or a platform-as-a-service with the underlying assumption that your data would be protected from competitive threats. Even given potential risks, if you wanted to use a public cloud, you had limited choices. Trust was implied. You jumped in. Make no mistake, these major public cloud providers remain great cloud options today.

But they are not without competitive risks.

What happens when Amazon, Microsoft, or Google acquires a business and suddenly, they’re your competitor? It wasn’t too long ago that Walmart announced to its vendors that they were no longer allowed to use AWS. Amazon was encroaching as a major retail competitor and it no longer made sense to have Walmart’s sensitive data entrusted on their competitor’s cloud.

The point is: business shifts overnight. Data is your most valuable asset. The big public cloud providers are major businesses that continue to grow and expand into new markets. You cannot afford to have your data locked-in anywhere.

So where do you store your data?

Most businesses are grappling with the choices: on-premises, private cloud, public, or hybrid cloud. Being able to move data when and where needed between private and public clouds and across cloud vendors without losing services or disrupting business is the goal.

Enter Data Autonomy

Data autonomy gives you the power to maintain control of your data, no matter where it resides. This means having the flexibility to store, run, and handle your data in any type of environment (public, hybrid, on-premises, or multi-cloud), and to do it securely with granular data access control. It enables your company to be data-agile, so you can respond to sudden shifts in the market, new opportunities, and unforeseen threats quickly, without losing data or services.

To have data autonomy, you need an agile data layer that can handle the characteristics of today’s cloud apps, which are:

  • Contextual
  • Always on
  • Real time
  • Distributed
  • Scalable

To make this happen, you need a data management platform built for cloud applications and hybrid cloud.

DataStax Enterprise (DSE) is the always-on data platform powered by the best distribution of Apache Cassandra™. It allows you to choose your own infrastructure – so your most sensitive data is completely in your control and lets you configure where and how you want your data stored. You can keep critical data on your own infrastructure and other data in public clouds.

DSE is cloud-agnostic and independent of underlying cloud infrastructure, so you can move data without having to rewrite code and security privileges for each app (all your apps already work against DataStax Enterprise); this gives you the ability to quickly and easily move data and respond to market changes. You also won’t lose data management services (search, analytics, graph, security, etc.), as they are included in DSE when you move from one public cloud to another or to a private cloud.

With DataStax Enterprise, your data remains yours and you can use any cloud you want knowing that you have the power to move data at your discretion, and stay agile, competitive, and protected from unforeseen market changes.

On-Demand: How to Power Innovation with Geo-Distributed Data Management in Hybrid Cloud (webinar)


Undetectable tombstones in Apache Cassandra

One of the usual suspects for performance issues in the read path of Apache Cassandra is the presence of tombstones. We are used to checking how many tombstones are accessed per read early in the process, to identify the possible cause of excessive GC pauses or high read latencies.
While trying to understand unexpectedly high read latencies for a customer a few months ago, we found out that one special (although fairly common) kind of tombstone was neither counted in the metrics nor traced in the logs: primary key deletes.

A tale of two tombstones

Tombstones in Cassandra fall into two main categories:

  • cell tombstone
  • range tombstone

The former is a tombstone that shadows a single cell. This kind of tombstone is usually generated by TTLs, where once a cell expires it turns into a tombstone. It can also be the result of updating a cell to a null value or of deleting a single cell.

The latter is a tombstone that can shadow multiple cells. This kind of tombstone is generated by DELETE statements that can have different spans:

  • Partition deletes: a single tombstone will shadow the whole partition
  • Multiple rows deletes: a single tombstone will shadow a subset of the partition
  • Single row deletes: a single tombstone will shadow a single row only

To be fully accurate, range tombstones are always composed of two tombstones, one for each bound of the shadowed range.

Considering the following table:

CREATE TABLE test.tombstones (
    id int,
    clust1 text,
    clust2 text,
    val1 text,
    val2 text,
    PRIMARY KEY (id, clust1, clust2)
);

Placing a partition level range tombstone would be achieved using the following query:

DELETE FROM test.tombstones WHERE id = ?

Placing a multi-row range tombstone would require the following query:

DELETE FROM test.tombstones WHERE id = ? AND clust1 = ?

Since Apache Cassandra 3.0.0 it is also possible to perform the following kind of range delete thanks to CASSANDRA-6237:

DELETE FROM test.tombstones WHERE id = ? AND clust1 > ? AND clust1 <= ?

Creating a single row range tombstone would be done as follows:

DELETE FROM test.tombstones WHERE id = ? AND clust1 = ? AND clust2 = ?

And we can create cell tombstones by either deleting/nulling a single column (or setting a TTL at write time):

DELETE val1 from test.tombstones WHERE id = ? AND clust1 = ? AND clust2 = ? 


UPDATE test.tombstones SET val1 = null WHERE id = ? AND clust1 = ? AND clust2 = ? 

Detecting the bug

Our customer had some pretty powerful hardware with top notch SSD drives and more than enough CPU and RAM to get the best out of Apache Cassandra 3.11.1.

Most tables exhibited correct read performance, but one did not, and the cluster was going through a lot of long GC pauses (> 1s):

Keyspace     Table             p50           p75           p95
ks1          table1       24985.15      62174.25     670692.26
ks1          table2         772.01       1965.33       9462.06
ks1          table3          39.03         56.19       2807.62

All 3 tables had the same data model and nodetool tablestats output showed no tombstones per read. Since what we were seeing made no sense, we suspected that one of the metrics we were using was not accurate.

Although we weren’t seeing any tombstone read according to the logs and the metrics, the customer confirmed that DELETE statements were performed regularly on the full primary key, so we experimented to find out why Cassandra behaved this way.

Reproducing the problem was fairly easy and we used CCM to test different versions of C*, using the test.tombstones table defined above.

We wrote a short Python script to populate the table with a configurable number of rows, on a single partition spreading over 10 clustering keys:

import sys
from cassandra.cluster import Cluster  # DataStax Python driver

session = Cluster(['127.0.0.1']).connect()  # connect to the local CCM cluster

futures = []
for i in range(int(sys.argv[1])):
    futures.append(session.execute_async("""
        insert into test.tombstones(id, clust1, clust2, val1, val2)
        values(1,'""" + str(i%10) + """','""" + str(i) + """','test','test')"""))
    if i%1000==0 and i>0:
        for future in futures:
            rows = future.result()
        print(str(i+1) + " rows...")
        futures = []

Then we made another script to delete a subset of the rows by primary key (single row deletes):

import sys
from cassandra.cluster import Cluster  # DataStax Python driver

session = Cluster(['127.0.0.1']).connect()  # connect to the local CCM cluster

futures = []
for i in range(int(sys.argv[1])):
    futures.append(session.execute_async("""
        DELETE FROM test.tombstones
        WHERE id=1
        AND clust1 = '""" + str(i%10) + """'
        AND clust2='""" + str(i) + """'"""))
    if i%1000==0 and i>0:
        for future in futures:
            rows = future.result()
        print(str(i+1) + " rows...")
        futures = []

The procedure was to run the first script to add 10,000 rows, flush all memtables and then run the deletion script to remove 1,000 rows (so that tombstones are stored in different memtables/SSTables than the shadowed data).

Reading the table in Apache Cassandra 2.0.13 gave the following traces:

cqlsh> tracing on
Now tracing requests.

cqlsh> select count(*) from test.tombstones where id = 1;


(1 rows)

Tracing session: 9849e2b0-7f8f-11e8-9faa-4f54a5a22ee7

 activity                                                                  | timestamp    | source    | source_elapsed
                                                        execute_cql3_query | 15:39:37,762 | |              0
    Parsing select count(*) from test.tombstones where id = 1 LIMIT 10000; | 15:39:37,762 | |             66
                                                       Preparing statement | 15:39:37,762 | |            162
                            Executing single-partition query on tombstones | 15:39:37,763 | |           1013
                                              Acquiring sstable references | 15:39:37,763 | |           1035
                                               Merging memtable tombstones | 15:39:37,763 | |           1080
                       Partition index with 16 entries found for sstable 3 | 15:39:37,763 | |           1786
                               Seeking to partition beginning in data file | 15:39:37,763 | |           1803
 Skipped 0/1 non-slice-intersecting sstables, included 0 due to tombstones | 15:39:37,764 | |           2308
                                Merging data from memtables and 1 sstables | 15:39:37,764 | |           2331
                                  Read 9000 live and 3000 tombstoned cells | 15:39:37,796 | |          34053
                                                          Request complete | 15:39:37,831 | |          69508                                                        

Although we deleted 1000 rows, Apache Cassandra 2.0 reports 3000 tombstones were read: Read 9000 live and 3000 tombstoned cells.
Dumping one of the SSTables to JSON, we see that each row is composed of 3 cells:

        {"key": "00000001","columns": [
                ["0:100:","",1530711392080000], ["0:100:val1","test",1530711392080000], ["0:100:val2","test",1530711392080000],
                ["0:1000:","",1530711392233000], ["0:1000:val1","test",1530711392233000], ["0:1000:val2","test",1530711392233000],

So C* 2.0 does not count the tombstones but rather the individual cells that are shadowed by the tombstones (at least in this specific case).
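The 3,000 figure follows directly from that layout: each of the 1,000 deleted rows shadows three cells (the row marker plus val1 and val2). A quick back-of-the-envelope check (plain Python, not tied to any Cassandra API):

```python
deleted_rows = 1000
cells_per_row = 3   # row marker ("") + val1 + val2, as seen in the JSON dump

tombstoned_cells = deleted_rows * cells_per_row
print(tombstoned_cells)  # 3000, matching "Read 9000 live and 3000 tombstoned cells"
```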

Now with Apache Cassandra 2.1.18 to 3.11.1:

cqlsh> paging off
Disabled Query paging.
cqlsh> tracing on
Now Tracing is enabled
cqlsh> select count(*) from test.tombstones where id = 1;


(1 rows)

Tracing session: e39af870-7e14-11e8-a105-4f54a5a22ee7

 activity                                                                                        | timestamp                  | source    | source_elapsed
                                                                              Execute CQL3 query | 2018-07-02 18:28:01.783000 | |              0
                Parsing select count(*) from test.tombstones where id = 1; [SharedPool-Worker-1] | 2018-07-02 18:28:01.784000 | |             53
                                      Read 9000 live and 0 tombstone cells [SharedPool-Worker-2] | 2018-07-02 18:28:01.810000 | |          27004
                                                                                Request complete | 2018-07-02 18:28:01.825313 | |          42313

From 2.1.x all the way to 3.11.1 (with 2.2.x and 3.0.x exhibiting the same behavior), no tombstones are reported anymore: Read 9000 live and 0 tombstone cells.

The issue was not affecting the other kinds of range tombstones (partition and multiple rows) and cell tombstones were correctly counted as well.

It was then necessary to go down the read path to understand why some tombstones could be missed.

Merging mechanisms in Apache Cassandra

Being built on top of an LSM tree storage engine, C* has to merge cells that can be spread across memtables and several SSTables in order to return a consistent view of the queried data.
While the local node has to merge both cells and tombstones together to get the current state of the rows, it also has to exchange tombstones with other nodes in case of digest mismatch, to achieve read repairs.
The output of the merge will be:

  • a list of rows, containing a list of cells which can each be live or not (tombstones are cells that aren’t live anymore)
  • a list of tombstones which contains only range tombstones.

Different kinds of tombstones get merged differently with the data they shadow though:

  • Partition and multi-row tombstones: the shadowed data doesn’t survive the merge, and the range tombstones are returned as such so that they can be used for digest comparison and read repair.
  • Cell tombstones: they are returned as cells with liveness set to false.
  • Single row tombstones: they are merged upstream and aren’t returned as tombstones. The row will be returned with no cells and liveness set to false.

After a redesign of the read path in 2.1 that aimed at optimizing performance by taking out unnecessary tombstoned cells during the merge phase, single row tombstones stopped being counted and became impossible to detect. Only range tombstones that survived the merge and individually tombstoned cells were counted, but a row with no cell would be skipped silently.
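To make the counting difference concrete, here is a toy model (plain Python with invented names, not Cassandra’s actual merge code) of how a merged row stream is tallied before and after the fix — a non-live row that survives with no cells is skipped by the old counter but counted by the new one:

```python
# Each merged row is modelled as (is_live, live_cells, tombstoned_cells).
# A full-primary-key delete produces a non-live row with zero cells.
rows = [(True, 3, 0)] * 9000 + [(False, 0, 0)] * 1000

def count_tombstones_pre_3_11_2(rows):
    # Old behaviour: only individually tombstoned cells (and surviving
    # range tombstones, not modelled here) are counted.
    return sum(dead for _, _, dead in rows)

def count_tombstones_post_3_11_2(rows):
    # CASSANDRA-8527: an empty, non-live row also counts as a tombstone.
    return sum(dead for _, _, dead in rows) + \
           sum(1 for live, cells, _ in rows if not live and cells == 0)

print(count_tombstones_pre_3_11_2(rows))   # 0    -> "0 tombstone cells"
print(count_tombstones_post_3_11_2(rows))  # 1000 -> "1000 tombstone cells"
```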

Patched for 3.11.2 and 4.0

CASSANDRA-8527 was committed for 3.11.2 and the upcoming 4.0 in order to count all empty (non-live) rows as tombstones:

cqlsh> SELECT count(*) from test.tombstones where id = 1;


(1 rows)

Tracing session: c0c19e20-7f91-11e8-90d3-b70479b8c91e

 activity                                                                                 | timestamp                  | source    | source_elapsed | client
                                                                       Execute CQL3 query | 2018-07-04 15:54:21.570000 | |              0 |
 Parsing SELECT count(*) from test.tombstones where id = 1; [Native-Transport-Requests-1] | 2018-07-04 15:54:21.570000 | |            173 |
                                        Preparing statement [Native-Transport-Requests-1] | 2018-07-04 15:54:21.570000 | |            315 |
                             Executing single-partition query on tombstones [ReadStage-2] | 2018-07-04 15:54:21.571000 | |            947 |
                                               Acquiring sstable references [ReadStage-2] | 2018-07-04 15:54:21.571000 | |           1061 |
  Skipped 0/1 non-slice-intersecting sstables, included 0 due to tombstones [ReadStage-2] | 2018-07-04 15:54:21.571000 | |           1271 |
                                                Key cache hit for sstable 1 [ReadStage-2] | 2018-07-04 15:54:21.572000 | |           1698 |
                                  Merged data from memtables and 1 sstables [ReadStage-2] | 2018-07-04 15:54:21.613000 | |          42574 |
                               Read 9000 live rows and 1000 tombstone cells [ReadStage-2] | 2018-07-04 15:54:21.613000 | |          42746 |
                                                                         Request complete | 2018-07-04 15:54:21.620624 | |          50624 |

This allows us to safely rely on traces and metrics when troubleshooting high read latencies in Apache Cassandra 3.11.2+.


Tombstones generated by full primary key deletes are not reported in the logs nor counted in the metrics from Apache Cassandra 2.1.0 up to 3.11.1.

You can only guess at their presence if you observe unexpected latencies that cannot be explained by anything else.
As a general rule, it is advisable to ask the dev teams whether they are performing DELETE statements, and precisely of which type, even if the metrics suggest that there are no tombstones.

Upgrading to 3.11.2+ allows you to detect those tombstones in both logs and metrics. As a consequence, they will now count toward the failure threshold above which C* cancels in-flight queries, so queries that succeeded before the upgrade may start being aborted.

How to migrate data from Cassandra to Elassandra in Docker containers

A client recently asked us to migrate a Cassandra cluster running in Docker containers to Elassandra, with the data directory persisted via a bind mount. Elassandra is a fork of Cassandra integrated closely with Elasticsearch, to allow for a highly scalable search infrastructure.

To prepare the maintenance plan, we tested some of the methods as shown below.

The following are the commands used if you would like to test the process locally. Docker commands are used on one node at a time throughout the process to execute test statements. The Cassandra container is named my_cassandra_container, and the test Elassandra container is called my_elassandra_container. Replace the local directory /Users/youruser below as appropriate.

Start the Cassandra Container

First, start a container with the latest Cassandra version (3.11.2), binding the data volume locally as datadir.

In our use case, variables such as data center were pre-determined, but note that Cassandra and Elassandra have different default values in the container startup scripts for some of the variables. In the example below, data center, rack, snitch, and token number will be sent explicitly via environment variables flags (-e), but you can alternatively adjust these in the configuration files before starting Elassandra.

It will take about 15 seconds for this to start up before Cassandra is ready to accept the write statement following this. If you’re following the logs, look for “Created default superuser role ‘cassandra'” before proceeding.

docker run --name my_cassandra_container -e CASSANDRA_DC=DC1 -e CASSANDRA_RACK=RAC1 -e CASSANDRA_ENDPOINT_SNITCH=SimpleSnitch  -e CASSANDRA_NUM_TOKENS=8 -v /Users/youruser/mytest/datadir:/var/lib/cassandra -d cassandra:latest

Copy Configuration Files

Copy the Cassandra configuration files to a local location for ease of editing.

docker cp my_cassandra_container:/etc/cassandra/ /Users/youruser/mytest/cassandra

Create and Validate Test Data

Next, create some data in Cassandra using cassandra-stress as a data generator.

docker exec -it my_cassandra_container cassandra-stress write n=20000 -pop seq=1..20000 -rate threads=4

For comparison later, do a simple validation of the data by executing count and sample queries.

docker exec -it my_cassandra_container cqlsh -e "select count(*) from keyspace1.standard1"
docker exec -it my_cassandra_container cqlsh -e "select * from keyspace1.standard1 limit 1"

Stop and Remove the Cassandra Container

To prepare for the migration, stop Cassandra and remove the container.

docker exec -it my_cassandra_container nodetool flush
docker stop my_cassandra_container
docker rm my_cassandra_container

Install Elassandra Container

On a new container, install the latest Elassandra version using the same local data and configuration file paths as above. Again, it will take 15 seconds or so before the next statement can be run. If you are following the logs, look for “Elassandra started.”

docker run --name my_elassandra_container -e CASSANDRA_DC=DC1 -e CASSANDRA_RACK=RAC1 -e CASSANDRA_ENDPOINT_SNITCH=SimpleSnitch  -e CASSANDRA_NUM_TOKENS=8 -v /Users/youruser/mytest/datadir:/var/lib/cassandra -d strapdata/elassandra:latest

Validate Data

Now that Elassandra is running, re-validate the data. Note that at this point, only the fork of Cassandra is running, not integrated yet with Elasticsearch.

docker exec -it my_elassandra_container cqlsh -e "select count(*) from keyspace1.standard1"
docker exec -it my_elassandra_container cqlsh -e "select * from keyspace1.standard1 limit 1"

Repeat the above steps on remaining nodes.

Enable Elasticsearch

To enable the Elasticsearch part of Elassandra, stop Cassandra on all nodes. A rolling update does not work for this step. Enable Elasticsearch by updating the elasticsearch.yml configuration file as below. (Note that you have linked it to your local filesystem via the cp statement, so edit it directly on your local machine.)

docker stop my_elassandra_container
docker cp my_elassandra_container:/opt/elassandra- /Users/youruser/mytest/cassandra

vi /Users/youruser/mytest/cassandra/elasticsearch.yml

cluster.name: Test Cluster   ## Name of cluster
network.host:                ## Listen address
http.port: 9200

Restart and Validate Elassandra

Finally, restart and test the Elassandra container.

docker start my_elassandra_container

docker exec -it my_elassandra_container curl -X GET http://localhost:9200/

Sample output:

Elassandra GET Output (image)

Thank you to Valerie Parham-Thompson for assistance in testing.

Apache Cassandra LDAP Authentication

We’ve seen an increasing need for LDAP integration with Apache Cassandra, and we continually hear of cases where people have written their own LDAP authenticators for Cassandra.

However, if you search around you’ll have a hard time finding one of these implementations, and you’ll likely have to write one yourself, which is no easy feat.

So, to solve this issue we’ve created an open source LDAP authenticator plug-in for Apache Cassandra that goes hand in hand with the existing CassandraAuthorizer implementation. At the moment it supports basic LDAP usage, which should suffice for most cases; however, improvements are welcome if you need to modify it to suit your needs, and you are encouraged to submit pull requests for any enhancements.

This plug-in authenticator is freely available for anyone to use, and it is also included in the support scope for customers with Apache Cassandra Enterprise Support from Instaclustr.


The LDAPAuthenticator is implemented using JNDI, and authentication requests will be made by Cassandra to the LDAP server using the username and password provided by the client. At this time only plain text authentication is supported.
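Enabling the plug-in follows the usual pattern of swapping the authenticator in cassandra.yaml. The sketch below is illustrative only: the authenticator key is standard Cassandra configuration, but the exact class name and the service-user property names are assumptions here and must be taken from the plug-in’s README.

```yaml
# cassandra.yaml — swap the authenticator (use the fully qualified class
# name given in the plug-in's README; the name below is illustrative)
authenticator: LDAPAuthenticator

# Plug-in configuration file (property names are illustrative placeholders)
# ldap_uri: ldaps://ldap.example.com:636/
# service_dn: cn=cassandra-service,ou=users,dc=example,dc=com
# service_password: ...
```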

If you configure a service LDAP user in the plug-in’s configuration file, on startup Cassandra will authenticate the service user and create a corresponding role in the system_auth.roles table. This service user will then be used for future authentication requests received from clients. Alternatively (not recommended), if you have anonymous access enabled for your LDAP server, the authenticator allows authentication without a service user configured. The service user will be configured as a superuser role in Cassandra, and you will need to log in as the service user to define permissions for other users once they have authenticated.

On successful authentication of a client, a corresponding role will be created in the system_auth.roles table. The password for the role is not stored in the roles table, and credentials will always be passed through directly to the configured LDAP server. The only credential stored in Cassandra is the Distinguished Name of the user. However, if caching is enabled the password/hashed password will be stored in the cache, in memory only, on the nodes. Permissions-wise, this role will have no access to any keyspaces/tables, so GRANTs will need to be issued before the user can perform any useful queries.

Once created, the role will never be deleted, and all authentication of the role will be handled through LDAP while the LDAPAuthenticator is in place. Removing or disabling the user in LDAP will disallow future connections as that user, but not clean up the user from system_auth.roles. This can be done manually if so desired and should be done if you wish to switch to a different authentication mechanism.
Regarding security, as the authenticator only supports plain text from clients, you should ensure you have enabled and are using client encryption in Cassandra. On the LDAP side, you must use LDAPS, otherwise credentials will be sent in the clear between Cassandra and the LDAP server. As all SSL configuration is performed through JNDI, simply specifying LDAPS as the protocol for your LDAP server (assuming it’s enabled on your server) will enable LDAPS.

On 3.11 and later versions, a cache has been implemented to avoid thrashing your LDAP server. This cache will be populated with the username and either the provided password or a hash of the password, based on the cache_hashed_password property in the configuration file. Note that hashing the password will incur a performance hit, as the hash needs to be calculated on each authentication. The password/hash is only stored in memory on the Cassandra nodes, so if you don’t enable hashing, ensure appropriate security controls are in place for your Cassandra nodes.

LDAP JNDI properties can also be set via the configuration file. Simply specify the desired property and it will be set as part of the server’s context. For example, you can set the LDAP read timeout (in milliseconds) to 2000.

These properties and their documentation can be found here.


To transition to the LDAPAuthenticator you can do so in a rolling fashion with no downtime, as long as you are using AllowAllAuthenticator to start with, or your client handles auth failures from both LDAP and your old password authenticator and retries with the alternate auth on the next request. Pre-creating the LDAP users in system_auth.roles is also possible, however it is not recommended, as you would need to store the LDAP user passwords in Cassandra for it to work.

Alternatively, you can make the switch with downtime with no issues; however, this requires turning off all the nodes simultaneously. To ensure no errors on startup due to the creation of service roles, you should start one node first and wait until it’s running before starting the rest of the nodes.

You can find the LDAP authenticator source code on GitHub here, with instructions on setup and usage in the README. Currently the authenticator is supported for 2.2, 3.0, and 3.11 versions. Use the corresponding branch in the repo for your desired version.

The post Apache Cassandra LDAP Authentication appeared first on Instaclustr.

Apache Kafka “Kongo” 6.2 – Production Kongo on Instaclustr

In this blog (parts 6.1 and 6.2) we deploy the Kongo IoT application to a production Kafka cluster, using Instaclustr’s Managed Apache Kafka service on AWS. In part 6.1 we explored Kafka cluster creation and how to deploy the Kongo code. Then we revisited the design choices made previously regarding how to best handle the high consumer fan-out of the Kongo application, by running a series of benchmarks to compare the scalability of different options. In this part (6.2) we explore how to scale the application on the Instaclustr Kafka cluster we created, and introduce some code changes for autonomically scaling the producer load and optimising the number of consumer threads.

Scaling Kongo for Production

As a result of the benchmarking in 6.1 we are more confident that the design we used, Option 3, is a good choice for the production version of Kongo.

For production, some extra changes were made to the Kongo code including:

  • Adding the Kafka broker bootstrap server IPs and the SCRAM username/password
  • Adding the ability to spin up multiple consumers for Sensor and RFID topics
  • Introducing a wait before starting the simulation to ensure the target number of consumers are created and working
  • Introducing a timeout for the Sensor and RFID consumers to ensure that they have processed all the available events before closing down cleanly
  • Adding latency and throughput metrics for producers, consumers, and total events
    • Latency is measured in a similar way to the benchmarks (time from event production to the event being sent to each Goods object at the same location as the event).

All seven topics produced or consumed by Kongo were created manually with sensible default partitions (9 to start with).

The goal is to deploy Kongo and achieve maximum throughput and minimum end-to-end latency.

Initially running Kongo with guessed fixed numbers of consumers highlighted several problems:

  1. The producer was running flat out and overloaded the Kafka cluster. This introduced long delays between event production and consumption, and made it impossible to increase the number of consumers with any meaningful results.
  2. It’s hard to guess the correct number of consumer threads. They may need to be higher than guessed, and may not be in the exact ratio of the number of events received by each consumer type (5:1 RFID to Sensor ratio) as each consumer type may take a different time to process events.

Here’s the Sankey diagram showing the fan-out problem and different rates of rfid and sensor events from a previous blog:

Sankey Diagram

To manage the 1st problem I added a “sleep” to the Kongo simulation code to run a simulated hour and produce the output, and then delay for a while before repeating. This enables us to limit the producer throughput, and ensures that the consumers can consistently keep up with the production rate.

There are at least three approaches to solve the 2nd problem and optimise the production rate and the number of threads for each consumer type.

The first is to benchmark all the possible permutations (i.e. production rate, number of consumer threads, latencies, throughputs, etc). I started out doing this and soon realised it was too Kafkaesque in practice, as it was time consuming and difficult to obtain consistent results.

A second approach could be to use simple performance modelling to calculate the number of consumer threads required in advance, and then just set them to this number. This is certainly possible in theory, e.g. using Little’s law, which relates the concurrency in a system to the response time multiplied by the throughput: the number of threads for each consumer is the average time spent processing each event times the throughput. For example, 80 Sensor consumer threads are predicted for a response time of 40ms and a throughput of 2,000 TPS. But accurately measuring the response time is tricky (you only want the time spent processing each event on the consumer side, not any delays due to queuing etc.).
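The Little’s law arithmetic in that example is easy to verify (a sketch using the figures from the text):

```python
# Little's law: concurrency (threads) = throughput x time-in-service.
throughput_tps = 2000   # events/s handled by the Sensor consumers
service_time_s = 0.040  # 40 ms spent processing each event

threads_needed = throughput_tps * service_time_s
print(threads_needed)   # 80.0 Sensor consumer threads
```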

I therefore settled on a third option, an “autonomic” version of the Kongo application. I added code to slowly ramp up the production event rate, and automatically increase the number of consumer threads as latency increases beyond a threshold. The rate is only increased again once the latencies are consistently below the threshold, and reverts to the last sustainable rate again if the SLA cannot be achieved with increased consumers.
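The autonomic ramp-up described above can be sketched as a simple control loop (toy Python; autoscale, measure_latency, and the thresholds are invented stand-ins for the real Kongo metrics and code):

```python
def autoscale(measure_latency, sla_ms=1000, max_consumers=300,
              start_rate=1000, rate_step=1000):
    """Toy autonomic loop: ramp the producer rate, add a consumer thread
    whenever the latency SLA is breached, and settle on the last
    sustainable rate once consumers hit their ceiling."""
    rate, consumers = start_rate, 1
    last_good = rate
    while True:
        latency = measure_latency(rate, consumers)
        if latency <= sla_ms:
            last_good = rate             # SLA met: remember this rate, push harder
            rate += rate_step
        elif consumers < max_consumers:
            consumers += 1               # SLA breached: try more consumer threads
        else:
            return last_good, consumers  # no headroom left: keep the last good rate

# A made-up latency model: latency grows with the per-consumer event rate.
rate_cap, threads = autoscale(lambda r, c: r / c * 0.05)
print(rate_cap, threads)
```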

The number of partitions was increased to 300 for Sensor and RFID topics to ensure that this approach didn’t hit any partition related limits.   The following graph shows the number of sensor and rfid consumer threads started for increasing producer event rates using this approach (with linear extrapolation above 20,000 events/s):

Producer Events vs Consumer Threads

What’s interesting about these results? As expected, the number of threads for each consumer type is different, and increases with the producer rate. The ratio of RFID:Sensor consumer threads starts out higher (6:1) than predicted (5:1) but reduces to 1.3:1 (i.e. as the producer rate increases the number of sensor and rfid consumer threads tends to converge).

What’s not obvious from this graph is that Kongo is actually producing “secondary” producer events. As the application receives sensor and rfid events it checks for Sensor+Goods and Goods+Truck Location violations, and sends violations events to another Kafka topic. Currently these events are not being consumed, but are in the Kafka connector and streams extensions (disabled for these initial tests).  On average (it differs across runs) the ratio of secondary producer events (violations) to primary producer events is about 1.2:1.

The following graph shows the full picture. The X axis is primary producer event rate, and the Y axis shows the rate for consumers, secondary producers, and total events in and out for the cluster. The total load is approximately 3.2 times the primary producer rate.
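The ~3.2x multiplier follows from counting each primary event’s journey: it is written into the cluster once, consumed out once, and generates ~1.2 violation events that are written back in. A back-of-envelope sketch (the function is mine, illustrating the arithmetic only):

```python
def total_cluster_load(primary_eps: float, violation_ratio: float = 1.2) -> float:
    # 1 write in + 1 read out + violation_ratio secondary writes back in
    return primary_eps * (1 + 1 + violation_ratio)

print(total_cluster_load(20000))   # ~64,000 events/s total at 20,000 in
print(total_cluster_load(30000))   # ~96,000: the "close to 100,000" prediction
```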

Producer and Consumer throughput

Extrapolation predicts that at 30,000 events/s primary producer load the total load will be close to 100,000 events/s. This has obvious implications for how the Kafka cluster and the Kongo application will need to scale as the load increases.

As one final check I decided to run Kongo at close to the maximum producer load with the predicted number of Consumer threads for an extended period of time (i.e. a soak test).  It turns out there were still a few more surprises in store.

Creating lots of consumers takes time, and you can’t just create them all at once. I needed to add a delay between each consumer start to ensure that all the consumers were created successfully before the simulation starts.

The other problem was somewhat surprising given that the benchmarking had previously worked on the identical setup. Just before reaching the target number of consumer threads I started seeing this exception:

org.apache.kafka.common.KafkaException: Too many open files

Instaclustr techops assured me that the Kafka cluster still had plenty of spare file descriptors, so the problem was obviously on the client side. Further investigation revealed that the default number of open files for AWS Linux is only 1024, which I was exceeding.

You can increase the number of open files on AWS Linux by adding these two lines to the /etc/security/limits.conf file (nofile is the maximum number of open file descriptors):

ec2-user soft nofile 10000
ec2-user hard nofile 10000

Then logout and login and check with ulimit -n.
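The same check can be made from client code before starting hundreds of consumers. A small sketch using Python’s stdlib resource module (Unix-only; the function and headroom value are my assumptions):

```python
import resource

# Programmatic equivalent of `ulimit -n`: soft and hard limits on open fds.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft} hard={hard}")

def enough_fds_for(consumer_threads: int, headroom: int = 256) -> bool:
    """Rough pre-flight check: each consumer holds at least one socket,
    and the process needs headroom for logs, JARs, metrics, etc."""
    return soft >= consumer_threads + headroom
```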

This worked and the Kongo application ran for several minutes at the sustained rate of 17,000 production events/s, with 222 Sensor and 300 RFID consumer threads, giving a median consumer latency of 1000ms and 99th percentile of 4800ms.

Scaling Conclusions

A small Instaclustr Kafka Production cluster is good for exploring application and Kafka cluster sizing questions before ramping up to a bigger cluster. The Instaclustr Kafka Production clusters on AWS all use r4 instances, which provide predictable performance for benchmarking (and production). By comparison, the Instaclustr Kafka Developer clusters use t2 instances, which provide burstable CPU but are limited to the base rate (e.g. 20% for a t2.medium) once the CPU credits are used up. These are good for application debugging and testing but not ideal for benchmarking, as performance can be unpredictable.

Benchmarking some of the Kongo design options, using an autonomic version of Kongo to find the maximum throughput and optimal number of consumer threads, and final “soak” testing to find any final surprises gives us confidence that we will be able to scale the Kongo application on a larger Instaclustr Kafka cluster.

An alternative way to manage high consumer fan outs is to freeze the flow!

Lena river delta (winter)

The post Apache Kafka “Kongo” 6.2 – Production Kongo on Instaclustr appeared first on Instaclustr.

General Availability – Managed Apache Kafka

Instaclustr announces that our Managed Apache Kafka on the Instaclustr Managed Platform has now reached General Availability, with full production SLAs and security controls applied. Apache Kafka joins Instaclustr’s existing open source offerings of Apache Cassandra, Apache Spark and Elassandra, giving customers the opportunity to use a single managed service provider for a complete suite of leading open source data processing and storage technologies, delivered with reliability at scale.

Apache Kafka is the leading streaming and queuing technology for large-scale, always-on applications. It is widely used in application architectures to fill needs including:

  • Providing a buffering mechanism in front of processing (i.e. dealing with a temporary incoming message rate greater than the processing application can handle)
  • Allowing producers to publish messages with guaranteed delivery, even if the consumers are down when a message is published
  • Acting as an event store for event sourcing or Kappa architectures
  • Facilitating flexible, configurable many-producers-to-many-consumers architectures by separating the details of what is consuming messages from the apps that produce them (and vice versa)
  • Performing stream analytics (with Kafka Streams)
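The buffering and decoupling roles in the first two bullets can be illustrated with a plain in-process queue standing in for a Kafka topic (an analogy only; the real Kafka API is different, and its buffer is durable and distributed):

```python
import queue
import threading

topic = queue.Queue(maxsize=1000)   # stands in for a (bounded) Kafka topic

def producer(n: int) -> None:
    for i in range(n):
        topic.put(i)                # blocks when the buffer is full

def consumer(out: list, n: int) -> None:
    for _ in range(n):
        out.append(topic.get())     # drains at the consumer's own pace

received: list = []
p = threading.Thread(target=producer, args=(5000,))
c = threading.Thread(target=consumer, args=(received, 5000))
p.start(); c.start(); p.join(); c.join()
print(len(received))  # → 5000: no events lost despite rate mismatches
```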

Delivered through the Instaclustr Platform, Instaclustr’s Managed Apache Kafka provides the management features that have made Instaclustr’s Managed Apache Cassandra the leading managed service for Cassandra:

  • Support on AWS, Azure, GCP and IBM Cloud
  • Automated provisioning and configuration of clusters with Kafka and ZooKeeper
  • Run in our cloud provider account with a fixed, infrastructure-inclusive cost, or use your own cloud provider account
  • Provision, configure and monitor your cluster using the Instaclustr Console or REST APIs
  • Management of access via IP ranges or security groups
  • Option of connecting to your cluster using public IPs, or private IPs and VPC peering
  • Private network clusters with no public IPs
  • Highly responsive, enterprise-grade, 24×7 support from Instaclustr’s renowned support team

Enhancements for the General Availability milestone include:

  • Instaclustr Console functionality to allow management of Kafka user accounts (to be released next week);
  • Topic-level metrics available through the Instaclustr console and monitoring API;
  • Automated daily backup of configuration data from Zookeeper;
  • Internal security enhancements to bring our Kafka offering up to the standards required for SOC2 certification.

Instaclustr has been running Apache Kafka in production for internal use since 2017, and for the last few months has been working with Early Access Program customers SiteMinder, Lendi, Kaiwoo and Paidy to ensure our offering is well suited to a range of customer requirements.

Our early access program has already delivered benefits to participating customers such as Lendi:

“We see Apache Kafka as a core capability for our architectural strategy as we scale our business. Getting set up with Instaclustr’s Kafka service was easy and significantly accelerated our timelines. Instaclustr consulting services were also instrumental in helping us understand how to properly use Kafka in our architecture.” Glen McRae, CTO, Lendi

and Siteminder:

“As very happy users of Instaclustr’s Cassandra and Spark managed services, we’re excited about the new Apache Kafka managed service. Instaclustr quickly got us up and running with Kafka and provided the support we needed throughout the process.” Mike Rogers, CTO, SiteMinder

With this general availability milestone, Instaclustr’s Managed Kafka is ready for full production usage with SLAs available up to 99.95%. Our technical operations team is ready to migrate existing Kafka clusters to Instaclustr with zero downtime.

For more information on Instaclustr’s Managed Apache Kafka offering, please contact us or sign up for a free trial.


Cassandra CQL Cheatsheet

Every now and then I find myself looking for a couple of commands I use often. For some other software/technologies we sometimes find a thing called a “cheatsheet” that displays the most-used (and some more obscure) commands of that software/technology.

I tried to find one for CQL and since I didn’t find one… I created a CQL one! This is not an extensive, exhaustive list; it’s just the commands I tend to use the most, plus related ones.

Suggestions are accepted! Leave your suggestions in the comments below!

Also, Printable version coming soon!

DISCLAIMER: This is for the latest Cassandra version (3.11.2)

Without further conversation, here it is:

CQLSH Specific


Login

$ cqlsh [node_ip] -u username -p password

Use Color

$ cqlsh [node_ip] -C -u username -p password

Execute command

$ cqlsh -e 'describe cluster'

Execute from file

$ cqlsh -f cql_commands.cql

Set consistency

$cqlsh> CONSISTENCY QUORUM;

Run commands from file

$cqlsh> SOURCE '/home/cjrolo/cql_commands.cql';

Capture output to file

$cqlsh> CAPTURE '/home/cjrolo/cql_output.cql';

Enable Tracing

$cqlsh> TRACING ON;

Vertical Printing of Rows

$cqlsh> EXPAND ON;

Print tracing session

$cqlsh> SHOW SESSION 898de000-6d83-11e8-9960-3d86c0173a79;

Full Reference:


CQL Commands

Create Keyspace

CREATE KEYSPACE carlos WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};

Alter Keyspace

ALTER KEYSPACE carlos WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

Drop Keyspace

DROP KEYSPACE carlos;

Create Table

CREATE TABLE carlos.foobar (
  foo int PRIMARY KEY,
  bar int
);

Alter Table

ALTER TABLE carlos.foobar WITH compaction = { 'class' : 'LeveledCompactionStrategy'} AND read_repair_chance = 0;

Drop Table

DROP TABLE foobar;

Create Role

CREATE ROLE admins;

Create User

CREATE USER carlos WITH PASSWORD 'some_password';

Assign Role

GRANT admins TO carlos;

Revoke Role

REVOKE admins FROM carlos;

List Roles

LIST ROLES;

Use Keyspace

USE carlos;


Insert

INSERT INTO foobar (foo, bar) VALUES (0, 1);

Insert with TTL

INSERT INTO foobar (foo, bar) VALUES (1, 2) USING TTL 3600;


Update

UPDATE foobar SET bar = 42 WHERE foo = 1;


Select

SELECT * FROM foobar WHERE foo=0;


Delete

DELETE FROM foobar WHERE foo = 1;

Full Reference

Cassandra backups using nodetool

Cassandra nodetool provides several types of commands to manage your Cassandra cluster. See my previous posts for an orientation to Cassandra nodetool and using nodetool to get Cassandra information. My colleague has provided an in-depth analysis of backup strategies in Cassandra that you can review to learn more about ways to minimize storage cost and time-to-recovery, and to maximize performance. Below I will cover the nodetool commands used in scripting these best practices for managing Cassandra full and incremental backups.


The basic way to backup Cassandra is to take a snapshot. Since sstables are immutable, and since the snapshot command flushes data from memory before taking the snapshot, this will provide a complete backup.

Use nodetool snapshot to take a snapshot of sstables. You can specify a particular keyspace as an optional argument to the command, like nodetool snapshot keyspace1. This will produce a snapshot for each table in the keyspace, as shown in this sample output from nodetool listsnapshots:

Snapshot Details:
Snapshot name Keyspace name Column family name True size Size on disk
1528233451291 keyspace1 standard1 1.81 MiB 1.81 MiB
1528233451291 keyspace1 counter1 0 bytes 864 bytes

The first column is the snapshot name, to refer to the snapshot in other nodetool backup commands. You can also specify tables in the snapshot command.

The output at the end of the list of snapshots — for example, Total TrueDiskSpaceUsed: 5.42 MiB — shows, as the name suggests, the actual size of the snapshot files, as calculated using the walkFileTree Java method. Verify this by adding up the files within each snapshots directory under your data directory keyspace/tablename (e.g., du -sh /var/lib/cassandra/data/keyspace1/standard1*/snapshots).

To make the snapshots more human readable, you can tag them. Running nodetool snapshot -t 2018June05b_DC1C1_keyspace1 keyspace1 results in a more obvious snapshot name as shown in this output from nodetool listsnapshots:

2018June05b_DC1C1_keyspace1 keyspace1 standard1 1.81 MiB 1.81 MiB
2018June05b_DC1C1_keyspace1 keyspace1 counter1 0 bytes 864 bytes

However, if you try to use a snapshot name that exists, you’ll get an ugly error:

error: Snapshot 2018June05b_DC1C1_keyspace1 already exists.
-- StackTrace -- Snapshot 2018June05b_DC1C1_keyspace1 already exists....

The default snapshot name is already a timestamp (number of milliseconds since the Unix epoch), but it’s a little hard to read. You could get the best of both worlds by doing something like (depending on your operating system): nodetool snapshot -t keyspace1_$(date +%s) keyspace1. I like how the results of listsnapshots sort that way, too. In any case, with inevitable snapshot automation, the human-readable factor becomes largely irrelevant.
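If you script this instead of using the shell one-liner, the same sortable tag can be built programmatically. A sketch (the command list is meant for something like subprocess.run, and assumes nodetool is on the PATH):

```python
import time

def snapshot_cmd(keyspace: str) -> list:
    """Build a nodetool snapshot command with a sortable timestamped tag,
    e.g. keyspace1_1528236376 (seconds since the Unix epoch)."""
    tag = f"{keyspace}_{int(time.time())}"
    return ["nodetool", "snapshot", "-t", tag, keyspace]

print(snapshot_cmd("keyspace1"))
# e.g. ['nodetool', 'snapshot', '-t', 'keyspace1_1528236376', 'keyspace1']
```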

You may also see snapshots in this listing that you didn’t take explicitly. By default, auto_snapshot is turned on in the cassandra.yaml configuration file, causing a snapshot to be taken anytime a table is truncated or dropped. This is an important safety feature, and it’s recommended that you leave it enabled. Here’s an example of a snapshot created when a table is truncated:

cqlsh> truncate keyspace1.standard1;

root@DC1C1:/# nodetool listsnapshots
Snapshot Details:
Snapshot name Keyspace name Column family name True size Size on disk
truncated-1528291995840-standard1 keyspace1 standard1 3.57 MiB 3.57 MiB

To preserve disk space (or cost), you will want to eventually delete snapshots. Use nodetool clearsnapshot with the -t flag and the snapshot name (recommended, to avoid deleting all snapshots). Specifying -- and the keyspace name will additionally filter the deletion to the keyspace specified. For example, nodetool clearsnapshot -t 1528233451291 -- keyspace1 will remove just the two snapshot files listed above, as reported in this sample output:

Requested clearing snapshot(s) for [keyspace1] with snapshot name [1528233451291]

Note that if you forget the -t flag or the -- delimiter you will get undesired results. Without the -t flag, the command will not read the snapshot name, and without the -- delimiter, you will end up deleting all snapshots for the keyspace. Check syntax carefully.

The sstables are not tied to any particular instance of Cassandra or server, so you can pass them around as needed. (For example, you may need to populate a test server.) If you put an sstable in your data directory and run nodetool refresh, it will load into Cassandra. Here’s a simple demonstration:

cqlsh> truncate keyspace1.standard1

cp /var/lib/cassandra/data/keyspace1/standard1-60a1a450690111e8823fa55ed562cd82/snapshots/keyspace1_1528236376/* /var/lib/cassandra/data/keyspace1/standard1-60a1a450690111e8823fa55ed562cd82/

cqlsh> select * from keyspace1.standard1 limit 1;

key | C0 | C1 | C2 | C3 | C4
(0 rows)

nodetool refresh keyspace1 standard1

cqlsh> select count(*) from keyspace1.standard1;

This simple command has obvious implications for your backup and restore automation.


Incremental backups are taken automatically — generally more frequently than snapshots are scheduled — whenever sstables are flushed from memory to disk. This provides a more granular point-in-time recovery, as needed.

There’s not as much operational fun to be had with incremental backups. Use nodetool statusbackup to show whether they are Running or not. By default, unless you’ve changed the cassandra.yaml configuration file, they will not be running. Turn them on with nodetool enablebackup and turn them off with nodetool disablebackup.

A nodetool listbackups command doesn’t exist, but you can view the incremental backups in the data directory under keyspace/table/backups. The backups/snapshots nomenclature is truly confusing, but you could think of snapshots as something you do, and backups as something that happens.
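In the absence of a listbackups command, the directory walk you would script instead can be sketched like this (paths follow the keyspace/table/backups layout described above; the function name is mine):

```python
import os

def find_backup_dirs(data_dir: str) -> list:
    """Return every keyspace/table/backups directory under the data dir:
    the closest thing to a 'nodetool listbackups'."""
    found = []
    for root, dirs, _files in os.walk(data_dir):
        if "backups" in dirs:
            found.append(os.path.join(root, "backups"))
    return sorted(found)

# e.g. find_backup_dirs("/var/lib/cassandra/data")
```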

Restoring from incrementals is similar to restoring from a snapshot — copying the files and running nodetool refresh — but incrementals require a snapshot.

These various nodetool commands can be used in combination in scripts to automate your backup and recovery processes. Don’t forget to monitor disk space and clean up the files created by your backup processes.

Remember that if you’d like to try out these commands locally, you can use the ccm tool or the Cassandra Docker cluster here.

Backup Strategies in Cassandra

Cassandra is a distributed, decentralized, fault-tolerant system. Data is replicated across multiple nodes, and potentially across multiple data centers. Because Cassandra is decentralized, it can survive single or even multi-node failures without losing any data. With Cassandra, there is no single point of failure, making it a highly available database.

As long as there is one node containing the data, Cassandra can recover the data without resorting to an external backup. If set up right, Cassandra will be able to handle disk or other hardware failures even in the case of an entire data center going down.

However, Cassandra backups are still necessary to recover from the following scenarios:

  1. Errors made in data updates by client applications
  2. Accidental deletions
  3. Catastrophic failures that require the entire cluster to be rebuilt
  4. Data corruption
  5. A desire to rollback cluster to a previous known good state


Setting up a Backup Strategy

When setting up your backup strategy, you should consider some points:

  • Secondary storage footprint: Backup footprints can be much larger than the live database setup depending on the frequency of backups and retention period. It is therefore vital to create an efficient storage solution that decreases storage CAPEX (capital expenditure) as much as possible.
  • Recovery point objective (RPO):  The maximum targeted period in which data might be lost from service due to a significant incident.
  • Recovery time objective (RTO): The targeted duration of time, and service level, within which a backup must be restored after a disaster/disruption to avoid unacceptable consequences associated with a break in business continuity.
  • Backup performance: The backup performance should be sufficient to at least match the data change rate in the Cassandra cluster.
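To make the secondary storage footprint point concrete, here is a worst-case sizing sketch for full-snapshot retention (the numbers are illustrative assumptions, not from the post):

```python
def backup_footprint_gb(snapshot_gb: float, snapshots_per_day: int,
                        retention_days: int) -> float:
    """Worst-case secondary storage if every full snapshot is retained
    for the whole window (ignores incremental deltas and compression)."""
    return snapshot_gb * snapshots_per_day * retention_days

# A 100 GB dataset snapshotted twice a day with 30-day retention:
print(backup_footprint_gb(100.0, 2, 30))  # → 6000.0 GB, 60x the live data
```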



Backup Alternatives


Snapshot-Based Backups

The purpose of a snapshot is to make a copy of all or part of the keyspaces and tables in a node and save it into a separate file. When you take a snapshot, Cassandra first performs a flush to push any data residing in the memtables to disk (SSTables), and then makes a hard link to each SSTable file.

Each snapshot contains a manifest.json file that lists the SSTable files included in the snapshot to make sure that the entire contents of the snapshot are present.

Nodetool snapshot operates at the node level, meaning that to get a cluster-wide backup you will need to run it at the same time on multiple nodes.


Incremental Backups

When incremental backups are enabled, Cassandra creates backups as part of the process of flushing SSTables to disk. The backup consists of a hard link to each data file, stored in a backup directory. In Cassandra, incremental backups contain only new SSTable files, making them dependent on the last snapshot created. Files created by compaction are not hard linked.


Incremental Backups in Combination with Snapshots

By combining both methods, you can achieve a better granularity of the backups. Data is backed up periodically via the snapshot, and incremental backup files are used to obtain granularity between scheduled snapshots.


Commit Log Backup in Combination with Snapshot

This approach is a similar method to the incremental backup with snapshots. Rather than relying on incremental backups to backup newly added SStables, commit logs are archived. As with the previous solution, snapshots provide the bulk of backup data, while the archive of commit log is used for point-in-time backup.


CommitLog Backup in Combination with Snapshot and Incremental

In addition to incremental backups, commit logs are archived. This process relies on a feature called Commitlog Archiving. As with the previous solution, snapshots provide the bulk of the backup data, incremental backups complement them, and the commit log archive is used for point-in-time recovery.

Due to the nature of commit logs, it is not possible to restore commit logs to a different node other than the one it was backed up from. This limitation restricts the scope of restoring commit logs in case of catastrophic hardware failure. (And a node is not fully restored, only its data.)


Datacenter Backup

With this setup, Cassandra will stream data to the backup datacenter as it is added. This mechanism avoids cumbersome snapshot-based backups that require files to be stored on a network. However, it will not protect from a developer mistake (e.g., deletion of data), unless there is a time buffer between both data centers.



Backup Options Comparison


Snapshot-Based Backups

Pros:

  • Simple to manage: Requires a simple scheduled snapshot command to run on each of the nodes. The Cassandra nodetool utility provides the clearsnapshot command to remove the snapshot files. (Auto snapshots on table drops are not visible to this command.)

Cons:

  • Potentially large RPO: Snapshots require flushing all in-memory data to disk; therefore, frequent snapshot calls will impact the cluster’s performance.
  • Storage footprint: Depending on various factors (such as workload type, compaction strategy, or versioning interval), compaction may cause multi-fold data to be backed up, increasing capital expenditure (CapEx).
  • Snapshot storage management overhead: Cassandra admins are expected to move the snapshot files to a safe location such as AWS S3.

Incremental Backups

Pros:

  • Better storage utilization: There are no duplicate records in the backup, as compacted files are not backed up.
  • Point-in-time backup: Companies can achieve a better RPO, as backing up from the incremental backup folder is a continuous process.

Cons:

  • Space management overhead: The incremental backup folders must be emptied after being backed up. Failure to do so may cause severe space issues on the cluster.
  • Spread across many files: Since incremental backups create files every time a flush occurs, they typically produce many small files, making file management and recovery difficult, which can impact RTO and the service level.

Incremental Backups in Combination with Snapshots

Pros:

  • Large backup files: Only the data between snapshots comes from the incremental backups.
  • Point-in-time: Provides point-in-time backup and restore.

Cons:

  • Space management overhead: Every time a snapshot is backed up, data needs to be cleaned up.
  • Operationally burdensome: Requires DBAs to script solutions.

CommitLog Backup in Combination with Snapshot and Incremental

Pros:

  • Point-in-time: Provides the best point-in-time backup and restore.

Cons:

  • Space management overhead: Every time a snapshot is backed up, data needs to be cleaned up, increasing operational expenditure (OpEx).
  • Restore complexity: Restore is more complicated, as part of the restore happens from the commit log replay.
  • Storage overhead: The snapshot-based portion duplicates data due to compaction, resulting in higher CapEx.
  • Highly complex: Dealing with three kinds of backups, plus streaming and managing the commit log, makes this a highly sophisticated backup solution.

Datacenter Backup

Pros:

  • Hot backup: Can provide a swift way to restore data.
  • Space management: Using RF = 1, you can avoid data replication.

Cons:

  • Additional datacenter: Since it requires a new datacenter to be built, it incurs higher CapEx as well as OpEx.
  • Prone to developer mistakes: Will not protect from developer mistakes (unless there is a time buffer, as mentioned above).