Making a Scalable and Fault-Tolerant Database System: Partitioning and Replication


Today’s databases commonly store terabytes to petabytes of data and handle hundreds of thousands to millions of client requests per second. A single server machine is not sufficient to handle such workloads. How do we make a database that doesn’t collapse under its own weight? How do we make a scalable system?

But there’s more: modern databases are fault tolerant. Even if a piece of the system fails, the system still runs. How is that done? And how is this related to scalability?


In our quest to make a scalable system we’ll first identify a logical unit of data which we’ll call a partition. Partitions shall be independent pieces that can be distributed across multiple machines.

Consider a key-value store. It is a type of database with a simple interface: to the user, it looks like a set of (key, value) pairs, e.g.:

key value
“k1” “aaa”
“k2” “bbb”
“k3” “ccc”

The store might support operations such as writing a value under a key, reading a value under a key or performing an atomic compare-and-set.
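To make this interface concrete, here is a minimal single-node sketch in Python. The class and method names (`KeyValueStore`, `write`, `read`, `compare_and_set`) are hypothetical, not the API of Scylla or any real store, and the compare-and-set is atomic only because this toy runs in a single thread.

```python
from typing import Dict, Optional


class KeyValueStore:
    """A toy, single-node key-value store: a set of (key, value) pairs."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def write(self, key: str, value: str) -> None:
        # Writing under an existing key replaces the old value.
        self._data[key] = value

    def read(self, key: str) -> Optional[str]:
        # Returns None if there is no pair with this key.
        return self._data.get(key)

    def compare_and_set(self, key: str, expected: Optional[str], new: str) -> bool:
        # Only "atomic" because this toy is single-threaded; a real store
        # needs locking or a consensus protocol here.
        if self._data.get(key) != expected:
            return False
        self._data[key] = new
        return True


store = KeyValueStore()
for k, v in [("k1", "aaa"), ("k2", "bbb"), ("k3", "ccc")]:
    store.write(k, v)
```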

In such a store we can define a partition to simply be a (key, value) pair. In the table above, for example, the pair \((“k1”, “aaa”)\) would be a partition. I’m assuming that the different keys are not related, hence placing them far away from each other does not introduce certain subtle challenges (the next example explains what I have in mind).

On the other hand, consider a relational database. This type of database consists of relations (hence the name): sets of tuples of the form \((x_1, x_2, \dots, x_n)\), where \(x_1\) comes from some set \(X_1\), \(x_2\) comes from some set \(X_2\), and so on. For example, suppose \(X_1\) is the set of integers, \(X_2\) is the set of words over the English alphabet, and \(X_3\) is the set of real numbers; an example relation over \(X_1\), \(X_2\) and \(X_3\) would be

X1 X2 X3
0 “aba” 1.5
42 “universe” π
1 “cdc” -1/3

The sets \(X_i\) are usually called columns, the tuples are called rows, and the relations are called tables. In fact, a key-value store can be thought of as a special case of a relational database with two columns and a particular constraint on the set of pairs: no two pairs share the same key.

Some columns of a table in a relational database may serve a special purpose. For example, in Cassandra and Scylla, clustering columns are used to sort the rows. Therefore rows are not independent, unlike the (key, value) pairs from the previous example.

Maintaining a global order on rows would make it very hard to scale out the database. Thus, in Cassandra and Scylla, rows are grouped into partitions based on another set of columns, the partitioning columns. Each table in these databases must have at least one partitioning column. A partition in such a table is then a set of rows which have the same values in these columns; these values together form the partition key of the row. Order based on clustering columns is maintained within each partition, but not across partitions.

Suppose our table has a partitioning column pk and a clustering column ck. Each column has the integer type. Here’s what the user might see after querying it:

pk ck
0 2
0 3
2 0
2 1
1 5

In this example there are 3 partitions: \(\{(0, 2), (0, 3)\}\), \(\{(2, 0), (2, 1)\}\), and \(\{(1, 5)\}\). Note that within each partition the table appears sorted according to the clustering column, but there is no global order across the partitions.
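The grouping just described can be sketched in a few lines of Python. This is an illustration only, with rows modeled as hypothetical `(pk, ck)` tuples; it is not how Scylla or Cassandra actually store data.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # (pk, ck)


def group_into_partitions(rows: List[Row]) -> Dict[int, List[Row]]:
    """Group rows by partition key and sort each partition by clustering key."""
    partitions: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        pk, _ck = row
        partitions[pk].append(row)
    # Order is maintained only *within* a partition, by the clustering column;
    # nothing here orders the partitions relative to each other.
    return {pk: sorted(part, key=lambda r: r[1]) for pk, part in partitions.items()}


rows = [(0, 3), (2, 1), (1, 5), (0, 2), (2, 0)]
partitions = group_into_partitions(rows)
```

The keys of the resulting dictionary are exactly the partition keys \(0\), \(2\) and \(1\).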

For a general relational database we can assume the same strategy: a subset of columns in a table is identified as partitioning columns, and the partitions are sets of rows with common values under these columns. This will lead us to what’s usually called horizontal partitioning; the name is probably related to the way tables are usually depicted.

The important thing is that each partition has a way of uniquely identifying it, i.e. it has a key; for relational databases, the key is the tuple of values under partitioning columns for any row inside the partition (the choice of row does not matter by definition). For key-value stores where partitions are key-value pairs, the partition’s key is simply… the key.

In the above example, the key of partition \(\{(0, 2), (0, 3)\}\) is \(0\), for \(\{(2, 0), (2, 1)\}\) it’s \(2\), and for \(\{(1, 5)\}\) it’s \(1\).

How to make a large database

We’ve identified the partitions of our database. Now what?

Here’s what a non-distributed store looks like:

There is one node which stores all the partitions. This works fine as long as:

  • you make regular backups so that you don’t lose your data in case your node burns down and you don’t care if you lose the most recently made writes,
  • all the partitions fit in a single node,
  • the node keeps up with handling all client requests.

And now we’re going to cut our data into pieces.

This cutting is called sharding. The parts that we obtained are typically called shards in the database world. In this example we have 3 shards: one with keys \(\{key1, key2, key3\}\), one with \(\{key4, key5, key6\}\), and one with \(\{key7, key8\}\). Sometimes shards are also called ranges or tablets.

Scylla also uses “shards” to refer to CPU cores, which is a term that comes from the Seastar framework Scylla is built on top of. This concept of shard is not related in any way to our current discussion.

Now we have a sharded database; for now we’ve put each shard on a single node. Within each shard a number of partitions is kept.

Sharding solves two issues:

  • Partitions occupy space, but now they are distributed among a set of nodes. If our DB grows and more partitions appear we can just add more nodes. This is a valid strategy as long as the partitions themselves don’t grow too big: in Scylla we call this “the large partition problem” and it’s usually solved by better data modeling. You can read about how Scylla handles large partitions.
  • A node doesn’t need to handle all client requests: they are distributed according to the partitions that the clients want to access. This works as long as there is no partition that is everyone’s favorite: in Scylla we call this “the hot partition problem” and as in the case of large partitions, it’s usually solved by better data modeling. Scylla also collects statistics about the hottest partitions which can be investigated by the nodetool toppartitions command.

Our database now has the ability to scale out nicely if we take care to distribute data uniformly across partitions. It also provides some degree of fault tolerance: if one of the nodes burns down, we only lose that one shard, not all of them. But that’s not enough, as every shard may contain important data.

The solution is replication: let’s keep each shard on a subset of nodes!

We probably don’t want to keep each shard on every node; that would undo our previous scalability accomplishment (but it may be a valid strategy for some tables). We keep them on a number of nodes large enough to achieve a high degree of fault tolerance. The location of each copy also matters; we may want to put each shard on at least two nodes which reside in different geographical locations.

For shard \(S\), the set of nodes to which this shard is replicated will be called the replica set of \(S\). In the example above, the replica set of shard \(shard5\) is \(\{A, B, E\}\).

Partitioning schemes and replication strategies

Each partition in our store is contained in a single shard, and each shard is replicated to a set of nodes. I will use the phrase partitioning scheme to denote the method of assigning partitions to shards, and replication strategy to denote the method of assigning shards to their replica sets.

Formally, let \(\mathcal K\) be the set of partition keys (recall that each key uniquely identifies a partition), \(\mathcal S\) be the set of shards, and \(\mathcal N\) be the set of nodes. There are two functions:

  • \(\mathcal P: \mathcal K \rightarrow \mathcal S\), the partitioning scheme,
  • \(\mathcal R: \mathcal S \rightarrow \mathbb P(\mathcal N)\), the replication strategy.

For a set \(X\), the symbol \(\mathbb P(X)\) denotes the power set of \(X\), i.e. the set of all subsets of \(X\).

For example, suppose that \(\mathcal K = \{1, 2, 3, 4, 5, 6\}\), \(\mathcal S = \{s_1, s_2, s_3\}\), and \(\mathcal N = \{A, B, C\}\). One partitioning scheme could be:
\[ \mathcal P = \{ \langle 1, s_1 \rangle, \langle 2, s_2 \rangle, \langle 3, s_3 \rangle, \langle 4, s_2 \rangle, \langle 5, s_3 \rangle, \langle 6, s_1 \rangle \} \].

The notation \(f = \{\langle x_1, y_1 \rangle, \langle x_2, y_2 \rangle, \dots\}\) denotes a function \(f\) such that \(f(x_1) = y_1\), \(f(x_2) = y_2\), and so on.

One replication strategy could be:
\[ \mathcal R = \{ \langle s_1, \{A, B\} \rangle, \langle s_2, \{B, C\} \rangle, \langle s_3, \{C\} \rangle \} \].
To calculate where each key is, we simply compose the functions: \(\mathcal R \circ \mathcal P\). For example:
\[ (\mathcal R \circ \mathcal P)(4) = \mathcal R(\mathcal P(4)) = \mathcal R(s_2) = \{B, C\} \].
The end result for this partitioning scheme and replication strategy is illustrated below.
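Since both functions in this example are finite, they can be written down as two Python dictionaries, and composing them is just two lookups. This is a sketch; note that it maps key 6 to \(s_1\), so that every value of `P` lies in \(\mathcal S\).

```python
from typing import Dict, FrozenSet

# Partitioning scheme P: partition key -> shard.
P: Dict[int, str] = {1: "s1", 2: "s2", 3: "s3", 4: "s2", 5: "s3", 6: "s1"}

# Replication strategy R: shard -> replica set (a subset of the nodes).
R: Dict[str, FrozenSet[str]] = {
    "s1": frozenset({"A", "B"}),
    "s2": frozenset({"B", "C"}),
    "s3": frozenset({"C"}),
}


def replicas_of(key: int) -> FrozenSet[str]:
    """(R o P)(key) = R(P(key)): find the key's shard, then its replica set."""
    return R[P[key]]
```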

Sometimes the replication strategy returns not a set of nodes, but an (ordered) list. The first node in this list for a given shard usually has a special meaning and is called a primary replica of this shard, and the others are called secondary replicas.

We could argue that the notion of “partitioning scheme” is redundant: why not simply map each partition to a separate shard (i.e. each shard would consist of exactly one partition)? We could then drop this notion altogether and have replication strategies operate directly on partitions.

To this complaint I would answer that separating these two concepts is simply a good “engineering practice” and makes it easier to think about the problem. There are some important practical considerations:

  • The partitioning scheme should be fast to calculate. Therefore it should only look at the partition key and not, for example, iterate over the entire set of existing partitions. That last thing may even be impossible since the set of partitions may be of unbounded size.
  • If the partitioning scheme maps partition keys to a small, maintainable number of shards (say, a couple of thousand), we can afford to make the replication strategy a bit more complex:
    • It can look at the entire set of shards when deciding where to replicate a shard.
    • It can make complex decisions based on the geographical location of nodes.
    • Due to the small number of shards we can easily cache its results.

Consistent hashing

A partitioning scheme is a method of grouping partitions together into shards; a replication strategy decides where the shards are stored. But how do we choose these two functions? If we consider all variables, the task turns out to be highly non-trivial; to make our lives simpler let’s make the following assumptions (it’s just the beginning of a longer list, unfortunately):

  • Our database nodes have the same storage capacity.
  • Our database nodes have the same computational power (both in terms of CPU and I/O).

Off the top of my head I can list the following goals that we would like to achieve given these assumptions:

  • Storage should be uniformly distributed across nodes.
  • Client requests should be uniformly distributed across nodes.

It will be convenient to make some further assumptions:

  • There are many partitions (like a lot). With large data sets and some good data modeling the number of partitions may run into the millions, if not billions.
  • The sizes of the large partitions are still small compared to the entire data set.
  • Similarly, the numbers of requests to the most popular partitions are still small compared to the total number of requests.
  • The distribution of workload across partitions is static.

One “obviously” desirable property of a partitioning scheme that comes to mind is to have it group partitions into shards of roughly equal size (in terms of storage and popularity of the contained partitions). That’s not strictly necessary, however: if the number of shards themselves is big enough, we can afford to have them a bit imbalanced. The replication strategy can then place some “smaller” shards together with some “bigger” shards on common replica sets, thus achieving the balancing of partitions across nodes. In other words, the responsibility of load balancing can be shifted between the partitioning scheme and the replication strategy.

Since we assumed that workload distribution is static, we can afford to have our two functions statically chosen (and not, say, change dynamically during normal database operation). However, there is still the problem of adding and removing nodes, which forces at least the replication strategy function to adjust (since the set of nodes \(\mathcal N\) changes); with that in mind let’s state another condition:

  • if a new node is added, it should be easy to move the data; the moving process should not overload a single node but distribute the strain across multiple nodes.

A similar condition could be stated for the problem of removing a node.

Without further ado, let us introduce a simple partitioning scheme that uses a technique called consistent hashing. Consider a circle.

Let \(C\) denote the set of points on the circle. We start by choosing a small (finite) subset of points on the circle \(T \subseteq C\):

The elements of \(T\) are called tokens.

Now define a function \(first: C \rightarrow T\) which, given a point on the circle, returns the first token “to the right” of this point (imagine you start walking the circle from the point in the clockwise direction, stopping at the first token). E.g.:

As the example shows for the point \(z\), if \(p \in T\), then \(first(p) = p\).

Now suppose that we have a function \(hash: \mathcal K \rightarrow C\) that given a partition key returns a point on the circle. Let’s assume that the function distributes the hashes uniformly across the circle. Our partitioning scheme is defined as follows:
\[ \mathcal P(k) = first(hash(k)). \]

Thus the range of \(\mathcal P\) is the set of tokens, \(T\); in other words, tokens are the shards of this partitioning scheme.
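Here is a minimal sketch of this partitioning scheme in Python. It assumes the circle is represented by the integers \(0\) to \(2^{64}-1\), with "clockwise" meaning increasing values and wrapping from the largest back to \(0\), and it uses BLAKE2b from `hashlib` as a stand-in for \(hash\) (Scylla itself uses the murmur hash function). The function names are illustrative.

```python
import bisect
import hashlib
from typing import List

RING_SIZE = 2**64  # the circle C: integers 0 .. 2**64 - 1 (an assumption)


def hash_key(key: str) -> int:
    """hash: K -> C. BLAKE2b truncated to 8 bytes, a stand-in for Murmur3."""
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big")  # always in [0, RING_SIZE)


def first(point: int, tokens: List[int]) -> int:
    """Return the first token at or clockwise after `point`.

    `tokens` must be sorted; "clockwise" means increasing values, and past
    the largest token we wrap around to the smallest one.
    """
    i = bisect.bisect_left(tokens, point)  # first index with tokens[i] >= point
    return tokens[i] if i < len(tokens) else tokens[0]


def partition(key: str, tokens: List[int]) -> int:
    """The partitioning scheme P(k) = first(hash(k)); the shards are tokens."""
    return first(hash_key(key), tokens)


tokens = sorted([2**62, 2**63, 3 * 2**62])
shard = partition("k1", tokens)
```

Because `bisect_left` returns the position of an equal element if there is one, a point that is itself a token maps to that token, matching \(first(p) = p\) for \(p \in T\).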

This scheme has a nice property which will turn out to be useful for us soon. Suppose that we randomly sample \(l\) tokens uniformly from the circle; then, on average, the portion of the circle that is mapped to any given token is \(1/l\). Thus, on average, the portion of the circle that is mapped to any \(m\) of these \(l\) tokens is \(m/l\). If the numbers \(l\) and \(m\) are large enough, we can expect the portion of the circle mapped to any \(m\) tokens to be pretty close to this average.

But how close? Let’s look at an example. Suppose we sample \(l = 1000\) tokens from the circle and look at the first \(m = 250\). What portion is mapped to these 250 tokens? “On average” it should be \(1/4\). I have repeated this experiment 10000 times and obtained the following results:

This shows that we can be almost certain that the portion of the circle mapped to our \(250\) out of \(1000\) randomly chosen tokens is between \(1/5\) and \(1/3\).
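The experiment is easy to reproduce. The sketch below models the circle as the interval \([0, 1)\), samples \(l\) tokens uniformly, computes the arc each token is responsible for (from the preceding token, exclusive, up to and including itself, matching \(first\)), and sums the arcs of the first \(m\) sampled tokens. The function names and the fixed seed are choices made for this sketch.

```python
import random
from typing import List


def owned_fractions(tokens: List[float]) -> List[float]:
    """For each token (in input order), the length of the arc it owns.

    The circle is [0, 1); a token owns the arc from the preceding token
    (exclusive) up to itself (inclusive). Assumes at least two distinct tokens.
    """
    order = sorted(range(len(tokens)), key=tokens.__getitem__)
    arcs = [0.0] * len(tokens)
    for pos, idx in enumerate(order):
        prev = tokens[order[pos - 1]]  # pos == 0 wraps around to the largest token
        arcs[idx] = (tokens[idx] - prev) % 1.0
    return arcs


def experiment(num_tokens: int = 1000, num_selected: int = 250,
               trials: int = 10000, seed: int = 0) -> List[float]:
    """Per trial: portion of the circle mapped to the first m of l random tokens."""
    rng = random.Random(seed)
    results = []
    for _ in range(trials):
        tokens = [rng.random() for _ in range(num_tokens)]
        results.append(sum(owned_fractions(tokens)[:num_selected]))
    return results
```

With the defaults, `experiment()` repeats the trial 10000 times with \(l = 1000\) and \(m = 250\); the returned values cluster tightly around \(1/4\).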

Now let’s define a simple replication strategy based on the circle of tokens. Suppose that we have \(N\) nodes and that the number of tokens \(l\) is some multiple \(m\) of \(N\), i.e. \(l = m \cdot N\). To each token we assign one of the nodes, called its owner, such that each node owns \(m\) tokens. Formally, we define a function \(owner: T \rightarrow \mathcal N\), with the property that the sets \(owner^{-1}(n)\) for \(n \in \mathcal N\) have size \(m\). Recall that the tokens are the shards in our partitioning scheme; we can define a replication strategy simply as follows:
\[ \mathcal R(t) = \{owner(t)\}. \]

Now if the tokens were chosen randomly with uniform distribution, then:

  • each node owns tokens that together are assigned on average \(1/N\) of the circle, where \(N\) is the number of nodes; if the multiple \(m\) is large enough (a couple of hundred will do), each portion is very close to that average,
  • by our previous assumption, the hashes of partition keys are uniformly distributed around the circle,
  • therefore each node replicates roughly \(1/N\) of all partitions.
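A quick simulation illustrates these three points. The sketch assumes 4 hypothetical nodes with \(m = 200\) uniformly random tokens each, hashes 100000 synthetic keys with a BLAKE2b stand-in for \(hash\) (not Scylla's murmur hash), and counts how many keys each node replicates under the single-owner replication strategy.

```python
import bisect
import hashlib
import random
from collections import Counter
from typing import Dict, List, Set, Tuple

RING_SIZE = 2**64  # the circle: integers 0 .. 2**64 - 1


def hash_key(key: str) -> int:
    # Stand-in uniform hash (BLAKE2b truncated to 8 bytes), not Murmur3.
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big")


def build_ring(nodes: List[str], m: int, seed: int = 0) -> Tuple[List[int], Dict[int, str]]:
    """Give each node m uniformly random tokens; return (sorted tokens, owner)."""
    rng = random.Random(seed)
    owner = {rng.randrange(RING_SIZE): node for node in nodes for _ in range(m)}
    return sorted(owner), owner


def replica_set(key: str, tokens: List[int], owner: Dict[int, str]) -> Set[str]:
    """R(P(k)) = {owner(first(hash(k)))}: exactly one replica per shard."""
    i = bisect.bisect_left(tokens, hash_key(key))
    token = tokens[i] if i < len(tokens) else tokens[0]  # wrap around the ring
    return {owner[token]}


nodes = ["A", "B", "C", "D"]
tokens, owner = build_ring(nodes, m=200)
n_keys = 100_000
counts = Counter(node for i in range(n_keys) for node in replica_set(f"key{i}", tokens, owner))
shares = {node: counts[node] / n_keys for node in nodes}
```

Each entry of `shares` should come out close to \(1/N = 1/4\).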

Some nodes may get large/hot partitions assigned, but hopefully the hash function doesn’t decide to put all large/hot partitions in one place (it probably won’t since it uses only the key; it doesn’t care about the size/popularity of a partition), so each node receives roughly its fair share of the problematic partitions. Even if one node is unlucky and gets a bit more of these, our assumptions say that they won’t make a significant difference (compared to the total workload generated by “common” partitions).

Furthermore, suppose we add another node and assign it an additional \(m\) randomly chosen tokens, so we end up with \(m \cdot (N+1)\) tokens in total. Since randomly choosing \(m \cdot N\) tokens followed by randomly choosing additional \(m\) tokens is the same as randomly choosing \(m \cdot (N + 1)\) tokens in the first place (assuming the tokens are chosen independently), we now expect each of the \(N + 1\) nodes to replicate roughly \(1 / (N+1)\) of all partitions. Finally, convince yourself that during the process the new node stole from each existing node roughly \(1 / (N+1)\) of data owned by that node: each existing node goes from roughly \(1/N\) to roughly \(1/(N+1)\) of the circle, and \(1/N - 1/(N+1) = \frac{1}{N+1} \cdot \frac{1}{N}\).
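The claim about the new node can also be checked numerically. This sketch works with exact arc lengths on the circle \([0, 1)\) instead of hashing keys; the node names, \(N = 4\), \(m = 200\) and the seed are arbitrary choices for illustration.

```python
import random
from typing import Dict


def node_shares(owner: Dict[float, str]) -> Dict[str, float]:
    """Fraction of the circle [0, 1) owned by each node.

    A token owns the arc (previous token, token], wrapping around at 1.
    """
    tokens = sorted(owner)
    shares: Dict[str, float] = {}
    for i, t in enumerate(tokens):
        arc = (t - tokens[i - 1]) % 1.0  # i == 0 wraps to the largest token
        shares[owner[t]] = shares.get(owner[t], 0.0) + arc
    return shares


def add_node(owner: Dict[float, str], new_node: str, m: int,
             rng: random.Random) -> Dict[float, str]:
    """Return a copy of the ring with m extra random tokens owned by new_node."""
    extended = dict(owner)
    for _ in range(m):
        extended[rng.random()] = new_node
    return extended


rng = random.Random(42)
N, m = 4, 200
owner = {rng.random(): f"n{j}" for j in range(N) for _ in range(m)}
before = node_shares(owner)
after = node_shares(add_node(owner, "new", m, rng))
# Fraction of its own data that each existing node handed over to the new node.
stolen = {n: (before[n] - after[n]) / before[n] for n in before}
```

A new token only ever splits an existing arc, so existing nodes can lose data but never gain any, and the total they lose equals the new node's share. Each entry of `stolen` should be close to \(1/(N+1) = 0.2\).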

This replication strategy assigns only one replica to each shard, so it’s not very useful in practice (unless you don’t care about losing data), but it serves as an example for showing how the circle of randomly chosen tokens helps us distribute workload uniformly across nodes. In a follow up to this post we will look at some more complex replication strategies based on the circle of tokens. By the way, Scylla and Cassandra use this scheme; look for the phrase token ring.

At Scylla we’re used to speaking about token ranges, also called vnodes: the intervals on the ring between two tokens with no token in between. There is a one-to-one correspondence between tokens and vnodes, given by the function that maps a token to the vnode whose right boundary is that token and whose left boundary is the token immediately preceding it on the ring. Vnodes are closed on the right side (i.e. they include the right boundary) and open on the left side. The partitioning scheme can be thought of as mapping each partition key to the vnode which contains the key’s hash.
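In code, finding the vnode that contains a point on the ring is the same binary search as \(first\), plus a look at the preceding token. A minimal sketch, assuming integer tokens kept in a sorted Python list (the function name is illustrative):

```python
import bisect
from typing import List, Tuple


def vnode_of(point: int, tokens: List[int]) -> Tuple[int, int]:
    """Return the vnode (left, right] that contains `point`.

    `tokens` must be sorted. The vnode ending at the smallest token wraps
    around the ring: its left boundary is the largest token.
    """
    i = bisect.bisect_left(tokens, point)  # right boundary: first token >= point
    j = i if i < len(tokens) else 0        # past the last token, wrap around
    right = tokens[j]
    left = tokens[j - 1]                   # j == 0 wraps to the last token
    return left, right
```

A point equal to a token lands in the vnode whose right boundary is that token, which is exactly the "closed on the right" convention.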

One more thing: we’ve made an assumption that there exists a hash function \(hash: \mathcal K \rightarrow C\) mapping partition keys uniformly onto the circle. In practice, any “good enough” hash function will do (it doesn’t have to be a cryptographic hash). Scylla represents the circle of tokens simply using the set of 64-bit integers (with some integers reserved due to implementation quirks) and uses the murmur hash function.

Putting This Into Practice

Now that you understand better how partitions and replication work in Scylla, you can see how this was applied to the practical use of making a shard-aware driver in Python. A shard-aware driver needs to understand exactly which shard data is replicated on, to ensure one of the replicas is used as the coordinator node for a transaction. Part 1 takes the above and shows how Scylla and Cassandra drivers work similarly, yet differ because of shard-awareness, and Part 2 shows how to implement a faster-performing shard-aware Python driver.

The post Making a Scalable and Fault-Tolerant Database System: Partitioning and Replication appeared first on ScyllaDB.

Building Netflix’s Distributed Tracing Infrastructure

by Maulik Pandey

Our Team — Kevin Lew, Narayanan Arunachalam, Elizabeth Carretto, Dustin Haffner, Andrei Ushakov, Seth Katz, Greg Burrell, Ram Vaithilingam, Mike Smith and Maulik Pandey

“@Netflixhelps Why doesn’t Tiger King play on my phone?” (a Netflix member via Twitter)

This is an example of a question our on-call engineers need to answer to help resolve a member issue — which is difficult when troubleshooting distributed systems. Investigating a video streaming failure consists of inspecting all aspects of a member account. In our previous blog post we introduced Edgar, our troubleshooting tool for streaming sessions. Now let’s look at how we designed the tracing infrastructure that powers Edgar.

Distributed Tracing: the missing context in troubleshooting services at scale

Prior to Edgar, our engineers had to sift through a mountain of metadata and logs pulled from various Netflix microservices in order to understand a specific streaming failure experienced by any of our members. Reconstructing a streaming session was a tedious and time consuming process that involved tracing all interactions (requests) between the Netflix app, our Content Delivery Network (CDN), and backend microservices. The process started with a manual pull of member account information that was part of the session. The next step was to put all puzzle pieces together and hope the resulting picture would help resolve the member issue. We needed to increase engineering productivity via distributed request tracing.

If we had an ID for each streaming session then distributed tracing could easily reconstruct session failure by providing service topology, retry and error tags, and latency measurements for all service calls. We could also get contextual information about the streaming session by joining relevant traces with account metadata and service logs. This insight led us to build Edgar: a distributed tracing infrastructure and user experience.

Figure 1. Troubleshooting a session in Edgar

When we started building Edgar four years ago, there were very few open-source distributed tracing systems that satisfied our needs. Our tactical approach was to use Netflix-specific libraries for collecting traces from Java-based streaming services until open source tracer libraries matured. By 2017, open source projects like Open-Tracing and Open-Zipkin were mature enough for use in polyglot runtime environments at Netflix. We chose Open-Zipkin because it had better integrations with our Spring Boot based Java runtime environment. We use Mantis for processing the stream of collected traces, and we use Cassandra for storing traces. Our distributed tracing infrastructure is grouped into three sections: tracer library instrumentation, stream processing, and storage. Traces collected from various microservices are ingested in a stream processing manner into the data store. The following sections describe our journey in building these components.

Trace Instrumentation: how will it impact our service?

That is the first question our engineering teams asked us when integrating the tracer library. It is an important question because tracer libraries intercept all requests flowing through mission-critical streaming services. Safe integration and deployment of tracer libraries in our polyglot runtime environments was our top priority. We earned the trust of our engineers by developing empathy for their operational burden and by focusing on providing efficient tracer library integrations in runtime environments.

Distributed tracing relies on propagating context for local interprocess calls (IPC) and client calls to remote microservices for any arbitrary request. Passing the request context captures causal relationships between microservices during runtime. We adopted Open-Zipkin’s B3 HTTP header based context propagation mechanism. We ensure that context propagation headers are correctly passed between microservices across a variety of our “paved road” Java and Node runtime environments, which include both older environments with legacy codebases and newer environments such as Spring Boot. We execute the Freedom & Responsibility principle of our culture in supporting tracer libraries for environments like Python, NodeJS, and Ruby on Rails that are not part of the “paved road” developer experience. Our loosely coupled but highly aligned engineering teams have the freedom to choose an appropriate tracer library for their runtime environment and have the responsibility to ensure correct context propagation and integration of network call interceptors.
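For readers unfamiliar with B3, the sketch below shows the basic propagation step in Python: when a service makes an outgoing call while handling a request, it keeps the trace ID, treats the incoming span as the current one and records it as the parent, and mints a new span ID. The header names are those of B3's multi-header format; the function itself is a simplified illustration, not code from Zipkin's tracer libraries or from Netflix.

```python
import secrets
from typing import Dict, Mapping


def new_id() -> str:
    """A random 64-bit identifier encoded as 16 lower-case hex characters."""
    return secrets.token_hex(8)


def child_headers(incoming: Mapping[str, str]) -> Dict[str, str]:
    """B3 headers for an outgoing call made while handling `incoming`.

    Keeps the trace ID, makes the incoming span the parent, and mints a new
    span ID. With no incoming context, a new trace is started.
    """
    h = {k.lower(): v for k, v in incoming.items()}  # header names are case-insensitive
    out = {
        "X-B3-TraceId": h.get("x-b3-traceid") or new_id(),
        "X-B3-SpanId": new_id(),
    }
    parent = h.get("x-b3-spanid")
    if parent:
        out["X-B3-ParentSpanId"] = parent
    if "x-b3-sampled" in h:
        # Pass the upstream sampling decision along unchanged.
        out["X-B3-Sampled"] = h["x-b3-sampled"]
    return out


incoming = {"X-B3-TraceId": "463ac35c9f6413ad", "X-B3-SpanId": "a2fb4a1d1a96d312", "X-B3-Sampled": "1"}
outgoing = child_headers(incoming)
```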

Our runtime environment integrations inject infrastructure tags like service name, auto-scaling group (ASG), and container instance identifiers. Edgar uses this infrastructure tagging schema to query and join traces with log data for troubleshooting streaming sessions. Additionally, it became easy to provide deep links to different monitoring and deployment systems in Edgar due to consistent tagging. With runtime environment integrations in place, we had to set an appropriate trace data sampling policy for building a troubleshooting experience.

Stream Processing: to sample or not to sample trace data?

This was the most important question we considered when building our infrastructure because data sampling policy dictates the amount of traces that are recorded, transported, and stored. A lenient trace data sampling policy generates a large number of traces in each service container and can lead to degraded performance of streaming services as more CPU, memory, and network resources are consumed by the tracer library. An additional implication of a lenient sampling policy is the need for scalable stream processing and storage infrastructure fleets to handle increased data volume.

We knew that a heavily sampled trace dataset is not reliable for troubleshooting because there is no guarantee that the request you want is in the gathered samples. We needed a thoughtful approach for collecting all traces in the streaming microservices while keeping low operational complexity of running our infrastructure.

Most distributed tracing systems enforce sampling policy at the request ingestion point in a microservice call graph. We took a hybrid head-based sampling approach that allows for recording 100% of traces for a specific and configurable set of requests, while continuing to randomly sample traffic per the policy set at ingestion point. This flexibility allows tracer libraries to record 100% of traces in our mission-critical streaming microservices while collecting minimal traces from auxiliary systems like offline batch data processing. Our engineering teams tuned their services for performance after factoring in increased resource utilization due to tracing. The next challenge was to stream large amounts of traces via a scalable data processing platform.
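A hybrid head-based sampling decision could look roughly like this. The rule format (predicates over a request dictionary), the `HybridHeadSampler` name and the rates are all hypothetical; the post does not describe how the configurable request set is expressed.

```python
import random
from typing import Callable, Dict, Iterable, Optional

Request = Dict[str, str]


class HybridHeadSampler:
    """Decide once, at the request ingestion point, whether to trace a request.

    Requests matching any "always trace" rule are recorded 100% of the time;
    all other requests are sampled randomly at `rate`.
    """

    def __init__(self, always: Iterable[Callable[[Request], bool]], rate: float,
                 rng: Optional[random.Random] = None) -> None:
        self.always = list(always)
        self.rate = rate
        self.rng = rng or random.Random()

    def should_sample(self, request: Request) -> bool:
        if any(rule(request) for rule in self.always):
            return True
        return self.rng.random() < self.rate


sampler = HybridHeadSampler([lambda r: r.get("service") == "playback"],
                            rate=0.01, rng=random.Random(1))
```

In a head-based scheme this decision is made once, at the ingestion point, and then travels with the request (in B3, through the `X-B3-Sampled` header) so that downstream services record or skip the same trace.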

Mantis is our go-to platform for processing operational data at Netflix. We chose Mantis as our backbone to transport and process large volumes of trace data because we needed a backpressure-aware, scalable stream processing system. Our trace data collection agent transports traces to a Mantis job cluster via the Mantis Publish library. We buffer spans for a time period in order to collect all spans for a trace in the first job. A second job taps the data feed from the first job, does tail sampling of data and writes traces to the storage system. This setup of chained Mantis jobs allows us to scale each data processing component independently. An additional advantage of using Mantis is the ability to perform real-time ad-hoc data exploration in Raven using the Mantis Query Language (MQL). However, having a scalable stream processing platform doesn’t help much if you can’t store data in a cost efficient manner.
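The buffering step in the first job can be pictured as grouping spans by trace ID and releasing each trace once a fixed window has passed since its first span arrived. This is a hypothetical single-process sketch, not Mantis code; the `Span` and `TraceBuffer` names are made up, and time is passed in explicitly so the logic can be followed without a clock.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Span:
    trace_id: str
    span_id: str
    tags: Dict[str, str] = field(default_factory=dict)


class TraceBuffer:
    """Group spans by trace ID and release a trace `window` seconds after its
    first span arrived, assuming all of its spans have arrived by then."""

    def __init__(self, window: float) -> None:
        self.window = window
        self.spans: Dict[str, List[Span]] = defaultdict(list)
        self.first_seen: Dict[str, float] = {}

    def add(self, span: Span, now: float) -> None:
        self.first_seen.setdefault(span.trace_id, now)
        self.spans[span.trace_id].append(span)

    def flush(self, now: float) -> List[List[Span]]:
        """Return (and forget) every trace whose window has elapsed."""
        ready = [t for t, ts in self.first_seen.items() if now - ts >= self.window]
        out = []
        for t in ready:
            out.append(self.spans.pop(t))
            del self.first_seen[t]
        return out
```

A span that arrives after its trace has been flushed simply starts a new buffer entry; a real pipeline has to decide how to handle such stragglers.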

Storage: don’t break the bank!

We started with Elasticsearch as our data store due to its flexible data model and querying capabilities. As we onboarded more streaming services, the trace data volume started increasing exponentially. The increased operational burden of scaling Elasticsearch clusters due to high data write rate became painful for us. The data read queries took an increasingly longer time to finish because Elasticsearch clusters were using heavy compute resources for creating indexes on ingested traces. The high data ingestion rate eventually degraded both read and write operations. We solved this by migrating to Cassandra as our data store for handling high data ingestion rates. Using simple lookup indices in Cassandra gives us the ability to maintain acceptable read latencies while doing heavy writes.

In theory, scaling up horizontally would allow us to handle higher write rates and retain larger amounts of data in Cassandra clusters. This implies that the cost of storing traces grows linearly with the amount of data being stored. We needed to ensure storage cost growth was sub-linear in the amount of data being stored. In pursuit of this goal, we outlined the following storage optimization strategies:

  1. Use cheaper Elastic Block Store (EBS) volumes instead of SSD instance stores in EC2.
  2. Employ better compression techniques to reduce trace data size.
  3. Store only relevant and interesting traces by using simple rules-based filters.

We were adding new Cassandra nodes whenever the EC2 SSD instance stores of existing nodes reached maximum storage capacity. The use of a cheaper EBS Elastic volume instead of an SSD instance store was an attractive option because AWS allows dynamic increase in EBS volume size without re-provisioning the EC2 node. This allowed us to increase total storage capacity without adding a new Cassandra node to the existing cluster. In 2019 our stunning colleagues in the Cloud Database Engineering (CDE) team benchmarked EBS performance for our use case and migrated existing clusters to use EBS Elastic volumes. By optimizing the Time Window Compaction Strategy (TWCS) parameters, they reduced the disk write and merge operations of Cassandra SSTable files, thereby reducing the EBS I/O rate. This optimization helped us reduce the data replication network traffic amongst the cluster nodes because SSTable files were created less often than in our previous configuration. Additionally, by enabling Zstd block compression on Cassandra data files, the size of our trace data files was reduced by half. With these optimized Cassandra clusters in place, it now costs us 71% less to operate clusters and we could store 35x more data than our previous configuration.

We observed that Edgar users explored less than 1% of collected traces. This insight leads us to believe that we can reduce write pressure and retain more data in the storage system if we drop traces that users will not care about. We currently use a simple rule based filter in our Storage Mantis job that retains interesting traces for very rarely viewed service call paths in Edgar. The filter qualifies a trace as an interesting data point by inspecting all buffered spans of a trace for warnings, errors, and retry tags. This tail-based sampling approach reduced the trace data volume by 20% without impacting user experience. There is an opportunity to use machine learning based classification techniques to further reduce trace data volume.
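A rule-based tail-sampling filter of the kind described can be very small. In this sketch each span is a dictionary of tags, and the tag names `error`, `warning` and `retry` are placeholders; the post does not name the actual tags Netflix uses.

```python
from typing import Dict, Iterable, List

SpanTags = Dict[str, str]
Trace = List[SpanTags]

# Hypothetical tag names; the post only says spans are inspected for
# warnings, errors, and retry tags.
INTERESTING_TAGS = ("error", "warning", "retry")


def is_interesting(trace: Trace) -> bool:
    """Keep a trace if any of its buffered spans carries an interesting tag."""
    return any(tag in span for span in trace for tag in INTERESTING_TAGS)


def tail_sample(traces: Iterable[Trace]) -> List[Trace]:
    """Drop complete traces that no rule marks as interesting."""
    return [t for t in traces if is_interesting(t)]
```

Because the decision is made after all spans of a trace have been buffered, the filter can look at every span, which a head-based sampler cannot.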

While we have made substantial progress, we are now at another inflection point in building our trace data storage system. Onboarding new user experiences on Edgar could require us to store 10x the amount of current data volume. As a result, we are currently experimenting with a tiered storage approach for a new data gateway. This data gateway provides a querying interface that abstracts the complexity of reading and writing data from tiered data stores. Additionally, the data gateway routes ingested data to the Cassandra cluster and transfers compacted data files from Cassandra cluster to S3. We plan to retain the last few hours worth of data in Cassandra clusters and keep the rest in S3 buckets for long term retention of traces.
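One way a data gateway might route a time-range query across the two tiers is to split it at a cutoff of "now minus the hot window". The sketch below is hypothetical throughout: the six-hour window, the tier names and the function signature are assumptions, since the post only says that the last few hours stay in Cassandra and the rest goes to S3.

```python
from typing import List, Tuple

HOT_WINDOW_SECONDS = 6 * 3600  # hypothetical "last few hours" kept in Cassandra


def route_query(start: float, end: float, now: float,
                hot_window: float = HOT_WINDOW_SECONDS) -> List[Tuple[str, float, float]]:
    """Split the time range [start, end) between the cold and hot tiers.

    Assumes start < end. Returns (tier, sub_start, sub_end) pieces.
    """
    boundary = now - hot_window  # data older than this lives in the cold tier
    parts: List[Tuple[str, float, float]] = []
    if start < boundary:
        parts.append(("s3", start, min(end, boundary)))
    if end > boundary:
        parts.append(("cassandra", max(start, boundary), end))
    return parts
```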

Table 1. Timeline of Storage Optimizations

Secondary advantages

In addition to powering Edgar, trace data is used for the following use cases:

Application Health Monitoring

Trace data is a key signal used by Telltale in monitoring macro-level application health at Netflix. Telltale uses the causal information from traces to infer microservice topology and correlate traces with time series data from Atlas. This approach paints a richer observability portrait of application health.

Resiliency Engineering

Our chaos engineering team uses traces to verify that failures are correctly injected while our engineers stress test their microservices via Failure Injection Testing (FIT) platform.

Regional Evacuation

The Demand Engineering team leverages tracing to improve the correctness of prescaling during regional evacuations. Traces provide visibility into the types of devices interacting with microservices such that changes in demand for these services can be better accounted for when an AWS region is evacuated.

Estimate infrastructure cost of running an A/B test

The Data Science and Product team factors in the costs of running A/B tests on microservices by analyzing traces that have relevant A/B test names as tags.

What’s next?

The scope and complexity of our software systems continue to increase as Netflix grows. We will focus on the following areas for extending Edgar:

  • Provide a great developer experience for collecting traces across all runtime environments. With an easy way to try out distributed tracing, we hope that more engineers will instrument their services with traces and provide additional context for each request by tagging relevant metadata.
  • Enhance our analytics capability for querying trace data to enable power users at Netflix in building their own dashboards and systems for narrowly focused use cases.
  • Build abstractions that correlate data from metrics, logging, and tracing systems to provide additional contextual information for troubleshooting.

As we progress in building distributed tracing infrastructure, our engineers continue to rely on Edgar for troubleshooting streaming issues like “Why doesn’t Tiger King play on my phone?”. Our distributed tracing infrastructure helps in ensuring that Netflix members continue to enjoy a must-watch show like Tiger King!

We are looking for stunning colleagues to join us on this journey of building distributed tracing infrastructure. If you are passionate about Observability then come talk to us.

Building Netflix’s Distributed Tracing Infrastructure was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Making a Shard-Aware Python Driver for Scylla, Part 2

This is the second part of a presentation given by Alexys Jacob, CTO of Numberly at the virtual Europython 2020 Conference in July, entitled A deep dive and comparison of Python drivers for Cassandra and Scylla. He also gave the same talk, updated, to PyCon India; we’ll use slides from the latter where they are more accurate or illustrative.

If you missed Part 1, which highlights the design considerations behind such a driver, make sure to check it out!

Alexys noted the structural differences between the Cassandra driver and the Scylla driver fork. “The first thing to see is that the token aware Cassandra driver opens a control connection when it connects for the first time to the cluster. This control connection allows your Cassandra driver to know about the cluster topology: how many nodes there are, which are up, which are down, what are the schemas, etc. It needs to know all this. So it opens a special connection, which refreshes from time to time.”

“Then it will open one connection per node because this is how the token aware policy will be applied to select the right connection based on the query.”

For Scylla, you still need to know about the cluster topology, but instead of opening one connection per node, we will be opening one connection per core per node.

“The token calculation will still be useful to select the right node from the token perspective but then we will need to add a Shard ID calculation because we need to go down to the shard or the CPU core.”

Alexys then turned this into the following “TODO” list to create the shard-aware driver:

“The first thing is since we will be using the same kind of control connection we just use this as-is. There’s nothing to change here.”

“We will need to change the connection class object because now, when we are going to open a connection per core per node we will need to be able to detect if we are talking to a Scylla cluster or to a Cassandra cluster.”

Alexys noted that, for the Scylla driver, “we want it to retain maximum compatibility with Cassandra. So you can use the Scylla driver to query your Cassandra cluster as well.”

“The HostConnection pool will use those shard-aware connections and open one connection to every core of every node. The token calculation that selects the right node will be the same: we will just use the vanilla, already existing and efficient token aware policy. But then we will need to extend the Cluster class, which is used when you issue a query, so that it passes the routing key down to the connection pool. Then we will apply the shard_id calculation and implement the logic that, based on the shard_id, selects the right connection to the right node and to the right shard.”

Okay, sounds like a plan! Let’s do this.

Before Alexys went any further, he introduced ScyllaDB’s own Israel Fruchter, who was in the virtual audience that day, and thanked him for the work he put in on the shard-aware driver code, especially his work on CI testing.

“Now we’ll get down into the code.”

“So the first thing that needed to be done is to add the shard information to the connections. So a connection now has a shard_id assigned to it and sharding information. This sharding information comes from the server responses when we issue a query.” (You will see that in the red square in the image above.)

This is what the logic looks like. “What’s interesting to note is that the Cassandra protocol allows only for connection message options being passed on the server response. This means that when the client initially connects to Cassandra or Scylla it has no way of passing any kind of information to the server. So we are dependent on the server’s response to know about shard information or whatever it is that we need.”

“If we look at the message options that we get back after we have connected to the server, the first one is one of the most interesting for us, because the Scylla shard information tells us which shard_id or core was assigned to the connection by the server.”

“Since we have no way of asking anything when we connect we are dependent on the server shard allocation to the connection that we open. This is a protocol limitation.”

“Now we are going to change the host connection pool class. We need to get the connection object for every core of the node. We got rid of the single connection that we had before and replaced it with a dict where the keys are the shard_id and the values are the connections that are bound to the specified shard.”

“The first time we connect as you can see in the first rectangle, the first connection allows us to detect if we are connecting to a shard-aware cluster. A Scylla cluster, for instance. This is where we get the first glance at the sharding information and we store it.”

“In this second part here, with the for _ in range loop, we can see that we are doing a synchronous and optimistic attempt to get a connection to every core. We open a new connection to the node and store the shard_id plus the connection object in the dict. We will do this twice as many times as there are cores on the remote node, until we have a connection to all shards. Maybe.”

“Because, if you remember, the client cannot specify which shard it wants to connect to when it connects. So you just happen to connect to the server and you get a shard assigned. That’s why the initial implementation was trying and saying, ‘Okay, let’s try twice as many times as there are cores available on the remote node, and hopefully we’ll get a connection for every shard.’ If we were lucky, fine. Keep on moving. If not, we would raise an exception.”
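A small simulation helps show why this initial approach was fragile. In the sketch below, `FakeConnection` and the `open_connection` callable stand in for real driver connections: the simulated server, not the client, decides each connection's shard. The 2x attempt budget and the exception follow the description above; everything else is assumed for illustration.

```python
from typing import Callable, Dict


class FakeConnection:
    """Stand-in for a driver connection; the server assigns its shard_id."""

    def __init__(self, shard_id: int):
        self.shard_id = shard_id


def connect_all_shards_sync(open_connection: Callable[[], FakeConnection],
                            shards_count: int) -> Dict[int, FakeConnection]:
    """Sketch of the initial approach: up to 2 * shards_count blocking attempts."""
    connections: Dict[int, FakeConnection] = {}
    for _ in range(2 * shards_count):
        conn = open_connection()                     # blocks; shard chosen by server
        connections.setdefault(conn.shard_id, conn)  # keep the first conn per shard
        # (a real driver would also close the duplicate connections it drops)
        if len(connections) == shards_count:
            return connections                       # lucky: every shard covered
    raise RuntimeError(
        f"connected to {len(connections)} of {shards_count} shards")


# A server that happens to hand out shards 0, 1, 1, 2, 3 in that order.
assigned = iter([0, 1, 1, 2, 3])
pool = connect_all_shards_sync(lambda: FakeConnection(next(assigned)), 4)
print(sorted(pool))  # [0, 1, 2, 3]
```

If the server keeps assigning the same few shards, the loop exhausts its budget and raises, and every attempt blocks application startup along the way.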

Not Acceptable!

“Noooo! The first time I saw this, I understood the flaw: the client is not able to request a specific shard_id because of the protocol limitation. So there is no deterministic and secure way to get a connection to all remote shards of a Scylla node. Connecting synchronously also means that the startup of our application would be as slow as connecting to (hopefully) all shards of all nodes. Not acceptable.”

“The second thing that came to my mind was, ‘Hey, since this is a protocol limitation, it’s not bound to Python.’ It’s not Python’s problem. It’s a flaw, or a gap, in the protocol itself.”

“That means that all the current shard-aware drivers are lying. Since none of them, even today, can guarantee to always have a connection for a given routing key. All of this is opportunistic and optimistic. You will eventually get one, but not all your queries will be able to use the direct connection to the right shard.”

“So I wrote an RFC on the Scylla dev mailing list to discuss this problem, and the good news is that consensus on a solution was recently found. It will take the form of a new shard allocation algorithm placed on the server side, made available on a new listening port on Scylla.”

“Since we want Scylla and Cassandra to retain the same behavior on their default port, if we want to add a new kind of shard allocation on the server, we need to do it on a new port. It will be a shard-aware port. It will just use a sort of modulo on the client’s socket source port to calculate and assign the correct shard to the connection. It means that clients will just have to calculate and select the right socket source port to get a connection to the desired shard_id.”
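The source-port scheme described above reduces to a little modular arithmetic on both sides. The sketch below assumes the server computes `source_port % shards_count`, exactly the modulo the talk describes. The port range (the IANA dynamic range) and the function names are illustrative assumptions, not the final Scylla implementation.

```python
from typing import Optional

# Assumption: clients pick from the IANA dynamic port range; a real client
# must also skip ports already in use and respect the OS's configured range.
PORT_MIN, PORT_MAX = 49152, 65535


def shard_for_source_port(source_port: int, shards_count: int) -> int:
    """Server side, as described in the talk: a modulo on the client's source port."""
    return source_port % shards_count


def source_port_for_shard(shard_id: int, shards_count: int,
                          start: int = PORT_MIN) -> Optional[int]:
    """Client side: smallest port >= start that the server maps to shard_id."""
    port = start + (shard_id - start) % shards_count
    return port if port <= PORT_MAX else None


port = source_port_for_shard(3, shards_count=8)
print(port, shard_for_source_port(port, 8))  # 49155 3
```

A client would then bind its socket to that port before connecting (for example with `sock.bind(("", port))` from Python's standard `socket` module), trying `port + shards_count` next if the port is busy, since that maps to the same shard.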

“This is work in progress. It’s not done yet. So I worked on implementing a softer, optimistic and asynchronous way of dealing with this problem.”

“I wrote two functions. The first one is the optimistic one. It tries to open a connection and only stores it if the reported shard_id was not connected before. Otherwise, we close it. So we are only interested in keeping connections to missing shard_ids. We just open it, and if it works, good; if it doesn’t, we’ll see later.”

“Then I switched the startup logic to schedule as many of those optimistic attempts to get a connection to a missing shard_id as there are shards available on the remote node. For example, when you start connecting if you have 50 cores on your remote server you will issue and schedule asynchronously 50 attempts to get a connection to shards. Maybe 25 of them will give you different and unique shard_ids connected. Maybe two of them. Maybe 50 of them. (Lucky you!)”

“But now we don’t care. It’s optimistic, asynchronous, and it will go on and on again like this as we use the driver. The result is a faster application startup time. As fast as the usual Cassandra driver and with non-blocking optimistic shard connections.”
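The optimistic, asynchronous approach can be sketched with `asyncio`: schedule one attempt per remote shard, keep a connection only if its shard_id is new, and close it otherwise. The simulated server (random shard assignment), `FakeConnection`, and all function names are assumptions for illustration; the real driver uses its own event loop and connection classes.

```python
import asyncio
import random
from typing import Dict, List


class FakeConnection:
    """Stand-in for a driver connection; the server assigns its shard_id."""

    def __init__(self, shard_id: int):
        self.shard_id = shard_id
        self.closed = False

    def close(self) -> None:
        self.closed = True


async def open_connection(shards_count: int, rng: random.Random) -> FakeConnection:
    """Simulated connect: the client cannot choose, the server picks a shard."""
    await asyncio.sleep(0)  # yield to the event loop, as real network I/O would
    return FakeConnection(rng.randrange(shards_count))


async def try_connect_missing_shard(pool: Dict[int, FakeConnection],
                                    shards_count: int, rng: random.Random) -> None:
    """Optimistic attempt: keep the connection only if its shard_id is new."""
    conn = await open_connection(shards_count, rng)
    if conn.shard_id in pool:
        conn.close()               # shard already covered: discard this one
    else:
        pool[conn.shard_id] = conn


def schedule_attempts(pool: Dict[int, FakeConnection], shards_count: int,
                      rng: random.Random) -> List["asyncio.Task[None]"]:
    """Schedule one attempt per remote shard without waiting for any of them.
    Must be called while an event loop is running."""
    return [asyncio.create_task(try_connect_missing_shard(pool, shards_count, rng))
            for _ in range(shards_count)]


async def demo(shards_count: int = 50, seed: int = 0) -> Dict[int, FakeConnection]:
    rng = random.Random(seed)
    pool: Dict[int, FakeConnection] = {}
    tasks = schedule_attempts(pool, shards_count, rng)
    # Startup could return here; queries would use any existing connection meanwhile.
    await asyncio.gather(*tasks)   # awaited only so the demo can report a result
    return pool


pool = asyncio.run(demo())
print(f"connected to {len(pool)} of 50 shards")
```

Because the check-and-insert in `try_connect_missing_shard` has no `await` between the membership test and the assignment, concurrent attempts on one event loop cannot both store a connection for the same shard.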

“The cluster object should now pass the routing key down to the pool as well. You can see here that when you issue a query using the query function and the cluster object looks up a connection, we added the routing key so that we could apply the shard_id calculation. This shard_id calculation is a bit obscure. Lucky me, Israel was there to implement it in the first place.”

Optimizing Performance with Cython

“However, the first time I tried to use this shard_id calculation implemented in pure Python, driver performance was very bad. We were slower than the Cassandra driver.”

“What Israel did was to move this shard_id computation to Cython, because the Cassandra driver actually uses a lot of Cython in the background when you install it. He managed to cut its latency impact by almost 7x. Kudos again, Israel. It was a very impressive move. It made the difference from the driver perspective.”

Wrapping It Up


“In the main shard awareness logic, in the host connection pool, this is basically where the connection selection happens and everything is glued together. If we are on a shard-aware communication with a cluster we will calculate the shard_id now from the routing key token. Then we will use the routing key, the shard_id, and we will try to look up in our connection dict if we happen to have a connection to this direct shard_id, to this direct core. If we do? Triumph! We will use this direct connection and route the query there.”

“That’s the best case scenario. If not, we will pray and asynchronously issue a new attempt to connect to a missing shard. Maybe it will be the shard we were trying to look at before. Maybe it will be another one. But that means that as you issue more queries to your cluster, the more chance you get to have a connection to all shards and all cores. If you don’t have one, we will just randomly pick an existing connection. So at worst we would behave just as if we were using the Cassandra driver.”
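The selection logic just described (direct shard hit if possible, otherwise schedule a background attempt and fall back to any connection) can be sketched as follows. `ShardAwarePool`, the toy `shard_of` function, and the string "connections" are hypothetical stand-ins, not the driver's real classes.

```python
import random
from typing import Callable, Dict, Optional


class ShardAwarePool:
    """Hypothetical per-host pool; connections are plain labels here."""

    def __init__(self, shards_count: int, shard_of: Callable[[int], int],
                 schedule_attempt: Callable[[], None],
                 rng: Optional[random.Random] = None):
        self.shards_count = shards_count
        self.shard_of = shard_of                  # token -> shard_id
        self.schedule_attempt = schedule_attempt  # async "connect to a missing shard"
        self.connections: Dict[int, str] = {}     # shard_id -> connection
        self.rng = rng or random.Random()

    def borrow_connection(self, token: Optional[int]) -> str:
        """Assumes at least one connection exists (the first one always does)."""
        if token is not None:
            conn = self.connections.get(self.shard_of(token))
            if conn is not None:
                return conn                       # best case: direct shard connection
        if len(self.connections) < self.shards_count:
            self.schedule_attempt()               # not fully connected yet: try again
        # Worst case: any existing connection, as with the Cassandra driver.
        return self.rng.choice(list(self.connections.values()))


attempts = []
pool = ShardAwarePool(4, shard_of=lambda token: token % 4,   # toy mapping
                      schedule_attempt=lambda: attempts.append("attempt"),
                      rng=random.Random(0))
pool.connections = {0: "conn-shard-0", 2: "conn-shard-2"}
print(pool.borrow_connection(6))  # conn-shard-2 (6 % 4 == 2, direct hit)
pool.borrow_connection(5)         # shard 1 missing: schedules an attempt
print(attempts)                   # ['attempt']
```

The fallback branch is what guarantees the "at worst, behave like the Cassandra driver" property: a query never waits for a shard connection to appear.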

Performance Results

“Does the Scylla driver live up to our expectations? Is it fast? How did it work in production? Because, to you and to me, the real value must be measured in production. So let’s see.”

“The first expectation we checked was whether there was an increase in the number of open connections to the cluster, because now we are opening not just one connection to each node but one connection to each core of each node. When we deployed the Scylla driver we saw this increase.”

“The second one was also an expectation: higher CPU requirements, because you open more connections, meaning that your driver and your CPU have to handle more connections, keepalives, etc. We saw that we had to increase the limits on our Kubernetes deployments to avoid CPU saturation and throttling.”

“What about the major impact we wanted? We want faster queries. Lower latencies. Right? This is what the graph looked like:”

“It’s amazing! We gained a 15% to 25% performance boost. At Numberly we like to look at our graphs in the worst case scenarios possible. That means that this is the max of our processing. This is the worst latency that we get from our application’s perspective.”

“What’s interesting to note is that the performance boost is progressive. Since we connect to shards in the background in an optimistic fashion, the longer our application runs, the more chance it has to have a connection to all shards, and the lower the latency gets, because we begin to always have the right connection to the right core for every query.”

“You can see that right after the deployment we already got a fair amount of performance boost. But the longer time passes the more shards connected, and the better the latency. That was very interesting to see.”

“If we apply a moving median on another power-hungry process you can clearly see the big difference that the Scylla shard-aware driver has made in our production applications.”

“From this we also got an unexpected and cool side effect. Since the cluster load was reduced and the client’s latency was lower for the same workload we could cut by half the number of replicas on our deployment [from 4 to 2]. So we saved actual resources on our Kubernetes cluster.”

“Recent additions that we’ve made to the driver: we added some helpers that let you check for shard-awareness and for opportunistic shard-aware connections, so you can actually see how fully connected you are or are not. When it becomes available, we will also change the driver to deterministically select the shard_id when it connects. There are already two open pull requests for this.”

“We’re going to still work on improving the documentation. And since it’s a Cassandra driver fork we will rebase the latest improvements as well.”

Watch the Video

If you’d like to see Alexys’ full presentation, you can view his slides online, and check out the full video here.

Try it for Yourself!

Alexys then invited the audience to try the Scylla driver. “It’s working great. It’s been in production for us for almost a month now, with the great impact that you’ve seen.”


The post Making a Shard-Aware Python Driver for Scylla, Part 2 appeared first on ScyllaDB.

Making a Shard-Aware Python Driver for Scylla, Part 1

At the virtual Europython 2020 Conference in July, Alexys Jacob, CTO of Numberly gave a talk entitled A deep dive and comparison of Python drivers for Cassandra and Scylla. (He also gave the same talk, updated, to PyCon India; we’ll use slides from the latter where they are more accurate or illustrative.)

We’ll break Alexys’ great talk into two parts. In this first part, Alexys covers how Scylla works, and makes the case for a shard-aware driver. In Part 2, Alexys will describe the actual implementation and the performance results.

Alexys is a perennial champion of Python, Scylla, Gentoo Linux and open source overall, having spoken at numerous conferences, including our own past Scylla Summit events. Known across the industry as @ultrabug, Alexys began his Europython talk by promising the audience “diagrams, emojis, Python code, and hopefully some amazing performance graphs as well!”

He also took a moment at the start of his talk to note that Discord, the very platform Europython had chosen to host the community discussion component of the online conference, was a Scylla user, and directed users to check out how Mark Smith described Discord’s own use of Scylla.

Data Distribution in Scylla and Cassandra, A Primer

Alexys noted that this session was for advanced users. So if users needed the basics of consistent hashing, he referred them to his prior talk at Europython 2017 on the topic.

Even for users who were not entirely familiar with the concept he offered a basic primer on how Scylla and Apache Cassandra’s token ring architecture works.

First, both Apache Cassandra and Scylla organize a cluster of nodes (or instances) into a ring architecture. The nodes on a ring should be homogeneous (having the same amount of CPU, RAM and disk) to ensure that they can store and process the same relative quantity of information on each.

Further, each node is equivalent in importance. There is no leader or primary node. No node has a special role, and thus there is no single point of failure. “They all do the same thing.”

This topology is called a “token ring” because each node in the cluster is responsible for roughly the same number of tokens, each of which represents a partition of data, a subset of data stored on the node. A partition appears as a group of sorted rows and is the unit of access of queried data. This data is usually replicated across nodes thanks to a setting that is called the replication factor.

The replication factor defines how many times data is replicated on nodes. For example, a replication factor of 2 means that a given token, token range, or partition will be stored on two nodes. This is the case here, where partitions 1 and 2 are stored on Node X. Partition 2 is also stored on Node Y, while partition 1 is also stored on Node Z. So if we were to lose Node X, we could still read the data of partition 1 from Node Z.
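One simple way to place replicas on a ring is to start at the node that owns the token and walk clockwise until `rf` distinct nodes are collected; this is the idea behind Cassandra's SimpleStrategy (NetworkTopologyStrategy additionally takes racks and datacenters into account). The sketch below uses a toy three-node ring with small made-up tokens, so it illustrates the rule rather than reproducing the diagram's exact placement.

```python
import bisect
from typing import List, Tuple

# A toy ring: (token, node) pairs sorted by token. Real rings use the
# 64-bit Murmur3 token space; these small integers are purely illustrative.
RING: List[Tuple[int, str]] = [(0, "X"), (100, "Y"), (200, "Z")]


def replicas_for_token(token: int, ring: List[Tuple[int, str]], rf: int) -> List[str]:
    """SimpleStrategy-style placement: owner plus the next distinct nodes clockwise."""
    tokens = [t for t, _ in ring]
    # The owner is the first node whose token is >= the data token (wrapping around).
    start = bisect.bisect_left(tokens, token) % len(ring)
    distinct = len({node for _, node in ring})
    replicas: List[str] = []
    i = start
    while len(replicas) < min(rf, distinct):
        node = ring[i % len(ring)][1]
        if node not in replicas:
            replicas.append(node)
        i += 1
    return replicas


print(replicas_for_token(50, RING, rf=2))  # ['Y', 'Z']
```

With `rf=2`, losing any one node still leaves a second copy of every token range, which is the availability property described above.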

“This is how high availability is achieved and how Cassandra and Scylla favor availability and partition tolerance. They are called ‘AP’ on the CAP Theorem.” This kind of token ring architecture helps with data distribution among the nodes. Queries should in theory be evenly distributed between nodes.

Alexys noted there are still ways to get an imbalance of data, especially if certain nodes had more frequently-accessed data, or if certain partitions were particularly large. “To counter this effect that we are calling a ‘hot node’ or ‘hot partition’ we need to add more variance in partition to node allocation. This is done using what is called ‘virtual nodes’ (vNodes) so instead of placing physical nodes on the ring we will place many virtual instances of them called virtual nodes.”

“A virtual node represents a contiguous range of tokens owned by a single node so it’s just a smaller slice of a partition but,” Alexys noted that the way the data was shuffled between nodes was more balanced. “So if you look at how now the partitions are distributed among nodes you see that there is more variance into this which will end up in distributing the query better.”

You can learn more about how data is modeled and distributed in Scylla in our Scylla University course, and also some pro tips for how to do it from our Tech Talk on Best Practices for Data Modeling in Scylla.

At this point, everything said was the same between Scylla and Cassandra, but Alexys noted that here the two systems began to diverge. “Scylla goes one step further. On each Scylla node tokens of a vNode are further distributed among the CPU cores of the node. These are called shards. This means that the data stored on a Scylla cluster is not only bound to a node but can be traced down to one of its CPU cores.”

“This is a really interesting architecture and low level design. This is the feature that we will leverage in the Python shard-aware driver later on.”

Now that we understand how data is stored and distributed on the cluster, Alexys explained how it’s created by clients on the physical layer. A partition is a unit of data stored on a node and is identified by a partition key. The partitioner, or partition hash function, uses the partition key to determine on which node in the cluster the data is stored.

In this case a partition key will be derived from column ID. A hash function, which by default on Cassandra and Scylla is a Murmur3 hash, will be applied to it. This will give you a token that will be placed on the token ring. The token falls into a vNode – a contiguous range of tokens owned by one of the nodes; this is the node that the token will be assigned to.
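The hash, token, and vNode steps can be sketched end to end. One deliberate substitution: MD5 from Python's standard library replaces Murmur3 so the example needs no dependencies, which means the tokens will not match a real cluster's. The number of vNodes per node and all names are illustrative assumptions.

```python
import bisect
import hashlib
import random
from collections import Counter
from typing import List, Tuple


def token_of(partition_key: str) -> int:
    """Stand-in partitioner returning a signed 64-bit token.

    Cassandra and Scylla default to Murmur3; MD5 is used here only because
    it ships with Python, so these tokens will NOT match a real cluster's."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big", signed=True)


def build_vnode_ring(nodes: List[str], vnodes_per_node: int,
                     seed: int = 0) -> List[Tuple[int, str]]:
    """Place several random tokens (virtual nodes) per physical node."""
    rng = random.Random(seed)
    ring = [(rng.randint(-2**63, 2**63 - 1), node)
            for node in nodes for _ in range(vnodes_per_node)]
    return sorted(ring)


def owner_of(token: int, ring: List[Tuple[int, str]]) -> str:
    """A vNode owns the tokens up to and including its own token, so the owner
    is the first ring entry with a token >= the data token (wrapping around)."""
    tokens = [t for t, _ in ring]
    return ring[bisect.bisect_left(tokens, token) % len(ring)][1]


ring = build_vnode_ring(["A", "B", "C"], vnodes_per_node=8)
counts = Counter(owner_of(token_of(f"code-{i}"), ring) for i in range(3000))
print(counts)  # roughly, not exactly, a third of the keys per node
```

Giving each node many small ranges instead of one large one is what evens out the distribution, as described for vNodes above.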

The main difference was that this partitioning function would be shard-per-node on Cassandra, but it would be shard-per-core (CPU core) on Scylla.

In Scylla, after a token is assigned to a node, the node decides on which core to place the token. To do that, the entire range of tokens is cut into 2^n equal pieces, where n is a node configuration parameter, by default 12. Each piece is further cut into S smaller equal pieces, where S is the number of shards; each of the smaller pieces belongs to one of the cores. The token falls into one of these pieces, determining the core to which it is assigned.
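The two-step split above can be written as a few lines of integer arithmetic. The sketch assumes signed 64-bit Murmur3 tokens and uses a hypothetical parameter name, `sharding_ignore_msb`, for the n above. To our understanding the Scylla drivers compute the shard in an equivalent way, but check the driver source before relying on this exact form.

```python
MASK64 = (1 << 64) - 1


def shard_of_token(token: int, shards_count: int, sharding_ignore_msb: int = 12) -> int:
    """Map a signed 64-bit token to a shard (CPU core) on its node."""
    # Bias the signed token into the unsigned range [0, 2**64).
    biased = (token + 2**63) & MASK64
    # Dropping the top `sharding_ignore_msb` bits leaves the token's position
    # inside one of the 2**n equal pieces of the ring.
    within_piece = (biased << sharding_ignore_msb) & MASK64
    # Scale that position to [0, shards_count): which of the S sub-pieces it is in.
    return (within_piece * shards_count) >> 64


print(shard_of_token(-2**63, 8))     # 0  (first token of the ring)
print(shard_of_token(2**63 - 1, 8))  # 7  (last token lands on the last shard)
```

Computing this for every query is exactly the hot path that, per Part 2, was slow in pure Python and was moved to Cython.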

This means Scylla provides even more granular distribution and, thus, should be smoother in performance.

Client Driver Queries and Token Awareness

Alexys then described how client drivers query a Cassandra or a Scylla cluster. “Because now that we know this we could guess and expect that client drivers use this knowledge to find out and optimize their query plan.” He described how a naive client would connect to a Cassandra or Scylla cluster by opening the connection to any node of the cluster when it wants to issue a query, and would pick its connection randomly. However, the random node it connects to might not be where the data the client is looking for is stored.

In this case, the node acts as a coordinator for the query. “It is taking the ephemeral responsibility of routing the query internally in the cluster to the right nodes that are replicas for this data.” It is responsible for routing the query to all the replica nodes where the data resides, gathering the responses and then responding back to the client.

If the coordinator is not a replica for the query data, it has to issue the queries to all replicas itself. “That means that you have an extra hop inside the cluster to get the responses. This is suboptimal of course, as it consumes network and processing power on the coordinator node for something that the client could have guessed right in the first place.”

“Because the partitioner hash function is known, our client library can use it to predict data location on the cluster and optimize the query routing. This is what the Cassandra driver does using the token aware policy.” Using token aware policy drivers, the coordinator should be selected from one of the replicas where the data is stored. Using a token aware driver means you save a network hop, reduce latency and minimize internal server load.

“From the point of view of the Python Cassandra driver the partition key is seen as a routing key” which is used to determine which nodes are the replicas for the query. The query itself must be prepared as a server-side statement. “You can see them a bit like stored procedures in the SQL world.”

“If you see this statement = session.prepare and then you express the query that you want,” (in this case, SELECT * FROM motorbikes WHERE code = ?) when you have an argument or parameter you just put a question mark. “This is the recommended and most optimal way to query the data because when you have prepared your query it is validated and it lives on the server side. So you don’t have to pass it. You just have to pass a reference to it and then only pass the arguments.” One of the arguments will be the mandatory routing key (the partition key). “So statement + routing key = node and it’s very very cool.”

“Another thing to note is that, just like prepared stored procedures, prepared statements are also the safest way because they prevent query injection. So please in production, at the bare minimum, only use prepared statements when you issue queries to Cassandra or Scylla clusters.”

(Pro tips: Learn more about CQL prepared statements in this blog post. Also note Scylla Monitoring Stack allows you to optimize your CQL by observing how many non-prepared statement queries you had that did not leverage token aware drivers.)

The Python Cassandra driver defaults to the token aware policy to route the query and then it also defaults to datacenter aware round robin (DCAwareRoundRobinPolicy) load balancing query routing. While that’s a bit of a mouthful, what it means is that your queries will try to balance across the appropriate nodes in the cluster. Round-robin is pretty basic but it is still pretty efficient and can be used even if your cluster is not spread across multiple datacenters.

“By doing so the query routing will not only hit the right node holding a copy of the data that you seek, but also load balance the queries evenly between all its replicas.” For Cassandra this is sufficient, however Alexys noted again that Scylla goes one step further, sharding data down to the CPU core. “We can do better than this. Token awareness is cool, but if our client had shard awareness it would be even cooler.”
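The combination of token awareness and round robin can be illustrated with a toy query planner: replicas come first, and the starting replica rotates from one query to the next. This is a simplified sketch, not the Python driver's `TokenAwarePolicy`/`DCAwareRoundRobinPolicy` code; as we understand it, a real plan also lists non-replica hosts after the replicas as fallbacks. The replica lookup is a hypothetical stand-in.

```python
from typing import Callable, List


class TokenAwareRoundRobin:
    """Toy query planner: replicas first, rotated so load spreads across them."""

    def __init__(self, replicas_for: Callable[[int], List[str]]):
        self.replicas_for = replicas_for   # token -> replica hosts (hypothetical lookup)
        self._counter = 0

    def query_plan(self, token: int) -> List[str]:
        replicas = self.replicas_for(token)
        start = self._counter % len(replicas)
        self._counter += 1
        # Each successive query starts at the next replica: round robin.
        return replicas[start:] + replicas[:start]


policy = TokenAwareRoundRobin(lambda token: ["X", "Y", "Z"])
print([policy.query_plan(42)[0] for _ in range(4)])  # ['X', 'Y', 'Z', 'X']
```

The first host of each plan acts as coordinator, so every query still lands on a replica while the load rotates among them. A shard-aware client adds one more step: choosing the connection to the right core on that host.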

A token aware client could be extended to become a shard-aware client to route its queries not only to nodes but right to their CPU cores. “This is very interesting to do.”

Examples from Other Languages

Alexys noted such shard-aware drivers already existed as forks of the drivers for Java and Go (learn more about those drivers in Scylla University here). But there was no shard-aware driver for Python. As a committed Pythonista, Alexys was undeterred. “When I attended Scylla Summit last year in San Francisco I did some hard lobbying and found some Scylla developers that were willing to help in making this happen for Python as well.”

Just before that Scylla Summit, when Tubi engineer Alex Bantos put the Java shard-aware driver into production, he immediately saw a balancing of transactions across the cluster, which he shared on Twitter. This was indicative of the kinds of performance improvements Alexys wanted to see from a Python shard-aware driver.

Watch the Video

If you’d like to see Alexys’ full presentation, you can view his slides online, and check out the full video here.

Check Out Part Two!

In the second installment, Alexys takes readers step by step through the design process, shares his performance results from production deployment, and lays out next steps.


The post Making a Shard-Aware Python Driver for Scylla, Part 1 appeared first on ScyllaDB.

C++ (Scylla) in Battle Royale Against Java’s ZGC/Shenandoah/G1 (Cassandra)

We’ve wanted to compare Scylla vs Cassandra 4.0 using the most tuned GC and a newer JVM. Luckily, DataStax did an extensive benchmark, comparing multiple GC algorithms and different Java Virtual Machines (JVMs). That’s great because, since we have a stake in C++, the average skeptical developer wouldn’t believe us to be the most honest vendor if we had run the tests ourselves. Thus we can just use DataStax’s own test results and run Scylla against them under identical conditions to see how it compares.

The Java results overall show a notable improvement in the P99 latencies of the new ZGC and Shenandoah garbage collectors. However, the tradeoffs between maximum throughput, latency, and even stability are not simple, and even C* 4.0 does not officially support JDK11.

As a side note, from our point of view DataStax ran the workload in a less-than-realistic fashion. The dataset was tiny (16GB of SSD volume per node? This isn’t why you use NoSQL), and the consistency level was set to LOCAL_ONE, which means that the coordinator won’t wait for additional replicas, keeping latency minimal.

The more realistic the environment, the further the gap between Scylla/C++ and Cassandra/Java will grow. More on this at the bottom.

Cassandra’s JVM results

Three nodes of r3.2xl servers reached an overall maximum throughput of 40k-51k operations per second. The max P99 read latency reached 50ms. When not pushed to the max, at 25k ops, you can get single-digit latency, <3ms in the Shenandoah case. It will be interesting to repeat the test with a more typical dataset of 1TB/node to reach a clearer conclusion on the winner of the JVM GC battle royale.

The test results did indeed show that C* 4 is a marked improvement over C* 3. They also included deprecated garbage collectors like CMS (see here and here).

As a reminder, only JDK8 is officially supported with C* 4.0. JDK11 is experimentally supported. JDK14 is not listed as supported, even as an experimental configuration. You can read the official status of JDK support for Apache Cassandra here.

For now, pay particular attention to how the two leading contenders, ZGC and Shenandoah, compared using C* 4:

Cassandra 4.0 testing results summary (per DataStax)

Machine, Garbage Collector, JDK, test rate | Avg Write P99 [ms] | Avg Read P99 [ms] | Max P99 [ms]
Cassandra 3.11.6
r3.2xl, CMS JDK8, 25k ops | 14.74 | 17.01 | 600.11
r3.2xl, CMS JDK8, 40k ops | 25.24 | 24.18 | 47.92
r3.2xl, G1 JDK8, 25k ops | 28.84 | 25.21 | 294.05
r3.2xl, G1 JDK8, 40k ops | 83.03 | 66.48 | 314.16
r3.2xl, Shenandoah JDK8, 25k ops | 8.65 | 9.16 | 29.88
r3.2xl, Shenandoah JDK8, 40k ops | 66.13 | 49.64 | 421.35
Cassandra 4.0
r3.2xl, ZGC JDK11, 25k ops | 4.21 | 4.12 | 23.38
r3.2xl, ZGC JDK11, 40k ops | 35.67 | 32.70 | 65.34
r3.2xl, ZGC JDK14, 25k ops | 2.45 | 2.81 | 16.98
r3.2xl, ZGC JDK14, 40k ops | 14.87 | 14.83 | 36.78
r3.2xl, Shenandoah JDK11, 25k ops | 2.72 | 2.64 | 29.76
r3.2xl, Shenandoah JDK11, 40k ops | 9.13 | 17.37 | 29.10
r3.2xl, Shenandoah JDK11, 50k ops | 30.60 | 28.10 | 61.46
r3.2xl, CMS JDK11, 25k ops | 12.25 | 11.54 | 28.61
r3.2xl, CMS JDK11, 40k ops | 22.25 | 20.15 | 41.55
r3.2xl, CMS JDK11, 50k ops | 34.58 | 31.53 | 55.59
r3.2xl, G1 JDK11, 25k ops | 27.15 | 19.77 | 327.99
r3.2xl, G1 JDK11, 40k ops | 29.70 | 26.35 | 315.34
r3.2xl, G1 JDK11, 50k ops | 52.71 | 42.41 | 344.55
r3.2xl, G1 JDK14, 25k ops | 15.24 | 12.51 | 324.14
r3.2xl, G1 JDK14, 40k ops | 23.76 | 22.36 | 302.98
r3.2xl, G1 JDK14, 50k ops | 32.71 | 39.38 | 422.94

The first thing that’s obvious is that single-digit P99 latencies can only be maintained with ZGC or Shenandoah if you limit each server to 25k ops/second, a rate of only about 3,125 replicated ops/second per vCPU. At any higher volume you’re in double digits. Also, the cluster maxed out at around 51k ops/second.

Cassandra was CPU-bound (and JVM-bound), so it did not hit the EBS storage bottleneck we report below.

Scylla’s performance on the same setup

We repeated the same benchmark under the same conditions (the same hardware, testing tools, client command line, read/write mix, etc.), using Scylla Open Source 4.1 as the system under test. Scylla comes configured out of the box: there is no need to tune, just use the AMI or run scylla_setup. Sometimes, auto-tuning is the biggest advantage.

TL;DR: please do go over the results; it’s a shame not to enjoy the journey, so here we go.
Initially we used the default Cassandra Java driver; only in the last test did we switch to the Scylla fork, which uses shard-aware topology knowledge (a trivial switch), and we immediately saw better performance.

25k operations per second

40k operations per second

55k operations per second

70k operations per second

Since the above benchmark used EBS volumes, a network-attached storage solution, we knew we had exhausted the maximum IOPS the system could provide. To continue testing, we switched Scylla to i3.2xlarge instances, which use fast NVMe SSDs. We chose the i3.2xlarge to stay within the same 8 vCPU and memory range as the original test, while using a more performant I/O system.

As a result we were able to increase performance to 100K operations per second — 12,500 replicated-op/second per vCPU — while maintaining P99 latency well below 7ms. At this point the limiting factor becomes the amount of power we can extract from the instance’s CPUs.

To increase CPU efficiency, we added the Scylla shard-aware driver to the testing instance. The result is a further decrease in latency of 20% or more, bringing P99 below 5ms!

100K operations per second, using i3.2xlarge servers, Cassandra driver

100K operations per second, using i3.2xlarge servers, Scylla shard aware driver

Scylla 4.1 testing results summary:

Machine  Test rate  Driver                     Max write P99 [ms]  Max read P99 [ms]
r3.2xl   25K ops    Cassandra default driver   6.78                6.85
r3.2xl   40K ops    Cassandra default driver   9.78                8.91
r3.2xl   55K ops    Cassandra default driver   13.7                15.1
r3.2xl   70K ops    Cassandra default driver   14.7                15.3
r3.2xl   80K ops    Cassandra default driver   22.9                23.3
i3.2xl   100K ops   Cassandra default driver   6.47                6.51
i3.2xl   100K ops   Scylla shard-aware driver  4.28                4.86

Summary of comparison

  • Cassandra 4.0 with 9 different JVM configurations, fully optimized by Datastax, reached maximum performance of 51k OPS at max P99 latency of 60ms on the selected hardware.
  • With the officially supported JDK8, throughput topped out at 40k ops. Only by employing a JVM that isn’t officially supported in Cassandra 4.0 could they achieve the faster results.
  • Cassandra 4.0 requires 4x the hardware to reach Scylla 4.1’s throughput.

DataStax’s results confirm our own estimates and tests of C* 4.0. They are basically similar to the C* 3.11 results, since nothing substantial changed in C*’s core. When using a proper dataset on workload-optimized nodes (10TB-50TB in the Scylla case), the gap between Cassandra and Scylla grows even further in Scylla’s favor.

Functionality-wise, there are additional reasons to choose Scylla. For the above price point of Cassandra 4.0, you can buy a fully managed, enterprise-grade Scylla Cloud system and still contribute 50% of your TCO to the charity of your choice, or pocket the difference just to please your CFO.

Do you concur with our results? You are welcome to challenge us on the Scylla slack channel. Better yet, if you want to see for yourself, run our free test drive or if you are convinced and want to get started just download a Docker image.



Infrastructure and tools used to benchmark Scylla 4.1
Base system used, on AWS EC2 instances:
Scylla servers: 3 instances of r3.2xlarge
Stress node: a single c3.2xlarge

For the higher-rate testing we used:
Scylla servers: 3 instances of i3.2xlarge
Stress node: a single c3.2xlarge


Benchmarks were done using 8 threads running with rate limiting and 80% writes / 20% reads.
The load tests were conducted initially with each thread sending 50 concurrent queries at a time.
The keyspace was created with a replication factor of 3, all queries were executed at consistency level LOCAL_ONE, and the tables used the STCS (SizeTieredCompactionStrategy) compaction strategy.

The following tlp-stress command was used:
tlp-stress run BasicTimeSeries -d 30m -p 100M -c 50 --pg sequence -t 8 -r 0.2 --rate --populate 200000 --compaction "{'class': 'SizeTieredCompactionStrategy', 'min_threshold' : '2'} AND speculative_retry = 'NONE'"

All workloads ran for 30 minutes, loading between 16 and 40 GB of data per node, which allowed a reasonable compaction load.

Setting the stress tool to obtain higher throughput rates

We increased the number of connections in the tlp-stress tool to be able to stress Scylla at higher rates. Here is the procedure we followed:

Tlp-stress download site –

Since the number of connections is hard-coded on tlp-stress, we had to download, modify and build it ourselves.

1. Clone tlp-stress repo:

git clone

2. Edit the Run.kt file, increasing the connections-per-host parameters from (4, 8) to (8, 16):

$ vi src/main/kotlin/com/thelastpickle/tlpstress/commands/Run.kt
var builder = Cluster.builder()
        .withCredentials(username, password)
        // per-host connection settings live on PoolingOptions (other builder calls omitted)
        .withPoolingOptions(PoolingOptions()
                .setConnectionsPerHost(HostDistance.LOCAL, 8, 16)   // was 4, 8
                .setConnectionsPerHost(HostDistance.REMOTE, 8, 16)  // was 4, 8
                .setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection)
                .setMaxRequestsPerConnection(HostDistance.REMOTE, maxRequestsPerConnection))

3. Build with Gradle

./gradlew shadowJar

You should see output similar to the following:

~/tlp-stress$ ./gradlew shadowJar
3 actionable tasks: 2 executed, 1 up-to-date

4. Done. Use the shell script wrapper to start and get help:

bin/tlp-stress -h

The post C++ (Scylla) in Battle Royale Against Java’s ZGC/Shenandoah/G1 (Cassandra) appeared first on ScyllaDB.

Building a Low-Latency Distributed Stock Broker Application: Part 4

In the fourth blog of the “Around the World” series we built a prototype of the application, designed to run in two georegions.

Recently I re-watched “Star Trek: The Motion Picture” (The original 1979 Star Trek Film). I’d forgotten how much like “2001: A Space Odyssey” the vibe was (a drawn-out quest to encounter a distant, but rapidly approaching very powerful and dangerous alien called “V’ger”), and also that the “alien” was originally from Earth and returning in search of “the Creator”: V’ger was actually a seriously upgraded Voyager spacecraft!

Star Trek: The Motion Picture (V’ger)

The original Voyager 1 and 2 had only been recently launched when the movie came out, and were responsible for some remarkable discoveries (including the famous “Death Star” image of Saturn’s moon Mimas, taken in 1980). Voyager 1 has now traveled further than any other human artefact and has since left the solar system! It’s amazing that after 40+ years it’s still working, although communicating with it now takes a whopping 40 hours in round-trip latency (which happens via the Deep Space Network—one of the stations is at Tidbinbilla, near our Canberra office).

Canberra Deep Space Communication Complex (CSIRO)

Luckily we are only interested in traveling “Around the World” in this blog series, so the latency challenges we face in deploying a globally distributed stock trading application are substantially less than the 40 hours latency to outer space and back. In Part 4 of this blog series we catch up with Phileas Fogg and Passepartout in their journey, and explore the initial results from a prototype application deployed in two locations, Sydney and North Virginia.

1. The Story So Far

In Part 1 of this blog series we built a map of the world to take into account inter-region AWS latencies, and identified some “georegions” that enabled sub-100ms latency between AWS regions within the same georegion. In Part 2 we conducted some experiments to understand how to configure multi-DC Cassandra clusters and use Java clients, and measured latencies from Sydney to North Virginia. In Part 3 we explored the design for a globally distributed stock broker application, built a simulation, and got some indicative latency predictions.

The goal of the application is to ensure that stock trades are done as close as possible to the stock exchanges the stocks are listed on, to reduce the latency between receiving a stock ticker update, checking the conditions for a stock order, and initiating a trade if the conditions are met. 

2. The Prototype

For this blog I built a prototype of the application, designed to run in two georegions, Australia and the USA. We are initially only trading stocks available from stock exchanges in these two georegions: orders for stocks traded in New York will be directed to a Broker deployed in the AWS North Virginia region, and orders for stocks traded in Sydney will be directed to a Broker deployed in the AWS Sydney region. As this Google Earth image shows, they are close to being antipodes (diametrically opposite each other; at 15,500 km apart, they pretty much satisfy the definition of traveling “Around the World”), with a measured inter-region latency (from blog 2) of 230ms.

Sydney to North Virginia (George Washington’s Mount Vernon house)

The design of the initial (simulated) version of the application was changed in a few ways, to:

  • Ensure that it worked correctly when instances of the Broker were deployed in multiple AWS regions,
  • Measure actual rather than simulated latencies, and
  • Use a multi-DC Cassandra cluster.

The prototype isn’t a complete implementation of the application yet. In particular it only uses a single Cassandra table for Orders—to ensure that Orders are made available in both georegions, and can be matched against incoming stock tickers by the Broker deployed in that region. 

Some other parts of the application are “stubs”, including the Stock Ticker component (which will eventually use Kafka), and checks/updates of Holdings and generation of Transactions records (which will eventually also be Cassandra tables). Currently only the asynchronous, non-market order types are implemented (Limit and Stop orders). I realized that implementing Market orders (which must be traded immediately) using a Cassandra table would produce too many tombstones, because each order is deleted as soon as it is filled (rather than being marked as “filled” as in the original design; deleting filled orders lets the Cassandra query find unfilled orders quickly and prevents the Orders table from growing indefinitely). However, for non-market orders a Cassandra table is a reasonable design choice: Orders may exist for extended periods of time (hours, days, or even weeks) before being traded, and since there is only a small number of successful trades per second (10s to 100s) relative to the total number of waiting Orders (potentially millions), the number of deletions, and therefore tombstones, will be acceptable.

We now have a look at the details of some of the more significant changes that were made to the application.


I created a multi-DC Cassandra keyspace as follows:

CREATE KEYSPACE broker WITH replication = {'class': 'NetworkTopologyStrategy', 'NorthVirginiaDC': '3', 'SydneyDC': '3'};

In the Cassandra Java driver, the application.conf file determines which Data Center the Java client connects to. For example, to connect to the SydneyDC the file has the following settings:

datastax-java-driver {
    # other settings (e.g. contact points) omitted

    basic.load-balancing-policy {
        class = DefaultLoadBalancingPolicy
        local-datacenter = "SydneyDC"
    }
}
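The local-datacenter setting is what keeps each Broker's requests inside its own region. As a rough illustration only (this is not the driver's implementation; Node and queryPlan are hypothetical names), a DC-aware policy builds each query plan from the nodes of the configured local Data Center and rotates through them round-robin. As far as we understand the 4.x driver's default behaviour, remote Data Center nodes are left out of the plan, which is what the sketch models:

```go
package main

// Node is a simplified, hypothetical view of a cluster member: its address
// and the Data Center it belongs to.
type Node struct {
	Address    string
	DataCenter string
}

// queryPlan returns the addresses of nodes in localDC, rotated by attempt
// (which must be >= 0) so that successive requests start at different
// coordinators. Nodes in remote Data Centers are excluded entirely.
func queryPlan(nodes []Node, localDC string, attempt int) []string {
	var local []string
	for _, n := range nodes {
		if n.DataCenter == localDC {
			local = append(local, n.Address)
		}
	}
	if len(local) == 0 {
		return nil
	}
	start := attempt % len(local)
	plan := make([]string, 0, len(local))
	plan = append(plan, local[start:]...)
	plan = append(plan, local[:start]...)
	return plan
}
```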

The Orders table was created as follows (note that I couldn’t use “limit” for a column name as it’s a reserved word in Cassandra!):

CREATE TABLE broker.orders (
    symbol text,
    orderid text,
    buyorsell text,
    customerid text,
    limitthreshold double,
    location text,
    ordertype text,
    quantity bigint,
    starttime bigint,
    PRIMARY KEY (symbol, orderid)
);

For the primary key, the partition key is the stock symbol, so that all outstanding orders for a stock can be found when each stock ticker is received by the Broker, and the clustering column is the (unique) orderid, so that multiple orders for the same symbol can be written and read, and a specific order (for an orderid) can be deleted. In a production environment, using the stock symbol alone as the partition key may result in skewed and unbounded partitions, which is not recommended.
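To make the partition/clustering split concrete, here is a small in-memory model of the broker.orders primary key. It is a sketch for intuition only (OrdersTable and its methods are hypothetical names, and only a few columns are kept): the outer map plays the role of the partition (symbol), the inner map the clustering column (orderid). Unlike Cassandra, it produces no tombstones on delete.

```go
package main

import "sort"

// Order holds a few of the broker.orders columns.
type Order struct {
	Symbol   string
	OrderID  string
	Quantity int64
}

// OrdersTable models PRIMARY KEY (symbol, orderid): one inner map per
// partition (symbol), keyed by the clustering column (orderid).
type OrdersTable map[string]map[string]Order

// Insert behaves like a CQL insert, i.e. an upsert on the full primary key.
func (t OrdersTable) Insert(o Order) {
	if t[o.Symbol] == nil {
		t[o.Symbol] = map[string]Order{}
	}
	t[o.Symbol][o.OrderID] = o
}

// Select mirrors "select * from broker.orders where symbol = ?": it reads a
// single partition and returns rows in ascending orderid (clustering) order.
func (t OrdersTable) Select(symbol string) []Order {
	rows := make([]Order, 0, len(t[symbol]))
	for _, o := range t[symbol] {
		rows = append(rows, o)
	}
	sort.Slice(rows, func(i, j int) bool { return rows[i].OrderID < rows[j].OrderID })
	return rows
}

// Delete mirrors "delete from broker.orders where symbol = ? and orderid = ?".
// Deleting from a missing partition is a no-op.
func (t OrdersTable) Delete(symbol, orderID string) {
	delete(t[symbol], orderID)
}
```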

The prepared statements for creating, reading, and deleting orders are as follows:

PreparedStatement prepared_insert = Cassandra.session.prepare(
"insert into broker.orders (symbol, orderid, buyorsell, customerid, limitthreshold, location, ordertype, quantity, starttime) values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
PreparedStatement prepared_select = Cassandra.session.prepare(
        "select * from broker.orders where symbol = ?");

PreparedStatement prepared_delete = Cassandra.session.prepare(
        "delete from broker.orders where symbol = ? and orderid = ?");

I implemented a simplified “Place limit or stop order” operation (see Part 3), which uses the prepared_insert statement to write each new order to the Cassandra Data Center local to the Broker where the order was created; Cassandra then automatically replicates it to the other Data Center. I also implemented the “Trade Matching Order” operation (Part 3), which uses the prepared_select statement to query orders matching each incoming Stock Ticker, checks the rules, and, if the order is filled, deletes it.
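The “checks the rules” step can be sketched as a single predicate. This assumes the textbook trigger semantics for limit and stop orders, which may differ in detail from the rules defined in Part 3; OrderType, Side and shouldTrade are hypothetical names standing in for the ordertype, buyorsell and limitthreshold columns:

```go
package main

// OrderType and Side are simplified encodings of the ordertype and
// buyorsell columns.
type OrderType string
type Side string

const (
	Limit OrderType = "limit"
	Stop  OrderType = "stop"
	Buy   Side      = "buy"
	Sell  Side      = "sell"
)

// shouldTrade applies textbook trigger rules to an incoming ticker price:
//   - a buy limit fills at or below the threshold, a sell limit at or above it;
//   - a buy stop triggers at or above the threshold, a sell stop at or below it.
func shouldTrade(t OrderType, s Side, threshold, price float64) bool {
	switch {
	case t == Limit && s == Buy:
		return price <= threshold
	case t == Limit && s == Sell:
		return price >= threshold
	case t == Stop && s == Buy:
		return price >= threshold
	case t == Stop && s == Sell:
		return price <= threshold
	}
	return false
}
```

In the Broker loop, each row returned by prepared_select would be passed through a check like this, and prepared_delete issued for the rows that trade.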


I created a 3-node Cassandra cluster in the Sydney AWS region, and then added another identical Data Center in the North Virginia AWS region using Instaclustr Managed Cassandra for AWS. This gave me 6 nodes in total, running on t3.small instances (5 GB SSD, 2 GB RAM, 2 CPU cores). This is a small developer-sized cluster, but it is adequate for a prototype and very affordable (2 cents an hour per node for AWS costs), given that the Brokers are currently single-threaded and so don’t produce much load. We’re more interested in latency at this point of the experiment, and we may want to increase the number of Data Centers in the future. I also spun up an EC2 instance (t3a.micro) in each of the two AWS regions and deployed an instance of the Stock Broker on each (it only used 20% CPU). Here’s what the complete deployment looks like:

3. The Results

For the prototype, the focus was on demonstrating that the design goal of minimizing latency for trading stop and limit orders (asynchronous trades) was achieved. Latency for these order types is measured from the time a Stock Ticker is received to the time an Order is filled. We ran a Broker in each AWS region concurrently for an hour, with the same workload for each, and measured the average and maximum latencies. For the first configuration, each Broker is connected to its local Cassandra Data Center, which is how it would be configured in practice. The results were encouraging, with an average latency of 3ms and a maximum of 60ms, as shown in this graph.

During the run, across both Brokers, 300 new orders were created each second, 600 stock tickers were received each second, and 200 trades were carried out each second. 

Given that I hadn’t implemented Market Orders yet, how could I approximately measure the expected latency for these synchronous order types between different regions (i.e. Sydney and North Virginia)? The latency for Market orders within the same region will be comparable to that of the non-market orders. The solution turned out to be simple: just reconfigure the Brokers to use the remote Cassandra Data Center, which introduces the inter-region round-trip latency that would also be encountered by Market Orders placed in one region and traded immediately in the other. I could also have achieved a similar result by changing the consistency level to EACH_QUORUM (which requires a majority of replicas in each data center to respond). Not surprisingly, the latencies were higher, rising to 360ms average and 1200ms maximum, as shown in this graph with both configurations (Stop and Limit Orders on the left, and Market Orders on the right):
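Why EACH_QUORUM forces the inter-region round trip can be checked with a little arithmetic. A quorum is a majority of the replicas, floor(RF/2) + 1, and EACH_QUORUM needs one in every Data Center. For the broker keyspace created earlier (3 replicas in each of NorthVirginiaDC and SydneyDC) that means 2 acknowledgements from each region, 4 in total, so every request waits on the remote region. The function names below are illustrative:

```go
package main

// quorum is a majority of the replicas: floor(rf/2) + 1.
func quorum(rf int) int { return rf/2 + 1 }

// eachQuorumAcks returns the total number of replica acknowledgements
// EACH_QUORUM needs: a quorum in every Data Center of the keyspace.
func eachQuorumAcks(rfPerDC map[string]int) int {
	total := 0
	for _, rf := range rfPerDC {
		total += quorum(rf)
	}
	return total
}
```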

So our initial experiments are a success, and validate the primary design goal, as asynchronous stop and limit Orders can be traded with low latency from the Broker nearest the relevant stock exchanges, while synchronous Market Orders will take significantly longer due to inter-region latency. 

Write Amplification

I wondered what else could be learned from running this experiment. We can understand more about resource utilization in multi-DC Cassandra clusters. Using the Instaclustr Cassandra Console, I monitored the CPU utilization of each node in the cluster, first with one Data Center and one Broker, then with two Data Centers and a single Broker, and finally with both Brokers running. It turns out that the read load results in 20% CPU utilization on each node in the local Cassandra Data Center, and the write load also results in 20% locally. Thus, for a single Data Center cluster the total load is 40% CPU. However, with two Data Centers things get more complex, because each Data Center’s local write load is replicated to every other Data Center. This is also called “Write Amplification”.

The following table shows the measured total load for 1 and 2 Data Centers and the predicted load for up to 8 Data Centers, showing that for more than 3 Data Centers you need bigger nodes (or bigger clusters). An instance type with four CPU cores would be adequate for 7 Data Centers, resulting in about 80% CPU utilization.

CPU utilization per node (%)
Data Centers  Local read  Local write  Remote write  Total write  Total load
1             20          20           0             20           40
2             20          20           20            40           60
3             20          20           40            60           80
4             20          20           60            80           100
5             20          20           80            100          120
6             20          20           100           120          140
7             20          20           120           140          160
8             20          20           140           160          180
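The table follows a simple linear model: each node carries the local read and write load, plus one replicated write load from each of the other n-1 Data Centers. A minimal sketch of that model, assuming (as the text does) that load scales linearly with the number of CPU cores when moving to a bigger instance; the function names are illustrative:

```go
package main

// totalCPULoad predicts per-node CPU utilization (%) for n Data Centers:
// local read + local write, plus one replicated write load arriving from
// each of the other n-1 Data Centers.
func totalCPULoad(n int, readLoad, writeLoad float64) float64 {
	remoteWrite := writeLoad * float64(n-1)
	return readLoad + writeLoad + remoteWrite
}

// rescaleForCores converts a utilization measured on a node with fromCores
// into the expected utilization on a node with toCores, assuming the work
// spreads evenly across cores.
func rescaleForCores(load float64, fromCores, toCores int) float64 {
	return load * float64(fromCores) / float64(toCores)
}
```

For example, totalCPULoad(7, 20, 20) is 160 (% of a 2-core node), and rescaleForCores(160, 2, 4) gives the 80% quoted for a four-core instance.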


The total cost to run the prototype includes the Instaclustr Managed Cassandra nodes (3 nodes per Data Center x 2 Data Centers = 6 nodes), the two AWS EC2 Broker instances, and the data transfer between regions (AWS only charges for data out of a region, not in, but the prices vary depending on the source region). For example, data transfer out of North Virginia is 2 cents/GB, but Sydney is more expensive at 9.8 cents/GB. I computed the total monthly operating cost to be $361 for this configuration, broken down into $337/month (93%) for Cassandra and EC2 instances, and $24/month (7%) for data transfer, to process around 500 million stock trades. Note that this is only a small prototype configuration, but can easily be scaled for higher throughputs (with incrementally and proportionally increasing costs).
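The “around 500 million stock trades” figure is consistent with the 200 trades per second measured earlier, sustained for a month. A quick check, assuming a 30-day month (the helper names are illustrative):

```go
package main

// monthlyTrades converts a steady trade rate into trades per 30-day month.
func monthlyTrades(tradesPerSec int64) int64 {
	const secondsPerMonth = 30 * 24 * 60 * 60
	return tradesPerSec * secondsPerMonth
}

// costPerMillion divides a monthly cost by the number of millions of trades.
func costPerMillion(monthlyCost float64, trades int64) float64 {
	return monthlyCost / (float64(trades) / 1e6)
}
```

monthlyTrades(200) is 518,400,000, and costPerMillion(361, 518400000) is about $0.70, i.e. roughly 70 cents per million trades for this prototype configuration.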


In this blog we built and experimented with a prototype of the globally distributed stock broker application, focussing on testing the multi-DC Cassandra part of the system which enabled us to significantly reduce the impact of planetary scale latencies (from seconds to low milliseconds) and ensure greater redundancy (across multiple AWS regions), for the real-time stock trading function. Some parts of the application remain as stubs, and in future blogs I aim to replace them with suitable functionality (e.g. streaming, analytics) and non-functionality (e.g. failover) from a selection of Kafka, Elasticsearch and maybe even Redis!

The post Building a Low-Latency Distributed Stock Broker Application: Part 4 appeared first on Instaclustr.

Building a Grafana Backend Plugin

Grafana is a great observability platform. One of its key strengths comes from its plugin system. (To give you an idea of how popular the extension of Grafana through plugins is, you can find the entire list of available plugins here.)

In this post I will explain how to build a simple data source plugin for Grafana 7, using Scylla as an example. I’ll focus on how to create a new data source plugin and specifically on backend plugins.

When building your own dashboard you would usually use two types of plugins: data sources and panels.

  • Data source plugins allow Grafana to connect to a variety of sources and display the results. There are already plugins for many well-known databases.
  • Panel plugins make it possible to extend how data is displayed.

Creating a Backend Plugin

In this article I will show you how I created a backend plugin for Scylla that would also work with Apache Cassandra. To make it easier to follow the instructions I have created the following open source repository:

You can look at the commit-list to follow the steps in this post. If you are looking for the actual plugin it is in:

All of this work is open source, available under the Apache 2.0 license.


Grafana has frontend and backend parts. Typically, a user is exposed to the frontend. For example, when running Grafana with Prometheus as the metrics server, the data is collected in the user’s browser and then displayed.

While we call it a backend plugin, it actually has a frontend and a backend. The backend will be written in Go and the frontend uses React. You don’t need much experience in either to follow along. If you don’t have these already, you’ll need to install the following:

And optionally:

Creating the Plugin

This guide follows the Grafana data source guide and customizes it. In general these are the steps we’ll take:

  • Set up your environment
  • Create a plugin
  • Implement data queries
  • Add support for health checks
  • Enable alerts in Grafana

Setting Up Your Environment

We will need some configuration, and I like to do my Grafana configuration from files, so I suggest that you create a directory for all Grafana development.

mkdir grafana-dev
cd grafana-dev

Running Grafana in Docker

This is optional. In development I use Docker for running Grafana (I’m also going to use Docker for Scylla). I suggest adding two scripts to start and stop Grafana in Docker.

This is the script I’m going to use to start Grafana:

$ cat
docker run -d -p 3000:3000 \
    -e "GF_AUTH_BASIC_ENABLED=false" \
    -e "GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS=scylla-scylla-datasource" \
    -v $PWD/grafana/dashboard:/var/lib/grafana/dashboards:z \
    -v $PWD/plugins:/var/lib/grafana/plugins:z \
    -v $PWD/provisioning:/var/lib/grafana/provisioning:z \
    -e "GF_PATHS_PROVISIONING=/var/lib/grafana/provisioning" \
    --name grafana grafana/grafana:7.1.5

This is the script I’m going to use to kill it:

$ cat
docker kill grafana
docker rm -v grafana

If you are familiar with Grafana, note the following:

  • The GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS variable, which allows Grafana to load the new unsigned plugin. It must contain the plugin id (scylla-scylla-datasource is the id used in this post).
  • The plugins directory, which lets Grafana find the plugin.
  • The provisioning directory, which lets us configure the plugin from a file.

Creating the base for the plugin

mkdir -p provisioning/datasources/

mkdir plugins

cd plugins

I’m using scylla-datasource as the plugin name.

Follow the instructions to create the plugin:

npx @grafana/toolkit@next plugin:create scylla-datasource

cd scylla-datasource

After you have updated the plugin SDK and compiled the plugin, stop.

Before we do anything else, let’s use git:

git init
git add .
git commit -m "initial commit"

First Test

If you haven’t done so already, run the Grafana server; if you are using Docker, you can use docker restart grafana.

Check the logs with:

docker logs grafana

And look for msg="Running an unsigned backend plugin".

If you see a warning that an unsigned plugin is not being loaded, make sure the plugin name is set correctly in the GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS variable.

You can now follow the rest of the tutorial to add your datasource from the dashboard.
I suggest you do that and get back to this post when you’re done.

After the basic tutorial

Welcome back! In the rest of the post, I will cover multiple subjects that were not clear to me while developing the backend plugin. I tried to break each subject into its own independent section. Besides the explanations you can follow the commits in the repository here.

Data source configuration

Adding a datasource comes with its own configuration.
If you followed the guide, when you added the plugin, you saw a screen such as:

This is a basic configuration that can help you get started.

Before explaining how to modify it, note that two kinds of configuration share this screen: a regular one and a backend-only one.

When an admin configures a data source (in our case, a Scylla database), connecting to it securely requires a username and a password. Once we configure the username and password, we don’t want other users to be exposed to them. Other parts of the configuration can be public, and we would like users to be able to see them (for example, the host it will connect to).

The configuration has secure and non-secure parts. The secure part can be set or reset, but once configured it is never sent back to the frontend. The non-secure part is shown and can be changed. Grafana supports both types.

For our example we are going to have a secure username/password and non-secure host.

There are two files we need to change:

  1. src/types.ts that holds the types definitions.
  2. src/ConfigEditor.tsx that holds the code for showing and changing the configuration.


In types.ts, look for MyDataSourceOptions; this is the interface definition of the non-secure part of the configuration. For our example, we will change path? to host?.

export interface MyDataSourceOptions extends DataSourceJsonData {
    host?: string;
}

Now edit ConfigEditor.tsx and change path to host. Compile your code with yarn build; now when you add a new data source, it looks like this:

I’ve also changed the host description, you can see the full changes in the commit.


Now let’s add the username and password. Edit types.ts and update MySecureJsonData:

export interface MySecureJsonData {
    user?: string;
    password?: string;
}

Edit ConfigEditor.tsx, look at how apiKey is configured, and change it to password. Note the use of SecretFormField: once configured, a secret setting can only be reset and re-assigned.

After you make the changes, compile and reload the data source configuration; it should now look like this:

Note that I’ve changed the user and password text, look at the git commit for full change.

Congratulations, you have seen how to change the backend configuration!

Configure the data source from a datasource.yml file

As I’ve mentioned before, I like to use files for Grafana configuration. You can read more about my take on dashboards from files here.

Just like the start script I mentioned for starting the Grafana container, you can put your plugin configuration in a file. If you use the suggested run script, you already have everything you need for that.

Go back up to the directory that holds the script; under the provisioning directory we already created a datasources directory.

Create a datasource.yaml file:

printf "apiVersion: 1\ndatasources:\n- name: scylla-datasource\n  type: your-datasource\n  orgId: 1\n  isDefault:\n  jsonData:\n    host: ''\n  secureJsonData:\n    user: 'Scylla-User'\n    password: 'Scylla-Password'\n" > provisioning/datasources/datasource.yaml

Make sure you change your-datasource to the data source id you are using, in my case this is scylla-scylla-datasource.

Restart Grafana and make sure that now you have the plugin configured.

Query Configuration

The second frontend configuration is the query configuration. This is the user’s way to control what will be returned from the data source.

Create a new dashboard, add a panel and set its data source to the newly configured one.
You should see something like:

We will change it to take a CQL query and an optional host.


Edit the types.ts file and change the MyQuery interface to:

export interface MyQuery extends DataQuery {
    queryText?: string;
    host?: string;
}


And remove the use of constant in the defaultQuery part.
Note that defaultQuery is the place to add default values for your input.

Edit QueryEditor.tsx. This is the file that controls how the query editor looks and behaves.
You should replace constant with host; make sure to remove the number formatting.
This is how it looks; note that I’ve set the input width.

Template variables

Grafana supports template variables, those variables can be set explicitly or taken from a metric. You can read more about adding variables to a plugin here.

In general, you need to explicitly do the variable replacement before sending the query to the backend plugin.

To do that, you will need to edit src/DataSource.ts.

The latest version of @grafana/runtime provides the TemplateSrv, which can be used to fetch available template variables.

You need getTemplateSrv to get it.

Edit DataSource.ts and add the following:

import { DataSourceInstanceSettings } from '@grafana/data';
import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime';
import { MyDataSourceOptions, MyQuery } from './types';

export class DataSource extends DataSourceWithBackend<MyQuery, MyDataSourceOptions> {
  constructor(instanceSettings: DataSourceInstanceSettings<MyDataSourceOptions>) {
    super(instanceSettings);
  }

  // Replace template variables before the query is sent to the backend
  applyTemplateVariables(query: MyQuery) {
    const templateSrv = getTemplateSrv();
    return {
      ...query, // keep refId and the other query fields
      queryText: query.queryText ? templateSrv.replace(query.queryText) : '',
      host: query.host ? templateSrv.replace(query.host) : '',
    };
  }
}

As you can see, applyTemplateVariables allows you to modify the values before they are sent.
So far we took care of the frontend side; it’s time to move to the backend with Go.
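For intuition about what templateSrv.replace does to the query text before the backend sees it, here is a deliberately simplified stand-in in Go. It is hypothetical (replaceVars is not a Grafana API) and handles only the $name and ${name} forms; Grafana’s real implementation also supports formats, multi-value variables, the [[name]] syntax and built-in variables. Unknown variables are left untouched:

```go
package main

import "regexp"

// varPattern matches ${name} (group 1) or $name (group 2).
var varPattern = regexp.MustCompile(`\$\{(\w+)\}|\$(\w+)`)

// replaceVars substitutes known variables and leaves unknown ones as-is.
func replaceVars(text string, vars map[string]string) string {
	return varPattern.ReplaceAllStringFunc(text, func(m string) string {
		sub := varPattern.FindStringSubmatch(m)
		name := sub[1]
		if name == "" {
			name = sub[2]
		}
		if v, ok := vars[name]; ok {
			return v
		}
		return m
	})
}
```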

Writing the Backend part

Getting the data from the request

Our request holds two parameters, the queryText and the host.
Update the queryModel accordingly:

type queryModel struct {
    Format    string `json:"format"`
    QueryText string `json:"queryText"`
    Host      string `json:"host"`
}

Compile with mage and restart Grafana for the change to take effect.
I have added a log line in the code to print the results, so I can see that data is transferred from the frontend to the backend.
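The query editor’s fields arrive at the backend as a JSON document (query.JSON in the plugin), and fields the struct does not declare, such as refId, are simply ignored by json.Unmarshal. A standalone sketch of that decoding step; parseQuery is a hypothetical helper, and filling an empty format with "timeseries" is an assumed default mirroring the warning logged in the query method:

```go
package main

import "encoding/json"

// queryModel matches the fields sent by the frontend query editor.
type queryModel struct {
	Format    string `json:"format"`
	QueryText string `json:"queryText"`
	Host      string `json:"host"`
}

// parseQuery decodes the raw query JSON and applies an assumed
// "timeseries" default when no format was sent.
func parseQuery(raw []byte) (queryModel, error) {
	var qm queryModel
	if err := json.Unmarshal(raw, &qm); err != nil {
		return queryModel{}, err
	}
	if qm.Format == "" {
		qm.Format = "timeseries"
	}
	return qm, nil
}
```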

Connection Management

The base template suggests using an instance manager for the life-cycle management of data source instances. In practice, the im variable inside SampleDatasource can generate new connections when needed.

ScyllaDB and Apache Cassandra use CQL to query the database. We will use the Go driver for that.

In the import part, replace the http with

Change instanceSettings to:

type instanceSettings struct {
    cluster *gocql.ClusterConfig
    session *gocql.Session
}

The newDataSourceInstance needs to use the host from the backend configuration.

func newDataSourceInstance(setting backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
    // The non-secure part of the configuration (jsonData) holds the host
    type editModel struct {
        Host string `json:"host"`
    }
    var hosts editModel
    err := json.Unmarshal(setting.JSONData, &hosts)
    if err != nil {
        log.DefaultLogger.Warn("error unmarshalling", "err", err)
        return nil, err
    }
    log.DefaultLogger.Info("looking for host", "host", hosts.Host)
    newCluster := gocql.NewCluster(hosts.Host)
    // Create the session once; it is kept in the instance and reused by later queries
    session, err := gocql.NewSession(*newCluster)
    if err != nil {
        log.DefaultLogger.Warn("error creating session", "err", err)
        return nil, err
    }
    return &instanceSettings{
        cluster: newCluster,
        session: session,
    }, nil
}

You can see in the code how to get information from the setting’s JSONData. Remember that there are two parts to the backend data source configuration; this is the non-secure part.
Note also that the session is created once and kept for future use.
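The secure part (user and password) is not in JSONData. In the Go plugin SDK it reaches the backend already decrypted, in the DecryptedSecureJSONData map of backend.DataSourceInstanceSettings. The sketch below mirrors those two fields in a local, hypothetical dataSourceSettings type so that it runs without the SDK; parseSettings and connectionConfig are illustrative names:

```go
package main

import (
	"encoding/json"
	"errors"
)

// dataSourceSettings mirrors the two relevant SDK fields: JSONData (the
// non-secure part) and DecryptedSecureJSONData (the secure part).
type dataSourceSettings struct {
	JSONData                []byte
	DecryptedSecureJSONData map[string]string
}

// connectionConfig is what the plugin needs to open a CQL session.
type connectionConfig struct {
	Host     string
	User     string
	Password string
}

func parseSettings(s dataSourceSettings) (connectionConfig, error) {
	var nonSecure struct {
		Host string `json:"host"`
	}
	if err := json.Unmarshal(s.JSONData, &nonSecure); err != nil {
		return connectionConfig{}, err
	}
	if nonSecure.Host == "" {
		return connectionConfig{}, errors.New("host is not configured")
	}
	// Secure values are looked up by the keys used in MySecureJsonData;
	// a missing key (or nil map) yields an empty string.
	return connectionConfig{
		Host:     nonSecure.Host,
		User:     s.DecryptedSecureJSONData["user"],
		Password: s.DecryptedSecureJSONData["password"],
	}, nil
}
```

In the plugin, the user and password could then be handed to gocql, for example by setting cluster.Authenticator to a gocql.PasswordAuthenticator; the newDataSourceInstance shown earlier uses only the host.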

Dataframe Response

If you look at the query method, you can see that a data frame is used to return the result. Data frames are column-oriented: each data frame has multiple fields (you can read more about fields here).

In the generated example, fields are created with a fixed type and fixed values. A database plugin is an example of a situation where both the types and the values are derived from the returned results.
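Because data frames are column-oriented while CQL returns rows, the query code has to transpose rows into one slice per column. A stdlib-only sketch of that step, assuming rows arrive as maps such as those filled by gocql’s Iter.MapScan; rowsToColumns is a hypothetical helper, and in the plugin each column would then be typed via the helpers below:

```go
package main

// rowsToColumns transposes row-oriented results into one slice per column,
// in the given column order, which is the shape a data frame field expects.
func rowsToColumns(columns []string, rows []map[string]interface{}) [][]interface{} {
	out := make([][]interface{}, len(columns))
	for i := range out {
		out[i] = make([]interface{}, 0, len(rows))
	}
	for _, row := range rows {
		for i, name := range columns {
			// A missing key becomes nil, so all columns stay the same length.
			out[i] = append(out[i], row[name])
		}
	}
	return out
}
```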

We are going to add two helper functions: getTypeArray, which uses the column type returned from Scylla to create the fields, and toValue, which is responsible for type casting.

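The helpers rely on the time, strconv and math/big packages, plus gopkg.in/inf.v0 (gocql returns decimal columns as *inf.Dec), so add them to the import section.

Add the following helper functions.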

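// getTypeArray maps a CQL type name to an empty typed slice,
// which data.NewField uses to decide the field type.
func getTypeArray(typ string) interface{} {
    switch typ {
    case "timestamp", "date":
        return []time.Time{}
    case "bigint", "int", "counter":
        return []int64{}
    case "smallint":
        return []int16{}
    case "boolean":
        return []bool{}
    case "double", "varint", "decimal":
        return []float64{}
    case "float":
        return []float32{}
    case "tinyint":
        return []int8{}
    default:
        // Everything else (text, uuid, blob, ...) is rendered as a string
        return []string{}
    }
}

// toValue converts a value returned by gocql into a type that matches
// the slice created by getTypeArray for the same column.
func toValue(val interface{}, typ string) interface{} {
    if val == nil {
        return nil
    }
    switch t := val.(type) {
    case float32, time.Time, string, int64, float64, bool, int16, int8:
        return t
    case gocql.UUID:
        return t.String()
    case int:
        return int64(t)
    case *inf.Dec:
        if s, err := strconv.ParseFloat(t.String(), 64); err == nil {
            return s
        }
        return float64(0) // must match the []float64 field
    case *big.Int:
        if s, err := strconv.ParseFloat(t.String(), 64); err == nil {
            return s
        }
        return float64(0) // must match the []float64 field
    default:
        // Fall back to a JSON string for any other type
        r, err := json.Marshal(val)
        if err != nil {
            log.DefaultLogger.Info("Marshalling failed ", "err", err)
        }
        return string(r)
    }
}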

Now we need to change the query method to read the results from the CQL connection.

func (td *SampleDatasource) query(ctx context.Context, query backend.DataQuery, instSetting *instanceSettings) backend.DataResponse {
    // Unmarshal the json into our queryModel
    var qm queryModel

    response := backend.DataResponse{}

    response.Error = json.Unmarshal(query.JSON, &qm)
    if response.Error != nil {
        return response
    }
    log.DefaultLogger.Info("Getting query information", "query", qm.QueryText, "host", qm.Host)
    // Log a warning if `Format` is empty.
    if qm.Format == "" {
        log.DefaultLogger.Warn("format is empty. defaulting to time series")
    }

    // create data frame response
    frame := data.NewFrame("response")
    iter := instSetting.session.Query(qm.QueryText).Iter()

    // One field per CQL column; the field type follows the column type
    for _, c := range iter.Columns() {
        log.DefaultLogger.Info("Adding Column", "name", c.Name, "type", c.TypeInfo.Type().String())
        frame.Fields = append(frame.Fields,
            data.NewField(c.Name, nil, getTypeArray(c.TypeInfo.Type().String())),
        )
    }
    for {
        // New map each iteration
        row := make(map[string]interface{})
        if !iter.MapScan(row) {
            break
        }
        vals := make([]interface{}, len(iter.Columns()))
        for i, c := range iter.Columns() {
            vals[i] = toValue(row[c.Name], c.TypeInfo.Type().String())
        }
        // Append the converted values as one row of the column-oriented frame
        frame.AppendRow(vals...)
    }
    if err := iter.Close(); err != nil {
        log.DefaultLogger.Warn("query failed", "err", err)
        response.Error = err
        return response
    }
    // add the frames to the response
    response.Frames = append(response.Frames, frame)

    return response
}

The CQL results are row based, while the DataFrame is column based.

You can see that we first loop over the columns and add each field without values.
We then iterate over the CQL results, convert each value with toValue, and use the AppendRow method to add each row to the DataFrame.
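To make the row-to-column conversion concrete without a running cluster, here is a stdlib-only Go sketch. It assumes rows arrive as maps, like those filled by iter.MapScan, and that columns keep the order reported by the query; toColumns is a hypothetical name, and the plain [][]interface{} stands in for the frame's fields.

```go
package main

import "fmt"

// toColumns transposes row-oriented results (one map per row, as MapScan
// produces) into column-oriented slices, one slice per column name.
// This mirrors appending each row to a frame whose fields were created
// up front from the column list.
func toColumns(columns []string, rows []map[string]interface{}) [][]interface{} {
	cols := make([][]interface{}, len(columns))
	for i := range cols {
		// Pre-size each column; every row contributes exactly one value.
		cols[i] = make([]interface{}, 0, len(rows))
	}
	for _, row := range rows {
		for i, name := range columns {
			// A missing key yields nil, similar to a NULL cell.
			cols[i] = append(cols[i], row[name])
		}
	}
	return cols
}

func main() {
	rows := []map[string]interface{}{
		{"name": "a", "value": int64(1)},
		{"name": "b", "value": int64(2)},
	}
	cols := toColumns([]string{"name", "value"}, rows)
	fmt.Println(cols[0], cols[1]) // [a b] [1 2]
}
```

Iterating over the column list rather than over each map keeps every column the same length and in a stable order, which is what a frame with fixed fields requires.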

A working plugin

With all the pieces in place, I compiled the code and pointed the data source at a Scylla node. This is what it looks like:

Getting the SecureData

As mentioned earlier, the data source configuration holds a secure part that, once set, is only available to the backend.

We are going to add an optional username and password for the database connection.
In sample-plugin.go we will modify newDataSourceInstance.

You can get the secure part from the setting variable's DecryptedSecureJSONData field.

Here we check whether a user and a password were set; if they were, we use them to create a CQL authenticator.

    var secureData = setting.DecryptedSecureJSONData
    password, hasPassword := secureData["password"]
    user, hasUser := secureData["user"]
    var authenticator *gocql.PasswordAuthenticator = nil
    // Only authenticate when both credentials were provided
    if hasPassword && hasUser {
        log.DefaultLogger.Info("Adding authenticator for user", "user", user)
        authenticator = &gocql.PasswordAuthenticator{
            Username: user,
            Password: password,
        }
    }

After the CQL cluster configuration is created with gocql.NewCluster, and before the session is created from it, we use the authenticator (if one was created) to authenticate the connection:

    if authenticator != nil {
        newCluster.Authenticator = *authenticator
    }

Summary and our next steps

We demonstrated building a Grafana data source with Scylla as an example. Possible next steps we are planning:

  • Push the new plugin to the Grafana plugin repository
  • Update the data source to provide time-series information
  • Present the time-series information as a graph

Your next steps

We’ve given you all the tools you need to try this yourself! If you run into any issues, make sure to bring them up with our user community in Slack.


The post Building a Grafana Backend Plugin appeared first on ScyllaDB.

Understanding the Impacts of the Native Transport Requests Change Introduced in Cassandra 3.11.5


Recently, Cassandra changed the behaviour of the Native Transport Requests (NTR) queue. Through our performance testing, we found the new NTR change to be good for clusters that have a constant load causing the NTR queue to block. Under the new mechanism the queue no longer blocks, but throttles the load based on a queue size setting, which by default is 10% of the heap.

Compared to the previous limit on NTR queue length, this improves how Cassandra handles traffic when queue capacity is reached. The "back pressure" mechanism handles an overloaded NTR queue more gracefully, resulting in a significant increase in operations processed without clients timing out. In summary, clusters running later versions of Cassandra can handle more load before hitting hard limits.


At Instaclustr, we are responsible for managing the Cassandra versions that we release to the public. This involves performing a review of Cassandra release changes, followed by performance testing. In cases where major changes have been made in the behaviour of Cassandra, further research is required. So without further delay let’s introduce the change to be investigated.

  • Prevent client requests from blocking on executor task queue (CASSANDRA-15013)
Versions affected:


Native Transport Requests

Native transport requests (NTR) are any requests made via the CQL Native Protocol, which is how Cassandra drivers communicate with the server. This includes all reads, writes, schema changes, etc. There is a limited number of threads available to process incoming requests. When all threads are in use, some requests wait in a queue (pending). If the queue fills up, some requests are silently rejected (blocked). The server never replies, so this eventually causes a client-side timeout. The main way to prevent blocked native transport requests is to throttle load, so the requests are performed over a longer period.

Prior to 3.11.5

Prior to 3.11.5, Cassandra used the following configuration settings to set the size and throughput of the queue:

  • native_transport_max_threads sets the maximum number of threads for handling requests. Each thread pulls requests from the NTR queue.
  • cassandra.max_queued_native_transport_requests sets the queue size (default 128). Once the queue is full, the Netty threads are blocked waiting for the queue to have free space.

Once the NTR queue is full, requests from all clients are no longer accepted. There is no strict ordering in which blocked Netty threads process requests, so in 3.11.4 latency becomes random once all Netty threads are blocked.

Native Transport Requests - Cassandra 3.11.4
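The count-based behaviour can be modelled with a buffered Go channel whose capacity plays the role of cassandra.max_queued_native_transport_requests: a send on a full channel blocks, much like a Netty thread waiting on a full NTR queue. This is a simplified model rather than Cassandra's code, and submit is a hypothetical helper; with no workers draining the queue, it simply counts how many requests in a burst would have to wait.

```go
package main

import "fmt"

// maxQueued models cassandra.max_queued_native_transport_requests (default 128).
const maxQueued = 128

// submit offers n requests to a count-bounded queue that no worker drains.
// In 3.11.4 a Netty thread offering a request to a full queue blocks; here
// select/default counts those would-block attempts instead of hanging.
func submit(queue chan int, n int) (accepted, blocked int) {
	for i := 0; i < n; i++ {
		select {
		case queue <- i:
			accepted++
		default:
			blocked++
		}
	}
	return accepted, blocked
}

func main() {
	queue := make(chan int, maxQueued)
	// Every request takes one slot, whatever its size.
	accepted, blocked := submit(queue, 200)
	fmt.Println(accepted, blocked) // 128 72
}
```

Each slot is one request regardless of its payload size, which is the key difference from the byte-based limits introduced in 3.11.5.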

Change After 3.11.5

In 3.11.5 and above, Cassandra throttles instead of blocking on a full NTR queue as previously described. The throttle is based on heap size: native transport requests are limited by the total memory they occupy rather than by their number. Once the limit is reached, further requests are paused.

  • native_transport_max_concurrent_requests_in_bytes: a global limit on the total size, in bytes, of concurrent NTR requests (default heapSize / 10)
  • native_transport_max_concurrent_requests_in_bytes_per_ip: a per-endpoint limit on the total size, in bytes, of concurrent NTR requests (default heapSize / 40)
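For contrast with a count-based queue, here is a simplified Go model of the two byte-based limits. It tracks in-flight bytes globally and per client IP, and refuses to admit a request that would push either total over its limit; a refusal marks the point where, per the description above, requests are paused. The byteLimiter type and its methods are hypothetical, this is not Cassandra's actual Java implementation, and the defaults assume the heapSize / 10 and heapSize / 40 formulas listed above.

```go
package main

import "fmt"

// byteLimiter is a simplified model of the two 3.11.5+ settings:
// a global budget (native_transport_max_concurrent_requests_in_bytes) and a
// per-client budget (native_transport_max_concurrent_requests_in_bytes_per_ip).
type byteLimiter struct {
	globalLimit, perIPLimit int64
	global                  int64            // bytes currently in flight on this node
	perIP                   map[string]int64 // bytes currently in flight per client IP
}

// newByteLimiter derives the default limits from the heap size.
func newByteLimiter(heapBytes int64) *byteLimiter {
	return &byteLimiter{
		globalLimit: heapBytes / 10,
		perIPLimit:  heapBytes / 40,
		perIP:       make(map[string]int64),
	}
}

// tryAcquire admits a request of size bytes from ip only if neither budget
// would be exceeded. A false result is where the server would pause requests.
func (l *byteLimiter) tryAcquire(ip string, size int64) bool {
	if l.global+size > l.globalLimit || l.perIP[ip]+size > l.perIPLimit {
		return false
	}
	l.global += size
	l.perIP[ip] += size
	return true
}

// release returns a request's bytes to both budgets once it has completed.
func (l *byteLimiter) release(ip string, size int64) {
	l.global -= size
	l.perIP[ip] -= size
}

func main() {
	l := newByteLimiter(400) // toy heap: global limit 40 bytes, per-IP limit 10 bytes
	fmt.Println(l.tryAcquire("10.0.0.1", 8)) // true
	fmt.Println(l.tryAcquire("10.0.0.1", 8)) // false: per-IP budget exceeded
	fmt.Println(l.tryAcquire("10.0.0.2", 8)) // true: another client has its own budget
}
```

Because the budget is in bytes, many small requests can be in flight at once while a few large ones exhaust it quickly; the per-IP budget also stops a single client from consuming the whole global allowance.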

Maxed Queue Behaviour

From previous performance testing of 3.11.4 and 3.11.6, we observed similar behaviour in both versions while traffic pressure had not yet saturated the NTR queue. In this section, we discuss the expected behaviour when saturation does occur and the breaking point is reached.

In 3.11.4, once the queue is maxed out, client requests are refused. For example, trying to connect via cqlsh yields an error (see Figure 2).

Cassandra 3.11.4 - queue maxed out, client requests refused
Figure 2: Timed out request

A client that tries to run a query may instead see a NoHostAvailableException.

A 3.11.4 cluster that previously had blocked NTRs no longer blocks them after being upgraded to 3.11.6. The reason is that 3.11.6 limits not the number of NTRs but the total memory they occupy; when the new size limit is reached, NTRs are paused. In normal situations, where payloads are not extremely large, the default settings in 3.11.6 allow a much larger NTR queue than the small 128-request limit in 3.11.4.

Benchmarking Setup

This testing procedure requires the NTR queue on a cluster to be at maximum capacity, with enough load to start blocking requests at a constant rate. To achieve this, we used 12 active test boxes, each creating multiple client connections to the test cluster. Once the cluster NTR queue was in constant contention, we monitored performance using:

  • Client metrics: requests per second, latency from client perspective
  • NTR Queue metrics: Active Tasks, Pending Tasks, Currently Blocked Tasks, and Paused Connections.

For testing purposes we used two testing clusters with details provided in the table below:

Cassandra version | Cluster size | Instance type | Cores | RAM | Disk
3.11.4 | 3 | m5xl-1600-v2 | 4 | 16 GB | 1600 GB
3.11.6 | 3 | m5xl-1600-v2 | 4 | 16 GB | 1600 GB
Table 1: Cluster Details

To simplify the setup, we disabled encryption and authentication. Multiple test instances were set up in the same region as the clusters, and we used 12 KB blob payloads. To give each cluster node a balanced mixed load, we kept the number of test boxes generating write load equal to the number generating read load. We ran the load against the cluster for 10 minutes to temporarily saturate the queue with read and write requests and cause contention for the Netty threads.

Our test script used cassandra-stress to generate the load; see Deep Diving cassandra-stress – Part 3 (Using YAML Profiles) for more information.

In the stressSpec.yaml, we used the following table definition and queries:

table_definition: |
 CREATE TABLE typestest (
       name text,
       choice boolean,
       date timestamp,
       address inet,
       dbl double,
       lval bigint,
       ival int,
       uid timeuuid,
       value blob,
       PRIMARY KEY((name,choice), date, address, dbl, lval, ival, uid)
 ) WITH compaction = { 'class':'LeveledCompactionStrategy' }
   AND comment='A table of many types to test wide rows'
 - name: name
   size: fixed(48)
   population: uniform(1..1000000000) # the range of unique values to select for the field 
 - name: date
   cluster: uniform(20..1000)
 - name: lval
   population: gaussian(1..1000)
   cluster: uniform(1..4)
 - name: value
   size: fixed(12000)
 partitions: fixed(1)       # number of unique partitions to update in a single operation
                                 # if batchcount > 1, multiple batches will be used but all partitions will
                                 # occur in all batches (unless they finish early); only the row counts will vary
 batchtype: UNLOGGED               # type of batch to use
 select: uniform(1..10)/10       # uniform chance any single generated CQL row will be visited in a partition;
                                 # generated for each partition independently, each time we visit it
# List of queries to run against the schema
     cql: select * from typestest where name = ? and choice = ? LIMIT 1
     fields: samerow             # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
     cql: select name, choice, uid  from typestest where name = ? and choice = ? and date >= ? LIMIT 10
     fields: multirow            # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
     cql: select name, choice, uid from typestest where name = ? and choice = ? LIMIT 1
     fields: samerow             # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)

Write loads were generated with:

cassandra-stress user no-warmup 'ops(insert=10)' profile=stressSpec.yaml cl=QUORUM duration=10m -mode native cql3 maxPending=32768 connectionsPerHost=40 -rate threads=2048 -node file=node_list.txt

Read loads were generated by changing ops to



3.11.4 Queue Saturation Test

The active NTR queue reached maximum capacity (128) and remained in contention under load, with pending NTR tasks above 128 throughout. At this point, timeouts were occurring with 12 load instances stressing the cluster: each node had 2 load instances performing reads and another 2 performing writes. Four of the read load instances constantly logged NoHostAvailableExceptions, as in the example below.

ERROR 04:26:42,542 [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: (com.datastax.driver.core.exceptions.OperationTimedOutException: [] Timed out waiting for server response), (com.datastax.driver.core.exceptions.OperationTimedOutException: [] Timed out waiting for server response), (com.datastax.driver.core.exceptions.OperationTimedOutException: [] Timed out waiting for server response))

The client results we got from this stress run are shown in Table 2.

Box | Op rate (op/s) | Latency mean (ms) | Latency median (ms) | Latency 95th percentile (ms) | Latency 99th percentile (ms) | Latency 99.9th percentile (ms) | Latency max (ms)
1 | 700.00 | 2,862.20 | 2,078.30 | 7,977.60 | 11,291.10 | 19,495.10 | 34,426.80
2 | 651.00 | 3,054.50 | 2,319.50 | 8,048.90 | 11,525.90 | 19,528.70 | 32,950.50
3 | 620.00 | 3,200.90 | 2,426.40 | 8,409.60 | 12,599.70 | 20,367.50 | 34,158.40
4 | 607.00 | 3,312.80 | 2,621.40 | 8,304.70 | 11,769.20 | 19,730.00 | 31,977.40
5 | 568.00 | 3,529.80 | 3,011.50 | 8,216.60 | 11,618.20 | 19,260.20 | 32,698.80
6 | 553.00 | 3,627.10 | 3,028.30 | 8,631.90 | 12,918.50 | 20,115.90 | 34,292.60
Writes | 3,699.00 | 3,264.55 | 2,580.90 | 8,264.88 | 11,953.77 | 19,749.57 | 34,426.80
7 | 469.00 | 4,296.50 | 3,839.90 | 9,101.60 | 14,831.10 | 21,290.30 | 35,634.80
8 | 484.00 | 4,221.50 | 3,808.40 | 8,925.50 | 11,760.80 | 20,468.20 | 34,863.10
9 | Crashed due to timeouts
10 | Crashed due to timeouts
11 | Crashed due to timeouts
12 | Crashed due to timeouts
Reads | 953.00 | 4,259.00 | 3,824.15 | 9,092.80 | 14,800.40 | 21,289.48 | 35,634.80
Summary | 4,652.00 | 3,761.78 | 3,202.53 | 8,678.84 | 13,377.08 | 20,519.52 | 35,634.80
Table 2: 3.11.4 Mixed Load Saturating The NTR Queue

* To calculate the total write op rate, we summed the values from the 6 write instances. For the maximum write latency we used the highest value across all instances, and for the remaining latency columns we averaged the results. Write results are summarised in the "Writes" row of Table 2. We did the same for the read results, which are recorded in the "Reads" row. The last row ("Summary") combines the "Writes" and "Reads" rows in the same way: it sums their op rates, takes the larger maximum latency, and averages the other latency columns.
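The aggregation rules in the note above can be checked mechanically. This Go sketch applies them to the six write boxes of Table 2: op rates are summed, maximum latency is the largest value across boxes, and the other latency columns are averaged. boxResult, aggregate and table2Writes are illustrative names. Note that averaging percentiles across boxes is an approximation, not a true percentile over all requests.

```go
package main

import "fmt"

// boxResult is one row of a results table: the op rate plus six latency
// columns in ms (mean, median, p95, p99, p99.9, max).
type boxResult struct {
	opRate    float64
	latencies [6]float64
}

// aggregate combines rows as described in the note: op rates are summed, the
// last column (max latency) takes the maximum, and the others are averaged.
func aggregate(rows []boxResult) boxResult {
	var out boxResult
	last := len(out.latencies) - 1
	for _, r := range rows {
		out.opRate += r.opRate
		for i := 0; i < last; i++ {
			out.latencies[i] += r.latencies[i] / float64(len(rows))
		}
		if r.latencies[last] > out.latencies[last] {
			out.latencies[last] = r.latencies[last]
		}
	}
	return out
}

// table2Writes holds write boxes 1-6 from Table 2 (3.11.4).
var table2Writes = []boxResult{
	{700, [6]float64{2862.20, 2078.30, 7977.60, 11291.10, 19495.10, 34426.80}},
	{651, [6]float64{3054.50, 2319.50, 8048.90, 11525.90, 19528.70, 32950.50}},
	{620, [6]float64{3200.90, 2426.40, 8409.60, 12599.70, 20367.50, 34158.40}},
	{607, [6]float64{3312.80, 2621.40, 8304.70, 11769.20, 19730.00, 31977.40}},
	{568, [6]float64{3529.80, 3011.50, 8216.60, 11618.20, 19260.20, 32698.80}},
	{553, [6]float64{3627.10, 3028.30, 8631.90, 12918.50, 20115.90, 34292.60}},
}

func main() {
	w := aggregate(table2Writes)
	// Compare with the "Writes" row of Table 2.
	fmt.Printf("%.2f %.2f %.2f\n", w.opRate, w.latencies[0], w.latencies[5]) // 3699.00 3264.55 34426.80
}
```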

The 6 write load instances finished normally, but the read instances struggled. Only 2 of the read load instances were able to send traffic through normally; the other clients received so many timeout errors that they crashed. We also observed that the Cassandra timeout metrics, under client-request-metrics, did not capture any of the client timeouts we saw.

Same Load on 3.11.6

Next, we proceeded to test 3.11.6 with the same load. Using the default NTR settings, all test instances were able to finish the stress test successfully.

Box | Op rate (op/s) | Latency mean (ms) | Latency median (ms) | Latency 95th percentile (ms) | Latency 99th percentile (ms) | Latency 99.9th percentile (ms) | Latency max (ms)
1 | 677.00 | 2,992.60 | 2,715.80 | 7,868.50 | 9,303.00 | 9,957.30 | 10,510.90
2 | 658.00 | 3,080.20 | 2,770.30 | 7,918.80 | 9,319.70 | 10,116.70 | 10,510.90
3 | 653.00 | 3,102.80 | 2,785.00 | 7,939.80 | 9,353.30 | 10,116.70 | 10,510.90
4 | 608.00 | 3,340.90 | 3,028.30 | 8,057.30 | 9,386.90 | 10,192.20 | 10,502.50
5 | 639.00 | 3,178.30 | 2,868.90 | 7,994.30 | 9,370.10 | 10,116.70 | 10,510.90
6 | 650.00 | 3,120.50 | 2,799.70 | 7,952.40 | 9,353.30 | 10,116.70 | 10,510.90
Writes | 3,885.00 | 3,135.88 | 2,828.00 | 7,955.18 | 9,347.72 | 10,102.72 | 10,510.90
7 | 755.00 | 2,677.70 | 2,468.30 | 7,923.00 | 9,378.50 | 9,982.40 | 10,762.60
8 | 640.00 | 3,160.70 | 2,812.30 | 8,132.80 | 9,529.50 | 10,418.70 | 11,031.00
9 | 592.00 | 3,427.60 | 3,101.70 | 8,262.80 | 9,579.80 | 10,452.20 | 11,005.90
10 | 583.00 | 3,483.00 | 3,160.40 | 8,279.60 | 9,579.80 | 10,435.40 | 11,022.60
11 | 582.00 | 3,503.60 | 3,181.40 | 8,287.90 | 9,588.20 | 10,469.00 | 11,047.80
12 | 582.00 | 3,506.70 | 3,181.40 | 8,279.60 | 9,588.20 | 10,460.60 | 11,014.20
Reads | 3,734.00 | 3,293.22 | 2,984.25 | 8,194.28 | 9,540.67 | 10,369.72 | 11,047.80
Summary | 7,619.00 | 3,214.55 | 2,906.13 | 8,074.73 | 9,444.19 | 10,236.22 | 11,047.80
Table 3: 3.11.6 Mixed Load

Default Native Transport Requests (NTR) Setting Comparison

Taking the summary row from both versions (Table 2 and Table 3), we produced Table 4.

Version | Op rate (op/s) | Latency mean (ms) | Latency median (ms) | Latency 95th percentile (ms) | Latency 99th percentile (ms) | Latency 99.9th percentile (ms) | Latency max (ms)
3.11.4 | 4,652.00 | 3,761.78 | 3,202.53 | 8,678.84 | 13,377.08 | 20,519.52 | 35,634.80
3.11.6 | 7,619.00 | 3,214.55 | 2,906.13 | 8,074.73 | 9,444.19 | 10,236.22 | 11,047.80
Table 4: Mixed Load 3.11.4 vs 3.11.6

Figure 2: Latency 3.11.4 vs 3.11.6

Figure 2 shows the latencies from Table 4. From the results, 3.11.6 had a lower mean latency than 3.11.4 (3,214.55 ms vs 3,761.78 ms). Furthermore, in the worst case, where contention is high, 3.11.6 handled request latency much better than 3.11.4, as shown by the difference in maximum latency (11,047.80 ms vs 35,634.80 ms). Not only did 3.11.6 have lower latency, it was also able to process many more requests (7,619 vs 4,652 op/s) because its queue did not block.
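The gap between the two versions is easier to judge as a relative change, (new - old) / old. This Go sketch computes it for a few columns of Table 4; relChange is an illustrative helper, and the inputs are the rounded values from the table.

```go
package main

import "fmt"

// relChange returns the relative change from before to after, e.g. 0.25 for +25%.
func relChange(before, after float64) float64 {
	return (after - before) / before
}

func main() {
	// Summary rows from Table 4: 3.11.4 (before) vs 3.11.6 (after).
	metrics := []struct {
		name          string
		before, after float64
	}{
		{"op rate (op/s)", 4652.00, 7619.00},
		{"mean latency (ms)", 3761.78, 3214.55},
		{"p99 latency (ms)", 13377.08, 9444.19},
		{"max latency (ms)", 35634.80, 11047.80},
	}
	for _, m := range metrics {
		fmt.Printf("%-18s %+6.1f%%\n", m.name, 100*relChange(m.before, m.after))
	}
}
```

With these inputs it reports roughly +63.8% throughput for 3.11.6, with mean, 99th percentile and maximum latency lower by about 14.5%, 29.4% and 69.0% respectively.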

3.11.6 Queue Saturation Test

The default native_transport_max_concurrent_requests_in_bytes is 1/10 of the heap size. The Cassandra max heap size of our cluster is 8 GB, so the default queue size is 0.8 GB. This turns out to be too large for this cluster size: with this configuration, the cluster runs into CPU and other bottlenecks before the NTR queue saturates.
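The 0.8 GB figure, and how it compares with the old 128-request limit, can be worked out directly. This Go sketch assumes an 8 GiB heap and counts only the 12,000-byte blob value from stressSpec.yaml, ignoring protocol and object overhead, so the request counts are rough upper bounds rather than measured queue depths; requestsThatFit is an illustrative helper. The JVM may also report a slightly smaller max heap than the configured size.

```go
package main

import "fmt"

const (
	heapBytes    int64 = 8 << 30 // assumption: an 8 GiB max heap, as on our test nodes
	payloadBytes int64 = 12000   // value size in stressSpec.yaml: fixed(12000)
)

// requestsThatFit is a hypothetical helper: how many payloads of the given
// size fit under a byte limit, ignoring protocol and per-request overhead.
func requestsThatFit(limitBytes, payload int64) int64 {
	return limitBytes / payload
}

func main() {
	defaultGlobal := heapBytes / 10 // native_transport_max_concurrent_requests_in_bytes default
	defaultPerIP := heapBytes / 40  // native_transport_max_concurrent_requests_in_bytes_per_ip default
	tuned := int64(1000000)         // the 1MB value used later in this test

	fmt.Println(defaultGlobal, requestsThatFit(defaultGlobal, payloadBytes)) // 858993459 71582
	fmt.Println(defaultPerIP, requestsThatFit(defaultPerIP, payloadBytes))   // 214748364 17895
	fmt.Println(tuned, requestsThatFit(tuned, payloadBytes))                 // 1000000 83
}
```

Under these assumptions the default global limit leaves room for tens of thousands of such writes, while a 1 MB limit admits only about 83, fewer than the 128 requests the old count-based queue allowed.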

To investigate full-queue behaviour, we therefore took the reverse approach and set the queue size to a much lower value. In cassandra.yaml, we added:

native_transport_max_concurrent_requests_in_bytes: 1000000

This sets the global queue size limit to 1 MB. Once Cassandra was restarted and all nodes were online with the new setting, we ran the same mixed load on this cluster; the results are shown in Table 5.

3.11.6 setting | Op rate (op/s) | Latency mean (ms) | Latency median (ms) | Latency 95th percentile (ms) | Latency 99th percentile (ms) | Latency 99.9th percentile (ms) | Latency max (ms)
Write: default setting | 3,885.00 | 3,135.88 | 2,828.00 | 7,955.18 | 9,347.72 | 10,102.72 | 10,510.90
Write: 1MB setting | 2,105.00 | 5,749.13 | 3,471.82 | 16,924.02 | 26,172.45 | 29,681.68 | 31,105.00
Read: default setting | 3,734.00 | 3,293.22 | 2,984.25 | 8,194.28 | 9,540.67 | 10,369.72 | 11,047.80
Read: 1MB setting | 5,395.00 | 2,263.13 | 1,864.55 | 5,176.47 | 8,074.73 | 9,693.03 | 15,183.40
Summary: default setting | 7,619.00 | 3,214.55 | 2,906.13 | 8,074.73 | 9,444.19 | 10,236.22 | 11,047.80
Summary: 1MB setting | 7,500.00 | 4,006.13 | 2,668.18 | 11,050.24 | 17,123.59 | 19,687.36 | 31,105.00

Table 5: 3.11.6 native_transport_max_concurrent_requests_in_bytes default and 1MB setting 

During the test, we observed many paused connections and discarded requests (see Figure 3). For a full list of the metrics Instaclustr exposes, see our support documentation.

NTR Test - Paused Connections and Discarded Requests
Figure 3: 3.11.6 Paused Connections and Discarded Requests

After lowering native_transport_max_concurrent_requests_in_bytes, we started to see paused connections and discarded requests, and write latency increased, resulting in fewer write operations being processed (see Table 5). The increased write latency is illustrated in Figure 4.

Cassandra 3.11.6 Write Latency Under Different Settings
Figure 4: 3.11.6 Write Latency Under Different Settings

On the other hand, read latency decreased (see Figure 5), resulting in more read operations being processed.

Cassandra 3.11.6 Read Latency Under Different Settings
Figure 5: 3.11.6 Read Latency Under Different Settings
Cassandra 3.11.6 Operations Rate Under Different Settings
Figure 6: 3.11.6 Operations Rate Under Different Settings

As illustrated in Figure 6, the total number of operations decreased slightly with the 1MB setting (7,500 vs 7,619 op/s), but the difference is very small and the effects on reads and writes almost cancel each other out. However, looking at each type of operation individually, rather than reads and writes getting an equal share of the channel as with the default "almost unlimited" queue, the lower queue size penalizes writes and favors reads. While our testing identified this outcome, further investigation will be required to determine exactly why this is the case.
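The shift between reads and writes is clearer as a share of total throughput. This Go sketch computes the write share from the op rates in Table 5; writeShare is an illustrative helper.

```go
package main

import "fmt"

// writeShare returns the fraction of total operations that were writes.
func writeShare(writes, reads float64) float64 {
	return writes / (writes + reads)
}

func main() {
	// Op rates (op/s) from Table 5.
	fmt.Printf("default: %.1f%% writes\n", 100*writeShare(3885, 3734)) // default: 51.0% writes
	fmt.Printf("1MB:     %.1f%% writes\n", 100*writeShare(2105, 5395)) // 1MB:     28.1% writes
}
```

With these numbers the write share falls from about 51.0% to about 28.1%, while total throughput changes by only about 1.6% (7,619 vs 7,500 op/s).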


In conclusion, the new NTR change offers an improvement over the previous NTR queue behaviour. Through our performance testing we found the change to be good for clusters that have a constant load causing the NTR queue to block. Under the new mechanism the queue no longer blocks, but throttles the load based on the amount of memory allocated to requests.

The results from testing indicated that the changed queue behaviour reduced latency and provided a significant lift in the number of operations without clients timing out. Clusters with our latest version of Cassandra can handle more load before hitting hard limits. For more information feel free to comment below or reach out to our Support team to learn more about changes to 3.11.6 or any of our other supported Cassandra versions.

The post Understanding the Impacts of the Native Transport Requests Change Introduced in Cassandra 3.11.5 appeared first on Instaclustr.