Announcing the Astra Service Broker: Tradeoff-Free Cassandra in Kubernetes

[Webcast] The 411 on Storage Attached Indexing in Apache Cassandra

How to Keep Pirates “Hackers” Away From Your Booty “Data” with Cassandra RBAC

Sizing Matters: Sizing Astra for Apache Cassandra Apps

Apache Cassandra Changelog #1 (October 2020)

Introducing the first Cassandra Changelog blog! Our monthly roundup of key activities and knowledge to keep the community informed.

Cassandra Changelog header

Release Notes

Updated

The most current Apache Cassandra releases are 4.0-beta2, 3.11.8, 3.0.22, 2.2.18 and 2.1.22 released on August 31 and are in the repositories. The next cut of releases will be out soon―join the Cassandra mailing list to stay up-to-date.

We continue to make progress toward the 4.0 GA release with the overarching goal of it being at a state where major users should feel confident running it in production when it is cut. Over 1,300 Jira tickets have been closed and less than 100 remain as of this post. To gain this confidence, there are various ongoing testing efforts involving correctness, performance, and ease of use.

Added

With CASSANDRA-15013, the community improved Cassandra's ability to handle high throughput workloads, while having enough safeguards in place to protect itself from potentially going out of memory.

Added

The Harry project is a fuzz testing tool that aims to generate reproducible workloads that are as close to real-life as possible, while being able to efficiently verify the cluster state against the model without pausing the workload itself.

Added

The community published its first Apache Cassandra Usage Report 2020 detailing findings from a comprehensive global survey of 901 practitioners on Cassandra usage to provide a baseline understanding of who, how, and why organizations use Cassandra.

Community Notes

Updates on new and active Cassandra Enhancement Proposals (CEPs) and how to contribute.

Changed

CEP-2: Kubernetes Operator was introduced this year and is an active discussion on creation of a community-based operator with the goal of making it easy to run Cassandra on Kubernetes.

Added

CEP-7: Storage Attached Index (SAI) is a new secondary index for Cassandra that builds on the advancements made with SASI. It is intended to replace the existing built-in secondary index implementations.

Added

Cassandra was selected by the ASF Diversity & Inclusion committee to be included in a research project to evaluate and understand the current state of diversity.

User Space

Bigmate

"In vetting MySQL, MongoDB, and other potential databases for IoT scale, we found they couldn't match the scalability we could get with open source Apache Cassandra. Cassandra's built-for-scale architecture enables us to handle millions of operations or concurrent users each second with ease – making it ideal for IoT deployments." - Brett Orr

Bloomberg

"Our group is working on a multi-year build, creating a new Index Construction Platform to handle the daily production of the Bloomberg Barclays fixed income indices. This involves building and productionizing an Apache Solr-backed search platform to handle thousands of searches per minute, an Apache Cassandra back-end database to store millions of data points per day, and a distributed computational engine to handle millions of computations daily." - Noel Gunasekar

In the News

Solutions Review - The Five Best Apache Cassandra Books on Our Reading List

ZDNet - What Cassandra users think of their NoSQL DBMS

Datanami - Cassandra Adoption Correlates with Experience

Container Journal - 5 to 1: An Overview of Apache Cassandra Kubernetes Operators

Datanami - Cassandra Gets Monitoring, Performance Upgrades

ZDNet - Faster than ever, Apache Cassandra 4.0 beta is on its way

Cassandra Tutorials & More

A Cassandra user was in search of a tool to perform schema DDL upgrades. Another user suggested https://github.com/patka/cassandra-migration to ensure you don't get schema mismatches if running multiple upgrade statements in one migration. See the full email on the user mailing list for other recommended tools.

Start using virtual tables in Apache Cassandra 4.0 - Ben Bromhead, Instaclustr

Benchmarking Apache Cassandra with Rust - Piotr Kołaczkowski, DataStax

Open Source BI Tools and Cassandra - Arpan Patel, Anant Corporation

Build Fault Tolerant Applications With Cassandra API for Azure Cosmos DB - Abhishek Gupta, Microsoft

Understanding Data Modifications in Cassandra - Sameer Shukla, Redgate

Cassandra Changelog footer

Cassandra Changelog is curated by the community. Please send submissions to cassandra@constantia.io.

Building a Low-Latency Distributed Stock Broker Application: Part 4

In the fourth blog of the  “Around the World ” series we built a prototype of the application, designed to run in two georegions.

Recently I re-watched “Star Trek: The Motion Picture” (The original 1979 Star Trek Film). I’d forgotten how much like “2001: A Space Odyssey” the vibe was (a drawn out quest to encounter a distant, but rapidly approaching very powerful and dangerous alien called “V’ger”), and also that the “alien” was originally from Earth and returning in search of “the Creator”—V’ger was actually a seriously upgraded Voyager spacecraft!

Star Trek: The Motion Picture (V’ger)

The original Voyager 1 and 2 had only been recently launched when the movie came out, and were responsible for some remarkable discoveries (including the famous “Death Star” image of Saturn’s moon Mimas, taken in 1980). Voyager 1 has now traveled further than any other human artefact and has since left the solar system! It’s amazing that after 40+ years it’s still working, although communicating with it now takes a whopping 40 hours in round-trip latency (which happens via the Deep Space Network—one of the stations is at Tidbinbilla, near our Canberra office).

Canberra Deep Space Communication Complex (CSIRO)

Luckily we are only interested in traveling “Around the World” in this blog series, so the latency challenges we face in deploying a globally distributed stock trading application are substantially less than the 40 hours latency to outer space and back. In Part 4 of this blog series we catch up with Phileas Fogg and Passepartout in their journey, and explore the initial results from a prototype application deployed in two locations, Sydney and North Virginia.

1. The Story So Far

In Part 1 of this blog series we built a map of the world to take into account inter-region AWS latencies, and identified some “georegions” that enabled sub 100ms latency between AWS regions within the same georegion. In Part 2 we conducted some experiments to understand how to configure multi-DC Cassandra clusters and use java clients, and measured latencies from Sydney to North Virginia. In Part 3 we explored the design for a globally distributed stock broker application, and built a simulation and got some indicative latency predictions. 

The goal of the application is to ensure that stock trades are done as close as possible to the stock exchanges the stocks are listed on, to reduce the latency between receiving a stock ticker update, checking the conditions for a stock order, and initiating a trade if the conditions are met. 

2. The Prototype

For this blog I built a prototype of the application, designed to run in two georegions, Australia and the USA. We are initially only trading stocks available from stock exchanges in these two georegions, and orders for stocks traded in New York will be directed to a Broker deployed in the AWS North Virginia region, and orders for stocks traded in Sydney will be directed to a Broker deployed in the AWS Sydney region. As this Google Earth image shows, they are close to being antipodes (diametrically opposite each other, at 15,500 km apart, so pretty much satisfy the definition of  traveling “Around the World”), with a measured inter-region latency (from blog 2) of 230ms.

Sydney to North Virginia (George Washington’s Mount Vernon house)

The design of the initial (simulated) version of the application was changed in a couple of ways to: 

  • Ensure that it worked correctly when instances of the Broker were deployed in multiple AWS regions
  • Measure actual rather than simulated latencies, and
  • Use a multi-DC Cassandra Cluster. 

The prototype isn’t a complete implementation of the application yet. In particular it only uses a single Cassandra table for Orders—to ensure that Orders are made available in both georegions, and can be matched against incoming stock tickers by the Broker deployed in that region. 

Some other parts of the application are “stubs”, including the Stock Ticker component (which will eventually use Kafka), and checks/updates of Holdings and generation of Transactions records (which will eventually also be Cassandra tables). Currently only the asynchronous, non-market, order types are implemented (Limit and Stop orders), as I realized that implementing Market orders (which are required to be traded immediately) using a Cassandra table would result in too many tombstones being produced—as each order is deleted immediately upon being filled (rather than being marked as “filled” in the original design, to enable the Cassandra query to quickly find unfilled orders and prevent the Orders table from growing indefinitely). However, for non-market orders it is a reasonable design choice to use a Cassandra table, as Orders may exist for extended periods of time (hours, days, or even weeks) before being traded, and as there is only a small number of successful trades (10s to 100s) per second relative to the total number of waiting Orders (potentially millions), the number of deletions, and therefore tombstones, will be acceptable. 

We now have a look at the details of some of the more significant changes that were made to the application.

Cassandra

I created a multi-DC Cassandra keyspace as follows:

CREATE KEYSPACE broker WITH replication = {'class': 'NetworkTopologyStrategy', 'NorthVirginiaDC': '3', 'SydneyDC': '3'};

In the Cassandra Java driver, the application.conf file determines which Data Center the Java client connects to. For example, to connect to the SydneyDC the file has the following settings:

datastax-java-driver {
    basic.contact-points = [
        "1.2.3.4:9042"
    ]

    basic.load-balancing-policy {
        class = DefaultLoadBalancingPolicy
        local-datacenter = "SydneyDC"
    }
}

The Orders table was created as follows (note that I couldn’t use “limit” for a column name as it’s a reserved word in Cassandra!):

CREATE TABLE broker.orders (
    symbol text,
    orderid text,
    buyorsell text,
    customerid text,
    limitthreshold double,
    location text,
    ordertype text,
    quantity bigint,
    starttime bigint,
    PRIMARY KEY (symbol, orderid)
);

For the primary key, the partition key is the stock symbol, so that all outstanding orders for a stock can be found when each stock ticker is received by the Broker, and the clustering column is the (unique) orderid, so that multiple orders for the same symbol can be written and read, and a specific order (for an orderid) can be deleted. In a production environment using a single stock symbol partition may result in skewed and unbounded partitions which is not recommended.

The prepared statements for creating, reading, and deleting orders are as follows:

