Planet Cassandra

All your NoSQL Apache Cassandra resources in one place.

Surveying the Cassandra-compatible database landscape


The popularity of Apache Cassandra and the applicability of its development model have seen it clearly emerge as the leading NoSQL technology for scale, performance and availability. One need only survey the ever-increasing range of Cassandra-compatible options now available on the market for a further proof point of its popularity.

As we get started with 2018, the range of Cassandra-compatible offerings available on the market include:

  • Datastax Enterprise
  • ScyllaDB
  • Yugabyte
  • Azure Cosmos DB

We all know that the database is a key foundational technology for any application. You need to ensure you choose a product that meets the functional requirements of your use case, is robust and scalable, makes efficient use of compute resources, and will be usable by your dev team and supportable by your ops team now and into the future. Selecting the database technology for a new application therefore deserves rigorous consideration of your specific requirements.


This blog post surveys the current state of these offerings, along with key considerations for people evaluating them, and finishes with an overview of some of the in-progress development for Apache Cassandra that should ensure it remains the default, and best, choice for the majority of use cases.

This post provides some high-level considerations that should help you to narrow down contenders for evaluation. For each technology we consider:

  • Breadth of production deployment – How widely is the product used in production?
  • Licensing model – Is the product open source? If so, what open source licensing model is used and what are the implications of that?
  • Strength of community – Is the product dependent on a single vendor for ongoing support or is it backed by a range of invested organisations? Does the breadth of user community allow access to required expertise?
  • Functionality – Does the technology have any particular functional advantages or limitations that stand out from the comparable technologies?
  • Scalability and performance – Has the system demonstrated ability to operate at scale? Is it able to deliver low latencies and make efficient use of available compute resources?

Datastax Enterprise

Datastax Enterprise (DSE) is a closed source product derived from Apache Cassandra. For core Cassandra features, it is driver-level compatible with Apache Cassandra. Online migration from DSE to Apache Cassandra can be achieved with minimal effort where DSE proprietary features have not been used. However, DSE contains a number of extensions that are not included in Apache Cassandra, such as bundling Spark and Solr into the same application and providing custom security and compaction providers.

Breadth of production deployment: DSE has been used in production by many organisations over several years.

Licensing model: DSE is a closed-source, proprietary product derived from open source products. Use of DSE requires payment of a licensing fee to Datastax.

Strength of community: As a proprietary product, support and enhancement of DSE is entirely reliant on Datastax. However, DSE does build on contributions from the communities for the underlying open source products.

Functionality: Functional enhancements in DSE over the open source products are generally enterprise-specific features (such as LDAP authentication integration), relatively simple integration of the other included products (Spark, Solr), and the entirely proprietary DSE Graph database functionality.

Scalability and Performance: In general, DSE performance will be very similar to the underlying open source products. However, Datastax does claim some proprietary performance improvements.


ScyllaDB

Scylla is effectively a re-implementation of Apache Cassandra in C++ with the aim of providing highly optimised performance. From a functional point of view, it provides most, but not all, of Cassandra's functions and generally doesn't aim to provide functions beyond Cassandra's. It is driver-level compatible with Apache Cassandra, but migration to/from Scylla requires an application-level migration strategy such as dual writes.

Breadth of production deployment: Scylla 1.0 was released in March 2016. Several organisations are reported to be running it in production, although the level of production deployment would be a small fraction of Apache Cassandra's.

Licensing Model: Scylla is open source but licensed under the AGPL (GNU Affero General Public License). This license requires that any organisation making a modified version of the product (even for internal use) must publish those modifications. As a result of this requirement, many organisations (particularly large tech orgs that tend to adopt and drive enhancements to open source projects) will not adopt software using the AGPL.

Strength of Community: Scylla is largely dependent on a single company (ScyllaDB) for all development and support.

Functionality: Scylla generally aims to be functionally compatible with Cassandra, although not all features are currently available (lightweight transactions being one notable exception).

Scalability and Performance: Improved performance is Scylla's main objective, and Scylla has published many benchmarks demonstrating substantial performance improvements. However, the most significant gains are seen when running large machines with high-performance IO; the gains in more typical cloud deployments (sized for manageability) are often smaller than these benchmarks suggest.


Yugabyte

Yugabyte is a new database aiming to offer both SQL and NoSQL functionality in a single database. It is driver compatible with Cassandra (although there is also a Yugabyte-specific fork of the Cassandra driver) and with Redis, with announced plans for PostgreSQL compatibility.

Breadth of production deployment: Yugabyte is currently in Beta with production release planned for 2018.

Licensing Model: Yugabyte is Apache 2.0 Licensed open source software. A closed source “enterprise” edition is also offered with additional manageability and other features.

Strength of Community: Yugabyte is a new product developed by a single company (Yugabyte) and all development and support of the product is dependent on this company.

Functionality: The core Yugabyte engine supports full ACID transactions and a different replication model to Apache Cassandra. Presumably additional features will also be required for PostgreSQL compatibility. While Yugabyte claims compatibility with core Cassandra features, it seems likely that, given the differences in underlying engine models, there will be semantic differences that are not readily apparent (for example, Yugabyte already claims differences in consistency semantics).

Scalability and Performance: Yugabyte has published benchmarks claiming improved performance for some scenarios. However, the tuning of the Apache Cassandra configuration in their comparison benchmarks was extremely poor and, in any event, Yugabyte's very different architecture is likely to lead to quite different performance characteristics versus Apache Cassandra depending on the use case.

Azure Cosmos DB

Cosmos DB is a Microsoft Azure offering designed to provide a globally distributed database with NoSQL functionality. It supports multiple APIs including SQL, JavaScript, Gremlin (graph), MongoDB and Cassandra.

Breadth of production deployment: Cosmos DB was released in May 2017 although it builds on Azure DocumentDB which was released in 2014. The Cassandra API was released into preview in November 2017.

Licensing model: Cosmos DB is a proprietary, closed source, technology offered only as an Azure service.

Strength of Community: Cosmos DB is developed and supported by Microsoft.

Functionality: Cosmos DB claims Cassandra compatibility but without providing a detailed breakdown of supported/unsupported Cassandra features, and it seems unlikely there would be complete feature compatibility (at a minimum, the approach to consistency levels is quite different). The documented strategy for importing data from Cassandra into Cosmos DB is via the cqlsh COPY FROM / COPY TO commands, which export data via CSV and generally aren't suitable for production-size datasets.

Scalability and Performance: Cosmos DB provides latency SLAs for the 99th percentile which are comparable to other latency-focused offerings such as Scylla. Cost effectiveness at scale is hard to gauge and is dependent on Azure pricing.

Apache Cassandra

Apache Cassandra is the inspiration and genesis for all of these offerings. From its 1.0 release in October 2011, Apache Cassandra is now at version 3.11 with version 4.0 in development. It aims to provide virtually unlimited scalability and the ability to run with the highest levels of availability and full global distribution. Many household-name internet services (eg Apple, Uber, Spotify, Instagram) rely on Apache Cassandra as a core component of their architecture.

Breadth of production deployment: Production deployments of Apache Cassandra are likely an order of magnitude greater than any of the other products mentioned above.

Licensing model: Apache Cassandra  is Apache 2.0 Licensed open source software.

Strength of Community: Apache Cassandra development is governed by the Apache Foundation, under the same organisation and governance rules as some of the most successful open source projects, such as Hadoop, Spark, Tomcat and the original Apache web server. Apache Cassandra committers are employed by close to 10 different companies, with regular contributions from a wide range of companies. Apple, as a key user, is one of the most active contributors to the project.

Functionality: While some of the other products above aim to extend the functionality of Cassandra, Cassandra defines the core feature set that the others aim to emulate.

Scalability and Performance: There can be little question as to the scalability of Cassandra, with production clusters in the thousands of nodes holding petabytes of data and serving millions of operations per second. The Cassandra community is always working to improve the performance of Cassandra, with several major performance initiatives currently underway in the project (eg CASSANDRA-13476).

The post Surveying the Cassandra-compatible database landscape appeared first on Instaclustr.

Analyzing Cassandra Performance with Flame Graphs

One of the challenges of running large scale distributed systems is being able to pinpoint problems. It’s all too common to blame a random component (usually a database) whenever there’s a hiccup even when there’s no evidence to support the claim. We’ve already discussed the importance of monitoring tools, graphing and alerting metrics, and using distributed tracing systems like ZipKin to correctly identify the source of a problem in a complex system.

Once you’ve narrowed down the problem to a single system, what do you do? To figure this out, it’s going to depend on the nature of the problem, of course. Some issues are temporary, like a dead disk. Some are related to a human-introduced change, like a deployment or a wrong configuration setting. These have relatively straightforward solutions. Replace the disk, or rollback the deployment.

What about problems that are outside the scope of a simple change? One external factor that we haven’t mentioned so far is growth. Scale can be a difficult problem to understand because reproducing the issue is often nuanced and complex. These challenges are sometimes measured in throughput (requests per second), size (terabytes), or latency (5ms p99). For instance, if a database server is able to serve every request out of memory, it may get excellent throughput. As the size of the dataset increases, random lookups are more and more likely to go to disk, decreasing throughput. Time Window Compaction Strategy is a great example of a solution to a scale problem that’s hard to understand unless the numbers are there. The pain of compaction isn’t felt until you’re dealing with a large enough volume of data to cause performance problems.
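To make the scale effect concrete, here's a back-of-envelope model (the throughput numbers are invented purely for illustration): because each disk miss is so much slower than a memory hit, even a small drop in the fraction of reads served from memory collapses overall throughput.

```python
def effective_throughput(mem_fraction, mem_ops_per_sec, disk_ops_per_sec):
    """Average throughput when only a fraction of reads hit memory.

    Each request costs 1/mem_ops_per_sec seconds if served from memory,
    1/disk_ops_per_sec otherwise; throughput is the inverse of the
    average cost (a harmonic mix, not a simple weighted average).
    """
    avg_cost = mem_fraction / mem_ops_per_sec + (1 - mem_fraction) / disk_ops_per_sec
    return 1.0 / avg_cost

# Illustrative numbers only: 100k ops/s from memory, 5k ops/s from disk.
for fraction in (1.0, 0.99, 0.9, 0.5):
    print(f"{fraction:.0%} in memory -> "
          f"{effective_throughput(fraction, 100_000, 5_000):,.0f} ops/s")
```

With these made-up numbers, dropping from 100% to 90% in-memory already cuts throughput by roughly two thirds, which is why a growing dataset can hurt long before the disks are full.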

During the times of failure we all too often find ourselves thinking of the machine and its processes as a black box. Billions of instructions executing every second without the ability to peer inside and understand its mysteries.

Fortunately, we’re not completely blind as to what a machine is doing. For years we’ve had tools like debuggers and profilers available to us. Oracle’s JDK offers us Java Flight Recorder, which we can use to analyze running processes locally or in production:

mission control

Profiling with Flight Recorder is straightforward, but interpreting the results takes a little bit of work. Expanding the list of nested tables and looking for obvious issues is a bit more mental work than I’m interested in; it would be a lot easier if we could visualize the information. Flight Recorder also requires a commercial license to use in production, and only works with the Oracle JDK.

That brings us back to the subject of this post: a way of generating useful visual information called a flame graph. A flame graph lets us quickly identify the performance bottlenecks in a system. Flame graphs were invented by Brendan Gregg. This is also part one of a very long series of performance tuning posts; we will be referring back to it as we dive deeper into the internals of Cassandra.

Swiss Java Knife

The approach we’ll examine in this post is utilizing the Swiss Java Knife, usually referred to as SJK, to capture the data from the JVM and generate the flame graphs. SJK is a fantastic collection of tools. Aside from generating flame graphs, we can inspect garbage collection statistics, watch threads, and do a variety of other diagnostic tasks. It works on Mac, Linux, and both the Oracle JDK and the OpenJDK.

I’ve downloaded the JAR, put it in my $HOME/bin and set up a shell function to call it easily:

sjk () {
        java -jar ~/bin/sjk-plus-0.8.jar "$@"
}
On my laptop I’m running a workload with cassandra-stress. I’ve prepopulated the database, and started the stress workload with the following command:

cassandra-stress read n=1000000

For the first step of our analysis, we need to capture the stack frames of our running Java application using the stcap feature of SJK. To do this, we need to pass in the process id and the file to which we will dump the data. The dumps are written in a binary format that we’ll be able to query later:

sjk stcap -p 92541 -i 10ms -o dump.std

Then we can analyze the data. If all we have is a terminal, we can print out a histogram of the analysis. This can be pretty useful on its own if there’s an obvious issue. In this case, we can see that a lot of time is spent in sun.misc.Unsafe.park, meaning threads are just waiting around, parked:

$ sjk ssa -f dump.std --histo
Trc     (%)  Frm  N  Term    (%)  Frame
309251  80%  309251  309251  80%  sun.misc.Unsafe.park(Native Method)
259376  67%  259376       0   0%  java.util.concurrent.locks.LockSupport.park(
 55709  14%  55709        0   0%  java.util.concurrent.ThreadPoolExecutor$
 52374  13%  52374        0   0%  org.apache.cassandra.concurrent.NamedThreadFactory$$Lambda$6/ Source)
 52374  13%  52374        0   0%  org.apache.cassandra.concurrent.NamedThreadFactory.lambda$threadLocalDeallocator$0(
 44892  11%  44892        0   0%  io.netty.util.concurrent.DefaultThreadFactory$
 44887  11%  44887        0   0%  java.util.concurrent.ThreadPoolExecutor.runWorker(
 42398  11%  42398        0   0%  io.netty.util.concurrent.SingleThreadEventExecutor$
 42398  11%  42398    42398  11% Method)
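Since the --histo output is plain text, it is also easy to post-process. This hypothetical helper (written against the column layout shown above; the parsing may need adjusting for other SJK versions) pulls out the frames where a large share of samples terminate, i.e. where threads actually spend their time:

```python
def hot_terminal_frames(histo_text, min_term_pct=10):
    """Return (frame, terminal %) pairs for rows whose terminal
    percentage is at or above min_term_pct."""
    hot = []
    for line in histo_text.splitlines()[1:]:   # skip the header row
        parts = line.split()
        # Columns: trace count, trace %, frame count, terminal count,
        # terminal %, then the frame itself (which may contain spaces).
        if len(parts) < 6 or not parts[4].endswith('%'):
            continue
        term_pct = int(parts[4][:-1])
        if term_pct >= min_term_pct:
            hot.append((' '.join(parts[5:]), term_pct))
    return hot

# A tiny synthetic sample in the same layout as the real output:
sample = """Trc     (%)  Frm  N  Term    (%)  Frame
309251  80%  309251  309251  80%  sun.misc.Unsafe.park(Native Method)
259376  67%  259376       0   0%  java.util.concurrent.locks.LockSupport.park"""

print(hot_terminal_frames(sample))
# [('sun.misc.Unsafe.park(Native Method)', 80)]
```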

Now that we have our stcap dump, we can generate a flame graph with the following command:

sjk ssa --flame -f dump.std > flame-sjk.svg

When you open the SVG in a browser, you should end up with an image which looks something like this:


If you open the flame graph on your machine, you can mouse over the different sections to see the method call and the percentage of time it’s taking. The wider the bar, the more frequently it appears in the captured stacks. It’s very easy to glance at the graph and understand where the time is spent in our program.

This is not the only technique for generating flame graphs. Brendan Gregg has a long list of links and references I recommend reading at the bottom of his FlameGraph page. I intend to write a utility to export the SJK format to the format that Brendan uses on his blog, as it’s a little nicer to look at, has a better mouseover, supports drill-down, and has search. His tools also support differential flame graphs, which are nice if you’re doing performance comparisons across different builds.
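For reference, the collapsed format that flamegraph.pl consumes is simply one line per unique stack: frames joined by semicolons, root first, followed by a sample count. Here is a minimal sketch of that folding step, assuming the sampled stacks have already been extracted from the dump as lists of frames (leaf first); the frame names below are invented for the example:

```python
from collections import Counter

def collapse(stacks):
    """Fold sampled stacks (each a list of frames, leaf first) into
    the collapsed format flamegraph.pl expects: root;...;leaf count."""
    counts = Counter(';'.join(reversed(stack)) for stack in stacks)
    return '\n'.join(f'{stack} {n}' for stack, n in sorted(counts.items()))

samples = [
    ['Unsafe.park', 'LockSupport.park', 'Thread.run'],
    ['Unsafe.park', 'LockSupport.park', 'Thread.run'],
    ['FileChannel.read', 'SSTableReader.read', 'Thread.run'],
]
print(collapse(samples))
# Thread.run;LockSupport.park;Unsafe.park 2
# Thread.run;SSTableReader.read;FileChannel.read 1
```

Feed the resulting text to flamegraph.pl and it renders the same kind of SVG we produced with SJK above.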

I hope you’ve enjoyed this post on visualizing Cassandra’s performance using FlameGraphs. We’ve used this tool several times with the teams we’ve worked with to tune Cassandra’s configurations and optimize performance. In the next post in this series we’ll be examining how to tune garbage collection parameters to maximize throughput while keeping latency to a minimum.

Meltdown's Impact on Cassandra Latency

What impact on latency should you expect from applying the kernel patches for the Meltdown security vulnerability?

TL;DR expect a latency increase of at least 20% for both reads and writes.


The Meltdown vulnerability, formally CVE-2017-5754, allows rogue processes to access kernel memory. Simple demonstrations of how to expose passwords and ssh private keys from memory have already appeared online. The consequences of this, particularly on shared hosts (i.e. cloud), are considered “catastrophic” by security analysts. Initially discovered in early 2017, the vulnerability was planned to be publicly announced on 9th January 2018. However, due to the attention generated by the frequency of Linux kernel ‘page-table isolation’ (KPTI) patches committed late in 2017, the news broke early, on 3rd January 2018.


Without updated hardware, the Linux kernel patches impact CPU usage. While userspace programs are not directly affected, anything that triggers a lot of interrupts to the CPU, such as a database’s use of IO and network, will suffer. Early reports are showing evidence of CPU usage taking a hit between 5% and 70%. Because of the potential CPU performance hit and lack of evidence available, The Last Pickle used a little time to see what impacts we could record for ourselves.


The hardware used for testing was a Lenovo X1 Carbon (gen 5) laptop. This machine runs an Intel Core i7-5600U CPU with 8GB of RAM. Running on it is Ubuntu 17.10 Artful. The unpatched kernel was version 4.13.0-21, and the patched kernel version 4.13.0-25. A physical machine was used to avoid the performance variances encountered in the different cloud environments.

The Ubuntu kernel was patched according to instructions here and the ppa:canonical-kernel-team/pti repository.


A simple schema, but typical of many Cassandra usages, was used on top of Cassandra 3.11.1 via a 3 node ccm cluster. The stress execution ran with 32 threads. Running stress, three nodes, and a large number of threads on one piece of hardware was intentional, so as to increase thread/context switching and kernel overhead.

The stress run was limited to 5k requests per second so as to avoid saturation, which occurred around 7k/s. The ratio of writes to reads was 1:1, with reads split between whole partitions and single rows. The table used TWCS, tuned down to 10 minute windows so as to ensure compactions ran during an otherwise short stress run. The stress ran for an hour against both the unpatched and patched kernels.

ccm stress user profile=stress.yaml ops\(insert=2,by_partition=1,by_row=1\) duration=1h -rate threads=32 throttle=5000/s -graph file=meltdown.html title=Meltdown revision=cassandra-3.11.1-unpatched


The following graphs show a latency increase of 20%+ at every percentile. Sometimes the increase is up around 50%.

Meltdown Cassandra median

Meltdown Cassandra 95th

Meltdown Cassandra 99th

Meltdown Cassandra stats


The full stress results are available here.

Announcing Instaclustr support for AWS I3 Instance Types

Instaclustr delivers improved Cassandra price/performance with AWS I3 Instance Types.

Instaclustr has released support for a new AWS I3 instance type with our Apache Cassandra, Spark & Elassandra Managed Service.

I3 is the latest generation of AWS’s “Storage Optimised” family. It is designed to support I/O intensive workloads and is backed by low-latency SSD. Instaclustr offers support for the i3.2xlarge instance type, which provides 8 vCPUs, 61 GiB memory, and 1 x 1900 GB of locally attached SSD. As a comparison, the previous generation i2.2xlarge offered 8 vCPUs, 61 GiB memory and 2 x 800 GB SSDs.

We conducted Cassandra benchmarking of the i3.2xlarge type and compared the results with those of the previous generation i2.2xlarge. The testing indicates a slight improvement in performance between generations, delivered at a lower price.

Our testing procedure is:

  1. Insert data to fill disks to ~30% full.
  2. Wait for compactions to complete and EBS burst credits to regenerate.
  3. Run a 2 hour test with 500 threads with a mix of 10 inserts : 10 simple queries : 1 range query. Quorum consistency for all operations. You can see the stress spec we used for these tests here:
# Instaclustr standard YAML profile for cassandra-stress
# adapted from Apache Cassandra example file
# Insert data:
# cassandra-stress user profile=stress-spec.yaml n=25000000 cl=QUORUM ops\(insert=1\) -node file=node_list.txt -rate threads=100
# Note: n=25,000,000 will produce ~280G of data
# ensure all compactions are complete before moving to mixed load test
# Mixed load test
# cassandra-stress user profile=stress-spec.yaml duration=4h cl=QUORUM ops\(insert=1,simple1=10,range1=1\) -node file=node_list.txt -rate threads=30 -log file=mixed_run2_cms.log
# Keyspace info
keyspace: stresscql2
# The CQL for creating a keyspace (optional if it already exists)
keyspace_definition: |
  CREATE KEYSPACE stresscql2 WITH replication = {'class': 'NetworkTopologyStrategy', 'AWS_VPC_US_WEST_2': 3};
#keyspace_definition: |
#  CREATE KEYSPACE stresscql2 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
# Table info
table: typestest
# The CQL for creating a table you wish to stress (optional if it already exists)
table_definition: |
  CREATE TABLE typestest (
        name text,
        choice boolean,
        date timestamp,
        address inet,
        dbl double,
        lval bigint,
        ival int,
        uid timeuuid,
        value blob,
        PRIMARY KEY((name,choice), date, address, dbl, lval, ival, uid)
  ) WITH compaction = { 'class':'LeveledCompactionStrategy' }
    AND comment='A table of many types to test wide rows'
# Optional meta information on the generated columns in the above table
# The min and max only apply to text and blob types
# The distribution field represents the total unique population
# distribution of that column across rows.  Supported types are
#      EXP(min..max)                        An exponential distribution over the range [min..max]
#      EXTREME(min..max,shape)              An extreme value (Weibull) distribution over the range [min..max]
#      GAUSSIAN(min..max,stdvrng)           A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
#      GAUSSIAN(min..max,mean,stdev)        A gaussian/normal distribution, with explicitly defined mean and stdev
#      UNIFORM(min..max)                    A uniform distribution over the range [min, max]
#      FIXED(val)                           A fixed distribution, always returning the same value
#      Aliases: extr, gauss, normal, norm, weibull
#      If preceded by ~, the distribution is inverted
# Defaults for all columns are size: uniform(4..8), population: uniform(1..100B), cluster: fixed(1)
columnspec:
  - name: name
    size: uniform(1..1000)
    population: uniform(1..500M)     # the range of unique values to select for the field (default is 100Billion)
  - name: date
    cluster: uniform(20..1000)
  - name: lval
    population: gaussian(1..1000)
    cluster: uniform(1..4)
  - name: value
    size: uniform(100..500)
insert:
  partitions: fixed(1)       # number of unique partitions to update in a single operation
                                  # if batchcount > 1, multiple batches will be used but all partitions will
                                  # occur in all batches (unless they finish early); only the row counts will vary
  batchtype: UNLOGGED               # type of batch to use
  select: uniform(1..10)/10       # uniform chance any single generated CQL row will be visited in a partition;
                                  # generated for each partition independently, each time we visit it
# List of queries to run against the schema
queries:
   simple1:
      cql: select * from typestest where name = ? and choice = ? LIMIT 1
      fields: samerow             # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
   range1:
      cql: select name, choice, uid  from typestest where name = ? and choice = ? and date >= ? LIMIT 10
      fields: multirow            # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
   simple2:
      cql: select name, choice, uid from typestest where name = ? and choice = ? LIMIT 1
      fields: samerow             # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)

As with any generic benchmarking, results for different data models or applications may vary significantly from the benchmark. However, we have found it to be a good test for comparing relative performance that holds up well across many use cases.

Our most recent benchmarking used Cassandra 3.11 to compare a 3 node i2.2xlarge cluster with a 3 node i3.2xlarge cluster. Driving operations to the point where latency on each node was similar, the 3 node i3.2xlarge cluster yielded 13,424 operations/sec, a 31% improvement over the i2.2xlarge, while delivering lower latency. The i3.2xlarge is also significantly cheaper than the i2.2xlarge: for example, i3.2xlarge pricing is 22% less than i2.2xlarge in US East (North Virginia). The price reduction between the i2 and i3 generations adds to the significant improvement in the price/performance ratio between generations.

AWS Instance type  ops/sec  median simple read latency (ms)  median range read latency (ms)  median write latency (ms)
i2.2xlarge         10,234   55.1                             57.1                            31.8
i3.2xlarge         13,424   39.1                             40.0                            24.3

Table 1: Results summary

Note that the latencies in Table 1 are high because the clusters were pushed to roughly their maximum throughput (ops/sec). As always, benchmark results may not translate to meaningful results for real world applications, and we strongly recommend that you do performance testing for your particular use case.
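As a quick sanity check, the 31% throughput improvement quoted earlier falls straight out of the Table 1 numbers:

```python
i2_ops, i3_ops = 10_234, 13_424  # ops/sec from Table 1

improvement = (i3_ops - i2_ops) / i2_ops
print(f"i3.2xlarge throughput improvement: {improvement:.1%}")  # 31.2%
```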

For full pricing, sign up or log on to our console, or contact us.

The post Announcing Instaclustr support for AWS I3 Instance Types appeared first on Instaclustr.

Announcing Instaclustr support for Microsoft Azure Managed Disks

Instaclustr delivers improved reliability and security on Azure with Azure Managed Disk support.

Instaclustr has released support for Azure Managed Disks with our Apache Cassandra, Spark and Elassandra Managed Service on Microsoft Azure. All new Azure clusters will automatically take advantage of the new storage type.

Managed disks are a new storage type launched by Microsoft recently, designed to ease management and increase security and reliability.

Managed disks offer increased reliability by automatically storing three replicas of data for increased fault tolerance and high availability. Using managed disks also does away with using storage accounts for disks. Managed disks are aware of availability sets and are automatically allocated to different fault domains, eliminating the single point of failure that existed previously.

Managed disks offer increased security by automatically encrypting data at rest using Storage Service Encryption in a user-transparent manner.

Instaclustr continues to use Premium SSD-backed storage for managed disks to ensure high performance storage. We have not seen any decrease in Cassandra performance when using managed disks in stress benchmarking tests.

Existing customers using Azure will be upgraded to Azure Managed Disks. Instaclustr’s Technical Operations team will contact affected customers in the coming months to schedule the upgrade.

For full pricing, sign up or log on to our console, or contact us.

The post Announcing Instaclustr support for Microsoft Azure Managed Disks appeared first on Instaclustr.

Apache Kafka Christmas Tree Light Simulation – Seasons Greetings From Instaclustr

Apache Kafka Christmas tree light simulation

What would you do if you received a request like this?

Apache Kafka Christmas Tree

Looks like poor old Santa confused us with a company that produces Instant Forests.  But maybe Santa knew I’d been trying out Decision Tree Machine Learning (including Random Forests) on our performance data (see previous blogs on Machine Learning with Spark)?

But then I started thinking. Obviously, Santa is “magic” (what online shop can deliver presents to millions of people on Christmas Eve!), so maybe all I had to do was come up with a simulated virtual tree and Santa could handle the rest?

Where to start? What (in computer science or mathematics) is tree-shaped?

On the first day of Christmas
my true love sent to me:
A Quincunx machine in a Pear Tree

On the second day of Christmas
my true love sent to me:
2 Bean Machines
And a Quincunx machine in a Pear Tree

On the third day of Christmas
my true love sent to me:
3 Galton Boards
2 Bean Machines
and a Quincunx machine in a Pear Tree

On the fourth day of Christmas
my true love sent to me:
4 Pascal’s Triangles
3 Galton Boards
2 Bean Machines
and a Quincunx machine in a Pear Tree

Ok, that’s enough! Ten identical presents are rather boring. A Quincunx machine (also called a Bean machine or Galton board) is a vertical board with nails banged into it in the shape of a Christmas tree.

Apache Kafka Galton Board

Here is a clip of one in action.

Balls are dropped from the top (where the “Star” would be on a tree), and bounce either left or right as they hit the nails and drop down to the next level, and so on, until they drop out the bottom. They are collected into bins at the bottom and the height of the bins will eventually approximate a bell curve. Overlaying Pascal’s triangle onto the nails shows the number of different paths that can be taken to get to each bin (as each value is the sum of the two values above it to the left and right, starting with a 1 at the top Star position). The bins towards the middle are reached by more paths and will collect more balls:
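This behaviour is easy to check with a quick simulation (a sketch for illustration only): each ball makes an independent left/right choice at every row, so the bin counts follow a binomial distribution whose expected shape is exactly Pascal's triangle row n, scaled by the number of balls.

```python
import random
from math import comb

def drop_balls(rows, balls, seed=42):
    """Drop `balls` through a quincunx with `rows` rows of nails.
    Each nail bounces the ball left (0) or right (1); the final bin
    index is just the number of rightward bounces."""
    rng = random.Random(seed)
    bins = [0] * (rows + 1)
    for _ in range(balls):
        bins[sum(rng.randint(0, 1) for _ in range(rows))] += 1
    return bins

rows, balls = 8, 100_000
bins = drop_balls(rows, balls)
pascal = [comb(rows, k) for k in range(rows + 1)]

print("Pascal row 8:  ", pascal)   # [1, 8, 28, 56, 70, 56, 28, 8, 1]
print("Simulated bins:", bins)     # middle bins collect the most balls
```

Dividing each bin count by the ball count recovers the bell curve, and dividing Pascal's row by 2^rows gives the same probabilities analytically.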

 Apache Kafka Pascal triangle

Obviously, I’m not the first person to connect these ideas.  Here’s a Pascal’s Christmas Tree simulator, a Quincunx simulation, and this blog does a full The 12 days of Pascal’s triangular Christmas!

Pascal Christmas Tree simulator

The Pascal’s Christmas tree simulator (image above) inspired me to use a similar approach for a Christmas tree lights simulator. The tree will be a Galton board, with lights at each nail position (baubles in the above picture). The positions are traditionally rows and columns starting from row 0 at the top, and columns starting from 0 on the left. The star is therefore (0,0).  A light will be ON if a ball hits the nail, and will stay on for a second, and then turn OFF.  Lights will turn on/off (twinkle!) as balls enter from the top, and drop down to the bottom. If balls drop in fast enough then multiple lights will be on at once.

Time to open up an early Christmas present (Kafka, which is on the Instaclustr roadmap for 2018) and use it to write a scalable Christmas tree lights simulation. Some imagination may be required (perhaps enhanced with a few glasses of Christmas port).

The design choices I had to make were around the number of producers, consumers and topics, what roles they would have, what the message structure would be, and how to handle time.

We’ll start with a single Kafka producer to drop balls onto the top of the tree (at the Star), with a new ball arriving every second. The “star” is just a simple Java producer running in a thread which sends a (0,0) message to the “tree” topic, then sleeps for a second, repeating until the desired number of balls have been released.

Kafka Christmas Tree

Initially, I had thought of using multiple topics to capture the topology of the relationships between the lights, but this would have been too complicated for larger trees. A simpler solution is a single topic (called “tree”).  I also decided on a very simple message structure, using the Kafka message key as the row value and the message value as the column. Note that for multi-partition topics this may not be ideal, as the key is used for distributing messages across partitions and the small number of distinct row values may not hash well.
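To see why a handful of distinct row keys makes a poor partitioning key, here's a toy sketch. Kafka's default partitioner actually hashes the serialized key bytes with murmur2; the `floorMod` below is a simplified stand-in of my own, but the skew it demonstrates is the same:

```java
import java.util.HashSet;
import java.util.Set;

public class KeySkew {
    // Simplified stand-in for Kafka's default partitioner (the real one
    // applies murmur2 to the serialized key bytes before the modulo).
    static int partitionFor(int key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 20;
        Set<Integer> used = new HashSet<>();
        // Only 10 distinct row keys (rows 0..9) ever appear on the tree topic...
        for (int row = 0; row < 10; row++)
            used.add(partitionFor(row, numPartitions));
        // ...so at most 10 of the 20 partitions can ever receive data.
        System.out.println("partitions used: " + used.size() + " of " + numPartitions);
    }
}
```

With more partitions than distinct keys, some partitions are guaranteed to sit idle, which defeats the point of partitioning for throughput.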

Currently the tree only has the top star light illuminated all the time.  What’s missing? Gravity!  To simulate gravity we need a consumer/producer pair, which I called “Twinkle”. To twinkle, we receive a message (ball location) from the tree topic, randomly decide if the ball will go left or right, and then publish a message with the new location back to the “tree” topic.   Will this work? Not really. This is just an infinite loop; what we really need is a delay queue or explicit timestamp handling, so we can delay the processing of the new ball location until the 1 second ON time has elapsed. A hack for this is to sleep the Twinkle application’s consumer thread so it only checks for new messages periodically. This is what we have so far.

Apache Kafka Christmas Tree
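Stripped of Kafka, the gravity transform itself is tiny. Here's a local sketch of the Twinkle step, using an in-memory queue in place of the tree topic (names are mine, not from the real code):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;

public class TwinkleSketch {
    static final int MAX_ROW = 10;
    static final Random RAND = new Random();

    // One Twinkle step: a ball at (row, col) falls to the next row,
    // bouncing left (same col) or right (col + 1) at random.
    static int[] fall(int row, int col) {
        return new int[]{row + 1, RAND.nextBoolean() ? col + 1 : col};
    }

    public static void main(String[] args) {
        Queue<int[]> tree = new ArrayDeque<>(); // stands in for the "tree" topic
        int[] binCounts = new int[MAX_ROW];     // stands in for the count topic
        tree.add(new int[]{0, 0});              // one ball dropped at the Star
        while (!tree.isEmpty()) {
            int[] ball = tree.poll();
            int[] next = fall(ball[0], ball[1]);
            if (next[0] >= MAX_ROW)
                binCounts[ball[1]]++;           // ball drops off the bottom
            else
                tree.add(next);                 // "republish" the new location
        }
        for (int c = 0; c < MAX_ROW; c++)
            System.out.print(binCounts[c] + " ");
        System.out.println();
    }
}
```

The Kafka version is the same transform with the queue replaced by a topic, which is exactly why the loop has no natural sense of time and needs the sleep hack described above.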

What’s missing? What should we do with balls that reach the bottom of the tree (the last row)? We could just make them vanish (by not sending a message back to the tree topic). Or better (in the spirit of the Galton Board) let’s add a “count” topic and a Count Consumer to add up the number of balls that fall into each bin under the tree:

Apache Kafka Galton Board Christmas Tree

The final problem is how to display the tree and light state. The simplest solution is to have another consumer which subscribes to the “tree” topic and changes the state of the correct lights for each new message in the topic (change of ball location). This is possible and was the approach I tried first. However, the Twinkle and Display consumers have to be in different consumer groups (because of the way the Kafka protocol works, this is what ensures that they both get every message published to the tree topic), and computing the state and handling timing was tricky:

Apache Kafka Christmas Board Diagram

An improved version computes the state change of the lights in the Twinkle application (step C), and sends a state change message (light location OFF or ON) to the corresponding dedicated topic (Light OFF, Light ON). Every second, the State Display consumer applies all the OFF messages first, then the ON messages, and then prints the tree lights state (in ASCII). Each “*” is an ON light, each “.” is an OFF light.

Apache Kafka Christmas Tree Diagram
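The OFF-before-ON ordering matters: if the same light receives both an OFF and an ON within one display tick (an old ball leaving just as a new one arrives), applying OFF last would wrongly blank it. A minimal sketch of the state update (the helper name is mine, not the full consumer):

```java
import java.util.List;

public class LightState {
    // Apply one display tick: all OFF events first, then all ON events,
    // so a light that turns OFF and back ON in the same tick stays ON.
    static void applyTick(boolean[][] tree, List<int[]> offs, List<int[]> ons) {
        for (int[] e : offs) tree[e[0]][e[1]] = false;
        for (int[] e : ons)  tree[e[0]][e[1]] = true;
    }

    public static void main(String[] args) {
        boolean[][] tree = new boolean[3][3];
        tree[0][0] = true; // star currently ON
        // Same tick: the star turns OFF (old ball leaves) and ON (new ball arrives).
        applyTick(tree,
                List.of(new int[]{0, 0}),
                List.of(new int[]{0, 0}));
        System.out.println("star is " + (tree[0][0] ? "ON" : "OFF"));
    }
}
```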

Here’s a sequence that could be printed for a simple tree with 3 rows, and a single ball dropped in at the top:

Time 1: star lights up (0,0)


Time 2:  light at (1,1) ON


Time 3: light at (2,1) ON


Time 4: All lights OFF
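Rendering a tick like the ones above is simple string building. This sketch uses the same indentation scheme as the display consumer (the helper name is mine):

```java
public class AsciiTree {
    // Render the light grid as an ASCII triangle: '*' = ON, '.' = OFF.
    static String render(boolean[][] tree) {
        int maxRows = tree.length;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < maxRows; i++) {
            int indent = (maxRows / 2) - (i / 2); // crude centring of each row
            for (int a = 0; a < indent; a++) sb.append(' ');
            for (int j = 0; j <= i; j++) sb.append(tree[i][j] ? '*' : '.');
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        boolean[][] tree = new boolean[3][3];
        tree[0][0] = true; // Time 1: only the star lit
        System.out.print(render(tree));
    }
}
```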


Here’s an (edited) run with 10 rows and 100 balls:

Welcome to the Instaclustr XMAS Tree Simulator!!!!!



Etc (lots of balls)


Etc (no more balls arriving, eventually end up with all lights OFF)



A Happy Normally Distributed Xmas! Counts:

col 0 = 1
col 1 = 2
col 2 = 7
col 3 = 19
col 4 = 28
col 5 = 25
col 6 = 14
col 7 = 4
col 8 = 0
col 9 = 0

Total events counted = 100

More interesting Christmas lights with colour could have been simulated by using a richer message structure, e.g. a message number as the key, and a compound value type consisting of (row, col, colour).
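For illustration, such a compound value might look like the hypothetical class below; note the demo's IntegerSerializer couldn't carry it, so a custom Kafka serializer (or JSON) would also be needed:

```java
public class LightEvent {
    final int row;
    final int col;
    final String colour;

    LightEvent(int row, int col, String colour) {
        this.row = row;
        this.col = col;
        this.colour = colour;
    }

    // A custom Serializer/Deserializer pair would be required to put this
    // on the wire; this class only sketches the richer message structure.
    @Override
    public String toString() {
        return "(" + row + ", " + col + ", " + colour + ")";
    }

    public static void main(String[] args) {
        System.out.println(new LightEvent(2, 1, "red"));
    }
}
```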

Here’s a simplified topology diagram showing just the relationship between producers, topics and consumers.

Apache Kafka Simplified topology diagram

Is this the best/correct approach for:

  • Computing and keeping track of the state of the lights?
    • Probably not. Kafka streams and a KTable (which maintains state) may be better. Here’s a blog.
  • Handling time?
  • Probably not. As I mentioned, using a delay queue or explicit timestamp handling would be better. Essentially I’m using Kafka as a discrete event simulation (DES), which it isn’t really designed for, but in theory it should work, as all you need is events and timestamps. I (briefly) tried using the different timestamp extractors but had a few issues; I suspect they are designed to work (best) with Kafka streams. So maybe Santa could get a few elves to add some code for this, perhaps using windows.

Will Santa be happy with this? He should be! Given that it’s written in Kafka it will scale massively. It should be easy to increase the speed of the simulation (i.e. run it flat out), increase the size of the tree, and even simulate a forest of trees, each with a slightly different algorithm and/or starting condition, and multiple different ways of displaying the tree/lights. There’s also reprocessing (that I mentioned in this blog), where Kafka persists all the messages, so consumers can choose which messages to process. Consumers could display any historic state of the tree lights.

Canberra was in the news a few years ago with a charity fundraising world record for the number of lights on an artificial Christmas tree.  What do half a million lights look like?

Christmas Tree Canberra

I just had to see if the Kafka Christmas lights simulator was up to the challenge. It was.

A simulated tree with 500,000 lights and 100,000 balls dropped in ran in 555s, processed over 400 million producer + consumer events, and ran 180 times faster than real-time (running flat out), achieving an average throughput of 43 million events per minute, not bad with everything (Kafka broker and java code) running on my laptop. Why stop there? 1 million and 2 million worked fine, can I claim the world record for the most lights on a simulated Christmas tree?
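Those numbers are internally consistent, and the arithmetic is easy to check:

```java
public class ThroughputCheck {
    public static void main(String[] args) {
        long balls = 100_000;
        double runSeconds = 555;
        long events = 400_000_000L; // producer + consumer events from the run above

        // Real time would be one ball per second, i.e. 100,000 s of simulated time,
        // so the speedup is simulated seconds / wall-clock seconds.
        double speedup = balls / runSeconds;
        double eventsPerMinute = events / runSeconds * 60;

        System.out.printf("speedup: %.0fx, events/min: %.1f million%n",
                speedup, eventsPerMinute / 1_000_000);
    }
}
```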

Here’s the code (Java). Note that I made a minor change to the twinkle application to correctly process the Star light.  The Star producer now sends a special message (-1, -1) to Twinkle which interprets this as a special case, i.e. an arriving ball with no location yet, and sends a message back to the tree topic with the star location (0,0) and a (0,0) message to the Light ON topic.

Please check out our Spark Streaming, Kafka and Cassandra Tutorial for details on installing and running Kafka and Zookeeper and visit our GitHub to access all Java Code.

Java Kafka Code

package kafkaxmas;

public class KafkaProperties {
    public static final Boolean DEBUG = false;
    public static final Boolean DISPLAY = true;
    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
    public static final int CONNECTION_TIMEOUT = 100000;
    public static final int DELAY = 1000;
    public static final String TOPIC = "topicTree";
    public static final String TOPIC2 = "topicCount";
    public static final String TOPICOFF = "topicOff";
    public static final String TOPICON = "topicOn";
    public static final String CLIENT_ID = "KafkaXmasTreeDemo";
    public static final int TREE_ROWS = 10;
    public static final int BALLS = 1000;

    private KafkaProperties() {}
}

package kafkaxmas;

/*
 * Top level main program to run Xmas Tree Simulation.
 * Starts the star producer, twinkle consumer/producer, and consumers for count and display.
 */
public class XmasTree {
    public static void main(String[] args) {
        boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
        System.out.println("Welcome to the Instaclustr XMAS Tree Simulator!!!!!");
        // Start balls dropping onto top of tree.
        StarProducer producerThread = new StarProducer(KafkaProperties.TOPIC, isAsync);
        producerThread.start();
        // Start state display consumer. New version with both topics, OFF and ON, passed via args.
        StateDisplayConsumer displayTree = new StateDisplayConsumer(KafkaProperties.TOPICOFF, KafkaProperties.TOPICON);
        displayTree.start();
        // Start count consumer, subscribes to TOPIC2.
        CountConsumer counts = new CountConsumer(KafkaProperties.TOPIC2);
        counts.start();
        // Start twinkle consumer/producer application, subscribed to the same topic as the star producer.
        Twinkle twinkleThread = new Twinkle(KafkaProperties.TOPIC);
        twinkleThread.start();
        // Note that even though the star producer eventually stops, the other threads keep running for ever.
    }
}
package kafkaxmas;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Producer which drops balls onto the top of the tree.
// Drop one "ball" onto top of tree every DELAY period, stops when number of balls dropped == BALLS.
// Message is of format: key=row, value=col
// Changed to send (-1, -1) rather than the original (0, 0), to indicate that the ball needs to be processed specially by Twinkle.

public class StarProducer extends Thread {
    private final KafkaProducer<Integer, Integer> producer;
    private final String topic;
    private final Boolean isAsync;
    private final Boolean debug = KafkaProperties.DEBUG;
    private final long zeroTime = System.currentTimeMillis();

    public StarProducer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put("client.id", "StarProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public void run() {
        int messageNo = 1;
        int maxMessages = KafkaProperties.BALLS;
        while (messageNo <= maxMessages) {
            int row = -1;
            int col = -1;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic, row, col), new DemoCallBack(startTime, row, col));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic, row, col)).get();
                    if (debug) System.out.println("Sent message: (" + row + ", " + col + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            long nowTime = System.currentTimeMillis();
            if (debug) System.out.println("Star *** " + (nowTime - zeroTime) + ": " + messageNo);
            messageNo++;
            try {
                Thread.sleep(KafkaProperties.DELAY);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class DemoCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final int message;

    public DemoCallBack(long startTime, int key, int message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                "), " +
                "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}
package kafkaxmas;

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;

/*
 * Consumer/producer pair.
 * Receives from tree topic, sends to count and On/Off state change topics.
 * Simulates gravity by reading an event from the tree topic and transforming it to the next row, randomly left or right.
 * If the event is the special (-1, -1) value then a new ball has arrived: send new event (0,0) to tree topic and a Light ON state change.
 * Else:
 *      Send current position Light OFF state change.
 *      If on bottom row, then send event to count topic.
 *      Else transform to next row and randomly left or right and send new event to tree topic and light ON state change.
 * What are valid producer configs?
 */

public class Twinkle extends ShutdownableThread {
    private final KafkaProducer<Integer, Integer> producer;
    private final KafkaConsumer<Integer, Integer> consumer;
    private final String topic;
    Boolean debug = KafkaProperties.DEBUG;
    int maxRow = KafkaProperties.TREE_ROWS; // size of tree
    static Random rand = new Random();
    boolean display = KafkaProperties.DISPLAY; // display ASCII tree or not

    public Twinkle(String topic) {
        super("Twinkle", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Twinkle");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");

        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        consumer.subscribe(Collections.singletonList(topic));

        // producer, shared across all output topics
        Properties pprops = new Properties();
        pprops.put("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        pprops.put("client.id", "TwinkleProducer");
        pprops.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        pprops.put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        producer = new KafkaProducer<>(pprops);
    }

    public void doWork() {
        int row;
        int col;
        long pollInterval = 1000;
        ConsumerRecords<Integer, Integer> records = consumer.poll(pollInterval);
        if (debug) System.out.println("Twinkle got records = " + records.count());
        for (ConsumerRecord<Integer, Integer> record : records) {
            if (debug) System.out.println("Twinkle: processing record = (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            row = record.key();
            col = record.value();
            if (row == -1) {
                // ball dropped in from the top, put it in Star location (0,0) and turn it ON for a second
                row = 0;
                col = 0;
                // put (0,0) into tree topic and send ON event
                producer.send(new ProducerRecord<Integer, Integer>(this.topic, row, col));
                if (debug) System.out.println("Twinkle STAR ON + (" + row + ", " + col + ") ON");
                producer.send(new ProducerRecord<Integer, Integer>(KafkaProperties.TOPICON, row, col));
            } else {
                // turn light OFF in current position
                if (debug) System.out.println("Twinkle + (" + row + ", " + col + ") OFF");
                producer.send(new ProducerRecord<Integer, Integer>(KafkaProperties.TOPICOFF, row, col));
                // increment row (gravity!), if row >= maxRow then don't publish back to tree topic, send to count topic instead
                int nextRow = row + 1;
                if (nextRow >= maxRow) {
                    if (debug) System.out.println("Twinkle, ball on maxRow!");
                    // ball drops off bottom, so send event to TOPIC2 for counting
                    producer.send(new ProducerRecord<Integer, Integer>(KafkaProperties.TOPIC2, row, col));
                } else {
                    // randomly pick left or right direction and send new location back to tree topic and ON state change
                    int nextCol = col;
                    // choose left or right bulb
                    if (rand.nextBoolean())
                        nextCol += 1;
                    if (debug) System.out.println("Twinkle: next " + nextRow + ", " + nextCol);
                    producer.send(new ProducerRecord<Integer, Integer>(this.topic, nextRow, nextCol));
                    if (debug) System.out.println("Twinkle + (" + nextRow + ", " + nextCol + ") ON");
                    producer.send(new ProducerRecord<Integer, Integer>(KafkaProperties.TOPICON, nextRow, nextCol));
                }
            }
        }
        // processed all records obtained in poll above, now sleep for some time so that lights will stay on for a while.
        try {
            Thread.sleep(KafkaProperties.DELAY);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String name() {
        return null;
    }

    public boolean isInterruptible() {
        return false;
    }
}
package kafkaxmas;

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

// consumer to count the number of balls in each bin when they drop off the bottom

public class CountConsumer extends ShutdownableThread {
    private final KafkaConsumer<Integer, Integer> consumer;
    private final String topic;
    private final int maxCols = KafkaProperties.TREE_ROWS;
    private long counts[] = new long[maxCols];
    int balls = KafkaProperties.BALLS; // number of balls expected
    Boolean debug = KafkaProperties.DEBUG;
    int runningTotal = 0;
    long startTime = System.currentTimeMillis();

    public CountConsumer(String topic) {
        super("XmasTreeCountConsumer", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "XmasTreeCountConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");

        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        consumer.subscribe(Collections.singletonList(topic));
    }

    public void doWork() {
        long pollInterval = 1000;
        ConsumerRecords<Integer, Integer> records = consumer.poll(pollInterval);
        if (debug) System.out.println("Count Consumer records = " + records.count());
        for (ConsumerRecord<Integer, Integer> record : records) {
            if (debug) System.out.println("Count Consumer, Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            if (record.value() < maxCols) {
                counts[record.value()]++;
                runningTotal++;
            }
            if (debug) System.out.println("Count = " + runningTotal);
        }
        // only display counts at end
        long sum = 0;
        for (int i = 0; i < maxCols; i++)
            sum += counts[i];
        if (sum >= balls) {
            System.out.println("A Happy Normally Distributed Xmas! Counts:");
            for (int i = 0; i < maxCols; i++)
                System.out.println("col " + i + " = " + counts[i]);
            System.out.println("Total events counted = " + sum);
            long endTime = System.currentTimeMillis();
            System.out.println("Total time = " + (endTime - startTime));
        }
        try {
            Thread.sleep(KafkaProperties.DELAY);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String name() {
        return null;
    }

    public boolean isInterruptible() {
        return false;
    }
}
package kafkaxmas;

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/*
 * Consumer for the tree display, receives events from ON and OFF topics and displays ASCII tree.
 * Modified version: single consumer subscribes to 2 topics and uses the records method to return events for each topic in turn.
 * Consumer docs:
 * Consumer config docs:
 */

public class StateDisplayConsumer extends ShutdownableThread {
    private final KafkaConsumer<Integer, Integer> consumer;
    private final Boolean debug = KafkaProperties.DEBUG;
    private final String topic1;
    private final String topic2;
    // all lights off by default (false)
    private final int maxRows = KafkaProperties.TREE_ROWS;
    private final int maxCols = maxRows;
    private final boolean[][] tree = new boolean[maxRows][maxCols];

    public StateDisplayConsumer(String topic1, String topic2) {
        super("XmasTreeStateDisplayConsumer", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "XmasTreeStateDisplayConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");

        consumer = new KafkaConsumer<>(props);
        this.topic1 = topic1;
        this.topic2 = topic2;
        consumer.subscribe(Arrays.asList(topic1, topic2));
    }

    public void doWork() {
        // compute current state of lights from OFF and ON messages.
        // Process events in OFF topic first so if a light changes from OFF to ON instantaneously it will stay on.
        long pollInterval = 1000;
        // get records for all topics
        ConsumerRecords<Integer, Integer> recordsAll = consumer.poll(pollInterval);
        // get records for OFF topic only
        Iterable<ConsumerRecord<Integer, Integer>> recordsOFF = recordsAll.records(topic1);
        for (ConsumerRecord<Integer, Integer> record : recordsOFF) {
            if (debug) System.out.println("Display Consumer, OFF Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            // paranoid check in case we had a bigger tree in a previous run and some messages are still hanging around unprocessed.
            if (record.key() < maxRows && record.value() < maxCols)
                tree[record.key()][record.value()] = false;
        }
        // Now process ON topic messages
        Iterable<ConsumerRecord<Integer, Integer>> recordsON = recordsAll.records(topic2);
        for (ConsumerRecord<Integer, Integer> record : recordsON) {
            if (debug) System.out.println("Display Consumer, ON Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            // paranoid check in case we had a bigger tree in a previous run and some messages are still hanging around unprocessed.
            if (record.key() < maxRows && record.value() < maxCols)
                tree[record.key()][record.value()] = true;
        }
        // display tree as ASCII
        for (int i = 0; i < maxRows; i++) {
            int indent = (maxRows / 2) - (i / 2);
            for (int a = 0; a < indent; a++)
                System.out.print(" ");
            for (int j = 0; j <= i; j++)
                if (tree[i][j]) System.out.print("*");
                else System.out.print(".");
            System.out.println();
        }
        // only display the tree every second
        try {
            Thread.sleep(KafkaProperties.DELAY);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String name() {
        return null;
    }

    public boolean isInterruptible() {
        return false;
    }
}
The post Apache Kafka Christmas Tree Light Simulation – Seasons Greetings From Instaclustr appeared first on Instaclustr.

Should you use incremental repair?

After seeing a lot of questions surrounding incremental repair on the mailing list and after observing several outages caused by it, we figured it would be good to write down our advice in a blog post.

Repair in Apache Cassandra is a maintenance operation that restores data consistency throughout a cluster. It is advised to run repair operations at least every gc_grace_seconds to ensure that tombstones will get replicated consistently and to avoid zombie records if you perform DELETE statements on your tables.

Repair also facilitates recovery from outages that last longer than the hint window, or in case hints were dropped. For those operators already familiar with the repair concepts, there were a few back-to-basics moments when the behavior of repair changed significantly in the release of Apache Cassandra 2.2. The introduction of incremental repair as the default along with the generalization of anti-compaction created a whole new set of challenges.

How does repair work?

To perform repairs without comparing all data between all replicas, Apache Cassandra uses Merkle trees to compare trees of hashed values instead.

Merkle tree

During a repair, each replica will build a Merkle tree, using what is called a “validation compaction”. It is basically a compaction without the write phase, the output being a tree of hashes.

Validation compaction

Merkle trees will then be compared between replicas to identify mismatching leaves, each leaf containing several partitions. No difference check is made on a per-partition basis: if one partition in a leaf is not in sync, then all partitions in the leaf are considered out of sync. Sending more data than is required is typically called overstreaming, and gigabytes of data can be streamed for a single bit of difference. To mitigate overstreaming, people started performing subrange repairs, specifying start/end tokens to repair smaller chunks, which results in fewer partitions per leaf.
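A toy illustration of why one mismatched partition makes a whole leaf stream (the hash function and layout are deliberately simplified; Cassandra's actual trees hash token ranges):

```java
import java.util.Arrays;

public class MerkleSketch {
    // Leaf hash over a block of partitions: if ANY partition differs,
    // the leaf hash differs, so every partition in the leaf is re-streamed.
    static int leafHash(String[] partitions) {
        return Arrays.hashCode(partitions);
    }

    public static void main(String[] args) {
        String[] replicaA = {"p1=v1", "p2=v2", "p3=v3", "p4=v4"};
        String[] replicaB = {"p1=v1", "p2=v2", "p3=OLD", "p4=v4"};
        boolean leafMatches = leafHash(replicaA) == leafHash(replicaB);
        // One stale partition (p3) => the whole 4-partition leaf is considered out of sync.
        System.out.println("leaf in sync: " + leafMatches);
    }
}
```

Smaller leaves (as produced by subrange repair) shrink the blast radius of each mismatch, which is exactly why they reduce overstreaming.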

With clusters growing in size and density, performing repairs within gc_grace_seconds started to get more and more challenging, with repairs sometimes lasting for tens of days. Some clever folks leveraged the immutable nature of SSTables and introduced incremental repair in Apache Cassandra 2.1.

What is incremental repair?

The plan with incremental repair was that once some data had been repaired, it would be marked as such and would never need to be repaired again.
Since SSTables can contain tokens from multiple token ranges, and repair is performed by token range, it was necessary to be able to separate repaired data from unrepaired data. That process is called anticompaction.


Once a repair session ends, each repaired SSTable will be split into 2 SSTables: one that contains the data that was repaired in the session (i.e. data that belonged to the repaired token range) and another one with the remaining unrepaired data. The newly created SSTable containing repaired data will be marked as such by setting its repairedAt timestamp to the time of the repair session.
When performing validation compaction during the next incremental repair, Cassandra will skip the SSTables with a repairedAt timestamp higher than 0, and thus only compare data that is unrepaired.
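In sketch form, the selection of SSTables for validation looks like this (the class and field names are illustrative, not Cassandra's internals):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ValidationFilter {
    // Illustrative stand-in for an SSTable: only its repairedAt marker matters here.
    static class SSTable {
        final String name;
        final long repairedAt; // 0 = unrepaired

        SSTable(String name, long repairedAt) {
            this.name = name;
            this.repairedAt = repairedAt;
        }
    }

    // Incremental repair only builds Merkle trees over unrepaired SSTables,
    // skipping anything whose repairedAt timestamp is higher than 0.
    static List<SSTable> forValidation(List<SSTable> all) {
        return all.stream()
                .filter(t -> t.repairedAt == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SSTable> tables = List.of(
                new SSTable("old-repaired", 1513728000000L),
                new SSTable("new-unrepaired", 0));
        System.out.println(forValidation(tables).size() + " SSTable(s) to validate");
    }
}
```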

Incremental repair

Incremental repair was actually promising enough that it was promoted to the default repair mode in C* 2.2, and anticompaction has since then also been performed during full repairs.
To say the least, this was a bit of a premature move by the community, as incremental repair has a few very annoying drawbacks and caveats that would make us consider it an experimental feature instead.

The problems of incremental repair

The nastiest one is filed in the Apache Cassandra JIRA as CASSANDRA-9143, with a fix ready for the unplanned 4.0 release. Between validation compaction and anticompaction, an SSTable that is involved in a repair can be compacted away as part of the standard compaction process on one node but not on the others. Such an SSTable will not get marked as repaired on that specific node, while the rest of the cluster will consider the data it contained as repaired.
Thus, on the next incremental repair run, all the partitions contained in that SSTable will be seen as inconsistent, which can generate a fairly large amount of overstreaming. This is a particularly nasty bug when incremental repair is used in conjunction with Leveled Compaction Strategy (LCS). LCS is a very intensive strategy in which SSTables get compacted far more often than with STCS and TWCS. LCS creates fixed-size SSTables, which can easily lead to thousands of SSTables for a single table. The way streaming occurs in Apache Cassandra during repair means that overstreaming of LCS tables can create tens of thousands of small SSTables in L0, which can ultimately bring nodes down and affect the whole cluster. This is particularly true when the nodes use a large number of vnodes.
We have seen this happen on several customers’ clusters, and it then requires a lot of operational expertise to bring the cluster back to a sane state.

In addition to the bugs related to incorrectly marked SSTables, there is the significant overhead of anticompaction. It came as a big surprise to users upgrading from 2.0/2.1 to 2.2 when trying to run repair. If there is already a lot of data on disk, the first incremental repair can take a lot of time (if not forever) and create a similar situation to the above, with a lot of SSTables being created due to anticompaction. Keep in mind that anticompaction will rewrite all SSTables on disk to separate repaired and unrepaired data.
While it is no longer necessary to “prepare” the migration to incremental repair, we would strongly advise against running it on a cluster with a lot of unrepaired data without first marking SSTables as repaired. This would require running a full repair first to make sure data is actually repaired, but now even full repair performs anticompaction, so… you see the problem.

A safety measure has been put in place, for valid reasons, to prevent SSTables that are going through anticompaction from being compacted. The problem is that it also prevents those SSTables from going through validation compaction, which will lead repair sessions to fail if an SSTable is being anticompacted. Given that anticompaction also occurs with full repairs, this creates the following limitation: you cannot run repair on more than one node at a time without risking failed sessions due to concurrency on SSTables. This is true for incremental repair but also full repair, and it changes a lot about how you were used to running repair in previous versions.

The only way to perform repair without anticompaction in “modern” versions of Apache Cassandra is subrange repair, which fully skips anticompaction. To perform a subrange repair correctly, you have three options:

Regardless, it is extremely important to note that repaired and unrepaired SSTables can never be compacted together. If you stop performing incremental repairs once you have started, you could end up with outdated data not being cleaned up on disk due to the presence of the same partition in both states. So if you want to continue using incremental repair, make sure it runs very regularly, and if you want to move back to full subrange repairs you will need to mark all SSTables as unrepaired using sstablerepairedset.

Note that since subrange repair does not perform anticompaction, it is not possible to perform subrange repair in incremental mode.
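
Subrange repair works by handing Cassandra explicit token boundaries (the -st/-et options of nodetool repair). As a rough sketch, and assuming the Murmur3 partitioner's token space, splitting a range into equal subranges might look like this (illustrative helper, not a Cassandra API):

```python
def split_range(start, end, parts):
    """Split the token range (start, end] into `parts` contiguous
    subranges, e.g. to feed `nodetool repair -st <s> -et <e>` one
    subrange at a time. If the range is not evenly divisible, the
    last subrange absorbs the remainder."""
    width = (end - start) // parts
    bounds = [start + i * width for i in range(parts)] + [end]
    return list(zip(bounds[:-1], bounds[1:]))
```

For the full Murmur3 ring you would call it with start = -2**63 and end = 2**63 - 1; in practice a tool computes per-node subranges from the ring topology rather than splitting the whole ring blindly.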

Repair: state of the art in late 2017

Here is our advice at the time of writing this blog post, based on our experience with customers: perform full subrange repair exclusively for now, and do not ever run incremental repair. Just pretend that feature does not exist for now.

While the idea behind incremental repair is brilliant, the implementation still has flaws that can cause severe damage to a production cluster, especially when using LCS and DTCS. The improvements and fixes planned for 4.0 will need to be thoroughly tested to prove that they fix incremental repair and allow it to be safely used as a daily routine.

We are confident that future releases will make incremental repair better, allowing the operation to be safe and blazing fast compared to full repairs.

How AdStage moved beyond startup scale with Apache Cassandra as a data service

Startup growth is never a bad thing – but the strain it puts on an IT team is real. Jason Wu of AdStage told me how AdStage addressed its data scale issues, and why Apache Cassandra-as-a-Database service was the right choice.

The post How AdStage moved beyond startup scale with Apache Cassandra as a data service appeared first on Instaclustr.

Pick‘n’Mix: Cassandra, Spark, Zeppelin, Elassandra, Kibana, & Kafka


 \ käf-kə-ˈesk \

Marked by a senseless, disorienting, menacing, nightmarish complexity.

Kafkaesque Cassandra, Spark, Zeppelin, Elassandra, Kibana, & Kafka

One morning when I woke from troubled dreams, I decided to blog about something potentially Kafkaesque: Which Instaclustr managed open-source-as-a-service(s) can be used together (current and future)? Which combinations are actually possible? Which ones are realistically sensible? And which are nightmarishly Kafkaesque!?

In previous blogs, I’ve explored Instaclustr managed Apache Cassandra, Spark (batch), Spark Streaming, Spark MLLib, and Zeppelin.

Instaclustr also supports managed Elassandra and Kibana. Elassandra is an integrated Elasticsearch and Cassandra service which computes secondary indexes for data and supports fast queries over the indexes. Kibana is an open source data visualization plugin for Elasticsearch.  Together with Logstash they form the “Elastic stack” (previously the ELK stack).

Apache Kafka, a distributed streaming platform (massively horizontally scalable, high-throughput, low-latency, highly reliable, highly available real-time streaming data processing), is another popular service in the same open source ecosystem as Cassandra, Spark and Elasticsearch.  Kafka is on the Instaclustr product roadmap for 2018, and we have a tutorial on Spark Streaming with Kafka and Cassandra to whet your appetite.

Rather than jumping straight into a deep dive of Elassandra and/or Kafka I’m going to take a more architectural perspective. I started by putting all the services of interest on a diagram, and then connecting them together based on documented support for each integration combination and direction (source and/or sink):

Kafka Architecture Diagram

Note that Cassandra, Elassandra, Spark (batch) and Spark Streaming, Spark MLLib, Zeppelin and Kibana are tightly integrated, and support most logically possible interactions. Instaclustr also co-locates all of these services on the same nodes by default.

I’ve spent some time examining the Kafka documentation to check what related ecosystem services it can connect to, and in what direction. Kafka supports Source and Sink Connectors which enable integration with numerous other services. Lots of different event sources are supported, enabling data to be ingested from both external and internal devices and systems. AWS S3 is supported as a Kafka sink only, and JDBC as both source and sink. Elassandra is supported as a sink only, and Spark Streaming and Cassandra as source and sink.

Also note that, implicitly, most services can “talk to” themselves (i.e. read data from, process data, and write data back; this is what the card replacement rule achieves). What’s more interesting is that they can also interact with themselves on the same or different clusters, and in the same or different locations (e.g. in another AWS AZ, or in another region). The diagram shows a service interacting with itself (same cluster), another instance of the service in the same location (different cluster), and another instance in a different cluster and location (different location):

Kafka Architecture Diagram

This opens up powerful internal service architectural richness and use cases. For example: differentiation of clusters running the same service (e.g. a write-intensive Cassandra cluster feeding data into a read-intensive Cassandra cluster); a Kafka cluster dedicated to ingestion only, connecting to others for processing; mirroring or replicating data from one Cassandra cluster (location) to another (e.g. using Spark to read from a Cassandra cluster in one location and write to a Cassandra cluster in another location); peer-to-peer Kafka clusters, where each cluster subscribes to events that are local to all other Kafka clusters and aggregates the events locally; etc.

Kafka – some key features

Key features of Apache Kafka

The main Kafka APIs are Connectors, Producers, Consumers and Streams. Kafka is stream event-based, and producers publish events onto one or more topics. Topics are multi-subscriber and can have zero or more consumers that process events. Kafka maintains a (partitioned) immutable commit log of events for each topic, and therefore keeps all published events for a specified retention period. This approach for message processing has a number of benefits. The more obvious benefits are speed, fault-tolerance, high concurrency and scalability. The surprising benefits are that consumers and producers can be very loosely coupled, and events can be shared! More than one consumer can consume the same event, and consumers also control which events to consume – they can consume new events and also re-consume past events.
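
The decoupling can be sketched with a toy model (illustrative only, not the Kafka client API): the topic is an append-only log, and each consumer simply tracks its own offset into it, which is what lets different consumers share events and replay the past.

```python
class Topic:
    """Append-only event log; events stay until retention expires."""
    def __init__(self):
        self.log = []

    def publish(self, event):
        self.log.append(event)

class Consumer:
    """Each consumer owns its offset, so consumers are decoupled
    from producers and from each other, and can replay old events."""
    def __init__(self, topic, offset=0):
        self.topic, self.offset = topic, offset

    def poll(self):
        events = self.topic.log[self.offset:]
        self.offset = len(self.topic.log)
        return events

    def rewind(self, offset=0):
        self.offset = offset
```

Two consumers polling the same topic each receive every event, and rewinding a consumer's offset re-delivers past events, which real Kafka consumers do by seeking to an earlier offset.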

Kafka’s performance is claimed to be constant with respect to data size, so storing events for an arbitrary length of time (as long as you have disk space!) is encouraged, by design. Because events can be processed more than once, by the same or different consumers, what do we end up with? A database for streaming events!

Let’s explore some permutations of the ecosystem of services (not all permutations will be covered in this blog), starting with Kafka.  In answer to the question “What is Kafka good for?”, the Kafka documentation suggests two broad classes of application. The focus of this blog is on the first use case – getting (streaming) data between systems.

  1. Building real-time streaming data pipelines that reliably get data between systems or applications (this blog)
  2. Building real-time streaming applications that transform or react to the streams of data (next blog).

Use Case: Kafka as a Database (Teenager’s bedroom. Stuff goes in, stuff rarely comes out).

Kafka as a database Instaclustr

Kafka only, one or more source connectors, producer(s) publishing to topic(s). No consumers:

Kafka Architecture diagram

This is a trivial, and somewhat counterintuitive, use case for Kafka, but it illustrates one of the surprising benefits of the architecture: it is designed from the “bed” up as an event streaming database, not just for event movement. All the events arriving will be published to topic(s) and persisted to disk. Events can subsequently be consumed multiple times by multiple consumers, who do not have to be subscribed yet. Is this interesting? Yes! It suggests lots of powerful use cases around event persistence, reprocessing/replaying of events, and adding derived events (e.g. failure handling, support for multiple consumers and purposes, DevOps maintaining derived stateful data back in Kafka for future use, and processing events from past, present and future, including predictions, in a unified manner).

Use Case: Kafka as a temporary Buffer (Doctor’s waiting room)

Kafka as a temporary buffer

This pattern has one Kafka cluster feeding into another one:

Kafka Architecture Diagram

This “Buffer” (waiting room) pattern has a Kafka cluster dedicated solely to event ingestion, and another cluster for the event consumers.  This leverages the ability of Kafka to store events indefinitely, and isolates event producers from consumers.  The event production and consumption rates can be significantly different with no loss of events or overloading of consumers. This pattern is ideal for use cases where an incoming event storm can temporarily exceed the processing capacity of the consumers’ cluster, or where some other temporary failure or slowdown prevents the consumers from processing events in real time. The ingestion cluster buffers all the events until the consumers are ready to process them again. In the wild, this buffer pattern is used by Netflix.
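
A toy simulation of the buffer pattern (hypothetical model, not Kafka code): the ingestion side absorbs a burst up front, while a slower consumer drains the backlog at its own pace with no events lost.

```python
from collections import deque

def simulate_buffer(burst, consume_rate, ticks):
    """The burst lands in the buffer immediately; each tick the
    consumer processes at most `consume_rate` events. Whatever is
    not processed yet simply waits in the buffer."""
    buffer = deque(burst)
    processed = []
    for _ in range(ticks):
        for _ in range(min(consume_rate, len(buffer))):
            processed.append(buffer.popleft())
    return processed, len(buffer)   # (events processed, backlog left)
```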

Kafka can act as an event buffer, concentrator, and router in front of other services in our ecosystem as well. For example, Cassandra, Spark Streaming or Elassandra can all be sinks for Kafka events.

Kafka Architecture Diagram 5

Use Case: Kafka active-passive replication

In the Use Cases so far we’ve only used Kafka as a pass-through buffer or longer term persistence mechanism. Kafka producers and consumers can publish/subscribe to/from multiple topics, enabling more complex topologies to be created. In particular, some less obvious patterns can be used to support data replication across multiple Kafka clusters and locations.  

There are a couple of use cases for data replication across clusters/locations. One is for reliability/redundancy and is often called active-passive replication. Data from the source (active) cluster is copied to the passive (target) cluster.  The “passive” cluster can be used in case of failure of the active cluster, or it can be used to reduce latency for consumers that are geo-located near it.

Kafka Architecture Diagram

Use Case: Kafka active-active replication

A more interesting use case is when unique events are collected at different locations and must be shared among all the locations. This can be between just two locations, or many (P2P). This is an active-active pattern and can be viewed as a generalisation of the active-passive pattern, as each cluster acts as both a source and a target for every other cluster, and the events copied from other clusters need to be merged with the events from the local cluster in a new topic (Topic 2 in the diagram below), from which consumers can get all the events. Note that it has to be a different topic, otherwise you get an event loop!
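
The merge step can be sketched as follows (an illustrative model, not Kafka code): each site publishes its local events to its own topic 1, and only topic 1 is ever replicated between sites, with the local topic 2 built from local plus replicated events. Replicating topic 2 itself would copy already-copied events back out, creating the loop.

```python
def build_topic2(local_topic1, remote_topic1s):
    """Build the local 'topic 2': local events plus events replicated
    from every remote cluster's topic 1. Because replication only ever
    reads topic 1, copied events are never re-copied (no event loop)."""
    topic2 = list(local_topic1)
    for remote in remote_topic1s:
        topic2.extend(remote)
    return topic2

# Two sites, each collecting unique local events:
site_a = ["a1", "a2"]   # site A's topic 1
site_b = ["b1"]         # site B's topic 1

merged_a = build_topic2(site_a, [site_b])
merged_b = build_topic2(site_b, [site_a])
```

After the merge, consumers at either site see the full set of events, just in a locally determined order.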

Kafka Architecture Diagram

(Magic) Mirror Maker

In Japan, bronze mirrors are known as magic mirrors, or makkyo (魔鏡). One side is brightly polished, while an embossed design decorates the reverse side. Remarkably, when light is directed onto the face of the mirror, and reflected to a flat surface, an image “magically” appears (usually the one featured on its back):

Magic Mirror Maker Makkyo

For the use cases involving events being moved between Kafka clusters, how can this be achieved? One obvious mechanism is just to pretend that the clusters are “local”, reading data from the source cluster topic with a consumer and publishing it to another topic on the target cluster. This approach can work over low-latency WANs (e.g. clusters in the same AWS AZ). However, there are also a number of more sophisticated solutions. MirrorMaker can be used (which also just reads data from the source cluster using a consumer and publishes it to the target cluster using a producer!). Will MirrorMaker actually work for the active-active use case, given that it can only read/write to/from topics of the same name? Maybe, here’s a trick. More sophisticated solutions exist, including uReplicator from Uber.

Next blog:

What’s the difference between Spark and Kafka Streaming? Event reprocessing/replaying, unifying stream and batch processing, producing and using state, fun with time, large messages, topic discovery, and more!

Kafka Comic

The post Pick‘n’Mix: Cassandra, Spark, Zeppelin, Elassandra, Kibana, & Kafka appeared first on Instaclustr.

TLP Dashboards for Datadog users, out of the box.

We had the pleasure to release our monitoring dashboards designed for Apache Cassandra on Datadog last week. It is a nice occasion to share our thoughts around Cassandra Dashboards design as it is a recurrent question in the community.

We wrote a post about this on the Datadog website here.

For people using Datadog we hope this will give more details on how the dashboards were designed, thus on how to use the dashboards we provided. For others, we hope this information will be useful in the process of building and then using your own dashboards, with the technology of your choice.

The Project

Building an efficient, complete, and readable set of dashboards to monitor Apache Cassandra is time consuming and far from being straightforward.

Those who have tried it probably noticed that it requires a fair amount of time and knowledge of both the monitoring technology in use (Datadog, Grafana, Graphite or InfluxDB, metrics-reporter, etc.) and of Apache Cassandra. Creating dashboards is about picking the most relevant metrics, aggregations, units and chart types, and then gathering them in a way that this huge amount of data actually provides usable information. Dashboards need to be readable, understandable and easy to use for the final operator.

On one hand, creating comprehensive dashboards is a long and complex task. On the other hand, every Apache Cassandra cluster can be monitored roughly the same way. Most production issues can be detected and analyzed using a common set of charts, organized the same way, for all the Apache Cassandra clusters. Each cluster may require additional operator specific dashboards or charts depending on workload and merging of metrics outside of Cassandra, but those would supplement the standard dashboards, not replace them. There are some differences depending on the Apache Cassandra versions in use, but they are relatively minor and not subject to rapid change.

In my monitoring presentation at the 2016 Cassandra Summit I announced that we were working on this project.

In December 2017 it was released for Datadog users. If you want to get started with these dashboards and you are using Datadog, see the documentation on the Datadog integration for Cassandra.

Dashboard Design

Our Approach to Monitoring

The dashboards have been designed to allow the operator to do the following:

  1. Easily detect any anomaly (Overview Dashboard)
  2. Be able to efficiently troubleshoot and fix the anomaly (Themed Dashboards)
  3. Find the bottlenecks and optimize the overall performance (Themed Dashboards)

The two latter points can be seen as the same kind of operation, supported by the same set of dashboards.

Empowering the operator

We strongly believe that showing the metrics to the operator can be a nice entry point for learning about Cassandra. Each of the themed dashboards monitors a distinct internal process of Cassandra, and most of the metrics related to that internal process are grouped within its dashboard. We think this makes it easier for the operator to understand Cassandra’s internal processes.

To make this clearer, let’s consider the example of someone completely new to Cassandra. On first repair, the operator starts an incremental repair without knowing anything about it, and latencies increase substantially after a short while. Classic.

The operator would notice a read latency increase in the ‘Overview Dashboard’, then move to the ‘Read Path Dashboard’. There the operator would notice that the number of SSTables went from 50 to 800 on each node, or for a table. If the chart is there out of the box, even without knowing what an SSTable is, the operator can understand that something changed and that it relates to the outage somehow. The operator would then search in the right direction, probably solving the issue quickly, and possibly learning in the process.

What to Monitor: Dashboards and Charts Detail

Here we will be really focusing on charts details and indications on how to use each chart efficiently. While this post is a discussion of dashboards available for DataDog, the metrics can be visualized using any tool, and we believe this would be a good starting point when setting up monitoring for Cassandra.

In the graphs, the values and percentiles chosen are sometimes quite arbitrary and often depend on the use case or Cassandra setup. The point is to give a reference, a starting point on what could be ‘normal’ or ‘wrong’ values. The Apache Cassandra monitoring documentation, the mailing list archives, or #cassandra on freenode (IRC) are good ways to answer questions that might pop up while using the dashboards.

Some charts are deliberately duplicated across dashboards, or within a dashboard, but with distinct visualisations or aggregations.

Detect anomalies: Overview Dashboard

We don’t try to troubleshoot at this stage. We want to detect outages that might impact the service, or check that the Cassandra cluster is globally healthy. To accomplish this, the Overview Dashboard aims at being both complete and minimalist.

Complete, as we want to be warned any time “something is happening” in the Cassandra cluster. Minimalist, because we don’t want to miss important information here in a flood of non-critical or too-low-level information. These charts aim to answer the question: “Is Cassandra healthy?”.

TLP Dashboards - Overview

Troubleshoot issues and optimize Apache Cassandra: Themed dashboards

The goal here is to divide the information into smaller, more meaningful chunks. An issue will often only affect one of the subsystems of Cassandra, so the operator can have all the needed information in one place when working on a specific issue, without irrelevant information (for this specific issue) hiding more important information.

For this reason these dashboards must maximize the information on a specific theme or internal process of Cassandra and show all the low-level information (per table, per host). We often repeat charts from other dashboards, so we always find the information we need as Cassandra users. This is in contrast to the Overview Dashboard described above, which shows only “high level” information.

Read Path Dashboard

In this dashboard we are concerned about any element that could impact a high level client read. In fact, we want to know about everything that could affect the read path in Cassandra by just looking at this dashboard.

TLP Dashboards - Read Path Dashboard

Write Path Dashboard

This dashboard focuses on a comprehensive view of the various metrics which affect write latency and throughput. Long garbage collection pause times will always result in dips in throughput and spikes in latency, so it is featured prominently on this dashboard.

TLP Dashboards - Write Path Dashboard

SSTable management Dashboard

This dashboard provides a comprehensive view of the various metrics covering the asynchronous steps the data goes through after a write, from the flush to the data deletion, with all the compaction processes in between. Here we want to keep an eye on disk space evolution and make sure the asynchronous management of SSTables is happening efficiently, or as expected.

TLP Dashboards - SSTable Management Dashboard

Alerting, Automated Anomaly Detection

To conclude, when happy with monitoring dashboards, it is a good idea to add some alerting rules.

It is important to detect all the anomalies as quickly as possible. To bring monitoring to the next level of efficiency, it is good to be warned automatically when something goes wrong.

We believe adding alerts on each of the “Overview Dashboard” metrics will be sufficient to detect most issues and any major outage, or at least be a good starting point. For each metric, the alerting threshold should be high enough not to trigger false alerts, while still ensuring a mitigating action can be taken in time. Some alerts should use absolute values (disk space available, CPU, etc.), while others will require relative values. Some alerts, such as those on latencies, will require manual tuning based on configuration and workload.
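
The two kinds of rule can be sketched as follows (the thresholds here are arbitrary examples, not recommendations; tune them to your own cluster):

```python
def absolute_alert(value, threshold):
    """Absolute rule, e.g. alert when free disk space drops
    below a fixed percentage."""
    return value < threshold

def relative_alert(current, baseline, factor):
    """Relative rule, e.g. alert when p99 read latency exceeds
    some multiple of its recent baseline."""
    return current > baseline * factor
```

Absolute rules suit metrics with a universally bad zone (disk, CPU); relative rules suit metrics whose "normal" value varies per cluster and workload, like latencies.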

The biggest risk with alerting is probably being flooded by false alerts, as the natural inclination is to start ignoring them, which leads to missing valid ones. As a global guideline, any alert should trigger an action; if it does not, the alert is relatively useless and adds noise.