Troubleshooting Large Rows and Large Cells in Scylla

We’ve talked in the past about how Scylla helps you find large partitions. But sometimes you need to get even more granular to get to the heart of what might be causing a hiccup in your database performance. Below we describe how to detect large rows and large cells in Scylla.

While it tries its best to handle them, Scylla is not optimized for very large rows or large cells. They require allocation of large, contiguous memory areas and therefore may increase latency. Rows may also grow over time. For example, many insert operations may add elements to the same collection, or a large blob can be inserted in a single operation.

Similar to the large partitions table, the large rows and large cells tables are updated when SSTables are written or deleted, for example, on memtable flush or during compaction. We added the means to search for large rows and large cells when we added the SSTable “mc” format with Scylla 3.0 and Scylla Enterprise 2019.1. This SSTable format is enabled by default on Scylla Open Source 3.1 and above.

Find Large Rows

For example, look at the system.large_rows table:

Let’s break down what each of these columns and values represents:

Parameter        Description
keyspace_name    The keyspace name that holds the large row
table_name       The table name that holds the large row
sstable_name     The SSTable name that holds the large row
row_size         The size of the row
clustering_key   The clustering key that holds the large row
compaction_time  The time when the compaction occurred

Next, let’s look at a simple CQL query that narrows the large rows down to a specific keyspace and/or table. For example, if we were looking for the keyspace demodb and table tmcr:

SELECT * FROM system.large_rows WHERE keyspace_name = 'demodb' AND table_name = 'tmcr';

Find Large Cells

To find large cells, let’s look at the system.large_cells table:

And similarly, let’s understand the specific parameters of this table:

Parameter        Description
keyspace_name    The keyspace name that holds the large cell
table_name       The table name that holds the large cell
sstable_name     The SSTable name that holds the large cell
cell_size        The size of the cell
clustering_key   The clustering key of the row that holds the large cell
column_name      The column of the large cell
compaction_time  The time when the compaction occurred

The main difference between the large row and large cell tables is the addition of the column_name in the latter.

For example, if we were looking for the keyspace demodb and table tmcr, use this CQL query:

SELECT * FROM system.large_cells WHERE keyspace_name = 'demodb' AND table_name = 'tmcr';
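Both queries have the same shape, so they can be generated from one helper. The following is a minimal Python sketch that only builds the statement text and its bind values; it does not connect to a cluster. The function name large_entries_query is hypothetical, and the %s placeholders assume the positional parameter style used by the DataStax Python driver for non-prepared statements.

```python
# Hypothetical helper: builds a query against system.large_rows or
# system.large_cells without connecting to a cluster.
ALLOWED_TABLES = {"large_rows", "large_cells"}


def large_entries_query(system_table, keyspace, table):
    """Return (cql, params) for looking up large rows or large cells."""
    if system_table not in ALLOWED_TABLES:
        raise ValueError(f"unsupported system table: {system_table}")
    # Table names cannot be bound as parameters, so the name is checked
    # against a fixed allow-list before being placed in the statement.
    cql = (
        f"SELECT * FROM system.{system_table} "
        "WHERE keyspace_name = %s AND table_name = %s"
    )
    return cql, (keyspace, table)


cql, params = large_entries_query("large_cells", "demodb", "tmcr")
print(cql)
print(params)
```

Passing the keyspace and table as bind values rather than pasting them into the string avoids quoting mistakes in hand-written statements.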


Configuration of the large row and cell detection threshold in the scylla.yaml file uses the following parameters:

  • compaction_large_row_warning_threshold_mb parameter (default: 10MB)
  • compaction_large_cell_warning_threshold_mb parameter (default: 1MB)

Once the threshold is reached, the relevant information is captured in the system.large_rows or system.large_cells table. In addition, a warning message is logged in the Scylla log (refer to our documentation on logging).
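To make the two thresholds concrete, here is a small Python model of the check, using the default values quoted above. It is an illustration only, not Scylla's implementation: the function names are hypothetical, 1 MB is assumed to mean 2**20 bytes, and the strict "greater than" comparison is also an assumption.

```python
# Thresholds in MB, mirroring the scylla.yaml defaults quoted above.
ROW_WARNING_THRESHOLD_MB = 10
CELL_WARNING_THRESHOLD_MB = 1

BYTES_PER_MB = 1024 * 1024  # assumption: 1 MB is treated as 2**20 bytes


def exceeds_threshold(size_bytes, threshold_mb):
    """True when a size in bytes is above a threshold given in MB."""
    return size_bytes > threshold_mb * BYTES_PER_MB


def classify(row_size_bytes, cell_sizes_bytes):
    """Return which warnings a single row would trigger in this model."""
    warnings = []
    if exceeds_threshold(row_size_bytes, ROW_WARNING_THRESHOLD_MB):
        warnings.append("large_row")
    if any(exceeds_threshold(c, CELL_WARNING_THRESHOLD_MB) for c in cell_sizes_bytes):
        warnings.append("large_cell")
    return warnings


print(classify(12 * BYTES_PER_MB, [2 * BYTES_PER_MB, 512]))  # row and cell
print(classify(3 * BYTES_PER_MB, [2 * BYTES_PER_MB]))        # cell only
print(classify(4096, [512]))                                 # neither
```

Note that a row can stay under the row threshold while one of its cells still crosses the smaller cell threshold, which is why the two tables are tracked separately.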


Large rows and large cells are stored in system tables with the following schemas:

Expiring Data

In order to prevent stale data from appearing, all rows in the system.large_rows and system.large_cells tables are inserted with Time To Live (TTL) equal to 30 days.
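As a quick worked example of the 30-day TTL, the Python snippet below converts it to seconds (the unit CQL uses for TTL values) and computes when an entry written at a given moment would expire. The timestamp is made up for illustration.

```python
from datetime import datetime, timedelta, timezone

LARGE_ENTRY_TTL = timedelta(days=30)

# TTLs in CQL are expressed in seconds.
ttl_seconds = int(LARGE_ENTRY_TTL.total_seconds())


def expires_at(written_at):
    """Moment at which an entry written at `written_at` would expire."""
    return written_at + LARGE_ENTRY_TTL


written = datetime(2020, 1, 15, 12, 0, tzinfo=timezone.utc)  # illustrative
print(ttl_seconds)
print(expires_at(written).isoformat())
```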


Large rows and large cells are unfortunate but frequently found artifacts when users are first beginning data modeling with Scylla. Often users don’t anticipate how their early data modeling decisions will impact performance until they go into production. That’s why we feel it is vital to put tools in the hands of users to be able to easily detect and quickly troubleshoot these data phenomena.

Have you run into problems with large partitions, rows or cells that caused you some worried hours or sleepless nights? How did you solve them? We’d love to hear your war stories. Write to us, or join our Slack channel to tell us all about it.

In the meantime, if you want to improve your skills with Scylla, make sure you take our Scylla University course on data modeling, with sections for both beginners and advanced users. It’s completely free!



The post Troubleshooting Large Rows and Large Cells in Scylla appeared first on ScyllaDB.

Comparing stress tools for Apache Cassandra

Editor’s Note: The Last Pickle was recently acquired by DataStax and as part of the new DataStax mission of reorienting to embrace open source Apache Cassandra, this is the first in a series of blog posts that will compare new open source offerings, particularly those now coming out of the new DataStax. In open source spirit we want to embrace you, the community, in choosing the right tool for the right job.

Benchmarking and stress testing Apache Cassandra are important exercises that both operators and developers do regularly. Approaches come in numerous flavours, from “look how my code is better than everyone else’s”, to “what do I need to run this?” and “how much money will this save me?”, and my favourite, “when this dies a horrible death what will that look like?”.

Knowing what you want to achieve and how to interpret the results of these tests is a big part of the challenge. With a run through of these available tools, hopefully that will become easier.

This blog post will look at and compare the following three stress tools:

  • cassandra-stress
  • tlp-stress
  • nosqlbench

With these three tools we will step through a number of basic use cases:

  1. Just Generate Some Load
  2. Using a Real Cluster
  3. Iteration and Thread Counts
  4. Observability
  5. Batch sizes and Overwrites
  6. Predefined Workloads
  7. Custom Workloads
  8. Client Contention and Server Saturation

The versions of the tools used in these steps are 3.11.6 for cassandra-stress, 4.0.0 for tlp-stress, and 3.12.77 for nosqlbench.

1. Just Generate Some Load

Sometimes all you want to do is generate some load or data. This is good for when all we want is a Cassandra node that is doing something. It can be just to raise the CPU, or to generate some commitlogs, memtables, or sstables on disk.

Each tool will generate a slightly different load configuration for these tests:

$ cassandra-stress write

      Performs over a million writes (after an initial 50k warmup writes) iterating a number of times increasing the number of threads used in the client, starting with four threads.

$ tlp-stress run BasicTimeSeries -i 1M

      Performs exactly one million requests with a 9:1 write-to-read ratio.

$ nb cql-iot write_cl=LOCAL_ONE

      Performs ten million writes during a warmup phase and then ten million requests with a 9:1 write-to-read ratio.

All of them execute writes connected to a localhost Cassandra node, using the java-driver and consistency level LOCAL_ONE.

There is a difference in the model, however, as cassandra-stress uses a simple key value table, while tlp-stress and nosqlbench are using time-series data models.

2. Using a Real Cluster

This repeats the exercise of just generating any load or data, but is used when you have an actual cluster you are targeting.

$ cassandra-stress write -node cassandra1

$ tlp-stress run BasicTimeSeries --host cassandra1

$ nb cql-iot host=cassandra1

Note: There is no need to specify multiple hosts with any of these stress tools. These are contact hosts that are passed to the java driver, and unlike a coded application where you would want multiple contact hosts specified for reliability during deployment and startup, with a stress tool invocation it is reasonable to expect the single contact host specified to be up and running.

3. Iteration and Thread Counts

The following shows how to specify the number of iterations and the number of threads to use, in each of the tools.

$ cassandra-stress write n=100000 -rate threads=16

$ tlp-stress run BasicTimeSeries -n 100k -t 16

$ nb cql-iot write_cl=LOCAL_ONE main-cycles=100k threads=16

4. Observability

Even with well-designed workloads there is a lot more to benchmarking than the final throughput numbers. We want to see how the cluster operates over time. This can range from spikes in traffic to the many background operations Cassandra can perform. Taking a closer look at how Cassandra performs helps plan for a healthy and stable cluster over a longer period of time than what we are able to benchmark.

$ cassandra-stress write -graph file=example-benchmark.html title=example revision=benchmark-0

      For more information on this read our previous blog post on Cassandra-Stress and Graphs.  

$ nb cql-iot write_cl=LOCAL_ONE --docker-metrics

      Then open http://localhost:3000 in your browser. Note this only works on Linux and requires Docker to be running on the host. For more info see here.



      Note: tlp-stress has no similar observability feature, but does export Prometheus metrics on port 9501.

The out-of-the-box graphs generated by cassandra-stress are a nice feature. For any serious benchmarking activity, though, you will want to have metrics from Cassandra graphed and to have insight into the stress tool’s behaviour beyond just performance numbers.

5. Batch sizes and Overwrites

This invocation is of particular interest because it has been a pain for those using cassandra-stress. In Cassandra, unlogged batches are not normal and are not recommended except for very small groupings (10-20) of rows within the one partition.

cassandra-stress, by default, puts all writes for any partition into single batches, which makes for poor and unrealistic results. It is impossible to get cassandra-stress to not use batches, and quite convoluted to get it to write batches that consist only of single inserts. More info on this can be read in this ticket: CASSANDRA-11105.

Overwrites and deletes are not something we see a lot of among published Cassandra benchmarks because they are harder to implement. Often this makes sense, as workloads like key-value and time-series are likely not overwrite data models. Yet there are plenty of data models out there that do require these patterns and that we would like to benchmark.


      First download batch_too_large.yaml.

$ cassandra-stress user profile=batch_too_large.yaml ops\(insert=1\) -insert visits=FIXED\(10M\)  


      tlp-stress does not perform unlogged batches by default like cassandra-stress. If unlogged batches are desired you need to write your own workload, see the Custom Workloads section.

      tlp-stress does make deletes very easy, treating them in a similar fashion to the read rate flag. This will make 10% of the operations deletes of previously written data.

$ tlp-stress run KeyValue --deletes 0.1

      tlp-stress does overwrites in a similar way to cassandra-stress. This will write 100k operations over 100 partitions. Without clustering keys, this is roughly 1k overwrites on each partition.

$ tlp-stress run KeyValue -p 100 -n 100k  
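The "roughly 1k overwrites on each partition" figure can be checked with a short Python simulation. This is only a model: it assumes partition keys are picked uniformly at random, which may not match how tlp-stress actually selects keys, and the function name is hypothetical.

```python
import random
from collections import Counter


def writes_per_partition(operations, partitions, seed=0):
    """Count how many writes land on each partition key (uniform model)."""
    rng = random.Random(seed)
    return Counter(rng.randrange(partitions) for _ in range(operations))


counts = writes_per_partition(100_000, 100)
average = sum(counts.values()) / len(counts)
print(f"partitions written: {len(counts)}")
print(f"average writes per partition: {average:.0f}")
print(f"min/max writes per partition: {min(counts.values())}/{max(counts.values())}")
```

Because there is no clustering key, every write after the first to a given partition replaces the previous value, so almost all of those writes are overwrites.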


      nosqlbench can handle overwrites in the same manner as cassandra-stress and tlp-stress by providing a smaller partition count than the iteration count. nosqlbench does not currently provide any deletes or unlogged batches examples. Logged batches have been implemented with custom workloads, so deletes and unlogged batches are probably possible too with a custom implemented workload.  

6. Predefined Workloads


      cassandra-stress does not have built in workloads. You need to specify the user mode and supply your own configuration as shown in the next section.  


      tlp-stress has the most extensive list of workloads. These workloads have been used at TLP to demonstrate real limitations with certain features and to provide a hands on approach to recommending the best production solutions.

$ tlp-stress list

    Available Workloads:


$ tlp-stress run CountersWide  


      nosqlbench lists the workloads from its predefined yaml workload files. Within these workloads it lists the different phases that are used, and that can be combined. This offers us our first glimpse of how complex and specific a workload definition can be. It also lists the sequences workload, which is not based on the cql driver.

$ nb --list-workloads

    from: cql-keyvalue.yaml
    from: cql-iot.yaml
    from: cql-iot-dse.yaml
    from: cql-tabular.yaml
    from: sequences.yaml

$ nb cql-tabular  

7. Custom Workloads

A benchmark that is part of a feasibility or capacity planning exercise for a production environment will nearly always require a custom defined workload.


      For cassandra-stress an example of this was done for the Zipkin project. cassandra-stress cannot benchmark more than one table at a time, so there is a separate workload yaml for each table, and each has to be run as a separate invocation. Here we see that cassandra-stress does not support Zipkin’s full schema, specifically UDTs and collections, so the folder above also contains some cql files to create a schema we can stress.

      Create the zipkin test schema

cqlsh -f zipkin2-test-schema.cql

      Fill this schema with some data, throttle as appropriate

$ cassandra-stress  user profile=span-stress.yaml ops\(insert=1\) no-warmup duration=1m  -rate threads=4 throttle=50/s

      Now benchmark a mixed read and write workload, again throttle as appropriate

$ cassandra-stress  user profile=span-stress.yaml ops\(insert=1,by_trace=1,by_trace_ts_id=1,by_annotation=1\)  duration=1m  -rate threads=4 throttle=50/s  

As can be seen above, creating custom workloads in cassandra-stress has always been an involved and difficult experience. While tlp-stress and nosqlbench improve on this situation, they each do so in different ways.


      nosqlbench provides all of its workload configurations via yaml files. Getting the hang of these will be quite daunting for the newcomer, but along with the documentation provided, and practicing first with taking and tweaking the predefined workloads, there’s a wealth of possibility here.  


      tlp-stress, on the other hand, focuses on writing workloads in the code. tlp-stress is written in Kotlin, so if you find Kotlin enjoyable you will find it quick and intuitive to write workloads. The existing workloads can be found here; take a peek and you will see that they are quite simple to write.

8. Client Contention and Server Saturation

Which benchmark tool is faster? That may sound like a weird question, but it opens some real concerns. Not just in choosing what hardware to run the client on, or how many clients are required, but to know when the results you are getting are nonsense. Understanding the load you want to generate versus what you need to measure is as important to benchmarking as the workload.

It is important to avoid saturating the server. Any benchmark that pushes throughput to its limit is meaningless. A real world (and overly simplified) comparison of this is in OLAP clusters, like those paired with Apache Spark, where without appropriate thresholds put onto the spark-cassandra-connector you can get a yo-yo effect on throughput as the cluster saturates, jams up, and then accepts writes again. With tuning and throughput thresholds put into place, higher throughput is sustainable over time. A Responsiveness Under Load (RUL) benchmark is where we apply such throughput limits and observe responsiveness instead.

These problems extend into the client stress tool as well. Unlike the server that can block or load-shed at the defined throughput threshold, the client’s throughput of operations can be either limited or scheduled. This difference can be important, but explaining it goes beyond this blog post. For those interested I’d recommend reading this post on Fixing Coordinated Omission in Cassandra Stress.  

$ cassandra-stress write -rate threads=4 fixed=50/s  
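The difference between limiting and scheduling mostly shows up in how latency is measured. The Python sketch below is a simplified single-threaded model, not how any of these tools is implemented: it replays the same run and records latency twice, once from the moment each request was actually sent and once from the moment it was intended to be sent at 50 operations per second. The service times are made up, with a single 200ms stall.

```python
def simulate(service_times_ms, interval_ms):
    """Single-threaded client issuing one request per interval.

    Returns two latency lists: measured from the actual send time
    (rate-limited view) and from the intended send time (scheduled view).
    """
    from_send, from_schedule = [], []
    clock = 0.0
    for i, service in enumerate(service_times_ms):
        intended = i * interval_ms
        start = max(clock, intended)  # cannot send before the previous reply
        end = start + service
        from_send.append(end - start)
        from_schedule.append(end - intended)
        clock = end
    return from_send, from_schedule


# 15 requests at 50/s (one every 20ms); the third one stalls for 200ms.
service = [5, 5, 200] + [5] * 12
limited, scheduled = simulate(service, interval_ms=20)

slow_limited = sum(1 for x in limited if x > 20)
slow_scheduled = sum(1 for x in scheduled if x > 20)
print(f"requests slower than 20ms, measured from send time: {slow_limited}")
print(f"requests slower than 20ms, measured from schedule:  {slow_scheduled}")
```

Measured from the send time, only the stalled request looks slow. Measured from the schedule, every request that was queued behind the stall also counts as late, which is the effect coordinated omission hides.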


      nosqlbench has no scheduler per se, but deals with reducing coordinated omission via asynchronous requests and a non-fixed thread count. More information on nosqlbench’s timing terminology can be found here.

$ nb cql-iot cyclerate=50 async=256 threads=auto

      Very few production clusters ever demonstrate constant throughput like this, so benchmarking bursts is a real thing. Currently only nosqlbench does this in-process.

$ nb cql-iot cyclerate=50,1.5 async=256 threads=auto

      This specifies a rate threshold of 50 operations per second, with bursts of up to 50%. More information on bursts is available here.
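As a worked example of that arithmetic, the Python snippet below treats the second value of cyclerate as a multiplier on the base rate, which is how the description above reads. The function name is hypothetical, and this does not model how nosqlbench applies bursts internally.

```python
def burst_ceiling(rate_per_second, burst_ratio):
    """Highest short-term rate allowed when bursts scale the base rate."""
    return rate_per_second * burst_ratio


base_rate = 50
ratio = 1.5
ceiling = burst_ceiling(base_rate, ratio)
print(f"base rate: {base_rate} ops/s")
print(f"burst ceiling: {ceiling:.0f} ops/s")
print(f"extra headroom: {100 * (ratio - 1):.0f}%")
```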


      tlp-stress does not deal with Coordinated Omission. Its --rate flag relies on Google’s RateLimiter and limits the throughput, but does not schedule.


Looking through the documentation for each of the tools it is easy to see that nosqlbench offers substantially more. But tlp-stress docs are elegant and easy for the beginner, though they are still missing information on how to implement your own workload (or profile as tlp-stress refers to them).

Wrap Up

cassandra-stress is an advanced tool for very narrow applications against Cassandra. It quickly becomes a clumsy user experience and often requires adventures into some awkward code to understand and get things working as expected.

tlp-stress was written as a replacement for cassandra-stress. Apart from not dealing with Coordinated Omission, it succeeds in that goal in every aspect: good documentation, a rich command-line user interface, and code that is easy to understand and contribute to.

nosqlbench takes the next step, aiming to be a YCSB replacement. It feels like a power-user’s tool and comes with the features and capacity to earn that title. Expect to see more and more workloads be made available for testing lots of different technologies in the NoSQL world.

“Around the World” – Globally Distributed Storage, Streaming and Search: An Introduction to Cassandra Multi-Data Centers – Part 2

In this second blog in our “Around the World in (Approximately) 8 Data Centers” series we catch our first mode of transportation (Cassandra) and explore how it works to get us started on our journey to multiple destinations (Data Centers).

1. What is a (Cassandra) Data Center?

What does a Data Center (DC) look like? Here are some cool examples of DCs (old and new)!

Arguably the first “electronic” Data Center was ENIAC, circa 1946. It was, however, just a single (albeit monster) machine! It weighed more than 30 tonnes, occupied 1,800 square feet, consumed 200kW of power, got up to 50C inside, and was rumoured to cause blackouts in neighboring Philadelphia when it was switched on!

first electronic data centre

Jumping forward to the present, a photo of Google DC shows mainly cooling pipes. In common with ENIAC, power and heat are still a feature of modern DCs:

Google Data Center (plumbing)

So what is a Cassandra Data Center?! Ever since I started using Cassandra I’ve been puzzled about Cassandra Data Centers (DCs). When you create a keyspace you typically also specify a DC name, for example:

    WITH replication = {'class': 'NetworkTopologyStrategy', 'DCHere' : 3};

The NetworkTopologyStrategy is a production ready replication strategy that allows you to have an explicit DC name. But why do you need an explicit DC name? The reason is that you can actually have more than one Data Center in a Cassandra Cluster, and each DC can have a different replication factor. Here’s an example with two DCs:

CREATE KEYSPACE here_and_there
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DCHere' : 3, 'DCThere' : 3};

So what does having multiple DCs achieve? Powerful automatic Global replication of data! Essentially you can easily create a globally distributed Cassandra cluster where data written to the local DC is asynchronously replicated to all the other DCs in the keyspace.  You can have multiple keyspaces with different DCs and replication factors depending on how many copies and where you want your data replicated to.

But where do the Cassandra DCs come from? Well, it’s easy to create a cluster in a given location and Data Center name in Instaclustr Managed Cassandra!

When creating a Cassandra cluster using the Instaclustr console, there is a section called “Data Center” where you can select from options including: 

Infrastructure Provider, Region, Custom Name, Data Center Network address block, Node Size, EBS Encryption option, Replication Factor and number of nodes.

The Custom Name is a logical name you can choose for a Data Center within Cassandra and is how you reference the Data Center when you create a keyspace with NetworkTopologyStrategy.

So that explains the mystery of single Cassandra Data Center creation. What does this look like once it’s provisioned and running? Well, you can use CQLSH to connect to a node in the cluster, and then discover the Data Center you are connected to as follows:

cqlsh> use system;
cqlsh:system> select data_center from local;


How about multiple DCs?  Using Instaclustr managed Cassandra the simplest way of creating multiple DC Cassandra clusters is to create a single DC cluster first (Call it ‘DCHere’). Then in the management console for this cluster, click on “Add a DC”.  You can add one DC at a time to create a cluster with the total number of DCs you need, just follow our support documentation here and here.

2. Multi-Data Center experiments with cqlsh

So, to better understand how Cassandra DCs work I created a test cluster with 3 nodes in each of three DCs, located in Sydney, Singapore and North Virginia (USA) AWS regions (9 nodes in total) as follows:

Cassandra test cluster with 3 nodes in each of three DCs

For this experiment, I used cqlsh running on my laptop, located in Canberra (close to Sydney). My initial goal was simply to explore latencies and try out failures of DCs.

To measure latency I turned “Tracing on”, and to simulate DC failures I created multiple keyspaces, connected cqlsh to different DCs, and used different consistency levels. 

I created three separate keyspaces, one for each DC location. This doesn’t result in data replication across DCs, but instead directs all data written to any local DC to the single DC with RF >= 1. I.e. all data will be written to (and read from) the DCSydney DC for the “sydney” keyspace:

Create keyspace "sydney" with replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 3, 'DCSingapore' : 0, 'DCUSA' : 0 };

Create keyspace "singapore" with replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 0, 'DCSingapore' : 3, 'DCUSA' : 0 };

Create keyspace "usa" with replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 0, 'DCSingapore' : 0, 'DCUSA' : 3 };

I used a fourth keyspace for replication. Because this has multiple DCs with RF >= 1 the data will be replicated across all of the DCs. I.e. data written to any local DC will be written locally as well as to all other DCs:

Create keyspace "everywhere" with replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 3, 'DCSingapore' : 3, 'DCUSA' : 3 };

2.1 Latency

First let’s look at latency.  To run latency tests I connected cqlsh to the Sydney Data Center.

I varied which keyspace I was writing to or reading from (Keyspace column), and used consistency level ONE for all of these. ONE means that a write must be written to and acknowledged by at least one replica node, in any DC, so we don’t expect any write or read errors when the local DC differs from the DCs in the keyspace. The results show that latency increases from a minimum of 1-2ms (Sydney), to 200ms (Singapore) and 231ms (USA). Compared to the average inter-region network latencies I reported in the previous blog, these latencies are about 14% higher: the Singapore latency is 200ms (cf. 174ms), and the USA latency is 231ms (cf. 204ms). Longer times are to be expected as there is a Cassandra write or read included in this time, on top of the basic network latency. As expected (using consistency ONE) all of the operations succeeded. This table shows the results:

What does this reveal about how Cassandra keyspaces and DCs work? Cqlsh is connected to the sydney DC as the local DC. For the keyspaces that just have a single DC, the write or read operation can only use that DC and therefore includes the overhead of network latency for the local DC to communicate with the remote DC (with no network overhead for sydney). However, for the “everywhere” keyspace which contains all three DCs, it behaves as if it’s just using the local DC and therefore has a low latency indistinguishable from the results for the “sydney” keyspace. The difference is that the row is written to all the other DCs asynchronously, which does not impact the operation time. This picture shows the latencies on a map:

3 Cassandra DC's Latencies
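The "14% higher" comparison in this section can be reproduced from the four figures quoted in the text. The Python snippet below uses only those numbers; the variable names are for illustration.

```python
# Figures quoted in the text, in milliseconds.
network_ms = {"Singapore": 174, "USA": 204}  # inter-region network latency
cqlsh_ms = {"Singapore": 200, "USA": 231}    # latency observed via cqlsh

overheads = {}
for dc, base in network_ms.items():
    extra = cqlsh_ms[dc] - base
    overheads[dc] = 100 * extra / base
    print(f"{dc}: +{extra}ms over the network latency ({overheads[dc]:.1f}%)")

mean_overhead = sum(overheads.values()) / len(overheads)
print(f"mean overhead: {mean_overhead:.1f}%")
```

The extra time is about 26-27ms in both cases, which is consistent with a roughly fixed cost for the Cassandra read or write on top of the network round trip.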

2.2 Failures

I also wanted to understand what happens if a DC is unavailable. This was tricky to achieve acting as a typical user for Instaclustr managed Cassandra (as there’s no way for users to stop/start Cassandra nodes), so I simulated it by using permutations of local DC, keyspaces and a consistency level of LOCAL_ONE (a write must be sent to, and successfully acknowledged by, at least one replica node in the local DC). This also allows customers to try it out. Using LOCAL_ONE means that if cqlsh is connected to the Sydney DC, and the keyspace has a Sydney DC with RF >= 1, then writes and reads will succeed. But if the keyspace only has DCs in other regions (Singapore or USA) then the writes and reads will fail (simulating the failure of remote DCs). This table shows the results of this experiment:

The results are almost identical to before, but with the key difference that we get a NoHostAvailable error (and therefore no latencies) when the keyspaces are singapore or usa. The sydney and everywhere keyspaces still work as the sydney DC is available in both cases.
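The outcomes of both experiments can be summarised with a small Python model. This is a deliberate simplification rather than Cassandra's actual logic: it assumes every node is up, looks only at replication factors, and covers just the ONE and LOCAL_ONE levels. The keyspace definitions follow the intent described in section 2, where each single-DC keyspace replicates only to its namesake DC; the function name is hypothetical.

```python
# Replication factor per DC for each test keyspace.
KEYSPACES = {
    "sydney": {"DCSydney": 3, "DCSingapore": 0, "DCUSA": 0},
    "singapore": {"DCSydney": 0, "DCSingapore": 3, "DCUSA": 0},
    "usa": {"DCSydney": 0, "DCSingapore": 0, "DCUSA": 3},
    "everywhere": {"DCSydney": 3, "DCSingapore": 3, "DCUSA": 3},
}


def can_satisfy(keyspace, local_dc, consistency):
    """Whether a request could succeed in this simplified model."""
    replication = KEYSPACES[keyspace]
    if consistency == "LOCAL_ONE":
        # Needs at least one replica in the DC the client is connected to.
        return replication.get(local_dc, 0) >= 1
    if consistency == "ONE":
        # Needs at least one replica in any DC.
        return any(rf >= 1 for rf in replication.values())
    raise ValueError(f"consistency level not modelled: {consistency}")


for name in KEYSPACES:
    one = can_satisfy(name, "DCSydney", "ONE")
    local_one = can_satisfy(name, "DCSydney", "LOCAL_ONE")
    print(f"{name:<10} ONE={one} LOCAL_ONE={local_one}")
```

With cqlsh connected to DCSydney, ONE succeeds for every keyspace, while LOCAL_ONE succeeds only for the sydney and everywhere keyspaces.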

Note that Cassandra consistency levels are highly tunable, and there are more options that are relevant to multi-DC Cassandra operation. For example, ALL and EACH_QUORUM (writes only) work across all the DCs, and have stronger consistency, but at the penalty of higher latency and lower availability.

3. Multi-Data Centers experiments with the Cassandra Java Client

Around the world in 80 days

In our journey “around the world” it’s important to always have the latest information! As Phileas Fogg discovered, train timetables, like Cassandra documentation, can get out of date very quickly.

I was also interested in testing out the Cassandra Java client with my multi-DC cluster.   I had previously read that by default it supports automatic failover across multiple DCs which I thought would be interesting to see happening in practice. The DCAwareRoundRobinPolicy was recommended in the O’Reilly book “Learning Apache Cassandra (2nd edition 2017)” which says that “this policy is datacenter aware and routes queries to the local nodes first”. This is also the policy I used in my first attempt to connect with Cassandra way back in 2017!

However, a surprise was lying in wait! It turns out that since version 4.0 of the Java client there is no longer a DCAwareRoundRobinPolicy!  

Instead, the default policy now only connects to a single Data Center, so naturally there is no failover across DCs. You must provide the local DC name and this is the only one the client connects to. This also means that it behaves exactly like the last (LOCAL_ONE) results with cqlsh. This prevents potentially undesirable data consistency issues if you are using DC-local consistency levels but transparently fail over to a different DC.

You can either handle any failures in the Java client code (e.g. if a DC is unavailable, pick the backup Cassandra DC and connect to that), or probably the intended approach is for the entire application stack in the DC with the Cassandra failure to fail over to a complete copy in the backup region. I tried detecting a failure in the Java client code, and then failing over to the backup DC. This worked as expected. However, in the future I will need to explore how to recover from the failure (e.g. how do you detect when the original DC is available and consistent again, and switch back to using it).
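The client-side failover idea can be outlined in a few lines. The following Python sketch is not the Java code used in the experiment: connect is a hypothetical callable standing in for "build a driver session with this local DC", and fake_connect simulates an unavailable DCSydney so the example runs without a cluster.

```python
def connect_with_failover(connect, datacenters):
    """Try each DC in order; return (dc, session) for the first that works."""
    errors = {}
    for dc in datacenters:
        try:
            return dc, connect(dc)
        except ConnectionError as exc:
            errors[dc] = exc
    raise ConnectionError(f"no data center available: {sorted(errors)}")


def fake_connect(dc):
    """Stand-in for a real driver connection; DCSydney is 'down' here."""
    if dc == "DCSydney":
        raise ConnectionError("DCSydney unavailable")
    return f"session-for-{dc}"


dc, session = connect_with_failover(fake_connect, ["DCSydney", "DCSingapore"])
print(dc, session)
```

This covers only the initial switch to the backup DC. Detecting that the original DC is healthy and consistent again, and switching back, is the harder part noted above and is not modelled here.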

3.1 Redundant low-latency DCs

This brings us full circle back to the first blog in the series where we discovered that there are 8 georegions in AWS that provide sub 100ms latency to clients in the same georegion:

The 8 AWS georegions with sub 100ms latency for geolocated clients

Which we then suggested could be serviced with 8 DCs to provide DC failover in each georegion as follows:

8 DCs to provide DC failover in each georegion

The matched pairs of DCs to provide High Availability (HA) and failover for each georegion look like this in table form. These are pairs of DCs that the application code will need to have knowledge of and failover between:

Table - Matched pairs of DCs to provide High Availability (HA) and failover for each georegion

In practice, the read load of the application/client would need to be load balanced over both of the Data Centers for some georegions (e.g. the North America georegion load balances across both West and East coast DCs). The data replication strategy also matters: data written in each georegion can be replicated just to the redundant DC pair, or to all DCs in the cluster, depending on the actual use case. DC cluster sizes will therefore need to be sufficient to cope with the normal read/write loads on each DC, the replication write load (write amplification), and load spikes due to DC failures and failover to another DC.

Based on these limited experiments we are now ready for the next blog in the series, where we’ll try multi-DC Cassandra out in the context of a realistic globally distributed example application, potentially with multiple keyspaces, Data Centers and replication factors, to achieve goals including low latency, redundancy, and replication across georegions and even Around The World.

The post “Around the World” – Globally Distributed Storage, Streaming and Search: An Introduction to Cassandra Multi-Data Centers – Part 2 appeared first on Instaclustr.

Scylla Manager Releases 2.0.2 and 1.4.4

Scylla Manager Release Note

Scylla Manager Release 2.0.2

Scylla Manager is a management system that automates maintenance tasks on a Scylla cluster.
Release 2.0.2 is a bug fix release of the Scylla Manager 2.0 branch.

Scylla Enterprise customers are encouraged to upgrade to Scylla Manager 2.0.2 in coordination with the Scylla support team.

Bugs fixed and improvements in this release

  • Backup:
    • Fix for a memory leak in AWS SDK making Scylla Manager Agent consume huge amounts of memory when uploading data to S3.
    • New scyllamgr_agent_setup script that helps to set up a cgroup for Scylla Manager Agent to run in.
    • Improvements to S3 upload resilience and stability.
    • Improvements to listing backup files when there are very large numbers of files.
  • Repair:
    • Adjustments to Scylla Row Level Repair, relevant for Open-Source Scylla versions 3.1+. For these versions, Scylla Manager does not split tokens to shards.

Scylla Manager Release 1.4.4

The Scylla team is also pleased to announce the release of Scylla Manager 1.4.4.

Scylla Manager is a management system that automates maintenance tasks on a Scylla cluster. Release 1.4.4 is a bug fix release of the Scylla Manager 1.4 release.

Please note that the latest Scylla Manager Stable branch is Manager 2.0.x, and it is recommended to upgrade to it.

Bugs fixed and improvements in this release

  • Skip repairing keyspaces that have no replicas in a given DC. Previously, a repair task failed when repair was run on a DC where a keyspace had no replicas. This change allows skipping such keyspaces and repairing all the remaining keyspaces.
  • Improvements to ssh communication reliability allow Manager to apply retries after a connection reset.

The post Scylla Manager Releases 2.0.2 and 1.4.4 appeared first on ScyllaDB.

Scylla Enterprise Release 2019.1.7

Scylla Enterprise Release Notes

The ScyllaDB team announces the release of Scylla Enterprise 2019.1.7, a production-ready Scylla Enterprise patch release. As always, Scylla Enterprise customers are encouraged to upgrade to Scylla Enterprise 2019.1.7 in coordination with the Scylla support team.

The focus of Scylla Enterprise 2019.1.7 is improving stability and bug fixes. More below.

Fixed issues in this release are listed below, with open source references, if present:

  • Stability: long-running cluster sees bad gossip generation when a node restarts #5164 (followup to a fix in 2019.1.4)
  • Active nodes wrongfully marked as DOWN #5800
  • In some rare cases, a Scylla node crashes after an upgrade to 2019.1.6 when scanning a table containing a partition that has range tombstones with a prefix start bound right at the end of the partition

The post Scylla Enterprise Release 2019.1.7 appeared first on ScyllaDB.

Observing data changes with Change Data Capture (CDC)


Nowadays it is no longer enough to just store data and process it once or twice a day with a batch job. More and more use cases require an almost immediate reaction to modifications occurring in the database. In a fast-moving world, the amount of new information is so big that it has to be processed on the spot. Otherwise, the backlog will grow to an unmaintainable size. We, at ScyllaDB, are aware of these challenges and would like to make it easier for Scylla users to build streaming data pipelines that allow real-time data processing and analysis. This is why we have been working hard to bring a new feature, called Change Data Capture, to you.

What is Change Data Capture?

Change Data Capture (CDC) enables users to log all the mutations of data in one or more selected tables. It does not capture changes on every table in the database but can be enabled on the specific tables that a user is interested in observing. For the sake of simplicity, let’s assume for this blog post that CDC was enabled on a single table. Let’s call this table the Base Table. On a very high level, CDC maintains a log of all changes applied to the Base Table. We call it the CDC Log. It is a regular Scylla table created in the same keyspace as the Base Table and it can be accessed using the regular CQL interface. Tools like cqlsh or driver libraries are perfectly good means of accessing the CDC Log. This means a user can not only check the log manually but also write a script or an application that continuously monitors the changes. The content of the CDC Log is configurable in Scylla. The following information can be included:

  • The key of the record being changed
  • The actual modification
  • The state of the record before the change (we call it preimage)
  • The state of the record after the change (we call it postimage)

In Scylla Open Source 3.3, only the first three are supported. Postimage has been added in the Scylla Open Source 4.0 major branch. It is best to look at an example to get a better understanding of these concepts. The table used for the example has the following schema:

CREATE TABLE company.employees (
    department text,
    last_name text,
    first_name text,
    age int,
    level int,
    PRIMARY KEY (department, last_name, first_name)
);

Let’s assume that the table already has the following row:

department | last_name | first_name | age | level
Production | Smith     | John       | 35  | 2

If we perform the following operation:

UPDATE employees SET level = 3 WHERE department = 'Production' AND last_name = 'Smith' AND first_name = 'John';

then the key of the change is composed of the primary key columns (department, last_name, first_name) and in this example is equal to (‘Production’, ‘Smith’, ‘John’). The actual modification is the primary key together with the columns that were changed. In this example only the level column was changed, so the value of the modification is (‘Production’, ‘Smith’, ‘John’, null, 3). The preimage is the state of the changed data before the operation, so in this case it is equal to (‘Production’, ‘Smith’, ‘John’, null, 2). Note that the age column is null in the preimage because it was not involved in the operation, even though it had a value of 35 before the modification. Finally, the postimage for this operation is (‘Production’, ‘Smith’, ‘John’, 35, 3). The postimage, unlike the preimage, always contains all the columns in the row, whether or not they were changed by the operation.
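The four pieces of information for the UPDATE above can be reproduced with a small model. This is only a sketch in Python: plain dictionaries stand in for rows, None stands in for null, and the function name describe_change is made up for illustration. It is not how Scylla computes these values internally.

```python
# Illustrative model only: dicts stand in for rows, None stands in for null.
PRIMARY_KEY = ("department", "last_name", "first_name")
COLUMNS = PRIMARY_KEY + ("age", "level")


def describe_change(row_before, assignments):
    """Return (key, modification, preimage, postimage) as tuples in COLUMNS order.

    `assignments` maps the non-key columns set by the operation to their new values.
    """
    key = tuple(row_before[c] for c in PRIMARY_KEY)
    # Modification: the primary key plus only the values the operation wrote.
    modification = tuple(
        row_before[c] if c in PRIMARY_KEY else assignments.get(c) for c in COLUMNS
    )
    # Preimage: the primary key plus the old values of the touched columns only.
    preimage = tuple(
        row_before[c] if (c in PRIMARY_KEY or c in assignments) else None
        for c in COLUMNS
    )
    # Postimage: the complete row as it looks after the operation.
    row_after = {**row_before, **assignments}
    postimage = tuple(row_after[c] for c in COLUMNS)
    return key, modification, preimage, postimage


row = {
    "department": "Production",
    "last_name": "Smith",
    "first_name": "John",
    "age": 35,
    "level": 2,
}
key, modification, preimage, postimage = describe_change(row, {"level": 3})
print(key)           # ('Production', 'Smith', 'John')
print(modification)  # ('Production', 'Smith', 'John', None, 3)
print(preimage)      # ('Production', 'Smith', 'John', None, 2)
print(postimage)     # ('Production', 'Smith', 'John', 35, 3)
```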

To achieve high performance, CDC Log is partitioned into a number of CDC streams. Each CDC stream is identified by a unique stream id and stores all the entries that describe modifications to data stored on a single shard on the Scylla node. The feature is designed this way to minimize the impact of CDC on Base Table write performance. As a result, CDC Log data is co-located with the Base Table data it describes. Co-location is not only in terms of nodes in the cluster but also within shards on the node.

Entries in CDC Log are sorted by the time at which the change they describe occurred. Because the log is partitioned, the order is maintained only within a stream. It is guaranteed, though, that all the changes applied to a single row will be in the same stream and will be sorted by the time they occurred.
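The ordering guarantee can be pictured with a toy model. In this Python sketch, the number of streams, the stream_for function and its CRC-based mapping are all assumptions made for illustration. Scylla chooses streams differently, but the property shown is the same: a row always maps to one stream, and each stream is sorted by time.

```python
import zlib
from collections import defaultdict

NUM_STREAMS = 4  # hypothetical number of streams, chosen for the example


def stream_for(row_key):
    # Hypothetical mapping: any deterministic function of the row key
    # guarantees that one row always lands in the same stream.
    return zlib.crc32(repr(row_key).encode()) % NUM_STREAMS


def build_log(changes):
    """Group (timestamp, row_key, description) entries by stream, sorted by time."""
    streams = defaultdict(list)
    for timestamp, row_key, description in changes:
        streams[stream_for(row_key)].append((timestamp, row_key, description))
    for entries in streams.values():
        entries.sort(key=lambda entry: entry[0])
    return dict(streams)


john = ("Production", "Smith", "John")
jane = ("Sales", "Doe", "Jane")
log = build_log([
    (3, john, "level = 4"),
    (1, john, "level = 3"),
    (2, jane, "age = 41"),
])

# All changes to John's row sit in a single stream, ordered by time.
john_stream = log[stream_for(john)]
print([entry for entry in john_stream if entry[1] == john])
```

Ordering across different streams is not defined in this model either, which is why a consumer that needs a global order has to merge streams by timestamp itself.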

CDC Log does not store its entries indefinitely. Records are removed using the standard TTL mechanism. By default, rows are removed after 24 hours but it can be configured to a different value. It is important to set this value to something big enough to allow a CDC client to consume all the changes before they disappear from the CDC Log table.

CDC Log format

For the example table mentioned above, CDC Log would have the following structure:

CREATE TABLE company.employees_scylla_cdc_log (
    cdc$stream_id blob, cdc$time timeuuid, cdc$batch_seq_no int,
    cdc$operation tinyint, cdc$ttl bigint,
    department    text,
    first_name    text,
    last_name     text,
    age            int,    cdc$deleted_age    boolean,
    level          int,    cdc$deleted_level  boolean,
    PRIMARY KEY (cdc$stream_id, cdc$time, cdc$batch_seq_no)
);

It will be automatically created in the same keyspace as the Base Table and its name will be the name of the Base Table with a ‘_scylla_cdc_log’ suffix. It contains 3 groups of columns. The first group is a copy of all the columns from the Base Table:

    department    text,
    first_name    text,
    last_name     text,
    age            int,
    level          int,

Their names and types are exactly the same as in the employees table. For each of these columns that is not part of the primary key, CDC Log contains a boolean column that indicates whether the given column has been deleted in a given operation. Names of those columns follow the pattern cdc$deleted_<column name>. Those columns form the next group:

    cdc$deleted_age    boolean,
    cdc$deleted_level  boolean,

Finally, there is the last group of columns, which are not related to the columns in the Base Table. Three of them form the primary key of the table. cdc$stream_id is a specially selected partition key that leads to co-location between a row in the Base Table and the corresponding CDC Log records, both in terms of servers in the cluster and shards on the node. Next, there are two columns that compose the clustering key: cdc$time and cdc$batch_seq_no. cdc$time is the time of the change. It is represented as a TimeUUID to avoid clashes between two operations that happened at the same time. cdc$batch_seq_no is a column that allows each operation to create multiple records in CDC Log. All records created for a single operation share cdc$stream_id and cdc$time but differ in their cdc$batch_seq_no value.

Another important column is called cdc$operation. It reflects the type of the operation that caused the addition of a given CDC Log record. At the moment there are the following operation types:


PREIMAGE and POSTIMAGE are used to store records for those two special types of information. UPDATE, INSERT, ROW DELETE and PARTITION DELETE are self-explanatory, but options 5-8 might not be that clear. Those can be created with a range deletion like this:

DELETE FROM employees WHERE department = 'Production'
    AND last_name >= 'A' AND last_name < 'C';

The last column that has not been described yet is cdc$ttl. It is used to capture the TTL used with the operation recorded in CDC Log.

When can Change Data Capture be useful?

Change Data Capture can be used in multiple use cases. Most of them can be grouped into two categories: integration with other systems and real-time processing of data. Real-time processing can be done, for example, with Kafka Streams or Spark and is useful for triggers and monitoring. For example, it might be desirable to send an SMS to a user if a login attempt is made from a country the user does not live in. Change Data Capture is also useful for replicating data stored in Scylla to other systems that create indexes or aggregations for it.

How are we different than Cassandra?

We took a completely different approach to Change Data Capture than the one used in Apache Cassandra. Cassandra’s implementation is very simple and pushes a lot of the work to the CDC client. Cassandra copies segments of the commit log into a separate directory, and that’s pretty much all that’s done. To consume Cassandra’s CDC, a user has to start a special agent on every machine in the cluster. Such an agent has to parse the commitlog-like segments stored in the special directory and, after it has finished processing the data, clean up the directory. This approach, although simple to implement, causes multiple problems for the user. First, data is usually replicated to multiple nodes in the cluster, and the user agent working on a single node knows nothing about the other replicas. This leaves the problem of conflict resolution and data reconciliation unsolved and pushed onto the user. Another problem is that if the agent does not process the data fast enough, the node can run out of disk space.

At ScyllaDB, we think that for Change Data Capture to be useful it has to solve those problems and provide users with a familiar access method that takes care of all the peculiarities related to the internal workings of the database. Thus, our CDC Log is a regular table accessible with the CQL protocol and all standard tools. It means our implementation is more complex and has to deal with all the problems caused by the fact that Scylla is a distributed system, but we feel that it wouldn’t be right to push our problems onto users.


We have only very preliminary results of Change Data Capture performance testing. Initial results are very promising though. We compared Scylla write performance in three variants: with CDC disabled, with CDC capturing key and actual modification, and with CDC capturing key, actual modification and preimage. The results for i3.4xlarge machines are shown in the table below:

Metric | No CDC | Keys + modification | Keys + modification + preimage
Throughput (ops) | 196,713 | 122,243 (-37%) | 63,463 (-67%)
Mean latency (ms) | 4.2 | 6.6 (+57%) | 12.6 (+199%)
99th percentile latency (ms) | 8.4 | 12.4 (+47%) | 23.6 (+180%)

It is apparent that CDC has a cost to it. With CDC you are essentially doubling the writes, and with a preimage, you must first read a record (to know what it had been set at), then modify it, and then write the results (original as well as new values) to the CDC table. You should take these throughput and latency effects into account when capacity planning.
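For capacity planning, the relative overhead can be recomputed from the published figures. The following Python sketch uses only the rounded numbers from the table above, so its results can differ from the published percentages by up to about a percentage point (the published values were presumably derived from unrounded measurements).

```python
# Figures copied from the results table (rounded as published).
baseline = {"throughput_ops": 196_713, "mean_ms": 4.2, "p99_ms": 8.4}
variants = {
    "keys + modification": {
        "throughput_ops": 122_243, "mean_ms": 6.6, "p99_ms": 12.4,
    },
    "keys + modification + preimage": {
        "throughput_ops": 63_463, "mean_ms": 12.6, "p99_ms": 23.6,
    },
}


def percent_change(new, old):
    """Relative change of `new` against `old`, in percent."""
    return (new - old) / old * 100.0


changes = {
    name: {
        metric: percent_change(value, baseline[metric])
        for metric, value in measurements.items()
    }
    for name, measurements in variants.items()
}

for name, row in changes.items():
    print(name, {metric: f"{pct:+.1f}%" for metric, pct in row.items()})
```

The same function can be applied to your own measurements to estimate how much headroom a CDC-enabled table needs on your hardware.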

Early preview in Scylla — Give it a try!

The early preview of CDC was first released with Scylla Open Source 3.2 and improved in Scylla Open Source 3.3. We encourage you to play with the new feature and let us know what you think about it. Any questions and feedback are very much appreciated. The best way to reach us is through the Scylla mailing list (scylladb-users, on the web or via email) or through our Scylla Users Slack.

Important Note: Change Data Capture in Scylla Open Source 3.x and 4.x is still an experimental feature. It is under heavy development and some of its details, including user-facing API, may change before CDC is complete. It may be required to disable CDC on all tables before upgrading to the newer version of Scylla. We believe that the version included in Scylla Open Source 4.0 release should be very close to the final shape of the solution. With the release of Scylla Open Source 4.0, the CDC user API present in Scylla 3.2 and Scylla 3.3 is deprecated. Further, with the release of Scylla Open Source 4.0, and ScyllaDB’s policy of only maintaining the two most current releases, Scylla 3.2 will no longer be supported. We plan for CDC to be GA in Scylla Open Source 4.1.

Want to learn more about Change Data Capture in Scylla? Hear more from our team talking about the feature in our on-demand webinar.


The post Observing data changes with Change Data Capture (CDC) appeared first on ScyllaDB.

Scylla Open Source Release 3.2.4

Scylla Open Source Release Notes

The ScyllaDB team announces the release of Scylla Open Source 3.2.4, a bugfix release of the Scylla 3.2 stable branch. Scylla Open Source 3.2.4, like all past and future 3.x.y releases, is backward compatible and supports rolling upgrades.

Please note the latest stable release of Scylla is 3.3 and it is recommended to upgrade to it.

Issues fixed in this release

  • CQL: a restricted column that is not in a select clause returns a wrong value #5708
  • Stability: Scylla exits when using a Replication Factor (RF) of zero with SimpleStrategy (there is no good reason to do so) #5962
  • Stability: with many (thousands of) tables, Scylla exits with an out-of-memory error during startup, despite free memory being available #6003
  • Tracing: gets populated even though tracing has been disabled #6014
  • Building centos-rpm changes Scylla’s build-id #5881

The post Scylla Open Source Release 3.2.4 appeared first on ScyllaDB.

Avi Kivity at Core C++ 2019

Along with countless other industry events, Core C++ 2020 was, as we recently heard, postponed due to the Novel Coronavirus. With all of the tumultuous news in mind, and wishing the entire open source and C++ developer communities our best, we thought we’d take time to look back at last year’s Core C++ event hosted at the Academic College of Tel-Aviv-Yaffo, where our CTO Avi Kivity presented. As an added bonus, we’d like to share the presentations of community contributors who attended Seastar Summit 2019.

Unlike other speakers at the conference, Avi did not talk about features of C++ directly but about the Seastar framework, which lies at the heart of Scylla. It is written in C++ and takes advantage of low-level C++ capabilities. Seastar requires a minimum of C++14, also works with C++17, and even uses some features of C++20.

The case for asynch: thinking of disk I/O and CPU-level communications as networks

Avi began by showing an SSD. He pointed out that 20 µseconds, a typical time needed to communicate with an SSD on a modern NVMe device, is a relatively long time: enough for the CPU to execute 20,000 instructions. Developers should treat the SSD as a networked device, but generally do not program that way. Often they use a synchronous API, which means the calling thread can block. When that happens the thread has to be switched out, then eventually switched back in by the kernel, all of which is very expensive. Avi argued that no one would use blocking calls with an ordinary networked device. Everything in networking is done asynchronously.

Next Avi presented the image of the logical layout of an Intel Xeon Processor. “This is also a networked device,” he asserted. He pointed out the cores are all connected by what is essentially a network — a dual ring interconnect architecture. There are two such rings and they are bidirectional.

Beyond that, if you have multiple multicore CPUs, they will intercommunicate over another external network. Still most programmers do their inter-core communication work synchronously, locking data and releasing locks. While this doesn’t present much of a problem on a small scale, when done at a large scale all of those locks translate to messages on the internal network that take time to propagate, the basis of cache line contention.

When you try to take a lock and don’t succeed, you have to repeat the attempt. Alternatively, the kernel puts the thread that is trying to take the lock to sleep and schedules another thread in its place; the sleeping thread is woken up later, when the lock is released. This all takes a lot of work and a lot of network messages.

Everything that is difficult about multi-core programming is even harder when you have multiple processors, because each of them also has its own memory under Non-Uniform Memory Access (NUMA). So if a core on one socket tries to access memory attached to another socket, it can take up to 400 cycles to get access to that memory, which is ages in terms of processing time. By the time you get the response back, you could have executed hundreds of instructions.

Avi then presented comparisons showing how, with the Seastar framework, which relies heavily on asynchronous communication between cores, developers can write applications that take advantage of today’s large multicore servers. He compared a Seastar-based implementation of memcached with stock memcached, and Scylla, a C++ rewrite of Cassandra that runs on top of the Seastar framework, with Cassandra itself.

The Seastar framework, which uses asynchronous communications, outperforms stock memcached in both multi-threaded and multi-process performance.

In this comparison of Scylla vs. Apache Cassandra, the horizontal axis measures operations per second (ops) and the vertical axis measures latencies in µseconds. A Scylla server on three nodes was able to maintain roughly the same number of transactions per second as a 30-node Cassandra database, but with latencies about a third of Cassandra’s.

In a traditional multi-threaded application, threads are multiplexed onto a number of cores. If you have a very large number of threads, then the kernel has to switch threads in and out so that each gets its share of CPU time on the core. This switching, putting threads to sleep and then waking them again, is expensive. It induces latency. Conversely, if you have too few threads, then a core won’t have a thread to execute and the machine will be idle. Because of this, on large multi-CPU, multi-core machines it is common to see only 30% or 40% CPU utilization.

In the Seastar model you have one thread per core so that you have the exact balance of operating processes to hardware. Avi noted “You never have a situation where a core doesn’t have a thread to run.” Nor would you ever have a situation where the core has to multiplex multiple threads.

As Avi observed, “so we solve the problem of assigning threads to cores but of course we gained a number of problems about how those threads can do different tasks that we usually assigned to different threads.”

“We partition the machine into a bunch of replicas which are called shards and each of them run the same code in parallel. Because of that the amount of communication between the shards is minimized. Each of them operates stand-alone yet they have means to communicate with each other via a point to point queue. There is a task scheduler whose role is to provide the abilities we lost when we enforced the one thread per core design.”

Instead of having a large number of threads, each of which performs a single task or a single process, all of the work is multiplexed and controlled via a task scheduler that picks the next task to execute. Avi described it as a shared-nothing, run-to-completion scheduler: “basically a class with a single virtual function… It takes a queue of tasks and just picks the next one to run and executes it.” While it runs as a first-in-first-out (FIFO) queue, there is also a bit of extra logic for task prioritization to allow different tasks to run at different times.
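The idea of a run-to-completion FIFO scheduler can be sketched in a few lines. This is a Python model for illustration only. The class and method names (Task, Scheduler, run_until_idle) are made up and are not Seastar’s C++ API, and the prioritization logic mentioned above is left out.

```python
from collections import deque


class Task:
    """A unit of work with a single entry point; once started it runs to completion."""

    def run(self):
        raise NotImplementedError


class Step(Task):
    """Example task: records its name and optionally schedules a follow-up task."""

    def __init__(self, name, log, scheduler=None, follow_up=None):
        self.name = name
        self.log = log
        self.scheduler = scheduler
        self.follow_up = follow_up

    def run(self):
        self.log.append(self.name)
        if self.follow_up is not None:
            # A task never blocks; it queues more work and returns.
            self.scheduler.schedule(self.follow_up)


class Scheduler:
    """Single-threaded FIFO scheduler: one per core in a thread-per-core design."""

    def __init__(self):
        self._queue = deque()

    def schedule(self, task):
        self._queue.append(task)

    def run_until_idle(self):
        while self._queue:
            task = self._queue.popleft()
            task.run()  # not preempted: the next task starts only after this returns


log = []
scheduler = Scheduler()
scheduler.schedule(Step("read", log, scheduler, follow_up=Step("parse", log)))
scheduler.schedule(Step("write", log))
scheduler.run_until_idle()
print(log)  # ['read', 'write', 'parse']
```

Because "parse" is queued while "read" runs, it executes after "write", which was already waiting. This is the FIFO behavior described above.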

Shard per logical core architecture

While Avi was referring to “cores,” he noted that technically Scylla is sharded per “logical core,” that is, a hyperthread in the parlance of Intel, or, more generally, simultaneous multithreading (SMT) if speaking about a modern AMD chipset.

With Scylla, each of these virtual cores is assigned its own system memory as well as a shard of data (a range of partition keys). When a core needs to access data assigned to another core, rather than it asking to access it directly, it sends a message to the other virtual core to perform an operation on the data on its behalf.

“This is how we avoid locking,” Avi explained. “There’s only one thread accessing each piece of data. All of the costly locking instructions are avoided. Also, locking becomes much simpler because everything is serialized by the remote core. This avoids all of the internal networking operations that the CPU has to perform under the covers in order to move the data to the cores that execute the operations. Instead of moving the data we move the operation.”
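“Moving the operation instead of the data” can be illustrated with a small single-threaded simulation in Python. The names Shard, owner_of and increment, as well as the placement function, are hypothetical and chosen for the example. In Scylla each shard runs on its own core and the queues connect real threads.

```python
from collections import deque

NUM_SHARDS = 4  # hypothetical shard count for the example


class Shard:
    def __init__(self, shard_id):
        self.shard_id = shard_id
        self.data = {}        # touched only by this shard, so no lock is needed
        self.inbox = deque()  # operations sent over by other shards

    def submit(self, operation):
        """Called by another shard: hand over an operation instead of reading data."""
        self.inbox.append(operation)

    def drain(self):
        """Run queued operations one by one; the queue serializes all access."""
        results = []
        while self.inbox:
            operation = self.inbox.popleft()
            results.append(operation(self.data))
        return results


shards = [Shard(i) for i in range(NUM_SHARDS)]


def owner_of(key):
    # Hypothetical placement function: each key belongs to exactly one shard.
    return shards[sum(key.encode()) % NUM_SHARDS]


def increment(key):
    def operation(data):
        data[key] = data.get(key, 0) + 1
        return data[key]
    return operation


# Three "remote" requests to bump the same counter: all are sent to the owner.
for _ in range(3):
    owner_of("counter").submit(increment("counter"))

print(owner_of("counter").drain())  # [1, 2, 3]
```

No shard other than the owner ever reads or writes the counter, so the increments cannot race even though nothing is locked.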

The Seastar APIs are all composable. When you have a networking operation, a disk operation and remote core operations, you can combine them in different ways as enabled by the API. Avi noted this usually isn’t the case. “Usually if you have an asynchronous networking stack but synchronous disk access and synchronous memory access based on locks they don’t combine well. As soon as you take a lock you have to complete everything and then release it otherwise you’re going to cause other threads to contend.” For instance, if you take a lock and then perform a disk operation, you will hit contention. Seastar solves that by making everything asynchronous.

Futures, Promises and Continuations

The building block in Seastar is a Future, Promise and Continuation (FPC) model. Most of the attendees of Core C++ 2019 were familiar with the concept. It was invented back in the 1970s and recently came back into fashion with Node.js. If you want to familiarize yourself more with the concepts, you can read a good blog about it here.

C++ has futures and promises in the standard library. Avi explained, “It’s a very good way to compose different activities. In Seastar it’s even more than a result of some computation; it’s also a result of IO.”
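For readers new to the model, here is a minimal sketch of chaining continuations onto a future. It uses Python’s standard concurrent.futures.Future rather than Seastar’s C++ types, and the helper then is written here for illustration. It is not part of the Python library, nor is it Seastar’s implementation.

```python
from concurrent.futures import Future


def then(future, continuation):
    """Return a new future that will hold continuation(result of `future`)."""
    chained = Future()

    def on_done(done):
        try:
            chained.set_result(continuation(done.result()))
        except Exception as exc:  # propagate failures down the chain
            chained.set_exception(exc)

    future.add_done_callback(on_done)
    return chained


# The code that will eventually produce the value holds the "promise" side.
read_done = Future()

# Continuations are attached before the result exists; nothing blocks here.
decoded = then(read_done, lambda raw: raw.decode())
length = then(decoded, len)

# Later, the I/O completes and fulfils the promise; the chain runs.
read_done.set_result(b"hello")
print(length.result())  # 5
```

The caller describes what should happen with the result of an I/O operation before that result exists, which is what makes network, disk and cross-core operations composable without blocking a thread.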

Watch the Full Video

Speaking about continuations, all of the above was covered in just the first fifteen minutes of Avi’s hour-long talk. If you want to learn more about the role of futures, promises and continuations in Scylla, and take a deep dive into the technology that lies at the heart of Scylla, watch the full video below:

Also feel free to download the slides.


Presentations from Seastar Summit 2019

Avi Kivity was not the only one speaking about Seastar last year. Concurrent with last year’s Scylla Summit, we hosted our first Seastar Summit. Speakers from ScyllaDB, Red Hat, Vectorized and others who are leveraging Seastar to build their own asynchronous distributed applications gathered together to share progress in their work and lessons learned. You can browse their presentations via the links provided below:

Seastar Summit 2019 Keynote — Avi Kivity, ScyllaDB

Futures Under the Hood — Rafael Ávila de Espíndola, ScyllaDB


Seastar metrics — Amnon Heiman, ScyllaDB


Update on Crimson – the Seastarized Ceph — Kefu Chai, Red Hat
ceph::errorator<> throw/catch-free, compile time-checked exceptions for seastar::future<> — Radoslaw Zarzynski, Red Hat
rpc redux — Alexander Gallego,

If you would like to participate in the Seastar development community, feel free to join our Slack and check out the #seastar channel.

The post Avi Kivity at Core C++ 2019 appeared first on ScyllaDB.