PreparedStatement prepared_insert = Cassandra.session.prepare(
"insert into broker.orders (symbol, orderid, buyorsell, customerid, limitthreshold, location, ordertype, quantity, starttime) values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
                 
PreparedStatement prepared_select = Cassandra.session.prepare(
        "select * from broker.orders where symbol = ?");

PreparedStatement prepared_delete = Cassandra.session.prepare(
        "delete from broker.orders where symbol = ? and orderid = ?");

I implemented a simplified “Place limit or stop order” operation (see Part 3), which uses the prepared_insert statement to create each new order, initially in the Cassandra Data Center local to the Broker where the order was created from, which is then automatically replicated in the other Cassandra Data Center. I also implemented the “Trade Matching Order” operation (Part 3), which uses the prepared_select statement to query orders matching each incoming Stock Ticker, checks the rules, and then if a trade is filled deletes the order.

Deployment

I created a 3 node Cassandra cluster in the Sydney AWS region, and then added another identical Data Center in the North Virginia AWS regions using Instaclustr Managed Cassandra for AWS. This gave me 6 nodes in total, running on t3.small instances (5 GB SSD, 2GB RAM, 2 CPU Cores). This is a small developer sized cluster, but is adequate for a prototype, and very affordable (2 cents an hour per node for AWS costs) given that the Brokers are currently only single threaded so don’t produce much load. We’re more interested in latency at this point of the experiment, and we may want to increase the number of Data Centers in the future. I also spun up an EC2 instance (t3a.micro) in the same AWS regions, and deployed an instance of the Stock Broker on each (it only used 20% CPU). Here’s what the complete deployment looks like:

3. The Results

For the prototype, the focus was on demonstrating that the design goal of minimizing latency for trading stop and limit orders (asynchronous trades) was achieved. For the prototype, the latency for these order types is measured from the time of receiving a Stock Ticker, to the time an Order is filled. We ran a Broker in each AWS region concurrently for an hour, with the same workload for each, and measured the average and maximum latencies. For the first configuration, each Broker is connected to its local Cassandra Data Center, which is how it would be configured in practice. The results were encouraging, with an average latency of 3ms, and a maximum of 60ms, as shown in this graph.  

During the run, across both Brokers, 300 new orders were created each second, 600 stock tickers were received each second, and 200 trades were carried out each second. 

Given that I hadn’t implemented Market Orders yet, I wondered how I could configure and approximately measure the expected latency for these synchronous order types between different regions (i.e. Sydney and North Virginia)? The latency for Market orders in the same region will be comparable to the non-market orders. The solution turned out to be simple— just re-configure the Brokers to use the remote Cassandra Data Center, which introduces the inter-region round-trip latency which would also be encountered with Market Orders placed on one region and traded immediately in the other region. I could also have achieved a similar result by changing the consistency level to EACH_QUOROM (which requires a majority of nodes in each data center to respond). Not surprisingly, the latencies were higher, rising to 360ms average, and 1200ms maximum, as shown in this graph with both configurations (Stop and Limit Orders on the left, and Market Orders on the right):

So our initial experiments are a success, and validate the primary design goal, as asynchronous stop and limit Orders can be traded with low latency from the Broker nearest the relevant stock exchanges, while synchronous Market Orders will take significantly longer due to inter-region latency. 

Write Amplification

I wondered what else can be learned from running this experiment? We can understand more about resource utilization in multi-DC Cassandra clusters. Using the Instaclustr Cassandra Console, I monitored the CPU Utilization on each of the nodes in the cluster, initially with only one Data Center and one Broker, and then with two Data Centers and a single Broker, and then both Brokers running. It turns out that the read load results in 20% CPU Utilization on each node in the local Cassandra Data Center, and the write load also results in 20% locally.  Thus, for a single Data Center cluster the total load is 40% CPU. However, with two Data Centers things get more complex due to the replication of the local write loads to each other Data Center. This is also called “Write Amplification”.

The following table shows the measured total load for 1 and 2 Data Centers, and predicted load for up to 8 Data Centers, showing that for more than 3 Data Centers you need bigger nodes (or bigger clusters). A four CPU Core node instance type would be adequate for 7 Data Centers, and would result in about 80% CPU Utilization.  

  Number of Data  Centres Local Read Load Local Write Load Remote Write Load Total Write Load Total Load
  1 20 20 0 20 40
  2 20 20 20 40 60
  3 20 20 40 60 80
  4 20 20 60 80 100
  5 20 20 80 100 120
  6 20 20 100 120 140
  7 20 20 120 140 160
  8 20 20 140 160 180

Costs

The total cost to run the prototype includes the Instaclustr Managed Cassandra nodes (3 nodes per Data Center x 2 Data Centers = 6 nodes), the two AWS EC2 Broker instances, and the data transfer between regions (AWS only charges for data out of a region, not in, but the prices vary depending on the source region). For example, data transfer out of North Virginia is 2 cents/GB, but Sydney is more expensive at 9.8 cents/GB. I computed the total monthly operating cost to be $361 for this configuration, broken down into $337/month (93%) for Cassandra and EC2 instances, and $24/month (7%) for data transfer, to process around 500 million stock trades. Note that this is only a small prototype configuration, but can easily be scaled for higher throughputs (with incrementally and proportionally increasing costs).

Conclusions

In this blog we built and experimented with a prototype of the globally distributed stock broker application, focussing on testing the multi-DC Cassandra part of the system which enabled us to significantly reduce the impact of planetary scale latencies (from seconds to low milliseconds) and ensure greater redundancy (across multiple AWS regions), for the real-time stock trading function. Some parts of the application remain as stubs, and in future blogs I aim to replace them with suitable functionality (e.g. streaming, analytics) and non-functionality (e.g. failover) from a selection of Kafka, Elasticsearch and maybe even Redis!

The post Building a Low-Latency Distributed Stock Broker Application: Part 4 appeared first on Instaclustr.

Understanding the Impacts of the Native Transport Requests Change Introduced in Cassandra 3.11.5

Summary

Recently, Cassandra made changes to the Native Transport Requests (NTR) queue behaviour. Through our performance testing, we found the new NTR change to be good for clusters that have a constant load causing the NTR queue to block. Under the new mechanism the queue no longer blocks, but throttles the load based on queue size setting, which by default is 10% of the heap.

Compared to the Native Transport Requests queue length limit, this improves how Cassandra handles traffic when queue capacity is reached. The “back pressure” mechanism more gracefully handles the overloaded NTR queue, resulting in a significant lift of operations without clients timing out. In summary, clusters with later versions of Cassandra can handle more load before hitting hard limits.

Introduction

At Instaclustr, we are responsible for managing the Cassandra versions that we release to the public. This involves performing a review of Cassandra release changes, followed by performance testing. In cases where major changes have been made in the behaviour of Cassandra, further research is required. So without further delay let’s introduce the change to be investigated.

Change:
  • Prevent client requests from blocking on executor task queue (CASSANDRA-15013)
Versions affected:

Background

Native Transport Requests

Native transport requests (NTR) are any requests made via the CQL Native Protocol. CQL Native Protocol is the way the Cassandra driver communicates with the server. This includes all reads, writes, schema changes, etc. There are a limited number of threads available to process incoming requests. When all threads are in use, some requests wait in a queue (pending). If the queue fills up, some requests are silently rejected (blocked). The server never replies, so this eventually causes a client-side timeout. The main way to prevent blocked native transport requests is to throttle load, so the requests are performed over a longer period.

Prior to 3.11.5

Prior to 3.11.5, Cassandra used the following configuration settings to set the size and throughput of the queue:

  • native_transport_max_threads is used to set the maximum threads for handling requests.  Each thread pulls requests from the NTR queue.
  • cassandra.max_queued_native_transport_requests is used to set queue size. Once the queue is full the Netty threads are blocked waiting for the queue to have free space (default 128).

Once the NTR queue is full requests from all clients are not accepted. There is no strict ordering by which blocked Netty threads will process requests. Therefore in 3.11.4 latency becomes random once all Netty threads are blocked.

Native Transport Requests - Cassandra 3.11.4

Change After 3.11.5

In 3.11.5 and above, instead of blocking the NTR queue as previously described, it throttles. The NTR queue is throttled based on the heap size. The native transport requests are limited in terms of total size occupied in memory rather than the number of them. Requests are paused after the queue is full.

  • native_transport_max_concurrent_requests_in_bytes a global limit on the number of NTR requests, measured in bytes. (default heapSize / 10)
  • native_transport_max_concurrent_requests_in_bytes_per_ip an endpoint limit on the number of NTR requests, measured in bytes. (default heapSize / 40)

Maxed Queue Behaviour

From previously conducted performance testing of 3.11.4 and 3.11.6 we noticed similar behaviour when the traffic pressure has not yet reached the point of saturation in the NTR queue. In this section, we will discuss the expected behaviour when saturation does occur and breaking point is reached. 

In 3.11.4, when the queue has been maxed, client requests will be refused. For example, when trying to make a connection via cqlsh, it will yield an error, see Figure 2.

Cassandra 3.11.4 - queue maxed out, client requests refused
Figure 2: Timed out request

Or on the client that tries to run a query, you may see NoHostAvailableException

Where a 3.11.4 cluster previously got blocked NTRs, when upgraded to 3.11.6 NTRs are no longer blocked. The reason is that 3.11.6 doesn’t place a limit on the number of NTRs but rather on the size of memory of all those NTRs. Thus when the new size limit is reached, NTRs are paused. Default settings in 3.11.6 result in a much larger NTR queue in comparison to the small 128 limit in 3.11.4 (in normal situations where the payload size would not be extremely large).

Benchmarking Setup

This testing procedure requires the NTR queue on a cluster to be at max capacity with enough load to start blocking requests at a constant rate. In order to do this we used multiple test boxes to stress the cluster. This was achieved by using 12 active boxes to create multiple client connections to the test cluster. Once the cluster NTR queue is in constant contention, we monitored the performance using:

  • Client metrics: requests per second, latency from client perspective
  • NTR Queue metrics: Active Tasks, Pending Tasks, Currently Blocked Tasks, and Paused Connections.

For testing purposes we used two testing clusters with details provided in the table below:

Cassandra Cluster size Instance Type Cores RAM Disk
3.11.4 3 M5xl-1600-v2  4 16GB 1600 GB
3.11.6 3 m5xl-1600-v2 4 16GB 1600 GB
Table 1: Cluster Details

To simplify the setup we disabled encryption and authentication. Multiple test instances were set up in the same region as the clusters. For testing purposes we used 12 KB blob payloads. To give each cluster node a balanced mixed load, we kept the number of test boxes generating write load equal to the number of instances generating read load. We ran the load against the cluster for 10 mins to temporarily saturate the queue with read and write requests and cause contention for the Netty threads.

Our test script used cassandra-stress for generating the load, you can also refer to Deep Diving cassandra-stress – Part 3 (Using YAML Profiles) for more information.

In the stressSpec.yaml, we used the following table definition and queries:

table_definition: |
 CREATE TABLE typestest (
       name text,
       choice boolean,
       date timestamp,
       address inet,
       dbl double,
       lval bigint,
               ival int,
       uid timeuuid,
       value blob,
       PRIMARY KEY((name,choice), date, address, dbl, lval, ival, uid)
 ) WITH compaction = { 'class':'LeveledCompactionStrategy' }
   AND comment='A table of many types to test wide rows'
 
columnspec:
 - name: name
   size: fixed(48)
   population: uniform(1..1000000000) # the range of unique values to select for the field 
 - name: date
   cluster: uniform(20..1000)
 - name: lval
   population: gaussian(1..1000)
   cluster: uniform(1..4)
 - name: value
   size: fixed(12000)
 
insert:
 partitions: fixed(1)       # number of unique partitions to update in a single operation
                                 # if batchcount > 1, multiple batches will be used but all partitions will
                                 # occur in all batches (unless they finish early); only the row counts will vary
 batchtype: UNLOGGED               # type of batch to use
 select: uniform(1..10)/10       # uniform chance any single generated CQL row will be visited in a partition;
                                 # generated for each partition independently, each time we visit it
 
#
# List of queries to run against the schema
#
queries:
  simple1:
     cql: select * from typestest where name = ? and choice = ? LIMIT 1
     fields: samerow             # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
  range1:
     cql: select name, choice, uid  from typestest where name = ? and choice = ? and date >= ? LIMIT 10
     fields: multirow            # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
  simple2:
     cql: select name, choice, uid from typestest where name = ? and choice = ? LIMIT 1
     fields: samerow             # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)

Write loads were generated with:

cassandra-stress user no-warmup 'ops(insert=10)' profile=stressSpec.yaml cl=QUORUM duration=10m -mode native cql3 maxPending=32768 connectionsPerHost=40 -rate threads=2048 -node file=node_list.txt

Read loads were generated by changing ops to

ops(simple1=10,range1=1)'

Comparison

3.11.4 Queue Saturation Test

The active NTR queue reached max capacity (at 128) and remained in contention under load. Pending NTR tasks remained above 128 throughout. At this point, timeouts were occurring when running 12 load instances to stress the cluster. Each node had 2 load instances performing reads and another 2 performing writes. 4 of the read load instances constantly logged NoHostAvailableExceptions as shown in the example below.

ERROR 04:26:42,542 [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: ec2-18-211-4-255.compute-1.amazonaws.com/18.211.4.255:9042 (com.datastax.driver.core.exceptions.OperationTimedOutException: [ec2-18-211-4-255.compute-1.amazonaws.com/18.211.4.255] Timed out waiting for server response), ec2-35-170-231-79.compute-1.amazonaws.com/35.170.231.79:9042 (com.datastax.driver.core.exceptions.OperationTimedOutException: [ec2-35-170-231-79.compute-1.amazonaws.com/35.170.231.79] Timed out waiting for server response), ec2-35-168-69-19.compute-1.amazonaws.com/35.168.69.19:9042 (com.datastax.driver.core.exceptions.OperationTimedOutException: [ec2-35-168-69-19.compute-1.amazonaws.com/35.168.69.19] Timed out waiting for server response))

The client results we got from this stress run are shown in Table 2.

Box Op rate (op/s) Latency mean (ms) Latency median (ms) Latency 95th percentile (ms) latency 99th percentile (ms) Latency 99.9th percentile (ms) Latency max (ms)
1 700.00 2,862.20 2,078.30 7,977.60 11,291.10 19,495.10 34,426.80
2 651.00 3,054.50 2,319.50 8,048.90 11,525.90 19,528.70 32,950.50
3 620.00 3,200.90 2,426.40 8,409.60 12,599.70 20,367.50 34,158.40
4 607.00 3,312.80 2,621.40 8,304.70 11,769.20 19,730.00 31,977.40
5 568.00 3,529.80 3,011.50 8,216.60 11,618.20 19,260.20 32,698.80
6 553.00 3,627.10 3,028.30 8,631.90 12,918.50 20,115.90 34,292.60
Writes 3,699.00 3,264.55 2,580.90 8,264.88 11,953.77 19,749.57 34,426.80
7 469.00 4,296.50 3,839.90 9,101.60 14,831.10 21,290.30 35,634.80
8 484.00 4,221.50 3,808.40 8,925.50 11,760.80 20,468.20 34,863.10
9 Crashed due to time out
10 Crashed due to time out
11 Crashed due to time out
12 Crashed due to time out
Reads 953.00 4,259.00 3,824.15 9,092.80 14,800.40 21,289.48 35,634.80
Summary 4,652.00 3,761.78 3,202.53 8,678.84 13,377.08 20,519.52 35,634.80
Table 2: 3.11.4 Mixed Load Saturating The NTR Queue

* To calculate the total write operations, we summed the values from 6 instances. For max write latency we used the max value from all instances and for the rest of latency values, we calculated the average of results. Write results are summarised in the Table 2 “Write” row. For the read result we did the same, and results are recorded in the “Read” row. The last row in the table summarises the results in “Write” and “Read” rows.

The 6 write load instances finished normally, but the read instances struggled. Only 2 of the read load instances were able to send traffic through normally, the other clients received too many timeout errors causing them to crash. Another observation we have made is that the Cassandra timeout metrics, under client-request-metrics, did not capture any of the client timeout we have observed.

Same Load on 3.11.6

Next, we proceeded to test 3.11.6 with the same load. Using the default NTR settings, all test instances were able to finish the stress test successfully.

Box Op rate (op/s) Latency mean (ms) Latency median (ms) Latency 95th percentile (ms) latency 99th percentile (ms) Latency 99.9th percentile (ms) Latency max (ms)
1 677.00 2,992.60 2,715.80 7,868.50 9,303.00 9,957.30 10,510.90
2 658.00 3,080.20 2,770.30 7,918.80 9,319.70 10,116.70 10,510.90
3 653.00 3,102.80 2,785.00 7,939.80 9,353.30 10,116.70 10,510.90
4 608.00 3,340.90 3,028.30 8,057.30 9,386.90 10,192.20 10,502.50
5 639.00 3,178.30 2,868.90 7,994.30 9,370.10 10,116.70 10,510.90
6 650.00 3,120.50 2,799.70 7,952.40 9,353.30 10,116.70 10,510.90
Writes 3,885.00 3,135.88 2,828.00 7,955.18 9,347.72 10,102.72 10,510.90
7 755.00 2,677.70 2,468.30 7,923.00 9,378.50 9,982.40 10,762.60
8 640.00 3,160.70 2,812.30 8,132.80 9,529.50 10,418.70 11,031.00
9 592.00 3,427.60 3,101.70 8,262.80 9,579.80 10,452.20 11,005.90
10 583.00 3,483.00 3,160.40 8,279.60 9,579.80 10,435.40 11,022.60
11 582.00 3,503.60 3,181.40 8,287.90 9,588.20 10,469.00 11,047.80
12 582.00 3,506.70 3,181.40 8,279.60 9,588.20 10,460.60 11,014.20
Reads 3,734.00 3,293.22 2,984.25 8,194.28 9,540.67 10,369.72 11,047.80
Summary 7,619.00 3,214.55 2,906.13 8,074.73 9,444.19 10,236.22 11,047.80
Table 3: 3.11.6 Mixed Load

Default Native Transport Requests (NTR) Setting Comparison

Taking the summary row from both versions (Table 2 and Table 3), we produced Table 4.

Op rate (op/s) Latency mean (ms) Latency median (ms) Latency 95th percentile (ms) latency 99th percentile (ms) Latency 99.9th percentile (ms) Latency max (ms)
3.11.4 4652 3761.775 3202.525 8678.839167 13377.08183 20519.52228 35634.8
3.11.6 7619 3214.55 2906.125 8074.733333 9444.191667 10236.21667 11047.8
Table 4: Mixed Load 3.11.4 vs 3.11.6


Figure 2: Latency 3.11.4 vs 3.11.6

Figure 2 shows the latencies from Table 4. From the results, 3.11.6 had slightly better average latency than 3.11.4. Furthermore, in the worst case where contention is high, 3.11.6 handled the latency of a request better than 3.11.4. This is shown by the difference in Latency Max. Not only did 3.11.6 have lower latency but it was able to process many more requests due to not having a blocked queue.

3.11.6 Queue Saturation Test

The default native_transport_max_concurrent_requests_in_bytes is set to 1/10 of the heap size. The Cassandra max heap size of our cluster is 8 GB, so the default queue size for our queue is 0.8 GB. This turns out to be too large for this cluster size, as this configuration will run into CPU and other bottlenecks before we hit NTR saturation.

So we took the reverse approach to investigate full queue behaviour, which is setting the queue size to a lower number. In cassandra.yaml, we added:

native_transport_max_concurrent_requests_in_bytes: 1000000

This means we set the global queue size to be throttled at 1MB. Once Cassandra was restarted and all nodes were online with the new settings, we ran the same mixed load on this cluster, the results we got are shown in Table 5.

3.11.6 Op rate (op/s) Latency mean (ms) Latency median (ms) Latency 95th percentile (ms) latency 99th percentile (ms) Latency 99.9th percentile (ms) Latency max (ms)
Write: Default setting 3,885.00 3,135.88 2,828.00 7,955.18 9,347.72 10,102.72 10,510.90
Write: 1MB setting 2,105.00 5,749.13 3,471.82 16,924.02 26,172.45 29,681.68 31,105.00
Read: Default setting 3,734.00 3,293.22 2,984.25 8,194.28 9,540.67 10,369.72 11,047.80
Read: 1MB setting 5,395.00 2,263.13 1,864.55 5,176.47 8,074.73 9,693.03 15,183.40
Summary: Default setting 7,619.00 3,214.55 2,906.13 8,074.73 9,444.19 10,236.22 11,047.80
Summary: 1MB setting 7,500.00 4,006.13 2,668.18 11,050.24 17,123.59 19,687.36 31,105.00

Table 5: 3.11.6 native_transport_max_concurrent_requests_in_bytes default and 1MB setting 

During the test, we observed a lot of paused connections and discarded requests—see Figure 3. For a full list of Instaclustr exposed metrics see our support documentation.

NTR Test - Paused Connections and Discarded Requests
Figure 3: 3.11.6 Paused Connections and Discarded Requests

After setting native_transport_max_concurrent_requests_in_bytes to a lower number, we start to get paused connections and discarded requests, write latency increased resulting in fewer processed operations, shown in Table 5. The increased write latency is illustrated Figure 4.

Cassandra 3.11.6 Write Latency Under Different Settings
Figure 4: 3.11.6 Write Latency Under Different Settings

On the other hand, read latency decreased, see Figure 5, resulting in a higher number of operations being processed.

Cassandra 3.11.6 Read Latency Under Different Settings
Figure 5: 3.11.6 Read Latency Under Different Settings
Cassandra 3.11.6 Operations Rate Under Different Settings
Figure 6: 3.11.6 Operations Rate Under Different Settings

As illustrated in Figure 6, the total number of operations decreased slightly with the 1MB setting, but the difference is very small and the effect of read and write almost “cancel each other out”. However, when we look at each type of operation individually, we can see that rather than getting equal share of the channel in a default setting of “almost unlimited queue”, the lower queue size penalizes writes and favors read. While our testing identified this outcome, further investigation will be required to determine exactly why this is the case.

Conclusion

In conclusion, the new NTR change offers an improvement over the previous NTR queue behaviour. Through our performance testing we found the change to be good for clusters that have a constant load causing the NTR queue to block. Under the new mechanism the queue no longer blocks, but throttles the load based on the amount of memory allocated to requests.

The results from testing indicated that the changed queue behaviour reduced latency and provided a significant lift in the number of operations without clients timing out. Clusters with our latest version of Cassandra can handle more load before hitting hard limits. For more information feel free to comment below or reach out to our Support team to learn more about changes to 3.11.6 or any of our other supported Cassandra versions.

The post Understanding the Impacts of the Native Transport Requests Change Introduced in Cassandra 3.11.5 appeared first on Instaclustr.

Apache Cassandra Usage Report 2020

Apache Cassandra is the open source NoSQL database for mission critical data. Today the community announced findings from a comprehensive global survey of 901 practitioners on Cassandra usage. It’s the first of what will become an annual survey that provides a baseline understanding of who, how, and why organizations use Cassandra.

“I saw zero downtime at global scale with Apache Cassandra. That’s a powerful statement to make. For our business that’s quite crucial.” - Practitioner, London

Key Themes

Cassandra adoption is correlated with organizations in a more advanced stage of digital transformation.

People from organizations that self-identified as being in a “highly advanced” stage of digital transformation were more likely to be using Cassandra (26%) compared with those in an “advanced” stage (10%) or “in process” (5%).

Optionality, security, and scalability are among the key reasons Cassandra is selected by practitioners.

The top reasons practitioners use Cassandra for mission critical apps are “good hybrid solutions” (62%), “very secure” (60%), “highly scalable” (57%), “fast” (57%), and “easy to build apps with” (55%).

A lack of skilled staff and the challenge of migration deters adoption of Cassandra.

Thirty-six percent of practitioners currently using Cassandra for mission critical apps say that a lack of Cassandra-skilled team members may deter adoption. When asked what it would take for practitioners to use Cassandra for more applications and features in production, they said “easier to migrate” and “easier to integrate.”

Methodology

Sample. The survey consisted of 1,404 interviews of IT professionals and executives, including 901 practitioners which is the focus of this usage report, from April 13-23, 2020. Respondents came from 13 geographies (China, India, Japan, South Korea, Germany, United Kingdom, France, the Netherlands, Ireland, Brazil, Mexico, Argentina, and the U.S.) and the survey was offered in seven languages corresponding to those geographies. While margin of sampling error cannot technically be calculated for online panel populations where the relationship between sample and universe is unknown, the margin of sampling error for equivalent representative samples would be +/- 2.6% for the total sample, +/- 3.3% for the practitioner sample, and +/- 4.4% for the executive sample.

To ensure the highest quality respondents, surveys include enhanced screening beyond title and activities of company size (no companies under 100 employees), cloud IT knowledge, and years of IT experience.

Rounding and multi-response. Figures may not add to 100 due to rounding or multi-response questions.

Demographics

Practitioner respondents represent a variety of roles as follows: Dev/DevOps (52%), Ops/Architect (29%), Data Scientists and Engineers (11%), and Database Administrators (8%) in the Americas (43%), Europe (32%), and Asia Pacific (12%).

Cassandra roles

Respondents include both enterprise (65% from companies with 1k+ employees) and SMEs (35% from companies with at least 100 employees). Industries include IT (45%), financial services (11%), manufacturing (8%), health care (4%), retail (3%), government (5%), education (4%), telco (3%), and 17% were listed as “other.”

Cassandra companies

Cassandra Adoption

Twenty-two percent of practitioners are currently using or evaluating Cassandra with an additional 11% planning to use it in the next 12 months.

Of those currently using Cassandra, 89% are using open source Cassandra, including both self-managed (72%) and third-party managed (48%).

Practitioners using Cassandra today are more likely to use it for more projects tomorrow. Overall, 15% of practitioners say they are extremely likely (10 on a 10-pt scale) to use it for their next project. Of those, 71% are currently using or have used it before.

Cassandra adoption

Cassandra Usage

People from organizations that self-identified as being in a “highly advanced” stage of digital transformation were more likely to be using Cassandra (26%) compared with those in an “advanced” stage (10%) in “in process” (5%).

Cassandra predominates in very important or mission critical apps. Among practitioners, 31% use Cassandra for their mission critical applications, 55% for their very important applications, 38% for their somewhat important applications, and 20% for their least important applications.

“We’re scheduling 100s of millions of messages to be sent. Per day. If it’s two weeks, we’re talking about a couple billion. So for this, we use Cassandra.” - Practitioner, Amsterdam

Cassandra usage

Why Cassandra?

The top reasons practitioners use Cassandra for mission critical apps are “good hybrid solutions” (62%), “very secure” (60%), “highly scalable” (57%), “fast” (57%), and “easy to build apps with” (55%).

“High traffic, high data environments where really you’re just looking for very simplistic key value persistence of your data. It’s going to be a great fit for you, I can promise that.” - Global SVP Engineering

Top reasons practitioners use Cassandra

For companies in a highly advanced stage of digital transformation, 58% cite “won’t lose data” as the top reason, followed by “gives me confidence” (56%), “cloud native” (56%), and “very secure” (56%).

“It can’t lose anything, it has to be able to capture everything. It can’t have any security defects. It needs to be somewhat compatible with the environment. If we adopt a new database, it can’t be a duplicate of the data we already have.… So: Cassandra.” - Practitioner, San Francisco

However, 36% of practitioners currently using Cassandra for mission critical apps say that a lack of Cassandra-skilled team members may deter adoption.

“We don’t have time to train a ton of developers, so that time to deploy, time to onboard, that’s really key. All the other stuff, scalability, that all sounds fine.” – Practitioner, London

When asked what it would take for practitioners to use Cassandra for more applications and features in production, they said “easier to migrate” and “easier to integrate.”

“If I can get started and be productive in 30 minutes, it’s a no brainer.” - Practitioner, London

Conclusion

We invite anyone who is curious about Cassandra to test the 4.0 beta release. There will be no new features or breaking API changes in future Beta or GA builds, so you can expect the time you put into the beta to translate into transitioning your production workloads to 4.0.

We also invite you to participate in a short survey about Kubernetes and Cassandra that is open through September 24, 2020. Details will be shared with the Cassandra Kubernetes SIG after it closes.

Survey Credits

A volunteer from the community helped analyze the report, which was conducted by ClearPath Strategies, a strategic consulting and research firm, and donated to the community by DataStax. It is available for use under Creative Commons Attribution-ShareAlike 4.0 International (CC BY-SA 4.0).

Improving Apache Cassandra’s Front Door and Backpressure

As part of CASSANDRA-15013, we have improved Cassandra’s ability to handle high throughput workloads, while having enough safeguards in place to protect itself from potentially going out of memory. In order to better explain the change we have made, let us understand at a high level, on how an incoming request is processed by Cassandra before the fix, followed by what we changed, and the new relevant configuration knobs available.

How inbound requests were handled before

Let us take the scenario of a client application sending requests to C* cluster. For the purpose of this blog, let us focus on one of the C* coordinator nodes.

alt_text

Below is the microscopic view of client-server interaction at the C* coordinator node. Each client connection to Cassandra node happens over a netty channel, and for efficiency purposes, each Netty eventloop thread is responsible for more than one netty channel.

alt_text

The eventloop threads read requests coming off of netty channels and enqueue them into a bounded inbound queue in the Cassandra node.

alt_text

A thread pool dequeues requests from the inbound queue, processes them asynchronously and enqueues the response into an outbound queue. There exist multiple outbound queues, one for each eventloop thread to avoid races.

alt_text

alt_text

alt_text

The same eventloop threads that are responsible for enqueuing incoming requests into the inbound queue, are also responsible for dequeuing responses off from the outbound queue and shipping responses back to the client.

alt_text

alt_text

Issue with this workflow

Let us take a scenario where there is a spike in operations from the client. The eventloop threads are now enqueuing requests at a much higher rate than the rate at which the requests are being processed by the native transport thread pool. Eventually, the inbound queue reaches its limit and says it cannot store any more requests in the queue.

alt_text

Consequently, the eventloop threads get into a blocked state as they try to enqueue more requests into an already full inbound queue. They wait until they can successfully enqueue the request in hand, into the queue.

alt_text

As noted earlier, these blocked eventloop threads are also supposed to dequeue responses from the outbound queue. Given they are in blocked state, the outbound queue (which is unbounded) grows endlessly, with all the responses, eventually resulting in C* going out of memory. This is a vicious cycle because, since the eventloop threads are blocked, there is no one to ship responses back to the client; eventually client side timeout triggers, and clients may send more requests due to retries. This is an unfortunate situation to be in, since Cassandra is doing all the work of processing these requests as fast as it can, but there is no one to ship the produced responses back to the client.

alt_text

So far, we have built a fair understanding of how the front door of C* works with regard to handling client requests, and how blocked eventloop threads can affect Cassandra.

What we changed

Backpressure

The essential root cause of the issue is that eventloop threads are getting blocked. Let us not block them by making the bounded inbound queue unbounded. If we are not careful here though, we could have an out of memory situation, this time because of the unbounded inbound queue. So we defined an overloaded state for the node based on the memory usage of the inbound queue.

We introduced two levels of thresholds, one at the node level, and the other more granular, at client IP. The one at client IP helps to isolate rogue client IPs, while not affecting other good clients, if there is such a situation.

These thresholds can be set using cassandra yaml file.

native_transport_max_concurrent_requests_in_bytes_per_ip
native_transport_max_concurrent_requests_in_bytes

These thresholds can be further changed at runtime (CASSANDRA-15519).

Configurable server response to the client as part of backpressure

If C* happens to be in overloaded state (as defined by the thresholds mentioned above), C* can react in one of the following ways:

  • Apply backpressure by setting “Autoread” to false on the netty channel in question (default behavior).
  • Respond back to the client with Overloaded Exception (if client sets “THROW_ON_OVERLOAD” connection startup option to “true.”

Let us look at the client request-response workflow again, in both these cases.

THROW_ON_OVERLOAD = false (default)

If the inbound queue is full (i.e. the thresholds are met).

alt_text

C* sets autoread to false on the netty channel, which means it will stop reading bytes off of the netty channel.

alt_text

Consequently, the kernel socket inbound buffer becomes full since no bytes are being read off of it by netty eventloop.

alt_text

Once the Kernel Socket Inbound Buffer is full on the server side, things start getting piled up in the Kernel Socket Outbound Buffer on the client side, and once this buffer gets full, client will start experiencing backpressure.

alt_text

THROW_ON_OVERLOAD = true

If the inbound queue is full (i.e. the thresholds are met), eventloop threads do not enqueue the request into the Inbound Queue. Instead, the eventloop thread creates an OverloadedException response message and enqueues it into the flusher queue, which will then be shipped back to the client.

alt_text

This way, Cassandra is able to serve very large throughput, while protecting itself from getting into memory starvation issues. This patch has been vetted through thorough performance benchmarking. Detailed performance analysis can be found here.

Apache Cassandra vs DynamoDB

In this post, we’ll look at some of the key differences between Apache Cassandra (hereafter just Cassandra) and DynamoDB.

Both are distributed databases and have similar architecture, and both offer incredible scalability, reliability, and resilience. However, there are also differences,  and understanding the differences and cost benefits can help you determine the right solution for your application.

Apache Cassandra is an open source database available at no cost from the Apache Foundation. Installing and configuring Cassandra can be challenging and there is more than one pitfall along the way. However, Cassandra can be installed on any cloud service or at a physical location you choose.

The typical Cassandra installation is a cluster which is a collection of nodes (a node is a single instance of Cassandra installed on a computer or in a Docker container). Nodes can then be grouped in racks and data centers which can be in different locations (cloud zones and regions or physical collocations). You must scale Cassandra as your demand grows and are responsible for the ongoing management tasks such as backups, replacing bad nodes, or adding new nodes to meet demand.

Amazon DynamoDB is a fully managed database as a service. All implementation details are hidden and from the user viewpoint DynamoDB is serverless. DynamoDB automatically scales throughput capacity to meet workload demands, and partitions and repartitions your data as your table size grows, and distributes data across multiple availability zones. However, the service is available only through Amazon Web Services (AWS).

Replica Configuration and Placement

NoSQL data stores like Cassandra and DynamoDB use multiple replicas (copies of data) to ensure high availability and durability. The number of replicas and their placement determines the availability of your data.

With Cassandra, the number of replicas to have per cluster—the replication factor—and their placement is configurable. A cluster can be subdivided into two or more data centers which can be located in different cloud regions or physical collocations. The nodes in a data center can be assigned to different racks that can be assigned to different zones or to different physical racks.

In contrast, with DynamoDB, Amazon makes these decisions for you. By default, data is located in a single region and is replicated to three (3) availability zones in that region. Replication to different AWS regions is available as an option. Amazon streams must be enabled for multi-region replication.

Data Model

The top level data structure in Cassandra is the keyspace which is analogous to a relational database. The keyspace is the container for the tables and it is where you configure the replica count and placement. Keyspaces contain tables (formerly called column families) composed of rows and columns. A table schema must be defined at the time of table creation.

The top level structure for DynamoDB is the table which has the same functionality as the Cassandra table. Rows are items, and cells are attributes. In DynamoDB, it’s possible to define a schema for each item, rather than for the whole table.

Both tables store data in sparse rows—for a given row, they store only the columns present in that row. Each table must have a primary key that uniquely identifies rows or items. Every table must have a primary key which has two components: 

  • A partition key that determines the placement of the data by subsetting the table rows into partitions. This key is required.
  • A key that sorts the rows within a partition. In Cassandra, this is called the clustering key while DynamoDB calls it the sort key. This key is optional.

Taken together, the primary key ensures that each row in a table is unique. 

 Differences 

  • DynamoDB limits the number of tables in an AWS region to 256.  If you need more tables, you must contact AWS support. There are no hard limits in Cassandra. The practical limit is around 500 tables.
  • DynamoDB is schemaless. Only the primary key attributes need to be defined at table creation. 
  • DynamoDB charges for read and write throughput and requires you to manage capacity for each table. Read and write throughput and associated costs must be considered when designing tables and applications. 
  • The maximum size of an item in DynamoDB is 400KB. With Cassandra, the hard limit is 2GB; the practical limit is a few megabytes.
  • In DynamoDB, the primary key can have only one attribute as the primary key and one attribute as the sort key. Cassandra allows composite partition keys and multiple clustering columns.
  • Cassandra supports counter, time, timestamp, uuid, and timeuuid data types not found in DynamoDB.

Allocating Table Capacity

Both Cassandra and DynamoDB require capacity planning before setting up a cluster. However, the approaches are different. 

To create a performant Cassandra cluster, you must first make reasonably accurate estimates of your future workloads. Capacity is allocated by creating a good data model, choosing the right hardware, and properly sizing the cluster. Increasing workloads are met by adding nodes.

With DynamoDB, capacity planning is determined by the type of the read/write capacity modes you choose. On demand capacity requires no capacity planning other than setting an upper limit on each table. You pay only for the read and write requests on the table. Capacity is measured in Read Resource Units and Write Resource Units. On demand mode is best when you have an unknown workload, unpredictable application traffic, or you prefer the ease of paying for only what you use.

With provisioned capacity, you must specify the number of reads and write throughput limits for each table at the time of creation. If you exceed these limits for a table or tables, DynamoDB will throttle queries until usage is below defined capacity. Auto-scaling will adjust your table’s provisioned capacity automatically in response to traffic changes although there is a lag between the time throttling starts and increased capacity is applied.   

The throughput limits are provisioned in units called Read Capacity Units (RCU) and Write Capacity Units (WCU); queries are throttled whenever these limits are exceeded. One read capacity unit represents one strongly consistent read per second, or two eventually consistent reads per second, for an item up to 4 KB in size. Transactional read requests require two read capacity units to perform one read per second for items up to 4 KB. If you need to read an item that is larger than 4 KB, DynamoDB must consume additional read capacity units. One write capacity unit represents one write per second for an item up to 1 KB in size. If you need to write an item that is larger than 1 KB, DynamoDB must consume additional write capacity units. Transactional write requests require 2 write capacity units to perform one write per second for items up to 1 KB. For more information, see Managing Settings on DynamoDB Provisioned Capacity Tables.

Provisioned mode is a good option if any of the following are true: 

  • You have predictable application traffic.
  • Application traffic is consistent or ramps gradually.
  • You can forecast capacity requirements to control costs.

Partitions

Both Cassandra and DynamoDB group and distribute data based on the hashed value of the partition key. Both call these grouping partitions but they have very different definitions.

In Dynamo the partition is a storage unit that has a maximum size of 10 GB. When a partition fills, DynamoDB creates a new partition and the user has no control over the process. A partition can contain items with different partition key values. When a sort key is used, all the items with the same partition key value physically close together, ordered by sort key value.

DynamoDB partitions have capacity limits of 3,000 RCU or 1,000 WCU even for on-demand tables. Furthermore, these limits cannot be increased. If you exceed the partition limits, your queries will be throttled even if you have not exceeded the capacity of the table. See Throttling and Hot Keys (below) for more information.

A Cassandra partition is a set of rows that share the same hashed partition key value.  Rows with the same partition key are stored on the same node. Rows within the partition are sorted by the clustering columns. If no clustering column was specified, the partition holds a single row. While it would not be desirable, it would be possible for an application to drive tens of thousands of reads/writes to a single partition. 

See Cassandra Data Partitioning.

Query Language 

Cassandra provides a SQL-like language called Cassandra Query Language (CQL) to access data. DynamoDB uses JSON syntax. The following table shows the syntax for the query “return all information from the Music table for the song title ‘Lullaby of Broadway’ and the artist ‘Tommy Dorsey’”

CQL DynamoDB
Request all information for the song  ‘Lullaby of Broadway‘ played by Tommy Dorsey

SELECT *

FROM Music

WHERE Artist=’Tommy Dorsey’ AND SongTitle = ‘Lullaby of Broadway

get-item {

    TableName: “Music”,

    Key: {

        “Artist”: “Tommy Dorsey”,

        “SongTitle”: “Lullaby of Broadway

    }

} 


Secondary Indexes

By default, Cassandra and DynamoDB queries can use only the primary key columns in the search condition which must include all partition key columns. Non-key columns can be used in a search by creating an index on that column.

Cassandra supports creating an index on most columns including a clustering column of a compound primary key or on the partition key itself. Creating an index on a collection or the key of a collection map is also supported. However, when used incorrectly a secondary index can hurt performance. A general rule of thumb is to index a column with low cardinality of few values and to use only with the partition key in the search clause. Because the index table is stored on each node in a cluster, a query using a secondary index can degrade performance if multiple nodes are accessed. 

DynamoDB has local secondary indexes. This index uses the same partition key as the base table but has a different sort key. Scoped to the base table partition that has the same partition key value. Local secondary indexes must be created at the same time the table is created. A maximum of 5 local secondary indexes may be created per table. 

Materialized Views versus Global Secondary Indexes 

In Cassandra, a Materialized View (MV) is a table built from the results of a query from another table but with a new primary key and new properties. Queries are optimized by the primary key definition. The purpose of a materialized view is to provide multiple queries for a single table. It is an alternative to the standard practice of creating a new table with the same data if a different query is needed. Data in the materialized view is updated automatically by changes to the source table. However, the materialized view is an experimental feature and should not be used in production.

A similar object is DynamoDB is the Global Secondary Index (GSI) which creates an eventually consistent replica of a table. The GSI are created at table creation time and each table has a limit of 20. The GSI must be provisioned for reads and writes; there are storage costs as well.

Time To Live

Time To Live (TTL) is a feature that automatically removes items from your table after a period of time has elapsed.

  • Cassandra specifies TTL as the number of seconds from the time of creating or updating a row, after which the row expires.
  • In DynamoDB, TTL is a timestamp value representing the date and time at which the item expires.
  • DynamoDB applies TTL at item level. Cassandra applies it to the column.

Consistency

Both Cassandra and DynamoDB are distributed data stores.  In a distributed system there is a tradeoff between consistency—every read receives the most recent write or an error, and availability—every request receives a (non-error) response but without the guarantee that it contains the most recent write. In such a system there are two levels possible levels of consistency:

  • Eventual consistency. This implies that all updates reach all replicas eventually. A read with eventual consistency may return stale data until all replicas are reconciled to a consistent state.
  • Strong consistency returns up-to-date data for all prior successful writes but at the cost of slower response time and decreased availability.

DynamoDB supports eventually consistent and strongly consistent reads on a per query basis. The default is eventual consistency. How it is done is hidden from the user.

 Strongly consistent reads in DynamoDB have the following issues: 

  • The data might not be available if there is a network delay or outage.
  • The operation may have higher latency.
  • Strongly consistent reads are not supported on global secondary indexes.
  • Strongly consistent reads use more throughput capacity than eventually consistent reads and therefore is more expensive. See Throttling and Hot Keys (below).

Cassandra offers tunable consistency for any given read or write operation that is configurable by adjusting consistency levels. The consistency level is defined as the minimum number of Cassandra nodes that must acknowledge a read or write operation before the operation can be considered successful. You are able to configure strong consistency for both reads and writes at a tradeoff of increasing latency.

Conflicts Resolution

In a distributed system, there is the possibility that a query may return inconsistent data from the replicas. Both Cassandra and DynamoDB resolve any inconsistencies with a “last write wins” solution but with Cassandra, every time a piece of data is written to the cluster, a timestamp is attached. Then, when Cassandra has to deal with conflicting data, it simply chooses the data with the most recent timestamp.

For DynamoDB, “last write wins” applies only to global tables and strongly consistent reads.

Security Features

Cassandra and DynamoDB provide methods for user authentication and authorization and data access permissions  Both use encryption for client and inter-node communication. DynamoDB also offers encryption at rest. Instaclustr offers encryption at rest for its Cassandra Managed Services.

Performance Issues

Consistency and Read Speed

Choosing strong consistency requires more nodes to respond to a request which increases latency.

Scans

Scans are expensive for both systems. Scans in Cassandra are slow because Cassandra has to scan all nodes in the cluster. Scans are faster in DynamoDB but are expensive because resource use is based on the amount of data read not returned to the client. If the scan exceeds your provisioned read capacity, DynamoDB will generate errors.

DynamoDB’s Issues

Auto Scaling

Amazon DynamoDB auto scaling uses the AWS Application Auto Scaling Service to dynamically adjust provisioned throughput capacity on your behalf, in response to actual traffic patterns. This enables a table or a global secondary index to increase its provisioned read and write capacity to handle sudden increases in traffic, without throttling. When the workload decreases, application auto scaling decreases the throughput so that you don’t pay for unused provisioned capacity.

  • Auto scaling does not work well with varying and bursty workloads. The table will scale up only based on consumption, triggering these alarms time after time until it reaches the desired level.
  • There can be a lag (5-15 minutes) between the time capacity is exceeded and autoscaling takes effect.
  • Capacity decreases are limited. The maximum is 27 per day (A day is defined according to the GMT time zone).
  • Tables scale based on the number of requests made, not the number of successful requests.
  • Autoscaling cannot exceed hard I/O limits for tables.

Throttling and Hot Keys

In DynamoDB, the provisioned capacity of a table is shared evenly across all the partitions in a table with the consequence that each partition has a capacity less than the table capacity. For example, a table that has been provisioned with 400 WCU and 100 RCU and had 4 partitions, each partition would have a write capacity of 100 WCU and 25 RCU.  

So if you had a query send most read/write requests to a single partition—a hot partition— and that throughput exceeded the shared capacity of that partition, your queries would be throttled even though you had unused capacity in the table. If your application creates a hot partition, your only recourse would be either to redesign the data model or to over allocate capacity to the table.

Adaptive capacity can provide up to 5 minutes of grace time by allocating unused capacity from other partitions to the “hot” one provided unused capacity is available and hard limits are not reached. The hard limits on a partition are 3,000 RCU or 1,000 WCU.

Cross-Region Replication

If you want a DynamoDB table to span regions, you’ll need to use global tables that require careful planning and implementation. The tables in all regions must use on-demand allocation or have auto scaling enabled. The tables in each region must be the same: time to live, the number of global, and local secondary indexes, and so on.

Global tables support only eventual consistency that can lead to data inconsistency. In such a conflict, DynamoDB applies a last writer wins (lww) policy. The data with the most recent timestamp is used.

Migrating an existing local table to a global table requires additional steps. First, DynamoDB Streams is enabled on the original local table to create a changelog. Then an AWS Lambda function is configured to copy the corresponding change from the original table to the new global table.

Cost Explosion 

As highlighted in Managed Cassandra Versus DynamoDB, DynamoDB’s pricing model can easily make it expensive for a fast growing company:

  • The pricing model is based on throughput, therefore costs increase as IO increases. 
  • A hot partition may require you to overprovision a table.
  • Small reads and writes are penalized because measured throughput is rounded up to the nearest 1 KB boundary for writes and 4 KB boundary for reads.
  • Writes are four times more expensive than reads.
  • Strongly consistent reads are twice the cost of eventually consistent reads. 
  • Workloads performing scans or queries can be costly because the read capacity units are calculated on the number of bytes read rather than the amount of data returned.
  • Read heavy workloads may require DynamoDB Accelerator (DAX) which is priced per node-hour consumed. A partial node-hour consumed is billed as a full hour.
  • Cross region replication incurs additional charges for the amount of data replicated by the Global Secondary Index and the data read from DynamoDB Streams.
  • DynamoDB does not distinguish between a customer-facing, production table versus tables used for development, testing, and staging.

Summary 

DynamoDB’s advantages are an easy start; absence of the database management burden; sufficient flexibility, availability, and auto scaling; in-built metrics for monitoring; encryption of data at rest.

Cassandra’s main advantages are: fast speed of writes and reads; constant availability; a SQL-like Cassandra Query Language instead of DynamoDB’s complex API; reliable cross data center replication; linear scalability and high performance. 

However, the mere technical details of the two databases shouldn’t be the only aspect to analyze before making a choice.

Cassandra versus DynamoDB costs must be compared carefully. If you have an application that mostly writes or one that relies on strong read consistency, Cassandra may be a better choice.

You will also need to know which technologies you need to supplement the database? If you need the open source like ElasticSearch, Apache Spark, or Apache Kafka, Cassandra is your choice. If you plan to make extensive use of AWS products, then it’s DynamoDB.

If you use a cloud provider other than AWS or run your own data centers, then you would need to choose Cassandra.

The post Apache Cassandra vs DynamoDB appeared first on Instaclustr.

Benchmarking Cassandra with Local Storage on Azure

Continuing our efforts in adding support for the latest generation of Azure Virtual Machines, Instaclustr is pleased to announce support for the D15v2 and L8v2 types with local storage support for Apache Cassandra clusters on the Instaclustr Managed Platform. So let’s begin by introducing the two forms of storage.

Local storage is a non-persistent (i.e. volatile) storage that is internal to the physical Hyper-V host that a VM is running on at any given time. As soon as the VM is moved to another physical host as a result of a deallocate (stop) and start command, hardware or VM crash, or Azure maintenance, the local storage is wiped clean and any data stored on it is lost.

Remote storage is persistent and comes in the form of managed (or unmanaged) disks.  These disks can have different performance and redundancy characteristics. Remote storage is generally used for any VM’s OS disk or data disks that are intended to permanently store important data. Remote storage can be detached from the VM from one physical host to another in order to preserve the data. It is stored on Azure’s storage account which VMs use to attach it via the network. The physical Hyper-V host running the VM is independent of the remote storage account, which means that any Hyper-V host or VM can mount a remotely stored disk on more than one occasion.

Remote storage has the critical advantage of being persistent, while local storage has the advantage of being faster (because it’s local to the VM – on the same physical host) and included in the VM cost. When starting an Azure VM you only pay for its OS and disks. Local storage on a VM is included in the price of the VM and often provides a much cheaper storage alternative to remote storage.

The Azure Node Types

D15_v2s

D15_v2s are the latest addition to Azure’s Dv2 series, featuring more powerful CPU and optimal CPU-to-memory configuration making them suitable for most production workloads. The Dv2-series is about 35% faster than the D-series. D15_v2s are a memory optimized VM size offering a high memory-to-CPU ratio that is great for distributed database servers like Cassandra, medium to large caches, and in-memory data stores such as Redis.

Instaclustr customers can now leverage these benefits with the release of the D15_v2 VMs, which provide 20 virtual CPU cores and 140 GB RAM backed with 1000 GB of local storage.

L8_v2s

L8_v2s are a part of the Lsv2-series featuring high throughput, low latency, and directly mapped local NVMe storage. L8s like the rest of the series were designed to cater for high throughput and high IOPS workloads including big data applications, SQL and NoSQL databases, data warehousing, and large transactional databases. Examples include Cassandra, Elasticsearch and Redis. In general, applications that can benefit from large in-memory databases are a good fit for these VMs.

L8s offer 8 virtual CPU cores and 64 GB in RAM backed by local storage space of 1388 GB. L8s provide roughly equivalent compute capacity (cores and memory) to a D13v2 at about ¾ of the price. This offers speed and efficiency at the cost of forgoing persistent storage.

Benchmarking

Prior to the public release of these new instances on Instaclustr, we conducted Cassandra benchmarking for:

VM Type
CPU Cores RAM Storage Type Disk Type
DS13v2 8 56 GB remote 2046 GiB (SSD)
D15v2 20 140 GB local 1000 GiB (SSD)
L8s w/local 8 64 GB local 1388 GiB (SSD)
L8s w/remote 8 64 GB remote 2046 GiB (SSD)

All tests were conducted using Cassandra 3.11.6

The results are designed to outline the performance of utilising local storage and the benefits it has over remote storage. Testing is split into two groups, fixed and variable testing. Each group runs two different sets of payload:

  1. Small – Smaller more frequent payloads which are quick to execute but are requested in large numbers.
  2. Medium – Larger Payloads which take longer to execute and are potentially capable of slowing Cassandra’s ability to process requests

The fixed testing procedure is:

  1. Insert data to fill disks to ~30% full.
  2. Wait for compactions to complete.
  3. Run a fixed rate of operations
  4. Run a sequence of tests consisting of read, write and mixed read/write operations.
  5. Run the tests and measure the operational latency for a fixed rate of operations. 

The variable testing procedure is:

  1. Insert data to fill disks to ~30% full.
  2. Wait for compactions to complete.
  3. Target a median latency of 10ms.
  4. Run a sequence of tests comprising read, write and mixed read/write operations. These tests were broken down into multiple types.
  5. Run the tests and measure the results including operations per second and median operation latency. 
  6. Quorum consistency for all operations.
  7. We incrementally increase the thread count and re-run the tests until we have hit the target median latency and are under the allowable pending compactions.
  8. Wait for compactions to complete after each test.

As with any generic benchmarking results the performance may vary from the benchmark for different data models or application workloads. However, we have found this to be a reliable test for comparison of relative performance that will match many practical use cases.

Comparison

When comparing performance between instance types with local and remote storage it is important to take note of the latency of the operations. The latency of read operations indicate how long a request takes to retrieve information from the disk and return it back to the client.

In the fixed small and medium read tests, local storage offered better latency results in comparison to instances with remote storage. This is more noticeable in the medium read tests, where the tests had a larger payload and required more interaction with the locally available disks. Both L8s (with local storage) and D15_v2s offered a lower latency to match the operation rate given to the cluster.

When running variable-small-read tests under a certain latency, the operation rate for D15v2 reached nearly 3 times the number of ops/s for D13v2s with remote storage. Likewise L8s with local storage out performed L8s (with remote storage). L8s (with local storage) had twice the performance and half the latency of the L8s (with remote storage). 

variable-read-small L8s w/local L8s w/remote DS13_v2 DS15_v2
Operation Rate 19,974 7,198 23,985 61,489
Latency mean (avg) 22.1 48.3 21.9 19.4
Latency medium 20.1 40.3 19.9 17.7
Latency 95th percentile 43.6 123.5 43.3 39.2
Latency 99th percentile 66.9 148.9 74.3 58.9

In the medium read tests, instances with local storage outperformed instances with remote storage. Both L8s (with local storage) and D15s had a far better ops/s and latency result, even with a significantly higher operation rate. This makes a very convincing argument for local storage over remote storage when seeking optimal performance.

variable-read-medium L8s w/local L8s w/remote DS13_v2 DS15_v2
Operation Rate 235 76 77 368
Latency mean (avg) 4.2 13 12.9 2.7
Latency medium 3.2 8.4 9.5 2.5
Latency 95th percentile 4.9 39.7 39.3 3.4
Latency 99th percentile 23.5 63.4 58.8 4.9

Looking at the writes on the other hand, D15s outperformed due to their large pool of CPU cores. While differences were less obvious in the small tests, results were more obvious in the medium tests. Further investigation will be conducted to determine why this is the case.

Benchmarking Cassandra

Based on the graph for variable medium testing, D15s outperformed in all categories. D15v2s have both a larger number of cores to outpace competition with heavy loads of writes and local storage offering faster disk intensive reads. This was additionally supported by strong performance in the mixed medium testing results.

L8s with local storage took second place, performing better than DS13v2s in the read and mixed tests. Whilst DS13v2 nodes slightly edged out L8s in writes for larger payloads. The mixed results showed a substantial difference in performance between the two with L8s taking a lead thanks to local storage providing faster disk intensive reads. 

Conclusion

Based on the results from this comparison, we find that local storage offers amazing performance results for disk intensive operations such as reads. D15v2 nodes, with their large number of cores to perform CPU intensive writes and local storage to help with disk intensive reads, offer top tier performance for any production environment.

Furthermore, L8s with local storage offer a great cost efficient solution at around ¾ of the price of D13v2s and offer a better price-performance gain notably in read operations. This is especially beneficial for a production environment which prioritizes reads over writes.

In order to assist customers in upgrading their Cassandra clusters from currently used VM types to D15v2s or L8 VM type nodes, Instaclustr technical operations team has built several tried and tested node replacement strategies to provide zero-downtime, non-disruptive migrations for our customers. Reach out to our support team if you are interested in these new instance types for your Cassandra cluster.

You can access pricing details through the Instaclustr Console when you log in, or contact our Support team.

The post Benchmarking Cassandra with Local Storage on Azure appeared first on Instaclustr.

Cassandra and Kubernetes: SIG Update and Survey

Five operators for Apache Cassandra have been created that have made it easier to run containerized Cassandra on Kubernetes. Recently the major contributors to these operators came together to discuss the creation of a community-based operator with the intent of making one that makes it easy to run C* on K8s. One of the project’s organizational goals is that the end result will eventually become part of the Apache Software Foundation or the Apache Cassandra project.

The community created a special interest group (SIG) to set goals for what the operator should do at different levels to find a path for creating a standard community-based operator. The Operator Framework suggests five maturity levels for operator capabilities starting from basic installation to auto-pilot.

Operator Capability Maturity Levels

(Source: OperatorFramework.io)

The five Cassandra Kubernetes operators all come from different backgrounds, so the first major goal is to develop a common understanding as to what an operator needs to do and at which level. This first step involves collaborating on a Custom Resource Definition (CRD) that will set the syntax / schema which will be used to create Cassandra clusters on Kubernetes. Once this is done, a software extension can be developed in a variety of languages including Go, Java, or using the Operator SDK in Helm or Ansible without making changes to Kubernetes.

We’re not starting from zero, as the creators of the five operators are actively participating in the SIG. Hopefully much of the decided upon CRD will have code fragments that can be leveraged from the other projects. The major operators out publicly today are those by Sky UK, Orange Telecom, Instaclustr, Elassandra, and DataStax (list sourced from the awesome-cassandra project):

  • CassKop - Cassandra Kubernetes Operator - This Kubernetes operator by Orange automates Cassandra operations such as deploying a new rack aware cluster, adding/removing nodes, configuring the C and JVM parameters, upgrading JVM and C versions. Written in Go. This one was also one of the first ones out and is the only one that can support multiple Kubernetes clusters using Multi-CassKop
  • Cassandra Operator - A Kubernetes operator by SkyUK that manages Cassandra clusters inside Kubernetes. Well designed and organized. This was among the first operators to be released.
  • Instaclustr - Kubernetes Operator for Cassandra operator - The Cassandra operator by Instaclustr manages Cassandra clusters deployed to Kubernetes and automates tasks related to operating an Cassandra cluster.
  • Cass Operator - DataStax’s Kubernetes Operator supports Apache Cassandra as well as DSE containers on Kubernetes. Cassandra configuration is managed directly in the CRD, and Cassandra nodes are managed via a RESTful management API.
  • Elassandra Operator - The Elassandra Kubernetes Operator automates the deployment and management of Elassandra clusters deployed in multiple Kubernetes clusters.

If you’re interested in catching up on what the SIG has been talking about, you can watch the YouTube videos of the sessions and read up on the working documents:

As with any Kubernetes operator, the goal is to create a robot which takes the manual work of setting up complex configurations of containers in Kubernetes easier. An operator can also be seen as a translator between the logical concepts of the software and the concrete Kubernetes resources such as nodes, pods, services. Combined with controllers, operators can abstract out operations such that the human operators can focus on problems related to their industry or domain. As mentioned above, the different operator capability levels offer a roadmap to creating a robust operator for Cassandra users that is easy to use, set up, maintain and upgrade, and expand a cluster.

When a platform needs Cassandra, it’s probably exhausted the other potential datastores available because it needs high availability and fault tolerance, at high speeds, around the world. Kubernetes is a technology that can match well with Cassandra’s capabilities because it shares the features of being linearly scalable, vendor neutral, and cloud agnostic. There is a healthy debate about whether Cassandra belongs in Kubernetes — and whether databases belong in Kubernetes at all — because other orchestration tools are good enough, though the growing user base of Kubernetes in hobby and commercial realms suggests that we need to provide an operator that can keep up with the demand.

Most likely if someone is thinking about moving Cassandra workloads from public cloud, on-premises VMs, or even on-premises bare metal servers to either a public or private cloud hosted K8s, they’ll want to evaluate whether or not the existing architecture could run and be performant.

As part of the SIG, we’re also coming up with reference architectures on which to test the operator. Here are some of the common and most basic reference architectures that are likely candidates.

  • Single Workload in Single Region
    • 1 DCs in 1 region, with 3 nodes (3 total)
    • DC expands to 6 (6 total)
    • DC contracts to 3 ( 3 total)

Single Workload / Datacenter in a Single Region

  • Multi-Workload in Single Region
    • 2 DCs, both in the same region, with 3 nodes in each DC (6 total)
    • Both DCs expand to 6 each (12 total)
    • Both DCs contract to 3 each ( 6 total)
    • Add a third DC in the same region with 3 nodes (9 nodes)
    • Remove third DC

Multiple Workloads / Datacenters in a Single Region

  • Single Workload in Multi-Regions
    • 2 DCs, 1 in each region, with 3 nodes in each DC (6 total)
    • Both DCs expand to 6 each (12 total)
    • Both DCs contract to 3 each ( 6 total)
    • Add a third DC in a 3rd region with 3 nodes (9 total)
    • Remove third DC

Although each organization is different, these scenarios or combinations of these scenarios account for 80% of most pure Apache Cassandra use cases. The SIG would love to know more about Cassandra users’ use cases for Kubernetes. Please take this short survey, which will remain open through September 17, 2020.

Join the biweekly meetings to stay informed.

Apache Cassandra 4.0 Beta Released

The beta release of Apache Cassandra 4.0 is finally here, it’s been two years in the making. We’ve had a preview release available to customers since March for testing. A wide range of improvements have been made.

Stability

The explicit goal of this release has been to be “the most stable major release ever” to accelerate adoption within the release cycle, which I blogged about in January.  For this release a series of new testing frameworks were implemented focusing on stability, and performance, which have paid off handsomely. The feeling of the team at Instaclustr is that we have never been more confident about a release (and we’ve seen quite a few!)

Performance

This release integrates the async event-driven networking code from Netty for communication between nodes. I blogged about Netty in February but it’s worth reiterating what’s been achieved with this upgrade. It has enabled Cassandra 4.0 to have a single thread pool for all connections to other nodes instead of maintaining N threads per peer which was cramping performance by causing lots of context switching. It has also facilitated zero copy streaming for SStables which now goes x5 times faster than before.

This complete overhaul of the networking infrastructure has delivered some serious gains.

  • Tail end latency has been reduced by 40%+ in P99s in initial testing
  • Node recovery time has been vastly reduced
  • Scaling large clusters is easier and faster

Auditing and Observability

Cassandra 4.0 introduces a powerful new set of enterprise class audit capabilities that I covered here in March. These help Cassandra operators meet their SOX and PCI requirements with a robust high level interface. Audit logging saves to the node, outside of the database, with configurable log rollover. Audit logs can be configured to attend to particular keyspaces, commands, or users and they can be inspected with the auditlogviewer utility.

Full Query Logging is also supported and the fqltool allows inspection of these logs.

Virtual Tables

Virtual tables, which I covered here in February, enable a series of metrics to be pulled from a node via CQL from read-only tables. This is a more elegant mechanism than JMX access as it avoids the additional configuration required. JMX access is not going anywhere soon, but this presents a really solid improvement to a number of metric monitoring tasks.

Community

Our community is our most powerful feature in all our releases and I can’t think of a better validation of open source under the Apache Foundation community model than this release. I just want to take this opportunity to congratulate and thank everyone in the community who have taken both Cassandra and its release processes to the next level with this beta release. 

As always, you can spin up a free trial of Cassandra on our platform. Even with the performance gains delivered in this release our popular Cassandra Data Modeling Guide to Best Practices is always worth a read to get the most out of Cassandra.

The post Apache Cassandra 4.0 Beta Released appeared first on Instaclustr.

Introducing Apache Cassandra 4.0 Beta: Battle Tested From Day One

This is the most stable Apache Cassandra in history; you should start using Apache Cassandra 4.0 Beta today in your test and QA environments, head to the downloads site to get your hands on it. The Cassandra community is on a mission to deliver a 4.0 GA release that is ready to be deployed to production. You can guarantee this holds true by running your application workloads against the Beta release and contributing to the community’s validation effort to get Cassandra 4.0 to GA.

With over 1,000 bug fixes, improvements and new features and the project’s wholehearted focus on quality with replay, fuzz, property-based, fault-injection, and performance tests on clusters as large as 1,000 nodes and with hundreds of real world use cases and schemas tested, Cassandra 4.0 redefines what users should expect from any open or closed source database. With software, hardware, and QA testing donations from the likes of Instaclustr, iland, Amazon, and Datastax, this release has seen an unprecedented cross-industry collaboration towards releasing a battle-tested database with enterprise security features and an understanding of what it takes to deliver scale in the cloud.

There will be no new features or breaking API changes in future Beta or GA builds. You can expect the time you put into the beta to translate into transitioning your production workloads to 4.0 in the near future.

Quality in distributed infrastructure software takes time and this release is no exception. Open source projects are only as strong as the community of people that build and use them, so your feedback is a critical part of making this the best release in project history; share your thoughts on the user or dev mailing lists or in the #cassandra ASF slack channel.

Redefining the elasticity you should expect from your distributed systems with Zero Copy Streaming

5x faster scaling operations

Cassandra streams data between nodes during scaling operations such as adding a new node or datacenter during peak traffic times. Thanks to the new Zero Copy Streaming functionality in 4.0, this critical operation is now up to 5x faster without vnodes compared to previous versions, which means a more elastic architecture particularly in cloud and Kubernetes environments.

Globally distributed systems have unique consistency caveats and Cassandra keeps the data replicas in sync through a process called repair. Many of the fundamentals of the algorithm for incremental repair were rewritten to harden and optimize incremental repair for a faster and less resource intensive operation to maintain consistency across data replicas.

Giving you visibility and control over what’s happening in your cluster with real time Audit Logging and Traffic Replay

Enterprise-grade security & observability

To ensure regulatory and security compliance with SOX, PCI or GDPR, it’s critical to understand who is accessing data and when they are accessing it. Cassandra 4.0 delivers a long awaited audit logging feature for operators to track the DML, DDL, and DCL activity with minimal impact to normal workload performance. Built on the same underlying implementation, there is also a new fqltool that allows the capture and replay of production workloads for analysis.

There are new controls to enable use cases that require data access on a per data center basis. For example, if you have a data center in the United States and a data center in Europe, you can now configure a Cassandra role to only have access to a single data center using the new CassandraNetworkAuthorizer.

For years, the primary way to observe Cassandra clusters has been through JMX and open source tools such as Instaclustr’s Cassandra Exporter and DataStax’s Metrics Collector. In this most recent version of Cassandra you can selectively expose system metrics or configuration settings via Virtual Tables that are consumed like any other Cassandra table. This delivers flexibility for operators to ensure that they have the signals in place to keep their deployments healthy.

Looking to the future with Java 11 support and ZGC

One of the most exciting features of Java 11 is the new Z Garbage Collector (ZGC) that aims to reduce GC pause times to a max of a few milliseconds with no latency degradation as heap sizes increase. This feature is still experimental and thorough testing should be performed before deploying to production. These improvements significantly improve the node availability profiles from garbage collection on a cluster which is why this feature has been included as experimental in the Cassandra 4.0 release.

Part of a vibrant and healthy ecosystem

The third-party ecosystem has their eyes on this release and a number of utilities have already added support for Cassandra 4.0. These include the client driver libraries, Spring Boot and Spring Data, Quarkus, the DataStax Kafka Connector and Bulk Loader, The Last Pickle’s Cassandra Reaper tool for managing repairs, Medusa for handling backup and restore, the Spark Cassandra Connector, The Definitive Guide for Apache Cassandra, and the list goes on.

Get started today

There’s no doubt that open source drives innovation and the Cassandra 4.0 Beta exemplifies the value in a community of contributors that run Cassandra in some of the largest deployments in the world.

To put it in perspective, if you use a website or a smartphone today, you’re probably touching a Cassandra-backed system.

To download the Beta, head to the Apache Cassandra downloads site.

Resources:

Apache Cassandra Blog: Even Higher Availability with 5x Faster Streaming in Cassandra 4.0

The Last Pickle Blog: Incremental Repair Improvements in Cassandra 4

Apache Cassandra Blog: Audit Logging in Apache Cassandra 4.0

The Last Pickle Blog: Cassandra 4.0 Data Center Security Enhancements

The Last Pickle Blog: Virtual tables are coming in Cassandra 4.0

The Last Pickle Blog: Java 11 Support in Apache Cassandra 4.0

Apache Cassandra Infographic

Cassandra Data Modeling Best Practices Guide

Apache Cassandra is an open source non-relational, or NoSQL, distributed database that enables continuous availability, tremendous scale, and data distribution across multiple data centers and cloud availability zones. Simply put, it provides a highly reliable data storage engine for applications requiring immense scale.

Data modeling is a process used to analyze, organize, and understand the data requirements for a product or service. Data modeling creates the structure your data will live in. It defines how things are labeled and organized, and determines how your data can and will be used. The process of data modeling is similar to designing a house. You start with a conceptual model and add detail to produce the final blueprint. 

The ultimate goal of Cassandra data modeling and analysis is to develop a complete, well organized, and high performance Cassandra cluster. Following the five Cassandra data modeling best practices outlined will hopefully help you meet that goal:

  1. Cassandra is not a relational database, don’t try to model it like one
  2. Design your model to meet 3 fundamental goals for data distribution
  3. Understand the importance of the Primary Key in the overall data structure 
  4. Model around your queries but don’t forget about your data
  5. Follow a six step structured approach to building your model. 

Cassandra Is Not a Relational Database

Do not try to design a Cassandra data model like you would with a relational database. 

Query first design: You must define how you plan to access the data tables at the beginning of the data modeling process not towards the end. 

No joins or derived tables: Tables cannot be joined so if you need data from more than one table, the tables must be merged into a denormalized table.   

Denormalization: Cassandra does not support joins or derived tables so denormalization is a key practice in Cassandra table design.

Designing for optimal storage: For relational databases this is usually transparent to the designer. With Cassandra, an important goal of the design is to optimize how data is distributed around the cluster. 

Sorting is a Design Decision: In Cassandra, sorting can be done only on the clustering columns specified in the PRIMARY KEY.

The Fundamental Goals of the Cassandra Data Model

Distributed data systems, such as Cassandra, distribute incoming data into chunks called partitions.  Cassandra groups data into distinct partitions by hashing a data attribute called partition key and distributes these partitions among the nodes in the cluster. 

(A detailed explanation can be found in Cassandra Data Partitioning.)

A good Cassandra data model is one that: 

  1. Distributes data evenly across the nodes in the cluster
  2. Place limits on the size of a partition
  3. Minimizes the number of partitions returned by a query.

Distributes Data Evenly Around the Cassandra Cluster

Choose a partition key that has a high cardinality to avoid hot spots—a situation where one or a few nodes are under heavy load while others are idle.   

Limits the Size of Partitions

For performance reasons choose partition keys whose number of possible values is bounded. For optimal performance, keep the size of a partition between 10 and 100MB. 

Minimize the Number of Partitions Read by a Single Query 

Ideally, each of your queries will read a single partition. Reading many partitions at one time is expensive because each partition may reside on a different node. The coordinator (this is the node in the cluster that first receives the request) will generally need to issue separate commands to separate nodes for each partition you request. This adds overhead and increases the variation in latency. Unless the data set is small, attempting to read an entire table, that is all the partitions, fails due to a read timeout. 

Understand the Importance of the Primary Key

Every table in Cassandra must have a  set of columns called the primary key. (In older versions of Cassandra, tables were called column families). In addition to determining the uniqueness of a row, the primary key also shapes the data structure of a table. The Cassandra primary key has two parts:

Partition key: The first column or set of columns in the primary key. This is required. The hashed value of the partition key value determines where the partition will reside within the cluster.

Clustering key (aka clustering columns): Are the columns after the partition key. The clustering key is optional. The clustering key determines the default sort order of rows within a partition.  

A very important part of the design process is to make sure a partition key will: 

  1. Distribute data evenly across all nodes in a cluster.  Avoid using keys that have a very small domain of possible values, such as gender, status, school grades, and the like.  The minimum number of possible values should always be greater  than the number of nodes in the cluster.  Also, avoid using keys where the distribution of possible values is highly skewed. Using such a key will create “hotspots” on the cluster. 
  2. Have a bounded range of values. Large partitions can increase read latency and cause stress on a node during a background process called compaction. Try to keep the size of partitions under 100MB. 

Model Around Your Queries

The Cassandra Query Language (CQL) is the primary language used to communicate with a Cassandra database. In syntax and function, CQL resembles SQL which makes it easy for those who know the latter to quickly learn how to write queries for Cassandra. But there are some important differences that affect your design choices. 

A well known one is that Cassandra does not support joins or derived tables. Whenever you require data from two or more tables, you must denormalize. 

Search conditions have restrictions that also impact the design. 

  • Only primary key columns can be used as query predicates. (Note: a predicate is an operation on expressions that evaluates to TRUE, FALSE).
  • Partition key columns are limited to equality searches. Range searches can only be done on clustering columns.
  • If there are multiple partition key columns (i.e. a composite partition key), all partition columns must be included in the search condition.
  • Not all clustering columns need to be included in the search condition. But there are some restrictions: 
    • When omiting columns you must start with the rightmost column listed in the primary key definition;  
    • An equality search cannot follow a range search.

Don’t Forget About the Data

Creating a complete Cassandra data model involves more than knowing your queries. You can identify all the queries correctly but if you miss some data, your model will not be complete.  Attempting to refactor a mature Cassandra data can be an arduous task. 

Developing a good conceptual model (see below) will help identify the data your application needs. 

Take a Structured Approach to Design 

In order to create a data model that is complete and high performing, it helps to follow a big data modeling methodology for Apache Cassandra that can be summarized as: 

  1. Data Discovery (DD). This is a high level view of the data your application needs and identifies the entities (things), the attributes of the entities, and which attributes are the identifiers. This may be an iterative process as development. 
  2. Identify the Access Patterns (AP).  Identify and list the queries your application will want to perform.  You need to answer: What data needs to be retrieved together, what are the search criteria, and what are the update patterns? This also may be an iterative process. 
  3. Map data and queries (MDQ).  Maps the queries to the data identified in steps 1 and 2 to create logical tables which are high level representations of Cassandra tables.
  4. Create the physical tables (PT).  Convert the logical data model to a physical data model (PDM) by using CQL CREATE TABLE statements. 
  5. Review and Refine physical data model.  Confirm that the physical tables will meet the 3 Basic Goals for Cassandra Data Model.

Structured approach to cassandra data model design

A more detail examination of these steps can be found in an earlier Instaclustr Whitepaper: 6 Step Guide to Apache Cassandra Data Modelling

If you have worked with relational database design, some steps will be familiar because they are also in the entity-relationship (ER) model.  At the conceptual stage, it can be useful to visually represent the data model by ER diagrams using either the Chen or Information Engineering (IE) notation. The Chebotko diagram uses a notation developed by Artem Chebotko to represent data and queries at the logical and physical modeling stages. 

Cassandra Model Example

Let’s assume that we have a simple logging system with two entities: LogSource and LogMessage.  For LogSource the key attribute is sourceName.  For the entity LogMessage, the key attribute is messageID.  

The query we want to execute is:  Q1) show the message information about the 10 most recent messages for a given source. 

The primary access entity is LogSource because it contains the equality search attribute (sourceName).  We create a logical table named LogMessage_by_Source and push the attribute sourceName into it. That becomes the partition key (indicated by the K).

We need to sort by time so messageTime becomes the clustering column in  LogMessage_by_Source.  (Indicated by C↑) 

The secondary entity is LogMessage. The key attribute messageID becomes a 2nd clustering column of the primary key in  LogMessage_By_Source to ensure uniqueness of the row.  Finally, we add the remaining columns from the secondary source to complete the data needed by the query. 

An example of Cassandra data model

Data Duplication 

Data duplication refers to the number of times data must be duplicated in different tables to satisfy access patterns.   For example, if  we wanted to search for a specific message by its  unique identifier we would duplicate the data by creating a new table called LogMessage_by_ID that uses  messageID as the partition key.

Two issues can arise from duplication: 

  • Increased complexity to maintain  data integrity across multiple tables; 
  • If the data being duplicated is very large it puts size and write pressure on the database.

In a case where data duplication would cause more problems than it solves, an alternative is to duplicate only lookup keys, that is a lookup table. However, this solution requires the client perform a second query to read the secondary data. The trade-off between read performance and data maintenance cost needs to be judged in the context of the specific performance requirements of your application and such a solution would need to be benchmarked to ensure that it is a workable solution.

Materialized Views

These are objects created by a query which are copies of a base table but with a different partition key. The data between the materialized view and the base table is automatically synchronized by Cassandra. Their purpose was to make modeling to new query patterns easier and more flexible.  

Instaclustr’s advice is not to use them in Cassandra 3.x because of problems in keeping the view and the base table synchronized. The Apache Cassandra project has classified Materialized Views as an experimental feature for Cassandra 3.x. 

Summary

Cassandra Data modeling is a process used to define and analyze data requirements and access patterns on the data needed to support a business process. 

A data model helps define the problem, enabling you to consider different approaches and choose the best one.  It ensures that all necessary data is captured and stored efficiently. 

Models document important concepts and jargon, proving a basis for long-term maintenance.

Creating a Cassandra is a non-relational database.  Do not design it as you would a relational database. Don’t be afraid to denormalize data. Writes in Cassandra are relatively cheaper than for relational databases.

 The goals of a successful Cassandra Data Model are to choose a partition key that (1)  distributes data evenly across the nodes in the cluster; (2) minimizes the number of partitions read by one query, and (3) bounds the size of a partition.

Take a structured approach to your model. Your first steps are understanding your and identifying access patterns on the data. These are most critical to developing a complete model.

Contact us to get expert advice on managing and deploying Apache Cassandra.

The post Cassandra Data Modeling Best Practices Guide appeared first on Instaclustr.

Cassandra Monitoring: A Best Practice Guide

Introduction to Cassandra Monitoring

Apache Cassandra is a NoSQL database designed to provide scalability, reliability, and availability with linear performance scaling. Cassandra database is designed as a distributed system, and aims to handle big data efficiently. Refer to what-is-apache-cassandra and cassandra-architecture for more information. Note that knowledge of Cassandra architecture and basic terminology is a prerequisite to understanding Cassandra monitoring. 

Cassandra monitoring is an essential area of database operations to ensure the good health of a cluster and optimal performance. Alerting is another crucial area for production systems, and it is complementary to monitoring. Good alerting in Cassandra can be achieved by utilization of the monitoring infrastructure and relevant toolset. Alerting and monitoring help create a robust environment for any Cassandra deployment.

This blog post aims to touch all important aspects of Cassandra monitoring. We hope it provides the reader with crucial information about monitoring tools, components, and metrics.

Monitoring Terminologies

JVM Based Monitoring

Cassandra is developed in Java and is a JVM based system. Each Cassandra node runs a single Cassandra process. JVM based systems are enabled with JMX (Java Management Extensions) for monitoring and management. Cassandra exposes various metrics using MBeans which can be accessed through JMX. Cassandra monitoring tools are configured to scrape the metrics through JMX and then filter, aggregate, and render the metrics in the desired format. There are a few performance limitations in the JMX monitoring method, which are referred to later. 

The metrics management in Cassandra is performed using Dropwizard library. The metrics are collected per node in Cassandra. However, those can be aggregated by the monitoring system. 

Cassandra Monitoring

Metrics 

There are a large number of metrics exposed by Cassandra to cover all possible areas including performance, resources, communication, node, and cluster state etc. The metrics are defined with distinct types, and those can be categorized as well for operational ease.    

Metrics Types

Cassandra metrics are defined with specific data types. These types are designed to accommodate metrics representations to represent the metrics like latency, counts, and others correctly. 

The metrics types are not intuitive and you might need some time to get familiar. 

  • Gauge: A single value representing a metric at a specific point in time, e.g. value of memory allocated or a number of active tasks. 
  • Counter: Counters are the same as a gauge but are used for value comparisons. Generally, a counter is only incremented, and it is reset when the functionality gets disrupted like a node restart. An example is cache_hit count.
  • Histogram: Histogram is a count of data elements from a data stream grouped in fixed intervals. A histogram gives a statistical distribution of values. The data elements are provided over min, max, mean, median, 75th, 90th, 95th, 98th, 99th, 99.9th percentile value intervals. 
  • Timer: Timer keeps the rate of execution and histogram of duration for a metric. 
  • Latency: This is a special type to measure latency. It includes Timer and the latency is in microseconds. There is also a TotalLatency with each latency metric. The total latency is the count of latency since the beginning. The beginning means the start of a node. 
  • Meter: Meter is a unit to measure throughput. It also includes a weighted moving average for first, fifth, and fifteenth minute.

Metrics Categories

The metrics are categorised based on Cassandra domains, e.g. table, keyspace, storage, communication, JVM etc. Not all metrics should be monitored all the time, but those should be available in case required, i.e. during troubleshooting or application performance testing. 

The metrics are further subdivided in terms of broader areas like resources, network, internals, crucial data elements etc. Metrics can be represented as per topology levels like cluster level, node level, table level etc. to organize all the information. 

The categorization becomes clear as we go through specific metrics and correlate those with specific Cassandra areas.

Metrics Format

The Cassandra dropwizard metrics are specified in format below:

Dropwizard Metric Name: org.apache.cassandra.metrics.<Metric scope>.<Metric type>.<MetricName>

Mbean: org.apache.cassandra.metrics:type=<Metric type> scope=<Metric scope> name=<MetricName>

Metric Type: This is the category of metrics e.g. table, keyspace, threadpool. Do not confuse this with the data type of metrics.

Metric scope: This is the metric sub type for more granularity wherever required. The scope is hence optional. E.g. the table name or keyspace name. 

Metric name: The final metric name like LiveSSTableCount. 

Essential Metrics

Cassandra Metrics

Node Status 

The status of nodes must be monitored and alerted immediately if a node is down. Cassandra cluster’s availability directly depends on the uptime of all the nodes in the cluster. Although the anti-entropy mechanism in Cassandra helps protect data from inconsistency, there is no replacement for lost performance during a node downtime. A down node puts pressure on other nodes in the data center to handle requests and store hints. Hence, downtime for a node should be minimum. 

Cassandra operational activity requires node restart or downtime but those can be scheduled at least busy times for the cluster. This alert helps keep track of any service disruption and the need to run repair a node. A node should be repaired if it is out of the cluster for more than the hinted handoff window which is three hours by default. 

Client Request Metrics

The client requests metrics provide information about client communication in forms of read and write requests per second between the client and a coordinator node. Other than normal read and write requests, there are special types of read and write operations CAS, View, and RangeSlice which have their own set of metrics. These metrics help to track the request count, latency, failures, and a few other statistics. The basic statistic to monitor is the number of requests per seconds, i.e. throughput and request latency.

Requests Per Second

The number of requests should be aggregated per data center and per node. There could be some nodes receiving more requests as compared to other nodes. This behaviour creates extra pressure for the nodes receiving more requests. The specific requests like CAS and RangeSlice should be tracked separately for clarity. These operations are resource-intensive and have a unique effect on the nodes. The ratio of read requests to write requests is crucial to understand the type of workload. There are specific configurations to optimize a read-heavy or a write-heavy workload. 

Each cluster can handle a certain amount of client requests per second efficiently. If the number of requests exceeds the cluster capacity, it can result in undesirable results like dropped messages, inconsistency, increased latency etc. The CAS and RangeSlice request can cause increased latency. 

Uneven load on a few nodes can be handled with optimal load balancing at the driver side. The read and write latency or throughput issues caused by constant overloading should be addressed by adding more nodes to the data center and revisiting the data model if required.

Alerting: Set alerts on the number of requests threshold served per node and data center. 

Client Request Latency

Latency tracked by these metrics is the read and write latency experienced by client applications. There are various percentiles of latency, as mentioned in latency metric type. These metric types should be tracked separately as well as overall values so that there is a clear view of system performance metrics. Production systems generally have latency SLAs. The SLA on a specific or overall latency should be tracked and alerted upon the client latency.

There are various factors which affect latency including, the amount of load served by a node or cluster, system resources and tuning, GC settings and behaviour, type of requests. Troubleshooting latency issues mainly depends on the accurate investigation of the root cause. Correlating latency metrics with other metrics helps to track down root causes. Using a graph solution like Grafana for visualization is the most efficient way to sight and track issues.

Alerting: Set alerts for latency SLA thresholds if any or expected latency range.

Request Timeout and Failure 

These metrics are the number of client requests timed out or failed. Failed requests are a clear indication of errors, and those should be addressed immediately. The common causes for request failure are unavailability of data, failure to get a response from the required number of replicas, data inconsistency, and network error. Troubleshooting for error is performed using the error messages and other metrics correlation. 

Alerting: Set alerts for more than a few failure requests on production systems.

Compaction Statistics 

This group of metrics include the amount of data compacted, the number of active/completed compactions, and other relevant details. Compactions consume node resources and could consume the disk space quickly. Monitoring compactions provides a good insight into the compaction strategy used as each strategy has a unique operational footprint. Specific Cassandra operations like repairs, high volume data writes, add/remove/replace nodes etc. increase the compaction activity. It is important to monitor the compactions while performing such operations. 

A common troubleshooting method for high compaction activities and high resource consumption is to throttle the compaction rate. In some scenarios, compactions can be temporarily stopped, but it requires a lot of caution and must be re-enabled at some point to keep the SSTable count low, and read latency optimal.

Alerting: Alerting is not essential for these metrics. However, alerts can be set if there are a higher number of pending compactions sustained for longer than expected time interval.

Garbage Collector Metrics

The Garbage Collector (GC) is yet another crucial area for monitoring. The efficiency of Cassandra throughput and performance depends on the effective use of JVM resources and streamlined GC. The GC behaviour mainly depends on these factors—the garbage collector used, the workload served by Cassandra nodes, GC parameter settings, the heap size for JVM etc. A common issue with garbage collection is long GC pause or the time taken to perform garbage collection. 

The GC works well with the default settings by Cassandra, but those can be tuned if required to suit a specific workload and the number of resources. GC parameter tuning is a non-trivial task and requires knowledge of GC internals. However, sometimes the GC can be resolved by fixing the data model, changing the workload, or JVM resources. It is essential to correlate bad GC behaviour with the exact root cause before performing a remedy. Also, any change in parameters impacting GC should be monitored carefully to ensure improvements. 

Alerting: Set alert on GC pauses for more than acceptable thresholds on production systems.

Memory Metrics

The memory metrics provide JVM heap, non-heap, and total memory used by Cassandra. The JVM heap storage is used heavily for a variety of purposes by Cassandra. The non-heap memory is also used a lot by later versions of Cassandra. Monitoring the heap and overall memory gives insight into memory usage. It can be used to correlate with any issues and determine memory requirements. 

Please note, Cassandra cannot scale with an indefinite amount of memory. This boils down to the fact that JVM and GC cannot perform optimally for large heap size. The most common range of heap size for Cassandra is 8GB-32GB where the smaller size is configured with CMS GC and the larger size with G1GC.

Alerting: Set alerts to test specific memory thresholds and tuning.  

Threadpool Metrics

Cassandra works with numerous thread pools internally. This design is aimed to achieve asynchronous tasks, and it also helps to handle back pressure. Monitoring for the thread pools makes it easy to understand the internal system behaviour. It also helps to understand  specific pools under pressure with active, pending, and blocked tasks. 

The solution for constantly saturated pools generally is to provide more processing capacity to the node or the cluster. Other core issues like poor data model and query pattern also impact on the thread pools. 

Alerting: Set alerts for more than a few blocked tasks on the production system. This helps take preventive action to help avoid performance impact.

Table Metrics 

Table metrics are useful in tracking each table independently. These can be used to monitor a specific set of tables which are performance-critical or host a large volume of data. There are various metrics for each table but some of the most important are discussed here:  

Partition Size

The partition size is a crucial factor in ensuring optimal performance. Cassandra uses partitions of data as a unit of data storage, retrieval, and replication. Hence, if the partition size is larger it impacts overall performance. The ideal range of partition size is less than 10MB with an upper limit of 100MB. These values are derived from operational experience from the Cassandra community. 

The data model and table definition control the partition size. The partition key for a table determines the data to create partitions. A partition key should be designed to accumulate data only up to acceptable size limits. Unfortunately, it is not easy to replace current partitions for a table. But, if the data model is in the design phase, it is crucial to test all the table definitions for potential large partitions sizes. In the existing tables, if large partitions are a major issue, they can be addressed by complete data rewrite. This operation could be long-running, but it can solve many performance issues, and if configured correctly, it can be performed without minimal or no downtime for the table. 

Alerting: Configure alerts on large partitions for tables with unbounded partitions. An unbounded partition is where the partition grows in size with new data insertion and does not have an upper bound.

Tombstone Scanned

Tombstones are the deletion markers in Cassandra. Tombstones are produced by data deletion, and it could be performed using various means like delete queries, TTL expiry, null inserts etc. The immutable design of SSTables and compaction operations makes tombstone eviction difficult in some scenarios. Tombstone presence directly impacts read performance; its effect increases with the number of tombstones scanned per operation. This metric provides a histogram of tombstones read for a table’s queries in recent time. 

The troubleshooting for tombstone eviction can be performed using various options like revisiting the compaction strategy, major compaction, nodetool garbagecollect etc. Note that all the mentioned remedies for tombstone eviction could operate on a large set of SSTables and are non-trivial operations. The operations must be well tested before executing on production. 

Alerting: Set alerts for tombstones-scanned per read metrics for performance-sensitive tables. 

SSTable Per Read

These metrics are related to the immutable design of SSTables and read operation. The SSTables are created per table, and the data is arranged sequentially in the order it is written. This results in multiple SSTable reads to complete a single read operation. The number of SSTables read contributes to the time consumed to complete the read operation. Hence, the number of SSTables per read should be minimized. 

A good number of SSTables per read is a relative value and depends on the data volume and compaction strategy. However, as a general rule, those should be less than 10. The compaction strategy used for a table plays a crucial role in this metric. A table should be configured with optimum compaction strategy as per the table usage. Repair operation plays a role in keeping the SSTables consistent and hence also indirectly impacts this metric. All the data in Cassandra should ideally be repaired once per gc_grace_seconds cycle. 

Alerting: Set alerts for all the read performance-sensitive and high data volume tables for SSTables per read. 

Additional Metrics

It is difficult to cover all the metrics present in Cassandra in this blog post, and it is also difficult to predict the most useful ones in general. I have tried to cover the most used metrics individually. But there are still some crucial metrics which are useful for getting insight in specific Cassandra areas. Let’s look at those briefly:

Dropped Messages

Cassandra handles many forms of messages corresponding to various functions. These messages can get dropped mostly due to load or communication error etc. The dropping of messages causes data inconsistency between nodes, and if those are frequent, it can cause performance issues. It is necessary to identify the cause of dropped messages. If those occur frequently or if those are in large numbers, the system resources and data model should be revisited. Alerts should be set for an unexpected occurrence or number of dropped messages. 

Caches For Tables

Cassandra uses quite some cache, and those are configurable. The cache metrics are useful to track effective use of a particular cache. A good example is the use of row cache for frequently accessed rows in a table. If caching hot data in row cache improves the cache hits, it is a successful use of the row cache. 

Data Streaming

Streaming is used while booting up new nodes, repair operations, and during some other cluster operations. Streaming operations can move many data across a cluster and hence consume network bandwidth. The streaming metrics are useful for monitoring node activities and repairs when planned. The streaming rate can be controlled if required to spare the bandwidth for operations.

Hinted Handoff 

Hints are a part of the anti-entropy mechanism, and those try to protect nodes from data loss when those are offline. Hints are stored and transferred, so metrics related to these attributes and delivery success, failure, delays, and timeouts are exposed. 

The hints metrics are useful to monitor all hints activities. A lot of hints stored and used indicate nodes being offline where hint delays, failures indicate a network or other communication issues.

CQL and Batch 

CQL metrics include the number of statements executed of each type. The batch metrics include the number of batch statements executed. These metrics help to monitor the application activity and query semantics used. Use of logged and unlogged batches has its caveats in Cassandra, and they can cause performance penalty if not used correctly. 

System Metrics

These metrics are not exported by Cassandra but those are obtained from the OS. These metrics are equally important as the Cassandra metrics to obtain system insights. 

Disk Usage

The disk usage is subject to monitoring as Cassandra is optimized to write a lot of data in quick time. The real risk for disk fillup is from compactions. The default compaction strategy used for Cassandra is SizeTieredCompactionStrategy STCS. This strategy merges many SSTables and outputs a single SSTable. The resulting SSTable can have a size equal to the combined size of all the SSTables merged in it. Also, until a compaction operation ends, both old and new SSTables exist on disk. 

The disk space guidelines for a cluster with most tables using STCS is to utilise the disk space up to 50% and to leave the rest as a room for compactions. Generally, disk space is cheaper in cost as compared to other resources and there is no harm to keep vacant space on nodes. However, if there is limited disk space available, disk monitoring becomes even more crucial as free disk left for compactions can be reduced further than general guidelines. 

Remedy for high disk usage includes snapshot deletion as those can consume a considerable amount of space. Another method is to stop specific compaction operation; this frees space consumed by the new SSTables. The time until the compaction starts again can be utilizd to add more space. 

Alerting: Set alerts for various stages of disk usage. The alerts can be categorized for severity based on the amount of free disk space on a node. 

CPU Usage

CPU capacity in a Cassandra cluster contributes as the main processing capacity. The number of requests served by a node and the amount of data stored are the factors directly proportional to the CPU utilization. CPU utilization should be monitored to ensure the nodes are not overloaded. 

A Cassandra cluster or a single data center should have all the nodes of similar size. Those should have an equal number of CPU cores, and the CPU utilization should also be equivalent. A single node or a few nodes with high CPU is an indication of uneven load or request processing across the nodes. It is observed that Cassandra is not CPU bound in most cases. However, a cluster or data center with high CPU utilization at most times should be considered for node size upgrade. 

Alerting: Set alerts for specific levels of CPU utilization on nodes or just for a single threshold. The levels can be defined as per expected CPU load, e.g. 80%, 90%, >95% etc. 

Monitoring tools

There are various tools available to set up Cassandra monitoring. I am describing here a few popular open-source tools used widely across the Cassandra community.

Prometheus

Prometheus is a metrics tool used for handling time-series based monitoring. It has alerting capability as well, which works on the time-series metrics. Prometheus can be configured to collect Cassandra metrics from nodes as well as the system metrics of the nodes. Prometheus uses exporters which are installed on the nodes and export data to Prometheus.  

Prometheus runs with a time-series database to store metrics. The metrics are stored in the database and can be queried using promQL, a query language for Prometheus. Prometheus also runs a web UI which can be used to visualise the actual metrics, graphs, alert rules etc. 

Alertmanager is the extension used for configuring alerts. Alertmanager has various integrations available for alerting including email, slack, hipchat, pagerduty etc. Prometheus has evolved over time, and it integrates well with the dropwizard metrics library. 

Prometheus - time-series based cassandra monitoring

Grafana

Grafana is a visualisation tool which can be used to visualize any time-series metrics. Grafana has various panels to showcase the data. The most commonly used panel is a graph. A graph is used to plot incoming data against a time-series in two dimensions. 

Grafana integrates with various data sources. These sources are queried in real-time by Grafana to obtain metrics. Each Grafana panel has one or more queries configured to query a data source; the result of the query is rendered on the panel. Grafana uses Prometheus as a well-integrated data source.

Grafana - Time series metrics visualization

Cassandra Exporter

Cassandra exporter is Instaclustr’s open-source solution for collecting Cassandra metrics efficiently. It is designed to integrate with Cassandra JVM and collect and publish metrics. Hence, Cassandra exporter is a replacement for the JMX metrics. 

JMX metrics in Cassandra have performance limitations and hence can cause some issues if used on systems with a large number of nodes. The Cassandra exporter has been well tested for optimal performance monitoring. The metrics produced by Cassandra exporter are also time-series and can be readily consumed by Prometheus. Please refer to the github page for information regarding configuration and usage. 

Conclusion

Cassandra monitoring is essential to get insight into the database internals. Monitoring is a must for production systems to ensure optimal performance, alerting, troubleshooting, and debugging. There are a large number of Cassandra metrics out of which important and relevant metrics can provide a good picture of the system. 

Finally, Instaclustr has the Cassandra monitoring expertise and capability with various options. 

  • Cassandra exporter is an excellent open source tool for optimal monitoring performance on large Cassandra clusters. 
  • Instaclustr Cassandra managed service uses a comprehensive monitoring-alerting service with 24×7 support and it is a good option to outsource all Cassandra operations and it comes with a free trial.

Instaclustr Cassandra Consulting services can help you with any monitoring or other Cassandra operations.

The post Cassandra Monitoring: A Best Practice Guide appeared first on Instaclustr.

Building a Low-Latency Distributed Stock Broker Application: Part 3

In the third blog of the  “Around the World ” series focusing on globally distributed storage, streaming, and search, we build a Stock Broker Application. 

1. Place Your Bets!

The London Stock Exchange 

How did Phileas Fogg make his fortune? Around the World in Eighty Days describes Phileas Fogg in this way: 

Was Phileas Fogg rich? Undoubtedly. But those who knew him best could not imagine how he had made his fortune, and Mr Fogg was the last person to whom to apply for the information.

I wondered if he had made his fortune on the Stock Market, until I read this:

Certainly an Englishman, it was more doubtful whether Phileas Fogg was a Londoner. He was never seen on ‘Change, nor at the Bank, nor in the counting-rooms of the “City“‘

Well, even if Fogg wasn’t seen in person at the ‘Change (London Stock Exchange), by 1872 (the year the story is set), it was common to use the telegraph (the internet of the Victorian age, which features regularly in the story) to play the market.

In fact the ability of the telegraph to send and receive information faster than horses/trains/boats etc. had been used for stock market fraud as early as 1814! (The “Great Stock Exchange Fraud of 1814”). Coincidentally (or not?), the famous London Stock Exchange Forgery, also involving the telegraph, also occurred in 1872! Perhaps this explains the ambiguity around the origin of Fogg’s wealth!

What is certain is that Phileas Fogg became the subject of intense betting, and he was even listed on the London Stock Exchange (Chapter V – IN WHICH A NEW SPECIES OF FUNDS, UNKNOWN TO THE MONEYED MEN, APPEARS ON ‘CHANGE):

Not only the members of the Reform, but the general public, made heavy wagers for or against Phileas Fogg, who was set down in the betting books as if he were a race-horse. Bonds were issued, and made their appearance on ‘Change; “Phileas Fogg bonds” were offered at par or at a premium, and a great business was done in them. But five days after the article in the bulletin of the Geographical Society appeared, the demand began to subside: “Phileas Fogg” declined. They were offered by packages, at first of five, then of ten, until at last nobody would take less than twenty, fifty, a hundred!”

The 1870’s also saw the introduction of a new technological innovation in Stock Trading, the Stock Ticker Machine. Stock tickers were a special type of telegraph receiver designed to print an alphabetical company symbol and the current price of that company’s stock on a paper roll called ticker tape. This enabled stock prices to be communicated closer to real-time across vast distances, and revolutionized trading. 

Vintage Stock Ticker Machine (Source: Shutterstock)

2. Let’s Build a Stock Broker Application

Fast forward 128 years from 1872 to 2000 and technology looked a bit different. I’m taking inspiration from an earlier project I worked with from 2000-2003 at CSIRO (Australia’s national science research agency) called “StockOnline”. This was an online Stock Broker application designed to benchmark new component-based middleware technologies, including Corba and Enterprise Java (J2EE). The original version simulated traders checking their stock holdings and current stock prices, and then buying and selling stocks, resulting in revised stock holdings. The benchmark could be configured with different workload mixes, and the number of concurrent traders could be scaled up to stress the system under test. Metrics captured included the relative number, throughput, and response time of each of the operations. 

Some of the technology innovations that the project was designed to give insights into included: 

  • the use of application servers to provide easy to manage and scalable container resourcing (yes, containers are at least 20 years old); 
  • how portable the application was across multiple different vendors application servers (sort of);
  •  the impact of JVM choice and settings (lots); 
  • explicit support for component configuration (e.g. wiring components together); and 
  • deployment into containers, rich container services, and multiple persistence models to manage state and allow database portability (e.g. Container Managed Persistence vs. Bean Managed Persistence). 

At the end of the project we made the StockOnline code and documentation open source, and I recently rediscovered it and made it available on github. I was surprised to learn that Enterprise Java is still going strong and is now run by the Eclipse Foundation and called “Jakarta EE”.  Also interesting is that there is support for persistence to Cassandra

So let’s move on to the present day.

3. High-Frequency Low-Latency Trading

Modern stock markets are fast-moving, ultra-competitive environments. Orders are sent to the market by high speed networks and executed almost instantly. This makes low-latency trading a key to profitability.  Trade related latency is any delay in the time it takes for a trader to interact with the market, and includes distance related network delays, delays in receiving and acting on information, and delays caused by brokers (e.g. queuing of orders, delays interacting with the stock exchange to trade the orders, etc.). Some of the key solutions to reducing latency are broker side hosting of orders (orders are hosted on brokers and automatically traded when conditions are met), and Direct Market Access (brokers are as close as possible to stock exchanges, with super-fast network connections).

A new type of US Stock Exchange (IEX) was even created to address some of the issues around fairness of stock trading due to latency. Some brokers are able to take advantage of even small latency differences – “price snipping”, or so called “dark pools” which fill orders from within a pool rather than via public stock exchanges, to make huge profits. Although, somewhat oddly, the IEX levelled the playing field by introducing delays to ensure that no one playing the market has more up-to-date information than anyone else.

Latency is partially caused by the global location of stock exchanges. Where are stock exchanges located?  There are 60 exchanges around the world on every continent with a total value of $69 Trillion, and 16 worth more than $1 Trillion each!

4. Initial Application Design

I had already started writing a new version of StockOnline before I rediscovered the original, so the new version doesn’t share any of the original code. However, it does turn out to have similar entities, but with some significant differences to model multiple StockExchanges and Brokers.  Here’s the UML Class diagram of my new prototype code:

The first major difference is that it’s designed to model and simulate distributed stock brokers across multiple global “georegions”. We introduced the concept of georegions in blog 1 (called “latency regions”) and blog 2 (called “georegions”). A georegion is a geographic region that has at least two AWS regions (for data center redundancy), and ensures that applications within the same georegion are within 100ms latency of each other and users in the same georegion in case of failure of one region.  Here’s the map from the previous blogs showing the eight sub 100ms latency georegions that we identified (North America, South America, Atlantic, Europe, Middle East, Central Asia, East Asia, Australasia):

This means that I have to explicitly model multiple StockExchanges. Each StockExchange is in a particular location in a georegion.  Each StockExchange is responsible for listing some stocks, providing meta-data about the stocks,  publishing changes to stock prices as StockTickers, and matching buy/sell orders (i.e. executing trades).  For simplicity we assume that each stock is only listed on a single StockExchange. 

Each georegion has one or more StockBrokers which are co-located in the same georegion as some StockExchanges to ensure low-latency (and potentially redundancy).  The StockBrokers are responsible for discovering StockExchanges, all the stocks listed for trading, obtaining StockTickers to update current price data, and computing trends and longer-term Stock Statistics that inform traders making decisions about buying and selling. They are also responsible for keeping track of trader data, updating StockHoldings for Traders, keeping track of orders and trading them on the appropriate StockExchanges, and keeping records of trades (transactions). Also different to the original version (which only had a single Market Order type), I wanted to have multiple different order types including Market, Limit and Stop Orders. This is important for the latency story as market orders are traded “manually” and immediately, but Limit and Stop Orders are traded automatically when they meet specific conditions, so can be traded very quickly and in larger volumes, this is a good explanation).

