Scylla Monitoring Stack 2.0

Scylla Release

The Scylla team is pleased to announce the release of Scylla Monitoring Stack 2.0

Scylla Monitoring is an open source stack for monitoring Scylla Enterprise and Scylla Open Source, based on Prometheus and Grafana. The Scylla Monitoring 2.0 stack supports:

  • Scylla Open Source versions 2.1, 2.2, 2.3
  • Scylla Enterprise versions 2017.x and 2018.x
  • Scylla Manager 1.1

Scylla Monitoring 2.0 brings many improvements, both in dashboard usability and the underlying stack, in particular, moving to a new version of Prometheus. Please read the Monitoring upgrade guide carefully before upgrading.

Enterprise users are welcome to contact us for proactive help in the upgrade process.

Open Source users are welcome to use the User Slack or User Mailing list for any questions you may have.

New in Monitoring 2.0

  • Move to Prometheus version 2.3.2.
    Scylla Monitoring stack 1.0 was based on Prometheus 1.x. Moving to
    Prometheus 2.x brings many improvements, mostly in the storage format. Note that Prometheus 2.x is not backward compatible with Prometheus 1.x, which can make the monitoring stack upgrade process more complex. More here.

  • Support for Multi-cluster and multi-DC dashboards
    The Prometheus target files contain mapping information to map nodes to their respective data centers (DCs) and clusters. You can then use Prometheus to filter charts on the dashboard for either the cluster or DC by choosing DC or Cluster from the drop-down multi-select buttons. This is very useful in cases where you are using one monitoring stack to monitor more than one cluster.

Scylla Monitor 2.0 - selecting DCs

Example from prometheus/scylla_servers.yml, monitoring two clusters (cluster1, cluster2), the first with two DCs (dc1, dc2)

- targets:
  # (node IP:port entries elided)
  labels:
    cluster: cluster1
    dc: dc1

- targets:
  # (node IP:port entries elided)
  labels:
    cluster: cluster1
    dc: dc2

- targets:
  # (node IP:port entries elided)
  labels:
    cluster: cluster2

The same applies to prometheus/node_exporter_servers.yml, with the node_exporter port (9100).

Note that the Monitoring stack uses the data provided in the target file (or service discovery), and not the Scylla cluster topology as presented in “nodetool status”. We plan to fix this gap in a future release. #139

  • Identify nodes by their IP.
    Node IPs replace hostnames as node identifiers in all dashboards. This unifies the identifiers of Scylla and node_exporter (OS-level) metrics.

  • Support for Scylla Open Source 2.3.
    Use -v 2.3 to start with Scylla 2.3 Grafana dashboards. You can use multiple dashboards at the same time, for example, -v 2.2,2.3

    The following dashboards are available:

    • Scylla Overview Metrics 2.3
    • Scylla CPU Per Server Metrics 2.3
    • Scylla Per-Server Disk I/O 2.3
    • Scylla Per Server Metrics 2.3

  • Accept any instance name. Characters such as underscore and colon may now be used in instance names. #351

The post Scylla Monitoring Stack 2.0 appeared first on ScyllaDB.

Overheard at Distributed Data Summit 2018

Distributed Data Summit 2018


NoSQL practitioners from across the globe gathered this past Friday, 14 September 2018, in San Francisco for the latest Distributed Data Summit (#dds18). The attendees were as distributed as the data they manage, from the UK to Japan and all points between. While the event began years ago as a purely Apache Cassandra Summit, it currently bills itself as appealing to users of Apache Cassandra “and Friends.” The folks at ScyllaDB are definitely among those friends!

Distributed Data Summit Banner Graphic

Spinning up for the event

Our team’s own efforts kicked off the night before, spinning up a 3-node cluster to showcase Scylla’s performance: two million IOPS with 4 ms latency at the 99.9% level. (And note the Average CPU Load pegged at 100%.)

A Hot Keynote


The keynote was delivered by Nate McCall (@zznate), current Apache Cassandra Project Management Committee (PMC) Chair. He laid out the current state of Cassandra 4.0, including updates on Transient Replicas (14404) and Zero Copy Streaming (14556).

Nate McCall (@zznate) delivers keynote at DDS18

Nate acknowledged his own frustrations with Cassandra. One example was the “tick-tock” release model adopted in 2015, named after the practice originally promulgated by Intel. With a slide titled “Tick Toc Legacy: Bad,” Nate was pretty blunt: “I have to sacrifice a chicken [to get git to work].” Since Intel itself moved away from “tick-tock” in 2016, and the Cassandra community decided in 2017 to end tick-tock (coinciding with 3.10) in favor of 6-month release schedules, this is simply a legacy issue, but a painful one.


He also took a rather strident stance on Materialized Views, a feature Nate believes is not-quite-there-yet: “If you have them, take them out.”


When he mentioned the final nails in the coffin for Thrift (deprecated in favor of CQL), a hearty round of applause rose from the audience. Though, as an aside, Scylla will continue to support Thrift for legacy applications.


Nate also highlighted a number of sidecar proposals being made, such as The Last Pickle’s Cassandra Reaper for repairs, plus the Netflix and DataStax proposals.


Besides the specific technical updates, Nate also addressed maturity lifecycle issues that are starting to crop up in Cassandra. For the most part, he praised Cassandra staying close to its open source roots: “The features that are coming out are for users, by users… The marketing team is not involved.” But like Hadoop before it (which was split between Cloudera, Hortonworks and MapR), Cassandra is now witnessing increased divergence emerging between commercial vendors and code committers (such as DataStax and ScyllaDB).

Yahoo! Japan

The ScyllaDB team also had a chance to meet our friends Shogo Hoshii and Murukesh Mohanan. They presented on Cassandra and ScyllaDB at Yahoo! Japan, focusing on the evaluation they performed for exceedingly heavy traffic generated by their pre-existing Cassandra network of 1,700 nodes. Yahoo! Japan is no stranger to NoSQL technology, forming their NoSQL team in 2012. We hope to bring you more of their analyses and results in the future. For now, we’ll leave you with this one teaser slide Dor took during their presentation.


Was Cassandra the Right Base for Scylla?

ScyllaDB CEO Dor Laor asked this fundamental question for his session. If we knew then what we know now, four years ago, would we have still started with Cassandra as a base?


While Scylla mimics the best of Cassandra, it was engineered in C++ to avoid some of the worst of its pitfalls, such as JVM Garbage Collection, making it quite different under the hood.


Dor observed first “what worked well for us?” Particularly, inheriting such items as CQL, drivers, and the whole Cassandra ecosystem (Spark, KairosDB, JanusGraph, Presto, Kafka). Also, Cassandra’s “scale-out” capabilities, high availability and cross-datacenter replication. Backup and restore had reasonable solutions. (Though it is particularly nice to see people write Scylla-specific backup tools like Helpshift’s Scyllabackup.) Cassandra also has a rich data model, with key-value and wide rows. There was so much there with Cassandra. “Even hinted handoffs.” And, not to be left out of the equation, tools like Prometheus and Grafana.


On the other hand, there was a lot that wasn’t so great. Nodetool and JMX do not make a great API. The whole current sidecar debate lays bare that there was no built-in management console or strategy. Configurations based on IP are problematic. Dor’s opinion is that they should have been based on DNS instead. Secondary indexes in Cassandra are local and do not scale. “We fixed it” for Scylla, Dor noted.


Since 2014, a lot has changed in the NoSQL world. Dor cited a number of examples, from CosmosDB, which is multi-model and fully multi-tenant, to Dynamo’s point-in-time backups and streams, to CockroachDB’s ACID guarantees and SQL interface. Some of those features are definitely compelling. But would they have been a better place to start from?


So Dor took the audience on a step-by-step review of the criteria and requirements for what the founding team at ScyllaDB considered when choosing a baseline database.


Fundamental reliability, performance, and the ecosystem were givens for Cassandra. So those became quick checkboxes.


Dor instead focused on the next few bullets. Operational management could be better. Had it been part of Cassandra’s baseline, it would have obviated the need for sidecars.


For data and consistency, Dor cited Baidu’s use of CockroachDB for management of terabytes of data with 50 million inserts daily. But while that sounds impressive, a quick bit of math reveals that only equates to 578 operations per second.
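That back-of-the-envelope figure is easy to verify:

```python
# Quick sanity check of the arithmetic above: 50 million inserts per day
# averages out to only a few hundred operations per second.
inserts_per_day = 50_000_000
seconds_per_day = 24 * 60 * 60  # 86,400
print(inserts_per_day // seconds_per_day)  # -> 578
```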


While SSTables 3.0 provide a significant storage savings compared to 2.0, where Cassandra needs to improve in terms of storage and efficiency is to adopt a tiered-storage model (hot vs. cold storage).


“Cloud-native” is a term for which a specific definition is still quite arguable, especially when it comes to databases. Though there have been a few good takes at the topic, the term itself can still mean different things to different people, never mind the implementations. For instance, Dor brought up the example of what might happen if a split-brain divides the database and your Kubernetes (“k8s”) differently. Dor argued that Scylla/Cassandra’s topology awareness is better than Kubernetes, but the fact that they are different at all means there needs to be a reconciliation to heal the split-brain.


With regards to multi-tenancy, Dor saw the main problem here as one of idle resources, exacerbated by cloud vendors all-too-happy to over-provision. The opportunity here is to encapsulate workloads and keyspaces per tenant, and to provide full isolation, including system resources (CPU, memory, disk, network), security, and so on — without over-provisioning, and also while handling hot partitions well. It would require a single cluster for multiple tenants, consolidating all idle resources. Ideally, such a multi-tenant cluster would permit scale-outs in a millisecond. Dor also emphasized that there is a great need to define better SLAs that focus on real customer workloads and multi-tenancy needs.


So was Cassandra the right baseline to work from? Those who attended definitely got Dor’s opinion. But if you couldn’t attend, don’t worry! We plan on publishing an article delving into far more depth on this topic in the near future.

Thread per Core


ScyllaDB CTO Avi Kivity flew in from Israel to present on Scylla’s thread-per-core architecture.


The background to his talk was the industry momentum towards ever-increasing core counts. Utilizing these high-core-count machines is getting increasingly difficult: lock contention, optimizing for NUMA and NUCA, and asymmetric architectures, which can lead to extremes of over- or under-utilization.


Yet there are compelling reasons to deal with such difficulties to achieve significant advantages: improved mean time between failures (MTBF) and reduced management burdens from dealing with orders of magnitude fewer machines, far less space in your rack, fewer switch ports, and, of course, a reduction in overall costs.


So, Avi asked, “How do you get the most from a large fat node?” Besides system architecture, there are also considerations within the core itself. “What is the right number of threads” to run on a single core? If you run too few, you can end up with underutilization. If you run too many, you run into the inverse problems of thrashing, lock contention and high latencies.

The natural fit for this issue is a thread-per-core approach. Data has traditionally been partitioned across nodes. This simply drills the model down to a per-core basis. It avoids all locking and kernel scheduling. And it scales up linearly. However, if you say “no kernel scheduling,” you have to take on all scheduling yourself. And everything has to be asynchronous.

In fact, each thread has to be able to perform all tasks. Reads. Writes. Compactions. Repairs. And all administrative functions. The thread also has to be responsible for balancing CPU utilization across these tasks, based on policies, and with sufficient granularity. Thus, the thread has to be able to preempt any and every task. And, finally, threads have to work on separate data to avoid any locking—a true “shared-nothing” architecture.

Avi took us on a tour of a millisecond in a thread’s life. “We maintain queues of tasks we want to run.” This “gives us complete control of what we want to run next.” You’ll notice the thread maintains preemption and poll points, to occasionally change its behavior. Every computation must be preemptable at sub-millisecond resolution.

Whether you were in the middle of SSTable reads or writes, compactions, intranode partitioning, replicating data or metadata or mutating your mutable data structures, “Still you must be able to stop at any point and turn over the CPU.”
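The run loop Avi described can be sketched roughly like this. This is an illustrative Python toy, not Seastar code; the task-class names and the quantum value are made up:

```python
import time
from collections import deque

# One shard's cooperative run loop: per-task-class queues, with a preemption
# deadline checked between small work items so no class can hog the CPU.
QUANTUM_SECONDS = 0.0005  # sub-millisecond preemption resolution (assumed)

def run_loop(queues):
    """queues maps a task-class name (e.g. 'reads', 'compaction') to a
    deque of zero-argument work items; returns the execution order."""
    order = []
    while any(queues.values()):
        for name, q in queues.items():
            # poll point: give this class one quantum, then move on
            deadline = time.monotonic() + QUANTUM_SECONDS
            while q and time.monotonic() < deadline:
                q.popleft()()          # run one small work item
                order.append(name)
            # preemption point: quantum expired or queue drained
    return order
```

The essential property is that every work item is small, so the loop regains control at sub-millisecond resolution and can rebalance between classes by policy.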

Being able to implement this low-level thread-per-core architecture brought significant advantages to Scylla. In one use case, it allowed the contraction from 120 Cassandra i3.2xl nodes to just three (3) i3.16xl nodes for Scylla. Such a reduction in nodes maintained the exact same number of cores, yet required significantly lower administrative overhead.

OLTP or Analytics? Why not Both?

In a second talk, Avi asked the question that has been plaguing the data community for years.

Can databases support parallel workloads with conflicting demands? OLTP workloads ask for the fastest response times, benefit from high cache utilization, and have a random access pattern. OLAP, on the other hand, is batchy in nature; latency is less important, while throughput and efficient processing are paramount. An OLAP workload can scan a large amount of data and access it only once, so that data shouldn’t be cached.

As Chad Jones of Deep Information Sciences quipped at his Percona Live 2016 keynote, “Databases can’t walk and chew gum at the same time.” He observed you can optimize databases for reads, or writes, but not both. At least, that was his view in 2016.

So how do you get a single system to provide both Online Transaction Processing (OLTP) and Online Analytical Processing (OLAP)? Avi noted that workloads can compete with each other. Writes, reads, batch workloads, and even internal maintenance tasks like compactions or repairs can all dominate a system if left to their own devices.

Prior practices with Cassandra have focused on throttling, configuring maximum throughput. But this has often been a manual tuning effort, which could leave systems idle at certain points, while being unable to shift system resources during peak activities.

Finally, after years of investment in foreground and background workload isolation, Scylla is able to provide different SLA guarantees for end-user workloads.

Avi took the audience through the low-level mechanisms by which Scylla, built on the Seastar scheduler, allows users to create new scheduling classes and assign them relative resource shares. For example, imagine a role defined as follows:


CREATE ROLE analytics
   WITH LOGIN = true
   AND SERVICE_LEVEL = { 'shares': 200 };

This would create an analytics role constrained to a share of system resources. Note that the rule limits the analytics role to a portion of the resources; however, this is not a hard cap. If there are enough idle resources, the analytics workload can go beyond its share and thus get the best of both worlds: utilization for OLAP and low latency for OLTP.
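The share semantics can be sketched as follows. This is an illustrative Python toy with assumed share and demand values, not Scylla's actual scheduler: under contention each role gets CPU time proportional to its shares, but the split is work-conserving, so an under-loaded role's leftover capacity goes to the others.

```python
def cpu_split(shares, demand):
    """shares: role -> share count; demand: role -> fraction of CPU the
    role could use (0.0-1.0). Returns each role's granted fraction."""
    granted = {}
    total = sum(shares.values())
    spare = 1.0
    # First pass: cap every role at its proportional entitlement.
    for role in shares:
        granted[role] = min(demand[role], shares[role] / total)
        spare -= granted[role]
    # Second pass: hand spare capacity to roles that still want more,
    # so a share is a floor under contention, not a hard cap.
    hungry = [r for r in shares if demand[r] > granted[r]]
    for role in hungry:
        granted[role] += min(demand[role] - granted[role],
                             spare / len(hungry))
    return granted

# oltp idle at 10% demand: analytics (200 of 1000 shares) exceeds its
# nominal 20% and soaks up the slack.
print(cpu_split({"oltp": 800, "analytics": 200},
                {"oltp": 0.1, "analytics": 1.0}))
```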

Avi then shared the results of two experiments, creating oltp and analytics users to simulate performance under different scenarios. In the second, more realistic scenario, Avi set up a latency-sensitive online workload that would consume 50% of CPU, versus a batch load with three times the thread count that would try to achieve as much throughput as it could get.

The results of his experiments were promising: the oltp workload continued to perform exactly as before, despite the 3X competing workload from analytics.

Having a single datacenter handle both operational and Spark (analytics) workloads is a huge advance for Scylla. There is no longer a need to reserve an entire datacenter, together with 1x-3x data duplication, for Spark. If you want to hear more about this novel ability, we will be covering it at Scylla Summit in November.

Looking Towards an Official Cassandra Sidecar, Netflix

Focusing on Cassandra itself, there were a number of very interesting talks. One in particular was by the team at Netflix on “Looking towards an Official C* Sidecar.” They’ve kindly shared their slides after the event. Netflix is no stranger to sidecars, being the home of Priam (first committed to GitHub in 2011), as well as sidecars for other services like Raigad for Elasticsearch and Prana. We at ScyllaDB have our own vision of a Scylla sidecar, in the form of Scylla Manager, but it is vital to hear the views of major stakeholders in the Cassandra community like Netflix. If you didn’t attend DDS18, this is definitely a talk to check out!

See You at Scylla Summit!

It was great to meet so many of you at Distributed Data Summit. Let’s keep those conversations going. We’re looking forward to seeing you at our own event coming up in November, Scylla Summit 2018. There will be hands-on training plus in-depth talks on a wide range of technical subjects, practical use cases, and lessons learned by some of the industry’s leading practitioners. Don’t delay! Register today!

The post Overheard at Distributed Data Summit 2018 appeared first on ScyllaDB.

Scylla Open Source Release 2.3

Scylla Release

The Scylla team is pleased to announce the release of Scylla 2.3, a production-ready Scylla Open Source minor release.

The Scylla 2.3 release includes CQL enhancements, new troubleshooting tools, performance improvements and more. Experimental features include Materialized Views, Secondary Indexes, and Hinted Handoff (details below). Starting from Scylla 2.3, packages are also available for Ubuntu 18.04 and Debian 9.

Scylla is an open source, Apache Cassandra-compatible NoSQL database, with superior performance and consistently low latency. Find the Scylla 2.3 repository for your Linux distribution here.

Our open source policy is to support only the current active release and its predecessor. Therefore, with the release of Scylla 2.3, Scylla 2.2 is still supported, but Scylla 2.1 is officially retired.


New Distribution Support

Scylla packages are now available for:

  • Ubuntu 18.04
  • Debian 9

AWS – Scylla AMI

The Scylla 2.3 AMI is now optimized for i3.metal as well. Read more on the performance gains of using bare-metal i3.metal compared to virtualized i3.16xlarge.


  • CQL: Identify Large Partitions. One of the common anti-patterns in Scylla is large partitions. Recent releases greatly improved the handling of large partitions, but reading a large partition is still less efficient and does not scale as well. With this release, Scylla now identifies large partitions and makes the information available in a system table, so you can find large partitions and fix them. The threshold for large partitions can be set in scylla.yaml:

compaction_large_partition_warning_threshold_mb parameter (default 100MB)

Example:  SELECT * FROM system.large_partitions;

> More on large partitions support in Scylla 2.3

  • CQL Tracing: Added prepared statement parameters #1657
  • Scyllatop – The Scyllatop tool, originally introduced with the Scylla Collectd API, is now using the Scylla Prometheus API, same as the Scylla Monitoring Stack. Starting from Scylla 2.3, Collectd is not installed by default, but the Collectd API is still supported.  #1541 #3490
  • iotune v2. Iotune is a storage benchmarking tool that runs as part of the scylla_setup script. Iotune runs a short benchmark on the Scylla storage and uses the results to set the Scylla io_properties.yaml configuration file (formerly called io.conf). Scylla uses these settings to optimize I/O performance, specifically through setting max storage bandwidth and max concurrent requests. The new iotune output matches the new IO scheduler configuration, is time-limited (2 minutes), and produces more consistent results than the previous version.
  • Python re-write: All scripts, previously written in bash, such as scylla_setup, were re-written in Python. This change does not have any visible effect but will make future enhancements easier.  See Google Bash Style Guide on the same subject.

New Features in Scylla 2.3

  • CQL: Datetime Functions Support. Scylla now includes support for the following functions:


Example: SELECT * FROM myTable WHERE date >= currentDate() - 2d

  • CQL: JSON Function Support: Scylla now includes support for JSON operations: SELECT JSON, INSERT JSON, and the functions toJson() and fromJson(). It is compatible with Apache Cassandra 2.2, with the exception of tuples and user-defined types. #3708 Example:

CREATE TABLE test (
   a text PRIMARY KEY,
   b timestamp);

INSERT INTO test JSON '{  "a" : "Sprint", "b" : "2011-02-03 04:05+0000" }';

  • CQL: Different timeouts for reads and range scans can now be set #3013
  • CQL: TWCS support for using millisecond values in timestamps #3152 (used by KairosDB and others)
  • Storage: Scylla now supports Apache Cassandra 2.2 file format (la sstable format). Native support for Apache Cassandra 3.0 file format is planned for the next major Scylla release.

Performance Improvements in Scylla 2.3

We continue to invest in increasing Scylla throughput and in reducing latency, focusing on reducing high percentile latencies.

  • Dynamic controllers for additional compaction strategies. A dynamic controller for Size Tiered Compaction Strategy was introduced in Scylla 2.2. In Scylla 2.3, we have added controllers for Leveled Compaction Strategy and Time Window Compaction Strategy.
  • Enhanced tagging for scheduling groups to improve performance isolation

Experimental Features

You are welcome to try the following features on a test environment. We welcome your feedback.

  • Materialized Views (MV)
    With Scylla 2.3, MV is feature-compatible with Cassandra 3.0, but is not considered production ready. In particular, the following are part of the release:
    • Creating a MV based on any subset of columns of the base table, including the primary key columns
    • Updating a MV for base table DELETE or UPDATE.
    • Indexing of existing data when creating an MV
    • Support for MV hinted handoff
    • Topology changes, and migration of MV token ranges
    • Repair of MV tables
    • Sync of TTL between a base table and an MV table
    • nodetool viewbuildstatus

The following MV functions are not available in Scylla 2.3:

    • Backpressure – cluster may be overloaded and fail under a write workload with MV/SI – planned for the next release
    • Unselected columns keep MV row alive #3362, CASSANDRA-13826 – planned for the next release
    • MV on static and collection columns
  • Secondary Indexes (SI)
    Unlike Apache Cassandra, Scylla’s SI is based on MV. This means every secondary index creates a materialized view under the hood, using all the columns of the original base table’s primary key plus the required indexed columns. The following SI functions are *not* available in Scylla 2.3:
    • Intersection between more than one SI and between an SI and a Partition Key – planned for the next release
    • Support for ALLOW FILTERING with a secondary index – planned for the next release
    • Paging support
    • Indexing of Static and Collection columns (same as for MV above)

Metrics Updates from Scylla 2.2 to Scylla 2.3

Next Steps

  • Scylla Summit 2018 is around the corner. Register now!
  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.

The post Scylla Open Source Release 2.3 appeared first on ScyllaDB.

Assassinate - A Command of Last Resort within Apache Cassandra

The nodetool assassinate command is meant specifically to remove cosmetic issues after the nodetool decommission or nodetool removenode commands have been properly run and at least 72 hours have passed. It is not a command that should be run under most circumstances, nor included in your regular toolbox. Rather, the lengthier nodetool decommission process is preferred when removing nodes, to ensure no data is lost. Note that you can also use the nodetool removenode command if cluster consistency is not the primary concern.

This blog post will explain:

  • How gossip works and why assassinate can disrupt it.
  • How to properly remove nodes.
  • When and how to assassinate nodes.
  • How to resolve issues when assassination attempts fail.

Gossip: Cassandra’s Decentralized Topology State

Since all topological changes happen within Cassandra’s gossip layer, before we discuss how to manipulate the gossip layer, let’s discuss the fundamentals of how gossip works.

From Wikipedia:

A gossip (communication) protocol is a procedure or process of computer-computer communication that is based on the way social networks disseminate information or how epidemics spread… Modern distributed systems often use gossip protocols to solve problems that might be difficult to solve in other ways, either because the underlying network has an inconvenient structure, is extremely large, or because gossip solutions are the most efficient ones available.

The gossip state within Cassandra is the decentralized, eventually consistent, agreed-upon topological state of all nodes within a Cassandra cluster. Cassandra gossip heartbeats keep the topological gossip state updated, are emitted by each node in the cluster, and contain the following information:

  • What that node’s status is, and
  • What its neighbors’ statuses are.

When a node goes offline, its gossip heartbeat is no longer emitted. The node’s neighbors will detect that the node is offline (with help from a failure detection algorithm tuned by the phi_convict_threshold parameter defined within cassandra.yaml) and will broadcast an updated status saying that the node is unavailable until further notice.

However, as soon as the node comes online, two things will happen:

  1. The revived node will:
    • Ask a neighbor node what the current gossip state is.
    • Modify the received gossip state to include its own status.
    • Assume the modified state as its own.
    • Broadcast the new gossip state across the network.
  2. A neighbor node will:
    • Discover the revived node is back online, either by:
      • First-hand discovery, or
      • Second-hand gossiping.
    • Update the received gossip state with the new information.
    • Modify the received gossip state to include its own status.
    • Assume the modified state as its own.
    • Broadcast the new gossip state across the network.

The above gossip protocol is responsible for the UN/DN, or Up|Down/Normal, statuses seen within nodetool status and is responsible for ensuring requests and replicas are properly routed to the available and responsible nodes, among other tasks.
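The reconciliation steps above can be sketched as a merge rule. This is a hypothetical Python toy, not Cassandra's internals: each endpoint's entry carries a (generation, version) pair, and the numerically newer pair wins when two gossip states are reconciled, which is why a revived node's fresh generation overrides any stale status.

```python
def merge_states(local, remote):
    """Both arguments map endpoint -> (generation, version, status)."""
    merged = dict(local)
    for endpoint, entry in remote.items():
        if endpoint not in merged or entry[:2] > merged[endpoint][:2]:
            merged[endpoint] = entry  # remote entry is newer; adopt it
    return merged

local = {"10.0.0.1": (5, 12, "UN"), "10.0.0.2": (3, 40, "DN")}
# 10.0.0.2 restarted: a higher generation trumps any older version count.
remote = {"10.0.0.2": (4, 1, "UN")}
print(merge_states(local, remote)["10.0.0.2"])  # -> (4, 1, 'UN')
```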

Differences Between Assassination, Decommission, and Removenode

There are three main commands used to take a node offline: assassination, decommission, and removenode. All three ultimately mark the deprecated node with the LEFT state. Having the node in the LEFT state ensures that each node’s gossip state will eventually be consistent and agree that:

  • The deprecated node has in fact been deprecated.
  • The deprecated node was deprecated after a given timestamp.
  • The deprecated token ranges are now owned by a new node.
  • Ideally, the deprecated LEFT state will be automatically purged after 72 hours.

Underlying Actions of Decommission and Removenode on the Gossip Layer

When nodetool decommission and nodetool removenode commands are run, we are changing the state of the gossip layer to the LEFT state for the deprecated node.

Following the gossip protocol procedure in the previous section, the LEFT status will spread across the cluster as the new truth, since the LEFT status has a more recent timestamp than the previous status.

As more nodes begin to assimilate the LEFT status, the cluster will ultimately reach consensus that the deprecated node has LEFT the cluster.

Underlying Actions of Assassination

Unlike nodetool decommission and nodetool removenode above, nodetool assassinate updates the gossip state to the LEFT state, forces an increment of the gossip generation number, and explicitly sets the application state to LEFT, which then propagates as normal.

Removing Nodes: The “Proper” Way

When clusters grow large, an operator may need to remove a node, either due to hardware faults or horizontally scaling down the cluster. At that time, the operator will need to modify the topological gossip state with either a nodetool decommission command for online nodes or nodetool removenode for offline nodes.

Decommissioning a Node: While Saving All Replicas

The typical command to run on a live node that will be exiting the cluster is:

nodetool decommission

The nodetool decommission command will:

  • Extend certain token ranges within the gossip state.
  • Stream all of the decommissioned node’s data to the new replicas in a consistent manner (the opposite of bootstrap).
  • Report to the gossip state that the node has exited the ring.

While this command may take a while to complete, the extra time spent on this command will ensure that all owned replicas are streamed off the node and towards the new replica owners.

Removing a Node: And Losing Non-Replicated Replicas

Sometimes a node may be offline due to hardware issues and/or has been offline for longer than gc_grace_seconds within a cluster that ingests deletion mutations. In this case, the node needs to be removed from the cluster while remaining offline to prevent “zombie data” from propagating around the cluster due to already expired tombstones, as defined by the gc_grace_seconds window. In the case where the node will remain offline, the following command should be run on a neighbor node:

nodetool removenode $HOST_ID

The nodetool removenode command will:

  • Extend certain token ranges within the gossip state.
  • Report to the gossip state that the node has exited the ring.
  • NOT stream any of the removed node’s data to the new replicas.
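The zombie-data risk that motivates keeping such a node offline can be sketched like this. This is a simplified Python toy, not Cassandra's compaction code: once a tombstone is older than gc_grace_seconds it may be compacted away, so a replica that missed the delete and rejoins later would resurrect the old value.

```python
GC_GRACE_SECONDS = 864_000  # 10 days, the Cassandra default

def compact(cells, now):
    """Drop tombstones whose deletion time is past the grace window."""
    return [c for c in cells
            if not (c["tombstone"] and now - c["ts"] > GC_GRACE_SECONDS)]

cells = [{"key": "a", "tombstone": True, "ts": 0}]
# After the grace window the tombstone is purged...
print(compact(cells, now=GC_GRACE_SECONDS + 1))  # -> []
# ...so a rejoining node that still holds a live copy of "a"
# would re-propagate it as "zombie data".
```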

Increasing Consistency After Removing a Node

Typically a follow up repair is required in a rolling fashion around the data center to ensure each new replica has the required information:

nodetool repair -pr

Note that:

  • The above command will only repair replica consistencies if the replication factor is greater than 1 and one of the surviving nodes contains a replica of the data.
  • Running a rolling repair will generate disk, CPU, and network load proportional to the amount of data needing to be repaired.
  • Throttling a rolling repair by repairing only one node at a time may be ideal.
  • Using Reaper for Apache Cassandra can schedule, manage, and load balance the repair operations throughout the lifetime of the cluster.

How We Can Detect Assassination is Needed

In either of the above cases, sometimes the gossip state will continue to be out of sync. There will be echoes of past statuses that claim not only the node is still part of the cluster, but it may still be alive. And then missing. Intermittently.

When the gossip truth is continuously inconsistent, nodetool assassinate will resolve these inconsistencies, but should only be run after nodetool decommission or nodetool removenode have been run and at least 72 hours have passed.

These issues are typically cosmetic and appear as similar lines within the system.log:

2014-09-26 01:26:41,901 DEBUG [Reconnection-1] Cluster - Failed reconnection to /172.x.y.zzz:9042 ([/172.x.y.zzz:9042] Cannot connect), scheduling retry in 600000 milliseconds

Or may appear as UNREACHABLE within the nodetool describecluster output:

Cluster Information:
        Name: Production Cluster
       Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
       Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
       Schema versions:
              65e78f0e-e81e-30d8-a631-a65dff93bf82: [172.x.y.z]
              UNREACHABLE: [172.x.y.zzz]

Sometimes you may find yourself looking even deeper and spot the deprecated node within nodetool gossipinfo months after removing the node:

TOKENS: not present

Note that the LEFT status should stick around for 72 hours to ensure all nodes come to the consensus that the node has been removed. So please don’t rush things if that’s the case. Again, it’s only cosmetic.

In all of these cases the truth may be slightly outdated and an operator may want to set the record straight with truth-based gossip states instead of cosmetic rumor-filled gossip states that include offline deprecated nodes.

How to Run the Assassination Command

Pre-2.2.0, operators had to use Java MBeans to assassinate a token (see below). Post-2.2.0, operators can use the nodetool assassinate command.

From an online node, run the command:

nodetool assassinate $IP_ADDRESS

Internally, the nodetool assassinate command will execute the unsafeAssassinateEndpoint command over JMX on the Gossiper MBean.

Java MBeans Assassination

If using a version of Cassandra that does not yet have the nodetool assassinate command, we’ll have to rely on jmxterm.

You can download jmxterm (for example, the jmxterm-1.0.0-uber.jar used below) from the jmxterm project’s releases page.


Then we’ll want to use the Gossiper MBean and run the unsafeAssassinateEndpoint command:

echo "run -b org.apache.cassandra.net:type=Gossiper unsafeAssassinateEndpoint $IP_TO_ASSASSINATE" \
    | java -jar jmxterm-1.0.0-uber.jar -l $IP_OF_LIVE_NODE:7199

Both assassination commands trigger the same MBean operation over JMX; however, the nodetool assassinate command is preferred for its ease of use and lack of additional dependencies.

Resolving Failed Assassination Attempts: And Why the First Attempts Failed

When clusters grow large enough, are geospatially distant enough, or are under intense load, the gossip state may become a bit out of sync with reality. Sometimes this causes assassination attempts to fail and while the solution may sound unnerving, it’s relatively simple once you consider how gossip states act and are maintained.

Because gossip states can be decentralized across high-latency nodes, gossip state updates can sometimes be delayed, causing a variety of race conditions that may show offline nodes as still being online. Most times these race conditions will be corrected in a relatively short period, as tuned by the phi_convict_threshold within cassandra.yaml (between a value of 8 for bare-metal hardware and 12 for virtualized instances). In almost all cases the gossip state will converge on a global truth.

However, because gossip states for nodes that are no longer participating in gossip heartbeat rounds have no explicit source and are instead fueled by rumors, dead nodes may sometimes continue to live within the gossip state even after the assassinate command has been called.

To solve these issues, you must ensure all race conditions are eliminated.

If a gossip state will not forget a node that was removed from the cluster more than a week ago:

  • Login to each node within the Cassandra cluster.
  • Download jmxterm on each node, if nodetool assassinate is not an option.
  • Run nodetool assassinate, or the unsafeAssassinateEndpoint command, multiple times in quick succession.
    • I typically recommend running the command 3-5 times within 2 seconds.
    • I understand that sometimes the command takes time to return, so the “2 seconds” suggestion is less of a requirement than it is a mindset.
    • Also, sometimes 3-5 times isn’t enough. In such cases, shoot for the moon and try 20 assassination attempts in quick succession.

What we are trying to do is to create a flood of messages requesting all nodes completely forget there used to be an entry within the gossip state for the given IP address. If each node can prune its own gossip state and broadcast that to the rest of the nodes, we should eliminate any race conditions that may exist where at least one node still remembers the given IP address.

As soon as all nodes agree that they don’t remember the deprecated node, the cosmetic issue will no longer appear in any system.log, nodetool describecluster output, or nodetool gossipinfo output.

Recap: How To Properly Remove Nodes Completely

Operators shouldn’t opt for the assassinate command as a first resort for taking a Cassandra node out, since it is a sledgehammer and most of the time operators are dealing with a screw.

However, when operators follow best practices and perform a nodetool decommission for live nodes or nodetool removenode for offline nodes, sometimes lingering cosmetic issues may lead the operator to want to keep the gossip state consistent.

After at least a week of inconsistent gossip state, nodetool assassinate or the unsafeAssassinateEndpoint command may be used to remove deprecated nodes from the gossip state.

When a single assassination attempt does not work across an entire cluster, the attempt sometimes needs to be repeated multiple times on all nodes within the cluster simultaneously. Doing so ensures that each node modifies its own gossip state to accurately reflect the deprecated node’s absence, and that no node will further broadcast rumors of a false gossip state.

Scylla Summit Preview: Keeping Your Latency SLAs No Matter What!

Scylla Summit 2018: Keeping Your Latency SLAs No Matter What!

In the run-up to Scylla Summit 2018, we’ll be featuring our speakers and providing sneak peeks at their presentations. The first interview in this series is with ScyllaDB’s own Glauber Costa.

Glauber, your talk is entitled “Keeping Your Latency SLAs No Matter What!” How did you come up with this topic?

Last year I gave a talk at Scylla Summit where I unequivocally stated that we view latency spikes as a bug. If they are a bug, that means we should fix them so I also talked about some of the techniques that we used to do that.

But you know, there are some things that are by design never truly finished and the more you search, the more you find. This year my team and I poured a lot more work into finding even more situations where latency creeps up, and we fixed those too. So I figured the Summit would be a great time to update the community on the improvements we have made in this area.

What do you believe are the hardest elements to maintain in latency SLAs?

The hardest part of keeping latencies under control is that events that cause latency spikes can occur anytime, anywhere. For Java-based systems we are already familiar with the infamous garbage collection, which Scylla gracefully sidesteps by being written in C++. But the hardware can introduce latencies, the OS kernel can introduce latencies, and even in the database itself they can come from the most unpredictable of places. It’s a battle against everyone.

What is a war story you’re free to share of a deployment that just wasn’t keeping its SLAs?

That’s actually a good question and a nice opportunity to show that SLAs are indeed complex beasts and latencies can come from anywhere. We have a customer with very strict SLAs for their p99, and those weren’t always being met. After much investigation we determined that the source of those latencies was not Scylla itself, but the Linux kernel (in particular the XFS filesystem). Thankfully we have on our bench a lot of people who know Linux deeply, having contributed to the kernel for more than a decade. We were then able to understand the problem and work on a solution ourselves.

That’s interesting! Is that the kind of thing people are expected to learn by coming to the talk?

Yes, I will cover that. I want to show people a 360º view of the work we do in Scylla to keep latencies low and predictable. Not all of that work is done in the Scylla codebase. The majority of course is, but it ends up sprawling down all the way to the Linux kernel. But this won’t be a Linux talk! We have many interesting pieces of technology that help us keep our latencies consistently low in Scylla, and I will be talking about them as well. For example, we redesigned our I/O Scheduler, we finalized our CPU Scheduler, added full controllers for all compaction strategies, and also took a methodical approach to find sources of latency spikes and get rid of them. It will certainly be a very extensive talk!

Thanks Glauber! We’re looking forward to your talk at the Summit!

It’s my pleasure! By the way, if anyone reading this article hasn’t registered yet, you can register with the code glauber25monster to get 25% off the current price.

The post Scylla Summit Preview: Keeping Your Latency SLAs No Matter What! appeared first on ScyllaDB.

Why We Built an Open Source Cassandra-Operator to Run Apache Cassandra on Kubernetes

As Kubernetes becomes the de facto standard for container orchestration, more and more developers (and enterprises) want to run Apache Cassandra on Kubernetes. It’s easy to get started with this – especially considering the capabilities that Kubernetes’ StatefulSets bring to the table. Kubernetes, though, certainly has room to improve when it comes to storing data in-state and understanding how different databases work.

For example, Kubernetes doesn’t know if you’re writing to a leader or a follower database, to a multi-sharded leader infrastructure, or to a single database instance. StatefulSets – workload API objects used to manage stateful applications – offer the building blocks required for stable unique network identifiers, stable persistent storage, ordered and smooth deployment and scaling, deletion and termination, and automated rolling updates. However, while getting started with Cassandra on Kubernetes might be easy, it can still be a challenge to run and manage (and running Docker is another challenge in itself).


To overcome some of these hurdles, we decided to build an open source Cassandra-operator that runs and operates Cassandra within Kubernetes. Think of it as Cassandra-as-a-Service on top of Kubernetes. We’ve made this Cassandra-operator open source and freely available on GitHub. It remains a work in progress between myself, others on my team, and a number of partner contributors – but it is functional and ready for use. The Cassandra-operator supports Docker images, which are open source and available as well (via the same link).


This Cassandra-operator is designed to provide “operations-free” Cassandra: it takes care of deployment and allows users to manage and run Cassandra, in a safe way, within Kubernetes environments. It also makes it simple to utilize consistent and reproducible environments.


While it’s possible for developers to build scripts for managing and running Cassandra on Kubernetes, the Cassandra-operator offers the advantage of providing the same consistent reproducible environment, as well as the same consistent reproducible set of operations through different production clusters. And this is true across development, staging, and QA environments. Furthermore, because best practices are already built into the operator, development teams are spared from operational concerns and are able to focus on their core capabilities.


What is a Kubernetes operator?


A Kubernetes operator consists of two components: a controller and a custom resource definition (CRD). The CRD allows devs to create Cassandra objects in Kubernetes; it extends Kubernetes with custom objects or resources whose definitions our controller can then watch for changes. Devs can define an object in Kubernetes that contains configuration options for Cassandra, such as cluster name, node count, JVM tuning options, etc. – all the information you want to give Kubernetes about how to deploy Cassandra.
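For illustration only, here is the kind of information such an object might carry, expressed as a Python dict. The field names and apiVersion are hypothetical, not the operator's actual CRD schema.

```python
# Hypothetical shape of a Cassandra custom resource, as a plain dict.
# Every field name here is an illustrative assumption.
cassandra_resource = {
    "apiVersion": "example.com/v1",    # hypothetical API group/version
    "kind": "Cassandra",
    "metadata": {"name": "my-cluster"},
    "spec": {                          # options the controller reads to
        "clusterName": "my-cluster",   # build matching StatefulSets
        "nodeCount": 3,
        "jvmOptions": ["-Xmx1g"],
    },
}
```

The controller watches objects of this kind and reconciles the cluster toward the declared spec.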


You can isolate the Cassandra-operator to a specific Kubernetes namespace, define what kinds of persistent volumes it should use, and more. The Cassandra-operator controller listens to state changes on the Cassandra CRD and will create its own StatefulSets to match those requirements. It will also manage those operations and can ensure repairs, backups, and safe scaling as specified via the CRD. In this way, it leverages the Kubernetes concept of building controllers upon other controllers in order to achieve intelligent and helpful behaviours.


How does it work?

Architecturally, the Cassandra controller connects to the Kubernetes Master. It listens to state changes and manipulates Pod definitions and CRDs. It then deploys those, waits for changes to occur, and repeats until all necessary changes are complete.


The Cassandra controller can, of course, perform operations within the Cassandra cluster itself. For example, want to scale down your Cassandra cluster? Instead of manipulating the StatefulSet to handle this task, the controller will first see the CRD change. The node count will change to a lower number (say from six to five). The controller will get that state change, and it will first run a decommission operation on the Cassandra node that’s going to be removed. This ensures that the Cassandra node stops gracefully and that it will redistribute and rebalance the data it held across the remaining nodes. Once the Cassandra controller sees that this has happened successfully, it will modify that StatefulSet definition to allow Kubernetes to finally decommission that particular Pod. Thus, the Cassandra controller brings needed intelligence to the Kubernetes environment to run Cassandra properly and ensure smoother operations.


As we continue this project and iterate on the Cassandra-operator, our goal is to add new components that will continue to expand the tool’s features and value. A good example is the Cassandra SideCar (included in the diagram above), which will begin to take responsibility for tasks like backups and repairs. Current and future features of the project can be viewed on GitHub. Our goal for the Cassandra-operator is to give devs a powerful open source option for running Cassandra on Kubernetes with a simplicity and grace that has not yet been all that easy to achieve.


Ben Bromhead is CTO at Instaclustr, which provides a managed service platform of open source technologies such as Apache Cassandra, Apache Spark, Elasticsearch and Apache Kafka.

The post Why We Built an Open Source Cassandra-Operator to Run Apache Cassandra on Kubernetes appeared first on Instaclustr.

Large Partitions Support in Scylla 2.3 and Beyond

Scylla 2.3 helps find your large partitions

Large partitions, although supported by Scylla, are also well known for causing performance issues. Fortunately, release 2.3 comes with a helping hand for discovering and investigating large partitions present in a cluster — system.large_partitions table.

Large partitions

CQL, as a data modeling language, aims towards very good readability and hiding unneeded implementation details from users. As a result, sometimes it’s not clear why a very simple data model suffers from unexpected performance problems. One of the potential suspects might be large partitions. Our blog entry on large partitions contains a detailed explanation on why coping with large partitions is important. We’ll use some of the same example tables from this article below.

The following table could be used in a distributed air quality monitoring system with multiple sensors:

CREATE TABLE air_quality_data (
   sensor_id text,
   time timestamp,
   co_ppm int,
   PRIMARY KEY (sensor_id, time)
);

With time being our table’s clustering key, it’s easy to imagine that partitions for each sensor can grow very large – especially if data is gathered every couple of milliseconds. Given that there is a hard limit on the number of clustering rows per partition (2 billion), this innocent-looking table can eventually become unusable – in this example, in about 50 days.
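The “about 50 days” figure is easy to sanity-check, assuming one reading every 2 milliseconds (a hedged reading of “every couple of milliseconds”):

```python
# Back-of-the-envelope check of the "about 50 days" figure.
MAX_CLUSTERING_ROWS = 2_000_000_000  # hard per-partition row limit
SAMPLE_INTERVAL_S = 0.002            # assumed: one reading every 2 ms

days_to_fill = MAX_CLUSTERING_ROWS * SAMPLE_INTERVAL_S / 86_400
print(round(days_to_fill, 1))  # ≈ 46.3 days, i.e. roughly 50 days
```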

A standard solution is to amend the data model to reduce the number of clustering keys per partition key. In this case, let’s take a look at amended table air_quality_data:

CREATE TABLE air_quality_data (
   sensor_id text,
   date text,
   time timestamp,
   co_ppm int,
   PRIMARY KEY ((sensor_id, date), time)
);

After the change, one partition holds the values gathered in a single day, which makes it less likely to overflow.

system.large_partitions table

Amending the data model may help with large partition issues. But sometimes you have such issues without realizing it. It’s useful to be able to see which tables have large partitions and how many of them exist in a cluster.

In order to track how many large partitions are created and to which table they belong, one can use the system.large_partitions table, which is implicitly created with the following schema:

CREATE TABLE system.large_partitions (
    keyspace_name text,
    table_name text,
    sstable_name text,
    partition_size bigint,
    partition_key text,
    compaction_time timestamp,
    PRIMARY KEY ((keyspace_name, table_name), sstable_name, partition_size, partition_key)
) WITH CLUSTERING ORDER BY (sstable_name ASC, partition_size DESC, partition_key ASC);

How it works

Partitions are written to disk during memtable flushes and compaction. If, during either of these actions, a large partition is written, an entry in system.large_partitions will be created (or updated). It’s important to remember that large-partition information is updated only when a row is actually written to disk; changes might not be visible immediately after a write operation is acknowledged to the user, since the data could still reside in a memtable for some time.

Each entry in the system.large_partitions table represents a partition written to a given sstable. Note that the large_partitions table is node-local – querying it will return large-partition information only for the node that serves the request.

Listing all local large partition info can be achieved with:

SELECT * FROM system.large_partitions;

Checking large partitions for a specific table:

SELECT * FROM system.large_partitions WHERE keyspace_name = 'ks' and table_name = 'air_quality_data';

Listing all large partitions in a given keyspace that exceeded 140MB:

SELECT * FROM system.large_partitions WHERE partition_size > 146800640 ALLOW FILTERING; *

  • Note: ALLOW FILTERING support is not part of 2.3; it will be present in the next release
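The byte value used in the query above is simply 140 MiB expressed in bytes:

```python
# 140 MiB in bytes, matching the partition_size threshold in the query.
threshold_bytes = 140 * 1024 * 1024
print(threshold_bytes)  # 146800640
```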

Listing all large partitions compacted today:

SELECT * FROM system.large_partitions WHERE compaction_time >= toTimestamp(currentDate()) ALLOW FILTERING; *

  • Note: ALLOW FILTERING support is not part of 2.3; it will be present in the next release

Since system.large_partitions can be read just like a regular CQL table, there are many more combinations of queries that return helpful results. Remember that keyspace_name and table_name act as the partition key, so some more complex queries, like the last example above, may involve filtering (hence the appended ALLOW FILTERING keywords). Filtering support for such queries is not part of 2.3 and will be available in the next release.

Aside from table name and size, system.large_partitions contains information on the offending partition key, when the compaction that led to the creation of this large partition occurred, and its sstable name (which makes it easy to locate its filename).


For both readability and performance reasons, not all partitions are registered in system.large_partitions table. The threshold can be configured with an already existing parameter in scylla.yaml:

compaction_large_partition_warning_threshold_mb: 100

Previously, this configuration option only triggered a warning that was logged each time a large-enough partition was written.

The large-partition warning threshold defaults to 100MiB, which means each partition larger than that will be registered in the system.large_partitions table the moment it is written, whether by memtable flush or as a result of compaction.

If the default value does not suit a specific use case – e.g. even 1MiB partitions are considered “too big” or, conversely, virtually every partition is bigger than 100MiB – you can modify compaction_large_partition_warning_threshold_mb accordingly.

Disabling system.large_partitions can effectively be done by setting the threshold to an extremely high value, say, 500GiB. However, it’s highly recommended to leave it at a reasonable level. Better safe than sorry.

In order to prevent stale data from appearing in system.large_partitions, each record is inserted with time-to-live of 30 days.


We promised back in 2016 for Release 1.3 that we’d continue to improve support for large partitions. This improvement for 2.3 is a follow-through on that commitment. As you can see, we already have some next-steps planned out with future support for ALLOW FILTERING.

For now, we’d like for you to try system.large_partitions, and let us know what you find. Are you already aware of large partitions in your database, or did it help you discover anything about your data you didn’t already know?

If large partitions are critical to you, feel free to contact us with your war stories and requirements, or bring them up when you see us at Scylla Summit this November.

The post Large Partitions Support in Scylla 2.3 and Beyond appeared first on ScyllaDB.

The 5 Things You Need to Know About Distributed Cloud Databases

Customer expectations are fast evolving based on companies who leverage cloud innovation. Key to being able to meet customer expectations is having a distributed cloud database to drive your applications. The faster and more reliable your applications are, the more seamless the customer experience. But it’s not all about speed. It’s also about providing a highly personalized interaction that makes your customers feel as if you anticipate their needs and tailor your services to their unique preferences.

Distributed cloud databases are databases where the operational data is spread across various physical locations — data centers, hybrid clouds, public cloud regions or different public clouds. They may be a native service within a public cloud provider or from a cloud-agnostic software vendor.

These databases are critical to the success of your applications. Why?

Here are five features of applications that are powered by distributed cloud databases. Make sure any cloud database provider you consider supports these functionalities:

1. Relevancy

Applications have to be able to provide contextual logic to customize an experience for the customer. Your cloud database needs to correlate customer transaction data across all touchpoints to create an individualized customer experience. This takes the shape of recommendations and curating products or solutions that would most likely resonate with the customer.

2. Availability

It used to be that 99.99% uptime was acceptable. Not anymore. Today’s customers expect 100% uptime and your business can’t afford to fail on this. Any downtime sends customers to your competitors—and they stay where they find available service. Your cloud database needs to provide continuous access to data without any downtime. It must be always on.

3. Responsiveness

Cloud databases need to process millions of transactions in real time in order to provide instant service to customers. Like downtime, customers have zero tolerance for lagging applications. They want their apps to work in the moment they need them.

4. Accessibility

Whether your company is national or global, chances are you have customers around the world accessing your apps. A distributed cloud database ensures that data can be replicated across multiple data centers and/or cloud regions. Applications are then available worldwide with localized data.

5. Engagement

As your business grows or experiences seasonal volume spikes, your cloud database needs to be able to automatically scale to handle more data quickly and cost-effectively. Scalability ensures you provide continuous service to customers while your business grows or experiences high volume traffic.

These are the five must-have features your distributed cloud database should offer. There’s one more feature you should look for: data autonomy. You need to be able to keep your data portable and avoid cloud lock-in.

DataStax Enterprise is the only geo-distributed data layer that provides data portability across public clouds, letting you take advantage of each cloud and giving you the agility to efficiently switch cloud providers when market changes require it. It also makes GDPR compliance much easier.

Powering Growth and Innovation in a Hybrid Cloud World (eBook)


Creating Value from Big Data, Right Now

About AMD: Advanced Micro Devices, Inc. (AMD) is an American multinational semiconductor company based in Santa Clara, California, that develops computer processors and related technologies for business and consumer markets.

The world is undergoing unprecedented change driven by big data and advances in communications technology.

The advent of big data has revolutionized analytics and data science by allowing enterprises to store, access, and analyze massive amounts of data of almost any type from any source. Simultaneously, billions of people (and things) came online, connecting to the internet and to each other, generating enormous amounts of new data in the process.

New generations of data-processing architectures took on the big data challenge by creating methods to store and access all this data in continuously growing data lakes. But the real challenge, of course, is to generate value from this data; to turn data into insight and insight into action. This, by itself, is a significant challenge, but the increasingly always-on, connected nature of society, with expectations to connect anywhere from any device with a seamless customer experience has given rise to a new class of systems.

DataStax Enterprise (DSE) implements a multi-workload, low-latency database at cloud-scale, enabling outstanding customer engagement and real-time personalization in order to create an unsurpassed customer experience from anywhere at any time.

The revolutionary AMD EPYC™ processor is perfectly suited to power this system. The AMD EPYC SoC brings a new balance to the data center. Utilizing an x86-architecture, EPYC brings together high core counts, large memory capacity, ample memory bandwidth, and massive I/O with the right ratios to help performance reach new heights.

DSE can handle transactional, search, and operational analytics workloads all running on the same cluster and possesses built-in workload isolation and replication capabilities that keep these workloads from competing for resources. EPYC’s innovative architecture is incredibly flexible, allowing you to match core count to the application need without compromising processor features. This balanced set of resources means more freedom to right-size the server configuration to the workload.

The revolutionary EPYC processor has gained significant momentum in the industry this year. It is truly exciting to see its adoption by major server vendors and cloud service providers. EPYC is now designed-in to over 50 server platforms, including those offered by HP, Dell EMC, Super Micro and Cisco.

Partnerships are critical to bringing the potential of EPYC for anyone who wants to leverage its unique blend of performance and features, and AMD is proud to partner with DataStax to create jointly validated and engineered solutions to meet today’s challenges head-on. For more information, check out the AMD and DataStax joint solution brief.

We are all here at Strata this week in New York. We invite you to stop by the AMD booth and come hear Jonathan Ellis, CTO of DataStax, on Thursday, September 13th at 3:30 pm as he discusses the strengths, weaknesses, and tradeoffs between open source Cassandra and cloud-hosted NoSQL offerings.

Incremental Repair Improvements in Cassandra 4

In our previous post, “Should you use incremental repair?”, we recommended using subrange full repairs instead of incremental repair, as CASSANDRA-9143 could generate severe instabilities on a running cluster. As the 4.0 release approaches, let’s see how incremental repair was modified for the next major version of Apache Cassandra in order to become reliable in production.

Incremental Repair in Pre-4.0 Clusters

Since Apache Cassandra 2.1, incremental repair was performed as follows:

  • The repair coordinator asks all replicas to build Merkle trees using only SSTables with a RepairedAt value of 0 (meaning they haven’t been part of a repair yet).
    Merkle trees are hash trees of the data they represent; they don’t store the original data.
  • Mismatching leaves of the Merkle trees get streamed between replicas.
  • When all streaming is done, anticompaction is performed on all SSTables that were part of the repair session.

But during the whole process, SSTables could still get compacted away as part of standard automatic compactions. If that happened, the SSTable would not get anticompacted, and the data it contained would not be marked as repaired. In the diagram below, SSTable 1 is compacted with 3, 4 and 5, creating SSTable 6 during the streaming phase. This happens before anticompaction is able to split apart the repaired and unrepaired data:

SSTable 1 gets compacted away before anticompaction could kick in
SSTable 1 gets compacted away before anticompaction could kick in.

If this happens on a single node, the next incremental repair run would find differences, as the previously repaired data would be skipped on all replicas but one, which could potentially lead to a lot of overstreaming. This happens because Merkle trees only contain hashes of data, and in Cassandra the height of the tree is bounded to prevent over-allocation of memory. The more data we use to build the tree, the larger it needs to be; limiting its height means the hashes in the leaves are responsible for bigger ranges of data.
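A quick illustration of why a height-bounded tree overstreams: with a fixed depth, each leaf hash covers total_rows / 2**depth rows, so one mismatching leaf forces streaming of that whole slice. The depth and row counts below are arbitrary examples, not Cassandra's actual defaults.

```python
# Rows covered by each leaf of a perfect binary Merkle tree of a given
# depth; a tree of depth d has 2**d leaves.
def rows_per_leaf(total_rows, depth):
    return total_rows / 2 ** depth

print(rows_per_leaf(1_000_000, 15))    # ~30.5 rows behind each leaf hash
print(rows_per_leaf(100_000_000, 15))  # ~3051.8 rows: one mismatch streams far more
```

Growing the data 100x without growing the tree makes every mismatching leaf 100x more expensive to reconcile.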

Already repaired data in SSTable 6 will be part of the Merkle tree computation
Already repaired data in SSTable 6 will be part of the Merkle tree computation.

If you wonder what troubles can be generated by this bug, I invite you to read my previous blog post on this topic.

Incremental repair in 4.0, the theory

The incremental repair process is now supervised by a transaction to guarantee its consistency. In the “Prepare phase”, anticompaction is performed before the Merkle trees are computed, and the candidate SSTables are marked as pending a specific repair. Note that they are not marked as repaired just yet, to avoid inconsistencies in case the repair session fails.

If a candidate SSTable is currently part of a running compaction, Cassandra will try to cancel that compaction and wait up to a minute. If the compaction successfully stops within that time, the SSTable will be locked for future anticompaction, otherwise the whole prepare phase and the repair session will fail.

Incremental repair in 4.0

SSTables marked as pending repair are only eligible to be compacted with other SSTables marked as pending.
SSTables in the pending repair pool are the only ones participating in both Merkle tree computations and streaming operations:

Incremental repair in 4.0

During repair, the pool of unrepaired SSTables receives newly flushed ones, and compaction takes place as usual within it. SSTables that are being streamed in become part of the “pending repair” pool. This prevents two potential problems:

  • If the streamed SSTables were put in the unrepaired pool, they could get compacted away as part of normal compaction tasks and would never be marked as repaired.
  • If the streamed SSTables were put in the repaired pool and the repair session failed, we would have data marked as repaired on some nodes but not others, which would generate overstreaming during the next repair.

Once the repair succeeds, the coordinator sends a request to all replicas to mark the SSTables in pending state as repaired, by setting the RepairedAt timestamp (since anticompaction already took place, Cassandra just needs to set this timestamp).

Incremental repair in 4.0

If some nodes failed during the repair, the “pending repair” SSTables will be released and become eligible for compaction (and repair) again. They will not be marked as repaired:

Incremental repair in 4.0
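The two possible outcomes of the session can be sketched together (illustrative Python, not Cassandra internals): on success, pending SSTables simply get their repairedAt timestamp set, since anticompaction already happened during the prepare phase; on failure, they are released back to the unrepaired pool:

```python
import time

# Sketch of session finalization. Success promotes pending SSTables by
# setting repairedAt (a metadata update, no data rewrite); failure just
# clears the pending marker so they become eligible for compaction again.
def finalize(sstables, session_id, succeeded):
    for s in sstables:
        if s["pending_repair"] != session_id:
            continue  # belongs to another session (or no session); untouched
        s["pending_repair"] = None
        if succeeded:
            s["repaired_at"] = int(time.time() * 1000)  # millis, like RepairedAt

ok = [{"pending_repair": "sess-1", "repaired_at": 0}]
finalize(ok, "sess-1", succeeded=True)
print(ok[0]["repaired_at"] > 0)   # promoted to the repaired pool

failed = [{"pending_repair": "sess-2", "repaired_at": 0}]
finalize(failed, "sess-2", succeeded=False)
print(failed[0])                   # pending cleared, still unrepaired
```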

The practice

Let’s see how this whole process takes place by running a repair and observing the behavior of Cassandra.

To that end, I created a 5-node CCM cluster running locally on my laptop and used tlp-stress to load some data with a replication factor of 2:

bin/tlp-stress run BasicTimeSeries -i 1M -p 1M -t 2 --rate 5000  --replication "{'class':'SimpleStrategy', 'replication_factor':2}"  --compaction "{'class': 'SizeTieredCompactionStrategy'}"  --host

One node was then stopped, and I deleted all the SSTables from the tlp_stress.sensor_data table:

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns    Host ID                               Rack
UN  247,07 KiB  1            ?       dbccdd3e-f74a-4b7f-8cea-e8770bf995db  rack1
UN  44,08 MiB  1            ?       3ce4cca5-da75-4ede-94b7-a37e01d2c725  rack1
UN  44,07 MiB  1            ?       3b9fd30d-80c2-4fa6-b324-eaecc4f9564c  rack1
UN  43,98 MiB  1            ?       f34af1cb-4862-45e5-95cd-c36404142b9c  rack1
UN  44,05 MiB  1            ?       a5add584-2e00-4adb-8949-716b7ef35925  rack1

I ran a major compaction on all nodes to easily observe the anticompactions. On node2, we then have a single SSTable on disk:

sensor_data-f4b94700ad1d11e8981cd5d05c109484 adejanovski$ ls -lrt *Data*
-rw-r--r--  1 adejanovski  staff  41110587 31 aoû 15:09 na-4-big-Data.db

The sstablemetadata tool gives us interesting information about this file:

sstablemetadata na-4-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-4-big
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Bloom Filter FP chance: 0.01
Minimum timestamp: 1535720482962762 (08/31/2018 15:01:22)
Maximum timestamp: 1535720601312716 (08/31/2018 15:03:21)
SSTable min local deletion time: 2147483647 (no tombstones)
SSTable max local deletion time: 2147483647 (no tombstones)
Compression ratio: 0.8694195642299255
TTL min: 0
TTL max: 0
First token: -9223352583900436183 (001.0.1824322)
Last token: 9223317557999414559 (001.1.2601952)
minClusteringValues: [3ca8ce0d-ad1e-11e8-80a6-91cbb8e39b05]
maxClusteringValues: [f61aabc1-ad1d-11e8-80a6-91cbb8e39b05]
Estimated droppable tombstones: 0.0
SSTable Level: 0
Repaired at: 0
Pending repair: --
Replay positions covered: {CommitLogPosition(segmentId=1535719935055, position=7307)=CommitLogPosition(segmentId=1535719935056, position=20131708)}
totalColumnsSet: 231168
totalRows: 231168
Estimated tombstone drop times: 
   Drop Time | Count  (%)  Histogram 
   50th      0 
   75th      0 
   95th      0 
   98th      0 
   99th      0 
   Min       0 
   Max       0 
Partition Size: 
   Size (bytes) | Count  (%)  Histogram 
   179 (179 B)  | 56330 ( 24) OOOOOOOOOOOOOOOOOOo
   215 (215 B)  | 78726 ( 34) OOOOOOOOOOOOOOOOOOOOOOOOOO.
   310 (310 B)  |   158 (  0) 
   372 (372 B)  |  1166 (  0) .
   446 (446 B)  |  1691 (  0) .
   535 (535 B)  |   225 (  0) 
   642 (642 B)  |    23 (  0) 
   770 (770 B)  |     1 (  0) 
   50th      215 (215 B)
   75th      258 (258 B)
   95th      258 (258 B)
   98th      258 (258 B)
   99th      372 (372 B)
   Min       150 (150 B)
   Max       770 (770 B)
Column Count: 
   Columns | Count   (%)  Histogram 
   2       |   3230 (  1) .
   3       |     34 (  0) 
   50th      1
   75th      1
   95th      1
   98th      1
   99th      2
   Min       0
   Max       3
Estimated cardinality: 222877
EncodingStats minTTL: 0
EncodingStats minLocalDeletionTime: 1442880000 (09/22/2015 02:00:00)
EncodingStats minTimestamp: 1535720482962762 (08/31/2018 15:01:22)
KeyType: org.apache.cassandra.db.marshal.UTF8Type
ClusteringTypes: [org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimeUUIDType)]
RegularColumns: data:org.apache.cassandra.db.marshal.UTF8Type

It is worth noting the cool improvements sstablemetadata has gone through in 4.0, especially regarding the histograms rendering. So far, and as expected, our SSTable is not repaired and it is not pending a running repair.

Once the repair starts, the coordinator node executes the Prepare phase and anticompaction is performed:

sensor_data-f4b94700ad1d11e8981cd5d05c109484 adejanovski$ ls -lrt *Data*
-rw-r--r--  1 adejanovski  staff  20939890 31 aoû 15:41 na-6-big-Data.db
-rw-r--r--  1 adejanovski  staff  20863325 31 aoû 15:41 na-7-big-Data.db

SSTable na-6-big is marked as pending our repair:

sstablemetadata na-6-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-6-big
Repaired at: 0
Pending repair: 8e584410-ad23-11e8-ba2c-0feeb881768f
Replay positions covered: {CommitLogPosition(segmentId=1535719935055, position=7307)=CommitLogPosition(segmentId=1535719935056, position=21103491)}

na-7-big remains in the “unrepaired” pool (it contains tokens that are not being repaired in this session):

sstablemetadata na-7-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-7-big
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Bloom Filter FP chance: 0.01
Repaired at: 0
Pending repair: --

Once the repair finishes, another look at sstablemetadata on na-6-big shows us that it is now marked as repaired:

sstablemetadata na-6-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-6-big
Estimated droppable tombstones: 0.0
SSTable Level: 0
Repaired at: 1535722885852 (08/31/2018 15:41:25)
Pending repair: --

Again, I really appreciate not having to compute the repair date by myself thanks to an sstablemetadata output that is a lot more readable than it was before.

Reliable incremental repair

While Apache Cassandra 4.0 is being stabilized and there are still a few bugs to hunt down, incremental repair finally received the fix it deserved to make it production ready for all situations. The transaction that encloses the whole operation will shield Cassandra from inconsistencies and overstreaming, making cyclic repairs a fast and safe operation. Orchestration is still needed, though, as SSTables cannot be part of two distinct repair sessions running at the same time, and it is advised to use a topology-aware tool to perform the operation without hurdles.
It is worth noting that full repair in 4.0 doesn’t involve anticompaction anymore and does not mark SSTables as repaired. This brings full repair back to its 2.1 behavior and allows running it on several nodes at the same time without fearing conflicts between validation compactions and anticompactions.