In-Memory Scylla, or Racing the Red Queen

“Now, here, you see, it takes all the running you can do, to keep in the same place. If you want to get somewhere else, you must run at least twice as fast as that!” — The Red Queen to Alice, Through the Looking-Glass

In the world of Big Data, if you are not constantly evolving you are already falling behind. This is the heart of the Red Queen hypothesis, which was first proposed for the evolution of natural systems and applies just as much to the evolution of technology. ‘Now! Now!’ cried the Queen. ‘Faster! Faster!’ And so it is with Big Data.

Over the past decade, many databases have shifted from storing all their data on Hard Disk Drives (HDDs) to Solid State Drives (SSDs), dropping latencies to just a few milliseconds and getting ever closer to “now.” The whole industry continues to run “twice as fast” just to stay in place.

As fast NVMe storage drives become commonplace in the industry, they are practically relegating SATA SSDs to legacy status; SATA SSDs are becoming “the new HDDs.”

For some use cases even NVMe is still too slow, and users need to move their data to in-memory deployments instead, where speeds for Random Access Memory (RAM) are measured in nanoseconds. Maybe not in-memory for everything — first, because in-memory isn’t persistent, and also because it can be expensive! — but at least for their most speed-intensive data.

All this acceleration is certainly good news for I/O-intensive, latency-sensitive applications, which can now use those storage devices as a substrate for workloads that used to be kept in memory for performance reasons. However, does the access speed of those devices really match what they advertise? And which workloads are most likely to need the extra speed provided by hosting their data in-memory?

In this article we will examine the performance claims of latency-bound access on real NVMe devices and show that there is still a place for in-memory solutions for extremely latency-sensitive applications. To address those workloads, ScyllaDB added an in-memory option to Scylla Enterprise 2018.1.7. We will discuss how it all ties together in a real database like Scylla and how users can benefit from this new addition to the Scylla product.

Storage Speed Hierarchy

Various storage devices have different access speeds. Faster devices are usually more expensive and have less capacity. The table below shows a brief summary of devices in broad use in modern servers and their access latencies.

Device                   Latency
Register                 1 cycle
Cache                    2-10 ns
DRAM                     100-200 ns
NVMe                     10-100 μs
SATA SSD                 400 μs
Hard Disk Drive (HDD)    10 ms

It would of course be great to have all your data in the fastest storage available: registers or cache. But if your data fits there, yours is probably not a Big Data environment. On the other hand, if the workload is backed by a spinning disk, it is hard to expect good latencies for requests that need to reach the underlying storage. Considering the size-vs-speed tradeoff, NVMe does not look bad at all here. Moreover, in real life a workload needs to fetch data from various places in the storage array to compose a request. In a hypothetical scenario in which two files are accessed for every storage-bound request, with access times of around ~50μs, the cost of a storage-bound access is around 100μs, which is not bad at all. But how reliable are those access numbers in real life?
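As a sanity check on that arithmetic, here is a toy cost model in Python. The latency figures are rough midpoints taken from the table above, and the two-files-per-request pattern is the hypothetical from this paragraph; nothing here is measured data.

```python
# Toy per-request storage cost model; latency figures are rough midpoints
# from the table above (illustrative, not measured).
latency_us = {
    'dram': 0.15,       # 100-200 ns
    'nvme': 50,         # 10-100 us
    'sata_ssd': 400,
    'hdd': 10_000,      # ~10 ms seek
}

def request_cost_us(device, files_touched=2):
    """A storage-bound request typically has to read from several files."""
    return files_touched * latency_us[device]

for device in latency_us:
    print('%-9s %8.1f us per request' % (device, request_cost_us(device)))
```

With the hypothetical two file accesses, NVMe lands at ~100μs per request, while the same request against HDD would cost ~20ms.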

Real World Latencies

In practice, though, we see that NVMe latencies may be much higher than advertised, even higher than what spinning disks provide. There are a couple of reasons for this. The first is a technology limitation: an SSD becomes slower as it fills up and data is written and rewritten. The reason is that an SSD has an internal Garbage Collection (GC) process that looks for free blocks, and this process becomes more time consuming the less free space there is. We have seen disks exhibit latencies of hundreds of milliseconds in worst-case scenarios. To avoid this problem, freed blocks have to be explicitly discarded by the operating system so that GC becomes unnecessary. This is done by running the fstrim utility periodically (which we absolutely recommend), but ironically, fstrim running in the background may cause latencies by itself.

Another reason for larger-than-promised latencies is that a query does not run in isolation. In a real I/O-intensive system like a database, latency-sensitive accesses such as queries usually run in parallel and consume disk bandwidth concurrently with high-throughput patterns like bulk writes and data reorganization (such as compactions in ScyllaDB). As a result, latency-sensitive requests may end up waiting in a device queue, which shows up as increased tail latency.

It is possible to observe all those scenarios in practice with the ioping utility. ioping is very similar to the well-known ping networking utility, but instead of sending requests over the network it sends them to a disk. Here are the results of a test we did on AWS:

No other IO:
99 requests completed in 8.54 ms, 396 KiB read, 11.6 k iops, 45.3 MiB/s
generated 100 requests in 990.3 ms, 400 KiB, 100 iops, 403.9 KiB/s
min/avg/max/mdev = 59.6 us / 86.3 us / 157.8 us / 27.2 us

Read/Write fio benchmark:
99 requests completed in 34.2 ms, 396 KiB read, 2.90 k iops, 11.3 MiB/s
generated 100 requests in 990.3 ms, 400 KiB, 100 iops, 403.9 KiB/s
min/avg/max/mdev = 73.0 us / 345.2 us / 5.74 ms / 694.3 us

99 requests completed in 300.3 ms, 396 KiB read, 329 iops, 1.29 MiB/s
generated 100 requests in 1.24 s, 400 KiB, 80 iops, 323.5 KiB/s
min/avg/max/mdev = 62.2 us / 3.03 ms / 83.4 ms / 14.5 ms

As we can see, under normal conditions the disk provides latencies in the promised range, but when the disk is under load, the maximum latency can be very high.
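The essence of what ioping does can be sketched in a few lines of Python. This is an illustrative sketch, not a replacement for the real tool: it does not use O_DIRECT, so on a file that fits in the page cache the numbers will be optimistic, and the function name and layout here are our own.

```python
import os, statistics, tempfile, time

def io_ping(path, count=100, block=4096):
    """Time block-sized reads from a file, ioping-style (simplified sketch).

    Returns (min, mean, max) read latency in microseconds. Without O_DIRECT
    these reads may be served from the page cache, so treat the numbers as
    a lower bound on real device latency.
    """
    latencies = []
    fd = os.open(path, os.O_RDONLY)
    try:
        size = os.fstat(fd).st_size
        for i in range(count):
            offset = (i * block) % max(size - block, 1)
            start = time.perf_counter()
            os.pread(fd, block, offset)
            latencies.append((time.perf_counter() - start) * 1e6)  # us
    finally:
        os.close(fd)
    return min(latencies), statistics.mean(latencies), max(latencies)

# Try it against a 1 MiB scratch file:
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(os.urandom(1 << 20))
lo, avg, hi = io_ping(f.name)
print('min/avg/max = %.1f / %.1f / %.1f us' % (lo, avg, hi))
os.unlink(f.name)
```

Running the same loop while a fio write workload hammers the device is an easy way to reproduce the tail-latency effect shown above.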

Scylla Node Storage Model

To understand the benefit of keeping all the data in memory, we need to consider how the Scylla storage model works. Here is a schematic describing the storage model of a single node.

Scylla Storage Model

When the database is queried, a node tries to locate the requested data in the cache and memtables, both of which reside in RAM. If the data is in the cache, good: all that is needed is to combine the data from the cache with the data from the memtable (if any), and a reply can be sent right away. But what if the cache has no data (and no indication that the data is absent from permanent storage as well)? In that case, the bottom part of the diagram comes into play and storage has to be contacted.

The format the data is stored in is called an sstable. Depending on the configured compaction strategy, on how recently the queried data was written, and on other factors, multiple sstables may have to be consulted to satisfy a request. Let’s take a closer look at the sstable format.

Very Brief Description Of the SSTable Format

Each sstable consists of multiple files. Here is a list of the files for a hypothetical non-compressed sstable.


Most of those files (shown in green) are very small, and their content is kept in memory while the sstable is open. But there are two exceptions: Data and Index (shown in red). Let’s take a closer look at what those two contain.

SSTable Data Format

The Data file stores the actual data. It is sorted according to partition keys, which makes binary search possible. But searching for a specific key in a large file may require a lot of disk access, so to make the task more efficient there is another file, Index, that holds a sorted list of keys and offsets into the Data file where data for those keys can be found.

As one can see, each access to an sstable requires at least two reads from disk (possibly more, depending on the size of the data to be read and the position of the key in the Index file).
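A minimal sketch of that two-read lookup, with made-up keys and offsets (this is an illustration of the idea, not Scylla's actual on-disk format):

```python
import bisect

# The Index file is conceptually a sorted list of (partition_key, offset)
# pairs; the Data file holds the rows themselves at those offsets.
# Keys and offsets below are invented for illustration.
index = [("apple", 0), ("kiwi", 120), ("mango", 260), ("pear", 400)]
keys = [k for k, _ in index]

def lookup(key):
    """Read #1: binary-search the Index to find the key's offset in Data."""
    i = bisect.bisect_left(keys, key)
    if i == len(keys) or keys[i] != key:
        return None  # key not in this sstable
    offset = index[i][1]
    # Read #2 would seek to `offset` in the Data file and read the row.
    return offset

print(lookup("mango"))
```

Each of those two steps is a separate disk read when the relevant pages are not cached, which is why a single storage-bound query costs at least two device accesses per sstable touched.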

Benchmarking Scylla

Let’s look at how those maximum latencies affect the behaviour of Scylla. The benchmark was run on a cluster in Google Compute Engine (GCE) with one NVMe disk. In our experience, NVMe on GCE is somewhat slow, which in a way helps emphasize the in-memory benefits. Below is a graph of the 99th percentile latency for access to two different tables. One is a regular table on an NVMe disk (red) and the other is in memory (green).

p99 Latencies In-Memory vs. SSD

The 99th percentile latency for the on-disk table is much higher and shows much more variation. There is another line in the graph (blue) that plots the number of compactions running in the system. The blue graph closely tracks the red one, which means the 99th percentile latency of an on-disk table is greatly affected by the compaction process. The high on-disk latencies here are a direct result of tail latencies that occur when a user read is queued behind a compaction read.

A P99 of 20ms is not much for Scylla, but in this case a single, not-so-fast NVMe disk was used. Adding more NVMe drives in a RAID0 setup would allow more parallelism and mitigate the negative effects of queuing, but it also increases the price of the setup, and at some point it erases all the price benefits of using NVMe while not necessarily achieving the same performance as an in-memory setup. The in-memory setup gives you low and consistent latency at a reasonable price point.


Two new configuration steps are needed to make use of the feature. First, one needs to specify how much memory should be reserved for in-memory sstable storage. This can be done by adding in_memory_storage_size_mb to the scylla.yaml file or by specifying --in-memory-storage-size-mb on the command line. After memory is reserved, an in-memory table can be created by executing:

     key blob PRIMARY KEY,
     "C0" blob,

) WITH compression = {}
  AND read_repair_chance = '0'
  AND speculative_retry = 'ALWAYS'
  AND in_memory = 'true'
  AND compaction = {'class':'InMemoryCompactionStrategy'};

Note the new in_memory property set to true, and the new compaction strategy. Strictly speaking, InMemoryCompactionStrategy is not required for in-memory tables, but this strategy compacts much more aggressively to get rid of data duplication as fast as possible and save memory.

Note that a mix of in-memory and regular tables is supported.


Despite what the vendors may say, real world storage devices can present high tail latencies in the face of competing requests, even for newer technology like NVMe. Workloads that cannot tolerate a jump in latencies under any circumstances can benefit greatly from the new Scylla enterprise in-memory feature. If, on the other hand, a workload can cope with occasionally higher latency for a low number of requests it is beneficial to let Scylla manage what data is held in memory with its usual caching mechanism and to use regular on-disk tables with fast NVMe storage.

Find out more about Scylla Enterprise

The post In-Memory Scylla, or Racing the Red Queen appeared first on ScyllaDB.

Scylla Open Source Release 2.3.3

Scylla Software Release

The Scylla team announces the release of Scylla Open Source 2.3.3, a bugfix release of the Scylla Open Source 2.3 stable branch. Release 2.3.3, like all past and future 2.3.y releases, is backward compatible and supports rolling upgrades.

Note that the latest stable release of Scylla Open Source is release 3.0 and you are encouraged to upgrade to it.

Issues solved in this release:

  • A race condition between the “nodetool snapshot” command and Scylla running compactions may result in a nodetool error: Scylla API server HTTP POST to URL ‘/storage_service/snapshots’ failed: filesystem error: link failed: No such file or directory #4051
  • A bootstrapping node doesn’t wait for the schema before joining the ring, which may result in the node failing to bootstrap with the error “storage_proxy – Failed to apply mutation”. In particular, this error manifests when a user-defined type is used. #4196
  • Counters: Scylla rejects SSTables that contain counters that were created by Cassandra 2.0 and earlier. Due to #4206, Scylla mistakenly rejected some SSTables that were created by Cassandra 2.1 as well.
  • Core: In very rare cases, the commit log replay fails. Commit log replay is used after a node was unexpectedly restarted #4187
  • In some rare cases, during service stop, Scylla exited #4107

The post Scylla Open Source Release 2.3.3 appeared first on ScyllaDB.

How To Set Up A Cluster With Even Token Distribution

Apache Cassandra is fantastic for storing large amounts of data and being flexible enough to scale out as the data grows. This is all fun and games until the data that is distributed in the cluster becomes unbalanced. In this post we will go through how to set up a cluster with predictive token allocation using the allocate_tokens_for_keyspace setting, which will help to evenly distribute the data as it grows.

Unbalanced clusters are bad mkay

An unbalanced load on a cluster means that some nodes will contain more data than others. An unbalanced cluster can be caused by the following:

  • Hot spots - by random chance one node ends up responsible for a higher percentage of the token space than the other nodes in the cluster.
  • Wide rows - due to data modelling issues, for example a partition row which grows significantly larger than the other rows in the data.

The above issues can have a number of impacts on individual nodes in the cluster; however, that is a completely different topic which deserves a more detailed post of its own. In summary, a node that contains disproportionately more tokens and/or data than the other nodes in the cluster may experience one or more of the following issues:

  • Run out of storage more quickly than the other nodes.
  • Serve more requests than the other nodes.
  • Suffer from higher read and write latencies than the other nodes.
  • Time to run repairs is longer than other nodes.
  • Time to run compactions is longer than other nodes.
  • Time to replace the node if it fails is longer than other nodes.

What about vnodes, don’t they help?

Both issues that cause data imbalance in a cluster (hot spots and wide rows) can be prevented by manual control. That is, specify the tokens using the initial_token setting in the cassandra.yaml file for each node, and ensure your data model distributes data evenly across the cluster. The second control measure (data modelling) is something we always need to do when adding data to Cassandra. The first one, defining the tokens manually, is cumbersome when maintaining a cluster, especially when growing or shrinking it. As a result, token management was automated early on in Cassandra (version 1.2 - CASSANDRA-4119) through the introduction of Virtual Nodes (vnodes).

Vnodes break up the available range of tokens into smaller ranges, the number of which is defined by the num_tokens setting in the cassandra.yaml file. The vnode ranges are randomly distributed across the cluster and are generally non-contiguous. If we use a large value for num_tokens, the random distribution makes hot spots less likely. Statistical computation showed that clusters of any size consistently achieved a good token range balance when 256 vnodes were used. Hence, a num_tokens default of 256 was recommended by the community to prevent hot spots in a cluster. The problem is that performance for operations requiring token-range scans (e.g. repairs, Spark operations) tanks big time. It can also cause problems with bootstrapping due to the large number of SSTables generated. Furthermore, as Joseph Lynch and Josh Snyder pointed out in a paper they wrote, the higher the value of num_tokens in large clusters, the higher the risk of data unavailability.

Token allocation gets smart

This paints a pretty grim picture of vnodes, and as far as operators are concerned, they are caught between a rock and hard place when selecting a value for num_tokens. That was until Cassandra version 3.0 was released, which brought with it a more intelligent token allocation algorithm thanks to CASSANDRA-7032. Using a ranking system, the algorithm feeds in the replication factor of a keyspace, the number of tokens, and the partitioner, to derive token ranges that are evenly distributed across the cluster of nodes.

The algorithm is configured via settings in the cassandra.yaml configuration file. Most of the settings it needs already existed in the configuration file, with one exception: a way to specify the keyspace name. So when the algorithm was added, the allocate_tokens_for_keyspace setting was introduced into the configuration file. The setting takes a keyspace name, so that during the bootstrap of a node Cassandra can query that keyspace for its replication factor and pass it to the token allocation algorithm.

However, therein lies the problem: for existing clusters updating this setting is easy, as a keyspace already exists, but for a cluster starting from scratch we have a chicken-and-egg situation. How do we specify a keyspace that doesn’t exist!? And there are other caveats, too…

  • It works for only a single replication factor. As long as all the other keyspaces use the same replication factor as the one specified for allocate_tokens_for_keyspace, all is fine. However, keyspaces with a different replication factor can potentially cause hot spots.
  • It works only when nodes are added to the cluster. The process for token distribution when a node is removed from the cluster remains unchanged, and hence can still cause hot spots.
  • It works with only the default partitioner, Murmur3Partitioner.

Additionally, this is no silver bullet for all unbalanced clusters; we still need to make sure we have a data model that evenly distributes data across partitions. Wide partitions can still be an issue and no amount of token shuffling will fix this.

Despite these drawbacks, this feature gives us the ability to allocate tokens in a more predictable way whilst leveraging the advantage of vnodes. This means we can specify a small value for vnodes (e.g. 4) and still be able to avoid hot spots. The question then becomes, in the case of starting a brand new cluster from scratch, which comes first the chicken or the egg?

One does not simply start a cluster… with evenly distributed tokens

While it might be possible to rectify an unbalanced cluster caused by unfortunate token allocations, it is better for the token allocation to be set up correctly when the cluster is created. To set up a brand new cluster that takes advantage of the allocate_tokens_for_keyspace setting, we need to use the following steps. The method below takes into account a cluster with nodes spread across multiple racks. The examples used in each step assume that our cluster will be configured as follows:

  • 4 vnodes (num_tokens = 4).
  • 3 racks with a single seed node in each rack.
  • A replication factor of 3, i.e. one replica per rack.

1. Calculate and set tokens for the seed node in each rack

We will need to set the tokens for the seed nodes in each rack manually. This is to prevent each node from randomly calculating its own token ranges. We can calculate the token ranges that we will use for the initial_token setting using the following python code:

$ python

Python 2.7.13 (default, Dec 18 2016, 07:03:39)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> num_tokens = 4
>>> num_racks = 3
>>> print "\n".join(['[Node {}] initial_token: {}'.format(r + 1, ','.join([str(((2**64 / (num_tokens * num_racks)) * (t * num_racks + r)) - 2**63) for t in range(num_tokens)])) for r in range(num_racks)])
[Node 1] initial_token: -9223372036854775808,-4611686018427387905,-2,4611686018427387901
[Node 2] initial_token: -7686143364045646507,-3074457345618258604,1537228672809129299,6148914691236517202
[Node 3] initial_token: -6148914691236517206,-1537228672809129303,3074457345618258600,7686143364045646503

We can then uncomment the initial_token setting in the cassandra.yaml file on each of the seed nodes, set it to the value generated by our Python command, and set the num_tokens setting to the number of vnodes. When the node first starts, the value of the initial_token setting will be used; subsequent restarts will use the num_tokens setting.

Note that we need to manually calculate and specify the initial tokens for only the seed node in each rack. All other nodes will be configured differently.
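The interactive session above uses Python 2. For reference, the same calculation written for Python 3 (note the // integer division, since / would produce floats) could look like this:

```python
# Evenly space num_tokens tokens per rack across the Murmur3 token range
# [-2**63, 2**63), interleaving the racks so that consecutive token ranges
# belong to different racks.
num_tokens = 4
num_racks = 3

step = 2**64 // (num_tokens * num_racks)
for r in range(num_racks):
    tokens = [step * (t * num_racks + r) - 2**63 for t in range(num_tokens)]
    print('[Node {}] initial_token: {}'.format(
        r + 1, ','.join(str(tok) for tok in tokens)))
```

This produces the same three initial_token lines shown in the transcript above.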

2. Start the seed node in each rack

We can start the seed nodes one at a time using the following command:

$ sudo service cassandra start

When we watch the logs, we should see messages similar to the following appear:

INFO  [main] ... - This node will not auto bootstrap because it is configured to be a seed node.
INFO  [main] ... - tokens manually specified as [-9223372036854775808,-4611686018427387905,-2,4611686018427387901]

After starting the first of the seed nodes, we can use nodetool status to verify that 4 tokens are being used:

$ nodetool status
Datacenter: dc1
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens       Owns (effective)  Host ID                               Rack
UN  99 KiB     4            100.0%            5d7e200d-ba1a-4297-a423-33737302e4d5  rack1

We will wait for this message to appear in the logs, then start the next seed node in the cluster.

INFO  [main] ... - Starting listening for CQL clients on ...

Once all seed nodes in the cluster are up, we can use nodetool ring to verify the token assignments in the cluster. It should look something like this:

$ nodetool ring

Datacenter: dc1
Address        Rack        Status State   Load            Owns                Token
                                                                              7686143364045646503
   rack1       Up     Normal  65.26 KiB       66.67%              -9223372036854775808
   rack2       Up     Normal  65.28 KiB       66.67%              -7686143364045646507
   rack3       Up     Normal  99.03 KiB       66.67%              -6148914691236517206
   rack1       Up     Normal  65.26 KiB       66.67%              -4611686018427387905
   rack2       Up     Normal  65.28 KiB       66.67%              -3074457345618258604
   rack3       Up     Normal  99.03 KiB       66.67%              -1537228672809129303
   rack1       Up     Normal  65.26 KiB       66.67%              -2
   rack2       Up     Normal  65.28 KiB       66.67%              1537228672809129299
   rack3       Up     Normal  99.03 KiB       66.67%              3074457345618258600
   rack1       Up     Normal  65.26 KiB       66.67%              4611686018427387901
   rack2       Up     Normal  65.28 KiB       66.67%              6148914691236517202
   rack3       Up     Normal  99.03 KiB       66.67%              7686143364045646503

We can then move to the next step.

3. Create only the keyspace for the cluster

On any one of the seed nodes we will use cqlsh to create the cluster keyspace using the following commands:

$ cqlsh NODE_IP_ADDRESS -u ***** -p *****

Connected to ...
[cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cassandra@cqlsh> CREATE KEYSPACE keyspace_with_replication_factor_3
    WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3}
    AND durable_writes = true;

Note that this keyspace can have any name; it can even be the keyspace that contains the tables we will use for our data.

4. Set the number of tokens and the keyspace for all remaining nodes

We will set the num_tokens and allocate_tokens_for_keyspace settings in the cassandra.yaml file on all of the remaining nodes as follows:

num_tokens: 4
allocate_tokens_for_keyspace: keyspace_with_replication_factor_3

We have assigned the allocate_tokens_for_keyspace value to be the name of keyspace created in the previous step. Note that at this point the Cassandra service on all other nodes is still down.

5. Start the remaining nodes in the cluster, one at a time

We can start the remaining nodes in the cluster using the following command:

$ sudo service cassandra start

When we watch the logs we should see messages similar to the following appear to say that we are using the new token allocation algorithm:

INFO  [main] ... - JOINING: waiting for ring information
INFO  [main] ... - Using ReplicationAwareTokenAllocator.
WARN  [main] ... - Selected tokens [...]
INFO  ... - JOINING: Finish joining ring

As per step 2 when we started the seed nodes, we will wait for this message to appear in the logs before starting the next node in the cluster.

INFO  [main] ... - Starting listening for CQL clients on ...

Once all the nodes are up, our shiny, new, evenly-distributed-tokens cluster is ready to go!

Proof is in the token allocation

While we can learn a fair bit from talking about the theory behind the allocate_tokens_for_keyspace setting, it is still good to put it to the test and see what difference it makes when used in a cluster. I decided to create two clusters running Apache Cassandra 3.11.3 and compare the load distribution after inserting some data. For this test, I provisioned both clusters with 9 nodes using tlp-cluster and generated load using tlp-stress. Both clusters used 4 vnodes, but one of the clusters was set up using the even token distribution method described above.

Cluster using random token allocation

I started with a cluster that uses the traditional random token allocation system. For this cluster I set num_tokens: 4 and endpoint_snitch: GossipingPropertyFileSnitch in the cassandra.yaml on all the nodes. Nodes were split across three racks by specifying the rack in the cassandra-rackdc.properties file.

Once the cluster instances were up and Cassandra was installed, I started each node one at a time. After all nodes were started, the cluster looked like this:

ubuntu@ip-172-31-39-54:~$ nodetool status
Datacenter: dc1
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN   65.29 KiB  4            16.1%             4ada2c52-0d1b-45cd-93ed-185c92038b39  rack1
UN   65.29 KiB  4            20.4%             c282ef62-430e-4c40-a1d2-47e54c5c8685  rack2
UN  65.29 KiB  4            21.2%             48d865d7-0ad0-4272-b3c1-297dce306a34  rack1
UN  87.7 KiB   4            24.5%             27aa2c78-955c-4ea6-9ea0-3f70062655d9  rack1
UN   65.29 KiB  4            30.8%             bd2d745f-d170-4fbf-bf9c-be95259597e3  rack3
UN  70.36 KiB  4            25.5%             056e2472-c93d-4275-a334-e82f87c4b53a  rack3
UN  70.37 KiB  4            24.8%             06b0e1e4-5e73-46cb-bf13-626eb6ce73b3  rack2
UN   65.29 KiB  4            23.8%             137602f0-3248-459f-b07c-c0b3e647fa48  rack2
UN  99.03 KiB  4            12.9%             cd92c974-b32e-4181-9e14-fb52dd27b09e  rack3

I ran tlp-stress against the cluster using the command below. This generated a write-only load that randomly inserted 10 million unique key value pairs into the cluster. tlp-stress inserted data into a newly created keyspace and table called tlp_stress.keyvalue.

tlp-stress run KeyValue --replication "{'class':'NetworkTopologyStrategy','dc1':3}" --cl LOCAL_QUORUM --partitions 10M --iterations 100M --reads 0 --host

After running tlp-stress the cluster load distribution for the tlp_stress keyspace looked like this:

ubuntu@ip-172-31-39-54:~$ nodetool status tlp_stress
Datacenter: dc1
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN   1.29 GiB   4            20.8%             4ada2c52-0d1b-45cd-93ed-185c92038b39  rack1
UN   2.48 GiB   4            39.1%             c282ef62-430e-4c40-a1d2-47e54c5c8685  rack2
UN  1.82 GiB   4            35.1%             48d865d7-0ad0-4272-b3c1-297dce306a34  rack1
UN  3.45 GiB   4            44.1%             27aa2c78-955c-4ea6-9ea0-3f70062655d9  rack1
UN   2.16 GiB   4            54.3%             bd2d745f-d170-4fbf-bf9c-be95259597e3  rack3
UN  1.71 GiB   4            29.1%             056e2472-c93d-4275-a334-e82f87c4b53a  rack3
UN  1.14 GiB   4            26.2%             06b0e1e4-5e73-46cb-bf13-626eb6ce73b3  rack2
UN   2.61 GiB   4            34.7%             137602f0-3248-459f-b07c-c0b3e647fa48  rack2
UN  562.15 MiB  4            16.6%             cd92c974-b32e-4181-9e14-fb52dd27b09e  rack3

I verified the data load distribution by checking the disk usage on all nodes using pssh (parallel ssh).

ubuntu@ip-172-31-39-54:~$ pssh -ivl ... -h hosts.txt "du -sh /var/lib/cassandra/data"
[1] ... [SUCCESS]
1.2G    /var/lib/cassandra/data
[2] ... [SUCCESS]
3.5G    /var/lib/cassandra/data
[3] ... [SUCCESS]
1.3G    /var/lib/cassandra/data
[4] ... [SUCCESS]
2.5G    /var/lib/cassandra/data
[5] ... [SUCCESS]
2.7G    /var/lib/cassandra/data
[6] ... [SUCCESS]
1.8G    /var/lib/cassandra/data
[7] ... [SUCCESS]
564M    /var/lib/cassandra/data
[8] ... [SUCCESS]
2.2G    /var/lib/cassandra/data
[9] ... [SUCCESS]
1.9G    /var/lib/cassandra/data

As we can see from the above results, there was a large variation in load distribution across the nodes. The smallest node held roughly 560 MB of data, whilst the largest held about six times that amount (roughly 3.5 GB). The difference between the smallest and largest data load is close to 3.0 GB!!

Cluster using predictive token allocation

I then moved on to setting up the cluster with predictive token allocation. Similar to the previous cluster, I set num_tokens: 4 and endpoint_snitch: GossipingPropertyFileSnitch in the cassandra.yaml on all the nodes. These settings were common to all nodes in this cluster. Nodes were again split across three racks by specifying the rack in the cassandra-rackdc.properties file.

I set the initial_token setting for each of the seed nodes and started the Cassandra process on them one at a time, with one seed node allocated to each rack in the cluster.

The initial keyspace that would be specified in the allocate_tokens_for_keyspace setting was created via cqlsh using the following command:

CREATE KEYSPACE keyspace_with_replication_factor_3 WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'} AND durable_writes = true;

I then set allocate_tokens_for_keyspace: keyspace_with_replication_factor_3 in the cassandra.yaml file for the remaining non-seed nodes and started the Cassandra process on them one at a time. After all nodes were started, the cluster looked like this:

ubuntu@ip-172-31-36-11:~$ nodetool status keyspace_with_replication_factor_3
Datacenter: dc1
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN   65.4 KiB   4            32.3%             5ece457c-a3af-4173-b9c8-e937f8b63d3b  rack2
UN  117.45 KiB  4            33.3%             55a94591-ad6a-48a5-b2d7-eee7ea06912b  rack3
UN   70.49 KiB  4            33.3%             93054390-bc83-487c-8940-b99e7b85e5c2  rack3
UN   104.3 KiB  4            35.4%             5d7e200d-ba1a-4297-a423-33737302e4d5  rack1
UN  65.41 KiB  4            31.2%             ecd00ff5-a90a-4d33-b7ab-bdd22e3e50b8  rack1
UN  65.39 KiB  4            33.3%             64802174-885a-4c04-b530-a9b4685b1b96  rack1
UN   65.39 KiB  4            33.3%             0846effa-e4ac-4a19-845e-2162cd2b7680  rack3
UN  104.32 KiB  4            35.4%             5ad47bc0-9bcc-4fc5-b5b0-0a15ad63345f  rack2
UN  65.4 KiB   4            32.3%             4128ca20-b4fa-4173-88b2-aac62539a6d8  rack2

I ran tlp-stress against the cluster using the same command that was used to test the cluster with random token allocation. After running tlp-stress the cluster load distribution for the tlp_stress keyspace looked like this:

ubuntu@ip-172-31-36-11:~$ nodetool status tlp_stress
Datacenter: dc1
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN   2.16 GiB   4            32.3%             5ece457c-a3af-4173-b9c8-e937f8b63d3b  rack2
UN  2.32 GiB   4            33.3%             55a94591-ad6a-48a5-b2d7-eee7ea06912b  rack3
UN   2.32 GiB   4            33.3%             93054390-bc83-487c-8940-b99e7b85e5c2  rack3
UN   1.84 GiB   4            35.4%             5d7e200d-ba1a-4297-a423-33737302e4d5  rack1
UN  2.01 GiB   4            31.2%             ecd00ff5-a90a-4d33-b7ab-bdd22e3e50b8  rack1
UN  2.32 GiB   4            33.3%             64802174-885a-4c04-b530-a9b4685b1b96  rack1
UN   2.32 GiB   4            33.3%             0846effa-e4ac-4a19-845e-2162cd2b7680  rack3
UN  1.83 GiB   4            35.4%             5ad47bc0-9bcc-4fc5-b5b0-0a15ad63345f  rack2
UN  2.16 GiB   4            32.3%             4128ca20-b4fa-4173-88b2-aac62539a6d8  rack2

I again verified the data load distribution by checking the disk usage on all nodes using pssh.

ubuntu@ip-172-31-36-11:~$ pssh -ivl ... -h hosts.txt "du -sh /var/lib/cassandra/data"
[1] ... [SUCCESS]
1.9G    /var/lib/cassandra/data
[2] ... [SUCCESS]
2.4G    /var/lib/cassandra/data
[3] ... [SUCCESS]
1.9G    /var/lib/cassandra/data
[4] ... [SUCCESS]
2.4G    /var/lib/cassandra/data
[5] ... [SUCCESS]
2.4G    /var/lib/cassandra/data
[6] ... [SUCCESS]
2.2G    /var/lib/cassandra/data
[7] ... [SUCCESS]
2.1G    /var/lib/cassandra/data
[8] ... [SUCCESS]
2.4G    /var/lib/cassandra/data
[9] ... [SUCCESS]
2.2G    /var/lib/cassandra/data

As we can see from the above results, there was little variation in the load distribution across nodes compared to the cluster that used random token allocation. The smallest data load on a single node was roughly 1.83 GB, and the largest was roughly 2.32 GB. The difference between the smallest and largest loads is roughly 500 MB, which is significantly less than the data size difference seen in the cluster that used random token allocation.


Having a perfectly balanced cluster takes a bit of work and planning. While there are some setup steps and caveats to using the allocate_tokens_for_keyspace setting, predictive token allocation is a must when setting up a new cluster. As we have seen from testing, it allows us to take advantage of a low num_tokens value without having to worry about hot spots developing in the cluster.

Distributed Database Things to Know: Consistent Hashing

As with Riak, which I wrote about in 2013, Cassandra remains one of the core active distributed database projects alive today that provides an effective and reliable consistent hash ring for a clustered distributed database system. A hash function is an algorithm that maps data of variable length to data of a fixed length. Consistent hashing builds on such a function to provide the pattern for mapping keys to the particular nodes responsible for them around the ring in Cassandra.
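The ring idea described above can be sketched in a few lines. This is a toy illustration, assuming MD5-derived tokens and made-up node names (node1 and so on); Cassandra actually uses the Murmur3 partitioner and vnode token ranges, but the key-to-node mapping concept is the same:

```python
import bisect
import hashlib

class HashRing:
    """Toy consistent-hash ring: a key maps to the first node clockwise."""

    def __init__(self, nodes, vnodes=8):
        # Each node gets several virtual nodes (tokens) on the ring,
        # which smooths out the key distribution.
        self.ring = sorted(
            (self._hash(f"{node}-{v}"), node)
            for node in nodes
            for v in range(vnodes)
        )
        self.tokens = [token for token, _ in self.ring]

    @staticmethod
    def _hash(key):
        # Map arbitrary-length input to a fixed-length integer token.
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def node_for(self, key):
        # Walk clockwise to the first token >= hash(key), wrapping around.
        i = bisect.bisect(self.tokens, self._hash(key)) % len(self.ring)
        return self.ring[i][1]

ring = HashRing(["node1", "node2", "node3"])
owner = ring.node_for("user:42")          # stable for a given key
assert owner == ring.node_for("user:42")  # same key always lands on the same node
```

Because only the tokens adjacent to a joining or leaving node change owners, most keys stay put when the cluster changes size, which is the property that makes this scheme "consistent."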

ValuStor — a memcached alternative built on Scylla

Derek Ramsey, Software Engineering Manager at Sensaphone, gave an overview of ValuStor at Scylla Summit 2018. Sensaphone is a maker of remote monitoring solutions for the Industrial Internet of Things (IIoT). Their products are designed to watch over your physical plant and equipment — such as HVAC systems, oil and gas infrastructure, livestock facilities, greenhouses, food, beverage and medical cold storage. Yet there is a lot of software behind the hardware of IIoT. ValuStor is an example of ostensible “hardware guys” teaching the software guys a thing or two.

Overview and Origins of ValuStor

Derek began his Scylla Summit talk with an overview: what is ValuStor? It is a NoSQL memory cache and a persistent database utilizing a Scylla back-end, designed for key-value and document store data models. It is implemented as a single header-only database abstraction layer and comprises three components: the Scylla database, the ValuStor client, and the Cassandra driver (which Scylla can use since it is CQL-compliant).

ValuStor is released as free open source under the MIT license. The MIT license is extremely permissive, allowing anyone to “use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the software.” Which means that you can bake it into your own software, services and infrastructure freely and without reservation.

Derek then went back a bit into history to describe how Sensaphone began down this path. What circumstances gave rise to the origins of ValuStor? It all started when Sensaphone launched a single-node MySQL database. As their system scaled to hundreds of simultaneous connections, they added memcached to help offload their disk write load on the backend database. Even so, they kept scaling, and their system was insufficient to handle the growing load. It wasn’t the MySQL database that was the issue. It was memcached that was unable to handle all the requests.

For the short term, they began by batching requests. Yet Sensaphone needed to address the fundamental architectural issues, including the need for redundancy and scalability. There was also a cold cache performance risk (also known as the “thundering herd” problem) if they ever needed to restart.

No one in big data takes the decision to replace infrastructure lightly. Yet when they did the cutover it was surprisingly quick. It took only three days from inception to get Scylla running in production. And, as of last November, Sensaphone had already been in production for a year.

Since its initial implementation, Sensaphone added two additional use cases: managing web sessions, and for a distributed message queuing fan out for a producer/consumer application (a publish-subscribe design pattern akin to an in-house RabbitMQ or ActiveMQ). Derek recommended that anyone interested check out the published Usage Guide on GitHub.

Comparisons to memcached

Derek made a fair but firm assessment of the limitations of memcached for Sensaphone’s environment. First he cited the memcached FAQ itself, where it says it is not recommended for sessions, since if the cache is ever lost you may lock users out of your site. While the PHP manual has a section on sessions in memcached, there is simply no guarantee of survivability of user data.

Second, Derek cited memcached’s minimal security implementation (just optional SASL authentication). Memcached has been widely abused for DDoS amplification attacks in recent years, and while there are ways to minimize the risks, there is no substitute for built-in end-to-end encryption.

Derek listed the basic, fundamental architectural limitations: “it has no encryption, no failover, no replication, and no persistence. Of course it has no persistence — it’s a RAM cache.” That latter point, while usually counted as a performance feature in memcached’s favor, is exactly what leads to the cold cache problem when your server inevitably crashes or has to be restarted.

Sensaphone was resorting to batching to maintain a semblance of performance, whereas batching is an antipattern for Scylla.

ValuStor: How does it work?

ValuStor Client

Derek described the client design, which puts ease-of-use first and foremost. There are only two API functions: get and store. (Deletes are not done directly; instead, setting a time-to-live — TTL — of 1 second on the data is effectively a delete.)
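To make the two-call shape concrete, here is a toy in-memory sketch of a get/store API in which a delete is expressed as a short TTL. This is illustrative Python only; ValuStor's real client is a C++ template library and its actual signatures differ:

```python
import time

class ToyStore:
    """Toy two-call key-value API in the spirit of ValuStor's get/store.
    Expired entries simply behave as if they were deleted."""

    def __init__(self):
        self._data = {}  # key -> (value, expiry timestamp or None)

    def store(self, key, value, ttl=None):
        # ttl is in seconds; None means the entry never expires.
        expiry = time.time() + ttl if ttl is not None else None
        self._data[key] = (value, expiry)

    def get(self, key, default=None):
        value, expiry = self._data.get(key, (default, None))
        if expiry is not None and time.time() >= expiry:
            return default  # expired entry: effectively deleted
        return value

db = ToyStore()
db.store("session:1", {"user": "alice"})       # plain write
db.store("session:1", {"user": "alice"}, ttl=1)  # "delete" via a 1-second TTL
```

Keeping the API to just two verbs means the abstraction layer, not the caller, decides how deletion maps onto the database underneath.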

Because it is implemented as an abstraction layer, you can use your data in a native programming language with native data types: integers, floating point, strings, JSON, blobs, bytes, and UUIDs.

For fault tolerance, ValuStor also added a client-side write queue for a backlog function, and automatic adaptive consistency (more on this later).

Cassandra Driver

The Cassandra driver supports thread safety and is multi-threaded. “Practically speaking, that means you can throw requests at it and it will automatically scale to use more CPU resources as required and you don’t need to do any special locking,” Derek explained. “Unlike memcached… [where] the C driver is not thread-safe.”

ValuStor also offers connection control, so if a Scylla node goes down it will automatically re-establish the connection with a different node. It is also datacenter-aware and will choose your datacenters intelligently.

Scylla Database Server

The Scylla server at the heart of ValuStor offers various architectural advantages. “First and foremost is performance. With the obvious question, ‘How in the world can a persistent database compete with RAM-only caching?’”

Derek then described how Scylla offers its own async and userspace I/O schedulers. Such architectural features can, at times, result in Scylla responsiveness with sub-millisecond latencies.

Scylla also has its own cache and separate memtable, which acts as a sort of cache. “In our use case at Sensaphone we have 100% cache hits all the time. We never have to hit the disk, even though it has one, and since our database has never actually gone down we’ve never actually even had to load it from disk except for maintenance periods.”

In terms of cache warming, Derek provided some advice, “The cold cache penalty is actually less severe for Scylla if you use heat-weighted load balancing because Scylla will automatically warm up your cache for you for the nodes you restart.”

Derek then turned to the issues of security. His criticisms were sobering: “Memcached is what I call ‘vulnerable by design.’” In the latest major issue, “their solution was simply to disable UDP by default rather than fix the problem.”

“By contrast, ValuStor comes with complete TLS support right out of the box.” That includes client authentication and server certificate verification by domain or IP, over-the-wire encryption within and across datacenters, and of course basic password authentication and access control. You can read more about TLS setup in the ValuStor documentation.

“Inevitably, though, the database is going to go offline from the client perspective. Either you have network outage or you’ll have hardware issues on your database server.” Derek then dove down into a key additional feature for fault tolerance: a client-side write queue on the producer side. It buffers up and performs automatic retries. When the database is back up, it clears its backlog. The client keeps the requests serialized, so that data is not written in the wrong order. “Producers keep on producing and your writes are simply delayed. They aren’t lost.”
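The backlog behavior Derek describes can be sketched as a queue that preserves write order across outages. A minimal illustration, with a simulated send function standing in for the real driver call:

```python
from collections import deque

class WriteQueue:
    """Sketch of a client-side write backlog: failed writes are queued
    in order and replayed once the database is reachable again."""

    def __init__(self, send):
        self.send = send        # callable that raises ConnectionError on failure
        self.backlog = deque()

    def write(self, request):
        self.backlog.append(request)  # serialize: never reorder writes
        self.flush()

    def flush(self):
        while self.backlog:
            try:
                self.send(self.backlog[0])
            except ConnectionError:
                return          # database still down; keep the backlog
            self.backlog.popleft()

# Simulate an outage: sends fail while the link is down, then it recovers.
sent, down = [], [True]
def send(request):
    if down[0]:
        raise ConnectionError
    sent.append(request)

q = WriteQueue(send)
q.write("w1"); q.write("w2")   # buffered while the database is down
down[0] = False
q.write("w3")                  # backlog drains in the original order
assert sent == ["w1", "w2", "w3"]
```

The important property is the one Derek calls out: producers keep producing, writes are delayed rather than lost, and replay preserves the original order.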

Derek then noted “Scylla has great redundancy. You can set your custom data replication factor per keyspace. It can be changed on the fly. And the client driver is aware of this and will route your traffic to the nodes that actually have your data.” You can also set different replication factors per datacenter, and the client is also aware of your multi-datacenter topology.

In terms of availability, Derek reminded the audience of the CAP theorem: “it states you can have Consistency, Availability or Partition tolerance. Pick any two.” This leads to the quorum problem (where you require n/2 + 1 nodes to be available), which can cause fragility in multi-datacenter deployments.

To illustrate, Derek showed the following series of graphics:

Quorum Problem: Example 1

Let’s say you have a primary datacenter with three nodes, and a secondary datacenter with two nodes. The outage of any two nodes will not cause a problem in quorum.

Quorum Problem: Example 1 (Primary Datacenter Failure)

However, if your primary datacenter goes offline, your secondary datacenter would not work if it required a strict adherence to quorum being set at n/2 +1.

Quorum Problem: Example 2

In a second example Derek put forth, if you had a primary with three nodes and two secondary sites, you could still keep operating if the primary site went offline, since there would still be four nodes, which meets the n/2 + 1 requirement.

Quorum Problem: Example 2 (Secondary site failures)

However, if both of your secondary datacenters went offline, Derek observed this failure would have the unfortunate effect of bringing your primary datacenter down with it, even if there was nothing wrong with your main cluster.
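The quorum arithmetic behind both examples is easy to check. A small sketch, using the node counts from the two scenarios above (three primary nodes plus one two-node secondary in the first, plus two two-node secondaries in the second):

```python
def quorum(total_replicas):
    # A strict quorum requires n/2 + 1 replicas (integer division).
    return total_replicas // 2 + 1

# Example 1: 3-node primary DC + 2-node secondary DC = 5 nodes, quorum 3.
assert quorum(5) == 3
assert 2 < quorum(5)   # primary DC down: 2 nodes remain, quorum is lost

# Example 2: 3-node primary + two 2-node secondaries = 7 nodes, quorum 4.
assert quorum(7) == 4
assert 4 >= quorum(7)  # primary DC down: 4 nodes remain, quorum holds
assert 3 < quorum(7)   # both secondaries down: 3 nodes left, primary blocked
```

The last line is the counterintuitive failure Derek highlights: a perfectly healthy primary datacenter is taken out of service purely because remote sites vanished.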

“The solution to this problem is automatic adaptive consistency. This is done on the client side.” Since Scylla is an eventually consistent database with tunable consistency, this buys ValuStor “the ability to adaptively downgrade the consistency on a retry of the requests.” This dramatically reduces the likelihood of inconsistency. It also works well with hinted handoffs, which further reduce problems when individual nodes go offline.
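A minimal sketch of the downgrade-on-retry idea, assuming a write function that raises on timeout (the real client works through the Cassandra driver's consistency settings rather than plain strings):

```python
def adaptive_write(write, levels=("ALL", "QUORUM", "ONE")):
    """Sketch of automatic adaptive consistency: retry a failed request
    at progressively weaker consistency levels."""
    for level in levels:
        try:
            return write(level)
        except TimeoutError:
            continue  # not enough replicas answered; downgrade and retry
    raise TimeoutError("write failed even at the weakest consistency level")

# Simulate a cluster that can only satisfy QUORUM right now (one replica down).
def write(level):
    if level == "ALL":
        raise TimeoutError
    return f"acked at {level}"

assert adaptive_write(write) == "acked at QUORUM"
```

Downgrading trades a little consistency for availability only when the stronger level actually fails, which is why it pairs well with hinted handoff cleaning up afterward.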

Derek took the audience on a brief refresher on consistency levels, including ALL, QUORUM, and ONE/ANY. You can learn more about these reading the Scylla documentation and even teach yourself more going through our console demo.

Next, Derek covered the scalability of Scylla. “The Scylla architecture itself is nearly infinitely scalable. Due to the shard-per-core design you can keep throwing new cores and new machines at it and it’s perfectly happy to scale up. With the shard-aware driver, it will automatically route traffic to the appropriate location.” This is contrasted with memcached, which requires manual sharding. “This is not ideal.”
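The shard-per-core routing idea can be sketched as follows. This is a deliberate simplification: a plain modulo stands in for Scylla's actual token-to-shard algorithm, and NR_SHARDS is a made-up core count:

```python
NR_SHARDS = 8  # assumed number of cores (shards) on the target node

def shard_of(token, nr_shards=NR_SHARDS):
    # Simplified shard routing: map the partition token onto a shard.
    # Scylla's real algorithm transforms the token before reducing it to
    # a shard index; this sketch only shows the shard-per-core idea.
    return token % nr_shards

# A shard-aware driver computes the owning shard client-side and keeps a
# connection per shard, so requests avoid cross-core hand-offs on the server.
assert shard_of(12345) == 1
assert all(0 <= shard_of(t) < NR_SHARDS for t in range(1000))
```

Because each shard owns its data exclusively, routing a request to the right core up front removes locking and cross-core messaging from the hot path.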

Using ValuStor

Configuring ValuStor is accomplished in a C++ template class. Once you’ve created the table in the database, you don’t even need to write any other CQL queries.

ValuStor Configuration

This is an example of a minimal configuration. There are more options for SSL.

ValuStor Usage

Here is an example of taking a key and value and storing it, checking whether the operation succeeded, performing automatic retries if it did not, and handling errors if the operation still fails.


ValuStor Comparisons

When designing ValuStor Derek emphasized, “We wanted all of the things on the left-hand side. In evaluating some of the alternatives, none of them really met our needs.”

In particular, Derek took a look at the complexity of Redis. It has dozens of commands. It has master-slave replication. And for Derek’s bottom line, it’s not going to perform as well as Scylla. He cited the recent change in licensing to Commons Clause, which has caused some confusion and consternation in the market. He also pointed out that if you do need the complexity of Redis, you can move to Pedis, which uses the Seastar engine at its heart for better performance.

What about MongoDB or CouchDB?

Derek also made comparisons to MongoDB and CouchDB, since ValuStor has full native JSON support and can also be used as a document store. “It’s not as full-featured, but depending on your needs, it might actually be a good solution.” He cited how Mongo also recently went through a widely-discussed licensing change (which we covered in a detailed blog).

Derek Ramsey at Scylla Summit 2018

Derek Ramsey at Scylla Summit 2018

What’s Next for ValuStor

Derek finished by outlining the feature roadmap for ValuStor.

  • SWIG bindings will allow it to connect to a wide variety of languages
  • Improvements to the command line will allow scripts to use ValuStor
  • Expose underlying Futures, to process multiple requests from a single thread for better performance
  • A non-template configuration option

To learn more, you can watch Derek’s presentation below, check out Derek’s slides, or peruse the ValuStor repository on GitHub.

The post ValuStor — a memcached alternative built on Scylla appeared first on ScyllaDB.

Testing Cassandra compatible APIs

In this quick blog post, I’m going to assess how the databases that advertise themselves as “Cassandra API-compatible” fare in the compatibility department.

But that is all I will do: API testing only, and not exhaustive testing, just the APIs I see used most often. Based on this, you can start building an informed decision on whether or not to change databases.

The contenders:

  • Apache Cassandra 4.0
    • Installation: Build from Source
  • Yugabyte –
    • “YCQL is a transactional flexible-schema API that is compatible with the Cassandra Query Language (CQL).”
    • Installation: Docker
  • ScyllaDB –
    • “Apache Cassandra’s wire protocol, a rich polyglot of drivers, and integration with Spark, Presto, and Graph tools make for resource-efficient and performance-effective coding.”
    • Installation: Docker
  • Azure Cosmos –
    • “Azure Cosmos DB provides native support for NoSQL and OSS APIs including MongoDB, Cassandra, Gremlin and SQL”
    • Installation: Azure Portal Wizard

The Docker installations used the containers as they are provided. Cosmos DB used all the defaults provided by the wizard interface.

The CQL script used for the test exercised the operations listed in the next section.

What I’m not doing in this blog post: performance testing, feature comparison, and everything else that is not testing the API. Those might all be more or less important for other use cases, but they are not the scope of this blog.

What was tested

In this test, the following CQL APIs were tested:

  1. Keyspace Creation
  2. Table Creation
  3. Adding a Column to a table (Alter table)
  4. Data Insert
  5. Data Insert with TTL (Time-to-live)
  6. Data Insert with LWT (Lightweight Transactions)
  7. Select Data
  8. Select data with a full table scan (ALLOW FILTERING)
  9. Creating a Secondary Index (2I)

Cassandra 4.0

  • All statements worked (as expected)


LWT Not supported


2i Not supported


LWT Not supported


2i Not supported

Results Table

So, with these results, which are not a full comparison (I have left out other features offered by these systems), you can decide whether a given database is compatible enough for you.

Scylla Enterprise Release 2018.1.10

Scylla Enterprise Release

The Scylla team is pleased to announce the release of Scylla Enterprise 2018.1.10, a production-ready Scylla Enterprise minor release. Scylla Enterprise 2018.1.10 is a bug fix release for the 2018.1 branch, the latest stable branch of Scylla Enterprise. Scylla Enterprise customers are encouraged to upgrade to Scylla Enterprise 2018.1.10 in coordination with the Scylla support team.

The major fix in this release is avoiding reactor stalls while merging MemTable into the cache. This improvement was gradually added to the open source release, and is now backported to Scylla Enterprise. Related open source issues include: #2012, #2576, #2715, #3053, #3093, #3139, #3186, #3215, #3402, #3526, #3532, #3608, #4030

Other issues fixed in this release are listed below, with open source references, if present:

  • Counters: Scylla rejects SSTables that contain counters that were created by Cassandra 2.0 and earlier. Due to #4206, Scylla mistakenly rejected some SSTables that were created by Cassandra 2.1 as well.
  • TLS: Scylla now disables TLS1.0 by default and forces minimum 128 bit ciphers #4010. More on encryption in transit (client to server) here
  • Core: In very rare cases, the commit log replay fails. Commit log replay is used after a node was unexpectedly restarted #4187
  • In rare cases, and under heavy load (for example, during repair), Scylla Enterprise might run out of memory and exit with an error such as “compaction_manager - compaction failed: std::bad_alloc (std::bad_alloc)”. #3717, #3716
  • In some rare cases, Scylla exited during service stop #4107
  • scylla_setup: An option to select server NIC was added #3658
  • Scylla Enterprise 2018.1 failed to run the scylla_setup script, with or without the `--nic` flag, on Ubuntu 16.04 when the NIC is not eth0 (the `--nic` flag was ignored when passed)
  • CQL: Selecting from a partition with no clustering restrictions (single partition scan) might have resulted in a temporary loss of writes #3608


The post Scylla Enterprise Release 2018.1.10 appeared first on ScyllaDB.

Reaper 1.4 Released

Cassandra Reaper 1.4 was just released with security features that now expand to the whole REST API.

Security Improvements

Reaper 1.2.0 integrated Apache Shiro to provide authentication capabilities in the UI. The REST API remained fully open, though, which was a security concern. With Reaper 1.4.0, the REST API is now fully secured and managed by the very same Shiro configuration as the Web UI. JSON Web Tokens (JWT) were introduced to avoid sending credentials over the wire too often. In addition, spreaper, Reaper’s command line tool, has been updated to provide a login operation and to manipulate JWTs.
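To show why tokens reduce how often credentials cross the wire, here is a generic HS256 JWT sign/verify sketch using only the Python standard library. It illustrates the JWT mechanism in general, not Reaper's actual Shiro-based implementation, and the secret and claims are made up:

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    # JWTs use unpadded base64url encoding for each segment.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_jwt(claims: dict, secret: bytes) -> str:
    # A JWT is header.payload.signature, each part base64url-encoded.
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps(claims).encode())
    mac = hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256)
    return f"{header}.{payload}.{b64url(mac.digest())}"

def verify_jwt(token: str, secret: bytes) -> bool:
    header, payload, sig = token.split(".")
    mac = hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256)
    return hmac.compare_digest(sig, b64url(mac.digest()))

secret = b"example-shared-secret"
token = sign_jwt({"sub": "admin"}, secret)
assert verify_jwt(token, secret)        # login sends credentials once;
assert not verify_jwt(token, b"wrong")  # subsequent calls present the token
```

Once the client holds a signed token, every later API call can be authenticated by verifying the signature alone, with no password in the request.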

The documentation was updated with all the necessary information to handle authentication in Reaper and even some samples on how to connect LDAP directories through Shiro.

Note that Reaper doesn’t support authorization features and it is impossible to create users with different rights.
Authentication is now enabled by default for all new installs of Reaper.

Configurable JMX port per cluster

One of the annoying things with Reaper was that it was impossible to use a port other than the default, 7199, for JMX communications.
You could define specific ports per IP, but that was really for testing purposes with CCM.
That long-overdue feature has now landed in 1.4.0, and a custom JMX port can be passed when declaring a cluster in Reaper:

Configurable JMX port

TWCS/DTCS tables blacklisting

In general, it is best to avoid repairing DTCS tables, as it can generate lots of small SSTables that could stay out of the compaction window and generate performance problems. We tend to recommend not repairing TWCS tables either, to avoid replicating timestamp overlaps between nodes, which can delay the deletion of fully expired SSTables.

When using the auto-scheduler though, it is impossible to specify blacklists, as all keyspaces and all tables get automatically scheduled by Reaper.

Based on the initial PR of Dennis Kline that was then re-worked by our very own Mick, a new configuration setting allows automatically blacklisting of TWCS and DTCS tables for all repairs:

blacklistTwcsTables: false

When set to true, Reaper will discover the compaction strategy for all tables in the keyspace and remove any table with either DTCS or TWCS, unless they are explicitly passed in the list of tables to repair.
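The filtering rule can be sketched as follows; the schema dictionary, table names, and function name are made up for illustration:

```python
def repair_tables(schema, blacklist_twcs=True, explicit_tables=()):
    """Sketch of the blacklisting rule: drop TWCS/DTCS tables from the
    repair set unless they were explicitly requested."""
    skipped_strategies = {
        "TimeWindowCompactionStrategy",   # TWCS
        "DateTieredCompactionStrategy",   # DTCS
    }
    return [
        table
        for table, strategy in schema.items()
        if table in explicit_tables
        or not (blacklist_twcs and strategy in skipped_strategies)
    ]

# Hypothetical keyspace schema: table name -> compaction strategy.
schema = {
    "events": "TimeWindowCompactionStrategy",
    "users": "SizeTieredCompactionStrategy",
    "metrics": "DateTieredCompactionStrategy",
}
assert repair_tables(schema) == ["users"]
assert repair_tables(schema, explicit_tables={"events"}) == ["events", "users"]
```

Explicitly listing a table wins over the blacklist, matching the behavior described above for the auto-scheduler.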

Web UI improvements

The Web UI used to report decommissioned nodes that still appeared in the Gossip state of the cluster with a Left state. This has been fixed, and such nodes are no longer displayed.
Another bug was the number of tokens reported in the node detail panel, which was nowhere near matching reality. We now display the correct number of tokens, and clicking on this number will open a popup containing the list of tokens the node is responsible for:


Work in progress

Work in progress will introduce the Sidecar Mode, which will collocate a Reaper instance with each Cassandra node and support clusters where JMX access is restricted to localhost.
This mode is being actively worked on currently and the branch already has working repairs.
We’re now refactoring the code and porting other features to this mode like snapshots and metric collection.
This mode will also allow for adding new features and permit Reaper to better scale with the clusters it manages.

Upgrade to Reaper 1.4.0

The upgrade to 1.4 is recommended for all Reaper users. The binaries are available from yum, apt-get, Maven Central, Docker Hub, and are also downloadable as tarball packages. Remember to backup your database before starting the upgrade.

All instructions to download, install, configure, and use Reaper 1.4 are available on the Reaper website.

How to build your very own Cassandra 4.0 release

Over the last few months, I have been seeing references to Cassandra 4.0 and some of its new features. When that happens with a technology I am interested in, I go looking for the preview releases to download and test. Unfortunately, so far, there are no such releases. But, I am still interested, so I’ve found it necessary to build my own Cassandra 4.0 release. This is in my humble opinion not the most desirable way to do things since there is no Cassandra 4.0 branch yet. Instead, the 4.0 code is on the trunk. So if you do two builds a commit or two apart, and there are typically at least three or four commits a week right now, you get a slightly different build. It is, in essence, a moving target.

All that said and done, I decided if I could do it, then the least I could do is write about how to do it and let everyone who wants to try it learn how to avoid a couple of dumb things I did when I first tried it.

Building your very own Cassandra 4.0 release is actually pretty easy. It consists of five steps:

  1. Make sure you have your prerequisites
    1. Java SDK 1.8 or Java 11, OpenJDK or Oracle
    2. Ant 1.8
    3. Git CLI client
    4. Python >= 2.7 and < 3.0
  2. Download the GIT repository
    1. git clone
  3. Build your new Cassandra release
    1. cd cassandra
    2. ant
  4. Run Cassandra
    1. cd ./bin
    2. ./cassandra
  5. Have fun
    1. ./nodetool status
    2. ./cqlsh

I will discuss each step in a little bit more detail:

Step 1) Verify, and if necessary, install your prerequisites

For Java, you can confirm the JDK presence by typing in:

john@Lenny:~$ javac -version
javac 1.8.0_191

For ant:

john@Lenny:~$ ant -version
Apache Ant(TM) version 1.9.6 compiled on July 20 2018

For git:

john@Lenny:~$ git --version
git version 2.7.4

For Python:

john@Lenny:~$ python --version
Python 2.7.12

If you have all of the right versions, you are ready for the next step. If not, you will need to install the required software which I am not going to go into here.

Step 2) Clone the repository

Verify you do not already have an older copy of the repository:

john@Lenny:~$ ls -l cassandra
ls: cannot access 'cassandra': No such file or directory

If you found a cassandra directory, you will want to delete it, move it, or switch your current directory to somewhere else. Otherwise:

john@Lenny:~$ git clone
Cloning into 'cassandra'...
remote: Counting objects: 316165, done.
remote: Compressing objects: 100% (51450/51450), done.
remote: Total 316165 (delta 192838), reused 311524 (delta 189005)
Receiving objects: 100% (316165/316165), 157.78 MiB | 2.72 MiB/s, done.
Resolving deltas: 100% (192838/192838), done.
Checking connectivity... done.
Checking out files: 100% (3576/3576), done.

john@Lenny:~$ du -sh *
294M cassandra

At this point, you have used up 294 MB on your host and you have an honest-for-real git repo clone on your host – in my case, a Lenovo laptop running Windows 10 Linux subsystem.

And your repository looks something like this:

  john@Lenny:~$ ls -l cassandra
  total 668
  drwxrwxrwx 1 john john    512 Feb  6 15:54 bin
  -rw-rw-rw- 1 john john    260 Feb  6 15:54
  -rw-rw-rw- 1 john john 101433 Feb  6 15:54 build.xml
  -rw-rw-rw- 1 john john   4832 Feb  6 15:54 CASSANDRA-14092.txt
  -rw-rw-rw- 1 john john 390460 Feb  6 15:54 CHANGES.txt
  drwxrwxrwx 1 john john    512 Feb  6 15:54 conf
  -rw-rw-rw- 1 john john   1169 Feb  6 15:54
  drwxrwxrwx 1 john john    512 Feb  6 15:54 debian
  drwxrwxrwx 1 john john    512 Feb  6 15:54 doc
  -rw-rw-rw- 1 john john   5895 Feb  6 15:54
  drwxrwxrwx 1 john john    512 Feb  6 15:54 examples
  drwxrwxrwx 1 john john    512 Feb  6 15:54 ide
  drwxrwxrwx 1 john john    512 Feb  6 15:54 lib
  -rw-rw-rw- 1 john john  11609 Feb  6 15:54 LICENSE.txt
  -rw-rw-rw- 1 john john 123614 Feb  6 15:54 NEWS.txt
  -rw-rw-rw- 1 john john   2600 Feb  6 15:54 NOTICE.txt
  drwxrwxrwx 1 john john    512 Feb  6 15:54 pylib
  -rw-rw-rw- 1 john john   3723 Feb  6 15:54 README.asc
  drwxrwxrwx 1 john john    512 Feb  6 15:54 redhat
  drwxrwxrwx 1 john john    512 Feb  6 15:54 src
  drwxrwxrwx 1 john john    512 Feb  6 15:54 test
  -rw-rw-rw- 1 john john  17215 Feb  6 15:54
  drwxrwxrwx 1 john john    512 Feb  6 15:54 tools

Step 3) Build your new Cassandra 4.0 release

Remember what I said in the beginning? There is no branch for Cassandra 4.0 at this point, so building from the trunk is quite simple:

john@Lenny:~$ cd cassandra
john@Lenny:~/cassandra$ ant
Buildfile: /home/john/cassandra/build.xml

Total time: 1 minute 4 seconds

That went quickly enough. Let’s take a look and see how much larger the directory has gotten:

john@Lenny:~$ du -sh *
375M cassandra

Our directory grew by 81MB pretty much all in the new build directory which now has 145 new files including ./build/apache-cassandra-4.0-SNAPSHOT.jar. I am liking that version 4.0 right in the middle of the filename.

Step 4) Start Cassandra up. This one is easy if you do the sensible thing

john@Lenny:~/cassandra$ cd ..
john@Lenny:~$ cd cassandra/bin
john@Lenny:~/cassandra/bin$ ./cassandra
john@Lenny:~/cassandra/bin$ CompilerOracle: dontinline org/apache/cassandra/db/Columns$Serializer.deserializeLargeSubset (Lorg/apache/cassandra/io/util/DataInputPlus;Lorg/apache/cassandra/db/Columns;I)Lorg/apache/cassandra/db/Columns;
CompilerOracle: dontinline org/apache/cassandra/db/Columns$Serializer.serializeLargeSubset (Ljava/util/Collection;ILorg/apache/cassandra/db/Columns;ILorg/apache/cassandra/io/util/DataOutputPlus;)V
CompilerOracle: dontinline org/apache/cassandra/db/Columns$Serializer.serializeLargeSubsetSize (Ljava/util/Collection;ILorg/apache/cassandra/db/Columns;I)I

INFO [MigrationStage:1] 2019-02-06 21:26:26,222 - Initializing system_auth.role_members
INFO [MigrationStage:1] 2019-02-06 21:26:26,234 - Initializing system_auth.role_permissions
INFO [MigrationStage:1] 2019-02-06 21:26:26,244 - Initializing system_auth.roles

We seem to be up and running. It’s time to try some things out:

Step 5) Have fun

We will start out making sure we are up and running by using nodetool to connect and display a cluster status. Then we will go into the CQL shell to see something new. It is important to note that since you are likely to have nodetool and cqlsh already installed on your host, you need to use the ./ in front of your commands to ensure you are using the 4.0 version. I have learned the hard way that forgetting the ./ can result in some very real confusion.

  john@Lenny:~/cassandra/bin$ ./nodetool status
  Datacenter: datacenter1
  |/ State=Normal/Leaving/Joining/Moving
  --  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
  UN  115.11 KiB  256          100.0%            f875525b-3b78-49b4-a9e1-2ab0cf46b881  rack1
  john@Lenny:~/cassandra/bin$ ./cqlsh
  Connected to Test Cluster at
  [cqlsh 5.0.1 | Cassandra 4.0-SNAPSHOT | CQL spec 3.4.5 | Native protocol v4]
  Use HELP for help.
  cqlsh> desc keyspaces;

  system_traces  system_auth  system_distributed     system_views
  system_schema  system       system_virtual_schema


We got a nice cluster with one node, and we see the usual built-in keyspaces. Well um… not exactly. We see two new keyspaces: system_virtual_schema and system_views. Those look very interesting.

In my next blog, I’ll be talking more about Cassandra’s new virtual table facility and how very useful it is going to be someday soon. I hope.

What Apache Cassandra™ Developers Want


One of the fun things we were able to do at conferences we attended in 2018 was to use some “conversation starters”. We asked our booth visitors at Strata NYC and AWS re:Invent to contribute to a community comment block (aka sticky notes on our booth wall), with four prompts on the Post-its.

Welcome to the Context Economy

I like the analysts at the 451 Research Group. They’re smart, experienced, no-nonsense, and have a good track record of identifying meaningful movements in the technology space. So when they released their report on 2019 IT trends, I made sure to get my copy and thoroughly go through it.

Some of their identified trends will be instantly recognized by anyone keeping up with our industry—data becoming increasingly important among successful organizations, a ‘no-trust’ security model being implemented by smart companies, and the increasing tilt toward hybrid and multi-cloud deployments for modern applications and data platforms.

No surprises there.

But one of their trends held my attention more than the others: “The Context Economy Begins to Flourish”. I found their term ‘Context Economy’ particularly insightful and the content on why it’s important to be spot on.

What is the ‘Context Economy’?

451 Group defines the context economy as a paradigm where data without context will be mostly worthless to organizations; only contextual data (data and its associated relationships with other data elements) will deliver the lifeblood needed by today’s digital applications whose success hinges on providing the personalized experience needed to attract and retain customers.

451 Group’s research shows that those leading the pack in their respective markets are the ones (naturally) who prioritize contextual experiences:

What are the Requirements for the Context Economy?

No doubt some of you are thinking, “Well, duh… Of course data relationships and context matters—isn’t this what relational databases have been doing for 40 years now?”

Actually, no. This is exemplified by 451 Group saying that only now will the context economy begin to flourish.

RDBMSs do indeed store and enforce relationships between data elements; however, they are crippled in supporting today’s modern cloud-style applications in several ways.

First, RDBMSs cannot handle the multi-workload requirements. Data context demands a blending and blurring of transactional, analytical, search, in-memory, and graph operations in the same database, with little to no resource or data contention. For example, today’s fraud applications must digest incoming transactions, analyze each request against historical buying behavior, map that to other highly connected data to determine whether to approve or deny the transaction, and send the response back to the requesting application in the blink of an eye.

Next, data context requires multi-model support. Today’s applications utilize microservices architectures to componentize the app’s various functions. Each component has its own best-of-breed data model requirements, which then must be stored and accessed with other models in real time. Further, today’s data relationships differ from those in past generations in that they are highly connected, meaning they outstrip the ability of an RDBMS to efficiently store and query the connected data. A graph database, however, is built for this very purpose.

Lastly, data context needs multi-location capabilities for all supported data models. And make no mistake: this requirement goes beyond the master/slave or multi-master architectures of all RDBMSs and other NoSQL vendors, which are painfully hard to administer and woefully inadequate at supporting continuous availability, both read and write functions at any location, hybrid and multi-cloud support, and high-speed data synchronization.

The Context Database for the Context Economy

At DataStax, our customers are building contextual applications on DataStax Enterprise (DSE) because it natively supports multi-workload, multi-model, and multi-location operation. Multi-workload is handled via DSE Analytics, DSE Search, in-memory functionality, and distributed graph capabilities, all of which run in the same cluster/database.

DSE also supports all key data models, including tabular, key-value, document, and graph, which can all exist in the same cluster/database. Finally, DSE and its underlying foundation of open source Apache Cassandra™ are the gold standard when it comes to globally distributing and synchronizing data everywhere as well as transparently running in hybrid and multi-cloud fashion.

The 451 Group’s report on 2019 trends, and in particular their point on the context economy, dovetails well with how we see DataStax customers beating out their competition. For more information on DSE, see our Resources website page, get registered for our upcoming Accelerate Conference, and also be sure to download the latest version of DSE to try in your environment.

Anomalia Machina 7 – Kubernetes Cluster Creation and Application Deployment

Kubernetes – Greek: κυβερνήτης = Helmsman

If you are a Greek hero about to embark on an epic aquatic quest (encountering one-eyed, rock-throwing monsters, unpleasant weather, a detour to the underworld, tempting sirens, angry gods, etc.), then having a trusty helmsman is mandatory. (Even though the helmsman survived the Cyclops, like all of Odysseus’s companions he eventually came to a sticky end.)



A few months ago we decided to try Kubernetes to see how it would help with our quest to scale our Anomalia Machina application (a massively scalable Anomaly Detection application using Apache Kafka and Cassandra). This blog is a recap of our initial experiences (reported in this webinar) creating a Kubernetes cluster on AWS and deploying the application.

Kubernetes is an automation system for the management, scaling and deployment of containerized applications. Why is Kubernetes interesting?

  • It’s Open Source, and agnostic to both cloud provider (multi-cloud) and programming language (polyglot programming)
  • You can develop and test code locally, then deploy at scale
  • It helps with resource management, you can deploy an application to Kubernetes and it manages application scaling
  • More powerful frameworks built on the Kubernetes APIs are becoming available (e.g. Istio)

Kubernetes and AWS EKS

Our goal was to try Kubernetes out on AWS using the newish AWS EKS, the Amazon Elastic Container Service for Kubernetes. Kubernetes is a sophisticated four-year-old technology, and it isn’t trivial to learn or deploy on a production cloud platform. To understand, set up, and use it, you need familiarity with a mixture of Docker, AWS, security, networking, Linux, systems administration, grid computing, resource management, and performance engineering! To get up to speed faster, I accepted an invitation to attend an AWS Get-it-done-athon in Sydney for a few days last year to try out AWS EKS. Here’s the documentation I used: the EKS user guide, the step by step guide, and a useful EKS workshop which includes some extensions.


At a high level Kubernetes has a Master (controller) and multiple (worker) Nodes.


Kubernetes has a lot of layers, a bit like Russian nesting dolls. The smallest doll is the code, the Java classes. Then you have to package it as a Java JAR file, then turn the JAR into a Docker image and upload it to the Docker Hub. The Kubernetes layers include nodes (EC2 instances), pods which run on nodes, and deployments which have 1 or more pods. A Kubernetes Deployment (one of many controller types) declaratively tells the Control Plane what state (and how many) Pods there should be. Finally, there’s the control plane, the master, which runs everything. This is the focus of AWS EKS.
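The “a Deployment declares desired Pods” idea is easiest to see in a manifest. A minimal, illustrative Deployment for the anomaly detection pipeline might look like this (the image name and labels are made up for the example):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: anomalia-machina
spec:
  replicas: 3                 # desired state: keep 3 pods running
  selector:
    matchLabels:
      app: anomalia-machina
  template:
    metadata:
      labels:
        app: anomalia-machina
    spec:
      containers:
        - name: pipeline
          image: myrepo/anomalia-machina:latest   # hypothetical image name
```

You `kubectl apply` this and the control plane continuously works to make reality match the declared state.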


This is a high level overview of AWS EKS. It looks simple enough, but there are a few complications to solve before you can get your application running.

Anomalia Machina 7 - High level overview of AWS EKS



“Toto, I’ve a feeling we’re not in Kansas anymore”

Each AWS EKS runs in an AWS region, and the worker nodes must run in the same region. When I created EKS last year there were only 3 supported regions. There are now lots more EKS regions. Unfortunately my Kafka and Cassandra test clusters were in Ohio, which was not one of the EKS regions at the time. Virginia was the nearest EKS region. Ohio is only 500 km away, so I hoped the application would work ok (at least for testing) across regions. However, running EKS in a different region to other parts of your application may be undesirable due to increased complexity, latency, and extra data charges. It also introduced some unnecessary complications for this experiment.

Kubernetes Snakes and Ladders

It turns out that there are lots of steps to get AWS EKS running, and they didn’t all work smoothly (the “snakes”), particularly with the cross region complications. Let’s roll the dice and test our luck.


Anomalia Machina 7 - Snakes and Ladder

First we have to prepare things:

1 – Create Amazon EKS service role (5 steps)

2 – Create your Amazon EKS Cluster VPC (12 steps)

3 – Install and Configure kubectl for Amazon EKS (5 steps)

4 – Install aws-iam-authenticator for Amazon EKS (10 steps)

5 – Download and Install the Latest AWS CLI (6 steps)


Many (38) steps and a few hours later we were ready to create our AWS EKS cluster.


6 – Create Your Amazon EKS Cluster (12 steps)

7 – Configure kubectl for Amazon EKS (3 steps)


Next, more steps to launch and configure worker nodes:


8 – Add key pairs to EKS region (extra steps due to multiple regions, down a snake)

9 – Launch and Configure Amazon EKS Worker Nodes (29 steps)


We finally got 3 worker nodes started.


10 – Deploying the Kubernetes Web GUI dashboard is a good idea if you want to see what’s going on (it runs on worker nodes). But then down another snake as the documentation wasn’t 100% clear (12 steps).


11 – Deploy the application (5 steps).  Does it work?

12 – Not yet… down another snake. Configure, debug, test application (30 steps)


After 100+ steps (and 2 days) we could finally ramp up our application on the Kubernetes cluster and test it out. It worked (briefly). The completed “game”, with steps (1-12) numbered, looked like this:

Anomalia Machina 7 - Snakes and ladder completed game with Kubernetes

Here’s the Kubernetes GUI dashboard with the 3 worker nodes and the anomaly detection application pipeline deployed. You can kill pods, and EKS magically replaces them. You can ask for more pods and EKS creates more – just don’t ask for too many!

Anomalia Machina 7 - Kubernetes GUI Dashboard

(Mis)understanding Kubernetes Scaling

Anomalia Machina 7 - (Mis)understanding Kubernetes Scaling


It’s useful to understand the basics of AWS EKS/Kubernetes scaling (I initially didn’t). When you create a Kubernetes deployment you specify how many replicas (pods) you want. Pods are designed to run a single instance of an application and are the basic unit of concurrency in Kubernetes. They are also designed to be ephemeral: they can be started, killed, and restarted on different nodes. Once running, you can request more or fewer replicas. I started out with 1 replica, which worked ok.


I then wanted to see what happened if I requested more replicas. I had 3 worker nodes (EC2 instances), so was curious to see how many pods Kubernetes would allow: 1, 2, 3, or more? (Note that some of the worker node resources are already used by Kubernetes for the GUI and monitoring.) I soon found out that by default Kubernetes doesn’t limit the number of pods at all! That is, it assumes pods use no resources, and allows more pods to be created even when all the worker nodes have run out of spare resources. As a result the worker nodes were soon overloaded, and the whole system became unresponsive: the Kubernetes GUI froze and I couldn’t delete pods. To try to get things back to a working state I manually stopped the EC2 worker instances from the AWS console.
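The standard way to avoid this (which I learned the hard way) is to declare resource requests and limits in the pod spec, so the scheduler stops placing pods on nodes that are already full. An illustrative pod template fragment (the values and image name are made up):

```yaml
    spec:
      containers:
        - name: pipeline
          image: myrepo/anomalia-machina:latest   # hypothetical image name
          resources:
            requests:           # the scheduler reserves this much per pod
              cpu: 500m
              memory: 512Mi
            limits:             # a pod is throttled or evicted beyond this
              cpu: "1"
              memory: 1Gi
```

With requests set, asking for too many replicas leaves the surplus pods in a Pending state instead of saturating the nodes.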


Did this work? No. What I hadn’t realised was that the worker nodes are created as part of an auto scaling group.  

Anomalia Machina 7 - Auto Scaling Groups

To change the number of instances (and therefore nodes) you need to change the auto scaling group minimum and maximum sizes. The number of instances will scale elastically within these limits. After I manually stopped the worker instances, I later found that (1) they had all started again, and (2) they were all saturated again (as Kubernetes was still trying to achieve the desired number of replicas for the deployment).


Therefore, another useful thing to understand about AWS EKS and Kubernetes is that Kubernetes resources are controlled declaratively by the higher layers, but AWS EC2 instances are controlled by AWS. If you need to kill off all the pods you have to change the number of desired replicas to 0 in the Kubernetes deployment. But if you need more worker nodes you have to change the max/desired sizes in the workers’ auto scaling group, as Kubernetes cannot override these values. By default, Kubernetes does not enable elastic node auto scaling using the workers’ auto scaling group; you need the Kubernetes Cluster Autoscaler configured for this to work.

Further Resources

Here are a couple of other AWS EKS reviews:


Next blog(s) I’ll provide an update on using Kubernetes to scale the production version of the Anomalia Machina Application (with Prometheus and OpenTracing) and the results at scale.

The post Anomalia Machina 7 – Kubernetes Cluster Creation and Application Deployment appeared first on Instaclustr.

Powering A Next Generation Streaming TV Experience

Sling TV is an over-the-top (OTT) live streaming content platform that instantly delivers live TV and on-demand entertainment via the internet and customer owned and managed devices. This includes a variety of smart televisions, tablets, game consoles, computers, smartphones and streaming devices (16 platforms total). It is currently the number one live TV streaming service with approximately 2.3MM customers.

Outperforming the Competition

With so many options available to potential “cord cutters” (i.e., people seeking a wireless- or internet-based service over a paid TV subscription service), it is important to provide a first-class experience that makes your product stand out in a market that is becoming more and more saturated.

As such, it is critical for Sling TV to provide a highly resilient service that is personalized to each user and scales on demand to keep up with our expanding customer base and changes on the internet. This includes the need to be highly available and resilient while having the ability to centralize business logic across our sixteen platforms to deliver a common experience to our customers.

We also want to reduce the time to market for features in a continuous deployment model and ultimately enable a deployment unit of “datacenter” allowing for another instance of the Sling TV backend to be built on demand as needed in a hybrid cloud environment.

On the backend, we needed a common data store for our core customer and personalized content that is available in all data centers serving our middleware stack. The solution would need to provide media distribution capabilities that include authentication, program catalogs, and feature metadata storage. We had a big list of needs to fulfill, and we wanted a proven solution that would support our next-gen architecture goals.

Why Sling Chose DataStax and DSE Built on Apache Cassandra

We chose DataStax Enterprise (DSE) for three main reasons:

  1. Not only was it important for us to have a proven solution to support our goals, we were also looking for a partner—as opposed to just a vendor—to help us achieve the success we had envisioned.
  2. Having a database designed for hybrid cloud infrastructure was a key part of our strategy, as we needed to be able to replicate data across remote data centers.
  3. DSE would give us a virtually unlimited ability to scale horizontally to keep up with future growth.

DSE has become a key part of our hybrid cloud strategy and has enabled our software to run in private and public clouds with close to the same tooling. With DSE, we are now able to replicate data across the country in less than two seconds, which is a big win for us. We look forward to leveraging DSE to power the future growth of Sling TV.

eBook: The Way of Customer 360


Introducing the Humans of DataStax

At DataStax, we pride ourselves on hiring the most innovative, collaborative, and hard-working people we can find, drawing on our highly distributed and global workforce to propel the company forward.

With more than 65% of our employees being 100% distributed, we have the advantage of being able to pick from a global field of qualified candidates. This also gives us a diverse and interesting team, which we will now be featuring in a series of videos called “Humans of DataStax.”

The first two videos will feature HR Specialist Lynsey ReysNickel and Developer Advocate Amanda Moran. Lynsey, who came to DataStax on a returnship program for people seeking to re-enter the workforce after a long absence, uses her skills from academia to develop and implement global HR programs and processes. Amanda, who recently joined DataStax, works as an intermediary between Cassandra and DataStax developers.

Amanda and Lynsey illustrate our drive to hire people who demonstrate the core behaviors of accountability, visibility, respect, and growth mindset.

Said Amanda: “I’m right there in the middle, helping out our customers and our engineering team. It’s all about creating the best product that we possibly can to make our developers’ lives and our users’ lives easier.” Amanda also spends her time ensuring that DataStax development projects matter to both our customers and our employees.

For Lynsey, working at DataStax was a chance to get back to work with a company whose values align with hers.

“Leadership has been very supportive of my return to the workplace in a returnship position, and I can see that through the demonstration of their core values and core behaviors,” she said. “I have the same core values.”

You can find the full videos here.

Engineering to Improve Marketing Effectiveness (Part 3) — Scaling Paid Media campaigns


This is the third blog of the series on Marketing Technology at Netflix. This blog focuses on the marketing tech systems that are responsible for campaign setup and delivery of our paid media campaigns. The first blog focused on solving for creative development and localization at scale. The second blog focused on scaling advertising at Netflix through easier ad assembly and trafficking.

Netflix’s Marketing team is interested in raising awareness about our content and our brand, and getting new consumers excited about signing up for our service. We use a combination of paid media, owned media (e.g. via the Netflix twitter handle), and earned media (publicity) to reach people all over the world. We use a mix of art and science to decide which titles to promote in which market.

The Netflix Marketing Tech team focuses on automation and experimentation to help our marketing team save time and money. These marketing campaigns help raise brand awareness on services/channels outside of the Netflix product itself (e.g. social media channels).

What are we solving for?

The marketing tech team’s goal is to build scalable systems which enable marketers at Netflix to efficiently manage, measure, experiment and learn about tactics that help unlock the effectiveness of our paid media efforts.

Improving Incremental Marketing Effectiveness: Netflix wants to use paid media to drive incremental value to the business. For instance, if we are interested in using a campaign to get people to sign up, we only want to measure the success of our campaign on users we caused to sign up, which is a subset of total signups. We measure this through ongoing experiments and control/holdout groups. This lets us understand the difference between all signups (correlational) and incremental signups (causal). For more detailed information on this topic, please refer to some of the work done in this space — incrementality bidding and attribution and measuring ad effectiveness.
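The correlational-vs-causal distinction above boils down to simple arithmetic once a holdout group exists. A minimal sketch (all numbers are illustrative, not Netflix data):

```python
# Estimate incremental (causal) signups from a holdout experiment.

def incremental_signups(treated_users, treated_signups,
                        holdout_users, holdout_signups):
    """Incremental signups = observed signups in the treated group
    minus the signups we would have expected anyway, estimated from
    the holdout (control) group's baseline signup rate."""
    baseline_rate = holdout_signups / holdout_users
    expected_anyway = baseline_rate * treated_users
    return treated_signups - expected_anyway

# 1M users saw ads and 30k signed up; a 1M-user holdout saw no ads
# and 20k signed up anyway.
lift = incremental_signups(1_000_000, 30_000, 1_000_000, 20_000)
print(lift)  # 10000.0 signups attributable to the campaign
```

Total signups (30k) overstate the campaign’s effect threefold here; only the 10k incremental signups are causal.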

Enabling Marketing At Scale: Netflix is now available in over 190 countries and advertises globally outside of our service in dozens of languages, for hundreds of pieces of content, ultimately using millions of promotional assets created solely for the purposes of marketing. One trailer for a title can quickly turn into hundreds of individual ads with permutations by languages, aspect ratios, subtitles, testing variations, etc. Our Marketing Tech platforms need to support the assembly and delivery of a wide variety of campaigns and plan type combinations, on a variety of external advertising platforms at global scale.

Enabling Easy Experimentation: Similar to how we test on the Netflix Product, the Netflix Marketing team embraces experimentation to identify the best marketing tactics for spending paid media dollars. The Marketing Tech team seeks to create technology that will enable our partners in marketing to spend more of their time on strategic and creative decisions. Our teams use experimentation to guide their instincts on the best performing campaigns. We enable experimentation through methodologies like A/B testing and geo-based quasi testing.

Near Real-time Measurement: As an advertiser running paid media across multiple global ad platforms, our systems need to provide accurate, near real-time answers about campaign performance. We then use this data to adapt and optimize our campaign spend in order to achieve our conversion goals.

How are we solving them?

The tech systems which solve for these problems can be broken into the following types:

  • Systems which are responsible for automating the workflows used for buying paid media and unlocking efficiencies in those flows (media planner)
  • Systems which help with creative development & localization and assembly of ads from the creative assets.
  • Systems which are responsible for marketing campaign creation and execution (campaign management system)
  • Systems which are responsible for collecting analytics and insights into how our campaigns are performing on ad platforms like Google (advertising insights)
  • Systems that enable changing budget on live campaigns in order to optimize our marketing spend (ad budget optimization)

Here is a highly simplified life cycle of a paid media marketing campaign.

Lifecycle of a paid media campaign

This blog will focus mainly on the campaign management and ad budget optimization systems.

Campaign Management System

First, some terminology for those less familiar with this space. A campaign is a set of advertisements with a single idea or theme. An advertising campaign is typically broadcast through several media channels like programmatic, digital reserve, TV, print, billboards, etc. A campaign consists of the following:

  • Objective: A campaign has specific goals and objectives.
  • Target Audience: It refers to the set of audience and languages within regions that are targeted by the campaign.
  • Catalog: Contains information about all ads that are part of the campaign. It consists of a list of titles, asset links, and ad formats, e.g. video ads, carousels, etc.
  • Budget: The spend associated with a campaign. You can apply this spend to each day the campaign runs (daily budget) or over the lifetime of the campaign (lifetime budget).

Programmatic bidding is the automated bidding on advertising inventory, for the opportunity to show an ad to a specific internet user, in a specific context.

The programmatic marketing team at Netflix is responsible for planning, setting up and executing marketing campaigns on the ad platforms. Each campaign requires a separate structure, and combinations of different geography, inventory sources, etc. We built our campaign management system in order to automate large parts of the campaign creation process. The system automates the process of campaign creation by abstracting out complexities in ad catalog setup, budget recommendation, experimentation, audience and campaign setup.

The complexity of the campaigns is further increased because the programmatic marketing team often runs experiments as part of a campaign. For example, we may use a campaign to test the relative effectiveness of a 30 second creative vs. a 6 second creative. Other experiments might try to determine optimal campaign parameters like budget allocation. Without tooling, all of the combinatorics would lead to dramatic increase in campaign setup time and complexity.

To solve this, our campaign management system has the notion of plan type (recipe) — pre-built combinations of various factors such as campaign type, objective, etc. With the help of this feature, we enable the selection of an appropriate recipe and add setup information for multiple cross-country and cross-platform tests in a single place. This dramatically reduces campaign setup time, removes error prone manual steps, and increases our confidence in test learnings.

System architecture

The Campaign Management Service relies on a variety of technologies to achieve its goals. The majority of the service layer is written in Kotlin and Java. Cassandra is the primary store for most data. The UI is built on top of Node.js using React components which communicates with the backend service via REST. Titus provides container based jobs which can be used for heavy lifting, such as uploading videos to ad platforms.

Ad Budget Optimization System

When we run a marketing campaign with an objective of increasing incremental new sign ups, we are faced with the challenge of spending marketing budgets across the globe. For example, should the next incremental dollar get allocated to marketing in the U.S. or Thailand if incremental sign up and revenue is our ultimate objective? Stated simply, this is a budget allocation problem with several hard-to-measure factors behind it: Netflix product/market fit, cost of media, etc.

We solve this problem by building a system to dynamically distribute budgets across campaigns and countries to maximize incremental revenue. The system retrieves live campaign performance data from ad platforms and calculates the budget allocation per country and platform based on the current spend and the number of days remaining in the campaign.
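The pacing part of that calculation (spend so far vs. days remaining) can be sketched in a few lines. This is a toy even-pacing rule; the real system weights allocation by predicted incremental revenue per country and platform, and the function name is ours:

```python
# Toy budget pacing: spread the remaining budget evenly over the
# remaining days of the campaign.

def daily_allocation(total_budget, spend_to_date, days_remaining):
    """Budget to allocate per remaining day of the campaign."""
    if days_remaining <= 0:
        return 0.0
    remaining = max(total_budget - spend_to_date, 0.0)  # never negative
    return remaining / days_remaining

# $90k lifetime budget, $30k already spent, 20 days left:
print(daily_allocation(90_000, 30_000, 20))  # 3000.0 per day
```

Re-running this against live spend data each day automatically speeds up pacing when a campaign underspends and slows it when it overspends.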

System architecture

There are three main components in the budget optimization system. The front-end provides a CRUD interface for entering campaign metadata (campaign description, start and end dates, budget, etc.). It also uses the Netflix workflow orchestration engine Meson to enable the ability to view, select, and execute specific budget runs. The next component is a data ETL pipeline which is responsible for calculating input metrics like spend, lift, etc., and persisting those in Hive tables. The third component is the backend, which reads campaign metadata from S3 and spend/lift metrics from Hive, and applies machine learning models to calculate the budget allocation per country and ad platform. The updated budget allocations are then pushed to the external platforms through API integrations.

Future challenges

As our business evolves, our systems need to scale to support accelerated experimentation and expanded creative inventory. We will also continue to fine tune our systems to make our workflows as automated as possible. The less time and effort spent manually creating ad campaigns, the faster we will be able to move as a business. Our goal is to continue to build an efficient, robust, and scalable marketing platform that is greater than the sum of its parts and which ultimately enables consumer delight.


In summary, we’ve discussed the mechanics of creation, delivery and optimization of Netflix campaigns at global scale. Some of the details themselves are worth follow-up posts and we’ll be publishing them in the future. We need back-end engineers to build robust data pipelines and facilitate communication and integration between our many services. We’re also looking for front-end engineers to build beautiful, intuitive user interfaces that are a pleasure to use and provide a cohesive look and feel across our ecosystem. If you’re interested in joining us in working on some of these opportunities within Netflix’s Marketing Tech, we’re hiring!

Authored By Jayashree Biswas, Gopal Krishnan

Engineering to Improve Marketing Effectiveness (Part 3) — Scaling Paid Media campaigns was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

14 Things To Do When Setting Up a New Cassandra Cluster

Over the last few years, we’ve spent a lot of time reviewing clusters, answering questions on the Cassandra User mailing list, and helping folks out on IRC. During this time, we’ve seen the same issues come up again and again. While we regularly go into detail on individual topics, and encourage folks to read up on the fine print later, sometimes teams just need a starting point to help them get going. This is our basic tuning checklist for those teams who just want to get a cluster up and running and avoid some early potholes.

  1. Adjust The Number of Tokens Per Node

    When the initial work on vnodes in Cassandra was started on the mailing list, 256 tokens was chosen in order to ensure even distribution. This carried over into CASSANDRA-4119 and became part of the implementation. Unfortunately, there were several unforeseen (and unpredictable) consequences of choosing a number this high. Without spending a lot of time on the subject, 256 can cause issues with bootstrapping new nodes (lots of SSTables), repair takes longer, and CPU usage is overall higher. Since Cassandra 3.0, we’ve had the ability to allocate tokens in a more predictable manner that avoids hotspots and leverages the advantage of vnodes. We thus recommend using a value of 4 here, which gives a significant improvement in token allocation. In order to use this, please ensure you’re using the allocate_tokens_for_keyspace setting in the cassandra.yaml file. It’s important to do this up front, as there’s no easy way to change it without setting up a new data center and doing a migration.

    Note: Using 4 tokens won’t allow you to add a single node to a cluster of 100 nodes and have each node get an additional 1% capacity. In practice there’s not much benefit from doing that, as you would always want to expand a cluster by roughly 25% in order to have a noticeable improvement, and 4 tokens allows for that.
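    In cassandra.yaml the two settings look like this (the keyspace name is an example; point it at a keyspace that carries your production replication settings before bootstrapping new nodes):

```yaml
num_tokens: 4
# New nodes balance their tokens against the replication
# settings of this keyspace (available since Cassandra 3.0):
allocate_tokens_for_keyspace: my_keyspace
```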

  2. Configure Racks, Snitch, and Replication

    Cassandra has the ability to place data around the ring in a manner that ensures we can survive losing a rack, an availability zone, or an entire data center. In order to do this correctly, it needs to know where each node is placed, so that it can place copies of data in a fault-tolerant manner according to the replication strategy. Production workloads should ALWAYS use the NetworkTopologyStrategy, which takes the racks and data centers into account when writing data. We recommend using GossipingPropertyFileSnitch as a default. If you’re planning on staying within a cloud provider, it’s probably easier to use that provider’s dedicated snitch, such as the EC2Snitch, which figures out its rack and data center automatically. These should all be set up before using the cluster for production, as changing them later is extremely difficult.
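    For example, a keyspace keeping three replicas in each of two data centers would be created like this (the keyspace and data center names are examples; the names must match what your snitch reports):

```sql
CREATE KEYSPACE my_app
  WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'dc1': 3,
    'dc2': 3
  };
```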

  3. Set up Internode Encryption

    We discuss how to do this in our guide on setting up internode encryption. I won’t go deep into the details here since Nate did a good job of that in his post. This falls in the “do it up front or you’ll probably never do it” list with #1 and #2.

  4. Set up client authentication.

    Using authentication for your database is a good standard practice, and pretty easy to set up initially. We recommend disabling the Cassandra user altogether once auth is set up, and increasing the replication factor (RF) of the system_auth keyspace to a few nodes per rack. For example, if you have 3 racks, use RF=9 for system_auth. It’s common to read folks advising increasing the system_auth RF to match the cluster size no matter how big it is; I’ve found this to be unnecessary. It’s also problematic when you increase your cluster size. If you’re using 1000 nodes per data center, you’ll need a higher setting, but it’s very unlikely your first cluster will be that big. Be sure to bump up the roles_validity_in_ms, permissions_validity_in_ms, and credentials_validity_in_ms settings (try 30 seconds) to avoid hammering those nodes with auth requests.

    Using authentication is a great start, but adding a layer of authorization to specific tables is a good practice as well. Lots of teams use a single Cassandra cluster for multiple purposes, which isn’t great in the long term but is fine for starting out. Authorization lets you limit what each user can do on each keyspace and that’s a good thing to get right initially. Cassandra 4.0 will even have the means of restricting users to specific data centers which can help segregate different workloads and reduce human error.

    The Security section of the documentation is worth reading for more details.

  5. Disable Dynamic Snitch

    Dynamic snitch is a feature that was intended to improve the performance of reads by preferring nodes which are performing better. Logically, it makes quite a bit of sense, but unfortunately it doesn’t behave that way in practice. In reality, we suffer from a bit of the Observer Effect, where the observation of the situation affects the outcome. The dynamic snitch generates so much garbage that using it makes everything perform significantly worse. By disabling it, we make the cluster more stable overall and end up with a net reduction in performance-related problems. A fix for this is actively being worked on for Cassandra 4.0 in CASSANDRA-14459.
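The change itself is a one-liner in cassandra.yaml:

```yaml
# Prefer the static snitch ordering; avoids the dynamic snitch's garbage churn
dynamic_snitch: false
```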

  6. Set up client encryption

    If you’re going to set up authentication, it’s a good idea to set up client encryption as well, or else everything (including authentication credentials) is sent over cleartext.

    Again, the Security section of the documentation is worth reading for more details on this, I won’t rehash what’s there.

  7. Increase counter cache (if using counters)

    Counters do a read before write operation. The counter cache allows us to skip reading the value off disk. In counter heavy clusters we’ve seen a significant performance improvement by increasing the counter cache. We’ll dig deeper into this in a future post and include some performance statistics as well as graphs.
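The cache is sized in cassandra.yaml; the value below is purely illustrative and should be tuned against your actual hit rate (visible via nodetool info):

```yaml
# Default is min(2.5% of heap, 50MB); counter-heavy clusters often warrant more
counter_cache_size_in_mb: 512
```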

  8. Set up sub range repair (with Reaper)

    Incremental repair is a great idea but unfortunately has been the cause of countless issues, which we discussed in our blog previously. Until those are resolved, we recommend running full repairs over token sub-ranges on a schedule, which is exactly what Reaper manages for you. Cassandra 4.0 should fix the remaining issues with incremental repair by changing the way anticompaction works.

    Read up on the details in CASSANDRA-9143

  9. Set up Monitoring

    Without good monitoring it’s just not possible to make good decisions on a single server, and the problem is compounded when dealing with an entire cluster. There are a number of reasonable monitoring solutions available. At the moment we’re fans of Prometheus if you choose to self-host, and Datadog if you prefer hosted, but this isn’t meant to be an exhaustive list. We recommend aggregating metrics from your application and your databases into a single monitoring system.

    Once you’ve got all your metrics together, be sure to create useful dashboards that expose:

    • Throughput
    • Latency
    • Error rate

    These metrics must be monitored from all layers to truly understand what’s happening when there’s an issue in production.

    Going an extra step or two past normal monitoring is distributed tracing, made popular by the Google Dapper paper. The open source equivalent is Zipkin, which Mick here at The Last Pickle is a contributor to and wrote a bit about some time back.

  10. Backups

    Cassandra’s fault tolerance makes it easy to (incorrectly) disregard the usual advice of having a solid backup strategy. Just because data is replicated doesn’t mean we won’t need a way to recover it later on; replication happily propagates an accidental delete to every replica. This is why we always recommend having a backup strategy in place.

    There are hosted solutions like Datos, Cassandra Snapshots, volume snapshots (LVM, EBS Volumes, etc), incremental Cassandra backups, home rolled tools, etc. It’s not possible to recommend a single solution for every use case. The right solution is going to be dependent on your requirements.

  11. Basic GC Tuning

    The default Cassandra JVM arguments are, for the most part, unoptimized for virtually every workload. We’ve written a post on GC tuning so we won’t rehash it here. Most people assume GC tuning is an exercise in premature optimization and are quite surprised when they can get a 2-5x improvement in both throughput (queries per second) and p99 latency.
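As a starting point only (heap sizes must match your hardware, and every workload differs; benchmark before adopting any of this), a G1-based fragment for conf/jvm.options might look like:

```
## Illustrative only: size the heap to the host and measure
-Xms16G
-Xmx16G
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:InitiatingHeapOccupancyPercent=70
```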

  12. Disable Materialized Views

    When materialized views (MVs) were added to Cassandra 3 everyone, including me, was excited. It was one of the most interesting features added in a long time and the possibility of avoiding manual denormalization was very exciting. Unfortunately, writing such a feature to work correctly turned out to be extremely difficult as well.

    Since the release of the feature, materialized views have retroactively been marked as experimental, and we don’t recommend them for normal use. Their use makes it very difficult (or impossible) to repair and bootstrap new nodes into the cluster. We’ll dig deeper into this issue in a later post. For now, we recommend setting the following in the cassandra.yaml file:

    enable_materialized_views: false

  13. Configure Compression

    We wrote a post a while back discussing tuning compression and how it can help improve performance, especially on read heavy workloads.
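For example (the table name is invented; a 16KB chunk size is a common read-heavy starting point, not a universal answer):

```sql
-- Smaller chunks mean less data decompressed per single-row read
ALTER TABLE app.events WITH compression = {
  'class': 'LZ4Compressor',
  'chunk_length_in_kb': 16
};
```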

  14. Dial Back Read Ahead

    Generally speaking, whenever data is read off disk, it’s put in the page cache (there are exceptions). Accessing data in the page cache is significantly faster than accessing data off disk, since it’s reading from memory. Read ahead simply reads extra data. The logic is that adding extra data to a read is cheap, since the request is already being made, so we might as well get as much data in the page cache as we can.

    There’s a big problem with this.

    First, if read ahead is requesting information that’s not used, it’s a waste of resources to pull extra data off disk. Pulling extra data off disk means the disk is doing more work. If we had a small amount of data, that might be fine. The more data we have, the higher the ratio of disk space to memory, and the less likely it becomes that we’ll actually have the data in memory. In practice, we’ll likely only use some of the data, some of the time.

    The second problem has to do with page cache churn. If you have 30GB of page cache available and you’re accessing 3TB of data, you end up pulling a lot of data into your page cache. The old data needs to be evicted. This process is an incredible waste of resources. You can tell if this is happening on your system if you see a kswapd process taking up a lot of CPU, even if you have swap disabled.

    We recommend setting this to 0 or 8 sectors (4KB) on local SSD, and 16 sectors (8KB) if you’re using EBS in AWS and you’re performing reads off large partitions.

    blockdev --setra 8 /dev/<device>

    You’ll want to experiment with the read ahead setting to find out what works best for your environment.

There’s quite a bit to digest, so I’ll stop here. This isn’t a comprehensive list of everything that can be adjusted by any means, but it’s a good start if you’re looking to do some basic tuning. We’ll be going deeper into some of the topics in later posts!

Handling a Cassandra transactional workload

Overview of Cassandra

As previously mentioned in my notes on lightweight transactions, Cassandra does not support ACID transactions. Cassandra was built to support a brisk ingest of writes while being distributed for availability. Follow the link to my previous post above to learn more specifics about LWTs and the Paxos algorithm.

Here I’ll cover some other ways to handle a transactional workload in Cassandra.

Batch mode

The standard write path for Cassandra is from the client to the commit log and memtable, with memtables later flushed to SSTables. The write is stored in the memtable and commit log of the replica nodes (as configured using the replication factor) before it is considered complete.

The batch write path includes, in addition, a batch log which is used to group updates that are then considered complete (or not) together. This is an expensive operation unless batch writes affect a single partition key.
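A sketch (the schema is invented): when every statement in a logged batch shares the same partition key, the batch is applied as a single atomic mutation without the cross-partition batch log penalty:

```sql
BEGIN BATCH
  INSERT INTO app.user_events (user_id, ts, event)
    VALUES (42, '2020-01-01 00:00:00', 'login');
  INSERT INTO app.user_events (user_id, ts, event)
    VALUES (42, '2020-01-01 00:05:00', 'purchase');
APPLY BATCH;
```

Both rows live in partition user_id = 42; add a statement against a second partition and the coordinator must also persist a batch log entry on other nodes before applying the writes.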

As with lightweight transactions, time coordination among nodes remains important with batched writes.


To avoid expensive lightweight transactions or batched writes, software can be installed beside Cassandra to manage writes that need to be done together. Applications coordinate with the software to introduce locks to the write path to ensure atomicity and isolation of updates; the software used manages these locks.

Two software tools that can be used for this type of workaround are Apache Zookeeper and Hashicorp Consul. Both of these tools are typically used to manage distributed configuration but can be leveraged to assist with Cassandra transactions. Whereas Zookeeper was originally created as an in-memory data store, Consul was built to be a configuration manager.


Because Zookeeper is essentially a data store, several libraries were created for the locking functionality. Two of these are Google’s Cages and Netflix’s Curator (now maintained as an Apache project). Note that the Cages library has not been updated in several years. There is no reason application developers could not write similar functionality within their main application to interact with Zookeeper, perhaps using these as references.


Cages is a Java library used to synchronize the movement of data among distributed machines, making Cassandra transactional workloads an ideal use case.

Cages includes several classes for reading and writing data. A pertinent one for transactional workloads is ZkWriteLock, used to wrap statements inside a lock stored in Zookeeper. Note that this lock stored in Zookeeper has nothing to do with the underlying Cassandra functionality, and must be adhered to by all parts of the application. Indeed, the application or another user could bypass the lock and interact directly with Cassandra.
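To make the “everyone must honour the lock” point concrete, here is a single-process stand-in (everything is invented for illustration: a java.util.concurrent lock plays the role of a lock held in Zookeeper or Consul, and a map plays the role of a Cassandra table):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

public class LockedWrites {
    // Stand-in for an external lock service; in production this would be,
    // e.g., a Curator InterProcessMutex on a per-business-key path
    static final ReentrantLock lock = new ReentrantLock();
    static final Map<String, Integer> accounts = new HashMap<>();

    // Two writes that must be seen together. The store itself enforces
    // nothing: atomicity holds only if every writer takes the lock.
    static void transfer(String from, String to, int amount) {
        lock.lock();                                      // acquire in the lock service
        try {
            accounts.merge(from, -amount, Integer::sum);  // first write
            accounts.merge(to, amount, Integer::sum);     // second write
        } finally {
            lock.unlock();                                // always release, even on failure
        }
    }

    public static void main(String[] args) throws InterruptedException {
        accounts.put("a", 100);
        accounts.put("b", 0);
        Thread t1 = new Thread(() -> transfer("a", "b", 30));
        Thread t2 = new Thread(() -> transfer("a", "b", 20));
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(accounts.get("a") + " " + accounts.get("b")); // prints: 50 50
    }
}
```

Any code path (or operator) that writes directly, without taking the lock, silently breaks the guarantee, which is exactly the caveat with ZkWriteLock above.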


Curator was created specifically to manage Zookeeper, resulting in a tighter integration. Curator works similarly to Cages, though, wrapping statements in a mutex and requiring the application to observe the locks to ensure data consistency.


Consul is also a distributed storage system used to manage configuration and similar data. It is more recently developed and remains actively maintained.

The distribution of Consul storage is highly flexible and performant, making it a great alternative to Zookeeper. The basic interaction from the application remains the same: the application would store a lock as a key-value in Consul, and all writes from the application would need to respect the lock.


Introducing extra steps in the write path is not free with regard to performance.

In addition to the lag inherent to locking, Zookeeper can become a bottleneck. This can be avoided by scaling the Zookeeper clusters. A feature called Observer helps to reduce time spent getting a quorum from the Zookeeper cluster.
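For reference (the hostnames are invented), an Observer is declared in zoo.cfg with the :observer suffix; observers receive updates but do not vote, so they add read capacity without enlarging the quorum:

```
# zoo.cfg, on every member of the ensemble
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
server.4=zk4:2888:3888:observer

# and additionally, on the observer node itself:
peerType=observer
```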

Regardless, there is an upper limit, of roughly 5-10K operations per second, that you can perform against Zookeeper, so take this into consideration when planning an architecture.


If the transactional workload is infrequent and minimal, lightweight transactions should suffice. However, if transactions are a core function of the application, we recommend using Zookeeper or Consul to manage write locks. Zookeeper has a longer history, but Consul is more up-to-date and provides great flexibility and performance, giving us a preference for Consul.

DZone: How to Run Apache Cassandra on Kubernetes

With Kubernetes’ popularity skyrocketing and the adoption of Apache Cassandra growing as a NoSQL database well-suited to matching the high availability and scalability needs of cloud-based applications, it should be no surprise that more developers are looking to run Cassandra databases on Kubernetes. However, many developers are finding that doing so is relatively simple to get started with, but considerably more challenging to execute well.

Read on to explore how to run Apache Cassandra on Kubernetes.

The post DZone: How to Run Apache Cassandra on Kubernetes appeared first on Instaclustr.

Anomalia Machina 6 – Application Tracing with OpenTracing: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra

In the previous blog (Anomalia Machina 5 – Application Monitoring with Prometheus) we explored how to better understand an Open Source system using Prometheus for distributed metrics monitoring. In this blog we have a look at another way of increasing visibility into a system using OpenTracing for distributed tracing.

1 A history of Tracing

Over a hundred years ago one of the first automated duplicating machines was invented, the Mimeoscope. It was a light table which allowed operators to make quick copies of pictures by tracing them.

History of tracing - Anomalia Machina 6

Move forward 80 years (to 1981) for another example of the power of tracing. Sometimes even a simple trace is enough to convey meaning. This outline city skyline is immediately recognisable (and, for extra points, can you name the movie?)

City skyline outline - Anomalia Machina 6

You’ve been given a mission to fly into an “abandoned” city that’s been turned into a maximum security penal colony (with no guards inside, just the worst criminals, all with a one way ticket), in a glider, at night. Is a “tracing” of the city skyline going to be sufficient for you to navigate to a safe landing, rescue the president, and escape (all within 24 hours, when you’ll explode)? Evidently, if you believe the cult movie “Escape From New York” (1981), it is.

The wire-frame “tracing” computer graphics on the display screens in the glider were not actually computer-generated. Computer animation was so expensive in the early 1980s that they “simulated” it! To make the film’s wire-frame “computer” animation of Manhattan, they built a model of the (futuristic, it was supposed to be 1997) city, lined the buildings’ edges with glow-in-the-dark tape, filmed it under UV light, and moved a camera through the model city. So, basically a fake computer animation, of a model, of a future version of an actual city!

Nose View Manhattan - Anomalia Machina 6

Fast forward again to the present day. We want to increase observability into our distributed Anomalia Machina system. Here’s the pipeline we’ll be using, showing the main components, including the Kafka load generator, Kafka, the Kafka consumer, and the detector pipeline which writes and reads data to/from Cassandra and runs the algorithm:

Functional Architecture Prototype - Anomalia Machina 6

In the previous blog we used Prometheus to collect metrics including latency and throughput from each stage of the pipeline. Metrics are useful, but don’t tell the complete story. In particular, they are typically aggregations of metrics, and don’t have context (e.g. dependency and order information) or the ability to drill down to single events. This is where distributed tracing comes into play. The paper “Principled workflow-centric tracing of distributed systems” identifies some use cases for distributed tracing including:

  • Anomaly detection (identifying anomalous workflows)
  • Diagnosing steady-state problems
  • Service Level Objectives (SLOs)
  • Distributed profiling
  • Resource attribution
  • Workload modeling.

This list isn’t complete: it misses performance modelling (what you get when you combine profiling with resource attribution and workload modelling) and architecture discovery and modelling (APM vendors have different names for this, e.g. service dependency maps, application topology discovery, etc.).

Topology maps (which are also used for systems other than software, such as the famous London Underground map) are particularly informative as they abstract away unnecessary details of a system to reveal only the important information (e.g. that there are animals hiding on the Underground!)

Topology Map, hidden animals - Anomalia Machina 6

In this blog we’re going to explore how to go about adding distributed tracing to Anomalia Machina using OpenTracing. We may not find a hiding Hound, but we will hopefully learn something interesting about distributed tracing.

2 What is OpenTracing?

“OpenTracing provides an open, vendor-neutral standard API for describing distributed transactions, specifically causality, semantics and timing.”

The most important thing to understand about OpenTracing is that it’s a specification, not an implementation. To make it work you need (1) application instrumentation and (2) an OpenTracing-compatible tracer. Here’s a list of the supported tracers. Many are open source, and a few are commercial APM tools (including DataDog, and possibly Dynatrace in the future).

The specification is a language-independent and vendor-neutral specification of the semantics required to enable distributed tracing. It consists of a Data Model, an API, and semantic conventions. You use the API to instrument your code, but then you need to pick a tracer implementation to actually generate the traces, collect them, and visualise them. This diagram shows the high-level relationship between the traced applications (on the left), the API (in the middle), and the tracer implementations (on the right).

OpenTracing microservices process - Anomalia Machina 6

The core concept in OpenTracing is (oddly enough) Spans (rather than Traces):

“Spans are logical units of work in a distributed system, and they all have a name, a start time, and a duration. Spans are associated with the distributed system component that generated them.” (i.e. they are associated with the tracer instance that produced them).

A simple way of illustrating spans is with Bridges. The simplest bridge (e.g. the Sydney Harbour Bridge, the “Coathanger”) has a single span.

Illustrating spans with bridges - Anomalia Machina 6

Another important concept is relationships:

“Relationships are the connections between Spans. A Span may reference zero or more other Spans that are causally related. These connections between Spans help describe the semantics of the running system, as well as the critical path for latency-sensitive (distributed) transactions.”

OpenTracing defines two types of references: ChildOf and FollowsFrom. ChildOf is for synchronous dependencies where a result is typically returned from the child to the parent. FollowsFrom is for asynchronous relationships where the parent does not wait for the child.

So what about Traces? After some investigation it appears that Traces are actually just 1 or more spans that are related:

“Traces in OpenTracing are defined implicitly by their Spans. In particular, a Trace can be thought of as a directed acyclic graph (DAG) of Spans, where the edges between Spans are References.”

Logically a span with no parent is the first (or only) span (source node) in a trace, and spans with no other spans referencing them are sink nodes.
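As a toy illustration (plain Java, not the OpenTracing API; all names invented): if each span records at most one parent reference, the trace’s source and sink nodes fall out of the DAG directly:

```java
import java.util.*;

// Toy model: a trace is implicitly the DAG formed by spans and their
// references; the root is the span that references no other span.
public class SpanDag {
    record Span(String name, String parent) {}   // parent == null for a root span

    // Source nodes: spans with no parent reference
    static List<String> roots(List<Span> spans) {
        List<String> out = new ArrayList<>();
        for (Span s : spans) if (s.parent() == null) out.add(s.name());
        return out;
    }

    // Sink nodes: spans that no other span references as its parent
    static List<String> sinks(List<Span> spans) {
        Set<String> referenced = new HashSet<>();
        for (Span s : spans) if (s.parent() != null) referenced.add(s.parent());
        List<String> out = new ArrayList<>();
        for (Span s : spans) if (!referenced.contains(s.name())) out.add(s.name());
        return out;
    }

    public static void main(String[] args) {
        List<Span> trace = List.of(
            new Span("producer", null),              // source node
            new Span("consumer", "producer"),        // FOLLOWS_FROM producer
            new Span("detector", "consumer"),
            new Span("cassandraWrite", "detector"),  // CHILD_OF detector
            new Span("cassandraRead", "detector"),
            new Span("anomalyDetector", "detector"));
        System.out.println(roots(trace));   // [producer]
        System.out.println(sinks(trace));   // [cassandraWrite, cassandraRead, anomalyDetector]
    }
}
```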

Some bridges have multiple spans, so a journey from one side to the other can be viewed as a multi-span trace.

Millau Viaduct, multiple spans - Anomalia Machina 6

The Millau Viaduct in France has 8 spans. Each journey across the viaduct is a trace.

3 Tracing Anomalia Machina

On the application side you have to instrument your code to produce spans. See the guides for the language-specific clients available and how to use them. Alternatively, you can use a contributed library which adds OpenTracing instrumentation to target frameworks. For cross-process tracing you need to use a 3rd-party library to inject/extract context across process boundaries. At last count there are 87 contributions, including libraries for Spring, Elasticsearch, JDBC, Cassandra, and Kafka!

This documentation explains how to use the API to record traces for Java. The simplest way of creating new spans is to use the default scope manager. If there is a Scope, by default it will act as the parent to any newly started Span. However, as the default relationship is CHILD_OF, and you also can’t pass a Scope across threads, I decided to use spans explicitly and create new spans manually with an explicit relationship to the parent span. Here’s some example code that mimics the Anomalia Machina pipeline and produces spans.

import io.jaegertracing.Configuration;
import io.jaegertracing.Configuration.ReporterConfiguration;
import io.jaegertracing.Configuration.SamplerConfiguration;
import io.jaegertracing.internal.JaegerTracer;
import io.opentracing.References;
import io.opentracing.Span;
import io.opentracing.Tracer;

public class BlogTest {
    // Single tracer demo
    static Tracer tracer = initTracer("BlogTest");

    public static void main(String[] args) {
        int numTraces = 10;
        for (int i = 0; i < numTraces; i++) {
            // simulate a trace consisting of producer, consumer, and detector asynchronous spans
            Span span1 = tracer.buildSpan("producer").start();
            producer();
            span1.finish();
            Span span2 = tracer.buildSpan("consumer")
                    .addReference(References.FOLLOWS_FROM, span1.context()).start();
            consumer();
            detector(span2);
            span2.finish();
        }
    }

    public static void producer() {
        try { Thread.sleep(1 + (long) (Math.random() * 10)); } catch (InterruptedException e) { }
    }

    public static void consumer() {
        try { Thread.sleep(1 + (long) (Math.random() * 10)); } catch (InterruptedException e) { }
    }

    public static void detector(Span parent) {
        // the detector span has the consumer as parent, and is asynchronous
        Span spanDetector = tracer.buildSpan("detector")
                .addReference(References.FOLLOWS_FROM, parent.context()).start();
        // the detector span has 3 synchronous, i.e. CHILD_OF, child spans
        Span spanWrite = tracer.buildSpan("cassandraWrite")
                .addReference(References.CHILD_OF, spanDetector.context()).start();
        cassandraWrite();
        spanWrite.finish();
        Span spanRead = tracer.buildSpan("cassandraRead")
                .addReference(References.CHILD_OF, spanDetector.context()).start();
        cassandraRead();
        spanRead.finish();
        Span spanAnomalyDetector = tracer.buildSpan("anomalyDetector")
                .addReference(References.CHILD_OF, spanDetector.context()).start();
        anomalyDetector();
        spanAnomalyDetector.finish();
        spanDetector.finish();
    }

    public static void cassandraWrite() {
        try { Thread.sleep(1 + (long) (Math.random() * 10)); } catch (InterruptedException e) { }
    }

    public static void cassandraRead() {
        try { Thread.sleep(1 + (long) (Math.random() * 100)); } catch (InterruptedException e) { }
    }

    public static void anomalyDetector() {
        try { Thread.sleep(5 + (long) (Math.random() * 20)); } catch (InterruptedException e) { }
    }

    public static JaegerTracer initTracer(String service) {
        SamplerConfiguration samplerConfig = SamplerConfiguration.fromEnv().withType("const").withParam(1);
        ReporterConfiguration reporterConfig = ReporterConfiguration.fromEnv().withLogSpans(true);
        Configuration config = new Configuration(service).withSampler(samplerConfig).withReporter(reporterConfig);
        return config.getTracer();
    }
}
Note that if you actually want to use the spans outside the process, you have to create a tracer using an actual tracer implementation (although there is a MockTracer API for testing). I picked Jaeger (but also tried Zipkin, they are similar in practice). Documentation on instantiating a Jaeger Tracer is here and here.

Tracers can have different architectures. Here’s the architecture of Jaeger. Jaeger is a distributed tracing system with a number of components.

Architecture of Jaeger - Anomalia Machina 6

Jaeger should scale well in a production environment due to using Cassandra for a scalable data/query store, and Spark for dependency analysis. However, for testing there’s a simple all-in-one executable with in memory only storage that I used (just remember that it doesn’t remember traces when you restart it). It’s interesting to note that there is a bi-directional flow between the jaeger-agent and the jaeger-client which provides an important feedback loop (e.g. for adaptive sampling), but which also makes it harder to use Kafka for span transport.

Once you’ve got everything working you can start Jaeger, run the instrumented code, and then see what’s happened in the Jaeger GUI (http://localhost:16686/search). You can select traces by Service name, operation, etc and filter by other parameters and then Find. You can select a trace from the results or on the timeline graph for more detail.

Trace selection - Anomalia Machina 6

Viewing a single trace gives insights into the total trace time, and the relationships and times of the spans. For this example the trace has 3 asynchronous spans: producer, consumer, and detector, and the detector span has 3 synchronous children: CassandraWrite, CassandraRead and AnomalyDetector.

Total trace time, and the relationships and times of the spans - Anomalia Machina 6

Jaeger also has tabs to Compare 2 traces, or view Dependencies. In theory this was supposed to build a dependency map from all the traces in memory (i.e. it doesn’t depend on the search results), but it was initially just blank. I wondered if there was some configuration change required, or if I needed to run Spark to process the traces (Spark is used for production deployments). It turns out that (a) Jaeger only shows dependencies between spans produced by different services (i.e. tracers), and (b) it only shows dependencies between synchronous (CHILD_OF) nodes. This took some time to debug as you need both tricks in place for it to work. After enhancing the above code example with multiple tracers (one tracer per span), and using CHILD_OF instead of FOLLOWS_FROM, we get multi-coloured traces:

Multiple tracers - Anomalia Machina 6

And the dependencies view is now populated:

Dependencies view - Anomalia Machina 6

This is what we were hoping for, as it correctly shows the expected topology of the workflow, along with the number of spans observed. Unfortunately there’s not much added information on this graph. Ideally there would be throughputs and average response times. Also, the graph is computed from all the traces in memory, so you can’t select (e.g. a time window) or filter (e.g. a transaction type) a subset of traces. For more complex graphs it’s also useful to be able to aggregate or expand sub-graphs.

4 Tracing across process boundaries (Kafka producer to consumers)

So far all we have demonstrated with OpenTracing is that we can produce a trace within a process, but there are lots of other tools that can do that. We can also view the traces remotely from the application that produced them. For large scale distributed systems, what we really need is a way of tracing applications across boundaries: process, machine, and cloud. How can this be done? You need a way of finding the causal relationship between spans. One approach is to use Big Data techniques to analyse the separate spans and infer the relationships between them. For example, Facebook’s Mystery Machine does this by forming theories about relationships and then trying to disprove them from the empirical data. Another approach is to inject metadata into the cross-process call flows in order to provide sufficient context to build traces across heterogeneous systems. At Facebook, Canopy is the successor to the Mystery Machine and takes this approach by propagating TraceIDs across process boundaries. This is also the approach taken by OpenTracing. Implementations need to provide a way to inject and extract a spanContext across process boundaries. How they do this will depend on the protocol involved.

Mystery Machine - Anomalia Machina 6

Fortunately, someone else has already developed a solution for OpenTracing Kafka clients. On the Kafka producer side, it automatically inserts a span context into the Kafka headers. You just have to add an interceptor to the Kafka producer properties like this:

// Register tracer with GlobalTracer:
GlobalTracer.register(tracer);

// Add TracingProducerInterceptor to sender properties:
senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingProducerInterceptor.class.getName());

// Instantiate KafkaProducer
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Send (topic, key, and value here are placeholders)
producer.send(new ProducerRecord<>("my-topic", 1, "my-message"));
On the Kafka consumer side, once you have got a single ConsumerRecord (record) to process, you extract the span context and create a new FOLLOWS_FROM span like this (I couldn’t get the helper function, TracingKafkaUtils.extractSpanContext(), to work correctly – it kept returning a null context even though on inspection the Kafka headers appeared to have the correct span context data):

SpanContext spanContext =  tracer.extract(Format.Builtin.TEXT_MAP, new MyHeadersMapExtractAdapter(record.headers(), false));

newSpan = tracer.buildSpan("consumer").addReference(References.FOLLOWS_FROM, spanContext).start();

But note that as FOLLOWS_FROM won’t produce a dependency map, you need to use CHILD_OF instead.

5 More interesting Kafka Example

The Anomalia Machina pipeline is relatively simple, so I wondered how well OpenTracing would work for discovering and visualising more complex Kafka topologies. For example, would it be possible to visualise the topology of data flow across many Kafka topics? I wrote a simple Markov chain simulator which allows you to choose the number of source topics, intermediate topics, and sink topics, and a graph density, and then produces random traces. The code is in this gist.

Here’s the dependency graph for a run of this code. In practice you would also want to add information about the Kafka producers and consumers (either as extra nodes, or by labelling the edges). There’s also a cool force-directed graph view which allows you to select a node and highlight the dependent nodes.

Dependency graph - Anomalia Machina 6

6 Further Resources

OpenTracing isn’t the only open tracing player. OpenCensus is an alternative. How do they compare?

The approach of having to instrument your code manually is time consuming, and fragile if the APIs change. Another approach is agent-based (the route most of the mainstream APM products take; Dynatrace, for example, uses bytecode injection). It looks like OpenCensus also has a Java agent.

You can’t swap tracers for different parts of the system, as the protocol is tracer implementation specific (c.f. OpenCensus).

There’s also a W3C trace context effort.

Alibaba cloud supports OpenTracing.

Some other useful OpenTracing resources are here, here and here.

This is a good comparison of Zipkin and Jaeger.

You may wonder if you really need both Prometheus and OpenTracing to monitor an application. In theory the OpenTracing spans actually have all of the information needed for monitoring application metrics (e.g. throughput and latency). So why not just collect spans once and use them for multiple purposes, including monitoring? Well, this is exactly what most commercial APMs do. And in theory (and possibly in practice; I tried early on but didn’t quite get it working) it’s possible to get OpenTracing data into Prometheus.

Finally, here’s the actual model used to produce the wire-frame images for “Escape From New York”, and the resulting movie sequence.

The post Anomalia Machina 6 – Application Tracing with OpenTracing: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra appeared first on Instaclustr.

Wide Partitions in Apache Cassandra 3.11

This article was originally published on Backblaze.


Wide Partitions in Cassandra can put tremendous pressure on the java heap and garbage collector, impact read latencies, and can cause issues ranging from load shedding and dropped messages to crashed and downed nodes.

While the theoretical limit on the number of cells per Partition has always been two billion cells, the reality has been quite different, as the impacts of heap pressure show. To mitigate these problems, the community has offered a standard recommendation for Cassandra users to keep Partitions under 400MB, and preferably under 100MB.

However, in version 3 many improvements were made that affected how Cassandra handles wide Partitions. Memtables, caches, and SSTable components were moved off-heap, the storage engine was rewritten in CASSANDRA-8099 and Robert Stupp made a number of other improvements listed under CASSANDRA-11206.

Working with Backblaze and operating a Cassandra version 3.11 cluster we had the opportunity to test and validate how Cassandra actually handles Partitions with this latest version. We will demonstrate that well designed data models can go beyond the existing 400MB recommendation without nodes crashing through heap pressure.

Below, we walk through how Cassandra writes Partitions to disk in 3.11, look at how wide Partitions impact read latencies, and then present our testing and verification of wide Partition impacts on the cluster, using the work we did with Backblaze.

The Art and Science of Writing Wide Partitions to Disk

First we need to understand what a Partition is and how Cassandra writes Partitions to disk in version 3.11.

Each SSTable comprises a set of files, and the -Data.db file contains numerous Partitions.

The layout of a Partition in the -Data.db file has three components: a header, followed by zero or one static Rows, followed by zero or more ordered Clusterable objects. Each Clusterable object may be either a Row or a RangeTombstone that deletes data, and a wide Partition contains many Clusterable objects. For an excellent in-depth examination of this, see Aaron’s blog post on the Cassandra 3.x Storage Engine.

The -Index.db file stores offsets for the Partitions, as well as the serialized IndexInfo objects for each Partition. These indices facilitate locating the data on disk within the -Data.db file. Stored Partition offsets are represented by a subclass of RowIndexEntry. This subclass is chosen by the ColumnIndex and depends on the size of the Partition:

  • RowIndexEntry is used when there are no Clusterable objects in the Partition, such as when there is only a static Row. In this case there are no IndexInfo objects to store and so the parent RowIndexEntry class is used rather than a subclass.

  • The IndexedEntry subclass holds the IndexInfo objects in memory until the Partition has finished writing to disk. It is used for Partitions where the total serialized size of the IndexInfo objects is less than the column_index_cache_size_in_kb configuration setting (which defaults to 2KB).

  • The ShallowIndexedEntry subclass serializes IndexInfo objects to disk as they are created and references these objects using only their position in the file. It is used for Partitions where the total serialized size of the IndexInfo objects is more than the column_index_cache_size_in_kb configuration setting.

These IndexInfo objects provide a sampling of positional offsets for Rows within a Partition, creating an index. Each object specifies the offset the page starts at, the first Row and the last Row.

So, in general, the bigger the Partition, the more IndexInfo objects need to be created when writing to disk - and if they are held in memory until the Partition is fully written to disk they can cause memory pressure. This is why the column_index_cache_size_in_kb setting was added in Cassandra 3.6 and the objects are now serialized as they are created.
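To make the selection rules concrete, here is a small illustrative sketch (our own simplification, not actual Cassandra source; the class and method names are invented) of how the RowIndexEntry subclass is chosen:

```java
public class IndexEntryChooser {
    // Default value of the column_index_cache_size_in_kb setting.
    static final int COLUMN_INDEX_CACHE_SIZE_KB = 2;

    enum Kind { ROW_INDEX_ENTRY, INDEXED_ENTRY, SHALLOW_INDEXED_ENTRY }

    // Choose the RowIndexEntry subclass from the number of IndexInfo objects
    // in the Partition and their total serialized size.
    static Kind choose(int indexInfoCount, long serializedBytes) {
        if (indexInfoCount == 0)
            return Kind.ROW_INDEX_ENTRY;          // e.g. only a static Row
        if (serializedBytes < COLUMN_INDEX_CACHE_SIZE_KB * 1024L)
            return Kind.INDEXED_ENTRY;            // IndexInfo held in memory
        return Kind.SHALLOW_INDEXED_ENTRY;        // IndexInfo serialized as created
    }

    public static void main(String[] args) {
        System.out.println(choose(0, 0));               // ROW_INDEX_ENTRY
        System.out.println(choose(10, 1024));           // INDEXED_ENTRY
        System.out.println(choose(100_000, 1_400_000)); // SHALLOW_INDEXED_ENTRY
    }
}
```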

The relationship between Partition size and the number of objects was quantified by Robert Stupp in his presentation, Myths of Big Partitions:

IndexInfo numbers from Robert Stupp

How Wide Partitions Impact Read Latencies

Cassandra’s key cache is an optimization that is enabled by default and helps to improve the speed and efficiency of the read path by reducing the amount of disk activity per read.

Each key cache entry is identified by a combination of the keyspace, table name, SSTable, and the Partition key. The value of the key cache is a RowIndexEntry or one of its subclasses - either IndexedEntry or the new ShallowIndexedEntry. The size of the key cache is limited by the key_cache_size_in_mb configuration setting.

When a read operation in the storage engine gets a cache hit it avoids having to access the –Summary.db and –Index.db SSTable components, which reduces that read request’s latency. Wide Partitions, however, can decrease the efficiency of this key cache optimization because fewer hot Partitions will fit into the allocated cache size.

Indeed, before the ShallowIndexedEntry was added in Cassandra version 3.6, a single wide Row could fill the key cache, reducing the hit rate efficiency. When applied to multiple Rows, this will cause greater churn of additions and evictions of cache entries.

For example, if the IndexEntry for a 512MB Partition contains 100K+ IndexInfo objects and if these IndexInfo objects total 1.4MB, then the key cache would only be able to hold 140 entries.

The introduction of ShallowIndexedEntry objects changed how the key cache can hold data. The ShallowIndexedEntry contains a list of file pointers referencing the serialized IndexInfo objects and can binary search through this list, rather than having to deserialize the entire IndexInfo objects list. Thus when the ShallowIndexedEntry is used no IndexInfo objects exist within the key cache. This increases the storage efficiency of the key cache in storing more entries, but does still require that the IndexInfo objects are binary searched and deserialized from the –Index.db file on a cache hit.

In short, on wide Partitions a key cache miss still results in two additional disk reads, as it did before Cassandra 3.6, but now a key cache hit incurs a disk read to the -Index.db file where it did not before Cassandra 3.6.
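That accounting can be condensed into a toy sketch (again our own illustration, not Cassandra source):

```java
public class ReadPathCost {
    // Extra SSTable component reads per read request, per the description above:
    // a key cache miss reads -Summary.db and -Index.db (both before and after 3.6);
    // a key cache hit costs nothing extra before 3.6, but with the
    // ShallowIndexedEntry (3.6+) it reads -Index.db to deserialize IndexInfo.
    static int extraDiskReads(boolean cacheHit, boolean shallowIndexedEntry) {
        if (!cacheHit)
            return 2;                            // -Summary.db + -Index.db
        return shallowIndexedEntry ? 1 : 0;      // -Index.db only for ShallowIndexedEntry
    }

    public static void main(String[] args) {
        System.out.println(extraDiskReads(false, true));  // 2 (miss)
        System.out.println(extraDiskReads(true, false));  // 0 (hit, pre-3.6)
        System.out.println(extraDiskReads(true, true));   // 1 (hit, 3.6+)
    }
}
```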

Object Creation and Heap Behavior with Wide Partitions in 2.2.13 vs 3.11.3

Introducing the ShallowIndexedEntry in Cassandra version 3.6 created a measurable improvement in the performance of wide Partitions. To test the effects of this and the other performance enhancements introduced in version 3, we compared how Cassandra 2.2.13 and 3.11.3 performed when one hundred thousand, one million, or ten million Rows were written to a single Partition.

The results and accompanying screenshots help illustrate the impact of object creation and heap behavior when inserting Rows into wide Partitions. While version 2.2.13 crashed repeatedly during this test, 3.11.3 was able to write over 30 million Rows to a single Partition before Cassandra Out-of-Memory crashed. The test and results are reproduced below.

Both Cassandra versions were started as single-node clusters with default configurations, excepting heap customization in the cassandra–


In Cassandra, only the configured concurrency of memtable flushes and compactors determines how many Partitions are processed by a node, and thus how much pressure is put on its heap, at any one time. Based on this known concurrency limitation, profiling can be done by inserting data into one Partition against one Cassandra node with a small heap. These results extrapolate to production environments.

The tlp-stress tool inserted data in three separate profiling passes against both versions of Cassandra, creating wide Partitions of one hundred thousand (100K), one million (1M), or ten million (10M) Rows.

A tlp-stress profile for wide Partitions was written, as no suitable profile existed. The read to write ratio used the default setting of 1:100.

The following command lines then implemented the tlp-stress tool:

# To write 100000 rows into one partition
tlp-stress run Wide --replication "{'class':'SimpleStrategy','replication_factor': 1}" -n 100K

# To write 1M rows into one partition
tlp-stress run Wide --replication "{'class':'SimpleStrategy','replication_factor': 1}" -n 1M

# To write 10M rows into one partition
tlp-stress run Wide --replication "{'class':'SimpleStrategy','replication_factor': 1}" -n 10M

Each time tlp-stress executed it was immediately followed by a command to ensure the full count of specified Rows passed through the memtable flush and were written to disk:

    nodetool flush

The graphs in the sections below, taken from the Apache NetBeans Profiler, illustrate how the ShallowIndexedEntry in Cassandra version 3.11 avoids keeping IndexInfo objects in memory.

Notably, the IndexInfo objects are instantiated far more often, but are referenced for much shorter periods of time. The Garbage Collector is more effective at removing short-lived objects, as illustrated by the GC pause times being barely present in the Cassandra 3.11 graphs compared to Cassandra 2.2 where GC pause times overwhelm the JVM.

Wide Partitions in Cassandra 2.2

Benchmarks were against Cassandra 2.2.13

One Partition with 100K Rows (2.2.13)

The following three screenshots show the number of IndexInfo objects instantiated during the write benchmark, during compaction, and a heap profile.

The partition grew to be ~40MB.

Objects created during tlp-stress

Objects created during subsequent major compaction

Heap profiled during tlp-stress and major compaction

The above diagrams do not have their x-axis expanded to the full width, but still encompass the startup, stress test, flush, and compaction periods of the benchmark.

When stress testing starts with tlp-stress the CPU Time and Surviving Generations starts to climb. During this time the heap also starts to increase and decrease more frequently as it fills up and then the Garbage Collector cleans it out. In these diagrams the garbage collection intervals are easy to identify and isolate from one another.

One Partition with 1M Rows (2.2.13)

Here, the first two screenshots show the number of IndexInfo objects instantiated during the write benchmark and during the subsequent compaction process. The third screenshot shows the CPU & GC Pause Times and the heap profile from the time writes started through when the compaction was completed.

The partition grew to be ~400MB.

Already at this size the Cassandra JVM is GC thrashing and has occasionally Out-of-Memory crashed.

Objects created during tlp-stress

Objects created during subsequent major compaction

Heap profiled during tlp-stress and major compaction

The above diagrams display a longer running benchmark, with the quiet period during the startup barely noticeable on the very left-hand side of each diagram. The number of garbage collection intervals and the oscillations in heap size are far more frequent. The GC Pause Time during the stress testing period is now consistently higher and comparable to the CPU Time. It only dissipates when the benchmark performs the flush and compaction.

One Partition with 10M Rows (2.2.13)

In this final test of Cassandra version 2.2.13, the results were difficult to reproduce reliably, as more often than not this test Out-of-Memory crashed from GC heap pressure.

The first two screenshots show the number of IndexInfo objects instantiated during the write benchmark and during the subsequent compaction process. The third screenshot shows the GC Pause Time and the heap profile from the time writes started until compaction was completed.

The partition grew to be ~4GB.

Objects created during tlp-stress

Objects created during subsequent major compaction

Heap profiled during tlp-stress and major compaction

The above diagrams display consistently very high GC Pause Time compared to CPU Time. Any Cassandra node under this much duress from garbage collection is not healthy. It is suffering from high read latencies, could become blacklisted by other nodes due to its lack of responsiveness, and even crash altogether from Out-of-Memory errors (as it did often during this benchmark).

Wide Partitions in Cassandra 3.11.3

Benchmarks were against Cassandra 3.11.3

In this series, the graphs demonstrate how IndexInfo objects are created either from memtable flushes or from deserialization off disk. The ShallowIndexedEntry is used in Cassandra 3.11.3 when deserializing the IndexInfo objects from the -Index.db file.

Neither form of IndexInfo object resides long in the heap, and thus the GC Pause Time is barely visible in comparison to Cassandra 2.2.13, despite the additional IndexInfo objects created via deserialization.

One Partition with 100K Rows (3.11.3)

As with the earlier version test of this size, the first two screenshots show the number of IndexInfo objects instantiated during the write benchmark and during the subsequent compaction process. The third screenshot shows the CPU & GC Pause Time and the heap profile from the time writes started through when the compaction was completed.

The partition grew to be ~40MB, the same as with Cassandra 2.2.13.

Objects created during tlp-stress

Objects created during subsequent major compaction

Heap profiled during tlp-stress and major compaction

The diagrams above are roughly comparable to the first diagrams presented under Cassandra 2.2.13, except here the x-axis is expanded to full width. Note there are significantly more instantiated IndexInfo objects, but barely any noticeable GC Pause Time.

One Partition with 1M Rows (3.11.3)

Again, the first two screenshots show the number of IndexInfo objects instantiated during the write benchmark and during the subsequent compaction process. The third screenshot shows the CPU & GC Pause Time and the heap profile over the time writes started until the compaction was completed.

The partition grew to be ~400MB, the same as with Cassandra 2.2.13.

Objects created during tlp-stress

Objects created during subsequent major compaction

Heap profiled during tlp-stress and major compaction

The above diagrams show a wildly oscillating heap as many IndexInfo objects are created, and many garbage collection intervals, yet the GC Pause Time remains low, if at all noticeable.

One Partition with 10M Rows (3.11.3)

Here again, the first two screenshots show the number of IndexInfo objects instantiated during the write benchmark and during the subsequent compaction process. The third screenshot shows the CPU & GC Pause Time and the heap profile over the time writes started until the compaction was completed.

The partition grew to be ~4GB, the same as with Cassandra 2.2.13.

Objects created during tlp-stress

Objects created during subsequent major compaction

Heap profiled during tlp-stress and major compaction

Unlike this profile in 2.2.13, the cluster remains stable, as it was when running 1M Rows per Partition. The above diagrams display an oscillating heap as IndexInfo objects are created, and many garbage collection intervals, yet GC Pause Time remains low, if at all noticeable.

Maximum Rows in 1GB Heap (3.11.3)

In an attempt to push Cassandra 3.11.3 to the limit, we ran a test to see how much data could be written to a single Partition before Cassandra Out-of-Memory crashed.

The result was 30M+ rows, which is ~12GB of data on disk.

This is similar to the limit of 17GB of data written to a single Partition as Robert Stupp found in CASSANDRA-9754 when using a 5GB java heap.

Maximum Rows in 1GB Heap

What about Reads

The following graph reruns the benchmark on Cassandra version 3.11.3 over a longer period of time with a read to write ratio of 10:1. It illustrates that reads of wide Partitions do not create the heap pressure that writes do.

Reading from wide Partitions


While the 400MB community recommendation for Partition size is clearly appropriate for version 2.2.13, version 3.11.3 shows that performance improvements have created a tremendous ability to handle wide Partitions: they can easily be an order of magnitude larger than in earlier versions of Cassandra without nodes crashing through heap pressure.

The trade-off for better supporting wide Partitions in Cassandra 3.11.3 is increased read latency as Row offsets now need to be read off disk. However, modern SSDs and kernel pagecaches take advantage of larger configurations of physical memory providing enough IO improvements to compensate for the read latency trade-offs.

The improved stability and falling back on better hardware to deal with the read latency issue allows Cassandra operators to worry less about how to store massive amounts of data in different schemas and unexpected data growth patterns on those schemas.

Once CASSANDRA-9754 lands, custom B+ tree structures will be used to more effectively look up the deserialized Row offsets, further avoiding the deserialization and instantiation of short-lived, unused IndexInfo objects.

Backblaze logo

Advanced Node Replace

Instaclustr has a number of internal tools and procedures that help us keep Cassandra clusters healthy. One of those tools allows us to replace the instance backing a Cassandra node while keeping the IPs and data. Dynamic Resizing, released in June 2017, uses technology developed for that tool to allow customers to scale Cassandra clusters vertically based on demand.

Initially, the replace tool operated by detaching the volumes from the instance being replaced, then re-attaching the volumes to the new instance. This limited the usage of the tool to EBS-backed instances. Another often-requested extension to the tool was resizing a Data Centre to a different node size to upgrade to a newly added node size, or to switch over to the resizable class nodes.

One option for changing instance size where we could not just detach and reattach data volumes was to use Cassandra’s native node replace functionality to replace each instance in the cluster in a rolling fashion. At first this approach seems attractive, and it can be conducted with zero downtime. However, quite some time ago we realised that, unless you run a repair between each replacement, this approach almost certainly loses a small amount of data when any replace operation exceeds the hinted hand-off window. As a result, we relied on fairly tedious and complex methods of rolling upgrades involving attaching and re-attaching EBS volumes.

To address this problem, we have recently extended the replace tool to remove these limitations and support the complete rolling replace use case. The new “copy data” replace mode replaces a node in the following stages:

  1. Provision the new node of the desired size
  2. Copy most of the data from the old node to the new node
  3. Stop the old node
  4. Reallocate IPs
  5. Join the replacement node to the cluster

Provisioning is trivial with our powerful provisioning system, but copying large amounts of data from a live node presents some specific challenges: we had to do so without creating too much additional load on a cluster that might already be under stress, and we had to work carefully within constraints created by Cassandra’s hinted handoff system.

We explored a number of solutions to the problem of copying data to the new node while minimising the impact to the running nodes.  After discarding several alternatives, we settled on a solution built on Instaclustr’s existing, proven backup/restore system. This ensures minimal resource strain on the node being replaced as we only need to copy the data added since the last backup was taken and most of the data is already stored in the cloud storage.

Stopping the old node without losing data requires stopping Cassandra and uploading the data that has been written since the previous step. This process usually completes within 10 minutes, ensuring minimal degradation of cluster performance.

After all of the data is on the new node, the old node is terminated, its public and private IPs are transferred to the new node, and Cassandra is started on the new node. As the replacement node joins, it receives the data that was missed during the short downtime as hinted handoffs.

The new solution has allowed us to standardise our approach to node replacement for all instance types (local and external storage) using the proven technology of our Cassandra backup system to improve the overall performance of the process. At the moment, this resize functionality is controlled by our administrators and can be requested by customers via our support channel. We will likely make the functionality available directly to users in the future.

The post Advanced Node Replace appeared first on Instaclustr.

Anomalia Machina 5 – Application Monitoring with Prometheus: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra

1 Introduction

In order to scale Anomalia Machina we plan to run the application (load generator and detector pipeline) on multiple EC2 instances. We are working on using Kubernetes (AWS EKS) to automate this, and progress so far is described in this webinar. However, before we can easily run a Kubernetes deployed application at scale we need to be able to collect and view application specific metrics from distributed instances. In this blog we’ll explore how an Open Source monitoring tool called Prometheus could do this.

Anomalia Machina 5 - Frankenstein - Modern Prometheus

Exactly two hundred years ago, Mary Shelley’s 1818 “Frankenstein” was oddly subtitled “The Modern Prometheus”. Who was the original Prometheus?  Prometheus was one of the Greek Titans who was responsible for creating and giving gifts to creatures. By the time he got around to people the supply of approved gifts (e.g. fur, claws, feathers, wings, etc) had run out, so Prometheus decided to steal the sacred fire from Mount Olympus so humans could keep warm (and eat cooked meat, etc). There is a giant mosaic on the Melbourne fire brigade building showing Prometheus stealing fire from Zeus:

Anomalia Machine 5 - Prometheus stealing fire from Zeus

This didn’t turn out so well for either Prometheus (who was punished by being chained to a rock and attacked by a liver-eating eagle every day), or humans (who were given another “gift”, Pandora – don’t open the box!). Here’s the panel to the right of the above mosaic showing the destructive side of fire resulting from Pandora’s curiosity:

Anomalia Machina 5 - destructive side of fire resulting from Pandora’s curiosity

So Frankenstein and Prometheus are both responsible for creating monsters.  But what does Prometheus have to do with monitoring? Well, Prometheus didn’t really steal fire, as the original was still there. He just copied it.  This is like monitoring, as we don’t want to impact the system being monitored, just gain insight into it at a distance.

2 Prometheus Overview

Prometheus is a popular open source monitoring and alerting system. It was developed originally by SoundCloud, made Open Source and then in 2016 was accepted as the 2nd project in the Cloud Native Computing Foundation (CNCF).

It is intended for monitoring both applications and servers. Here’s what the Prometheus architecture looks like:

Anomalia Machina 5 - Prometheus Architecture Diagram

2.1 Prometheus Components

The main components are the Prometheus server (responsible for service discovery, retrieving metrics from monitored applications, storing metrics, and analysis of time series data with PromQL, a query language), a metrics data model, a built in simple graphing GUI, and native support for Grafana. There’s also an optional alert manager (with alerts defined by the query language), and an optional push gateway (for short lived application monitoring).

Application monitoring tools typically take one of three approaches to capturing metrics: (1) Instrumentation – special custom code has to be added to the source code of the application being monitored, (2) Agents – special general purpose code is added to the application environment, which automatically captures standard application metrics, or (3) Spying – which relies on either interceptors or network taps to observe calls or data flow between systems.

Prometheus allows for a combination of (1) Instrumentation and (2) Agents (which are called Exporters). Instrumentation requires access to the source code, but does allow for the capture of custom metrics, and is programming language agnostic.

There are client libraries for instrumenting applications in many programming languages: with four officially supported client libraries (Go, Java/Scala, Python, Ruby); many unofficial libraries (anyone for LISP?); or you can write your own.

The Exporters allow for automatic instrumentation of supported third-party software, and lots are available for databases, hardware, messaging systems, storage, HTTP, cloud APIs, logging, monitoring and more. The JMX exporter exposes metrics for JVM-based software. Some software exposes Prometheus metrics directly so no exporters are needed.

There is a node exporter for monitoring host hardware and kernel metrics, and the Java client includes collectors for garbage collection, memory pools, JMX, classloading, and thread counts. These can be added individually or use DefaultExports.initialize(); to conveniently register them all.

Anomalia Machina 5 - Way of instrumenting and monitoring brains

2.2 What Prometheus doesn’t do

Prometheus does one thing well – metrics monitoring. Prometheus is therefore not a traditional complete Application Performance Management (APM) solution (c.f. Dynatrace, AppDynamics, etc) as it is focussed on server side metrics collection, graphing and alerting. For example, it doesn’t do distributed call tracing or service topology discovery and visualisation, performance analytics, or End User Experience Monitoring (EUEM, but see this github extension which pushes metrics from browsers to Prometheus).

It also doesn’t do active control, as information flow is strictly one way.  Previous R&D such as the Open Geospatial Consortium (OGC) Sensor Web Enablement, SensorML, and extensions I was involved with for graphing sensor data in a web client over XMPP (CSIRO Sensor and Sensor Networks Research Progress Report July 2006 to June 2007, pp49-50), and the earlier work on Web Services Grid Computing (e.g. the Globus Toolkit and WSRF), were more grandiose in scope and provided a rich semantics and standards for services based management of both sensors and actuators.

Prometheus runs as a single server by default, but it can scale using federations of servers (not to be confused with the musical Prometheus scale invented by Scriabin!).

Anomalia Machina - The Prometheus Scale

The Prometheus Scale
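Federation works by having one Prometheus server scrape selected series from another via its /federate endpoint. A minimal prometheus.yml fragment might look like this (the child server address and the match expression are placeholders):

```yaml
scrape_configs:
  - job_name: 'federate'
    honor_labels: true
    metrics_path: '/federate'
    params:
      'match[]':
        - '{job="prometheusTest"}'   # which series to pull from the child server
    static_configs:
      - targets:
        - 'child-prometheus:9090'    # placeholder child server address
```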

2.3 The Prometheus Data Model

Anomalia Machina - Prometheus Data Model

A time series of 420,000 years of Antarctic ice core data

Prometheus metrics consist of time series data. These are samples of a timestamp (with millisecond precision) and a value (64-bit float). Every metric has a name and a set of key:value pairs (labels). The metric name is a String, and by convention includes the name of the thing being monitored, the logical type, and the units, e.g. http_requests_total, which could have labels for “method” (“GET”, “PUT”) and “handler” (e.g. “/login”, “/search”). Note that Prometheus automatically adds some labels to metrics:

  • job: The configured job name that the target belongs to.
  • instance: The <host>:<port> part of the target’s URL that was scraped.

Note that as units are not explicit, if you need to convert from one unit to another, you will need to do this manually (and carefully!) in the query language. We’ll come to scraping momentarily.
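As an illustration (the metric values are invented), here is what a scraped metric looks like in the Prometheus text exposition format; after the scrape, Prometheus stores each series with the automatic job and instance labels added:

```
# HELP http_requests_total Total HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="GET",handler="/login"} 1027
http_requests_total{method="PUT",handler="/search"} 3
```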

2.4 The Prometheus Metric Types

Prometheus provides four metric types as follows.

A Counter is used for values that only increase. A Gauge can go up and down, and is useful for the current value of something, or for counts that can increase and decrease. A Histogram samples observations (e.g. request durations or response sizes) and counts them in configurable buckets; it also provides a sum of all observed values. A Summary is similar to a Histogram: it samples observations and provides a total count of observations and a sum of all observed values, but it calculates configurable quantiles over a sliding time window.

3 Prometheus Example

For Anomalia Machina, what exactly do we want to monitor?  Here’s the functional architecture diagram from previous blogs:

Anomalia Machina 5 - Functional Architecture Prototype

For the application itself, we want to monitor “generic” metrics such as throughput (TPS) and response times (s) for the Kafka load generator (Kafka producer), the Kafka consumer, and the Cassandra client (the anomaly detector).  We also want to monitor some application specific metrics such as the number of rows returned for each Cassandra read, and the number of anomalies detected. It would also be nice to monitor hardware metrics for each AWS EC2 instance the application is running on (e.g. CPU), and eventually the Kafka and Cassandra metrics as well, so we can have all the monitoring in one place.

For simplicity and initial testing we’ll start with a dummy pipeline with 3 methods (producer, consumer, detector).   We use a Counter with the name “prometheusTest_requests_total” to measure the number of times each pipeline stage is successfully executed, and a label called “stage” to distinguish the different stage counts (with “total” used for the total pipeline count). A second counter with the name “prometheusTest_anomalies_total” is used to keep track of the number of anomalies detected. A Gauge (which can go up and down) with the name “prometheusTest_duration_seconds” is used to record the duration of each stage in seconds (using a label called “stage” to distinguish the stages, and “total” for total pipeline duration). The methods are instrumented to increment the counter metrics after each successful stage execution or anomaly detected (using the inc() method), and time stages and set the value of the gauge metric (using the setToTime() method). Here’s the example code:


import java.io.IOException;

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports;

// Demo of how we plan to use the Prometheus Java client to instrument Anomalia Machina.
// Note that the Anomalia Machina application will have the Kafka producer, the Kafka consumer
// and the rest of the pipeline running in multiple separate processes/instances,
// so metrics from each will have different host/port combinations.
public class PrometheusBlog {

    static String appName = "prometheusTest";

    // Counters can only increase in value (until process restart).
    // Execution count: use a single Counter for all stages of the pipeline,
    // stages are distinguished by labels.
    static final Counter pipelineCounter = Counter.build()
            .name(appName + "_requests_total")
            .help("Count of executions of pipeline stages")
            .labelNames("stage")
            .register();

    // In theory we could also use pipelineCounter to count anomalies found, using another label,
    // but there is less potential for confusion with a separate counter. It doesn't need a label.
    static final Counter anomalyCounter = Counter.build()
            .name(appName + "_anomalies_total")
            .help("Count of anomalies detected")
            .register();

    // A Gauge can go up and down, and is used to measure the current value of some variable.
    // pipelineGauge measures the duration in seconds of each stage, using labels.
    static final Gauge pipelineGauge = Gauge.build()
            .name(appName + "_duration_seconds")
            .help("Gauge of stage durations in seconds")
            .labelNames("stage")
            .register();

    public static void main(String[] args) {
        // Allow default JVM metrics to be exported.
        DefaultExports.initialize();

        // Metrics are pulled by Prometheus, so create an HTTP server as the endpoint.
        // Note: if there are multiple processes running on the same server, each needs a
        // different port number, and all IPs and port numbers must be added to the
        // Prometheus configuration file.
        HTTPServer server = null;
        try {
            server = new HTTPServer(1234);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Now run 1000 executions of the complete pipeline, with random time delays
        // and an increasing rate.
        int max = 1000;
        for (int i = 0; i < max; i++) {
            // Time the complete pipeline, and increment anomalyCounter when an anomaly is found.
            pipelineGauge.labels("total").setToTime(() -> {
                producer();
                consumer();
                if (detector())
                    anomalyCounter.inc();
            });
            // Total pipeline count.
            pipelineCounter.labels("total").inc();
            System.out.println("i=" + i);

            // Increase the rate of execution by shrinking the delay between iterations.
            try {
                Thread.sleep(max - i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        server.stop();
    }

    // The 3 stages of the pipeline: for each we increment the stage counter
    // and set the Gauge duration time.
    public static void producer() {
        class Local {};
        String name = Local.class.getEnclosingMethod().getName();
        pipelineGauge.labels(name).setToTime(() -> {
            try {
                Thread.sleep(1 + (long)(Math.random()*20));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        pipelineCounter.labels(name).inc();
    }

    public static void consumer() {
        class Local {};
        String name = Local.class.getEnclosingMethod().getName();
        pipelineGauge.labels(name).setToTime(() -> {
            try {
                Thread.sleep(1 + (long)(Math.random()*10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        pipelineCounter.labels(name).inc();
    }

    // detector returns true if an anomaly was detected, else false.
    public static boolean detector() {
        class Local {};
        String name = Local.class.getEnclosingMethod().getName();
        pipelineGauge.labels(name).setToTime(() -> {
            try {
                Thread.sleep(1 + (long)(Math.random()*200));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        pipelineCounter.labels(name).inc();
        return (Math.random() > 0.95);
    }
}

So, now that we’ve instrumented the sample code how do we run Prometheus, and how does Prometheus actually get the metric values from the code? As hinted above, unlike many enterprise APM solutions which have metrics pushed to them, Prometheus gets metrics by Polling (or “scraping”) the instrumented code.  In Prometheus this just means having an HTTP server running in your application code. In the above code we created a HTTP server on port 1234 to allow Prometheus to scrape the metrics. The Prometheus getting started guide provides simple instructions on downloading and running Prometheus.
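For reference, what Prometheus scrapes from that endpoint is just plain text in the Prometheus exposition format. The lines below are an illustrative, hand-written sample of what http://localhost:1234/metrics could return for the metrics defined above (the values are made up):

```
# HELP prometheusTest_requests_total Count of executions of pipeline stages
# TYPE prometheusTest_requests_total counter
prometheusTest_requests_total{stage="producer",} 42.0
prometheusTest_requests_total{stage="total",} 42.0
# HELP prometheusTest_anomalies_total Count of anomalies detected
# TYPE prometheusTest_anomalies_total counter
prometheusTest_anomalies_total 2.0
# HELP prometheusTest_duration_seconds Gauge of stage durations in seconds
# TYPE prometheusTest_duration_seconds gauge
prometheusTest_duration_seconds{stage="total",} 0.124
```

Each sample is a metric name, optional labels, and a value, which is why instrumenting and scraping are so loosely coupled: any HTTP client can read it.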

The only things remaining are Maven dependencies:

<!-- The client -->
<dependency>
  <groupId>io.prometheus</groupId>
  <artifactId>simpleclient</artifactId>
  <version>0.6.0</version>
</dependency>

<!-- Hotspot JVM metrics -->
<dependency>
  <groupId>io.prometheus</groupId>
  <artifactId>simpleclient_hotspot</artifactId>
  <version>0.6.0</version>
</dependency>

<!-- Exposition HTTPServer -->
<dependency>
  <groupId>io.prometheus</groupId>
  <artifactId>simpleclient_httpserver</artifactId>
  <version>0.6.0</version>
</dependency>

<!-- Pushgateway exposition -->
<dependency>
  <groupId>io.prometheus</groupId>
  <artifactId>simpleclient_pushgateway</artifactId>
  <version>0.6.0</version>
</dependency>

And finally telling Prometheus where to scrape from.  For simple deployments and testing this information can be added to the configuration file (the default file is prometheus.yml):


global:
  scrape_interval:     15s # By default, scrape targets every 15 seconds.

# scrape_configs has jobs and targets to scrape for each.
scrape_configs:
  # job 1 is for testing prometheus instrumentation from multiple application processes.
  # The job name is added as a label job=<job_name> to any timeseries scraped from this config.
  - job_name: 'testprometheus'

    # Override the global default and scrape targets from this job every 5 seconds.
    scrape_interval: 5s

    # this is where to put multiple targets, e.g. for Kafka load generators and detectors
    static_configs:
      - targets: ['localhost:1234', 'localhost:1235']

  # job 2 provides operating system metrics (e.g. CPU, memory etc).
  - job_name: 'node'

    # Override the global default and scrape targets from this job every 5 seconds.
    scrape_interval: 5s

    static_configs:
      - targets: ['localhost:9100']


In this file you’ll also notice a job called ‘node’ with a port 9100. This requires the Prometheus node exporter to be downloaded and run on the server the application is running on in order to provide node metrics.

Polling for metrics has some pros and cons. Polling too frequently may overload the applications, while polling too infrequently may result in unacceptable lags between events occurring and being detected. However, it makes for a very loosely coupled and robust system: the applications can run without Prometheus, and Prometheus will keep trying to poll an application that is temporarily unavailable until it is available again. You can even have multiple Prometheus servers polling the same application. If you can’t poll application metrics for any reason, or the application is highly transient, then Prometheus also offers a push gateway instead.

Anomalia Machina 5 - Sisyphus pushing a boulder up a hill

Sisyphus pushing a boulder up a hill (and repeat)

3.1 Initial Results

What do we see on Prometheus? Not much. Unlike commercial APM tools there are no default dashboards, so you have to create graphs from scratch. This is where expressions come in. You can view and select metric names from a drop-down menu (or in a browser using http://localhost:9090/metrics), enter them into the expression box, and execute them. Most of the time you’ll get an error message and have to fix something up. Results can be viewed in a table or, for some result types, graphed. By default, expressions only go back 5 minutes to find data; if there isn’t any, you get an error. Note that Prometheus monitors itself, so even if you don’t have an instrumented application to monitor you can still try out Prometheus.

How do we graph our example data? If you graph a counter you’ll just see an increasing line.

Anomalia Machina 5 - Graph a counter

How do you turn a counter into a rate graph? Simple, with the irate or rate function.

Anomalia Machina 5 - irate or rate function
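As a sketch, the kind of expression behind such a rate graph looks like this (the metric name comes from the example code above; the 1-minute range window is an arbitrary choice):

```
rate(prometheusTest_requests_total{stage="total"}[1m])
```

`rate()` gives the average per-second increase over the window, while `irate()` uses only the last two samples and reacts faster to changes.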

Here’s the graph of pipeline stage durations, which doesn’t need a rate function as it’s a Gauge not a Counter:

Anomalia Machina 5 - Graph of Pipeline stage duration

The inbuilt Prometheus graphing is limited; for example, you can’t graph multiple metrics on the same graph. This is where Grafana comes in. Grafana has inbuilt support for Prometheus and is recommended for any serious graphs.

Once you’ve installed it and have it running, open your browser and go to http://localhost:3000/. There’s good documentation on both the Prometheus and Grafana sites on how to use Grafana with Prometheus. Create a Prometheus data source and then add a Prometheus graph, entering a Prometheus expression as usual.

This graph shows both duration and rate metrics on the same graph.

Anomalia Machina 5 - Duration and rate metrics

A handy hint: If you can’t see anything on the graph it’s likely that you are looking at the wrong time range! A quick fix is to use a “Quick range”, the “Last 5 minutes” is good.

Also note that you can have rules that precompute rates to potentially speed aggregation up.
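For example, a recording-rule file (referenced from rule_files in prometheus.yml) could precompute that rate; the group and rule names below are made up for illustration:

```yaml
# rules.yml - hypothetical recording rule precomputing the pipeline request rate
groups:
  - name: pipeline_rules
    rules:
      - record: job:prometheusTest_requests:rate1m
        expr: rate(prometheusTest_requests_total{stage="total"}[1m])
```

Dashboards can then query the cheap precomputed series `job:prometheusTest_requests:rate1m` instead of re-evaluating the rate on every refresh.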

We can also graph node metrics such as CPU utilisation. This blog has a good explanation.

For example, to compute CPU Utilisation as a percentage use this expression:

100 - (avg by (instance)

(irate(node_cpu_seconds_total{job="node",mode="idle"}[5m])) * 100)

4 What next?

In this blog we’ve seen how Prometheus can be used to monitor an example application. Next we plan to try it out on the actual Anomalia Machina application code, deployed in a production-like environment. This raises a few challenges, including: (1) How will Prometheus discover the services to monitor when there are lots of them, they are running in Kubernetes, and they are potentially highly transient? (2) How can we also get end-to-end call traces and dependency diagrams? Possibly with OpenTracing. (3) Where should the Prometheus server run? And (4) how can we also include Instaclustr Kafka and Cassandra cluster metrics in Prometheus?

Finally, a cool thing I found is this Instaclustr Cassandra Exporter for Prometheus. It’s really well documented, is a good starting point for understanding Prometheus further, and would be an ideal way of integrating Cassandra metrics from a self-managed cluster into your application monitoring using Prometheus.

If you are interested in using Prometheus for monitoring the Instaclustr managed Cassandra service, there is a 3rd party Prometheus metrics exporter which works with the Instaclustr monitoring API.

To find out more about our Managed Platform for Open Source Technologies, contact us or sign up for a free trial.

The post Anomalia Machina 5 – Application Monitoring with Prometheus: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra appeared first on Instaclustr.

Rolling Reboots with cstarpar

Welcome to the third post in our cstar series. So far, the first post gave an introduction to cstar, while the second post explained how to extend cstar with custom commands. In this post we will look at cstar’s cousin cstarpar. Both utilities deliver the same topology-aware orchestration, yet cstarpar executes commands locally, allowing operations cstar is not capable of.

Using ssh

cstarpar relies heavily on ssh working smoothly and without any user prompts. When we run a command with cstar, it will take the command, ssh into the remote host, and execute the command on our behalf. For example, we can run hostname on each node of a 3-node cluster:

$ cstar run --seed-host --command hostname
$ cat ~/.cstar/jobs/8ff6811e-31e7-4975-bec4-260eae885ef6/ec2-*/out

If we switch to cstarpar, it will execute the hostname command locally and we will see something different:

$ cstarpar --seed-host hostname
$ cat ~/.cstar/jobs/a1735406-ae58-4e44-829b-9e8d4a90fd06/ec2-*/out

To make cstarpar execute commands on remote machines we just need to make the command explicitly use ssh:

$ cstarpar --seed-host "ssh {} hostname"
cat ~/.cstar/jobs/2c54f7a1-8982-4f2e-ada4-8b45cde4c4eb/ec2-*/out

Here we can see the hostname was executed on the remote hosts.


The true advantage of local execution is that there is no need for interaction with the remote host. This approach allows operations that would normally prevent that interaction, such as reboots. For example, the following command reboots the entire cluster in a topology-aware fashion, albeit very roughly because it gracelessly kills all processes, including Cassandra:

$ cstarpar --seed-host -- "ssh {} sudo reboot &"

Note that this example used the sudo reboot & command. The reboot command on its own causes the reboot immediately. This is so drastic that it causes Python’s subprocess module to think an error occurred. Placing the & after the command, which runs it in the background, allows the shell execution to return to Python cleanly. Once the host is down, cstarpar will mark the host as such in the job status report.
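The effect of that trailing & is easy to demonstrate locally with any slow command (here a plain sleep stands in for reboot); a minimal sketch:

```shell
# Without '&' the shell would block for the full 2 seconds before returning.
# With '&' the command is backgrounded and control returns to the caller
# immediately, which is what keeps Python's subprocess module happy.
sleep 2 &
echo "shell returned immediately, background pid: $!"
wait  # only needed here so the demo exits cleanly
```

The same mechanism lets cstarpar record the job status and move on while the remote host goes down.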

It is important to ensure the hosts are configured to start the Cassandra process automatically after the reboot, because just like cstar, cstarpar will proceed to the next hosts only once all hosts are up, and will otherwise wait indefinitely for the rebooted host to come back.

Since cstarpar can execute local commands and scripts, it need not support complex commands in the same way cstar does. To run a complex command with cstarpar, we can use a script file. To illustrate this, the script below will add a graceful shutdown of Cassandra before executing the actual reboot:

$ cat ~/

#!/bin/bash
# The target host's FQDN is passed in as the first argument
FQDN=$1

echo "Draining Cassandra"
ssh ${FQDN} nodetool drain && sleep 5

echo "Stopping Cassandra process"
ssh ${FQDN} sudo service cassandra stop && sleep 5

echo "Rebooting"
ssh ${FQDN} sudo reboot &

The reboot command then runs like this:

$ cstarpar --seed-host -- "bash /absolute/path/to/ {}"

Replication and Conclusion

For this post, I used a simple three node cluster provisioned with tlp-cluster. cstarpar relies heavily on ssh working smoothly and without user prompts. Initially, I attempted the connection without any specific ssh configuration on my laptop or the AWS hosts, so the ssh calls looked like this:

$ cstarpar --seed-host ${SEED_IP} --ssh-identity-file=${PATH_TO_KEY}  --ssh-username ubuntu "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@{} hostname"

In the reboot script I also had to add the same options:

ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i ${PATH_TO_KEY} ubuntu@${FQDN} sudo reboot &

Once configured, I was able to harness the full power of cstarpar, which supplements cstar by executing commands locally. This is useful for operations for which cstar’s mode of operation is not well suited, such as reboots. To get the most value from cstarpar, it is critical to have ssh configured to run smoothly and without any user prompts.

Step-by-step monitoring Cassandra with Prometheus and Grafana

In this blog, I’m going to give a detailed guide on how to monitor a Cassandra cluster with Prometheus and Grafana.

For this, I’m using a new VM which I’m going to call “Monitor VM”. In this blog post, I’m going to work on how to install the tools. In a second one, I’m going to go through the details of how to use and configure Grafana dashboards to get the most out of your monitoring!

High level plan

Monitor VM

  1. Install Prometheus
  2. Configure Prometheus
  3. Install Grafana

Cassandra VMs

  1. Download prometheus JMX-Exporter
  2. Configure JMX-Exporter
  3. Configure Cassandra
  4. Restart Cassandra

Detailed Plan

Monitor VM

Step 1. Install Prometheus

  $ wget
  $ tar xvfz prometheus-*.tar.gz
  $ cd prometheus-*

Step 2. Configure Prometheus

  $ vim /etc/prometheus/prometheus.yaml

  global:
    scrape_interval: 15s

  scrape_configs:
    # Cassandra config
    - job_name: 'cassandra'
      scrape_interval: 15s
      static_configs:
        - targets: ['cassandra01:7070', 'cassandra02:7070', 'cassandra03:7070']

Step 3. Create storage and start Prometheus

  $ mkdir /data
  $ chown prometheus:prometheus /data
  $ prometheus --config.file=/etc/prometheus/prometheus.yaml --storage.tsdb.path=/data

Step 4. Install Grafana

  $ wget
  $ sudo apt-get install -y adduser libfontconfig
  $ sudo dpkg -i grafana_5.1.4_amd64.deb

Step 5. Start Grafana

  $ sudo service grafana-server start

Cassandra nodes

Step 1. Download JMX-Exporter:

  $ mkdir /opt/jmx_prometheus
  $ wget

Step 2. Configure JMX-Exporter

  $ vim /opt/jmx_prometheus/cassandra.yml

  lowercaseOutputName: true
  lowercaseOutputLabelNames: true
  whitelistObjectNames: ["org.apache.cassandra.metrics:*"]
  rules:
    - pattern: org.apache.cassandra.metrics<type=(Connection|Streaming), scope=(\S*), name=(\S*)><>(Count|Value)
      name: cassandra_$1_$3
      labels:
        address: "$2"
    - pattern: org.apache.cassandra.metrics<type=(ColumnFamily), name=(RangeLatency)><>(Mean)
      name: cassandra_$1_$2_$3
    - pattern: org.apache.cassandra.net<type=(FailureDetector)><>(DownEndpointCount)
      name: cassandra_$1_$2
    - pattern: org.apache.cassandra.metrics<type=(Keyspace), keyspace=(\S*), name=(\S*)><>(Count|Mean|95thPercentile)
      name: cassandra_$1_$3_$4
      labels:
        "$1": "$2"
    - pattern: org.apache.cassandra.metrics<type=(Table), keyspace=(\S*), scope=(\S*), name=(\S*)><>(Count|Mean|95thPercentile)
      name: cassandra_$1_$4_$5
      labels:
        "keyspace": "$2"
        "table": "$3"
    - pattern: org.apache.cassandra.metrics<type=(ClientRequest), scope=(\S*), name=(\S*)><>(Count|Mean|95thPercentile)
      name: cassandra_$1_$3_$4
      labels:
        "type": "$2"
    - pattern: org.apache.cassandra.metrics<type=(\S*)(?:, ((?!scope)\S*)=(\S*))?(?:, scope=(\S*))?, name=(\S*)><>(Count|Value)
      name: cassandra_$1_$5
      labels:
        "$1": "$4"
        "$2": "$3"

Step 3. Configure Cassandra

  echo 'JVM_OPTS="$JVM_OPTS -javaagent:/opt/jmx_prometheus/jmx_prometheus_javaagent-0.3.0.jar=7070:/opt/jmx_prometheus/cassandra.yml"' >> conf/

Step 4. Restart Cassandra

  $ nodetool flush
  $ nodetool drain
  $ sudo service cassandra restart

And now, if you have no errors (and you shouldn’t!) your Prometheus is ingesting your Cassandra metrics!

Wait for the next blog post where I will guide you through a good Grafana configuration!