We assume that traders connect to their nearest StockBroker (to reduce latency and possibly to satisfy local financial rules). There is a series of operations supported by StockBrokers for traders, and also for interacting with the StockExchanges as follows.  First let’s look at the workflow for trading Market Orders, “Place Market Order”. These are essentially synchronous and immediate trades. The trader connects to the nearest broker, gets their current StockHoldings and current StockStatistics (for their holdings and possibly for other stocks they don’t currently own). Based on this information they decide what stocks to trade, whether to buy or sell, and the quantity of stocks, and create a Market Order. The broker then processes the Market Order (which may involve sending it to another broker), and receives confirmation that the trade occurred (including price, quantity, transaction ID etc.), and finally updates the trader’s StockHoldings for the stock traded. 

The steps to “Process Market Order” are as follows.  The order is sent to a broker in the same Georegion as the StockExchange listing the stock. This broker then immediately executes the trade (buys or sells) with the StockExchange, gets the transaction details, marks the order as filled (so it isn’t processed more than once), and updates the StockHolding amounts for the trader. 

The “Execute Trade with StockExchange” assumes that a trade is always possible (at the current price) and will occur instantaneously and completely, and has the following substeps:

Market Orders are potentially a slow process due to all the synchronous steps, “think time” for the trader, and cumulative latencies due to the trader to broker, broker to broker, and broker to StockExchange communication paths.

