Renaming and reshaping Scylla tables using scylla-migrator

We recently faced a problem: some of the first Scylla tables we created on our main production cluster were no longer in line with the evolved schemas that our more recent tables use.

This typical engineering problem requires either keeping those legacy tables and their data queries, or migrating the data to the more optimal model along with the bandwagon of applications that must be modified to query the data the new way… That’s something nobody likes doing, but hey, we don’t like legacy at Numberly, so let’s kill that one!

To overcome this challenge we used the scylla-migrator project and I thought it could be useful to share this experience.

How and why our schema evolved

When we first approached ID matching tables we chose to answer two problems at the same time: query the most recent data and keep the history of the changes per source ID.

This means that those tables included a date as part of their PRIMARY KEY while the partition key was obviously the matching table ID we wanted to lookup from:

CREATE TABLE IF NOT EXISTS ids_by_partnerid (
    partnerid text,
    id text,
    date timestamp,
    PRIMARY KEY ((partnerid), date, id)
);

Making a table with an ever-changing date in the clustering key creates what we call a history table. In the schema above, the uniqueness of a row is defined not only by a partnerid / id couple but also by its date!

Quick caveat: be careful about the actual date timestamp resolution, since you may not want to create a row for every second for the same partnerid / id couple (we use an hour resolution).
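To illustrate that hour resolution, here is a minimal Python sketch (a hypothetical helper, not our production code) that truncates an event timestamp before it is written as the date clustering value:

```python
from datetime import datetime

def hour_bucket(ts: datetime) -> datetime:
    """Truncate a timestamp to the hour so that repeated matches of the
    same partnerid / id couple within one hour collapse into a single row."""
    return ts.replace(minute=0, second=0, microsecond=0)

# Two events 20 minutes apart map to the same clustering value.
a = hour_bucket(datetime(2020, 9, 1, 10, 5, 30))
b = hour_bucket(datetime(2020, 9, 1, 10, 25, 0))
assert a == b == datetime(2020, 9, 1, 10, 0, 0)
```

With this bucketing, repeated writes within the same hour are plain CQL upserts on the same row instead of new rows.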

History tables are good for analytics, and we also figured we could use them for batch and real-time queries where we would be interested in the “most recent ids for the given partnerid” (sometimes flavored with a LIMIT):

SELECT id FROM ids_by_partnerid WHERE partnerid = 'AXZAZLKDJ' ORDER BY date DESC;

As time passed, real-time Kafka pipelines started to query these tables hard and were mostly interested in “all the ids known for the given partnerid“.

A sort of DISTINCT(id) is out of scope for our history table! For this we need a table schema that represents a condensed view of the data. We call these compact tables, and the only difference from the history table is that the date timestamp is simply not part of the PRIMARY KEY:

CREATE TABLE IF NOT EXISTS ids_by_partnerid (
    partnerid text,
    id text,
    seen_date timestamp,
    PRIMARY KEY ((partnerid), id)
);
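With that compact schema, the query our Kafka pipelines care about becomes a plain partition lookup (sketched here against the schema above, no ORDER BY needed since there is one row per id):

```cql
SELECT id FROM ids_by_partnerid WHERE partnerid = 'AXZAZLKDJ';
```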

To make that transition happen we thus wanted to:

  • rename history tables with an _history suffix so that they are clearly identified as such
  • get a compacted version of the tables (keeping their old name) while renaming the date column to seen_date
  • do it as fast as possible since we will need to stop our feeding pipeline and most of our applications during the process…

STOP: it’s not possible to rename a table in CQL!

Scylla-migrator to the rescue

We decided to abuse the scylla-migrator to perform this perilous migration.

As it was originally designed to help users migrate from Cassandra to Scylla by leveraging Spark, it seemed like a good fit for the task, since we happen to own Spark clusters powered by Hadoop YARN.

Building scylla-migrator for Spark < 2.4

Recent scylla-migrator does not support older Spark versions. The trick is to look at the git log and check out the (hopefully right) commit that supports your Spark cluster’s version.

In our case for Spark 2.3 we used git commit bc82a57e4134452f19a11cd127bd4c6a25f75020.

On Gentoo, make sure to use dev-java/sbt-bin since the non binary version is vastly out of date and won’t build the project. You need at least version 1.3.

The scylla-migrator plan

The documentation explains that we need a config file that points to a source cluster+table and a destination cluster+table as long as they have the same schema structure…

Renaming is then as simple as duplicating the schema using CQLSH and running the migrator!

But what about our compacted version of our original table? The schema is different from the source table!…

The good news is that as long as all your columns remain present, you can also change the PRIMARY KEY of your destination table and it will still work!

This makes the scylla-migrator an amazing tool to reshape or pivot tables!

  • the date column is renamed to seen_date: that’s okay, scylla-migrator supports column renaming (it’s a Spark dataframe after all)!
  • the PRIMARY KEY is different in the compacted table since we removed date from the clustering columns: we’ll get a compacted table for free!
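A toy Python sketch (purely illustrative, not the migrator’s code) of why dropping date from the PRIMARY KEY compacts the data: rows that collide on the new key overwrite each other, so only one row per partnerid / id couple survives:

```python
# History rows: (partnerid, id, date) — uniqueness includes the date.
history_rows = [
    ("AXZAZLKDJ", "id1", "2020-09-01T10:00:00"),
    ("AXZAZLKDJ", "id1", "2020-09-02T11:00:00"),  # same couple, later date
    ("AXZAZLKDJ", "id2", "2020-09-01T12:00:00"),
]

# Writing into the compact schema keyed on (partnerid, id): CQL writes are
# upserts, so colliding rows collapse, and the date column is renamed to
# seen_date on the way.
compacted = {}
for partnerid, id_, date in history_rows:
    compacted[(partnerid, id_)] = {"seen_date": date}

assert len(compacted) == 2  # three history rows became two compact rows
```

One caveat: in the real migration, which duplicate wins is decided by the rows’ write timestamps, not by iteration order as in this sketch.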

Using scylla-migrator

The documentation is a bit poor on how to submit your application to a Hadoop YARN cluster but that’s kind of expected.

It also did not mention how to connect to an SSL-enabled cluster (is anyone really not using SSL on the wire in their production environment?)… anyway, let’s not start a flame war 🙂

The trick that will save you is to know that you can append all the usual Spark options that are available in the spark-cassandra-connector!

Submitting to a Kerberos-protected Hadoop YARN cluster targeting an SSL-enabled Scylla cluster then looks like this:

export JAR_NAME=target/scala-2.11/scylla-migrator-assembly-0.0.1.jar

spark2-submit \
 --name ScyllaMigratorApplication \
 --class com.scylladb.migrator.Migrator  \
 --conf spark.cassandra.connection.ssl.clientAuth.enabled=True  \
 --conf spark.cassandra.connection.ssl.enabled=True  \
 --conf spark.cassandra.connection.ssl.trustStore.path=jssecacerts  \
 --conf spark.cassandra.connection.ssl.trustStore.password=JKS_PASSWORD  \
 --conf spark.cassandra.input.consistency.level=LOCAL_QUORUM \
 --conf spark.cassandra.output.consistency.level=LOCAL_QUORUM \
 --conf spark.scylla.config=config.yaml \
 --conf spark.yarn.executor.memoryOverhead=1g \
 --conf spark.blacklist.enabled=true  \
 --conf spark.blacklist.task.maxTaskAttemptsPerExecutor=1  \
 --conf spark.blacklist.task.maxTaskAttemptsPerNode=1  \
 --conf spark.blacklist.stage.maxFailedTasksPerExecutor=1  \
 --conf spark.blacklist.stage.maxFailedExecutorsPerNode=1  \
 --conf spark.executor.cores=16 \
 --deploy-mode client \
 --files jssecacerts \
 --jars ${JAR_NAME}  \
 --keytab ${KRB_PRINCIPAL}.keytab  \
 --master yarn \
 --principal ${KRB_PRINCIPAL}  \
 ${JAR_NAME}

Note that we chose to apply a higher consistency level to our reads using a LOCAL_QUORUM instead of the default LOCAL_ONE. I strongly encourage you to do the same since it’s appropriate when you’re using this kind of tool!

Column renaming is simply expressed in the configuration file like this:

# Column renaming configuration.
renames:
  - from: date
    to: seen_date

Tuning scylla-migrator

While easy to use, tuning scylla-migrator to operate those migrations as fast as possible turned out to be a real challenge (remember we have some production applications shut down during the process).

Even using 300+ Spark executors I couldn’t get my Scylla cluster utilization to more than 50% and migrating a single table with a bit more than 1B rows took almost 2 hours…

We found the best knobs to play with thanks to the help of Lubos Kosco and this blog post from ScyllaDB:

  • Increase the splitCount setting: more splits means more Spark executors will be spawned and more tasks out of it. While it might be magic on a pure Spark deployment it’s not that amazing on a Hadoop YARN one where executors are scheduled in containers with 1 core by default. We simply moved it from 256 to 384.
  • Disable compaction on the destination tables. This gave us a big boost and saved the day, since it avoids adding the overhead of compaction while you’re pushing data down hard!
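The splitCount change mentioned above is a one-line edit in the migrator’s config.yaml (a fragment sketched after the project’s config.yaml.example; check your checkout for the exact nesting):

```yaml
source:
  splitCount: 384
```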

To disable compaction on a table, simply run:

ALTER TABLE ids_by_partnerid_history WITH compaction = {'class': 'NullCompactionStrategy'};

Remember to run a manual compaction (nodetool compact <keyspace> <table>) and to enable compaction back on your tables once you’re done!
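For instance, assuming your table was using the default strategy before the migration, re-enabling compaction could look like this (adjust the class and options to whatever your table actually used):

```cql
ALTER TABLE ids_by_partnerid_history WITH compaction = {'class': 'SizeTieredCompactionStrategy'};
```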

Happy Scylla tables mangling!

Submit Today for the 2020 Scylla User Awards!

We love to see the amazing things our users are doing with Scylla. Our Scylla User Awards will honor accomplishments and contributions across ten categories this year.

What stands out about your use case? Doing something impressive with Scylla Cloud? Using Scylla as a back-end to a huge graph database? Did you overcome limitations with your previous database? Perhaps you’re using Scylla and Kubernetes?

Enter to win the 2020 Scylla User Awards

Submit your entry and you’ll get something for yourself and something to help others! Your valid nomination will get you a limited-edition Scylla hoodie in your size, plus you get to pick a charitable organization that we’ll donate to. Winners will be announced January 12th at our Scylla Summit.

Nominations will be accepted until December 4, 2020.

2020 Scylla User Award Categories

  • Most Innovative Use of Scylla: Got a use case that pushes the bounds of what’s possible? What ground-breaking Big Data driven app did you create? What sets it apart from the others? Tell us everything you can about your system and how you’re using Scylla.
  • Greatest Node Reduction with Scylla: Did you significantly reduce the number of database nodes you need when you migrated to Scylla? Details please! Tell us the numbers, what you migrated from and all about your migration.
  • Best Scylla Cloud Use Case: Tell us how you’re using Scylla Cloud. Why did you choose our DBaaS and how has your experience been? What benefits did your team get back by going with a managed NoSQL solution? Most interesting use case will win in this category.
  • Best Real-Time Use Case: Are you pushing the envelope of real-time? How low are your latencies? Do you have stringent SLAs you have to meet? What’s your rate of ingestion and throughput? How big of a firehose are you streaming data from? Tell us all about your real-time use case to nominate yourself in this category.
  • Best Example of Overcoming NoSQL Limits: If you migrated to Scylla from another NoSQL database, there was certainly a reason to do so. What limitations were holding you back with your previous NoSQL database? Node size? Throughput? Latency issues? Expense? Tell us all about how you got past those limits by moving to Scylla.
  • Best Use of Scylla’s DynamoDB-compatible API: Our Alternator project makes Scylla a drop-in replacement for DynamoDB. Tell us how you used Scylla with our DynamoDB API to either replace or extend your use of Amazon DynamoDB. How was the migration? What benefits did you get from it?
  • Best Use of Scylla with Kubernetes: Using Scylla and Kubernetes? Are you using the Scylla Kubernetes Operator or did you implement your own? How has utilizing Kubernetes impacted your DevOps? Tell us all about it, including as many details as you can.
  • Best Use of Scylla with a Graph Database: Scylla is an excellent back-end to a graph database such as JanusGraph. Do you dream vividly about Gremlin and TinkerPop? Do you chew GraphQL for breakfast? Tell us about your use of Scylla and a graph database. How big is your graph in terms of edges, vertices, and raw terabytes. Give us the numbers, the results and why you chose Scylla in the first place.
  • Scylla Community Member of the Year: How did you contribute to the Scylla project or help the Scylla community? Tell us how active you are on our Slack channel, how many bugs you opened on GitHub, the meetups you’ve hosted, the blogs you’ve written, the open source code you’ve committed, or whatever else you believe helped foster our community to enter for this category.
  • Scylla Humanitarian of the Year Award: Is your company making the world a better place? Whether your company is for-profit or non-profit, tell us how you’re helping the common good with your Scylla-backed systems.

You can nominate yourself in as many categories as you like. We look forward to seeing all the great things you’re doing with Scylla!


The post Submit Today for the 2020 Scylla User Awards! appeared first on ScyllaDB.

How to Keep Pirates “Hackers” Away From Your Booty “Data” with Cassandra RBAC

How to Test and Benchmark Database Clusters

In our recent webinar VP of Solutions Eyal Gutkind and Solutions Architect Moreno Garcia presented on how users perform benchmarks and real-life workload simulations in their environments. Their roles require them to work closely with customers in early adoption — especially during Proof-of-Concepts (POC) and entering into production. They also have a wealth of experience in provisioning and sizing, as well as various migration strategies.

Eyal encouraged users to do benchmarking early in the inception of a project. “Benchmarking will help you fail fast and recover fast before it’s too late.”

Teams should always consider benchmarking as part of the acceptance procedure for any application. What is the overall user experience that you want to bake into the application itself? Ensure that you can support scaling your customer base without compromising user experience or the actual functionality of the app. Users should also consider their disaster recovery targets. Eyal suggested to ask yourself “How much infrastructure overhead am I willing to purchase in order to maintain the availability of my application in dire times?” And, of course, to think about the database’s impacts on the day-to-day lives of your DBAs.

1. Develop Tangible Targets

Eyal set forth the following considerations for your team to set tangible targets:

  • What is the Use Case? — this should determine what database technology you are benchmarking. You have to choose databases that have dominant features to address the needs of your use case.
  • Timeline for testing & deployment — Your timeline should drive your plans. What risks and delays may you be facing?
  • MVP, medium, and long-term application scale — many times people just look at the MVP without considering the application lifecycle. “You’re not going to need 1,000 nodes on day one,” but not thinking ahead can lead to dead-end development. “Have a good estimate on the scaling that you need to support.”
  • Budget for your database — both for your POC and your production needs. Overall application costs, including servers, software licenses, and manpower. Consider how some technologies can start out cheap, but do not scale linearly.

2. Create a Testing Schedule

“Scheduling is more than just install the servers and let’s go for it.” Eyal emphasized that your timeline needs to take into consideration getting consensus from the different stakeholders and bringing them into the process. Deploying a service or servers might be as easy as the click of a button, but that doesn’t take into account any bureaucratic procedures you may need to bake into your schedule.

“Bake into your schedule the processes needed…. Also make sure you have the right resources to help you evaluate the system. Developers, DBAs, and systems administrators are limited resources in any organization. Make sure you schedule your testing with their availability in mind.”

To help organize your POC schedule, Eyal presented an example project plan. You can use this checklist for your own projects:


This is the checklist that every Scylla user should follow in every proof of concept conducted. Eyal noted, “We’re taking into consideration many of the aspects that you sometimes do not think are part of the benchmarking, but does take a lot of resources and a lot of time from your perspective and from your team.”

3. Understand Data Patterns of Your Application

Data modeling itself can lead to many horror stories with benchmarking. Eyal cited an example of a user who tried to force a blob of one, two, or even five gigabytes into a single cell. Eyal suggested to keep only the metadata in the database, and put the actual object in storage outside of the database. Not only will this make better sense operationally, but also financially.

“Databases are optimized for certain data models and access patterns from your application.” Eyal suggested making sure that the application data can be easily stored and accessed. Also that the developers you have for the POC are trained on data modeling for the target database.

(Reminder, Scylla offers a course in Scylla University on both beginning and advanced data modeling. We can also arrange live training for your team if you wish. We also invite you to register for Scylla Summit 2021, where we will have free online live training!)

Also, realize that not all benchmarking stressing tools have data modeling capabilities in them. Eyal suggested going beyond generic tests. If you have your own data model requirement and can benchmark your application with it, “That’s the best case.” He suggested looking at the synthetic tools to see if they were capable of data model modifications.

4. Benchmarking Hardware

Eyal also noted that, while it is understandable, many benchmarks fail because of underpowered testing environments: hardware systems too small to provide an adequate test experience, especially compared to what the production systems would be like. “Too many times we see evaluators trying to squeeze the life out of a dime.” These underpowered test systems can heavily but unfairly bias your results. “Think about the amount of CPUs you’re going to give to the system.” To see a system’s benefits, and for a fair performance evaluation, “Don’t run it on a laptop.”

5. Stress Test Tools & Infrastructure

Make sure you have dedicated servers as stressing nodes executing stress tools like cassandra-stress and the Yahoo! Cloud Serving Benchmark (YCSB). If you run the stress tools on the same server as the database, they can cause resource contention and collisions. Dedicated stress nodes also help set up the test as realistically as possible.

In the same vein, use a realistic amount of data, to make sure you’re pushing and creating enough unique data. “There’s a huge difference in performance if you have only 10 gigabytes of data in the server or 10 terabytes. This is not an apples to apples comparison.” (We’ve been pointing out for many years how other databases do not scale linearly in performance.)

Eyal also suggested you look at the defaults that various stress tests use, and modify them for your use case where and if possible. “Let’s say cassandra-stress is doing a 300 byte partition key. That might not be representative of your actual workload. So you want to change that. You want to change how many columns are there, so on and so forth. Make sure that your stressing tool is representing your actual workload.”

6. Disaster Recovery Testing

“You need to test your ability to sustain regular life events. Nodes will crash. Disks will corrupt. And network cables will be disconnected. That will happen for sure, and I will tell you it always happens during peak times. It’s going to be on your Black Friday. It’s going to be during the middle of your game.” Things happen at exactly the wrong time. So you need to take disaster into account and test capacity planning with reduced nodes, a network partition, or other undesired events. This has the added benefit of teaching you about the true capabilities of resiliency of a system.

“The challenging task of coming alive back from a backup must be tested.” So you should also spend a little bit more time (and money) by testing a backup and restore process to see how fast and easy your restoration will be, and that restoration works properly.

7. Observability

Eyal urged the audience to ensure they understand the capabilities of their observability. “A powerful monitoring system is required for you to understand the database and system capabilities.” This allows you to understand the performance of your data model and your application efficiency. You can also discover data patterns.

Demonstration of nosqlbench

After this, Eyal turned the presentation over to Moreno Garcia, who took the audience through a demo of the nosqlbench stressing tool. You can watch it starting around the 18 minute mark in our on-demand webinar.


From POC to Production

Also, once you have finished with your POC you aren’t done yet! For the next steps of getting into production, you will need a checklist to make sure you are ready. Handily we prepared that for you!

If you have further questions about getting ready for your POC or your move to production, we’d welcome your questions in our Slack channel, or feel free to contact us privately.


The post How to Test and Benchmark Database Clusters appeared first on ScyllaDB.

Scylla Open Source Release 4.2

The Scylla team is pleased to announce the release of Scylla Open Source 4.2, a production-ready release of our open source NoSQL database.

Scylla is an open source, NoSQL database with superior performance and consistently low latencies. Find the Scylla Open Source 4.2 repository for your Linux distribution here. Scylla 4.2 Docker is also available.

Scylla 4.2 includes performance and stability improvements, more operations to the Alternator API (our Amazon DynamoDB-compatible interface) and bug fixes. The release also improves query performance by adding a binary search to SSTable promoted indexes (see below).

Please note that only the last two minor releases of the Scylla Open Source project are supported. Starting today, only Scylla Open Source 4.2 and Scylla 4.1 are supported, and Scylla 4.0 is retired.

Related Links

New features in Scylla 4.2

Performance: Enable binary search in SSTable promoted index

The “promoted index” is a clustering-key index within a given partition, stored as part of the SSTable index file structure. Its primary purpose is to improve the efficiency of column range lookups in large partitions with many rows.

Before Scylla 4.2, lookups in the promoted index were done by linearly scanning the index (an O(N) lookup). For large partitions this approach is costly and inefficient: it consumes a lot of CPU and I/O and does not deliver efficient performance.

Starting from this release, the reader performs a binary search on the MC SSTable promoted index (the search time is now O(log N)).
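As a rough illustration (a toy model, not Scylla’s actual data structures), the promoted index can be thought of as a sorted list of the first clustering key of each index block, which makes locating the right block a textbook binary search:

```python
import bisect

# Toy promoted index: the first clustering key of each index block
# within one large partition (10,000 blocks here).
promoted_index = [1000 * i for i in range(10_000)]

def find_block(clustering_key: int) -> int:
    """Locate the last block whose first key is <= clustering_key,
    in O(log N) instead of the pre-4.2 O(N) linear scan."""
    return bisect.bisect_right(promoted_index, clustering_key) - 1

assert find_block(12_345) == 12   # key 12,345 lives in block 12
```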

For example, testing performed with large partitions consisting of 10,000,000 rows (a value size of 2,000 bytes, a partition size of 20 GB and an index size of 7 MB) demonstrates that the new release is 12x faster, with 10x lower CPU utilization and 20x less disk I/O utilization.

The following charts present the differences in search time and disk utilization for two cases:

  • Middle: Queried row is in the middle of a (large) partition
  • Head: Queried row is at the head, as the first element of a (large) partition

As expected, the binary search shines in the middle case and has similar results in the (unlikely) case of searching for the first element.

More details on the results are in #4007


  • New Alternator SSL options (Alternator client-to-node encryption in transit)
    Until this release, Alternator SSL configuration was set in the server_encryption_options section of the Scylla configuration file. The name was misleading, as the section was also used for the unrelated node-to-node encryption in transit.

Starting from this release, a new section, alternator_encryption_options, is used to define Alternator SSL.
The old section is supported for backward compatibility, but it is highly recommended to create the new section in scylla.yaml with the relevant parameters, for example:

certificate: conf/scylla.crt
keyfile: conf/scylla.key
priority_string: use default


  • docker: add an option to start Alternator with HTTPS. When using Scylla Docker, you can now use --alternator-https-port in addition to the existing --alternator-port. #6583
  • Implement FilterExpression
    Adding FilterExpression, a newer syntax for filtering results of Query and Scan requests #5038. Example usage in the query API:
    aws dynamodb query \
      --table-name Thread \
      --key-condition-expression "ForumName = :fn and Subject = :sub" \
      --filter-expression "#v >= :num" \
      --expression-attribute-names '{"#v": "Views"}' \
      --expression-attribute-values file://values.json

  • Alternator: support additional filtering operators #5028
    • All operators are now supported. Previously, only the “EQ” operator was implemented.
    • Either “OR” or “AND” of conditions (previously only “AND”).
    • Correctly returning Count and ScannedCount for post-filter and pre-filter item counts, respectively.

Additional Features

  • SSTable Reshard and Reshape in the upload directory
    Reshard and Reshape are two transformations on SSTables:
    • Reshard – splitting an SSTable that is owned by more than one shard (core) into SSTables that are each owned by a single shard, for example when restoring data from a different server, importing SSTables from Apache Cassandra, or changing the number of cores in a machine (upscaling)
    • Reshape – rewriting a set of SSTables to satisfy a compaction strategy’s criteria, for example when restoring data from an old backup taken before a strategy update

Until this release, Scylla first moved files from the upload directory to the data directory, and then resharded and reshaped the data. In some cases, this resulted in too many files in the data directory.

Starting from Scylla 4.2, Scylla first reshards and reshapes the SSTables in the upload directory, and only then moves them to the data directory. This ensures SSTables already match the number of cores and the compaction strategy when moved to the data directory, resulting in fewer overall compactions.

SSTable Reshard and Reshape also happen on boot (if needed) before the node comes online, making sure the node doesn’t have a big backlog of work to do in addition to serving traffic.

More on this in commits

  • Setup: swapfile setup is now part of scylla_setup.
    scylla_setup already warned users when swap was not set; now it also prompts for the swap file directory #6539
  • Setup: the default root disk size is increased from 10 GB to 30 GB. scylla-machine-image #48

Experimental features in Scylla 4.2

  • Change Data Capture (CDC). While functionally complete, we are still testing CDC to validate it is production-ready towards GA in a following Scylla 4.x release. No API updates are expected. Refer to the Scylla Open Source 4.1 Release Notes for more information.
    • There’s a new nodetool command #6498
    • CDC is explicitly disabled on counters #6553
    • Preimage is no longer created for missing rows #7119
    • Postimage is no longer created for deleted rows #7120
    • CDC tables were renamed #6577

For metrics updates from 4.1 to 4.2, see here.
Scylla 4.2 dashboards are available in the latest Scylla Monitoring Stack release, 3.2.4 or later.

Other bug fixes and updates in the release

  • CQL: Impossible WHERE condition returns a non-empty slice #5799
  • Stability: reactor stalled during repair abort #6190
  • Stability: a rare race condition between topology change (like adding a node) and MV insert may cause an “seastar::broken_promise (broken promise)” error message, although there is no real error #5459
  • UX: scylla-housekeeping, a service which checks for the latest Scylla version, return a “cannot serialize” error instead of propagating the actual error #6690
  • UX: scylla_setup: on RAID disk prompt, typing the same disk twice cause traceback #6711
  • Stability: Counter write read-before-write is issued with no timeout, which may lead to unbounded internal concurrency if the enclosing write operation timed out #5069
  • replace_first_boot_test and replace_stopped_node_test fail in 4.1: Replace failed to stream #6728
  • Setup: scylla_swap_setup: swap size become 0 when memory size less than 1GB #6659
  • Stability: Scylla freezes when casting a decimal with a negative scale to a float #6720
  • Log: Compaction data reduction log message misses % sign #6727
  • Stability: Prevent possible deadlocks when repair from two or more nodes in parallel #6272
  • Setup: Scylla fails to recognize swap #6650
  • Setup: scylla_setup: systemctl shows up warning after scylla_setup finished #6674
  • CDC: report unexpected exceptions using “error” level in generation management code #6557
  • Storage: SSTable upgrade has a 2x disk space requirement #6682
  • Stability: Compaction Backlog can be calculated incorrectly #6021
  • UX: cql transport report a broken pipe error to the log when a client closes its side of a connection abruptly #5661
  • Nodetool: for a multi-DC cluster using a non-NetworkTopologyStrategy, nodetool decommission cannot find a new node in the local DC even though the node exists #6564
  • Monitoring: use higher resolution histograms to get more accurate values #5815 #4746
  • Scylla will now ignore a keyspace that is removed while being repaired, or without tables, rather than failing the repair operation. #5942 #6022
  • Monitoring: new compaction_manager backlog metrics
  • REST API storage_service/ownership/ (get_effective_ownership) can cause stalls #6380
  • Performance: over-aligned SSTable reads due to file buffer tracking #6290
  • Wrong order of initialization in messaging service constructor #6628
  • Stream ended with unclosed partition and coredump #6478
  • CDC: failed to restore cdc log table from snapshot for postimage enabled only with refresh operation #6561
  • CDC: wrong value for set in cdc_log table after second insert #6084
  • CDC: reactor stalled up to 17000 ms after change cdc log table property #6098
  • Stability: Classify queries based on initiator, instead of target table #5919
  • Setup: scylla_raid_setup cause traceback on CentOS7 #6954
  • CQL: min()/max() return wrong results on some collections and User Defined Type, naively comparing just the byte representations of arguments. #6768
  • CQL: Impossible to set list element to null using scylla_timeuuid_index #6828
  • Setup: Using Systemd slices on CentOS7, with percentage-based parameters does not work and generate warnings and error messages during instance (ami) load #6783
  • Stability: Commitlog replay segfaults when mutations use old schema and memtable space is full #6953
  • Setup: scylla_setup on Ubuntu 16 / GCE fails #6636
  • Stability: a race condition when a node fails while it is both streaming data (as part as repair) and is decommissioned. #6868 #6853
  • Stability: index reader fails to handle failure properly, which may lead to unexpected exit #6924
  • Stability: a rare deadlock when creating staging sstable, for example when repairing a table which has materialized views #6892 #6603
  • Stability: Fix issues introduced by classifying queries based on initiator, instead of target table (#5919), introduced in 4.2 #6907 #6908 #6613
  • Stability: TWCS compaction: partition estimate can become 0, causing an assert in sstables::prepare_summary() #6913
  • Stability: Post-repair view updates keep too many SSTable readers open #6707
  • Stability: Staging SSTables are incorrectly removed or added to backlog after ALTER TABLE / TRUNCATE #6798
  • Stability: Fix schema integrity issues that can arise in races involving materialized views #7420 #7480
  • Install: python scripts core dump with non-default locale #7408
  • Stability: Possible read resources leak #7256
  • Stability: materialized view builder: Scylla gets stuck on stopping #7077
  • LWT: Aborted reads of writes which fail with “Not enough replicas available” #7258
  • Stability: “No query parameter set for a session requesting a slow_query_log record” error while tracing slow batch queries #5843
  • Stability: appending_hash<row> ignores cells after first null #4567
  • Stability: Race in schema version recalculation leading to stale schema version in gossip, and a warning that schema versions differ on nodes, after Scylla was restarted with resharding required #7291
  • Stability: Unnecessary LCS Reshape compaction after startup due to overly sensitive SSTable overlap ratio #6938
  • AWS:  update enhanced networking supported instance list #6991
  • Stability: large reactor stalls when recalculating TLS Diffie-Hellman parameters #6191
  • Stability: a bug introduced in 4.2 may cause unexpected exit for expire timers #7117
  • Stability: too aggressive TWCS compactions after upgrading #6928
  • redis: EXISTS command with multiple keys causes SEGV #6469
  • Performance: removing a potential stall on sstable clean up #6662 #6730
  • Stability: Status of drained node after restart stayed as DN in the output of “nodetool status” on another node #7032
  • Alternator: ignore disabled SSESpecification #7031
  • Alternator: return an error for create-table, but the table is created #6809
  • Stability: RPC handlers are not unregistered on stop #6904
  • Stability: Schema change, adding a counter column into a non-counter table, is incorrectly persisted and results in a crash #7065
  • Debugging: trace decoding doesn’t work #6982
  • Debugging: gdb scylla databases gives the same address for different shards #7016
  • CDC: better classify errors when fetching CDC generations #6804
  • Stability: Connection dropped: RPC frame LZ4 decompression failure #6925
  • Stability: repair may cause stalls under different failure cases #6940 #6975 #6976 #7115
  • Stability: Materialized view updates use incorrect schema to interpret writes after base table schema is altered #7061
  • Stability: Possible priority inversion between hints and normal write in storage_proxy #7177
  • Stability: Segfault in evaluating multi-column restrictions #7198
  • Stability: in some rare cases, a node may crash when calculating effective ownership #7209
  • Alternator: a bug in the PutItem implementation with write_isolation = always_use_lwt. In case of contention but a successful write, the written data might not have matched the intent. #7218
  • Stability: in some rare cases, error recovery from an out-of-memory condition while processing an unpaged query that consumes all available memory may cause the coordinator node to exit #7240
  • Stability: Coredump during the decommission process on decommissioning node #7257
  • Stability: out-of-range range tombstones emitted on reader recreation cause fragment stream monotonicity violations #7208
  • Stability: race in schema version recalculation leading to stale schema version in gossip #7200

The post Scylla Open Source Release 4.2 appeared first on ScyllaDB.

Sizing Matters: Sizing Astra for Apache Cassandra Apps

A Haunted House of NoSQL Horrors

True-life tales of what not to do to your database!

Halloween is that frightening time of the year when the veil between the well-ordered world of DevOps and the dark chthonic void of dev/null is thinnest. When chaos monkeys reign supreme and Site Reliability Engineers lock the doors and bolt the shutters.

Today we will share with you true-life NoSQL horror stories our engineers have witnessed in the world of big data. But be forewarned! What you are about to read may just make you shudder deep down to your very hyperthreaded cores.

It was a dark and stormy night…

Picture being a user running in production, expecting enterprise grade scalability and stability, frenetically complaining nodes are being “terminated”… and then you wake up in a cold sweat realizing you only provisioned spot instances!!!

Or that one day, you have a boolean column, but belatedly realize when you make a secondary index that you just created terabytes of data split in two huge partitions!!!

Imagine you have the idea to push your client application with maximum parallelism. Unlimited parallelism! “They called me mad! But I’ll show them! Mwah hah hah!” Only then do you discover how unlimited parallelism creates a death spiral, generating queueing where a query’s final latency equals its own latency plus the sum of the latencies of all the requests ahead of it in the queue!!!



Then there was the time that poor soul used the same disk for data as the OS/root drive.

Save Yourself While You Can!

Sadly, it is already too late for some developers. They plunged themselves into nightmarish situations from which their sanity can never fully recover, even if their data has been. Yet it’s not too late for you, dear reader, to think about how you can avoid the same fate.

Never create a huge Cartesian product by concatenating multiple IN conditions in a single query. This path only leads to madness!

Also, be wary of that innocuous soul from IT who “just wants to shutdown unused instances,” but ends up killing your production instances! (It was just an accident, wasn’t it?)

You’ve always wanted to have that viral success. And you finally get it. But then the users… They won’t stop! They. Just. Keep. Coming! All targeting that one hot partition (Read our related horror story on the Dress that Broke the Internet here.)

So, “ah hah!” you think. You’ll just redesign your database to have a cache in front of it. This time, you swear, things will be different! Then — the cache goes out of sync with the database! (We have heard horror stories of caches and databases so often we wrote this white paper on why it’s a bad idea.)

Perhaps disaster strikes. A catastrophic outage. But “Don’t panic!” you tell your team, “We religiously keep backups.” They relax a little. Until everyone realizes they actually never had enough time to test the backups or work out precise procedures on how to restore!

The feeling when you discover your multi-AZ deployment was actually done within the same AZ.

Don’t Open That Door to the Basement!

There are many more stories of woe deep in the trenches of NoSQL. Here’s a few more brief examples of what NOT to do:

  • EBS isn’t a panacea. Guess what? You got an email that your block storage got erased
  • Wish DROP_TABLE no longer had autocomplete and that you’d put your fingers on a diet?
  • Selecting a smaller replication factor, thinking that makes a really good savings plan, until it’s not… AT ALL!
  • Keeping your database on a spot instance for cost savings, until you discover it has downsides too.
  • Data necromancy. Because data resurrection with a log structured merge tree is as frightening as resurrection of the dead.
  • Misusing the TTL (Time To Live) attribute, by either setting it to `0` and never deleting any data (uh-oh – storage bloat!) or setting it to a wrong small value and having some data deleted too early… (where did my data go??)

Database Monsters of the World Connect!

If you have big data horror stories of your own to tell, the place to gather this year will be our virtual Scylla Summit 2021. You’ll be able to hear the lore of those who have mastered the alchemy and art of NoSQL. And our free training sessions will keep you from being a subject in our next edition of NoSQL horror stories.


The post A Haunted House of NoSQL Horrors appeared first on ScyllaDB.

Clearing the Air on Cassandra Batches

Apache Cassandra Changelog #1 (October 2020)

Introducing the first Cassandra Changelog blog! Our monthly roundup of key activities and knowledge to keep the community informed.


Release Notes


The most current Apache Cassandra releases are 4.0-beta2, 3.11.8, 3.0.22, 2.2.18 and 2.1.22 released on August 31 and are in the repositories. The next cut of releases will be out soon―join the Cassandra mailing list to stay up-to-date.

We continue to make progress toward the 4.0 GA release with the overarching goal of it being at a state where major users should feel confident running it in production when it is cut. Over 1,300 Jira tickets have been closed and less than 100 remain as of this post. To gain this confidence, there are various ongoing testing efforts involving correctness, performance, and ease of use.


With CASSANDRA-15013, the community improved Cassandra's ability to handle high throughput workloads, while having enough safeguards in place to protect itself from potentially going out of memory.


The Harry project is a fuzz testing tool that aims to generate reproducible workloads that are as close to real-life as possible, while being able to efficiently verify the cluster state against the model without pausing the workload itself.


The community published its first Apache Cassandra Usage Report 2020 detailing findings from a comprehensive global survey of 901 practitioners on Cassandra usage to provide a baseline understanding of who, how, and why organizations use Cassandra.

Community Notes

Updates on new and active Cassandra Enhancement Proposals (CEPs) and how to contribute.


CEP-2: Kubernetes Operator was introduced this year and is an active discussion on creation of a community-based operator with the goal of making it easy to run Cassandra on Kubernetes.


CEP-7: Storage Attached Index (SAI) is a new secondary index for Cassandra that builds on the advancements made with SASI. It is intended to replace the existing built-in secondary index implementations.


Cassandra was selected by the ASF Diversity & Inclusion committee to be included in a research project to evaluate and understand the current state of diversity.

User Space


"In vetting MySQL, MongoDB, and other potential databases for IoT scale, we found they couldn't match the scalability we could get with open source Apache Cassandra. Cassandra's built-for-scale architecture enables us to handle millions of operations or concurrent users each second with ease – making it ideal for IoT deployments." - Brett Orr


"Our group is working on a multi-year build, creating a new Index Construction Platform to handle the daily production of the Bloomberg Barclays fixed income indices. This involves building and productionizing an Apache Solr-backed search platform to handle thousands of searches per minute, an Apache Cassandra back-end database to store millions of data points per day, and a distributed computational engine to handle millions of computations daily." - Noel Gunasekar

In the News

Solutions Review - The Five Best Apache Cassandra Books on Our Reading List

ZDNet - What Cassandra users think of their NoSQL DBMS

Datanami - Cassandra Adoption Correlates with Experience

Container Journal - 5 to 1: An Overview of Apache Cassandra Kubernetes Operators

Datanami - Cassandra Gets Monitoring, Performance Upgrades

ZDNet - Faster than ever, Apache Cassandra 4.0 beta is on its way

Cassandra Tutorials & More

A Cassandra user was in search of a tool to perform schema DDL upgrades. Another user suggested to ensure you don't get schema mismatches if running multiple upgrade statements in one migration. See the full email on the user mailing list for other recommended tools.

Start using virtual tables in Apache Cassandra 4.0 - Ben Bromhead, Instaclustr

Benchmarking Apache Cassandra with Rust - Piotr Kołaczkowski, DataStax

Open Source BI Tools and Cassandra - Arpan Patel, Anant Corporation

Build Fault Tolerant Applications With Cassandra API for Azure Cosmos DB - Abhishek Gupta, Microsoft

Understanding Data Modifications in Cassandra - Sameer Shukla, Redgate


Cassandra Changelog is curated by the community. Please send submissions to

Bulldozer: Batch Data Moving from Data Warehouse to Online Key-Value Stores

By Tianlong Chen and Ioannis Papapanagiotou

Netflix has more than 195 million subscribers that generate petabytes of data every day. Data scientists and engineers collect this data from our subscribers and videos, and implement data analytics models to discover customer behaviour with the goal of maximizing user joy. Usually, data scientists and engineers write Extract-Transform-Load (ETL) jobs and pipelines using big data compute technologies, like Spark or Presto, to process this data and periodically compute key information for a member or a video. The processed data is typically stored as data warehouse tables in AWS S3. Iceberg is widely adopted at Netflix as a data warehouse table format that addresses many of the usability and performance problems with Hive tables.

At Netflix, we also heavily embrace a microservice architecture that emphasizes separation of concerns. Many of these services need fast lookups into this fine-grained, periodically generated data. For example, in order to enhance our user experience, one online application fetches subscribers’ preferences data to recommend movies and TV shows. The data warehouse is not designed to serve point requests from microservices with low latency. Therefore, we must efficiently move data from the data warehouse to a global, low-latency and highly-reliable key-value store. For how our machine learning recommendation systems leverage our key-value stores, see this presentation for more details.

What is Bulldozer

Bulldozer is a self-serve data platform that moves data efficiently from data warehouse tables to key-value stores in batches. It leverages Netflix Scheduler for scheduling the Bulldozer Jobs. Netflix Scheduler is built on top of Meson which is a general purpose workflow orchestration and scheduling framework to execute and manage the lifecycle of the data workflow. Bulldozer makes data warehouse tables more accessible to different microservices and reduces each individual team’s burden to build their own solutions. Figure 1 shows how we use Bulldozer to move data at Netflix.

Figure 1. Moving data with Bulldozer at Netflix.

As the paved path for moving data to key-value stores, Bulldozer provides a scalable and efficient no-code solution. Users only need to specify the data source and the destination cluster information in a YAML file. Bulldozer provides the functionality to auto-generate the data schema which is defined in a protobuf file. The protobuf schema is used for serializing and deserializing the data by Bulldozer and data consumers. Bulldozer uses Spark to read the data from the data warehouse into DataFrames, converts each data entry to a key-value pair using the schema defined in the protobuf and then delivers key-value pairs into a key-value store in batches.
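The convert-and-batch step can be sketched in plain Python (a simplified illustration without the actual Spark, protobuf, or KV DAL dependencies; all function and field names here are hypothetical):

```python
# Simplified sketch of Bulldozer's convert step: turn rows (as read from the
# warehouse into a DataFrame) into key-value pairs, then deliver them in
# batches. Names are illustrative, not Bulldozer's actual API.

def rows_to_kv_pairs(rows, key_column, value_columns):
    """Convert each row into a (key, value) pair per the configured schema."""
    pairs = []
    for row in rows:
        key = row[key_column]
        # In Bulldozer the value is serialized with the generated protobuf
        # schema; here we just keep a plain dict of the selected columns.
        value = {col: row[col] for col in value_columns}
        pairs.append((key, value))
    return pairs

def write_in_batches(pairs, batch_size):
    """Yield batches of key-value pairs, as delivered to the key-value store."""
    for i in range(0, len(pairs), batch_size):
        yield pairs[i:i + batch_size]

rows = [
    {"profile_id": "p1", "email": "a@example.com", "age": 30},
    {"profile_id": "p2", "email": "b@example.com", "age": 41},
    {"profile_id": "p3", "email": "c@example.com", "age": 25},
]
pairs = rows_to_kv_pairs(rows, "profile_id", ["email", "age"])
batches = list(write_in_batches(pairs, batch_size=2))
```

In the real pipeline the value is a serialized protobuf message rather than a dict, and the writes go through the KV DAL interface rather than directly to a store.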

Instead of directly moving data into a specific key-value store like Cassandra or Memcached, Bulldozer moves data to a Netflix implemented Key-Value Data Abstraction Layer (KV DAL). The KV DAL allows applications to use a well-defined and storage engine agnostic HTTP/gRPC key-value data interface that in turn decouples applications from hard to maintain and backwards-incompatible datastore APIs. By leveraging multiple shards of the KV DAL, Bulldozer only needs to provide one single solution for writing data to the highly abstracted key-value data interface, instead of developing different plugins and connectors for different data stores. Then the KV DAL handles writing to the appropriate underlying storage engines depending on latency, availability, cost, and durability requirements.

Figure 2. How Bulldozer leverages Spark, Protobuf and KV DAL for moving the data.

Configuration-Based Bulldozer Job

For batch data movement in Netflix, we provide job templates in our Scheduler to make movement of data from all data sources into and out of the data warehouse. Templates are backed by notebooks. Our data platform provides the clients with a configuration-based interface to run a templated job with input validation.

We provide the job template MoveDataToKvDal for moving the data from the warehouse to one Key-Value DAL. Users only need to put the configurations together in a YAML file to define the movement job. The job is then scheduled and executed in Netflix Big Data Platform. This configuration defines what and where the data should be moved. Bulldozer abstracts the underlying infrastructure on how the data moves.

Let’s look at an example of a Bulldozer YAML configuration (Figure 3). Basically the configuration consists of three major domains: 1) data_movement includes the properties that specify what data to move. 2) key_value_dal defines the properties of where the data should be moved. 3) bulldozer_protobuf has the required information for protobuf file auto generation.

Figure 3. An Exemplar Bulldozer Job YAML.

In the data_movement domain, the source of the data can be a warehouse table or a SQL query. Users also need to define the key and value columns to tell Bulldozer which column is used as the key and which columns are included in the value message. We will discuss more details about the schema mapping in the next Data Model section. The key_value_dal domain defines the destination of the data: a namespace in the Key-Value DAL. One namespace in a Key-Value DAL contains as much key-value data as required; it is the equivalent of a table in a database.
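To make this concrete, a configuration along these lines might look as follows (a hypothetical sketch: the three domain names come from the description above, but the individual property names are invented and the real fields shown in Figure 3 may differ):

```yaml
data_movement:
  source_table: prod.profile_scores   # or a SQL query instead of a table
  key_column: profile_id
  value_columns: [email, age]
key_value_dal:
  namespace: profile_scores_ns        # destination namespace in the KV DAL
bulldozer_protobuf:
  message_name: ProfileScore          # used for protobuf auto-generation
```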

Data Model

Bulldozer uses protobuf for 1) mapping a warehouse table schema to a key-value schema; 2) serializing and deserializing the key-value data when performing write and read operations to the KV DAL. In this way, it allows us to provide a more traditional typed record store while keeping the key-value storage engine abstracted.

Figure 4 shows a simple example of how we represent a warehouse table schema into a key-value schema. The left part of the figure shows the schema of the warehouse table while the right part is the protobuf message that Bulldozer auto generates based on the configurations in the YAML file. The field names should exactly match for Bulldozer to convert the structured data entries into the key-value pairs. In this case, profile_id field is the key while email and age fields are included in the value schema. Users can use the protobuf schema KeyMessage and ValueMessage to deserialize data from Key-Value DAL as well.

Figure 4. An Example of Schema Mapping.

In this example, the schema of the warehouse table is flat, but sometimes the table can have nested structures. Bulldozer supports complicated schemas, like struct of struct type, array of struct, map of struct and map of map type.
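For the flat example described above, the auto-generated protobuf could look roughly like this (a sketch: the message and field names follow the text, while the exact types and layout of the real generated file may differ):

```proto
syntax = "proto3";

// Key side of the pair: the profile_id column of the warehouse table.
message KeyMessage {
  string profile_id = 1;
}

// Value side: the remaining columns configured in the YAML file.
message ValueMessage {
  string email = 1;
  int32 age = 2;
}
```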

Data Version Control

Bulldozer jobs can be configured to execute at a desired frequency, such as once or several times per day. Each execution moves the latest view of the data warehouse into a Key-Value DAL namespace. Each view of the data warehouse is a new version of the entire dataset. For example, if the data warehouse has two versions of the full dataset, as of January 1st and January 2nd, a Bulldozer job scheduled to execute daily moves each version of the data.

Figure 5. Dataset of January 1st 2020.
Figure 6. Dataset of January 2nd 2020.

When Bulldozer moves these versioned data, it usually has the following requirements:

  • Data Integrity. A Bulldozer job moving one version of data should write the full dataset or nothing; partial writes are not acceptable. In the example above, if the consumer reads the values for movie_id: 1 and movie_id: 2 after the Bulldozer jobs, the returned values shouldn’t come from two different versions, like: (movie_id: 1, cost 40), (movie_id: 2, cost 101).
  • Seamless to Data Consumer. Once a Bulldozer job finishes moving a new version of data, the data consumer should be able to start reading the new data automatically and seamlessly.
  • Data Fallback. Normally, data consumers read only the latest version of the data, but if there’s some data corruption in that version, we should have a mechanism to fallback to the previous version.

Bulldozer leverages the KV DAL data namespace and namespace alias functionality to manage these versioned datasets. For each Bulldozer job execution, it creates a new namespace suffixed with the date and moves the data to that namespace. The data consumer reads data from an alias namespace which points to one of these version namespaces. Once the job moves the full data successfully, the Bulldozer job updates the alias namespace to point to the new namespace which contains the new version of data. The old namespaces are closed to reads and writes and deleted in the background once it’s safe to do so. As most key-value storage engines support efficiently deleting a namespace (e.g. truncate or drop a table) this allows us to cheaply recycle old versions of the data. There are also other systems in Netflix like Gutenberg which adopt a similar namespace alias approach for data versioning which is applied to terabyte datasets.

For example, in Figure 7 data consumers read the data through namespace alias_namespace, which points to one of the underlying namespaces. On January 1st 2020, a Bulldozer job creates namespace_2020_01_01, moves the dataset, and points alias_namespace to namespace_2020_01_01. On January 2nd 2020, there is a new version of the data: Bulldozer creates namespace_2020_01_02, moves the new dataset, and updates alias_namespace to point to namespace_2020_01_02. Both namespace_2020_01_01 and namespace_2020_01_02 are transparent to the data consumers.

Figure 7. An Example of How the Namespace Aliasing Works.

The namespace aliasing mechanism ensures that the data consumer only reads data from one single version. If there’s a bad version of data, we can always swap the underlying namespaces to fallback to the old version.
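The versioning scheme can be illustrated with a small sketch (plain Python standing in for the KV DAL; the class and method names are invented):

```python
# Illustration of namespace-alias versioning: consumers always read via an
# alias, and a completed Bulldozer run repoints the alias to the freshly
# written namespace only after the full write succeeds.

class NamespaceAliases:
    def __init__(self):
        self.namespaces = {}   # physical namespace -> dataset
        self.alias = {}        # alias name -> physical namespace

    def publish(self, alias, date, dataset):
        """Write a new dated namespace, then repoint the alias to it."""
        namespace = f"namespace_{date}"
        self.namespaces[namespace] = dataset    # full write happens first
        previous = self.alias.get(alias)
        self.alias[alias] = namespace           # swap only after success
        return previous                         # kept around for fallback

    def read(self, alias, key):
        """Consumers only ever see the single version behind the alias."""
        return self.namespaces[self.alias[alias]][key]

    def fallback(self, alias, namespace):
        """If the latest version is bad, repoint the alias to an old one."""
        self.alias[alias] = namespace

store = NamespaceAliases()
store.publish("alias_namespace", "2020_01_01", {"movie_id:1": 40})
old = store.publish("alias_namespace", "2020_01_02", {"movie_id:1": 42})
assert store.read("alias_namespace", "movie_id:1") == 42
store.fallback("alias_namespace", old)  # bad new version? swap back
```

The real system additionally closes and deletes stale namespaces in the background once it is safe to do so.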

Production Usage

We released Bulldozer to production in early 2020. Currently, Bulldozer transfers billions of records from the data warehouse to key-value stores at Netflix every day. The use cases include our members’ predicted scores data to help improve the personalized experience (one example is shown in Figure 8), the metadata of Airtable and Google Sheets for data lifecycle management, the messaging modeling data for messaging personalization, and more.

Figure 8. Personalized articles in Netflix Help Center powered by Bulldozer.

Stay Tuned

The ideas discussed here include only a small set of problems with many more challenges still left to be identified and addressed. Please share your thoughts and experience by posting your comments below and stay tuned for more on data movement work at Netflix.


We would like to thank the following persons and teams for contributing to the Bulldozer project: Data Integration Platform Team (Raghuram Onti Srinivasan, Andreas Andreakis, and Yun Wang), Data Gateway Team (Vinay Chella, Joseph Lynch, Vidhya Arvind and Chandrasekhar Thumuluru), Shashi Shekar Madappa and Justin Cunningham.

Bulldozer: Batch Data Moving from Data Warehouse to Online Key-Value Stores was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

[Webcast] Astra on GCP: Spring into Action and Get Up and Running In Minutes with Cassandra

Building Netflix’s Distributed Tracing Infrastructure

by Maulik Pandey

Our Team — Kevin Lew, Narayanan Arunachalam, Elizabeth Carretto, Dustin Haffner, Andrei Ushakov, Seth Katz, Greg Burrell, Ram Vaithilingam, Mike Smith and Maulik Pandey

“@Netflixhelps Why doesn’t Tiger King play on my phone?” — a Netflix member via Twitter

This is an example of a question our on-call engineers need to answer to help resolve a member issue — which is difficult when troubleshooting distributed systems. Investigating a video streaming failure consists of inspecting all aspects of a member account. In our previous blog post we introduced Edgar, our troubleshooting tool for streaming sessions. Now let’s look at how we designed the tracing infrastructure that powers Edgar.

Distributed Tracing: the missing context in troubleshooting services at scale

Prior to Edgar, our engineers had to sift through a mountain of metadata and logs pulled from various Netflix microservices in order to understand a specific streaming failure experienced by any of our members. Reconstructing a streaming session was a tedious and time consuming process that involved tracing all interactions (requests) between the Netflix app, our Content Delivery Network (CDN), and backend microservices. The process started with manual pull of member account information that was part of the session. The next step was to put all puzzle pieces together and hope the resulting picture would help resolve the member issue. We needed to increase engineering productivity via distributed request tracing.

If we had an ID for each streaming session then distributed tracing could easily reconstruct session failure by providing service topology, retry and error tags, and latency measurements for all service calls. We could also get contextual information about the streaming session by joining relevant traces with account metadata and service logs. This insight led us to build Edgar: a distributed tracing infrastructure and user experience.

Figure 1. Troubleshooting a session in Edgar

When we started building Edgar four years ago, there were very few open-source distributed tracing systems that satisfied our needs. Our tactical approach was to use Netflix-specific libraries for collecting traces from Java-based streaming services until open source tracer libraries matured. By 2017, open source projects like Open-Tracing and Open-Zipkin were mature enough for use in polyglot runtime environments at Netflix. We chose Open-Zipkin because it had better integrations with our Spring Boot based Java runtime environment. We use Mantis for processing the stream of collected traces, and we use Cassandra for storing traces. Our distributed tracing infrastructure is grouped into three sections: tracer library instrumentation, stream processing, and storage. Traces collected from various microservices are ingested in a stream processing manner into the data store. The following sections describe our journey in building these components.

Trace Instrumentation: how will it impact our service?

That is the first question our engineering teams asked us when integrating the tracer library. It is an important question because tracer libraries intercept all requests flowing through mission-critical streaming services. Safe integration and deployment of tracer libraries in our polyglot runtime environments was our top priority. We earned the trust of our engineers by developing empathy for their operational burden and by focusing on providing efficient tracer library integrations in runtime environments.

Distributed tracing relies on propagating context for local interprocess calls (IPC) and client calls to remote microservices for any arbitrary request. Passing the request context captures causal relationships between microservices during runtime. We adopted Open-Zipkin’s B3 HTTP header based context propagation mechanism. We ensure that context propagation headers are correctly passed between microservices across a variety of our “paved road” Java and Node runtime environments, which include both older environments with legacy codebases and newer environments such as Spring Boot. We execute the Freedom & Responsibility principle of our culture in supporting tracer libraries for environments like Python, NodeJS, and Ruby on Rails that are not part of the “paved road” developer experience. Our loosely coupled but highly aligned engineering teams have the freedom to choose an appropriate tracer library for their runtime environment and have the responsibility to ensure correct context propagation and integration of network call interceptors.
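Zipkin’s B3 propagation boils down to copying the trace id forward on every hop and minting a new span id whose parent is the caller’s span. A minimal sketch, assuming plain dicts for HTTP headers (the helper function names are hypothetical; the header names are B3’s):

```python
# Minimal sketch of B3 header propagation: each outgoing call carries the
# trace id forward, starts a new span id, and records the caller's span as
# its parent. Real tracer libraries do this inside request interceptors.
import secrets

def new_trace_headers():
    """Headers for a request entering the system with no prior context."""
    return {
        "X-B3-TraceId": secrets.token_hex(8),
        "X-B3-SpanId": secrets.token_hex(8),
        "X-B3-Sampled": "1",
    }

def child_headers(incoming):
    """Headers to attach when calling the next microservice downstream."""
    return {
        "X-B3-TraceId": incoming["X-B3-TraceId"],     # same trace throughout
        "X-B3-ParentSpanId": incoming["X-B3-SpanId"], # caller becomes parent
        "X-B3-SpanId": secrets.token_hex(8),          # fresh span per hop
        "X-B3-Sampled": incoming["X-B3-Sampled"],
    }

root = new_trace_headers()
hop = child_headers(root)
```

Because every service preserves X-B3-TraceId, all spans of one request can later be joined back into a single trace regardless of which runtime environment emitted them.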

Our runtime environment integrations inject infrastructure tags like service name, auto-scaling group (ASG), and container instance identifiers. Edgar uses this infrastructure tagging schema to query and join traces with log data for troubleshooting streaming sessions. Additionally, it became easy to provide deep links to different monitoring and deployment systems in Edgar due to consistent tagging. With runtime environment integrations in place, we had to set an appropriate trace data sampling policy for building a troubleshooting experience.

Stream Processing: to sample or not to sample trace data?

This was the most important question we considered when building our infrastructure because data sampling policy dictates the amount of traces that are recorded, transported, and stored. A lenient trace data sampling policy generates a large number of traces in each service container and can lead to degraded performance of streaming services as more CPU, memory, and network resources are consumed by the tracer library. An additional implication of a lenient sampling policy is the need for scalable stream processing and storage infrastructure fleets to handle increased data volume.

We knew that a heavily sampled trace dataset is not reliable for troubleshooting because there is no guarantee that the request you want is in the gathered samples. We needed a thoughtful approach for collecting all traces in the streaming microservices while keeping low operational complexity of running our infrastructure.

Most distributed tracing systems enforce sampling policy at the request ingestion point in a microservice call graph. We took a hybrid head-based sampling approach that allows for recording 100% of traces for a specific and configurable set of requests, while continuing to randomly sample traffic per the policy set at ingestion point. This flexibility allows tracer libraries to record 100% traces in our mission-critical streaming microservices while collecting minimal traces from auxiliary systems like offline batch data processing. Our engineering teams tuned their services for performance after factoring in increased resource utilization due to tracing. The next challenge was to stream large amounts of traces via a scalable data processing platform.
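The hybrid decision described above can be sketched as follows (a simplified illustration; the request types, sampling rate, and function names are invented):

```python
# Sketch of a hybrid head-based sampling decision: record 100% of traces
# for a configured set of request types, otherwise fall back to random
# sampling at the rate set by the ingestion-point policy.
import random

ALWAYS_TRACE = {"play", "license"}   # hypothetical mission-critical requests
SAMPLE_RATE = 0.01                   # policy for everything else

def should_trace(request_type, rng=random.random):
    if request_type in ALWAYS_TRACE:
        return True                  # 100% recording for configured requests
    return rng() < SAMPLE_RATE       # random head-based sampling otherwise

assert should_trace("play")
assert not should_trace("batch_export", rng=lambda: 0.5)
assert should_trace("batch_export", rng=lambda: 0.001)
```

The key property is that the decision is made once, at the head of the call graph, and then propagated (for example via the B3 sampled flag) so every downstream service honors it.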

Mantis is our go-to platform for processing operational data at Netflix. We chose Mantis as our backbone to transport and process large volumes of trace data because we needed a backpressure-aware, scalable stream processing system. Our trace data collection agent transports traces to Mantis job cluster via the Mantis Publish library. We buffer spans for a time period in order to collect all spans for a trace in the first job. A second job taps the data feed from the first job, does tail sampling of data and writes traces to the storage system. This setup of chained Mantis jobs allows us to scale each data processing component independently. An additional advantage of using Mantis is the ability to perform real-time ad-hoc data exploration in Raven using the Mantis Query Language (MQL). However, having a scalable stream processing platform doesn’t help much if you can’t store data in a cost efficient manner.

Storage: don’t break the bank!

We started with Elasticsearch as our data store due to its flexible data model and querying capabilities. As we onboarded more streaming services, the trace data volume started increasing exponentially. The increased operational burden of scaling Elasticsearch clusters due to the high data write rate became painful for us. Read queries took increasingly longer to finish because the Elasticsearch clusters were using heavy compute resources to create indexes on ingested traces. The high data ingestion rate eventually degraded both read and write operations. We solved this by migrating to Cassandra as our data store for handling high data ingestion rates. Using simple lookup indices in Cassandra gives us the ability to maintain acceptable read latencies while doing heavy writes.

In theory, scaling up horizontally would allow us to handle higher write rates and retain larger amounts of data in Cassandra clusters. This implies that the cost of storing traces grows linearly to the amount of data being stored. We needed to ensure storage cost growth was sub-linear to the amount of data being stored. In pursuit of this goal, we outlined following storage optimization strategies:

  1. Use cheaper Elastic Block Store (EBS) volumes instead of SSD instance stores in EC2.
  2. Employ better compression technique to reduce trace data size.
  3. Store only relevant and interesting traces by using simple rules-based filters.

We were adding new Cassandra nodes whenever the EC2 SSD instance stores of existing nodes reached maximum storage capacity. The use of a cheaper EBS volume instead of an SSD instance store was an attractive option because AWS allows dynamic increases in EBS volume size without re-provisioning the EC2 node. This allowed us to increase total storage capacity without adding a new Cassandra node to the existing cluster. In 2019 our stunning colleagues in the Cloud Database Engineering (CDE) team benchmarked EBS performance for our use case and migrated existing clusters to use EBS volumes. By optimizing the Time Window Compaction Strategy (TWCS) parameters, they reduced the disk write and merge operations of Cassandra SSTable files, thereby reducing the EBS I/O rate. This optimization helped us reduce the data replication network traffic amongst the cluster nodes because SSTable files were created less often than in our previous configuration. Additionally, by enabling Zstd block compression on Cassandra data files, the size of our trace data files was reduced by half. With these optimized Cassandra clusters in place, it now costs us 71% less to operate clusters and we can store 35x more data than in our previous configuration.

We observed that Edgar users explored less than 1% of collected traces. This insight led us to believe that we can reduce write pressure and retain more data in the storage system if we drop traces that users will not care about. We currently use a simple rule-based filter in our Storage Mantis job that retains interesting traces even for rarely inspected service call paths in Edgar. The filter qualifies a trace as an interesting data point by inspecting all buffered spans of a trace for warnings, errors, and retry tags. This tail-based sampling approach reduced the trace data volume by 20% without impacting the user experience. There is an opportunity to use machine learning based classification techniques to further reduce trace data volume.
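
As a rough illustration, a rule-based tail-sampling filter of this kind could look like the following sketch; the trace representation and tag names here are assumptions for illustration, not Netflix's actual implementation.

```python
# Illustrative sketch of a rule-based tail-sampling filter: a trace is
# retained only if one of its buffered spans carries a warning, error,
# or retry tag. Trace structure and tag names are assumptions.
INTERESTING_TAGS = {"warning", "error", "retry"}

def is_interesting(buffered_spans):
    """Return True if any span in the trace has a warning/error/retry tag."""
    return any(tag in INTERESTING_TAGS
               for span in buffered_spans
               for tag in span.get("tags", {}))

# A trace containing an error span is kept; a fully healthy trace is dropped.
failed_trace = [
    {"name": "api-gateway", "tags": {"http.status": "200"}},
    {"name": "playback-service", "tags": {"error": "timeout", "retry": "1"}},
]
healthy_trace = [{"name": "api-gateway", "tags": {"http.status": "200"}}]
```

Because the decision is made only after all spans of a trace have been buffered, the filter can consider the whole request path, which is what distinguishes tail sampling from head sampling.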

While we have made substantial progress, we are now at another inflection point in building our trace data storage system. Onboarding new user experiences on Edgar could require us to store 10x the amount of current data volume. As a result, we are currently experimenting with a tiered storage approach for a new data gateway. This data gateway provides a querying interface that abstracts the complexity of reading and writing data from tiered data stores. Additionally, the data gateway routes ingested data to the Cassandra cluster and transfers compacted data files from Cassandra cluster to S3. We plan to retain the last few hours worth of data in Cassandra clusters and keep the rest in S3 buckets for long term retention of traces.

Table 1. Timeline of Storage Optimizations

Secondary advantages

In addition to powering Edgar, trace data is used for the following use cases:

Application Health Monitoring

Trace data is a key signal used by Telltale in monitoring macro level application health at Netflix. Telltale uses the causal information from traces to infer microservice topology and correlate traces with time series data from Atlas. This approach paints a richer observability portrait of application health.

Resiliency Engineering

Our chaos engineering team uses traces to verify that failures are correctly injected while our engineers stress test their microservices via the Failure Injection Testing (FIT) platform.

Regional Evacuation

The Demand Engineering team leverages tracing to improve the correctness of prescaling during regional evacuations. Traces provide visibility into the types of devices interacting with microservices such that changes in demand for these services can be better accounted for when an AWS region is evacuated.

Estimate infrastructure cost of running an A/B test

The Data Science and Product team factors in the costs of running A/B tests on microservices by analyzing traces that have relevant A/B test names as tags.

What’s next?

The scope and complexity of our software systems continue to increase as Netflix grows. We will focus on the following areas for extending Edgar:

  • Provide a great developer experience for collecting traces across all runtime environments. With an easy way to try out distributed tracing, we hope that more engineers instrument their services with traces and provide additional context for each request by tagging relevant metadata.
  • Enhance our analytics capability for querying trace data to enable power users at Netflix in building their own dashboards and systems for narrowly focused use cases.
  • Build abstractions that correlate data from metrics, logging, and tracing systems to provide additional contextual information for troubleshooting.

As we progress in building distributed tracing infrastructure, our engineers continue to rely on Edgar for troubleshooting streaming issues like “Why doesn’t Tiger King play on my phone?”. Our distributed tracing infrastructure helps in ensuring that Netflix members continue to enjoy a must-watch show like Tiger King!

We are looking for stunning colleagues to join us on this journey of building distributed tracing infrastructure. If you are passionate about Observability then come talk to us.

Building Netflix’s Distributed Tracing Infrastructure was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Building a Low-Latency Distributed Stock Broker Application: Part 4

In the fourth blog of the “Around the World” series we built a prototype of the application, designed to run in two georegions.

Recently I re-watched “Star Trek: The Motion Picture” (The original 1979 Star Trek Film). I’d forgotten how much like “2001: A Space Odyssey” the vibe was (a drawn out quest to encounter a distant, but rapidly approaching very powerful and dangerous alien called “V’ger”), and also that the “alien” was originally from Earth and returning in search of “the Creator”—V’ger was actually a seriously upgraded Voyager spacecraft!

Star Trek: The Motion Picture (V’ger)

The original Voyager 1 and 2 had only been recently launched when the movie came out, and were responsible for some remarkable discoveries (including the famous “Death Star” image of Saturn’s moon Mimas, taken in 1980). Voyager 1 has now traveled further than any other human artefact and has since left the solar system! It’s amazing that after 40+ years it’s still working, although communicating with it now takes a whopping 40 hours in round-trip latency (which happens via the Deep Space Network—one of the stations is at Tidbinbilla, near our Canberra office).

Canberra Deep Space Communication Complex (CSIRO)

Luckily we are only interested in traveling “Around the World” in this blog series, so the latency challenges we face in deploying a globally distributed stock trading application are substantially less than the 40 hours latency to outer space and back. In Part 4 of this blog series we catch up with Phileas Fogg and Passepartout in their journey, and explore the initial results from a prototype application deployed in two locations, Sydney and North Virginia.

1. The Story So Far

In Part 1 of this blog series we built a map of the world to take into account inter-region AWS latencies, and identified some “georegions” that enabled sub 100ms latency between AWS regions within the same georegion. In Part 2 we conducted some experiments to understand how to configure multi-DC Cassandra clusters and use java clients, and measured latencies from Sydney to North Virginia. In Part 3 we explored the design for a globally distributed stock broker application, and built a simulation and got some indicative latency predictions. 

The goal of the application is to ensure that stock trades are done as close as possible to the stock exchanges the stocks are listed on, to reduce the latency between receiving a stock ticker update, checking the conditions for a stock order, and initiating a trade if the conditions are met. 

2. The Prototype

For this blog I built a prototype of the application, designed to run in two georegions, Australia and the USA. We are initially only trading stocks available from stock exchanges in these two georegions: orders for stocks traded in New York will be directed to a Broker deployed in the AWS North Virginia region, and orders for stocks traded in Sydney will be directed to a Broker deployed in the AWS Sydney region. As this Google Earth image shows, they are close to being antipodes (diametrically opposite each other, at 15,500 km apart, so they pretty much satisfy the definition of traveling “Around the World”), with a measured inter-region latency (from Part 2) of 230ms.

Sydney to North Virginia (George Washington’s Mount Vernon house)

The design of the initial (simulated) version of the application was changed in a few ways to:

  • Ensure that it worked correctly when instances of the Broker were deployed in multiple AWS regions
  • Measure actual rather than simulated latencies, and
  • Use a multi-DC Cassandra Cluster. 

The prototype isn’t a complete implementation of the application yet. In particular it only uses a single Cassandra table for Orders—to ensure that Orders are made available in both georegions, and can be matched against incoming stock tickers by the Broker deployed in that region. 

Some other parts of the application are “stubs”, including the Stock Ticker component (which will eventually use Kafka), and the checks/updates of Holdings and the generation of Transactions records (which will eventually also be Cassandra tables). Currently only the asynchronous, non-market order types are implemented (Limit and Stop orders). I realized that implementing Market orders (which are required to be traded immediately) using a Cassandra table would result in too many tombstones being produced, because each order is deleted immediately upon being filled (rather than being marked as “filled” as in the original design) so that the Cassandra query can quickly find unfilled orders and the Orders table doesn’t grow indefinitely. For non-market orders, however, a Cassandra table is a reasonable design choice: Orders may exist for extended periods of time (hours, days, or even weeks) before being traded, and as there is only a small number of successful trades per second (10s to 100s) relative to the total number of waiting Orders (potentially millions), the number of deletions, and therefore tombstones, will be acceptable.

Let’s now have a look at the details of some of the more significant changes that were made to the application.


I created a multi-DC Cassandra keyspace as follows:

CREATE KEYSPACE broker WITH replication = {'class': 'NetworkTopologyStrategy', 'NorthVirginiaDC': '3', 'SydneyDC': '3'};

In the Cassandra Java driver, the application.conf file determines which Data Center the Java client connects to. For example, to connect to the SydneyDC the file has the following settings:

datastax-java-driver {
    basic.load-balancing-policy {
        class = DefaultLoadBalancingPolicy
        local-datacenter = "SydneyDC"
    }
}

The Orders table was created as follows (note that I couldn’t use “limit” for a column name as it’s a reserved word in Cassandra!):

CREATE TABLE broker.orders (
    symbol text,
    orderid text,
    buyorsell text,
    customerid text,
    limitthreshold double,
    location text,
    ordertype text,
    quantity bigint,
    starttime bigint,
    PRIMARY KEY (symbol, orderid)
);

For the primary key, the partition key is the stock symbol, so that all outstanding orders for a stock can be found when each stock ticker is received by the Broker, and the clustering column is the (unique) orderid, so that multiple orders for the same symbol can be written and read, and a specific order (for an orderid) can be deleted. Note that in a production environment, using a single partition per stock symbol may result in skewed and unbounded partitions, which is not recommended.

The prepared statements for creating, reading, and deleting orders are as follows:

PreparedStatement prepared_insert = Cassandra.session.prepare(
"insert into broker.orders (symbol, orderid, buyorsell, customerid, limitthreshold, location, ordertype, quantity, starttime) values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
PreparedStatement prepared_select = Cassandra.session.prepare(
        "select * from broker.orders where symbol = ?");

PreparedStatement prepared_delete = Cassandra.session.prepare(
        "delete from broker.orders where symbol = ? and orderid = ?");

I implemented a simplified “Place limit or stop order” operation (see Part 3), which uses the prepared_insert statement to create each new order, initially in the Cassandra Data Center local to the Broker where the order was created from, which is then automatically replicated in the other Cassandra Data Center. I also implemented the “Trade Matching Order” operation (Part 3), which uses the prepared_select statement to query orders matching each incoming Stock Ticker, checks the rules, and then if a trade is filled deletes the order.
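
To make the flow concrete, here is a minimal sketch of the “Trade Matching Order” logic, with an in-memory dict standing in for the broker.orders table and its prepared statements; the fill rule for buy limit orders is a simplified assumption, not the prototype's exact rule set.

```python
# Simplified sketch of the "Trade Matching Order" operation. The dict
# stands in for broker.orders (select by symbol, delete by orderid);
# the matching rule for buy limit orders is a simplified assumption.
orders_by_symbol = {
    "NAB": [
        {"orderid": "o1", "buyorsell": "buy", "ordertype": "limit",
         "limitthreshold": 25.0, "quantity": 100},
        {"orderid": "o2", "buyorsell": "buy", "ordertype": "limit",
         "limitthreshold": 20.0, "quantity": 50},
    ],
}

def on_stock_ticker(symbol, price):
    """Find matching orders for the symbol, fill them, and delete them."""
    filled = []
    for order in list(orders_by_symbol.get(symbol, [])):
        # A buy limit order fills when the price reaches the threshold or below.
        if (order["ordertype"] == "limit" and order["buyorsell"] == "buy"
                and price <= order["limitthreshold"]):
            filled.append(order)
            # Delete immediately on fill, as in the prototype (this is the
            # source of the tombstones discussed above).
            orders_by_symbol[symbol].remove(order)
    return filled
```

A ticker of 24.5 for "NAB" would fill and delete order o1 (threshold 25.0) while leaving o2 (threshold 20.0) waiting, mirroring the select-check-delete cycle performed with the prepared statements.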


I created a 3 node Cassandra cluster in the Sydney AWS region, and then added another identical Data Center in the North Virginia AWS region using Instaclustr Managed Cassandra for AWS. This gave me 6 nodes in total, running on t3.small instances (5 GB SSD, 2 GB RAM, 2 CPU Cores). This is a small developer-sized cluster, but it is adequate for a prototype, and very affordable (2 cents an hour per node for AWS costs) given that the Brokers are currently only single threaded so don’t produce much load. We’re more interested in latency at this point of the experiment, and we may want to increase the number of Data Centers in the future. I also spun up an EC2 instance (t3a.micro) in each of the same AWS regions, and deployed an instance of the Stock Broker on each (it only used 20% CPU). Here’s what the complete deployment looks like:

3. The Results

For the prototype, the focus was on demonstrating that the design goal of minimizing latency for trading stop and limit orders (asynchronous trades) was achieved. For the prototype, the latency for these order types is measured from the time of receiving a Stock Ticker, to the time an Order is filled. We ran a Broker in each AWS region concurrently for an hour, with the same workload for each, and measured the average and maximum latencies. For the first configuration, each Broker is connected to its local Cassandra Data Center, which is how it would be configured in practice. The results were encouraging, with an average latency of 3ms, and a maximum of 60ms, as shown in this graph.  

During the run, across both Brokers, 300 new orders were created each second, 600 stock tickers were received each second, and 200 trades were carried out each second. 

Given that I hadn’t implemented Market Orders yet, I wondered how I could configure and approximately measure the expected latency for these synchronous order types between different regions (i.e. Sydney and North Virginia). The latency for Market orders in the same region will be comparable to the non-market orders. The solution turned out to be simple: just re-configure the Brokers to use the remote Cassandra Data Center, which introduces the inter-region round-trip latency that would also be encountered with Market Orders placed in one region and traded immediately in the other region. I could also have achieved a similar result by changing the consistency level to EACH_QUORUM (which requires a majority of nodes in each data center to respond). Not surprisingly, the latencies were higher, rising to 360ms average, and 1200ms maximum, as shown in this graph with both configurations (Stop and Limit Orders on the left, and Market Orders on the right):

So our initial experiments are a success, and validate the primary design goal, as asynchronous stop and limit Orders can be traded with low latency from the Broker nearest the relevant stock exchanges, while synchronous Market Orders will take significantly longer due to inter-region latency. 

Write Amplification

What else can be learned from running this experiment? We can understand more about resource utilization in multi-DC Cassandra clusters. Using the Instaclustr Cassandra Console, I monitored the CPU Utilization on each of the nodes in the cluster, initially with only one Data Center and one Broker, then with two Data Centers and a single Broker, and finally with both Brokers running. It turns out that the read load results in 20% CPU Utilization on each node in the local Cassandra Data Center, and the write load also results in 20% locally. Thus, for a single Data Center cluster the total load is 40% CPU. However, with two Data Centers things get more complex due to the replication of each local write load to the other Data Center. This is also called “Write Amplification”.

The following table shows the measured total load for 1 and 2 Data Centers, and predicted load for up to 8 Data Centers, showing that for more than 3 Data Centers you need bigger nodes (or bigger clusters). A four CPU Core node instance type would be adequate for 7 Data Centers, and would result in about 80% CPU Utilization.  

  Number of Data Centres | Local Read Load | Local Write Load | Remote Write Load | Total Write Load | Total Load
  1 | 20 | 20 |   0 |  20 |  40
  2 | 20 | 20 |  20 |  40 |  60
  3 | 20 | 20 |  40 |  60 |  80
  4 | 20 | 20 |  60 |  80 | 100
  5 | 20 | 20 |  80 | 100 | 120
  6 | 20 | 20 | 100 | 120 | 140
  7 | 20 | 20 | 120 | 140 | 160
  8 | 20 | 20 | 140 | 160 | 180
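
The table follows a simple linear model: with the 20% read and 20% write loads measured above, each additional Data Center adds another 20% of remote write load to every node. A quick sketch of the model:

```python
# Write amplification model behind the table above: local read and
# write are each 20% CPU, and every other Data Center replicates its
# write load onto this one.
LOCAL_READ, LOCAL_WRITE = 20, 20

def total_load(num_dcs):
    """Total per-node CPU load (%) for a cluster with num_dcs Data Centers."""
    remote_write = LOCAL_WRITE * (num_dcs - 1)
    total_write = LOCAL_WRITE + remote_write
    return LOCAL_READ + total_write
```

For example, total_load(1) gives 40 and total_load(8) gives 180, matching the first and last rows of the table.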


The total cost to run the prototype includes the Instaclustr Managed Cassandra nodes (3 nodes per Data Center x 2 Data Centers = 6 nodes), the two AWS EC2 Broker instances, and the data transfer between regions (AWS only charges for data out of a region, not in, but the prices vary depending on the source region). For example, data transfer out of North Virginia is 2 cents/GB, but Sydney is more expensive at 9.8 cents/GB. I computed the total monthly operating cost to be $361 for this configuration, broken down into $337/month (93%) for Cassandra and EC2 instances, and $24/month (7%) for data transfer, to process around 500 million stock trades. Note that this is only a small prototype configuration, but can easily be scaled for higher throughputs (with incrementally and proportionally increasing costs).
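
As a quick sanity check on the cost split reported above (using the monthly figures exactly as given in the text):

```python
# Reproducing the reported monthly cost breakdown for the prototype.
cassandra_and_ec2 = 337  # $/month for 6 Cassandra nodes + 2 EC2 Brokers
data_transfer = 24       # $/month for inter-region data transfer
total = cassandra_and_ec2 + data_transfer               # $361/month
instances_pct = round(100 * cassandra_and_ec2 / total)  # ~93%
transfer_pct = round(100 * data_transfer / total)       # ~7%
```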


In this blog we built and experimented with a prototype of the globally distributed stock broker application, focussing on testing the multi-DC Cassandra part of the system which enabled us to significantly reduce the impact of planetary scale latencies (from seconds to low milliseconds) and ensure greater redundancy (across multiple AWS regions), for the real-time stock trading function. Some parts of the application remain as stubs, and in future blogs I aim to replace them with suitable functionality (e.g. streaming, analytics) and non-functionality (e.g. failover) from a selection of Kafka, Elasticsearch and maybe even Redis!

The post Building a Low-Latency Distributed Stock Broker Application: Part 4 appeared first on Instaclustr.

Understanding the Impacts of the Native Transport Requests Change Introduced in Cassandra 3.11.5


Recently, Cassandra made changes to the Native Transport Requests (NTR) queue behaviour. Through our performance testing, we found the new NTR change to be good for clusters that have a constant load causing the NTR queue to block. Under the new mechanism the queue no longer blocks, but throttles the load based on a queue size setting, which by default is 10% of the heap.

Compared to the Native Transport Requests queue length limit, this improves how Cassandra handles traffic when queue capacity is reached. The “back pressure” mechanism more gracefully handles the overloaded NTR queue, resulting in a significant lift of operations without clients timing out. In summary, clusters with later versions of Cassandra can handle more load before hitting hard limits.


At Instaclustr, we are responsible for managing the Cassandra versions that we release to the public. This involves performing a review of Cassandra release changes, followed by performance testing. In cases where major changes have been made in the behaviour of Cassandra, further research is required. So without further delay let’s introduce the change to be investigated.

  • Prevent client requests from blocking on executor task queue (CASSANDRA-15013)
Versions affected:


Native Transport Requests

Native transport requests (NTR) are any requests made via the CQL Native Protocol. CQL Native Protocol is the way the Cassandra driver communicates with the server. This includes all reads, writes, schema changes, etc. There are a limited number of threads available to process incoming requests. When all threads are in use, some requests wait in a queue (pending). If the queue fills up, some requests are silently rejected (blocked). The server never replies, so this eventually causes a client-side timeout. The main way to prevent blocked native transport requests is to throttle load, so the requests are performed over a longer period.

Prior to 3.11.5

Prior to 3.11.5, Cassandra used the following configuration settings to set the size and throughput of the queue:

  • native_transport_max_threads is used to set the maximum number of threads for handling requests. Each thread pulls requests from the NTR queue.
  • cassandra.max_queued_native_transport_requests is used to set the queue size (default 128). Once the queue is full, the Netty threads are blocked waiting for the queue to have free space.

Once the NTR queue is full, requests from all clients are no longer accepted. There is no strict ordering by which the blocked Netty threads will process requests. Therefore, in 3.11.4, latency becomes effectively random once all Netty threads are blocked.

Native Transport Requests - Cassandra 3.11.4

Change After 3.11.5

In 3.11.5 and above, the NTR queue no longer blocks as previously described; it throttles. The throttling is based on the heap size: native transport requests are limited by the total memory they occupy rather than by their number. Requests are paused once the queue is full.

  • native_transport_max_concurrent_requests_in_bytes: a global limit on the total size of in-flight NTR requests, in bytes (default: heapSize / 10)
  • native_transport_max_concurrent_requests_in_bytes_per_ip: a per-endpoint limit on the total size of in-flight NTR requests, in bytes (default: heapSize / 40)
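
Conceptually, the byte-based throttle works like a shared budget: a request is admitted while the total size of in-flight requests stays under the limit, and the connection is paused (rather than the request being dropped) once it would exceed it. A toy model of this behaviour, not Cassandra's actual code:

```python
# Toy model of the post-3.11.5 byte-based NTR throttle (not Cassandra's
# actual implementation). In-flight request bytes are tracked against a
# global limit of heap / 10; over the limit, connections are paused
# instead of requests being silently dropped.
class ByteThrottle:
    def __init__(self, heap_bytes):
        self.limit = heap_bytes // 10  # default global limit
        self.in_flight = 0

    def try_admit(self, request_bytes):
        """Return True to process the request now, False to pause the connection."""
        if self.in_flight + request_bytes > self.limit:
            return False  # backpressure: pause rather than reject
        self.in_flight += request_bytes
        return True

    def complete(self, request_bytes):
        self.in_flight -= request_bytes
```

A paused connection resumes once enough earlier requests complete to bring the in-flight total back under the limit, which is why clients see higher latency instead of timeouts.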

Maxed Queue Behaviour

From previously conducted performance testing of 3.11.4 and 3.11.6, we noticed similar behaviour when the traffic pressure has not yet saturated the NTR queue. In this section, we will discuss the expected behaviour when saturation does occur and the breaking point is reached.

In 3.11.4, when the queue has been maxed, client requests will be refused. For example, when trying to make a connection via cqlsh, it will yield an error, see Figure 2.

Cassandra 3.11.4 - queue maxed out, client requests refused
Figure 2: Timed out request

Or, on a client that tries to run a query, you may see a NoHostAvailableException.

Where a 3.11.4 cluster previously blocked NTRs, after an upgrade to 3.11.6 they are no longer blocked. The reason is that 3.11.6 doesn’t place a limit on the number of NTRs but rather on the total memory they occupy. Thus, when the new size limit is reached, NTRs are paused. The default settings in 3.11.6 result in a much larger effective NTR queue in comparison to the small 128-request limit in 3.11.4 (in normal situations where the payload size is not extremely large).

Benchmarking Setup

This testing procedure requires the NTR queue on a cluster to be at max capacity with enough load to start blocking requests at a constant rate. In order to do this we used multiple test boxes to stress the cluster. This was achieved by using 12 active boxes to create multiple client connections to the test cluster. Once the cluster NTR queue is in constant contention, we monitored the performance using:

  • Client metrics: requests per second, latency from client perspective
  • NTR Queue metrics: Active Tasks, Pending Tasks, Currently Blocked Tasks, and Paused Connections.

For testing purposes we used two testing clusters with details provided in the table below:

Cassandra | Cluster size | Instance Type | Cores | RAM   | Disk
3.11.4    | 3            | m5xl-1600-v2  | 4     | 16 GB | 1600 GB
3.11.6    | 3            | m5xl-1600-v2  | 4     | 16 GB | 1600 GB
Table 1: Cluster Details

To simplify the setup we disabled encryption and authentication. Multiple test instances were set up in the same region as the clusters. For testing purposes we used 12 KB blob payloads. To give each cluster node a balanced mixed load, we kept the number of test boxes generating write load equal to the number of instances generating read load. We ran the load against the cluster for 10 mins to temporarily saturate the queue with read and write requests and cause contention for the Netty threads.

Our test script used cassandra-stress for generating the load; you can also refer to Deep Diving cassandra-stress – Part 3 (Using YAML Profiles) for more information.

In the stressSpec.yaml, we used the following table definition and queries:

table_definition: |
  CREATE TABLE typestest (
        name text,
        choice boolean,
        date timestamp,
        address inet,
        dbl double,
        lval bigint,
        ival int,
        uid timeuuid,
        value blob,
        PRIMARY KEY((name,choice), date, address, dbl, lval, ival, uid)
  ) WITH compaction = { 'class':'LeveledCompactionStrategy' }
    AND comment='A table of many types to test wide rows'

columnspec:
  - name: name
    size: fixed(48)
    population: uniform(1..1000000000) # the range of unique values to select for the field
  - name: date
    cluster: uniform(20..1000)
  - name: lval
    population: gaussian(1..1000)
    cluster: uniform(1..4)
  - name: value
    size: fixed(12000)

insert:
  partitions: fixed(1)      # number of unique partitions to update in a single operation
                            # if batchcount > 1, multiple batches will be used but all partitions will
                            # occur in all batches (unless they finish early); only the row counts will vary
  batchtype: UNLOGGED       # type of batch to use
  select: uniform(1..10)/10 # uniform chance any single generated CQL row will be visited in a partition;
                            # generated for each partition independently, each time we visit it

# List of queries to run against the schema (query names below are placeholders)
queries:
  simple1:
    cql: select * from typestest where name = ? and choice = ? LIMIT 1
    fields: samerow  # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
  range1:
    cql: select name, choice, uid from typestest where name = ? and choice = ? and date >= ? LIMIT 10
    fields: multirow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
  simple2:
    cql: select name, choice, uid from typestest where name = ? and choice = ? LIMIT 1
    fields: samerow  # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)

Write loads were generated with:

cassandra-stress user no-warmup 'ops(insert=10)' profile=stressSpec.yaml cl=QUORUM duration=10m -mode native cql3 maxPending=32768 connectionsPerHost=40 -rate threads=2048 -node file=node_list.txt

Read loads were generated by changing ops to



3.11.4 Queue Saturation Test

The active NTR queue reached max capacity (at 128) and remained in contention under load. Pending NTR tasks remained above 128 throughout. At this point, timeouts were occurring when running 12 load instances to stress the cluster. Each node had 2 load instances performing reads and another 2 performing writes. 4 of the read load instances constantly logged NoHostAvailableExceptions as shown in the example below.

ERROR 04:26:42,542 [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: (com.datastax.driver.core.exceptions.OperationTimedOutException: [] Timed out waiting for server response), (com.datastax.driver.core.exceptions.OperationTimedOutException: [] Timed out waiting for server response), (com.datastax.driver.core.exceptions.OperationTimedOutException: [] Timed out waiting for server response))

The client results we got from this stress run are shown in Table 2.

Box | Op rate (op/s) | Latency mean (ms) | Latency median (ms) | Latency 95th percentile (ms) | Latency 99th percentile (ms) | Latency 99.9th percentile (ms) | Latency max (ms)
1 | 700.00 | 2,862.20 | 2,078.30 | 7,977.60 | 11,291.10 | 19,495.10 | 34,426.80
2 | 651.00 | 3,054.50 | 2,319.50 | 8,048.90 | 11,525.90 | 19,528.70 | 32,950.50
3 | 620.00 | 3,200.90 | 2,426.40 | 8,409.60 | 12,599.70 | 20,367.50 | 34,158.40
4 | 607.00 | 3,312.80 | 2,621.40 | 8,304.70 | 11,769.20 | 19,730.00 | 31,977.40
5 | 568.00 | 3,529.80 | 3,011.50 | 8,216.60 | 11,618.20 | 19,260.20 | 32,698.80
6 | 553.00 | 3,627.10 | 3,028.30 | 8,631.90 | 12,918.50 | 20,115.90 | 34,292.60
Writes | 3,699.00 | 3,264.55 | 2,580.90 | 8,264.88 | 11,953.77 | 19,749.57 | 34,426.80
7 | 469.00 | 4,296.50 | 3,839.90 | 9,101.60 | 14,831.10 | 21,290.30 | 35,634.80
8 | 484.00 | 4,221.50 | 3,808.40 | 8,925.50 | 11,760.80 | 20,468.20 | 34,863.10
9 | Crashed due to time out
10 | Crashed due to time out
11 | Crashed due to time out
12 | Crashed due to time out
Reads | 953.00 | 4,259.00 | 3,824.15 | 9,092.80 | 14,800.40 | 21,289.48 | 35,634.80
Summary | 4,652.00 | 3,761.78 | 3,202.53 | 8,678.84 | 13,377.08 | 20,519.52 | 35,634.80
Table 2: 3.11.4 Mixed Load Saturating The NTR Queue

* To calculate the total write op rate, we summed the values from the 6 write instances. For the max write latency we took the maximum across all instances, and for the remaining latency values we averaged the results; the write results are summarised in the Table 2 “Writes” row. We did the same for the read results, which are recorded in the “Reads” row. The last row of the table summarises the “Writes” and “Reads” rows.
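
The summarisation described in this footnote amounts to the following sketch (the per-box rows here are hypothetical examples, not the actual measurements):

```python
# Sketch of the summary-row computation: op rates are summed, the max
# latency takes the maximum, and the other latency columns are averaged
# over the instances that completed. Example rows are hypothetical.
def summarise(rows):
    n = len(rows)
    return {
        "op_rate": sum(r["op_rate"] for r in rows),                # total ops/s
        "latency_mean": sum(r["latency_mean"] for r in rows) / n,  # averaged
        "latency_max": max(r["latency_max"] for r in rows),        # worst case
    }

boxes = [
    {"op_rate": 700, "latency_mean": 2800, "latency_max": 34000},
    {"op_rate": 300, "latency_mean": 3200, "latency_max": 35000},
]
```

Note that crashed instances contribute no rows, which is why the "Reads" summary in Table 2 covers only the two read boxes that completed.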

The 6 write load instances finished normally, but the read instances struggled. Only 2 of the read load instances were able to send traffic through normally; the other clients received too many timeout errors, causing them to crash. Another observation is that the Cassandra timeout metrics, under client-request-metrics, did not capture any of the client timeouts we observed.

Same Load on 3.11.6

Next, we proceeded to test 3.11.6 with the same load. Using the default NTR settings, all test instances were able to finish the stress test successfully.

Box | Op rate (op/s) | Latency mean (ms) | Latency median (ms) | Latency 95th percentile (ms) | Latency 99th percentile (ms) | Latency 99.9th percentile (ms) | Latency max (ms)
1 | 677.00 | 2,992.60 | 2,715.80 | 7,868.50 | 9,303.00 | 9,957.30 | 10,510.90
2 | 658.00 | 3,080.20 | 2,770.30 | 7,918.80 | 9,319.70 | 10,116.70 | 10,510.90
3 | 653.00 | 3,102.80 | 2,785.00 | 7,939.80 | 9,353.30 | 10,116.70 | 10,510.90
4 | 608.00 | 3,340.90 | 3,028.30 | 8,057.30 | 9,386.90 | 10,192.20 | 10,502.50
5 | 639.00 | 3,178.30 | 2,868.90 | 7,994.30 | 9,370.10 | 10,116.70 | 10,510.90
6 | 650.00 | 3,120.50 | 2,799.70 | 7,952.40 | 9,353.30 | 10,116.70 | 10,510.90
Writes | 3,885.00 | 3,135.88 | 2,828.00 | 7,955.18 | 9,347.72 | 10,102.72 | 10,510.90
7 | 755.00 | 2,677.70 | 2,468.30 | 7,923.00 | 9,378.50 | 9,982.40 | 10,762.60
8 | 640.00 | 3,160.70 | 2,812.30 | 8,132.80 | 9,529.50 | 10,418.70 | 11,031.00
9 | 592.00 | 3,427.60 | 3,101.70 | 8,262.80 | 9,579.80 | 10,452.20 | 11,005.90
10 | 583.00 | 3,483.00 | 3,160.40 | 8,279.60 | 9,579.80 | 10,435.40 | 11,022.60
11 | 582.00 | 3,503.60 | 3,181.40 | 8,287.90 | 9,588.20 | 10,469.00 | 11,047.80
12 | 582.00 | 3,506.70 | 3,181.40 | 8,279.60 | 9,588.20 | 10,460.60 | 11,014.20
Reads | 3,734.00 | 3,293.22 | 2,984.25 | 8,194.28 | 9,540.67 | 10,369.72 | 11,047.80
Summary | 7,619.00 | 3,214.55 | 2,906.13 | 8,074.73 | 9,444.19 | 10,236.22 | 11,047.80
Table 3: 3.11.6 Mixed Load

Default Native Transport Requests (NTR) Setting Comparison

Taking the summary row from both versions (Table 2 and Table 3), we produced Table 4.

Op rate (op/s) Latency mean (ms) Latency median (ms) Latency 95th percentile (ms) Latency 99th percentile (ms) Latency 99.9th percentile (ms) Latency max (ms)
3.11.4 4,652.00 3,761.78 3,202.53 8,678.84 13,377.08 20,519.52 35,634.80
3.11.6 7,619.00 3,214.55 2,906.13 8,074.73 9,444.19 10,236.22 11,047.80
Table 4: Mixed Load 3.11.4 vs 3.11.6

Figure 2: Latency 3.11.4 vs 3.11.6

Figure 2 shows the latencies from Table 4. From the results, 3.11.6 had slightly better average latency than 3.11.4. Furthermore, in the worst case, where contention is high, 3.11.6 handled request latency much better than 3.11.4, as shown by the difference in max latency. Not only did 3.11.6 have lower latency, it was also able to process many more requests because its queue no longer blocked.

3.11.6 Queue Saturation Test

The default native_transport_max_concurrent_requests_in_bytes is 1/10 of the heap size. The Cassandra max heap size of our cluster is 8 GB, so the default queue limit is 0.8 GB. This turns out to be too large for this cluster size: the cluster runs into CPU and other bottlenecks before NTR saturation is reached.
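As a quick check of the arithmetic, assuming the documented default of 1/10 of the JVM max heap:

```python
# Default NTR queue limit in Cassandra 3.11.5+ is 1/10 of the max heap size.
heap_bytes = 8 * 1024 ** 3        # 8 GB max heap, as on this cluster
default_queue = heap_bytes // 10  # native_transport_max_concurrent_requests_in_bytes
print(default_queue)              # 858993459 bytes, i.e. ~0.8 GB
```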

So we took the reverse approach to investigate full queue behaviour, which is setting the queue size to a lower number. In cassandra.yaml, we added:

native_transport_max_concurrent_requests_in_bytes: 1000000

This throttles the global queue at 1 MB. Once Cassandra was restarted and all nodes were online with the new setting, we ran the same mixed load on the cluster; the results are shown in Table 5.

3.11.6 Op rate (op/s) Latency mean (ms) Latency median (ms) Latency 95th percentile (ms) Latency 99th percentile (ms) Latency 99.9th percentile (ms) Latency max (ms)
Write: Default setting 3,885.00 3,135.88 2,828.00 7,955.18 9,347.72 10,102.72 10,510.90
Write: 1MB setting 2,105.00 5,749.13 3,471.82 16,924.02 26,172.45 29,681.68 31,105.00
Read: Default setting 3,734.00 3,293.22 2,984.25 8,194.28 9,540.67 10,369.72 11,047.80
Read: 1MB setting 5,395.00 2,263.13 1,864.55 5,176.47 8,074.73 9,693.03 15,183.40
Summary: Default setting 7,619.00 3,214.55 2,906.13 8,074.73 9,444.19 10,236.22 11,047.80
Summary: 1MB setting 7,500.00 4,006.13 2,668.18 11,050.24 17,123.59 19,687.36 31,105.00

Table 5: 3.11.6 native_transport_max_concurrent_requests_in_bytes default and 1MB setting 

During the test, we observed a lot of paused connections and discarded requests—see Figure 3. For a full list of Instaclustr exposed metrics see our support documentation.

Figure 3: 3.11.6 Paused Connections and Discarded Requests

After setting native_transport_max_concurrent_requests_in_bytes to a lower value, we started to see paused connections and discarded requests, and write latency increased, resulting in fewer processed write operations, as shown in Table 5. The increased write latency is illustrated in Figure 4.

Figure 4: 3.11.6 Write Latency Under Different Settings

On the other hand, read latency decreased, see Figure 5, resulting in a higher number of operations being processed.

Figure 5: 3.11.6 Read Latency Under Different Settings
Figure 6: 3.11.6 Operations Rate Under Different Settings

As illustrated in Figure 6, the total number of operations decreased slightly with the 1 MB setting, but the difference is very small and the effects on reads and writes almost cancel each other out. However, when we look at each type of operation individually, we can see that rather than each getting an equal share of the channel under the default, almost unlimited queue, the smaller queue penalizes writes and favors reads. While our testing identified this outcome, further investigation will be required to determine exactly why this is the case.


In conclusion, the new NTR change offers an improvement over the previous NTR queue behaviour. Through our performance testing we found the change to be good for clusters that have a constant load causing the NTR queue to block. Under the new mechanism the queue no longer blocks, but throttles the load based on the amount of memory allocated to requests.
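To make the throttling behaviour concrete, here is a loose Python model of a bytes-based throttle. This is an illustrative sketch, not Cassandra's actual implementation: instead of a bounded queue that blocks when full, requests are admitted while the total in-flight bytes stay under the memory limit, and connections are paused (reads stop being consumed) when it is exceeded.

```python
import threading

class BytesThrottle:
    """Simplified model of the post-3.11.5 NTR backpressure: admit a
    request only while total in-flight request bytes stay under a limit;
    otherwise the caller backs off instead of blocking a fixed queue."""

    def __init__(self, limit_bytes):
        self.limit = limit_bytes
        self.in_flight = 0
        self.lock = threading.Lock()

    def try_acquire(self, request_bytes):
        with self.lock:
            if self.in_flight + request_bytes > self.limit:
                return False  # over the limit: pause the connection
            self.in_flight += request_bytes
            return True

    def release(self, request_bytes):
        # Called when a request completes and its memory is freed.
        with self.lock:
            self.in_flight -= request_bytes

throttle = BytesThrottle(limit_bytes=1_000_000)  # the 1 MB test setting
assert throttle.try_acquire(600_000)             # admitted
assert not throttle.try_acquire(600_000)         # would exceed 1 MB: paused
throttle.release(600_000)
assert throttle.try_acquire(600_000)             # admitted again
```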

The results from testing indicated that the changed queue behaviour reduced latency and provided a significant lift in the number of operations without clients timing out. Clusters with our latest version of Cassandra can handle more load before hitting hard limits. For more information feel free to comment below or reach out to our Support team to learn more about changes to 3.11.6 or any of our other supported Cassandra versions.

The post Understanding the Impacts of the Native Transport Requests Change Introduced in Cassandra 3.11.5 appeared first on Instaclustr.

[Webcast] Leave it to Astra: Cassandra-as-a-Service on Google Cloud

Apache Cassandra Usage Report 2020

Apache Cassandra is the open source NoSQL database for mission critical data. Today the community announced findings from a comprehensive global survey of 901 practitioners on Cassandra usage. It’s the first of what will become an annual survey that provides a baseline understanding of who, how, and why organizations use Cassandra.

“I saw zero downtime at global scale with Apache Cassandra. That’s a powerful statement to make. For our business that’s quite crucial.” - Practitioner, London

Key Themes

Cassandra adoption is correlated with organizations in a more advanced stage of digital transformation.

People from organizations that self-identified as being in a “highly advanced” stage of digital transformation were more likely to be using Cassandra (26%) compared with those in an “advanced” stage (10%) or “in process” (5%).

Optionality, security, and scalability are among the key reasons Cassandra is selected by practitioners.

The top reasons practitioners use Cassandra for mission critical apps are “good hybrid solutions” (62%), “very secure” (60%), “highly scalable” (57%), “fast” (57%), and “easy to build apps with” (55%).

A lack of skilled staff and the challenge of migration deters adoption of Cassandra.

Thirty-six percent of practitioners currently using Cassandra for mission critical apps say that a lack of Cassandra-skilled team members may deter adoption. When asked what it would take for practitioners to use Cassandra for more applications and features in production, they said “easier to migrate” and “easier to integrate.”


Sample. The survey consisted of 1,404 interviews of IT professionals and executives, including 901 practitioners, the focus of this usage report, conducted from April 13-23, 2020. Respondents came from 13 geographies (China, India, Japan, South Korea, Germany, United Kingdom, France, the Netherlands, Ireland, Brazil, Mexico, Argentina, and the U.S.) and the survey was offered in seven languages corresponding to those geographies. While a margin of sampling error cannot technically be calculated for online panel populations where the relationship between sample and universe is unknown, the margin of sampling error for equivalent representative samples would be +/- 2.6% for the total sample, +/- 3.3% for the practitioner sample, and +/- 4.4% for the executive sample.
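For reference, the quoted margins are consistent with the standard margin-of-error formula for a 95% confidence level, assuming p = 0.5 and an executive sample of 1,404 − 901 = 503:

```python
import math

def moe(n, p=0.5, z=1.96):
    """Margin of error for a simple random sample of size n at 95% confidence."""
    return z * math.sqrt(p * (1 - p) / n)

print(round(moe(1404) * 100, 1))  # total sample        -> 2.6
print(round(moe(901) * 100, 1))   # practitioner sample -> 3.3
print(round(moe(503) * 100, 1))   # executive sample    -> 4.4
```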

To ensure the highest quality respondents, surveys included enhanced screening beyond title and activities: company size (no companies under 100 employees), cloud IT knowledge, and years of IT experience.

Rounding and multi-response. Figures may not add up to 100% due to rounding or multi-response questions.


Practitioner respondents represent a variety of roles as follows: Dev/DevOps (52%), Ops/Architect (29%), Data Scientists and Engineers (11%), and Database Administrators (8%) in the Americas (43%), Europe (32%), and Asia Pacific (12%).

Cassandra roles

Respondents include both enterprise (65% from companies with 1k+ employees) and SMEs (35% from companies with at least 100 employees). Industries include IT (45%), financial services (11%), manufacturing (8%), health care (4%), retail (3%), government (5%), education (4%), telco (3%), and 17% were listed as “other.”

Cassandra companies

Cassandra Adoption

Twenty-two percent of practitioners are currently using or evaluating Cassandra with an additional 11% planning to use it in the next 12 months.

Of those currently using Cassandra, 89% are using open source Cassandra, including both self-managed (72%) and third-party managed (48%).

Practitioners using Cassandra today are more likely to use it for more projects tomorrow. Overall, 15% of practitioners say they are extremely likely (10 on a 10-pt scale) to use it for their next project. Of those, 71% are currently using or have used it before.

Cassandra adoption

Cassandra Usage

People from organizations that self-identified as being in a “highly advanced” stage of digital transformation were more likely to be using Cassandra (26%) compared with those in an “advanced” stage (10%) or “in process” (5%).

Cassandra predominates in very important or mission critical apps. Among practitioners, 31% use Cassandra for their mission critical applications, 55% for their very important applications, 38% for their somewhat important applications, and 20% for their least important applications.

“We’re scheduling 100s of millions of messages to be sent. Per day. If it’s two weeks, we’re talking about a couple billion. So for this, we use Cassandra.” - Practitioner, Amsterdam

Cassandra usage

Why Cassandra?

The top reasons practitioners use Cassandra for mission critical apps are “good hybrid solutions” (62%), “very secure” (60%), “highly scalable” (57%), “fast” (57%), and “easy to build apps with” (55%).

“High traffic, high data environments where really you’re just looking for very simplistic key value persistence of your data. It’s going to be a great fit for you, I can promise that.” - Global SVP Engineering

Top reasons practitioners use Cassandra

For companies in a highly advanced stage of digital transformation, 58% cite “won’t lose data” as the top reason, followed by “gives me confidence” (56%), “cloud native” (56%), and “very secure” (56%).

“It can’t lose anything, it has to be able to capture everything. It can’t have any security defects. It needs to be somewhat compatible with the environment. If we adopt a new database, it can’t be a duplicate of the data we already have.… So: Cassandra.” - Practitioner, San Francisco

However, 36% of practitioners currently using Cassandra for mission critical apps say that a lack of Cassandra-skilled team members may deter adoption.

“We don’t have time to train a ton of developers, so that time to deploy, time to onboard, that’s really key. All the other stuff, scalability, that all sounds fine.” – Practitioner, London

When asked what it would take for practitioners to use Cassandra for more applications and features in production, they said “easier to migrate” and “easier to integrate.”

“If I can get started and be productive in 30 minutes, it’s a no brainer.” - Practitioner, London


We invite anyone who is curious about Cassandra to test the 4.0 beta release. There will be no new features or breaking API changes in future beta or GA builds, so the time you put into the beta will carry over when you transition your production workloads to 4.0.

We also invite you to participate in a short survey about Kubernetes and Cassandra that is open through September 24, 2020. Details will be shared with the Cassandra Kubernetes SIG after it closes.

Survey Credits

A volunteer from the community helped analyze the survey, which was conducted by ClearPath Strategies, a strategic consulting and research firm, and donated to the community by DataStax. The report is available for use under the Creative Commons Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.