## Apache Cassandra Changelog #1 (October 2020)

Introducing the first Cassandra Changelog blog! Our monthly roundup of key activities and knowledge to keep the community informed.

## Release Notes

#### Updated

The most current Apache Cassandra releases are 4.0-beta2, 3.11.8, 3.0.22, 2.2.18 and 2.1.22 released on August 31 and are in the repositories. The next cut of releases will be out soon―join the Cassandra mailing list to stay up-to-date.

We continue to make progress toward the 4.0 GA release with the overarching goal of it being at a state where major users should feel confident running it in production when it is cut. Over 1,300 Jira tickets have been closed and less than 100 remain as of this post. To gain this confidence, there are various ongoing testing efforts involving correctness, performance, and ease of use.

With CASSANDRA-15013, the community improved Cassandra's ability to handle high throughput workloads, while having enough safeguards in place to protect itself from potentially going out of memory.

The Harry project is a fuzz testing tool that aims to generate reproducible workloads that are as close to real-life as possible, while being able to efficiently verify the cluster state against the model without pausing the workload itself.

The community published its first Apache Cassandra Usage Report 2020 detailing findings from a comprehensive global survey of 901 practitioners on Cassandra usage to provide a baseline understanding of who, how, and why organizations use Cassandra.

## Community Notes

Updates on new and active Cassandra Enhancement Proposals (CEPs) and how to contribute.

#### Changed

CEP-2: Kubernetes Operator was introduced this year and is an active discussion on creation of a community-based operator with the goal of making it easy to run Cassandra on Kubernetes.

CEP-7: Storage Attached Index (SAI) is a new secondary index for Cassandra that builds on the advancements made with SASI. It is intended to replace the existing built-in secondary index implementations.

Cassandra was selected by the ASF Diversity & Inclusion committee to be included in a research project to evaluate and understand the current state of diversity.

## User Space

Bigmate

"In vetting MySQL, MongoDB, and other potential databases for IoT scale, we found they couldn't match the scalability we could get with open source Apache Cassandra. Cassandra's built-for-scale architecture enables us to handle millions of operations or concurrent users each second with ease – making it ideal for IoT deployments." - Brett Orr

Bloomberg

"Our group is working on a multi-year build, creating a new Index Construction Platform to handle the daily production of the Bloomberg Barclays fixed income indices. This involves building and productionizing an Apache Solr-backed search platform to handle thousands of searches per minute, an Apache Cassandra back-end database to store millions of data points per day, and a distributed computational engine to handle millions of computations daily." - Noel Gunasekar

## In the News

Solutions Review - The Five Best Apache Cassandra Books on Our Reading List

Container Journal - 5 to 1: An Overview of Apache Cassandra Kubernetes Operators

## Cassandra Tutorials & More

A Cassandra user was in search of a tool to perform schema DDL upgrades. Another user suggested https://github.com/patka/cassandra-migration to ensure you don't get schema mismatches if running multiple upgrade statements in one migration. See the full email on the user mailing list for other recommended tools.

Start using virtual tables in Apache Cassandra 4.0 - Ben Bromhead, Instaclustr

Benchmarking Apache Cassandra with Rust - Piotr Kołaczkowski, DataStax

Open Source BI Tools and Cassandra - Arpan Patel, Anant Corporation

Build Fault Tolerant Applications With Cassandra API for Azure Cosmos DB - Abhishek Gupta, Microsoft

Understanding Data Modifications in Cassandra - Sameer Shukla, Redgate

Cassandra Changelog is curated by the community. Please send submissions to cassandra@constantia.io.

## Building a Low-Latency Distributed Stock Broker Application: Part 4

In the fourth blog of the “Around the World” series we built a prototype of the application, designed to run in two georegions.

Recently I re-watched “Star Trek: The Motion Picture” (The original 1979 Star Trek Film). I’d forgotten how much like “2001: A Space Odyssey” the vibe was (a drawn out quest to encounter a distant, but rapidly approaching very powerful and dangerous alien called “V’ger”), and also that the “alien” was originally from Earth and returning in search of “the Creator”—V’ger was actually a seriously upgraded Voyager spacecraft!

The original Voyager 1 and 2 had only been recently launched when the movie came out, and were responsible for some remarkable discoveries (including the famous “Death Star” image of Saturn’s moon Mimas, taken in 1980). Voyager 1 has now traveled further than any other human artefact and has since left the solar system! It’s amazing that after 40+ years it’s still working, although communicating with it now takes a whopping 40 hours in round-trip latency (which happens via the Deep Space Network—one of the stations is at Tidbinbilla, near our Canberra office).

Luckily we are only interested in traveling “Around the World” in this blog series, so the latency challenges we face in deploying a globally distributed stock trading application are substantially less than the 40 hours latency to outer space and back. In Part 4 of this blog series we catch up with Phileas Fogg and Passepartout in their journey, and explore the initial results from a prototype application deployed in two locations, Sydney and North Virginia.

## 1. The Story So Far

In Part 1 of this blog series we built a map of the world to take into account inter-region AWS latencies, and identified some “georegions” that enabled sub 100ms latency between AWS regions within the same georegion. In Part 2 we conducted some experiments to understand how to configure multi-DC Cassandra clusters and use java clients, and measured latencies from Sydney to North Virginia. In Part 3 we explored the design for a globally distributed stock broker application, and built a simulation and got some indicative latency predictions.

The goal of the application is to ensure that stock trades are done as close as possible to the stock exchanges the stocks are listed on, to reduce the latency between receiving a stock ticker update, checking the conditions for a stock order, and initiating a trade if the conditions are met.

## 2. The Prototype

For this blog I built a prototype of the application, designed to run in two georegions, Australia and the USA. We are initially only trading stocks available from stock exchanges in these two georegions: orders for stocks traded in New York will be directed to a Broker deployed in the AWS North Virginia region, and orders for stocks traded in Sydney will be directed to a Broker deployed in the AWS Sydney region. As this Google Earth image shows, they are close to being antipodes (diametrically opposite each other, 15,500 km apart, so pretty much satisfying the definition of traveling “Around the World”), with a measured inter-region latency (from blog 2) of 230ms.

The design of the initial (simulated) version of the application was changed in a couple of ways to:

• Ensure that it worked correctly when instances of the Broker were deployed in multiple AWS regions
• Measure actual rather than simulated latencies, and
• Use a multi-DC Cassandra Cluster.

The prototype isn’t a complete implementation of the application yet. In particular it only uses a single Cassandra table for Orders—to ensure that Orders are made available in both georegions, and can be matched against incoming stock tickers by the Broker deployed in that region.

Some other parts of the application are “stubs”, including the Stock Ticker component (which will eventually use Kafka), and checks/updates of Holdings and generation of Transactions records (which will eventually also be Cassandra tables). Currently only the asynchronous, non-market, order types are implemented (Limit and Stop orders), as I realized that implementing Market orders (which are required to be traded immediately) using a Cassandra table would result in too many tombstones being produced—as each order is deleted immediately upon being filled (rather than being marked as “filled” in the original design, to enable the Cassandra query to quickly find unfilled orders and prevent the Orders table from growing indefinitely). However, for non-market orders it is a reasonable design choice to use a Cassandra table, as Orders may exist for extended periods of time (hours, days, or even weeks) before being traded, and as there is only a small number of successful trades (10s to 100s) per second relative to the total number of waiting Orders (potentially millions), the number of deletions, and therefore tombstones, will be acceptable.

We now take a look at the details of some of the more significant changes that were made to the application.

### Cassandra

I created a multi-DC Cassandra keyspace as follows:

CREATE KEYSPACE broker WITH replication = {'class': 'NetworkTopologyStrategy', 'NorthVirginiaDC': '3', 'SydneyDC': '3'};

In the Cassandra Java driver, the application.conf file determines which Data Center the Java client connects to. For example, to connect to the SydneyDC the file has the following settings:

datastax-java-driver {
  basic.contact-points = [
    "1.2.3.4:9042"
  ]
  basic.load-balancing-policy {
    local-datacenter = "SydneyDC"
  }
}
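
The same selection can also be made programmatically rather than via application.conf. Here is a minimal sketch assuming the DataStax Java driver 4.x; the contact point address and keyspace are placeholders:

import com.datastax.oss.driver.api.core.CqlSession;
import java.net.InetSocketAddress;

public class BrokerSession {
    // Builds a session pinned to the Sydney data center; the address is a placeholder.
    public static CqlSession connect() {
        return CqlSession.builder()
                .addContactPoint(new InetSocketAddress("1.2.3.4", 9042))
                .withLocalDatacenter("SydneyDC")
                .withKeyspace("broker")
                .build();
    }
}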

The Orders table was created as follows (note that I couldn’t use “limit” for a column name as it’s a reserved word in Cassandra!):

CREATE TABLE broker.orders (
symbol text,
orderid text,
buyorsell text,
customerid text,
limitthreshold double,
location text,
ordertype text,
quantity bigint,
starttime bigint,
PRIMARY KEY (symbol, orderid)
);

For the primary key, the partition key is the stock symbol, so that all outstanding orders for a stock can be found when each stock ticker is received by the Broker, and the clustering column is the (unique) orderid, so that multiple orders for the same symbol can be written and read, and a specific order (for an orderid) can be deleted. In a production environment, using a single stock symbol as the partition key may result in skewed and unbounded partitions, which is not recommended.
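
If a single symbol did become a hotspot, one common mitigation (not used in this prototype, so treat the bucket column and its derivation as illustrative assumptions) is to add a synthetic bucket to the partition key:

import com.datastax.oss.driver.api.core.CqlSession;

class OrdersSchemaVariants {
    // Hypothetical bucketed variant of the orders table: the (symbol, bucket)
    // composite partition key spreads a hot symbol over several partitions and
    // bounds partition growth. Readers must then query every bucket for a symbol.
    static void createBucketedOrders(CqlSession session) {
        session.execute(
            "CREATE TABLE IF NOT EXISTS broker.orders_by_bucket ("
          + " symbol text, bucket int, orderid text, buyorsell text, customerid text,"
          + " limitthreshold double, location text, ordertype text,"
          + " quantity bigint, starttime bigint,"
          + " PRIMARY KEY ((symbol, bucket), orderid))");
    }
}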

The prepared statements for creating, reading, and deleting orders are as follows:

PreparedStatement prepared_insert = Cassandra.session.prepare(
"insert into broker.orders (symbol, orderid, buyorsell, customerid, limitthreshold, location, ordertype, quantity, starttime) values (?, ?, ?, ?, ?, ?, ?, ?, ?)");

PreparedStatement prepared_select = Cassandra.session.prepare(
"select * from broker.orders where symbol = ?");

PreparedStatement prepared_delete = Cassandra.session.prepare(
"delete from broker.orders where symbol = ? and orderid = ?");

I implemented a simplified “Place limit or stop order” operation (see Part 3), which uses the prepared_insert statement to create each new order, initially in the Cassandra Data Center local to the Broker where the order was created from, which is then automatically replicated in the other Cassandra Data Center. I also implemented the “Trade Matching Order” operation (Part 3), which uses the prepared_select statement to query orders matching each incoming Stock Ticker, checks the rules, and then if a trade is filled deletes the order.
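
Sketched in code, those two operations might look roughly like the following. The individual order fields and the tradeRuleMatches/fillTrade helpers are hypothetical stand-ins for the prototype's own types and matching logic; the three PreparedStatements are the ones prepared above:

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;

class BrokerOperations {
    private final CqlSession session;
    private final PreparedStatement insert, select, delete; // the statements prepared above

    BrokerOperations(CqlSession session, PreparedStatement insert,
                     PreparedStatement select, PreparedStatement delete) {
        this.session = session;
        this.insert = insert;
        this.select = select;
        this.delete = delete;
    }

    // "Place limit or stop order": write to the local data center; Cassandra
    // replicates the row to the remote data center asynchronously.
    void placeOrder(String symbol, String orderId, String buyOrSell, String customerId,
                    double limitThreshold, String location, String orderType,
                    long quantity, long startTime) {
        session.execute(insert.bind(symbol, orderId, buyOrSell, customerId,
                limitThreshold, location, orderType, quantity, startTime));
    }

    // "Trade Matching Order": on each stock ticker, read all outstanding orders
    // for the symbol, check the rules, and delete any order that is filled.
    void onStockTicker(String symbol, double price) {
        ResultSet orders = session.execute(select.bind(symbol));
        for (Row order : orders) {
            if (tradeRuleMatches(order, price)) {   // hypothetical rule check
                fillTrade(order, price);            // hypothetical trade execution
                session.execute(delete.bind(symbol, order.getString("orderid")));
            }
        }
    }

    // Hypothetical helpers standing in for the prototype's own order-matching logic.
    private boolean tradeRuleMatches(Row order, double price) { return false; }
    private void fillTrade(Row order, double price) { }
}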

### Deployment

I created a 3 node Cassandra cluster in the Sydney AWS region, and then added another identical Data Center in the North Virginia AWS region using Instaclustr Managed Cassandra for AWS. This gave me 6 nodes in total, running on t3.small instances (5 GB SSD, 2 GB RAM, 2 CPU cores). This is a small developer-sized cluster, but it is adequate for a prototype and very affordable (2 cents an hour per node for AWS costs), given that the Brokers are currently only single threaded so don't produce much load. We're more interested in latency at this point of the experiment, and we may want to increase the number of Data Centers in the future. I also spun up an EC2 instance (t3a.micro) in each of the two AWS regions, and deployed an instance of the Stock Broker on each (it only used 20% CPU). Here's what the complete deployment looks like:

## 3. The Results

For the prototype, the focus was on demonstrating that the design goal of minimizing latency for trading stop and limit orders (asynchronous trades) was achieved. For the prototype, the latency for these order types is measured from the time of receiving a Stock Ticker, to the time an Order is filled. We ran a Broker in each AWS region concurrently for an hour, with the same workload for each, and measured the average and maximum latencies. For the first configuration, each Broker is connected to its local Cassandra Data Center, which is how it would be configured in practice. The results were encouraging, with an average latency of 3ms, and a maximum of 60ms, as shown in this graph.

During the run, across both Brokers, 300 new orders were created each second, 600 stock tickers were received each second, and 200 trades were carried out each second.

Given that I hadn’t implemented Market Orders yet, I wondered how I could configure and approximately measure the expected latency for these synchronous order types between different regions (i.e. Sydney and North Virginia). The latency for Market orders in the same region will be comparable to the non-market orders. The solution turned out to be simple: just re-configure the Brokers to use the remote Cassandra Data Center, which introduces the inter-region round-trip latency that would also be encountered with Market Orders placed in one region and traded immediately in the other region. I could also have achieved a similar result by changing the consistency level to EACH_QUORUM (which requires a majority of nodes in each data center to respond). Not surprisingly, the latencies were higher, rising to a 360ms average and a 1200ms maximum, as shown in this graph with both configurations (Stop and Limit Orders on the left, and Market Orders on the right):
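
For reference, here is a minimal sketch of how a per-statement consistency level could be set with the Java driver to force cross-DC acknowledgement (the bound values are placeholders for the order fields, and prepared_insert / Cassandra.session are the names used earlier in this post):

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.BoundStatement;

// EACH_QUORUM requires a majority of replicas in every data center to acknowledge
// the write, so the inter-region round trip is added to the write latency.
// Statements are immutable in driver 4.x, hence the reassignment.
BoundStatement crossDcInsert = prepared_insert
        .bind(symbol, orderId, buyOrSell, customerId, limitThreshold,
              location, orderType, quantity, startTime)
        .setConsistencyLevel(ConsistencyLevel.EACH_QUORUM);
Cassandra.session.execute(crossDcInsert);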

So our initial experiments are a success, and validate the primary design goal, as asynchronous stop and limit Orders can be traded with low latency from the Broker nearest the relevant stock exchanges, while synchronous Market Orders will take significantly longer due to inter-region latency.

### Write Amplification

I wondered what else could be learned from running this experiment. We can understand more about resource utilization in multi-DC Cassandra clusters. Using the Instaclustr Cassandra Console, I monitored the CPU Utilization on each of the nodes in the cluster, initially with only one Data Center and one Broker, then with two Data Centers and a single Broker, and then with both Brokers running. It turns out that the read load results in 20% CPU Utilization on each node in the local Cassandra Data Center, and the write load also results in 20% locally. Thus, for a single Data Center cluster the total load is 40% CPU. However, with two Data Centers things get more complex due to the replication of the local write load to each other Data Center. This is also called “Write Amplification”.

The following table shows the measured total load for 1 and 2 Data Centers, and predicted load for up to 8 Data Centers, showing that for more than 3 Data Centers you need bigger nodes (or bigger clusters). A four CPU Core node instance type would be adequate for 7 Data Centers, and would result in about 80% CPU Utilization.
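
The arithmetic behind that prediction is simple enough to sketch. Assuming the figures measured above (roughly 20% CPU for local reads and 20% CPU per data center's worth of replicated writes, on a 2-core node), the estimate works out as follows; this is a back-of-the-envelope model for this prototype, not a general sizing rule:

final class DcLoadModel {
    // Write-amplification model: each node carries its local read load plus the
    // write load replicated from every data center (including its own). The 20%
    // figures are assumptions taken from this prototype's measurements.
    static double estimatedCpuPercent(int dataCenters, int coresPerNode) {
        double readLoad = 20.0;         // % CPU on a 2-core node
        double writeLoadPerDc = 20.0;   // % CPU on a 2-core node, per data center
        double totalOnTwoCores = readLoad + writeLoadPerDc * dataCenters;
        return totalOnTwoCores * 2.0 / coresPerNode;
    }

    public static void main(String[] args) {
        System.out.println(estimatedCpuPercent(2, 2)); // 60.0: two DCs on 2-core nodes (measured)
        System.out.println(estimatedCpuPercent(7, 4)); // 80.0: seven DCs on 4-core nodes (predicted)
    }
}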

## 6. What Next?

In the next few blogs we’ll continue our journey “Around the World” and explore how to refine the initial simple design of the application so as to deploy and run it across multiple georegions using multiple AWS regions.

Initially this will involve mapping the components to Cassandra tables on a single data center. Once it’s working correctly with a single Cassandra data center, we’ll extend it to use multiple Cassandra data centers, which will require the use of multiple keyspaces for different replication topologies (e.g. replication across pairs of data centers vs. all data centers). We’ll also work out if, and how, to load-balance the application across multiple data centers in the same georegions, and how to enable redundancy, failover, and recovery at the application level.  It’s possible that a Kubernetes federated microservices mesh framework will help in doing this. We also plan to put Kafka to use to enable streaming StockTickers, so we’ll be investigating Kafka multi data center replication.

## 7. Further Resources

IBM also has a StockTrader demonstration application, and an in-depth series about deploying it using Cassandra, Kafka, Redis, and Kubernetes.

There’s an example of stock analysis using Elasticsearch (I’m planning on using Elasticsearch to analyse the stock trends, and provide some map-based visualisation of the data).

This is an interesting article on “Hacking a HFT System”!

## Even Higher Availability with 5x Faster Streaming in Cassandra 4.0

Streaming is a process where nodes of a cluster exchange data in the form of SSTables. Streaming can kick in during many situations such as bootstrap, repair, rebuild, range movement, cluster expansion, etc. In this post, we discuss the massive performance improvements made to the streaming process in Apache Cassandra 4.0.

## High Availability

As we know Cassandra is a Highly Available, Eventually Consistent database. The way it maintains its legendary availability is by storing redundant copies of data in nodes known as replicas, usually running on commodity hardware. During normal operations, these replicas may end up having hardware issues causing them to fail. As a result, we need to replace them with new nodes on fresh hardware.

As part of this replacement operation, the new Cassandra node streams data from the neighboring nodes that hold copies of the data belonging to this new node’s token range. Depending on the amount of data stored, this process can require substantial network bandwidth, taking some time to complete. The longer these types of operations take, the more we are exposing ourselves to loss of availability. Depending on your replication factor and consistency requirements, if another node fails during this replacement operation, availability will be impacted.

## Increasing Availability

To minimize the failure window, we want to make these operations as fast as possible. The faster the new node completes streaming its data, the faster it can serve traffic, increasing the availability of the cluster. Towards this goal, Cassandra 4.0 saw the addition of Zero Copy streaming. For more details on Cassandra’s zero copy implementation, see this blog post and CASSANDRA-14556.

## Talking Numbers

To quantify the results of these improvements, we at Netflix measured the performance impact of streaming in 4.0 vs 3.0, using our open source NDBench benchmarking tool with the CassJavaDriverGeneric plugin. Though we knew there would be improvements, we were still amazed by the overall results: a fivefold increase in streaming performance. The test setup and operations are all detailed below.

### Test Setup

In our test setup, we used the following configurations:

• 6-node clusters on i3.xl, i3.2xl, i3.4xl and i3.8xl EC2 instances, each on 3.0 and trunk (sha dd7ec5a2d6736b26d3c5f137388f2d0028df7a03).
• Table schema
CREATE TABLE testing.test (
key text,
column1 int,
value text,
PRIMARY KEY (key, column1)
) WITH CLUSTERING ORDER BY (column1 ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
AND compression = {'enabled': 'false'}
AND crc_check_chance = 1.0
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND speculative_retry = '99PERCENTILE';

• Data size per node: 500GB
• No. of tokens per node: 1 (no vnodes)

To trigger the streaming process we used the following steps in each of the clusters:

• terminate a node
• add a new node as a replacement
• measure the time taken to complete streaming data by the new node replacing the terminated node

For each cluster and version, we repeated this exercise multiple times to collect several samples.

Below is the distribution of streaming times we found across the clusters:

### Interpreting the Results

Based on the graph above, there are many conclusions one can draw. Some of them are:

• 3.0 streaming times are inconsistent and show a high degree of variability (fat distributions across multiple samples)
• 3.0 streaming is highly affected by the instance type and generally looks CPU bound
• Zero Copy streaming is approximately 5x faster
• Zero Copy streaming time shows little variability in its performance (thin distributions across multiple samples)
• Zero Copy streaming performance is not CPU bound and remains consistent across instance types

It is clear from the performance test results that Zero Copy Streaming has a huge performance benefit over the current streaming infrastructure in Cassandra. But what does it mean in the real world? The following key points are the main takeaways.

MTTR (Mean Time to Recovery): MTTR is a KPI (Key Performance Indicator) that is used to measure how quickly a system recovers from a failure. Zero Copy Streaming has a very direct impact here with a five fold improvement on performance.

Costs: Zero Copy Streaming is ~5x faster. This translates directly into cost savings for some organizations, primarily as a result of reducing the need to maintain spare server or cloud capacity. In other situations, where you’re migrating data to larger instance types or moving AZs or DCs, it means that instances that are sending data can be turned off sooner, saving costs. An added cost benefit is that you no longer have to over provision the instance. You get similar streaming performance whether you use an i3.xl or an i3.8xl, provided the bandwidth is available to the instance.

Risk Reduction: There is a great reduction in risk due to Zero Copy Streaming as well. Since a cluster’s recovery mainly depends on streaming speed, Cassandra clusters with failed nodes will be able to recover much more quickly (5x faster). This means the window of vulnerability is reduced significantly, in some situations down to a few minutes.

Finally, a benefit that we generally don’t talk about is the environmental benefit of this change. Zero Copy Streaming enables us to move data very quickly through the cluster. It objectively reduces the number and sizes of instances that are used to build a Cassandra cluster. As a result, not only does it reduce Cassandra’s TCO (Total Cost of Ownership), it also helps the environment by consuming fewer resources!

## Introducing Transient Replication

Transient Replication is a new experimental feature soon to be available in 4.0. When enabled, it allows for the creation of keyspaces where replication factor can be specified as a number of copies (full replicas) and temporary copies (transient replicas). Transient replicas retain the data they replicate only long enough for it to be propagated to full replicas, via incremental repair, at which point the data is deleted. Writing to transient replicas can be avoided almost entirely if monotonic reads are not required because it is possible to achieve a quorum of acknowledged writes without them.

This results in a savings in disk space, CPU, and IO. By deleting data as soon as it is no longer needed, transient replicas require only a fraction of the disk space of a full replica. By not having to store the data indefinitely, the CPU and IO required for compaction is reduced, and read queries are faster as they have less data to process.

So what are the benefits of not actually keeping a full copy of the data? Well, for some installations and use cases, transient replicas can be almost free if monotonic reads are disabled. In future releases where monotonic reads are supported with Transient Replication, enabling monotonic reads would reduce the savings in CPU and IO, but even then they should still be significant.

Transient Replication is designed to be transparent to applications:

• Consistency levels continue to produce the same results for queries.
• The number of replicas that can be lost before data loss occurs is unchanged.
• The number of replicas that can be unavailable before some queries start to timeout or return unavailable is unchanged (with the exception of ONE).

With Transient Replication, you can go from 3 replicas to 5 replicas, two of which are transient, without adding any hardware.

If you are running an active-passive 2 DC setup with 3 replicas in each DC, you can make one replica in each DC transient and still have four full copies of the data in total.

## Feature support

Transient Replication is not intended to fully replace Cassandra’s existing approach to replication. There are features that currently don’t work with transiently replicated keyspaces and features that are unlikely ever to work with them.

You can have keyspaces with and without Transient Replication enabled in the same cluster, so it is possible to use Transient Replication for just the use cases that are a good fit for the currently available functionality.

### Currently unsupported:

• Batch log
• LWT
• Counters

### Will never be supported:

• Secondary indexes
• Materialized views

## How Transient Replication works

### Overview

Transient replication extends Cassandra’s existing consistent hashing algorithm to designate some replicas of a point or range on the consistent hash ring as transient and some as full. The following image depicts a consistent hash ring with three replicas A, B, and C. The replicas are located at tokens 5, 10, 15 respectively. A key k hashes to token 3 on the ring.

Replicas are selected by walking the ring clockwise starting at the point on the ring the key hashes to. At RF=3, the replicas of key k are A, B, C. With Transient Replication, the last N replicas (where N is the configured number of transient replicas) found while walking the ring are designated as transient.

There are no nodes designated as transient replicas or full replicas. All nodes will fully replicate some ranges on the ring and transiently replicate others.

The following image depicts a consistent hash ring at RF=3/1 (three replicas, one of which is transient). The replicas of k are still A, B, and C, but C is now transiently replicating k.
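
A toy sketch of that selection rule (illustrative only, not Cassandra's actual implementation) makes the "last N replicas are transient" idea concrete:

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class RingSketch {
    // Walk the ring clockwise from the key's token, take RF nodes, and mark
    // the last `transientCount` of them as transient.
    static List<String> replicasFor(long keyToken, TreeMap<Long, String> ring,
                                    int rf, int transientCount) {
        List<String> replicas = new ArrayList<>();
        Long token = ring.ceilingKey(keyToken);           // first node at or after the key
        for (int i = 0; i < rf; i++) {
            if (token == null) token = ring.firstKey();   // wrap around the ring
            boolean isTransient = i >= rf - transientCount;
            replicas.add(ring.get(token) + (isTransient ? " (transient)" : " (full)"));
            token = ring.higherKey(token);
        }
        return replicas;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ring = new TreeMap<>();
        ring.put(5L, "A"); ring.put(10L, "B"); ring.put(15L, "C");
        // Key k hashes to token 3; at RF=3/1 this prints [A (full), B (full), C (transient)]
        System.out.println(replicasFor(3L, ring, 3, 1));
    }
}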

Normally all replicas of a range receive all writes for that range, as depicted in the following image.

Transient replicas do not receive writes in the normal write path.

If sufficient full replicas are unavailable, transient replicas will receive writes.

This optimization, which is possible with Transient Replication, is called Cheap Quorums. This minimizes the amount of work that transient replicas have to do at write time, and reduces the amount of background compaction they will have to do.

Cheap Quorums and monotonic reads: Cheap Quorums may end up being incompatible with an initial implementation of monotonic reads, and operators will be able to make a conscious trade off between performance and monotonic reads.

### Rapid write protection

In keyspaces utilizing Transient Replication, writes are sent to every full replica and enough transient replicas to meet the requested consistency level (to make up for unavailable full replicas). In addition, enough transient replicas are selected to reach a quorum in every datacenter, though unless the consistency level requires it, the write will be acknowledged without ensuring all have been delivered.

Because not all replicas are sent the write, it’s possible that insufficient replicas will respond, causing timeouts. To prevent this, we implement rapid write protection, similar to rapid read protection, that sends writes to additional replicas if sufficient acknowledgements to meet the consistency level are not received promptly.

The following animation shows rapid write protection in action.

Rapid write protection is configured similarly to rapid read protection using the table option additional_write_policy. The policy determines how long to wait for acknowledgements before sending additional mutations. The default is to wait for P99 of the observed latency.
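
For example (the table name is a placeholder, session is a connected CqlSession, and the '99p' literal follows the same percentile syntax used by speculative_retry, so check the 4.0 docs for the exact form on your version):

// Sketch: wait for the p99 of observed write latency before sending the
// mutation to additional replicas.
session.execute("ALTER TABLE ks.tbl WITH additional_write_policy = '99p'");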

### Incremental repair

Incremental repair is used to clean up transient data at transient replicas and propagate it to full replicas.

When incremental repair occurs, transient replicas stream out transient data but don’t receive any. Anti-compaction is used to separate transient and fully replicated data so that only fully replicated data is retained once incremental repair completes.

The result of running an incremental repair is that all full replicas for a range are synchronized and can be used interchangeably to retrieve the repaired data set for a query.

Reads must always include at least one full replica and can include as many replicas (transient or full) as necessary to achieve the desired consistency level. At least one full replica is required in order to provide the data not available at transient replicas, but it doesn’t matter which full replica is picked because incremental repair synchronizes the repaired data set across full replicas.

Reads at transient replicas are faster than reads at full replicas because reads at transient replicas are unlikely to return any results if monotonic reads are disabled, and they haven’t been receiving writes.

## Creating keyspaces with Transient Replication

Transient Replication is supported by SimpleStrategy and NetworkTopologyStrategy. When specifying the replication factor, you can specify the number of transient replicas in addition to the total number of replicas (including transient replicas). The syntax for a replication factor of 3 replicas total with one of them being transient would be “3/1”.

ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'DC1' : '3/1'};
ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : '3/1'};


Monotonic reads are not supported with Transient Replication in 4.0, so any existing tables in the keyspace must have monotonic reads disabled by setting read_repair = 'NONE'.

Once the keyspace has been altered, you will need to run incremental repair and then nodetool cleanup to ensure transient data is cleaned up.
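
Putting the steps in this section together, a minimal end-to-end sketch might look like this (keyspace, table, and data center names are placeholders; session is a connected CqlSession, and transient replication is assumed to be enabled server-side):

// Create a transiently replicated keyspace (3 replicas, 1 transient) and a
// table with monotonic reads disabled, as required in 4.0.
session.execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication = "
        + "{'class': 'NetworkTopologyStrategy', 'DC1': '3/1'}");
session.execute("CREATE TABLE IF NOT EXISTS foo.bar "
        + "(k text PRIMARY KEY, v text) WITH read_repair = 'NONE'");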

## Operational matters

Transient replication requires rolling incremental repair to be run regularly in order to move data from transient replicas to full replicas. By default transient replicas will receive 1% of writes for transiently replicated ranges due to rapid write protection. If a node is down for an extended period of time, its transient replicas will receive additional write load and that data should be cleaned up using incremental repair. Running incremental repair regularly will ensure that the size of each repair is small.

It’s also a good idea to run a small number of vnodes with transient replication so that when a node goes down the load is spread out over several other nodes that transiently replicate that range. Large numbers of vnodes are known to be problematic, so it’s best to start with a cluster that is already close to or at its maximum size so that a small number of vnodes will be sufficient. If you intend to grow the cluster in the future, you will need to be cognizant of how this will interact with the number of vnodes you select.

While transient replication does not change the odds of data loss should multiple nodes be permanently lost, it does change the magnitude of the potential loss. With 3/1 transient replication the permanent loss of two nodes could result in the loss of the entirety of the repaired data set. If you are running a multi-DC setup with a high level of replication, such as 2 DCs with 3/1 replicas in each, then you will have 4 full copies in total and the added risk of transient replication is minimal.

## Experimental features

Experimental features are a relatively new idea for Apache Cassandra. Although we recently voted to make materialized views an experimental feature retroactively, Transient Replication is the first experimental feature to be introduced as such.

The goal of introducing experimental features is to allow for incremental development across multiple releases. In the case of Transient Replication, we can avoid a giant code drop that heavily modifies the code base, and the associated risks with incorporating a new feature that way.

What it means for a feature to be experimental doesn’t have a set definition, but for Transient Replication it’s intended to set expectations. As of 4.0, Transient Replication’s intended audience is expert operators of Cassandra with the ability to write the book on how to safely deploy Transient Replication, debug any issues that result, and if necessary contribute code back to address problems as they are discovered.

It’s expected that the feature set for Transient Replication will not change in minor updates to 4.0, but eventually it should be ready for use by a wider audience.

## Next steps for Transient Replication

If increasing availability or saving on capacity sounds good to you, then you can help make transient replication production-ready by testing it out or even deploying it. Experience and feedback from the community is one of the things that will drive transient replication bug fixing and development.

## Step by Step Guide to Installing and Configuring Spark 2.0 to Connect with Cassandra 3.x

In this guide, we will be installing Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector library on the client program. If you have any of these software packages installed and configured already, you can skip that step. This guide assumes you have a Cassandra 3.x cluster that is already up and running. For more information on installing and using Cassandra visit http://cassandra.apache.org/. Note: the following steps should be performed on all the nodes in the cluster unless otherwise noted.

### Install Scala 2.11

Ensure you have Java installed:

$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

If you don't have Java installed, follow this tutorial to get Java 8 installed.

Install Scala 2.11.8:

$ wget www.scala-lang.org/files/archive/scala-2.11.8.deb
$ sudo dpkg -i scala-2.11.8.deb
$ scala -version

### Install SBT 0.13

echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823$ sudo apt-get update
$sudo apt-get install sbt ### Install Spark 2.0 Download Spark 2.0 from https://spark.apache.org/downloads.html and unpack the tar file$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
$tar zxf spark-2.0.2-bin-hadoop2.7.tgz$ sudo mv spark-2.0.2-bin-hadoop2.7/ /usr/local/spark/

Update system variables

$ sudo nano /etc/environment

Add an environment variable called SPARK_HOME:

export SPARK_HOME=/usr/local/spark

At the end of the PATH variable, add $SPARK_HOME/bin:

PATH="<previous_entries>:/usr/local/spark/bin"

Refresh the environment

source /etc/environment

Create spark user and make it the owner of the SPARK_HOME directory

$ sudo adduser spark --system --home /usr/local/spark/ --disabled-password
$ sudo chown -R spark:root /usr/local/spark

Create Log and Pid directories

$ sudo mkdir /var/log/spark
$ sudo chown spark:root /var/log/spark
$ sudo -u spark mkdir $SPARK_HOME/run

### Create the Spark Configuration files

Create the Spark Configuration files by copying the templates

$ sudo cp /usr/local/spark/conf/spark-env.sh.template /usr/local/spark/conf/spark-env.sh
$ sudo cp /usr/local/spark/conf/spark-defaults.conf.template /usr/local/spark/conf/spark-defaults.conf
$ sudo chown spark:root /usr/local/spark/conf/spark-*

Edit the Spark Environment file spark-env.sh:

export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run

### Configure Spark nodes to join cluster

If you will not be managing Spark using the Mesos or YARN cluster managers, you'll be running Spark in what is called "Standalone Mode".
In standalone mode, Spark will have a master node (which is the cluster manager) and worker nodes. You should select one of the nodes in your cluster to be the master. Then, on every worker node, edit /usr/local/spark/conf/spark-env.sh to point to the host where the Spark Master runs.

# Options for the daemons used in the standalone deploy mode
export SPARK_MASTER_HOST=<spark_master_ip_or_hostname_here>

You can also change other elements of the default configuration by editing /usr/local/spark/conf/spark-env.sh. Some other configs to consider are:
• SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
• SPARK_WORKER_CORES, to set the number of cores to use on this machine
• SPARK_WORKER_MEMORY, to set how much memory to use (for example 1000MB, 2GB)
• SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
• SPARK_WORKER_INSTANCES, to set the number of worker processes per node
• SPARK_WORKER_DIR, to set the working directory of worker processes

### Installing Spark as a service

Run the following commands to create a service for the spark-master and spark-worker

$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-master
$ sudo chmod 0755 /etc/init.d/spark-master
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-worker
$ sudo chmod 0755 /etc/init.d/spark-worker
$ sudo update-rc.d spark-master defaults 99
$ sudo update-rc.d spark-worker defaults 99

Edit the /etc/init.d/spark-worker file. If a variable or function already exists then replace it with the text below.

DESC="Spark Worker"
NAME=spark-worker
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.worker.Worker-1.pid
export SPARK_HOME

# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0

if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
  . $SPARK_HOME/conf/spark-env.sh
else
  echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi

#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
  CMD_PATT="org.apache.spark.deploy.worker.Worker"
  if [ -f $PIDFILE ]; then
    pid=`cat $PIDFILE`
    grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
    return 1
  fi
  return 3
}

#
# Function that starts the daemon/service
#
do_start()
{
  # Return
  #   0 if daemon has been started
  #   1 if daemon was already running
  #   2 if daemon could not be started
  [ -e `dirname "$PIDFILE"` ] || \
    install -d -ospark -groot -m755 `dirname $PIDFILE`

  start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
    --exec $SPARK_HOME/sbin/start-slave.sh \
    --test > /dev/null \
    || return 1
  start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
    --exec $SPARK_HOME/sbin/start-slave.sh -- spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
    || return 2
}

#
# Function that stops the daemon/service
#
do_stop()
{
  start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
  RETVAL="$?"
  rm -f $PIDFILE
  return "$RETVAL"
}

#
# Function that sends a SIGHUP to the daemon/service
#
do_reload()
{
  #
  # If the daemon can reload its configuration without
  # restarting (for example, when it is sent a SIGHUP),
  # then implement that here.
  #
  start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
  return 0
}

...
status)
  is_running
  stat=$?
  case "$stat" in
    0) log_success_msg "$DESC is running" ;;
    1) log_failure_msg "could not access pidfile for $DESC" ;;
    *) log_success_msg "$DESC is not running" ;;
  esac
  exit "$stat"
  ;;
...

Edit the /etc/init.d/spark-master file. If a variable or function already exists then replace it with the text below.

DESC="Spark Master"
NAME=spark-master
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.master.Master-1.pid
export SPARK_HOME

# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0

if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
  . $SPARK_HOME/conf/spark-env.sh
else
  echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi

#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
  CMD_PATT="org.apache.spark.deploy.master.Master"
  if [ -f $PIDFILE ]; then
    pid=`cat $PIDFILE`
    grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
    return 1
  fi
  return 3
}

#
# Function that starts the daemon/service
#
do_start()
{
  # Return
  #   0 if daemon has been started
  #   1 if daemon was already running
  #   2 if daemon could not be started
  [ -e `dirname "$PIDFILE"` ] || \
    install -d -ospark -groot -m755 `dirname $PIDFILE`

  start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
    --exec $SPARK_HOME/sbin/start-master.sh \
    --test > /dev/null \
    || return 1
  start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
    --exec $SPARK_HOME/sbin/start-master.sh \
    || return 2
}

#
# Function that stops the daemon/service
#
do_stop()
{
  start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
  RETVAL="$?"
  rm -f $PIDFILE
  return "$RETVAL"
}

#
# Function that sends a SIGHUP to the daemon/service
#
do_reload()
{
  #
  # If the daemon can reload its configuration without
  # restarting (for example, when it is sent a SIGHUP),
  # then implement that here.
  #
  start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
  return 0
}

...
status)
  is_running
  stat=$?
  case "$stat" in
    0) log_success_msg "$DESC is running" ;;
    1) log_failure_msg "could not access pidfile for $DESC" ;;
    *) log_success_msg "$DESC is not running" ;;
  esac
  exit "$stat"
  ;;
...

### Running Spark as a service

Start the Spark master node first. On whichever node you've selected to be master, run:

$ sudo service spark-master start

On all the other nodes, start the workers:

$ sudo service spark-worker start

To stop Spark, run the following commands on the appropriate nodes:

$ sudo service spark-worker stop
$ sudo service spark-master stop

Service logs will be stored in /var/log/spark.

### Testing the Spark service

To test the Spark service, start spark-shell on one of the nodes:

$ spark-shell --master spark://<IP>:<Port>

When the prompt comes up, execute the following line of code:

scala> sc.parallelize( 1 to 1000 ).sum()

### Get Spark-Cassandra-Connector on the client

The spark-cassandra-connector is a Scala library that exposes Cassandra tables as Spark RDDs, lets you write Spark RDDs to Cassandra tables, and allows you to execute arbitrary computations and CQL queries that are distributed to the Cassandra nodes holding the data, which makes them fast. Your code plus the spark-cassandra-connector and all of its dependencies are packaged up and sent to the Spark nodes.

If you are writing ad-hoc queries / computations from the spark-shell, start up the shell by running the command:

$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11

The --packages option downloads the connector and all of its dependencies from the Spark Packages site and places them on the path of the Spark Driver and all Spark Executors.
If you are writing a Scala application, configure a new Scala project. Your build.sbt file should look something like this:

name := "MySparkProject"

version := "1.0"

scalaVersion := "2.11.8"

val sparkVersion = "2.0.2"

resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
"org.apache.spark"      % "spark-core_2.11"  % sparkVersion,
"org.apache.spark"      % "spark-sql_2.11"   % sparkVersion,
"datastax"              % "spark-cassandra-connector" % "2.0.0-M2-s_2.11"
)

### Testing the connector

To start out, create a simple keyspace and table in Cassandra. Run the following statements in cqlsh:

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

Then insert some example data:

INSERT INTO test.kv(key, value) VALUES ('key1', 1);
INSERT INTO test.kv(key, value) VALUES ('key2', 2);

For this test, we'll use the spark-shell.

$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11

// import the spark connector namespace
import com.datastax.spark.connector._

// read Cassandra data into an RDD
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)

// Add two more rows to the table
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))

## Diagnosing and Fixing Cassandra Timeouts

We recently brought Cassandra into our infrastructure to take advantage of its robust built-in scalability model. After reading all the "marketing" material, watching/attending talks by evangelists, and reading blog posts by experts and enthusiasts, we were excited to embark on our Cassandra journey.

Everything was hunky-dory in the early going. We followed data modeling best practices, provisioned the right sized servers as recommended by DataStax, and saw great results in the test environment. The issues didn't start until we began moving portions of our production traffic to our Cassandra production servers. That's when we noticed that we would fairly frequently get read and write timeouts as the volume of traffic got higher.

My first troubleshooting step was to monitor the servers to see if there were any anomalies or resource constraints that led to the timeouts. I'm a fan of Brendan Gregg's USE method for monitoring overall system health and identifying errors and bottlenecks. However, at the time we didn't have a comprehensive monitoring solution like Datadog or Icinga2 set up for those servers. So in its place, I wrote a shell script that every 30 seconds would log memory and CPU utilization, disk I/O, network I/O, Cassandra thread pool stats (nodetool tpstats), GC stats, compaction stats, and the table stats for my most active tables.

To my chagrin, my hacked-together monitor did not reveal any anomalies or resource constraints that were the cause of the timeouts. On the contrary, it revealed that resource utilization (with the exception of CPU, which was a bit high) was well under the hardware limits. I did notice that a number of times a query timeout would coincide with the start of SSTable compaction, which led down the rabbit hole of compaction tweaks; however, I emerged from that hole with no solution to the Cassandra timeouts.

Once hardware constraints had been ruled out, I kicked off my next round of troubleshooting efforts by turning on trace probability and looking at the traces for long-running queries. In the traces I noticed high queue times, which is not unexpected during times of high load, but I also noticed something else. There were a number of times when a worker node received the request from the coordinator, completed the read operation for the query, and queued up the result to be sent back to the coordinator, but the results were never sent and the coordinator timed out waiting for the response. Bingo! I had found the cause of the timeout! But I still hadn't found the root cause: why were the responses not getting sent back? And I didn't know how to fix it either. I spent a good amount of time looking into tweaking Cassandra thread pools (like the RequestResponseStage pool), reading the Cassandra source code on GitHub, and re-verifying network traffic/congestion between the nodes during high loads.
But those efforts yielded no solution to the query timeouts, which led me to the third round of troubleshooting. For this round I turned up the logging level on the org.apache.cassandra logger to DEBUG and live-tailed both the Cassandra system.log and debug.log. That's when I noticed something weird: several times, one of the nodes would detect the other as being down for a period of time. When I looked at the system logs and Linux process activity in htop for the supposedly downed node, it appeared that the node was up and running. This led me to take a fine-grained look at the system.log for the downed node, and in there I noticed a pretty long GC pause. So I flipped over to the gc.log, examined it, and noticed even more long GC pauses. I also noticed that at times there were 35 second pauses that would occur in succession. Those super long GC pauses caused the node to appear down to its peers. They also explained cases where a coordinator sent a request and timed out because the worker node paused execution and didn't return the response in a timely manner. I had found my problem!

So how did this happen? Well, when we were configuring Cassandra, some doofus (aka me!) noticed that 1. the key cache drastically improved the performance of queries, and 2. we had a high key cache hit rate and continued to have a high hit rate even as we increased the size of the key cache. So this doofus (again, me) decided to bump the key cache all the way to 8 GB so he could cache entire tables' keys, essentially turning them into in-memory lookups. To accommodate having such a large key cache, he bumped up the JVM heap to 20 GB, thinking the new G1GC garbage collector could handle very large heaps. News flash: it can't! At least not when dealing with my workload of high volumes of concurrent reads and writes.

The fix was simple: reduce the size of the JVM heap and the key cache accordingly. The recommended heap size is 8 GB. I'm able to have my new max heap size set to slightly above that recommendation because I switched my memtables to storing part of their data off-heap (offheap_buffers), which greatly reduced heap memory pressure. I also started using the JEMAlloc allocator, which is more efficient than the native allocator.

In conclusion, distributed database systems are complex beasts and tuning them has to be done on many levels. While Cassandra gives you many knobs to tune to get the right performance out of your system, it takes a lot of study, experience, or both to know just how to tune them for your specific hardware and workload. DataStax published this useful Cassandra JVM tuning article to help provide guidance for users. But my advice would be to try a number of JVM configs and find out what works for your production workload. If I were doing this exercise over, I would start at the hardware level again but with better monitoring tools, and then I would move to the system/debug/gc logs before looking at the traces. There is a good performance tuning tutorial by Kiyu Gabriel on DataStax Academy that you might find useful. Hope this article helps someone be more productive than I was.

## Notes from Cassandra Day Atlanta 2016

I attended the DataStax Cassandra Day 2016 in Atlanta and took down a ton of notes on things that I found interesting. After going through those notes it occurred to me that many of the nuggets in them could be useful to someone other than myself, so I've published the notes below.
The information below is mostly composed of quotes from DataStax engineers and evangelists. Very little context is contained in these notes. However, if you are a beginning-to-intermediate level Cassandra developer or admin, you'll likely have the needed context already. I did attempt to organize the notes somewhat coherently, to allow you to jump to a section you care about and to provide some context in the grouping.

## Data Modeling Tips

### General

• When migrating to C*, don't just port over your SQL schema. Be query-driven in the design of your schema.
• If you are planning on using Spark with C*, start with a C* use case / data model first and then use Spark on top of it for analytics.
• Patrick McFadden (DataStax evangelist) on not having certain relational DB constructs in C*: "In the past I've scaled SQL DBs by removing referential integrity, indexes and denormalizing. I've even built a KV database on an Oracle DB that I was paying of dollars per core for". The implication here is that these constructs bound scalability in relational databases, and in explicitly not having them Cassandra's scalability is unbounded (well, at least theoretically).
• You can stop partition hotspots by adding an additional column to the partition key (like getting the modulus of another column when divided by the number of nodes) or by increasing the resolution of the key in the case where the partition key is a time span.
• Using the "IF NOT EXISTS" clause stops an UPSERT from happening automatically / by default. It also creates a lock on the record while executing, so that multiple writers don't step on each other trying to insert the same record in a race condition. This is a lightweight transaction (LWT). You can also create an LWT when doing a BATCH UPSERT.
• You can set a default TTL (Time To Live) on an individual table. This will apply to all data inserted into the table. A CQL insert can also specify a TTL for the inserted data that overrides the default.
• DTCS (DateTieredCompactionStrategy) compaction is built for time series data. It groups SSTables together by time so that older tables don't get compacted and can be efficiently dropped if a TTL is set.
• CQL Maps allow you to create complex types inside your data store.
• One of the reasons for limiting the size of elements that can be in a CQL collection is that on reads the entire collection must be denormalized as a whole in the JVM, so you can add a lot of data to the heap.

### Secondary indexes

• Secondary indexes are not like you have them in relational DBs. They are not built for speed, they are built for access.
• Secondary indexes get slower the more nodes you have (because of network latencies).
• The best thing to do with a secondary index is just to test it out and see its performance, but do it on a cluster, not your laptop, so you can actually see how it would perform in prod. Secondary indexes are good for low cardinality data.

## Development Tips

### Querying

• Use the DataStax drivers, not ODBC drivers, because the DataStax drivers are token aware and therefore can send queries to the right node, removing the need for the coordinator to make excessive network requests depending on the consistency level.
• Use PreparedStatements for repeated queries. The performance difference is significant.
• Use ExecuteAsync with PreparedStatements when bulk loading. You can have callbacks on Future objects and use the callbacks for things like detecting a failure and responding appropriately.
• BATCH is not a performance optimization. It leads to garbage collection and hotspots because the data stays in memory on the coordinator.
• Use BATCH only to update multiple tables at once atomically. An example is if you have a materialized view / inverted index table that needs to be kept in sync with the main table.

### General

• Updates on collections create range tombstones to mark the old version of the collection (map, set, list) as deleted and create the new one. This is important to know because tombstones affect read performance, and at a certain point having too many tombstones (100K) can cause a read to fail. http://www.jsravn.com/2015/05/13/cassandra-tombstones-collections.html
• Cassandra triggers should be used with care and only in specific use cases because you need to consider the distributed nature of C*.

## Ops Tips

### Replication Settings

• SimpleStrategy fails if you have multiple datacenters (DCs), because the 50% of your traffic that's going to the other DC becomes terribly slow. Use NetworkTopologyStrategy instead. You can configure how replication goes to each DC individually, so you can have a table that never gets replicated to the US, for example.
• If you are using the NetworkTopologyStrategy then you should use the Gossiping Property File Snitch to make C* network topology aware, instead of the other property file configurator, because you then don't have to change the file on every node and reboot them.

### Hardware Sizing

Recommended node size:

• 32 GB RAM
• 8-12 cores
• 2 TB SSD

Hardware should be sized appropriately. 64 cores will be hard to use. If you are adding search and/or analytics to the node, you need more RAM: 128+ GB. More memory is needed for search because it keeps its indexes in memory.

Recommendation for Spark & Cassandra on the same node: Spark jobs run in their own process and therefore have their own heap that can be tuned and managed separately. Depending on how much performance you are trying to get out of C*, Cassandra should get its 32 GB of RAM as usual. Anything over should then go to Spark. So for example, to get great performance you could have a 64 GB RAM system with 32 GB for C* and 32 GB for Spark. Same thing for cores: you should have 12-16 cores; 8-12 for C* and the rest for Spark. If vertical scaling starts to get too expensive you can alternatively add more nodes to meet performance expectations.

The recommendation is to have no more than 1 TB of data per node. The reason for a 2 TB disk despite the 1 TB recommendation is that once over 60% of your disk is full you run a risk of not having enough disk space during compaction. This is especially true if you are using size tiered compaction; with leveled compaction you can use up to 70% without risk.

Use RAID 0 for your storage configuration. C* does replication for you. You can also use JBOD, and C* can intelligently handle failures of some of the disks in your JBOD cluster.

### Java Heap Size & Garbage Collection

• As a general rule of thumb, start with defaults and then walk it up.
• The ParNew/CMS GC works best with 8 GB.
• The G1GC can manage 20 GB of RAM (Note: another engineer mentioned to me that 32 GB of RAM is no problem for G1GC). It should not be used if the heap is under 8 GB.
• Use G1GC with Solr / DSE Search nodes.

### Memory Usage and Caching

• It's very important to have ample off-heap RAM. Some C* data structures such as memtables and bloom filters are off-heap. You also want to have non-heap RAM for page caching.
## Ops Tips

### Replication Settings

• SimpleStrategy fails you if you have multiple datacenters (DCs): roughly half of your traffic, the part going to the other DC, becomes terribly slow. Use NetworkTopologyStrategy instead. You can configure how replication goes to each DC individually, so you can, for example, have a keyspace that never gets replicated to the US (sketched after the memory and caching notes below).
• If you are using NetworkTopologyStrategy, use the GossipingPropertyFileSnitch to make C* network-topology aware, rather than the plain property-file snitch, because then you don't have to change the file on every node and restart them.

### Hardware Sizing

Recommended node size:

• 32 GB RAM
• 8-12 cores
• 2 TB SSD

Hardware should be sized appropriately; 64 cores will be hard to use. If you are adding search and/or analytics to the node, you need more RAM: 128+ GB. More memory is needed for search because it keeps its indexes in memory.

Recommendation for Spark & Cassandra on the same node: Spark jobs run in their own process and therefore have their own heap that can be tuned and managed separately. Depending on how much performance you are trying to get out of C*, Cassandra should get its 32 GB of RAM as usual; anything over that should go to Spark. For example, to get great performance you could have a 64 GB RAM system with 32 GB for C* and 32 GB for Spark. The same goes for cores: you should have 12-16 cores, 8-12 for C* and the rest for Spark. If vertical scaling starts to get too expensive, you can instead add more nodes to meet performance expectations.

The recommendation is to have no more than 1 TB of data per node. The reason for a 2 TB disk despite the 1 TB recommendation is that once your disk is more than 60% full you run the risk of not having enough disk space during compaction. This is especially true if you are using size-tiered compaction; with leveled compaction you can use up to 70% without risk.

Use RAID 0 for your storage configuration; C* does replication for you. You can also use JBOD, and C* can intelligently handle failures of some of the disks in your JBOD setup.

### Java Heap Size & Garbage Collection

• As a general rule of thumb, start with the defaults and then walk the heap size up.
• The ParNew/CMS GC works best with an 8 GB heap.
• G1GC can manage a 20 GB heap (note: another engineer mentioned that 32 GB is no problem for G1GC). It should not be used if the heap is under 8 GB.
• Use G1GC with Solr / DSE Search nodes.

### Memory Usage and Caching

• It's very important to have ample off-heap RAM. Some C* data structures, such as memtables and bloom filters, live off-heap. You also want spare non-heap RAM for the page cache.
• Row caching can significantly speed up reads because it avoids going to the SSTables on disk (when the page isn't already in the page cache). However, row caching should be used judiciously. The best use case is tables with a high density of hotspot data; on a large table with varied, disparate data and seemingly random reads, you'll end up with a lot of cache misses, which defeats the point of having a cache. Row caching is enabled per table, as sketched below.
• The row cache is filled on reads; memtables are filled on writes.
• Memtables remain in memory until there is memory pressure (based on configuration in cassandra.yaml), at which point they are flushed to disk and removed from RAM.
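Two of the ops settings above are expressed in CQL rather than cassandra.yaml: the per-DC replication strategy and per-table row caching. A minimal sketch follows, with hypothetical keyspace, datacenter, and table names, assuming the Cassandra 2.1+ caching syntax.

```
-- Keyspace replicated with NetworkTopologyStrategy; replication is set per DC,
-- and a DC that is simply omitted (say, a US DC) gets no replicas at all.
CREATE KEYSPACE ops_data
  WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'eu_dc1': 3,
    'eu_dc2': 3
  };

-- Row caching is a per-table option; the cache itself must also be enabled
-- via row_cache_size_in_mb in cassandra.yaml.
CREATE TABLE ops_data.user_profiles (
    user_id uuid PRIMARY KEY,
    name    text
) WITH caching = {'keys': 'ALL', 'rows_per_partition': '100'};
```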
### Benchmarking

• Use the Cassandra Stress program that comes with C*.
• Cassandra Stress can be configured: you can specify the number of columns, data size, data model, queries, types of data, cardinality, etc.
• To model production, use multiple clients and multiple threads per client in your benchmarking.
• When stress testing, make sure you run it long enough to run into compactions, GC, and repairs, because when you test you want to know what happens in those situations. You can even stress test while introducing failures and see how the cluster responds. You can and should instrument the JVM during stress testing and then go back and look at it.
• The general recommendation for stress test run time is 24-48 hours.
• DSE has solr-stress for testing the Solr integration.

For performance, a starting expectation of 3k-5k transactions per second per core is reasonable.

Interesting note: a DataStax customer once ran a stress test 24 hours a day for 6-8 weeks. They were testing to see how changing the compaction strategy impacted their read-heavy workload.

### General

• Turn on user authentication, at least basic auth. This is good for security and auditing purposes, and it also keeps you from accidentally dropping a production table because you thought you were connected to staging.
• Use TLS if you are talking between DCs across the public internet.
• Don't just bump up the heap for greater performance or to solve your problems! You'll have to pay for it later during GC.
• If you have crappy latency on 1% of your operations you shouldn't just ignore it. You should try to understand what happened: is it compaction? Is it GC? That way you can address the issue that caused the high latency, because that 1% could one day be 5%.
• Why should we have backups? Backups exist to protect against people, not machines; data corruption is the primary reason for backups. For example, someone accidentally changes all the '1's in your DB to 'c's.
• There is no practical built-in way to count the number of rows in a Cassandra table; the usual way is to write a Spark job. You can estimate the number of rows if you know the average amount of data per row by dividing the table size by that amount.
• Use ntpd! C* nodes in a cluster must always agree on the time, because timestamps are used in conflict resolution. Clock drift cannot be tolerated.
• Tombstone hell: queries on partitions with a lot of tombstones require a lot of filtering, which can cause performance problems. Compaction gets rid of tombstones.
• Turn off swap on C* nodes.
• If C* runs out of memory it just dies, but that's perfectly OK: the data is distributed / replicated and you can just bring the node back up. In the meantime data will be read from the other nodes.

### Cluster Management

• Don't put a load balancer in front of your C* cluster.
• Make sure you are running repairs. Repairs are essentially network defrag and help maintain consistency. Run repairs a little at a time, all the time.
• If you can model your data to have TTLs you can run repairs much less often, or not at all.
• If you never delete your data you can set gc_grace_seconds to 0.
• Don't upgrade your C* versions by replacing an outgoing node with a new node running a newer version of C*. C* is very sensitive when it comes to running mixed versions in production; the older nodes may not be able to stream data to the newer node. Instead you should do an in-place upgrade, i.e. shut down the node (the C* service), upgrade C*, and then bring it back up. (https://docs.datastax.com/en/upgrade/doc/upgrade/cassandra/upgradeCassandraDetails.html)
• When a new node is added to increase storage capacity / relieve storage pressure on the existing nodes, ensure you run nodetool cleanup as the final step. This is because C* won't automatically reclaim the space of the data streamed out to the new node.

### Monitoring, Diagnostics

Monitoring services for capturing machine-level metrics:

• Monit
• Munin
• Icinga
• JMX metrics

Make sure you are capturing application metrics and delivering them to a dashboard that can integrate app metrics and server metrics.

Since you are running on multiple machines, it becomes important to aggregate your logs.

• Logstash

Diagnostic tools:

• htop (a better version of top)
• iostat
• dstat
• strace
• jstack
• tcpdump (monitor network traffic; you can even see plain-text queries coming in)
• nodetool tpstats (can help diagnose performance problems by showing which thread pools are overwhelmed / blocked; from there you can form hypotheses as to the cause of the blockage / performance problem)

## DSE Offering

DSE Max => Cassandra + Support + Solr + Spark

### DSE search

• Solr fixes a couple of rough edges for C* like joins, ad-hoc querying, fuzzy text searching and secondary indexing problems in larger clusters.
• DSE search has tight Solr integration with C*: C* stores the data, Solr stores the indexes. CQL searches that use the solr_query expression in the WHERE clause hit Solr first for the location of the data to fetch and then query C* for the actual data (see the sketch after this list).
• You can check out killrvideo's GitHub for an example of DSE search in action (https://github.com/LukeTillman/killrvideo-csharp/blob/master/src/KillrVideo.Search/SearchImpl/DataStaxEnterpriseSearch.cs)
• Solr roughly triples the CPU and RAM needed compared to running regular C*, because Solr indexes must live in RAM.
• Solr can do geospatial searches and arbitrary time range searches (another rough edge that C* cannot cover), e.g. "search for all sales in the past 4 mins 30 seconds".
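As a rough illustration of the solr_query note above: the keyspace, table, and field names here are hypothetical placeholders loosely modeled on killrvideo, not taken from the notes, and this assumes a DSE Search core has already been created for the table.

```
-- DSE Search exposes Solr through the solr_query pseudo-column in CQL.
-- This asks Solr which rows match, then fetches those rows from Cassandra.
SELECT video_id, name
FROM killrvideo.videos
WHERE solr_query = 'name:cassandra*';
```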
### DSE Spark

• Spark runs over distributed data stores and schedules analytics jobs on workers on those nodes. DSE Max has Spark integration that just requires flipping a switch, no additional config.
• There's no need for definition files; workers automatically have access to the tables, and Spark is data-locality aware so jobs go to the right node.
• Optionally, with DSE search integration you can have search running on the same nodes that have the analytics and data, and leverage the search indexes for faster querying instead of doing table scans.
• With DSE analytics, you can create an analytics DC and have 2-way replication between the operations DC and the analytics DC. Two-way replication is important because it means the analytics DC can store the result of its computation to a Cassandra table, which then gets replicated back to the ops DC.
• The Spark jobs / workers have access to more than just the C* table data. They can do just about anything you code; they can pull data from open files, queues, JDBC data stores, HDFS, etc., and write data back out as well.
• Recommendation for Spark & Cassandra on the same node: appropriate resource allocation is important, and having Spark will require more memory. The sizing guidance is the same as in the Hardware Sizing section above: Cassandra gets its usual 32 GB of RAM and 8-12 cores, anything over that goes to Spark, and if vertical scaling gets too expensive you can add more nodes instead.

## Other Notes

Cassandra has a reference implementation called killrvideo. It is an actual website hosted on MS Azure at killrvideo.com. It is written by Luke Tillman in C#. Check out the source code on GitHub (https://github.com/LukeTillman/killrvideo-csharp).

## Configuring Remote Management and Monitoring on a Cassandra Node with JConsole

In your cassandra-env.sh, set LOCAL_JMX=no.

If you want username and password security, keep the default setting for com.sun.management.jmxremote.authenticate, which is true. Otherwise set it to false:

JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"

Note: If set to true, enter a username and password for the jmx user in the jmxremote.password.file. Follow the instructions here on how to secure that file. Setting jmxremote.authenticate to true also requires you to pass in the username and password when running a nodetool command, e.g. nodetool status -u cassandra -pw cassandra

Restart the server (if needed).

#### Connecting to the node using JConsole for monitoring

Find the JConsole jar in your JDK_HOME/bin or JDK_HOME/lib directory. If you don't have the Java JDK installed, it can be downloaded from the Oracle website.

Double-click the executable jar to run it (or run it from the command line). Select “Remote Process” and enter the following connection string.

service:jmx:rmi:///jndi/rmi://<target ip address>:7199/jmxrmi

Replace <target ip address> with the address of the machine you intend to manage or monitor.

Note: JConsole may try (and fail) to connect using SSL first. If it does, it will ask whether it can connect over an unencrypted connection instead. Answer this prompt in the affirmative and you are good.

Congrats! You now have everything you need to monitor and manage a Cassandra node.

For help with how to monitor Cassandra using JMX and with interpreting the metrics see:

## The Right Database for the Right Job - Chattanooga Developer Lunch Presentation

Does this sound like you? "OMG!! PostgreSQL, Neo4j, Elasticsearch, MongoDB, RethinkDB, Cassandra, SQL Server, Riak, InfluxDB, Oracle NoSQL, SQLite, Hive, Couchbase, CouchDB, DynamoDB. I've got an issue with my current database solution or I'm starting a new project and I don't know what to choose!"

This talk is intended to help you match your data storage needs with suitable solutions from a wide field of contenders. Looking at different data types, structures, and interaction patterns, we will try to understand what makes certain data stores better suited than others and how to implement polyglot persistence.