As an alternative we also provide some asynchronous Order types: Limit and Stop. These order types are only traded when the conditions are met, but then need to be executed as quickly as possible to prevent losses in a fast moving market.

We assume that the initial steps are mostly the same as “Place Market Order”, but with the added choice of Limit of Stop Order, and the limit price and the final step (notification of eventual Trade) is asynchronous:

Once the Order placed, it is processed by sending it to the correct broker (as for Market Orders), and then that broker is responsible for continuously checking orders to see if they match:

This is done as follows (“Trade Matching Orders”) and relies on each broker receiving a stream of StockTicker updates from the StockExchanges in the same georegion. For each StockTicker the broker finds orders for that stock, and checks which orders meet the buy/sell conditions (the logic depends on the order type, if the price is rising or dropping, and if the current price is less than or greater to the order limit price). If the matching order(s) are Sell Orders then an extra step is to check that the trader still has sufficient holdings of that stock (they may have already sold some stock due to other orders being executed first). If all conditions are met then the broker initiates an immediate “Market” trade with the StockExchange as before.

The initial prototype application code is written in pure Java and just simulates the expected behaviour of the traders, brokers, and StockExchanges. It creates a specified number of georegions, brokers, StockExchanges, stocks, and traders with initial data. Then the simulation runs lots of rounds (seconds) for a specified period (e.g. a day).  Each round results in traders checking their holdings and StockStatistics, and creating orders (about ⅓ of each type, but only if the type matches the specific stock and market conditions). The orders are sent to the correct brokers. Each round the brokers receive simulated StockTickers from StockMarkets in the same georegion (using a pseudo-random walk which keeps the stock direction for the majority of the time, but occasionally changing direction). Some stocks are more volatile than others so change faster.  Each round the brokers immediately Trade Market Orders, and check the conditions and trade matching Limit or Stop Orders. 

