Connect to ScyllaDB Clusters using the DBeaver Universal Database Manager

Learn how to use DBeaver as a simple alternative to cqlsh – so you can access your ScyllaDB clusters through a GUI with syntax highlighting

DBeaver is a universal database manager for working with SQL and NoSQL. It provides a central GUI for connecting to and administering a wide variety of popular databases (Postgres, MySQL, Redis, and more). You can use it to toggle between the various databases in your tech stack, view and evaluate multiple database schemas, and visualize your data.

And now you can use it with ScyllaDB, the monstrously fast NoSQL database. This provides a simple alternative to cqlsh – enabling you to access your ScyllaDB clusters through a GUI with syntax highlighting.

DBeaver Enterprise has released version 23.3, introducing support for ScyllaDB. This update lets you seamlessly connect to ScyllaDB clusters using DBeaver. Both self-hosted and ScyllaDB Cloud instances work well with DBeaver. You can install DBeaver on Windows, Linux, and MacOS.

In this post, I will show you how easy it is to connect DBeaver and ScyllaDB and run some basic queries.

Connect to ScyllaDB in DBeaver

Since ScyllaDB is compatible with Apache Cassandra, you can leverage DBeaver’s Cassandra driver to interact with ScyllaDB.
To create a new ScyllaDB connection in DBeaver:

  1. Download DBeaver Enterprise at https://dbeaver.com/download/enterprise/
  2. Click the “New database connection” button and search for “ScyllaDB.”
  3. Click “Next”.
  4. Enter the database connection details as follows:
  5. Click “Test Connection” and “Finish.”
  6. Inspect your tables.

  7. Run your CQL queries.

Query examples

Once the connection is set up, you can run all your CQL queries and see the result table right below your query. For example:
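(The original post illustrated this with a screenshot of the result grid.) As a stand-in, here is the kind of simple query you might run from the DBeaver SQL editor, assuming a hypothetical demo keyspace with a users table:

-- Hypothetical keyspace and table, shown for illustration only
SELECT user_id, name, email
FROM demo.users
LIMIT 10;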

Aside from simple SELECT queries, you can also run queries to create new objects in the database, like a materialized view:
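(Again, the original showed a screenshot.) Continuing with the hypothetical demo.users table from above, and assuming user_id is its primary key, an illustrative materialized view keyed by email could look like:

CREATE MATERIALIZED VIEW IF NOT EXISTS demo.users_by_email AS
    SELECT user_id, name, email
    FROM demo.users
    WHERE email IS NOT NULL AND user_id IS NOT NULL
    PRIMARY KEY (email, user_id);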

Wrapping up

DBeaver is a great database management tool that allows easy access to your ScyllaDB clusters. It provides a simple alternative to cqlsh with a nice user interface and syntax highlighting. Get started by downloading DBeaver and creating a new ScyllaDB Cloud cluster.

Any questions? You can discuss this post and share your thoughts in our community forum.

Worldwide Local Latency With ScyllaDB: The ZeroFlucs Strategy

How data is replicated to support low latency for ZeroFlucs’ global usage patterns – without racking up unnecessary costs

ZeroFlucs’ business – processing sports betting data – is rather latency sensitive. Content must be processed in near real-time, constantly, and in a region local to both the customer and the data. And there are incredibly high throughput and concurrency requirements – events can update dozens of times per minute, and each one of those updates triggers tens of thousands of new simulations (they process ~250,000 in-game events per second).

At ScyllaDB Summit 23, ZeroFlucs’ Director of Software Engineering Carly Christensen walked attendees through how ZeroFlucs uses ScyllaDB to provide optimized data storage local to the customer – including how their recently open-sourced package (cleverly named Charybdis) facilitates this. This blog post, based on that talk, shares their brilliant approach to figuring out exactly how data should be replicated to support low latency for their global usage patterns without racking up unnecessary storage costs.

Join us at ScyllaDB Summit 24 to hear more firsthand accounts of how teams are tackling their toughest database challenges. Disney, Discord, Expedia, Fanatics, Paramount, and more are all on the agenda.

Update: ScyllaDB Summit 2024 is now a wrap!

Access ScyllaDB Summit On Demand

What’s ZeroFlucs?

First, a little background on the business challenges that the ZeroFlucs technology is supporting. ZeroFlucs’ same-game pricing lets sports enthusiasts bet on multiple correlated outcomes within a single game. This is leagues beyond traditional bets on what team will win a game and by what spread. Here, customers are encouraged to design and test sophisticated game theories involving interconnected outcomes within the game. As a result, placing a bet is complex, and there’s a lot more at stake as the live event unfolds.

For example, assume there are three “markets” for bets:

  • Will Team A or Team B win?
  • Which player will score the first touchdown?
  • Will the combined scores of Team A and B be over or under 45.5 points?

Someone could place a bet on Team A to win, B. Bhooma to score the first touchdown, and for the total score to be under 45.5 points. If you look at those three outcomes and multiply the prices together, you get a price of around $28. But in this case, the correct price is approximately $14.50.

Carly explains why. “It’s because these are correlated outcomes. So, we need to use a simulation-based approach to more effectively model the relationships between those outcomes. If a team wins, it’s much more likely that they will score the first touchdown or any other touchdown in that match. So, we run simulations, and each simulation models a game end-to-end, play-by-play. We run tens of thousands of these simulations to ensure that we cover as much of the probability space as possible.”

The ZeroFlucs Architecture

The ZeroFlucs platform was designed from the ground up to be cloud native. Their software stack runs on Kubernetes, using Oracle Container Engine for Kubernetes. There are 130+ microservices, growing every week. And a lot of their environment can be managed through custom resource definitions (CRDs) and operators. As Carly explains, “For example, if we want to add a new sport, we just define a new instance of that resource type and deploy that YAML file out to all of our clusters.” A few more tech details:

  • Services are primarily Golang
  • Python is used for modeling and simulation services
  • GRPC is used for internal communications
  • Kafka is used for “at least once” delivery of all incoming and outgoing updates
  • GraphQL is used for external-facing APIs

As the diagram above shows:

  • Multiple third-party sources send content feeds.
  • Those content items are combined into booking events, which are then used for model simulations.
  • The simulation results are used to generate hundreds to thousands of new markets (specific outcomes that can be bet on), which are then stored back on the original booking event.
  • Customers can interact directly with that booking event. Or, they can use the ZeroFlucs API to request prices for custom combinations of outcomes via the ZeroFlucs query engine. Those queries are answered with stored results from their simulations.

Any content update starts the entire process over again.

Keeping Pace with Live In-Play Events

ZeroFlucs’ ultimate goal is to process and simulate events fast enough to offer same-game prices for live in-play events. For example, they need to predict whether this play results in a touchdown and which player will score the next touchdown – and they must do it fast enough to provide the prices before the play is completed. There are two main challenges to accomplishing this:

  • High throughput and concurrency. Events can update dozens of times per minute, and each update triggers tens of thousands of new simulations (hundreds of megabytes of data). They’re currently processing about 250,000 in-game events per second.
  • Customers can be located anywhere in the world. That means ZeroFlucs must be able to place their services — and the associated data – near these customers. With each request passing through many microservices, even a small increase in latency between those services and the database can result in a major impact on the total end-to-end processing time.

Selecting a Database That’s Up to the Task

Carly and team initially explored whether three popular databases might meet their needs here.

  • MongoDB was familiar to many team members. However, they discovered that with a high number of concurrent queries, some queries took several seconds to complete.
  • Cassandra supported network-aware replication strategies, but its performance and resource usage fell short of their requirements.
  • CosmosDB addressed all their performance and regional distribution needs, but they couldn’t justify its high cost, and its Azure-only availability meant vendor lock-in and limited portability.

Then they thought about ScyllaDB, a database they had discovered while working on a different project. It didn’t make sense for the earlier use case, but it met this project’s requirements quite nicely. As Carly put it: “ScyllaDB supported the distributed architecture that we needed, so we could locate our data replicas near our services and our customers to ensure that they always had low latency. It also supported the high throughput and concurrency that we required. We haven’t yet found a situation that we couldn’t just scale through. ScyllaDB was also easy to adopt. Using ScyllaDB Operator, we didn’t need a lot of domain knowledge to get started.”

Inside their ScyllaDB Architecture

ZeroFlucs is currently using ScyllaDB hosted on Oracle Cloud Flex 4 VMs. These VMs allow them to change the CPU and memory allocation to those nodes if needed. It’s currently performing well, but the company’s throughput increases with every new customer. That’s why they appreciate being able to scale up and run on bare metal if needed in the future.

They’re already using ScyllaDB Operator to manage ScyllaDB, and they were reviewing their strategy around ScyllaDB Manager and ScyllaDB Monitoring at the time of the talk.

Ensuring Data is Local to Customers

To make the most of ScyllaDB, ZeroFlucs divided their data into three main categories:

  • Global data. This is slow-changing data used by all their customers. It’s replicated to each and every one of their regions.
  • Regional data. This is data that’s used by multiple customers in a single region (for example, a sports feed). If a customer in another region requires their data, they separately replicate it into that region.
  • Customer data. This is data that is specific to that customer, such as their booked events or their simulation results. Each customer has a home region where multiple replicas of their data are stored. ZeroFlucs also keeps additional copies of their data in other regions that they can use for disaster recovery purposes.

Carly shared an example: “Just to illustrate that idea, let’s say we have a customer in London. We will place a copy of our services (“a cell”) into that region. And all of that customer’s interactions will be contained in that region, ensuring that they always have low latency. We’ll place multiple replicas of their data in that region. And will also place additional replicas of their data in other regions. This becomes important later.”

Now assume there’s a customer in the Newport region. They would place a cell of their services there, and all of that customer’s interactions would be contained within the Newport region so they also have low latency.

Carly continues, “If the London data center becomes unavailable, we can redirect that customer’s requests to the Newport region. And although they would have increased latency on the first hop of those requests, the rest of the processing is still contained within one data center – so it would still be low latency.” With a complete outage for that customer averted, ZeroFlucs would then increase the number of replicas of their data in that region to restore data resiliency for them.

Between Scylla(DB) and Charybdis

ZeroFlucs separates data into services and keyspaces, with each service using at least one keyspace. Global data has just one keyspace, regional data has a keyspace per region, and customer data has a keyspace per customer. Some services can have more than one data type, and thus might have both a global keyspace as well as customer keyspaces.

They needed a simple way to manage the orchestration and updating of keyspaces across all their services. Enter Charybdis, the Golang ScyllaDB helper library that the ZeroFlucs team created and open sourced. Charybdis features a table manager that will automatically create keyspaces as well as add tables, columns, and indexes. It offers simplified functions for CRUD-style operations, and it supports LWT and TTL.
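For readers less familiar with those last two features, LWT (lightweight transactions) and TTL (time to live) are plain CQL capabilities that a helper library like Charybdis can expose; here is a quick, hedged illustration against a hypothetical table (not actual Charybdis or ZeroFlucs code):

-- Lightweight transaction: insert only if the row does not already exist
INSERT INTO customer_ks.simulation_results (event_id, outcome)
VALUES (uuid(), 'home_win')
IF NOT EXISTS;

-- Time to live: this row expires automatically after one hour
INSERT INTO customer_ks.simulation_results (event_id, outcome)
VALUES (uuid(), 'away_win')
USING TTL 3600;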

Note: For an in-depth look at the design decisions behind Charybdis, see this blog by ZeroFlucs Founder and CEO Steve Gray.

There’s also a topology Controller Service that’s responsible for managing the replication settings and keyspace information related to every service.

Upon startup, the service calls the topology controller and retrieves its replication settings. It then combines that data with its table definitions and uses it to maintain its keyspaces in ScyllaDB. The above image shows sample Charybdis-generated DDL statements that include a network topology strategy.
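That image isn’t reproduced here, but DDL of that general shape – a keyspace per customer using NetworkTopologyStrategy, with hypothetical names and replica counts rather than ZeroFlucs’ actual values – would look something like:

-- Customer keyspace: multiple replicas in the home region, plus copies elsewhere for DR
CREATE KEYSPACE IF NOT EXISTS customer_1234
WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'LONDON': 3,
    'NEWPORT': 2
};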

Next on their Odyssey

Carly concluded: “We still have a lot to learn, and we’re really early in our journey. For example, our initial attempt at dynamic keyspace creation caused some timeouts between our services, especially if it was the first request for that instance of the service. And there are still many ScyllaDB settings that we have yet to explore. I’m sure that we’ll be able to increase our performance and get even more out of ScyllaDB in the future.”

Watch the Complete Tech Talk

You can watch Carly’s complete tech talk and skim through her deck in our tech talk library.

Watch the Full Tech Talk

Running Apache Cassandra® Single and Multi-Node Clusters on Docker with Docker Compose

Sometimes you might need to spin up a local test database quickly–a database that doesn’t need to last beyond a set time or number of uses. Or maybe you want to integrate Apache Cassandra® into an existing Docker setup.  

Either way, you’re going to want to run Cassandra on Docker, which means running it in a container with Docker as the container manager. This tutorial is here to guide you through running a single and multi-node setup of Apache Cassandra on Docker. 

Prerequisites 

Before getting started, you’ll need to have a few things already installed, and a few basic skills. These will make deploying and running your Cassandra database in Docker a seamless experience: 

  • Docker installed  
  • Basic knowledge of containers and Docker (see the Docker documentation for more insight) 
  • Basic command line knowledge 
  • A code editor (I use VSCode) 
  • CQL shell, aka cqlsh, installed (instructions for installing a standalone cqlsh without installing Cassandra can be found here) 

Method 1: Running a single Cassandra node using Docker CLI 

This method uses the Docker CLI to create a container based on the latest official Cassandra image. In this example we will: 

  • Set up the Docker container 
  • Test that it’s set up by connecting to it and running cqlsh 
  • Clean up the container once you’re done using it.

Setting up the container 

You can run Cassandra on your machine by opening up a terminal and using the following command in the Docker CLI: 

docker run --name my-cassandra-db -d cassandra:latest

Let’s look at what this command does: 

  • Docker uses the ‘run’ subcommand to run new containers.  
  • The ‘--name’ field allows us to name the container, which helps for later use and cleanup; we’ll use the name ‘my-cassandra-db’
  • The ‘-d’ flag tells Docker to run the container in the background, so we can run other commands or close the terminal without turning off the container.  
  • The final argument ‘cassandra:latest’ is the image to build the container from; we’re using the latest official Cassandra image 

When you run this, you should see an ID, like the screenshot below: 

To check and make sure everything is running smoothly, run the following command: 

docker ps -a 

You should see something like this: 

Connecting to the container 

Now that the database container has been created, you can connect to it using the following command:

docker exec -it my-cassandra-db cqlsh 

This will run cqlsh, or CQL Shell, inside your container, allowing you to make queries to your new Cassandra database. You should see a prompt like the following: 
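From that prompt you can run a few CQL statements as a quick smoke test; the keyspace and table names below are just examples:

-- A single node, so SimpleStrategy with a replication factor of 1 is enough
CREATE KEYSPACE IF NOT EXISTS test_ks
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS test_ks.messages (
    id   uuid PRIMARY KEY,
    body text
);

INSERT INTO test_ks.messages (id, body) VALUES (uuid(), 'hello from Docker');

SELECT * FROM test_ks.messages;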

Cleaning up the container 

Once you’re done, you can clean up the container with the ‘docker rm’ command. First, you’ll need to stop the container though, so you need to run the following two commands:

docker stop my-cassandra-db 

docker rm my-cassandra-db 

This will delete the database container, including all data that was written to the database. You’ll see a prompt like the following, which, if it worked correctly, will show the ID of the container being stopped/removed: 

Method 2: Deploying a three-node Apache Cassandra cluster using Docker Compose 

This method allows you to have multiple nodes running on a single machine. But in which situations would you want to use this method? Some examples include testing the consistency level of your queries, your replication setup, and more.

Writing a docker-compose.yml 

The first step is creating a docker-compose.yml file that describes our Cassandra cluster. In your code editor, create a docker-compose.yml file and enter the following into it: 

version: '3.8'

networks:
  cassandra:

services:
  cassandra1:
    image: cassandra:latest
    container_name: cassandra1
    hostname: cassandra1
    networks:
      - cassandra
    ports:
      - "9042:9042"
    environment: &environment
        CASSANDRA_SEEDS: "cassandra1,cassandra2"
        CASSANDRA_CLUSTER_NAME: MyTestCluster
        CASSANDRA_DC: DC1
        CASSANDRA_RACK: RACK1
        CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch
        CASSANDRA_NUM_TOKENS: 128

  cassandra2:
    image: cassandra:latest
    container_name: cassandra2
    hostname: cassandra2
    networks:
      - cassandra
    ports:
      - "9043:9042"
    environment: *environment
    depends_on:
      cassandra1:
        condition: service_started

  cassandra3:
    image: cassandra:latest
    container_name: cassandra3
    hostname: cassandra3
    networks:
      - cassandra
    ports:
      - "9044:9042"
    environment: *environment
    depends_on:
      cassandra2:
        condition: service_started

So what does this all mean? Let’s examine it part-by-part:  

First, we declare our Docker Compose version.

version: '3.8'

Then, we declare a network called cassandra to host our cluster.

networks:
  cassandra:

Under services, cassandra1 is started. (NOTE: the service_started conditions in cassandra2 and cassandra3’s depends_on attributes prevent them from starting until cassandra1 and cassandra2, respectively, have started.) We also set up port forwarding here so that our local port 9042 maps to the container’s port 9042, and we add the service to the cassandra network we established:

services:
  cassandra1:
    image: cassandra:latest
    container_name: cassandra1
    hostname: cassandra1
    networks:
      - cassandra
    ports:
      - "9042:9042"
    environment: &environment
        CASSANDRA_SEEDS: "cassandra1,cassandra2"
        CASSANDRA_CLUSTER_NAME: MyTestCluster
        CASSANDRA_DC: DC1
        CASSANDRA_RACK: RACK1
        CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch
        CASSANDRA_NUM_TOKENS: 128

Finally, we set some environment variables needed for startup, such as declaring CASSANDRA_SEEDS to be cassandra1 and cassandra2. 

The configurations for containers ‘cassandra2’ and ‘cassandra3’ are very similar; the only real differences are the names.

  • Both use the same cassandra:latest image, set container names, add themselves to the Cassandra network, and expose their 9042 port.  
  • They also point to the same environment variables as cassandra1 with the *environment syntax. 
    • Their only difference? cassandra2 waits on cassandra1, and cassandra3 waits on cassandra2. 

Here is the code section that this maps to: 

  cassandra2:
    image: cassandra:latest
    container_name: cassandra2
    hostname: cassandra2
    networks:
      - cassandra
    ports:
      - "9043:9042"
    environment: *environment
    depends_on:
      cassandra1:
        condition: service_started

  cassandra3:
    image: cassandra:latest
    container_name: cassandra3
    hostname: cassandra3
    networks:
      - cassandra
    ports:
      - "9044:9042"
    environment: *environment
    depends_on:
      cassandra2:
        condition: service_started

Deploying your Cassandra cluster and running commands 

To deploy your Cassandra cluster, use the Docker CLI in the same folder as your docker-compose.yml to run the following command (the -d causes the containers to run in the background): 

docker compose up -d

Quite a few things should happen in your terminal when you run the command, but when the dust has settled you should see something like this: 

If you run the ‘docker ps -a’ command, you should see three running containers: 

To access your Cassandra cluster, you can use cqlsh to connect to the containerized database using the following command: 

sudo docker exec -it cassandra1 cqlsh

You can also check the cluster configuration using: 

docker exec -it cassandra1 nodetool status

Which will get you something like this: 

And the node info with: 

docker exec -it cassandra1 nodetool info

From which you’ll see something similar to the following: 

You can also run these commands on the cassandra2 and cassandra3 containers. 
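As mentioned earlier, a common reason to run a multi-node cluster locally is to experiment with replication and consistency levels. Here is a small illustrative session from the cqlsh prompt on cassandra1 (names are examples; CONSISTENCY is a cqlsh command rather than CQL proper, and DC1 matches the CASSANDRA_DC value from the compose file):

-- Replicate to all three nodes in the DC1 data center
CREATE KEYSPACE IF NOT EXISTS multi_ks
WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 3};

CREATE TABLE IF NOT EXISTS multi_ks.events (
    id      uuid PRIMARY KEY,
    payload text
);

-- Require a majority of replicas (2 of 3) to acknowledge reads and writes
CONSISTENCY QUORUM

INSERT INTO multi_ks.events (id, payload) VALUES (uuid(), 'replicated write');

SELECT * FROM multi_ks.events;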

Cleaning up 

Once you’re done with the database cluster, you can take it down and remove it with the following command: 

docker compose down

This will stop and destroy all three containers, outputting something like this:

Now that we’ve covered two ways to run Cassandra in Docker, let’s look at a few things to keep in mind when you’re using it. 

Important things to know about running Cassandra in Docker 

Data Permanence 

Unless you declare volumes on the host machine that map to container volumes, the data you write to your Cassandra database will be erased when the container is destroyed. (You can read more about using Docker volumes here.)

Performance and Resources 

Apache Cassandra can take a lot of resources, especially when a cluster is deployed on a single machine. This can affect the performance of queries, and you’ll need a decent amount of CPU and RAM to run a cluster locally. 

Conclusion 

There are several ways to run Apache Cassandra on Docker, and we hope this post has illuminated a few ways to do so. If you’re interested in learning more about Cassandra, you can find out more about how data modelling works with Cassandra, or how PostgreSQL and Cassandra differ. 

Ready to spin up some Cassandra clusters yourself? Give it a go with a free trial on the Instaclustr Managed Platform for Apache Cassandra® today!

The post Running Apache Cassandra® Single and Multi-Node Clusters on Docker with Docker Compose appeared first on Instaclustr.

The World’s Largest Apache Kafka® and Apache Cassandra® Migration?

Here at Instaclustr by NetApp, we pride ourselves on our ability to migrate customers from self-managed or other managed providers with minimal risk and zero downtime, no matter how complex the scenario. At any point in time, we typically have 5-10 cluster migrations in progress. Planning and executing these migrations with our customers is a core part of the expertise of our Technical Operations team. 

Recently, we completed the largest new customer onboarding migration exercise in our history and it’s quite possibly the largest Apache Cassandra and Apache Kafka migration exercise ever completed by anyone. While we can’t tell you who the customer is, in this blog we will walk through the overall process and provide details of our approach. This will give you an idea of the lengths we go to when onboarding customers, and perhaps help you pick up some hints for your own migration exercises. 

Firstly, some stats to give you a sense of the scale of the exercise: 

  • Apache Cassandra: 
    • 58 clusters
    • 1,079 nodes
    • 17 node sizes (ranging from r6g.medium to im4gn.4xlarge)
    • 2 cloud providers (AWS and GCP)
    • 6 cloud provider regions
  • Apache Kafka 
    • 154 clusters
    • 1,050 nodes
    • 21 node sizes (ranging from r6g.large to im4gn.4xlarge and r6gd.4xlarge)
    • 2 cloud providers (AWS and GCP)
    • 6 cloud provider regions

From the size of the environment, you can get a sense that the customer involved is a pretty large and mature organisation. Interestingly, this customer had been an Instaclustr support customer for a number of years. Based on that support experience, they decided to trust us with taking on full management of their clusters to help reduce costs and improve reliability. 

Clearly, completing this number of migrations required a big effort both from Instaclustr and our customer. The timeline for the project looked something like: 

  • July 2022: contract signed and project kicked off 
  • July 2022 – March 2023: customer compliance review, POCs and feature enhancement development 
  • February 2023 – October 2023: production migrations 

Project Management and Governance 

A key to the success of any large project like this is strong project management and governance. Instaclustr has a well-established customer project management methodology that we apply to projects: 

Source: Instaclustr 

In line with this methodology, we staffed several key roles to support this project: 

  • Overall program manager 
  • Cassandra migration project manager 
  • Cassandra technical lead 
  • Kafka migration project manager 
  • Kafka technical lead 
  • Key Customer Product Manager 

The team worked directly with our customer counterparts and established several communication mechanisms that were vital to the success of the project. 

Architectural and Security Compliance 

While high-level compliance with the customer’s security and architecture requirements had been established during the pre-contract phase, the first phase of post-contract work was a more detailed solution review with the customer’s compliance and architectural teams.  

To facilitate this requirement, Instaclustr staff met regularly with the customer’s security team to understand their requirements and explain Instaclustr’s existing controls that met these needs. 

As expected, Instaclustr’s existing SOC2 and PCI certified controls meant that a very high percentage of the customer’s requirements were met right out of the box. This included controls such as intrusion detection, access logging and operating system hardening.  

However, as is common in mature environments with well-established requirements, a few gaps were identified and Instaclustr agreed to take these on as system enhancements. Some examples of the enhancements we delivered prior to commencing production migrations include: 

  • Extending the existing system to export logs to a customer-owned location to include audit logs 
  • The ability to opt-in at an account level for all newly created clusters to be automatically configured with log shipping 
  • Allowing the process that loads custom Kafka Connect connectors to use instance roles rather than access keys for s3 access 
  • Enhancements to our SCIM API for provisioning SSO access 

In addition to establishing security compliance, we used this period to further validate architectural fit and identified some enhancements that would help to ensure an optimal fit for the migrated clusters. Two key enhancements were delivered to meet this goal: 

  • Support for Kafka clusters running in two Availability Zones with RF2 
  • This is necessary as the customer has a fairly unique architecture that delivers HA above the Kafka cluster level 
  • Enabling multiple new AWS and GCP node types to optimize infrastructure spend 

Apache Kafka Migration 

Often when migrating Apache Kafka, the simplest approach is what we call Drain Out.   

In this approach, Kafka consumers are pointed at both the source and destination clusters; the producers are then switched to send messages to just the destination cluster. Once all messages are read from the source cluster, the consumers there can be switched off and the migration is complete. 

However, while this is the simplest approach from a Kafka point of view, it does not allow you to preserve message ordering through the cutover. This can be important in many use cases, and was certainly important for this customer. 

When the Drain Out approach is not suitable, using MirrorMaker2 can also be an option; we have deployed it on many occasions for other migrations. In this particular case, however, the level of consumer/producer application dependency for this approach ruled out using MirrorMaker2.  

This left us with the Shared Cluster approach, where we operate the source and destination clusters as a single cluster for a period before decommissioning the source. 

The high-level steps we followed for this shared cluster migration approach are: 

1. Provision destination Instaclustr managed cluster, shut down and wipe all data 

2. Update configurations on the destination cluster to match source cluster as required 

3. Join network environments with the source cluster (VPC peering, etc) 

4. Start up destination Apache ZooKeeper™ in observer mode, and start up destination Kafka brokers 

5. Use Kafka partition reassignment to move data:

a. Increase replication factor and replicate across destination as well as source brokers 

b. Swap preferred leaders to destination brokers 

c. Decrease replication factor to remove replicas from source brokers 

6. Reconfigure clients to use destination brokers as initial contact points 

7. Remove old brokers 

For each cluster, a detailed change plan was created by Instaclustr to cover all of the high-level steps listed above and rollback if any issues arose.  

A couple of other specific requirements from this environment added extra complexity worth mentioning: 

  • The source environment shared a single ZooKeeper instance across multiple clusters. This is not a configuration that we support and the customer agreed that it was a legacy configuration that they would rather leave behind. To accommodate the migration from this shared ZooKeeper, we had to develop functionality for custom configuration of ZooKeeper node names in our managed clusters as well as build a tool to “clean” the destination ZooKeeper of data related to other clusters after migration (for security and ongoing supportability). 
  • The existing clusters had port listener mappings that did not align with the mappings supported by our management system, and reconfiguring these prior to migration would have added extensive work on the customer side. We therefore extended our custom configuration to allow more extensive custom configuration of listeners. Like other custom configuration we support, this is stored in our central configuration database so it survives node replacements and is automatically added to new nodes in a cluster. 

Apache Cassandra Migration 

We have been doing zero downtime migrations of Apache Cassandra since 2014. All of them basically follow the “add a datacenter to an existing cluster” process that we outlined in a 2016 blog. 
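That process isn’t repeated here, but at its heart it relies on standard Cassandra mechanics: extend each keyspace’s replication to the new data center, stream the data across with nodetool rebuild, repoint clients, and then retire the old data center. A heavily simplified CQL sketch with hypothetical keyspace and data center names (the real runbooks involve many more steps and checks):

-- Add replicas in the destination data center alongside the source
ALTER KEYSPACE my_app
WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'source_dc': 3,
    'destination_dc': 3
};

-- After nodetool rebuild completes and clients have been repointed,
-- drop the source data center from the replication settings
ALTER KEYSPACE my_app
WITH replication = {'class': 'NetworkTopologyStrategy', 'destination_dc': 3};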

One key enhancement that we’ve made since that blog – and utilized during this most recent migration – is the introduction of the Instaclustr Minotaur consistent rebuild tool (available on GitHub here).  

If the source cluster is missing replicas of some data prior to starting the rebuild, the standard Cassandra data center rebuild process can try to copy more than one replica from the same source node. This results in even fewer replicas of data on the destination cluster.  

This can mean that in the standard case of replication factor 3 and consistency level quorum queries, you can go from having 2 replicas and data being consistently returned on the source cluster to only 1 replica (or even 0 replicas) and data being intermittently missed on the destination cluster.  

The “textbook” Cassandra approach to address this is to run Cassandra repairs after the rebuild, which will ensure all expected replicas are in sync. However, we are frequently asked to migrate clusters that have not been repaired for a long time, and that can make running repairs a very tricky operation.  

Instaclustr Minotaur addresses these issues: using the Minotaur tool, we can guarantee that the destination cluster has at least as many replicas as the source cluster. Running repairs to get the cluster back into a fully healthy state can then be left until the cluster is fully migrated, and our Tech Ops team can hand-hold the process. 

This approach was employed across all Cassandra migrations for this customer and proved particularly important for certain clusters with high levels of inconsistency pre-migration; one particularly tricky cluster even took two and a half months to fully repair post-migration!  

Another noteworthy challenge from this migration was a set of clusters where tables were dropped every 2 to 3 hours.  

This is a common design pattern for temporary data in Cassandra as it allows the data to be quickly and completely removed when it is no longer required (rather than a standard delete creating “tombstones” or virtual delete records). The downside is that the streaming of data to new nodes fails if a schema change occurs during a streaming operation and can’t be restarted.  

Through the migration process, we managed to work around this with manual coordination of pausing the table drop operation on the customer side while each node rebuild was occurring. However, it quickly became apparent that this would be too cumbersome to sustain through ongoing operations.  

To remedy this, we held a joint brainstorming meeting with the customer to work through the issue and potential solutions. The end result was a design for automation on the customer side to pause the dropping of tables whenever it was detected that a node in the cluster was not fully available. Instaclustr’s provisioning API provided node status information that could be used to facilitate this automation.  

Conclusion 

This was a mammoth effort that not only relied on Instaclustr’s accumulated expertise from many years of running Cassandra and Kafka, but also our strong focus on working as part of one team with our customers.  

The following feedback we received from the customer project manager is exactly the type of reaction we aim for with every customer interaction: 

“We’ve hit our goal ahead of schedule and could not have done it without the work from everyone on the Instaclustr side and [customer team]. It was a pleasure working with all parties involved!  

“The migration went smoothly with minimal disruption and some lessons learned. I’m looking forward to working with the Instaclustr team as we start to normalize with the new environment and build new processes with your teams to leverage your expertise.  

“Considering the size, scope, timeline and amount of data transferred, this was the largest migration I’ve ever worked on and I couldn’t have asked for better partners on both sides.” 

Interested in doing a migration yourself? Reach out to our team of engineers and we’ll get started on the best plan of action for your use case! 

The post The World’s Largest Apache Kafka® and Apache Cassandra® Migration? appeared first on Instaclustr.

Instaclustr Product Update: December 2023

There have been a lot of exciting things happening at Instaclustr! Here’s a roundup of the updates to our platform over the last few months. 

As always, if you have a specific feature request or improvement you’d love to see, please get in contact with us. 

Major Announcements 

Cadence®: 

  • Instaclustr Managed Platform adds HTTP API for Cadence®

The Instaclustr Managed Platform now provides an HTTP API to allow interaction with Cadence® workflows from virtually any language. For further information or a technical briefing, contact an Instaclustr Customer Success representative or sales@instaclustr.com 

  • Cadence® on the Instaclustr Managed Platform Achieves PCI Certification 

We are pleased to announce that Cadence® 1.0 on the Instaclustr Managed Platform is now PCI Certified on AWS and GCP, demonstrating our company’s commitment to stringent data security practices and architecture. For further information or a technical briefing, contact an Instaclustr Customer Success representative or sales@instaclustr.com 

Apache Cassandra®: 

  • Debezium Connector Cassandra is Released for General Availability  

Instaclustr’s managed Debezium connector for Apache Cassandra makes it easy to export a stream of data changes from an Instaclustr Managed Cassandra cluster to Instaclustr Managed Kafka cluster. Try creating a Cassandra cluster with Debezium Connector on our console. 

Ocean for Apache Spark™:  

  • Interactive data engineering at scale: Jupyter Notebooks, embedded in the Ocean for Apache Spark UI, are now generally available in Ocean Spark. Our customers’ administrators no longer need to install components, since Ocean Spark now hosts the Jupyter infrastructure. Users no longer need to switch applications, as they can have Ocean for Apache Spark process massive amounts of data in the cloud while viewing live analytic results on their screen.

Other Significant Changes 

Apache Cassandra®: 

  • Apache Cassandra® Version 3.11.16 was released as generally available on Instaclustr Platform 
  • A new zero-downtime restore feature has been released for Apache Cassandra®, eliminating downtime when an in-place restore operation is performed on a Cassandra cluster 
  • Added support for customer-initiated resize on GCP and Azure, allowing customers to vertically scale their Cassandra cluster and select any larger production disk or instance size when scaling up; for instance size, customers can also scale down through the Instaclustr Console or API. 

Apache Kafka®: 

  • Support for Apache ZooKeeper™ version 3.8.2 released in general availability 
  • Altered the maximum heap size allocated to Instaclustr for Kafka and ZooKeeper on smaller node sizes to improve application stability. 

Cadence®: 

  • Cadence 1.2.2 is now in general availability 

Instaclustr Managed Platform: 

  • Added support for new AWS regions: UAE, Zurich, Jakarta, Melbourne, and Osaka 

OpenSearch®: 

  • OpenSearch 1.3.11 and OpenSearch 2.9.0 are now in general availability on the Instaclustr Platform 
  • Dedicated Ingest Nodes for OpenSearch® have been released as a public preview

PostgreSQL®: 

  • PostgreSQL 15.4, 14.9, 13.12, and 16 are now in general availability 

Ocean for Apache Spark™: 

  • New memory metrics charts in our UI give our users the self-help they need to write better big data analytical applications. Our UI-experience-tracking software has shown that this new feature has already helped some customers to find the root cause of failed applications. 
  • Autotuning memory down for cost savings. Customers can use rules to mitigate out of memory errors. 
  • Cost metrics improved in both API and UI: 
    • API: users can now make REST calls to retrieve Ocean Spark usage charges one month at a time 
    • UI: metrics that estimate cloud provider costs have been enhanced to account for differences among on-demand instances, spot instances, and reserved instances.  

Upcoming Releases 

Apache Cassandra®: 

  • AWS PrivateLink with Instaclustr for Apache Cassandra® will soon be released to general availability. The AWS PrivateLink offering provides our AWS customers with a simpler and more secure option for network connectivity.​​ Log into the Instaclustr Console with just one click to trial the AWS PrivateLink public preview release with your Cassandra clusters today. Alternatively, the AWS PrivateLink public preview for Cassandra is available at the Instaclustr API. 
  • AWS 7-series nodes for Apache Cassandra® will soon be generally available on Instaclustr’s Managed Platform. The AWS 7-series nodes introduce new hardware, with Arm-based AWS Graviton3 processors and Double Data Rate 5 (DDR5) memory, providing customers with leading technology that Arm-based AWS Graviton3 processors are known for.  

Apache Kafka®: 

  • Support for custom Subject Alternative Names (SAN) on AWS​ will be added early next year, making it easier for our customers to use internal DNS to connect to their managed Kafka clusters without needing to use IP addresses. 
  • Kafka 3.6.x will be introduced soon in general availability. This will be the first Managed Kafka release on our platform to have KRaft available in GA, entitling our customers to full coverage under our extensive SLAs. 

Cadence®: 

  • User authentication through open source is under development to enhance security of our Cadence architecture. This also helps our customers more easily pass security compliance checks of applications running on Instaclustr Managed Cadence. 

Instaclustr Managed Platform: 

  • Instaclustr Managed Services will soon be available to purchase through the Azure Marketplace. This will allow you to allocate your Instaclustr spend towards any Microsoft commit you might have. The integration will first be rolled out to RIIA clusters, then a fully RIYOA setup. Both self-serve and private offers will be supported. 
  • The upcoming release of Signup with Microsoft integration marks another step in enhancing our platform’s accessibility. Users will have the option to sign up and sign in using their Microsoft Account credentials. This addition complements our existing support for Google and GitHub SSO, as well as the ability to create an account using email, ensuring a variety of convenient access methods to suit different user preferences.  

OpenSearch®: 

  • Release of searchable snapshots​​ feature is coming soon as GA. This feature will make it possible to search indexes that are stored as snapshots within remote repositories (i.e., S3) without the need to download all the index data to disk ahead of time, allowing customers to save time and leverage cheaper storage options. 
  • Continuing on from the public preview release of dedicated ingest nodes for Managed OpenSearch, we’re now working on making it generally available within the coming months. 

PostgreSQL®: 

  • Release of PostgreSQL–Azure NetApp (ANF) Files Fast Forking feature will be available soon. By leveraging the ANF filesystem, customers can quickly and easily create an exact copy of their PostgreSQL cluster on additional infrastructure. 
  • Release of pgvector extension is expected soon. This extension lets you use your PostgreSQL server for vector embeddings to take advantage of the latest in Large Language Models (LLMs). 

Ocean for Apache Spark™: 

  • We will soon be announcing SOC 2 certification by an external auditor since we have implemented new controls and processes to better protect data. 
  • We are currently developing new log collection techniques and UI charts to accommodate longer running applications since our customers are running more streaming workloads on our platform. 

Did you know? 

It has become more common for customers to create many accounts, each managing smaller groups of clusters. The account search functionality helps answer several questions: where the cluster is across all accounts, where the account is, and what it owns. 

Combining the real-time data streaming capabilities of Apache Kafka with the powerful distributed computing capabilities of Apache Spark can simplify your end-to-end big data processing and Machine Learning pipelines. Explore a real-world example in the first part of a blog series on Real Time Machine Learning using NetApp’s Ocean for Apache Spark and Instaclustr for Apache Kafka 

Apache Kafka cluster visualization—AKHQ—is an open source GUI for Apache Kafka that helps with managing topics, consumer groups, Schema Registry, Kafka® Connect, and more. In this blog we detail the steps needed to get AKHQ, an open source project licensed under Apache 2, working with Instaclustr for Apache Kafka. This is the first in a series of planned blogs where we’ll take you through similar steps for other popular open source Kafka user interfaces and how to use them with your beloved Instaclustr for Apache Kafka. 

The post Instaclustr Product Update: December 2023 appeared first on Instaclustr.

Debezium® Change Data Capture for Apache Cassandra®

The Debezium® Change Data Capture (CDC) source connector for Apache Cassandra® version 4.0.11 is now generally available on the Instaclustr Managed Platform. 

The Debezium CDC source connector will stream your Cassandra data (via Apache Kafka®) to a centralized enterprise data warehouse or any other downstream data consumer. Now business information in your Cassandra database is streamed much more simply than before!  

Instaclustr has been supporting the Debezium CDC Cassandra source connector through a private offering since 2020. More recently, our Development and Open Source teams have been developing the CDC feature for use by all Instaclustr Cassandra customers. 

Instaclustr’s support for the Debezium Cassandra source connector was released to Public Preview earlier this year for our customers to trial in a test environment. Now with the general availability release, our customers can get a fully supported Cassandra CDC feature already integrated and tested on the Instaclustr platform, rather than performing the tricky integration themselves.

The Debezium CDC source connector is a really exciting feature to add to our product set, enabling our customers to transform Cassandra database records into streaming events and easily pipe the events to downstream consumers.  

“Our team has been actively developing support for Debezium CDC Cassandra source connector both on our managed platform and also through open source contributions by NetApp to the Debezium Cassandra connector project. We’re looking forward to seeing more of our Cassandra customers using CDC to enhance their data infrastructure operations.”—Jaime Borucinski, Apache Cassandra Product Manager

Instaclustr’s industry-leading SLAs and support offered for the CDC feature provide our customers with confidence to rely on this solution for production. This means that Instaclustr’s managed Debezium source connector for Cassandra is a good fit for your most demanding production data workloads, increasing the value for our customers with integrated solutions across our product set. Our support document, Creating a Cassandra Cluster With Debezium Connector, provides step-by-step instructions to create an Instaclustr for Cassandra CDC solution for your business.  

How Does Change Data Capture Operate with Cassandra?  

Change Data Capture is a native Cassandra setting that can be enabled on a table at creation, or with an alter table command on an existing table. Once enabled, it will create logs that capture and track cluster data that has changed because of inserts, updates, or deletes. Once initial installation and setup has been completed, the CDC process will take these row-level changes and convert your database records to a streaming event for downstream data integration consumers. When running, the Debezium Cassandra source connector will: 

  1. Read Cassandra commit log files in the cdc_raw directory for inserts, updates, and deletes recorded in the node’s commit log. 
  2. Create a change event for every row-level insert, update, and delete.  
  3. For each table, publish change events in a separate Kafka topic. In practice this means that each CDC-enabled table in your Cassandra cluster will have its own Kafka topic.
  4. Delete the commit log from the cdc_raw directory. 
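As noted above, CDC itself is just a per-table Cassandra setting; enabling it in CQL looks like this (keyspace and table names are illustrative):

-- Enable CDC when creating a table
CREATE TABLE IF NOT EXISTS shop.orders (
    order_id uuid PRIMARY KEY,
    total    decimal
) WITH cdc = true;

-- Or enable it on an existing table
ALTER TABLE shop.orders WITH cdc = true;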

Apache Cassandra and Debezium Open Source Developments 

Support for CDC was introduced in Cassandra version 3.11 and has continued to mature through version 4.0. There have been notable open source contributions that improve CDC functionality, including splitting support for Cassandra version 3.11 and Cassandra 4.0 into separate modules with shared common logic (known as the core module), authored by NetApp employee Stefan Miklosovic and committed by Gunnar Morling. The need for this was identified during development of Cassandra version 4.0 CDC support.

As support for version 4.0 was added, certain components of Cassandra version 3.11 CDC would break. The introduction of the core module now allows Debezium features and fixes to be pinned to a specific Cassandra version when released, without breaking Debezium support for other Cassandra versions. This will be particularly valuable for future development of Debezium support for Cassandra version 4.1 and each newer Cassandra version. 

Stefan has continued to enhance Cassandra 4.0 support for Debezium with additional contributions like changing the CQL schema of a node without interrupting streaming events to Kafka. Previously, propagating changes in the CQL schema back to the Debezium source connector required a restart of the Debezium connector. Stefan created a CQL Java Driver schema listener, hooked it to a running node, and as soon as somebody adds a column or a table or similar to their Cassandra cluster these changes can now be detected in Debezium streamed events with no Debezium restarts! 

Another notable improvement for Cassandra CDC in Cassandra version 4.0 is the way the Debezium source connector reads the commit log. Cassandra version 3.11 CDC would buffer the CDC change events and only publish change events in batch cycles creating a processing delay. Cassandra version 4.0 CDC now continuously processes the commit logs as new data is available, achieving near real-time publishing. 

If you already have Cassandra clusters that you want to stream into an enterprise data store, get started today using the Debezium source connector for Cassandra on Instaclustr’s Managed Platform.  

Contact our Support team to learn more about how Instaclustr’s managed Debezium connector for Cassandra can unlock the value of your new or existing Cassandra data stores.  

The post Debezium® Change Data Capture for Apache Cassandra® appeared first on Instaclustr.

Medusa 0.16 was released

The k8ssandra team is happy to announce the release of Medusa for Apache Cassandra™ v0.16. This is a special release as we did a major overhaul of the storage classes implementation. We now have less code (and less dependencies) while providing much faster and resilient storage communications.

Back to ~basics~ official SDKs

Medusa has been an open source project for about 4 years now, and a private one for a few more. Over such a long time, other software it depends upon (or doesn’t) evolves as well. More specifically, the SDKs of the major cloud providers evolved a lot. We decided to check if we could replace a lot of custom code doing asynchronous parallelisation and calls to the cloud storage CLI utilities with the official SDKs.

Our storage classes so far relied on two different ways of interacting with the object storage backends:

  • Apache Libcloud, which provided a Python API for abstracting ourselves from the different protocols. It was convenient and fast for uploading a lot of small files, but very inefficient for large transfers.
  • Specific cloud vendors CLIs, which were much more efficient with large file transfers, but invoked through subprocesses. This created an overhead that made them inefficient for small file transfers. Relying on subprocesses also created a much more brittle implementation which led the community to create a lot of issues we’ve been struggling to fix.

To cut a long story short, we did it!

  • We started by looking at S3, where we went for the official boto3. As it turns out, boto3 does all the chunking, throttling, retries and parallelisation for us. Yay!
  • Next we looked at GCP. Here we went with TalkIQ’s gcloud-aio-storage. It works very well for everything, including the large files. The only thing missing is the throughput throttling.
  • Finally, we used Azure’s official SDK to cover Azure compatibility. Sadly, this one also lacks throughput throttling for now.

Right after finishing these replacements, we spotted the following improvements:

  • The integration tests duration against the storage backends dropped from ~45 min to ~15 min.
    • This means Medusa became far more efficient.
    • There is now much less time spent managing storage interaction thanks to it being asynchronous to the core.
  • The Medusa uncompressed image size we bundle into k8ssandra dropped from ~2GB to ~600MB and its build time went from 2 hours to about 15 minutes.
    • Aside from giving us much faster feedback loops when working on k8ssandra, this should help k8ssandra itself move a little bit faster.
  • The file transfers are now much faster.
    • We observed up to several hundreds of MB/s per node when moving data from a VM to blob storage within the same provider. The available network speed is the limit now.
    • We are also aware that consuming the whole network throughput is not great. That’s why we now have proper throttling for S3 and are working on a solution for this in other backends too.

The only compromise we had to make was to drop Python 3.6 support. This is because the asyncio features we rely on only arrived in Python 3.7.

The other good stuff

Even though we are the happiest about the storage backends, there are a number of changes that should not go without mention:

  • We fixed a bug with hierarchical storage containers in Azure. This flavor of blob storage works more like a regular file system, meaning it has a concept of directories. None of the other backends do this (including the vanilla Azure ones), and Medusa was not dealing gracefully with this.
  • We are now able to build Medusa images for multiple architectures, including the arm64 one.
  • Medusa can now purge backups of nodes that have been decommissioned, meaning they are no longer present in the most recent backups. Use the new medusa purge-decommissioned command to trigger such a purge.

Upgrade now

We encourage all Medusa users to upgrade to version 0.16 to benefit from all these storage improvements, making it much faster and more reliable.

Medusa v0.16 is the default version in the newly released k8ssandra-operator v1.9.0, and it can be used with previous releases by setting the .spec.medusa.containerImage.tag field in your K8ssandraCluster manifests.

Building a 100% ScyllaDB Shard-Aware Application Using Rust

I wrote a web transcript of the talk I gave with my colleagues Joseph and Yassir at [Scylla Su...

Learning Rust the hard way for a production Kafka+ScyllaDB pipeline

This is the web version of the talk I gave at [Scylla Summit 2022](https://www.scyllad...

Reaper 3.0 for Apache Cassandra was released

The K8ssandra team is pleased to announce the release of Reaper 3.0. Let’s dive into the main new features and improvements this major version brings, along with some notable removals.

Storage backends

Over the years, we regularly discussed dropping support for Postgres and H2 with the TLP team. The effort for maintaining these storage backends was moderate, despite our lack of expertise in Postgres, as long as Reaper’s architecture was simple. Complexity grew with more deployment options, culminating with the addition of the sidecar mode.
Some features require different consensus strategies depending on the backend, which sometimes led to implementations that worked well with one backend and were buggy with others.
In order to allow building new features faster, while providing a consistent experience for all users, we decided to drop the Postgres and H2 backends in 3.0.

Apache Cassandra and the managed DataStax Astra DB service are now the only production storage backends for Reaper. The free tier of Astra DB will be more than sufficient for most deployments.

Reaper does not generally require high availability - even complete data loss has mild consequences. Where Astra is not an option, a single Cassandra server can be started on the instance that hosts Reaper, or an existing cluster can be used as a backend data store.

Adaptive Repairs and Schedules

One of the pain points we observed when people start using Reaper is understanding the segment orchestration and knowing how the default timeout impacts the execution of repairs.
Repair is a complex choreography of operations in a distributed system. As such, and especially in the days when Reaper was created, the process could get blocked for several reasons and required a manual restart. The smart folks that designed Reaper at Spotify decided to put a timeout on segments to deal with such blockage, over which they would be terminated and rescheduled.
Problems arise when segments are too big (or have too much entropy) to process within the default 30 minute timeout, despite not being blocked. They are repeatedly terminated and recreated, and the repair appears to make no progress.
Reaper did a poor job of dealing with this, for two main reasons:

  • Each retry used the same timeout, so oversized segments could fail forever
  • Nothing obvious was reported to explain what was failing or how to fix the situation

We fixed the former by using a longer timeout on subsequent retries, which is a simple trick to make repairs more “adaptive”. If the segments are too big, they’ll eventually pass after a few retries. It’s a good first step to improve the experience, but it’s not enough for scheduled repairs as they could end up with the same repeated failures for each run.
This is where we introduce adaptive schedules, which use feedback from past repair runs to adjust either the number of segments or the timeout for the next repair run.

Adaptive Schedules

Adaptive schedules will be updated at the end of each repair if the run metrics justify it. The schedule can get a different number of segments or a higher segment timeout depending on the latest run.
The rules are the following:

  • if more than 20% of segments were extended, the number of segments will be raised by 20% on the schedule
  • if less than 20% of segments were extended (and at least one), the timeout will be set to twice the current timeout
  • if no segment was extended and the maximum duration of segments is below 5 minutes, the number of segments will be reduced by 10% with a minimum of 16 segments per node.

This feature is disabled by default and is configurable on a per schedule basis. The timeout can now be set differently for each schedule, from the UI or the REST API, instead of having to change the Reaper config file and restart the process.

Incremental Repair Triggers

As we celebrate the long awaited improvements in incremental repairs brought by Cassandra 4.0, it was time to embrace them with more appropriate triggers. One metric that incremental repair makes available is the percentage of repaired data per table. When running against too much unrepaired data, incremental repair can put a lot of pressure on a cluster due to the heavy anticompaction process.

The best practice is to run it on a regular basis so that the amount of unrepaired data is kept low. Since your throughput may vary from one table/keyspace to the other, it can be challenging to set the right interval for your incremental repair schedules.

Reaper 3.0 introduces a new trigger for the incremental schedules, which is a threshold of unrepaired data. This allows creating schedules that will start a new run as soon as, for example, 10% of the data for at least one table from the keyspace is unrepaired.

Those triggers are complementary to the interval in days, which could still be necessary for low traffic keyspaces that need to be repaired to secure tombstones.

Percent unrepaired triggers

These new features allow you to safely optimize tombstone deletions by enabling the only_purge_repaired_tombstones compaction subproperty in Cassandra, which permits reducing gc_grace_seconds down to 3 hours without fear that deleted data will reappear.
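
As a sketch of what that looks like in CQL (the keyspace, table, and compaction class below are placeholders; keep the table's existing compaction class and simply add the subproperty, and note that 3 hours is 10800 seconds):

cqlsh -e "ALTER TABLE my_keyspace.my_table
  WITH compaction = {'class': 'SizeTieredCompactionStrategy',
                     'only_purge_repaired_tombstones': 'true'}
  AND gc_grace_seconds = 10800;"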

Schedules can be edited

That may sound like an obvious feature but previous versions of Reaper didn’t allow for editing of an existing schedule. This led to an annoying procedure where you had to delete the schedule (which isn’t made easy by Reaper either) and recreate it with the new settings.

3.0 fixes that embarrassing situation and adds an edit button to schedules, which allows you to change a schedule's mutable settings:

Edit Repair Schedule

More improvements

In order to protect clusters from running mixed incremental and full repairs in older versions of Cassandra, Reaper would disallow the creation of an incremental repair run/schedule if a full repair had been created on the same set of tables in the past (and vice versa).

Now that incremental repair is safe for production use, it is necessary to allow such mixed repair types. In case of conflict, Reaper 3.0 will display a pop-up informing you and allowing you to force create the schedule/run:

Force bypass schedule conflict

We’ve also added a special “schema migration mode” for Reaper, which will exit after the schema was created/upgraded. We use this mode in K8ssandra to prevent schema conflicts and allow the schema creation to be executed in an init container that won’t be subject to liveness probes that could trigger the premature termination of the Reaper pod:

java -jar path/to/reaper.jar schema-migration path/to/cassandra-reaper.yaml

There are many other improvements and we invite all users to check the changelog in the GitHub repo.

Upgrade Now

We encourage all Reaper users to upgrade to 3.0.0, and we recommend that users on Postgres/H2 carefully prepare their migration. Note that there is no export/import feature and schedules will need to be recreated after the migration.

All instructions to download, install, configure, and use Reaper 3.0.0 are available on the Reaper website.

Certificates management and Cassandra Pt II - cert-manager and Kubernetes

The joys of certificate management

Certificate management has long been a bugbear of enterprise environments, and expired certs have been the cause of countless outages. When managing large numbers of services at scale, it helps to have an automated approach to managing certs in order to handle renewal and avoid embarrassing and avoidable downtime.

This is part II of our exploration of certificates and encrypting Cassandra. In this blog post, we will dive into certificate management in Kubernetes. This post builds on a few of the concepts in Part I of this series, where Anthony explained the components of SSL encryption.

Recent years have seen the rise of some fantastic, free, automation-first services like letsencrypt, and no one should be caught flat footed by certificate renewals in 2021. In this blog post, we will look at one Kubernetes native tool that aims to make this process much more ergonomic on Kubernetes: cert-manager.

Recap

Anthony has already discussed several points about certificates. To recap:

  1. In asymmetric encryption and digital signing processes we always have public/private key pairs. We are referring to these as the Keystore Private Signing Key (KS PSK) and Keystore Public Certificate (KS PC).
  2. Public keys can always be openly published and allow senders to communicate to the holder of the matching private key.
  3. A certificate is just a public key - and some additional fields - which has been signed by a certificate authority (CA). A CA is a party trusted by all parties to an encrypted conversation.
  4. When a CA signs a certificate, this is a way for that mutually trusted party to attest that the party holding that certificate is who they say they are.
  5. CAs themselves use public certificates (Certificate Authority Public Certificate; CA PC) and private signing keys (the Certificate Authority Private Signing Key; CA PSK) to sign certificates in a verifiable way.

The many certificates that Cassandra might be using

In a moderately complex Cassandra configuration, we might have:

  1. A root CA (cert A) for internode encryption.
  2. A certificate per node signed by cert A.
  3. A root CA (cert B) for the client-server encryption.
  4. A certificate per node signed by cert B.
  5. A certificate per client signed by cert B.

Even in a three node cluster, we can envisage a case where we must create two root CAs and 6 certificates, plus a certificate for each client application; for a total of 8+ certificates!

To compound the problem, this isn’t a one-off setup. Instead, we need to be able to rotate these certificates at regular intervals as they expire.

Ergonomic certificate management on Kubernetes with cert-manager

Thankfully, these processes are well supported on Kubernetes by a tool called cert-manager.

cert-manager is an all-in-one tool that should save you from ever having to reach for openssl or keytool again. As a Kubernetes operator, it manages a variety of custom resources (CRs) such as (Cluster)Issuers, CertificateRequests and Certificates. Critically it integrates with Automated Certificate Management Environment (ACME) Issuers, such as LetsEncrypt (which we will not be discussing today).

The workflow reduces to:

  1. Create an Issuer (via ACME, or a custom CA).
  2. Create a Certificate CR.
  3. Pick up your certificates and signing keys from the secrets cert-manager creates, and mount them as volumes in your pods’ containers.

Everything is managed declaratively, and you can reissue certificates at will simply by deleting and re-creating the certificates and secrets.

Or you can use the kubectl plugin, which allows you to run a simple kubectl cert-manager renew. We won't discuss this in depth here; see the cert-manager documentation for more information.
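
For example, re-issuing a single certificate on demand looks something like this (the certificate name and namespace are placeholders, and the plugin is installed separately from cert-manager itself):

kubectl cert-manager renew my-certificate -n cass-operator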

Java batteries included (mostly)

At this point, Cassandra users are probably about to interject with a loud “Yes, but I need keystores and truststores, so this solution only gets me halfway”. As luck would have it, from version 0.15, cert-manager also allows you to create JKS truststores and keystores directly from the Certificate CR.

The fine print

There are two caveats to be aware of here:

  1. Most Cassandra deployment options currently available (including statefulSets, cass-operator or k8ssandra) do not currently support using a cert-per-node configuration in a convenient fashion. This is because the PodTemplate.spec portions of these resources are identical for each pod in the StatefulSet. This precludes the possibility of adding per-node certs via environment or volume mounts.
  2. There are currently some open questions about how to rotate certificates without downtime when using internode encryption.
    • Our current recommendation is to use a CA PC per Cassandra datacenter (DC) and add some basic scripts to merge both CA PCs into a single truststore to be propagated across all nodes (see the sketch after this list). By renewing the CA PC independently you can ensure one DC is always online, but you still suffer a network partition. Hinted handoff should theoretically rescue the situation, but it is a less than robust solution, particularly on larger clusters. This solution is not recommended when using lightweight transactions or non-LOCAL consistency levels.
    • One mitigation to consider is using non-expiring CA PCs, in which case no CA PC rotation is ever performed without a manual trigger. KS PCs and KS PSKs may still be rotated. When CA PC rotation is essential this approach allows for careful planning ahead of time, but it is not always possible when using a 3rd party CA.
    • Istio or other service mesh approaches can fully automate mTLS in clusters, but Istio is a fairly large commitment and can create its own complexities.
    • Manual management of certificates may be possible using a secure vault (e.g. HashiCorp Vault), sealed secrets, or similar approaches. In this case, cert-manager may not be involved.
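
The truststore merge mentioned in the first point above is just a couple of keytool imports; a minimal sketch, with aliases, file names and the password as placeholders:

keytool -importcert -alias dc1-ca -file dc1-ca.crt -keystore merged-truststore.jks -storepass changeit -noprompt
keytool -importcert -alias dc2-ca -file dc2-ca.crt -keystore merged-truststore.jks -storepass changeit -noprompt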

These caveats are not trivial. To address (2) more elegantly you could also implement Anthony’s solution from part one of this blog series; but you’ll need to script this up yourself to suit your k8s environment.

We are also in discussions with the folks over at cert-manager about how their ecosystem can better support Cassandra. We hope to report progress on this front over the coming months.

These caveats present challenges, but there are also specific cases where they matter less.

cert-manager and Reaper - a match made in heaven

One case where we really don’t care if a client is unavailable for a short period is when Reaper is the client.

Cassandra is an eventually consistent system and suffers from entropy. Data on nodes can become out of sync with other nodes due to transient network failures, node restarts and the general wear and tear incurred by a server operating 24/7 for several years.

Cassandra contemplates that this may occur. It provides a variety of consistency level settings allowing you to control how many nodes must agree for a piece of data to be considered the truth. But even though properly set consistency levels ensure that the data returned will be accurate, the process of reconciling data across the network degrades read performance - it is best to have consistent data on hand when you go to read it.

As a result, we recommend the use of Reaper, which runs as a Cassandra client and automatically repairs the cluster in a slow trickle, ensuring that a high volume of repairs are not scheduled all at once (which would overwhelm the cluster and degrade the performance of real clients) while also making sure that all data is eventually repaired for when it is needed.

The set up

The manifests for this blog post can be found here.

Environment

We assume that you’re running Kubernetes 1.21, and we’ll be running with a Cassandra 3.11.10 install. The demo environment we’ll be setting up has three nodes, which is the configuration we have tested against.

We will be installing the cass-operator and Cassandra cluster into the cass-operator namespace, while the cert-manager operator will sit within the cert-manager namespace.

Setting up kind

For testing, we often use kind to provide a local k8s cluster. You can use minikube or whatever solution you prefer (including a real cluster running on GKE, EKS, or AKS), but we’ll include some kind instructions and scripts here to ease the way.

If you want a quick fix to get you started, try running the setup-kind-multicluster.sh script from the k8ssandra-operator repository, with setup-kind-multicluster.sh --kind-worker-nodes 3. I have included this script in the root of the code examples repo that accompanies this blog.

A demo CA certificate

We aren’t going to use LetsEncrypt for this demo, firstly because ACME certificate issuance has some complexities (including needing a DNS or a publicly hosted HTTP server) and secondly because I want to reinforce that cert-manager is useful to organisations who are bringing their own certs and don’t need one issued. This is especially useful for on-prem deployments.

First off, create a new private key and certificate pair for your root CA. Note that the file names tls.crt and tls.key will become important in a moment.

openssl genrsa -out manifests/demoCA/tls.key 4096
openssl req -new -x509 -key manifests/demoCA/tls.key -out manifests/demoCA/tls.crt -subj "/C=AU/ST=NSW/L=Sydney/O=Global Security/OU=IT Department/CN=example.com"

(Or you can just run the generate-certs.sh script in the manifests/demoCA directory - ensure you run it from the root of the project so that the secrets appear in ./manifests/demoCA/.)

When running this process on MacOS be aware of this issue which affects the creation of self signed certificates. The repo referenced in this blog post contains example certificates which you can use for demo purposes - but do not use these outside your local machine.

Now we’re going to use kustomize (which comes with kubectl) to add these files to Kubernetes as secrets. kustomize is not a templating language like Helm. But it fulfills a similar role by allowing you to build a set of base manifests that are then bundled, and which can be customised for your particular deployment scenario by patching.

Run kubectl apply -k manifests/demoCA. This will build the secrets resources using the kustomize secretGenerator and add them to Kubernetes. Breaking this process down piece by piece:

# ./manifests/demoCA
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: cass-operator
generatorOptions:
 disableNameSuffixHash: true
secretGenerator:
- name: demo-ca
  type: tls
  files:
  - tls.crt
  - tls.key
  • We use disableNameSuffixHash, because otherwise kustomize will add hashes to each of our secret names. This makes it harder to build these deployments one component at a time.
  • The tls type secret conventionally takes two keys with these names, as per the next point. cert-manager expects a secret in this format in order to create the Issuer which we will explain in the next step.
  • We are adding the files tls.crt and tls.key. The file names will become the keys of a secret called demo-ca.

cert-manager

cert-manager can be installed by running kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.5.3/cert-manager.yaml. It will install into the cert-manager namespace because a Kubernetes cluster should only ever have a single cert-manager operator installed.

cert-manager will install a deployment, as well as various custom resource definitions (CRDs) and webhooks to deal with the lifecycle of the Custom Resources (CRs).

A cert-manager Issuer

Issuers come in various forms. Today we’ll be using a CA Issuer because our components need to trust each other, but don’t need to be trusted by a web browser.

Other options include ACME based Issuers compatible with LetsEncrypt, but these would require that we have control of a public facing DNS or HTTP server, and that isn’t always the case for Cassandra, especially on-prem.

Dive into the truststore-keystore directory and you’ll find the Issuer. It is very simple, so we won’t dwell on it here. The only thing to note is that it takes a secret which must have the keys tls.crt and tls.key - these are the CA PC and CA PSK we mentioned earlier.

We’ll apply this manifest to the cluster in the next step.
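
For reference, a CA Issuer of this kind is only a handful of lines; a minimal sketch, assuming the demo-ca secret and the cass-operator namespace used throughout this walkthrough, looks like this:

apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
  name: ca-issuer
  namespace: cass-operator
spec:
  ca:
    secretName: demo-ca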

Some cert-manager certs

Let’s start with the Cassandra-Certificate.yaml resource:

spec:
  # Secret names are always required.
  secretName: cassandra-jks-keystore
  duration: 2160h # 90d
  renewBefore: 360h # 15d
  subject:
    organizations:
    - datastax
  dnsNames:
  - dc1.cass-operator.svc.cluster.local
  isCA: false
  usages:
    - server auth
    - client auth
  issuerRef:
    name: ca-issuer
    # We can reference ClusterIssuers by changing the kind here.
    # The default value is `Issuer` (i.e. a locally namespaced Issuer)
    kind: Issuer
    # This is optional since cert-manager will default to this value however
    # if you are using an external issuer, change this to that `Issuer` group.
    group: cert-manager.io
  keystores:
    jks:
      create: true
      passwordSecretRef: # Password used to encrypt the keystore
        key: keystore-pass
        name: jks-password
  privateKey:
    algorithm: RSA
    encoding: PKCS1
    size: 2048

The first part of the spec here tells us a few things:

  • The keystore, truststore and certificates will be fields within a secret called cassandra-jks-keystore. This secret will end up holding our KS PSK and KS PC.
  • It will be valid for 90 days.
  • 15 days before expiry, it will be renewed automatically by cert manager, which will contact the Issuer to do so.
  • It has a subject organisation. You can add any of the X509 subject fields here, but it needs to have one of them.
  • It has a DNS name - you could also provide a URI or IP address. In this case we have used the service address of the Cassandra datacenter which we are about to create via the operator. This has a format of <DC_NAME>.<NAMESPACE>.svc.cluster.local.
  • It is not a CA (isCA), and can be used for server auth or client auth (usages). You can tune these settings according to your needs. If you make your cert a CA you can even reference it in a new Issuer, and define cute tree like structures (if you’re into that).

Outside the certificates themselves, there are additional settings controlling how they are issued and what format this happens in.

  • IssuerRef is used to define the Issuer we want to issue the certificate. The Issuer will sign the certificate with its CA PSK.
  • We are specifying that we would like keystores created, via the keystores key, and that we’d like them in JKS format, via the jks key.
  • passwordSecretRef references a secret and a key within it. It will be used to provide the password for the keystore (the truststore is unencrypted as it contains only public certs and no signing keys).

The Reaper-Certificate.yaml is similar in structure, but has a different DNS name. We aren’t configuring Cassandra to verify that the DNS name on the certificate matches the DNS name of the parties in this particular case.

Apply all of the certs and the Issuer using kubectl apply -k manifests/truststore-keystore.

Cass-operator

Examining the cass-operator directory, we’ll see that there is a kustomization.yaml which references the remote cass-operator repository and a local cassandraDatacenter.yaml. This applies the manifests required to run up a cass-operator installation namespaced to the cass-operator namespace.

Note that this installation of the operator will only watch its own namespace for CassandraDatacenter CRs. So if you create a DC in a different namespace, nothing will happen.

We will apply these manifests in the next step.

CassandraDatacenter

Finally, the CassandraDatacenter resource in the ./cass-operator/ directory will describe the kind of DC we want:

apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
  name: dc1
spec:
  clusterName: cluster1
  serverType: cassandra
  serverVersion: 3.11.10
  managementApiAuth:
    insecure: {}
  size: 1
  podTemplateSpec:
    spec:
      containers:
        - name: "cassandra"
          volumeMounts:
          - name: certs
            mountPath: "/crypto"
      volumes:
      - name: certs
        secret:
          secretName: cassandra-jks-keystore
  storageConfig:
    cassandraDataVolumeClaimSpec:
      storageClassName: standard
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi
  config:
    cassandra-yaml:
      authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
      authorizer: org.apache.cassandra.auth.AllowAllAuthorizer
      role_manager: org.apache.cassandra.auth.CassandraRoleManager
      client_encryption_options:
        enabled: true
        # If enabled and optional is set to true encrypted and unencrypted connections are handled.
        optional: false
        keystore: /crypto/keystore.jks
        keystore_password: dc1
        require_client_auth: true
        # Set trustore and truststore_password if require_client_auth is true
        truststore: /crypto/truststore.jks
        truststore_password: dc1
        protocol: TLS
        # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA] # An earlier version of this manifest configured cipher suites but the proposed config was less secure. This section does not need to be modified.
      server_encryption_options:
        internode_encryption: all
        keystore: /crypto/keystore.jks
        keystore_password: dc1
        truststore: /crypto/truststore.jks
        truststore_password: dc1
    jvm-options:
      initial_heap_size: 800M
      max_heap_size: 800M
  • We provide a name for the DC - dc1.
  • We provide a name for the cluster - the DC would join other DCs if they already exist in the k8s cluster and we configured the additionalSeeds property.
  • We use the podTemplateSpec.volumes array to declare the volumes for the Cassandra pods, and we use the podTemplateSpec.containers.volumeMounts array to describe where and how to mount them.

The config.cassandra-yaml field is where most of the encryption configuration happens, and we are using it to enable both internode and client-server encryption, which both use the same keystore and truststore for simplicity. Remember that using internode encryption means your DC needs to go offline briefly for a full restart when the CA’s keys rotate.

  • We are not using authz/n in this case to keep things simple. Don’t do this in production.
  • For both encryption types we need to specify (1) the keystore location, (2) the truststore location and (3) the passwords for the keystores. The locations of the keystore/truststore come from where we mounted them in volumeMounts.
  • We are specifying JVM options just to make this run politely on a smaller machine. You would tune this for a production deployment.

Roll out the cass-operator and the CassandraDatacenter using kubectl apply -k manifests/cass-operator. Because the CRDs might take a moment to propagate, there is a chance you’ll see errors stating that the resource type does not exist. Just keep re-applying until everything works - this is a declarative system so applying the same manifests multiple times is an idempotent operation.

Reaper deployment

The k8ssandra project offers a Reaper operator, but for simplicity we are using a simple deployment (because not every deployment needs an operator). The deployment is standard Kubernetes fare, and if you want more information on how these work you should refer to the Kubernetes docs.

We are injecting the keystore and truststore passwords into the environment here, to avoid placing them in the manifests. cass-operator does not currently support this approach without an initContainer to pre-process the cassandra.yaml using envsubst or a similar tool.

The only other note is that we are also pulling down a Cassandra image and using it in an initContainer to create a keyspace for Reaper, if it does not exist. In this container, we are also adding a ~/.cassandra/cqlshrc file under the home directory. This provides SSL connectivity configurations for the container. The critical part of the cqlshrc file that we are adding is:

[ssl]
certfile = /crypto/ca.crt
validate = true
userkey = /crypto/tls.key
usercert = /crypto/tls.crt
version = TLSv1_2

The version = TLSv1_2 tripped me up a few times, as it seems to be a recent requirement. Failing to add this line will give you back the rather fierce Last error: [SSL] internal error in the initContainer. The commands run in this container are not ideal. In particular, the fact that we are sleeping for 840 seconds to wait for Cassandra to start is sloppy. In a real deployment we’d want to health check and wait until the Cassandra service became available.
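
A rough sketch of a readiness loop that could replace the fixed sleep is shown below (the service host name is a placeholder; substitute whatever address the init container already uses, and the cqlshrc above supplies the SSL settings):

until cqlsh --ssl -e 'DESCRIBE KEYSPACES' <cassandra_service> 9042 > /dev/null 2>&1; do
  echo "Waiting for Cassandra to become available..."
  sleep 10
done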

Apply the manifests using kubectl apply -k manifests/reaper.

Results

If you use a GUI, look at the logs for Reaper; you should see that it has connected to the cluster and printed some nice ASCII art to your console.

If you don’t use a GUI, you can run kubectl get pods -n cass-operator to find your Reaper pod (which we’ll call REAPER_PODNAME) and then run kubectl logs -n cass-operator REAPER_PODNAME to pull the logs.

Conclusion

While the above might seem like a complex procedure, we’ve just created a Cassandra cluster with both client-server and internode encryption enabled, all of the required certs, and a Reaper deployment which is configured to connect using the correct certs. Not bad.

Do keep in mind the weaknesses relating to key rotation, and watch this space for progress on that front.

Cassandra Certificate Management Part 1 - How to Rotate Keys Without Downtime

Welcome to this three part blog series where we dive into the management of certificates in an Apache Cassandra cluster. For this first post in the series we will focus on how to rotate keys in an Apache Cassandra cluster without downtime.

Usability and Security at Odds

If you have downloaded and installed a vanilla installation of Apache Cassandra, you may have noticed when it is first started all security is disabled. Your “Hello World” application works out of the box because the Cassandra project chose usability over security. This is deliberately done so everyone benefits from the usability, as security requirements for each deployment differ. Some deployments require multiple layers of security, while others require no security features to be enabled.

Security of a system is applied in layers. For example one layer is isolating the nodes in a cluster behind a proxy. Another layer is locking down OS permissions. Encrypting connections between nodes, and between nodes and the application is another layer that can be applied. If this is the only layer applied, it leaves other areas of a system insecure. When securing a Cassandra cluster, we recommend pursuing an informed approach which offers defence-in-depth. Consider additional aspects such as encryption at rest (e.g. disk encryption), authorization, authentication, network architecture, and hardware, host and OS security.

Encrypting connections between two hosts can be difficult to set up as it involves a number of tools and commands to generate the necessary assets for the first time. We covered this process in previous posts: Hardening Cassandra Step by Step - Part 1 Inter-Node Encryption and Hardening Cassandra Step by Step - Part 2 Hostname Verification for Internode Encryption. I recommend reading both posts before reading through the rest of the series, as we will build off concepts explained in them.

Here is a quick summary of the basic steps to create the assets necessary to encrypt connections between two hosts.

  1. Create the Root Certificate Authority (CA) key pair from a configuration file using openssl.
  2. Create a keystore for each host (node or client) using keytool.
  3. Export the Public Certificate from each host keystore as a “Signing Request” using keytool.
  4. Sign each Public Certificate “Signing Request” with our Root CA to generate a Signed Certificate using openssl.
  5. Import the Root CA Public Certificate and the Signed Certificate into each keystore using keytool.
  6. Create a common truststore and import the CA Public Certificate into it using keytool.
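
Steps 1 and 4 (the openssl commands) are shown in detail later in this post. For the keytool side, a rough sketch of steps 2, 3, 5 and 6 looks like the following, with aliases, file names, passwords and validity periods as placeholders:

# Step 2: create a keystore and key pair for a node
$ keytool -genkeypair -keyalg RSA -validity 365 -alias node1 \
      -keystore node1-keystore.jks -storepass node1_pass -keypass node1_pass \
      -dname "CN=node1, OU=SSLTestCluster, O=TLP, L=Sydney, ST=NSW, C=AU"

# Step 3: export the node's public certificate as a signing request
$ keytool -certreq -alias node1 -keystore node1-keystore.jks \
      -storepass node1_pass -file node1_ks_pc_sign_request

# Step 5: import the Root CA public certificate, then the signed certificate, into the keystore
$ keytool -importcert -alias CARoot -file ca_pc -keystore node1-keystore.jks \
      -storepass node1_pass -noprompt
$ keytool -importcert -alias node1 -file signed_node1_ks_pc -keystore node1-keystore.jks \
      -storepass node1_pass

# Step 6: create a common truststore and import the CA public certificate into it
$ keytool -importcert -alias CARoot -file ca_pc -keystore common-truststore.jks \
      -storepass truststore_pass -noprompt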

Security Requires Ongoing Maintenance

Setting up SSL encryption for the various connections to Cassandra is only half the story. Like all other software out in the wild, there is ongoing maintenance required to ensure the SSL encrypted connections continue to work.

At some point you will need to update the certificates and stores used to implement the SSL encrypted connections because they will expire. If the certificates for a node expire, it will be unable to communicate with other nodes in the cluster. This will lead to at least data inconsistencies or, in the worst case, unavailable data.

This point is specifically called out towards the end of the Inter-Node Encryption blog post. The note refers to steps 1, 2 and 4 in the above summary of commands to set up the certificates and stores. The validity periods are set for the certificates and stores in their respective steps.

One Certificate Authority to Rule Them All

Before we jump into how we handle expiring certificates and stores in a cluster, we first need to understand the role a certificate plays in securing a connection.

Certificates (and encryption) are often considered a hard topic. However, there are only a few concepts that you need to bear in mind when managing certificates.

Consider the case where two parties A and B wish to communicate with one another. Both parties distrust each other and each needs a way to prove that they are who they claim to be, as well as verify the other party is who they claim to be. To do this a mutually trusted third party needs to be brought in. In our case the trusted third party is the Certificate Authority (CA); often referred to as the Root CA.

The Root CA is effectively just a key pair; similar to an SSH key pair. The main difference is the public portion of the key pair has additional fields detailing who the public key belongs to. It has the following two components.

  • Certificate Authority Private Signing Key (CA PSK) - Private component of the CA key pair. Used to sign a keystore’s public certificate.
  • Certificate Authority Public Certificate (CA PC) - Public component of the CA key pair. Used to provide the issuer name when signing a keystore’s public certificate, as well as by a node to confirm that a third party public certificate (when presented) has been signed by the Root CA PSK.

When you run openssl to create your CA key pair using a certificate configuration file, this is the command that is run.

$ openssl req \
      -config path/to/ca_certificate.config \
      -new \
      -x509 \
      -keyout path/to/ca_psk \
      -out path/to/ca_pc \
      -days <valid_days>

In the above command the -keyout specifies the path to the CA PSK, and the -out specifies the path to the CA PC.

And in the Darkness Sign Them

In addition to a common Root CA key pair, each party has its own certificate key pair to uniquely identify it and to encrypt communications. In the Cassandra world, two components are used to store the information needed to perform the above verification check and communication encryption; the keystore and the truststore.

The keystore contains a key pair which is made up of the following two components.

  • Keystore Private Signing Key (KS PSK) - Hidden in keystore. Used to sign messages sent by the node, and decrypt messages received by the node.
  • Keystore Public Certificate (KS PC) - Exported for signing by the Root CA. Used by a third party to encrypt messages sent to the node that owns this keystore.

When created, the keystore will contain the PC, and the PSK. The PC signed by the Root CA, and the CA PC are added to the keystore in subsequent operations to complete the trust chain. The certificates are always public and are presented to other parties, while PSK always remains secret. In an asymmetric/public key encryption system, messages can be encrypted with the PC but can only be decrypted using the PSK. In this way, a node can initiate encrypted communications without needing to share a secret.

The truststore stores one or more CA PCs of the parties which the node has chosen to trust, since they are the source of trust for the cluster. If a party tries to communicate with the node, it will refer to its truststore to see if it can validate the attempted communication using a CA PC that it knows about.

For a node’s KS PC to be trusted and verified by another node using the CA PC in the truststore, the KS PC needs to be signed by the Root CA key pair. Furthermore, the same CA key pair is used to sign the KS PC of each party.

When you run openssl to sign an exported Keystore PC, this is the command that is run.

$ openssl x509 \
    -req \
    -CAkey path/to/ca_psk \
    -CA path/to/ca_pc \
    -in path/to/exported_ks_pc_sign_request \
    -out path/to/signed_ks_pc \
    -days <valid_days> \
    -CAcreateserial \
    -passin pass:<ca_psk_password>

In the above command both the Root CA PSK and CA PC are used via -CAkey and -CA respectively when signing the KS PC.

More Than One Way to Secure a Connection

Now that we have a deeper understanding of the assets that are used to encrypt communications, we can examine the various ways to implement it in an Apache Cassandra cluster. Regardless of the encryption approach, the objective when applying this type of security to a cluster is to ensure:

  • Hosts (nodes or clients) can determine whether they should trust other hosts in cluster.
  • Any intercepted communication between two hosts is indecipherable.

The three most common methods vary in both ease of deployment and resulting level of security. They are as follows.

The Cheats Way

The easiest and least secure method for rolling out SSL encryption can be done in the following way:

Generation

  • Single CA for the cluster.
  • Single truststore containing the CA PC.
  • Single keystore which has been signed by the CA.

Deployment

  • The same keystore and truststore are deployed to each node.

In this method a single Root CA and a single keystore is deployed to all nodes in the cluster. This means any node can decipher communications intended for any other node. If a bad actor gains control of a node in the cluster then they will be able to impersonate any other node. That is, compromise of one host will compromise all of them. Depending on your threat model, this approach can be better than no encryption at all. It will ensure that a bad actor with access to only the network will no longer be able to eavesdrop on traffic.

We would use this method as a stop gap to get internode encryption enabled in a cluster. The idea would be to quickly deploy internode encryption with the view of updating the deployment in the near future to be more secure.

Best Bang for Buck

Arguably the most popular and well documented method for rolling out SSL encryption is:

Generation

  • Single CA for the cluster.
  • Single truststore containing the CA PC.
  • Unique keystore for each node all of which have been signed by the CA.

Deployment

  • Each keystore is deployed to its associated node.
  • The same truststore is deployed to each node.

Similar to the previous method, this method uses a cluster wide CA. However, unlike the previous method each node will have its own keystore. Each keystore has its own certificate that is signed by a Root CA common to all nodes. The process to generate and deploy the keystores in this way is practiced widely and well documented.

We would use this method as it provides better security over the previous method. Each keystore can have its own password and host verification, which further enhances the security that can be applied.

Fort Knox

The method that offers the strongest security of the three can be rolled out in the following way:

Generation

  • Unique CA for each node.
  • A single truststore containing the Public Certificate for each of the CAs.
  • Unique keystore for each node that has been signed by the CA specific to the node.

Deployment

  • Each keystore with its unique CA PC is deployed to its associated node.
  • The same truststore is deployed to each node.

Unlike the other two methods, this one uses a Root CA per host and similar to the previous method, each node will have its own keystore. Each keystore has its own PC that is signed by a Root CA unique to the node. The Root CA PC of each node needs to be added to the truststore that is deployed to all nodes. For large cluster deployments this encryption configuration is cumbersome and will result in a large truststore being generated. Deployments of this encryption configuration are less common in the wild.

We would use this method as it provides all the advantages of the previous method and, in addition, provides the ability to isolate a node from the cluster. This can be done by simply rolling out a new truststore which excludes a specific node’s CA PC. In this way a compromised node could be isolated from the cluster by simply changing the truststore. Under the previous two approaches, isolation of a compromised node in this fashion would require a rollout of an entirely new Root CA and one or more new keystores. Furthermore, each new KS PC would need to be signed by the new Root CA.

WARNING: Ensure your Certificate Authority is secure!

Regardless of the deployment method chosen, the whole setup will depend on the security of the Root CA. Ideally both components should be secured, or at the very least the PSK needs to be secured properly after it is generated since all trust is based on it. If both components are compromised by a bad actor, then that actor can potentially impersonate another node in the cluster. The good news is, there are a variety of ways to secure the Root CA components, however that topic goes beyond the scope of this post.

The Need for Rotation

If we are following best practices when generating our CAs and keystores, they will have an expiry date. This is a good thing because it forces us to regenerate and roll out our new encryption assets (stores, certificates, passwords) to the cluster. By doing this we minimise the exposure that any one of the components has. For example, if a password for a keystore is unknowingly leaked, the password is only good up until the keystore expiry. Having a scheduled expiry reduces the chance of a security leak becoming a breach, and increases the difficulty for a bad actor to gain persistence in the system. In the worst case it limits the validity of compromised credentials.

Always Read the Expiry Label

The only catch to having an expiry date on our encryption assets is that we need to rotate (update) them before they expire. Otherwise, our data will be unavailable or may be inconsistent in our cluster for a period of time. Expired encryption assets, when forgotten, can be a silent, sinister problem. If, for example, our SSL certificates expire unnoticed, we will only discover this blunder when we restart the Cassandra service. In this case the Cassandra service will fail to connect to the cluster on restart and an SSL expiry error will appear in the logs. At this point there is nothing we can do without incurring some data unavailability or inconsistency in the cluster. We will cover what to do in this case in a subsequent post. However, it is best to avoid this situation by rotating the encryption assets before they expire.
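
A simple way to catch this ahead of time is to periodically check the expiry dates recorded in the stores themselves; for example (the path and password are placeholders):

$ keytool -list -v -keystore path/to/keystore.jks -storepass <keystore_password> | grep -i "valid from"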

How to Play Musical Certificates

Assuming we are going to rotate our SSL certificates before they expire, we can perform this operation live on the cluster without downtime. This process requires the replication factor and consistency level to be configured to allow a single node to be down for a short period of time. Hence, it works best when using a replication factor >= 3 and a consistency level of QUORUM or LOCAL_QUORUM (or lower), depending on the cluster configuration.

  1. Create the NEW encryption assets; NEW CA, NEW keystores, and NEW truststore, using the process described earlier.
  2. Import the NEW CA to the OLD truststore already deployed in the cluster using keytool. The OLD truststore will increase in size, as it has both the OLD and NEW CAs in it.
    $ keytool -keystore <old_truststore> -alias CARoot -importcert -file <new_ca_pc> -keypass <new_ca_psk_password> -storepass <old_truststore_password> -noprompt
    

    Where:

    • <old_truststore>: The path to the OLD truststore already deployed in the cluster. This can be just a copy of the OLD truststore deployed.
    • <new_ca_pc>: The path to the NEW CA PC generated.
    • <new_ca_psk_password>: The password for the NEW CA PSK.
    • <old_truststore_password>: The password for the OLD truststore.
  3. Deploy the updated OLD truststore to all the nodes in the cluster. Specifically, perform these steps on a single node, then repeat them on the next node until all nodes are updated. Once this step is complete, all nodes in the cluster will be able to establish connections using both the OLD and NEW CAs.
    1. Drain the node using nodetool drain.
    2. Stop the Cassandra service on the node.
    3. Copy the updated OLD truststore to the node.
    4. Start the Cassandra service on the node.
  4. Deploy the NEW keystores to their respective nodes in the cluster. Perform this operation one node at a time in the same way the OLD truststore was deployed in the previous step. Once this step is complete, all nodes in the cluster will be using their NEW SSL certificate to establish encrypted connections with each other.
  5. Deploy the NEW truststore to all the nodes in the cluster. Once again, perform this operation one node at a time in the same way the OLD truststore was deployed in Step 3.

The key to ensuring uptime during the rotation lies in Steps 2 and 3. That is, we have the OLD and the NEW CAs all in the truststore and deployed on every node prior to rolling out the NEW keystores. This allows nodes to communicate regardless of whether they have the OLD or NEW keystore, because both the OLD and NEW assets are trusted by all nodes. The process still works whether our NEW CAs are per host or cluster wide. If the NEW CAs are per host, then they all need to be added to the OLD truststore.

Example Certificate Rotation on a Cluster

Now that we understand the theory, let’s see the process in action. We will use ccm to create a three node cluster running Cassandra 3.11.10 with internode encryption configured.

As a pre-cluster setup task we will generate the keystores and truststore to implement the internode encryption. Rather than carry out the steps manually to generate the stores, we have developed a script called generate_cluster_ssl_stores.sh that does the job for us.

The script requires us to supply the node IP addresses, and a certificate configuration file. Our certificate configuration file, test_ca_cert.conf has the following contents:

[ req ]
distinguished_name     = req_distinguished_name
prompt                 = no
output_password        = mypass
default_bits           = 2048

[ req_distinguished_name ]
C                      = AU
ST                     = NSW
L                      = Sydney
O                      = TLP
OU                     = SSLTestCluster
CN                     = SSLTestClusterRootCA
emailAddress           = info@thelastpickle.com

The command used to call the generate_cluster_ssl_stores.sh script is as follows.

$ ./generate_cluster_ssl_stores.sh -g -c -n 127.0.0.1,127.0.0.2,127.0.0.3 test_ca_cert.conf

Let’s break down the options in the above command.

  • -g - Generate passwords for each keystore and the truststore.
  • -c - Create a Root CA for the cluster and sign each keystore PC with it.
  • -n - List of nodes to generate keystores for.

The above command generates the following encryption assets.

$ ls -alh ssl_artifacts_20210602_125353
total 72
drwxr-xr-x   9 anthony  staff   288B  2 Jun 12:53 .
drwxr-xr-x   5 anthony  staff   160B  2 Jun 12:53 ..
-rw-r--r--   1 anthony  staff    17B  2 Jun 12:53 .srl
-rw-r--r--   1 anthony  staff   4.2K  2 Jun 12:53 127-0-0-1-keystore.jks
-rw-r--r--   1 anthony  staff   4.2K  2 Jun 12:53 127-0-0-2-keystore.jks
-rw-r--r--   1 anthony  staff   4.2K  2 Jun 12:53 127-0-0-3-keystore.jks
drwxr-xr-x  10 anthony  staff   320B  2 Jun 12:53 certs
-rw-r--r--   1 anthony  staff   1.0K  2 Jun 12:53 common-truststore.jks
-rw-r--r--   1 anthony  staff   219B  2 Jun 12:53 stores.password

With the necessary stores generated we can create our three node cluster in ccm. Prior to starting the cluster our nodes should look something like this.

$ ccm status
Cluster: 'SSLTestCluster'
-------------------------
node1: DOWN (Not initialized)
node2: DOWN (Not initialized)
node3: DOWN (Not initialized)

We can configure internode encryption in the cluster by modifying the cassandra.yaml files for each node as follows. The passwords for each store are in the stores.password file created by the generate_cluster_ssl_stores.sh script.

node1 - cassandra.yaml

...
server_encryption_options:
  internode_encryption: all
  keystore: /ssl_artifacts_20210602_125353/127-0-0-1-keystore.jks
  keystore_password: HQR6xX4XQrYCz58CgAiFkWL9OTVDz08e
  truststore: /ssl_artifacts_20210602_125353/common-truststore.jks
  truststore_password: 8dPhJ2oshBihAYHcaXzgfzq6kbJ13tQi
...

node2 - cassandra.yaml

...
server_encryption_options:
  internode_encryption: all
  keystore: /ssl_artifacts_20210602_125353/127-0-0-2-keystore.jks
  keystore_password: Aw7pDCmrtacGLm6a1NCwVGxohB4E3eui
  truststore: /ssl_artifacts_20210602_125353/common-truststore.jks
  truststore_password: 8dPhJ2oshBihAYHcaXzgfzq6kbJ13tQi
...

node3 - cassandra.yaml

...
server_encryption_options:
  internode_encryption: all
  keystore: /ssl_artifacts_20210602_125353/127-0-0-3-keystore.jks
  keystore_password: 1DdFk27up3zsmP0E5959PCvuXIgZeLzd
  truststore: /ssl_artifacts_20210602_125353/common-truststore.jks
  truststore_password: 8dPhJ2oshBihAYHcaXzgfzq6kbJ13tQi
...

Now that we have configured internode encryption in the cluster, we can start the nodes and monitor the logs to make sure they start correctly.

$ ccm node1 start && touch ~/.ccm/SSLTestCluster/node1/logs/system.log && tail -n 40 -f ~/.ccm/SSLTestCluster/node1/logs/system.log
...
$ ccm node2 start && touch ~/.ccm/SSLTestCluster/node2/logs/system.log && tail -n 40 -f ~/.ccm/SSLTestCluster/node2/logs/system.log
...
$ ccm node3 start && touch ~/.ccm/SSLTestCluster/node3/logs/system.log && tail -n 40 -f ~/.ccm/SSLTestCluster/node3/logs/system.log

In all cases we see the following message in the logs indicating that internode encryption is enabled.

INFO  [main] ... MessagingService.java:704 - Starting Encrypted Messaging Service on SSL port 7001

Once all the nodes have started, we can check the cluster status. We are looking to see that all nodes are up and in a normal state.

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  90.65 KiB  16           65.8%             2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  66.31 KiB  16           65.5%             f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  71.46 KiB  16           68.7%             46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

We will create a NEW Root CA along with a NEW set of stores for the cluster. As part of this process, we will add the NEW Root CA PC to the OLD (current) truststore that is already in use in the cluster. Once again we can use our generate_cluster_ssl_stores.sh script to do this, including the additional step of adding the NEW Root CA PC to our OLD truststore. This can be done with the following commands.

# Make the password to our old truststore available to the script so we can add the new Root CA to it.

$ export EXISTING_TRUSTSTORE_PASSWORD=$(cat ssl_artifacts_20210602_125353/stores.password | grep common-truststore.jks | cut -d':' -f2)
$ ./generate_cluster_ssl_stores.sh -g -c -n 127.0.0.1,127.0.0.2,127.0.0.3 -e ssl_artifacts_20210602_125353/common-truststore.jks test_ca_cert.conf 

We call our script using a similar command to the first time we used it. The difference now is that we are using one additional option: -e.

  • -e - Path to our OLD (existing) truststore which we will add the new Root CA PC to. This option requires us to set the OLD truststore password in the EXISTING_TRUSTSTORE_PASSWORD variable.

The above command generates the following new encryption assets. These files are located in a different directory to the old ones. The directory with the old encryption assets is ssl_artifacts_20210602_125353 and the directory with the new encryption assets is ssl_artifacts_20210603_070951.

$ ls -alh ssl_artifacts_20210603_070951
total 72
drwxr-xr-x   9 anthony  staff   288B  3 Jun 07:09 .
drwxr-xr-x   6 anthony  staff   192B  3 Jun 07:09 ..
-rw-r--r--   1 anthony  staff    17B  3 Jun 07:09 .srl
-rw-r--r--   1 anthony  staff   4.2K  3 Jun 07:09 127-0-0-1-keystore.jks
-rw-r--r--   1 anthony  staff   4.2K  3 Jun 07:09 127-0-0-2-keystore.jks
-rw-r--r--   1 anthony  staff   4.2K  3 Jun 07:09 127-0-0-3-keystore.jks
drwxr-xr-x  10 anthony  staff   320B  3 Jun 07:09 certs
-rw-r--r--   1 anthony  staff   1.0K  3 Jun 07:09 common-truststore.jks
-rw-r--r--   1 anthony  staff   223B  3 Jun 07:09 stores.password

When we look at our OLD truststore we can see that it has increased in size. Originally it was 1.0K, and it is now 2.0K in size after adding the new Root CA PC to it.

$ ls -alh ssl_artifacts_20210602_125353/common-truststore.jks
-rw-r--r--  1 anthony  staff   2.0K  3 Jun 07:09 ssl_artifacts_20210602_125353/common-truststore.jks
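
If we want to confirm the contents rather than rely on the file size, we can list the entries in the updated truststore; both Root CAs should now appear as trusted certificate entries (the password is the one recorded in the old stores.password file):

$ keytool -list -keystore ssl_artifacts_20210602_125353/common-truststore.jks \
      -storepass 8dPhJ2oshBihAYHcaXzgfzq6kbJ13tQi | grep trustedCertEntry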

We can now roll out the updated OLD truststore. In a production Cassandra deployment we would copy the updated OLD truststore to a node and restart the Cassandra service. Then repeat this process on the other nodes in the cluster, one node at a time. In our case, our locally running nodes are already pointing to the updated OLD truststore. We need to only restart the Cassandra service.

$ for i in $(ccm status | grep UP | cut -d':' -f1); do echo "restarting ${i}" && ccm ${i} stop && sleep 3 && ccm ${i} start; done
restarting node1
restarting node2
restarting node3

After the restart, our nodes are up and in a normal state.

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  140.35 KiB  16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  167.23 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  173.7 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

Our nodes are using the updated OLD truststore which has the old Root CA PC and the new Root CA PC. This means that nodes will be able to communicate using either the old (current) keystore or the new keystore. We can now roll out the new keystore one node at a time and still have all our data available.

To do the new keystore roll out we will stop the Cassandra service, update its configuration to point to the new keystore, and then start the Cassandra service. A few notes before we start:

  • The node will need to point to the new keystore located in the directory with the new encryption assets; ssl_artifacts_20210603_070951.
  • The node will still need to use the OLD truststore, so its path will remain unchanged.

node1 - stop Cassandra service

$ ccm node1 stop
$ ccm node2 status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
DN  127.0.0.1  140.35 KiB  16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  142.19 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  148.66 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

node1 - update keystore path to point to new keystore in cassandra.yaml

...
server_encryption_options:
  internode_encryption: all
  keystore: /ssl_artifacts_20210603_070951/127-0-0-1-keystore.jks
  keystore_password: V3fKP76XfK67KTAti3CXAMc8hVJGJ7Jg
  truststore: /ssl_artifacts_20210602_125353/common-truststore.jks
  truststore_password: 8dPhJ2oshBihAYHcaXzgfzq6kbJ13tQi
...

node1 - start Cassandra service

$ ccm node1 start
$ ccm node2 status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  179.23 KiB  16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  142.19 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  148.66 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

At this point we have node1 using the new keystore while node2 and node3 are using the old keystore. Our nodes are once again up and in a normal state, so we can proceed to update the certificates on node2.

node2 - stop Cassandra service

$ ccm node2 stop
$ ccm node3 status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  224.48 KiB  16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
DN  127.0.0.2  188.46 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  194.35 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

node2 - update keystore path to point to new keystore in cassandra.yaml

...
server_encryption_options:
  internode_encryption: all
  keystore: /ssl_artifacts_20210603_070951/127-0-0-2-keystore.jks
  keystore_password: 3uEjkTiR0xI56RUDyo23TENJjtMk8VbY
  truststore: /ssl_artifacts_20210602_125353/common-truststore.jks
  truststore_password: 8dPhJ2oshBihAYHcaXzgfzq6kbJ13tQi
...

node2 - start Cassandra service

$ ccm node2 start
$ ccm node3 status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  224.48 KiB  16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  227.12 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  194.35 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

At this point we have node1 and node2 using the new keystore while node3 is using the old keystore. Our nodes are once again up and in a normal state, so we can proceed to update the certificates on node3.

node3 - stop Cassandra service

$ ccm node3 stop
$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  225.42 KiB  16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  191.31 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
DN  127.0.0.3  194.35 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

node3 - update keystore path to point to new keystore in cassandra.yaml

...
server_encryption_options:
  internode_encryption: all
  keystore: /ssl_artifacts_20210603_070951/127-0-0-3-keystore.jks
  keystore_password: hkjMwpn2y2aYllePAgCNzkBnpD7Vxl6f
  truststore: /ssl_artifacts_20210602_125353/common-truststore.jks
  truststore_password: 8dPhJ2oshBihAYHcaXzgfzq6kbJ13tQi
...

node3 - start Cassandra service

$ ccm node3 start
$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  225.42 KiB  16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  191.31 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  239.3 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

The keystore rotation is now complete on all nodes in our cluster. However, all nodes are still using the updated OLD truststore. To ensure that our old Root CA can no longer be used to intercept messages in our cluster, we need to roll out the NEW truststore to all nodes. This can be done in the same way we deployed the new keystores.

node1 - stop Cassandra service

$ ccm node1 stop
$ ccm node2 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
DN  127.0.0.1  225.42 KiB  16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  191.31 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  185.37 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

node1 - update truststore path to point to NEW truststore in cassandra.yaml

...
server_encryption_options:
  internode_encryption: all
  keystore: /ssl_artifacts_20210603_070951/127-0-0-1-keystore.jks
  keystore_password: V3fKP76XfK67KTAti3CXAMc8hVJGJ7Jg
  truststore: /ssl_artifacts_20210603_070951/common-truststore.jks
  truststore_password: 0bYmrrXaKIPJQ5UrtQQTFpPLepMweaLc
...

node1 - start Cassandra service

$ ccm node1 start
$ ccm node2 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  150 KiB    16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  191.31 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  185.37 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

Now we update the truststore for node2.

node2 - stop Cassandra service

$ ccm node2 stop
$ ccm node3 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  150 KiB    16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
DN  127.0.0.2  191.31 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  185.37 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

node2 - update truststore path to point to NEW truststore in cassandra.yaml

...
server_encryption_options:
  internode_encryption: all
  keystore: /ssl_artifacts_20210603_070951/127-0-0-2-keystore.jks
  keystore_password: 3uEjkTiR0xI56RUDyo23TENJjtMk8VbY
  truststore: /ssl_artifacts_20210603_070951/common-truststore.jks
  truststore_password: 0bYmrrXaKIPJQ5UrtQQTFpPLepMweaLc
...

node2 - start Cassandra service

$ ccm node2 start
$ ccm node3 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  150 KiB    16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  294.05 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  185.37 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

Now we update the truststore for node3.

node3 - stop Cassandra service

$ ccm node3 stop
$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  150 KiB    16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  208.83 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
DN  127.0.0.3  185.37 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

node3 - update truststore path to point to NEW truststore in cassandra.yaml

...
server_encryption_options:
  internode_encryption: all
  keystore: /ssl_artifacts_20210603_070951/127-0-0-3-keystore.jks
  keystore_password: hkjMwpn2y2aYllePAgCNzkBnpD7Vxl6f
  truststore: /ssl_artifacts_20210603_070951/common-truststore.jks
  truststore_password: 0bYmrrXaKIPJQ5UrtQQTFpPLepMweaLc
...

node3 - start Cassandra service

$ ccm node3 start
$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  150 KiB    16           100.0%            2661807a-d8d3-4bba-8639-6c0fada2ac88  rack1
UN  127.0.0.2  208.83 KiB  16           100.0%            f3db4bbe-1f35-4edb-8513-cb55a05393a7  rack1
UN  127.0.0.3  288.6 KiB  16           100.0%            46c2f4b5-905b-42b4-8bb9-563a03c4b415  rack1

The rotation of the certificates is now complete, and all while having only a single node down at any one time! This process can be used for all three of the deployment variations. In addition, it can be used to move between the different deployment variations without incurring downtime.

Conclusion

Internode encryption plays an important role in securing the internal communication of a cluster. When deployed, it is crucial that certificate expiry dates be tracked so the certificates can be rotated before they expire. Failure to do so will result in unavailability and inconsistencies.
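
One simple way to keep an eye on this is to list a keystore’s certificates with the JDK keytool and check their validity dates; a minimal sketch, using the node1 keystore path and password from the examples above:

$ keytool -list -v -keystore /ssl_artifacts_20210603_070951/127-0-0-1-keystore.jks \
    -storepass V3fKP76XfK67KTAti3CXAMc8hVJGJ7Jg | grep "Valid from"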

Using the process discussed in this post and combined with the appropriate tooling, internode encryption can be easily deployed and associated certificates easily rotated. In addition, the process can be used to move between the different encryption deployments.

Regardless of the reason for using the process, it can be executed without incurring downtime in common Cassandra use cases.

Running your Database on OpenShift and CodeReady Containers

Let’s take an introductory run-through of setting up your database on OpenShift, using your own hardware and RedHat’s CodeReady Containers.

CodeReady Containers is a great way to run OpenShift K8s locally, ideal for development and testing. The steps in this blog post require a reasonably capable machine, laptop or desktop, preferably with four CPUs and 16GB+ RAM.

Download and Install RedHat’s CodeReady Containers

Download and install RedHat’s CodeReady Containers (version 1.27) as described in Red Hat OpenShift 4 on your laptop: Introducing Red Hat CodeReady Containers.

First, configure CodeReady Containers from the command line.

❯ crc setup

…
Your system is correctly setup for using CodeReady Containers, you can now run 'crc start' to start the OpenShift cluster

Check the version is correct.

❯ crc version

…
CodeReady Containers version: 1.27.0+3d6bc39d
OpenShift version: 4.7.11 (not embedded in executable)

Then start it, entering the Pull Secret copied from the download page. Have patience here; this can take ten minutes or more.

❯ crc start

INFO Checking if running as non-root
…
Started the OpenShift cluster.

The server is accessible via web console at:
  https://console-openshift-console.apps-crc.testing 
…

The output above will include the kubeadmin password, which is required in the following oc login … command.

❯ eval $(crc oc-env)
❯ oc login -u kubeadmin -p <password-from-crc-setup-output> https://api.crc.testing:6443

❯ oc version

Client Version: 4.7.11
Server Version: 4.7.11
Kubernetes Version: v1.20.0+75370d3

Open the URL https://console-openshift-console.apps-crc.testing in a browser.

Log in using the kubeadmin username and password, as used above with the oc login … command. You might need to try a few times because of the self-signed certificate used.

Once OpenShift has started and is running, you should see the following webpage.

CodeReady Start Webpage

Some commands to help check the status and the startup process are:

❯ oc status       

In project default on server https://api.crc.testing:6443

svc/openshift - kubernetes.default.svc.cluster.local
svc/kubernetes - 10.217.4.1:443 -> 6443

View details with 'oc describe <resource>/<name>' or list resources with 'oc get all'.      

Before continuing, go to the CodeReady Containers Preferences dialog and increase CPUs and Memory to >12 and >14GB respectively.

CodeReady Preferences dialog
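
If you prefer the command line, the same change can be made with crc config (a sketch; the memory value is in MiB, and crc needs to be restarted for the new settings to take effect):

❯ crc config set cpus 12
❯ crc config set memory 14336
❯ crc stop
❯ crc start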

Create the OpenShift Local Volumes

Cassandra needs persistent volumes for its data directories. There are different ways to do this in OpenShift, from enabling local host paths in Rancher persistent volumes, to installing and using the OpenShift Local Storage Operator, and of course persistent volumes on the different cloud provider backends.

This blog post will use vanilla OpenShift volumes using folders on the master k8s node.

Go to the “Terminal” tab for the master node and create the required directories. The master node is found on the /cluster/nodes/ webpage.

Click on the node, named something like crc-m89r2-master-0, and then click on the “Terminal” tab. In the terminal, execute the following commands:

sh-4.4# chroot /host
sh-4.4# mkdir -p /mnt/cass-operator/pv000
sh-4.4# mkdir -p /mnt/cass-operator/pv001
sh-4.4# mkdir -p /mnt/cass-operator/pv002
sh-4.4# 

Persistent Volumes are to be created with affinity to the master node, as declared in the following yaml. The name of the master node can vary from installation to installation, so if your master node is not named crc-gm7cm-master-0 the following commands will replace the name for you. First download the cass-operator-1.7.0-openshift-storage.yaml file, then check the name of the node in the nodeAffinity sections against your current CodeReady Containers instance, updating it if necessary.

❯ wget https://thelastpickle.com/files/openshift-intro/cass-operator-1.7.0-openshift-storage.yaml

# The name of your master node
❯ oc get nodes -o=custom-columns=NAME:.metadata.name --no-headers

# If it is not crc-gm7cm-master-0
❯ sed -i '' "s/crc-gm7cm-master-0/$(oc get nodes -o=custom-columns=NAME:.metadata.name --no-headers)/" cass-operator-1.7.0-openshift-storage.yaml
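
# optional sanity check: confirm the node name in the storage yaml now matches your cluster
❯ grep -n "$(oc get nodes -o=custom-columns=NAME:.metadata.name --no-headers)" cass-operator-1.7.0-openshift-storage.yaml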

Create the Persistent Volumes (PV) and Storage Class (SC).

❯ oc apply -f cass-operator-1.7.0-openshift-storage.yaml

persistentvolume/server-storage-0 created
persistentvolume/server-storage-1 created
persistentvolume/server-storage-2 created
storageclass.storage.k8s.io/server-storage created

To check the existence of the PVs.

❯ oc get pv | grep server-storage

server-storage-0   10Gi   RWO    Delete   Available   server-storage     5m19s
server-storage-1   10Gi   RWO    Delete   Available   server-storage     5m19s
server-storage-2   10Gi   RWO    Delete   Available   server-storage     5m19s

To check the existence of the SC.

❯ oc get sc

NAME             PROVISIONER                    RECLAIMPOLICY   VOLUMEBINDINGMODE      ALLOWVOLUMEEXPANSION   AGE
server-storage   kubernetes.io/no-provisioner   Delete          WaitForFirstConsumer   false                  5m36s

More information on using these can be found in the RedHat documentation for OpenShift volumes.

Deploy the Cass-Operator

Now create the cass-operator. Here we can use the upstream 1.7.0 version of the cass-operator. After creating (applying) the cass-operator, it is important to quickly execute the oc adm policy … commands in the following step so the pods have the privileges required and are successfully created.

❯ oc apply -f https://raw.githubusercontent.com/k8ssandra/cass-operator/v1.7.0/docs/user/cass-operator-manifests.yaml

namespace/cass-operator created
serviceaccount/cass-operator created
secret/cass-operator-webhook-config created
W0606 14:25:44.757092   27806 warnings.go:70] apiextensions.k8s.io/v1beta1 CustomResourceDefinition is deprecated in v1.16+, unavailable in v1.22+; use apiextensions.k8s.io/v1 CustomResourceDefinition
W0606 14:25:45.077394   27806 warnings.go:70] apiextensions.k8s.io/v1beta1 CustomResourceDefinition is deprecated in v1.16+, unavailable in v1.22+; use apiextensions.k8s.io/v1 CustomResourceDefinition
customresourcedefinition.apiextensions.k8s.io/cassandradatacenters.cassandra.datastax.com created
clusterrole.rbac.authorization.k8s.io/cass-operator-cr created
clusterrole.rbac.authorization.k8s.io/cass-operator-webhook created
clusterrolebinding.rbac.authorization.k8s.io/cass-operator-crb created
clusterrolebinding.rbac.authorization.k8s.io/cass-operator-webhook created
role.rbac.authorization.k8s.io/cass-operator created
rolebinding.rbac.authorization.k8s.io/cass-operator created
service/cassandradatacenter-webhook-service created
deployment.apps/cass-operator created
W0606 14:25:46.701712   27806 warnings.go:70] admissionregistration.k8s.io/v1beta1 ValidatingWebhookConfiguration is deprecated in v1.16+, unavailable in v1.22+; use admissionregistration.k8s.io/v1 ValidatingWebhookConfiguration
W0606 14:25:47.068795   27806 warnings.go:70] admissionregistration.k8s.io/v1beta1 ValidatingWebhookConfiguration is deprecated in v1.16+, unavailable in v1.22+; use admissionregistration.k8s.io/v1 ValidatingWebhookConfiguration
validatingwebhookconfiguration.admissionregistration.k8s.io/cassandradatacenter-webhook-registration created

❯ oc adm policy add-scc-to-user privileged -z default -n cass-operator

clusterrole.rbac.authorization.k8s.io/system:openshift:scc:privileged added: "default"

❯ oc adm policy add-scc-to-user privileged -z cass-operator -n cass-operator

clusterrole.rbac.authorization.k8s.io/system:openshift:scc:privileged added: "cass-operator"

Let’s check the deployment happened.

❯ oc get deployments -n cass-operator

NAME            READY   UP-TO-DATE   AVAILABLE   AGE
cass-operator   1/1     1            1           14m

Let’s also check the cass-operator pod was created and is successfully running. Note that the kubectl command is used here; for all k8s actions the oc and kubectl commands are interchangeable.

❯ kubectl get pods -w -n cass-operator

NAME                             READY   STATUS    RESTARTS   AGE
cass-operator-7675b65744-hxc8z   1/1     Running   0          15m

Troubleshooting: If the cass-operator does not end up in Running status, or if any pods in later sections fail to start, it is recommended to use the OpenShift UI Events webpage for easy diagnostics.

Setup the Cassandra Cluster

The next step is to create the cluster. The following deployment file creates a 3 node cluster. It is largely a copy of the upstream cass-operator version 1.7.0 file example-cassdc-minimal.yaml, but with a small modification made to allow all the pods to be deployed to the same worker node (as CodeReady Containers only uses one k8s node by default).

❯ oc apply -n cass-operator -f https://thelastpickle.com/files/openshift-intro/cass-operator-1.7.0-openshift-minimal-3.11.yaml

cassandradatacenter.cassandra.datastax.com/dc1 created

Let’s watch the pods get created, initialise, and eventually become Running, using the kubectl get pods … watch command.

❯ kubectl get pods -w -n cass-operator

NAME                             READY   STATUS    RESTARTS   AGE
cass-operator-7675b65744-28fhw   1/1     Running   0          102s
cluster1-dc1-default-sts-0       0/2     Pending   0          0s
cluster1-dc1-default-sts-1       0/2     Pending   0          0s
cluster1-dc1-default-sts-2       0/2     Pending   0          0s
cluster1-dc1-default-sts-0       2/2     Running   0          3m
cluster1-dc1-default-sts-1       2/2     Running   0          3m
cluster1-dc1-default-sts-2       2/2     Running   0          3m

Use the Cassandra Cluster

With the Cassandra pods each up and running, the cluster is ready to be used. Test it out using the nodetool status command.

❯ kubectl -n cass-operator exec -it cluster1-dc1-default-sts-0 -- nodetool status

Defaulting container name to cassandra.
Use 'kubectl describe pod/cluster1-dc1-default-sts-0 -n cass-operator' to see all of the containers in this pod.
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address      Load       Tokens       Owns (effective)  Host ID                               Rack
UN  10.217.0.73  84.42 KiB  1            83.6%             672baba8-9a05-45ac-aad1-46427027b57a  default
UN  10.217.0.72  70.2 KiB   1            65.3%             42758a86-ea7b-4e9b-a974-f9e71b958429  default
UN  10.217.0.71  65.31 KiB  1            51.1%             2fa73bc2-471a-4782-ae63-5a34cc27ab69  default

The above command can be run on `cluster1-dc1-default-sts-1` and `cluster1-dc1-default-sts-2` too.

Next, test out cqlsh. For this, authentication is required, so first get the CQL username and password.

# Get the cql username
❯ kubectl -n cass-operator get secret cluster1-superuser -o yaml | grep " username" | awk -F" " '{print $2}' | base64 -d && echo ""

# Get the cql password
❯ kubectl -n cass-operator get secret cluster1-superuser -o yaml | grep " password" | awk -F" " '{print $2}' | base64 -d && echo ""

❯ kubectl -n cass-operator exec -it cluster1-dc1-default-sts-0 -- cqlsh -u <cql-username> -p <cql-password>

Connected to cluster1 at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.7 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cluster1-superuser@cqlsh>
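
For a quick one-shot check without an interactive shell, cqlsh can also run a single statement via its -e flag; for example, a simple read against the system tables:

❯ kubectl -n cass-operator exec -it cluster1-dc1-default-sts-0 -- cqlsh -u <cql-username> -p <cql-password> -e "SELECT cluster_name, release_version FROM system.local;"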

Keep It Clean

CodeReady Containers is very simple to clean up, especially because it is a packaging of OpenShift intended only for development purposes. To wipe everything, just stop and “delete” it.

❯ crc stop
❯ crc delete

If, on the other hand, you only want to undo individual steps, each of the following can be run (but in order).

❯ oc delete -n cass-operator -f https://thelastpickle.com/files/openshift-intro/cass-operator-1.7.0-openshift-minimal-3.11.yaml

❯ oc delete -f https://raw.githubusercontent.com/k8ssandra/cass-operator/v1.7.0/docs/user/cass-operator-manifests.yaml

❯ oc delete -f cass-operator-1.7.0-openshift-storage.yaml

On Scylla Manager Suspend & Resume feature

Disclaimer: This blog post is neither a rant nor intended to undermine the great work that...

Apache Cassandra's Continuous Integration Systems

With Apache Cassandra 4.0 just around the corner, and the feature freeze on trunk lifted, let’s take a dive into the efforts ongoing with the project’s testing and Continuous Integration systems.

continuous integration in open source

Every software project benefits from sound testing practices and having continuous integration in place. Even more so for open source projects, where contributors work around the world in many different timezones, which makes them particularly prone to broken builds, longer wait times, and uncertainty, and where contributors don’t share the same communication bandwidth with each other because they work in different companies and are scratching different itches.

This is especially true for Apache Cassandra. As an early-maturity technology used everywhere on mission-critical data, stability and reliability are crucial for deployments. Contributors from many companies (Alibaba, Amazon, Apple, Bloomberg, Dynatrace, DataStax, Huawei, Instaclustr, Netflix, Pythian, and more) need to coordinate and collaborate, and most importantly trust each other.

During the feature freeze the project was fortunate to not just stabilise and fix tons of tests, but to also expand its continuous integration systems. This really helps set the stage for a post-4.0 roadmap that features heavily on pluggability, developer experience and safety, as well as aiming for an always-shippable trunk.

@ cassandra

The continuous integration systems at play are CircleCI and ci-cassandra.apache.org.

CircleCI is a commercial solution. The main usage of CircleCI today is pre-commit, that is, testing your patches while they get reviewed and before they get merged. To effectively use CircleCI on Cassandra requires either the medium or high resource profile, which enables the use of hundreds of containers and lots of resources, and that’s basically only available to folks working in companies that pay for a premium CircleCI account. There are lots of stages to the CircleCI pipeline, and developers just trigger the stages they feel are relevant to test that patch on.

CircleCI pipeline

ci-cassandra is our community CI. It is based on CloudBees, provided by the ASF, and runs on 40 agents (servers) around the world donated by numerous different companies in our community. Its main usage is post-commit, and its pipelines run every stage automatically. Today the pipeline consists of 40K tests. And for the first time in many years, in the lead-up to 4.0, pipeline runs are completely green.

ci-cassandra pipeline

ci-cassandra is set up with a combination of a Jenkins DSL script and declarative Jenkinsfiles. These jobs use the build scripts found here.

forty thousand tests

The project has many types of tests. It has proper unit tests, and unit tests that have some embedded Cassandra server. The unit tests are run in a number of different parameterisations: from different Cassandra configurations, JDK 8 and JDK 11, to supporting the ARM architecture. There are CQLSH tests written in Python against a single ccm node. Then there are the Java distributed tests and the Python distributed tests. The Python distributed tests are older, use CCM, and also run parameterised. The Java distributed tests are a recent addition and run the Cassandra nodes inside the JVM. Both types of distributed tests also include testing the upgrade paths of different Cassandra versions. Most new distributed tests today are written as Java distributed tests. There are also burn and microbench (JMH) tests.

distributed is difficult

Testing distributed tech is hardcore. Anyone who’s tried to run the Python upgrade dtests locally knows the pain. Running the tests in Docker helps a lot, and this is what CircleCI and ci-cassandra predominantly do. The base Docker images are found here. Distributed tests can fall over for numerous reasons, exacerbated in ci-cassandra with heterogeneous servers around the world and all the possible network and disk issues that can occur. For the 4.0 release alone, over 200 Jira tickets were focused just on strengthening flaky tests. Because ci-cassandra has limited storage, the logs and test results for all runs are archived in nightlies.apache.org/cassandra.

call for help

There’s still heaps to do. This is all part-time, volunteer effort. No one in the community is dedicated to these systems or works as a build engineer. The project can use all the help it can get.

There’s a ton of exciting stuff to add. Some examples are microbench and JMH reports, Jacoco test coverage reports, Harry for fuzz testing, Adelphi or Fallout for end-to-end performance and comparison testing, hooking up Apache Yetus for efficient resource usage, or putting our Jenkins stack into a k8s operator run script so you can run the pipeline on your own k8s cluster.

So don’t be afraid to jump in, pick your poison, we’d love to see you!

Reaper 2.2 for Apache Cassandra was released

We’re pleased to announce that Reaper 2.2 for Apache Cassandra was just released. This release includes a major redesign of how segments are orchestrated, which allows users to run concurrent repairs on nodes. Let’s dive into these changes and see what they mean for Reaper’s users.

New Segment Orchestration

Reaper works in a variety of standalone or distributed modes, which create some challenges in meeting the following requirements:

  • A segment is processed successfully exactly once.
  • No more than one segment is running on a node at once.
  • Segments can only be started if the number of pending compactions on a node involved is lower than the defined threshold.

To make sure a segment won’t be handled by several Reaper instances at once, Reaper relies on LightWeight Transactions (LWTs) to implement a leader election process. A Reaper instance will “take the lead” on a segment by using an LWT and then perform the checks for the last two conditions above.

To avoid race conditions between two different segments involving a common set of replicas that would start at the same time, a “master lock” was placed after the checks to guarantee that a single segment would be able to start. This required a double check to be performed before actually starting the segment.

Segment Orchestration pre 2.2 design

There were several issues with this design:

  • It involved a lot of LWTs even if no segment could be started.
  • It was a complex design which made the code hard to maintain.
  • The “master lock” was creating a lot of contention as all Reaper instances would compete for the same partition, leading to some nasty situations. This was especially the case in sidecar mode as it involved running a lot of Reaper instances (one per Cassandra node).

As we were seeing suboptimal performance and high LWT contention in some setups, we redesigned how segments were orchestrated to reduce the number of LWTs and maximize concurrency during repairs (all nodes should be busy repairing if possible).
Instead of locking segments, we explored whether it would be possible to lock nodes. This approach would give us several benefits:

  • We could check which nodes are already busy without issuing JMX requests to the nodes.
  • We could easily filter segments to be processed to retain only those with available nodes.
  • We could remove the master lock as we would have no more race conditions between segments.

One of the hard parts was that locking several nodes in a consistent manner would be challenging, as it would involve several rows, and Cassandra doesn’t have a concept of an atomic transaction that can be rolled back the way an RDBMS does. Luckily, we were able to leverage one feature of batch statements: all Cassandra batch statements which target a single partition will turn all operations into a single atomic one (at the node level). If one node out of all the replicas was already locked, then none would be locked by the batched LWTs. We used the following model for the leader election table on nodes:

CREATE TABLE reaper_db.running_repairs (
    repair_id uuid,
    node text,
    reaper_instance_host text,
    reaper_instance_id uuid,
    segment_id uuid,
    PRIMARY KEY (repair_id, node)
) WITH CLUSTERING ORDER BY (node ASC);

The following LWTs are then issued in a batch for each replica:

BEGIN BATCH

UPDATE reaper_db.running_repairs USING TTL 90
SET reaper_instance_host = 'reaper-host-1', 
    reaper_instance_id = 62ce0425-ee46-4cdb-824f-4242ee7f86f4,
    segment_id = 70f52bc2-7519-11eb-809e-9f94505f3a3e
WHERE repair_id = 70f52bc0-7519-11eb-809e-9f94505f3a3e AND node = 'node1'
IF reaper_instance_id = null;

UPDATE reaper_db.running_repairs USING TTL 90
SET reaper_instance_host = 'reaper-host-1',
    reaper_instance_id = 62ce0425-ee46-4cdb-824f-4242ee7f86f4,
    segment_id = 70f52bc2-7519-11eb-809e-9f94505f3a3e
WHERE repair_id = 70f52bc0-7519-11eb-809e-9f94505f3a3e AND node = 'node2'
IF reaper_instance_id = null;

UPDATE reaper_db.running_repairs USING TTL 90
SET reaper_instance_host = 'reaper-host-1',
    reaper_instance_id = 62ce0425-ee46-4cdb-824f-4242ee7f86f4,
    segment_id = 70f52bc2-7519-11eb-809e-9f94505f3a3e
WHERE repair_id = 70f52bc0-7519-11eb-809e-9f94505f3a3e AND node = 'node3'
IF reaper_instance_id = null;

APPLY BATCH;

If all the conditional updates are able to be applied, we’ll get the following data in the table:

cqlsh> select * from reaper_db.running_repairs;

 repair_id                            | node  | reaper_instance_host | reaper_instance_id                   | segment_id
--------------------------------------+-------+----------------------+--------------------------------------+--------------------------------------
 70f52bc0-7519-11eb-809e-9f94505f3a3e | node1 |        reaper-host-1 | 62ce0425-ee46-4cdb-824f-4242ee7f86f4 | 70f52bc2-7519-11eb-809e-9f94505f3a3e
 70f52bc0-7519-11eb-809e-9f94505f3a3e | node2 |        reaper-host-1 | 62ce0425-ee46-4cdb-824f-4242ee7f86f4 | 70f52bc2-7519-11eb-809e-9f94505f3a3e
 70f52bc0-7519-11eb-809e-9f94505f3a3e | node3 |        reaper-host-1 | 62ce0425-ee46-4cdb-824f-4242ee7f86f4 | 70f52bc2-7519-11eb-809e-9f94505f3a3e
 

If one of the conditional updates fails because one node is already locked for the same repair_id, then none will be applied.

Note: the Postgres backend also benefits from these new features through the use of transactions, using commit and rollback to deal with success/failure cases.

The new design is now much simpler than the initial one:

Segment Orchestration post 2.2 design

Segments are now filtered down to those that have no replica locked, to avoid wasting energy trying to lock them, and the pending compactions check also happens before any locking.

This reduces the number of LWTs by four in the simplest cases, and we expect more challenging repairs to benefit from even more reductions:

LWT improvements

At the same time, repair duration on a 9-node cluster showed 15%-20% improvements thanks to the more efficient segment selection.

One prerequisite to make that design efficient was to store the replicas for each segment in the database when the repair run is created. You can now see which nodes are involved for each segment and which datacenter they belong to in the Segments view:

Segments view

Concurrent repairs

Using the repair id as the partition key for the node leader election table gives us another feature that was long awaited: concurrent repairs.
A node can be locked by different Reaper instances for different repair runs, allowing several repairs to run concurrently on each node. In order to control the level of concurrency, a new setting was introduced in Reaper: maxParallelRepairs.
By default it is set to 2 and should be tuned carefully, as heavy concurrent repairs could have a negative impact on cluster performance.
If you have small keyspaces that need to be repaired on a regular basis, they won’t be blocked by large keyspaces anymore.
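
For example, on a package-based install the value can be checked and adjusted in Reaper’s yaml configuration and the service restarted; a sketch only, as the configuration path and service name depend on how Reaper was installed:

grep -n "maxParallelRepairs" /etc/cassandra-reaper/cassandra-reaper.yaml
# add or edit the "maxParallelRepairs" line as required, then restart Reaper
sudo service cassandra-reaper restart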

Future upgrades

As some of you are probably aware, JFrog has decided to sunset Bintray and JCenter. Bintray is our main distribution medium and we will be working on replacement repositories. The 2.2.0 release is unaffected by this change but future upgrades could require an update to yum/apt repos. The documentation will be updated accordingly in due time.

Upgrade now

We encourage all Reaper users to upgrade to 2.2.0. It was tested successfully by some of our customers who had issues with LWT pressure and blocking repairs. This version is expected to make repairs faster and more lightweight on the Cassandra backend. We were able to remove a lot of legacy code and design that was fit for single token clusters, but failed at spreading segments efficiently for clusters using vnodes.

The binaries for Reaper 2.2.0 are available from yum, apt-get, Maven Central, Docker Hub, and are also downloadable as tarball packages. Remember to backup your database before starting the upgrade.

All instructions to download, install, configure, and use Reaper 2.2.0 are available on the Reaper website.

Creating Flamegraphs with Apache Cassandra in Kubernetes (cass-operator)

In a previous blog post recommending disabling read repair chance, some flamegraphs were generated to demonstrate the effect read repair chance had on a cluster. Let’s go through how those flamegraphs were captured, step by step, using Apache Cassandra 3.11.6, Kubernetes and the cass-operator, nosqlbench, and the async-profiler.

In previous blog posts we would have used the existing tools of tlp-cluster or ccm, tlp-stress or cassandra-stress, and sjk. Here we take a new approach that is a lot more fun, as with k8s the same approach can be used locally or in the cloud. No need to switch between ccm clusters for local testing and tlp-cluster for cloud testing. Nor are you bound to AWS for big instance testing, that’s right: no vendor lock-in. Cass-operator and K8ssandra are getting a ton of momentum from DataStax, so it is only deserved and exciting to introduce them to as much of the open source world as we can.

This blog post is not an in-depth dive into using cass-operator, but rather a simple teaser to demonstrate how we can grab some flamegraphs as quickly as possible. The blog post is split into three sections:

  • Setting up Kubernetes and getting Cassandra running
  • Getting access to Cassandra from outside Kubernetes
  • Stress testing and creating flamegraphs

Setup

Let’s go through a quick demonstration using Kubernetes, the cass-operator, and some flamegraphs.

First, download four yaml configuration files that will be used. This is not strictly necessary for the latter three, as kubectl may reference them by their URLs, but let’s download them for the sake of having the files locally and being able to make edits if and when desired.

wget https://thelastpickle.com/files/2021-01-31-cass_operator/01-kind-config.yaml
wget https://thelastpickle.com/files/2021-01-31-cass_operator/02-storageclass-kind.yaml
wget https://thelastpickle.com/files/2021-01-31-cass_operator/11-install-cass-operator-v1.1.yaml
wget https://thelastpickle.com/files/2021-01-31-cass_operator/13-cassandra-cluster-3nodes.yaml

The next steps involve kind and kubectl to create a local cluster we can test. To use kind you need Docker running locally; it is recommended to have 4 CPUs and 12GB RAM for this exercise.

kind create cluster --name read-repair-chance-test --config 01-kind-config.yaml

kubectl create ns cass-operator
kubectl -n cass-operator apply -f 02-storageclass-kind.yaml
kubectl -n cass-operator apply -f 11-install-cass-operator-v1.1.yaml

# watch and wait until the pod is running
watch kubectl -n cass-operator get pod

# create 3 node C* cluster
kubectl -n cass-operator apply -f 13-cassandra-cluster-3nodes.yaml

# again, wait for pods to be running
watch kubectl -n cass-operator get pod

# test the three nodes are up
kubectl -n cass-operator exec -it cluster1-dc1-default-sts-0 -- nodetool status

Access

For this example we are going to run NoSqlBench from outside the k8s cluster, so we will need access to a pod’s Native Protocol interface via port-forwarding. This approach is practical here because we want the benchmark to connect to just one coordinator. In many situations you would instead run NoSqlBench from a separate dedicated pod inside the k8s cluster.

# get the cql username
kubectl -n cass-operator get secret cluster1-superuser -o yaml | grep " username" | awk -F" " '{print $2}' | base64 -d && echo ""

# get the cql password
kubectl -n cass-operator get secret cluster1-superuser -o yaml | grep " password" | awk -F" " '{print $2}' | base64 -d && echo ""

# port forward the native protocol (CQL)
kubectl -n cass-operator port-forward --address 0.0.0.0 cluster1-dc1-default-sts-0 9042:9042 

The above sets up the k8s cluster, a k8s storageClass, and the cass-operator with a three node Cassandra cluster. For a more in-depth look at this setup, check out this tutorial.
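
Before moving on, it is worth confirming from another terminal that the port-forward is listening locally; a trivial check, assuming nc (netcat) is available:

nc -vz 127.0.0.1 9042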

Stress Testing and Flamegraphs

With a cluster to play with, let’s generate some load and then go grab some flamegraphs.

Instead of using SJK (Swiss Java Knife), as our previous blog posts have done, we will use the async-profiler. The async-profiler does not suffer from the safepoint bias problem, an issue we see more often than we would like in Cassandra nodes (protip: make sure you configure ParallelGCThreads and ConcGCThreads to the same value).

Open a new terminal window and do the following.

# get the latest NoSqlBench jarfile
wget https://github.com/nosqlbench/nosqlbench/releases/latest/download/nb.jar

# generate some load, use credentials as found above

java -jar nb.jar cql-keyvalue username=<cql_username> password=<cql_password> whitelist=127.0.0.1 rampup-cycles=10000 main-cycles=500000 rf=3 read_cl=LOCAL_ONE

# while the load is still running,
# open a shell in the coordinator pod, download async-profiler and generate a flamegraph
kubectl -n cass-operator exec -it cluster1-dc1-default-sts-0 -- /bin/bash

wget https://github.com/jvm-profiling-tools/async-profiler/releases/download/v1.8.3/async-profiler-1.8.3-linux-x64.tar.gz

tar xvf async-profiler-1.8.3-linux-x64.tar.gz
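
# find the Cassandra java process id to pass to profiler.sh below
# (one way to do it, assuming the ps utility is available in the container)
ps -ef | grep -i cassandra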

async-profiler-1.8.3-linux-x64/profiler.sh -d 300 -f /tmp/flame_away.svg <CASSANDRA_PID>

exit

# copy the flamegraph out of the pod
kubectl -n cass-operator cp cluster1-dc1-default-sts-0:/tmp/flame_away.svg flame_away.svg

Keep It Clean

After everything is done, it is time to clean up after yourself.

Delete the CassandraDatacenters first; otherwise Kubernetes will block deletion because we use a finalizer. Note, this will delete all data in the cluster.

kubectl delete cassdcs --all-namespaces --all

Remove the operator Deployment, CRD, etc.

# this command can take a while, be patient

kubectl delete -f https://raw.githubusercontent.com/datastax/cass-operator/v1.5.1/docs/user/cass-operator-manifests-v1.16.yaml

# if troubleshooting, to forcibly remove resources, though
# this should not be necessary, and take care as this will wipe all resources

kubectl delete "$(kubectl api-resources --namespaced=true --verbs=delete -o name | tr "\n" "," | sed -e 's/,$//')" --all

To remove the local Kubernetes cluster altogether

kind delete cluster --name read-repair-chance-test

To stop and remove the docker containers that are left running…

docker stop $(docker ps | grep kindest | cut -d" " -f1)
docker rm $(docker ps -a | grep kindest | cut -d" " -f1)

More… the cass-operator tutorials

There is a ton of documentation and tutorials getting released on how to use the cass-operator. If you are keen to learn more, the following is highly recommended: Managing Cassandra Clusters in Kubernetes Using Cass-Operator.

The Impacts of Changing the Number of VNodes in Apache Cassandra

Apache Cassandra’s default value for num_tokens is about to change in 4.0! This might seem like a small edit note in the CHANGES.txt; however, such a change can have a profound effect on the day-to-day operations of the cluster. In this post we will examine how changing the value for num_tokens impacts the cluster and its behaviour.

There are many knobs and levers that can be modified in Apache Cassandra to tune its behaviour. The num_tokens setting is one of those. Like many settings it lives in the cassandra.yaml file and has a defined default value. That’s where it stops being like many of Cassandra’s settings. You see, most of Cassandra’s settings will only affect a single aspect of the cluster. However, when changing the value of num_tokens there is an array of behaviours that are altered. The Apache Cassandra project has committed and resolved CASSANDRA-13701 which changed the default value for num_tokens from 256 to 16. This change is significant, and to understand the consequences we first need to understand the role that num_tokens play in the cluster.

Never try this on production

Before we dive into any details it is worth noting that the num_tokens setting on a node should never ever be changed once it has joined the cluster. For one thing, the node will fail on a restart. The value of this setting should be the same for every node in a datacenter. Historically, different values were expected for heterogeneous clusters. While it is rare to see, and not something we would recommend, you can in theory still double the num_tokens on nodes that are twice as big in terms of hardware specifications. Furthermore, it is common to see the nodes in a datacenter have a value for num_tokens that differs from the nodes in another datacenter. This is partly how changing the value of this setting on a live cluster can be safely done with zero downtime. It is out of scope for this blog post, but details can be found in migration to a new datacenter.
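
If you are unsure what an existing node is currently using, check before touching anything; a quick sketch, assuming a package install with the configuration under /etc/cassandra:

$ grep -E '^(num_tokens|initial_token)' /etc/cassandra/cassandra.yaml
$ nodetool info -T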

The Basics

The num_tokens setting influences the way Cassandra allocates data amongst the nodes, how that data is retrieved, and how that data is moved between nodes.

Under the hood Cassandra uses a partitioner to decide where data is stored in the cluster. The partitioner is a consistent hashing algorithm that maps a partition key (first part of the primary key) to a token. The token dictates which nodes will contain the data associated with the partition key. Each node in the cluster is assigned one or more unique token values from a token ring. This is just a fancy way of saying each node is assigned a number from a circular number range. That is, “the number” being the token hash, and “a circular number range” being the token ring. The token ring is circular because the next value after the maximum token value is the minimum token value.

An assigned token defines the range of tokens in the token ring the node is responsible for. This is commonly known as a “token range”. The “token range” a node is responsible for is bounded by its assigned token, and the next smallest token value going backwards in the ring. The assigned token is included in the range, and the smallest token value going backwards is excluded from the range. The smallest token value going backwards typically resides on the previous neighbouring node. Having a circular token ring means that the range of tokens a node is responsible for, could include both the minimum and maximum tokens in the ring. In at least one case the smallest token value going backwards will wrap back past the maximum token value in the ring.

For example, in the following Token Ring Assignment diagram we have a token ring with a range of hashes from 0 to 99. Token 10 is allocated to Node 1. The node before Node 1 in the cluster is Node 5. Node 5 is allocated token 90. Therefore, the range of tokens that Node 1 is responsible for is between 91 and 10. In this particular case, the token range wraps around past the maximum token in the ring.

Token ring

Note that the above diagram is for only a single data replica. This is because only a single node is assigned to each token in the token ring. If multiple replicas of the data exist, a node’s neighbours become replicas for the token as well. This is illustrated in the Token Ring Assignment diagram below.

Token ring

The reason the partitioner is defined as a consistent hashing algorithm is because it is just that; no matter how many times you feed in a specific input, it will always generate the same output value. It ensures that every node, coordinator, or otherwise, will always calculate the same token for a given partition key. The calculated token can then be used to reliably pinpoint the nodes with the sought after data.

Consequently, the minimum and maximum numbers for the token ring are defined by the partitioner. The default Murmur3Partitioner, based on the Murmur hash, has for example a minimum and maximum range of -2^63 to +2^63 - 1. The legacy RandomPartitioner (based on the MD5 hash) on the other hand has a range of 0 to 2^127 - 1. A critical side effect of this system is that once a partitioner for a cluster is picked, it can never be changed. Changing to a different partitioner requires the creation of a new cluster with the desired partitioner and then reloading the data into the new cluster.

Further information on consistent hashing functionality can be found in the Apache Cassandra documentation.

Back in the day…

Back in the pre-1.2 era, nodes could only be manually assigned a single token. This was done and can still be done today using the initial_token setting in the cassandra.yaml file. The default partitioner at that point was the RandomPartitioner. Despite token assignment being manual, the partitioner made the process of calculating the assigned tokens fairly straightforward when setting up a cluster from scratch. For example, if you had a three node cluster you would divide 2^127 - 1 by 3 and the quotient would give you the correct increment amount for each token value. Your first node would have an initial_token of 0, your next node would have an initial_token of (2^127 - 1) / 3, and your third node would have an initial_token of (2^127 - 1) / 3 * 2. Thus, each node would have token ranges of the same size.
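
As a quick check of that arithmetic (a throwaway calculation; any big-integer calculator will do):

$ python3 -c 'q = (2**127 - 1) // 3; print(0, q, 2 * q)'
0 56713727820156410577229101238628035242 113427455640312821154458202477256070484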

Dividing the token ranges up evenly makes it less likely individual nodes are overloaded (assuming identical hardware for the nodes, and an even distribution of data across the cluster). Uneven token distribution can result in what is termed “hot spots”. This is where a node is under pressure as it is servicing more requests or carrying more data than other nodes.

Even though setting up a single token cluster can be a very manual process, such deployments are still common, especially for very large Cassandra clusters where the node count typically exceeds 1,000 nodes. One of the advantages of this type of deployment is that you can ensure the token distribution is even.

Although setting up a single token cluster from scratch can result in an even load distribution, growing the cluster is far less straightforward. If you insert a single node into your three node cluster, the result is that two out of the four nodes will have a smaller token range than the other two nodes. To fix this problem and re-balance, you then have to run nodetool move to relocate tokens to other nodes. This is a tedious and expensive task though, involving a lot of streaming around the whole cluster. The alternative is to double the size of your cluster each time you expand it. However, this usually means using more hardware than you need. Much like having an immaculate backyard garden, maintaining an even token range per node in a single token cluster requires time, care, and attention, or alternatively, a good deal of clever automation.

Scaling in a single token world is only half the challenge. Certain failure scenarios can heavily increase the time to recovery. Let’s say for example you had a six node cluster with three replicas of the data in a single datacenter (Replication Factor = 3). Replicas might reside on Node 1 and Node 4, Node 2 and Node 5, and lastly on Node 3 and Node 6. In this scenario each node is responsible for a sixth of each of the three replicas.

Six node cluster and three replicas

In the above diagram, the tokens in the token ring are assigned an alpha character. This is to make tracking the token assignment to each node easier to follow. If the cluster had an outage where Node 1 and Node 6 are unavailable, you could only use Nodes 2 and 5 to recover the unique sixth of the data they each have. That is, only Node 2 could be used to recover the data associated with token range ‘F’, and similarly only Node 5 could be used to recover the data associated with token range ‘E’. This is illustrated in the diagram below.

Six node cluster and three replicas failures scenario

vnodes to the rescue

To solve the shortcomings of a single token assignment, Cassandra version 1.2 was enhanced to allow a node to be assigned multiple tokens. That is, a node could be responsible for multiple token ranges. This Cassandra feature is known as “virtual nodes”, or vnodes for short. The vnodes feature was introduced via CASSANDRA-4119. As per the ticket description, the goals of vnodes were:

  • Reduced operations complexity for scaling up/down.
  • Reduced rebuild time in event of failure.
  • Evenly distributed load impact in the event of failure.
  • Evenly distributed impact of streaming operations.
  • More viable support for heterogeneity of hardware.

The introduction of this feature gave birth to the num_tokens setting in the cassandra.yaml file. The setting defined the number of vnodes (token ranges) a node was responsible for. By increasing the number of vnodes per node, the token ranges become smaller. This is because the token ring has a finite number of tokens. The more ranges it is divided up into the smaller each range is.

To maintain backwards compatibility with older 1.x series clusters, the num_tokens defaulted to a value of 1. Moreover, the setting was effectively disabled on a vanilla installation. Specifically, the value in the cassandra.yaml file was commented out. The commented line and previous development commits did give a glimpse into the future of where the feature was headed though.

As foretold by the cassandra.yaml file, and the git commit history, when Cassandra version 2.0 was released the vnodes feature was enabled by default. The num_tokens line was no longer commented out, so its effective default value on a vanilla installation was 256. Thus ushering in a new era of clusters that had relatively even token distributions, and were simple to grow.

With nodes consisting of 256 vnodes and the accompanying additional features, expanding the cluster was a dream. You could insert one new node into your cluster and Cassandra would calculate and assign the tokens automatically! The token values were randomly calculated, and so over time as you added more nodes, the cluster would converge on being in a balanced state. This engineering wizardry put an end to spending hours doing calculations and nodetool move operations to grow a cluster. The option was still there though. If you had a very large cluster or another requirement, you could still use the initial_token setting which was commented out in Cassandra version 2.0. In this case, the value for the num_tokens still had to be set to the number of tokens manually defined in the initial_token setting.

Remember to read the fine print

This gave us a feature that was like a personal devops assistant; you handed them a node, told them to insert it, and then after some time it had tokens allocated and was part of the cluster. However, in a similar vein, there is a price to pay for the convenience…

While we get a more even token distribution when using 256 vnodes, the problem is that availability degrades earlier. Ironically, the more we break the token ranges up the more quickly we can get data unavailability. Then there is the issue of unbalanced token ranges when using a small number of vnodes. By small, I mean values less than 32. Cassandra’s random token allocation is hopeless when it comes to small vnode values. This is because there are insufficient tokens to balance out the wildly different token range sizes that are generated.

Pics or it didn’t happen

It is very easy to demonstrate the availability and token range imbalance issues using a test cluster. We can set up a single token cluster with six nodes using ccm. After calculating the tokens, configuring and starting our test cluster, it looked like this.

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  71.17 KiB  1            33.3%             8d483ae7-e7fa-4c06-9c68-22e71b78e91f  rack1
UN  127.0.0.2  65.99 KiB  1            33.3%             cc15803b-2b93-40f7-825f-4e7bdda327f8  rack1
UN  127.0.0.3  85.3 KiB   1            33.3%             d2dd4acb-b765-4b9e-a5ac-a49ec155f666  rack1
UN  127.0.0.4  104.58 KiB  1            33.3%             ad11be76-b65a-486a-8b78-ccf911db4aeb  rack1
UN  127.0.0.5  71.19 KiB  1            33.3%             76234ece-bf24-426a-8def-355239e8f17b  rack1
UN  127.0.0.6  30.45 KiB  1            33.3%             cca81c64-d3b9-47b8-ba03-46356133401b  rack1
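
For reference, one way a cluster like this could be put together with ccm is sketched below (not necessarily the exact commands used here; the six initial_token values are the ones that appear in the nodetool ring output further down):

$ ccm create SINGLETOKEN -v 3.11.9 -n 6
$ ccm updateconf 'num_tokens: 1'
# set a distinct initial_token in each node's cassandra.yaml
# (e.g. ~/.ccm/SINGLETOKEN/node1/conf/cassandra.yaml) before starting
$ ccm start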

We can then create a test keyspace and populate it using cqlsh.

$ ccm node1 cqlsh
Connected to SINGLETOKEN at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.9 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE test_keyspace WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 };
cqlsh> CREATE TABLE test_keyspace.test_table (
...   id int,
...   value text,
...   PRIMARY KEY (id));
cqlsh> CONSISTENCY LOCAL_QUORUM;
Consistency level set to LOCAL_QUORUM.
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (1, 'foo');
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (2, 'bar');
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (3, 'net');
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (4, 'moo');
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (5, 'car');
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (6, 'set');

To confirm that the cluster is perfectly balanced, we can check the token ring.

$ ccm node1 nodetool ring test_keyspace


Datacenter: datacenter1
==========
Address    Rack   Status  State   Load        Owns     Token
                                                       6148914691236517202
127.0.0.1  rack1  Up      Normal  125.64 KiB  50.00%   -9223372036854775808
127.0.0.2  rack1  Up      Normal  125.31 KiB  50.00%   -6148914691236517206
127.0.0.3  rack1  Up      Normal  124.1 KiB   50.00%   -3074457345618258604
127.0.0.4  rack1  Up      Normal  104.01 KiB  50.00%   -2
127.0.0.5  rack1  Up      Normal  126.05 KiB  50.00%   3074457345618258600
127.0.0.6  rack1  Up      Normal  120.76 KiB  50.00%   6148914691236517202

We can see in the “Owns” column all nodes have 50% ownership of the data. To make the example easier to follow we can manually add a letter representation next to each token number. So the token ranges could be represented in the following way:

$ ccm node1 nodetool ring test_keyspace


Datacenter: datacenter1
==========
Address    Rack   Status  State   Load        Owns     Token                 Token Letter
                                                       6148914691236517202   F
127.0.0.1  rack1  Up      Normal  125.64 KiB  50.00%   -9223372036854775808  A
127.0.0.2  rack1  Up      Normal  125.31 KiB  50.00%   -6148914691236517206  B
127.0.0.3  rack1  Up      Normal  124.1 KiB   50.00%   -3074457345618258604  C
127.0.0.4  rack1  Up      Normal  104.01 KiB  50.00%   -2                    D
127.0.0.5  rack1  Up      Normal  126.05 KiB  50.00%   3074457345618258600   E
127.0.0.6  rack1  Up      Normal  120.76 KiB  50.00%   6148914691236517202   F

We can then capture the output of ccm node1 nodetool describering test_keyspace and change the token numbers to the corresponding letters in the above token ring output.

$ ccm node1 nodetool describering test_keyspace

Schema Version:6256fe3f-a41e-34ac-ad76-82dba04d92c3
TokenRange:
  TokenRange(start_token:A, end_token:B, endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.4], rpc_endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
  TokenRange(start_token:C, end_token:D, endpoints:[127.0.0.4, 127.0.0.5, 127.0.0.6], rpc_endpoints:[127.0.0.4, 127.0.0.5, 127.0.0.6], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1)])
  TokenRange(start_token:B, end_token:C, endpoints:[127.0.0.3, 127.0.0.4, 127.0.0.5], rpc_endpoints:[127.0.0.3, 127.0.0.4, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1)])
  TokenRange(start_token:D, end_token:E, endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.1], rpc_endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
  TokenRange(start_token:F, end_token:A, endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.3], rpc_endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1)])
  TokenRange(start_token:E, end_token:F, endpoints:[127.0.0.6, 127.0.0.1, 127.0.0.2], rpc_endpoints:[127.0.0.6, 127.0.0.1, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1)])

Using the above output, specifically the end_token, we can determine all the token ranges assigned to each node. As mentioned earlier, the token range is defined by the values after the previous token (start_token) up to and including the assigned token (end_token). The token ranges assigned to each node looked like this:

Six node cluster and three replicas

In this setup, if node3 and node6 were unavailable, we would lose an entire replica of the data. However, even if the application is using a Consistency Level of LOCAL_QUORUM, all the data is still available: every token range still has two replicas up across the other four nodes, as the quick check below confirms.
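
To make that concrete, here is a quick Python check. The replica sets are transcribed from the describering output above; the script itself is only an illustration, not anything Cassandra runs.

# Replica sets for the single-token cluster, keyed by end_token letter.
REPLICAS = {
    "A": {1, 2, 3}, "B": {2, 3, 4}, "C": {3, 4, 5},
    "D": {4, 5, 6}, "E": {5, 6, 1}, "F": {6, 1, 2},
}

def ranges_losing_quorum(replicas, down, rf=3):
    """Token ranges that can no longer satisfy LOCAL_QUORUM (a quorum of 3 is 2)."""
    quorum = rf // 2 + 1
    return [t for t, r in sorted(replicas.items()) if len(r - down) < quorum]

# With node3 and node6 down, no range drops below two live replicas.
print(ranges_losing_quorum(REPLICAS, down={3, 6}))   # []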

Now let's consider the case where our cluster uses vnodes. For example purposes we can set num_tokens to 3, which gives us a smaller number of tokens and makes the example easier to follow. Note that for the majority of production deployments where the cluster size is less than 500 nodes, it is recommended that you use a larger value for num_tokens; further information can be found in the Apache Cassandra Production Recommendations.

After configuring and starting the nodes in ccm, our test cluster initially looked like this:

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  71.21 KiB  3       46.2%             7d30cbd4-8356-4189-8c94-0abe8e4d4d73  rack1
UN  127.0.0.2  66.04 KiB  3       37.5%             16bb0b37-2260-440c-ae2a-08cbf9192f85  rack1
UN  127.0.0.3  90.48 KiB  3       28.9%             dc8c9dfd-cf5b-470c-836d-8391941a5a7e  rack1
UN  127.0.0.4  104.64 KiB  3      20.7%             3eecfe2f-65c4-4f41-bbe4-4236bcdf5bd2  rack1
UN  127.0.0.5  66.09 KiB  3       36.1%             4d5adf9f-fe0d-49a0-8ab3-e1f5f9f8e0a2  rack1
UN  127.0.0.6  71.23 KiB  3       30.6%             b41496e6-f391-471c-b3c4-6f56ed4442d6  rack1

Right out of the blocks, we can see signs that the cluster might be unbalanced. Similar to what we did with the single token cluster, we create the test keyspace and populate it using cqlsh. We then grab a readout of the token ring to see what it looks like. Once again, to make the example easier to follow, we manually add a letter representation next to each token number.

$ ccm node1 nodetool ring test_keyspace

Datacenter: datacenter1
==========
Address    Rack   Status  State   Load        Owns    Token                 Token Letter
                                                      8828652533728408318   R
127.0.0.5  rack1  Up      Normal  121.09 KiB  41.44%  -7586808982694641609  A
127.0.0.1  rack1  Up      Normal  126.49 KiB  64.03%  -6737339388913371534  B
127.0.0.2  rack1  Up      Normal  126.04 KiB  66.60%  -5657740186656828604  C
127.0.0.3  rack1  Up      Normal  135.71 KiB  39.89%  -3714593062517416200  D
127.0.0.6  rack1  Up      Normal  126.58 KiB  40.07%  -2697218374613409116  E
127.0.0.1  rack1  Up      Normal  126.49 KiB  64.03%  -1044956249817882006  F
127.0.0.2  rack1  Up      Normal  126.04 KiB  66.60%  -877178609551551982   G
127.0.0.4  rack1  Up      Normal  110.22 KiB  47.96%  -852432543207202252   H
127.0.0.5  rack1  Up      Normal  121.09 KiB  41.44%  117262867395611452    I
127.0.0.6  rack1  Up      Normal  126.58 KiB  40.07%  762725591397791743    J
127.0.0.3  rack1  Up      Normal  135.71 KiB  39.89%  1416289897444876127   K
127.0.0.1  rack1  Up      Normal  126.49 KiB  64.03%  3730403440915368492   L
127.0.0.4  rack1  Up      Normal  110.22 KiB  47.96%  4190414744358754863   M
127.0.0.2  rack1  Up      Normal  126.04 KiB  66.60%  6904945895761639194   N
127.0.0.5  rack1  Up      Normal  121.09 KiB  41.44%  7117770953638238964   O
127.0.0.4  rack1  Up      Normal  110.22 KiB  47.96%  7764578023697676989   P
127.0.0.3  rack1  Up      Normal  135.71 KiB  39.89%  8123167640761197831   Q
127.0.0.6  rack1  Up      Normal  126.58 KiB  40.07%  8828652533728408318   R

As we can see from the "Owns" column above, there are some large token range ownership imbalances. The smallest token range ownership is by node 127.0.0.3 at 39.89%, while the largest is by node 127.0.0.2 at 66.60%. That is a difference of nearly 27 percentage points!

Once again, we capture the output of ccm node1 nodetool describering test_keyspace and change the token numbers to the corresponding letters in the above token ring.

$ ccm node1 nodetool describering test_keyspace

Schema Version:4b2dc440-2e7c-33a4-aac6-ffea86cb0e21
TokenRange:
    TokenRange(start_token:J, end_token:K, endpoints:[127.0.0.3, 127.0.0.1, 127.0.0.4], rpc_endpoints:[127.0.0.3, 127.0.0.1, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:K, end_token:L, endpoints:[127.0.0.1, 127.0.0.4, 127.0.0.2], rpc_endpoints:[127.0.0.1, 127.0.0.4, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:E, end_token:F, endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.4], rpc_endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:D, end_token:E, endpoints:[127.0.0.6, 127.0.0.1, 127.0.0.2], rpc_endpoints:[127.0.0.6, 127.0.0.1, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:I, end_token:J, endpoints:[127.0.0.6, 127.0.0.3, 127.0.0.1], rpc_endpoints:[127.0.0.6, 127.0.0.3, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:A, end_token:B, endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.3], rpc_endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:R, end_token:A, endpoints:[127.0.0.5, 127.0.0.1, 127.0.0.2], rpc_endpoints:[127.0.0.5, 127.0.0.1, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:M, end_token:N, endpoints:[127.0.0.2, 127.0.0.5, 127.0.0.4], rpc_endpoints:[127.0.0.2, 127.0.0.5, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:H, end_token:I, endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.3], rpc_endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:L, end_token:M, endpoints:[127.0.0.4, 127.0.0.2, 127.0.0.5], rpc_endpoints:[127.0.0.4, 127.0.0.2, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:N, end_token:O, endpoints:[127.0.0.5, 127.0.0.4, 127.0.0.3], rpc_endpoints:[127.0.0.5, 127.0.0.4, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:P, end_token:Q, endpoints:[127.0.0.3, 127.0.0.6, 127.0.0.5], rpc_endpoints:[127.0.0.3, 127.0.0.6, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:Q, end_token:R, endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.1], rpc_endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:F, end_token:G, endpoints:[127.0.0.2, 127.0.0.4, 127.0.0.5], rpc_endpoints:[127.0.0.2, 127.0.0.4, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:C, end_token:D, endpoints:[127.0.0.3, 127.0.0.6, 127.0.0.1], rpc_endpoints:[127.0.0.3, 127.0.0.6, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:G, end_token:H, endpoints:[127.0.0.4, 127.0.0.5, 127.0.0.6], rpc_endpoints:[127.0.0.4, 127.0.0.5, 127.0.0.6], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:B, end_token:C, endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.6], rpc_endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.6], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:O, end_token:P, endpoints:[127.0.0.4, 127.0.0.3, 127.0.0.6], rpc_endpoints:[127.0.0.4, 127.0.0.3, 127.0.0.6], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1)])

Finally, we can determine all the token ranges assigned to each node. The token ranges assigned to each node looked like this:

Six node cluster and three replicas

Using this, we can see what happens if we hit the same outage as the single token cluster did; that is, node3 and node6 are unavailable. As we can see, node3 and node6 are both responsible for tokens C, D, I, J, P, and Q. Hence, data associated with those token ranges would be unavailable to an application using a Consistency Level of LOCAL_QUORUM. To put that in different terms, unlike in our single token cluster, 33.3% of our data could no longer be retrieved, as the short check below shows.
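
Here is the same quorum check again, this time with the replica sets transcribed from the vnode cluster's describering output (again, purely an illustrative script):

REPLICAS = {
    "A": {5, 1, 2}, "B": {1, 2, 3}, "C": {2, 3, 6}, "D": {3, 6, 1},
    "E": {6, 1, 2}, "F": {1, 2, 4}, "G": {2, 4, 5}, "H": {4, 5, 6},
    "I": {5, 6, 3}, "J": {6, 3, 1}, "K": {3, 1, 4}, "L": {1, 4, 2},
    "M": {4, 2, 5}, "N": {2, 5, 4}, "O": {5, 4, 3}, "P": {4, 3, 6},
    "Q": {3, 6, 5}, "R": {6, 5, 1},
}

down = {3, 6}                                 # node3 and node6 are unavailable
lost = sorted(t for t, r in REPLICAS.items() if len(r - down) < 2)   # a quorum of 3 is 2
print(lost)                                   # ['C', 'D', 'I', 'J', 'P', 'Q']
print(f"{len(lost) / len(REPLICAS):.1%}")     # 33.3% of ranges lose quorum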

Rack ‘em up

A seasoned Cassandra operator will notice that so far we have run our token distribution tests on clusters with only a single rack. To help increase availability when using vnodes, racks can be deployed. When racks are used, Cassandra will try to place a single replica in each rack; that is, it will try to ensure that no two replicas of the same token range end up in the same rack.

The key here is to configure the cluster so that for a given datacenter the number of racks is the same as the replication factor.

Let’s retry our previous example where we set num_tokens to 3, only this time we’ll define three racks in the test cluster. After configuring and starting the nodes in ccm, our newly configured test cluster initially looks like this:

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  71.08 KiB  3       31.8%             49df615d-bfe5-46ce-a8dd-4748c086f639  rack1
UN  127.0.0.2  71.04 KiB  3       34.4%             3fef187e-00f5-476d-b31f-7aa03e9d813c  rack2
UN  127.0.0.3  66.04 KiB  3       37.3%             c6a0a5f4-91f8-4bd1-b814-1efc3dae208f  rack3
UN  127.0.0.4  109.79 KiB  3      52.9%             74ac0727-c03b-476b-8f52-38c154cfc759  rack1
UN  127.0.0.5  66.09 KiB  3       18.7%             5153bad4-07d7-4a24-8066-0189084bbc80  rack2
UN  127.0.0.6  66.09 KiB  3       25.0%             6693214b-a599-4f58-b1b4-a6cf0dd684ba  rack3

We can still see signs that the cluster might be unbalanced. This is a side issue, as the main point to take from the above is that we now have three racks defined in the cluster, with two nodes assigned to each. Once again, similar to the single token cluster, we create the test keyspace and populate it using cqlsh. We then grab a readout of the token ring to see what it looks like. As with the previous tests, to make the example easier to follow, we manually add a letter representation next to each token number.

ccm node1 nodetool ring test_keyspace


Datacenter: datacenter1
==========
Address    Rack   Status  State   Load        Owns    Token                 Token Letter
                                                      8993942771016137629   R
127.0.0.5  rack2  Up      Normal  122.42 KiB  34.65%  -8459555739932651620  A
127.0.0.4  rack1  Up      Normal  111.07 KiB  53.84%  -8458588239787937390  B
127.0.0.3  rack3  Up      Normal  116.12 KiB  60.72%  -8347996802899210689  C
127.0.0.1  rack1  Up      Normal  121.31 KiB  46.16%  -5712162437894176338  D
127.0.0.4  rack1  Up      Normal  111.07 KiB  53.84%  -2744262056092270718  E
127.0.0.6  rack3  Up      Normal  122.39 KiB  39.28%  -2132400046698162304  F
127.0.0.2  rack2  Up      Normal  121.42 KiB  65.35%  -1232974565497331829  G
127.0.0.4  rack1  Up      Normal  111.07 KiB  53.84%  1026323925278501795   H
127.0.0.2  rack2  Up      Normal  121.42 KiB  65.35%  3093888090255198737   I
127.0.0.2  rack2  Up      Normal  121.42 KiB  65.35%  3596129656253861692   J
127.0.0.3  rack3  Up      Normal  116.12 KiB  60.72%  3674189467337391158   K
127.0.0.5  rack2  Up      Normal  122.42 KiB  34.65%  3846303495312788195   L
127.0.0.1  rack1  Up      Normal  121.31 KiB  46.16%  4699181476441710984   M
127.0.0.1  rack1  Up      Normal  121.31 KiB  46.16%  6795515568417945696   N
127.0.0.3  rack3  Up      Normal  116.12 KiB  60.72%  7964270297230943708   O
127.0.0.5  rack2  Up      Normal  122.42 KiB  34.65%  8105847793464083809   P
127.0.0.6  rack3  Up      Normal  122.39 KiB  39.28%  8813162133522758143   Q
127.0.0.6  rack3  Up      Normal  122.39 KiB  39.28%  8993942771016137629   R

Once again we capture the output of ccm node1 nodetool describering test_keyspace and change the token numbers to the corresponding letters in the above token ring.

$ ccm node1 nodetool describering test_keyspace

Schema Version:aff03498-f4c1-3be1-b133-25503becf208
TokenRange:
    TokenRange(start_token:B, end_token:C, endpoints:[127.0.0.3, 127.0.0.1, 127.0.0.2], rpc_endpoints:[127.0.0.3, 127.0.0.1, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2)])
    TokenRange(start_token:L, end_token:M, endpoints:[127.0.0.1, 127.0.0.3, 127.0.0.5], rpc_endpoints:[127.0.0.1, 127.0.0.3, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2)])
    TokenRange(start_token:N, end_token:O, endpoints:[127.0.0.3, 127.0.0.5, 127.0.0.4], rpc_endpoints:[127.0.0.3, 127.0.0.5, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:P, end_token:Q, endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.4], rpc_endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:K, end_token:L, endpoints:[127.0.0.5, 127.0.0.1, 127.0.0.3], rpc_endpoints:[127.0.0.5, 127.0.0.1, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3)])
    TokenRange(start_token:R, end_token:A, endpoints:[127.0.0.5, 127.0.0.4, 127.0.0.3], rpc_endpoints:[127.0.0.5, 127.0.0.4, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3)])
    TokenRange(start_token:I, end_token:J, endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.1], rpc_endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:Q, end_token:R, endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.4], rpc_endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:E, end_token:F, endpoints:[127.0.0.6, 127.0.0.2, 127.0.0.4], rpc_endpoints:[127.0.0.6, 127.0.0.2, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:H, end_token:I, endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.1], rpc_endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:D, end_token:E, endpoints:[127.0.0.4, 127.0.0.6, 127.0.0.2], rpc_endpoints:[127.0.0.4, 127.0.0.6, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2)])
    TokenRange(start_token:A, end_token:B, endpoints:[127.0.0.4, 127.0.0.3, 127.0.0.2], rpc_endpoints:[127.0.0.4, 127.0.0.3, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2)])
    TokenRange(start_token:C, end_token:D, endpoints:[127.0.0.1, 127.0.0.6, 127.0.0.2], rpc_endpoints:[127.0.0.1, 127.0.0.6, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2)])
    TokenRange(start_token:F, end_token:G, endpoints:[127.0.0.2, 127.0.0.4, 127.0.0.3], rpc_endpoints:[127.0.0.2, 127.0.0.4, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3)])
    TokenRange(start_token:O, end_token:P, endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.4], rpc_endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:J, end_token:K, endpoints:[127.0.0.3, 127.0.0.5, 127.0.0.1], rpc_endpoints:[127.0.0.3, 127.0.0.5, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:G, end_token:H, endpoints:[127.0.0.4, 127.0.0.2, 127.0.0.3], rpc_endpoints:[127.0.0.4, 127.0.0.2, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3)])
    TokenRange(start_token:M, end_token:N, endpoints:[127.0.0.1, 127.0.0.3, 127.0.0.5], rpc_endpoints:[127.0.0.1, 127.0.0.3, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2)])

Lastly, we once again determine all the token ranges assigned to each node:

Six node cluster and three replicas

As we can see from the way Cassandra has assigned the tokens, there is now a complete data replica spread across the two nodes in each of our three racks. If we go back to our failure scenario where node3 and node6 become unavailable, we can still service queries using a Consistency Level of LOCAL_QUORUM. The only elephant in the room here is the uneven ownership within the racks: node3 has a much larger portion of the token ring allocated to it, while its counterpart in the same rack, node6, sits at the opposite end with far less allocated to it.
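
To make the rack-aware placement easier to see, here is a simplified Python model of how the replicas above end up one per rack. It assumes the token, node, and rack assignments transcribed from the ring output above, and it deliberately ignores the corner cases the real NetworkTopologyStrategy code handles; it is a sketch, not the actual implementation.

# (token letter, node, rack) in ring order, from the nodetool ring output above.
RING = [
    ("A", 5, 2), ("B", 4, 1), ("C", 3, 3), ("D", 1, 1), ("E", 4, 1),
    ("F", 6, 3), ("G", 2, 2), ("H", 4, 1), ("I", 2, 2), ("J", 2, 2),
    ("K", 3, 3), ("L", 5, 2), ("M", 1, 1), ("N", 1, 1), ("O", 3, 3),
    ("P", 5, 2), ("Q", 6, 3), ("R", 6, 3),
]

def replicas_for(end_token, rf=3):
    """Walk the ring clockwise from the range's end token, picking one node per rack."""
    start = next(i for i, (t, _, _) in enumerate(RING) if t == end_token)
    chosen, racks_used = [], set()
    for i in range(len(RING)):
        _, node, rack = RING[(start + i) % len(RING)]
        if node in chosen or rack in racks_used:
            continue
        chosen.append(node)
        racks_used.add(rack)
        if len(chosen) == rf:
            break
    return chosen

# The range ending at token C lands on nodes 3, 1 and 2 (one in each rack),
# matching the B-C entry in the describering output above.
print(replicas_for("C"))   # [3, 1, 2]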

Too many vnodes spoil the cluster

Given the token distribution issues with low numbers of vnodes, one would think the best option is to use a large vnode value. However, apart from increasing the chance of some data being unavailable in a multi-node outage, large vnode values also impact streaming operations. To repair data on a node, Cassandra will start one repair session per vnode. These repair sessions are processed sequentially. Hence, the larger the vnode value, the longer the repair times and the greater the overhead needed to run a repair.

In an effort to fix slow repair times resulting from large vnode values, CASSANDRA-5220 was introduced in Cassandra 3.0. This change allows Cassandra to group common token ranges for a set of nodes into a single repair session. It increased the size of each repair session, as multiple token ranges were repaired together, but reduced the total number of repair sessions that had to be executed.

We can see the effect that vnodes have on repair by running a simple test on a cluster backed by real hardware. To do this test, we first create a cluster that uses single tokens and run a repair. Then we create the same cluster, except with 256 vnodes, and run the same repair. We will use tlp-cluster to create a Cassandra cluster in AWS with the following properties.

  • Instance size: i3.2xlarge
  • Node count: 12
  • Rack count: 3 (4 nodes per rack)
  • Cassandra version: 3.11.9 (latest stable release at the time of writing)

The commands to build this cluster are as follows.

$ tlp-cluster init --azs a,b,c --cassandra 12 --instance i3.2xlarge --stress 1 TLP BLOG "Blogpost repair testing"
$ tlp-cluster up
$ tlp-cluster use --config "cluster_name:SingleToken" --config "num_tokens:1" 3.11.9
$ tlp-cluster install

Once we provision the hardware we set the initial_token property for each of the nodes individually. We can calculate the initial tokens for each node using a simple Python command.

Python 2.7.16 (default, Nov 23 2020, 08:01:20)
[GCC Apple LLVM 12.0.0 (clang-1200.0.30.4) [+internal-os, ptrauth-isa=sign+stri on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> num_tokens = 1
>>> num_nodes = 12
>>> print("\n".join(['[Node {}] initial_token: {}'.format(n + 1, ','.join([str(((2**64 / (num_tokens * num_nodes)) * (t * num_nodes + n)) - 2**63) for t in range(num_tokens)])) for n in range(num_nodes)]))
[Node 1] initial_token: -9223372036854775808
[Node 2] initial_token: -7686143364045646507
[Node 3] initial_token: -6148914691236517206
[Node 4] initial_token: -4611686018427387905
[Node 5] initial_token: -3074457345618258604
[Node 6] initial_token: -1537228672809129303
[Node 7] initial_token: -2
[Node 8] initial_token: 1537228672809129299
[Node 9] initial_token: 3074457345618258600
[Node 10] initial_token: 4611686018427387901
[Node 11] initial_token: 6148914691236517202
[Node 12] initial_token: 7686143364045646503

After starting Cassandra on all the nodes, around 3 GB of data per node can be preloaded using the following tlp-stress command. In this command we set our keyspace replication factor to 3 and set gc_grace_seconds to 0. This is done to make hints expire immediately when they are created, which ensures they are never delivered to the destination node.

ubuntu@ip-172-31-19-180:~$ tlp-stress run KeyValue --replication "{'class': 'NetworkTopologyStrategy', 'us-west-2':3 }" --cql "ALTER TABLE tlp_stress.keyvalue WITH gc_grace_seconds = 0" --reads 1 --partitions 100M --populate 100M --iterations 1

Upon completion of the data loading, the cluster status looks like this.

ubuntu@ip-172-31-30-95:~$ nodetool status
Datacenter: us-west-2
=====================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.30.95   2.78 GiB   1            25.0%             6640c7b9-c026-4496-9001-9d79bea7e8e5  2a
UN  172.31.31.106  2.79 GiB   1            25.0%             ceaf9d56-3a62-40be-bfeb-79a7f7ade402  2a
UN  172.31.2.74    2.78 GiB   1            25.0%             4a90b071-830e-4dfe-9d9d-ab4674be3507  2c
UN  172.31.39.56   2.79 GiB   1            25.0%             37fd3fe0-598b-428f-a84b-c27fc65ee7d5  2b
UN  172.31.31.184  2.78 GiB   1            25.0%             40b4e538-476a-4f20-a012-022b10f257e9  2a
UN  172.31.10.87   2.79 GiB   1            25.0%             fdccabef-53a9-475b-9131-b73c9f08a180  2c
UN  172.31.18.118  2.79 GiB   1            25.0%             b41ab8fe-45e7-4628-94f0-a4ec3d21f8d0  2a
UN  172.31.35.4    2.79 GiB   1            25.0%             246bf6d8-8deb-42fe-bd11-05cca8f880d7  2b
UN  172.31.40.147  2.79 GiB   1            25.0%             bdd3dd61-bb6a-4849-a7a6-b60a2b8499f6  2b
UN  172.31.13.226  2.79 GiB   1            25.0%             d0389979-c38f-41e5-9836-5a7539b3d757  2c
UN  172.31.5.192   2.79 GiB   1            25.0%             b0031ef9-de9f-4044-a530-ffc67288ebb6  2c
UN  172.31.33.0    2.79 GiB   1            25.0%             da612776-4018-4cb7-afd5-79758a7b9cf8  2b

We can then run a full repair on each node using the following commands.

$ source env.sh
$ c_all "nodetool repair -full tlp_stress"

The repair times recorded for each node were as follows:

[2021-01-22 20:20:13,952] Repair command #1 finished in 3 minutes 55 seconds
[2021-01-22 20:23:57,053] Repair command #1 finished in 3 minutes 36 seconds
[2021-01-22 20:27:42,123] Repair command #1 finished in 3 minutes 32 seconds
[2021-01-22 20:30:57,654] Repair command #1 finished in 3 minutes 21 seconds
[2021-01-22 20:34:27,740] Repair command #1 finished in 3 minutes 17 seconds
[2021-01-22 20:37:40,449] Repair command #1 finished in 3 minutes 23 seconds
[2021-01-22 20:41:32,391] Repair command #1 finished in 3 minutes 36 seconds
[2021-01-22 20:44:52,917] Repair command #1 finished in 3 minutes 25 seconds
[2021-01-22 20:47:57,729] Repair command #1 finished in 2 minutes 58 seconds
[2021-01-22 20:49:58,868] Repair command #1 finished in 1 minute 58 seconds
[2021-01-22 20:51:58,724] Repair command #1 finished in 1 minute 53 seconds
[2021-01-22 20:54:01,100] Repair command #1 finished in 1 minute 50 seconds

These times give us a total repair time of 36 minutes and 44 seconds.

The same cluster can be reused to test repair times when 256 vnodes are used. To do this we execute the following steps.

  • Shut down Cassandra on all the nodes.
  • Delete the contents in each of the directories data, commitlog, hints, and saved_caches (these are located in /var/lib/cassandra/ on each node).
  • Set num_tokens in the cassandra.yaml configuration file to a value of 256 and remove the initial_token setting.
  • Start up Cassandra on all the nodes.

After populating the cluster with data, its status looked like this.

ubuntu@ip-172-31-30-95:~$ nodetool status
Datacenter: us-west-2
=====================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.30.95   2.79 GiB   256          24.3%             10b0a8b5-aaa6-4528-9d14-65887a9b0b9c  2a
UN  172.31.2.74    2.81 GiB   256          24.4%             a748964d-0460-4f86-907d-a78edae2a2cb  2c
UN  172.31.31.106  3.1 GiB    256          26.4%             1fc68fbd-335d-4689-83b9-d62cca25c88a  2a
UN  172.31.31.184  2.78 GiB   256          23.9%             8a1b25e7-d2d8-4471-aa76-941c2556cc30  2a
UN  172.31.39.56   2.73 GiB   256          23.5%             3642a964-5d21-44f9-b330-74c03e017943  2b
UN  172.31.10.87   2.95 GiB   256          25.4%             540a38f5-ad05-4636-8768-241d85d88107  2c
UN  172.31.18.118  2.99 GiB   256          25.4%             41b9f16e-6e71-4631-9794-9321a6e875bd  2a
UN  172.31.35.4    2.96 GiB   256          25.6%             7f62d7fd-b9c2-46cf-89a1-83155feebb70  2b
UN  172.31.40.147  3.26 GiB   256          27.4%             e17fd867-2221-4fb5-99ec-5b33981a05ef  2b
UN  172.31.13.226  2.91 GiB   256          25.0%             4ef69969-d9fe-4336-9618-359877c4b570  2c
UN  172.31.33.0    2.74 GiB   256          23.6%             298ab053-0c29-44ab-8a0a-8dde03b4f125  2b
UN  172.31.5.192   2.93 GiB   256          25.2%             7c690640-24df-4345-aef3-dacd6643d6c0  2c

When we ran the same repair test we used on the single token cluster against the vnode cluster, the following repair times were recorded.

[2021-01-22 22:45:56,689] Repair command #1 finished in 4 minutes 40 seconds
[2021-01-22 22:50:09,170] Repair command #1 finished in 4 minutes 6 seconds
[2021-01-22 22:54:04,820] Repair command #1 finished in 3 minutes 43 seconds
[2021-01-22 22:57:26,193] Repair command #1 finished in 3 minutes 27 seconds
[2021-01-22 23:01:23,554] Repair command #1 finished in 3 minutes 44 seconds
[2021-01-22 23:04:40,523] Repair command #1 finished in 3 minutes 27 seconds
[2021-01-22 23:08:20,231] Repair command #1 finished in 3 minutes 23 seconds
[2021-01-22 23:11:01,230] Repair command #1 finished in 2 minutes 45 seconds
[2021-01-22 23:13:48,682] Repair command #1 finished in 2 minutes 40 seconds
[2021-01-22 23:16:23,630] Repair command #1 finished in 2 minutes 32 seconds
[2021-01-22 23:18:56,786] Repair command #1 finished in 2 minutes 26 seconds
[2021-01-22 23:21:38,961] Repair command #1 finished in 2 minutes 30 seconds

These times give us a total repair time of 39 minutes and 23 seconds.

While the time difference is quite small for 3 GB of data per node (up to an additional 45 seconds per node), it is easy to see how the difference could balloon out when we have data sizes in the order of hundreds of gigabytes per node.
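
As a quick sanity check, the totals above can be reproduced by summing the per-node repair durations transcribed from the logs (a small helper script, nothing more):

single_token = ["3:55", "3:36", "3:32", "3:21", "3:17", "3:23",
                "3:36", "3:25", "2:58", "1:58", "1:53", "1:50"]
vnodes_256   = ["4:40", "4:06", "3:43", "3:27", "3:44", "3:27",
                "3:23", "2:45", "2:40", "2:32", "2:26", "2:30"]

def total(durations):
    seconds = sum(int(m) * 60 + int(s) for m, s in (d.split(":") for d in durations))
    return f"{seconds // 60} minutes {seconds % 60} seconds"

print("single token:", total(single_token))   # 36 minutes 44 seconds
print("256 vnodes:  ", total(vnodes_256))     # 39 minutes 23 seconds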

Unfortunately, all data streaming operations like bootstrap and datacenter rebuild fall victim to the same issue repairs have with large vnode values. Specifically, when a node needs to stream data to another node a streaming session is opened for each token range on the node. This results in a lot of unnecessary overhead, as data is transferred via the JVM.

Secondary indexes impacted too

To add insult to injury, the negative effect of large vnode values extends to secondary indexes, because of the way the read path works.

When a coordinator node receives a secondary index request from a client, it fans out the request to all the nodes in the cluster or datacenter depending on the locality of the consistency level. Each node then checks the SSTables for each of the token ranges assigned to it for a match to the secondary index query. Matches to the query are then returned to the coordinator node.

Hence, the larger the number of vnodes, the larger the impact to the responsiveness of the secondary index query. Furthermore, the performance impacts on secondary indexes grow exponentially with the number of replicas in the cluster. In a scenario where multiple datacenters have nodes using many vnodes, secondary indexes become even more inefficient.

A new hope

So what we are left with is a property in Cassandra that really hits the mark in terms of reducing the complexity of resizing a cluster. Unfortunately, its benefits come at the expense of unbalanced token ranges on one end and degraded operational performance at the other. That being said, the vnodes story is far from over.

Eventually, it became a well-known fact in the Apache Cassandra project that large vnode values had undesirable side effects on a cluster. To combat this issue, clever contributors and committers added CASSANDRA-7032 in 3.0; a replica aware token allocation algorithm. The idea was to allow a low value to be used for num_tokens while maintaining relatively even balanced token ranges. The enhancement includes the addition of the allocate_tokens_for_keyspace setting in the cassandra.yaml file. The new algorithm is used instead of the random token allocator when an existing user keyspace is assigned to the allocate_tokens_for_keyspace setting.

Behind the scenes, Cassandra takes the replication factor of the defined keyspace and uses it when calculating the token values for the node when it first enters the cluster. Unlike the random token generator, the replica aware generator is like an experienced member of a symphony orchestra; sophisticated and in tune with its surroundings. So much so, that the process it uses to generate token ranges involves:

  • Constructing an initial token ring state.
  • Computing candidates for new tokens by splitting all existing token ranges right in the middle (see the sketch after this list).
  • Evaluating the expected improvements from all candidates and forming a priority queue.
  • Iterating through the candidates in the queue and selecting the best combination.
    • During token selection, re-evaluate the candidate improvements in the queue.
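
As a toy illustration of the candidate step above (this is not the actual CASSANDRA-7032 code, just a sketch of the idea), each candidate token is simply the midpoint of an existing token range:

# Three existing tokens on the ring (values borrowed from the single-token example).
existing = sorted([-9223372036854775808, -3074457345618258604, 3074457345618258600])

def midpoint_candidates(tokens):
    """Midpoint of each adjacent token pair; the wrap-around range is ignored
    here to keep the sketch short."""
    return [(a + b) // 2 for a, b in zip(tokens, tokens[1:])]

print(midpoint_candidates(existing))
# [-6148914691236517206, -2]  (the algorithm then ranks candidates like these)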

While this was a good advancement for Cassandra, there are a few gotchas to watch out for when using the replica aware token allocation algorithm. To start with, it only works with the Murmur3Partitioner. If you started with an old cluster that used another partitioner, such as the RandomPartitioner, and have upgraded over time to 3.0, the feature is unusable. The second and more common stumbling block is that some trickery is required to use this feature when creating a cluster from scratch. The question was common enough that we wrote a blog post specifically on how to use the new replica aware token allocation algorithm to set up a new cluster with even token distribution.

As you can see, Cassandra 3.0 made a genuine effort to address vnode’s rough edges. What’s more, there are additional beacons of light on the horizon with the upcoming Cassandra 4.0 major release. For instance, a new allocate_tokens_for_local_replication_factor setting has been added to the cassandra.yaml file via CASSANDRA-15260. Similar to its cousin the allocate_tokens_for_keyspace setting, the replica aware token allocation algorithm is activated when a value is supplied to it.

However, unlike its close relative, it is more user-friendly, because no faffing about is required to create a balanced cluster from scratch. In the simplest case, you can set a value for the allocate_tokens_for_local_replication_factor setting and just start adding nodes. Advanced operators can still manually assign tokens to the initial nodes to ensure the desired replication factor is met. After that, subsequent nodes can be added with the replication factor value assigned to the allocate_tokens_for_local_replication_factor setting.

Arguably, one of the longest-coming and most significant changes to be released with Cassandra 4.0 is the update to the default value of the num_tokens setting. As mentioned at the beginning of this post, thanks to CASSANDRA-13701, Cassandra 4.0 will ship with a num_tokens value set to 16 in the cassandra.yaml file. In addition, the allocate_tokens_for_local_replication_factor setting is enabled by default and set to a value of 3.

These changes are much better user defaults. On a vanilla installation of Cassandra 4.0, the replica aware token allocation algorithm kicks in as soon as there are enough hosts to satisfy a replication factor of 3. The result is evenly distributed token ranges for new nodes, with all the benefits that a low vnode value has to offer.

Conclusion

The consistent hashing and token allocation functionality form part of Cassandra's backbone. Virtual nodes take the guesswork out of maintaining this critical functionality, specifically by making cluster resizing quicker and easier. As a rule of thumb, the lower the number of vnodes, the less even the token distribution will be, leading to some nodes being overworked. Conversely, the higher the number of vnodes, the longer cluster-wide operations take to complete and the more likely it is that data will be unavailable if multiple nodes are down. The features added in 3.0, and the enhancements to them in 4.0, allow Cassandra to use a low number of vnodes while still maintaining a relatively even token distribution. Ultimately, this produces a better out-of-the-box experience for new users running a vanilla installation of Cassandra 4.0.

Get Rid of Read Repair Chance

Apache Cassandra has a feature called Read Repair Chance that we always recommend our clients disable. It often adds an extra ~20% internal read load on your cluster while serving little purpose and providing no guarantees.

What is read repair chance?

The feature comes with two schema options at the table level: read_repair_chance and dclocal_read_repair_chance. Each represents the probability that the coordinator node will query the extra replica nodes, beyond those required by the requested consistency level, for the purpose of read repairs.

The original setting read_repair_chance now defines the probability of issuing the extra queries to all replicas in all data centers. And the newer dclocal_read_repair_chance setting defines the probability of issuing the extra queries to all replicas within the current data center.

The default values are read_repair_chance = 0.0 and dclocal_read_repair_chance = 0.1. This means that cross-datacenter asynchronous read repair is disabled and asynchronous read repair within the datacenter occurs on 10% of read requests.

What does it cost?

Consider the following cluster deployment:

  • A keyspace with a replication factor of three (RF=3) in a single data center
  • The default value of dclocal_read_repair_chance = 0.1
  • Client reads using a consistency level of LOCAL_QUORUM
  • Client is using the token aware policy (default for most drivers)

In this setup, the cluster is going to see ~10% of the read requests result in the coordinator issuing two messaging system queries to two replicas, instead of just one. This results in an additional ~5% load.

If the requested consistency level is LOCAL_ONE, which is the default for the java-driver, then ~10% of the read requests result in the coordinator increasing messaging system queries from zero to two. This equates to a ~20% read load increase.

With read_repair_chance = 0.1 and multiple datacenters the situation is much worse. With three data centers each with RF=3, then 10% of the read requests will result in the coordinator issuing eight extra replica queries. And six of those extra replica queries are now via cross-datacenter queries. In this use-case it becomes a doubling of your read load.
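
A rough way to reproduce the single-datacenter numbers above is to count replica read operations per client read. This is a simplified model, not a measurement:

def extra_load(chance, rf, base_replicas_read):
    """Fraction of additional replica reads caused by chance-based read repair."""
    extra_per_triggered_read = rf - base_replicas_read
    return chance * extra_per_triggered_read / base_replicas_read

# RF=3 at LOCAL_QUORUM: 2 replicas are read normally, 3 when read repair triggers.
print(f"{extra_load(0.1, 3, 2):.0%}")   # 5%
# RF=3 at LOCAL_ONE with a token-aware client: only 1 replica is read normally.
print(f"{extra_load(0.1, 3, 1):.0%}")   # 20%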

Let’s take a look at this with some flamegraphs…

The first flamegraph shows the default configuration of dclocal_read_repair_chance = 0.1. When the coordinator’s code hits the AbstractReadExecutor.getReadExecutor(..) method, it splits paths depending on the ReadRepairDecision for the table. Stack traces containing either AlwaysSpeculatingReadExecutor, SpeculatingReadExecutor or NeverSpeculatingReadExecutor provide us a hint to which code path we are on, and whether either the read repair chance or speculative retry are in play.

Async Read Repairs in the default configuration

The second flamegraph shows the behaviour when the configuration has been changed to dclocal_read_repair_chance = 0.0. The AlwaysSpeculatingReadExecutor flame is gone, which demonstrates the degree of complexity removed from the runtime. Specifically, read requests from the client are now forwarded only to the replicas required by the consistency level, rather than to every replica.

No Read Repairs

ℹ️   These flamegraphs were created with Apache Cassandra 3.11.9, Kubernetes and the cass-operator, nosqlbench and the async-profiler.

Previously we relied upon the existing tools of tlp-cluster, ccm, tlp-stress and cassandra-stress. This new approach with new tools is remarkably easy, and by using k8s the same approach can be used locally or against a dedicated k8s infrastructure. That is, I don't need to switch between ccm clusters for local testing and tlp-cluster for cloud testing. The same recipe applies everywhere. Nor am I bound to AWS for my cloud testing. It is also worth mentioning that these new tools are gaining a lot of focus and momentum from DataStax, so the introduction of this new approach to the open source community is deserved.

The full approach and recipe to generating these flamegraphs will follow in a [subsequent blog post](/blog/2021/01/31/cassandra_and_kubernetes_cass_operator.html).

What is the benefit of this additional load?

The coordinator returns the result to the client once it has received the replica responses required by the requested consistency level. This is why we call the feature asynchronous read repairs. It means that read latencies are not directly impacted, though the additional background load will indirectly impact them.

Asynchronous read repairs also means that there’s no guarantee that the response to the client is repaired data. In summary, 10% of the data you read will be guaranteed to be repaired after you have read it. This is not a guarantee clients can use or rely upon. And it is not a guarantee Cassandra operators can rely upon to ensure data at rest is consistent. In fact it is not a guarantee an operator would want to rely upon anyway, as most inconsistencies are dealt with by hints and nodes down longer than the hint window are expected to be manually repaired.

Furthermore, systems that use strong consistency (i.e. where reads and writes are using quorum consistency levels) will not expose such unrepaired data anyway. Such systems only need repairs and consistent data on disk for lower read latencies (by avoiding the additional digest mismatch round trip between coordinator and replicas) and ensuring deleted data is not resurrected (i.e. tombstones are properly propagated).

So the feature gives us additional load for no usable benefit. This is why disabling the feature is always an immediate recommendation we give everyone.

It is also the rationale for the feature being removed altogether in the next major release, Cassandra version 4.0. And, since 3.0.17 and 3.11.3, if you still have values set for these properties in your table, you may have noticed the following warning during startup:

dclocal_read_repair_chance table option has been deprecated and will be removed in version 4.0

Get Rid of It

For Cassandra clusters not yet on version 4.0, do the following to disable all asynchronous read repairs:

cqlsh -e 'ALTER TABLE <keyspace_name>.<table_name> WITH read_repair_chance = 0.0 AND dclocal_read_repair_chance = 0.0;'

When upgrading to Cassandra 4.0, no action is required; these settings are ignored and disappear.

Step by Step Guide to Installing and Configuring Spark 2.0 to Connect with Cassandra 3.x

In this guide, we will be installing Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector library on the client program. If you already have any of these software packages installed and configured, you can skip that step. This guide assumes you have a Cassandra 3.x cluster that is already up and running. For more information on installing and using Cassandra visit http://cassandra.apache.org/.

Note: The following steps should be performed on all the nodes in the cluster unless otherwise noted.

Install Scala 2.11

Ensure you have Java installed

$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

If you don't have Java installed, follow this tutorial to get Java 8 installed.

Install Scala 2.11.8
Install Scala 2.11.8

$ wget www.scala-lang.org/files/archive/scala-2.11.8.deb
$ sudo dpkg -i scala-2.11.8.deb
$ scala -version

Install SBT 0.13


echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list $ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823 $ sudo apt-get update $ sudo apt-get install sbt

Install Spark 2.0

Download Spark 2.0 from https://spark.apache.org/downloads.html and unpack the tar file

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
$ tar zxf spark-2.0.2-bin-hadoop2.7.tgz
$ sudo mv spark-2.0.2-bin-hadoop2.7/ /usr/local/spark/

Update system variables

$ sudo nano /etc/environment

Add an environment variable called SPARK_HOME

export SPARK_HOME=/usr/local/spark

At the end of the PATH variable, add $SPARK_HOME/bin:

PATH="<previous_entries>:/usr/local/spark/bin"

Refresh the environment

source /etc/environment

Create spark user and make it the owner of the SPARK_HOME directory

$ sudo adduser spark --system --home /usr/local/spark/ --disabled-password
$ sudo chown -R spark:root /usr/local/spark

Create Log and Pid directories

$ sudo mkdir /var/log/spark
$ sudo chown spark:root /var/log/spark
$ sudo -u spark mkdir $SPARK_HOME/run

Create the Spark Configuration files

Create the Spark Configuration files by copying the templates

$ sudo cp /usr/local/spark/conf/spark-env.sh.template /usr/local/spark/conf/spark-env.sh
$ sudo cp /usr/local/spark/conf/spark-defaults.conf.template /usr/local/spark/conf/spark-defaults.conf
$ sudo chown spark:root /usr/local/spark/conf/spark-*

Edit the Spark Environment file spark-env.sh

export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run

Configure Spark nodes to join cluster

If you will not be managing Spark using the Mesos or YARN cluster managers, you'll be running Spark in what is called "Standalone Mode".
In standalone mode, Spark will have a master node (which is the cluster manager) and worker nodes. You should select one of the nodes in your cluster to be the master. Then on every worker node you must edit the /etc/spark/conf/spark-env.sh to point to the host where the Spark Master runs.

# Options for the daemons used in the standalone deploy mode
export SPARK_MASTER_HOST=<spark_master_ip_or_hostname_here>

You can also change other elements of the default configuration by editing the /etc/spark/conf/spark-env.sh. Some other configs to consider are:
  • SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
  • SPARK_WORKER_CORES, to set the number of cores to use on this machine
  • SPARK_WORKER_MEMORY, to set how much memory to use (for example 1000MB, 2GB)
  • SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
  • SPARK_WORKER_INSTANCES, to set the number of worker processes per node
  • SPARK_WORKER_DIR, to set the working directory of worker processes

Installing Spark as a service

Run the following commands to create a service for the spark-master and spark-worker

$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-master
$ sudo chmod 0755 /etc/init.d/spark-master
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-worker
$ sudo chmod 0755 /etc/init.d/spark-worker
$ sudo update-rc.d spark-master defaults 99
$ sudo update-rc.d spark-worker defaults 99

Edit the /etc/init.d/spark-worker file. If a variable or function already exists, then replace it with the text below.

DESC="Spark Worker" NAME=spark-worker SPARK_HOME=/usr/local/spark PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.worker.Worker-1.pid export SPARK_HOME

# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0

if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi

#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running() {
    CMD_PATT="org.apache.spark.deploy.worker.Worker"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}

#
# Function that starts the daemon/service
#
do_start() {
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started

        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`

        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-slave.sh \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-slave.sh -- spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
                || return 2
}

#
# Function that stops the daemon/service
#
do_stop() {
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}

#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}

...
status)
      is_running
      stat=$?
      case "$stat" in
              0) log_success_msg "$DESC is running" ;;
              1) log_failure_msg "could not access pidfile for $DESC" ;;
              *) log_success_msg "$DESC is not running" ;;
      esac
      exit "$stat"
      ;;
...

Edit the /etc/init.d/spark-master file. If a variable or function already exists, then replace it with the text below.

DESC="Spark Master" NAME=spark-master SPARK_HOME=/usr/local/spark PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.master.Master-1.pid export SPARK_HOME

# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0

if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi

#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running() {
    CMD_PATT="org.apache.spark.deploy.master.Master"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}

#
# Function that starts the daemon/service
#
do_start() {
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started

        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`

        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-master.sh \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-master.sh \
                || return 2
}

#
# Function that stops the daemon/service
#
do_stop() {
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}

#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}

...
status)
      is_running
      stat=$?
      case "$stat" in
              0) log_success_msg "$DESC is running" ;;
              1) log_failure_msg "could not access pidfile for $DESC" ;;
              *) log_success_msg "$DESC is not running" ;;
      esac
      exit "$stat"
      ;;
...

Running Spark as a service

Start the Spark master node first. On whichever node you've selected to be master, run:

$ sudo service spark-master start

On all the other nodes, start the workers:

$ sudo service spark-worker start

To stop Spark, run the following commands on the appropriate nodes:

$ sudo service spark-worker stop
$ sudo service spark-master stop

Service logs will be stored in /var/log/spark.

Testing the Spark service

To test the Spark service, start spark-shell on one of the nodes.

$ spark-shell --master spark://<IP>:<Port>

When the prompt comes up, execute the following line of code:

scala> sc.parallelize(1 to 1000).sum()

Get Spark-Cassandra-Connector on the client

The spark-cassandra-connector is a Scala library that exposes Cassandra tables as Spark RDDs, lets you write Spark RDDs to Cassandra tables, and allows you to execute arbitrary computations and CQL queries that are distributed to the Cassandra nodes holding the data, so the computation runs close to the data and stays fast. Your code, the spark-cassandra-connector, and all dependencies are packaged up and sent to the Spark nodes.
If you are writing ad-hoc queries or computations from the spark-shell, start the shell by running the command:

$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11

The --packages option downloads the connector and all
of its dependencies from the Spark Packages site and places it in
the path of the Spark Driver and all Spark Executors.
If you are writing a Scala application, configure a new Scala project. Your build.sbt file should look something like this:

name := "MySparkProject"

version := "1.0"

scalaVersion := "2.11.8"

val sparkVersion = "2.0.2"

resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % sparkVersion,
  "org.apache.spark" % "spark-sql_2.11"  % sparkVersion,
  "datastax"         % "spark-cassandra-connector" % "2.0.0-M2-s_2.11"
)

Testing the connector

To start out, create a simple keyspace and table in Cassandra. Run the following statements in cqlsh:

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

Then insert some example data:

INSERT INTO test.kv(key, value) VALUES ('key1', 1);
INSERT INTO test.kv(key, value) VALUES ('key2', 2);

For this test, we'll use the spark-shell.

$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11


// import the spark connector namespace
import com.datastax.spark.connector._

// read Cassandra data into an RDD
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)

// Add two more rows to the table
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))

Diagnosing and Fixing Cassandra Timeouts

We recently brought Cassandra into our infrastructure to take advantage of its robust built-in scalability model. After reading all the "marketing" material, watching and attending talks by evangelists, and reading blog posts by experts and enthusiasts, we were excited to embark on our Cassandra journey.

Everything was hunky-dory in the early going. We followed data modeling best practices, provisioned the right-sized servers as recommended by DataStax, and saw great results in the test environment. The issues didn't start until we began moving portions of our production traffic to our Cassandra production servers. That's when we noticed that we would fairly frequently get read and write timeouts as the volume of traffic got higher.


My first troubleshooting step was to monitor the servers to see if there were any anomalies or resource constraints that led to the timeouts. I'm a fan of Brendan Gregg's USE method for monitoring overall system health and identifying errors and bottlenecks. However, at the time we didn't have a comprehensive monitoring solution like Datadog or Icinga2 set up for those servers. So in its place, I wrote a shell script that every 30 seconds would log memory and CPU utilization, disk I/O, network I/O, Cassandra thread pool stats (nodetool tpstats), GC stats, compaction stats, and the table stats for my most active tables. To my chagrin, my hacked-together monitor did not reveal any anomalies or resource constraints that were the cause of the timeouts. On the contrary, it revealed that resource utilization (with the exception of CPU, which was a bit high) was well under the hardware limits. I did notice that a number of times a query timeout would coincide with the start of SSTable compaction, which led me down the rabbit hole of compaction tweaks; however, I emerged from that hole with no solution to the Cassandra timeouts.



Once hardware constraints had been ruled out, I kicked off my next round of troubleshooting efforts by turning on trace probability and looking at the traces for long-running queries. In the traces I noticed high queue times, which is not unexpected during times of high load, but I also noticed something else. There were a number of times when a worker node received the request from the coordinator, completed the read operation for the query, and queued up the result to be sent back to the coordinator, but the results were never sent and the coordinator timed out waiting for the response. Bingo! I had found the cause of the timeout! But I still hadn't found the root cause: why were the responses not getting sent back? And I didn't know how to fix it either. I spent a good amount of time looking into tweaking Cassandra thread pools (like the RequestResponseStage pool), reading the Cassandra source code on GitHub, and re-verifying network traffic/congestion between the nodes during high loads. But those efforts yielded no solution to the query timeouts, which led me to the third round of troubleshooting efforts.


For this round of troubleshooting, I turned up the logging level on the org.apache.cassandra logger to DEBUG and live-tailed both the Cassandra system.log and debug.log. That's when I noticed something weird: several times, one of the nodes would detect the other as being down for a period of time. When I looked at the system logs and Linux process activity in htop for the supposedly downed node, it appeared that the node was up and running. This led me to take a fine-grained look at the system.log for the downed node, and in there I noticed a pretty long GC pause. So I flipped over to the gc.log, examined it, and noticed even more long GC pauses. I also noticed that at times there were 35-second pauses occurring in succession. Those super long GC pauses caused the node to appear down to its peers. They also explained cases where a coordinator sent a request and timed out because the worker node paused execution and didn't return the response in a timely manner. I had found my problem!


So how did this happen? Well, when we were configuring Cassandra, some doofus (aka me!) noticed that
1. The key cache drastically improved the performance of queries
2. They had a high key cache hit rate and continued to have a high hit rate even as they increased the size of the key cache.
So this doofus (again, me) decided to bump the key cache all the way up to 8 GB so he could cache entire tables' keys, essentially turning them into in-memory lookups. To accommodate such a large key cache, he bumped the JVM heap up to 20 GB, thinking the new G1GC garbage collector could handle very large heaps. News flash: it can't! At least not when dealing with my workload of high volumes of concurrent reads and writes.

The fix was simple: reduce the size of the JVM heap and the key cache accordingly. The recommended heap size is 8 GB. I'm able to set my new max heap size slightly above that recommendation because I switched my memtables to storing part of their data off-heap (offheap_buffers), which greatly reduced heap memory pressure. I also started using the jemalloc allocator, which is more efficient than the native glibc allocator.
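
For reference, here is a minimal sketch of the kind of settings involved, assuming a typical Cassandra 2.x/3.x layout where heap sizes live in cassandra-env.sh and memtable allocation in cassandra.yaml. The numbers are illustrative and should be tuned to your own hardware and workload:

# cassandra-env.sh -- cap the heap instead of letting it balloon
MAX_HEAP_SIZE="8G"
HEAP_NEWSIZE="800M"    # only relevant when using ParNew/CMS

# cassandra.yaml -- move part of each memtable off-heap to relieve heap pressure
memtable_allocation_type: offheap_buffers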

In conclusion, distributed database systems are complex beasts and tuning them has to be done on many levels. While Cassandra gives you many knobs to tune to get the right performance out of your system, it takes a lot of study, experience or both, to know just how to tune them for your specific hardware and workload. DataStax published this useful Cassandra JVM tuning article to help provide guidance for users. But my advice for you would be to try a number of JVM configs and find out what works for your production workload. If I were doing this exercise over, I would start at the hardware level again but with better monitoring tools and then I would move to the system/debug/gc logs before looking at the traces. There is a good performance tuning tutorial by Kiyu Gabriel on DataStax Academy that you might find useful. Hope this article helps someone be more productive than I was.



Notes from Cassandra Day Atlanta 2016

I attended DataStax Cassandra Day Atlanta 2016 and took a ton of notes on things that I found interesting. After going through those notes, it occurred to me that many of the nuggets in them could be useful to someone other than myself. So I’ve published the notes below.

The information below is mostly composed of quotes from DataStax engineers and evangelists. Very little context is contained in these notes. However, if you are a beginning-to-intermediate level Cassandra developer or admin, you’ll likely have the needed context already. I did attempt to organize the notes somewhat coherently in order to allow you to jump to a section you care about and also to provide some context through the grouping.

Data Modeling Tips

General

  • When migrating to C*, don’t just port over your SQL schema. Be query-driven in the design of your schema.
  • If you are planning on using Spark with C*, start with a C* use case / data model 1st and then use Spark on top of it for analytics
  • Patrick McFadin (DataStax evangelist) on not having certain relational DB constructs in C*: “In the past I’ve scaled SQL DBs by removing referential integrity, indexes and denormalizing. I’ve even built a KV database on an Oracle DB that I was paying a lot of dollars per core for.” The implication here is that these constructs bound scalability in relational databases, and in explicitly not having them, Cassandra’s scalability is unbounded (well, at least theoretically).
  • You can stop partition hotspots by adding an additional column to the partition key (like taking the modulus of another column divided by the number of nodes) or by increasing the resolution of the key in the case where the partition key is a time span (see the CQL sketch after this list).
  • Using the “IF NOT EXISTS” clause stops an UPSERT from happening automatically / by default. It also creates a lock on the record while executing, so that multiple writers don’t step on each other trying to insert the same record in a race condition. This is a lightweight transaction (LWT). You can also create an LWT when doing a BATCH UPSERT.
  • You can set a default TTL (Time To Live) on an individual table. This will apply to all data inserted into the table. A CQL insert can also specify a TTL for the inserted data that overrides the default (also shown in the sketch below).
  • DTCS (DateTieredCompactionStrategy) compaction is built for time series data. It groups SSTables together by time so that older tables don’t get compacted and can be efficiently dropped if a TTL is set.
  • CQL Maps allow you to create complex types inside your data store
  • One of the reasons for limiting the size of elements that can be in a CQL collection is because on reads the entire collection must be denormalized as a whole in the JVM so you can add a lot of data to the heap.
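
To make the partition-key and TTL tips above concrete, here is a small hypothetical CQL sketch. The table and column names are invented, and bucketing by day is just one way to apply the advice:

-- Bucketing the partition key by day keeps any single partition from becoming a hotspot,
-- and default_time_to_live applies a table-level TTL to every inserted row.
CREATE TABLE sensor_readings (
    sensor_id    text,
    day          text,
    reading_time timestamp,
    value        double,
    PRIMARY KEY ((sensor_id, day), reading_time)
) WITH default_time_to_live = 604800;  -- 7 days

-- A per-insert TTL overrides the table default:
INSERT INTO sensor_readings (sensor_id, day, reading_time, value)
VALUES ('s-42', '2016-05-01', '2016-05-01 10:00:00', 21.5)
USING TTL 3600;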

Secondary indexes

  • Secondary indexes are not like you have them in relational DBs. They are not built for speed, they are built for access.
  • Secondary indexes get slower the more nodes you have (because of network latencies)
  • The best thing to do with a secondary index is just to test it out and see its performance, but do it on a cluster, not your laptop, so you can actually see how it would perform in prod. Secondary indexes are good for low-cardinality data (see the small example after this list).
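
As a quick, hypothetical illustration of the low-cardinality point (table and column names are invented):

-- country has relatively few distinct values, so a secondary index is a reasonable fit
CREATE TABLE users (user_id uuid PRIMARY KEY, name text, country text);
CREATE INDEX users_by_country ON users (country);

SELECT * FROM users WHERE country = 'CA';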

Development Tips

Querying

  • Use the DataStax drivers, not ODBC drivers, because the DataStax drivers are token-aware and can therefore send queries to the right node, removing the need for the coordinator to make excessive network requests (depending on the consistency level).
  • Use PreparedStatements for repeated queries. The performance difference is significant.
  • Use executeAsync with PreparedStatements when bulk loading. You can register callbacks on the returned Future objects and use the callbacks for things like detecting a failure and responding appropriately (see the sketch after this list).
  • BATCH is not a performance optimization. It leads to garbage collection and hotspots because the data stays in memory on the coordinator.
  • Use BATCH only to update multiple tables at once atomically. An example is if you have a materialized view / inverted index table that needs to be kept in sync with the main table.
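
A minimal sketch of the prepared-statement and async pattern, assuming the DataStax Java driver 3.x used from Scala against the test.kv table created earlier in this document; the contact point is a placeholder:

import com.datastax.driver.core.{Cluster, ResultSet}
import com.google.common.util.concurrent.{FutureCallback, Futures}

val cluster = Cluster.builder().addContactPoint("<node-ip>").build()
val session = cluster.connect("test")

// Prepare once, then bind and execute many times
val insert = session.prepare("INSERT INTO kv (key, value) VALUES (?, ?)")

val future = session.executeAsync(insert.bind("key5", Int.box(5)))
Futures.addCallback(future, new FutureCallback[ResultSet] {
  override def onSuccess(rs: ResultSet): Unit = println("write succeeded")
  override def onFailure(t: Throwable): Unit = println(s"write failed: ${t.getMessage}")
})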

General

  • Updates on collections create range tombstones to mark the old version of the collection (map, set, list) as deleted and create the new one. This is important to know because tombstones affect read performance, and at a certain point having too many tombstones (100K) can cause a read to fail. http://www.jsravn.com/2015/05/13/cassandra-tombstones-collections.html
  • Cassandra triggers should be used with care and only in specific use cases because you need to consider the distributed nature of C*.

Ops Tips

Replication Settings

  • SimpleStrategy fails if you have multiple datacenters (DCs), because the roughly 50% of your traffic that goes to the other DC becomes terribly slow. Use NetworkTopologyStrategy instead. You can configure replication for each DC individually, so you can have a keyspace that never gets replicated to the US, for example (see the CQL sketch after this list).
  • If you are using the NetworkTopologyStrategy, you should use the Gossiping Property File Snitch to make C* network-topology aware, rather than the plain property file snitch, because then you don’t have to change the file on every node and reboot them.
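
A hedged CQL sketch of the per-DC replication idea; the keyspace and datacenter names are hypothetical and must match the DC names your snitch reports:

CREATE KEYSPACE myapp
  WITH replication = {'class': 'NetworkTopologyStrategy', 'us_east': 3, 'eu_west': 3};

-- A keyspace that is never replicated to the US:
CREATE KEYSPACE eu_only
  WITH replication = {'class': 'NetworkTopologyStrategy', 'eu_west': 3};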

Hardware Sizing

Recommended Node size
  • 32 GB RAM
  • 8-12 Cores
  • 2 TB SSD
Hardware should be sized appropriately. 64 cores will be hard to use. If you are adding search and/or analytics to the node, you need more RAM: 128+ GB. More memory is needed for search because it keeps its indexes in memory.

Recommendation for Spark & Cassandra on the same node: Spark jobs run in their own process and therefore have their own heap that can be tuned and managed separately. Depending on how much performance you are trying to get out of C*, Cassandra should get its 32 GB of RAM as usual. Anything over should then go to Spark. So for example to get great performance you could have a 64 GB RAM system with 32 GB to C* and 32 GB to Spark. Same thing for cores. You should have 12-16 cores; 8-12 for C* and the rest for Spark. If vertical scaling starts to get too expensive you can alternatively add more nodes to meet performance expectations.

The recommendation is to have no more than 1 TB of data per node. The reason for a 2 TB disk despite a 1 TB recommendation is that once over 60% of your disk is full, you run the risk of not having enough disk space during compaction. This is especially true if you are using size-tiered compaction. With leveled compaction you can use up to 70% without risk.

Use RAID 0 for your storage configuration. C* does replication for you. You can also use JBOD and C* can intelligently handle failures of some of the disks in your JBOD cluster.

Java Heap Size & Garbage Collection

  • As a general rule of thumb; start with defaults and then walk it up.
  • The ParNew/CMS GC works best with 8 GB
  • The G1GC can manage 20 GB of RAM (Note: Another engineer mentioned to me that 32 GB of RAM is no problem for G1GC). Should not be used if the heap is under 8 GB.
    • Use G1GC with Solr / DSE Search nodes

Memory Usage and Caching

  • It’s very important to have ample off-heap RAM. Some C* data structures, such as memtables and bloom filters, are off-heap. You also want non-heap RAM available for page caching.
  • Row caching can significantly speed up reads because it avoids a table scan (if the page isn’t already cached). However, row caching should be used judiciously. The best use case is tables with a high density of hotspot data. The reason is that on a large table with varying and disparate data and seemingly random reads, you’ll end up with a lot of cache misses, which defeats the point of having a cache.
  • The row cache is filled on reads. memtables are filled on writes.
  • Memtables remain in memory until there is memory pressure based on configuration in the cassandra.yaml, then they are removed from RAM.

Benchmarking

  • Use the Cassandra Stress program that comes with C* (an example invocation appears after this list).
  • Cassandra Stress can be configured; you can specify number of columns, data size, data model, queries, types of data, cardinality, etc.
  • To model production, use multiple clients & multiple threads for clients in your Benchmarking
  • When stress testing make sure you run it long enough to run into compactions, GC, repairs. Because when you test you want to know what happens in that situation. You can even stress test and introduce failures and see how it responds. You can/should instrument the JVM during stress testing and then go back and look at it.
  • The general recommendation for stress test run time is 24 - 48 hrs.
  • DSE has solr-stress for testing the solr integration.
For performance, a starting expectation of 3k - 5k transactions per second per core is reasonable.
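
As a rough illustration, here is the kind of invocation this refers to, assuming the cassandra-stress tool bundled with Cassandra 2.1+; the node address, thread counts, ratios, and durations are placeholders to adapt to your cluster:

$ cassandra-stress write n=1000000 cl=quorum -rate threads=200 -node 10.0.0.1
$ cassandra-stress mixed ratio\(write=1,read=3\) duration=48h cl=quorum -rate threads=200 -node 10.0.0.1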

Interesting note: a DataStax customer once conducted a stress test that ran 24 hrs a day for 6 - 8 weeks. They were testing to see how changing the compaction strategy impacted their read-heavy workload.

General

  • Turn on user authentication. At least Basic Auth. This is good for security and auditing purposes. Also it allows you to not accidentally drop a production table because you thought you were connected to staging.
  • Use TLS if you are talking between DCs across the public internet
  • Don’t just bump up the heap for greater performance or to solve your problems! You’ll have to pay for it later during GC.
  • If you have crappy latency on 1% of your operations you shouldn’t just ignore it. You should try to understand what happened, is it compaction? Is it GC? That way you can address the issue that caused the high latency. Because that 1% could one day be 5%.
  • Why should we have backups? Backups exist to protect against people, not machines. Data corruption is the primary reason for backups. For example, someone accidentally changes all the '1’s in your DB to 'c’s.
  • There is no built-in way to count the number of rows in a Cassandra table. The only way to do so is to write a Spark job (see the sketch after this list). You can estimate the row count if you know the amount of data per row by dividing the table size by that amount.
  • Use ntpd! C* nodes in a cluster must always be on time because time stamps are important and are used in resolving conflict. Clock drifts cannot be tolerated.
  • Tombstone Hell: queries on partitions with a lot of tombstones require a lot of filtering which can cause performance problems. Compaction gets rid of tombstones.
  • Turn off swap on C* nodes.
  • If C* runs out of memory it just dies. But that’s perfectly OK, because the data is distributed / replicated and you can just bring it back up. In the meantime, data will be read from the other nodes.
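
A minimal sketch of the Spark row-count approach, reusing the spark-cassandra-connector setup shown earlier in this document (run from spark-shell; the keyspace and table are the earlier test.kv example):

import com.datastax.spark.connector._

// Counting rows requires a full scan, so it runs as a distributed Spark job
val rowCount = sc.cassandraTable("test", "kv").count()
println(s"test.kv contains $rowCount rows")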

Cluster Management

  • Don’t put a load balancer in front of your C* cluster.
  • Make sure you are running repairs. Repairs are essentially network defrag and help maintain consistency. Run repairs a little at a time, all the time.
  • If you can model your data to have TTLs you can run repairs much less or not at all.
  • If you never delete your data you can set gc_grace_seconds to 0.
  • Don’t upgrade your C* versions by replacing an outgoing node with a new node running a newer version of C*. C* is very sensitive when it comes to running mixed versions in production. The older nodes may not be able to stream data to the newer node. Instead you should do an in-place upgrade, i.e. shut down the node (the C* service), upgrade C* and then bring it back up. (https://docs.datastax.com/en/upgrade/doc/upgrade/cassandra/upgradeCassandraDetails.html)
  • When a new node is added in order to increase storage capacity or relieve storage pressure on the existing nodes, ensure you run nodetool cleanup as the final step. This is because C* won’t automatically reclaim the space of the data streamed out to the new node.

Monitoring, Diagnostic

Monitoring Services for capturing machine level metrics
  • Monit
  • Munin
  • Icinga
  • JMX Metrics
Make sure you are capturing application metrics and deliver them to a dashboard that can integrate app metrics and server metrics

Since you are running on multiple machines it becomes important to aggregate your logs.
  • Logstash
Diagnostic tools
  • htop (a better version of top)
  • iostat
  • dstat
  • strace
  • jstack
  • tcpdump (monitor network traffic; you can even see plain-text queries coming in)
  • nodetool tpstats (can help diagnose performance problems by showing you which thread pools are overwhelmed / blocked. From there you can make hypotheses as to the cause of the blockage / performance problem)

DSE Offering

DSE Max => Cassandra + Support + Solr + Spark

DSE search

  • Solr fixes a couple of rough edges for C* like joins, ad-hoc querying, fuzzy text searching and secondary indexing problems in larger clusters.
  • DSE Search has tight Solr integration with C*. C* stores the data, Solr stores the indexes. CQL searches that use the solr_query expression in the WHERE clause query Solr first for the location of the data to fetch and then query C* for the actual data (see the example after this list).
  • You can checkout killrvideo’s Github for an example of DSE search in action (https://github.com/LukeTillman/killrvideo-csharp/blob/master/src/KillrVideo.Search/SearchImpl/DataStaxEnterpriseSearch.cs)
  • Solr roughly triples the CPU and RAM needed compared to running regular C*. This is because Solr indexes must live in RAM.
  • Solr can do geospatial searches & can do arbitrary time range searches (which is another rough edge that C* cannot do). E.g. “search for all sales in the past 4 mins 30 seconds”
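
For illustration, a hypothetical solr_query search against a table that has a Solr core attached (the table and field names are invented):

SELECT * FROM videos WHERE solr_query = 'description:spark';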

DSE Spark

  • Spark runs over distributed data stores and schedules analytics jobs on workers on those nodes. DSE Max has Spark integration that just requires the flipping of a switch, no additional config.
  • There’s no need for definition files, workers automatically have access to the tables and Spark is data locality aware so jobs go to the right node.
  • Optionally with DSE search integration you can have search running on the same nodes that have the analytics and data and leverage the search indexes for faster querying instead of doing table scans.
  • With DSE analytics, you can create an analytics DC and have 2-way replication between the operations DC and the analytics DC. 2 way is important because it means that the analytics DC can store the result of its computation to the Cassandra table which then gets replicated back to the ops DC.
  • The Spark jobs / workers have access to more than just the C* table data. They can do just about anything you code. They can pull data from anything; open files, read queues, JDBC data stores, HDFS, etc. And write data back out as well.
  • Recommendation for Spark & Cassandra on the same node: appropriate resource allocation is important. Having Spark will require more memory. Spark jobs run in their own process and therefore have their own heap that can be tuned and managed separately. Depending on how much performance you are trying to get out of C*, Cassandra should get its 32 GB of RAM as usual. Anything over that should then go to Spark. So, for example, to get great performance you could have a 64 GB RAM system with 32 GB for C* and 32 GB for Spark. The same goes for cores: you should have 12-16 cores; 8-12 for C* and the rest for Spark. If vertical scaling starts to get too expensive, you can alternatively add more nodes to meet performance expectations.

Other Notes

Cassandra has a reference implementation called killrvideo. It is an actual website hosted on MS Azure. The address is killrvideo.com. It is written by Luke Tillman in C#. Checkout the source code on Github (https://github.com/LukeTillman/killrvideo-csharp).

Configuring Remote Management and Monitoring on a Cassandra Node with JConsole

In your cassandra-env.sh, set:

LOCAL_JMX=no

If you want username and password security, keep the default setting for jmxremote.authenticate, which is true. Otherwise set it to false:

JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"

Note: If set to true, enter a username and password for the jmx user in the jmxremote.password.file. Follow the instructions here on how to secure that file. Setting jmxremote.authenticate to true also requires you to pass in the username and password when running a nodetool command, e.g. nodetool status -u cassandra -pw cassandra

Restart the server (if needed).

Connecting to the node using JConsole for monitoring

Find the JConsole jar in your JDK_HOME/bin or JDK_HOME/lib directory. If you don’t have the Java JDK installed it can be downloaded from the Oracle Website.

Double-click the executable jar to run it (or run it from the command line). Select “Remote Process” and enter the following connection string.

 service:jmx:rmi:///jndi/rmi://<target ip address>:7199/jmxrmi

replacing <target ip address> with the address of the machine you intend to manage or monitor.

Note: JConsole may try (and fail) to connect using ssl first. If it does so it will ask if it can connect over a non-encrypted connection. You should answer this prompt in the affirmative and you are good.

Congrats! You now have everything you need to monitor and manage a cassandra node.

For help with how to monitor Cassandra using JMX and with interpreting the metrics see: