Renaming and reshaping Scylla tables using scylla-migrator

We have recently faced a problem where some of the first Scylla tables we created on our main production cluster were no longer in line with the evolved schemas that our more recent tables use.

This typical engineering problem requires either keeping those legacy tables and their data queries, or migrating them to the more optimal model, with the bandwagon of applications that then have to be modified to query the data the new way… That’s something nobody likes doing, but hey, we don’t like legacy at Numberly, so let’s kill this one!

To overcome this challenge we used the scylla-migrator project and I thought it could be useful to share this experience.

How and why our schema evolved

When we first approached ID matching tables we chose to solve two problems at the same time: querying the most recent data and keeping the history of the changes per source ID.

This means that those tables included a date as part of their PRIMARY KEY, while the partition key was obviously the matching table ID we wanted to look up from:

CREATE TABLE IF NOT EXISTS ids_by_partnerid (
    partnerid text,
    id text,
    date timestamp,
    PRIMARY KEY ((partnerid), date, id)
)
WITH CLUSTERING ORDER BY (date DESC);

Making a table with an ever-changing date in the clustering key creates what we call a history table. In the schema above the uniqueness of a row is not only defined by a partnerid / id couple but also by its date!

Quick caveat: you have to be careful about the actual date timestamp resolution since you may not want to create a row for every second of the same partner_id / id couple (we use an hour resolution).
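
As an illustration, here is a minimal Java sketch of how such an hour-resolution bucket can be computed client-side before writing (the surrounding insert logic is omitted and this is not Numberly's actual code):

import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Truncate the event timestamp to the hour so that repeated matches of the same
// partnerid / id couple within the same hour collapse into a single row instead
// of one row per second.
Instant seenAt = Instant.now();
Instant hourBucket = seenAt.truncatedTo(ChronoUnit.HOURS);
// hourBucket is then bound to the "date" column of the history table.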

History tables are good for analytics and we also figured we could use them for batch and real time queries where we would be interested in the “most recent ids for the given partner_id” (sometimes flavored with a LIMIT):

SELECT id FROM ids_by_partnerid WHERE partnerid = 'AXZAZLKDJ' ORDER BY date DESC;

As time passed, real time Kafka pipelines started to query these tables hard and were mostly interested in “all the ids known for the given partner_id“.

A sort of DISTINCT(id) is out of scope for such a table! For this we need a table schema that represents a condensed view of the data. We call them compact tables, and the only difference with the history table is that the date timestamp is simply not part of the PRIMARY KEY:

CREATE TABLE IF NOT EXISTS ids_by_partnerid (
    partnerid text,
    id text,
    seen_date timestamp,
    PRIMARY KEY ((partnerid), id)
);

To make that transition happen we thus wanted to:

  • rename history tables with an _history suffix so that they are clearly identified as such
  • get a compacted version of the tables (keeping their old name) while renaming the date column to seen_date
  • do it as fast as possible since we will need to stop our feeding pipeline and most of our applications during the process…

STOP: it’s not possible to rename a table in CQL!

Scylla-migrator to the rescue

We decided to abuse the scylla-migrator to perform this perilous migration.

As it was originally designed to help users migrate from Cassandra to Scylla by leveraging Spark, it seemed like a good fit for the task, since we happen to run Spark clusters powered by Hadoop YARN.

Building scylla-migrator for Spark < 2.4

Recent versions of scylla-migrator do not support older Spark versions. The trick is to look at the git log of the README.md and check out the commit that (hopefully) still supports your Spark cluster version.

In our case for Spark 2.3 we used git commit bc82a57e4134452f19a11cd127bd4c6a25f75020.

On Gentoo, make sure to use dev-java/sbt-bin, since the non-binary version is vastly out of date and won’t build the project. You need at least version 1.3.
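
For reference, building the assembly jar at that commit looked roughly like the following (a sketch; the exact sbt invocation may differ between scylla-migrator revisions):

git clone https://github.com/scylladb/scylla-migrator.git
cd scylla-migrator
git checkout bc82a57e4134452f19a11cd127bd4c6a25f75020
sbt assembly
# produces target/scala-2.11/scylla-migrator-assembly-0.0.1.jar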

The scylla-migrator plan

The documentation explains that we need a config file that points to a source cluster+table and a destination cluster+table, as long as they share the same schema structure…

Renaming is then as simple as duplicating the schema using CQLSH and running the migrator!
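
To give an idea, the pure renaming case boils down to a config.yaml pointing both sides at the same cluster, something along these lines (hosts and keyspace are placeholders, and the exact keys depend on the scylla-migrator revision you build):

source:
  host: scylla-node1.example.com
  port: 9042
  keyspace: ids
  table: ids_by_partnerid
  splitCount: 256

target:
  host: scylla-node1.example.com
  port: 9042
  keyspace: ids
  table: ids_by_partnerid_history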

But what about the compacted version of our original table? Its schema is different from the source table!…

The good news is that as long as all your columns remain present, you can also change the PRIMARY KEY of your destination table and it will still work!

This makes the scylla-migrator an amazing tool to reshape or pivot tables!

  • the column date is renamed to seen_date: that’s okay, scylla-migrator supports column renaming (it’s a Spark dataframe after all)!
  • the PRIMARY KEY is different in the compacted table since we removed date from the clustering columns: we’ll get a compacted table for free!

Using scylla-migrator

The documentation is a bit poor on how to submit your application to a Hadoop YARN cluster but that’s kind of expected.

It also did not mention how to connect to an SSL-enabled cluster (are there really people not using SSL on the wire in their production environment?)… anyway, let’s not start a flame war 🙂

The trick that will save you is to know that you can append all the usual Spark options that are available in the spark-cassandra-connector!

Submitting to a Kerberos-protected Hadoop YARN cluster targeting an SSL-enabled Scylla cluster then looks like this:

export JAR_NAME=target/scala-2.11/scylla-migrator-assembly-0.0.1.jar
export KRB_PRINCIPAL=USERNAME

spark2-submit \
 --name ScyllaMigratorApplication \
 --class com.scylladb.migrator.Migrator  \
 --conf spark.cassandra.connection.ssl.clientAuth.enabled=True  \
 --conf spark.cassandra.connection.ssl.enabled=True  \
 --conf spark.cassandra.connection.ssl.trustStore.path=jssecacerts  \
 --conf spark.cassandra.connection.ssl.trustStore.password=JKS_PASSWORD  \
 --conf spark.cassandra.input.consistency.level=LOCAL_QUORUM \
 --conf spark.cassandra.output.consistency.level=LOCAL_QUORUM \
 --conf spark.scylla.config=config.yaml \
 --conf spark.yarn.executor.memoryOverhead=1g \
 --conf spark.blacklist.enabled=true  \
 --conf spark.blacklist.task.maxTaskAttemptsPerExecutor=1  \
 --conf spark.blacklist.task.maxTaskAttemptsPerNode=1  \
 --conf spark.blacklist.stage.maxFailedTasksPerExecutor=1  \
 --conf spark.blacklist.stage.maxFailedExecutorsPerNode=1  \
 --conf spark.executor.cores=16 \
 --deploy-mode client \
 --files jssecacerts \
 --jars ${JAR_NAME}  \
 --keytab ${KRB_PRINCIPAL}.keytab  \
 --master yarn \
 --principal ${KRB_PRINCIPAL}  \
 ${JAR_NAME}

Note that we chose to apply a higher consistency level to our reads using a LOCAL_QUORUM instead of the default LOCAL_ONE. I strongly encourage you to do the same since it’s appropriate when you’re using this kind of tool!

Column renaming is simply expressed in the configuration file like this:

# Column renaming configuration.
renames:
  - from: date
    to: seen_date

Tuning scylla-migrator

While easy to use, tuning scylla-migrator to operate those migrations as fast as possible turned out to be a real challenge (remember we have some production applications shut down during the process).

Even using 300+ Spark executors I couldn’t get my Scylla cluster utilization to more than 50% and migrating a single table with a bit more than 1B rows took almost 2 hours…

We found the best knobs to play with thanks to the help of Lubos Kosco and this blog post from ScyllaDB:

  • Increase the splitCount setting: more splits means more Spark executors will be spawned and more tasks created out of them. While that might work like magic on a pure Spark deployment, it’s not that amazing on a Hadoop YARN one where executors are scheduled in containers with 1 core by default. We simply moved it from 256 to 384.
  • Disable compaction on the destination table schemas. This gave us a big boost and saved the day, since it avoids adding the overhead of compacting while you’re pushing data down hard!

To disable compaction on a table, simply run:

ALTER TABLE ids_by_partnerid_history WITH compaction = {'class': 'NullCompactionStrategy'};

Remember to run a manual compaction (nodetool compact <keyspace> <table>) and to enable compaction back on your tables once you’re done!
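
For reference, re-enabling compaction afterwards is just another ALTER TABLE. Assuming the table was using the default strategy before the migration, it would look something like this (adjust the class and options to whatever your table actually used before):

ALTER TABLE ids_by_partnerid_history WITH compaction = {'class': 'SizeTieredCompactionStrategy'};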

Happy Scylla tables mangling!


Apache Cassandra Changelog #1 (October 2020)

Introducing the first Cassandra Changelog blog! Our monthly roundup of key activities and knowledge to keep the community informed.

Release Notes

Updated

The most current Apache Cassandra releases are 4.0-beta2, 3.11.8, 3.0.22, 2.2.18 and 2.1.22 released on August 31 and are in the repositories. The next cut of releases will be out soon―join the Cassandra mailing list to stay up-to-date.

We continue to make progress toward the 4.0 GA release with the overarching goal of it being at a state where major users should feel confident running it in production when it is cut. Over 1,300 Jira tickets have been closed and less than 100 remain as of this post. To gain this confidence, there are various ongoing testing efforts involving correctness, performance, and ease of use.

Added

With CASSANDRA-15013, the community improved Cassandra's ability to handle high throughput workloads, while having enough safeguards in place to protect itself from potentially going out of memory.

Added

The Harry project is a fuzz testing tool that aims to generate reproducible workloads that are as close to real-life as possible, while being able to efficiently verify the cluster state against the model without pausing the workload itself.

Added

The community published its first Apache Cassandra Usage Report 2020 detailing findings from a comprehensive global survey of 901 practitioners on Cassandra usage to provide a baseline understanding of who, how, and why organizations use Cassandra.

Community Notes

Updates on new and active Cassandra Enhancement Proposals (CEPs) and how to contribute.

Changed

CEP-2: Kubernetes Operator was introduced this year, and there is an active discussion on the creation of a community-based operator with the goal of making it easy to run Cassandra on Kubernetes.

Added

CEP-7: Storage Attached Index (SAI) is a new secondary index for Cassandra that builds on the advancements made with SASI. It is intended to replace the existing built-in secondary index implementations.

Added

Cassandra was selected by the ASF Diversity & Inclusion committee to be included in a research project to evaluate and understand the current state of diversity.

User Space

Bigmate

"In vetting MySQL, MongoDB, and other potential databases for IoT scale, we found they couldn't match the scalability we could get with open source Apache Cassandra. Cassandra's built-for-scale architecture enables us to handle millions of operations or concurrent users each second with ease – making it ideal for IoT deployments." - Brett Orr

Bloomberg

"Our group is working on a multi-year build, creating a new Index Construction Platform to handle the daily production of the Bloomberg Barclays fixed income indices. This involves building and productionizing an Apache Solr-backed search platform to handle thousands of searches per minute, an Apache Cassandra back-end database to store millions of data points per day, and a distributed computational engine to handle millions of computations daily." - Noel Gunasekar

In the News

Solutions Review - The Five Best Apache Cassandra Books on Our Reading List

ZDNet - What Cassandra users think of their NoSQL DBMS

Datanami - Cassandra Adoption Correlates with Experience

Container Journal - 5 to 1: An Overview of Apache Cassandra Kubernetes Operators

Datanami - Cassandra Gets Monitoring, Performance Upgrades

ZDNet - Faster than ever, Apache Cassandra 4.0 beta is on its way

Cassandra Tutorials & More

A Cassandra user was in search of a tool to perform schema DDL upgrades. Another user suggested https://github.com/patka/cassandra-migration to ensure you don't get schema mismatches if running multiple upgrade statements in one migration. See the full email on the user mailing list for other recommended tools.

Start using virtual tables in Apache Cassandra 4.0 - Ben Bromhead, Instaclustr

Benchmarking Apache Cassandra with Rust - Piotr Kołaczkowski, DataStax

Open Source BI Tools and Cassandra - Arpan Patel, Anant Corporation

Build Fault Tolerant Applications With Cassandra API for Azure Cosmos DB - Abhishek Gupta, Microsoft

Understanding Data Modifications in Cassandra - Sameer Shukla, Redgate

Cassandra Changelog is curated by the community. Please send submissions to cassandra@constantia.io.

Building a Low-Latency Distributed Stock Broker Application: Part 4

In the fourth blog of the “Around the World” series we built a prototype of the application, designed to run in two georegions.

Recently I re-watched “Star Trek: The Motion Picture” (The original 1979 Star Trek Film). I’d forgotten how much like “2001: A Space Odyssey” the vibe was (a drawn out quest to encounter a distant, but rapidly approaching very powerful and dangerous alien called “V’ger”), and also that the “alien” was originally from Earth and returning in search of “the Creator”—V’ger was actually a seriously upgraded Voyager spacecraft!

Star Trek: The Motion Picture (V’ger)

The original Voyager 1 and 2 had only been recently launched when the movie came out, and were responsible for some remarkable discoveries (including the famous “Death Star” image of Saturn’s moon Mimas, taken in 1980). Voyager 1 has now traveled further than any other human artefact and has since left the solar system! It’s amazing that after 40+ years it’s still working, although communicating with it now takes a whopping 40 hours in round-trip latency (which happens via the Deep Space Network—one of the stations is at Tidbinbilla, near our Canberra office).

Canberra Deep Space Communication Complex (CSIRO)

Luckily we are only interested in traveling “Around the World” in this blog series, so the latency challenges we face in deploying a globally distributed stock trading application are substantially less than the 40 hours latency to outer space and back. In Part 4 of this blog series we catch up with Phileas Fogg and Passepartout in their journey, and explore the initial results from a prototype application deployed in two locations, Sydney and North Virginia.

1. The Story So Far

In Part 1 of this blog series we built a map of the world to take into account inter-region AWS latencies, and identified some “georegions” that enabled sub 100ms latency between AWS regions within the same georegion. In Part 2 we conducted some experiments to understand how to configure multi-DC Cassandra clusters and use java clients, and measured latencies from Sydney to North Virginia. In Part 3 we explored the design for a globally distributed stock broker application, and built a simulation and got some indicative latency predictions. 

The goal of the application is to ensure that stock trades are done as close as possible to the stock exchanges the stocks are listed on, to reduce the latency between receiving a stock ticker update, checking the conditions for a stock order, and initiating a trade if the conditions are met. 

2. The Prototype

For this blog I built a prototype of the application, designed to run in two georegions, Australia and the USA. We are initially only trading stocks available from stock exchanges in these two georegions: orders for stocks traded in New York will be directed to a Broker deployed in the AWS North Virginia region, and orders for stocks traded in Sydney will be directed to a Broker deployed in the AWS Sydney region. As this Google Earth image shows, they are close to being antipodes (diametrically opposite each other, 15,500 km apart, so pretty much satisfying the definition of traveling “Around the World”), with a measured inter-region latency (from blog 2) of 230ms.

Sydney to North Virginia (George Washington’s Mount Vernon house)

The design of the initial (simulated) version of the application was changed in a few ways to: 

  • Ensure that it worked correctly when instances of the Broker were deployed in multiple AWS regions
  • Measure actual rather than simulated latencies, and
  • Use a multi-DC Cassandra Cluster. 

The prototype isn’t a complete implementation of the application yet. In particular it only uses a single Cassandra table for Orders—to ensure that Orders are made available in both georegions, and can be matched against incoming stock tickers by the Broker deployed in that region. 

Some other parts of the application are “stubs”, including the Stock Ticker component (which will eventually use Kafka), and checks/updates of Holdings and generation of Transactions records (which will eventually also be Cassandra tables). Currently only the asynchronous, non-market order types are implemented (Limit and Stop orders). I realized that implementing Market orders (which are required to be traded immediately) using a Cassandra table would result in too many tombstones being produced, as each order is deleted immediately upon being filled (rather than being marked as “filled” as in the original design, to enable the Cassandra query to quickly find unfilled orders and prevent the Orders table from growing indefinitely). For non-market orders, however, it is a reasonable design choice to use a Cassandra table: Orders may exist for extended periods of time (hours, days, or even weeks) before being traded, and as there is only a small number of successful trades (10s to 100s) per second relative to the total number of waiting Orders (potentially millions), the number of deletions, and therefore tombstones, will be acceptable. 

We now have a look at the details of some of the more significant changes that were made to the application.

Cassandra

I created a multi-DC Cassandra keyspace as follows:

CREATE KEYSPACE broker WITH replication = {'class': 'NetworkTopologyStrategy', 'NorthVirginiaDC': '3', 'SydneyDC': '3'};

In the Cassandra Java driver, the application.conf file determines which Data Center the Java client connects to. For example, to connect to the SydneyDC the file has the following settings:

datastax-java-driver {
    basic.contact-points = [
        "1.2.3.4:9042"
    ]

    basic.load-balancing-policy {
        class = DefaultLoadBalancingPolicy
        local-datacenter = "SydneyDC"
    }
}

The Orders table was created as follows (note that I couldn’t use “limit” for a column name as it’s a reserved word in Cassandra!):

CREATE TABLE broker.orders (
    symbol text,
    orderid text,
    buyorsell text,
    customerid text,
    limitthreshold double,
    location text,
    ordertype text,
    quantity bigint,
    starttime bigint,
    PRIMARY KEY (symbol, orderid)
);

For the primary key, the partition key is the stock symbol, so that all outstanding orders for a stock can be found when each stock ticker is received by the Broker, and the clustering column is the (unique) orderid, so that multiple orders for the same symbol can be written and read, and a specific order (for an orderid) can be deleted. In a production environment, using the stock symbol alone as the partition key may result in skewed and unbounded partitions, which is not recommended.

The prepared statements for creating, reading, and deleting orders are as follows:

PreparedStatement prepared_insert = Cassandra.session.prepare(
        "insert into broker.orders (symbol, orderid, buyorsell, customerid, limitthreshold, location, ordertype, quantity, starttime) values (?, ?, ?, ?, ?, ?, ?, ?, ?)");

PreparedStatement prepared_select = Cassandra.session.prepare(
        "select * from broker.orders where symbol = ?");

PreparedStatement prepared_delete = Cassandra.session.prepare(
        "delete from broker.orders where symbol = ? and orderid = ?");

I implemented a simplified “Place limit or stop order” operation (see Part 3), which uses the prepared_insert statement to create each new order, initially in the Cassandra Data Center local to the Broker where the order was created from, which is then automatically replicated in the other Cassandra Data Center. I also implemented the “Trade Matching Order” operation (Part 3), which uses the prepared_select statement to query orders matching each incoming Stock Ticker, checks the rules, and then if a trade is filled deletes the order.
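
As a rough sketch (not the actual prototype code), the “Trade Matching Order” path ties these prepared statements together as follows; orderMatches() and fillTrade() are hypothetical stand-ins for the rule checking and trade recording logic:

// For each incoming stock ticker, look up the outstanding orders for that symbol,
// apply the limit/stop rules, and delete any order that gets filled.
ResultSet candidates = Cassandra.session.execute(prepared_select.bind(ticker.getSymbol()));

for (Row order : candidates) {
    if (orderMatches(order, ticker)) {       // hypothetical rule check (limit/stop thresholds)
        fillTrade(order, ticker);            // hypothetical trade recording (stubbed in the prototype)
        Cassandra.session.execute(
                prepared_delete.bind(order.getString("symbol"), order.getString("orderid")));
    }
}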

Deployment

I created a 3 node Cassandra cluster in the Sydney AWS region, and then added another identical Data Center in the North Virginia AWS region using Instaclustr Managed Cassandra for AWS. This gave me 6 nodes in total, running on t3.small instances (5 GB SSD, 2 GB RAM, 2 CPU cores). This is a small developer-sized cluster, but it is adequate for a prototype, and very affordable (2 cents an hour per node for AWS costs) given that the Brokers are currently only single threaded so don’t produce much load. We’re more interested in latency at this point of the experiment, and we may want to increase the number of Data Centers in the future. I also spun up an EC2 instance (t3a.micro) in each of the two AWS regions, and deployed an instance of the Stock Broker on each (it only used 20% CPU). Here’s what the complete deployment looks like:

3. The Results

For the prototype, the focus was on demonstrating that the design goal of minimizing latency for trading stop and limit orders (asynchronous trades) was achieved. For the prototype, the latency for these order types is measured from the time of receiving a Stock Ticker, to the time an Order is filled. We ran a Broker in each AWS region concurrently for an hour, with the same workload for each, and measured the average and maximum latencies. For the first configuration, each Broker is connected to its local Cassandra Data Center, which is how it would be configured in practice. The results were encouraging, with an average latency of 3ms, and a maximum of 60ms, as shown in this graph.  

During the run, across both Brokers, 300 new orders were created each second, 600 stock tickers were received each second, and 200 trades were carried out each second. 

Given that I hadn’t implemented Market Orders yet, I wondered how I could configure and approximately measure the expected latency for these synchronous order types between different regions (i.e. Sydney and North Virginia). The latency for Market orders in the same region will be comparable to the non-market orders. The solution turned out to be simple: just re-configure the Brokers to use the remote Cassandra Data Center (the configuration change is sketched below), which introduces the inter-region round-trip latency that would also be encountered with Market Orders placed in one region and traded immediately in the other region. I could also have achieved a similar result by changing the consistency level to EACH_QUORUM (which requires a majority of nodes in each data center to respond). Not surprisingly, the latencies were higher, rising to 360ms average, and 1200ms maximum, as shown in this graph with both configurations (Stop and Limit Orders on the left, and Market Orders on the right):

So our initial experiments are a success, and validate the primary design goal, as asynchronous stop and limit Orders can be traded with low latency from the Broker nearest the relevant stock exchanges, while synchronous Market Orders will take significantly longer due to inter-region latency. 
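
For completeness, the re-configuration used to approximate Market Orders above is just the application.conf from earlier with the Broker’s local data center pointed at the remote Cassandra Data Center, along these lines (reusing the placeholder contact point):

datastax-java-driver {
    basic.contact-points = [
        "1.2.3.4:9042"
    ]

    basic.load-balancing-policy {
        class = DefaultLoadBalancingPolicy
        local-datacenter = "NorthVirginiaDC"   # Sydney Broker now routes to the remote DC
    }
}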

Write Amplification

I wondered what else can be learned from running this experiment? We can understand more about resource utilization in multi-DC Cassandra clusters. Using the Instaclustr Cassandra Console, I monitored the CPU Utilization on each of the nodes in the cluster, initially with only one Data Center and one Broker, and then with two Data Centers and a single Broker, and then both Brokers running. It turns out that the read load results in 20% CPU Utilization on each node in the local Cassandra Data Center, and the write load also results in 20% locally.  Thus, for a single Data Center cluster the total load is 40% CPU. However, with two Data Centers things get more complex due to the replication of the local write loads to each other Data Center. This is also called “Write Amplification”.

The following table shows the measured total load for 1 and 2 Data Centers, and predicted load for up to 8 Data Centers, showing that for more than 3 Data Centers you need bigger nodes (or bigger clusters). A four CPU Core node instance type would be adequate for 7 Data Centers, and would result in about 80% CPU Utilization.  

  Number of Data Centers | Local Read Load (% CPU) | Local Write Load (% CPU) | Remote Write Load (% CPU) | Total Write Load (% CPU) | Total Load (% CPU)
  1 | 20 | 20 | 0 | 20 | 40
  2 | 20 | 20 | 20 | 40 | 60
  3 | 20 | 20 | 40 | 60 | 80
  4 | 20 | 20 | 60 | 80 | 100
  5 | 20 | 20 | 80 | 100 | 120
  6 | 20 | 20 | 100 | 120 | 140
  7 | 20 | 20 | 120 | 140 | 160
  8 | 20 | 20 | 140 | 160 | 180
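
Put as a back-of-the-envelope formula: with a local read load R and a local write load W of 20% each per Data Center, every node carries roughly R + W × N = 20 + 20 × N percent CPU for N Data Centers, since it serves its local reads and writes plus the replicated writes coming from the N − 1 remote Data Centers.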

Costs

The total cost to run the prototype includes the Instaclustr Managed Cassandra nodes (3 nodes per Data Center x 2 Data Centers = 6 nodes), the two AWS EC2 Broker instances, and the data transfer between regions (AWS only charges for data out of a region, not in, but the prices vary depending on the source region). For example, data transfer out of North Virginia is 2 cents/GB, but Sydney is more expensive at 9.8 cents/GB. I computed the total monthly operating cost to be $361 for this configuration, broken down into $337/month (93%) for Cassandra and EC2 instances, and $24/month (7%) for data transfer, to process around 500 million stock trades. Note that this is only a small prototype configuration, but can easily be scaled for higher throughputs (with incrementally and proportionally increasing costs).

Conclusions

In this blog we built and experimented with a prototype of the globally distributed stock broker application, focussing on testing the multi-DC Cassandra part of the system which enabled us to significantly reduce the impact of planetary scale latencies (from seconds to low milliseconds) and ensure greater redundancy (across multiple AWS regions), for the real-time stock trading function. Some parts of the application remain as stubs, and in future blogs I aim to replace them with suitable functionality (e.g. streaming, analytics) and non-functionality (e.g. failover) from a selection of Kafka, Elasticsearch and maybe even Redis!

The post Building a Low-Latency Distributed Stock Broker Application: Part 4 appeared first on Instaclustr.

Understanding the Impacts of the Native Transport Requests Change Introduced in Cassandra 3.11.5

Summary

Recently, Cassandra made changes to the Native Transport Requests (NTR) queue behaviour. Through our performance testing, we found the new NTR change to be good for clusters that have a constant load causing the NTR queue to block. Under the new mechanism the queue no longer blocks, but throttles the load based on queue size setting, which by default is 10% of the heap.

Compared to the Native Transport Requests queue length limit, this improves how Cassandra handles traffic when queue capacity is reached. The “back pressure” mechanism more gracefully handles the overloaded NTR queue, resulting in a significant lift of operations without clients timing out. In summary, clusters with later versions of Cassandra can handle more load before hitting hard limits.

Introduction

At Instaclustr, we are responsible for managing the Cassandra versions that we release to the public. This involves performing a review of Cassandra release changes, followed by performance testing. In cases where major changes have been made in the behaviour of Cassandra, further research is required. So without further delay let’s introduce the change to be investigated.

Change:
  • Prevent client requests from blocking on executor task queue (CASSANDRA-15013)
Versions affected:
  • Cassandra 3.11.5 and later

Background

Native Transport Requests

Native transport requests (NTR) are any requests made via the CQL Native Protocol. CQL Native Protocol is the way the Cassandra driver communicates with the server. This includes all reads, writes, schema changes, etc. There are a limited number of threads available to process incoming requests. When all threads are in use, some requests wait in a queue (pending). If the queue fills up, some requests are silently rejected (blocked). The server never replies, so this eventually causes a client-side timeout. The main way to prevent blocked native transport requests is to throttle load, so the requests are performed over a longer period.

Prior to 3.11.5

Prior to 3.11.5, Cassandra used the following configuration settings to set the size and throughput of the queue:

  • native_transport_max_threads is used to set the maximum threads for handling requests.  Each thread pulls requests from the NTR queue.
  • cassandra.max_queued_native_transport_requests is used to set queue size. Once the queue is full the Netty threads are blocked waiting for the queue to have free space (default 128).

Once the NTR queue is full, requests from all clients are no longer accepted. There is no strict ordering by which blocked Netty threads will process requests; therefore in 3.11.4 latency becomes effectively random once all Netty threads are blocked.

Native Transport Requests - Cassandra 3.11.4

Change After 3.11.5

In 3.11.5 and above, instead of blocking once the NTR queue is full as previously described, Cassandra throttles. The NTR queue is throttled based on the heap size: native transport requests are limited by the total size they occupy in memory rather than by their number. Requests are paused once the queue is full.

  • native_transport_max_concurrent_requests_in_bytes: a global limit on the total size of in-flight NTR requests, in bytes (default: heap size / 10)
  • native_transport_max_concurrent_requests_in_bytes_per_ip: a per-client-IP limit on the total size of in-flight NTR requests, in bytes (default: heap size / 40)

Maxed Queue Behaviour

From previously conducted performance testing of 3.11.4 and 3.11.6 we noticed similar behaviour as long as the traffic pressure had not yet reached the point of saturation in the NTR queue. In this section, we will discuss the expected behaviour when saturation does occur and the breaking point is reached. 

In 3.11.4, when the queue has been maxed out, client requests will be refused. For example, trying to make a connection via cqlsh will yield an error (see Figure 2).

Figure 2: Cassandra 3.11.4 with the queue maxed out; client request timed out

Or, on a client that tries to run a query, you may see a NoHostAvailableException.

Where a 3.11.4 cluster previously got blocked NTRs, after upgrading to 3.11.6 NTRs are no longer blocked. The reason is that 3.11.6 doesn’t place a limit on the number of NTRs but rather on the total memory they occupy. Thus, when the new size limit is reached, NTRs are paused. Default settings in 3.11.6 result in a much larger NTR queue in comparison to the small limit of 128 in 3.11.4 (in normal situations where the payload size is not extremely large).

Benchmarking Setup

This testing procedure requires the NTR queue on a cluster to be at max capacity, with enough load to start blocking requests at a constant rate. To achieve this, we used 12 active test boxes to create multiple client connections and stress the test cluster. Once the cluster NTR queue was in constant contention, we monitored the performance using:

  • Client metrics: requests per second, latency from client perspective
  • NTR Queue metrics: Active Tasks, Pending Tasks, Currently Blocked Tasks, and Paused Connections.

For testing purposes we used two testing clusters with details provided in the table below:

Cassandra | Cluster size | Instance Type | Cores | RAM | Disk
3.11.4 | 3 | m5xl-1600-v2 | 4 | 16 GB | 1600 GB
3.11.6 | 3 | m5xl-1600-v2 | 4 | 16 GB | 1600 GB
Table 1: Cluster Details

To simplify the setup we disabled encryption and authentication. Multiple test instances were set up in the same region as the clusters. For testing purposes we used 12 KB blob payloads. To give each cluster node a balanced mixed load, we kept the number of test boxes generating write load equal to the number of instances generating read load. We ran the load against the cluster for 10 mins to temporarily saturate the queue with read and write requests and cause contention for the Netty threads.

Our test script used cassandra-stress for generating the load; you can also refer to Deep Diving cassandra-stress – Part 3 (Using YAML Profiles) for more information.

In the stressSpec.yaml, we used the following table definition and queries:

table_definition: |
 CREATE TABLE typestest (
       name text,
       choice boolean,
       date timestamp,
       address inet,
       dbl double,
       lval bigint,
       ival int,
       uid timeuuid,
       value blob,
       PRIMARY KEY((name,choice), date, address, dbl, lval, ival, uid)
 ) WITH compaction = { 'class':'LeveledCompactionStrategy' }
   AND comment='A table of many types to test wide rows'
 
columnspec:
 - name: name
   size: fixed(48)
   population: uniform(1..1000000000) # the range of unique values to select for the field 
 - name: date
   cluster: uniform(20..1000)
 - name: lval
   population: gaussian(1..1000)
   cluster: uniform(1..4)
 - name: value
   size: fixed(12000)
 
insert:
 partitions: fixed(1)       # number of unique partitions to update in a single operation
                                 # if batchcount > 1, multiple batches will be used but all partitions will
                                 # occur in all batches (unless they finish early); only the row counts will vary
 batchtype: UNLOGGED               # type of batch to use
 select: uniform(1..10)/10       # uniform chance any single generated CQL row will be visited in a partition;
                                 # generated for each partition independently, each time we visit it
 
#
# List of queries to run against the schema
#
queries:
  simple1:
     cql: select * from typestest where name = ? and choice = ? LIMIT 1
     fields: samerow             # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
  range1:
     cql: select name, choice, uid  from typestest where name = ? and choice = ? and date >= ? LIMIT 10
     fields: multirow            # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
  simple2:
     cql: select name, choice, uid from typestest where name = ? and choice = ? LIMIT 1
     fields: samerow             # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)

Write loads were generated with:

cassandra-stress user no-warmup 'ops(insert=10)' profile=stressSpec.yaml cl=QUORUM duration=10m -mode native cql3 maxPending=32768 connectionsPerHost=40 -rate threads=2048 -node file=node_list.txt

Read loads were generated by changing ops to

'ops(simple1=10,range1=1)'
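
In other words, the full read invocation was the same command with only the ops argument swapped:

cassandra-stress user no-warmup 'ops(simple1=10,range1=1)' profile=stressSpec.yaml cl=QUORUM duration=10m -mode native cql3 maxPending=32768 connectionsPerHost=40 -rate threads=2048 -node file=node_list.txt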

Comparison

3.11.4 Queue Saturation Test

The active NTR queue reached max capacity (at 128) and remained in contention under load. Pending NTR tasks remained above 128 throughout. At this point, timeouts were occurring when running 12 load instances to stress the cluster. Each node had 2 load instances performing reads and another 2 performing writes. 4 of the read load instances constantly logged NoHostAvailableExceptions as shown in the example below.

ERROR 04:26:42,542 [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: ec2-18-211-4-255.compute-1.amazonaws.com/18.211.4.255:9042 (com.datastax.driver.core.exceptions.OperationTimedOutException: [ec2-18-211-4-255.compute-1.amazonaws.com/18.211.4.255] Timed out waiting for server response), ec2-35-170-231-79.compute-1.amazonaws.com/35.170.231.79:9042 (com.datastax.driver.core.exceptions.OperationTimedOutException: [ec2-35-170-231-79.compute-1.amazonaws.com/35.170.231.79] Timed out waiting for server response), ec2-35-168-69-19.compute-1.amazonaws.com/35.168.69.19:9042 (com.datastax.driver.core.exceptions.OperationTimedOutException: [ec2-35-168-69-19.compute-1.amazonaws.com/35.168.69.19] Timed out waiting for server response))

The client results we got from this stress run are shown in Table 2.

Box | Op rate (op/s) | Latency mean (ms) | Latency median (ms) | Latency 95th percentile (ms) | Latency 99th percentile (ms) | Latency 99.9th percentile (ms) | Latency max (ms)
1 | 700.00 | 2,862.20 | 2,078.30 | 7,977.60 | 11,291.10 | 19,495.10 | 34,426.80
2 | 651.00 | 3,054.50 | 2,319.50 | 8,048.90 | 11,525.90 | 19,528.70 | 32,950.50
3 | 620.00 | 3,200.90 | 2,426.40 | 8,409.60 | 12,599.70 | 20,367.50 | 34,158.40
4 | 607.00 | 3,312.80 | 2,621.40 | 8,304.70 | 11,769.20 | 19,730.00 | 31,977.40
5 | 568.00 | 3,529.80 | 3,011.50 | 8,216.60 | 11,618.20 | 19,260.20 | 32,698.80
6 | 553.00 | 3,627.10 | 3,028.30 | 8,631.90 | 12,918.50 | 20,115.90 | 34,292.60
Writes | 3,699.00 | 3,264.55 | 2,580.90 | 8,264.88 | 11,953.77 | 19,749.57 | 34,426.80
7 | 469.00 | 4,296.50 | 3,839.90 | 9,101.60 | 14,831.10 | 21,290.30 | 35,634.80
8 | 484.00 | 4,221.50 | 3,808.40 | 8,925.50 | 11,760.80 | 20,468.20 | 34,863.10
9 | Crashed due to time out
10 | Crashed due to time out
11 | Crashed due to time out
12 | Crashed due to time out
Reads | 953.00 | 4,259.00 | 3,824.15 | 9,092.80 | 14,800.40 | 21,289.48 | 35,634.80
Summary | 4,652.00 | 3,761.78 | 3,202.53 | 8,678.84 | 13,377.08 | 20,519.52 | 35,634.80
Table 2: 3.11.4 Mixed Load Saturating The NTR Queue

* To calculate the total write operations, we summed the values from the 6 write instances. For max write latency we used the max value from all instances, and for the rest of the latency values we calculated the average of the results. Write results are summarised in the Table 2 “Writes” row. For the read results we did the same, and they are recorded in the “Reads” row. The last row in the table summarises the “Writes” and “Reads” rows.

The 6 write load instances finished normally, but the read instances struggled. Only 2 of the read load instances were able to send traffic through normally; the other clients received too many timeout errors, causing them to crash. Another observation we made is that the Cassandra timeout metrics, under client-request-metrics, did not capture any of the client timeouts we observed.

Same Load on 3.11.6

Next, we proceeded to test 3.11.6 with the same load. Using the default NTR settings, all test instances were able to finish the stress test successfully.

Box | Op rate (op/s) | Latency mean (ms) | Latency median (ms) | Latency 95th percentile (ms) | Latency 99th percentile (ms) | Latency 99.9th percentile (ms) | Latency max (ms)
1 | 677.00 | 2,992.60 | 2,715.80 | 7,868.50 | 9,303.00 | 9,957.30 | 10,510.90
2 | 658.00 | 3,080.20 | 2,770.30 | 7,918.80 | 9,319.70 | 10,116.70 | 10,510.90
3 | 653.00 | 3,102.80 | 2,785.00 | 7,939.80 | 9,353.30 | 10,116.70 | 10,510.90
4 | 608.00 | 3,340.90 | 3,028.30 | 8,057.30 | 9,386.90 | 10,192.20 | 10,502.50
5 | 639.00 | 3,178.30 | 2,868.90 | 7,994.30 | 9,370.10 | 10,116.70 | 10,510.90
6 | 650.00 | 3,120.50 | 2,799.70 | 7,952.40 | 9,353.30 | 10,116.70 | 10,510.90
Writes | 3,885.00 | 3,135.88 | 2,828.00 | 7,955.18 | 9,347.72 | 10,102.72 | 10,510.90
7 | 755.00 | 2,677.70 | 2,468.30 | 7,923.00 | 9,378.50 | 9,982.40 | 10,762.60
8 | 640.00 | 3,160.70 | 2,812.30 | 8,132.80 | 9,529.50 | 10,418.70 | 11,031.00
9 | 592.00 | 3,427.60 | 3,101.70 | 8,262.80 | 9,579.80 | 10,452.20 | 11,005.90
10 | 583.00 | 3,483.00 | 3,160.40 | 8,279.60 | 9,579.80 | 10,435.40 | 11,022.60
11 | 582.00 | 3,503.60 | 3,181.40 | 8,287.90 | 9,588.20 | 10,469.00 | 11,047.80
12 | 582.00 | 3,506.70 | 3,181.40 | 8,279.60 | 9,588.20 | 10,460.60 | 11,014.20
Reads | 3,734.00 | 3,293.22 | 2,984.25 | 8,194.28 | 9,540.67 | 10,369.72 | 11,047.80
Summary | 7,619.00 | 3,214.55 | 2,906.13 | 8,074.73 | 9,444.19 | 10,236.22 | 11,047.80
Table 3: 3.11.6 Mixed Load

Default Native Transport Requests (NTR) Setting Comparison

Taking the summary row from both versions (Table 2 and Table 3), we produced Table 4.

Version | Op rate (op/s) | Latency mean (ms) | Latency median (ms) | Latency 95th percentile (ms) | Latency 99th percentile (ms) | Latency 99.9th percentile (ms) | Latency max (ms)
3.11.4 | 4652 | 3761.775 | 3202.525 | 8678.839167 | 13377.08183 | 20519.52228 | 35634.8
3.11.6 | 7619 | 3214.55 | 2906.125 | 8074.733333 | 9444.191667 | 10236.21667 | 11047.8
Table 4: Mixed Load 3.11.4 vs 3.11.6


Figure 2: Latency 3.11.4 vs 3.11.6

Figure 2 shows the latencies from Table 4. From the results, 3.11.6 had slightly better average latency than 3.11.4. Furthermore, in the worst case where contention is high, 3.11.6 handled the latency of a request better than 3.11.4. This is shown by the difference in Latency Max. Not only did 3.11.6 have lower latency but it was able to process many more requests due to not having a blocked queue.

3.11.6 Queue Saturation Test

The default native_transport_max_concurrent_requests_in_bytes is set to 1/10 of the heap size. The Cassandra max heap size of our cluster is 8 GB, so the default queue size for our queue is 0.8 GB. This turns out to be too large for this cluster size, as this configuration will run into CPU and other bottlenecks before we hit NTR saturation.

So we took the reverse approach to investigate full queue behaviour, which is setting the queue size to a lower number. In cassandra.yaml, we added:

native_transport_max_concurrent_requests_in_bytes: 1000000

This means we set the global queue size to be throttled at 1 MB. Once Cassandra was restarted and all nodes were online with the new settings, we ran the same mixed load on this cluster; the results we got are shown in Table 5.

3.11.6 | Op rate (op/s) | Latency mean (ms) | Latency median (ms) | Latency 95th percentile (ms) | Latency 99th percentile (ms) | Latency 99.9th percentile (ms) | Latency max (ms)
Write: Default setting | 3,885.00 | 3,135.88 | 2,828.00 | 7,955.18 | 9,347.72 | 10,102.72 | 10,510.90
Write: 1MB setting | 2,105.00 | 5,749.13 | 3,471.82 | 16,924.02 | 26,172.45 | 29,681.68 | 31,105.00
Read: Default setting | 3,734.00 | 3,293.22 | 2,984.25 | 8,194.28 | 9,540.67 | 10,369.72 | 11,047.80
Read: 1MB setting | 5,395.00 | 2,263.13 | 1,864.55 | 5,176.47 | 8,074.73 | 9,693.03 | 15,183.40
Summary: Default setting | 7,619.00 | 3,214.55 | 2,906.13 | 8,074.73 | 9,444.19 | 10,236.22 | 11,047.80
Summary: 1MB setting | 7,500.00 | 4,006.13 | 2,668.18 | 11,050.24 | 17,123.59 | 19,687.36 | 31,105.00

Table 5: 3.11.6 native_transport_max_concurrent_requests_in_bytes default and 1MB setting

During the test, we observed a lot of paused connections and discarded requests—see Figure 3. For a full list of Instaclustr exposed metrics see our support documentation.

Figure 3: 3.11.6 Paused Connections and Discarded Requests

After setting native_transport_max_concurrent_requests_in_bytes to a lower number, we started to see paused connections and discarded requests; write latency increased, resulting in fewer processed write operations, as shown in Table 5. The increased write latency is illustrated in Figure 4.

Figure 4: 3.11.6 Write Latency Under Different Settings

On the other hand, read latency decreased, see Figure 5, resulting in a higher number of operations being processed.

Figure 5: 3.11.6 Read Latency Under Different Settings
Figure 6: 3.11.6 Operations Rate Under Different Settings

As illustrated in Figure 6, the total number of operations decreased slightly with the 1MB setting, but the difference is very small and the effects on reads and writes almost “cancel each other out”. However, when we look at each type of operation individually, we can see that rather than getting an equal share of the channel under the default setting of an “almost unlimited queue”, the lower queue size penalizes writes and favors reads. While our testing identified this outcome, further investigation will be required to determine exactly why this is the case.

Conclusion

In conclusion, the new NTR change offers an improvement over the previous NTR queue behaviour. Through our performance testing we found the change to be good for clusters that have a constant load causing the NTR queue to block. Under the new mechanism the queue no longer blocks, but throttles the load based on the amount of memory allocated to requests.

The results from testing indicated that the changed queue behaviour reduced latency and provided a significant lift in the number of operations without clients timing out. Clusters with our latest version of Cassandra can handle more load before hitting hard limits. For more information feel free to comment below or reach out to our Support team to learn more about changes to 3.11.6 or any of our other supported Cassandra versions.

The post Understanding the Impacts of the Native Transport Requests Change Introduced in Cassandra 3.11.5 appeared first on Instaclustr.

Apache Cassandra Usage Report 2020

Apache Cassandra is the open source NoSQL database for mission critical data. Today the community announced findings from a comprehensive global survey of 901 practitioners on Cassandra usage. It’s the first of what will become an annual survey that provides a baseline understanding of who, how, and why organizations use Cassandra.

“I saw zero downtime at global scale with Apache Cassandra. That’s a powerful statement to make. For our business that’s quite crucial.” - Practitioner, London

Key Themes

Cassandra adoption is correlated with organizations in a more advanced stage of digital transformation.

People from organizations that self-identified as being in a “highly advanced” stage of digital transformation were more likely to be using Cassandra (26%) compared with those in an “advanced” stage (10%) or “in process” (5%).

Optionality, security, and scalability are among the key reasons Cassandra is selected by practitioners.

The top reasons practitioners use Cassandra for mission critical apps are “good hybrid solutions” (62%), “very secure” (60%), “highly scalable” (57%), “fast” (57%), and “easy to build apps with” (55%).

A lack of skilled staff and the challenge of migration deters adoption of Cassandra.

Thirty-six percent of practitioners currently using Cassandra for mission critical apps say that a lack of Cassandra-skilled team members may deter adoption. When asked what it would take for practitioners to use Cassandra for more applications and features in production, they said “easier to migrate” and “easier to integrate.”

Methodology

Sample. The survey consisted of 1,404 interviews of IT professionals and executives, including 901 practitioners which is the focus of this usage report, from April 13-23, 2020. Respondents came from 13 geographies (China, India, Japan, South Korea, Germany, United Kingdom, France, the Netherlands, Ireland, Brazil, Mexico, Argentina, and the U.S.) and the survey was offered in seven languages corresponding to those geographies. While margin of sampling error cannot technically be calculated for online panel populations where the relationship between sample and universe is unknown, the margin of sampling error for equivalent representative samples would be +/- 2.6% for the total sample, +/- 3.3% for the practitioner sample, and +/- 4.4% for the executive sample.

To ensure the highest quality respondents, surveys include enhanced screening beyond title and activities of company size (no companies under 100 employees), cloud IT knowledge, and years of IT experience.

Rounding and multi-response. Figures may not add to 100 due to rounding or multi-response questions.

Demographics

Practitioner respondents represent a variety of roles as follows: Dev/DevOps (52%), Ops/Architect (29%), Data Scientists and Engineers (11%), and Database Administrators (8%) in the Americas (43%), Europe (32%), and Asia Pacific (12%).

Cassandra roles

Respondents include both enterprise (65% from companies with 1k+ employees) and SMEs (35% from companies with at least 100 employees). Industries include IT (45%), financial services (11%), manufacturing (8%), health care (4%), retail (3%), government (5%), education (4%), telco (3%), and 17% were listed as “other.”

Cassandra companies

Cassandra Adoption

Twenty-two percent of practitioners are currently using or evaluating Cassandra with an additional 11% planning to use it in the next 12 months.

Of those currently using Cassandra, 89% are using open source Cassandra, including both self-managed (72%) and third-party managed (48%).

Practitioners using Cassandra today are more likely to use it for more projects tomorrow. Overall, 15% of practitioners say they are extremely likely (10 on a 10-pt scale) to use it for their next project. Of those, 71% are currently using or have used it before.

Cassandra adoption

Cassandra Usage

People from organizations that self-identified as being in a “highly advanced” stage of digital transformation were more likely to be using Cassandra (26%) compared with those in an “advanced” stage (10%) or “in process” (5%).

Cassandra predominates in very important or mission critical apps. Among practitioners, 31% use Cassandra for their mission critical applications, 55% for their very important applications, 38% for their somewhat important applications, and 20% for their least important applications.

“We’re scheduling 100s of millions of messages to be sent. Per day. If it’s two weeks, we’re talking about a couple billion. So for this, we use Cassandra.” - Practitioner, Amsterdam

Cassandra usage

Why Cassandra?

The top reasons practitioners use Cassandra for mission critical apps are “good hybrid solutions” (62%), “very secure” (60%), “highly scalable” (57%), “fast” (57%), and “easy to build apps with” (55%).

“High traffic, high data environments where really you’re just looking for very simplistic key value persistence of your data. It’s going to be a great fit for you, I can promise that.” - Global SVP Engineering

Top reasons practitioners use Cassandra

For companies in a highly advanced stage of digital transformation, 58% cite “won’t lose data” as the top reason, followed by “gives me confidence” (56%), “cloud native” (56%), and “very secure” (56%).

“It can’t lose anything, it has to be able to capture everything. It can’t have any security defects. It needs to be somewhat compatible with the environment. If we adopt a new database, it can’t be a duplicate of the data we already have.… So: Cassandra.” - Practitioner, San Francisco

However, 36% of practitioners currently using Cassandra for mission critical apps say that a lack of Cassandra-skilled team members may deter adoption.

“We don’t have time to train a ton of developers, so that time to deploy, time to onboard, that’s really key. All the other stuff, scalability, that all sounds fine.” – Practitioner, London

When asked what it would take for practitioners to use Cassandra for more applications and features in production, they said “easier to migrate” and “easier to integrate.”

“If I can get started and be productive in 30 minutes, it’s a no brainer.” - Practitioner, London

Conclusion

We invite anyone who is curious about Cassandra to test the 4.0 beta release. There will be no new features or breaking API changes in future Beta or GA builds, so you can expect the time you put into the beta to translate into transitioning your production workloads to 4.0.

We also invite you to participate in a short survey about Kubernetes and Cassandra that is open through September 24, 2020. Details will be shared with the Cassandra Kubernetes SIG after it closes.

Survey Credits

A volunteer from the community helped analyze the report, which was conducted by ClearPath Strategies, a strategic consulting and research firm, and donated to the community by DataStax. It is available for use under Creative Commons Attribution-ShareAlike 4.0 International (CC BY-SA 4.0).

Improving Apache Cassandra’s Front Door and Backpressure

As part of CASSANDRA-15013, we have improved Cassandra’s ability to handle high throughput workloads, while having enough safeguards in place to protect itself from potentially going out of memory. In order to better explain the change we have made, let us understand at a high level, on how an incoming request is processed by Cassandra before the fix, followed by what we changed, and the new relevant configuration knobs available.

How inbound requests were handled before

Let us take the scenario of a client application sending requests to a C* cluster. For the purpose of this blog, let us focus on one of the C* coordinator nodes.

Below is the microscopic view of the client-server interaction at the C* coordinator node. Each client connection to a Cassandra node happens over a Netty channel, and for efficiency purposes, each Netty eventloop thread is responsible for more than one Netty channel.

The eventloop threads read requests coming off of netty channels and enqueue them into a bounded inbound queue in the Cassandra node.

A thread pool dequeues requests from the inbound queue, processes them asynchronously and enqueues the response into an outbound queue. There exist multiple outbound queues, one for each eventloop thread to avoid races.

The same eventloop threads that are responsible for enqueuing incoming requests into the inbound queue, are also responsible for dequeuing responses off from the outbound queue and shipping responses back to the client.

Issue with this workflow

Let us take a scenario where there is a spike in operations from the client. The eventloop threads are now enqueuing requests at a much higher rate than the rate at which the requests are being processed by the native transport thread pool. Eventually, the inbound queue reaches its limit and cannot store any more requests.

Consequently, the eventloop threads get into a blocked state as they try to enqueue more requests into an already full inbound queue. They wait until they can successfully enqueue the request in hand, into the queue.

As noted earlier, these blocked eventloop threads are also supposed to dequeue responses from the outbound queue. Given they are in a blocked state, the outbound queue (which is unbounded) grows endlessly with all the responses, eventually resulting in C* going out of memory. This is a vicious cycle: since the eventloop threads are blocked, there is no one to ship responses back to the client; eventually client-side timeouts trigger, and clients may send more requests due to retries. This is an unfortunate situation to be in, since Cassandra is doing all the work of processing these requests as fast as it can, but there is no one to ship the produced responses back to the client.

So far, we have built a fair understanding of how the front door of C* works with regard to handling client requests, and how blocked eventloop threads can affect Cassandra.

What we changed

Backpressure

The essential root cause of the issue is that eventloop threads are getting blocked. Let us not block them by making the bounded inbound queue unbounded. If we are not careful here though, we could have an out of memory situation, this time because of the unbounded inbound queue. So we defined an overloaded state for the node based on the memory usage of the inbound queue.

We introduced two levels of thresholds, one at the node level, and the other more granular, at client IP. The one at client IP helps to isolate rogue client IPs, while not affecting other good clients, if there is such a situation.

These thresholds can be set using the cassandra.yaml file.

native_transport_max_concurrent_requests_in_bytes_per_ip
native_transport_max_concurrent_requests_in_bytes
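
For instance, to set both limits explicitly rather than relying on the heap-based defaults, cassandra.yaml could contain something like this (the values are purely illustrative, not recommendations):

native_transport_max_concurrent_requests_in_bytes: 104857600        # ~100 MB global cap
native_transport_max_concurrent_requests_in_bytes_per_ip: 26214400  # ~25 MB per client IP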

These thresholds can be further changed at runtime (CASSANDRA-15519).

Configurable server response to the client as part of backpressure

If C* happens to be in overloaded state (as defined by the thresholds mentioned above), C* can react in one of the following ways:

  • Apply backpressure by setting “Autoread” to false on the netty channel in question (default behavior).
  • Respond back to the client with an OverloadedException (if the client sets the “THROW_ON_OVERLOAD” connection startup option to “true”).

Let us look at the client request-response workflow again, in both these cases.

THROW_ON_OVERLOAD = false (default)

If the inbound queue is full (i.e. the thresholds are met):

C* sets autoread to false on the netty channel, which means it will stop reading bytes off of the netty channel.

Consequently, the kernel socket inbound buffer becomes full, since no bytes are being read off of it by the Netty eventloop.

Once the Kernel Socket Inbound Buffer is full on the server side, things start getting piled up in the Kernel Socket Outbound Buffer on the client side, and once this buffer gets full, client will start experiencing backpressure.

THROW_ON_OVERLOAD = true

If the inbound queue is full (i.e. the thresholds are met), eventloop threads do not enqueue the request into the Inbound Queue. Instead, the eventloop thread creates an OverloadedException response message and enqueues it into the flusher queue, which will then be shipped back to the client.

This way, Cassandra is able to serve very large throughput, while protecting itself from getting into memory starvation issues. This patch has been vetted through thorough performance benchmarking. Detailed performance analysis can be found here.

Apache Cassandra vs DynamoDB

In this post, we’ll look at some of the key differences between Apache Cassandra (hereafter just Cassandra) and DynamoDB.

Both are distributed databases with similar architectures, and both offer incredible scalability, reliability, and resilience. However, there are also differences, and understanding those differences and the cost benefits can help you determine the right solution for your application.

Apache Cassandra is an open source database available at no cost from the Apache Foundation. Installing and configuring Cassandra can be challenging and there is more than one pitfall along the way. However, Cassandra can be installed on any cloud service or at a physical location you choose.

The typical Cassandra installation is a cluster which is a collection of nodes (a node is a single instance of Cassandra installed on a computer or in a Docker container). Nodes can then be grouped in racks and data centers which can be in different locations (cloud zones and regions or physical collocations). You must scale Cassandra as your demand grows and are responsible for the ongoing management tasks such as backups, replacing bad nodes, or adding new nodes to meet demand.

Amazon DynamoDB is a fully managed database as a service. All implementation details are hidden; from the user's viewpoint DynamoDB is serverless. DynamoDB automatically scales throughput capacity to meet workload demands, partitions and repartitions your data as your table size grows, and distributes data across multiple availability zones. However, the service is available only through Amazon Web Services (AWS).

Replica Configuration and Placement

NoSQL data stores like Cassandra and DynamoDB use multiple replicas (copies of data) to ensure high availability and durability. The number of replicas and their placement determines the availability of your data.

With Cassandra, the number of replicas to have per cluster—the replication factor—and their placement is configurable. A cluster can be subdivided into two or more data centers which can be located in different cloud regions or physical collocations. The nodes in a data center can be assigned to different racks that can be assigned to different zones or to different physical racks.
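
For illustration, replica count and placement in Cassandra are configured per keyspace in CQL. A minimal sketch, assuming a hypothetical keyspace name and data center names that match your cluster topology:

-- 3 replicas in each of two hypothetical data centers
CREATE KEYSPACE IF NOT EXISTS analytics
WITH replication = {'class': 'NetworkTopologyStrategy', 'us_east': 3, 'eu_west': 3};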

In contrast, with DynamoDB, Amazon makes these decisions for you. By default, data is located in a single region and is replicated to three (3) availability zones in that region. Replication to different AWS regions is available as an option; DynamoDB Streams must be enabled for multi-region replication.

Data Model

The top level data structure in Cassandra is the keyspace which is analogous to a relational database. The keyspace is the container for the tables and it is where you configure the replica count and placement. Keyspaces contain tables (formerly called column families) composed of rows and columns. A table schema must be defined at the time of table creation.

The top level structure for DynamoDB is the table which has the same functionality as the Cassandra table. Rows are items, and cells are attributes. In DynamoDB, it’s possible to define a schema for each item, rather than for the whole table.

Both tables store data in sparse rows: for a given row, they store only the columns present in that row. Each table must have a primary key that uniquely identifies its rows or items. The primary key has two components: 

  • A partition key that determines the placement of the data by subsetting the table rows into partitions. This key is required.
  • A key that sorts the rows within a partition. In Cassandra, this is called the clustering key while DynamoDB calls it the sort key. This key is optional.

Taken together, the primary key ensures that each row in a table is unique. 
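
As a sketch of how those two components look on the Cassandra side (the table and columns here are hypothetical), the partition key goes in the inner parentheses and any clustering columns follow:

CREATE TABLE IF NOT EXISTS music_library (
artist text,
song_title text,
album text,
PRIMARY KEY ((artist), song_title)   -- artist: partition key, song_title: clustering (sort) key
);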

Differences

  • DynamoDB limits the number of tables in an AWS region to 256.  If you need more tables, you must contact AWS support. There are no hard limits in Cassandra. The practical limit is around 500 tables.
  • DynamoDB is schemaless. Only the primary key attributes need to be defined at table creation. 
  • DynamoDB charges for read and write throughput and requires you to manage capacity for each table. Read and write throughput and associated costs must be considered when designing tables and applications. 
  • The maximum size of an item in DynamoDB is 400KB. With Cassandra, the hard limit is 2GB; the practical limit is a few megabytes.
  • In DynamoDB, the primary key can have only one attribute as the partition key and one attribute as the sort key. Cassandra allows composite partition keys and multiple clustering columns.
  • Cassandra supports counter, time, timestamp, uuid, and timeuuid data types not found in DynamoDB.

Allocating Table Capacity

Both Cassandra and DynamoDB require capacity planning before setting up a cluster. However, the approaches are different. 

To create a performant Cassandra cluster, you must first make reasonably accurate estimates of your future workloads. Capacity is allocated by creating a good data model, choosing the right hardware, and properly sizing the cluster. Increasing workloads are met by adding nodes.

With DynamoDB, capacity planning is determined by the read/write capacity mode you choose. On-demand capacity requires no capacity planning other than setting an upper limit on each table. You pay only for the read and write requests on the table; capacity is measured in read request units and write request units. On-demand mode is best when you have an unknown workload, unpredictable application traffic, or you prefer the ease of paying for only what you use.

With provisioned capacity, you must specify read and write throughput limits for each table at creation time. If you exceed these limits for a table, DynamoDB will throttle queries until usage drops below the defined capacity. Auto scaling can adjust a table's provisioned capacity automatically in response to traffic changes, although there is a lag between the time throttling starts and the time increased capacity is applied.

The throughput limits are provisioned in units called Read Capacity Units (RCU) and Write Capacity Units (WCU); queries are throttled whenever these limits are exceeded. One read capacity unit represents one strongly consistent read per second, or two eventually consistent reads per second, for an item up to 4 KB in size. Transactional read requests require two read capacity units to perform one read per second for items up to 4 KB. If you need to read an item that is larger than 4 KB, DynamoDB must consume additional read capacity units. One write capacity unit represents one write per second for an item up to 1 KB in size. If you need to write an item that is larger than 1 KB, DynamoDB must consume additional write capacity units. Transactional write requests require 2 write capacity units to perform one write per second for items up to 1 KB. For example, a strongly consistent read of a 7 KB item consumes 2 RCUs (7 KB rounded up to two 4 KB units), while an eventually consistent read of the same item consumes 1 RCU. For more information, see Managing Settings on DynamoDB Provisioned Capacity Tables.

Provisioned mode is a good option if any of the following are true: 

  • You have predictable application traffic.
  • Application traffic is consistent or ramps gradually.
  • You can forecast capacity requirements to control costs.

Partitions

Both Cassandra and DynamoDB group and distribute data based on the hashed value of the partition key. Both call these groupings partitions, but the two systems define them very differently.

In DynamoDB, a partition is a storage unit with a maximum size of 10 GB. When a partition fills, DynamoDB creates a new partition and the user has no control over the process. A partition can contain items with different partition key values. When a sort key is used, all the items with the same partition key value are stored physically close together, ordered by sort key value.

DynamoDB partitions have capacity limits of 3,000 RCU or 1,000 WCU even for on-demand tables. Furthermore, these limits cannot be increased. If you exceed the partition limits, your queries will be throttled even if you have not exceeded the capacity of the table. See Throttling and Hot Keys (below) for more information.

A Cassandra partition is a set of rows that share the same hashed partition key value.  Rows with the same partition key are stored on the same node. Rows within the partition are sorted by the clustering columns. If no clustering column was specified, the partition holds a single row. While it would not be desirable, it would be possible for an application to drive tens of thousands of reads/writes to a single partition. 

See Cassandra Data Partitioning.

Query Language 

Cassandra provides a SQL-like language called Cassandra Query Language (CQL) to access data. DynamoDB uses a JSON-based syntax. The following comparison shows the syntax for the query “return all information from the Music table for the song title ‘Lullaby of Broadway’ and the artist ‘Tommy Dorsey’”.

Request all information for the song ‘Lullaby of Broadway’ played by Tommy Dorsey:

CQL

SELECT *
FROM Music
WHERE Artist = 'Tommy Dorsey' AND SongTitle = 'Lullaby of Broadway';

DynamoDB

get-item {
    TableName: "Music",
    Key: {
        "Artist": "Tommy Dorsey",
        "SongTitle": "Lullaby of Broadway"
    }
}


Secondary Indexes

By default, Cassandra and DynamoDB queries can use only the primary key columns in the search condition which must include all partition key columns. Non-key columns can be used in a search by creating an index on that column.

Cassandra supports creating an index on most columns, including a clustering column of a compound primary key or the partition key itself. Creating an index on a collection or on the keys of a collection map is also supported. However, when used incorrectly a secondary index can hurt performance. A general rule of thumb is to index a column with low cardinality (a small number of distinct values) and to use the index only together with the partition key in the search clause. Because the index table is stored on each node in a cluster, a query using a secondary index can degrade performance if multiple nodes are accessed.
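
As a sketch of the CQL involved (the users table, column names, and values below are hypothetical):

CREATE TABLE IF NOT EXISTS users (
userid uuid,
country text,
name text,
PRIMARY KEY ((userid))
);

-- Index a low-cardinality, non-key column
CREATE INDEX IF NOT EXISTS users_country_idx ON users (country);

-- Query by the indexed column (best combined with a partition key restriction where possible)
SELECT name FROM users WHERE country = 'FR';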

DynamoDB has local secondary indexes. A local secondary index uses the same partition key as the base table but a different sort key; it is scoped to the base table partition that has the same partition key value. Local secondary indexes must be created at the same time the table is created. A maximum of 5 local secondary indexes may be created per table.

Materialized Views versus Global Secondary Indexes 

In Cassandra, a Materialized View (MV) is a table built from the results of a query on another table, but with a new primary key and new properties. Queries are optimized by the primary key definition. The purpose of a materialized view is to support additional query patterns on a single table's data. It is an alternative to the standard practice of creating a new table with the same data whenever a different query is needed. Data in the materialized view is updated automatically by changes to the source table. However, materialized views are an experimental feature and should not be used in production.
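
A sketch of the CQL involved, with a hypothetical base table of accounts keyed by id that we also want to query by email:

CREATE TABLE IF NOT EXISTS accounts (
accountid uuid,
email text,
name text,
PRIMARY KEY ((accountid))
);

-- Same data, new primary key; Cassandra keeps the view in sync with the base table
CREATE MATERIALIZED VIEW IF NOT EXISTS accounts_by_email AS
SELECT accountid, email, name
FROM accounts
WHERE email IS NOT NULL AND accountid IS NOT NULL
PRIMARY KEY ((email), accountid);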

A similar object in DynamoDB is the Global Secondary Index (GSI), which creates an eventually consistent replica of a table with a different partition key. GSIs can be created at table creation time or added later, and each table has a default limit of 20. A GSI must be provisioned for reads and writes, and there are storage costs as well.

Time To Live

Time To Live (TTL) is a feature that automatically removes items from your table after a period of time has elapsed.

  • Cassandra specifies TTL as the number of seconds from the time of creating or updating a row, after which the row expires (see the CQL sketch after this list).
  • In DynamoDB, TTL is a timestamp value representing the date and time at which the item expires.
  • DynamoDB applies TTL at the item level; Cassandra applies it at the column level.
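
For illustration on the Cassandra side, TTL is applied per write in CQL; the table and values below are hypothetical:

CREATE TABLE IF NOT EXISTS sessions (
sessionid uuid,
userid int,
PRIMARY KEY ((sessionid))
);

-- Expire this row 86400 seconds (24 hours) after the write
INSERT INTO sessions (sessionid, userid)
VALUES (123e4567-e89b-12d3-a456-426614174000, 42)
USING TTL 86400;

-- Check the remaining time to live of a column
SELECT TTL(userid) FROM sessions WHERE sessionid = 123e4567-e89b-12d3-a456-426614174000;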

Consistency

Both Cassandra and DynamoDB are distributed data stores. In a distributed system there is a tradeoff between consistency (every read receives the most recent write or an error) and availability (every request receives a non-error response, but without the guarantee that it contains the most recent write). In such a system there are two possible levels of consistency:

  • Eventual consistency. This implies that all updates reach all replicas eventually. A read with eventual consistency may return stale data until all replicas are reconciled to a consistent state.
  • Strong consistency returns up-to-date data for all prior successful writes but at the cost of slower response time and decreased availability.

DynamoDB supports eventually consistent and strongly consistent reads on a per query basis. The default is eventual consistency. How it is done is hidden from the user.

 Strongly consistent reads in DynamoDB have the following issues: 

  • The data might not be available if there is a network delay or outage.
  • The operation may have higher latency.
  • Strongly consistent reads are not supported on global secondary indexes.
  • Strongly consistent reads use more throughput capacity than eventually consistent reads and are therefore more expensive. See Throttling and Hot Keys (below).

Cassandra offers tunable consistency for any given read or write operation that is configurable by adjusting consistency levels. The consistency level is defined as the minimum number of Cassandra nodes that must acknowledge a read or write operation before the operation can be considered successful. You are able to configure strong consistency for both reads and writes at a tradeoff of increasing latency.
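
As a quick sketch, in cqlsh the consistency level can be set per session (drivers let you set it per statement); the query below reuses the hypothetical users table from the secondary index sketch above:

-- Require a quorum of replicas to acknowledge subsequent reads and writes
CONSISTENCY QUORUM;

SELECT name FROM users WHERE userid = 123e4567-e89b-12d3-a456-426614174000;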

Conflicts Resolution

In a distributed system, there is the possibility that a query may return inconsistent data from the replicas. Both Cassandra and DynamoDB resolve such inconsistencies with a “last write wins” approach. In Cassandra, every piece of data written to the cluster carries a timestamp; when Cassandra has to deal with conflicting data, it simply chooses the data with the most recent timestamp.

For DynamoDB, “last write wins” applies only to global tables and strongly consistent reads.

Security Features

Cassandra and DynamoDB both provide methods for user authentication and authorization and data access permissions. Both use encryption for client and inter-node communication. DynamoDB also offers encryption at rest. Instaclustr offers encryption at rest for its Cassandra Managed Services.

Performance Issues

Consistency and Read Speed

Choosing strong consistency requires more nodes to respond to a request which increases latency.

Scans

Scans are expensive for both systems. Scans in Cassandra are slow because Cassandra has to scan all nodes in the cluster. Scans are faster in DynamoDB but are expensive because resource use is based on the amount of data read not returned to the client. If the scan exceeds your provisioned read capacity, DynamoDB will generate errors.

DynamoDB’s Issues

Auto Scaling

Amazon DynamoDB auto scaling uses the AWS Application Auto Scaling Service to dynamically adjust provisioned throughput capacity on your behalf, in response to actual traffic patterns. This enables a table or a global secondary index to increase its provisioned read and write capacity to handle sudden increases in traffic, without throttling. When the workload decreases, application auto scaling decreases the throughput so that you don’t pay for unused provisioned capacity.

  • Auto scaling does not work well with highly variable, bursty workloads. The table scales up only based on consumption, triggering CloudWatch alarms repeatedly until it reaches the desired level.
  • There can be a lag (5-15 minutes) between the time capacity is exceeded and autoscaling takes effect.
  • Capacity decreases are limited. The maximum is 27 per day (A day is defined according to the GMT time zone).
  • Tables scale based on the number of requests made, not the number of successful requests.
  • Autoscaling cannot exceed hard I/O limits for tables.

Throttling and Hot Keys

In DynamoDB, the provisioned capacity of a table is shared evenly across all the partitions in the table, with the consequence that each partition has a capacity smaller than the table capacity. For example, if a table provisioned with 400 WCU and 100 RCU has 4 partitions, each partition gets a write capacity of 100 WCU and a read capacity of 25 RCU.

So if you had a query send most read/write requests to a single partition—a hot partition— and that throughput exceeded the shared capacity of that partition, your queries would be throttled even though you had unused capacity in the table. If your application creates a hot partition, your only recourse would be either to redesign the data model or to over allocate capacity to the table.

Adaptive capacity can provide up to 5 minutes of grace time by allocating unused capacity from other partitions to the “hot” one provided unused capacity is available and hard limits are not reached. The hard limits on a partition are 3,000 RCU or 1,000 WCU.

Cross-Region Replication

If you want a DynamoDB table to span regions, you’ll need to use global tables, which require careful planning and implementation. The tables in all regions must use on-demand allocation or have auto scaling enabled. The tables in each region must have the same settings: time to live, the number of global and local secondary indexes, and so on.

Global tables support only eventual consistency, which can lead to conflicting writes. In such a conflict, DynamoDB applies a last writer wins (lww) policy: the data with the most recent timestamp is used.

Migrating an existing local table to a global table requires additional steps. First, DynamoDB Streams is enabled on the original local table to create a changelog. Then an AWS Lambda function is configured to copy the corresponding change from the original table to the new global table.

Cost Explosion 

As highlighted in Managed Cassandra Versus DynamoDB, DynamoDB’s pricing model can easily make it expensive for a fast growing company:

  • The pricing model is based on throughput, therefore costs increase as IO increases. 
  • A hot partition may require you to overprovision a table.
  • Small reads and writes are penalized because measured throughput is rounded up to the nearest 1 KB boundary for writes and 4 KB boundary for reads.
  • Writes are four times more expensive than reads.
  • Strongly consistent reads are twice the cost of eventually consistent reads. 
  • Workloads performing scans or queries can be costly because the read capacity units are calculated on the number of bytes read rather than the amount of data returned.
  • Read heavy workloads may require DynamoDB Accelerator (DAX) which is priced per node-hour consumed. A partial node-hour consumed is billed as a full hour.
  • Cross region replication incurs additional charges for the amount of data replicated by the Global Secondary Index and the data read from DynamoDB Streams.
  • DynamoDB does not distinguish between customer-facing production tables and tables used for development, testing, and staging.

Summary 

DynamoDB’s advantages are an easy start; absence of the database management burden; sufficient flexibility, availability, and auto scaling; in-built metrics for monitoring; encryption of data at rest.

Cassandra’s main advantages are: fast speed of writes and reads; constant availability; a SQL-like Cassandra Query Language instead of DynamoDB’s complex API; reliable cross data center replication; linear scalability and high performance. 

However, the mere technical details of the two databases shouldn’t be the only aspect to analyze before making a choice.

Cassandra versus DynamoDB costs must be compared carefully. If you have an application that mostly writes or one that relies on strong read consistency, Cassandra may be a better choice.

You will also need to consider which technologies you need to supplement the database. If you need open source tools like Elasticsearch, Apache Spark, or Apache Kafka, Cassandra is your choice. If you plan to make extensive use of AWS products, then it’s DynamoDB.

If you use a cloud provider other than AWS or run your own data centers, then you would need to choose Cassandra.

The post Apache Cassandra vs DynamoDB appeared first on Instaclustr.

Python scylla-driver: how we unleashed the Scylla monster’s performance

At Scylla Summit 2019 I had the chance to meet Israel Fruchter and we dreamed of adding shard-awareness support to the Python cassandra-driver, which would become known as the scylla-driver.

A few months later, when Israel reached out to me on the Scylla-Users #pythonistas Slack channel with the first draft PR, I was so excited that I jumped on board to help!

The efforts we put into the scylla-driver paid off: it improved the performance of the production applications I switched over to it by 15 to 25%!

Before we reached those numbers and even released the scylla-driver to PyPI, the EuroPython 2020 call for proposals was open and I submitted a talk proposal which was luckily accepted by the community.

So I had the chance to deep-dive into Cassandra vs Scylla architecture differences, explain the rationale behind creating the scylla-driver, and give Python code details on how we implemented it and the challenges we faced doing so. Check my talk page for the details.

I also explained that I wrote an RFC on the scylla-dev mailing list which led the developers of Scylla to implement a new connection-to-shard algorithm that will allow clients connecting to a new listening port to select the actual shard they want a connection to.

This is an expected major optimization from the current mostly random way of connecting to all shards and I’m happy to say that it’s been implemented and is ready to be put to use by all the scylla drivers.

I’ve recently been contacted by the organizers of PyCon India and other Python-related conferences for a talk, so I’ve submitted one to PyCon India where I hope I’ll be able to showcase even better numbers thanks to the new algorithm!

After my Europython talk we also had very interesting discussions with Pau Freixes about his work on a fully asynchronous Python driver that wraps the C++ driver to get the best possible performance. First results are absolutely amazing so if you’re interested in this, make sure to give it a try and contribute to the driver!

Stay tuned for more amazing query latencies 😉

Benchmarking Cassandra with Local Storage on Azure

Continuing our efforts in adding support for the latest generation of Azure Virtual Machines, Instaclustr is pleased to announce support for the D15v2 and L8v2 types with local storage support for Apache Cassandra clusters on the Instaclustr Managed Platform. So let’s begin by introducing the two forms of storage.

Local storage is a non-persistent (i.e. volatile) storage that is internal to the physical Hyper-V host that a VM is running on at any given time. As soon as the VM is moved to another physical host as a result of a deallocate (stop) and start command, hardware or VM crash, or Azure maintenance, the local storage is wiped clean and any data stored on it is lost.

Remote storage is persistent and comes in the form of managed (or unmanaged) disks. These disks can have different performance and redundancy characteristics. Remote storage is generally used for a VM’s OS disk or for data disks that are intended to permanently store important data. Because it lives in an Azure storage account and is attached to the VM over the network, a remote disk stays intact when the VM moves from one physical host to another, so the data is preserved. The physical Hyper-V host running the VM is independent of the remote storage account, which means any Hyper-V host or VM can mount a remotely stored disk again and again.

Remote storage has the critical advantage of being persistent, while local storage has the advantage of being faster (because it’s local to the VM – on the same physical host) and included in the VM cost. When running an Azure VM you pay separately for its OS and data disks, whereas local storage is included in the price of the VM and often provides a much cheaper storage alternative to remote storage.

The Azure Node Types

D15_v2s

D15_v2s are the latest addition to Azure’s Dv2 series, featuring more powerful CPU and optimal CPU-to-memory configuration making them suitable for most production workloads. The Dv2-series is about 35% faster than the D-series. D15_v2s are a memory optimized VM size offering a high memory-to-CPU ratio that is great for distributed database servers like Cassandra, medium to large caches, and in-memory data stores such as Redis.

Instaclustr customers can now leverage these benefits with the release of the D15_v2 VMs, which provide 20 virtual CPU cores and 140 GB RAM backed with 1000 GB of local storage.

L8_v2s

L8_v2s are part of the Lsv2-series, featuring high throughput, low latency, and directly mapped local NVMe storage. L8s, like the rest of the series, were designed to cater to high throughput and high IOPS workloads including big data applications, SQL and NoSQL databases, data warehousing, and large transactional databases. Examples include Cassandra, Elasticsearch, and Redis. In general, applications that can benefit from large in-memory databases are a good fit for these VMs.

L8s offer 8 virtual CPU cores and 64 GB in RAM backed by local storage space of 1388 GB. L8s provide roughly equivalent compute capacity (cores and memory) to a D13v2 at about ¾ of the price. This offers speed and efficiency at the cost of forgoing persistent storage.

Benchmarking

Prior to the public release of these new instances on Instaclustr, we conducted Cassandra benchmarking for:

VM Type | CPU Cores | RAM | Storage Type | Disk
DS13v2 | 8 | 56 GB | remote | 2046 GiB (SSD)
D15v2 | 20 | 140 GB | local | 1000 GiB (SSD)
L8s w/local | 8 | 64 GB | local | 1388 GiB (SSD)
L8s w/remote | 8 | 64 GB | remote | 2046 GiB (SSD)

All tests were conducted using Cassandra 3.11.6.

The results are designed to outline the performance of utilising local storage and the benefits it has over remote storage. Testing is split into two groups, fixed and variable testing. Each group runs two different sets of payload:

  1. Small – Smaller, more frequent payloads which are quick to execute but are requested in large numbers.
  2. Medium – Larger payloads which take longer to execute and can potentially slow Cassandra’s ability to process requests.

The fixed testing procedure is:

  1. Insert data to fill disks to ~30% full.
  2. Wait for compactions to complete.
  3. Run a fixed rate of operations
  4. Run a sequence of tests consisting of read, write and mixed read/write operations.
  5. Run the tests and measure the operational latency for a fixed rate of operations. 

The variable testing procedure is:

  1. Insert data to fill disks to ~30% full.
  2. Wait for compactions to complete.
  3. Target a median latency of 10ms.
  4. Run a sequence of tests comprising read, write and mixed read/write operations. These tests were broken down into multiple types.
  5. Run the tests and measure the results including operations per second and median operation latency. 
  6. Quorum consistency for all operations.
  7. We incrementally increase the thread count and re-run the tests until we have hit the target median latency and are under the allowable pending compactions.
  8. Wait for compactions to complete after each test.

As with any generic benchmarking results the performance may vary from the benchmark for different data models or application workloads. However, we have found this to be a reliable test for comparison of relative performance that will match many practical use cases.

Comparison

When comparing performance between instance types with local and remote storage, it is important to take note of the latency of operations. The latency of read operations indicates how long a request takes to retrieve information from the disk and return it to the client.

In the fixed small and medium read tests, local storage offered better latency results in comparison to instances with remote storage. This is more noticeable in the medium read tests, where the tests had a larger payload and required more interaction with the locally available disks. Both L8s (with local storage) and D15_v2s offered a lower latency to match the operation rate given to the cluster.

When running variable-small-read tests under a given latency target, the operation rate for D15v2s reached nearly 3 times the ops/s of D13v2s with remote storage. Likewise, L8s with local storage outperformed L8s with remote storage, delivering twice the throughput at half the latency.

variable-read-small | L8s w/local | L8s w/remote | DS13_v2 | DS15_v2
Operation Rate | 19,974 | 7,198 | 23,985 | 61,489
Latency mean (avg) | 22.1 | 48.3 | 21.9 | 19.4
Latency median | 20.1 | 40.3 | 19.9 | 17.7
Latency 95th percentile | 43.6 | 123.5 | 43.3 | 39.2
Latency 99th percentile | 66.9 | 148.9 | 74.3 | 58.9

In the medium read tests, instances with local storage outperformed instances with remote storage. Both L8s (with local storage) and D15s had a far better ops/s and latency result, even with a significantly higher operation rate. This makes a very convincing argument for local storage over remote storage when seeking optimal performance.

variable-read-medium | L8s w/local | L8s w/remote | DS13_v2 | DS15_v2
Operation Rate | 235 | 76 | 77 | 368
Latency mean (avg) | 4.2 | 13 | 12.9 | 2.7
Latency median | 3.2 | 8.4 | 9.5 | 2.5
Latency 95th percentile | 4.9 | 39.7 | 39.3 | 3.4
Latency 99th percentile | 23.5 | 63.4 | 58.8 | 4.9

Looking at writes, on the other hand, D15s outperformed the rest due to their large pool of CPU cores. While the differences were less obvious in the small tests, they were more pronounced in the medium tests. Further investigation will be conducted to determine why this is the case.

Benchmarking Cassandra

Based on the graph for variable medium testing, D15s outperformed in all categories. D15v2s have both a larger number of cores to outpace competition with heavy loads of writes and local storage offering faster disk intensive reads. This was additionally supported by strong performance in the mixed medium testing results.

L8s with local storage took second place, performing better than DS13v2s in the read and mixed tests, whilst DS13v2 nodes slightly edged out L8s in writes for larger payloads. The mixed results showed a substantial difference in performance between the two, with L8s taking the lead thanks to local storage providing faster disk intensive reads.

Conclusion

Based on the results from this comparison, we find that local storage offers amazing performance results for disk intensive operations such as reads. D15v2 nodes, with their large number of cores to perform CPU intensive writes and local storage to help with disk intensive reads, offer top tier performance for any production environment.

Furthermore, L8s with local storage offer a great cost efficient solution at around ¾ of the price of D13v2s and offer a better price-performance gain notably in read operations. This is especially beneficial for a production environment which prioritizes reads over writes.

In order to assist customers in upgrading their Cassandra clusters from currently used VM types to D15v2 or L8 VM type nodes, Instaclustr’s technical operations team has built several tried and tested node replacement strategies to provide zero-downtime, non-disruptive migrations for our customers. Reach out to our support team if you are interested in these new instance types for your Cassandra cluster.

You can access pricing details through the Instaclustr Console when you log in, or contact our Support team.

The post Benchmarking Cassandra with Local Storage on Azure appeared first on Instaclustr.

Cassandra and Kubernetes: SIG Update and Survey

Five operators for Apache Cassandra have been created that have made it easier to run containerized Cassandra on Kubernetes. Recently the major contributors to these operators came together to discuss the creation of a community-based operator that makes it easy to run C* on K8s. One of the project’s organizational goals is that the end result will eventually become part of the Apache Software Foundation or the Apache Cassandra project.

The community created a special interest group (SIG) to set goals for what the operator should do at different levels to find a path for creating a standard community-based operator. The Operator Framework suggests five maturity levels for operator capabilities starting from basic installation to auto-pilot.

Operator Capability Maturity Levels

(Source: OperatorFramework.io)

The five Cassandra Kubernetes operators all come from different backgrounds, so the first major goal is to develop a common understanding as to what an operator needs to do and at which level. This first step involves collaborating on a Custom Resource Definition (CRD) that will set the syntax / schema which will be used to create Cassandra clusters on Kubernetes. Once this is done, a software extension can be developed in a variety of languages including Go, Java, or using the Operator SDK in Helm or Ansible without making changes to Kubernetes.

We’re not starting from zero, as the creators of the five operators are actively participating in the SIG. Hopefully much of the agreed-upon CRD can be implemented by leveraging code fragments from the existing projects. The major operators out publicly today are those by Sky UK, Orange Telecom, Instaclustr, Elassandra, and DataStax (list sourced from the awesome-cassandra project):

  • CassKop - Cassandra Kubernetes Operator - This Kubernetes operator by Orange automates Cassandra operations such as deploying a new rack-aware cluster, adding/removing nodes, configuring the C* and JVM parameters, and upgrading JVM and C* versions. Written in Go. This was also one of the first operators released and is the only one that can support multiple Kubernetes clusters using Multi-CassKop.
  • Cassandra Operator - A Kubernetes operator by SkyUK that manages Cassandra clusters inside Kubernetes. Well designed and organized. This was among the first operators to be released.
  • Instaclustr - Kubernetes Operator for Cassandra - The Cassandra operator by Instaclustr manages Cassandra clusters deployed to Kubernetes and automates tasks related to operating a Cassandra cluster.
  • Cass Operator - DataStax’s Kubernetes Operator supports Apache Cassandra as well as DSE containers on Kubernetes. Cassandra configuration is managed directly in the CRD, and Cassandra nodes are managed via a RESTful management API.
  • Elassandra Operator - The Elassandra Kubernetes Operator automates the deployment and management of Elassandra clusters deployed in multiple Kubernetes clusters.

If you’re interested in catching up on what the SIG has been talking about, you can watch the YouTube videos of the sessions and read up on the working documents.

As with any Kubernetes operator, the goal is to create a robot that makes the manual work of setting up complex configurations of containers in Kubernetes easier. An operator can also be seen as a translator between the logical concepts of the software and the concrete Kubernetes resources such as nodes, pods, and services. Combined with controllers, operators can abstract out operations such that the human operators can focus on problems related to their industry or domain. As mentioned above, the different operator capability levels offer a roadmap to creating a robust operator for Cassandra users that is easy to use, set up, maintain, upgrade, and use to expand a cluster.

When a platform needs Cassandra, it’s probably exhausted the other potential datastores available because it needs high availability and fault tolerance, at high speeds, around the world. Kubernetes is a technology that can match well with Cassandra’s capabilities because it shares the features of being linearly scalable, vendor neutral, and cloud agnostic. There is a healthy debate about whether Cassandra belongs in Kubernetes — and whether databases belong in Kubernetes at all — because other orchestration tools are good enough, though the growing user base of Kubernetes in hobby and commercial realms suggests that we need to provide an operator that can keep up with the demand.

Most likely if someone is thinking about moving Cassandra workloads from public cloud, on-premises VMs, or even on-premises bare metal servers to either a public or private cloud hosted K8s, they’ll want to evaluate whether or not the existing architecture could run and be performant.

As part of the SIG, we’re also coming up with reference architectures on which to test the operator. Here are some of the common and most basic reference architectures that are likely candidates.

  • Single Workload in Single Region
    • 1 DC in 1 region, with 3 nodes (3 total)
    • DC expands to 6 (6 total)
    • DC contracts to 3 (3 total)

Single Workload / Datacenter in a Single Region

  • Multi-Workload in Single Region
    • 2 DCs, both in the same region, with 3 nodes in each DC (6 total)
    • Both DCs expand to 6 each (12 total)
    • Both DCs contract to 3 each ( 6 total)
    • Add a third DC in the same region with 3 nodes (9 nodes)
    • Remove third DC

Multiple Workloads / Datacenters in a Single Region

  • Single Workload in Multi-Regions
    • 2 DCs, 1 in each region, with 3 nodes in each DC (6 total)
    • Both DCs expand to 6 each (12 total)
    • Both DCs contract to 3 each ( 6 total)
    • Add a third DC in a 3rd region with 3 nodes (9 total)
    • Remove third DC

Although each organization is different, these scenarios or combinations of these scenarios account for 80% of most pure Apache Cassandra use cases. The SIG would love to know more about Cassandra users’ use cases for Kubernetes. Please take this short survey, which will remain open through September 17, 2020.

Join the biweekly meetings to stay informed.

Apache Cassandra 4.0 Beta Released

The beta release of Apache Cassandra 4.0 is finally here; it’s been two years in the making. We’ve had a preview release available to customers since March for testing. A wide range of improvements have been made.

Stability

The explicit goal of this release has been to be “the most stable major release ever” in order to accelerate adoption within the release cycle, which I blogged about in January. For this release a series of new testing frameworks focusing on stability and performance were implemented, and they have paid off handsomely. The feeling of the team at Instaclustr is that we have never been more confident about a release (and we’ve seen quite a few!)

Performance

This release integrates the async event-driven networking code from Netty for communication between nodes. I blogged about Netty in February but it’s worth reiterating what’s been achieved with this upgrade. It has enabled Cassandra 4.0 to have a single thread pool for all connections to other nodes instead of maintaining N threads per peer, which was cramping performance by causing lots of context switching. It has also facilitated zero copy streaming for SSTables, which now goes up to 5x faster than before.

This complete overhaul of the networking infrastructure has delivered some serious gains.

  • Tail end latency has been reduced by 40%+ in P99s in initial testing
  • Node recovery time has been vastly reduced
  • Scaling large clusters is easier and faster

Auditing and Observability

Cassandra 4.0 introduces a powerful new set of enterprise class audit capabilities that I covered here in March. These help Cassandra operators meet their SOX and PCI requirements with a robust high level interface. Audit logging saves to the node, outside of the database, with configurable log rollover. Audit logs can be configured to attend to particular keyspaces, commands, or users and they can be inspected with the auditlogviewer utility.

Full Query Logging is also supported and the fqltool allows inspection of these logs.

Virtual Tables

Virtual tables, which I covered here in February, enable a series of metrics to be pulled from a node via CQL from read-only tables. This is a more elegant mechanism than JMX access as it avoids the additional configuration required. JMX access is not going anywhere soon, but this presents a really solid improvement to a number of metric monitoring tasks.
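
For example, two of the virtual tables shipped in the system_views keyspace can be queried with plain CQL (results are local to the node you are connected to):

-- Clients currently connected to this node
SELECT * FROM system_views.clients;

-- Effective configuration settings of this node
SELECT * FROM system_views.settings;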

Community

Our community is our most powerful feature in all our releases and I can’t think of a better validation of open source under the Apache Foundation community model than this release. I just want to take this opportunity to congratulate and thank everyone in the community who have taken both Cassandra and its release processes to the next level with this beta release. 

As always, you can spin up a free trial of Cassandra on our platform. Even with the performance gains delivered in this release our popular Cassandra Data Modeling Guide to Best Practices is always worth a read to get the most out of Cassandra.

The post Apache Cassandra 4.0 Beta Released appeared first on Instaclustr.

Introducing Apache Cassandra 4.0 Beta: Battle Tested From Day One

This is the most stable Apache Cassandra in history; you should start using Apache Cassandra 4.0 Beta today in your test and QA environments, head to the downloads site to get your hands on it. The Cassandra community is on a mission to deliver a 4.0 GA release that is ready to be deployed to production. You can guarantee this holds true by running your application workloads against the Beta release and contributing to the community’s validation effort to get Cassandra 4.0 to GA.

With over 1,000 bug fixes, improvements and new features and the project’s wholehearted focus on quality with replay, fuzz, property-based, fault-injection, and performance tests on clusters as large as 1,000 nodes and with hundreds of real world use cases and schemas tested, Cassandra 4.0 redefines what users should expect from any open or closed source database. With software, hardware, and QA testing donations from the likes of Instaclustr, iland, Amazon, and Datastax, this release has seen an unprecedented cross-industry collaboration towards releasing a battle-tested database with enterprise security features and an understanding of what it takes to deliver scale in the cloud.

There will be no new features or breaking API changes in future Beta or GA builds. You can expect the time you put into the beta to translate into transitioning your production workloads to 4.0 in the near future.

Quality in distributed infrastructure software takes time and this release is no exception. Open source projects are only as strong as the community of people that build and use them, so your feedback is a critical part of making this the best release in project history; share your thoughts on the user or dev mailing lists or in the #cassandra ASF slack channel.

Redefining the elasticity you should expect from your distributed systems with Zero Copy Streaming

5x faster scaling operations

Cassandra streams data between nodes during scaling operations such as adding a new node or datacenter during peak traffic times. Thanks to the new Zero Copy Streaming functionality in 4.0, this critical operation is now up to 5x faster without vnodes compared to previous versions, which means a more elastic architecture particularly in cloud and Kubernetes environments.

Globally distributed systems have unique consistency caveats and Cassandra keeps the data replicas in sync through a process called repair. Many of the fundamentals of the algorithm for incremental repair were rewritten to harden and optimize incremental repair for a faster and less resource intensive operation to maintain consistency across data replicas.

Giving you visibility and control over what’s happening in your cluster with real time Audit Logging and Traffic Replay

Enterprise-grade security & observability

To ensure regulatory and security compliance with SOX, PCI or GDPR, it’s critical to understand who is accessing data and when they are accessing it. Cassandra 4.0 delivers a long awaited audit logging feature for operators to track the DML, DDL, and DCL activity with minimal impact to normal workload performance. Built on the same underlying implementation, there is also a new fqltool that allows the capture and replay of production workloads for analysis.

There are new controls to enable use cases that require data access on a per data center basis. For example, if you have a data center in the United States and a data center in Europe, you can now configure a Cassandra role to only have access to a single data center using the new CassandraNetworkAuthorizer.
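
A sketch of the CQL involved, assuming network authorization is enabled in cassandra.yaml (network_authorizer: CassandraNetworkAuthorizer) and a data center named us_dc exists; the role name and password are hypothetical:

-- Role that can only connect to nodes in the us_dc data center
CREATE ROLE us_analyst
WITH PASSWORD = 'example_password'
AND LOGIN = true
AND ACCESS TO DATACENTERS {'us_dc'};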

For years, the primary way to observe Cassandra clusters has been through JMX and open source tools such as Instaclustr’s Cassandra Exporter and DataStax’s Metrics Collector. In this most recent version of Cassandra you can selectively expose system metrics or configuration settings via Virtual Tables that are consumed like any other Cassandra table. This delivers flexibility for operators to ensure that they have the signals in place to keep their deployments healthy.

Looking to the future with Java 11 support and ZGC

One of the most exciting features of Java 11 is the new Z Garbage Collector (ZGC) that aims to reduce GC pause times to a max of a few milliseconds with no latency degradation as heap sizes increase. This feature is still experimental and thorough testing should be performed before deploying to production. These improvements significantly improve the node availability profiles from garbage collection on a cluster which is why this feature has been included as experimental in the Cassandra 4.0 release.

Part of a vibrant and healthy ecosystem

The third-party ecosystem has its eyes on this release and a number of utilities have already added support for Cassandra 4.0. These include the client driver libraries, Spring Boot and Spring Data, Quarkus, the DataStax Kafka Connector and Bulk Loader, The Last Pickle’s Cassandra Reaper tool for managing repairs, Medusa for handling backup and restore, the Spark Cassandra Connector, The Definitive Guide for Apache Cassandra, and the list goes on.

Get started today

There’s no doubt that open source drives innovation and the Cassandra 4.0 Beta exemplifies the value in a community of contributors that run Cassandra in some of the largest deployments in the world.

To put it in perspective, if you use a website or a smartphone today, you’re probably touching a Cassandra-backed system.

To download the Beta, head to the Apache Cassandra downloads site.

Resources:

Apache Cassandra Blog: Even Higher Availability with 5x Faster Streaming in Cassandra 4.0

The Last Pickle Blog: Incremental Repair Improvements in Cassandra 4

Apache Cassandra Blog: Audit Logging in Apache Cassandra 4.0

The Last Pickle Blog: Cassandra 4.0 Data Center Security Enhancements

The Last Pickle Blog: Virtual tables are coming in Cassandra 4.0

The Last Pickle Blog: Java 11 Support in Apache Cassandra 4.0

Apache Cassandra Infographic

Cassandra Data Modeling Best Practices Guide

Apache Cassandra is an open source non-relational, or NoSQL, distributed database that enables continuous availability, tremendous scale, and data distribution across multiple data centers and cloud availability zones. Simply put, it provides a highly reliable data storage engine for applications requiring immense scale.

Data modeling is a process used to analyze, organize, and understand the data requirements for a product or service. Data modeling creates the structure your data will live in. It defines how things are labeled and organized, and determines how your data can and will be used. The process of data modeling is similar to designing a house. You start with a conceptual model and add detail to produce the final blueprint. 

The ultimate goal of Cassandra data modeling and analysis is to develop a complete, well organized, and high performance Cassandra cluster. Following the five Cassandra data modeling best practices outlined will hopefully help you meet that goal:

  1. Cassandra is not a relational database, don’t try to model it like one
  2. Design your model to meet 3 fundamental goals for data distribution
  3. Understand the importance of the Primary Key in the overall data structure 
  4. Model around your queries but don’t forget about your data
  5. Follow a six step structured approach to building your model. 

Cassandra Is Not a Relational Database

Do not try to design a Cassandra data model like you would with a relational database. 

Query first design: You must define how you plan to access the data tables at the beginning of the data modeling process not towards the end. 

No joins or derived tables: Tables cannot be joined so if you need data from more than one table, the tables must be merged into a denormalized table.   

Denormalization: Cassandra does not support joins or derived tables so denormalization is a key practice in Cassandra table design.

Designing for optimal storage: For relational databases this is usually transparent to the designer. With Cassandra, an important goal of the design is to optimize how data is distributed around the cluster. 

Sorting is a Design Decision: In Cassandra, sorting can be done only on the clustering columns specified in the PRIMARY KEY.

The Fundamental Goals of the Cassandra Data Model

Distributed data systems, such as Cassandra, distribute incoming data into chunks called partitions.  Cassandra groups data into distinct partitions by hashing a data attribute called partition key and distributes these partitions among the nodes in the cluster. 

(A detailed explanation can be found in Cassandra Data Partitioning.)

A good Cassandra data model is one that: 

  1. Distributes data evenly across the nodes in the cluster
  2. Places limits on the size of a partition
  3. Minimizes the number of partitions returned by a query.

Distributes Data Evenly Around the Cassandra Cluster

Choose a partition key that has a high cardinality to avoid hot spots—a situation where one or a few nodes are under heavy load while others are idle.   

Limits the Size of Partitions

For performance reasons choose partition keys whose number of possible values is bounded. For optimal performance, keep the size of a partition between 10 and 100MB. 

Minimize the Number of Partitions Read by a Single Query 

Ideally, each of your queries will read a single partition. Reading many partitions at one time is expensive because each partition may reside on a different node. The coordinator (this is the node in the cluster that first receives the request) will generally need to issue separate commands to separate nodes for each partition you request. This adds overhead and increases the variation in latency. Unless the data set is small, attempting to read an entire table, that is all the partitions, fails due to a read timeout. 

Understand the Importance of the Primary Key

Every table in Cassandra must have a  set of columns called the primary key. (In older versions of Cassandra, tables were called column families). In addition to determining the uniqueness of a row, the primary key also shapes the data structure of a table. The Cassandra primary key has two parts:

Partition key: The first column or set of columns in the primary key. This is required. The hashed value of the partition key value determines where the partition will reside within the cluster.

Clustering key (aka clustering columns): Are the columns after the partition key. The clustering key is optional. The clustering key determines the default sort order of rows within a partition.  

A very important part of the design process is to make sure a partition key will: 

  1. Distribute data evenly across all nodes in a cluster. Avoid using keys that have a very small domain of possible values, such as gender, status, school grades, and the like. The number of possible values should always be greater than the number of nodes in the cluster. Also, avoid using keys where the distribution of possible values is highly skewed, as such a key will create “hotspots” on the cluster. 
  2. Have a bounded range of values. Large partitions can increase read latency and cause stress on a node during a background process called compaction. Try to keep the size of partitions under 100MB. 

Model Around Your Queries

The Cassandra Query Language (CQL) is the primary language used to communicate with a Cassandra database. In syntax and function, CQL resembles SQL which makes it easy for those who know the latter to quickly learn how to write queries for Cassandra. But there are some important differences that affect your design choices. 

A well known one is that Cassandra does not support joins or derived tables. Whenever you require data from two or more tables, you must denormalize. 

Search conditions have restrictions that also impact the design. 

  • Only primary key columns can be used as query predicates. (Note: a predicate is an operation on expressions that evaluates to TRUE or FALSE.)
  • Partition key columns are limited to equality searches. Range searches can only be done on clustering columns.
  • If there are multiple partition key columns (i.e. a composite partition key), all partition key columns must be included in the search condition.
  • Not all clustering columns need to be included in the search condition, but there are some restrictions (see the CQL sketch after this list): 
    • When omitting clustering columns you must start with the rightmost column listed in the primary key definition;  
    • An equality search cannot follow a range search.
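
To make these restrictions concrete, here is a sketch against a hypothetical table with a composite partition key and two clustering columns:

CREATE TABLE IF NOT EXISTS readings_by_sensor (
region text,
sensorid text,
reading_date date,
reading_time timestamp,
value double,
PRIMARY KEY ((region, sensorid), reading_date, reading_time)
);

-- Valid: both partition key columns restricted by equality, clustering columns
-- used left to right, with a range only on the last one
SELECT value FROM readings_by_sensor
WHERE region = 'eu' AND sensorid = 's42'
AND reading_date = '2020-08-01' AND reading_time >= '2020-08-01 12:00:00';

-- Invalid: the sensorid part of the composite partition key is missing
-- SELECT value FROM readings_by_sensor WHERE region = 'eu';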

Don’t Forget About the Data

Creating a complete Cassandra data model involves more than knowing your queries. You can identify all the queries correctly but if you miss some data, your model will not be complete.  Attempting to refactor a mature Cassandra data can be an arduous task. 

Developing a good conceptual model (see below) will help identify the data your application needs. 

Take a Structured Approach to Design 

In order to create a data model that is complete and high performing, it helps to follow a big data modeling methodology for Apache Cassandra that can be summarized as: 

  1. Data Discovery (DD). This is a high level view of the data your application needs and identifies the entities (things), the attributes of the entities, and which attributes are the identifiers. This may be an iterative process as development proceeds. 
  2. Identify the Access Patterns (AP).  Identify and list the queries your application will want to perform.  You need to answer: What data needs to be retrieved together, what are the search criteria, and what are the update patterns? This also may be an iterative process. 
  3. Map data and queries (MDQ).  Maps the queries to the data identified in steps 1 and 2 to create logical tables which are high level representations of Cassandra tables.
  4. Create the physical tables (PT).  Convert the logical data model to a physical data model (PDM) by using CQL CREATE TABLE statements. 
  5. Review and Refine physical data model.  Confirm that the physical tables will meet the 3 Basic Goals for Cassandra Data Model.

Structured approach to cassandra data model design

A more detail examination of these steps can be found in an earlier Instaclustr Whitepaper: 6 Step Guide to Apache Cassandra Data Modelling

If you have worked with relational database design, some steps will be familiar because they are also in the entity-relationship (ER) model.  At the conceptual stage, it can be useful to visually represent the data model by ER diagrams using either the Chen or Information Engineering (IE) notation. The Chebotko diagram uses a notation developed by Artem Chebotko to represent data and queries at the logical and physical modeling stages. 

Cassandra Model Example

Let’s assume that we have a simple logging system with two entities: LogSource and LogMessage.  For LogSource the key attribute is sourceName.  For the entity LogMessage, the key attribute is messageID.  

The query we want to execute is:  Q1) show the message information about the 10 most recent messages for a given source. 

The primary access entity is LogSource because it contains the equality search attribute (sourceName).  We create a logical table named LogMessage_by_Source and push the attribute sourceName into it. That becomes the partition key (indicated by the K).

We need to sort by time, so messageTime becomes the clustering column in LogMessage_by_Source (indicated by C↑). 

The secondary entity is LogMessage. The key attribute messageID becomes a second clustering column of the primary key in LogMessage_By_Source to ensure uniqueness of the row. Finally, we add the remaining columns from the secondary entity to complete the data needed by the query. 

An example of Cassandra data model
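
As a rough sketch of the physical table this logical design might map to (the keyspace and the non-key columns messagetext and loglevel are hypothetical, added only to make the example self-contained):

CREATE TABLE logs.logmessage_by_source (
    sourcename text,
    messagetime timestamp,
    messageid uuid,
    messagetext text,
    loglevel text,
    PRIMARY KEY ((sourcename), messagetime, messageid)
) WITH CLUSTERING ORDER BY (messagetime DESC, messageid ASC);

-- Q1: the message information for the 10 most recent messages for a given source
SELECT * FROM logs.logmessage_by_source WHERE sourcename = 'server-42' LIMIT 10;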

Data Duplication 

Data duplication refers to the number of times data must be duplicated in different tables to satisfy access patterns. For example, if we wanted to search for a specific message by its unique identifier, we would duplicate the data by creating a new table called LogMessage_by_ID that uses messageID as the partition key (a possible sketch follows).
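
A hedged sketch of what that duplicated table could look like, reusing the hypothetical columns from the earlier sketch:

CREATE TABLE logs.logmessage_by_id (
    messageid uuid,
    sourcename text,
    messagetime timestamp,
    messagetext text,
    loglevel text,
    PRIMARY KEY (messageid)
);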

Two issues can arise from duplication: 

  • Increased complexity to maintain  data integrity across multiple tables; 
  • If the data being duplicated is very large it puts size and write pressure on the database.

In a case where data duplication would cause more problems than it solves, an alternative is to duplicate only the lookup keys, that is, a lookup table. However, this solution requires the client to perform a second query to read the secondary data. The trade-off between read performance and data maintenance cost needs to be judged in the context of the specific performance requirements of your application, and such a solution should be benchmarked to ensure it is workable.
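
For completeness, a minimal sketch of the lookup-table variant, which stores only the keys needed to find the full row in logmessage_by_source (the names are again illustrative):

CREATE TABLE logs.logmessage_source_by_id (
    messageid uuid,
    sourcename text,
    messagetime timestamp,
    PRIMARY KEY (messageid)
);

-- The client first reads the keys here, then issues a second query against
-- logs.logmessage_by_source using the returned sourcename and messagetime.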

Materialized Views

These are objects created by a query which are copies of a base table but with a different partition key. The data between the materialized view and the base table is automatically synchronized by Cassandra. Their purpose is to make modeling for new query patterns easier and more flexible.  

Instaclustr’s advice is not to use them in Cassandra 3.x because of problems in keeping the view and the base table synchronized. The Apache Cassandra project has classified Materialized Views as an experimental feature for Cassandra 3.x. 
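
For reference only, and keeping the advice above in mind, this is roughly what such a view could look like in CQL for the earlier hypothetical base table (the view name is illustrative):

CREATE MATERIALIZED VIEW logs.logmessage_by_id_mv AS
    SELECT * FROM logs.logmessage_by_source
    WHERE messageid IS NOT NULL AND sourcename IS NOT NULL AND messagetime IS NOT NULL
    PRIMARY KEY ((messageid), messagetime, sourcename);
-- Cassandra keeps the view in sync with the base table automatically,
-- which is exactly the behaviour that has proven problematic in 3.x.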

Summary

Cassandra Data modeling is a process used to define and analyze data requirements and access patterns on the data needed to support a business process. 

A data model helps define the problem, enabling you to consider different approaches and choose the best one.  It ensures that all necessary data is captured and stored efficiently. 

Models document important concepts and jargon, providing a basis for long-term maintenance.

Cassandra is a non-relational database: do not design for it as you would a relational database. Don’t be afraid to denormalize data. Writes in Cassandra are relatively cheap compared to those in relational databases.

 The goals of a successful Cassandra Data Model are to choose a partition key that (1)  distributes data evenly across the nodes in the cluster; (2) minimizes the number of partitions read by one query, and (3) bounds the size of a partition.

Take a structured approach to your model. Your first steps are understanding your data and identifying the access patterns on that data. These are the most critical to developing a complete model.

Contact us to get expert advice on managing and deploying Apache Cassandra.


Cassandra Monitoring: A Best Practice Guide

Introduction to Cassandra Monitoring

Apache Cassandra is a NoSQL database designed to provide scalability, reliability, and availability with linear performance scaling. Cassandra database is designed as a distributed system, and aims to handle big data efficiently. Refer to what-is-apache-cassandra and cassandra-architecture for more information. Note that knowledge of Cassandra architecture and basic terminology is a prerequisite to understanding Cassandra monitoring. 

Cassandra monitoring is an essential area of database operations to ensure the good health of a cluster and optimal performance. Alerting is another crucial area for production systems, and it is complementary to monitoring. Good alerting in Cassandra can be achieved by utilization of the monitoring infrastructure and relevant toolset. Alerting and monitoring help create a robust environment for any Cassandra deployment.

This blog post aims to touch all important aspects of Cassandra monitoring. We hope it provides the reader with crucial information about monitoring tools, components, and metrics.

Monitoring Terminologies

JVM Based Monitoring

Cassandra is developed in Java and is a JVM based system. Each Cassandra node runs a single Cassandra process. JVM based systems are enabled with JMX (Java Management Extensions) for monitoring and management. Cassandra exposes various metrics using MBeans which can be accessed through JMX. Cassandra monitoring tools are configured to scrape the metrics through JMX and then filter, aggregate, and render the metrics in the desired format. There are a few performance limitations in the JMX monitoring method, which are referred to later. 

The metrics management in Cassandra is performed using the Dropwizard library. The metrics are collected per node in Cassandra; however, they can be aggregated by the monitoring system. 

Cassandra Monitoring

Metrics 

There are a large number of metrics exposed by Cassandra to cover all possible areas including performance, resources, communication, node, and cluster state etc. The metrics are defined with distinct types, and those can be categorized as well for operational ease.    

Metrics Types

Cassandra metrics are defined with specific data types. These types are designed to represent metrics like latency, counts, and others correctly. 

The metric types are not intuitive, and you might need some time to get familiar with them. 

  • Gauge: A single value representing a metric at a specific point in time, e.g. value of memory allocated or a number of active tasks. 
  • Counter: Counters are similar to a gauge but are used for value comparisons. Generally, a counter is only incremented, and it is reset when the functionality gets disrupted, e.g. by a node restart. An example is the cache hit count.
  • Histogram: Histogram is a count of data elements from a data stream grouped in fixed intervals. A histogram gives a statistical distribution of values. The data elements are provided over min, max, mean, median, 75th, 90th, 95th, 98th, 99th, 99.9th percentile value intervals. 
  • Timer: Timer keeps the rate of execution and histogram of duration for a metric. 
  • Latency: This is a special type to measure latency. It includes a Timer, and the latency is measured in microseconds. There is also a TotalLatency with each latency metric. The total latency is the count of latency since the beginning, i.e. since the node started. 
  • Meter: Meter is a unit to measure throughput. It also includes one-, five-, and fifteen-minute weighted moving averages.

Metrics Categories

The metrics are categorised based on Cassandra domains, e.g. table, keyspace, storage, communication, JVM etc. Not all metrics should be monitored all the time, but they should be available when required, e.g. during troubleshooting or application performance testing. 

The metrics are further subdivided in terms of broader areas like resources, network, internals, crucial data elements etc. Metrics can be represented as per topology levels like cluster level, node level, table level etc. to organize all the information. 

The categorization becomes clear as we go through specific metrics and correlate those with specific Cassandra areas.

Metrics Format

The Cassandra Dropwizard metrics are specified in the format below:

Dropwizard Metric Name: org.apache.cassandra.metrics.<Metric scope>.<Metric type>.<MetricName>

MBean: org.apache.cassandra.metrics:type=<Metric type> scope=<Metric scope> name=<MetricName>

Metric Type: This is the category of metrics e.g. table, keyspace, threadpool. Do not confuse this with the data type of metrics.

Metric scope: This is the metric sub type for more granularity wherever required. The scope is hence optional. E.g. the table name or keyspace name. 

Metric name: The final metric name like LiveSSTableCount. 

Essential Metrics

Cassandra Metrics

Node Status 

The status of nodes must be monitored and alerted on immediately if a node is down. A Cassandra cluster’s availability directly depends on the uptime of all the nodes in the cluster. Although the anti-entropy mechanism in Cassandra helps protect data from inconsistency, there is no replacement for lost performance during node downtime. A down node puts pressure on other nodes in the data center to handle requests and store hints. Hence, downtime for a node should be kept to a minimum. 

Cassandra operational activity requires node restarts or downtime, but those can be scheduled at the least busy times for the cluster. This alert helps keep track of any service disruption and the need to run repair on a node. A node should be repaired if it is out of the cluster for longer than the hinted handoff window, which is three hours by default. 

Client Request Metrics

The client request metrics provide information about client communication in the form of read and write requests per second between the client and a coordinator node. Other than normal read and write requests, there are special types of read and write operations (CAS, View, and RangeSlice) which have their own set of metrics. These metrics help to track the request count, latency, failures, and a few other statistics. The basic statistics to monitor are the number of requests per second, i.e. throughput, and request latency.

Requests Per Second

The number of requests should be aggregated per data center and per node. There could be some nodes receiving more requests as compared to other nodes. This behaviour creates extra pressure for the nodes receiving more requests. The specific requests like CAS and RangeSlice should be tracked separately for clarity. These operations are resource-intensive and have a unique effect on the nodes. The ratio of read requests to write requests is crucial to understand the type of workload. There are specific configurations to optimize a read-heavy or a write-heavy workload. 

Each cluster can handle a certain number of client requests per second efficiently. If the number of requests exceeds the cluster capacity, it can result in undesirable outcomes like dropped messages, inconsistency, increased latency etc. CAS and RangeSlice requests can cause increased latency. 

Uneven load on a few nodes can be handled with optimal load balancing at the driver side. The read and write latency or throughput issues caused by constant overloading should be addressed by adding more nodes to the data center and revisiting the data model if required.

Alerting: Set alerts on the number of requests threshold served per node and data center. 

Client Request Latency

Latency tracked by these metrics is the read and write latency experienced by client applications. There are various percentiles of latency, as mentioned in the latency metric type. These should be tracked separately, as well as the overall values, so that there is a clear view of system performance. Production systems generally have latency SLAs; any SLA on specific or overall latency should be tracked and alerted on.

There are various factors which affect latency, including the amount of load served by a node or cluster, system resources and tuning, GC settings and behaviour, and the type of requests. Troubleshooting latency issues mainly depends on accurate investigation of the root cause. Correlating latency metrics with other metrics helps to track down root causes. Using a graphing solution like Grafana for visualization is the most efficient way to spot and track issues.

Alerting: Set alerts for latency SLA thresholds, if any, or for the expected latency range.

Request Timeout and Failure 

These metrics are the number of client requests that timed out or failed. Failed requests are a clear indication of errors, and those should be addressed immediately. The common causes for request failure are unavailability of data, failure to get a response from the required number of replicas, data inconsistency, and network errors. Errors are troubleshooted using the error messages and correlation with other metrics. 

Alerting: Set alerts for more than a few failed requests on production systems.

Compaction Statistics 

This group of metrics includes the amount of data compacted, the number of active/completed compactions, and other relevant details. Compactions consume node resources and can consume disk space quickly. Monitoring compactions provides good insight into the compaction strategy used, as each strategy has a unique operational footprint. Specific Cassandra operations like repairs, high volume data writes, add/remove/replace nodes etc. increase compaction activity. It is important to monitor compactions while performing such operations. 

A common troubleshooting method for high compaction activities and high resource consumption is to throttle the compaction rate. In some scenarios, compactions can be temporarily stopped, but it requires a lot of caution and must be re-enabled at some point to keep the SSTable count low, and read latency optimal.

Alerting: Alerting is not essential for these metrics. However, alerts can be set if a high number of pending compactions is sustained for longer than the expected time interval.

Garbage Collector Metrics

The Garbage Collector (GC) is yet another crucial area for monitoring. The efficiency of Cassandra throughput and performance depends on the effective use of JVM resources and streamlined GC. The GC behaviour mainly depends on these factors: the garbage collector used, the workload served by Cassandra nodes, GC parameter settings, the heap size for the JVM etc. A common issue with garbage collection is long GC pauses, i.e. the time taken to perform a garbage collection. 

The GC works well with Cassandra’s default settings, but those can be tuned if required to suit a specific workload and the available resources. GC parameter tuning is a non-trivial task and requires knowledge of GC internals. However, sometimes GC issues can be resolved by fixing the data model, changing the workload, or adjusting JVM resources. It is essential to correlate bad GC behaviour with the exact root cause before applying a remedy. Also, any change in parameters impacting GC should be monitored carefully to ensure improvements. 

Alerting: Set alert on GC pauses for more than acceptable thresholds on production systems.

Memory Metrics

The memory metrics provide JVM heap, non-heap, and total memory used by Cassandra. The JVM heap storage is used heavily for a variety of purposes by Cassandra. The non-heap memory is also used a lot by later versions of Cassandra. Monitoring the heap and overall memory gives insight into memory usage. It can be used to correlate with any issues and determine memory requirements. 

Please note, Cassandra cannot scale with an indefinite amount of memory. This boils down to the fact that the JVM and GC cannot perform optimally for large heap sizes. The most common range of heap size for Cassandra is 8GB-32GB, where the smaller sizes are configured with CMS GC and the larger sizes with G1GC.

Alerting: Set alerts to test specific memory thresholds and tuning.  

Threadpool Metrics

Cassandra works with numerous thread pools internally. This design aims to execute tasks asynchronously, and it also helps to handle back pressure. Monitoring the thread pools makes it easy to understand the internal system behaviour. It also helps to identify specific pools under pressure with active, pending, and blocked tasks. 

The solution for constantly saturated pools is generally to provide more processing capacity to the node or the cluster. Other core issues like a poor data model or query pattern also impact the thread pools. 

Alerting: Set alerts for more than a few blocked tasks on the production system. This helps take preventive action to help avoid performance impact.

Table Metrics 

Table metrics are useful in tracking each table independently. These can be used to monitor a specific set of tables which are performance-critical or host a large volume of data. There are various metrics for each table but some of the most important are discussed here:  

Partition Size

The partition size is a crucial factor in ensuring optimal performance. Cassandra uses partitions of data as a unit of data storage, retrieval, and replication. Hence, larger partitions impact overall performance. The ideal partition size is less than 10MB, with an upper limit of 100MB. These values are derived from operational experience in the Cassandra community. 

The data model and table definition control the partition size. The partition key for a table determines how data is grouped into partitions. A partition key should be designed to accumulate data only up to acceptable size limits (a bucketing sketch follows below). Unfortunately, it is not easy to reshape the current partitions for a table. But if the data model is in the design phase, it is crucial to test all the table definitions for potentially large partition sizes. In existing tables, if large partitions are a major issue, they can be addressed by a complete data rewrite. This operation could be long-running, but it can solve many performance issues, and if configured correctly, it can be performed with minimal or no downtime for the table. 
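
One common way to bound partition size is to add a time bucket to the partition key, so that each partition only ever accumulates a limited window of data. A minimal sketch (all names are hypothetical):

CREATE TABLE metrics.readings_by_sensor_month (
    sensor_id text,
    month text,              -- e.g. '2020-06'; each (sensor_id, month) partition stays bounded
    reading_time timestamp,
    value double,
    PRIMARY KEY ((sensor_id, month), reading_time)
) WITH CLUSTERING ORDER BY (reading_time DESC);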

Alerting: Configure alerts on large partitions for tables with unbounded partitions. An unbounded partition is where the partition grows in size with new data insertion and does not have an upper bound.

Tombstone Scanned

Tombstones are the deletion markers in Cassandra. Tombstones are produced by data deletion, which can happen through various means like delete queries, TTL expiry, null inserts etc. The immutable design of SSTables and compaction operations makes tombstone eviction difficult in some scenarios. Tombstone presence directly impacts read performance; the effect increases with the number of tombstones scanned per operation. This metric provides a histogram of tombstones read recently by a table’s queries. 

The troubleshooting for tombstone eviction can be performed using various options like revisiting the compaction strategy, major compaction, nodetool garbagecollect etc. Note that all the mentioned remedies for tombstone eviction could operate on a large set of SSTables and are non-trivial operations. The operations must be well tested before executing on production. 

Alerting: Set alerts for tombstones-scanned per read metrics for performance-sensitive tables. 

SSTable Per Read

These metrics are related to the immutable design of SSTables and the read operation. SSTables are created per table, and data is arranged sequentially in the order it is written. This can result in multiple SSTable reads to complete a single read operation. The number of SSTables read contributes to the time taken to complete the read operation. Hence, the number of SSTables per read should be minimized. 

A good number of SSTables per read is a relative value and depends on the data volume and compaction strategy. However, as a general rule, it should be less than 10. The compaction strategy used for a table plays a crucial role in this metric; a table should be configured with the optimum compaction strategy for its usage (a sketch follows below). The repair operation plays a role in keeping SSTables consistent and hence also indirectly impacts this metric. All the data in Cassandra should ideally be repaired once per gc_grace_seconds cycle. 
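
As an illustration of tuning the compaction strategy for a read-heavy table (the keyspace and table names are hypothetical, and the right strategy always depends on the workload), LeveledCompactionStrategy generally keeps SSTables-per-read low at the cost of extra compaction I/O:

ALTER TABLE ks1.events
WITH compaction = {'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb': 160};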

Alerting: Set alerts for all the read performance-sensitive and high data volume tables for SSTables per read. 

Additional Metrics

It is difficult to cover all the metrics present in Cassandra in this blog post, and it is also difficult to predict the most useful ones in general. I have tried to cover the most used metrics individually, but there are still some crucial metrics which are useful for getting insight into specific Cassandra areas. Let’s look at those briefly:

Dropped Messages

Cassandra handles many forms of messages corresponding to various functions. These messages can get dropped, mostly due to load or communication errors. Dropped messages cause data inconsistency between nodes, and if they are frequent, they can cause performance issues. It is necessary to identify the cause of dropped messages. If they occur frequently or in large numbers, the system resources and data model should be revisited. Alerts should be set for an unexpected occurrence or number of dropped messages. 

Caches For Tables

Cassandra uses several caches, and these are configurable. The cache metrics are useful to track the effective use of a particular cache. A good example is the use of the row cache for frequently accessed rows in a table. If caching hot data in the row cache improves the cache hit rate, it is a successful use of the row cache. 

Data Streaming

Streaming is used while bootstrapping new nodes, during repair operations, and during some other cluster operations. Streaming operations can move a lot of data across a cluster and hence consume network bandwidth. The streaming metrics are useful for monitoring node activities and planned repairs. The streaming rate can be throttled if required to spare bandwidth for other operations.

Hinted Handoff 

Hints are a part of the anti-entropy mechanism, and they help protect nodes from data loss while those nodes are offline. Hints are stored and transferred, so metrics related to these attributes as well as delivery success, failure, delays, and timeouts are exposed. 

The hints metrics are useful to monitor all hint activities. A lot of hints stored and replayed indicates nodes going offline, while hint delays and failures indicate network or other communication issues.

CQL and Batch 

CQL metrics include the number of statements executed of each type. The batch metrics include the number of batch statements executed. These metrics help to monitor application activity and the query semantics used. The use of logged and unlogged batches has its caveats in Cassandra, and they can cause a performance penalty if not used correctly. 

System Metrics

These metrics are not exported by Cassandra but are obtained from the OS. They are just as important as the Cassandra metrics for obtaining system insights. 

Disk Usage

Disk usage needs monitoring because Cassandra is optimized to write a lot of data quickly. The real risk of a disk filling up comes from compactions. The default compaction strategy used by Cassandra is SizeTieredCompactionStrategy (STCS). This strategy merges many SSTables and outputs a single SSTable. The resulting SSTable can have a size equal to the combined size of all the SSTables merged into it. Also, until a compaction operation ends, both the old and new SSTables exist on disk. 

The disk space guideline for a cluster where most tables use STCS is to utilise up to 50% of the disk space and leave the rest as room for compactions. Generally, disk space is cheaper than other resources, and there is no harm in keeping vacant space on nodes. However, if there is limited disk space available, disk monitoring becomes even more crucial, as the free disk left for compactions may be reduced further than the general guideline. 

Remedies for high disk usage include snapshot deletion, as snapshots can consume a considerable amount of space. Another method is to stop a specific compaction operation; this frees the space consumed by the new SSTables. The time until the compaction starts again can be utilized to add more space. 

Alerting: Set alerts for various stages of disk usage. The alerts can be categorized for severity based on the amount of free disk space on a node. 

CPU Usage

CPU is the main processing capacity in a Cassandra cluster. The number of requests served by a node and the amount of data stored are the factors directly proportional to CPU utilization. CPU utilization should be monitored to ensure the nodes are not overloaded. 

A Cassandra cluster or a single data center should have all nodes of a similar size. They should have an equal number of CPU cores, and the CPU utilization should also be equivalent. A single node or a few nodes with high CPU is an indication of uneven load or request processing across the nodes. Cassandra is not CPU bound in most cases; however, a cluster or data center with high CPU utilization most of the time should be considered for a node size upgrade. 

Alerting: Set alerts for specific levels of CPU utilization on nodes or just for a single threshold. The levels can be defined as per expected CPU load, e.g. 80%, 90%, >95% etc. 

Monitoring tools

There are various tools available to set up Cassandra monitoring. I am describing here a few popular open-source tools used widely across the Cassandra community.

Prometheus

Prometheus is a metrics tool used for handling time-series based monitoring. It has alerting capability as well, which works on the time-series metrics. Prometheus can be configured to collect Cassandra metrics from nodes as well as the system metrics of the nodes. Prometheus uses exporters which are installed on the nodes and export data to Prometheus.  

Prometheus runs with a time-series database to store metrics. The metrics are stored in the database and can be queried using promQL, a query language for Prometheus. Prometheus also runs a web UI which can be used to visualise the actual metrics, graphs, alert rules etc. 

Alertmanager is the extension used for configuring alerts. Alertmanager has various integrations available for alerting including email, slack, hipchat, pagerduty etc. Prometheus has evolved over time, and it integrates well with the dropwizard metrics library. 

Prometheus - time-series based cassandra monitoring

Grafana

Grafana is a visualisation tool which can be used to visualize any time-series metrics. Grafana has various panels to showcase the data. The most commonly used panel is a graph. A graph is used to plot incoming data against a time-series in two dimensions. 

Grafana integrates with various data sources. These sources are queried in real-time by Grafana to obtain metrics. Each Grafana panel has one or more queries configured to query a data source; the result of the query is rendered on the panel. Grafana uses Prometheus as a well-integrated data source.

Grafana - Time series metrics visualization

Cassandra Exporter

Cassandra exporter is Instaclustr’s open-source solution for collecting Cassandra metrics efficiently. It is designed to integrate with Cassandra JVM and collect and publish metrics. Hence, Cassandra exporter is a replacement for the JMX metrics. 

JMX metrics in Cassandra have performance limitations and hence can cause some issues if used on systems with a large number of nodes. The Cassandra exporter has been well tested for optimal performance monitoring. The metrics produced by Cassandra exporter are also time-series and can be readily consumed by Prometheus. Please refer to the github page for information regarding configuration and usage. 

Conclusion

Cassandra monitoring is essential to get insight into the database internals. Monitoring is a must for production systems to ensure optimal performance, alerting, troubleshooting, and debugging. There are a large number of Cassandra metrics out of which important and relevant metrics can provide a good picture of the system. 

Finally, Instaclustr has the Cassandra monitoring expertise and capability with various options. 

  • Cassandra exporter is an excellent open source tool for optimal monitoring performance on large Cassandra clusters. 
  • Instaclustr Cassandra managed service uses a comprehensive monitoring and alerting service with 24×7 support. It is a good option for outsourcing all Cassandra operations, and it comes with a free trial.

Instaclustr Cassandra Consulting services can help you with any monitoring or other Cassandra operations.


Building a Low-Latency Distributed Stock Broker Application: Part 3

In the third blog of the “Around the World” series focusing on globally distributed storage, streaming, and search, we build a Stock Broker Application. 

1. Place Your Bets!

The London Stock Exchange 

How did Phileas Fogg make his fortune? Around the World in Eighty Days describes Phileas Fogg in this way: 

Was Phileas Fogg rich? Undoubtedly. But those who knew him best could not imagine how he had made his fortune, and Mr Fogg was the last person to whom to apply for the information.

I wondered if he had made his fortune on the Stock Market, until I read this:

Certainly an Englishman, it was more doubtful whether Phileas Fogg was a Londoner. He was never seen on ‘Change, nor at the Bank, nor in the counting-rooms of the “City”.

Well, even if Fogg wasn’t seen in person at the ‘Change (London Stock Exchange), by 1872 (the year the story is set), it was common to use the telegraph (the internet of the Victorian age, which features regularly in the story) to play the market.

In fact the ability of the telegraph to send and receive information faster than horses/trains/boats etc. had been used for stock market fraud as early as 1814! (The “Great Stock Exchange Fraud of 1814”). Coincidentally (or not?), the famous London Stock Exchange Forgery, also involving the telegraph, also occurred in 1872! Perhaps this explains the ambiguity around the origin of Fogg’s wealth!

What is certain is that Phileas Fogg became the subject of intense betting, and he was even listed on the London Stock Exchange (Chapter V – IN WHICH A NEW SPECIES OF FUNDS, UNKNOWN TO THE MONEYED MEN, APPEARS ON ‘CHANGE):

Not only the members of the Reform, but the general public, made heavy wagers for or against Phileas Fogg, who was set down in the betting books as if he were a race-horse. Bonds were issued, and made their appearance on ‘Change; “Phileas Fogg bonds” were offered at par or at a premium, and a great business was done in them. But five days after the article in the bulletin of the Geographical Society appeared, the demand began to subside: “Phileas Fogg” declined. They were offered by packages, at first of five, then of ten, until at last nobody would take less than twenty, fifty, a hundred!”

The 1870’s also saw the introduction of a new technological innovation in Stock Trading, the Stock Ticker Machine. Stock tickers were a special type of telegraph receiver designed to print an alphabetical company symbol and the current price of that company’s stock on a paper roll called ticker tape. This enabled stock prices to be communicated closer to real-time across vast distances, and revolutionized trading. 

Vintage Stock Ticker Machine (Source: Shutterstock)

2. Let’s Build a Stock Broker Application

Fast forward 128 years from 1872 to 2000 and technology looked a bit different. I’m taking inspiration from an earlier project I worked with from 2000-2003 at CSIRO (Australia’s national science research agency) called “StockOnline”. This was an online Stock Broker application designed to benchmark new component-based middleware technologies, including Corba and Enterprise Java (J2EE). The original version simulated traders checking their stock holdings and current stock prices, and then buying and selling stocks, resulting in revised stock holdings. The benchmark could be configured with different workload mixes, and the number of concurrent traders could be scaled up to stress the system under test. Metrics captured included the relative number, throughput, and response time of each of the operations. 

Some of the technology innovations that the project was designed to give insights into included: 

  • the use of application servers to provide easy to manage and scalable container resourcing (yes, containers are at least 20 years old); 
  • how portable the application was across multiple different vendors’ application servers (sort of);
  • the impact of JVM choice and settings (lots); 
  • explicit support for component configuration (e.g. wiring components together); and 
  • deployment into containers, rich container services, and multiple persistence models to manage state and allow database portability (e.g. Container Managed Persistence vs. Bean Managed Persistence). 

At the end of the project we made the StockOnline code and documentation open source, and I recently rediscovered it and made it available on github. I was surprised to learn that Enterprise Java is still going strong and is now run by the Eclipse Foundation and called “Jakarta EE”. Also interesting is that there is support for persistence to Cassandra.

So let’s move on to the present day.

3. High-Frequency Low-Latency Trading

Modern stock markets are fast-moving, ultra-competitive environments. Orders are sent to the market by high speed networks and executed almost instantly. This makes low-latency trading a key to profitability.  Trade related latency is any delay in the time it takes for a trader to interact with the market, and includes distance related network delays, delays in receiving and acting on information, and delays caused by brokers (e.g. queuing of orders, delays interacting with the stock exchange to trade the orders, etc.). Some of the key solutions to reducing latency are broker side hosting of orders (orders are hosted on brokers and automatically traded when conditions are met), and Direct Market Access (brokers are as close as possible to stock exchanges, with super-fast network connections).

A new type of US Stock Exchange (IEX) was even created to address some of the issues around fairness of stock trading due to latency. Some brokers are able to take advantage of even small latency differences (“price sniping”), or use so-called “dark pools”, which fill orders from within a pool rather than via public stock exchanges, to make huge profits. Somewhat oddly, the IEX levelled the playing field by introducing delays to ensure that no one playing the market has more up-to-date information than anyone else.

Latency is partially caused by the global location of stock exchanges. Where are stock exchanges located?  There are 60 exchanges around the world on every continent with a total value of $69 Trillion, and 16 worth more than $1 Trillion each!

4. Initial Application Design

I had already started writing a new version of StockOnline before I rediscovered the original, so the new version doesn’t share any of the original code. However, it does turn out to have similar entities, but with some significant differences to model multiple StockExchanges and Brokers.  Here’s the UML Class diagram of my new prototype code:

The first major difference is that it’s designed to model and simulate distributed stock brokers across multiple global “georegions”. We introduced the concept of georegions in blog 1 (called “latency regions”) and blog 2 (called “georegions”). A georegion is a geographic region that has at least two AWS regions (for data center redundancy) and ensures that applications within the same georegion stay within 100ms latency of each other, and of users in the same georegion, even if one region fails. Here’s the map from the previous blogs showing the eight sub 100ms latency georegions that we identified (North America, South America, Atlantic, Europe, Middle East, Central Asia, East Asia, Australasia):

This means that I have to explicitly model multiple StockExchanges. Each StockExchange is in a particular location in a georegion.  Each StockExchange is responsible for listing some stocks, providing meta-data about the stocks,  publishing changes to stock prices as StockTickers, and matching buy/sell orders (i.e. executing trades).  For simplicity we assume that each stock is only listed on a single StockExchange. 

Each georegion has one or more StockBrokers which are co-located in the same georegion as some StockExchanges to ensure low latency (and potentially redundancy). The StockBrokers are responsible for discovering StockExchanges and all the stocks listed for trading, obtaining StockTickers to update current price data, and computing trends and longer-term Stock Statistics that inform traders making decisions about buying and selling. They are also responsible for keeping track of trader data, updating StockHoldings for Traders, keeping track of orders and trading them on the appropriate StockExchanges, and keeping records of trades (transactions). Also different to the original version (which only had a single Market Order type), I wanted to have multiple different order types including Market, Limit, and Stop Orders. This is important for the latency story as Market Orders are traded “manually” and immediately, but Limit and Stop Orders are traded automatically when they meet specific conditions, so they can be traded very quickly and in larger volumes (this is a good explanation).

We assume that traders connect to their nearest StockBroker (to reduce latency and possibly to satisfy local financial rules). There is a series of operations supported by StockBrokers for traders, and also for interacting with the StockExchanges as follows.  First let’s look at the workflow for trading Market Orders, “Place Market Order”. These are essentially synchronous and immediate trades. The trader connects to the nearest broker, gets their current StockHoldings and current StockStatistics (for their holdings and possibly for other stocks they don’t currently own). Based on this information they decide what stocks to trade, whether to buy or sell, and the quantity of stocks, and create a Market Order. The broker then processes the Market Order (which may involve sending it to another broker), and receives confirmation that the trade occurred (including price, quantity, transaction ID etc.), and finally updates the trader’s StockHoldings for the stock traded. 

The steps to “Process Market Order” are as follows.  The order is sent to a broker in the same Georegion as the StockExchange listing the stock. This broker then immediately executes the trade (buys or sells) with the StockExchange, gets the transaction details, marks the order as filled (so it isn’t processed more than once), and updates the StockHolding amounts for the trader. 

The “Execute Trade with StockExchange” assumes that a trade is always possible (at the current price) and will occur instantaneously and completely, and has the following substeps:

Market Orders are potentially a slow process due to all the synchronous steps, “think time” for the trader, and cumulative latencies due to the trader to broker, broker to broker, and broker to StockExchange communication paths.

As an alternative we also provide some asynchronous Order types: Limit and Stop. These order types are only traded when the conditions are met, but then need to be executed as quickly as possible to prevent losses in a fast moving market.

We assume that the initial steps are mostly the same as “Place Market Order”, but with the added choice of a Limit or Stop Order and the limit price, and the final step (notification of the eventual Trade) is asynchronous:

Once the Order is placed, it is processed by sending it to the correct broker (as for Market Orders), and then that broker is responsible for continuously checking orders to see if they match:

This is done as follows (“Trade Matching Orders”) and relies on each broker receiving a stream of StockTicker updates from the StockExchanges in the same georegion. For each StockTicker the broker finds orders for that stock, and checks which orders meet the buy/sell conditions (the logic depends on the order type, whether the price is rising or dropping, and whether the current price is less than or greater than the order limit price). If the matching order(s) are Sell Orders then an extra step is to check that the trader still has sufficient holdings of that stock (they may have already sold some stock due to other orders being executed first). If all conditions are met then the broker initiates an immediate “Market” trade with the StockExchange as before.

The initial prototype application code is written in pure Java and just simulates the expected behaviour of the traders, brokers, and StockExchanges. It creates a specified number of georegions, brokers, StockExchanges, stocks, and traders with initial data. Then the simulation runs lots of rounds (seconds) for a specified period (e.g. a day).  Each round results in traders checking their holdings and StockStatistics, and creating orders (about ⅓ of each type, but only if the type matches the specific stock and market conditions). The orders are sent to the correct brokers. Each round the brokers receive simulated StockTickers from StockMarkets in the same georegion (using a pseudo-random walk which keeps the stock direction for the majority of the time, but occasionally changing direction). Some stocks are more volatile than others so change faster.  Each round the brokers immediately Trade Market Orders, and check the conditions and trade matching Limit or Stop Orders. 

5. Initial Simulation Results—“Time Is Money”!

The simulation computes business level metrics including number of trades, value of trades, etc., and expected end-to-end latency based on deploying brokers in 8 georegions, and using the AWS inter-region latencies from blog 1 of this series. This gives us a baseline to eventually compare the real results with. The average latency to get the current stock price from a remote StockExchange and “Process Market Orders” is 700ms (max 1.2s), which includes multiple times for intra-region and inter-region networking. The average latency for Limit and Stop “Trade Matching” Orders is shorter at 100ms (max 200ms), as it only includes the time to get StockTicker updates and the time to trade, i.e. it doesn’t include any AWS inter-region latencies as the operation is asynchronous and processed entirely within the georegion of the broker/StockExchange (we expect this to be slightly higher in practice due to the eventual overhead of database lookups and condition checking on the broker).

So is the saying “Time Is Money!” true in the context of low latency trading, and how much money exactly? I added a calculation to the simulation to compute potential profit losses assuming high volatility in the prices of some stocks, and higher latency times to trade. Due to potentially high volumes of trades, even a small profit loss per trade can add up to big losses in profit very quickly. For one simulation run with 2,101 completed trades, the potential profit loss for the higher latency Market Orders was 0.7% of the trade value (for Market Orders), but for the lower latency Limit and Stop Orders it was significantly less at 0.1% of the trade value (for those order types). For an average order size of $20,000 this corresponds to a $140 profit loss per Market Order, compared with only $20 profit loss for each Limit and Stop Order. Over hundreds or even thousands of trades per day (typical of HFT) this would quickly add up to significant amounts of money! Moreover, to make a profit, High Frequency Trading (HFT) relies on conducting a high volume of trades to rapidly take advantage of very small movements in prices, with potentially smaller profits per trade. So it’s easy to see why minimizing latency is a worthwhile goal in Fintech applications such as this. 

6. What Next?

In the next few blogs we’ll continue our journey “Around the World” and explore how to refine the initial simple design of the application so as to deploy and run it across multiple georegions using multiple AWS regions.

Initially this will involve mapping the components to Cassandra tables on a single data center. Once it’s working correctly with a single Cassandra data center, we’ll extend it to use multiple Cassandra data centers, which will require the use of multiple keyspaces for different replication topologies (e.g. replication across pairs of data centers vs. all data centers). We’ll also work out if, and how, to load-balance the application across multiple data centers in the same georegions, and how to enable redundancy, failover, and recovery at the application level.  It’s possible that a Kubernetes federated microservices mesh framework will help in doing this. We also plan to put Kafka to use to enable streaming StockTickers, so we’ll be investigating Kafka multi data center replication. 
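
As a purely illustrative sketch of that next step (not the application's actual schema), an Order entity could map to a table partitioned by broker, and a keyspace with NetworkTopologyStrategy could express a per-georegion replication topology; all names below, including the data center names, are assumptions:

-- Keyspace replicated only within one georegion (two AWS regions assumed)
CREATE KEYSPACE broker_na WITH replication =
    {'class': 'NetworkTopologyStrategy', 'us-east-1': 3, 'us-west-2': 3};

-- Hypothetical orders table, newest orders first per broker
CREATE TABLE broker_na.orders_by_broker (
    broker_id text,
    order_time timestamp,
    order_id uuid,
    trader_id text,
    stock_symbol text,
    order_type text,          -- e.g. MARKET, LIMIT, STOP
    quantity int,
    limit_price decimal,
    filled boolean,
    PRIMARY KEY ((broker_id), order_time, order_id)
) WITH CLUSTERING ORDER BY (order_time DESC, order_id ASC);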

7. Further Resources

IBM also has a StockTrader demonstration application, and an in-depth series about deploying it using Cassandra, Kafka, Redis, and Kubernetes.

There’s an example of stock analysis using Elasticsearch (I’m planning on using Elasticsearch to analyse the stock trends, and provide some map-based visualisation of the data).

This is an interesting article on “Hacking a HFT System”!

The Original CSIRO StockOnline code and documentation is now on github.


A Comprehensive Guide to Cassandra Architecture

Introduction

The Apache Cassandra architecture is designed to provide scalability, availability, and reliability to store massive amounts of data. If you are new to Cassandra, we recommend going through the high-level concepts covered in what is Cassandra before diving into the architecture.  

This blog post aims to cover all the architecture components of Cassandra. After reading the post, you will have a basic understanding of the components. This can be used as a basis to learn about the Cassandra Data Model, to design your own Cassandra cluster, or simply for Cassandra knowledge.

Cluster Topology and Design

Cassandra is based on distributed system architecture. In its simplest form, Cassandra can be installed on a single machine or in a docker container, and it works well for basic testing. A single Cassandra instance is called a node. Cassandra supports horizontal scalability achieved by adding more than one node as a part of a Cassandra cluster. The scalability works with linear performance improvement if the resources are configured optimally.

Cassandra works with peer to peer architecture, with each node connected to all other nodes. Each Cassandra node performs all database operations and can serve client requests without the need for a master node. A Cassandra cluster does not have a single point of failure as a result of the peer-to-peer distributed architecture. 

Nodes in a cluster communicate with each other for various purposes. There are various components used in this process:

  • Seeds: Each node configures a list of seeds which is simply a list of other nodes. A seed node is used to bootstrap a node when it is first joining a cluster. A seed does not have any other specific purpose, and it is not a single point of failure. A node does not require a seed on subsequent restarts after bootstrap. It is recommended to use two to three seed nodes per Cassandra data center (data centers are explained below), and keep the seeds list uniform across all the nodes. 
  • Gossip: Gossip is the protocol used by Cassandra nodes for peer-to-peer communication. Gossip informs a node about the state of all other nodes. A node performs gossip with up to three other nodes every second. The gossip messages follow a specific format and version numbers to make communication efficient.

A cluster is subdivided into racks and data centers. These terminologies are Cassandra’s representation of a real-world rack and data center. A physical rack is a group of bare-metal servers sharing resources like a network switch, power supply etc. In Cassandra, the nodes can be grouped in racks and data centers with snitch configuration. Ideally, the node placement should follow the node placement in actual data centers and racks. Data replication and placement depends on the rack and data center configuration. 

Cluster subdivided into Racks and Data centers

Multiple Data Centers

A rack in Cassandra is used to hold a complete replica of the data, provided there are enough replicas and the configuration uses NetworkTopologyStrategy, which is explained later. This configuration allows Cassandra to survive a rack failure without losing a significant level of replication and still perform optimally. 

There are various scenarios to use multiple data centers in Cassandra. Few common scenarios are:

  • Build a Cassandra cluster with geographically distinct data centers which cater to clients from distinct locations, e.g. a cluster with three data centers in the US, EU, and APAC serving local clients with low latency.
  • Separate Cassandra data centers which cater to distinct workloads using the same data, e.g. separate data centers to serve client requests and to run analytics jobs.
  • Active disaster recovery by creating geographically distinct data centers, e.g. a cluster with data centers in each US AWS region to support disaster recovery.

Database Structures

Cassandra stores data in tables, where each table is organized in rows and columns the same as in any other database. A Cassandra table was formerly referred to as a column family. Tables are grouped in keyspaces. A keyspace could be used to group tables serving a similar purpose from a business perspective, like all transactional tables, metadata tables, user information tables etc. Data replication is configured per keyspace in terms of the replication factor per data center and the replication strategy. See the replication section for more details.

Each table has a defined primary key. The primary key is divided into the partition key and clustering columns. The clustering columns are optional. There is no uniqueness constraint for any of the keys.

The partition key is used by Cassandra to index the data. All rows which share a common partition key make up a single data partition, which is the basic unit of data partitioning, storage, and retrieval in Cassandra.  
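
A minimal sketch of these structures in CQL (the keyspace, table, and column names are illustrative only):

CREATE KEYSPACE shop WITH replication =
    {'class': 'NetworkTopologyStrategy', 'dc_1': 3};

CREATE TABLE shop.orders_by_customer (
    customer_id text,        -- partition key
    order_date timestamp,    -- clustering column
    order_id uuid,           -- clustering column
    total decimal,
    PRIMARY KEY ((customer_id), order_date, order_id)
);
-- All rows sharing the same customer_id form a single partition.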

Refer to cassandra-data-partitioning for detailed information about this topic. 

Partitioning

A partition key is converted to a token by a partitioner. There are various partitioner options available in Cassandra, of which Murmur3Partitioner is the default. The tokens are signed integer values between -2^63 and +2^63-1, and this range is referred to as the token range. Each Cassandra node owns a portion of this range, and it primarily owns the data corresponding to that portion. A token is used to precisely locate the data among the nodes and on the data storage of the corresponding node.  

It is evident that when there is only one node in a cluster, it owns the complete token range. As more nodes are added, the token range ownership is split between the nodes, and each node is aware of the range of all the other nodes. 

Here is a simplified example to illustrate token range assignment. Suppose there are only 100 tokens used for a Cassandra cluster with three nodes; each node is assigned approximately 33 tokens: 

 node1: 0-33 node2: 34-66 node3: 67-99. 

 If nodes are added or removed, the token range distribution has to be reshuffled to suit the new topology. This process takes a lot of calculation and configuration change for each cluster operation. 
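
On a real cluster, the token assigned to a partition key by the configured partitioner can be inspected directly with the token() function; for example, against the illustrative table from the previous section:

SELECT token(customer_id), customer_id, order_id
FROM shop.orders_by_customer LIMIT 5;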

Virtual nodes/Vnodes

To simplify the token calculation complexity and other token assignment difficulties, Cassandra uses the concept of virtual nodes referred to as Vnodes. A cluster is divided into a large number of virtual nodes for token assignment. Each physical node is assigned an equal number of virtual nodes. In our previous example, if each node is assigned three Vnodes and each Vnode 11 tokens: 

 v1:0-9, v2:10-19, v3:20-29 so on 

 Each physical node is assigned these vnodes as:

 node1: v1, v4, v7 node2: v2, v5, v8 node3: v3, v6, v9 

Virtual Nodes or Vnodes

The default number of Vnodes owned by a node in Cassandra is 256, which is set by the num_tokens property. When a node is added to a cluster, the token allocation algorithm allocates tokens to the node. The algorithm selects random token values to ensure uniform distribution, but the num_tokens property can be changed to tune the data distribution. The default of 256 Vnodes per physical node is intended to achieve uniform data distribution for clusters of any size and with any replication factor. In some large clusters 256 Vnodes do not perform well; please refer to the blog cassandra-vnodes-how-many-should-i-use for more information.

Replication

The data in each keyspace is replicated with a replication factor. The most common replication factor used is three. There is one primary replica of data which resides with the token owner node as explained in the data partitioning section. The remainder of replicas are placed by Cassandra on specific nodes using replica placement strategy. All replicas are equally important for all database operations except for a few cluster mutation operations.

There are two settings which mainly impact replica placement. The first is the snitch, which determines the data center and the rack a Cassandra node belongs to, and it is set at the node level. Snitches inform Cassandra about the network topology so that requests are routed efficiently, and they allow Cassandra to distribute replicas by grouping machines into data centers and racks. GossipingPropertyFileSnitch is the go-to snitch for any cluster deployment. It uses a configuration file called cassandra-rackdc.properties on each node, which contains the rack and data center name that host the node. There are cloud-specific snitches available for AWS and GCP. 

The second setting is the replication strategy. The replication strategy is set at keyspace level. There are two strategies: SimpleStrategy and NetworkTopologyStrategy. The SimpleStrategy does not consider racks and multiple data centers. It places data replicas on nodes sequentially. The NetworkTopologyStrategy is rack aware and data center aware. SimpleStrategy should be only used for temporary and small cluster deployments, for all other clusters NetworkTopologyStrategy is highly recommended. A keyspace definition when used with NetworkTopologyStrategy specifies the number of replicas per data center as:

cqlsh> create keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc_1': 3, 'dc_2': 1};

Here, the keyspace named ks is replicated in dc_1 with factor three and in dc_2 with factor one.

Consistency and Availability

Each distributed system works under the constraints of the CAP theorem. The CAP theorem states that any distributed system can strongly deliver only two out of the three properties: Consistency, Availability, and Partition-tolerance. Cassandra provides flexibility for choosing between consistency and availability while querying data. In other words, data can be highly available with a lower consistency guarantee, or it can be highly consistent with lower availability. For example, if there are three data replicas, a query reading or writing data can ask for acknowledgments from one, two, or all three replicas to mark the completion of the request. For a read request, Cassandra requests the data from the required number of replicas and compares their write-timestamps. The replica with the latest write-timestamp is considered to be the correct version of the data. Hence, the more replicas involved in a read operation, the stronger the data consistency guarantee. For write requests, the requested number of replicas must acknowledge the write. 

Naturally, the time required to get the acknowledgement is directly proportional to the number of replicas requested for acknowledgement. Hence, consistency and availability are exchangeable. The concept of requesting a certain number of acknowledgements is called tunable consistency, and it can be applied at the individual query level. 

There are a few considerations related to data availability and consistency: 

  • The replication factor should ideally be an odd number. The common replication factor used is three, which provides a balance between replication overhead, data distribution, and consistency for most workloads.    
  • The number of racks in a data center should be in multiples of the replication factor. The common number used for nodes is in multiples of three. 
  • There are various terms used to refer to the consistency levels – 
    • One, two, three: Specified number of replicas must acknowledge the operation.
    • Quorum: The strict majority of replicas is called a quorum. The majority is one more than half of the replicas. This consistency level ensures that most of the replicas confirm the operation without having to wait for all replicas. It balances operation efficiency and good consistency, e.g. quorum for a replication factor of three is (3/2)+1=2; for a replication factor of five it is (5/2)+1=3.
    • Local_*: This is a consistency level for the local data center in a multi-data center cluster. The local data center is the one where the client is connected to a coordinator node. The * takes a value of one or quorum, e.g. local_one, local_quorum. 
    • Each_*: This level is also related to multi data center setup. It denotes the consistency to be achieved in each of the data centers independently, e.g. each_quorum means quorum consistency in each data center. 

Data written and read at a low consistency level does not miss out on the advantages of replication. Cassandra still keeps the data consistent across all replicas, but it happens in the background; this concept is referred to as eventual consistency. In the three-replica example, if a user queries data at consistency level one, the query is acknowledged as soon as the read/write happens on a single replica. For a read operation, this could mean relying on a single data replica as the source of truth. For a write operation, the remaining replicas receive the data later on and are made consistent eventually. If replication fails, some replicas might not get the data at all; Cassandra handles such shortcomings with a mechanism called anti-entropy, which is covered later in the post.

Query Interface

Cassandra Query Language (CQL) is the interface used to query Cassandra over a binary protocol. Earlier versions of Cassandra supported Thrift, which is now entirely replaced by CQL. CQL is designed to be similar to SQL for a quicker learning curve and familiar syntax. The DDL operations allow creating keyspaces and tables; the CRUD operations are select, insert, update, and delete, where select is a Cassandra read operation and all the others are Cassandra write operations.

A table definition includes column definitions and the primary, partition, and clustering keys. The table definition also contains several settings for data storage and maintenance. The primary key is a combination of the partition key and optional clustering columns. The partition key can be a single column or a composite key.
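
As a hypothetical sketch, the table below uses a composite partition key of (sensor_id, day) and a clustering column reading_time:

cqlsh> CREATE TABLE ks.sensor_readings (
    sensor_id uuid,
    day date,
    reading_time timestamp,
    value double,
    PRIMARY KEY ((sensor_id, day), reading_time)
) WITH CLUSTERING ORDER BY (reading_time DESC);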

The query set available in CQL is quite limited as compared to SQL. A few highlights: 

  • Cassandra does not support join operations and nested queries. 
  • Each select query should specify a complete partition key. It is possible to query multiple partitions, but it is not recommended; refer to cassandra scalability for details.
  • Cassandra supports a limited set of data aggregation operations.
  • The order by clause can only be used on clustering columns, and those must be referenced in their declared order of precedence (see the example queries after this list).
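
For instance, assuming the hypothetical sensor_readings table sketched earlier, the first query below is a valid access pattern while the second is rejected because it does not restrict the full partition key:

cqlsh> SELECT value FROM ks.sensor_readings
       WHERE sensor_id = 123e4567-e89b-12d3-a456-426614174000 AND day = '2020-04-14'
       ORDER BY reading_time DESC LIMIT 10;

cqlsh> SELECT value FROM ks.sensor_readings WHERE day = '2020-04-14'; -- fails without ALLOW FILTERING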

The limited query set in Cassandra stems from its specific data modelling requirements. The data model for a Cassandra database should aim to create denormalized tables which cater to the select query patterns. Data modeling is one of the essential steps while designing a Cassandra database: all the features provided by the Cassandra architecture, like scalability and reliability, depend directly on an optimal data model. Refer to cassandra-data-modelling for details on the topic.

The Cassandra driver program provides a toolset for connection management, pooling, and querying. The driver creates a connection with a Cassandra node, which is then referred to as the coordinator node for the query. The coordinator is responsible for executing the query and aggregating partial results.

The Datastax Java Driver is the most popular, efficient, and feature-rich driver available for Cassandra. There are several drivers for other languages which provide similar functionality.

Data Storage

Cassandra writes each incoming write request on a node to a commit log. The commit log is a write-ahead log, and it can be replayed in case of failure. The on-disk data structure is called an SSTable. SSTables are created per table in the database.

Example:

Consider a sample keyspace and table created as follows.

cqlsh> create keyspace ks with replication = {'class' : 'NetworkTopologyStrategy','datacenter_1' : 3};

cqlsh> CREATE TABLE ks.tb (
    id int PRIMARY KEY,
    col1 text);

And insert some data:
cqlsh> insert into ks.tb (id, col1) values (1, 'first_row');
cqlsh> insert into ks.tb (id, col1) values (2, 'second_row');
cqlsh> insert into ks.tb (id, col1) values (3, 'third_row');

The data we inserted looks as follows in an SSTable.

Note that this representation is obtained with a utility (such as sstabledump) that generates human-readable output from SSTables. The actual data in SSTables is in a binary format and compressed for efficiency.

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 33,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:07.756013Z" },
        "cells" : [
          { "name" : "col1", "value" : "first_row" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "2" ],
      "position" : 34
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 71,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:29.923397Z" },
        "cells" : [
          { "name" : "col1", "value" : "second_row" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "3" ],
      "position" : 72
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 108,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:39.282459Z" },
        "cells" : [
          { "name" : "col1", "value" : "third_row" }
        ]
      }
    ]
  }
]

Cassandra keeps its on-disk data immutable to provide optimal performance; hence, SSTables are immutable. Updates and deletes are handled by writing new versions of the data, which means multiple versions of a piece of data can exist at any given time. Cassandra is optimized for write operations as compared to read operations. A read operation consolidates all versions of the data and returns the most recent one. Each data cell is written with a write-timestamp which specifies when that particular data was written; this timestamp is used to find the latest version of the data when retrieving it for a read operation.

In the above example, we update data for a column of id 1 and see the result:

cqlsh> update ks.tb set col1='updated_row_one' where id=1;

The resulting data in the SSTable for this update looks like:

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 39,
        "cells" : [
          { "name" : "col1", "value" : "updated_row_one", "tstamp" : "2020-04-14T13:38:37.794697Z" }
        ]
      }
    ]
  }
]

The data looks precisely like newly inserted data. Cassandra identifies this and considers the updated value to be current because it carries the greater timestamp value.

Deletes are handled in a unique way in Cassandra to make them compatible with immutable data. Each delete is recorded as a new record which marks the deletion of the referenced data. This special data record is called a tombstone. A Cassandra read operation discards all the information for a row or cell if a tombstone exists, as it denotes deletion of the data. There are various types of tombstones to denote data deletion for each element, e.g. cell, row, partition, range of rows, etc. Cassandra also allows setting a Time To Live (TTL) on a data row to expire it a specified amount of time after insertion; data past its TTL is regarded as a tombstone. Refer to managing-tombstones-in-cassandra for operational information about tombstones and their efficiency.
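
For instance, with the sample table above, a row can be written with a TTL in seconds, after which it is treated as a tombstone:

cqlsh> insert into ks.tb (id, col1) values (4, 'expiring_row') using ttl 3600;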

Now with the SSTable example, a cell delete looks like:

cqlsh> delete col1 from ks.tb where id=1;

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 24,
        "cells" : [
          { "name" : "col1", "deletion_info" : { "local_delete_time" : "2020-04-14T13:44:27Z" },
            "tstamp" : "2020-04-14T13:44:27.179254Z"
          }
        ]
      }
    ]
  }
]

The deletion_info indicates that the cell is deleted. This record is the tombstone that shadows the original data and all of its versions.

Cassandra performs a compaction operation on SSTables, which consolidates two or more SSTables into a new SSTable. This process combines all the versions of the data present in the participating SSTables and writes only a single, most recent version of each piece of data into the resulting SSTable. Compactions also purge the data associated with a tombstone if all the required conditions for purging are met. There are various strategies to trigger and perform compaction (a per-table configuration example follows the list below); refer to apache-cassandra-compactions for details.

  • SizeTieredCompactionStrategy (STCS): This is the default compaction strategy. It is triggered using the size of SSTables on-disk. 
  • LevelledCompactionStrategy (LCS): This strategy is used to optimize read performance. It considers the data partitions present in SSTables and arranges fixed-size SSTables into levels; SSTables are compacted with the overlapping SSTables of the next level.
  • TimeWindowCompactionStrategy (TWCS): This is a specialized strategy for time series data. It arranges SSTables in time window buckets defined in the table definition. The SSTables within a time window are only compacted with each other. 
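
As a sketch, the compaction strategy is configured per table; for example, switching the sample table to TWCS with daily buckets could look like the following (the window options shown are illustrative):

cqlsh> ALTER TABLE ks.tb WITH compaction = {
       'class': 'TimeWindowCompactionStrategy',
       'compaction_window_unit': 'DAYS',
       'compaction_window_size': 1};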

The other crucial set of operations performed in Cassandra is anti-entropy. The aim of these operations is to keep data as consistent as possible; anti-entropy is what enables Cassandra to provide its eventual consistency model.

  • Hinted Handoff: If a node is unavailable for a short period, the data which was supposed to be replicated to it is stored on a peer node. This data is called hints. Once the original node becomes available, the hints are transferred to it, and the node catches up with the missed data. There are time and storage restrictions for hints: if a node is unavailable for longer than the configured duration, no hints are saved for it. Hints cannot be considered a primary anti-entropy mechanism.
  • Read Repair: A read operation is used as an opportunity to repair inconsistent data across replicas. The latest write-timestamp is used as a marker for the correct version of the data. The read repair operation is performed for only a portion of the total reads to avoid performance degradation. Read repairs are opportunistic operations and not a primary anti-entropy mechanism.
  • Repair: Repair is the primary anti-entropy operation used to make data consistent across replicas. Repairs are performed by creating specialized data structures called Merkle trees, which are trees of hash values computed over the data in a replica. These are exchanged between replicas and compared to detect inconsistencies, and the correct data is then streamed across nodes to repair them.

Repairs need to be scheduled manually as these are intensive operations that consume a significant amount of cluster resources. 

Write Path

Cassandra write path is the process followed by a Cassandra node to store data in response to a write operation. A coordinator node initiates a write path and is responsible for the request completion. 

The high-level steps are as follows:

  • The partitioner applies a hash function to the partition key of the incoming data and generates a token.
  • The node owning that token is identified, along with all the nodes holding replicas for the partition.
  • The write request is forwarded to all replica nodes, and acknowledgements are awaited.
  • Once the number of replicas required to fulfil the write consistency level have acknowledged the request, the write operation completes.

Consider an example with a six-node cluster, a replication factor of three, and a write request consistency of quorum:

Quorum for RF 3 = (3/2)+1 = 2

Common error scenarios:

  1. If the number of nodes required to fulfil the request are not available, or do not return an acknowledgement, the coordinator throws an exception.
  2. Even after the request has been satisfied with the required number of replica acknowledgements, if an additional node which stores a replica for the data is unavailable, the data can be saved as a hint on another node.

In a multi-data center cluster, the coordinator forwards write requests to all applicable nodes in its local data center. For remote data centers, the write request is forwarded to a single node per data center, which in turn replicates the data to the other replica nodes within its data center so that the required consistency level can be satisfied.

The Anatomy of a Write Operation on a Node

This operation involves the commit log, memtable, and SSTables. The commit log is a write-ahead log stored on disk. The write operation is recorded in the commit log of the node, and the acknowledgement is returned. The data is then stored in a memtable, an in-memory structure representing an SSTable on disk. The memtable is flushed to disk once it reaches a memory threshold, which creates a new SSTable. The SSTables are eventually compacted to consolidate the data and optimize read performance.

Read Path 

Cassandra read path is the process followed by a Cassandra node to retrieve data in response to a read operation. The read path has more steps than the write path. Actions performed to serve a read request are as follows:

  1. The coordinator generates a hash using the partition key and gathers the replica nodes which are responsible for storing the data.
  2. The coordinator checks if replicas required to satisfy the read consistency level are available. If not, an exception is thrown, and the read operation ends.
  3. The coordinator then sends a read data request to the fastest responding replica; the fastest replica could be the coordinator itself. The fastest replica is determined by the dynamic snitch, which keeps track of node latencies dynamically.
  4. The coordinator then sends a digest request to the replicas of data. The digest is a hash calculated over requested data by the replica nodes.
  5. The coordinator compares all the digests to determine whether all the replicas have a consistent version of the data. If those are equal, it returns the result obtained from the fastest replica.

If the digests from all the replicas are not equal, it means some replicas do not have the latest version of the data. In this case, read data requests for all the replicas are issued, and the data with the latest timestamp is returned to the client. Also, read repair requests are issued for the replicas which do not have the latest data version.

Components involved in a read operation on a node:

  • Row cache: This is a cache for frequently read data rows, also referred to as hot data. It stores a complete data row which can be returned directly to the client if requested by a read operation. This is an optional feature and works best when there is a small set of hot rows which fits in the row cache.
  • Partition key cache: This component caches the frequently used partition index entries per table. In other words, it stores the location of commonly queried partitions, but not the complete rows. This feature is enabled by default in Cassandra and can be tuned further (see the sketch after this list).
  • Bloom filter: A bloom filter is a data structure which indicates if a data partition could be included in a given SSTable. The positive result returned by a bloom filter can be a false signal, but the negative results are always accurate. Hence it saves a lot of seek-time for read operations.
  • Partition index and summary: The partition index contains the offsets of all partitions, giving their location within the SSTable; the partition summary is a condensed version of the index. These components make it possible to locate a partition in an SSTable directly rather than scanning the data.
  • Memtable: The memtable is the in-memory representation of an SSTable. If a data partition is present in the memtable, the specific data rows can be read directly from it and returned to the client.
  • Compression offset map: This is the map for locating data in SSTables when it is compressed on-disk. 
  • SSTable: The on-disk data structure which holds all the data once flushed from memory. 
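
As an illustrative sketch, both caches are controlled per table through the caching option (the row cache additionally requires a non-zero row_cache_size_in_mb in cassandra.yaml):

cqlsh> ALTER TABLE ks.tb WITH caching = {'keys': 'ALL', 'rows_per_partition': '120'};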

Anatomy of Read Operation on a Node

  1. Cassandra checks the row cache for data presence. If present, the data is returned, and the request ends.
  2. The request flow then includes checking the bloom filters. If a bloom filter indicates that the data may be present in an SSTable, Cassandra continues to look for the required partition in that SSTable.
  3. The key cache is checked for the partition key presence. The cache hit provides an offset for the partition in SSTable. This offset is then used to retrieve the partition, and the request completes.
  4. Otherwise, Cassandra seeks the partition through the partition summary and partition index. These structures provide the partition offset in an SSTable, which is then used to retrieve the partition and return it. The caches, if enabled, are updated with the latest data read.

Conclusion

Cassandra architecture is uniquely designed to provide scalability, reliability, and performance. It is based on distributed system principles and operates within the constraints of the CAP theorem. Cassandra's unique architecture needs careful configuration and tuning, and it is essential to understand its components in order to use Cassandra efficiently.


Scylla Summit 2019

I’ve had the pleasure to attend again and present at the Scylla Summit in San Francisco and the honor to be awarded the Most innovative use case of Scylla.

It was a great event, full of friendly people and passionate conversations. Peter did a great full write-up of it already so I wanted to share some of my notes instead…

This is a curated set of topics that I happened to question or discuss in depth, so this post is not meant to be taken as a full coverage of the conference.

Scylla Manager version 2

The upcoming version of scylla-manager is dropping its dependency on SSH setup which will be replaced by an agent, most likely shipped as a separate package.

On the features side, I was a bit puzzled by the fact that ScyllaDB is advertising that its manager will provide a repair scheduling window so that you can control when it’s running or not.

Why did it strike me, you ask?

Because MongoDB does the same thing within its balancer process and I always thought of this as a patch to a feature that the database should be able to cope with by itself.

And that database-do-it-better-than-you motto is exactly one of the promises of Scylla, the boring database, so smart at handling workload impacts on performance that you shouldn’t have to start playing tricks to mitigate them… I don’t want this time window feature on scylla-manager to be a trojan horse on the demise of that promise!

Kubernetes

They were almost late on this one but are working hard to play well with the new toy of every tech company around the world. Helm charts are also being worked on!

The community-developed scylla operator by Yannis is now being worked on and backed by ScyllaDB. It can deploy a cluster and scale it up and down.

Few things to note:

  • it’s using a configmap to store the scylla config
  • no TLS support yet
  • no RBAC support yet
  • kubernetes networking takes a smaller performance hit than what was seen on Docker
  • use placement strategies to dedicate kubernetes nodes to scylla!

Change Data Capture

Oh boy this one was awaited… but it’s now coming soon!

I inquired about its performance impact since every operation will be written to a table. Clearly my questioning was a bit alpha since CDC is still being worked on.

I had the chance to discuss ideas with Kamil, Tzach and Dor: one of the things my colleague Julien asked for was the ability for CDC to generate an event when a tombstone is written, so we could actually know when a specific piece of data expired!

I want to stress a few other things too:

  • default TTL on CDC table is 24H
  • expect I/O impact (logical)
  • TTL tombstones can have a hidden disk space cost and nobody was able to tell me if the CDC table was going to be configured with a lower gc_grace_period than the default 10 days so that’s something we need to keep in mind and check for
  • there was no plan to add user information that would allow us to know who actually did the operation, so that’s something I asked for because it could be used as a cheap and open source way to get auditing!

LightWeight Transactions

Another so long awaited feature is also coming from the amazing work and knowledge of Konstantin. We had a great conversation about the differences between the currently worked on Paxos based LWT implementation and the maybe later Raft one.

So yes, the first LWT implementation will be using Paxos as a consensus algorithm. This will make the LWT feature very consistent while being slower than what could be achieved using Raft. That's why ScyllaDB has plans for another implementation that could be faster, with weaker data consistency guarantees.

User Defined Functions / Aggregations

This one is bringing the Lua language inside Scylla!

To be precise, it will be a Lua JIT as its footprint is low and Lua can be cooperative enough but the ScyllaDB people made sure to monitor its violations (when it should yield but does not) and act strongly upon them.

I got into implementation details with Avi, this is what I noted:

  • lua function return type is not checked at creation but at execution, so expect runtime errors if your lua code is bad
  • since lua is lightweight, there’s no need to assign a core to lua execution
  • I found UDA examples, like top-k rows, to be very similar to the Map/Reduce logic
  • UDF will allow simpler token range full table scans thanks to syntax sugar
  • there will be memory limits applied to result sets from UDA, and they will be tunable

Text search

Dejan is the text search guy at ScyllaDB and the one who kindly implemented the LIKE feature we asked for and that will be released in the upcoming 3.2 version.

We discussed ideas and projected use cases to make sure that what’s going to be worked on will be used!

Redis API

I’ve always been frustrated about Redis because while I love the technology I never trusted its clustering and scaling capabilities.

What if you could scale your Redis like Scylla without giving up on performance? That’s what the implementation of the Redis API backed by Scylla will get us!

I’m desperately looking forward to seeing this happen!

Even Higher Availability with 5x Faster Streaming in Cassandra 4.0

Streaming is a process where nodes of a cluster exchange data in the form of SSTables. Streaming can kick in during many situations such as bootstrap, repair, rebuild, range movement, cluster expansion, etc. In this post, we discuss the massive performance improvements made to the streaming process in Apache Cassandra 4.0.

High Availability

As we know Cassandra is a Highly Available, Eventually Consistent database. The way it maintains its legendary availability is by storing redundant copies of data in nodes known as replicas, usually running on commodity hardware. During normal operations, these replicas may end up having hardware issues causing them to fail. As a result, we need to replace them with new nodes on fresh hardware.

As part of this replacement operation, the new Cassandra node streams data from the neighboring nodes that hold copies of the data belonging to this new node’s token range. Depending on the amount of data stored, this process can require substantial network bandwidth, taking some time to complete. The longer these types of operations take, the more we are exposing ourselves to loss of availability. Depending on your replication factor and consistency requirements, if another node fails during this replacement operation, availability will be impacted.

Increasing Availability

To minimize the failure window, we want to make these operations as fast as possible. The faster the new node completes streaming its data, the faster it can serve traffic, increasing the availability of the cluster. Towards this goal, Cassandra 4.0 saw the addition of Zero Copy streaming. For more details on Cassandra’s zero copy implementation, see this blog post and CASSANDRA-14556.

Talking Numbers

To quantify the results of these improvements, we, at Netflix, measured the performance impact of streaming in 4.0 vs 3.0, using our open source NDBench benchmarking tool with the CassJavaDriverGeneric plugin. Though we knew there would be improvements, we were still amazed by the overall result: a fivefold increase in streaming performance. The test setup and operations are all detailed below.

Test Setup

In our test setup, we used the following configurations:

  • 6-node clusters on i3.xl, i3.2xl, i3.4xl and i3.8xl EC2 instances, each on 3.0 and trunk (sha dd7ec5a2d6736b26d3c5f137388f2d0028df7a03).
  • Table schema
CREATE TABLE testing.test (
    key text,
    column1 int,
    value text,
    PRIMARY KEY (key, column1)
) WITH CLUSTERING ORDER BY (column1 ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
    AND compression = {'enabled': 'false'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
  • Data size per node: 500GB
  • No. of tokens per node: 1 (no vnodes)

To trigger the streaming process we used the following steps in each of the clusters:

  • terminate a node
  • add a new node as a replacement
  • measure the time taken by the new node to complete streaming the data of the terminated node

For each cluster and version, we repeated this exercise multiple times to collect several samples.

Below is the distribution of streaming times we found across the clusters.

Benchmark results

Interpreting the Results

There are many conclusions one can draw from the graph above. Some of them are:

  • 3.0 streaming times are inconsistent and show a high degree of variability (fat distributions across multiple samples)
  • 3.0 streaming is highly affected by the instance type and generally looks CPU bound
  • Zero Copy streaming is approximately 5x faster
  • Zero Copy streaming time shows little variability in its performance (thin distributions across multiple samples)
  • Zero Copy streaming performance is not CPU bound and remains consistent across instance types

It is clear from the performance test results that Zero Copy Streaming has a huge performance benefit over the current streaming infrastructure in Cassandra. But what does it mean in the real world? The following key points are the main takeaways.

MTTR (Mean Time to Recovery): MTTR is a KPI (Key Performance Indicator) that is used to measure how quickly a system recovers from a failure. Zero Copy Streaming has a very direct impact here with a five fold improvement on performance.

Costs: Zero Copy Streaming is ~5x faster. This translates directly into cost savings for some organizations, primarily as a result of reducing the need to maintain spare server or cloud capacity. In other situations where you’re migrating data to larger instance types or moving AZs or DCs, it means that instances sending data can be turned off sooner, saving costs. An added cost benefit is that you no longer have to over provision the instance: you get similar streaming performance whether you use an i3.xl or an i3.8xl, provided the bandwidth is available to the instance.

Risk Reduction: There is a great reduction in risk due to Zero Copy Streaming as well. Since a cluster’s recovery mainly depends on streaming speed, Cassandra clusters with failed nodes will be able to recover much more quickly (5x faster). This means the window of vulnerability is reduced significantly, in some situations down to a few minutes.

Finally, a benefit that we generally don’t talk about is the environmental benefit of this change. Zero Copy Streaming enables us to move data very quickly through the cluster. It objectively reduces the number and sizes of instances that are used to build a Cassandra cluster. As a result, not only does it reduce Cassandra’s TCO (Total Cost of Ownership), it also helps the environment by consuming fewer resources!

Scylla: four ways to optimize your disk space consumption

We recently had to face free disk space outages on some of our scylla clusters, and we learnt some very interesting things along the way while outlining some possible improvements to the ScyllaDB guys.

100% disk space usage?

First of all I wanted to give a bit of a heads up about what happened when some of our scylla nodes reached (almost) 100% disk space usage.

Basically they:

  • stopped listening to client requests
  • complained in the logs
  • wouldn’t flush the commitlog (expected)
  • aborted their compaction work (which actually gave back a few GB of space)
  • stayed in a stuck / unable-to-stop state (unexpected, this has been reported)

After restarting your scylla server, the first and obvious thing you can try to do to get out of this situation is to run the nodetool clearsnapshot command which will remove any data snapshot that could be lying around. That’s a handy command to reclaim space usually.

Reminder: depending on your compaction strategy, it is usually not advised to allow your data to grow over 50% of disk space...

But that’s only a patch so let’s go down the rabbit hole and look at the optimization options we have.


Optimize your schemas

Schema design and the types you choose for your columns have a huge impact on disk space usage! And in our case we indeed overlooked some of the optimizations that we could have done from the start and that did cost us a lot of wasted disk space. Fortunately it was easy and fast to change.

To illustrate this, I’ll take a sample of 100,000 rows of a simple and naive schema associating readings of 50 integers to a user ID:

Note: all those operations were done using Scylla 3.0.3 on Gentoo Linux.

CREATE TABLE IF NOT EXISTS test.not_optimized
(
uid text,
readings list<int>,
PRIMARY KEY(uid)
) WITH compression = {};

Once inserted on disk, this takes about 250MB of disk space:

250M    not_optimized-00cf1500520b11e9ae38000000000004

Now depending on your use case, if those readings are not meant to be updated for example, you could use a frozen list instead, which allows a huge storage optimization:

CREATE TABLE IF NOT EXISTS test.mid_optimized
(
uid text,
readings frozen<list<int>>,
PRIMARY KEY(uid)
) WITH compression = {};

With this frozen list we now consume 54MB of disk space for the same data!

54M     mid_optimized-011bae60520b11e9ae38000000000004

There’s another optimization that we could do since our user ID are UUIDs. Let’s switch to the uuid type instead of text:

CREATE TABLE IF NOT EXISTS test.optimized
(
uid uuid,
readings frozen<list<int>>,
PRIMARY KEY(uid)
) WITH compression = {};

By switching to uuid, we now consume 50MB of disk space: that’s an 80% reduction in disk space consumption compared to the naive schema for the same data!

50M     optimized-01f74150520b11e9ae38000000000004

Enable compression

All those examples were not using compression. If your workload latencies allow it, you should probably enable compression on your sstables.

Let’s see its impact on our tables:

ALTER TABLE test.not_optimized WITH compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'};
ALTER TABLE test.mid_optimized WITH compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'};
ALTER TABLE test.optimized WITH compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'};

Then we run a nodetool compact test to force a (re)compaction of all the sstables and we get:

63M     not_optimized-00cf1500520b11e9ae38000000000004
28M mid_optimized-011bae60520b11e9ae38000000000004
24M optimized-01f74150520b11e9ae38000000000004

Compression is really a great gain here, allowing another 50% reduction in disk space usage on our optimized table!

Switch to the new “mc” sstable format

Since the Scylla 3.0 release you can use the latest “mc” sstable storage format on your scylla clusters. It promises greater efficiency and usually a significantly reduced disk space consumption!

It is not enabled by default, you have to add the enable_sstables_mc_format: true parameter to your scylla.yaml for it to be taken into account.

Since it’s backward compatible, you have nothing else to do as new compactions will start being made using the “mc” storage format and the scylla server will seamlessly read from old sstables as well.

But in our case of immediate disk space outage, we switched to the new format one node at a time, dropped the data from it and ran a nodetool rebuild to reconstruct the whole node using the new sstable format.

Let’s demonstrate its impact on our test tables: we add the option to the scylla.yaml file, restart scylla-server and run nodetool compact test again:

49M     not_optimized-00cf1500520b11e9ae38000000000004
26M mid_optimized-011bae60520b11e9ae38000000000004
22M optimized-01f74150520b11e9ae38000000000004

That’s a pretty cool gain of disk space, even more for the not optimized version of our schema!

So if you’re in great need of disk space or it is hard for you to change your schemas, switching to the new “mc” sstable format is a simple and efficient way to free up some space without effort.

Consider using secondary indexes

While denormalization is the norm (yep.. legitimate pun) in the NoSQL world, this does not mean we have to duplicate everything all the time. A good example lies in the internals of secondary indexes, if your workload can tolerate their moderate impact on latency.

Secondary indexes on scylla are built on top of Materialized Views that basically store an up-to-date pointer from your indexed column to your main table partition key. That means that secondary index MVs do not duplicate all the columns (and thus the data) from your main table, as you would have to do when denormalizing a table to query by another column: this saves disk space!

This of course comes with a latency drawback, because if your workload is interested in columns other than the partition key of the main table, the coordinator node will actually issue two queries to get all your data:

  1. query the secondary index MV to get the pointer to the partition key of the main table
  2. query the main table with the partition key to get the rest of the columns you asked for

This has been an effective trick to avoid duplicating a table and save disk space for some of our workloads!
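
A minimal sketch with hypothetical table and column names: the index below lets us query by email without duplicating the whole table, at the cost of the two-step read described above.

CREATE TABLE IF NOT EXISTS test.users
(
uid uuid,
email text,
country text,
PRIMARY KEY(uid)
);

CREATE INDEX IF NOT EXISTS ON test.users (email);

SELECT uid, country FROM test.users WHERE email = 'user@example.com';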

(not a tip) Move the commitlog to another disk / partition?

This should only be considered as a sort of emergency procedure or for cost efficiency (cheap disk tiering) on non critical clusters.

While this is possible even if the disk is not formatted using XFS, it is not advised to separate the commitlog from data on modern SSD/NVMe disks but… you technically can do it (as we did) on non production clusters.

Switching is simple, you just need to change the commitlog_directory parameter in your scylla.yaml file.
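
As a sketch, with a hypothetical mount point, the relevant scylla.yaml line would look like this:

commitlog_directory: /mnt/cheap-disk/commitlog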

Scylla Summit 2018 write-up

It’s been almost one month since I had the chance to attend and speak at Scylla Summit 2018 so I’m relieved to finally publish a short write-up on the key things I wanted to share about this wonderful event!

Make Scylla boring

This statement of Glauber Costa sums up what looked to me to be the main driver of the engineering efforts put into Scylla lately: making it work so consistently well on any kind of workload that it’s boring to operate 🙂

I will follow up on this statement to highlight the things I heard and (hopefully) understood during the summit. I hope you’ll find it insightful.

Reduced operational efforts

The thread-per-core and queues design still has a lot of possibilities to be leveraged.

The recent addition of RPC streaming capabilities to seastar allows a drastic reduction in the time it takes the cluster to grow or shrink (data rebalancing / resynchronization).

Incremental compaction is also very promising as this background process is one of the most expensive there is in the database’s design.

I was happy to hear that scylla-manager will soon be made available and free to use with basic features while retaining more advanced ones for the enterprise version (like backup/restore).
I also noticed that the current version was not supporting SSL enabled clusters to store its configuration. So I directly asked Michał for it and I’m glad that it will be released in version 1.3.1.

Performant multi-tenancy

Why choose between real-time OLTP & analytics OLAP workloads?

The goal here is to be able to run both on the same cluster by giving users the ability to assign “SLA” shares to ROLES. That’s basically like pools on Hadoop at a much finer grain since it will create dedicated queues that will be weighted by their share.

Having one queue per usage, with full accounting, will allow resources to be limited efficiently and will let users have their say on their latency SLAs.

But Scylla also has a lot to do in the background to run smoothly. So while this design pattern was already applied to temper compactions, a lot of work has also been done on automatic flow control and back pressure.

For instance, Materialized Views are updated asynchronously, which means that while we can interact with and put a lot of pressure on the table they are based on (called the Main Table), we could overwhelm the background work that’s needed to keep the MV View Tables in sync. To mitigate this, a smart back pressure approach was developed that will throttle the clients to make sure that Scylla can manage to do everything at the best performance the hardware allows!

I was happy to hear that work on tiered storage is also planned to better optimize disk space costs for certain workloads.

Last but not least, columnar storage optimized for time series and analytics workloads is also something the developers are looking at.

Latency is expensive

If you care for latency, you might be happy to hear that a new polling API (named IOCB_CMD_POLL) has been contributed by Christoph Hellwig and Avi Kivity to the 4.19 Linux kernel which avoids context switching I/O by using a shared ring between kernel and userspace. Scylla will be using it by default if the kernel supports it.

The iotune utility has been upgraded since 2.3 to generate an enhanced I/O configuration.

Also, persistent (disk backed) in-memory tables are getting ready and are very promising for latency sensitive workloads!

A word on drivers

ScyllaDB has been relying on the Datastax drivers since the start. While it’s a good thing for the whole community, it’s important to note that the shard-per-CPU approach on data that Scylla is using is not known and leveraged by the current drivers.

Discussions took place and it seems that Datastax will not allow the protocol to evolve so that drivers could discover if the connected cluster is shard aware or not and then use this information to be more clever in which write/read path to use.

So for now ScyllaDB has been forking and developing their shard aware drivers for Java and Go (no Python yet… I was disappointed).

Kubernetes & containers

The ScyllaDB guys of course couldn’t avoid the Kubernetes frenzy so Moreno Garcia gave a lot of feedback and tips on how to operate Scylla on docker with minimal performance degradation.

Kubernetes has been designed for stateless applications, not stateful ones, and Docker does some automatic magic that has rather big performance hits on Scylla. You will basically have to play with affinities to dedicate one Scylla instance to run on one server with a “retain” reclaim policy.

Remember that the official Scylla docker image runs with dev-mode enabled by default which turns off all performance checks on start. So start by disabling that and look at all the tips and literature that Moreno has put online!

Scylla 3.0

A lot has been written on it already so I will just be short on the things that are important to understand from my point of view.

  • Materialized Views do back fill the whole data set
    • this job is done by the view building process
    • you can watch its progress in the system_distributed.view_build_status table
  • Secondary Indexes are Materialized Views under the hood
    • it’s like a reverse pointer to the primary key of the Main Table
    • so if you read the whole row by selecting on the indexed column, two reads will be issued under the hood: one on the indexed MV view table to get the primary key and one on the main table to get the rest of the columns
    • so if your workload is mostly interested by the whole row, you’re better off creating a complete MV to read from than using a SI
    • this is even more true if you plan to do range scans as this double query could lead you to read from multiple nodes instead of one
  • Range scan is way more performant
    • ALLOW FILTERING finally allows a great flexibility by providing server-side filtering!

Random notes

Support for LWT (lightweight transactions) will be relying on a future implementation of the Raft consensus algorithm inside Scylla. This work will also benefit Materialized Views consistency. Duarte Nunes will be the one working on this and I envy him very much!

Support for search workloads is high in the ScyllaDB devs priorities so we should definitely hear about it in the coming months.

Support for “mc” sstables (new generation format) is done and will reduce storage requirements thanks to metadata / data compression. Migration will be transparent because Scylla can read previous formats as well so it will upgrade your sstables as it compacts them.

ScyllaDB developers have not settled on how to best implement CDC. I hope they do rather soon because it is crucial in their ability to integrate well with Kafka!

Materialized Views, Secondary Indexes and filtering will benefit from the work on partition key and indexes intersections to avoid server side filtering on the coordinator. That’s an important optimization to come!

Last but not least, I’ve had the pleasure to discuss with Takuya Asada who is the packager of Scylla for RedHat/CentOS & Debian/Ubuntu. We discussed Gentoo Linux packaging requirements as well as the recent and promising work on a relocatable package. We will collaborate more closely in the future!

Introducing Transient Replication

Transient Replication is a new experimental feature soon to be available in 4.0. When enabled, it allows for the creation of keyspaces where replication factor can be specified as a number of copies (full replicas) and temporary copies (transient replicas). Transient replicas retain the data they replicate only long enough for it to be propagated to full replicas, via incremental repair, at which point the data is deleted. Writing to transient replicas can be avoided almost entirely if monotonic reads are not required because it is possible to achieve a quorum of acknowledged writes without them.

This results in a savings in disk space, CPU, and IO. By deleting data as soon as it is no longer needed, transient replicas require only a fraction of the disk space of a full replica. By not having to store the data indefinitely, the CPU and IO required for compaction is reduced, and read queries are faster as they have less data to process.

So what are the benefits of not actually keeping a full copy of the data? Well, for some installations and use cases, transient replicas can be almost free if monotonic reads are disabled. In future releases where monotonic reads are supported with Transient Replication, enabling monotonic reads would reduce the savings in CPU and IO, but even then they should still be significant.

Transient Replication is designed to be transparent to applications:

  • Consistency levels continue to produce the same results for queries.
  • The number of replicas that can be lost before data loss occurs is unchanged.
  • The number of replicas that can be unavailable before some queries start to timeout or return unavailable is unchanged (with the exception of ONE).

With Transient Replication, you can go from 3 replicas to 5 replicas, two of which are transient, without adding any hardware.

If you are running an active-passive 2 DC setup with 3 replicas in each DC, you can make one replica in each DC transient and still have four full copies of the data in total.

Feature support

Transient Replication is not intended to fully replace Cassandra’s existing approach to replication. There are features that currently don’t work with transiently replicated keyspaces and features that are unlikely ever to work with them.

You can have keyspaces with and without Transient Replication enabled in the same cluster, so it is possible to use Transient Replication for just the use cases that are a good fit for the currently available functionality.

Currently unsupported but coming:

  • Monotonic reads
  • Batch log
  • LWT
  • Counters

Will never be supported:

  • Secondary indexes
  • Materialized views

How Transient Replication works

Overview

Transient replication extends Cassandra’s existing consistent hashing algorithm to designate some replicas of a point or range on the consistent hash ring as transient and some as full. The following image depicts a consistent hash ring with three replicas A, B, and C. The replicas are located at tokens 5, 10, 15 respectively. A key k hashes to token 3 on the ring.

A consistent hash ring without Transient Replication

Replicas are selected by walking the ring clockwise starting at the point on the ring the key hashes to. At RF=3, the replicas of key k are A, B, C. With Transient Replication, the last N replicas (where N is the configured number of transient replicas) found while walking the ring are designated as transient.

There are no nodes designated as transient replicas or full replicas. All nodes will fully replicate some ranges on the ring and transiently replicate others.

The following image depicts a consistent hash ring at RF=3/1 (three replicas, one of which is transient). The replicas of k are still A, B, and C, but C is now transiently replicating k.

A consistent hash ring with Transient Replication

Normally all replicas of a range receive all writes for that range, as depicted in the following image.

Normal write behavior

Transient replicas do not receive writes in the normal write path.

Transient write behavior

If sufficient full replicas are unavailable, transient replicas will receive writes.

Transient write with unavailable node

This optimization, which is possible with Transient Replication, is called Cheap Quorums. This minimizes the amount of work that transient replicas have to do at write time, and reduces the amount of background compaction they will have to do.

Cheap Quorums and monotonic reads: Cheap Quorums may end up being incompatible with an initial implementation of monotonic reads, and operators will be able to make a conscious trade off between performance and monotonic reads.

Rapid write protection

In keyspaces utilizing Transient Replication, writes are sent to every full replica and enough transient replicas to meet the requested consistency level (to make up for unavailable full replicas). In addition, enough transient replicas are selected to reach a quorum in every datacenter, though unless the consistency level requires it, the write will be acknowledged without ensuring all have been delivered.

Because not all replicas are sent the write, it’s possible that insufficient replicas will respond, causing timeouts. To prevent this, we implement rapid write protection, similar to rapid read protection, that sends writes to additional replicas if sufficient acknowledgements to meet the consistency level are not received promptly.

The following animation shows rapid write protection in action.

Animation of rapid write protection preventing a write timeout

Rapid write protection is configured similarly to rapid read protection using the table option additional_write_policy. The policy determines how long to wait for acknowledgements before sending additional mutations. The default is to wait for P99 of the observed latency.

Incremental repair

Incremental repair is used to clean up transient data at transient replicas and propagate it to full replicas.

When incremental repair occurs transient replicas stream out transient data, but don’t receive any. Anti-compaction is used to separate transient and fully replicated data so that only fully replicated data is retained once incremental repair completes.

The result of running an incremental repair is that all full replicas for a range are synchronized and can be used interchangeably to retrieve the repaired data set for a query.

Read path

Reads must always include at least one full replica and can include as many replicas (transient or full) as necessary to achieve the desired consistency level. At least one full replica is required in order to provide the data not available at transient replicas, but it doesn’t matter which full replica is picked because incremental repair synchronizes the repaired data set across full replicas.

Reads at transient replicas are faster than reads at full replicas because reads at transient replicas are unlikely to return any results if monotonic reads are disabled, and they haven’t been receiving writes.

Creating keyspaces with Transient Replication

Transient Replication is supported by SimpleStrategy and NetworkTopologyStrategy. When specifying the replication factor, you can specify the number of transient replicas in addition to the total number of replicas (including transient replicas). The syntax for a replication factor of 3 replicas total with one of them being transient would be “3/1”.

ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'DC1' : '3/1'};
ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : '3/1'};

Monotonic reads are not supported with Transient Replication in 4.0, so any existing tables in the keyspace must have monotonic reads disabled by setting read_repair = 'NONE'.
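
For example, applied to an existing table in the altered keyspace (the table name here is hypothetical), this looks like:

ALTER TABLE foo.bar WITH read_repair = 'NONE';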

Once the keyspace has been altered, you will need to run incremental repair and then nodetool cleanup to ensure transient data is cleaned up.

Operational matters

Transient replication requires rolling incremental repair to be run regularly in order to move data from transient replicas to full replicas. By default transient replicas will receive 1% of writes for transiently replicated ranges due to rapid write protection. If a node is down for an extended period of time, its transient replicas will receive additional write load and that data should be cleaned up using incremental repair. Running incremental repair regularly will ensure that the size of each repair is small.

It’s also a good idea to run a small number of vnodes with transient replication so that when a node goes down the load is spread out over several other nodes that transiently replicate that range. Large numbers of vnodes are known to be problematic, so it’s best to start with a cluster that is already close to or at its maximum size so that a small number of vnodes will be sufficient. If you intend to grow the cluster in the future, you will need to be cognizant of how this will interact with the number of vnodes you select.

While the odds of any data loss should multiple nodes be permanently lost remain the same with transient replication, the magnitude of potential data loss does not. With 3/1 transient replication the permanent loss of two nodes could result in the loss of the entirety of the repaired data set. If you are running a multi-DC setup with a high level of replication such as 2 DCs, with 3/1 replicas in each, then you will have 4 full copies total and the added risk of transient replication is minimal.

Experimental features

Experimental features are a relatively new idea for Apache Cassandra. Although we recently voted to make materialized views an experimental feature retroactively, Transient Replication is the first experimental feature to be introduced as such.

The goal of introducing experimental features is to allow for incremental development across multiple releases. In the case of Transient Replication, we can avoid a giant code drop that heavily modifies the code base, and the associated risks with incorporating a new feature that way.

What it means for a feature to be experimental doesn’t have a set definition, but for Transient Replication it’s intended to set expectations. As of 4.0, Transient Replication’s intended audience is expert operators of Cassandra with the ability to write the book on how to safely deploy Transient Replication, debug any issues that result, and if necessary contribute code back to address problems as they are discovered.

It’s expected that the feature set for Transient Replication will not change in minor updates to 4.0, but eventually it should be ready for use by a wider audience.

Next steps for Transient Replication

If increasing availability or saving on capacity sounds good to you, then you can help make transient replication production-ready by testing it out or even deploying it. Experience and feedback from the community is one of the things that will drive transient replication bug fixing and development.