5. Initial Simulation Results—“Time Is Money”!

The simulation computes business level metrics including number of trades, value of trades, etc., and expected end-to-end latency based on deploying brokers in 8 georegions, and using the AWS inter-region latencies from blog 1 of this series. This gives us a baseline to eventually compare the real results with. The average latency to get the current stock price from a remote StockExchange and “Process Market Orders” is 700ms (max 1.2s), which includes multiple times for intra-region and inter-region networking. The average latency for Limit and Stop “Trade Matching” Orders is shorter at 100ms (max 200ms), as it only includes times to get StockTicker updates and the time to trade;.  i.e. it doesn’t include any AWS inter-region latencies as the operation is asynchronous and processed entirely within the georegion of the broker/StockExchange (we expect this to be slightly higher in practice due to the eventual overhead of database lookups and condition checking on the broker).

So is the saying “Time Is Money!” true in the context of low latency trading, and how much money exactly? I added a calculation to the simulation to compute potential profit losses assuming high volatility in the prices of some stocks, and higher latency times to trade. Due to potentially high volumes of trades even a small profit loss per trade can add up to big losses in profit very quickly.  For one simulation run with 2,101 completed trades, the potential profit loss for the higher latency Market Orders was 0.7% of the trade value (or Market Orders), but for the lower latency Limit and Stop Orders it was significantly less at 0.1% of the trade value (for those order types). For an average order size of $20,000 this corresponds to a $140 profit loss per Market Order, compared with only $20 profit loss for each Limit and Stop Order. Over hundreds or even thousands of trades per day (typical of HFT) this would quickly add up to significant amounts of money! Moreover, to make a profit, High Frequency Trading (HFT) relies on conducting a high volume of trades to rapidly take advantage of very small movements in prices, with potentially smaller profits per trade. So it’s easy to see why minimizing latency is a worthwhile goal in Fintech applications such as this. 

