Cassandra Data Modeling Best Practices Guide

Apache Cassandra is an open source non-relational, or NoSQL, distributed database that enables continuous availability, tremendous scale, and data distribution across multiple data centers and cloud availability zones. Simply put, it provides a highly reliable data storage engine for applications requiring immense scale.

Data modeling is a process used to analyze, organize, and understand the data requirements for a product or service. Data modeling creates the structure your data will live in. It defines how things are labeled and organized, and determines how your data can and will be used. The process of data modeling is similar to designing a house. You start with a conceptual model and add detail to produce the final blueprint. 

The ultimate goal of Cassandra data modeling and analysis is to develop a complete, well organized, and high performance Cassandra cluster. Following the five Cassandra data modeling best practices outlined below will help you meet that goal:

  1. Cassandra is not a relational database, don’t try to model it like one
  2. Design your model to meet 3 fundamental goals for data distribution
  3. Understand the importance of the Primary Key in the overall data structure 
  4. Model around your queries but don’t forget about your data
  5. Follow a six step structured approach to building your model. 

Cassandra Is Not a Relational Database

Do not try to design a Cassandra data model like you would with a relational database. 

Query first design: You must define how you plan to access the data tables at the beginning of the data modeling process, not towards the end. 

No joins or derived tables: Tables cannot be joined, so if you need data from more than one table, the tables must be merged into a single denormalized table.   

Denormalization: Cassandra does not support joins or derived tables so denormalization is a key practice in Cassandra table design.

Designing for optimal storage: For relational databases this is usually transparent to the designer. With Cassandra, an important goal of the design is to optimize how data is distributed around the cluster. 

Sorting is a Design Decision: In Cassandra, sorting can be done only on the clustering columns specified in the PRIMARY KEY.

The Fundamental Goals of the Cassandra Data Model

Distributed data systems, such as Cassandra, distribute incoming data into chunks called partitions. Cassandra groups data into distinct partitions by hashing a data attribute called the partition key and distributes these partitions among the nodes in the cluster. 

(A detailed explanation can be found in Cassandra Data Partitioning.)

A good Cassandra data model is one that: 

  1. Distributes data evenly across the nodes in the cluster
  2. Places limits on the size of a partition
  3. Minimizes the number of partitions returned by a query.

Distributes Data Evenly Around the Cassandra Cluster

Choose a partition key that has a high cardinality to avoid hot spots—a situation where one or a few nodes are under heavy load while others are idle.   

Limits the Size of Partitions

For performance reasons choose partition keys whose number of possible values is bounded. For optimal performance, keep the size of a partition between 10 and 100MB. 
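
One common technique for keeping partition sizes bounded (not covered in this guide, but widely used in Cassandra data modeling) is to add a time bucket to the partition key so that any single partition can only ever hold a limited window of data. A minimal CQL sketch with hypothetical table and column names:

    CREATE TABLE sensor_readings (
        sensor_id    text,
        day          date,        -- time bucket: bounds how much data one partition can hold
        reading_time timestamp,
        value        double,
        PRIMARY KEY ((sensor_id, day), reading_time)
    ) WITH CLUSTERING ORDER BY (reading_time DESC);

Without the day bucket, a busy sensor would accumulate an unbounded partition; with it, each partition holds at most one day of readings for one sensor.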

Minimize the Number of Partitions Read by a Single Query 

Ideally, each of your queries will read a single partition. Reading many partitions at one time is expensive because each partition may reside on a different node. The coordinator (the node in the cluster that first receives the request) will generally need to issue separate commands to separate nodes for each partition you request. This adds overhead and increases the variation in latency. Unless the data set is small, attempting to read an entire table, that is, all the partitions, fails due to a read timeout. 

Understand the Importance of the Primary Key

Every table in Cassandra must have a  set of columns called the primary key. (In older versions of Cassandra, tables were called column families). In addition to determining the uniqueness of a row, the primary key also shapes the data structure of a table. The Cassandra primary key has two parts:

Partition key: The first column or set of columns in the primary key. This is required. The hashed value of the partition key determines where the partition will reside within the cluster.

Clustering key (aka clustering columns): The columns after the partition key. The clustering key is optional. The clustering key determines the default sort order of rows within a partition.  
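
As an illustration, the following CQL sketch (a hypothetical table, not one from this guide) shows how the two parts are declared. The first parenthesised group in the PRIMARY KEY clause is the partition key; the remaining columns are the clustering columns:

    CREATE TABLE user_events (
        user_id    uuid,         -- partition key: hashed to place the partition on a node
        event_time timestamp,    -- clustering column: rows sorted by time within the partition
        event_id   timeuuid,     -- clustering column: makes each row unique
        details    text,
        PRIMARY KEY ((user_id), event_time, event_id)
    ) WITH CLUSTERING ORDER BY (event_time DESC, event_id DESC);

Here user_id determines which node stores the partition, while rows inside each partition are stored sorted by event_time (newest first) and then event_id.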

A very important part of the design process is to make sure a partition key will: 

  1. Distribute data evenly across all nodes in a cluster.  Avoid using keys that have a very small domain of possible values, such as gender, status, school grades, and the like.  The minimum number of possible values should always be greater  than the number of nodes in the cluster.  Also, avoid using keys where the distribution of possible values is highly skewed. Using such a key will create “hotspots” on the cluster. 
  2. Have a bounded range of values. Large partitions can increase read latency and cause stress on a node during a background process called compaction. Try to keep the size of partitions under 100MB. 

Model Around Your Queries

The Cassandra Query Language (CQL) is the primary language used to communicate with a Cassandra database. In syntax and function, CQL resembles SQL which makes it easy for those who know the latter to quickly learn how to write queries for Cassandra. But there are some important differences that affect your design choices. 

A well known one is that Cassandra does not support joins or derived tables. Whenever you require data from two or more tables, you must denormalize. 

Search conditions have restrictions that also impact the design, as illustrated by the example queries after this list. 

  • Only primary key columns can be used as query predicates. (Note: a predicate is an operation on expressions that evaluates to TRUE or FALSE.)
  • Partition key columns are limited to equality searches. Range searches can only be done on clustering columns.
  • If there are multiple partition key columns (i.e. a composite partition key), all partition key columns must be included in the search condition.
  • Not all clustering columns need to be included in the search condition. But there are some restrictions: 
    • When omitting clustering columns, you must start with the rightmost column listed in the primary key definition;  
    • An equality search cannot follow a range search.
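
To make these restrictions concrete, here is a hedged sketch against a hypothetical table with a composite partition key (a, b) and clustering columns (c, d); none of these names come from this guide:

    CREATE TABLE example (
        a int, b int, c int, d int, e text,
        PRIMARY KEY ((a, b), c, d)
    );

    -- Allowed: full partition key restricted by equality, clustering columns restricted left to right
    SELECT * FROM example WHERE a = 1 AND b = 2;
    SELECT * FROM example WHERE a = 1 AND b = 2 AND c = 3;
    SELECT * FROM example WHERE a = 1 AND b = 2 AND c > 3;              -- range on a clustering column

    -- Rejected by Cassandra (without workarounds such as secondary indexes or ALLOW FILTERING):
    -- SELECT * FROM example WHERE a = 1;                               -- partition key column b missing
    -- SELECT * FROM example WHERE a > 1 AND b = 2;                     -- range search on a partition key column
    -- SELECT * FROM example WHERE a = 1 AND b = 2 AND d = 4;           -- skips clustering column c
    -- SELECT * FROM example WHERE a = 1 AND b = 2 AND c > 3 AND d = 4; -- equality after a range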

Don’t Forget About the Data

Creating a complete Cassandra data model involves more than knowing your queries. You can identify all the queries correctly, but if you miss some data, your model will not be complete. Attempting to refactor a mature Cassandra data model can be an arduous task. 

Developing a good conceptual model (see below) will help identify the data your application needs. 

Take a Structured Approach to Design 

In order to create a data model that is complete and high performing, it helps to follow a big data modeling methodology for Apache Cassandra that can be summarized as: 

  1. Data Discovery (DD). This is a high level view of the data your application needs and identifies the entities (things), the attributes of the entities, and which attributes are the identifiers. This may be an iterative process as development proceeds. 
  2. Identify the Access Patterns (AP).  Identify and list the queries your application will want to perform.  You need to answer: What data needs to be retrieved together, what are the search criteria, and what are the update patterns? This also may be an iterative process. 
  3. Map data and queries (MDQ). Map the queries to the data identified in steps 1 and 2 to create logical tables, which are high level representations of Cassandra tables.
  4. Create the physical tables (PT).  Convert the logical data model to a physical data model (PDM) by using CQL CREATE TABLE statements. 
  5. Review and Refine the physical data model. Confirm that the physical tables will meet the three fundamental goals of the Cassandra data model.

Structured approach to Cassandra data model design

A more detailed examination of these steps can be found in an earlier Instaclustr Whitepaper: 6 Step Guide to Apache Cassandra Data Modelling.

If you have worked with relational database design, some steps will be familiar because they are also in the entity-relationship (ER) model.  At the conceptual stage, it can be useful to visually represent the data model by ER diagrams using either the Chen or Information Engineering (IE) notation. The Chebotko diagram uses a notation developed by Artem Chebotko to represent data and queries at the logical and physical modeling stages. 

Cassandra Model Example

Let’s assume that we have a simple logging system with two entities: LogSource and LogMessage.  For LogSource the key attribute is sourceName.  For the entity LogMessage, the key attribute is messageID.  

The query we want to execute is:  Q1) show the message information about the 10 most recent messages for a given source. 

The primary access entity is LogSource because it contains the equality search attribute (sourceName).  We create a logical table named LogMessage_by_Source and push the attribute sourceName into it. That becomes the partition key (indicated by the K).

We need to sort by time so messageTime becomes the clustering column in  LogMessage_by_Source.  (Indicated by C↑) 

The secondary entity is LogMessage. The key attribute messageID becomes a second clustering column of the primary key in LogMessage_by_Source to ensure uniqueness of the row. Finally, we add the remaining columns from the secondary entity to complete the data needed by the query. 
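
A CQL sketch of the resulting physical table and query Q1 might look like the following (the non-key columns and their data types are hypothetical placeholders, as the post does not list them):

    CREATE TABLE LogMessage_by_Source (
        sourceName  text,        -- K : partition key
        messageTime timestamp,   -- C↑: clustering column, sorts rows by time
        messageID   timeuuid,    -- C↑: second clustering column, guarantees row uniqueness
        messageText text,        -- hypothetical remaining attributes of LogMessage
        severity    int,
        PRIMARY KEY ((sourceName), messageTime, messageID)
    );

    -- Q1: the 10 most recent messages for a given source
    SELECT * FROM LogMessage_by_Source
    WHERE sourceName = 'server-01'
    ORDER BY messageTime DESC
    LIMIT 10;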

An example of Cassandra data model

Data Duplication 

Data duplication refers to the number of times data must be duplicated in different tables to satisfy access patterns.   For example, if  we wanted to search for a specific message by its  unique identifier we would duplicate the data by creating a new table called LogMessage_by_ID that uses  messageID as the partition key.
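
Following the same sketch as above (again with hypothetical non-key columns), the duplicate table could be defined as:

    CREATE TABLE LogMessage_by_ID (
        messageID   timeuuid,    -- K: partition key, enables lookup by the unique identifier
        sourceName  text,
        messageTime timestamp,
        messageText text,
        severity    int,
        PRIMARY KEY (messageID)
    );

Every write of a log message would then be made to both tables.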

Two issues can arise from duplication: 

  • Increased complexity to maintain  data integrity across multiple tables; 
  • If the data being duplicated is very large it puts size and write pressure on the database.

In a case where data duplication would cause more problems than it solves, an alternative is to duplicate only lookup keys, that is, to use a lookup table. However, this solution requires the client to perform a second query to read the secondary data. The trade-off between read performance and data maintenance cost needs to be judged in the context of the specific performance requirements of your application, and such a solution should be benchmarked to ensure that it is workable.

Materialized Views

These are objects created by a query; they are copies of a base table but with a different partition key. The data between the materialized view and the base table is automatically synchronized by Cassandra. Their purpose is to make modeling for new query patterns easier and more flexible.  
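
For illustration only (see the advice below before using them), a materialized view over the sketch table from the example above might be declared like this; all primary key columns of the view must be filtered with IS NOT NULL:

    CREATE MATERIALIZED VIEW LogMessage_by_ID_view AS
        SELECT * FROM LogMessage_by_Source
        WHERE messageID IS NOT NULL
          AND sourceName IS NOT NULL
          AND messageTime IS NOT NULL
        PRIMARY KEY (messageID, sourceName, messageTime);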

Instaclustr’s advice is not to use them in Cassandra 3.x because of problems in keeping the view and the base table synchronized. The Apache Cassandra project has classified Materialized Views as an experimental feature for Cassandra 3.x. 

Summary

Cassandra data modeling is a process used to define and analyze data requirements and the access patterns on the data needed to support a business process. 

A data model helps define the problem, enabling you to consider different approaches and choose the best one.  It ensures that all necessary data is captured and stored efficiently. 

Models document important concepts and jargon, providing a basis for long-term maintenance.

Cassandra is a non-relational database; do not design it as you would a relational database. Don't be afraid to denormalize data. Writes in Cassandra are relatively cheap compared to relational databases.

The goals of a successful Cassandra data model are to choose a partition key that (1) distributes data evenly across the nodes in the cluster; (2) minimizes the number of partitions read by one query; and (3) bounds the size of a partition.

Take a structured approach to your model. Your first steps are understanding your data and identifying the access patterns on that data. These are the most critical steps in developing a complete model.

Contact us to get expert advice on managing and deploying Apache Cassandra.


Cassandra Monitoring: A Best Practice Guide

Introduction to Cassandra Monitoring

Apache Cassandra is a NoSQL database designed to provide scalability, reliability, and availability with linear performance scaling. Cassandra database is designed as a distributed system, and aims to handle big data efficiently. Refer to what-is-apache-cassandra and cassandra-architecture for more information. Note that knowledge of Cassandra architecture and basic terminology is a prerequisite to understanding Cassandra monitoring. 

Cassandra monitoring is an essential area of database operations to ensure the good health of a cluster and optimal performance. Alerting is another crucial area for production systems, and it is complementary to monitoring. Good alerting in Cassandra can be achieved by utilization of the monitoring infrastructure and relevant toolset. Alerting and monitoring help create a robust environment for any Cassandra deployment.

This blog post aims to touch on all the important aspects of Cassandra monitoring. We hope it provides the reader with crucial information about monitoring tools, components, and metrics.

Monitoring Terminologies

JVM Based Monitoring

Cassandra is developed in Java and is a JVM based system. Each Cassandra node runs a single Cassandra process. JVM based systems are enabled with JMX (Java Management Extensions) for monitoring and management. Cassandra exposes various metrics using MBeans which can be accessed through JMX. Cassandra monitoring tools are configured to scrape the metrics through JMX and then filter, aggregate, and render the metrics in the desired format. There are a few performance limitations in the JMX monitoring method, which are referred to later. 

The metrics management in Cassandra is performed using the Dropwizard library. The metrics are collected per node in Cassandra; however, they can be aggregated by the monitoring system. 

Cassandra Monitoring

Metrics 

There are a large number of metrics exposed by Cassandra to cover all possible areas including performance, resources, communication, node, and cluster state etc. The metrics are defined with distinct types, and those can be categorized as well for operational ease.    

Metrics Types

Cassandra metrics are defined with specific data types. These types are designed to represent metrics like latency, counts, and others correctly. 

The metrics types are not intuitive, and you might need some time to get familiar with them. 

  • Gauge: A single value representing a metric at a specific point in time, e.g. value of memory allocated or a number of active tasks. 
  • Counter: Counters are the same as a gauge but are used for value comparisons. Generally, a counter is only incremented, and it is reset when the functionality gets disrupted like a node restart. An example is cache_hit count.
  • Histogram: A histogram is a count of data elements from a data stream grouped into fixed intervals. A histogram gives a statistical distribution of values. The data elements are reported as min, max, mean, median, and 75th, 90th, 95th, 98th, 99th, and 99.9th percentile values. 
  • Timer: Timer keeps the rate of execution and histogram of duration for a metric. 
  • Latency: This is a special type to measure latency. It includes a Timer, and the latency is in microseconds. There is also a TotalLatency with each latency metric. The total latency is the count of latency since the beginning, i.e. since the node started. 
  • Meter: Meter is a unit to measure throughput. It also includes a weighted moving average for the first, fifth, and fifteenth minutes.

Metrics Categories

The metrics are categorised based on Cassandra domains, e.g. table, keyspace, storage, communication, JVM etc. Not all metrics should be monitored all the time, but those should be available in case required, i.e. during troubleshooting or application performance testing. 

The metrics are further subdivided in terms of broader areas like resources, network, internals, crucial data elements etc. Metrics can be represented as per topology levels like cluster level, node level, table level etc. to organize all the information. 

The categorization becomes clear as we go through specific metrics and correlate those with specific Cassandra areas.

Metrics Format

The Cassandra Dropwizard metrics are specified in the format below:

Dropwizard Metric Name: org.apache.cassandra.metrics.<Metric scope>.<Metric type>.<MetricName>

MBean: org.apache.cassandra.metrics:type=<Metric type> scope=<Metric scope> name=<MetricName>

Metric Type: This is the category of metrics e.g. table, keyspace, threadpool. Do not confuse this with the data type of metrics.

Metric scope: This is the metric sub type for more granularity wherever required. The scope is hence optional. E.g. the table name or keyspace name. 

Metric name: The final metric name like LiveSSTableCount. 

Essential Metrics

Cassandra Metrics

Node Status 

The status of nodes must be monitored and alerted immediately if a node is down. Cassandra cluster’s availability directly depends on the uptime of all the nodes in the cluster. Although the anti-entropy mechanism in Cassandra helps protect data from inconsistency, there is no replacement for lost performance during a node downtime. A down node puts pressure on other nodes in the data center to handle requests and store hints. Hence, downtime for a node should be minimum. 

Cassandra operational activity requires node restarts or downtime, but those can be scheduled at the least busy times for the cluster. This alert helps keep track of any service disruption and the need to run a repair on a node. A node should be repaired if it is out of the cluster for longer than the hinted handoff window, which is three hours by default. 

Client Request Metrics

The client request metrics provide information about client communication in the form of read and write requests per second between the client and a coordinator node. Other than normal read and write requests, there are special types of read and write operations (CAS, View, and RangeSlice) which have their own set of metrics. These metrics help to track the request count, latency, failures, and a few other statistics. The basic statistics to monitor are the number of requests per second (i.e. throughput) and request latency.

Requests Per Second

The number of requests should be aggregated per data center and per node. There could be some nodes receiving more requests as compared to other nodes. This behaviour creates extra pressure for the nodes receiving more requests. The specific requests like CAS and RangeSlice should be tracked separately for clarity. These operations are resource-intensive and have a unique effect on the nodes. The ratio of read requests to write requests is crucial to understand the type of workload. There are specific configurations to optimize a read-heavy or a write-heavy workload. 

Each cluster can handle a certain number of client requests per second efficiently. If the number of requests exceeds the cluster capacity, it can result in undesirable outcomes like dropped messages, inconsistency, increased latency etc. CAS and RangeSlice requests can cause increased latency. 

Uneven load on a few nodes can be handled with optimal load balancing at the driver side. The read and write latency or throughput issues caused by constant overloading should be addressed by adding more nodes to the data center and revisiting the data model if required.

Alerting: Set alerts on the number of requests threshold served per node and data center. 

Client Request Latency

Latency tracked by these metrics is the read and write latency experienced by client applications. There are various percentiles of latency, as mentioned for the latency metric type. These percentiles should be tracked separately, as well as the overall values, so that there is a clear view of system performance. Production systems generally have latency SLAs; the SLA on specific or overall latency should be tracked, and alerts raised when client latency breaches it.

There are various factors which affect latency, including the amount of load served by a node or cluster, system resources and tuning, GC settings and behaviour, and the type of requests. Troubleshooting latency issues mainly depends on accurate investigation of the root cause. Correlating latency metrics with other metrics helps to track down root causes. Using a graph solution like Grafana for visualization is the most efficient way to spot and track issues.

Alerting: Set alerts for latency SLA thresholds, if any, or for the expected latency range.

Request Timeout and Failure 

These metrics count the client requests that timed out or failed. Failed requests are a clear indication of errors, and those should be addressed immediately. The common causes of request failure are unavailability of data, failure to get a response from the required number of replicas, data inconsistency, and network errors. Troubleshooting errors is performed using the error messages and correlation with other metrics. 

Alerting: Set alerts for more than a few failed requests on production systems.

Compaction Statistics 

This group of metrics includes the amount of data compacted, the number of active/completed compactions, and other relevant details. Compactions consume node resources and can consume disk space quickly. Monitoring compactions provides a good insight into the compaction strategy used, as each strategy has a unique operational footprint. Specific Cassandra operations like repairs, high volume data writes, add/remove/replace nodes etc. increase the compaction activity. It is important to monitor compactions while performing such operations. 

A common troubleshooting method for high compaction activities and high resource consumption is to throttle the compaction rate. In some scenarios, compactions can be temporarily stopped, but it requires a lot of caution and must be re-enabled at some point to keep the SSTable count low, and read latency optimal.

Alerting: Alerting is not essential for these metrics. However, alerts can be set if there are a higher number of pending compactions sustained for longer than expected time interval.

Garbage Collector Metrics

The Garbage Collector (GC) is yet another crucial area for monitoring. The efficiency of Cassandra throughput and performance depends on the effective use of JVM resources and streamlined GC. The GC behaviour mainly depends on these factors—the garbage collector used, the workload served by Cassandra nodes, GC parameter settings, the heap size for JVM etc. A common issue with garbage collection is long GC pause or the time taken to perform garbage collection. 

The GC works well with the default settings shipped with Cassandra, but those can be tuned if required to suit a specific workload and the available resources. GC parameter tuning is a non-trivial task and requires knowledge of GC internals. However, sometimes GC issues can be resolved by fixing the data model, changing the workload, or adjusting JVM resources. It is essential to correlate bad GC behaviour with the exact root cause before performing a remedy. Also, any change in parameters impacting GC should be monitored carefully to ensure improvements. 

Alerting: Set alert on GC pauses for more than acceptable thresholds on production systems.

Memory Metrics

The memory metrics provide JVM heap, non-heap, and total memory used by Cassandra. The JVM heap storage is used heavily for a variety of purposes by Cassandra. The non-heap memory is also used a lot by later versions of Cassandra. Monitoring the heap and overall memory gives insight into memory usage. It can be used to correlate with any issues and determine memory requirements. 

Please note, Cassandra cannot scale with an indefinite amount of memory. This boils down to the fact that JVM and GC cannot perform optimally for large heap size. The most common range of heap size for Cassandra is 8GB-32GB where the smaller size is configured with CMS GC and the larger size with G1GC.

Alerting: Set alerts to test specific memory thresholds and tuning.  

Threadpool Metrics

Cassandra works with numerous thread pools internally. This design is aimed to achieve asynchronous tasks, and it also helps to handle back pressure. Monitoring for the thread pools makes it easy to understand the internal system behaviour. It also helps to understand  specific pools under pressure with active, pending, and blocked tasks. 

The solution for constantly saturated pools generally is to provide more processing capacity to the node or the cluster. Other core issues like a poor data model or query pattern also impact the thread pools. 

Alerting: Set alerts for more than a few blocked tasks on the production system. This helps take preventive action to help avoid performance impact.

Table Metrics 

Table metrics are useful in tracking each table independently. These can be used to monitor a specific set of tables which are performance-critical or host a large volume of data. There are various metrics for each table but some of the most important are discussed here:  

Partition Size

The partition size is a crucial factor in ensuring optimal performance. Cassandra uses partitions of data as a unit of data storage, retrieval, and replication. Hence, larger partitions impact overall performance. The ideal partition size is less than 10MB, with an upper limit of 100MB. These values are derived from operational experience within the Cassandra community. 

The data model and table definition control the partition size. The partition key for a table determines how data is grouped into partitions. A partition key should be designed to accumulate data only up to acceptable size limits. Unfortunately, it is not easy to replace the current partitions for a table. But if the data model is in the design phase, it is crucial to test all the table definitions for potentially large partition sizes. In existing tables, if large partitions are a major issue, they can be addressed by a complete data rewrite. This operation could be long-running, but it can solve many performance issues, and if configured correctly, it can be performed with minimal or no downtime for the table. 

Alerting: Configure alerts on large partitions for tables with unbounded partitions. An unbounded partition is where the partition grows in size with new data insertion and does not have an upper bound.

Tombstone Scanned

Tombstones are the deletion markers in Cassandra. Tombstones are produced by data deletion, which can be performed by various means like delete queries, TTL expiry, null inserts etc. The immutable design of SSTables and compaction operations makes tombstone eviction difficult in some scenarios. Tombstone presence directly impacts read performance; the effect increases with the number of tombstones scanned per operation. This metric provides a histogram of tombstones read by a table's queries in recent time. 
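
For example, each of the causes just mentioned can be reproduced with ordinary CQL (hypothetical keyspace, table, and column names):

    -- Explicit delete: writes a tombstone for the deleted row
    DELETE FROM logs.events WHERE event_id = 42;

    -- TTL expiry: the inserted cells become tombstones once the TTL (in seconds) elapses
    INSERT INTO logs.events (event_id, payload) VALUES (43, 'data') USING TTL 86400;

    -- Null write: setting a column to null is treated as a delete of that cell
    UPDATE logs.events SET payload = null WHERE event_id = 44;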

The troubleshooting for tombstone eviction can be performed using various options like revisiting the compaction strategy, major compaction, nodetool garbagecollect etc. Note that all the mentioned remedies for tombstone eviction could operate on a large set of SSTables and are non-trivial operations. The operations must be well tested before executing on production. 

Alerting: Set alerts for tombstones-scanned per read metrics for performance-sensitive tables. 

SSTable Per Read

These metrics relate to the immutable design of SSTables and the read operation. SSTables are created per table, and data is written to them sequentially in the order it arrives, so the data for a partition can be spread across multiple SSTables. This results in multiple SSTable reads to complete a single read operation. The number of SSTables read contributes to the time consumed to complete the read operation. Hence, the number of SSTables per read should be minimized. 

A good number of SSTables per read is a relative value and depends on the data volume and compaction strategy. However, as a general rule, those should be less than 10. The compaction strategy used for a table plays a crucial role in this metric. A table should be configured with optimum compaction strategy as per the table usage. Repair operation plays a role in keeping the SSTables consistent and hence also indirectly impacts this metric. All the data in Cassandra should ideally be repaired once per gc_grace_seconds cycle. 
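
The compaction strategy is set per table in CQL. As a hedged example (hypothetical names, not a blanket recommendation), LeveledCompactionStrategy is often chosen for read-heavy tables because it bounds the number of SSTables a single read needs to touch:

    ALTER TABLE logs.events
    WITH compaction = {'class': 'LeveledCompactionStrategy'};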

Alerting: Set alerts for all the read performance-sensitive and high data volume tables for SSTables per read. 

Additional Metrics

It is difficult to cover all the metrics present in Cassandra in this blog post, and it is also difficult to predict the most useful ones in general. I have tried to cover the most used metrics individually. But there are still some crucial metrics which are useful for getting insight in specific Cassandra areas. Let’s look at those briefly:

Dropped Messages

Cassandra handles many forms of messages corresponding to various functions. These messages can get dropped mostly due to load or communication error etc. The dropping of messages causes data inconsistency between nodes, and if those are frequent, it can cause performance issues. It is necessary to identify the cause of dropped messages. If those occur frequently or if those are in large numbers, the system resources and data model should be revisited. Alerts should be set for an unexpected occurrence or number of dropped messages. 

Caches For Tables

Cassandra uses several caches, and those are configurable. The cache metrics are useful to track effective use of a particular cache. A good example is the use of the row cache for frequently accessed rows in a table. If caching hot data in the row cache improves the cache hits, it is a successful use of the row cache. 
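
Caching is also configured per table. As a hedged sketch (hypothetical names; the row cache itself must additionally be sized via row_cache_size_in_mb in cassandra.yaml), row caching could be enabled for a table with frequently read hot rows like this:

    ALTER TABLE app.user_profiles
    WITH caching = {'keys': 'ALL', 'rows_per_partition': '100'};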

Data Streaming

Streaming is used while bootstrapping new nodes, during repair operations, and during some other cluster operations. Streaming operations can move a lot of data across a cluster and hence consume network bandwidth. The streaming metrics are useful for monitoring node activities and planned repairs. The streaming rate can be controlled if required to spare bandwidth for other operations.

Hinted Handoff 

Hints are a part of the anti-entropy mechanism, and they help protect nodes from data loss while those nodes are offline. Hints are stored and transferred, so metrics related to these attributes as well as delivery success, failure, delays, and timeouts are exposed. 

The hints metrics are useful to monitor all hint activity. A large number of hints stored and replayed indicates nodes going offline, while hint delays and failures indicate network or other communication issues.

CQL and Batch 

CQL metrics include the number of statements executed of each type. The batch metrics include the number of batch statements executed. These metrics help to monitor application activity and the query semantics used. Use of logged and unlogged batches has caveats in Cassandra, and batches can cause a performance penalty if not used correctly. 
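
As an illustration of the two batch flavours (hypothetical keyspace, table, and column names):

    -- Logged batch: applied atomically, often used to keep denormalized tables in step for one logical write
    BEGIN BATCH
        INSERT INTO app.messages_by_source (source, msg_id, body) VALUES ('server-01', 42, 'started');
        INSERT INTO app.messages_by_id     (msg_id, source, body) VALUES (42, 'server-01', 'started');
    APPLY BATCH;

    -- Unlogged batch: no atomicity guarantee across partitions, but lower coordination overhead
    BEGIN UNLOGGED BATCH
        INSERT INTO app.messages_by_source (source, msg_id, body) VALUES ('server-01', 43, 'stopped');
        INSERT INTO app.messages_by_id     (msg_id, source, body) VALUES (43, 'server-01', 'stopped');
    APPLY BATCH;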

System Metrics

These metrics are not exported by Cassandra but are obtained from the OS. They are just as important as the Cassandra metrics for obtaining system insights. 

Disk Usage

Disk usage needs monitoring because Cassandra is optimized to write a lot of data quickly. The real risk of filling up the disk comes from compactions. The default compaction strategy used by Cassandra is SizeTieredCompactionStrategy (STCS). This strategy merges many SSTables and outputs a single SSTable. The resulting SSTable can have a size equal to the combined size of all the SSTables merged into it. Also, until a compaction operation ends, both the old and new SSTables exist on disk. 

The disk space guideline for a cluster where most tables use STCS is to utilise up to 50% of the disk space and leave the rest as room for compactions. Generally, disk space is cheaper than other resources and there is no harm in keeping vacant space on nodes. However, if there is limited disk space available, disk monitoring becomes even more crucial, as the free disk left for compactions may shrink below the general guideline. 

Remedies for high disk usage include snapshot deletion, as snapshots can consume a considerable amount of space. Another method is to stop a specific compaction operation; this frees the space consumed by the new SSTables. The time until the compaction starts again can be utilized to add more space. 

Alerting: Set alerts for various stages of disk usage. The alerts can be categorized for severity based on the amount of free disk space on a node. 

CPU Usage

CPU capacity in a Cassandra cluster contributes as the main processing capacity. The number of requests served by a node and the amount of data stored are the factors directly proportional to the CPU utilization. CPU utilization should be monitored to ensure the nodes are not overloaded. 

A Cassandra cluster or a single data center should have all the nodes of similar size. Those should have an equal number of CPU cores, and the CPU utilization should also be equivalent. A single node or a few nodes with high CPU is an indication of uneven load or request processing across the nodes. It is observed that Cassandra is not CPU bound in most cases. However, a cluster or data center with high CPU utilization at most times should be considered for node size upgrade. 

Alerting: Set alerts for specific levels of CPU utilization on nodes or just for a single threshold. The levels can be defined as per expected CPU load, e.g. 80%, 90%, >95% etc. 

Monitoring tools

There are various tools available to set up Cassandra monitoring. I am describing here a few popular open-source tools used widely across the Cassandra community.

Prometheus

Prometheus is a metrics tool used for handling time-series based monitoring. It has alerting capability as well, which works on the time-series metrics. Prometheus can be configured to collect Cassandra metrics from nodes as well as the system metrics of the nodes. Prometheus uses exporters which are installed on the nodes and export data to Prometheus.  

Prometheus runs with a time-series database to store metrics. The metrics are stored in the database and can be queried using promQL, a query language for Prometheus. Prometheus also runs a web UI which can be used to visualise the actual metrics, graphs, alert rules etc. 

Alertmanager is the extension used for configuring alerts. Alertmanager has various integrations available for alerting including email, slack, hipchat, pagerduty etc. Prometheus has evolved over time, and it integrates well with the dropwizard metrics library. 

Prometheus - time-series based cassandra monitoring

Grafana

Grafana is a visualisation tool which can be used to visualize any time-series metrics. Grafana has various panels to showcase the data. The most commonly used panel is a graph. A graph is used to plot incoming data against a time-series in two dimensions. 

Grafana integrates with various data sources. These sources are queried in real-time by Grafana to obtain metrics. Each Grafana panel has one or more queries configured to query a data source; the result of the query is rendered on the panel. Grafana uses Prometheus as a well-integrated data source.

Grafana - Time series metrics visualization

Cassandra Exporter

Cassandra exporter is Instaclustr’s open-source solution for collecting Cassandra metrics efficiently. It is designed to integrate with Cassandra JVM and collect and publish metrics. Hence, Cassandra exporter is a replacement for the JMX metrics. 

JMX metrics in Cassandra have performance limitations and hence can cause some issues if used on systems with a large number of nodes. The Cassandra exporter has been well tested for optimal performance monitoring. The metrics produced by Cassandra exporter are also time-series and can be readily consumed by Prometheus. Please refer to the github page for information regarding configuration and usage. 

Conclusion

Cassandra monitoring is essential to get insight into the database internals. Monitoring is a must for production systems to ensure optimal performance, alerting, troubleshooting, and debugging. There are a large number of Cassandra metrics out of which important and relevant metrics can provide a good picture of the system. 

Finally, Instaclustr has the Cassandra monitoring expertise and capability with various options. 

  • Cassandra exporter is an excellent open source tool for optimal monitoring performance on large Cassandra clusters. 
  • Instaclustr Cassandra managed service uses a comprehensive monitoring and alerting service with 24×7 support. It is a good option for outsourcing all Cassandra operations, and it comes with a free trial.

Instaclustr Cassandra Consulting services can help you with any monitoring or other Cassandra operations.


Building a Low-Latency Distributed Stock Broker Application: Part 3

In the third blog of the “Around the World” series focussing on globally distributed storage, streaming, and search, we build a Stock Broker Application. 

1. Place Your Bets!

London Stock Exchange 1800’s

How did Phileas Fogg make his fortune? Around the World in Eighty Days describes Phileas Fogg in this way: 

Was Phileas Fogg rich? Undoubtedly. But those who knew him best could not imagine how he had made his fortune, and Mr. Fogg was the last person to whom to apply for the information.

I wondered if he had made his fortune on the Stock Market, until I read this:

Certainly an Englishman, it was more doubtful whether Phileas Fogg was a Londoner. He was never seen on ‘Change, nor at the Bank, nor in the counting-rooms of the “City“‘

Well, even if Fogg wasn’t seen in person at the ‘Change (London Stock Exchange), by 1872 (the year the story is set), it was common to use the telegraph (the internet of the Victorian age, which features regularly in the story) to play the market.

(Not) a real Victorian era Telegraph Office

In fact the ability of the telegraph to send and receive information faster than horses/trains/boats etc. had been used for stock market fraud as early as 1814! (The “Great Stock Exchange Fraud of 1814”). Coincidentally (or not?), the famous London Stock Exchange Forgery, also involving the telegraph, also occurred in 1872! Perhaps this explains the ambiguity around the origin of Fogg’s wealth!

What is certain is that Phileas Fogg became the subject of intense betting, and he was even listed on the London Stock Exchange (Chapter V – IN WHICH A NEW SPECIES OF FUNDS, UNKNOWN TO THE MONEYED MEN, APPEARS ON ‘CHANGE):

Not only the members of the Reform, but the general public, made heavy wagers for or against Phileas Fogg, who was set down in the betting books as if he were a race-horse. Bonds were issued, and made their appearance on ‘Change; “Phileas Fogg bonds” were offered at par or at a premium, and a great business was done in them. But five days after the article in the bulletin of the Geographical Society appeared, the demand began to subside: “Phileas Fogg” declined. They were offered by packages, at first of five, then of ten, until at last nobody would take less than twenty, fifty, a hundred!”

The 1870’s also saw the introduction of a new technological innovation in Stock Trading, the Stock Ticker machine. Stock tickers were a special type of telegraph receiver designed to print an alphabetical company symbol and the current price of that company’s stock on a paper roll called ticker tape. This enabled stock prices to be communicated closer to real-time across vast distances, and revolutionized trading. 

1870’s Stock Ticker Machine

2. Let’s Build a Stock Broker Application

Fast forward 128 years from 1872 to 2000 and technology looked a bit different. I’m taking inspiration from an earlier project I worked with from 2000-2003 at CSIRO (Australia’s national science research agency) called “StockOnline”. This was an online Stock Broker application designed to benchmark new component-based middleware technologies, including Corba and Enterprise Java (J2EE). The original version simulated traders checking their stock holdings and current stock prices, and then buying and selling stocks, resulting in revised stock holdings. The benchmark could be configured with different workload mixes, and the number of concurrent traders could be scaled up to stress the system under test. Metrics captured included the relative number, throughput, and response time of each of the operations. 

Some of the technology innovations that the project was designed to give insights into included: 

  • the use of application servers to provide easy to manage and scalable container resourcing (yes, containers are at least 20 years old); 
  • how portable the application was across multiple different vendors application servers (sort of);
  •  the impact of JVM choice and settings (lots); 
  • explicit support for component configuration (e.g. wiring components together); and 
  • deployment into containers, rich container services, and multiple persistence models to manage state and allow database portability (e.g. Container Managed Persistence vs. Bean Managed Persistence). 

At the end of the project we made the StockOnline code and documentation open source, and I recently rediscovered it and made it available on github. I was surprised to learn that Enterprise Java is still going strong and is now run by the Eclipse Foundation and called “Jakarta EE”. Also interesting is that there is support for persistence to Cassandra.

So let’s move on to the present day.

3. High-Frequency Low-Latency Trading

Modern stock markets are fast-moving, ultra-competitive environments. Orders are sent to the market by high speed networks and executed almost instantly. This makes low-latency trading a key to profitability.  Trade related latency is any delay in the time it takes for a trader to interact with the market, and includes distance related network delays, delays in receiving and acting on information, and delays caused by brokers (e.g. queuing of orders, delays interacting with the stock exchange to trade the orders, etc.). Some of the key solutions to reducing latency are broker side hosting of orders (orders are hosted on brokers and automatically traded when conditions are met), and Direct Market Access (brokers are as close as possible to stock exchanges, with super-fast network connections).

A new type of US Stock Exchange (IEX) was even created to address some of the issues around fairness of stock trading due to latency. Some brokers are able to take advantage of even small latency differences (“price sniping”) or use so-called “dark pools”, which fill orders from within a pool rather than via public stock exchanges, to make huge profits. Although, somewhat oddly, the IEX levelled the playing field by introducing delays to ensure that no one playing the market has more up-to-date information than anyone else.

Latency is partially caused by the global location of stock exchanges. Where are stock exchanges located?  There are 60 exchanges around the world on every continent with a total value of $69 Trillion, and 16 worth more than $1 Trillion each!

4. Initial Application Design

I had already started writing a new version of StockOnline before I rediscovered the original, so the new version doesn’t share any of the original code. However, it does turn out to have similar entities, but with some significant differences to model multiple StockExchanges and Brokers.  Here’s the UML Class diagram of my new prototype code:

The first major difference is that it’s designed to model and simulate distributed stock brokers across multiple global “georegions”. We introduced the concept of georegions in blog 1 (called “latency regions”) and blog 2 (called “georegions”). A georegion is a geographic region that has at least two AWS regions (for data center redundancy), and ensures that applications within the same georegion are within 100ms latency of each other and of users in that georegion, even in the case of failure of one region. Here’s the map from the previous blogs showing the eight sub 100ms latency georegions that we identified (North America, South America, Atlantic, Europe, Middle East, Central Asia, East Asia, Australasia):

This means that I have to explicitly model multiple StockExchanges. Each StockExchange is in a particular location in a georegion.  Each StockExchange is responsible for listing some stocks, providing meta-data about the stocks,  publishing changes to stock prices as StockTickers, and matching buy/sell orders (i.e. executing trades).  For simplicity we assume that each stock is only listed on a single StockExchange. 

Each georegion has one or more StockBrokers which are co-located in the same georegion as some StockExchanges to ensure low-latency (and potentially redundancy).  The StockBrokers are responsible for discovering StockExchanges, all the stocks listed for trading, obtaining StockTickers to update current price data, and computing trends and longer term Stock Statistics that inform traders making decisions about buying and selling. They are also responsible for keeping track of trader data, updating StockHoldings for Traders, keeping track of orders and trading them on the appropriate StockExchanges, and keeping records of trades (transactions). Also different to the original version (which only had a single Market Order type), I wanted to have multiple different order types including Market, Limit and Stop Orders. This is important for the latency story as market orders are traded “manually” and immediately, but Limit and Stop Orders are traded automatically when they meet specific conditions, so can be traded very quickly and in larger volumes (this is a good explanation).

We assume that traders connect to their nearest StockBroker (to reduce latency and possibly to satisfy local financial rules). There is a series of operations supported by StockBrokers for traders, and also for interacting with the StockExchanges as follows.  First let’s look at the workflow for trading Market Orders, “Place Market Order”. These are essentially synchronous and immediate trades. The trader connects to the nearest broker, gets their current StockHoldings and current StockStatistics (for their holdings and possibly for other stocks they don’t currently own). Based on this information they decide what stocks to trade, whether to buy or sell, and the quantity of stocks, and create a Market Order. The broker then processes the Market Order (which may involve sending it to another broker), and receives confirmation that the trade occurred (including price, quantity, transaction ID etc.), and finally updates the trader’s StockHoldings for the stock traded. 

The steps to “Process Market Order” are as follows.  The order is sent to a broker in the same Georegion as the StockExchange listing the stock. This broker then immediately executes the trade (buys or sells) with the StockExchange, gets the transaction details, marks the order as filled (so it isn’t processed more than once), and updates the StockHolding amounts for the trader. 

The “Execute Trade with StockExchange” assumes that a trade is always possible (at the current price) and will occur instantaneously and completely, and has the following sub steps:

Market Orders are potentially a slow process due to all the synchronous steps, “think time” for the trader, and cumulative latencies due to the trader to broker, broker to broker, and broker to StockExchange communication paths.

As an alternative we also provide some asynchronous Order types: Limit and Stop. These order types are only traded when their conditions are met, but then need to be executed as quickly as possible to prevent losses in a fast-moving market.

We assume that the initial steps are mostly the same as “Place Market Order”, but with the added choice of a Limit or Stop Order and the limit price, and the final step (notification of the eventual trade) is asynchronous:

Once the Order is placed, it is processed by sending it to the correct broker (as for Market Orders), and then that broker is responsible for continuously checking orders to see if they match:

This is done as follows (“Trade Matching Orders”) and relies on each broker receiving a stream of StockTicker updates from the StockExchanges in the same georegion. For each StockTicker the broker finds orders for that stock, and checks which orders meet the buy/sell conditions (the logic depends on the order type, if the price is rising or dropping, and if the current price is less than or greater to the order limit price). If the matching order(s) are Sell Orders then an extra step is to check that the trader still has sufficient holdings of that stock (they may have already sold some stock due to other orders being executed first). If all conditions are met then the broker initiates an immediate “Market” trade with the StockExchange as before.

The initial prototype application code is written in pure Java and just simulates the expected behaviour of the traders, brokers, and StockExchanges. It creates a specified number of georegions, brokers, StockExchanges, stocks, and traders with initial data. Then the simulation runs lots of rounds (seconds) for a specified period (e.g. a day).  Each round results in traders checking their holdings and StockStatistics, and creating orders (about ⅓ of each type, but only if the type matches the specific stock and market conditions). The orders are sent to the correct brokers. Each round the brokers receive simulated StockTickers from StockMarkets in the same georegion (using a pseudo random walk which keeps the stock direction for the majority of the time, but occasionally changing direction). Some stocks are more volatile than others so change faster.  Each round the brokers immediately Trade Market Orders, and check the conditions and trade matching Limit or Stop Orders. 

5. Initial Simulation Results—“Time Is Money”!

The simulation computes business level metrics including number of trades, value of trades, etc., and expected end-to-end latency based on deploying brokers in 8 georegions, and using the AWS inter-region latencies from blog 1 of this series. This gives us a baseline to eventually compare the real results with. The average latency to get the current stock price from a remote StockExchange and “Process Market Orders” is 700ms (max 1.2s), which includes multiple times for intra-region and inter-region networking. The average latency for Limit and Stop “Trade Matching” Orders is shorter at 100ms (max 200ms), as it only includes the time to get StockTicker updates and the time to trade; i.e. it doesn’t include any AWS inter-region latencies as the operation is asynchronous and processed entirely within the georegion of the broker/StockExchange (we expect this to be slightly higher in practice due to the eventual overhead of database lookups and condition checking on the broker).

So is the saying “Time Is Money!” true in the context of low latency trading, and how much money exactly? I added a calculation to the simulation to compute potential profit losses assuming high volatility in the prices of some stocks, and higher latency times to trade. Due to potentially high volumes of trades, even a small profit loss per trade can add up to big losses in profit very quickly. For one simulation run with 2,101 completed trades, the potential profit loss for the higher latency Market Orders was 0.7% of the trade value (for Market Orders), but for the lower latency Limit and Stop Orders it was significantly less at 0.1% of the trade value (for those order types). For an average order size of $20,000 this corresponds to a $140 profit loss per Market Order, compared with only $20 profit loss for each Limit and Stop Order. Over hundreds or even thousands of trades per day (typical of HFT) this would quickly add up to significant amounts of money! Moreover, to make a profit, High Frequency Trading (HFT) relies on conducting a high volume of trades to rapidly take advantage of very small movements in prices, with potentially smaller profits per trade. So it’s easy to see why minimizing latency is a worthwhile goal in Fintech applications such as this. 

6. What Next?

In the next few blogs we’ll continue our journey “Around the World” and explore how to refine the initial simple design of the application so as to deploy and run it across multiple georegions using multiple AWS regions.

Initially this will involve mapping the components to Cassandra tables on a single data center. Once it’s working correctly with a single Cassandra data center, we’ll extend it to use multiple Cassandra data centers, which will require the use of multiple keyspaces for different replication topologies (e.g. replication across pairs of data centers vs. all data centers). We’ll also work out if, and how, to load-balance the application across multiple data centers in the same georegions, and how to enable redundancy, failover, and recovery at the application level.  It’s possible that a Kubernetes federated microservices mesh framework will help in doing this. We also plan to put Kafka to use to enable streaming StockTickers, so we’ll be investigating Kafka multi data center replication. 

7. Further Resources

IBM also has a StockTrader demonstration application, and an in-depth series about deploying it using Cassandra, Kafka, Redis, and Kubernetes.

There’s an example of stock analysis using Elasticsearch (I’m planning on using Elasticsearch to analyse the stock trends, and provide some map-based visualisation of the data).

This is an interesting article on “Hacking a HFT System”!

The Original CSIRO StockOnline code and documentation is now on github.


A Comprehensive Guide to Cassandra Architecture

Introduction

The Apache Cassandra architecture is designed to provide scalability, availability, and reliability to store massive amounts of data. If you are new to Cassandra, we recommend going through the high-level concepts covered in what is Cassandra before diving into the architecture.  

This blog post aims to cover all the architecture components of Cassandra. After reading the post, you will have a basic understanding of the components. This can be used as a basis to learn about the Cassandra Data Model, to design your own Cassandra cluster, or simply for Cassandra knowledge.

Cluster Topology and Design

Cassandra is based on distributed system architecture. In its simplest form, Cassandra can be installed on a single machine or in a docker container, and it works well for basic testing. A single Cassandra instance is called a node. Cassandra supports horizontal scalability achieved by adding more than one node as a part of a Cassandra cluster. The scalability works with linear performance improvement if the resources are configured optimally.

Cassandra works with peer to peer architecture, with each node connected to all other nodes. Each Cassandra node performs all database operations and can serve client requests without the need for a master node. A Cassandra cluster does not have a single point of failure as a result of the peer-to-peer distributed architecture. 

Nodes in a cluster communicate with each other for various purposes. There are various components used in this process:

  • Seeds: Each node is configured with a list of seeds, which is simply a list of other nodes. A seed node is used to bootstrap a node when it first joins the cluster. A seed has no other special purpose, and it is not a single point of failure; a node does not require a seed on subsequent restarts after bootstrap. It is recommended to use two to three seed nodes per Cassandra data center (data centers are explained below) and to keep the seed list uniform across all the nodes. 
  • Gossip: Gossip is the protocol used by Cassandra nodes for peer-to-peer communication. Gossip informs a node about the state of all other nodes. A node performs gossip with up to three other nodes every second. The gossip messages follow a specific format and version numbering to keep communication efficient.

A cluster is subdivided into racks and data centers. These terms are Cassandra’s representation of real-world racks and data centers. A physical rack is a group of bare-metal servers sharing resources like a network switch, power supply, etc. In Cassandra, nodes can be grouped into racks and data centers with the snitch configuration. Ideally, the node placement should mirror the node placement in the actual data centers and racks. Data replication and placement depend on the rack and data center configuration. 

Cluster subdivided into Racks and Data centers

Multiple Data Centers

A rack in Cassandra is used to hold a complete replica of the data, provided there are enough replicas and the configuration uses NetworkTopologyStrategy (explained later). This configuration allows Cassandra to survive a rack failure without losing a significant level of replication and while continuing to perform optimally. 

There are various scenarios for using multiple data centers in Cassandra. A few common scenarios are:

  • Build a Cassandra cluster with geographically distinct data centers which cater to clients from distinct locations, e.g. a cluster with three data centers in the US, EU, and APAC serving local clients with low latency.
  • Separate Cassandra data centers which cater to distinct workloads using the same data, e.g. separate data centers to serve client requests and to run analytics jobs.
  • Active disaster recovery by creating geographically distinct data centers, e.g. a cluster with data centers in each US AWS region to support disaster recovery.

Database Structures

Cassandra stores data in tables, where each table is organized into rows and columns just as in any other database. A Cassandra table was formerly referred to as a column family. Tables are grouped into keyspaces. A keyspace could be used to group tables serving a similar purpose from a business perspective, like all transactional tables, metadata tables, user information tables, etc. Data replication is configured per keyspace, in terms of a replication factor per data center and a replication strategy. See the replication section for more details.

Each table has a defined primary key. The primary key is divided into partition key and clustering columns. The clustering columns are optional. There is no uniqueness constraint for any of the keys.

The partition key is used by Cassandra to index the data. All rows which share a common partition key make up a single data partition, which is the basic unit of data partitioning, storage, and retrieval in Cassandra.  

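For illustration, here is a hypothetical table (assuming a keyspace named app already exists; none of these names come from the original post) in which every row for a given sensor_id lands in the same partition:

cqlsh> create table app.sensor_readings (
           sensor_id int,
           reading_time timestamp,
           value double,
           primary key (sensor_id, reading_time));

Here sensor_id is the partition key and reading_time is a clustering column, so all readings for one sensor form a single partition, sorted by reading_time.
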
Refer to cassandra-data-partitioning for detailed information about this topic. 

Partitioning

A partition key is converted to a token by a partitioner. There are various partitioner options available in Cassandra, of which Murmur3Partitioner is the default. Tokens are signed integer values between -2^63 and +2^63-1, and this range is referred to as the token range. Each Cassandra node owns a portion of this range and primarily owns the data corresponding to that portion. A token is used to precisely locate the data among the nodes and on the data storage of the corresponding node.  

It is evident that when there is only one node in a cluster, it owns the complete token range. As more nodes are added, the token range ownership is split between the nodes, and each node is aware of the range of all the other nodes. 

Here is a simplified example to illustrate token range assignment. Suppose only 100 tokens are used for a Cassandra cluster with three nodes. Each node is assigned approximately 33 tokens: 

node1: 0-33
node2: 34-66
node3: 67-99

If nodes are added or removed, the token range distribution has to be reshuffled to suit the new topology. This process takes a lot of calculation and configuration change for each such cluster operation. 

Virtual nodes/Vnodes

To simplify token calculation and avoid other token assignment difficulties, Cassandra uses the concept of virtual nodes, referred to as Vnodes. The token range is divided into a large number of smaller ranges, each assigned to a virtual node, and each physical node is assigned an equal number of virtual nodes. Continuing our previous example, if each node is assigned three Vnodes and each Vnode 11 tokens: 

v1: 0-9, v2: 10-19, v3: 20-29, and so on 

Each physical node is assigned these Vnodes as follows:

node1: v1, v4, v7
node2: v2, v5, v8
node3: v3, v6, v9

Virtual Nodes or Vnodes

The default number of Vnodes owned by a node in Cassandra is 256, set by the num_tokens property. When a node is added to a cluster, the token allocation algorithm allocates tokens to the node. The algorithm selects random token values to ensure a uniform distribution, and the num_tokens property can be changed to tune the data distribution. The default of 256 Vnodes per physical node was chosen to achieve uniform data distribution for clusters of any size and with any replication factor. In some large clusters, 256 Vnodes do not perform well; refer to the blog cassandra-vnodes-how-many-should-i-use for more information.

Replication

The data in each keyspace is replicated with a replication factor. The most commonly used replication factor is three. There is one primary replica of the data, which resides with the token owner node as explained in the data partitioning section. The remaining replicas are placed by Cassandra on specific nodes using the replica placement strategy. All replicas are equally important for all database operations, except for a few cluster mutation operations.

There are two settings which mainly impact replica placement. The first is the snitch, which determines the data center and the rack a Cassandra node belongs to; it is set at the node level. The snitch informs Cassandra about the network topology so that requests are routed efficiently, and it allows Cassandra to distribute replicas by grouping machines into data centers and racks. GossipingPropertyFileSnitch is the go-to snitch for any cluster deployment. It uses a configuration file called cassandra-rackdc.properties on each node, which contains the rack and data center name that host the node. There are also cloud-specific snitches available for AWS and GCP. 

The second setting is the replication strategy, which is set at the keyspace level. There are two strategies: SimpleStrategy and NetworkTopologyStrategy. SimpleStrategy does not consider racks or multiple data centers; it places data replicas on nodes sequentially. NetworkTopologyStrategy is rack aware and data center aware. SimpleStrategy should only be used for temporary and small cluster deployments; for all other clusters NetworkTopologyStrategy is highly recommended. A keyspace definition using NetworkTopologyStrategy specifies the number of replicas per data center as:

cqlsh> create keyspace ks with replication = {'class' : 'NetworkTopologyStrategy', 'dc_1' : 3, 'dc_2' : 1};

Here, the keyspace named ks is replicated in dc_1 with factor three and in dc_2 with factor one.

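The replication settings of an existing keyspace can be changed with ALTER KEYSPACE, for example to raise the replication factor in dc_2 for the keyspace defined above (a repair should then be run so that existing data is copied to the new replicas):

cqlsh> alter keyspace ks with replication = {'class' : 'NetworkTopologyStrategy', 'dc_1' : 3, 'dc_2' : 3};
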
Consistency and Availability

Every distributed system is subject to the CAP theorem, which states that a distributed system can strongly deliver only two out of the three properties: Consistency, Availability, and Partition-tolerance. Cassandra provides flexibility to choose between consistency and availability while querying data. In other words, data can be highly available with a lower consistency guarantee, or it can be highly consistent with lower availability. For example, if there are three data replicas, a query reading or writing data can ask for acknowledgements from one, two, or all three replicas to mark the completion of the request. For a read request, Cassandra requests the data from the required number of replicas and compares their write timestamps; the replica with the latest write timestamp is considered to hold the correct version of the data. Hence, involving more replicas in a read operation strengthens the data consistency guarantee. For write requests, the write completes once the requested number of replicas have acknowledged it. 

Naturally, the time required to get acknowledgements is directly proportional to the number of replicas asked to acknowledge. Hence, consistency and availability can be traded against each other. The concept of requesting a certain number of acknowledgements is called tunable consistency, and it can be applied at the individual query level. 

There are a few considerations related to data availability and consistency: 

  • The replication factor should ideally be an odd number. The most commonly used replication factor is three, which provides a balance between replication overhead, data distribution, and consistency for most workloads.    
  • The number of racks in a data center should be a multiple of the replication factor. The number of nodes is commonly a multiple of three. 
  • There are various terms used to refer to the consistency levels: 
    • One, two, three: Specified number of replicas must acknowledge the operation.
    • Quorum: A strict majority of the replicas is called a quorum; the majority is one more than half. This consistency level ensures that most of the replicas confirm the operation without having to wait for all of them, balancing operation efficiency and consistency, e.g. the quorum for a replication factor of three is (3/2)+1=2; for a replication factor of five it is (5/2)+1=3.
    • Local_*: This is a consistency level for the local data center in a multi-data center cluster. The local data center is the one where the client is connected to a coordinator node. The * takes the value one or quorum, e.g. local_one, local_quorum. 
    • Each_*: This level also relates to multi-data center setups. It denotes the consistency to be achieved in each of the data centers independently, e.g. each_quorum means quorum consistency in each data center. 

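In cqlsh, the consistency level for subsequent queries can be set with the CONSISTENCY command (drivers expose an equivalent per-query setting). The table name below is just a placeholder:

cqlsh> consistency local_quorum
cqlsh> select col1 from my_keyspace.my_table where id = 1;
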
Data written and read at a low consistency level does not miss out on the advantages of replication. The data is kept consistent across all replicas by Cassandra, but this happens in the background; the concept is referred to as eventual consistency. In the three-replica example, if a user queries data at consistency level one, the query is acknowledged as soon as the read/write happens on a single replica. For a read operation, this could mean relying on a single data replica as the source of truth. For a write operation, the remaining replicas receive the data later and are made consistent eventually. If replication fails, some replicas might not get the data at all. Cassandra handles such replication shortcomings with a mechanism called anti-entropy, which is covered later in the post. 

Query Interface

Cassandra Query Language (CQL) is the interface for querying Cassandra, served over a binary protocol. Earlier versions of Cassandra supported Thrift, which has now been entirely replaced by CQL. CQL is designed to be similar to SQL for a quicker learning curve and familiar syntax. The DDL operations allow you to create keyspaces and tables; the CRUD operations are select, insert, update, and delete, where select is a Cassandra read operation and all the others are Cassandra write operations. 

A table definition includes column definitions and primary, partition, and clustering keys. The table definition also contains several settings for data storage and maintenance. The primary key is a combination of partition key and clustering columns. The clustering columns are optional. The partition key can be a single column or a composite key. 

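As a hypothetical illustration (assuming a keyspace named app; these names are not from the original post), a table with a composite partition key and a clustering column could be defined as follows, partitioning rows by the (symbol, trade_date) pair and ordering them by trade_time within each partition:

cqlsh> create table app.trades (
           symbol text,
           trade_date date,
           trade_time timestamp,
           price decimal,
           primary key ((symbol, trade_date), trade_time))
       with clustering order by (trade_time desc);
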
The query set available in CQL is quite limited as compared to SQL. A few highlights: 

  • Cassandra does not support join operations and nested queries. 
  • Each select query should specify a complete partition key. It is possible to query multiple partitions, but this is not recommended; refer to cassandra-scalability. 
  • Cassandra supports a limited set of data aggregation operations.
  • The order by clause can be used only on the clustering columns, and they must be specified in their declared order of precedence (see the example after this list).

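Using the hypothetical app.trades table above, a valid query supplies the complete partition key and orders only by the clustering column:

cqlsh> select trade_time, price from app.trades
       where symbol = 'ABC' and trade_date = '2020-04-14'
       order by trade_time desc;

Omitting trade_date from the WHERE clause would leave the partition key incomplete, and Cassandra would reject the query by default.
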
The reason for the limited query set in Cassandra comes from specific data modelling requirements. A Cassandra data model should aim to create denormalized tables which cater to the select query patterns. Cassandra data modeling is one of the essential activities when designing the database; all the features provided by the Cassandra architecture, like scalability and reliability, depend directly on an optimal data model. Refer to cassandra-data-modelling for details on the topic.  

The Cassandra driver program provides a toolset for connection management, pooling, and querying. The driver creates a connection with a Cassandra node, which is then referred to as the coordinator node for the query. The coordinator is responsible for query execution and for aggregating partial results. 

The DataStax Java Driver is the most popular, efficient, and feature-rich driver available for Cassandra. There are also drivers for several other technologies which provide similar functionality. 

Data Storage

Cassandra uses a commit log for each incoming write request on a node. The commit log is a write-ahead log, and it can be replayed in case of failure. The on-disk data structure is called an SSTable. SSTables are created per table in the database. 

Example:

Consider a sample keyspace and table created as follows.

cqlsh> create keyspace ks with replication = {'class' : 'NetworkTopologyStrategy','datacenter_1' : 3};

cqlsh> CREATE TABLE ks.tb (
    id int PRIMARY KEY,
    col1 text);

And insert some data:
cqlsh> insert into ks.tb (id, col1) values (1, 'first_row');
cqlsh> insert into ks.tb (id, col1) values (2, 'second_row');
cqlsh> insert into ks.tb (id, col1) values (3, 'third_row');

The data we inserted looks like this in an SSTable. 

Note that this representation is obtained with a utility (sstabledump) that generates human-readable output from SSTables. The actual data in SSTables is in a binary format and is compressed for efficiency.

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 33,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:07.756013Z" },
        "cells" : [
          { "name" : "col1", "value" : "first_row" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "2" ],
      "position" : 34
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 71,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:29.923397Z" },
        "cells" : [
          { "name" : "col1", "value" : "second_row" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "3" ],
      "position" : 72
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 108,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:39.282459Z" },
        "cells" : [
          { "name" : "col1", "value" : "third_row" }
        ]
      }
    ]
  }
]

Cassandra keeps its on-disk data immutable to provide optimal performance; hence, SSTables are immutable. Updates and deletes are handled by writing new versions of the data, which means multiple versions of a piece of data can exist at any given time. Cassandra is optimized for write operations as compared to read operations. A read operation consolidates all versions of the data and returns the most recent one. Each data cell is written with a write timestamp which records when that particular data was written. This timestamp is used to find the latest version of the data while retrieving it for a read operation. 

Continuing the above example, we update the data for the row with id 1 and look at the result:

cqlsh> update ks.tb set col1='updated_row_one' where id=1;

The resulting data in the SSTable for this update looks like:

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 39,
        "cells" : [
          { "name" : "col1", "value" : "updated_row_one", "tstamp" : "2020-04-14T13:38:37.794697Z" }
        ]
      }
    ]
  }
]

The data looks exactly like newly inserted data. Cassandra identifies this and treats the updated value as the current one because it has the greater timestamp. 

Deletes are handled in a unique way in Cassandra to make them compatible with immutable data. Each delete is recorded as a new record which marks the deletion of the referenced data. This special record is called a tombstone. A Cassandra read operation discards all the information for a row or cell if a tombstone exists for it, as the tombstone denotes deletion of the data. There are various types of tombstones to denote deletion of each element, e.g. a cell, a row, a partition, a range of rows, etc. Cassandra also allows setting a Time To Live (TTL) on a data row to expire it a specified amount of time after insertion. Once data is past its TTL, it is regarded as a tombstone in Cassandra. Refer to managing-tombstones-in-cassandra for operational information about tombstones and their effect on efficiency. 

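As a quick illustration using the ks.tb table from the example above, a TTL can be set per write; once the TTL expires, the cell is treated as a tombstone. The ttl() function shows the remaining time in seconds:

cqlsh> insert into ks.tb (id, col1) values (4, 'expiring_row') using ttl 60;
cqlsh> select id, col1, ttl(col1) from ks.tb where id = 4;
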
Continuing with the SSTable example, a cell delete looks like this:

cqlsh> delete col1 from ks.tb where id=1;

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 24,
        "cells" : [
          { "name" : "col1", "deletion_info" : { "local_delete_time" : "2020-04-14T13:44:27Z" },
            "tstamp" : "2020-04-14T13:44:27.179254Z"
          }
        ]
      }
    ]
  }
]

The deletion_info indicates that the cell is deleted. This data is the tombstone for the original data and all the data versions. 

Cassandra performs a compaction operation on SSTables, which consolidates two or more SSTables to form a new one. This process combines all versions of the data present in the participating SSTables and writes only a single, latest version of each piece of data to the resulting SSTable. Compactions also purge the data associated with a tombstone, if all the required conditions for purging are met. There are various strategies to trigger and perform compaction (refer to apache-cassandra-compactions; an example of setting a strategy on a table follows the list below):

  • SizeTieredCompactionStrategy (STCS): This is the default compaction strategy. It is triggered using the size of SSTables on-disk. 
  • LeveledCompactionStrategy (LCS): This strategy is used to optimize read performance. It considers the data partitions present in SSTables and arranges SSTables into levels; each level holds a fixed set of SSTables, and those are compacted with each other.  
  • TimeWindowCompactionStrategy (TWCS): This is a specialized strategy for time series data. It arranges SSTables in time window buckets defined in the table definition. The SSTables within a time window are only compacted with each other. 

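The compaction strategy is configured per table. As an illustrative sketch reusing the ks.tb table from earlier (the time window settings are just example values), switching it to TWCS would look like:

cqlsh> alter table ks.tb with compaction = {
           'class': 'TimeWindowCompactionStrategy',
           'compaction_window_unit': 'DAYS',
           'compaction_window_size': 1};
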
The other crucial set of operations performed in Cassandra is anti-entropy. The aim of these operations is to keep data as consistent as possible, and anti-entropy is what enables Cassandra to provide its eventual consistency model.

  • Hinted Handoff: If a node in Cassandra is unavailable for a short period, the data which is supposed to be replicated to that node is stored on a peer node; this stored data is called hints. Once the original node becomes available, the hints are transferred to it, and the node is caught up with the missed data. There are time and storage restrictions for hints: if a node is unavailable for longer than the configured duration, no further hints are saved for it. Hints cannot be considered a primary anti-entropy mechanism.
  • Read Repair: A read operation is used as an opportunity to repair inconsistent data across replicas. The latest write timestamp is used as the marker for the correct version of the data. Read repair is performed on only a portion of the total reads to avoid performance degradation. Read repairs are opportunistic operations and not a primary anti-entropy mechanism.
  • Repair: Repair is the primary anti-entropy operation for making data consistent across replicas. Repairs are performed by creating specialized data structures called Merkle trees, which are trees of hash values computed over the data in a replica. These are transferred to other replicas and compared to detect inconsistencies; the correct data is then streamed across nodes to repair the inconsistencies.

Repairs need to be scheduled manually as these are intensive operations that consume a significant amount of cluster resources. 

Write Path

The Cassandra write path is the process followed by a Cassandra node to store data in response to a write operation. A coordinator node initiates the write path and is responsible for the completion of the request. 

The high-level steps are as follows:

  1. The partitioner applies a hash to the partition key of the incoming data and generates a token.
  2. The node to which the partition belongs is identified, along with all the nodes holding replicas for that partition.
  3. The write request is forwarded to all replica nodes, and acknowledgement is awaited. 
  4. Once the number of nodes required to fulfil the write consistency level have acknowledged the request, the write operation completes. 

Consider an example with a six-node cluster, a replication factor of three, and a write request consistency of quorum. 

Quorum for RF 3 = (3/2)+1 = 2

Common error scenarios:

  1. If a sufficient number of nodes required to fulfil the request is not available, or the nodes do not return an acknowledgement, the coordinator throws an exception.  
  2. Even after the request has been satisfied with the required number of replica acknowledgements, if an additional node which stores a replica for the data is unavailable, the data can be saved as a hint on another node. 

In a multi-data center cluster, the coordinator forwards write requests to all applicable local nodes. For each remote data center, the write request is forwarded to a single node, which then replicates the data within that data center to the number of nodes required to satisfy the consistency level. 

The Anatomy of a Write Operation on a Node

This operation involves the commit log, memtable, and SSTables. The commit log is a write-ahead log stored on disk. The write operation is recorded in the commit log of the node, and the acknowledgement is returned. The data is then stored in a memtable, which is an in-memory structure representing an SSTable. The memtable is flushed to disk after reaching its memory threshold, which creates a new SSTable. The SSTables are eventually compacted to consolidate the data and optimize read performance.

Read Path 

The Cassandra read path is the process followed by a Cassandra node to retrieve data in response to a read operation. The read path has more steps than the write path. The actions performed to serve a read request are as follows:

  1. The coordinator generates a hash using the partition key and gathers the replica nodes which are responsible for storing the data.
  2. The coordinator checks if replicas required to satisfy the read consistency level are available. If not, an exception is thrown, and the read operation ends.
  3. The coordinator then sends a read data request to the fastest-responding replica; the fastest replica could be the coordinator itself. The fastest replica is determined by the dynamic snitch, which tracks node latencies dynamically.
  4. The coordinator then sends a digest request to the other replicas of the data. A digest is a hash calculated over the requested data by the replica nodes.
  5. The coordinator compares all the digests to determine whether all the replicas have a consistent version of the data. If those are equal, it returns the result obtained from the fastest replica.

If the digests from all the replicas are not equal, it means some replicas do not have the latest version of the data. In this case, read data requests for all the replicas are issued, and the data with the latest timestamp is returned to the client. Also, read repair requests are issued for the replicas which do not have the latest data version.

Components involved in a read operation on a node:

  • Row cache: This is a cache for frequently read data rows, also referred to as hot data. It stores a complete data row which can be returned directly to the client if requested by a read operation. This is an optional feature and works best if there are a small number of hot rows which can fit in the row cache.
  • Partition key cache: This component caches frequently used partition index entries per table. In other words, it stores the locations of commonly queried partitions, but not the complete rows. This feature is enabled by default in Cassandra and can be tuned further.
  • Bloom filter: A bloom filter is a data structure which indicates if a data partition could be included in a given SSTable. The positive result returned by a bloom filter can be a false signal, but the negative results are always accurate. Hence it saves a lot of seek-time for read operations.
  • Partition index and summary: The partition index contains the offsets of all partitions, i.e., their locations in the SSTable. The partition summary is a summary of the index. These components enable locating a partition exactly in an SSTable rather than scanning the data.
  • Memtable: The memtable is the in-memory representation of an SSTable. If a data partition is present in the memtable, it can be read directly for specific data rows and returned to the client.
  • Compression offset map: This is the map for locating data in SSTables when it is compressed on-disk. 
  • SSTable: The on-disk data structure which holds all the data once flushed from memory. 

Anatomy of Read Operation on a Node

  1. Cassandra checks the row cache for the presence of the data. If present, the data is returned, and the request ends.
  2. The request flow then involves checking the bloom filters. If a bloom filter indicates that the data may be present in an SSTable, Cassandra continues to look for the required partition in that SSTable.
  3. The key cache is checked for the partition key. A cache hit provides the offset of the partition in the SSTable; this offset is used to retrieve the partition, and the request completes.
  4. Otherwise, Cassandra looks up the partition in the partition summary and partition index. These structures provide the partition offset in the SSTable, which is then used to retrieve the partition and return the data. Any caches present are updated with the latest data read. 

Conclusion

Cassandra’s architecture is uniquely designed to provide scalability, reliability, and performance. It is based on a distributed system architecture and operates within the constraints of the CAP theorem. Cassandra’s unique architecture needs careful configuration and tuning, and it is essential to understand the components in order to use Cassandra efficiently.

Contact us to get expert advice on managing and deploying Apache Cassandra.


Dial C* for Operator - Creating a Cassandra Cluster with Cass Operator

In this post we are going to take a deep dive look at provisioning a Cassandra cluster using the DataStax Kubernetes operator for Cassandra, Cass Operator. We will set up a multi-rack cluster with each rack in a different availability zone.

For the examples, I will use a nine node, regional cluster in Google Kubernetes Engine (GKE) that is spread across three zones. Here is what my Kubernetes cluster looks like:

$ kubectl get nodes --label-columns failure-domain.beta.kubernetes.io/region,failure-domain.beta.kubernetes.io/zone | awk {'print $1" "$6" "$7'} | column -t
NAME                                     REGION    ZONE
gke-cass-dev-default-pool-3cab2f1f-3swp  us-east1  us-east1-d
gke-cass-dev-default-pool-3cab2f1f-408v  us-east1  us-east1-d
gke-cass-dev-default-pool-3cab2f1f-pv6v  us-east1  us-east1-d
gke-cass-dev-default-pool-63ec3f9d-5781  us-east1  us-east1-b
gke-cass-dev-default-pool-63ec3f9d-blrh  us-east1  us-east1-b
gke-cass-dev-default-pool-63ec3f9d-g4cb  us-east1  us-east1-b
gke-cass-dev-default-pool-b1ee1c3c-5th7  us-east1  us-east1-c
gke-cass-dev-default-pool-b1ee1c3c-ht20  us-east1  us-east1-c
gke-cass-dev-default-pool-b1ee1c3c-xp2v  us-east1  us-east1-c

Operator Concepts

Without getting into too much detail, I want to quickly cover some fundamental concepts for some of the things we will discuss in this post. Kubernetes is made up of controllers. A controller manages the state of one or more Kubernetes resource types. The controller executes an infinite loop, continually trying to converge the desired state of resources with their actual state. The controller watches for changes of interest in the Kubernetes cluster, i.e., a resource added, deleted, or updated. When there is a change, a key uniquely identifying the affected resource is added to a work queue. The controller eventually gets the key from the queue and begins whatever work is necessary.

Sometimes a controller has to perform potentially long-running operations like pulling an image from a remote registry. Rather than blocking until the operation completes, the controller usually requeues the key so that it can continue with other work while the operation completes in the background. When there is no more work to do for a resource, i.e. the desired state matches the actual state, the controller removes the key from the work queue.

An operator consists of one or more controllers that manage the state of one or more custom resources. Every controller has a Reconciler object that implements a reconcile loop. The reconcile loop is passed a request, which is the resource key.

A Few Words on Terminology

A Kubernetes worker node, Kubernetes worker, or worker node is a machine that runs services necessary to run and manage pods. These services include:

  • kubelet
  • kube-proxy
  • container runtime, e.g., Docker

A Cassandra node is the process running in a container.

A Cassandra container is the container, i.e., Docker container, in which the Cassandra node is running.

A Cassandra pod is a Kubernetes pod that includes one or more containers. One of those containers is running the Cassandra node.

Installing the Operator

Apply the cass-operator-manifests.yaml manifests as follows:

$ kubectl create -f https://raw.githubusercontent.com/datastax/cass-operator/b96bfd77775b5ba909bd9172834b4a56ef15c319/docs/user/cass-operator-manifests.yaml
namespace/cass-operator created
serviceaccount/cass-operator created
secret/cass-operator-webhook-config created
customresourcedefinition.apiextensions.k8s.io/cassandradatacenters.cassandra.datastax.com created
clusterrole.rbac.authorization.k8s.io/cass-operator-cluster-role created
clusterrolebinding.rbac.authorization.k8s.io/cass-operator created
role.rbac.authorization.k8s.io/cass-operator created
rolebinding.rbac.authorization.k8s.io/cass-operator created
service/cassandradatacenter-webhook-service created
deployment.apps/cass-operator created
validatingwebhookconfiguration.admissionregistration.k8s.io/cassandradatacenter-webhook-registration created

Note: The operator is deployed in the cass-operator namespace.

Make sure that the operator has deployed successfully. You should see output similar to this:

$ kubectl -n cass-operator get deployments
NAME            READY   UP-TO-DATE   AVAILABLE   AGE
cass-operator   1/1     1            1           2m8s

Create a Storage Class

We need to create a StorageClass that is suitable for Cassandra. Place the following in a file named server-storageclass.yaml:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: server-storage
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd
  replication-type: none
volumeBindingMode: WaitForFirstConsumer
reclaimPolicy: Delete

One thing to note here is volumeBindingMode: WaitForFirstConsumer. The default value is Immediate and should not be used, as it can prevent Cassandra pods from being scheduled on a worker node. If a pod fails to run and its status reports a message like, had volume node affinity conflict, then check the volumeBindingMode of the StorageClass being used. See Topology-Aware Volume Provisioning in Kubernetes for more details.

Create the StorageClass with:

$ kubectl -n cass-operator apply -f server-storageclass.yaml
storageclass.storage.k8s.io/server-storage created

The Spec

Most Kubernetes resources define spec and status properties. The spec declares the desired state of a resource which includes configuration settings provided by the user, default values expanded by the system, and other properties initialized by other internal components after resource creation. We will talk about the status in a little bit.

The manifest below declares a CassandraDatacenter custom resource. It does not include all possible properties. It includes the minimum necessary to create a multi-zone cluster.

apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
  name: multi-rack
spec:
  clusterName: multi-rack
  serverType: cassandra
  serverVersion: 3.11.6
  managementApiAuth:
    insecure: {}
  size: 9
  racks:
  - name: us-east1-b
    zone: us-east1-b
  - name: us-east1-c
    zone: us-east1-c
  - name: us-east1-d
    zone: us-east1-d    
  storageConfig:
    cassandraDataVolumeClaimSpec:
      storageClassName: server-storage
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi

This spec declares a single Cassandra datacenter. Cass Operator does support multi-DC clusters. It requires creating a separate CassandraDatacenter for each datacenter. Discussion of multi-DC clusters is outside the scope of this post.

The size property specifies the total number of Cassandra nodes in the datacenter.

racks is an array of Rack objects which consist of name and zone properties. The zone should be the name of a zone in GCP (or an AWS zone if the cluster was running in AWS for example). The operator will use this to pin Cassandra pods to Kubernetes workers in the zone. More on this later.

Create the CassandraDatacenter

Put the above manifest in a file named multi-rack-cassdc.yaml and then run:

$ kubectl -n cass-operator apply -f multi-rack-cassdc.yaml
cassandradatacenter.cassandra.datastax.com/multi-rack created

This creates a CassandraDatacenter object named multi-rack in the Kubernetes API server. The API server provides a REST API with which clients, like kubectl, interact. The API server maintains state in etcd. Creating a Kubernetes resource ultimately means persisting state in etcd. When the CassandraDatacenter object is persisted, the API server notifies any clients watching for changes, namely Cass Operator. From here the operator takes over. The new object is added to the operator’s internal work queue. The job of the operator is to make sure the desired state, i.e., the spec, matches the actual state of the CassandraDatacenter.

Now that we have created the CassandraDatacenter, it is time to focus our attention on what Cass Operator is doing to build the Cassandra cluster.

Monitor the Progress

We will look at a couple things to monitor the progress of the provisioning or scaling up of the cluster:

  • Changes in the status of the CassandraDatacenter
  • Kubernetes events emitted by the operator

We have already discussed that the spec describes a resource’s desired state. The status, on the other hand, describes the object’s current, observed state. Earlier I mentioned that the Kubernetes API server provides a REST API to clients. A Kubernetes object or resource is a REST resource. The status of a Kubernetes resource is typically implemented as a REST subresource that can only be modified by internal system components. In the case of a CassandraDatacenter, Cass Operator manages the status property.

An event is a Kubernetes resource that is created when objects like pods change state, or when an error occurs. Like other resources, events get stored in the API server. Cass Operator generates a number of events for a CassandraDatacenter.

Understanding both the changes in a CassandraDatacenter’s status and the events emitted by the operator provides valuable insight into what is actually happening during the provisioning process. That understanding also makes it easier to resolve issues when things go wrong. This applies not only to CassandraDatacenter, but to other Kubernetes resources as well.

Monitor Status Updates

We can watch for changes in the status with:

$ kubectl -n cass-operator get -w cassdc multi-rack -o yaml

In the following sections we will discuss each of the status updates that occur while the operator works to create the Cassandra cluster.

Initial Status Update

Here is what the status looks like initially after creating the CassandraDatacenter:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  nodeStatuses: {}

cassandraOperatorProgress can have one of two values, Ready or Updating. It will change to Ready when the operator has no more work to do for the resource. This simple detail is really important, particularly if you are performing any automation with Cass Operator. For example, I have used Cassandra operators to provision clusters for integration tests. With Cass Operator my test setup code could simply poll cassandraOperatorProgress to know when the cluster is ready.

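For example, a minimal way to poll that field from a script (against the multi-rack CassandraDatacenter created above) is a jsonpath query, which prints Updating until the operator finishes its work and Ready afterwards:

$ kubectl -n cass-operator get cassdc multi-rack -o jsonpath='{.status.cassandraOperatorProgress}'
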
conditions is an array of DatacenterCondition objects. A lot of Kubernetes resources use conditions in their statuses. Conditions represent the latest observations of an object’s state. They should minimally include type and status fields. The status field can have as its value either True, False, or Unknown. lastTransitionTime is the time the condition transitioned from one status to another. type identifies the condition. CassandraDatacenter currently has the following condition types:

  • Ready
  • Initialized
  • ReplacingNodes
  • ScalingUp
  • Updating
  • Stopped
  • Resuming
  • RollingRestart

Implementing, understanding, and using conditions are often points of confusion. It is intuitive to think of and model a resource’s state as a state machine. Reminding yourself that conditions are observations and not a state machine will go a long way in avoiding some of that confusion. It is worth noting there has been a lot of debate in the Kubernetes community about whether conditions should be removed. Some of the latest discussions in this ticket indicate that they will remain.

lastRollingRestart is only updated when a rolling restart is explicitly requested. As we will see its value will remain unchanged, and therefore we will be ignoring it for this post.

nodeStatuses is a map that provides some details for each node. We will see it get updated as nodes are deployed.

Cassandra Node Starting

With the next update we see that a lastServerNodeStarted property has been added to the status:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:41:24Z"
  nodeStatuses: {}

lastServerNodeStarted gets updated when a Cassandra node is starting up. The operator also adds the label cassandra.datastax.com/node-state: Starting to the Cassandra pod. The astute reader may have noted that I said lastServerNodeStarted is updated when the Cassandra node is starting up rather than when the pod is starting up. For Cass Operator, there is an important distinction between the Cassandra node and the Cassandra container. The Cassandra Container section at the end of the post goes over this in some detail.

Cassandra Node Started

In the next update lastServerNodeStarted is modified and another entry is added to nodeStatuses:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:41:50Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5

The entry is keyed by the pod name, multi-rack-multi-rack-us-east1-b-sts-2. The value consists of two fields - the node’s host id and its IP address.

When Cass Operator determines the Cassandra node is up and running, it updates the node-state label to cassandra.datastax.com/node-state: Started. After the label update, the operator uses a label selector query to see which pods have been started and are running. When the operator finds another node running, its host ID and IP address will be added to nodeStatuses.

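You can run a similar label selector query yourself to see which Cassandra pods the operator currently considers started, for example:

$ kubectl -n cass-operator get pods -l cassandra.datastax.com/node-state=Started
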
Remaining Nodes Started

In this section we follow the progression of the rest of the Cassandra cluster being started. lastServerNodeStarted is changed with each of these status updates in addition to nodeStatuses being updated.

multi-rack-multi-rack-us-east1-c-sts-0 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:42:49Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3

Next, multi-rack-multi-rack-us-east1-d-sts-0 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:43:53Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4

Next, multi-rack-multi-rack-us-east1-c-sts-2 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:44:54Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4

Next, multi-rack-multi-rack-us-east1-d-sts-1 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:45:50Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3

With five out of the nine nodes started, now is a good time to point out a couple of things. First, we see that one node at a time is added to nodeStatuses. Based on this, it stands to reason that Cass Operator is starting nodes serially, and that is precisely what is happening.

Secondly, there is roughly a minute difference between the values of lastServerNodeStarted in each status update. It is taking about a minute or so to start each node, which means it should take somewhere between nine and ten minutes for the cluster to be ready. These times will almost certainly vary depending on a number of factors like the type of disks used, the machine type, etc. It is helpful though, particularly for larger cluster sizes, to be able to gauge how long it will take to get the entire cluster up and running.

Next, multi-rack-multi-rack-us-east1-d-sts-2 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:46:51Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Next, multi-rack-multi-rack-us-east1-b-sts-0 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:00Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Next, multi-rack-multi-rack-us-east1-c-sts-1 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Finally, multi-rack-multi-rack-us-east1-b-sts-1 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Although all nine nodes are now started, the operator still has more work to do. This is evident based on the ScalingUp condition still being True and cassandraOperatorProgress still having a value of Updating.

Cassandra Super User Created

With the next update the superUserUpserted property is added to the status:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

superUserUpserted is the timestamp at which the operator creates a super user in Cassandra. We will explore this in a little more detail when we go through the events.

ScalingUp Transition

In this update the ScalingUp condition transitions to False. This condition changes only after all nodes have been started and after the super user has been created.

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "False"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

Add Initialized and Ready Conditions

Next, the operator adds the Initialized and Ready conditions to the status. Initialized means the CassandraDatacenter was successfully created. The transition for this condition should only happen once. Ready means the cluster can start serving client requests. The Ready condition will remain True during a rolling restart for example but will transition to False when all nodes are stopped. See The Cassandra Container section at the end of the post for more details on starting and stopping Cassandra nodes.

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "False"
    type: ScalingUp
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Initialized
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Ready
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

Final Status Update

In the last update, the value of cassandraOperatorProgress is changed to Ready:

status:
  cassandraOperatorProgress: Ready
  conditions:
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "False"
    type: ScalingUp
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Initialized
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Ready
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

We now know the operator has completed its work to scale up the cluster. We also know the cluster is initialized and ready for use. Let’s verify that the desired state of the CassandraDatacenter matches the actual state. We can do this with nodetool status and kubectl get pods.

$ kubectl -n cass-operator exec -it multi-rack-multi-rack-us-east1-b-sts-0 -c cassandra -- nodetool status
Datacenter: multi-rack
======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  10.32.4.4  84.43 KiB  1            4.8%              c7e43757-92ee-4ca3-adaa-46a128045d4d  us-east1-d
UN  10.32.1.4  70.2 KiB   1            7.4%              3b1b60e0-62c6-47fb-93ff-3d164825035a  us-east1-b
UN  10.32.6.3  65.36 KiB  1            32.5%             dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76  us-east1-c
UN  10.32.3.3  103.54 KiB  1            34.0%             785e30ca-5772-4a57-b4bc-4bd7b3b24ebf  us-east1-d
UN  10.32.7.6  70.34 KiB  1            18.1%             a55082ba-0692-4ee9-97a2-a1bb16383d31  us-east1-c
UN  10.32.8.5  65.36 KiB  1            19.8%             facbbaa0-ffa7-403c-b323-e83e4cab8756  us-east1-c
UN  10.32.2.6  65.36 KiB  1            36.5%             d7246bca-ae64-45ec-8533-7c3a2540b5ef  us-east1-b
UN  10.32.0.5  65.36 KiB  1            39.9%             62399b3b-80f0-42f2-9930-6c4f2477c9bd  us-east1-b
UN  10.32.5.4  65.36 KiB  1            7.0%              8e8733ab-6f7b-4102-946d-c855adaabe49  us-east1-d

nodetool status reports nine nodes up across three racks. That looks good. Now let’s verify the pods are running where we expect them to be.

$ kubectl -n cass-operator get pods -l "cassandra.datastax.com/cluster=multi-rack" -o wide | awk {'print $1" "$7'} | column -t
NAME                                    NODE
multi-rack-multi-rack-us-east1-b-sts-0  gke-cass-dev-default-pool-63ec3f9d-5781
multi-rack-multi-rack-us-east1-b-sts-1  gke-cass-dev-default-pool-63ec3f9d-blrh
multi-rack-multi-rack-us-east1-b-sts-2  gke-cass-dev-default-pool-63ec3f9d-g4cb
multi-rack-multi-rack-us-east1-c-sts-0  gke-cass-dev-default-pool-b1ee1c3c-5th7
multi-rack-multi-rack-us-east1-c-sts-1  gke-cass-dev-default-pool-b1ee1c3c-ht20
multi-rack-multi-rack-us-east1-c-sts-2  gke-cass-dev-default-pool-b1ee1c3c-xp2v
multi-rack-multi-rack-us-east1-d-sts-0  gke-cass-dev-default-pool-3cab2f1f-3swp
multi-rack-multi-rack-us-east1-d-sts-1  gke-cass-dev-default-pool-3cab2f1f-408v
multi-rack-multi-rack-us-east1-d-sts-2  gke-cass-dev-default-pool-3cab2f1f-pv6v

Look carefully at the output, and you will see each pod is in fact running on a separate worker node. Furthermore, the pods are running on worker nodes in the expected zones.

Monitor Events

The operator reports a number of events useful for monitoring and debugging the provisioning process. As we will see, the events provide additional insights absent from the status updates alone.

There are some nuances that can make working with events a bit difficult. First, events are persisted with a TTL; they expire after one hour. Second, events can be listed out of order. The ordering appears to be done on the client side with a sort on the Age column. We will go through the events in the order in which the operator generates them. Lastly, while working on this post, I discovered that some events can get dropped. I created this ticket to investigate the issue. Kubernetes has throttling mechanisms in place to prevent the system from getting overwhelmed by too many events. We won’t go through every single event as there are a lot of them. We will, however, cover enough, including some that may be dropped, to get an overall sense of what is going on.

We can list all of the events for the CassandraDatacenter with the describe command as follows:

$ kubectl -n cass-operator describe cassdc multi-rack
Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-b
  Normal  CreatedResource    12m    cass-operator  Created service multi-rack-seed-service
  Normal  CreatedResource    12m    cass-operator  Created service multi-rack-multi-rack-all-pods-service
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-b-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-c-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-d-sts
  Normal  CreatedResource    12m    cass-operator  Created service multi-rack-multi-rack-service
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-c
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-d
  Normal  LabeledPodAsSeed   12m    cass-operator  Labeled pod a seed node multi-rack-multi-rack-us-east1-b-sts-2
  Normal  StartingCassandra  12m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2
  Normal  StartedCassandra   11m    cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2
  Normal  StartingCassandra  11m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-0
  Normal  StartingCassandra  10m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-0
  Normal  StartedCassandra   10m    cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-0
  Normal  LabeledPodAsSeed   10m    cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-c-sts-0
  Normal  LabeledPodAsSeed   9m44s  cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-d-sts-0
  Normal  StartedCassandra   9m43s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-0
  Normal  StartingCassandra  9m43s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-2
  Normal  StartedCassandra   8m43s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-2
  Normal  StartingCassandra  8m43s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  Normal  StartedCassandra   7m47s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  Normal  StartingCassandra  7m46s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-2
  Normal  StartedCassandra   6m45s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-2
  Normal  StartingCassandra  6m45s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-0
  Normal  LabeledPodAsSeed   5m36s  cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-b-sts-0

In the following sections we will go through several of these events as well as some that are missing.

Create Headless Services

The first thing that Cass Operator does during the initial reconciliation loop is create a few headless services:

  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedResource    10m    cass-operator  Created service multi-rack-seed-service
  Normal  CreatedResource    10m    cass-operator  Created service multi-rack-multi-rack-all-pods-service
  Normal  CreatedResource    10m    cass-operator  Created service multi-rack-multi-rack-service    

multi-rack-seed-service exposes all pods running seed nodes. This service is used by Cassandra to configure seed nodes.

multi-rack-multi-rack-all-pods-service exposes all pods that are part of the CassandraDatacenter, regardless of whether they are ready. It is used to scrape metrics with Prometheus.

multi-rack-multi-rack-service exposes ready pods. CQL clients should use this service to establish connections to the cluster.
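If you want to inspect these services yourself, you can list them directly. This is simply an observation aid using the namespace from this post:

$ kubectl -n cass-operator get services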

Create StatefulSets

Next the operator creates three StatefulSets, one for each rack:

  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-b-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-c-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-d-sts

I mentioned earlier the operator will use the zone property specified for each rack to pin pods to Kubernetes workers in the respective zones. The operator uses affinity rules to accomplish this.

Let’s take a look at the spec for multi-rack-multi-rack-us-east1-c-sts to see how this is accomplished:

$ kubectl -n cass-operator get sts multi-rack-multi-rack-us-east1-c-sts -o yaml
...
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: failure-domain.beta.kubernetes.io/zone
                operator: In
                values:
                - us-east1-c
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: cassandra.datastax.com/cluster
                operator: Exists
              - key: cassandra.datastax.com/datacenter
                operator: Exists
              - key: cassandra.datastax.com/rack
                operator: Exists
            topologyKey: kubernetes.io/hostname
...            

The nodeAffinity property constrains the worker nodes on which pods in the StatefulSet can be scheduled. requiredDuringSchedulingIgnoredDuringExecution is a NodeSelector, which declares a label-based query. In this case, if a node has the label failure-domain.beta.kubernetes.io/zone with a value of us-east1-c, then pods can be scheduled on that node.

Note: failure-domain.beta.kubernetes.io/zone is one of a number of well-known labels used by the Kubernetes runtime.

I added emphasis to can be because of the podAntiAffinity property that is also declared. It constrains the worker nodes on which the pods can be scheduled based on the labels of pods already running on those nodes. The requiredDuringSchedulingIgnoredDuringExecution property is a list of PodAffinityTerms that define labels determining which pods cannot be co-located on a particular host. In short, this prevents pods from being scheduled on any node on which pods from a CassandraDatacenter are already running. In other words, no two Cassandra nodes should run on the same Kubernetes worker node.

Note: You can run multiple Cassandra pods on a single worker node by setting .spec.allowMultipleNodesPerWorker to true.
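For reference, here is a rough sketch of where that flag sits in the CassandraDatacenter spec. The surrounding fields are assumptions based on the CRD and are shown only for context; note that when packing multiple Cassandra pods onto one worker you generally also need to set explicit resource requests and limits:

apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
  name: multi-rack
spec:
  clusterName: multi-rack
  allowMultipleNodesPerWorker: true
  resources:
    requests:
      cpu: "2"
      memory: 8Gi
    limits:
      cpu: "2"
      memory: 8Gi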

Scale up the Racks

The next events involve scaling up the racks:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-b
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-c
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-d

The StatefulSets are initially created with zero replicas. They are subsequently scaled up to the desired replica count, which is three (per StatefulSet) in this case.
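If you want to watch this happen, following the StatefulSets while the operator works will show the replica counts climb from zero to three:

$ kubectl -n cass-operator get statefulsets -w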

Label the First Seed Node Pod

After the StatefulSet controller starts creating pods, Cass Operator applies the following label to a pod to designate it as a Cassandra seed node:

cassandra.datastax.com/seed-node: "true"

At this stage in the provisioning process, no pods have the seed-node label. The following event indicates that the operator designates the pod to be a seed node:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  LabeledPodAsSeed   12m    cass-operator  Labeled pod a seed node multi-rack-multi-rack-us-east1-b-sts-2   

Note: You can use a label selector to query for all seed node pods, e.g., kubectl -n cass-operator get pods -l cassandra.datastax.com/seed-node="true".

Start the First Seed Node

Next the operator starts the first seed node:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  StartingCassandra  12m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2

The operator applies the label cassandra.datastax.com/node-state: Starting to the pod. The operator then requeues the request with a short delay, allowing time for the Cassandra node to start. Requeuing the request ends the current reconciliation.

If you are familiar with Kubernetes, this step of starting the Cassandra node may seem counter-intuitive because pods/containers cannot exist in a stopped state. See The Cassandra Container section at the end of the post for more information.

Update Status of First Seed Node Pod

In a subsequent reconciliation loop the operator finds that multi-rack-multi-rack-us-east1-b-sts-2 has been started and records the following event:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  StartedCassandra   11m    cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2

Then the cassandra.datastax.com/node-state label is updated to a value of Started to indicate the Cassandra node is now running. The event is recorded and the label is updated only when the Cassandra container’s readiness probe passes. If the readiness probe fails, the operator will requeue the request, ending the current reconciliation loop.
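You can observe these label transitions yourself by asking kubectl to print the labels as columns; this is just a convenience for watching the process:

$ kubectl -n cass-operator get pods -L cassandra.datastax.com/node-state -L cassandra.datastax.com/seed-node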

Start One Node Per Rack

After the first node, multi-rack-multi-rack-us-east1-b-sts-2, is running, the operator makes sure there is a node per rack running. Here is the sequence of events for a given node:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  StartingCassandra  8m43s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  
  Normal  StartedCassandra   7m47s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  
  Normal  LabeledPodAsSeed   9m44s  cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-d-sts-1

Let’s break down what is happening here.

  • The cassandra.datastax.com/node-state: Starting label is applied to multi-rack-multi-rack-us-east1-d-sts-1
  • Cassandra is started
  • The request is requeued
  • On a subsequent reconciliation loop when Cassandra is running (as determined by the readiness probe), two things happen
    • The cassandra.datastax.com/seed-node="true" label is applied to the pod, making it a seed node
    • The cassandra.datastax.com/node-state label is updated to a value of Started

The operator repeats this process for each remaining rack that does not yet have a node running.

Now is a good time to discuss how the operator determines how many seeds there should be in total for the datacenter as well as how many seeds there should be per rack.

If the datacenter consists of only one or two nodes, then there will be one or two seeds respectively. If there are more than three racks, then the number of seeds will be set to the number of racks. If neither of those conditions hold, then there will be three seeds.

The seeds per rack are calculated as follows:

seedsPerRack = totalSeedCount / numRacks
extraSeeds = totalSeedCount % numRacks

For the example cluster in this post, totalSeedCount will be three. Then seedsPerRack will be one, and extraSeeds will be zero.
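Here is a minimal sketch of those rules, written in Java purely for illustration (Cass Operator itself is written in Go, so this is not its actual code):

public final class SeedCounts {

    // Total seeds for the datacenter, following the rules described above.
    static int totalSeedCount(int totalNodes, int numRacks) {
        if (totalNodes <= 2) {
            return totalNodes; // one or two nodes => one or two seeds
        }
        if (numRacks > 3) {
            return numRacks;   // more than three racks => one seed per rack
        }
        return 3;              // otherwise three seeds in total
    }

    public static void main(String[] args) {
        int totalSeeds = totalSeedCount(9, 3); // the example cluster: 9 nodes, 3 racks
        int seedsPerRack = totalSeeds / 3;     // 3 / 3 = 1
        int extraSeeds = totalSeeds % 3;       // 3 % 3 = 0
        System.out.printf("seeds=%d perRack=%d extra=%d%n", totalSeeds, seedsPerRack, extraSeeds);
    }
}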

Start Remaining Nodes

After we have a Cassandra node up and running in each rack, the operator proceeds to start the remaining non-seed nodes. I will skip over listing events here because they are the same as the previous ones. At this point the operator iterates over the pods without worrying about the racks. For each pod in which Cassandra is not already running, it will start Cassandra following the same process previously described.

Create a PodDisruptionBudget

After all Cassandra nodes have been started, Cass Operator creates a PodDisruptionBudget. It generates an event like this:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedResource    10m6s  cass-operator  Created PodDisruptionBudget multi-rack-pdb

Note: This is one of the dropped events.

A PodDisruptionBudget limits the number of pods that can be down from a voluntary disruption. Examples of voluntary disruptions include accidentally deleting a pod or draining a worker node for upgrade or repair.

All Cassandra pods in the CassandraDatacenter are managed by the disruption budget. When creating the PodDisruptionBudget, Cass Operator sets the .spec.minAvailable property. This specifies the number of pods that must be available after a pod eviction. Cass Operator sets this to the total number of Cassandra nodes minus one.
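For our nine-node example the generated object looks roughly like the following. The name and minAvailable value follow from the event and the description above, while the selector labels are an assumption:

apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: multi-rack-pdb
spec:
  minAvailable: 8   # total Cassandra nodes (9) minus one
  selector:
    matchLabels:
      cassandra.datastax.com/cluster: multi-rack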

Create a Cassandra Super User

The final thing that Cass Operator does is to create a super user in Cassandra:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedSuperuser   10m6s  cass-operator  Created superuser

Earlier in the provisioning process, Cass Operator creates the super user credentials and stores them in a secret. The secret name can be specified by setting .spec.superuserSecretName.

The username is set to <.spec.clusterName>-superuser, which will be multi-rack-superuser for our example. The password is a random UTF-8 string of at most 55 characters.
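Assuming the secret name follows the same <clusterName>-superuser pattern (the default when .spec.superuserSecretName is not set), you can retrieve the generated credentials like this:

$ kubectl -n cass-operator get secret multi-rack-superuser -o jsonpath='{.data.username}' | base64 -d
$ kubectl -n cass-operator get secret multi-rack-superuser -o jsonpath='{.data.password}' | base64 -d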

Note: Cass Operator disables the default super user, cassandra.

The Cassandra Container

Each Cassandra pod runs a container named cassandra. Before we talk about the cassandra container, we need to talk about sidecars. The sidecar pattern is a well-known and widely used architectural pattern in Kubernetes. A pod consists of one or more containers, which share the same network namespace and can share volumes. Examples of sidecars include log aggregation, gRPC proxies, and backup/restore agents, to name a few. Cass Operator utilizes the sidecar pattern, but in a somewhat unconventional manner.

We can take a look at the spec of one of the Cassandra pods to learn more about the cassandra container. Because we are only focused on this one part, most of the output is omitted.

$ kubectl -n cass-operator get pod multi-rack-multi-rack-us-east1-b-sts-0 -o yaml
apiVersion: v1
kind: Pod
...
spec:
...
      containers:
      - env:
        - name: DS_LICENSE
          value: accept
        - name: DSE_AUTO_CONF_OFF
          value: all
        - name: USE_MGMT_API
          value: "true"
        - name: MGMT_API_EXPLICIT_START
          value: "true"
        - name: DSE_MGMT_EXPLICIT_START
          value: "true"
        image: datastax/cassandra-mgmtapi-3_11_6:v0.1.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /api/v0/probes/liveness
            port: 8080
            scheme: HTTP
          initialDelaySeconds: 15
          periodSeconds: 15
          successThreshold: 1
          timeoutSeconds: 1
        name: cassandra
        ports:
        - containerPort: 9042
          name: native
          protocol: TCP
        - containerPort: 8609
          name: inter-node-msg
          protocol: TCP
        - containerPort: 7000
          name: intra-node
          protocol: TCP
        - containerPort: 7001
          name: tls-intra-node
          protocol: TCP
        - containerPort: 8080
          name: mgmt-api-http
          protocol: TCP
        readinessProbe:
          failureThreshold: 3
          httpGet:
            path: /api/v0/probes/readiness
            port: 8080
            scheme: HTTP
          initialDelaySeconds: 20
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /config
          name: server-config
        - mountPath: /var/log/cassandra
          name: server-logs
        - mountPath: /var/lib/cassandra
          name: server-data
...          

There are two lines in the output on which we want to focus. The first line is:

name: cassandra

This is the name of the container. There are other containers listed in the output, but we are only concerned with the cassandra one.

The second line that we are interested in is:

image: datastax/cassandra-mgmtapi-3_11_6:v0.1.0

The image property specifies the image that the cassandra container is running. This is different from the Cassandra images such as the ones found on Docker Hub. This image is for the Management API Sidecar. There have been lots of discussions on the Cassandra community mailing lists about management sidecars. In fact there is even a Cassandra Enhancement Proposal (CEP) for providing an official, community based sidecar. The Management API Sidecar, or management sidecar for short, was not designed specifically for Kubernetes.

The process started in the cassandra container is the management sidecar rather than the CassandraDaemon process. The sidecar is responsible for starting and stopping the node. In addition to providing lifecycle management, the sidecar also provides configuration management, health checks, and per-node actions (the kinds of operations you would otherwise run with nodetool).
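If you want to poke at the management API yourself, one simple approach (a sketch, reusing the probe endpoints we saw in the pod spec above) is to port-forward to a pod and curl it:

$ kubectl -n cass-operator port-forward multi-rack-multi-rack-us-east1-b-sts-0 8080:8080
# in a second terminal
$ curl -i http://localhost:8080/api/v0/probes/readiness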

There is plenty more to say about the management sidecar, but that is for another post.

Wrapping Up

Hopefully this post gives you a better understanding of Cass Operator and Kubernetes in general. While we covered a lot of ground, there is plenty more to discuss, such as multi-DC clusters and the management sidecar. If you want to hear more about Cassandra and Kubernetes, Patrick McFadin put together a series of interviews where he talks to early adopters in the field. Check out “Why Tomorrow’s Cassandra Deployments Will Be on Kubernetes”. It will be available for streaming as part of the DataStax Accelerate online conference: https://dtsx.io/3ex1Eop.

Comparing stress tools for Apache Cassandra

Editor’s Note: The Last Pickle was recently acquired by DataStax. As part of the new DataStax mission of reorienting to embrace open source Apache Cassandra, this is the first in a series of blog posts that will compare new open source offerings, particularly those now coming out of the new DataStax. In the open source spirit we want to embrace you, the community, in choosing the right tool for the right job.

Benchmarking and stress testing Apache Cassandra are important exercises that both operators and developers do regularly. Approaches come in numerous flavours, from “look how my code is better than everyone else’s”, to “what do I need to run this?” and “how much money will this save me?”, and my favourite, “when this dies a horrible death what will that look like?”.

Knowing what you want to achieve and how to interpret the results of these tests is a big part of the challenge. With a run through of these available tools, hopefully that will become easier.

This blog post will look at and compare the following three stress tools:

  • cassandra-stress
  • tlp-stress
  • nosqlbench

With these three tools we will step through a number of basic use cases:

  1. Just Generate Some Load
  2. Using a Real Cluster
  3. Iteration and Thread Counts
  4. Observability
  5. Batch sizes and Overwrites
  6. Predefined Workloads
  7. Custom Workloads
  8. Client Contention and Server Saturation

The versions of the tools used in these steps are 3.11.6 for cassandra-stress, 4.0.0 for tlp-stress, and 3.12.77 for nosqlbench.


1. Just Generate Some Load

Sometimes all you want to do is generate some load or data. This is useful when all you need is a Cassandra node that is doing something, whether that is just to raise the CPU or to generate some commitlogs, memtables, or sstables on disk.

Each tool will generate a slightly different load configuration for these tests:

$ cassandra-stress write

      Performs over a million writes (after an initial 50k warmup writes), iterating a number of times while increasing the number of client threads, starting with four.

$ tlp-stress run BasicTimeSeries -i 1M

      Performs exactly one million requests with a 9:1 write-to-read ratio.

$ nb cql-iot write_cl=LOCAL_ONE

      Performs ten million writes during a warmup phase and then ten million requests with a 9:1 write-to-read ratio.

All of them execute writes connected to a localhost Cassandra node, using the java-driver and consistency level LOCAL_ONE.

There is a difference in the model, however, as cassandra-stress uses a simple key value table, while tlp-stress and nosqlbench are using time-series data models.


2. Using a Real Cluster

This repeats the exercise of just generating any load or data, but is used when you have an actual cluster you are targeting.

$ cassandra-stress write -node cassandra1

$ tlp-stress run BasicTimeSeries --host cassandra1

$ nb cql-iot host=cassandra1

Note: There is no need to specify multiple hosts with any of these stress tools. These are contact hosts that are passed to the java driver, and unlike a coded application where you would want multiple contact hosts specified for reliability during deployment and startup, with a stress tool invocation it is reasonable to expect the single contact host specified to be up and running.


3. Iteration and Thread Counts

The following shows how to specify the number of iterations and the number of threads to use, in each of the tools.

$ cassandra-stress write n=100000 -rate threads=16

$ tlp-stress run BasicTimeSeries -n 100k -t 16

$ nb cql-iot write_cl=LOCAL_ONE main-cycles=100k threads=16


4. Observability

Even with well-designed workloads there is a lot more to benchmarking than the final throughput numbers. We want to see how the cluster operates over time. This can be from spikes in traffic to the many background operations Cassandra can perform. Taking a closer look at how Cassandra performs helps plan for a healthy and stable cluster over a longer period of time than what we are able to benchmark.


cassandra-stress
$ cassandra-stress write -graph file=example-benchmark.html title=example revision=benchmark-0

      For more information on this, read our previous blog post on Cassandra-Stress and Graphs.  


nosqlbench
$ nb cql-iot write_cl=LOCAL_ONE --docker-metrics

      Then open http://localhost:3000 in your browser. Note this only works on Linux and requires Docker to be running on the host. For more info see here.

 

tlp-stress

      Note: tlp-stress has no similar observability feature, but does export Prometheus metrics on port 9501.
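      While there is no built-in graphing, you can spot-check those metrics during a run; the /metrics path below follows the standard Prometheus convention and assumes the run is in progress on the same machine:

$ curl -s http://localhost:9501/metrics | head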

The out-of-the-box generated graphs from cassandra-stress are a nice feature. For any serious benchmarking activity, though, you will want to have metrics from Cassandra graphed and to have insight into the stress tool’s behaviour beyond just the performance numbers.


5. Batch sizes and Overwrites

The following invocation is of particular interest because it has been a pain point for those using cassandra-stress. In Cassandra, unlogged batches are not the norm and are not recommended except for very small groupings (10-20 rows) within a single partition.

cassandra-stress, by default, puts all writes for any partition into single batches, which makes for poor and unrealistic results. It is impossible to get cassandra-stress to not use batches, and quite convoluted to get it to write batches that consist only of single inserts. More on this can be read in the ticket CASSANDRA-11105.

Overwrites and deletes are not something we see a lot of among published Cassandra benchmarks because they are harder to implement. Often this makes sense, as workloads like key-value and time-series are unlikely to be overwrite data models. Yet there are plenty of data models out there that do require these patterns and that we would like to benchmark.


cassandra-stress

      First download batch_too_large.yaml.

$ cassandra-stress user profile=batch_too_large.yaml ops\(insert=1\) -insert visits=FIXED\(10M\)  


tlp-stress

      Unlike cassandra-stress, tlp-stress does not use unlogged batches by default. If unlogged batches are desired you need to write your own workload; see the Custom Workloads section.

      tlp-stress does make deletes very easy, treating them in a similar fashion to the read rate flag. The following makes 10% of the operations deletes of previously written data:

$ tlp-stress run KeyValue --deletes 0.1

      tlp-stress does overwrites in a similar way to cassandra-stress. The following writes 100k operations over 100 partitions. Without clustering keys, that is roughly 1k overwrites per partition:

$ tlp-stress run KeyValue -p 100 -n 100k  


nosqlbench

      nosqlbench can handle overwrites in the same manner as cassandra-stress and tlp-stress, by providing a smaller partition count than the iteration count. nosqlbench does not currently provide any delete or unlogged batch examples. Logged batches have been implemented with custom workloads, so deletes and unlogged batches are probably possible with a custom-implemented workload.


6. Predefined Workloads

cassandra-stress

      cassandra-stress does not have built-in workloads. You need to specify the user mode and supply your own configuration, as shown in the next section.


tlp-stress

      tlp-stress has the most extensive list of workloads. These workloads have been used at TLP to demonstrate real limitations with certain features and to provide a hands on approach to recommending the best production solutions.

$ tlp-stress list

    Available Workloads:

    AllowFiltering
    BasicTimeSeries
    CountersWide
    KeyValue
    LWT
    Locking
    Maps
    MaterializedViews
    RandomPartitionAccess
    Sets
    UdtTimeSeries

$ tlp-stress run CountersWide  


nosqlbench

      nosqlbench lists the workloads from its predefined yaml workload files. Within these workloads it lists the different phases that are used and that can be combined. This offers us our first glimpse of how complex and specific a workload definition can be. It also lists the sequences workload, which is not based on the CQL driver.

$ nb --list-workloads

    from: cql-keyvalue.yaml
        …
    from: cql-iot.yaml
        …
    from: cql-iot-dse.yaml
        …
    from: cql-tabular.yaml
        …
    from: sequences.yaml
        …


$ nb cql-tabular  


7. Custom Workloads

A benchmark that is part of a feasibility or capacity planning exercise for production environments will nearly always require a custom defined workload.  


cassandra-stress

      For cassandra-stress, an example of this was done for the Zipkin project. cassandra-stress cannot benchmark more than one table at a time, so there is a separate workload yaml for each table, and these have to be run as separate invocations. Here we see that cassandra-stress does not support Zipkin’s original schema, specifically UDTs and collections, so the folder above also contains some cql files to create a schema we can stress.

      Create the zipkin test schema

$ cqlsh -f zipkin2-test-schema.cql

      Fill this schema with some data, throttle as appropriate

$ cassandra-stress  user profile=span-stress.yaml ops\(insert=1\) no-warmup duration=1m  -rate threads=4 throttle=50/s

      Now benchmark a mixed read and write workload, again throttle as appropriate

$ cassandra-stress  user profile=span-stress.yaml ops\(insert=1,by_trace=1,by_trace_ts_id=1,by_annotation=1\)  duration=1m  -rate threads=4 throttle=50/s  

As can be seen above, creating custom workloads in cassandra-stress has always been an involved and difficult experience. While tlp-stress and nosqlbench improve on this situation, they each do so in different ways.


nosqlbench

      nosqlbench provides all of its workload configurations via yaml files. Getting the hang of these can be quite daunting for a newcomer, but with the documentation provided, and by practicing first with taking and tweaking the predefined workloads, there is a wealth of possibility here.


tlp-stress

      tlp-stress, on the other hand, focuses on writing workloads in code. tlp-stress is written in Kotlin, so if you find Kotlin enjoyable you will find it quick and intuitive to write workloads. The existing workloads can be found here; take a peek and you will see that they are quite simple to write.


8. Client Contention and Server Saturation

Which benchmark tool is faster? That may sound like a weird question, but it opens some real concerns. Not just in choosing what hardware to run the client on, or how many clients are required, but to know when the results you are getting are nonsense. Understanding the load you want to generate versus what you need to measure is as important to benchmarking as the workload.

It is important to avoid saturating the server. Any benchmark that pushes throughput to its limit is meaningless. A real-world (and overly simplified) comparison of this is OLAP clusters, like those paired with Apache Spark, where without appropriate thresholds on the spark-cassandra-connector you can get a yo-yo effect on throughput as the cluster saturates, jams up, and then accepts writes again. With tuning and throughput thresholds put into place, higher throughput is sustainable over time. A Responsiveness Under Load (RUL) benchmark is one where we apply such throughput limits and observe responsiveness instead.

These problems extend into the client stress tool as well. Unlike the server that can block or load-shed at the defined throughput threshold, the client’s throughput of operations can be either limited or scheduled. This difference can be important, but explaining it goes beyond this blog post. For those interested I’d recommend reading this post on Fixing Coordinated Omission in Cassandra Stress.  


cassandra-stress
$ cassandra-stress write -rate threads=4 fixed=50/s  


nosqlbench

      nosqlbench has no scheduler per se, but deals with reducing coordinated omission via asynchronous requests and a non-fixed thread count. More information on nosqlbench’s timing terminology can be found here.

$ nb cql-iot cyclerate=50 async=256 threads=auto

      Very few production clusters ever demonstrate constant throughput like this, so benchmarking bursts is a real thing. Currently only nosqlbench does this in-process.

$ nb cql-iot cyclerate=50,1.5 async=256 threads=auto

      This specifies a rate threshold of 50 operations per second, with bursts of up to 50%. More information on bursts is available here.


tlp-stress

      tlp-stress does not deal with Coordinated Omission. Its --rate flag relies on Google’s RateLimiter and limits the throughput, but does not schedule.


Documentation

Looking through the documentation for each of the tools, it is easy to see that nosqlbench offers substantially more. The tlp-stress docs are elegant and easy for a beginner, though they are still missing information on how to implement your own workload (or profile, as tlp-stress refers to them).


Wrap Up

cassandra-stress is an advanced tool for very narrow applications against Cassandra. It quickly becomes a clumsy user experience and often requires adventures into some awkward code to understand and get things working as expected.

tlp-stress was written as a replacement for cassandra-stress. Apart from not dealing with Coordinated Omission, it succeeds in that goal in every aspect: good documentation, a rich command-line interface, and a codebase that is easy to understand and contribute to.

nosqlbench takes the next step, aiming to be a YCSB replacement. It feels like a power user’s tool and comes with the features and capacity to earn that title. Expect to see more and more workloads made available for testing lots of different technologies in the NoSQL world.

An Introduction to Cassandra Multi-Data Centers: Part 2

In this second blog of the “Around the World in (Approximately) 8 Data Centers” series we catch our first mode of transportation (Cassandra) and explore how it works, to get us started on our journey to multiple destinations (Data Centers).

1. What Is a (Cassandra) Data Center?

What does a Data Center (DC) look like? Here are some cool examples of DCs (old and new)!

Arguably the first “electronic” data center was ENIAC, circa 1946. It was, however, just a single (albeit monster) machine! It weighed more than 30 tonnes, occupied 1,800 square feet, consumed 200kW of power, got up to 50C inside, and was rumoured to cause blackouts in neighboring Philadelphia when it was switched on!

ENIAC, arguably the first electronic data center

Jumping forward to the present, a photo of Google DC shows mainly cooling pipes. In common with ENIAC, power and heat are still a feature of modern DCs:

Google Data Center (plumbing)

So what is a Cassandra Data Center? Ever since I started using Cassandra I’ve been puzzled about Cassandra Data Centers (DCs). When you create a keyspace you typically also specify a DC name, for example:

CREATE KEYSPACE here
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DCHere' : 3};

The NetworkTopologyStrategy is a production-ready replication strategy that allows you to specify an explicit DC name. But why do you need an explicit DC name? The reason is that you can actually have more than one data center in a Cassandra cluster, and each DC can have a different replication factor. For example, here is a keyspace with two DCs:

CREATE KEYSPACE here_and_there
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DCHere' : 3, 'DCThere' : 3};

So what does having multiple DCs achieve? Powerful automatic global replication of data! Essentially you can easily create a globally distributed Cassandra cluster where data written to the local DC is asynchronously replicated to all the other DCs in the keyspace. You can have multiple keyspaces with different DCs and replication factors depending on how many copies of your data you want and where you want it replicated to.

But where do the Cassandra DCs come from? Well, it’s easy to create a cluster in a given location and Data Center name in Instaclustr Managed Cassandra!

When creating a Cassandra cluster using the Instaclustr console, there is a section called “Data Center” where you can select from options including: 

Infrastructure Provider, Region, Custom Name, Data Center Network address block, Node Size, EBS Encryption option, Replication Factor, and number of nodes.

The Custom Name is a logical name you can choose for a data center within Cassandra, and is how you reference the data center when you create a keyspace with NetworkTopologyStrategy.

So that explains the mystery of single Cassandra Data Center creation. What does this look like once it’s provisioned and running? Well, you can use CQLSH to connect to a node in the cluster, and then discover the data center you are connected to as follows:

cqlsh> use system;
cqlsh:system> select data_center from local;

data_center
-------------
DCHere

How about multiple DCs? Using Instaclustr Managed Cassandra, the simplest way of creating a multi-DC Cassandra cluster is to create a single-DC cluster first (call it ‘DCHere’). Then, in the management console for this cluster, click on “Add a DC”. You can add one DC at a time to create a cluster with the total number of DCs you need; just follow our support documentation here and here.

2. Multi-Data Center Experiments With CQLSH

So, to better understand how Cassandra DCs work I created a test cluster with 3 nodes in each of three DCs, located in Sydney, Singapore, and North Virginia (USA) AWS regions (9 nodes in total) as follows:

Cassandra test cluster with 3 nodes in each of three DCs

For this experiment, I used cqlsh running on my laptop, located in Canberra (close to Sydney). My initial goal was simply to explore latencies and try out DC failures.

To measure latency I turned “tracing on”, and to simulate DC failures I created multiple keyspaces, connected cqlsh to different DCs, and used different consistency levels. 

I created three separate keyspaces, one for each DC location. This doesn’t result in data replication across DCs, but instead directs all data written to any local DC to the single DC with RF >= 1. That is, all data will be written to (and read from) the DCSydney DC for the “sydney” keyspace:

CREATE KEYSPACE "sydney" WITH replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 3, 'DCSingapore' : 0, 'DCUSA' : 0 };

CREATE KEYSPACE "singapore" WITH replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 0, 'DCSingapore' : 3, 'DCUSA' : 0 };

CREATE KEYSPACE "usa" WITH replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 0, 'DCSingapore' : 0, 'DCUSA' : 3 };

I used a fourth keyspace for replication. Because this has multiple DCs with RF >= 1 the data will be replicated across all of the DCs, i.e. data written to any local DC will be written locally as well as to all other DCs:

CREATE KEYSPACE "everywhere" WITH replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 3, 'DCSingapore' : 3, 'DCUSA' : 3 };

2.1 Latency

First let’s look at latency.  To run latency tests I connected cqlsh to the Sydney Data Center.
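For example, a single tracing-enabled write from cqlsh looks something like this; the latency_test table is a stand-in created just for the experiment:

cqlsh> CONSISTENCY ONE;
cqlsh> TRACING ON;
cqlsh> CREATE TABLE IF NOT EXISTS sydney.latency_test (id int PRIMARY KEY, value text);
cqlsh> INSERT INTO sydney.latency_test (id, value) VALUES (1, 'hello');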

I varied which keyspace I was writing to or reading from (Keyspace column), and used consistency level ONE for all of these. ONE means that a write must be written to and acknowledged by at least one replica node, in any DC, so we don’t expect any write or read errors due to writing to, or reading from, a local DC that is different from the DCs in the keyspace. The results show that latency increases from a minimum of 1-2ms (Sydney), to 200ms (Singapore) and 231ms (USA). Compared to the average inter-region network latencies I reported in the previous blog, these latencies are around 14% higher: the Singapore latency is 200ms (c.f. 174ms), and the USA latency is 231ms (c.f. 204ms). Longer times are to be expected, as a Cassandra write or read is included in this time on top of the basic network latency. As expected (using consistency ONE) all of the operations succeeded. This table shows the results:

What does this reveal about how Cassandra keyspaces and DCs work? Cqlsh is connected to the Sydney DC as the local DC. For the keyspaces that have just a single DC, the write or read operation can only use that DC and therefore includes the overhead of network latency for the local DC to communicate with the remote DC (with no network overhead for sydney). However, for the “everywhere” keyspace, which contains all three DCs, it behaves as if it is just using the local DC and therefore has a low latency indistinguishable from the results for the “sydney” keyspace. The difference is that the row is also written to all the other DCs asynchronously, which does not impact the operation time. This picture shows the latencies on a map:

3 Cassandra DC's Latencies

2.2 Failures

I also wanted to understand what happens if a DC is unavailable. This was tricky to achieve acting as a typical user of Instaclustr Managed Cassandra (as there is no way for users to stop/start Cassandra nodes), so I simulated it by using permutations of local DC, keyspaces, and a consistency level of LOCAL_ONE (a write must be sent to, and successfully acknowledged by, at least one replica node in the local DC). This also means customers can try this out for themselves. Using LOCAL_ONE means that if cqlsh is connected to the Sydney DC, and the keyspace has a Sydney DC with RF >= 1, then writes and reads will succeed. But if the keyspace only has DCs in other regions (Singapore or USA) then the writes and reads will fail (simulating the failure of remote DCs). This table shows the results of this experiment:

The results are almost identical to before, but with the key difference that we get a NoHostAvailable error (and therefore no latencies) when the keyspace is singapore or usa. The sydney and everywhere keyspaces still work, as the Sydney DC is available in both cases.

Note that Cassandra consistency levels are highly tunable, and there are more options that are relevant to multi-DC Cassandra operation. For example, ALL and EACH_QUORUM (writes only) work across all the DCs, and have stronger consistency, but at the penalty of higher latency and lower availability.

3. Multi-Data Centers Experiments With the Cassandra Java Client

Around the world in 80 days

In our journey “Around the World” it’s important to always have the latest information! As Phileas Fogg discovered, train timetables, like Cassandra documentation, can get out of date very quickly.

I was also interested in testing out the Cassandra Java client with my multi-DC cluster.   I had previously read that by default it supports automatic failover across multiple DCs which I thought would be interesting to see happening in practice. The DCAwareRoundRobinPolicy was recommended in the O’Reilly book “Learning Apache Cassandra (2nd edition 2017)” which says that “this policy is datacenter aware and routes queries to the local nodes first”. This is also the policy I used in my first attempt to connect with Cassandra way back in 2017!

However, a surprise was lying in wait! It turns out that since version 4.0 of the Java client there is no longer a DCAwareRoundRobinPolicy!  

Instead, the default policy now only connects to a single data center, so naturally there is no failover across DCs. You must provide the local DC name, and this is the only DC the client connects to. This also means that it behaves exactly like the last (LOCAL_ONE) results with cqlsh. This prevents the potentially undesirable data consistency issues that can arise if you are using DC-local consistency levels while transparently failing over to a different DC.
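For reference, here is a minimal connection sketch using the 4.x Java driver; the contact point address is a placeholder, and the DC name matches the example cluster above:

import com.datastax.oss.driver.api.core.CqlSession;
import java.net.InetSocketAddress;

public class ConnectToLocalDc {
    public static void main(String[] args) {
        // With driver 4.x the local DC must be named explicitly; the driver only
        // routes requests to nodes in that data center.
        try (CqlSession session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress("203.0.113.10", 9042))
                .withLocalDatacenter("DCSydney")
                .build()) {
            String dc = session.execute("SELECT data_center FROM system.local")
                               .one()
                               .getString("data_center");
            System.out.println("Connected to DC: " + dc);
        }
    }
}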

You can either handle any failures in the Java client code (e.g. if a DC is unavailable, pick the backup Cassandra DC and connect to that), or probably the intended approach is for the entire application stack in the DC with the Cassandra failure to failover to a complete copy in the backup region. I tried detecting a failure in the Java client code, and then failing over to the backup DC. This worked as expected. However, in the future I will need to explore how to recover from the failure (e.g. how do you detect when the original DC is available and consistent again, and switch back to using it).

3.1 Redundant Low-Latency DCs

This brings us full circle back to the first blog in the series where we discovered that there are 8 georegions in AWS that provide sub 100ms latency to clients in the same georegion:

The 8 AWS georegions with sub 100ms latency for geolocated clients

Which we then suggested could be serviced with 8 DCs to provide DC failover in each georegion as follows:

8 DCs to provide DC failover in each georegion

The matched pairs of DCs to provide High Availability (HA) and failover for each georegion look like this in table form. These are pairs of DCs that the application code will need to have knowledge of and fail over between:

Table - Matched pairs of DCs to provide High Availability (HA) and failover for each georegion

In practice, the read load of the application/client would need to load balance over both of the data centers for some georegions (e.g. North America georegion load balances across both West and East Coast DCs). Depending on the data replication strategy (just replicating data written in each georegion to both redundant DC pairs, or to all DCs in the cluster—this really depends on the actual use case), and the expected extra load on each DC due to failover, DC cluster sizes will need to be sufficient to cope with the normal read/write loads on each DC, the replication write load (write amplification), and load spikes due to DC failures and failover to another DC. 

Based on these limited experiments we are now ready for the next Blog in the series, where we’ll try multi-DC Cassandra out in the context of a realistic globally distributed example application, potentially with multiple keyspaces, data centers and replication factors, to achieve goals including low latency, redundancy, and replication across georegions and even Around The World.

The post An Introduction to Cassandra Multi-Data Centers: Part 2 appeared first on Instaclustr.

Apache Cassandra 4.0 – Audit

Apache Cassandra 4.0 brings a long-awaited feature for tracking and logging database user activity. Primarily aimed at providing a robust set of audit capabilities that allow operators of Cassandra to meet external compliance obligations, it brings yet another enterprise feature into the database. Building on the work done for the full query log capability, the audit log capability gives operators the ability to audit all DML, DDL, and DCL changes to either a binary file or a user-configurable source (including the new Diagnostics notification changes).

This capability will go a long way toward helping Cassandra operators meet their SOX and PCI requirements.  If you are interested in reading about the development of the feature you can follow along here: https://issues.apache.org/jira/browse/CASSANDRA-12151

From a performance perspective the changes appear to only have a fairly minor hit on throughput and latency when enabled, and no discernible impact when disabled. Expect to see 10% to 15% impact on mixed workload throughput and p99 latency.

By default, audit logs are written in the BinLog format and Cassandra comes with tools for parsing and processing them into human-readable formats. Cassandra also supports executing an archive command for simple processing of audit logs. Audited keyspaces, users, and command categories can be whitelisted and blacklisted. Audit logging can be enabled in cassandra.yaml.
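As a sketch, enabling audit logging in cassandra.yaml looks roughly like the following; the option names follow the audit_logging_options block in the 4.0 configuration, and the filter values are examples only:

audit_logging_options:
    enabled: true
    logger:
      - class_name: BinAuditLogger
    included_keyspaces: my_application_keyspace
    excluded_categories: QUERY

Audit logging can also be toggled at runtime with nodetool enableauditlog and nodetool disableauditlog, and the binary logs can be converted to human-readable output with the auditlogviewer tool shipped with Cassandra.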

What’s the Difference Between Audit Logging, Full Query Logging and Diagnostic Events? 

Both Audit logging (BinAuditLogger) and Full Query logging are managed internally by Apache Cassandra’s AuditLogManager. Both implement IAuditLogger, but are predefined in Apache Cassandra. The main difference is that the full query log receives AuditLogEntries before being processed by the AuditLogFilter. Both the FQL and BAL leverage the same BinLog format and share a common implementation of it. 

Diagnostic events are effectively a queue of internal events that happen in the node. There is an IAuditLogger implementation that publishes filtered LogEntries to the Diagnostics queue if users choose to consume audit records this way.

So think of it this way: Cassandra has an audit facility that enables both configurable audit on actions as well as a full query log, you can have as many AuditLoggers enabled as you want. Diagnostic events is a way for pushing events to client drivers using the CQL protocol and you can pipe AuditEvents to the Diagnostics system!

How Is This Different From Cassandra’s Change Data Capture (CDC) Mechanism?

Apache Cassandra has supported CDC on tables for some time now; however, the implementation has always been a fairly low-level and hard-to-consume mechanism. CDC in Cassandra is largely just an index into commitlog files that points to data relevant to the tables with CDC enabled. It was then up to the consumer to read the commitlog format and do something with it. It also only captured mutations that were persisted to disk.

Audit logging capability will log all reads, writes, login attempts, schema changes etc. Both features could be leveraged to build a proper CDC stream. I would hazard a guess that it’s probably easier to do with the IAuditLogger interface than consuming the CDC files!

The post Apache Cassandra 4.0 – Audit appeared first on Instaclustr.

Preview Release of Apache Cassandra 4.0

Instaclustr announces immediate availability of a preview release of Apache Cassandra 4.0 on the Instaclustr Managed Platform. This release is designed to allow Instaclustr customers to easily undertake any application-specific testing of Apache Cassandra 4.0 (alpha 3) in preparation for the forthcoming GA release of Apache Cassandra 4.0.

Apache Cassandra 4.0, as the first major release of Apache Cassandra for more than 2 years, is a major step forward for the Apache Cassandra community. Key features in Apache Cassandra 4.0 include:

  • non-blocking IO for internode communication—this has provided significant performance improvements in both user query tail-end latency and streaming operations;
  • virtual tables—providing the ability to retrieve performance metrics and other metadata directly via CQL rather than requiring external tools;
  • audit logging—the ability to log queries that are executed for compliance purposes; and
  • many (generally minor but adding up) stability and testing improvements.

While Cassandra 4.0 is still in an alpha release phase and not yet ready for production usage, Instaclustr is making this preview release available for several reasons:

  1. Building it into our automated provisioning system was a prerequisite to beginning our own validation testing of the 4.0 stream using our Certified Cassandra Framework, and thus allowing us to be ready to release a full GA release of Cassandra 4.0 on our managed platform as soon as Cassandra 4.0 is ready.
  2. This preview release provides customers an easy path to start their own application-specific validation of Cassandra 4.0, and to interact with Instaclustr Support to debug any specific issues they may come across.

Being a preview release (of an alpha release of Apache Cassandra), this release has several limitations:

  • It is not supported for production usage and not covered by SLAs. The release should be used for testing purposes only;
  • It does not support Password Authentication, User Authorization, and Client to Node encryption (TLS);
  • CounterMutationStage, ViewMutationStage and DroppedMessage Monitoring does not report results;
  • The following add-ons are not supported:
    • Zeppelin
    • Spark
    • Lucene Index
    • Continuous Backup

We will be working to remove these limitations prior to a final release of Cassandra 4.0.

We look forward to working with our customers in this validation phase of Cassandra 4.0. Should you have any issues or questions please contact support@instaclustr.com.

The post Preview Release of Apache Cassandra 4.0 appeared first on Instaclustr.

The Last Pickle Joining DataStax

Today is a very emotional day: I’m happy, excited, and extremely proud to announce The Last Pickle has been acquired by DataStax.

I started contributing to the Apache Cassandra project in 2010, working by myself in my spare time. In March 2011, I left my job at Weta Digital and “went pro” becoming one of the first Apache Cassandra Consultants in the world. In 2013, Nate McCall joined me at The Last Pickle and we realised we could have a bigger impact on the world by working together. As the team at TLP grew over the years, so did our impact on the world. And with joining DataStax we are going to have the biggest impact possible.

We are at DataStax because we want to be. Because we have a shared passion to make Apache Cassandra the best database in the world, to make it easier to use, and to make it the first database people choose to use when starting a new project. Cassandra made large scale, highly available databases an achievable goal for many companies around the world. For the first few years this was enough; it worked well enough if you knew how to take care of it. The addition of the Cassandra Query Language and improvements in operations expanded the user base as things got easier. But there is more work to do. We want to make open source Apache Cassandra easier to use at any scale, from 1 node to 1000, and to make it a realistic choice for every developer in the world.

The great level of technical understanding and kindness that the TLP team has given to the open source community, and importantly our customers, is not going away. The team is going to continue to help our customers as we always have, and will now be able to bring more resources when needed. Our contributions to the community are going to continue, and hopefully increase.

I’m extremely thankful to everyone that has worked at TLP over the years, everyone who has said nice things about us and helped us, and all of the customers who put their trust in my little company from New Zealand.

Thanks, Aaron

Instaclustr Announces PCI-DSS Certification

Instaclustr is very pleased to announce that we have achieved PCI-DSS certification for our Managed Apache Cassandra and Managed Apache Kafka offerings running in AWS. PCI-DSS (Payment Card Industry – Data Security Standard) is a mandated standard for many financial applications and we increasingly see the PCI-DSS controls adopted as the “gold standard” in other industries where the highest standards of security are crucial. PCI-DSS certification adds to our existing SOC2 accreditation to provide the levels of security assurance required by even the most demanding business requirements. 

Overall, this certification effort was the most significant single engineering project in the history of Instaclustr, requiring several person-years of engineering effort to implement well over 100 changes touching every aspect of our systems over the course of several months. We’re very proud that, despite this level of change, the impact to our customers has been absolutely minimal, and that we’re able to deliver another very significant piece of background infrastructure, allowing a wider range of customers to focus their efforts on building innovative business applications based on open source data technologies.

While PCI-DSS compliance may not be required by all customers, and is only supported on selected Instaclustr products, most of the security enhancements we have implemented will result in improved levels of security for all our Managed Service customers, regardless of product or platform. The most significant of these changes are:

  • Tightening of our admin access environment with technical controls to prevent egress of data via our admin systems.
  • Improved logging and auditing infrastructure.
  • Tightened operating system hardening and crypto standards.
  • Addition of a WAF (Web Application Firewall) in front of our console and APIs.
  • More automated scanning, and tightened resolution policies, for code dependency vulnerabilities.
  • More frequent security scanning of our central management systems.
  • More developer security training.

Customers wishing to achieve full PCI-DSS compliance will need to opt in when creating a cluster, as achieving PCI compliance will enforce a range of more restrictive security options (for example, password complexity in the Instaclustr console and use of Private Network Clusters), and enabling the required additional logging on the cluster incurs a performance penalty of approximately 5%. There is also a set of customer responsibilities that customers must implement for full compliance. Additional technical controls activated for PCI compliant clusters include:

  • Logging of all user access to the managed applications (Cassandra, Kafka)
  • Locked-down outbound firewall rules
  • Second approver system for sudo access for our admins

For full details please see our support page.

Customers with existing clusters who wish to move to full PCI compliance should contact support@instaclustr.com who will arrange a plan to apply the new controls to your cluster.

We will be publishing more detail on many of these controls in the coming weeks and holding webinars to cover the Cassandra and Kafka specific implementation details which we expect will be of broad interest. In the meantime, should you have any interest in any further information please contact your Instaclustr Customer Success representative or sales@instaclustr.com who will be able to arrange technical briefings.

The post Instaclustr Announces PCI-DSS Certification appeared first on Instaclustr.

Apache Cassandra 4.0 – Netty Transport

In this series of blog posts, we’ll take a meandering tour of some of the important changes, cool features, and nice ergonomics that are coming with Apache Cassandra 4.0. In Part 1 we focused on Cassandra 4.0 stability and testing, and in Part 2 on virtual tables. In this part we will look at the Netty transport framework.

One of the headline features for Apache Cassandra 4.0 is the refactor of internode messaging to use Java’s non-blocking NIO capability (https://issues.apache.org/jira/browse/CASSANDRA-8457 and https://issues.apache.org/jira/browse/CASSANDRA-15066) via the Netty library.

This allows Cassandra to move away from an N-threads-per-peer model to a single thread pool for all connections. This dramatically reduces performance issues related to thread signalling, coordination, and context switching.

Moving to the Netty framework has also enabled other features like zero copy streaming for SSTables.

Performance improvements have been apparent from day one, both in terms of throughput and latency. The big win, however, is the significant reduction in tail-end latency, with reductions of 40% or more in P99s seen in initial testing.

Of course patches and improvements are being made all the time during the beta process, so benchmarking will need to be revalidated against each workload and within your own environment, but progress is promising!

https://issues.apache.org/jira/browse/CASSANDRA-14746

The post Apache Cassandra 4.0 – Netty Transport appeared first on Instaclustr.

Running tlp-stress in Kubernetes

Performance tuning and benchmarking is key to the successful operation of Cassandra. We have a great tool in tlp-stress that makes benchmarking a lot easier. I have been exploring running Cassandra in Kubernetes for a while now. At one point I thought to myself, it would be nice to be able to utilize tlp-stress in Kubernetes. After a bit of prototyping, I decided that I would write an operator. This article introduces the Kubernetes operator for tlp-stress, stress-operator.

Before getting into the details of stress-operator, let’s consider the following question: What exactly is a Kubernetes operator?

Kubernetes has a well-defined REST API with lots of built-in resource types like Pods, Deployments, and Jobs. The API for creating these built-in objects is declarative. Users typically create objects using the tool kubectl and YAML files. A controller is code that executes a control loop watching one or more of these resource types. A controller’s job is to ensure that an object’s actual state matches its expected state.

An operator extends Kubernetes with custom resource definitions (CRDs) and custom controllers. A CRD provides domain specific abstractions. The custom controller provides automation that is tailored around those abstractions.

If the concept of an operator is still a bit murky, don’t worry. It will get clearer as we look at examples of using stress-operator that highlight some of its features including:

  • Configuring and deploying tlp-stress
  • Provisioning Cassandra
  • Monitoring with Prometheus and Grafana

Installing the Operator

You need to have kubectl installed. Check out the official Kubernetes docs if you do not already have it installed.

You also need access to a running Kubernetes cluster. For local development, my tool of choice is kind.

Download the following manifests:

stress-operator.yaml declares all of the resources necessary to install and run the operator. The other files are optional dependencies.

casskop.yaml installs the Cassandra operator casskop which stress-operator uses to provision Cassandra.

grafana-operator.yaml and prometheus-operator.yaml install grafana-operator and prometheus-operator respectively. stress-operator uses them to install, configure, and monitor tlp-stress.

Install the operator along with the optional dependencies as follows:

$ kubectl apply -f stress-operator.yaml

$ kubectl apply -f casskop.yaml

$ kubectl apply -f grafana-operator.yaml

$ kubectl apply -f prometheus-operator.yaml

The above commands install CRDs as well as the operators themselves. There should be three CRDs installed for stress-operator. We can verify this as follows:

$ kubectl get crds | grep thelastpickle
cassandraclustertemplates.thelastpickle.com   2020-02-26T16:10:00Z
stresscontexts.thelastpickle.com              2020-02-26T16:10:00Z
stresses.thelastpickle.com                    2020-02-26T16:10:00Z

Lastly, verify that each of the operators is up and running:

$ kubectl get deployments
NAME                     READY   UP-TO-DATE   AVAILABLE   AGE
cassandra-k8s-operator   1/1     1            1           6h5m
grafana-deployment       1/1     1            1           4h35m
grafana-operator         1/1     1            1           6h5m
stress-operator          1/1     1            1           4h51m

Note: The prometheus-operator is currently installed with cluster-wide scope in the prometheus-operator namespace.

Configuring and Deploying a Stress Instance

Let’s look at an example of configuring and deploying a Stress instance. First, we create a KeyValue workload in a file named key-value-stress.yaml:

apiVersion: thelastpickle.com/v1alpha1
kind: Stress
metadata:
  name: key-value
spec:
  stressConfig:
    workload: KeyValue
    partitions: 25m
    duration: 60m
    readRate: "0.3"
    consistencyLevel: LOCAL_QUORUM
    replication:
      networkTopologyStrategy:
        dc1: 3
    partitionGenerator: sequence
  cassandraConfig:
    cassandraService: stress

Each property under stressConfig corresponds to a command line option for tlp-stress.

The cassandraConfig section is Kubernetes-specific. When you run tlp-stress (outside of Kubernetes) it will try to connect to Cassandra on localhost by default. You can override the default behavior with the --host option. See the tlp-stress docs for more information about all its options.
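To make the mapping concrete, here is a rough sketch of the kind of tlp-stress invocation the key-value spec above corresponds to. The flag names below are assumptions based on the tlp-stress documentation, so treat this as illustrative rather than the exact command the operator generates:

# Approximate tlp-stress command line for the key-value Stress spec above
# (flag names are assumptions; check the tlp-stress docs for the exact options)
tlp-stress run KeyValue \
  --partitions 25m \
  --duration 60m \
  --reads 0.3 \
  --host stress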

In Kubernetes, Cassandra should be deployed using StatefulSets. A StatefulSet requires a headless Service. Among other things, a Service maintains a list of endpoints for the pods to which it provides access.

The cassandraService property specifies the name of the Cassandra cluster headless service. It is needed in order for tlp-stress to connect to the Cassandra cluster.
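If you are not sure which Service is the headless one, kubectl can tell you: the headless Service is the one whose CLUSTER-IP is reported as None. A minimal check, assuming the Service is named stress as in the example spec:

# List Services and look for the one with CLUSTER-IP set to None
# (the Service name "stress" matches the example spec above)
$ kubectl get svc
$ kubectl get svc stress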

Now we create the Stress object:

$ kubectl apply -f key-value-stress.yaml
stress.thelastpickle.com/key-value created

# Query for Stress objects to verify that it was created
$ kubectl get stress
NAME       AGE
key-value   4s

Under the hood, stress-operator deploys a Job to run tlp-stress.

$ kubectl get jobs
NAME       COMPLETIONS   DURATION   AGE
key-value   0/1           4s         4s

We can use a label selector to find the pod that is created by the job:

$ kubectl get pods -l stress=key-value,job-name=key-value
NAME             READY   STATUS    RESTARTS   AGE
key-value-pv6kz   1/1     Running   0          3m20s

We can monitor the progress of tlp-stress by following the logs:

$ kubectl logs -f key-value-pv6kz

Note: If you are following the steps locally, the Pod name will have a different suffix.

Later we will look at how we monitor tlp-stress with Prometheus and Grafana.

Cleaning Up

When you are ready to delete the Stress instance, run:

$ kubectl delete stress key-value

The above command deletes the Stress object as well as the underlying Job and Pod.

Provisioning a Cassandra Cluster

stress-operator provides the ability to provision a Cassandra cluster using casskop. This is convenient when you want to quickly spin up a cluster for some testing.

Let’s take a look at another example, time-series-casskop-stress.yaml:

apiVersion: thelastpickle.com/v1alpha1
kind: Stress
metadata:
  name: time-series-casskop
spec:
  stressConfig:
    workload: BasicTimeSeries
    partitions: 50m
    duration: 60m
    readRate: "0.45"
    consistencyLevel: LOCAL_QUORUM
    replication:
      networkTopologyStrategy:
        dc1: 3
    ttl: 300
  cassandraConfig:
    cassandraClusterTemplate:
      metadata:
        name: time-series-casskop
      spec:
        baseImage: orangeopensource/cassandra-image
        version: 3.11.4-8u212-0.3.1-cqlsh
        runAsUser: 1000
        dataCapacity: 10Gi
        imagepullpolicy: IfNotPresent
        deletePVC: true
        maxPodUnavailable: 0
        nodesPerRacks: 3
        resources:
          requests:
            cpu: '1'
            memory: 1Gi
          limits:
            cpu: '1'
            memory: 1Gi
        topology:
          dc:
            - name: dc1
              rack:
                - name: rack1       

This time we are running a BasicTimeSeries workload with a TTL of five minutes.

In the cassandraConfig section we declare a cassandraClusterTemplate instead of a cassandraService. CassandraCluster is a CRD provided by casskop. With this template we are creating a three-node cluster in a single rack.

We won’t go into any more detail about casskop for now. It is beyond the scope of this post.

Here is what happens when we run kubectl apply -f time-series-casskop-stress.yaml:

  • We create the Stress object
  • stress-operator creates the CassandraCluster object specified in cassandraClusterTemplate
  • casskop provisions the Cassandra cluster
  • stress-operator waits for the Cassandra cluster to be ready (in a non-blocking manner)
  • stress-operator creates the Job to run tlp-stress
  • tlp-stress runs against the Cassandra cluster

There is another benefit of this approach in addition to being able to easily spin up a cluster. We do not have to implement any steps to wait for the cluster to be ready before running tlp-stress. The stress-operator takes care of this for us.
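While this is happening, you can keep an eye on progress with kubectl. The resource names below follow the example above and are only illustrative:

# Watch the CassandraCluster resource created from the template
$ kubectl get cassandraclusters

# Watch the Cassandra pods come up, then the tlp-stress Job start
$ kubectl get pods
$ kubectl get jobs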

Cleaning Up

When you are ready to delete the Stress instance, run:

$ kubectl delete stress time-series-casskop

The deletion does not cascade to the CassandraCluster object. This is by design. If you want to rerun the same Stress instance (or a different one that reuses the same Cassandra cluster), the stress-operator will reuse the Cassandra cluster if it already exists.

Run the following to delete the Cassandra cluster:

$ kubectl delete cassandracluster time-series-casskop

Monitoring with Prometheus and Grafana

The stress-operator integrates with Prometheus and Grafana to provide robust monitoring. Earlier we installed grafana-operator and prometheus-operator. They, along with casskop, are optional dependencies. It is entirely possible to integrate with Prometheus and Grafana instances that were installed by means other than the respective operators.

If you want stress-operator to provision Prometheus and Grafana, then the operators must be installed.

There is an additional step that is required for stress-operator to automatically provision Prometheus and Grafana. We need to create a StressContext. Let’s take a look at stresscontext.yaml:

# There should only be one StressContext per namespace. It must be named
# tlpstress; otherwise, the controller will ignore it.
#
apiVersion: thelastpickle.com/v1alpha1
kind: StressContext
metadata:
  name: tlpstress
spec:
  installPrometheus: true
  installGrafana: true

Let’s create the StressContext:

$ kubectl apply -f stresscontext.yaml

Creating the StressContext causes stress-operator to perform several actions including:

  • Configure RBAC settings so that Prometheus can scrape metrics
  • Create a Prometheus custom resource
  • Expose Prometheus with a Service
  • Create a ServiceMonitor custom resource which effectively tells Prometheus to monitor tlp-stress
  • Create a Grafana custom resource
  • Expose Grafana with a Service
  • Create and configure a Prometheus data source in Grafana

Now when we create a Stress instance, stress-operator will also create a Grafana dashboard for the tlp-stress job. We can test this with time-series-casskop-stress.yaml. The dashboard name will be the same as the name of the Stress instance, which in this example is time-series-casskop.

Note: To rerun the job we need to delete and recreate the Stress instance.

$ kubectl delete stress time-series-casskop

$ kubectl apply -f time-series-casskop-stress.yaml

Note: You do not need to delete the CassandraCluster. The stress-operator will simply reuse it.

Now we want to check out the Grafana dashboard. There are different ways of accessing a Kubernetes service from outside the cluster. We will use kubectl port-forward.

Run the following to make Grafana accessible locally:

$ kubectl port-forward svc/grafana-service 3000:3000
Forwarding from 127.0.0.1:3000 -> 3000
Forwarding from [::1]:3000 -> 3000
Handling connection for 3000

Then in your browser go to http://localhost:3000/. It should direct you to the Home dashboard. Click on the Home label in the upper left part of the screen. You should see a row for the time-series-casskop dashboard. Click on it. The dashboard should look something like this:

tlp-stress Grafana dashboard

Cleaning Up

Delete the StressContext with:

$ kubectl delete stresscontext tlpstress

The deletion does not cascade to the Prometheus or Grafana objects. To delete them and their underlying resources run:

$ kubectl delete prometheus stress-prometheus

$ kubectl delete grafana stress-grafana

Wrap Up

This concludes the brief introduction to stress-operator. The project is still in the early stages of development and as such is undergoing lots of changes and improvements. I hope that stress-operator makes testing Cassandra in Kubernetes a little easier and more enjoyable!

Version 4.0 of tlp-stress for Cassandra released

The Last Pickle is very pleased to announce that version 4.0 of tlp-stress for Apache Cassandra has been released.

Here is what you will find in the release:

  • The biggest addition is support for including DELETE statements in a workload via the --delete option. This option works similarly to the --reads option: a value between 0 and 1 is supplied as the argument. Each workload is responsible for creating its own delete requests, so deletes naturally fit each of the workload patterns (see the sketch after this list). As a result of this new feature, the BasicTimeSeries workload now works only with Cassandra versions 3.0 and above. See the tlp-stress documentation for more information.

  • Support for adjusting the maximum requests per connection via the --max-request option.

  • A new Sets workload to exercise the set collection type. This was useful for investigating the root cause of slow inserts to set<text>. See CASSANDRA-15464.

  • Improved error handling for non-existent workload parameters, non-existent workloads, and when the read rate plus the delete rate is greater than 1.0.

  • Documentation updates to reflect the new option and workload additions.
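As a rough usage sketch for the new delete support (the run sub-command and workload name mirror earlier examples in this compilation; treat the exact invocation as an assumption and confirm it against the documentation):

# Hypothetical example: 20% reads, 10% deletes, the remainder writes
tlp-stress run KeyValue --reads 0.2 --delete 0.1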

A complete list of the items covered in the release can be found in the tlp-stress GitHub issues under the 4.0 milestone.

As always binaries for the release can be found in The Last Pickle Bintray and Docker Hub repositories.

If you have any questions or suggestions please let us know on the tlp-dev-tools mailing list. We would love to hear from you if you are using the tool!

Scylla Summit 2019

I’ve had the pleasure to attend and present again at the Scylla Summit in San Francisco, and the honor of being awarded the Most innovative use case of Scylla.

It was a great event, full of friendly people and passionate conversations. Peter did a great full write-up of it already so I wanted to share some of my notes instead…

This is a curated set of topics that I happened to question or discuss in depth, so this post is not meant to be taken as full coverage of the conference.

Scylla Manager version 2

The upcoming version of scylla-manager is dropping its dependency on an SSH setup, which will be replaced by an agent, most likely shipped as a separate package.

On the features side, I was a bit puzzled by the fact that ScyllaDB is advertising that its manager will provide a repair scheduling window so that you can control when it’s running or not.

Why did it strike me, you ask?

Because MongoDB does the same thing within its balancer process and I always thought of this as a patch to a feature that the database should be able to cope with by itself.

And that database-does-it-better-than-you motto is exactly one of the promises of Scylla, the boring database, so smart at handling workload impacts on performance that you shouldn’t have to start playing tricks to mitigate them… I don’t want this time window feature on scylla-manager to be a Trojan horse for the demise of that promise!

Kubernetes

They were almost late on this one, but they are working hard to play well with the new toy of every tech company around the world. Helm charts are also being worked on!

The community-developed Scylla operator by Yannis is now being worked on and backed by ScyllaDB. It can deploy a cluster and scale it up and down.

Few things to note:

  • it’s using a configmap to store the scylla config
  • no TLS support yet
  • no RBAC support yet
  • kubernetes networking has a lighter performance hit than what was seen on Docker
  • use placement strategies to dedicate kubernetes nodes to scylla!

Change Data Capture

Oh boy this one was awaited… but it’s now coming soon!

I inquired about its performance impact, since every operation will be written to a table. Clearly my questioning was a bit alpha since CDC is still being worked on.

I had the chance to discuss ideas with Kamil, Tzach, and Dor: one of the things my colleague Julien asked for was the ability for CDC to generate an event when a tombstone is written, so we could actually know when a specific piece of data expired!

I want to stress a few other things too:

  • default TTL on CDC table is 24H
  • expect I/O impact (logical)
  • TTL tombstones can have a hidden disk space cost, and nobody was able to tell me if the CDC table was going to be configured with a lower gc_grace_period than the default 10 days, so that’s something we need to keep in mind and check for
  • there was no plan to add user information that would allow us to know who actually performed the operation, so that’s something I asked for because it could be used as a cheap and open source way to get auditing!

LightWeight Transactions

Another long-awaited feature is also coming, from the amazing work and knowledge of Konstantin. We had a great conversation about the differences between the currently worked on Paxos-based LWT implementation and the maybe-later Raft one.

So yes, the first LWT implementation will be using Paxos as a consensus algorithm. This will make the LWT feature very consistent, while being slower than what could be achieved using Raft. That’s why ScyllaDB has plans for another implementation that could be faster, with weaker data consistency guarantees.

User Defined Functions / Aggregations

This one is bringing the Lua language inside Scylla!

To be precise, it will be a Lua JIT, as its footprint is low and Lua can be cooperative enough, but the ScyllaDB people made sure to monitor its violations (when it should yield but does not) and act strongly upon them.

I got into implementation details with Avi, this is what I noted:

  • lua function return type is not checked at creation but at execution, so expect runtime errors if your lua code is bad
  • since lua is lightweight, there’s no need to assign a core to lua execution
  • I found UDA examples, like top-k rows, to be very similar to the Map/Reduce logic
  • UDF will allow simpler token range full table scans thanks to syntax sugar
  • there will be memory limits applied to result sets from UDA, and they will be tunable

Text search

Dejan is the text search guy at ScyllaDB and the one who kindly implemented the LIKE feature we asked for and that will be released in the upcoming 3.2 version.

We discussed ideas and projected use cases to make sure that what’s going to be worked on will be used!

Redis API

I’ve always been frustrated about Redis because while I love the technology I never trusted its clustering and scaling capabilities.

What if you could scale your Redis like Scylla without giving up on performance? That’s what the implementation of the Redis API backed by Scylla will get us!

I’m desperately looking forward to seeing this happen!

Observability in Apache Cassandra 4.0 with Event Diagnostics

Several new observability features will be part of the next major Apache Cassandra 4.0 release. We covered virtual tables in an earlier blog post and now would like to give a preview of another new feature, Diagnostic Events, which provide real time insight into your Cassandra internals.

Observability is key to successfully operating Apache Cassandra, as it allows users and developers to find bugs and identify runtime issues. Log files and metrics are a very popular way to get insights into a cluster’s behaviour, but they are limited to small text representations or time series data. Often important information is missing from log files and can’t be added without changing the source code and rebuilding Cassandra. In addition, log text output is tailored to be readable by humans and is not designed to be machine readable without creating custom parsers.

Diagnostic Events have been designed to fill this gap by providing a way to observe all the different types of changes that occur inside Cassandra as they happen. For example, testing frameworks can subscribe listeners that will block the server when change events happen, thus providing a continuous evaluation of invariants across testing scenarios and environments. Meanwhile, other observability tools can subscribe listeners without blocking the Cassandra server, giving third parties visibility into how Cassandra navigates a variety of changes, along with critical information about your cluster.

The idea of Event Diagnostics was first proposed in 2016 by Stefan Podwinski and then implemented as part of CASSANDRA-12944.

Types of Diagnostic Events

Currently the types of observability implemented by Event Diagnostics are:

  • AuditEvents, capturing all audit log entries
  • BootstrapEvents, when a new node is bootstrapping into a cluster
  • GossiperEvents, state changes that are announced over gossip
  • HintEvents, the storage and delivery of hints
  • TokenMetadataEvents and PendingRangeCalculatorServiceEvents, lifecycle changes and tasks when token ownership ranges are being calculated
  • ReadRepairEvents and PartitionRepairEvents, different types of repair events
  • SchemaAnnouncementEvents, schema changes proposed, received, and accepted
  • SchemaMigrationEvents, schema propagation across a cluster
  • SchemaEvents, high-level schema changes
  • TokenAllocatorEvents, allocation of token range ownerships, random or via an allocation strategy.

A little about the implementation

Diagnostic Events have been implemented with a JMX interface consisting of two MBeans: the DiagnosticEventService and the LastEventIdBroadcaster. The DiagnosticEventService provides methods to enable diagnostics on a per-event-type basis and to bulk read new events. The LastEventIdBroadcaster provides attributes for the last published offsets for each event type. Importantly, the LastEventIdBroadcaster avoids the use of JMX notifications, a mechanism that too easily loses events, by maintaining an offset for each event type’s queue. Behind these JMX interfaces the persistence of events is regulated by DiagnosticEventStore and, although an in-memory store is currently used, an implementation based on Chronicle Queue is planned.

Monitoring Cassandra Diagnostic Events

Reaper is one of the tools that has the ability to listen to and display Cassandra’s emitted Diagnostic Events in real time. The following section outlines how to set up this feature and gain better visibility into your cluster.

Enabling Diagnostic Events server-side in Apache Cassandra 4.0

Diagnostic Events are not enabled (published) by default in Apache Cassandra version 4.0, but can be manually enabled. To activate the publishing of diagnostic events, enable the diagnostic_events_enabled flag on the Cassandra node:

# Diagnostic Events #
# If enabled, diagnostic events can be helpful for troubleshooting operational issues. Emitted events contain details
# on internal state and temporal relationships across events, accessible by clients via JMX.
diagnostic_events_enabled: true

Restarting the node is required after this change.
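As a convenience, the change can be scripted. The sketch below assumes a typical package install with the configuration at /etc/cassandra/cassandra.yaml and a systemd-managed service; adjust the path and service name for your environment:

# Enable diagnostic events and restart the node
# (file location and service name are assumptions for a typical package install)
$ sudo sed -i 's/^#\? *diagnostic_events_enabled:.*/diagnostic_events_enabled: true/' /etc/cassandra/cassandra.yaml
$ sudo systemctl restart cassandra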

Using Reaper to display Event Diagnostics

In Reaper go to the “Live Diagnostics” page.

Select the cluster that is running Cassandra version 4.0 with diagnostic_events_enabled: true, using the “Filter cluster” field.

Expand the “Add Events Subscription” section and type in a description for the subscription to be created.

Select the node whose diagnostic events you want to observe and select the diagnostic events you want to observe. Check “Enable Live View”.

Add Diagnostic Subscription

Press “Save”. In the list of subscriptions there should now be a row that displays the information entered above.

Press “View”. A green bar will appear. Underneath this green bar the types of diagnostic events selected and the nodes subscribed to will be displayed. Press the green bar to stop showing live events.

Add Diagnostic Subscription

Event subscriptions are a way to understand what goes on inside Apache Cassandra, particularly at the cluster coordination level. Enabling Diagnostic Events has no performance overhead on the Cassandra node until subscriptions are also enabled. Reaper’s subscription may not be able to keep up and receive all events when traffic or load is significantly high on the Cassandra node, as it keeps the event subscriptions in an in-memory FIFO queue with a maximum length of 200 per event type. The AuditEvent type can impose additional overhead on a cluster, and for that reason requires additional configuration in the auditing section of Cassandra’s yaml configuration file.

Happy diagnosing, hope you have some fun with this extra real-time insight into Cassandra internals. And if you would like to see more diagnostic event types added reach out, make your suggestions, and even better throw us a patch and get the ball rolling…

Cassandra Reaper 2.0 was released

Cassandra Reaper 2.0 was released a few days ago, bringing the (long awaited) sidecar mode along with a refreshed UI. It also features support for Apache Cassandra 4.0, diagnostic events and thanks to our new committer, Saleil Bhat, Postgres can now be used for all distributed modes of Reaper deployments, including sidecar.

Sidecar mode

By default and for security reasons, Apache Cassandra restricts JMX access to the local machine, blocking any external request.
Reaper relies heavily on JMX communications when discovering a cluster, starting repairs, and monitoring metrics. In order to use Reaper, operators have to change Cassandra’s default configuration to allow external JMX access, potentially breaking existing company security policies. All this puts an unnecessary burden on Reaper’s out-of-the-box experience.

With its 2.0 release, Reaper can now be installed as a sidecar to the Cassandra process and communicate locally only, coordinating with other Reaper instances through the storage backend exclusively.

At the risk of stating the obvious, this means that all nodes in the cluster must have a Reaper sidecar running so that repairs can be processed.

In sidecar mode, several Reaper instances are likely to be started at the same time, which could lead to schema disagreement. We’ve contributed to the migration library Reaper uses and added a consensus mechanism based on LWTs to only allow a single process to migrate a keyspace at once.

Also, since Reaper can only communicate with a single node in this mode, clusters in sidecar are automatically added to Reaper upon startup. This allowed us to seamlessly deploy Reaper in clusters generated by the latest versions of tlp-cluster.

A few limitations and caveats of the sidecar in 2.0:

  • Reaper clusters are isolated and you cannot manage several Cassandra clusters with a single Reaper cluster.
  • Authentication to the Reaper UI/backend cannot be shared among Reaper instances, which will make load balancing hard to implement.
  • Snapshots are not supported.
  • The default memory settings of Reaper will probably be too high (2G heap) for the sidecar and should be lowered in order to limit the impact of Reaper on the nodes.

Postgres for distributed modes

So far, we had only implemented the possibility of starting multiple Reaper instances at once when using Apache Cassandra as storage backend.
We were happy to receive a contribution from Saleil Bhat allowing Postgres for deployments with multiple Reaper instances, which also allows it to be used for sidecar setups.
As recognition for the hard work on this feature, we welcome Saleil as a committer on the project.

Apache Cassandra 4.0 support

Cassandra 4.0 is now available as an alpha release and there have been many changes we needed to support in Reaper. It is now fully operational and we will keep working on embracing 4.0 new features and enhancements.
Reaper can now listen to and display, in real time, live diagnostic events transmitted by Cassandra nodes. More background information can be found in CASSANDRA-12944, and stay tuned for an upcoming TLP blog post on this exciting feature.

Refreshed UI look

While the UI look of Reaper is not as important as its core features, we’re trying to make Reaper as pleasant to use as possible. Reaper 2.0 now brings five UI themes that can be switched from the dropdown menu. You’ll have the choice between 2 dark themes and 3 light themes, which were all partially generated using this online tool.

Superhero reaper theme

Solarized reaper theme

Yeti reaper theme

Flatly reaper theme

United reaper theme

And more

The docker image was improved to avoid running Reaper as root and to allow disabling authentication, thanks to contributions from Miguel and Andrej.
The REST API and spreaper can now forcefully delete a cluster which still has schedules or repair runs in history, making it easier to remove obsolete clusters, without having to delete each run in history. Metrics naming was adjusted to improve tracking repair state on a keyspace. They are now provided in the UI, in the repair run detail panel:

United reaper theme

Also, inactive/unreachable clusters will appear last in the cluster list, ensuring active clusters display quickly. Lastly, we brought various performance improvements, especially for Reaper installations with many registered clusters.

Upgrade now

In order to reduce the initial startup of Reaper and as we were starting to have a lot of small schema migrations, we collapsed the initial migration to cover the schema up to Reaper 1.2.2. This means upgrades to Reaper 2.0 are possible from Reaper 1.2.2 and onwards, if you are using Apache Cassandra as storage backend.

The binaries for Reaper 2.0 are available from yum, apt-get, Maven Central, Docker Hub, and are also downloadable as tarball packages. Remember to backup your database before starting the upgrade.

All instructions to download, install, configure, and use Reaper 2.0 are available on the Reaper website.

Note: the docker image in 2.0 seems to be broken currently and we’re actively working on a fix. Sorry for the inconvenience.

Medusa - Spotify’s Apache Cassandra backup tool is now open source

Spotify and The Last Pickle (TLP) have collaborated over the past year to build Medusa, a backup and restore system for Apache Cassandra which is now fully open sourced under the Apache License 2.0.

Challenges Backing Up Cassandra

Backing up Apache Cassandra databases is hard, not complicated. You can take manual snapshots using nodetool and move them off the node to another location. There are existing open source tools such as tablesnap, or the manual processes discussed in the previous TLP blog post “Cassandra Backup and Restore - Backup in AWS using EBS Volumes”. However they all tend to lack some features needed in production, particularly when it comes to restoring data - which is the ultimate test of a backup solution.

Providing disaster recovery for Cassandra has some interesting challenges and opportunities:

  • The data in each SSTable is immutable allowing for efficient differential backups that only copy changes since the last backup.
  • Each SSTable contains data that the node is responsible for, and the restore process must make sure it is placed on a node that is also responsible for that data. Otherwise it may be unreachable by clients.
  • Restoring to different cluster configurations, changes in the number of nodes or their tokens, requires that data be re-distributed into the new topology following Cassandra’s rules.

Introducing Medusa

Medusa is a command line backup and restore tool that understands how Cassandra works.
The project was initially created by Spotify to replace their legacy backup system. TLP was hired shortly after to take over development, make it production ready and open source it.
It has been used on small and large clusters and provides most of the features needed by an operations team.

Medusa supports:

  1. Backup a single node.
  2. Restore a single node.
  3. Restore a whole cluster.
  4. Selective restore of keyspaces and tables.
  5. Support for single token and vnodes clusters.
  6. Purging the backup set of old data.
  7. Full or incremental backup modes.
  8. Automated verification of restored data.

The command line tool uses Python 3.6 and needs to be installed on all the nodes you want to back up. It supports all versions of Cassandra after 2.1.0 and, thanks to the Apache libcloud project, can store backups on a number of storage platforms.

Backup A Single Node With Medusa

Once Medusa is installed and configured, a node can be backed up with a single, simple command:

medusa backup --backup-name=<backup name>

When executed like this Medusa will:

  1. Create a snapshot using the Cassandra nodetool command.
  2. Upload the snapshot to your configured storage provider.
  3. Clear the snapshot from the local node.

Along with the SSTables, Medusa will store three meta files for each backup:

  • The complete CQL schema.
  • The token map, a list of nodes and their token ownership.
  • The manifest, a list of backed up files with their md5 hash.

Full And Differential Backups

All Medusa backups only copy new SSTables from the nodes, reducing the network traffic needed. Medusa then has two ways of managing the files in the backup catalog, which we call Full and Differential backups. For Differential backups, only references to SSTables are kept by each new backup, so that only a single instance of each SSTable exists no matter how many backups it is in. Differential backups are the default, and in operations at Spotify they reduced the backup size for some clusters by up to 80%.

Full backups create a complete copy of all SSTables on the node each time they run. Files that have not changed since the last backup are copied within the backup catalog into the new backup (and not copied off the node again), in contrast to the differential method, which only creates a reference to the files. Full backups are useful when you need to take a complete copy and have all the files in a single location.

Cassandra Medusa Full Backups

Differential backups take advantage of the immutable SSTables created by the Log Structured Merge Tree storage engine used by Cassandra. In this mode Medusa checks if an SSTable has previously been backed up, and only copies the new files (just like always). However all SSTables for the node are then stored in a single common folder, and the backup manifest contains only metadata files and references to the SSTables.

Cassandra Medusa Full Backups
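Whichever mode you want is chosen at backup time. The sketch below assumes the mode is selected with a --mode flag; check medusa backup --help or the README for the exact option name:

# Differential is the default; request a full backup explicitly
# (the --mode flag name is an assumption; confirm with medusa backup --help)
medusa backup --backup-name=<backup name> --mode=full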

Backup A Cluster With Medusa

Medusa currently lacks an orchestration layer to run a backup on all nodes for you. In practice we have been using crontab to do cluster-wide backups. While we consider the best way to automate this (and welcome suggestions), we recommend techniques such as:

  • Scheduled via crontab on each node.
  • Manually on all nodes using pssh.
  • Scripted using cstar.
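For example, the crontab approach could look like the sketch below. The date-based backup name is an assumption chosen to match the names shown in the next section, and the % characters must be escaped because this runs from crontab:

# Run a backup on every node each day at 07:07, named after the date and hour
7 7 * * * medusa backup --backup-name=$(date +\%Y\%m\%d\%H)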

Listing Backups

All backups with the same “backup name” are considered part of the same backup for a cluster. Medusa can provide a list of all the backups for a cluster, when they started and finished, and if all the nodes have completed the backup.

To list all existing backups for a cluster, run the following command on one of the nodes:

$ medusa list-backups
2019080507 (started: 2019-08-05 07:07:03, finished: 2019-08-05 08:01:04)
2019080607 (started: 2019-08-06 07:07:04, finished: 2019-08-06 07:59:08)
2019080707 (started: 2019-08-07 07:07:04, finished: 2019-08-07 07:59:55)
2019080807 (started: 2019-08-08 07:07:03, finished: 2019-08-08 07:59:22)
2019080907 (started: 2019-08-09 07:07:04, finished: 2019-08-09 08:00:14)
2019081007 (started: 2019-08-10 07:07:04, finished: 2019-08-10 08:02:41)
2019081107 (started: 2019-08-11 07:07:04, finished: 2019-08-11 08:03:48)
2019081207 (started: 2019-08-12 07:07:04, finished: 2019-08-12 07:59:59)
2019081307 (started: 2019-08-13 07:07:03, finished: Incomplete [179 of 180 nodes])
2019081407 (started: 2019-08-14 07:07:04, finished: 2019-08-14 07:56:44)
2019081507 (started: 2019-08-15 07:07:03, finished: 2019-08-15 07:50:24)

In the example above the backup called “2019081307” is marked as incomplete because 1 of the 180 nodes failed to complete a backup with that name.

It is also possible to verify that all expected files are present for a backup, and their content matches hashes generated at the time of the backup. All these operations and more are detailed in the Medusa README file.
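A minimal sketch of that verification, using one of the backup names from the listing above (the exact sub-command should be confirmed against the README):

# Check that the files referenced by the backup exist and match their recorded hashes
$ medusa verify --backup-name=2019081407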

Restoring Backups

While orchestration is lacking for backups, Medusa coordinates restoring a whole cluster so you only need to run one command. The process connects to nodes via SSH, starting and stopping Cassandra as needed, until the cluster is ready for you to use. The restore process handles three different use cases.

  1. Restore to the same cluster.
  2. Restore to a different cluster with the same number of nodes.
  3. Restore to a different cluster with a different number of nodes.

Case #1 - Restore To The Same Cluster

This is the simplest case: restoring a backup to the same cluster. The topology of the cluster has not changed and all the nodes that were present at the time the backup was created are still running in the cluster.

Cassandra Medusa Full Backups

Use the following command to run an in-place restore:

$ medusa restore-cluster --backup-name=<name of the backup> \
                         --seed-target node1.domain.net

The seed target node will be used as a contact point to discover the other nodes in the cluster. Medusa will discover the number of nodes and token assignments in the cluster and check that it matches the topology of the source cluster.

To complete this restore each node will:

  1. Download the backup data into the /tmp directory.
  2. Stop Cassandra.
  3. Delete the commit log, saved caches and data directory including system keyspaces.
  4. Move the downloaded SSTables into the data directory.
  5. Start Cassandra.

The schema does not need to be recreated as it is contained in the system keyspace, and copied from the backup.

Case #2 - Restore To A Different Cluster With Same Number Of Nodes

Restoring to a different cluster with the same number of nodes is a little harder because:

  • The destination cluster may have a different name, which is stored in system.local table.
  • The nodes may have different names.
  • The nodes may have different token assignments.

Cassandra Medusa Full Backups

Use the following command to run a remote restore:

$ medusa restore-cluster --backup-name=<name of the backup> \
                         --host-list <mapping file>

The host-list parameter tells Medusa how to map from the original backup nodes to the destination nodes in the new cluster, which is assumed to be a working Cassandra cluster. The mapping file must be a comma-separated values (CSV) file without a heading row, with the following columns:

  1. is_seed: True or False, indicating if the destination node is a seed node, so that the seed nodes can be restored and started first.
  2. target_node: Host name of a node in the target cluster.
  3. source_node: Host name of a source node to copy the backup data from.

For example:

True,new_node1.foo.net,old_node1.foo.net
True,new_node2.foo.net,old_node2.foo.net
False,new_node3.foo.net,old_node3.foo.net

In addition to the steps listed for Case 1 above, when performing a restore to a remote cluster the following steps are taken:

  1. The system.local and system.peers tables are not modified to preserve the cluster name and prevent the target cluster from connecting to the source cluster.
  2. The system_auth keyspace is restored from the backup, unless the --keep-auth flag is passed to the restore command.
  3. Token ownership is updated on the target nodes to match the source nodes by passing the -Dcassandra.initial_token JVM parameter when the node is restarted, which causes ownership to be updated in the local system keyspace.

Case #3 - Restore To A Different Cluster With A Different Number Of Nodes

Restoring to a different cluster with a different number of nodes is the hardest case to deal with because:

  • The destination cluster may have a different name, which is stored in system.local table.
  • The nodes may have different names.
  • The nodes may have different token assignments.
  • Token ranges can never be the same as there is a different number of nodes.

The last point is the crux of the matter. We cannot get the same token assignments because we have a different number of nodes, and the tokens are assigned to evenly distribute the data between nodes. However the SSTables we have backed up contain data aligned to the token ranges defined in the source cluster. The restore process must ensure the data is placed on the nodes which are replicas according to the new token assignments, or data will appear to have been lost.

To support restoring data into a different topology Medusa uses the sstableloader tool from the Cassandra code base. While slower than copying the files from the backup, the sstableloader is able to “repair” data into the destination cluster. It does this by reading the token assignments and streaming the parts of the SSTable that match the new token ranges to all the replicas in the cluster.

Cassandra Medusa Full Backups

Use the following command to run a restore to a cluster with a different topology:

$ medusa restore-cluster --backup-name=<name of the backup> \
                         --seed-target target_node1.domain.net

Restoring data using this technique has some drawbacks:

  1. The restore will take significantly longer.
  2. The amount of data loaded into the cluster will be the size of the backup set multiplied by the Replication Factor. For example, a backup of a cluster with Replication Factor 3 will have 9 copies of the data loaded into it. The extra replicas will be removed by compaction; however, the total on-disk load during the restore process will be higher than it will be at the end of the restore. See below for a further discussion.
  3. The current schema in the cluster will be dropped and a new one created using the schema from the backup. By default Cassandra will take a snapshot when the schema is dropped, a feature controlled by the auto_snapshot configuration setting, which will not be cleared up by Medusa or Cassandra. If there is an existing schema with data it will take extra disk space. This is a sane safety precaution, and a simple workaround is to manually ensure the destination cluster does not have any data in it.

A few extra words on the amplification of data when restoring using sstableloader. The backup has the replicated data: let’s say we have a Replication Factor of 3, so roughly speaking there are 3 copies of each partition. Those copies are spread around the SSTables we collected from each node. As we process each SSTable, the sstableloader repairs the data back into the cluster, sending it to the 3 new replicas. So the backup contains 3 copies, we process each copy, and we send each copy to the 3 new replicas, which means in this case:

  • The restore sends nine copies of data to the cluster.
  • Each node gets three copies of data rather than one.

The following sequence of operations will happen when running this type of restore:

  1. Drop the schema objects and re-create them (once for the whole cluster)
  2. Download the backup data into the /tmp directory
  3. Run the sstableloader for each of the tables in the backup

Available Now On GitHub

Medusa is now available on GitHub and is available through PyPi. With this blog post and the readme file in the repository you should be able to take a backup within minutes of getting started. As always if you have any problems create an issue in the GitHub project to get some help. It has been in use for several months at Spotify, storing petabytes of backups in Google Cloud Storage (GCS), and we thank Spotify for donating the software to the community to allow others to have the same confidence that their data is safely backed up.

One last thing, contributions are happily accepted especially to add support for new object storage providers.

Also, we’re looking to hire a Cassandra expert in America.

How to Choose which Tombstones to Drop

Tombstones are notorious for causing issues in Apache Cassandra. They often become a problem when Cassandra is not able to purge them in a timely fashion. Delays in purging happen because there are a number of conditions that must be met before a tombstone can be dropped.

In this post, we are going to see how to make meeting these conditions more likely. We will achieve this by selecting which specific SSTables Cassandra should include in a compaction, which results in smaller and faster compactions that are more likely to drop the tombstones.

Before we start, there are a few parameters we need to note:

  • We will consider only cases where the unchecked_tombstone_compaction compaction option is turned off. Enabling this option makes Cassandra run compaction with only one SSTable. This achieves a similar result to our process, but in a far less controlled way.
  • We also assume the unsafe_aggressive_sstable_expiration option of TWCS is turned off. This option makes Cassandra drop entire SSTables once they expire without checking if the partitions appear in other SSTables.

How Cassandra Drops Tombstones

When we look at the source, we see Cassandra will consider deleting a tombstone when an SSTable undergoes a compaction. Cassandra can only delete the tombstone if:

  • The tombstone is older than gc_grace_seconds (a table property).
  • There is no other SSTable outside of this compaction that:
    • Contains a fragment of the same partition the tombstone belongs to, and
    • The timestamp of any value (in the other SSTable) is younger than the tombstone.

In other words, for a tombstone to be deleted, there cannot be any data that the tombstone suppresses outside of the planned compaction. Unfortunately, in many cases tombstones are not isolated and will touch other data.

The heavy weight solution to this issue is to run a major compaction. A major compaction will include all SSTables in one big compaction, so there are no SSTables not participating in it. However, this approach comes with a cost:

  • It can require at least 50% of the disk space to be free.
  • It consumes CPU and disk throughput at the expense of regular traffic.
  • It can take hours to compact TBs of data.

So we use a more lightweight solution: compact only the SSTables that a given partition appears in, and no others.

Step 1 - Identify Problematic Partition

The first step is to find out which partition is the most problematic. There are various ways of doing this.

First, if we know our data model inside and out, we can tell straight away which partitions are tombstone heavy. If it’s not obvious by looking at the model, we can use one of the other options below.

For example, another option is to consult the Cassandra logs. When Cassandra encounters too many tombstones, it will log a line similar to this:

WARN  [SharedPool-Worker-4] 2019-07-15 09:24:15,971 SliceQueryFilter.java:308 - Read 2738 live and 6351 tombstone cells in tlp_stress.sensor_data for key: 55d44291-0343-4bb6-9ac6-dd651f543323 (see tombstone_warn_threshold). 5000 columns were requested, slices=[-]

Finally, we can use a tool like Instaclustr’s ic-purge to give us a detailed overview of the tombstone situation:

Summary:
+---------+---------+
|         | Size    |
+---------+---------+
| Disk    | 36.0 GB |
| Reclaim |  5.5 GB |
+---------+---------+

Largest reclaimable partitions:
+------------------+----------+----------+------------------------------+
| Key              | Size     | Reclaim  | Generations                  |
+------------------+----------+----------+------------------------------+
|     001.0.361268 |  32.9 MB |  15.5 MB |               [46464, 62651] |
|     001.0.618927 |   3.5 MB |   1.8 MB |               [46268, 36368] |
+------------------+----------+----------+------------------------------+

In the table above, we see which partitions take the most reclaimable space (001.0.361268 takes the most). We also see which SSTables these partitions live in (the Generation column). We have found the SSTables to compact. However, we can take this one step further and ask Cassandra for the absolute paths, not just their generation numbers.

Step 2 - List Relevant SSTables

With the partition key known, we can simply use the nodetool getsstables command. It will make Cassandra tell us the absolute paths of SSTables that a partition lives in:

ccm node1 nodetool "getsstables tlp_stress sensor_data 001.0.361268"

/Users/tlp/.ccm/2-2-9/node1/data0/tlp_stress/sensor_data-b260e6a0a7cd11e9a56a372dfba9b857/lb-46464-big-Data.db
/Users/tlp/.ccm/2-2-9/node1/data0/tlp_stress/sensor_data-b260e6a0a7cd11e9a56a372dfba9b857/lb-62651-big-Data.db

After we find all the SSTables, the last thing we need to do is to trigger a compaction.

Step 3 - Trigger a Compaction

Triggering a user-defined compaction is something Jon has described in this post. We will proceed the same way and use jmxterm to invoke the forceUserDefinedCompaction operation on the CompactionManager MBean. We will need to pass it a comma-separated list of the SSTables we got in the previous step:

SSTABLE_LIST="/Users/tlp/.ccm/2-2-9/node1/data0/tlp_stress/sensor_data-b260e6a0a7cd11e9a56a372dfba9b857/lb-46464-big-Data.db,\
/Users/tlp/.ccm/2-2-9/node1/data0/tlp_stress/sensor_data-b260e6a0a7cd11e9a56a372dfba9b857/lb-62651-big-Data.db"

JMX_CMD="run -b org.apache.cassandra.db:type=CompactionManager forceUserDefinedCompaction ${SSTABLE_LIST}"
echo ${JMX_CMD} | java -jar jmxterm-1.0-alpha-4-uber.jar -l localhost:7100
#calling operation forceUserDefinedCompaction of mbean org.apache.cassandra.db:type=CompactionManager
#operation returns:
null
$>

Despite getting a null as the invocation result, the compaction has most likely started. We can watch nodetool compactionstats to see how it is going.
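For example, on a ccm test cluster like the one used above, something along these lines will poll progress every few seconds:

# Poll compaction progress every 5 seconds (node name follows the earlier ccm examples)
$ watch -n 5 "ccm node1 nodetool compactionstats"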

Once the compaction completes, we can repeat the process we used in Step 1 above to validate that the tombstones have been deleted.

Note for LeveledCompactionStrategy: This procedure only works with STCS and TWCS. If a table is using LCS, Cassandra does not allow invoking the forceUserDefinedCompaction operation. For LCS, we could nudge Cassandra into compacting specific SSTables by resetting their levels. That, however, is complicated enough to deserve its own blog post.

Conclusion

In this post we saw how to trigger compactions for all of the SSTables a partition appears in. This is useful because the compaction will have a smaller footprint and is more efficient than running a major compaction, but will reliably purge droppable tombstones that can cause issues for Apache Cassandra.

Even Higher Availability with 5x Faster Streaming in Cassandra 4.0

Streaming is a process where nodes of a cluster exchange data in the form of SSTables. Streaming can kick in during many situations such as bootstrap, repair, rebuild, range movement, cluster expansion, etc. In this post, we discuss the massive performance improvements made to the streaming process in Apache Cassandra 4.0.

High Availability

As we know Cassandra is a Highly Available, Eventually Consistent database. The way it maintains its legendary availability is by storing redundant copies of data in nodes known as replicas, usually running on commodity hardware. During normal operations, these replicas may end up having hardware issues causing them to fail. As a result, we need to replace them with new nodes on fresh hardware.

As part of this replacement operation, the new Cassandra node streams data from the neighboring nodes that hold copies of the data belonging to the new node’s token range. Depending on the amount of data stored, this process can require substantial network bandwidth, taking some time to complete. The longer these types of operations take, the more we are exposing ourselves to loss of availability. Depending on your replication factor and consistency requirements, if another node fails during this replacement operation, availability will be impacted.

Increasing Availability

To minimize the failure window, we want to make these operations as fast as possible. The faster the new node completes streaming its data, the faster it can serve traffic, increasing the availability of the cluster. Towards this goal, Cassandra 4.0 saw the addition of Zero Copy streaming. For more details on Cassandra’s zero copy implementation, see this blog post and CASSANDRA-14556.
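
As a side note, whether Cassandra 4.0 uses Zero Copy (full SSTable) streaming is controlled by the stream_entire_sstables setting in cassandra.yaml, which at the time of writing defaults to true. One quick way to check your configuration (the config path varies per install) is:

grep stream_entire_sstables conf/cassandra.yaml
# stream_entire_sstables: true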

Talking Numbers

To quantify the results of these improvements, we at Netflix measured the performance impact of streaming in 4.0 vs 3.0, using our open source NDBench benchmarking tool with the CassJavaDriverGeneric plugin. Though we knew there would be improvements, we were still amazed by the overall result: a fivefold increase in streaming performance. The test setup and operations are all detailed below.

Test Setup

In our test setup, we used the following configurations:

  • 6-node clusters on i3.xl, i3.2xl, i3.4xl and i3.8xl EC2 instances, each on 3.0 and trunk (sha dd7ec5a2d6736b26d3c5f137388f2d0028df7a03).
  • Table schema
CREATE TABLE testing.test (
    key text,
    column1 int,
    value text,
    PRIMARY KEY (key, column1)
) WITH CLUSTERING ORDER BY (column1 ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
    AND compression = {'enabled': 'false'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
  • Data size per node: 500GB
  • No. of tokens per node: 1 (no vnodes)

To trigger the streaming process we used the following steps in each of the clusters:

  • Terminate a node
  • Add a new node as a replacement
  • Measure the time taken by the new node to complete streaming the data belonging to the terminated node

For each cluster and version, we repeated this exercise multiple times to collect several samples.
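
For reference, replacing a terminated node follows the standard dead-node replacement procedure; a rough sketch of what that can look like on the new node (the IP address is a placeholder for the terminated node) is:

# Point the replacement node at the terminated node's address before its first start
JVM_EXTRA_OPTS="-Dcassandra.replace_address_first_boot=10.0.0.12" bin/cassandra

# From another terminal, track streaming progress until all sessions complete
nodetool netstats | grep -v "100%"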

Below is the distribution of streaming times we found across the clusters (figure: benchmark results).

Interpreting the Results

There are many conclusions one can draw from the graph above. Some of them are:

  • 3.0 streaming times are inconsistent and show high degree of variability (fat distributions across multiple samples)
  • 3.0 streaming is highly affected by the instance type and generally looks CPU bound
  • Zero Copy streaming is approximately 5x faster
  • Zero Copy streaming time shows little variability in its performance (thin distributions across multiple samples)
  • Zero Copy streaming performance is not CPU bound and remains consistent across instance types

It is clear from the performance test results that Zero Copy Streaming has a huge performance benefit over the current streaming infrastructure in Cassandra. But what does it mean in the real world? The following key points are the main takeaways.

MTTR (Mean Time to Recovery): MTTR is a KPI (Key Performance Indicator) used to measure how quickly a system recovers from a failure. Zero Copy Streaming has a very direct impact here, with a fivefold improvement in performance.

Costs: Zero Copy Streaming is ~5x faster. This translates directly into cost savings for some organizations, primarily as a result of reducing the need to maintain spare server or cloud capacity. In other situations where you’re migrating data to larger instance types or moving AZs or DCs, this means that instances that are sending data can be turned off sooner, saving costs. An added cost benefit is that you no longer have to over-provision the instance: you get similar streaming performance whether you use an i3.xl or an i3.8xl, provided the bandwidth is available to the instance.

Risk Reduction: Zero Copy Streaming also greatly reduces risk. Since a cluster’s recovery mainly depends on the streaming speed, Cassandra clusters with failed nodes will be able to recover much more quickly (5x faster). This means the window of vulnerability is reduced significantly, in some situations down to a few minutes.

Finally, a benefit that we generally don’t talk about is the environmental benefit of this change. Zero Copy Streaming enables us to move data very quickly through the cluster, which reduces the number and sizes of instances needed to build a Cassandra cluster. As a result, not only does it reduce Cassandra’s TCO (Total Cost of Ownership), it also helps the environment by consuming fewer resources!

Scylla: four ways to optimize your disk space consumption

We recently had to face free disk space outages on some of our Scylla clusters, and we learned some very interesting things along the way, while also suggesting some possible improvements to the ScyllaDB team.

100% disk space usage?

First of all I wanted to give a bit of a heads-up about what happened when some of our Scylla nodes reached (almost) 100% disk space usage.

Basically they:

  • stopped listening to client requests
  • complained in the logs
  • wouldn’t flush the commitlog (expected)
  • aborted their compaction work (which actually gave back a few GB of space)
  • stayed in a stuck / unable-to-stop state (unexpected; this has been reported)

After restarting your Scylla server, the first and most obvious thing you can do to get out of this situation is to run the nodetool clearsnapshot command, which will remove any data snapshots that could be lying around. That’s a handy command for reclaiming space.
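
In practice that can be as simple as the following sketch (output omitted):

# See which snapshots exist and how much space they pin down
nodetool listsnapshots

# Remove them all to reclaim the space
nodetool clearsnapshot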

Reminder: depending on your compaction strategy, it is usually not advised to allow your data to grow over 50% of disk space...

But that’s only a patch so let’s go down the rabbit hole and look at the optimization options we have.


Optimize your schemas

Schema design and the types you choose for your columns have a huge impact on disk space usage! In our case we indeed overlooked some of the optimizations we could have made from the start, and that cost us a lot of wasted disk space. Fortunately it was easy and fast to change.

To illustrate this, I’ll take a sample of 100,000 rows of a simple and naive schema associating readings of 50 integers to a user ID:

Note: all those operations were done using Scylla 3.0.3 on Gentoo Linux.

CREATE TABLE IF NOT EXISTS test.not_optimized
(
uid text,
readings list<int>,
PRIMARY KEY(uid)
) WITH compression = {};

Once inserted on disk, this takes about 250MB of disk space:

250M    not_optimized-00cf1500520b11e9ae38000000000004

Now depending on your use case, if those readings are not meant to be updated, for example, you could use a frozen list instead, which will allow a huge storage optimization:

CREATE TABLE IF NOT EXISTS test.mid_optimized
(
uid text,
readings frozen<list<int>>,
PRIMARY KEY(uid)
) WITH compression = {};

With this frozen list we now consume 54MB of disk space for the same data!

54M     mid_optimized-011bae60520b11e9ae38000000000004

There’s another optimization we can make, since our user IDs are UUIDs. Let’s switch to the uuid type instead of text:

CREATE TABLE IF NOT EXISTS test.optimized
(
uid uuid,
readings frozen<list<int>>,
PRIMARY KEY(uid)
) WITH compression = {};

By switching to uuid, we now consume 50MB of disk space: that’s an 80% reduction in disk space consumption compared to the naive schema for the same data!

50M     optimized-01f74150520b11e9ae38000000000004
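
For completeness, here is roughly how such per-table sizes can be measured, assuming Scylla’s default data directory (your path may differ):

# Flush memtables and compact so the on-disk size is representative,
# then check the size of each table directory
nodetool flush test
nodetool compact test
du -sh /var/lib/scylla/data/test/*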

Enable compression

None of those examples used compression. If your workload’s latency requirements allow it, you should probably enable compression on your sstables.

Let’s see its impact on our tables:

ALTER TABLE test.not_optimized WITH compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'};
ALTER TABLE test.mid_optimized WITH compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'};
ALTER TABLE test.optimized WITH compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'};

Then we run nodetool compact test to force a (re)compaction of all the sstables, and we get:

63M     not_optimized-00cf1500520b11e9ae38000000000004
28M     mid_optimized-011bae60520b11e9ae38000000000004
24M     optimized-01f74150520b11e9ae38000000000004

Compression is a great gain here, allowing another 50% reduction in disk space usage on our optimized table!

Switch to the new “mc” sstable format

Since the Scylla 3.0 release you can use the latest “mc” sstable storage format on your Scylla clusters. It promises greater efficiency and usually a significantly reduced disk space consumption!

It is not enabled by default; you have to add the enable_sstables_mc_format: true parameter to your scylla.yaml for it to be taken into account.

Since it’s backward compatible, you have nothing else to do: new compactions will be written using the “mc” storage format, and the Scylla server will seamlessly read the old sstables as well.
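
Enabling it is a one-line configuration change followed by a restart; here is a sketch assuming the usual /etc/scylla/scylla.yaml location and systemd (adapt to your setup):

echo 'enable_sstables_mc_format: true' | sudo tee -a /etc/scylla/scylla.yaml
sudo systemctl restart scylla-server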

But in our case of immediate disk space outage, we switched to the new format one node at a time, dropped the data from it and ran a nodetool rebuild to reconstruct the whole node using the new sstable format.

Let’s demonstrate its impact on our test tables: we add the option to the scylla.yaml file, restart scylla-server and run nodetool compact test again:

49M     not_optimized-00cf1500520b11e9ae38000000000004
26M     mid_optimized-011bae60520b11e9ae38000000000004
22M     optimized-01f74150520b11e9ae38000000000004

That’s a pretty cool disk space gain, even more so for the non-optimized version of our schema!

So if you’re in great need of disk space or it is hard for you to change your schemas, switching to the new “mc” sstable format is a simple and efficient way to free up some space without effort.

Consider using secondary indexes

While denormalization is the norm (yep… legitimate pun) in the NoSQL world, this does not mean we have to duplicate everything all the time. A good example lies in the internals of secondary indexes, if your workload can tolerate their moderate impact on latency.

Secondary indexes in Scylla are built on top of Materialized Views, which basically store an up-to-date pointer from your indexed column to your main table’s partition key. That means secondary index MVs do not duplicate all the columns (and thus the data) of your main table, as you would have to when denormalizing a table to query by another column: this saves disk space!

This of course comes with a latency drawback, because if your workload is interested in columns other than the main table’s partition key, the coordinator node will actually issue two queries to get all your data:

  1. query the secondary index MV to get the pointer to the partition key of the main table
  2. query the main table with the partition key to get the rest of the columns you asked for

This has been an effective trick to avoid duplicating a table and save disk space for some of our workloads!
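
As a hypothetical illustration (the table and column names below are made up, not taken from our production schema), querying users by email through a secondary index instead of a duplicated table looks like this:

cqlsh -e "
CREATE TABLE IF NOT EXISTS test.users (uid uuid PRIMARY KEY, email text, name text);
CREATE INDEX IF NOT EXISTS users_by_email ON test.users (email);
SELECT uid, name FROM test.users WHERE email = 'jane@example.com';"

Under the hood this is served with the two-step read described above, trading a little latency for the disk space we would otherwise spend on a duplicated users-by-email table.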

(not a tip) Move the commitlog to another disk / partition?

This should only be considered as a sort of emergency procedure or for cost efficiency (cheap disk tiering) on non critical clusters.

While this is technically possible even if the disk is not formatted using XFS, it is not advised to separate the commitlog from the data on modern SSD/NVMe disks, but… you can do it (as we did) on non-production clusters.

Switching is simple, you just need to change the commitlog_directory parameter in your scylla.yaml file.

Testing Cassandra compatible APIs

In this quick blog post, I’m going to assess how the databases that advertise themselves as “Cassandra API-compatible” fare in the compatibility department.

But that is all I will do: API testing only, and not exhaustive testing, just the APIs I see used most often. Based on this, you can start building an informed decision on whether or not to change databases.

The contenders:

  • Apache Cassandra 4.0
    • Installation: Build from Source
  • Yugabyte – https://www.yugabyte.com/
    • “YCQL is a transactional flexible-schema API that is compatible with the Cassandra Query Language (CQL). “
    • Installation: Docker
  • ScyllaDB – https://www.scylladb.com/
    • “Apache Cassandra’s wire protocol, a rich polyglot of drivers, and integration with Spark, Presto, and Graph tools make for resource-efficient and performance-effective coding.”
    • Installation: Docker
  • Azure Cosmos – https://azure.microsoft.com/en-us/services/cosmos-db/
    • “Azure Cosmos DB provides native support for NoSQL and OSS APIs including MongoDB, Cassandra, Gremlin and SQL”
    • Installation: Azure Portal Wizard

The Docker installations were done with the containers as they are provided. Cosmos DB used all the defaults provided by the wizard interface.

The CQL script used to test was this one: https://gist.github.com/cjrolo/f5f3cc02699c06ed1f4909d632d90f8f

What I’m not doing on this blog post: performance testing, feature comparison and everything else that is not testing the API. Those might all be more or less important for other use cases, but that is not the scope of this blog.

What was tested

In this test, the following CQL APIs were tested:

  1. Keyspace Creation
  2. Table Creation
  3. Adding a Column to a table (Alter table)
  4. Data Insert
  5. Data Insert with TTL (Time-to-live)
  6. Data Insert with LWT (Lightweight Transactions)
  7. Select Data
  8. Select data with a full table scan (ALLOW FILTERING)
  9. Creating a Secondary Index (2I)
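
The linked gist has the exact script; the sketch below (keyspace and table names are mine, not taken from the gist) shows the kind of statement each numbered item boils down to:

cqlsh -e "
CREATE KEYSPACE IF NOT EXISTS compat_test
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};         -- 1
CREATE TABLE compat_test.t (id int, c int, val text, PRIMARY KEY (id, c));          -- 2
ALTER TABLE compat_test.t ADD extra text;                                           -- 3
INSERT INTO compat_test.t (id, c, val) VALUES (1, 1, 'plain');                      -- 4
INSERT INTO compat_test.t (id, c, val) VALUES (1, 2, 'expiring') USING TTL 86400;   -- 5
INSERT INTO compat_test.t (id, c, val) VALUES (1, 3, 'lwt') IF NOT EXISTS;          -- 6
SELECT * FROM compat_test.t WHERE id = 1;                                           -- 7
SELECT * FROM compat_test.t WHERE val = 'plain' ALLOW FILTERING;                    -- 8
CREATE INDEX ON compat_test.t (val);                                                -- 9"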

Cassandra 4.0

  • All statements worked (as expected)

CosmosDB

LWT Not supported

ALLOW FILTERING Not supported

2i Not supported

Scylla

LWT Not supported

Yugabyte

2i Not Supported

Results Table

  Database        Unsupported statements
  Cassandra 4.0   None
  CosmosDB        LWT, ALLOW FILTERING, 2i
  Scylla          LWT
  Yugabyte        2i

So, with these results, which are not a full comparison (I have left out other parts offered in these systems), you can decide if it is compatible enough for you.

How to build your very own Cassandra 4.0 release

Over the last few months, I have been seeing references to Cassandra 4.0 and some of its new features. When that happens with a technology I am interested in, I go looking for the preview releases to download and test. Unfortunately, so far, there are no such releases. But, I am still interested, so I’ve found it necessary to build my own Cassandra 4.0 release. This is in my humble opinion not the most desirable way to do things since there is no Cassandra 4.0 branch yet. Instead, the 4.0 code is on the trunk. So if you do two builds a commit or two apart, and there are typically at least three or four commits a week right now, you get a slightly different build. It is, in essence, a moving target.

All that said and done, I decided that if I could do it, the least I could do was write about how to do it, and help everyone who wants to try it avoid a couple of dumb things I did when I first tried it.

Building your very own Cassandra 4.0 release is actually pretty easy. It consists of five steps:

  1. Make sure you have your prerequisites
    1. Java JDK 8 or Java 11 (OpenJDK or Oracle)
    2. Ant 1.8
    3. Git CLI client
    4. Python >= 2.7 and < 3.0
  2. Download the GIT repository
    1. git clone https://gitbox.apache.org/repos/asf/cassandra.git
  3. Build your new Cassandra release
    1. cd cassandra
    2. ant
  4. Run Cassandra
    1. cd ./bin
    2. ./cassandra
  5. Have fun
    1. ./nodetool status
    2. ./cqlsh

I will discuss each step in a little bit more detail:

Step 1) Verify, and if necessary, install your prerequisites

For Java, you can confirm the JDK presence by typing in:


john@Lenny:~$ javac -version
javac 1.8.0_191

For ant:


john@Lenny:~$ ant -version
Apache Ant(TM) version 1.9.6 compiled on July 20 2018

For git:


john@Lenny:~$ git --version
git version 2.7.4

For Python:


john@Lenny:~$ python --version
Python 2.7.12

If you have all of the right versions, you are ready for the next step. If not, you will need to install the required software, which I am not going to go into here.

Step 2) Clone the repository

Verify you do not already have an older copy of the repository:


john@Lenny:~$ ls -l cassandra
ls: cannot access 'cassandra': No such file or directory

If you found a Cassandra directory, you will want to delete it, move it, or change to a different working directory. Otherwise:


john@Lenny:~$ git clone https://git-wip-us.apache.org/repos/asf/cassandra.git
Cloning into 'cassandra'...
remote: Counting objects: 316165, done.
remote: Compressing objects: 100% (51450/51450), done.
remote: Total 316165 (delta 192838), reused 311524 (delta 189005)
Receiving objects: 100% (316165/316165), 157.78 MiB | 2.72 MiB/s, done.
Resolving deltas: 100% (192838/192838), done.
Checking connectivity... done.
Checking out files: 100% (3576/3576), done.

john@Lenny:~$ du -sh *
294M cassandra

At this point, you have used up 294 MB on your host and you have an honest-for-real git repo clone on your host – in my case, a Lenovo laptop running Windows 10 Linux subsystem.

And your repository looks something like this:


  john@Lenny:~$ ls -l cassandra
  total 668
  drwxrwxrwx 1 john john    512 Feb  6 15:54 bin
  -rw-rw-rw- 1 john john    260 Feb  6 15:54 build.properties.default
  -rw-rw-rw- 1 john john 101433 Feb  6 15:54 build.xml
  -rw-rw-rw- 1 john john   4832 Feb  6 15:54 CASSANDRA-14092.txt
  -rw-rw-rw- 1 john john 390460 Feb  6 15:54 CHANGES.txt
  drwxrwxrwx 1 john john    512 Feb  6 15:54 conf
  -rw-rw-rw- 1 john john   1169 Feb  6 15:54 CONTRIBUTING.md
  drwxrwxrwx 1 john john    512 Feb  6 15:54 debian
  drwxrwxrwx 1 john john    512 Feb  6 15:54 doc
  -rw-rw-rw- 1 john john   5895 Feb  6 15:54 eclipse_compiler.properties
  drwxrwxrwx 1 john john    512 Feb  6 15:54 examples
  drwxrwxrwx 1 john john    512 Feb  6 15:54 ide
  drwxrwxrwx 1 john john    512 Feb  6 15:54 lib
  -rw-rw-rw- 1 john john  11609 Feb  6 15:54 LICENSE.txt
  -rw-rw-rw- 1 john john 123614 Feb  6 15:54 NEWS.txt
  -rw-rw-rw- 1 john john   2600 Feb  6 15:54 NOTICE.txt
  drwxrwxrwx 1 john john    512 Feb  6 15:54 pylib
  -rw-rw-rw- 1 john john   3723 Feb  6 15:54 README.asc
  drwxrwxrwx 1 john john    512 Feb  6 15:54 redhat
  drwxrwxrwx 1 john john    512 Feb  6 15:54 src
  drwxrwxrwx 1 john john    512 Feb  6 15:54 test
  -rw-rw-rw- 1 john john  17215 Feb  6 15:54 TESTING.md
  drwxrwxrwx 1 john john    512 Feb  6 15:54 tools

Step 3) Build your new Cassandra 4.0 release

Remember what I said in the beginning? There is no branch for Cassandra 4.0 at this point, so building from the trunk is quite simple:


john@Lenny:~$ cd cassandra
john@Lenny:~/cassandra$ ant
Buildfile: /home/john/cassandra/build.xml

BUILD SUCCESSFUL
Total time: 1 minute 4 seconds

That went quickly enough. Let’s take a look and see how much larger the directory has gotten:


john@Lenny:~$ du -sh *
375M cassandra

Our directory grew by 81MB, pretty much all of it in the new build directory, which now has 145 new files including ./build/apache-cassandra-4.0-SNAPSHOT.jar. I like seeing that version 4.0 right in the middle of the filename.

Step 4) Start Cassandra up. This one is easy if you do the sensible thing


john@Lenny:~/cassandra$ cd ..
john@Lenny:~$ cd cassandra/bin
john@Lenny:~/cassandra/bin$ ./cassandra
john@Lenny:~/cassandra/bin$ CompilerOracle: dontinline org/apache/cassandra/db/Columns$Serializer.deserializeLargeSubset (Lorg/apache/cassandra/io/util/DataInputPlus;Lorg/apache/cassandra/db/Columns;I)Lorg/apache/cassandra/db/Columns;
CompilerOracle: dontinline org/apache/cassandra/db/Columns$Serializer.serializeLargeSubset (Ljava/util/Collection;ILorg/apache/cassandra/db/Columns;ILorg/apache/cassandra/io/util/DataOutputPlus;)V
CompilerOracle: dontinline org/apache/cassandra/db/Columns$Serializer.serializeLargeSubsetSize (Ljava/util/Collection;ILorg/apache/cassandra/db/Columns;I)I

INFO [MigrationStage:1] 2019-02-06 21:26:26,222 ColumnFamilyStore.java:407 - Initializing system_auth.role_members
INFO [MigrationStage:1] 2019-02-06 21:26:26,234 ColumnFamilyStore.java:407 - Initializing system_auth.role_permissions
INFO [MigrationStage:1] 2019-02-06 21:26:26,244 ColumnFamilyStore.java:407 - Initializing system_auth.roles

We seem to be up and running. It’s time to try some things out:

Step 5) Have fun

We will start out making sure we are up and running by using nodetool to connect and display a cluster status. Then we will go into the CQL shell to see something new. It is important to note that since you are likely to have nodetool and cqlsh already installed on your host, you need to use the ./ in front of your commands to ensure you are using the 4.0 version. I have learned the hard way that forgetting the ./ can result in some very real confusion.


  john@Lenny:~/cassandra/bin$ ./nodetool status
  Datacenter: datacenter1
  =======================
  Status=Up/Down
  |/ State=Normal/Leaving/Joining/Moving
  --  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
  UN  127.0.0.1  115.11 KiB  256          100.0%            f875525b-3b78-49b4-a9e1-2ab0cf46b881  rack1
            
  john@Lenny:~/cassandra/bin$ ./cqlsh
  Connected to Test Cluster at 127.0.0.1:9042.
  [cqlsh 5.0.1 | Cassandra 4.0-SNAPSHOT | CQL spec 3.4.5 | Native protocol v4]
  Use HELP for help.
  cqlsh> desc keyspaces;

  system_traces  system_auth  system_distributed     system_views
  system_schema  system       system_virtual_schema

  cqlsh>

We got a nice cluster with one node, and we see the usual built-in keyspaces. Well um… not exactly. We see two new keyspaces: system_virtual_schema and system_views. Those look very interesting.

In my next blog, I’ll be talking more about Cassandra’s new virtual table facility and how very useful it is going to be someday soon. I hope.

Handling a Cassandra transactional workload

Overview of Cassandra

As previously mentioned in my notes on lightweight transactions, Cassandra does not support ACID transactions. Cassandra was built to support a brisk ingest of writes while being distributed for availability. Follow the link to my previous post above to learn more specifics about LWTs and the Paxos algorithm.

Here I’ll cover some other ways to handle a transactional workload in Cassandra.

Batch mode

The standard write path for Cassandra goes from the client to the commit log and memtable on each replica, with memtables later flushed to SSTables. The write is stored in the memtable and commit log of the replica nodes (as configured by the replication factor) before it is considered complete.

The batch write path additionally includes a batch log, which is used to group updates so that they are considered complete (or not) together. This is an expensive operation unless the batched writes affect only a single partition key.
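
As a hedged illustration of the single-partition case (the shop.orders table is a made-up example), a logged batch in CQL looks like this:

cqlsh -e "
-- assumes a 'shop' keyspace already exists; order_id is the partition key
CREATE TABLE IF NOT EXISTS shop.orders (order_id int, item text, qty int, PRIMARY KEY (order_id, item));
BEGIN BATCH
  INSERT INTO shop.orders (order_id, item, qty) VALUES (42, 'widget', 2);
  UPDATE shop.orders SET qty = 3 WHERE order_id = 42 AND item = 'gadget';
APPLY BATCH;"

Because both statements target the same partition (order_id = 42), they are applied atomically without paying the cross-node batch log penalty described above.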

As with lightweight transactions, time coordination among nodes remains important with batched writes.

Workarounds

To avoid expensive lightweight transactions or batched writes, software can be installed alongside Cassandra to manage writes that need to be done together. Applications coordinate with this software to introduce locks into the write path, ensuring atomicity and isolation of updates; the software manages these locks.

Two software tools that can be used for this type of workaround are Apache Zookeeper and Hashicorp Consul. Both of these tools are typically used to manage distributed configuration but can be leveraged to assist with Cassandra transactions. Whereas Zookeeper was originally created as an in-memory data store, Consul was built to be a configuration manager.

Zookeeper

Because Zookeeper is essentially a data store, several libraries were created to provide locking functionality on top of it. Two of these are Google’s Cages and Netflix’s Curator (now maintained as an Apache project). Note that Cages has not been updated in several years, while Curator remains actively maintained. There is no reason application developers could not write similar functionality within their main application to interact with Zookeeper, perhaps using these as references.

Cages

Cages is a Java library used to synchronize the movement of data among distributed machines, making Cassandra transactional workloads an ideal use case.

Cages includes several classes for reading and writing data. A pertinent one for transactional workloads is ZkWriteLock, used to wrap statements inside a lock stored in Zookeeper. Note that this lock stored in Zookeeper has nothing to do with the underlying Cassandra functionality, and must be adhered to by all parts of the application. Indeed, the application or another user could bypass the lock and interact directly with Cassandra.

Curator

Curator was created specifically to manage Zookeeper, resulting in a tighter integration. It works similarly to Cages, though: it wraps statements in a mutex and requires the application to observe the locks to ensure data consistency.

Consul

Consul is also a distributed storage system used to manage configuration and similar data. It is more recently developed and remains actively maintained.

The distribution of Consul storage is highly flexible and performant, making it a great alternative to Zookeeper. The basic interaction from the application remains the same: the application would store a lock as a key-value in Consul, and all writes from the application would need to respect the lock.
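
As a sketch of what that interaction can look like (the lock path and script name are hypothetical), Consul’s CLI even ships a helper that holds a lock for the duration of a child command:

# Every writer touching the same rows must agree on the same lock path
consul lock locks/customer-42 ./apply_customer_writes.sh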

Performance

Introducing extra steps in the write path is not free with regard to performance.

In addition to the lag inherent to locking, Zookeeper can become a bottleneck. This can be avoided by scaling the Zookeeper clusters. A feature called Observer helps to reduce time spent getting a quorum from the Zookeeper cluster.

Regardless, there is an upper limit of roughly 5-10K operations per second that you can perform against Zookeeper, so take this into consideration when planning an architecture.

Recommendations

If the transactional workload is infrequent and minimal, lightweight transactions should suffice. However, if transactions are a core function of the application, we recommend using Zookeeper or Consul to manage write locks. Zookeeper has a longer history, but Consul is more up-to-date and provides great flexibility and performance, giving us a preference for Consul.