6. What Next?

In the next few blogs we’ll continue our journey “Around the World” and explore how to refine the initial simple design of the application so as to deploy and run it across multiple georegions using multiple AWS regions.

Initially this will involve mapping the components to Cassandra tables on a single data center. Once it’s working correctly with a single Cassandra data center, we’ll extend it to use multiple Cassandra data centers, which will require the use of multiple keyspaces for different replication topologies (e.g. replication across pairs of data centers vs. all data centers). We’ll also work out if, and how, to load-balance the application across multiple data centers in the same georegions, and how to enable redundancy, failover, and recovery at the application level.  It’s possible that a Kubernetes federated microservices mesh framework will help in doing this. We also plan to put Kafka to use to enable streaming StockTickers, so we’ll be investigating Kafka multi data center replication. 

7. Further Resources

IBM also has a StockTrader demonstration application, and an in-depth series about deploying it using Cassandra, Kafka, Redis, and Kubernetes.

There’s an example of stock analysis using Elasticsearch (I’m planning on using Elasticsearch to analyse the stock trends, and provide some map-based visualisation of the data).

This is an interesting article on “Hacking a HFT System”!

The Original CSIRO StockOnline code and documentation is now on github.

The post Building a Low-Latency Distributed Stock Broker Application: Part 3 appeared first on Instaclustr.

Even Higher Availability with 5x Faster Streaming in Cassandra 4.0

Streaming is a process where nodes of a cluster exchange data in the form of SSTables. Streaming can kick in during many situations such as bootstrap, repair, rebuild, range movement, cluster expansion, etc. In this post, we discuss the massive performance improvements made to the streaming process in Apache Cassandra 4.0.

High Availability

As we know Cassandra is a Highly Available, Eventually Consistent database. The way it maintains its legendary availability is by storing redundant copies of data in nodes known as replicas, usually running on commodity hardware. During normal operations, these replicas may end up having hardware issues causing them to fail. As a result, we need to replace them with new nodes on fresh hardware.

As part of this replacement operation, the new Cassandra node streams data from the neighboring nodes that hold copies of the data belonging to this new node’s token range. Depending on the amount of data stored, this process can require substantial network bandwidth, taking some time to complete. The longer these types of operations take, the more we are exposing ourselves to loss of availability. Depending on your replication factor and consistency requirements, if another node fails during this replacement operation, ability will be impacted.

Increasing Availability

To minimize the failure window, we want to make these operations as fast as possible. The faster the new node completes streaming its data, the faster it can serve traffic, increasing the availability of the cluster. Towards this goal, Cassandra 4.0 saw the addition of Zero Copy streaming. For more details on Cassandra’s zero copy implementation, see this blog post and CASSANDRA-14556 for more information.

Talking Numbers

To quantify the results of these improvements, we, at Netflix, measured the performance impact of streaming in 4.0 vs 3.0, using our open source NDBench benchmarking tool with the CassJavaDriverGeneric plugin. Though we knew there would be improvements, we were still amazed with the overall results of a five fold increase in streaming performance. The test setup and operations are all detailed below.

Test Setup

In our test setup, we used the following configurations:

  • 6-node clusters on i3.xl, i3.2xl, i3.4xl and i3.8xl EC2 instances, each on 3.0 and trunk (sha dd7ec5a2d6736b26d3c5f137388f2d0028df7a03).
  • Table schema
CREATE TABLE testing.test (
    key text,
    column1 int,
    value text,
    PRIMARY KEY (key, column1)
) WITH CLUSTERING ORDER BY (column1 ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
    AND compression = {'enabled': 'false'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
  • Data size per node: 500GB
  • No. of tokens per node: 1 (no vnodes)

To trigger the streaming process we used the following steps in each of the clusters:

  • terminated a node
  • add a new node as a replacement
  • measure the time taken to complete streaming data by the new node replacing the terminated node

For each cluster and version, we repeated this exercise multiple times to collect several samples.

Below is the distribution of streaming times we found across the clusters Benchmark results

Interpreting the Results

Based on the graph above, there are many conclusions one can draw from it. Some of them are

  • 3.0 streaming times are inconsistent and show high degree of variability (fat distributions across multiple samples)
  • 3.0 streaming is highly affected by the instance type and generally looks generally CPU bound
  • Zero Copy streaming is approximately 5x faster
  • Zero Copy streaming time shows little variability in its performance (thin distributions across multiple samples)
  • Zero Copy streaming performance is not CPU bound and remains consistent across instance types

It is clear from the performance test results that Zero Copy Streaming has a huge performance benefit over the current streaming infrastructure in Cassandra. But what does it mean in the real world? The following key points are the main take aways.

MTTR (Mean Time to Recovery): MTTR is a KPI (Key Performance Indicator) that is used to measure how quickly a system recovers from a failure. Zero Copy Streaming has a very direct impact here with a five fold improvement on performance.

Costs: Zero Copy Streaming is ~5x faster. This translates directly into cost for some organizations primarily as a result of reducing the need to maintain spare server or cloud capacity. In other situations where you’re migrating data to larger instance types or moving AZs or DCs, this means that instances that are sending data can be turned off sooner saving costs. An added cost benefit is that now you don’t have to over provision the instance. You get a similar streaming performance whether you use a i3.xl or an i3.8xl provided the bandwidth is available to the instance.

Risk Reduction: There is a great reduction in the risk due to Zero Copy Streaming as well. Since a Cluster’s recovery mainly depends on the streaming speed, Cassandra clusters with failed nodes will be able to recover much more quickly (5x faster). This means the window of vulnerability is reduced significantly, in some situations down to few minutes.

Finally, a benefit that we generally don’t talk about is the environmental benefit of this change. Zero Copy Streaming enables us to move data very quickly through the cluster. It objectively reduces the number and sizes of instances that are used to build Cassandra cluster. As a result not only does it reduce Cassandra’s TCO (Total Cost of Ownership), it also helps the environment by consuming fewer resources!

Introducing Transient Replication

Transient Replication is a new experimental feature soon to be available in 4.0. When enabled, it allows for the creation of keyspaces where replication factor can be specified as a number of copies (full replicas) and temporary copies (transient replicas). Transient replicas retain the data they replicate only long enough for it to be propagated to full replicas, via incremental repair, at which point the data is deleted. Writing to transient replicas can be avoided almost entirely if monotonic reads are not required because it is possible to achieve a quorum of acknowledged writes without them.

This results in a savings in disk space, CPU, and IO. By deleting data as soon as it is no longer needed, transient replicas require only a fraction of the disk space of a full replica. By not having to store the data indefinitely, the CPU and IO required for compaction is reduced, and read queries are faster as they have less data to process.

So what are the benefits of not actually keeping a full copy of the data? Well, for some installations and use cases, transient replicas can be almost free if monotonic reads are disabled. In future releases where monotonic reads are supported with Transient Replication, enabling monotonic reads would reduce the savings in CPU and IO, but even then they should still be significant.

Transient Replication is designed to be transparent to applications:

  • Consistency levels continue to produce the same results for queries.
  • The number of replicas that can be lost before data loss occurs is unchanged.
  • The number of replicas that can be unavailable before some queries start to timeout or return unavailable is unchanged (with the exception of ONE).

With Transient Replication, you can go from 3 replicas to 5 replicas, two of which are transient, without adding any hardware.

If you are running an active-passive 2 DC setup with 3 replicas in each DC, you can make one replica in each DC transient and still have four full copies of the data in total.

Feature support

Transient Replication is not intended to fully replace Cassandra’s existing approach to replication. There are features that currently don’t work with transiently replicated keyspaces and features that are unlikely ever to work with them.

You can have keyspaces with and without Transient Replication enabled in the same cluster, so it is possible to use Transient Replication for just the use cases that are a good fit for the currently available functionality.

Currently unsupported but coming:

  • Monotonic reads
  • Batch log
  • LWT
  • Counters

Will never be supported:

  • Secondary indexes
  • Materialized views

How Transient Replication works

Overview

Transient replication extends Cassandra’s existing consistent hashing algorithm to designate some replicas of a point or range on the consistent hash ring as transient and some as full. The following image depicts a consistent hash ring with three replicas A, B, and C. The replicas are located at tokens 5, 10, 15 respectively. A key k hashes to token 3 on the ring.

A consistent hash ring without Transient Replication

Replicas are selected by walking the ring clockwise starting at the point on the ring the key hashes to. At RF=3, the replicas of key k **are **A, B, C. With Transient Replication, the last N replicas (where N is the configured number of transient replicas) found while walking the ring are designated as transient.

There are no nodes designated as transient replicas or full replicas. All nodes will fully replicate some ranges on the ring and transiently replicate others.

The following image depicts a consistent hash ring at RF=3/1 (three replicas, one of which is transient). The replicas of k are still A, B, and C, but C is now transiently replicating k.

A consistent hash ring with Transient Replication

Normally all replicas of a range receive all writes for that range, as depicted in the following image.

Normal write behavior

Transient replicas do not receive writes in the normal write path.

Transient write behavior

If sufficient full replicas are unavailable, transient replicas will receive writes.

Transient write with unavailable node

This optimization, which is possible with Transient Replication, is called Cheap Quorums. This minimizes the amount of work that transient replicas have to do at write time, and reduces the amount of background compaction they will have to do.

Cheap Quorums and monotonic reads: Cheap Quorums may end up being incompatible with an initial implementation of monotonic reads, and operators will be able to make a conscious trade off between performance and monotonic reads.

Rapid write protection

In keyspaces utilizing Transient Replication, writes are sent to every full replica and enough transient replicas to meet the requested consistency level (to make up for unavailable full replicas). In addition, enough transient replicas are selected to reach a quorum in every datacenter, though unless the consistency level requires it, the write will be acknowledged without ensuring all have been delivered.

Because not all replicas are sent the write, it’s possible that insufficient replicas will respond, causing timeouts. To prevent this, we implement rapid write protection, similar to rapid read protection, that sends writes to additional replicas if sufficient acknowledgements to meet the consistency level are not received promptly.

The following animation shows rapid write protection in action.

Animation of rapid write protection preventing a write timeout

Rapid write protection is configured similarly to rapid read protection using the table option additional_write_policy. The policy determines how long to wait for acknowledgements before sending additional mutations. The default is to wait for P99 of the observed latency.

Incremental repair

Incremental repair is used to clean up transient data at transient replicas and propagate it to full replicas.

When incremental repair occurs transient replicas stream out transient data, but don’t receive any. Anti-compaction is used to separate transient and fully replicated data so that only fully replicated data is retained once incremental repair completes.

The result of running an incremental repair is that all full replicas for a range are synchronized and can be used interchangeably to retrieve the repaired data set for a query.

Read path

Reads must always include at least one full replica and can include as many replicas (transient or full) as necessary to achieve the desired consistency level. At least one full replica is required in order to provide the data not available at transient replicas, but it doesn’t matter which full replica is picked because incremental repair synchronizes the repaired data set across full replicas.

Reads at transient replicas are faster than reads at full replicas because reads at transient replicas are unlikely to return any results if monotonic reads are disabled, and they haven’t been receiving writes.

Creating keyspaces with Transient Replication

Transient Replication is supported by SimpleStrategy and NetworkTopologyStrategy. When specifying the replication factor, you can specify the number of transient replicas in addition to the total number of replicas (including transient replicas). The syntax for a replication factor of 3 replicas total with one of them being transient would be “3/1”.

ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'DC1' : '3/1'};
ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : '3/1'};

Monotonic reads are not supported with Transient Replication in 4.0, so any existing tables in the keyspace must have monotonic reads disabled by setting read_repair = 'NONE'

Once the keyspace has been altered, you will need to run incremental repair and then nodetool cleanup to ensure transient data is cleaned up.

Operational matters

Transient replication requires rolling incremental repair to be run regularly in order to move data from transient replicas to full replicas. By default transient replicas will receive 1% of writes for transiently replicated ranges due to rapid write protection. If a node is down for an extended period of time, its transient replicas will receive additional write load and that data should be cleaned up using incremental repair. Running incremental repair regularly will ensure that the size of each repair is small.

It’s also a good idea to run a small number of vnodes with transient replication so that when a node goes down the load is spread out over several other nodes that transiently replicate that range. Larges numbers of vnodes are known to be problematic, so it’s best to start with a cluster that is already close to or at its maximum size so that a small number of vnodes will be sufficient. If you intend to grow the cluster in the future, you will need to be cognizant of how this will interact with the number of vnodes you select.

While the odds of any data loss should multiple nodes be permanently lost remain the same with transient replication, the magnitude of potential data loss does not. With 3/1 transient replication the permanent loss of two nodes could result in the loss of the entirety of the repaired data set. If you are running a multi-DC setup with a high level of replication such as 2 DCs, with 3/1 replicas in each, then you will have 4 full copies total and the added risk of transient replication is minimal.

Experimental features

Experimental features are a relatively new idea for Apache Cassandra. Although we recently voted to make materialized views an experimental feature retroactively, Transient Replication is the first experimental feature to be introduced as such.

The goal of introducing experimental features is to allow for incremental development across multiple releases. In the case of Transient Replication, we can avoid a giant code drop that heavily modifies the code base, and the associated risks with incorporating a new feature that way.

What it means for a feature to be experimental doesn’t have a set definition, but for Transient Replication it’s intended to set expectations. As of 4.0, Transient Replication’s intended audience is expert operators of Cassandra with the ability to write the book on how to safely deploy Transient Replication, debug any issues that result, and if necessary contribute code back to address problems as they are discovered.

It’s expected that the feature set for Transient Replication will not change in minor updates to 4.0, but eventually it should be ready for use by a wider audience.

Next steps for Transient Replication

If increasing availability or saving on capacity sounds good to you, then you can help make transient replication production-ready by testing it out or even deploying it. Experience and feedback from the community is one the of the things that will drive transient replication bug fixing and development.

Step by Step Guide to Installing and Configuring Spark 2.0 to Connect with Cassandra 3.x

In this guide, we will be installing Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector library on the client program. If you have any of these software packages installed and configured already you can skip that step. This guide assumes you have a Cassandra 3.x cluster that is already up and running. For more information on installing and using Cassandra visit http://cassandra.apache.org/.Note: The following steps should be performed on all the nodes in the cluster unless otherwise noted.

Install Scala 2.11

Ensure you have Java installed


$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

If you don't have Java installed follow this tutorial to get Java 8 installed.
Install Scala 2.11.8


$ wget www.scala-lang.org/files/archive/scala-2.11.8.deb
$ sudo dpkg -i scala-2.11.8.deb
$ scala -version

Install SBT 0.13



echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ sudo apt-get update
$ sudo apt-get install sbt

Install Spark 2.0

Download Spark 2.0 from https://spark.apache.org/downloads.html and unpack the tar file


$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
$ tar zxf spark-2.0.2-bin-hadoop2.7.tgz
$ sudo mv spark-2.0.2-bin-hadoop2.7/ /usr/local/spark/

Update system variables


$ sudo nano /etc/environment

Add an environment variable called SPARK_HOME


export SPARK_HOME=/usr/local/spark

At the end of the PATH variable add $SPARK_HOME/bin


PATH="<previous_entries>:/usr/local/spark/bin"

Refresh the environment


source /etc/environment

Create spark user and make it the owner of the SPARK_HOME directory


$ sudo adduser spark --system --home /usr/local/spark/ --disabled-password
$ sudo chown -R spark:root /usr/local/spark

Create Log and Pid directories


$ sudo mkdir /var/log/spark
$ sudo chown spark:root /var/log/spark
$ sudo -u spark mkdir $SPARK_HOME/run

Create the Spark Configuration files

Create the Spark Configuration files by copying the templates


$ sudo cp /usr/local/spark/conf/spark-env.sh.template /usr/local/spark/conf/spark-env.sh
$ sudo cp /usr/local/spark/conf/spark-defaults.conf.template /usr/local/spark/conf/spark-defaults.conf
$ sudo chown spark:root /usr/local/spark/conf/spark-*

Edit the Spark Environment file spark-env.sh


export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run

Configure Spark nodes to join cluster

If you will not be managing Spark using the Mesos or YARN cluster managers, you'll be running Spark in what is called "Standalone Mode".
In standalone mode, Spark will have a master node (which is the cluster manager) and worker nodes. You should select one of the nodes in your cluster to be the master. Then on every worker node you must edit the /etc/spark/conf/spark-env.sh to point to the host where the Spark Master runs.


# Options for the daemons used in the standalone deploy mode
export SPARK_MASTER_HOST=<spark_master_ip_or_hostname_here>

You can also change other elements of the default configuration by editing the /etc/spark/conf/spark-env.sh. Some other configs to consider are:
  • SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
  • SPARK_WORKER_CORES, to set the number of cores to use on this machine
  • SPARK_WORKER_MEMORY, to set how much memory to use (for example 1000MB, 2GB)
  • SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
  • SPARK_WORKER_INSTANCE, to set the number of worker processes per node
  • SPARK_WORKER_DIR, to set the working directory of worker processes

Installing Spark as a service

Run the following commands to create a service for the spark-master and spark-worker


$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-master
$ sudo chmod 0755 /etc/init.d/spark-master
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-worker
$ sudo chmod 0755 /etc/init.d/spark-worker
$ sudo update-rc.d spark-master defaults 99
$ sudo update-rc.d spark-worker defaults 99

Edit the /etc/init.d/spark-worker file. If a variable or function already exists then replace it with the text below.


DESC="Spark Worker"
NAME=spark-worker
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.worker.Worker-1.pid
export SPARK_HOME


# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0


if [ -f $SPARK_HOME/conf/spark-env.sh ];then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi


#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
    CMD_PATT="org.apache.spark.deploy.worker.Worker"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}


#
# Function that starts the daemon/service
#
do_start()
{
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started


        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`


        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE  \
                --exec $SPARK_HOME/sbin/start-slave.sh  \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-slave.sh -- spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
                || return 2


}


#
# Function that stops the daemon/service
#
do_stop()
{
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}


#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}


...
status)
      is_running
      stat=$?
      case "$stat" in
              0) log_success_msg "$DESC is running" ;;
              1) log_failure_msg "could not access pidfile for $DESC" ;;
              *) log_success_msg "$DESC is not running" ;;
      esac
      exit "$stat"
      ;;
...

Edit the /etc/init.d/spark-master file. If a variable or function already exists then replace it with the text below.


DESC="Spark Master"
NAME=spark-master
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.master.Master-1.pid
export SPARK_HOME


# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0


if [ -f $SPARK_HOME/conf/spark-env.sh ];then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi


#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
    CMD_PATT="org.apache.spark.deploy.master.Master"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}


#
# Function that starts the daemon/service
#
do_start()
{
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started


        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`


        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE --exec $SPARK_HOME/sbin/start-master.sh --test > /$
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE --exec $SPARK_HOME/sbin/start-master.sh  \
                || return 2
}


#
# Function that stops the daemon/service
#
do_stop()
{
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE


        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}


#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}


...
status)
      is_running
      stat=$?
      case "$stat" in
              0) log_success_msg "$DESC is running" ;;
              1) log_failure_msg "could not access pidfile for $DESC" ;;
              *) log_success_msg "$DESC is not running" ;;
      esac
      exit "$stat"
      ;;
...

Running Spark as a service

Start the Spark master node first. On whichever node you've selected to be master, run:


$ sudo service spark-master start

On all the other nodes, start the workers:


$ sudo service spark-worker start

To stop Spark, run the following commands on the appropriate nodes


$ sudo service spark-worker stop
$ sudo service spark-master stop

Service logs will be stored in /var/log/spark.

Testing the Spark service

To test the Spark service, start spark-shell on one of the nodes.


$ spark-shell --master spark://<IP>:<Port>

When the prompt comes up, execute the following line of code:


$ scala> sc.parallelize( 1 to 1000 ).sum()

Get Spark-Cassandra-Connector on the client

The spark-cassandra-connector is a Scala library that exposes Cassandra tables as Spark RDDs, lets you write Spark RDDs to Cassandra tables, and allows you to execute arbitrary computations and CQL queries that are distributed to the Cassandra nodes holding the data, which allows them to be fast. Your code + the spark-cassandra-connector and all dependencies are packaged up and sent to the Spark nodes.
If you are writing ad-hoc queries / computations from the spark-shell. Start up the shell by running the command:


$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11

The --packages option downloads the connector and all of its dependencies from the Spark Packages site and places it in the path of the Spark Driver and all Spark Executors.
If you are writing a Scala application; configure a new Scala project. Your build.sbt file should look something like this:


name := "MySparkProject"


version := "1.0"


scalaVersion := "2.11.8"


val sparkVersion = "2.0.2"


resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"


libraryDependencies ++= Seq(
  "org.apache.spark"      % "spark-core_2.11"  % sparkVersion,
  "org.apache.spark"      % "spark-sql_2.11"   % sparkVersion,
  "datastax"              % "spark-cassandra-connector" % "2.0.0-M2-s_2.11"
)

Testing the connector

To start out, create a simple keyspace and table in Cassandra. Run the following statements in cqlsh:


CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

Then insert some example data:


INSERT INTO test.kv(key, value) VALUES ('key1', 1);
INSERT INTO test.kv(key, value) VALUES ('key2', 2);

For this test, we'll use the spark-shell.


$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11



// import the spark connector namespace
import com.datastax.spark.connector._


// read Cassandra data into an RDD
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)  


// Add two more rows to the table
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))

Diagnosing and Fixing Cassandra Timeouts

We recently brought Cassandra into our infrastructure to take advantage of its robust built-in scalability model. After reading all the "marketing" material, watching/attending talks by evangelists, and reading blog posts by experts and enthusiasts we were excited to embark on our Cassandra journey.

via GIPHY


Everything was honky dory in the early going. We followed data modeling best practices, provisioned the right sized servers as recommended by DataStax, and saw great results in the test environment. The issues didn't start until we started moving over portions of our production traffic to our Cassandra production servers. That's when we noticed that we would fairly frequently get read and write timeouts as the volume of traffic got higher.


via GIPHY

My first troubleshooting step was to find out was to monitor the servers to see if there were any anomalies or resource constraints that led to the timeouts. I'm a fan of Brendan Gregg's USE method for monitoring overall system health and identifying errors and bottlenecks. However, at the time we didn't have a comprehensive monitoring solution like Datadog or Icinga2 setup for those servers. So in its place, I wrote a shell script that every 30 seconds would log memory and CPU utilization, disk I/O, network I/O, Cassandra thread pool stats (nodetool tpstats), GC stats, compaction stats, and the table stats for my most active tables. To my chagrin, my hacked together monitor did not reveal any anomalies or resource constraints that were the cause of the timeouts. On the contrary, it revealed that resource utilization (with the exception of CPU which was a bit high) was well under the hardware limits. I did notice that a number of times, a query timeout would coincide with the start of SSTable compaction which led down the rabbit hole of compaction tweaks, however I emerged from that hole with no solution to the Cassandra timeouts.


via GIPHY


Once hardware constraints had been ruled out, I kicked off my next round of troubleshooting efforts by turning on trace probability and looking at the trace for long running queries. In the traces I noticed high queue times, which is not unexpected during times of high load, but I also noticed something else. There were a number of times when a worker node received the request from the coordinator, completed the read operation for the query, queued up the result to be sent back to the coordinator, but the results were never sent and the coordinator timed out waiting for the response. Bingo! I had found the cause of the timeout! But I still hadn't found the root cause; why were the responses not getting sent back? And I didn't know how to fix it either. I spent a good amount of time looking into tweaking Cassandra thread pools (like the RequestResponseStage pool), reading the Cassandra source code on GitHub and re-verifying network traffic/congestion between the nodes during high loads. But those efforts yielded no solution to the query timeouts which led me to the third round of troubleshooting efforts.


via GIPHY


For this round of troubleshooting I turned up the logging level on the org.apache.cassandra logger to DEBUG and I live tailed both the Cassandra system.log and debug.log. That's when I noticed something weird; several times, one of the nodes will detect the other as being down for a period of time. When I looked at the system logs and Linux process activity in htop for the supposedly downed node, it appeared that the node was up and running. This lead me to take a fine-grained look at the system.log for the downed node and in there I noticed a pretty long GC pause. So I flipped over to the gc.log, examined it, and noticed even more long GC pauses. I also noticed that at times there were 35 second pauses that would occur in succession. Those super long GC pauses cause the node to appear down to its peers. It also explained cases where a coordinator sent a request and timed out because the worker node paused execution and didn't return the response in a timely manner. I had found my problem!


via GIPHY


So how did this happen? Well, when we were configuring Cassandra, some doofus (aka me!) noticed that
1. The key cache drastically improved the performance of queries
2. They had a high key cache hit rate and continued to have a high hit rate even as they increased the size of the key cache.
So this doofus (again, me), decided to bump the key cache all the way to 8 GB so he could cache entire tables' keys, essentially turning them into in-memory lookups. To accommodate having such a large key cache, he bumped up the JVM heap to 20 GB thinking the new G1GC garbage collector can handle very large heaps. News flash; it can't! At least not when dealing with my workload of high volumes of concurrent reads and writes.

The fix was simple; reduce the size of the JVM heap and the key cache accordingly. The recommended heap size is 8 GB. I'm able to have my new max heap size set to slightly above that recommendation because I switched my memtables to storing part of its data off-heap (offheap_buffers) which greatly reduced heap memory pressure. I also started using the JEMAlloc allocator which is more efficient than the native GCC allocator.

In conclusion, distributed database systems are complex beasts and tuning them has to be done on many levels. While Cassandra gives you many knobs to tune to get the right performance out of your system, it takes a lot of study, experience or both, to know just how to tune them for your specific hardware and workload. DataStax published this useful Cassandra JVM tuning article to help provide guidance for users. But my advice for you would be to try a number of JVM configs and find out what works for your production workload. If I were doing this exercise over, I would start at the hardware level again but with better monitoring tools and then I would move to the system/debug/gc logs before looking at the traces. There is a good performance tuning tutorial by Kiyu Gabriel on DataStax Academy that you might find useful. Hope this article helps someone be more productive than I was.



Notes from Cassandra Day Atlanta 2016

I attended the DataStax Cassandra 2016 in Atlanta and took down a ton of notes on things that I found interesting. After going through those notes it occurred to me that many of the nuggets in these notes could be useful to someone else other than myself. So I’ve published the notes below.

The information below is mostly composed of quotes from DataStax engineers and evangelists. Very little context is contained in these notes. However, if you are a beginning-intermediate level Cassandra developer or admin you’ll likely have the needed context already. I did attempt to organize the notes somewhat coherently in order to allow you jump to a section you care about and also to provide some context in the grouping.

Data Modeling Tips

General

  • When migrating to C*, don’t just port over your SQL schema. Be query-driven in the design of your schema.
  • If you are planning on using Spark with C*, start with a C* use case / data model 1st and then use Spark on top of it for analytics
  • Patrick McFadden (DataStax evangelist) on not having certain relational DB constructs in C*: “In the past I’ve scaled SQL DBs by removing referential integrity, indexes and denormalizing. I’ve even built a KV database on an Oracle DB that I was paying of dollars per core for”. The implication here is these constructs bound scalability in relational databases and in explicitly not having them Cassandra’s scalability is unbounded (well, at least theoretically).
  • You can stop partition hotspots by adding an additional column to the partition key (like getting the modulus of another column when divided by the number of nodes) or by increasing the resolution of the key in the case where the partition key is a time span.
  • Using the “IF NOT EXISTS” clause stops an UPSERT from happening automatically / by-default. It also creates a lock on the record while executing, so that multiple writers don’t step on each other trying to insert the same record in a race condition. This is a light weight transaction (LWT). You can also create an LWT when doing a BATCH UPSERT
  • You can set a default TTL (Time To Live) on an individual table. This will apply to all data inserted into the table. A CQL insert can also specify a TTL for the inserted data that overrides the default.
  • DTCS (DateTieredCompactionStrategy) compaction is built for time series data. It groups SSTables together by time so that older tables don’t get compacted and can be efficiently dropped if a TTL is set.
  • CQL Maps allow you to create complex types inside your data store
  • One of the reasons for limiting the size of elements that can be in a CQL collection is because on reads the entire collection must be denormalized as a whole in the JVM so you can add a lot of data to the heap.

Secondary indexes

  • Secondary indexes are not like you have them in relational DBs. They are not built for speed, they are built for access.
  • Secondary indexes get slower the more nodes you have (because of network latencies)
  • Best thing to do with a secondary index is just to test it out and see its performance, but do it on a cluster not your laptop so you can actually see how it would perform in prod. Secondary indexes are good for low cardinality data.

Development Tips

Querying

  • Use the datastax drivers not ODBC drivers because datastax drivers are token aware and therefore can send queries to the right node, removing the need for the coordinator to make excessive network requests depending on the consistency level.
  • Use PreparedStatements for repeated queries. The performance difference is significant.
  • Use ExecuteAsync with PreparedStatements when bulk loading. You can have callbacks on Future objects and use the callbacks for things like detecting a failure and responding appropriately
  • BATCH is not a performance optimization. It leads to garbage collection and hotspots because the data stays in memory on the coordinator.
  • Use BATCH only to update multiple tables at once atomically. An example is if you have a materialized view / inverted index table that needs to be kept in sync with the main table.

General

  • Updates on collections create range tombstones to mark the old version of the collection (map, set, list) as deleted & create the new one. This is important to know because tombstones affect read performance and at a certain time having too many tombstones (100K) can cause a read to fail. http://www.jsravn.com/2015/05/13/cassandra-tombstones-collections.html
  • Cassandra triggers should be used with care and only in specific use cases because you need to consider the distributed nature of C*.

Ops Tips

Replication Settings

  • SimpleStrategy fails if you have multiple datacenters (DCs). Because 50% of your traffic that’s going to the other DC becomes terribly slow. Use NetworkTopologyStrategy instead. You can configure how replication goes to each DC individually, so you can have a table that never gets replicated to the US for example, etc.
  • If you are using the NetworkTopologyStrategy then you should use the Gossiping Property File Snitch to make C* network topology aware instead of the other property file configurator because you dan’t now have to change the file on every node and reboot them.

Hardware Sizing

Recommended Node size
  • 32 GB RAM
  • 8-12 Cores
  • 2 TB SSD
Hardware should be sized appropriately. 64 cores will be hard to use. If you are adding search and/or analytics to the node, you need more RAM: 128+ GB. More memory is needed for search because it keeps its indexes in memory.

Recommendation for Spark & Cassandra on the same node: Spark jobs run in their own process and therefore have their own heap that can be tuned and managed separately. Depending on how much performance you are trying to get out of C*, Cassandra should get its 32 GB of RAM as usual. Anything over should then go to Spark. So for example to get great performance you could have a 64 GB RAM system with 32 GB to C* and 32 GB to Spark. Same thing for cores. You should have 12-16 cores; 8-12 for C* and the rest for Spark. If vertical scaling starts to get too expensive you can alternatively add more nodes to meet performance expectations.

The recommendation is to have no more that 1 TB of data per node. The reason for 2 TB disk despite a 1 TB recommendation is because once over 60% of your disk is full you run a risk of not having enough disk space during compaction. This is especially true if you are using size tiered compaction. With level tiered compaction you can use up to 70% without risk.

Use RAID 0 for your storage configuration. C* does replication for you. You can also use JBOD and C* can intelligently handle failures of some of the disks in your JBOD cluster.

Java Heap Size & Garbage Collection

  • As a general rule of thumb; start with defaults and then walk it up.
  • The ParNew/CMS GC works best with 8 GB
  • The G1GC can manage 20 GB of RAM (Note: Another engineer mentioned to me that 32 GB of RAM is no problem for G1GC). Should not be used if the heap is under 8 GB.
    • Use G1GC with Solr / DSE Search nodes

Memory Usage and Caching

  • Its very important to have ample Off-heap RAM. Some C* data structures such as memtables and bloom filters are Off-heap. You also want to have non-heap RAM for page caching.
  • Row caching can significantly speed up reads because if avoids a table scan (If the page isn’t cached already). However row caching should be judiciously used. Best use case is for tables with a high density of hotspot data. The reason being that on a large table with varying and disparate data and seemingly random reads, you’ll end up with a lot of cache misses which invalidates the point of having a cache.
  • The row cache is filled on reads. memtables are filled on writes.
  • Memtables remain in memory until there is memory pressure based on configuration in the cassandra.yaml, then they are removed from RAM.

Benchmarking

  • Use the Cassandra Stress program that comes with C*.
  • Cassandra Stress can be configured; you can specify number of columns, data size, data model, queries, types of data, cardinality, etc.
  • To model production, use multiple clients & multiple threads for clients in your Benchmarking
  • When stress testing make sure you run it long enough to run into compactions, GC, repairs. Because when you test you want to know what happens in that situation. You can even stress test and introduce failures and see how it responds. You can/should instrument the JVM during stress testing and then go back and look at it.
  • General recommended stress test times is 24 - 48 hrs run times.
  • DSE has solr-stress for testing the solr integration.
For Performance A starting expectation of 3k - 5k transactions per second per core is reasonable.

Interesting Note: A DataStax customer once conducted a stress test that ran for 6-8 weeks for 24 hrs. They were testing to see how changing the compaction strategy impacted their read heavy workload.

General

  • Turn on user authentication. At least Basic Auth. This is good for security and auditing purposes. Also it allows you to not accidentally drop a production table because you thought you were connected to staging.
  • Use TLS if you are talking between DCs across the public internet
  • Don’t just bump up the heap for greater performance or to solve your problems! You’ll have to pay for it later during GC.
  • If you have crappy latency on 1% of your operations you shouldn’t just ignore it. You should try to understand what happened, is it compaction? Is it GC? That way you can address the issue that caused the high latency. Because that 1% could one day be 5%.
  • Why should be have backups? Backups exist to protect against people not machines. data corruption is the primary reason for backups. For example someone accidentally changes all the '1’s in your DB to 'c’s.
  • There is no built in way to count the number of rows in a Cassandra table. The only way to do so is to write a Spark job. You can estimate the table size if you know the amount of data per row and divide the table size by that amount.
  • Use ntpd! C* nodes in a cluster must always be on time because time stamps are important and are used in resolving conflict. Clock drifts cannot be tolerated.
  • Tombstone Hell: queries on partitions with a lot of tombstones require a lot of filtering which can cause performance problems. Compaction gets rid of tombstones.
  • Turn off swap on C* nodes.
  • If C* runs out of memory it just dies. But that’s perfectly ok, because the data is distributed / replicated and you can just bring it back up. In the mean time data will be read from the other nodes.

Cluster Management

  • Don’t put a load balancer in front of your C* cluster.
  • Make sure you are running repairs. Repairs are essentially network defrag and help maintain consistency. Run repairs a little at a time, all the time.
  • If you can model your data to have TTLs you can run repairs much less or not at all.
  • If you never delete your data you can set gc_grace_period to 0.
  • Don’t upgrade your C* versions by replacing an outgoing node with a new node running a newer version of C*. C* is very sensitive when it comes to running mixed versions in production. The older nodes may not be able to stream data to the newer node. Instead you should do an in-place upgrade, i.e. shut down the node (the C* service), upgrade C* and then bring it back up. (https://docs.datastax.com/en/upgrade/doc/upgrade/cassandra/upgradeCassandraDetails.html)
  • When a new node is added in order to increase storage capacity / relieve storage pressure on the existing nodes. Ensure you run nodetool cleanup as the final step. This is because C* won’t automatically reclaim the space of the data streamed out to the new node.

Monitoring, Diagnostic

Monitoring Services for capturing machine level metrics
  • Monit
  • Munin
  • Icinga
  • JMX Metrics
Make sure you are capturing application metrics and deliver them to a dashboard that can integrate app metrics and server metrics

Since you are running on multiple machines it becomes important to aggregate your logs.
  • Logstash
Diagnostic tools
  • htop (a better version of top)
  • iostat
  • dstat
  • strace
  • jstack
  • tcpdump (monitor network traffic, can even see plain text queried coming in)
  • nodetool tpstats (can help diagnose performance problems by showing you which thread pools are overwhelmed / blocked. From there you can make hypotheses are to the cause of the blockage / performance problem)

DSE Offering

DSE Max => Cassandra + Support + Solr + Spark

DSE search

  • Solr fixes a couple of rough edges for C* like joins, ad-hoc querying, fuzzy text searching and secondary indexing problems in larger clusters.
  • DSE search has tight Solr integration with C*. C* stores the data, Solr stores the indexes. CQL searches that use the solr_query expression in the WHERE clause search Solr first for the location of the data to fetch and then queries C* for the actual data.
  • You can checkout killrvideo’s Github for an example of DSE search in action (https://github.com/LukeTillman/killrvideo-csharp/blob/master/src/KillrVideo.Search/SearchImpl/DataStaxEnterpriseSearch.cs)
  • Solr is about a 3x multiplication on CPU and RAM needed for a running regular C*. This is because Solr indexes must live in RAM.
  • Solr can do geospatial searches & can do arbitrary time range searches (which is another rough edge that C* cannot do). E.g. “search for all sales in the past 4 mins 30 seconds”

DSE Spark

  • Spark runs over distributed data stores and schedules analytics jobs on workers on those nodes. DSE Max has Spark integration that just requires the flipping of a switch, no additional config.
  • There’s no need for definition files, workers automatically have access to the tables and Spark is data locality aware so jobs go to the right node.
  • Optionally with DSE search integration you can have search running on the same nodes that have the analytics and data and leverage the search indexes for faster querying instead of doing table scans.
  • With DSE analytics, you can create an analytics DC and have 2-way replication between the operations DC and the analytics DC. 2 way is important because it means that the analytics DC can store the result of its computation to the Cassandra table which then gets replicated back to the ops DC.
  • The Spark jobs / workers have access to more than just the C* table data. They can do just about anything you code. They can pull data from anything; open files, read queues, JDBC data stores, HDFS, etc. And write data back out as well.
  • Recommendation for Spark & Cassandra on the same node. Appropriate resource allocation is important. Having Spark will require more memory. Spark jobs run in their own process and therefore have their own heap that can be tuned and managed separately. Depending on how much performance you are trying to get out of C*, Cassandra should get its 32 GB of RAM as usual. Anything over should then go to Spark. So for example to get great performance you could have a 64 GB RAM system with 32 GB to C* and 32 GB to Spark. Same thing for cores. You should have 12-16 cores; 8-12 for C* and the rest for Spark. If vertical scaling starts to get too expensive you can alternatively add more nodes to meet performance expectations.

Other Notes

Cassandra has a reference implementation called killrvideo. It is an actual website hosted on MS Azure. The address is killrvideo.com. It is written by Luke Tillman in C#. Checkout the source code on Github (https://github.com/LukeTillman/killrvideo-csharp).

Configuring Remote Management and Monitoring on a Cassandra Node with JConsole

In your cassandra-env.sh set

LOCAL_JMX=no

If you want username and password security. Keep the default setting for jmxremote authenticate which is true. Otherwise set it to false:

JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"

Note: If set to true, enter a username and password for the jmx user in the jmxremote.password.file. Follow the instructions here on how to secure that file. Setting jmxremote.authenticate to true also requires you to pass in the username and password when running a nodetool command, e.g. nodetool status -u cassandra -pw cassandra

Restart the server (if needed).

Connecting to the node using JConsole for monitoring

Find the JConsole jar in your JDK_HOME/bin or JDK_HOME/lib directory. If you don’t have the Java JDK installed it can be downloaded from the Oracle Website.

Double-click the executable jar to run it (or run it from the command line). Select “Remote Process” and enter the following connection string.

 service:jmx:rmi:///jndi/rmi://<target ip address>:7199/jmxrmi

replacing <target ip address> with the address of the machine you intend to manage or monitor.

Note: JConsole may try (and fail) to connect using ssl first. If it does so it will ask if it can connect over a non-encrypted connection. You should answer this prompt in the affirmative and you are good.

Congrats! You now have everything you need to monitor and manage a cassandra node.

For help with how to monitor Cassandra using JMX and with interpreting the metrics see:

The Right Database for the Right Job - Chattanooga Developer Lunch Presentation

Does this sound like you? "OMG!! PostreSQL, Neo4j, Elasticsearch, MongoDB, RethinkDB, Cassandra, SQL Server, Riak, InfluxDB, Oracle NoSQL, SQLite, Hive, Couchbase, CouchDB, DynamoDB. I've got an issue with my current database solution or I'm starting a new project and I don't know what to choose!"

This talk is intended to help you match your data storage needs with suitable solutions from a wide field of contenders. Looking at different data types, structures and interaction patterns, we will try to understand what makes certain data stores better suited than others and how implement polyglot persistence.