Registration Now Open for DataStax Accelerate!

I’m very excited to announce that registration is now open for DataStax Accelerate—the world’s premier Apache Cassandra™ conference. The call for papers is also open.

Today’s enterprises are facing massive shifts in the use of real-time data to create differentiated experiences through internally and externally facing applications. Simple, single-cloud deployments are insufficient for these globally distributed and scalable applications and require a hybrid and multi-cloud approach.

This demands new ways to think about data management and how enterprises can make the most of their data while still keeping it portable and secure.

To discuss these trends and the technologies that underpin them, we are bringing together the best minds and most cutting-edge innovations in distributed database technology for DataStax Accelerate.  This premier conference will be held May 21-23, 2019 at the Gaylord National Resort and Convention Center in Oxon Hill, Maryland, just outside Washington, D.C.

Mark your calendar—because you will not want to miss it.

DataStax Accelerate will feature separate executive and technical tracks, as well as training, hands-on sessions, an exhibitor pavilion, networking, and a full presentation agenda from DataStax executives, customers, and partners.

You’ll be able to learn from your peers, industry experts, and thought leaders on how Apache Cassandra and DataStax can help you build and deploy game-changing enterprise applications and easily scale your data needs to fit your company’s growth and today’s hybrid and multi-cloud world.

Additionally, you can learn from:

  • Deep technical sessions for developers, administrators, and architects on DataStax and Apache Cassandra internals, theory, and operations
  • An informative and engaging showcase of some of the world’s largest enterprise companies that are running their strategic business initiatives with DataStax
  • In-depth and hands-on workshops around DataStax Enterprise 6.0 advanced technologies, like Graph, Solr, Kafka, and Spark
  • Face-time with DataStax engineers in various workshops and breakout sessions
  • Designer and innovator sessions on how to accelerate your hybrid and multi-cloud deployments using DataStax’s unique masterless database architecture that is always-on, active everywhere, and infinitely scalable

Register now!

Custom commands in cstar

Welcome to the next part of the cstar post series. The previous post introduced cstar and showed how it can run simple shell commands using various execution strategies. In this post, we will teach you how to build more complex custom commands.

Basic Custom Commands

Out of the box, cstar comes with three commands:

$ cstar
usage: cstar [-h] {continue,cleanup-jobs,run} ...

cstar

positional arguments:
  {continue,cleanup-jobs,run}
    continue            Continue a previously created job (*)
    cleanup-jobs        Cleanup old finished jobs and exit (*)
    run                 Run an arbitrary shell command

Custom commands allow extending these three with anything one might find useful. Adding a custom command to cstar is as easy as placing a file in ~/.cstar/commands or /etc/cstar/commands. For example, we can create ~/.cstar/commands/status that looks like this:

#!/usr/bin/env bash
nodetool status

With this file in place, cstar now features a brand new status command:

$ cstar
usage: cstar [-h] {continue,cleanup-jobs,run,status} ...

cstar

positional arguments:
  {continue,cleanup-jobs,run,status}
    continue            Continue a previously created job (*)
    cleanup-jobs        Cleanup old finished jobs and exit (*)
    run                 Run an arbitrary shell command
    status

A command like this allows us to stop using:

cstar run --command "nodetool status" --seed-host <host_ip>

And use a shorter version instead:

cstar status --seed-host <host_ip>

We can also declare the command description and default values for cstar’s options in the command file. We can do this by including commented lines with a special prefix. For example, we can include the following lines in our ~/.cstar/commands/status file:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: all
# C* description: Run nodetool status

nodetool status

Once we do this, the status command will show up with a proper description in cstar’s help, and running cstar status --seed-host <host_ip> will be equivalent to:

cstar status --seed-host <host_ip> --cluster-parallel --dc-parallel --strategy all

When cstar begins the execution of a command, it will print a unique ID for the job being run. This ID is needed for resuming a job (more on that later). We also need the job ID to examine the output of the commands, which we can find in:

~/.cstar/jobs/<job_id>/<hostname>/out

Parametrized Custom Commands

When creating custom commands, cstar allows declaring custom arguments as well. We will explain this feature by introducing a command that deletes snapshots older than a given number of days.

We will create a new file, ~/.cstar/commands/clear-snapshots, that will start like this:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: all
# C* description: Clear snapshots older than a given number of days
# C* argument: {"option":"--days", "name":"DAYS", "description":"Snapshots older than this many days will be deleted", "default":"7", "required": false}

The new element here is the last line starting with # C* argument:. Upon seeing this prefix, cstar will parse the remainder of the line as a JSON payload describing the custom argument. In the case above, cstar will:

  • Use --days as the name of the argument.
  • Save the value of this argument into a variable named DAYS. We will see how to access this in a bit.
  • Associate a description with this argument.
  • Use 7 as a default value.
  • Not require this option.

With this file in place, cstar already features the command in its help:

$ cstar
usage: cstar [-h] {continue,cleanup-jobs,run,status,clear-snapshots} ...

cstar

positional arguments:
  {continue,cleanup-jobs,run,status,clear-snapshots}
    continue            Continue a previously created job (*)
    cleanup-jobs        Cleanup old finished jobs and exit (*)
    run                 Run an arbitrary shell command
    status              Run nodetool status
    clear-snapshots     Clear snapshots older than given number of days
    

$ cstar clear-snapshots --help
usage: cstar clear-snapshots [-h] [--days DAYS]
                             [--seed-host [SEED_HOST [SEED_HOST ...]]]
                             ...
                             <other default options omitted>

optional arguments:
 -h, --help            show this help message and exit
 --days DAYS
                       Snapshots older than this many days will be deleted
 --seed-host [SEED_HOST [SEED_HOST ...]]
                        One or more hosts to use as seeds for the cluster
 ...
 <other default options omitted>

Now we need to add the command which will actually clear the snapshots. This command needs to do three things:

  • Find the snapshots that are older than a given number of days.
    • We will use the -mtime filter of the find utility:
    • find /var/lib/cassandra/data/*/*/snapshots/ -mtime +"$DAYS" -type d
    • Note we are using "$DAYS" to reference the value of the custom argument.
  • Extract the snapshot names from the findings.
    • The find command gives us absolute paths to the directories it found; the snapshot name is the last portion of each path. We will also make sure to keep each snapshot name only once:
    • sed -e 's#.*/##' | sort -u
  • Invoke nodetool clearsnapshot -t <snapshot_name> to clear each of the snapshots.

Putting this all together, the clear-snapshots file will look like this:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: all
# C* description: Clear snapshots older than a given number of days
# C* argument: {"option":"--days", "name":"DAYS", "description":"Snapshots older than this many days will be deleted", "default":"7", "required": false}

find /var/lib/cassandra/data/*/*/snapshots/ -mtime +"$DAYS" -type d |\
sed -e 's#.*/##' |\
sort -u |\
while read line; do nodetool clearsnapshot -t "${line}"; done

We can now run the clear-snapshots command like this:

$ cstar clear-snapshots --days 2 --seed-host <seed_host>

Complex Custom Commands

One of the main reasons we consider cstar so useful is that the custom commands can be arbitrary shell scripts, not just the one-liners we have seen so far. To illustrate this, we are going to share two relatively complicated commands.

Upgrading Cassandra Version

The first command will cover a rolling upgrade of the Cassandra version. Generally speaking, the upgrade should happen as quickly as possible and with as little downtime as possible. This is the ideal application of cstar’s topology strategy: it will execute the upgrade on as many nodes as possible while ensuring a quorum of replicas stays up at any moment. The upgrade of each node should follow these steps:

  • Create snapshots to allow rollback if the need arises.
  • Upgrade the Cassandra installation.
  • Restart the Cassandra process.
  • Check the upgrade happened successfully.

Clearing snapshots or upgrading SSTables should not be part of the upgrade itself. Snapshots, being just hardlinks, will not consume excessive space, and Cassandra (in most cases) can operate with older SSTable versions. Once all nodes are upgraded, these actions are easy enough to perform with dedicated cstar commands.
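For instance, upgrading SSTables once the whole cluster runs the new version could be its own custom command. A hypothetical ~/.cstar/commands/upgradesstables file, following the same conventions as the commands above, might look like this (a sketch, not something shipped with cstar):

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: topology
# C* description: Rewrite SSTables that are not on the current format version

nodetool upgradesstables

With that housekeeping covered, let’s get back to the upgrade command itself.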

The ~/.cstar/commands/upgrade command might look like this:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: topology
# C* description: Upgrade Cassandra package to given target version
# C* argument: {"option":"--snapshot-name", "name":"SNAPSHOT_NAME", "description":"Name of pre-upgrade snapshot", "default":"preupgrade", "required": false}
# C* argument: {"option":"--target-version", "name":"VERSION", "description":"Target version", "required": true}

# -x prints the executed commands to standard output
# -e fails the entire script if any of the commands fails
# -u fails the script if any of the variables is not bound
# -o pipefail instructs the interpreter to return the right-most non-zero status of a piped command in case of failure
set -xeuo pipefail

# exit if a node is already on the target version
if [[ $(nodetool version) == *$VERSION ]]; then
  exit 0
fi

# create Cassandra snapshots to allow rollback in case of problems
nodetool clearsnapshot -t "$SNAPSHOT_NAME"
nodetool snapshot -t "$SNAPSHOT_NAME"

# upgrade Cassandra version
sudo apt-get install -y cassandra="$VERSION"

# gently stop the cassandra process
nodetool drain && sleep 5 && sudo service cassandra stop

# start the Cassandra process again
sudo service cassandra start

# wait for Cassandra to start answering JMX queries
for i in $(seq 60); do
    if nodetool version > /dev/null 2>&1; then
        break
    fi
    sleep 1s
done

# fail if the upgrade did not happen
if ! [[ $(nodetool version) == *$VERSION ]]; then
  exit 1
fi

When running this command, we can be extra-safe and use the --stop-after option:

$ cstar upgrade --seed-host <host_name> --target-version 3.11.2 --stop-after 1

This will instruct cstar to upgrade only one node and exit the execution. Once that happens, we can take our time to inspect the node to see if the upgrade went smoothly. When we are confident enough, we can resume the command. Output of each cstar command starts with a highlighted job identifier, which we can use with the continue command:

$ cstar continue <job_id>

Changing Compaction Strategy

The second command we would like to share performs a compaction strategy change in a rolling fashion.

Compaction configuration is a table property: changing it requires executing an ALTER TABLE CQL statement, and that statement takes effect immediately across the cluster. This means that once we issue the statement, each node will react to the compaction change. The exact reaction depends on the change, but it generally translates into increased compaction activity. This is not always desirable: compaction can be an intrusive process and affect cluster performance.

Thanks to CASSANDRA-9965, since Cassandra version 2.1.9 there is a way of altering the compaction configuration of a single node via JMX. We can set the CompactionParametersJson MBean value to change the compaction configuration the node uses. Once we know how to change one node, we can have cstar do the same across the whole cluster.

Once we change the compaction settings, we should also manage the aftermath. Even though the change is effective immediately, it might take a very long time until each SSTable undergoes a newly configured compaction. The best way of doing this is to trigger a major compaction and wait for it to finish. After a major compaction, all SSTables are organised according to the new compaction settings and there should not be any unexpected compaction activity afterwards.

While cstar is excellent at checking which nodes are up or down, it does not check other aspects of node health, nor does it have the ability to monitor compaction activity. Therefore, we should include the wait for the major compaction in the command we are about to build. The command will then follow these steps:

  • Stop any compactions that are currently happening.
  • Set the CompactionParametersJson MBean to the new value.
    • We will use jmxterm for this and assume the JAR file is already present on the nodes.
  • Run a major compaction to force Cassandra to organise SSTables according to the new setting and make cstar wait for the compactions to finish.
    • This step is not mandatory. Cassandra would re-compact the SSTables eventually.
    • Doing a major compaction will cost extra resources and possibly impact the node’s performance. We do not recommend doing this on all nodes in parallel.
    • We are taking advantage of the topology strategy which will guarantee a quorum of replicas free from this load at any time.

The ~/.cstar/commands/change-compaction command might look like this:

#! /bin/bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: topology
# C* description: Switch compaction strategy using jmxterm and perform a major compaction on a specific table
# C* argument: {"option":"--keyspace-name", "name":"KEYSPACE", "description":"Keyspace containing the target table", "required": true}
# C* argument: {"option":"--table", "name":"TABLE", "description":"Table to switch the compaction strategy on", "required": true}
# C* argument: {"option":"--compaction-parameters-json", "name":"COMPACTION_PARAMETERS_JSON", "description":"New compaction parameters", "required": true}
# C* argument: {"option":"--major-compaction-flags", "name":"MAJOR_COMPACTION_FLAGS", "description":"Flags to add to the major compaction command", "default":"", "required": false}
# C* argument: {"option":"--jmxterm-jar-location", "name":"JMXTERM_JAR", "description":"jmxterm jar location on disk", "required": true}

set -xeuo pipefail

echo "Switching compaction strategy on $KEYSPACE.$TABLE"
echo "Stopping running compactions"
nodetool stop COMPACTION
echo "Altering compaction through JMX..."
echo "set -b org.apache.cassandra.db:columnfamily=$TABLE,keyspace=$KEYSPACE,type=ColumnFamilies CompactionParametersJson $COMPACTION_PARAMETERS_JSON"  | java -jar $JMXTERM_JAR --url 127.0.0.1:7199 -e

echo "Running a major compaction..."
nodetool compact ${KEYSPACE} ${TABLE} $MAJOR_COMPACTION_FLAGS

The command requires options specifying which keyspace and table to apply the change to. The jmxterm location and the new value for the compaction parameters are another two required arguments. The command also allows passing flags to the major compaction. This is useful when switching to SizeTieredCompactionStrategy, where the -s flag will instruct Cassandra to produce several size-tiered files instead of a single big file.

The nodetool compact command does not return until the major compaction finishes, so the execution on a node will not complete until then. Consequently, cstar will see this long execution and dutifully wait for it to complete before moving on to other nodes.

Here is an example of running this command:

$ cstar change-compaction --seed-host <host_name> --keyspace tlp_stress --table KeyValue --jmxterm-jar-location /usr/share/jmxterm-1.0.0-uber.jar --compaction-parameters-json "{\"class\":\"LeveledCompactionStrategy\",\"sstable_size_in_mb\":\"120\"}"

This command also benefits from the --stop-after option. Moreover, once all nodes are changed, we should not forget to persist the schema change by executing the actual ALTER TABLE statement.
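The statement only needs to be run once, from any node. A sketch based on the example invocation above (the keyspace, table, and compaction parameters all come from that example) could be:

cqlsh <host_name> -e "ALTER TABLE tlp_stress.KeyValue WITH compaction = {'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb': '120'};"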

Conclusion

In this post we talked about cstar and its feature of adding custom commands. We have seen:

  • How to add a simple command to execute nodetool status on all nodes at once.
  • How to define custom parameters for our commands, which allowed us to build a command for deleting old snapshots.
  • That the custom commands are essentially regular bash scripts and can include multiple statements. We used this feature to do a safe and fast Cassandra version upgrade.
  • That the custom commands can call external utilities such as jmxterm, which we used to change compaction strategy for a table in a rolling fashion.

In the next post, we are going to look into cstar’s cousin called cstarpar. cstarpar differs in the way commands are executed on remote nodes and allows for heavier operations such as rolling reboots.

How to Tweak the Number of num_tokens (vnodes) in a Live Cassandra Cluster

Some clients have asked us to change the value of num_tokens as their requirements change.
For example, a lower number of num_tokens is recommended when using DSE Search.
The most important thing during this process is that the cluster stays up, healthy, and fast. Anything we do needs to be deliberate and safe, as we have production traffic flowing through.

The process consists of adding a new DC with the desired number of num_tokens, decommissioning the nodes of the old DC one by one, and letting Cassandra’s automatic mechanisms distribute the existing data onto the new nodes.

The procedure below assumes that you have two datacenters, DC1 and DC2.

Procedure:


1. Run Repair To Keep Data Consistent Across Cluster

Make sure to run a full repair with nodetool repair. More detail about repairs can be found here. This ensures that all data is propagated from the datacenter which is being decommissioned.
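As a minimal sketch, assuming a Cassandra version where incremental repair is the default, a full repair of a keyspace can be run on each node in turn (the keyspace name is a placeholder; omit it to repair all keyspaces):

nodetool repair -full my_keyspace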


2. Add New DC DC3 And Decommission Old Datacenter DC1

Step 1: Download and install the same Cassandra version as the other nodes in the cluster, but do not start it.

How to stop Cassandra

Note: Don’t stop any node in DC1 until DC3 has been added.

If you used the Debian package, Cassandra starts automatically. You must stop the node and clear the data.
Stop the node:
Packaged installations: $ sudo service cassandra stop
Tarball installations: nodetool stopdaemon
If for some reason the previous command doesn’t work, find the Cassandra Java process ID (PID), and then kill the process using its PID number:
$ ps auwx | grep cassandra
$ sudo kill <pid>

Step 2: Clear the data from the default directories once the node is down.

sudo rm -rf /var/lib/cassandra/*
Step 3: Configure the parameters to match the settings of the other nodes in the cluster.
Properties which should be set by comparing against the other nodes:
cassandra.yaml:
  • seeds: This should include nodes from a live DC because new nodes have to stream data from them.
  • endpoint_snitch: Keep it the same as on the nodes in the live DCs.
  • cluster_name: Same as on the nodes in the other live DCs.
  • num_tokens: The number of vnodes required.
  • initial_token: Make sure this is commented out.

Set the local parameters below:

  • auto_bootstrap: false
  • listen_address: Local to the node
  • rpc_address: Local to the node
  • data_file_directories: Local to the node
  • saved_caches_directory: Local to the node
  • commitlog_directory: Local to the node

cassandra-rackdc.properties: Set the parameters for the new datacenter and rack:

  • dc: "dc name"
  • rack: "rack name"

Set the configuration files below as needed (an example fragment of the main settings follows this list):
  • cassandra-env.sh
  • logback.xml
  • jvm.options
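For illustration only, the main settings on a new DC3 node might look like the fragment below. All values are placeholders and must be matched to your own cluster:

# cassandra.yaml
cluster_name: 'MyCluster'
num_tokens: 16
endpoint_snitch: GossipingPropertyFileSnitch
auto_bootstrap: false
# initial_token stays commented out
seed_provider:
  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
      - seeds: "10.0.1.10,10.0.2.10"

# cassandra-rackdc.properties
dc=DC3
rack=rack1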
Step 4: Start Cassandra on each node, one by one.
Step 5: Now that all nodes are up and running, alter the keyspaces to add replicas in the new datacenter as well.
ALTER KEYSPACE Keyspace_name WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1' : 3, 'dc2' : 3, 'dc3' : 3};

Step 6: Finally, now that the nodes are up and empty, we should run "nodetool rebuild" on each node to stream data from the existing datacenter.

nodetool rebuild "Existing DC Name"

Step 7: After the whole process is complete, remove "auto_bootstrap: false" from each cassandra.yaml or set it to true.

auto_bootstrap: true

Decommission DC1:

Now that we have added DC3 into the cluster, it’s time to decommission DC1. However, before decommissioning a datacenter in a production environment, the first step should be to prevent clients from connecting to it and to ensure reads and writes do not query this datacenter.
Step 1: Prevent clients from communicating with DC1
  • First of all, ensure that the clients point to an existing datacenter.
  • Use DCAwareRoundRobinPolicy with the local datacenter set to a remaining DC so that no requests are routed to DC1.
Make sure to change the QUORUM consistency level to LOCAL_QUORUM and ONE to LOCAL_ONE.
Step 2: ALTER KEYSPACE to not have a replica in decommissioning DC.
ALTER KEYSPACE "Keyspace_name" WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc2' : 3, 'dc3' : 3};
Step 3: Decommission each node using nodetool decommission.
nodetool decommission
Step 4: After all nodes are decommissioned, remove all data from the data, saved caches, and commitlog directories to reclaim disk space.
sudo rm -rf <data_directory> <saved_caches_directory> <commitlog_directory>
Step 5: Finally, stop Cassandra as described in Step 1.
Step 6: Decommission each node in DC2 by following the above procedure.

3. Add New DC DC4 And Decommission Old DC2

Repeat the procedure from section 2, this time adding a new datacenter DC4 with the required num_tokens value and then decommissioning DC2.
Hopefully, this blog post will help you to understand the procedure for changing the number of vnodes on a live cluster. Keep in mind that the time taken by the bootstrapping/rebuilding/decommissioning process depends upon the data size.

Introduction to cstar

Spotify is a long-time user of Apache Cassandra at very large scale. It is also a creative company which tries to open source most of the tools it builds for internal needs. They released Cassandra Reaper a few years ago to give the community a reliable way of repairing clusters, which we now love and actively maintain. Their latest open-sourced tool for Cassandra is cstar, a parallel-ssh equivalent (distributed shell) that is Cassandra topology aware. At TLP, we love it already and are sure you soon will too.

What is cstar?

Running distributed databases requires good automation, especially at scale. But even with small clusters, running the same command everywhere or performing a rolling restart of a cluster can quickly get tedious. Sure, you can use tools like dsh and pssh, but they run commands on all servers at the same time (or just a given number) and you need to keep a list of the nodes to connect to locally. Each time your cluster scales out/in or nodes get replaced, you need to update the list. If you forget to update it, you may run commands that don’t touch the whole cluster without noticing.

Not all commands can run on all nodes at the same time either. For instance, upgrading SSTables, running cleanup, running a major compaction, or restarting nodes will have an impact on either latencies or availability and require more granularity of execution.

Cstar doesn’t suffer from any of the above problems. It will discover the topology of the cluster dynamically and tune concurrency based on replication settings. In addition, cstar will run from a single machine (not necessarily within the cluster) that has SSH access to all nodes in the cluster, and perform operations through SSH and SFTP. It requires no dependency, other than nodetool, to be installed on the Cassandra nodes.

Installing cstar

You’ll need to have Python 3 and pip3 installed on your server/laptop and then follow the README instructions, which will, in the simplest case, boil down to:

pip3 install cstar

Running cstar

Cstar is built with Python 3 and offers a straightforward way to run simple commands or complex scripts on an Apache Cassandra cluster using a single contact point.

The following command, for example, will perform a rolling restart of Cassandra in the cluster, one node at a time using the one strategy:

cstar run --command="sudo service cassandra restart" --seed-host=<contact_point_ip> --strategy=one

During the execution, cstar will update progress with a clear and pleasant output:

 +  Done, up      * Executing, up      !  Failed, up      . Waiting, up
 -  Done, down    / Executing, down    X  Failed, down    : Waiting, down
Cluster: Test Cluster
DC: dc1
+....
....
....
DC: dc2
+....
....
....
DC: dc3
*....
....
....
2 done, 0 failed, 1 executing

If we want to perform cleanup with topology awareness and have only one replica at a time running the command for each token range (leaving a quorum of unaffected replicas at RF=3), we can use the default topology strategy:

cstar run --command="nodetool cleanup" --seed-host=<contact_point_ip> --strategy=topology

This way, we’ll have several nodes processing the command to minimize the overall time spent on the operation and still ensure low impact on latencies:

 +  Done, up      * Executing, up      !  Failed, up      . Waiting, up
 -  Done, down    / Executing, down    X  Failed, down    : Waiting, down
Cluster: Test Cluster
DC: dc1
****.
....
....
DC: dc2
++++.
****
....
DC: dc3
+****
....
....
5 done, 0 failed, 12 executing

Finally, if we want to run a command that doesn’t put pressure on latencies and display the outputs locally, we can use the all strategy and add the -v flag to display the command outputs:

cstar run --command="nodetool getcompactionthroughput" --seed-host=<contact_point_ip> --strategy=all -v

Which will give us the following output:

 +  Done, up      * Executing, up      !  Failed, up      . Waiting, up
 -  Done, down    / Executing, down    X  Failed, down    : Waiting, down
Cluster: Test Cluster
DC: dc1
*****
****
****
DC: dc2
*****
****
****
DC: dc3
*****
****
****
0 done, 0 failed, 39 executing
Host node1.mycompany.com finished successfully
stdout:
Current compaction throughput: 0 MB/s

Host node21.mycompany.com finished successfully
stdout:
Current compaction throughput: 0 MB/s

Host node10.mycompany.com finished successfully
stdout:
Current compaction throughput: 0 MB/s

...
...

Host node7.mycompany.com finished successfully
stdout:
Current compaction throughput: 0 MB/s

Host node18.mycompany.com finished successfully
stdout:
Current compaction throughput: 0 MB/s

 +  Done, up      * Executing, up      !  Failed, up      . Waiting, up
 -  Done, down    / Executing, down    X  Failed, down    : Waiting, down
Cluster: Test Cluster
DC: dc1
+++++
++++
++++
DC: dc2
+++++
++++
++++
DC: dc3
+++++
++++
++++
39 done, 0 failed, 0 executing
Job cff7f435-1b9a-416f-99e4-7185662b88b2 finished successfully

How cstar does its magic

When you run a cstar command it will first connect to the seed node you provided and run a set of nodetool commands through SSH.

First, nodetool ring will give it the cluster topology with the state of each node. By default, cstar will stop the execution if one node in the cluster is down or unresponsive. If you’re aware that nodes are down and want to run a command nonetheless, you can add the --ignore-down-nodes flag to bypass the check.

Then cstar will list the keyspaces using nodetool cfstats and build a map of the replicas for all token ranges of each of them. This allows it to identify which nodes contain the same token ranges, using nodetool describering, and to apply the topology strategy accordingly. As shown before, the topology strategy will not allow two nodes that are replicas for the same token to run the command at the same time. If the cluster does not use vnodes, the topology strategy will run the command on every RFth node. If the cluster uses vnodes but is not using NetworkTopologyStrategy (NTS) for all keyspaces nor spreading replicas across racks, chances are only one node will be able to run the command at a time, even with the topology strategy. If both NTS and racks are in use, the topology strategy will run the command on a whole rack at a time.

By default, cstar will process the datacenters in parallel, so 2 nodes being replicas for the same tokens but residing in different datacenters can be processed at the same time.

Once the cluster has been fully mapped, execution will start in token order. Cstar is very resilient because it uploads a script to each remote node through SFTP and runs it using nohup. Each execution writes output (stdout and stderr) files along with the exit code, which cstar checks on regularly. If the command is interrupted on the server that runs cstar, it can be resumed safely, as cstar will first check whether the script is still running or has already finished on each node that hasn’t gone through yet.
Note that interrupting the command on the cstar host will not stop it on the remote nodes that are already running it.
Resuming an interrupted command is done simply by executing: cstar continue <job_id>

Each time a node finishes running the command, cstar will check whether the cluster health is still good and whether the node is up. This way, if you perform a rolling restart and one of the nodes doesn’t come back up properly, cstar will wait indefinitely to protect the availability of the cluster, even though the exit code of the restart command is 0. That is, unless you specified a timeout on the job, in which case the job will fail. Once the node is back up after the command has run, cstar will look for the next candidate node in the ring to run the command.

A few handy flags

Two steps execution

Some commands may be scary to run on the whole cluster and you may want to run them on a subset of the nodes first, check that they are in the expected state manually, and then continue the execution on the rest of the cluster. The --stop-after=<number-of-nodes> flag will do just that. Setting it to --stop-after=1 will run the command on a single node and exit. Once you’ve verified that you’re happy with the execution on that one node you can process the rest of the cluster using cstar continue <job_id>.

Retry failed nodes

Some commands might fail mid-course due to transient problems. By default, cstar continue <job_id> will halt if there is any failed execution in the history of the job. In order to resume the job and retry the execution on the failed nodes, add the --retry-failed flag.

Run the command on a specific datacenter

To process only a specific datacenter add the --dc-filter=<datacenter-name> flag. All other datacenters will be ignored by cstar.

Datacenter parallelism

By default, cstar will process the datacenters in parallel. If you want only one datacenter to process the command at a time, add the --dc-serial flag.

Specifying a maximum concurrency

You can forcefully limit the number of nodes running the command at the same time, regardless of topology, by adding the --max-concurrency=<number-of-nodes> flag.

Wait between each node

You may want to delay executions between nodes in order to give the cluster some room to recover from the command. The --node-done-pause-time=<time-in-seconds> flag allows you to specify a pause time that cstar will apply before looking for the next node to run the command on.

Run the command regardless of down nodes

If you want to run a command while nodes are down in the cluster add the --ignore-down-nodes flag to cstar.

Run on specific nodes only

If the command is meant to run on some specific nodes only you can use either the --host or the --host-file flags.

Control the verbosity of the output

By default, cstar will only display the progress of the execution as shown above in this post. To get the output of the remote commands, add the -v flag. If you want more verbosity on the executions and debug logging, use either -vv (very verbose) or -vvv (extra verbose).
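As an illustration of how these flags combine (all values below are just examples), a cautious cleanup limited to one datacenter could look like this:

cstar run --command="nodetool cleanup" --seed-host=<contact_point_ip> --strategy=topology --dc-filter=dc1 --stop-after=3 --node-done-pause-time=120 -v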

You haven’t installed it already?

Cstar is the tool that all Apache Cassandra operators have been waiting for to manage clusters of all sizes. We were happy to collaborate closely with Spotify to help them open source it. It has been built and matured at one of the smartest and most successful start-ups in the world and was developed to manage hundreds of clusters of all sizes. It requires no dependency to be installed on the cluster and uses SSH exclusively. Thus, it will comply nicely with any security policy and you should be able to run it within minutes on any cluster of any size.

We love cstar so much we are already working on integrating it with Reaper, as you can see in the following video:

We’ve seen in this blog post how to run simple one line commands with cstar, but there is much more than meets the eye. In an upcoming blog post we will introduce complex command scripts that perform operations like upgrading a Cassandra cluster, selectively clearing snapshots, or safely switching compaction strategies in a single cstar invocation.

Assassinate - A Command of Last Resort within Apache Cassandra

The nodetool assassinate command is meant specifically to remove cosmetic issues after the nodetool decommission or nodetool removenode commands have been properly run and at least 72 hours have passed. It is not a command that should be run under most circumstances nor included in your regular toolbox. Rather, the lengthier nodetool decommission process is preferred when removing nodes to ensure no data is lost. Note that you can also use the nodetool removenode command if cluster consistency is not the primary concern.

This blog post will explain:

  • How gossip works and why assassinate can disrupt it.
  • How to properly remove nodes.
  • When and how to assassinate nodes.
  • How to resolve issues when assassination attempts fail.

Gossip: Cassandra’s Decentralized Topology State

Since all topological changes happen within Cassandra’s gossip layer, before we discuss how to manipulate the gossip layer, let’s discuss the fundamentals of how gossip works.

From Wikipedia:

A gossip (communication) protocol is a procedure or process of computer-computer communication that is based on the way social networks disseminate information or how epidemics spread… Modern distributed systems often use gossip protocols to solve problems that might be difficult to solve in other ways, either because the underlying network has an inconvenient structure, is extremely large, or because gossip solutions are the most efficient ones available.

The gossip state within Cassandra is the decentralized, eventually consistent, agreed-upon topological state of all nodes within a Cassandra cluster. Cassandra gossip heartbeats keep the topological gossip state updated, are emitted by each node in the cluster, and contain the following information:

  • What that node’s status is, and
  • What its neighbors’ statuses are.

When a node goes offline, its gossip heartbeat will no longer be emitted and the node’s neighbors will detect that it is offline (with help from an algorithm tuned by the phi_convict_threshold parameter defined within cassandra.yaml), and a neighbor will broadcast an updated status saying that the node is unavailable until further notice.

However, as soon as the node comes online, two things will happen:

  1. The revived node will:
    • Ask a neighbor node what the current gossip state is.
    • Modify the received gossip state to include its own status.
    • Assume the modified state as its own.
    • Broadcast the new gossip state across the network.
  2. A neighbor node will:
    • Discover the revived node is back online, either by:
      • First-hand discovery, or
      • Second-hand gossiping.
    • Update the received gossip state with the new information.
    • Modify the received gossip state to include its own status.
    • Assume the modified state as its own.
    • Broadcast the new gossip state across the network.

The above gossip protocol produces the UN/DN (Up|Down/Normal) statuses seen in nodetool status and, among other tasks, ensures requests and replicas are properly routed to the available, responsible nodes.

Differences Between Assassination, Decommission, and Removenode

There are three main commands used to take a node offline: assassination, decommission, and removenode. Each of them leaves the node in the LEFT state, which ensures that each node’s gossip state will eventually be consistent and agree that:

  • The deprecated node has in fact been deprecated.
  • The deprecated node was deprecated after a given timestamp.
  • The deprecated token ranges are now owned by a new node.
  • Ideally, the deprecated node’s LEFT state will be automatically purged after 72 hours.

Underlying Actions of Decommission and Removenode on the Gossip Layer

When nodetool decommission and nodetool removenode commands are run, we are changing the state of the gossip layer to the LEFT state for the deprecated node.

Following the gossip protocol procedure in the previous section, the LEFT status will spread across the cluster as the new truth since the LEFT status has a more recent timestamp than the previous status.

As more nodes begin to assimilate the LEFT status, the cluster will ultimately reach consensus that the deprecated node has LEFT the cluster.

Underlying Actions of Assassination

Unlike nodetool decommission and nodetool removenode above, when nodetool assassinate is used we are updating the gossip state to the LEFT state, then forcing an increment of the gossip generation number, and updating the application state to the LEFT state explicitly, which will then propagate as normal.

Removing Nodes: The “Proper” Way

When clusters grow large, an operator may need to remove a node, either due to hardware faults or horizontally scaling down the cluster. At that time, the operator will need to modify the topological gossip state with either a nodetool decommission command for online nodes or nodetool removenode for offline nodes.

Decommissioning a Node: While Saving All Replicas

The typical command to run on a live node that will be exiting the cluster is:

nodetool decommission

The nodetool decommission command will:

  • Extend certain token ranges within the gossip state.
  • Stream all of the decommissioned node’s data to the new replicas in a consistent manner (the opposite of bootstrap).
  • Report to the gossip state that the node has exited the ring.

While this command may take a while to complete, the extra time spent on this command will ensure that all owned replicas are streamed off the node and towards the new replica owners.

Removing a Node: And Losing Non-Replicated Replicas

Sometimes a node may be offline due to hardware issues and/or has been offline for longer than gc_grace_seconds within a cluster that ingests deletion mutations. In this case, the node needs to be removed from the cluster while remaining offline to prevent “zombie data” from propagating around the cluster due to already expired tombstones, as defined by the gc_grace_seconds window. In the case where the node will remain offline, the following command should be run on a neighbor node:

nodetool removenode $HOST_ID

The nodetool removenode command will:

  • Extend certain token ranges within the gossip state.
  • Report to the gossip state that the node has exited the ring.
  • NOT stream any of the removed node’s data to the new replicas.

Increasing Consistency After Removing a Node

Typically a follow-up repair is required in a rolling fashion around the datacenter to ensure each new replica has the required information:

nodetool repair -pr

Note that:

  • The above command will only repair replica consistencies if the replication factor is greater than 1 and one of the surviving nodes contains a replica of the data.
  • Running a rolling repair will generate disk, CPU, and network load proportional to the amount of data needing to be repaired.
  • Throttling a rolling repair by repairing only one node at a time may be ideal.
  • Using Reaper for Apache Cassandra can schedule, manage, and load balance the repair operations throughout the lifetime of the cluster.

How We Can Detect Assassination is Needed

In either of the above cases, sometimes the gossip state will continue to be out of sync. There will be echoes of past statuses that claim not only the node is still part of the cluster, but it may still be alive. And then missing. Intermittently.

When the gossip truth is continuously inconsistent, nodetool assassinate will resolve these inconsistencies, but should only be run after nodetool decommission or nodetool removenode have been run and at least 72 hours have passed.

These issues are typically cosmetic and appear as similar lines within the system.log:

2014-09-26 01:26:41,901 DEBUG [Reconnection-1] Cluster - Failed reconnection to /172.x.y.zzz:9042 ([/172.x.y.zzz:9042] Cannot connect), scheduling retry in 600000 milliseconds

Or may appear as UNREACHABLE within the nodetool describecluster output:

Cluster Information:
        Name: Production Cluster
       Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
       Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
       Schema versions:
              65e78f0e-e81e-30d8-a631-a65dff93bf82: [172.x.y.z]
              UNREACHABLE: [172.x.y.zzz]

Sometimes you may find yourself looking even deeper and spot the deprecated node within nodetool gossipinfo months after removing the node:

/172.x.y.zzz
generation:1486750581
heartbeat:9999
STATUS:131099:LEFT,-5747706879722151680,1487011227852
TOKENS: not present

Note that the LEFT status should stick around for 72 hours to ensure all nodes come to the consensus that the node has been removed. So please don’t rush things if that’s the case. Again, it’s only cosmetic.

In all of these cases the truth may be slightly outdated and an operator may want to set the record straight with truth-based gossip states instead of cosmetic rumor-filled gossip states that include offline deprecated nodes.

How to Run the Assassination Command

Pre-2.2.0, operators had to use Java MBeans to assassinate an endpoint (see below). Post-2.2.0, operators can use the nodetool assassinate method.

From an online node, run the command:

nodetool assassinate $IP_ADDRESS

Internally, the nodetool assassinate command will execute the unsafeAssassinateEndpoint command over JMX on the Gossiper MBean.

Java MBeans Assassination

If using a version of Cassandra that does not yet have the nodetool assassinate command, we’ll have to rely on jmxterm.

You can use the following command to download jmxterm:

wget http://downloads.sourceforge.net/cyclops-group/jmxterm-1.0.0-uber.jar

Then we’ll want to use the Gossiper MBean and run the unsafeAssassinateEndpoint command:

echo "run -b org.apache.cassandra.net:type=Gossiper unsafeAssassinateEndpoint $IP_TO_ASSASSINATE" \
    | java -jar jmxterm-1.0.0-uber.jar -l $IP_OF_LIVE_NODE:7199

Both of the assassination commands will trigger the same MBean command over JMX; however, the nodetool assassinate command is preferred for its ease of use without additional dependencies.

Resolving Failed Assassination Attempts: And Why the First Attempts Failed

When clusters grow large enough, are geospatially distant enough, or are under intense load, the gossip state may become a bit out of sync with reality. Sometimes this causes assassination attempts to fail and while the solution may sound unnerving, it’s relatively simple once you consider how gossip states act and are maintained.

Because gossip states can be decentralized across high-latency nodes, sometimes gossip state updates can be delayed and cause a variety of race conditions that may show offline nodes as still being online. Most times these race conditions will be corrected in relatively short time periods, as tuned by the phi_convict_threshold within cassandra.yaml (between a value of 8 for hardware and 12 for virtualized instances). In almost all cases the gossip state will converge into a global truth.
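For reference, this threshold is a cassandra.yaml setting; a fragment along these lines reflects the commonly suggested value for virtualized instances (an illustration, not a prescription):

# cassandra.yaml
phi_convict_threshold: 12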

However, because the gossip state of a node that is no longer participating in gossip heartbeat rounds does not have an explicit source and is instead fueled by rumors, dead nodes may sometimes continue to live within the gossip state even after calling the assassinate command.

To solve these issues, you must ensure all race conditions are eliminated.

If a gossip state will not forget a node that was removed from the cluster more than a week ago:

  • Log in to each node within the Cassandra cluster.
  • Download jmxterm on each node, if nodetool assassinate is not an option.
  • Run nodetool assassinate, or the unsafeAssassinateEndpoint command, multiple times in quick succession.
    • I typically recommend running the command 3-5 times within 2 seconds.
    • I understand that sometimes the command takes time to return, so the “2 seconds” suggestion is less of a requirement than it is a mindset.
    • Also, sometimes 3-5 times isn’t enough. In such cases, shoot for the moon and try 20 assassination attempts in quick succession.

What we are trying to do is to create a flood of messages requesting all nodes completely forget there used to be an entry within the gossip state for the given IP address. If each node can prune its own gossip state and broadcast that to the rest of the nodes, we should eliminate any race conditions that may exist where at least one node still remembers the given IP address.
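A minimal sketch of scripting those rapid, repeated attempts on a node (the IP address is a placeholder for the deprecated node; run it on every node in the cluster):

for attempt in 1 2 3 4 5; do
    nodetool assassinate <ip_of_deprecated_node>
done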

As soon as all nodes come to agreement that they don’t remember the deprecated node, the cosmetic issue will no longer be a concern in any system.logs, nodetool describecluster commands, nor nodetool gossipinfo output.

Recap: How To Properly Remove Nodes Completely

Operators shouldn’t opt for the assassinate command as a first resort for taking a Cassandra node out, since it is a sledgehammer and most of the time operators are dealing with a screw.

However, when operators follow best practices and perform a nodetool decommission for live nodes or nodetool removenode for offline nodes, sometimes lingering cosmetic issues may lead the operator to want to keep the gossip state consistent.

After at least a week of inconsistent gossip state, nodetool assassinate or the unsafeAssassinateEndpoint command may be used to remove deprecated nodes from the gossip state.

When a single assassination attempt does not work across an entire cluster, sometimes the assassination needs to be attempted multiple times on all nodes within the cluster simultaneously. Doing so will ensure that each node modifies its own gossip state to accurately reflect the deprecated node’s absence within the gossip state as well as ensuring no node will further broadcast rumors of a false gossip state.

Incremental Repair Improvements in Cassandra 4

In our previous post, “Should you use incremental repair?”, we recommended using subrange full repairs instead of incremental repair, as CASSANDRA-9143 could generate severe instabilities on a running cluster. As the 4.0 release approaches, let’s see how incremental repair was modified for the next major version of Apache Cassandra in order to become reliable in production.

Incremental Repair in Pre-4.0 Clusters

Since Apache Cassandra 2.1, incremental repair has been performed as follows:

  • The repair coordinator will ask all replicas to build Merkle trees only using SSTables with a RepairedAt value of 0 (meaning they haven’t been part of a repair yet).
    Merkle trees are hash trees of the data they represent; they don’t store the original data.
  • The data corresponding to mismatching leaves of the Merkle trees gets streamed between replicas.
  • When all streaming is done, anticompaction is performed on all SSTables that were part of the repair session.

But during the whole process, SSTables could still get compacted away as part of the standard automatic compactions. If that happened, the SSTable would not get anticompacted and all the data it contains would not be marked as repaired. In the below diagram, SSTable 1 is compacted with 3, 4 and 5, creating SSTable 6 during the streaming phase. This happens before anticompaction is able to split apart the repaired and unrepaired data:

SSTable 1 gets compacted away before anticompaction could kick in.

If this happens on a single node, the next incremental repair run would find differences, as the previously repaired data would be skipped on all replicas but one, which could potentially lead to a lot of overstreaming. This happens because Merkle trees only contain hashes of data, and in Cassandra, the height of the tree is bounded to prevent over-allocation of memory. The more data we use to build our tree, the larger the tree would be. Limiting the height of the tree means the hashes in the leaves are responsible for bigger ranges of data.

Already repaired data in SSTable 6 will be part of the Merkle tree computation.

If you wonder what troubles can be generated by this bug, I invite you to read my previous blog post on this topic.

Incremental repair in 4.0, the theory

The incremental repair process is now supervised by a transaction to guarantee its consistency. In the “Prepare phase”, anticompaction is performed before the Merkle trees are computed, and the candidate SSTables will be marked as pending a specific repair. Note that they are not marked as repaired just yet, to avoid inconsistencies in case the repair session fails.

If a candidate SSTable is currently part of a running compaction, Cassandra will try to cancel that compaction and wait up to a minute. If the compaction successfully stops within that time, the SSTable will be locked for future anticompaction, otherwise the whole prepare phase and the repair session will fail.


SSTables marked as pending repair are only eligible to be compacted with other SSTables marked as pending.
SSTables in the pending repair pool are the only ones participating in both Merkle tree computations and streaming operations.


During repair, the pool of unrepaired SSTables receives newly flushed ones and compaction takes place as usual within it. SSTables that are being streamed in are part of the “pending repair” pool. This prevents two potential problems:

  • If the streamed SSTables were put in the unrepaired pool, they could get compacted away as part of normal compaction tasks and would never be marked as repaired.
  • If the streamed SSTables were put in the repaired pool and the repair session failed, we would have data that is marked as repaired on some nodes and not others, which would generate overstreaming during the next repair.

Once the repair succeeds, the coordinator sends a request to all replicas to mark the SSTables in pending state as repaired, by setting the RepairedAt timestamp (since anticompaction already took place, Cassandra just needs to set this timestamp).


If some nodes failed during the repair, the “pending repair” SSTables will be released and become eligible for compaction (and repair) again. They will not be marked as repaired.


The practice

Let’s see how this whole process takes place by running a repair and observing the behavior of Cassandra.

To that end, I created a 5-node CCM cluster running locally on my laptop and used tlp-stress to load some data with a replication factor of 2:

bin/tlp-stress run BasicTimeSeries -i 1M -p 1M -t 2 --rate 5000  --replication "{'class':'SimpleStrategy', 'replication_factor':2}"  --compaction "{'class': 'SizeTieredCompactionStrategy'}"  --host 127.0.0.1

Node 127.0.0.1 was then stopped and I deleted all the SSTables from the tlp_stress.sensor_data table:

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns    Host ID                               Rack
UN  127.0.0.1  247,07 KiB  1            ?       dbccdd3e-f74a-4b7f-8cea-e8770bf995db  rack1
UN  127.0.0.2  44,08 MiB  1            ?       3ce4cca5-da75-4ede-94b7-a37e01d2c725  rack1
UN  127.0.0.3  44,07 MiB  1            ?       3b9fd30d-80c2-4fa6-b324-eaecc4f9564c  rack1
UN  127.0.0.4  43,98 MiB  1            ?       f34af1cb-4862-45e5-95cd-c36404142b9c  rack1
UN  127.0.0.5  44,05 MiB  1            ?       a5add584-2e00-4adb-8949-716b7ef35925  rack1

I ran a major compaction on all nodes to easily observe the anticompactions. On node 127.0.0.2, we then have a single SSTable on disk:

sensor_data-f4b94700ad1d11e8981cd5d05c109484 adejanovski$ ls -lrt *Data*
-rw-r--r--  1 adejanovski  staff  41110587 31 aoû 15:09 na-4-big-Data.db

The sstablemetadata tool gives us interesting information about this file:

sstablemetadata na-4-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-4-big
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Bloom Filter FP chance: 0.01
Minimum timestamp: 1535720482962762 (08/31/2018 15:01:22)
Maximum timestamp: 1535720601312716 (08/31/2018 15:03:21)
SSTable min local deletion time: 2147483647 (no tombstones)
SSTable max local deletion time: 2147483647 (no tombstones)
Compressor: org.apache.cassandra.io.compress.LZ4Compressor
Compression ratio: 0.8694195642299255
TTL min: 0
TTL max: 0
First token: -9223352583900436183 (001.0.1824322)
Last token: 9223317557999414559 (001.1.2601952)
minClusteringValues: [3ca8ce0d-ad1e-11e8-80a6-91cbb8e39b05]
maxClusteringValues: [f61aabc1-ad1d-11e8-80a6-91cbb8e39b05]
Estimated droppable tombstones: 0.0
SSTable Level: 0
Repaired at: 0
Pending repair: --
Replay positions covered: {CommitLogPosition(segmentId=1535719935055, position=7307)=CommitLogPosition(segmentId=1535719935056, position=20131708)}
totalColumnsSet: 231168
totalRows: 231168
Estimated tombstone drop times: 
   Drop Time | Count  (%)  Histogram 
   Percentiles
   50th      0 
   75th      0 
   95th      0 
   98th      0 
   99th      0 
   Min       0 
   Max       0 
Partition Size: 
   Size (bytes) | Count  (%)  Histogram 
   179 (179 B)  | 56330 ( 24) OOOOOOOOOOOOOOOOOOo
   215 (215 B)  | 78726 ( 34) OOOOOOOOOOOOOOOOOOOOOOOOOO.
   258 (258 B)  | 89550 ( 39) OOOOOOOOOOOOOOOOOOOOOOOOOOOOOO
   310 (310 B)  |   158 (  0) 
   372 (372 B)  |  1166 (  0) .
   446 (446 B)  |  1691 (  0) .
   535 (535 B)  |   225 (  0) 
   642 (642 B)  |    23 (  0) 
   770 (770 B)  |     1 (  0) 
   Percentiles
   50th      215 (215 B)
   75th      258 (258 B)
   95th      258 (258 B)
   98th      258 (258 B)
   99th      372 (372 B)
   Min       150 (150 B)
   Max       770 (770 B)
Column Count: 
   Columns | Count   (%)  Histogram 
   1       | 224606 ( 98) OOOOOOOOOOOOOOOOOOOOOOOOOOOOOO
   2       |   3230 (  1) .
   3       |     34 (  0) 
   Percentiles
   50th      1
   75th      1
   95th      1
   98th      1
   99th      2
   Min       0
   Max       3
Estimated cardinality: 222877
EncodingStats minTTL: 0
EncodingStats minLocalDeletionTime: 1442880000 (09/22/2015 02:00:00)
EncodingStats minTimestamp: 1535720482962762 (08/31/2018 15:01:22)
KeyType: org.apache.cassandra.db.marshal.UTF8Type
ClusteringTypes: [org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimeUUIDType)]
StaticColumns: 
RegularColumns: data:org.apache.cassandra.db.marshal.UTF8Type

It is worth noting the cool improvements sstablemetadata has gone through in 4.0, especially regarding the rendering of histograms. So far, and as expected, our SSTable is not repaired and it is not pending a running repair.
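To trigger the next phase, an incremental repair can be started from any node; since incremental is the default repair mode in modern Cassandra, a plain nodetool repair on the stress keyspace is enough (a sketch of the invocation):

nodetool repair tlp_stress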

Once the repair starts, the coordinator node executes the Prepare phase and anticompaction is performed:

sensor_data-f4b94700ad1d11e8981cd5d05c109484 adejanovski$ ls -lrt *Data*
-rw-r--r--  1 adejanovski  staff  20939890 31 aoû 15:41 na-6-big-Data.db
-rw-r--r--  1 adejanovski  staff  20863325 31 aoû 15:41 na-7-big-Data.db

SSTable na-6-big is marked as pending our repair:

sstablemetadata na-6-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-6-big
...
Repaired at: 0
Pending repair: 8e584410-ad23-11e8-ba2c-0feeb881768f
Replay positions covered: {CommitLogPosition(segmentId=1535719935055, position=7307)=CommitLogPosition(segmentId=1535719935056, position=21103491)}

na-7-big remains in the “unrepaired pool” (it contains tokens that are not being repaired in this session):

sstablemetadata na-7-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-7-big
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Bloom Filter FP chance: 0.01
...
Repaired at: 0
Pending repair: --

Once repair finishes, another look at sstablemetadata on na-6-big shows us that it is now marked as repaired:

sstablemetadata na-6-big-Data.db
SSTable: /Users/adejanovski/.ccm/inc-repair-fix-2/node2/data0/tlp_stress/sensor_data-f4b94700ad1d11e8981cd5d05c109484/na-6-big
...
Estimated droppable tombstones: 0.0
SSTable Level: 0
Repaired at: 1535722885852 (08/31/2018 15:41:25)
Pending repair: --
…

Again, I really appreciate not having to compute the repair date by myself thanks to an sstablemetadata output that is a lot more readable than it was before.

Reliable incremental repair

While Apache Cassandra 4.0 is still being stabilized and there are a few bugs left to hunt down, incremental repair finally received the fix it deserved to make it production ready for all situations. The transaction that encloses the whole operation shields Cassandra from inconsistencies and overstreaming, making regularly scheduled repairs a fast and safe operation. Orchestration is still needed though, as SSTables cannot be part of two distinct repair sessions running at the same time, so it is advised to use a topology-aware tool to perform the operation without hurdles.
It is worth noting that full repair in 4.0 doesn’t involve anticompaction anymore and does not mark SSTables as repaired. This brings full repair back to its 2.1 behavior and allows it to be run on several nodes at the same time without fearing conflicts between validation compactions and anticompactions.
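To make the distinction concrete, here is what the two flavors look like from the command line in 4.0, using the tlp_stress keyspace from the example above (nodetool defaults assumed; incremental repair has been the default since 2.2):

# Incremental repair: the default mode, and finally safe to run routinely in 4.0
nodetool repair tlp_stress

# Full repair: in 4.0 it no longer triggers anticompaction and does not mark SSTables as repaired
nodetool repair -full tlp_stress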

So you have a broken Cassandra SSTable file?

Every few months I have a customer come to me with the following concern: my compactions for one of my Cassandra tables are stuck or my repairs fail when referencing one of the nodes in my Cassandra cluster. I take a look or just ask a couple of questions and it becomes apparent that the problem is a broken SSTable file. Occasionally, they will come to me in a panic and tell me that they have looked at their logs and discovered they have a broken SSTable file.

Don’t panic. A broken SSTable file is not a crisis.

A broken SSTable file does not represent lost data or an unusable database. Well, that’s true unless you are using a Replication Factor (RF) of ONE. The cluster is still going to operate, and queries should be working just fine. But… it does need to be fixed. There are four ways to fix the problem which I will explain in this post, one of which I freely admit is not one of the community’s recommended ways, but is often the simplest and quickest with minimal downside risk.

Before I begin to explain the ways to repair an SSTable, I will spend a few lines to explain what an SSTable file is, then I will walk you through the four options from easiest and safest to the most difficult and risky.

An SSTable file is not a file. It’s a set of eight files. One of those eight contains the actual data. The others contain metadata used by Cassandra to find specific partitions and rows in the data file. Here is a sample list of the files:

mc-131-big-CRC.db Checksums of chunks of the data file.
mc-131-big-Data.db The data file that contains all of the rows and columns.
mc-131-big-Digest.crc32 Single checksum of the data file.
mc-131-big-Filter.db Bloom filter used to test whether a partition key exists in the data file.
mc-131-big-Index.db A single-level index of the partitions and cluster keys in the data file.
mc-131-big-Statistics.db A bunch of metadata that Cassandra keeps about this file, including information about the columns, tombstones, etc.
mc-131-big-Summary.db An index into the index file, making this a second-level index.
mc-131-big-TOC.txt A list of the file names that make up this SSTable. No idea why it exists.

The “mc” is the SSTable file version. This changes whenever a new release of Cassandra changes anything in the way data is stored in any of the files listed in the table above.

The number 131 is the sequence number of the SSTable file. It increases for each new SSTable file written through memtable flush, compaction, or streaming from another node.

The word “big” was added to Cassandra SSTable files starting in Cassandra 2.2. I have no idea what its purpose is.

The rest of the file name parts are explained in the chart above.

When you get the dreaded error that an SSTable file is broken, it is almost always because an internal consistency check failed, such as “column too long” or “one of the checksums has failed to validate”. This has relatively little effect on normal reads against the table, except for the request where the failure took place. It has a serious effect on compactions and repairs, stopping them in their tracks.

Having repairs fail can result in long-term consistency issues between nodes and eventually the application returning incorrect results. Having compactions fail will degrade read performance in the short term and cause storage space problems in the long term.

So… what are the four options?

  1. Nodetool scrub command – Performed online with little difficulty. It usually has a low success rate in my own personal experience.
  2. Offline sstablescrub – Must be performed offline. The tool is in /usr/bin with a package install. Otherwise it’s in $CASSANDRA_HOME/bin. Its effectiveness rate is significantly better than the Nodetool scrub, but it requires the node to be down to work. And it takes forever…
  3. rm -f – Performed offline. It must also be followed immediately with a Nodetool repair when you bring the node back up. This is the method I have successfully used most often, but it also has some consistency risks while the repairs complete.
  4. Bootstrap the node – This is kind of like number 3 but it has less theoretical impact on consistency.

Let us get into the details

It starts out like this. You are running a Nodetool repair and you get an error:
$ nodetool repair -full

[2018-08-09 17:00:51,663] Starting repair command #2 (4c820390-9c17-11e8-8e8f-fbc0ff4d2cb8), repairing keyspace keyspace1 with repair options (parallelism: parallel, primary range: false, incremental: false, job threads: 1, ColumnFamilies: [], dataCenters: [], hosts: [], # of ranges: 768, pull repair: false)

error: Repair job has failed with the error message: [2018-08-15 09:59:41,659] Some repair failed

— StackTrace —

java.lang.RuntimeException: Repair job has failed with the error message: [2018-08-15 09:59:41,659] Some repair failed

You see the error. But it doesn’t tell you a whole lot. Just that the repair failed. The next step is to look at the Cassandra system.log file to see the errors:
$ grep -n -A10 ERROR /var/log/cassandra/system.log

ERROR [RepairJobTask:8] 2018-08-08 15:15:57,726 RepairRunnable.java:277 - Repair session 2c5f89e0-9b39-11e8-b5ee-bb8feee1767a for range [(-1377105920845202291,-1371711029446682941], (-8865445607623519086,-885162575564883…. 425683885]]] Sync failed between /192.168.1.90 and /192.168.1.92

/var/log/cassandra/debug.log:ERROR [RepairJobTask:4] 2018-08-09 16:16:50,722 RepairSession.java:281 - [repair #25682740-9c11-11e8-8e8f-fbc0ff4d2cb8] Session completed with the following error

/var/log/cassandra/debug.log:ERROR [RepairJobTask:4] 2018-08-09 16:16:50,726 RepairRunnable.java:277 - Repair session 25682740-9c11-11e8-8e8f-fbc0ff4d2cb8…… 7115161941975432804,7115472305105341673], (5979423340500726528,5980417142425683885]]] Validation failed in /192.168.1.88

/var/log/cassandra/system.log:ERROR [ValidationExecutor:2] 2018-08-09 16:16:50,707 Validator.java:268 - Failed creating a merkle tree for [repair #25682740-9c11-11e8-8e8f-fbc0ff4d2cb8 on keyspace1/standard1,

The first error message, Sync failed, is misleading, although sometimes it can be a clue. Looking further, we see Validation failed in /192.168.1.88. This tells us that the error occurred on 192.168.1.88, which just happens to be the node we are on. Finally, we get the message showing the keyspace and table the error occurred on. Depending on the message, you might see the table file number mentioned. In this case it was not mentioned.
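If the log does name the corrupted file, a quick grep for corruption-related messages will usually surface it. A minimal sketch, assuming the default log location:

# Look for corruption messages and the SSTable file they reference
$ grep -i corrupt /var/log/cassandra/system.log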

Looking in the directory tree we see that we have the following SSTable files:

4,417,919,455 mc-30-big-Data.db
8,831,253,280 mc-45-big-Data.db
374,007,490 mc-49-big-Data.db
342,529,995 mc-55-big-Data.db
204,178,145 mc-57-big-Data.db
83,234,470 mc-59-big-Data.db
3,223,224,985 mc-61-big-Data.db
24,552,560 mc-62-big-Data.db
2,257,479,515 mc-63-big-Data.db
2,697,986,445 mc-66-big-Data.db
5,285 mc-67-big-Data.db

At this point we have our repair options. I’ll take them one at a time.

Online SSTable repair – Nodetool scrub

This command is easy to perform. It is also the option least likely to succeed.

Steps:

  1. Find out which SSTable is broken.
  2. Run nodetool scrub keyspace tablename.
  3. Run nodetool repair.
  4. Run nodetool listsnapshots.
  5. Run nodetool clearsnapshot -t <snapshot name> to remove the pre-scrub snapshot.

We did the whole “find out what table is broken” thing just above, so we aren’t going to do it again. We will start with step 2.

Scrub will take a snapshot and rebuild your table files. The one(s) that are corrupt will disappear. You will lose at least a few rows and possibly all the rows from the corrupted SSTable files. Hence the need to do a repair.


$ nodetool scrub keyspace1 standard1

After the scrub, we have fewer SSTable files and their names have all changed. There is also less space consumed and very likely some rows missing.

2,257,479,515 mc-68-big-Data.db
342,529,995 mc-70-big-Data.db
3,223,224,985 mc-71-big-Data.db
83,234,470 mc-72-big-Data.db
4,417,919,455 mc-73-big-Data.db
204,178,145 mc-75-big-Data.db
374,007,490 mc-76-big-Data.db
2,697,986,445 mc-77-big-Data.db
1,194,479,930 mc-80-big-Data.db

So we do a repair.

$ nodetool repair -full

[2018-08-09 17:00:51,663] Starting repair command #2 (4c820390-9c17-11e8-8e8f-fbc0ff4d2cb8), repairing keyspace keyspace1 with repair options (parallelism: parallel, primary range: false, incremental: false, job threads: 1, ColumnFamilies: [], dataCenters: [], hosts: [], # of ranges: 768, pull repair: false)
[2018-08-09 18:14:09,799] Repair session 4cadf590-9c17-11e8-8e8f-fbc0ff4d2cb8 for range [(-1377105920845202291,…
[2018-08-09 18:14:10,130] Repair completed successfully
[2018-08-09 18:14:10,131] Repair command #2 finished in 1 hour 13 minutes 18 seconds

After the repair, we have almost twice as many SSTable files with data pulled in from other nodes to replace the corrupted data lost by the scrub process.

2,257,479,515 mc-68-big-Data.db
342,529,995 mc-70-big-Data.db
3,223,224,985 mc-71-big-Data.db
83,234,470 mc-72-big-Data.db
4,417,919,455 mc-73-big-Data.db
204,178,145 mc-75-big-Data.db
374,007,490 mc-76-big-Data.db
2,697,986,445 mc-77-big-Data.db
1,194,479,930 mc-80-big-Data.db
1,209,518,945 mc-88-big-Data.db
193,896,835 mc-89-big-Data.db
170,061,285 mc-91-big-Data.db
63,427,680 mc-93-big-Data.db
733,830,580 mc-95-big-Data.db
1,747,015,110 mc-96-big-Data.db
16,715,886,480 mc-98-big-Data.db
49,167,805 mc-99-big-Data.db

Once the scrub and repair are completed, you are almost done.

One of the side effects of the scrub is a snapshot called pre-scrub-<timestamp>. If you don’t want to run out of disk space, you are going to want to remove it, preferably with nodetool.

$ nodetool listsnapshots

Snapshot Details:

Snapshot name            Keyspace name  Column family name  True size  Size on disk
pre-scrub-1533897462847  keyspace1      standard1           35.93 GiB  35.93 GiB

$ nodetool clearsnapshot -t pre-scrub-1533897462847

Requested clearing snapshot(s) for [all keyspaces] with snapshot name [pre-scrub-1533897462847]

If the repair still fails to complete, we get to try one of the other methods.

Offline SSTable repair utility – sstablescrub

This option is a bit more complex to do but it often will work when the online version won’t work. Warning: it is very slow.

Steps:

  1. Bring the node down.
  2. Run the sstablescrub command.
  3. Start the node back up.
  4. Run nodetool repair on the table.
  5. Run nodetool clearsnapshot to remove the pre-scrub snapshot.

If the node is not already down, bring it down. I usually do the following commands:

$ nodetool drain

$ pkill java

$ ps -ef |grep cassandra

root 18271 14813 0 20:39 pts/1 00:00:00 grep --color=auto cassandra

Then issue the sstablescrub command with the -n option unless you have the patience of a saint. Without the -n option, every column in every row in every SSTable file will be validated. Single threaded. It will take forever. In preparing for this blog post, I forgot to use the -n and found that it took 12 hours to scrub 500 megabytes of a 30 GB table. Not willing to wait 30 days for the scrub to complete, I stopped it and switched to the -n option completing the scrub in only… hang on for this, 6 days. So, um, maybe this isn’t going to be useful in most real-world situations unless you have really small tables.

$ sstablescrub -n keyspace1 standard1

Pre-scrub sstables snapshotted into snapshot pre-scrub-1533861799166

Scrubbing BigTableReader(path='/home/cassandra/data/keyspace1/standard1-afd416808c7311e8a0c96796602809bc/mc-88-big-Data.db') (1.126GiB)…

Unfortunately, this took more time than I wanted to spend on this blog post. Once the table is scrubbed, restart Cassandra, run the repair, and clear the pre-scrub snapshot as in steps 3 through 5 above.

Delete the file and do a Nodetool repair – rm

This option works every time. It is no more difficult to do than the offline sstablescrub command and its success rate is 100%. It’s usually much faster than the offline sstablescrub option. In my prep for the blog post, this approach took only two hours for my 30 GB table. The only drawback I can see is that for the time it takes to do the repair on the table after the delete is performed, there is an increased risk of consistency problems, especially if you are reading at consistency level (CL) ONE, which should be a fairly uncommon use case.

Steps:

  1. Stop the node.
  2. cd to the offending keyspace and sstable directory.
  3. If you know which sstable file is bad (if you learned about the problem from stalled compactions, you will know) just delete it. If not, delete all files in the directory.
  4. Restart the node.
  5. Nodetool repair.

$ nodetool drain

$ pkill java

$ ps -ef |grep cassandra

root 18271 14813 0 20:39 pts/1 00:00:00 grep --color=auto cassandra

$ cd /var/lib/cassandra/data/keyspace1/standard1-afd416808c7311e8a0c96796602809bc/

$ pwd

/var/lib/cassandra/data/keyspace1/standard1-afd416808c7311e8a0c96796602809bc

If you know the SSTable file you want to delete, you can delete just that one with rm -f *nnn*. If not, as in this case, you do them all.

$ sudo rm -f *

rm: cannot remove ‘backups’: Is a directory

rm: cannot remove ‘snapshots’: Is a directory

$ ls

backups snapshots

$ systemctl start cassandra

$ nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens  Owns (effective)  Host ID                               Rack
UN  192.168.1.88  1.35 MiB   256     100.0%            c92d9374-cf3a-47f6-9bd1-81b827da0c1e  rack1
UN  192.168.1.90  41.72 GiB  256     100.0%            3c9e61ae-8741-4a74-9e89-cfa47768ac60  rack1
UN  192.168.1.92  30.87 GiB  256     100.0%            c36fecad-0f55-4945-a741-568f28a3cd8b  rack1

$ nodetool repair keyspace1 standard1 -full

[2018-08-10 11:23:22,454] Starting repair command #1 (51713c00-9cb1-11e8-ba61-01c8f56621df), repairing keyspace keyspace1 with repair options (parallelism: parallel, primary range: false, incremental: false, job threads: 1, ColumnFamilies: [standard1], dataCenters: [], hosts: [], # of ranges: 768, pull repair: false)
[2018-08-10 13:02:36,097] Repair completed successfully
[2018-08-10 13:02:36,098] Repair command #1 finished in 1 hour 39 minutes 13 seconds

The SSTable file list now looks like this:

229,648,710 mc-10-big-Data.db
103,421,070 mc-11-big-Data.db
1,216,169,275 mc-12-big-Data.db
76,738,970 mc-13-big-Data.db
773,774,270 mc-14-big-Data.db
17,035,624,448 mc-15-big-Data.db
83,365,660 mc-16-big-Data.db
170,061,285 mc-17-big-Data.db
758,998,865 mc-18-big-Data.db
2,683,075,900 mc-19-big-Data.db
749,573,440 mc-1-big-Data.db
91,184,160 mc-20-big-Data.db
303,380,050 mc-21-big-Data.db
3,639,126,510 mc-22-big-Data.db
231,929,395 mc-23-big-Data.db
1,469,272,390 mc-24-big-Data.db
204,485,420 mc-25-big-Data.db
345,655,100 mc-26-big-Data.db
805,017,870 mc-27-big-Data.db
50,714,125 mc-28-big-Data.db
11,578,088,555 mc-2-big-Data.db
170,033,435 mc-3-big-Data.db
1,677,053,450 mc-4-big-Data.db
62,245,405 mc-5-big-Data.db
8,426,967,700 mc-6-big-Data.db
1,979,214,745 mc-7-big-Data.db
2,910,586,420 mc-8-big-Data.db
14,097,936,920 mc-9-big-Data.db

Bootstrap the node

If you are reading at consistency level (CL) ONE, or you are really concerned about consistency overall, use this approach instead of the rm -f approach. It will ensure that the node with missing data does not participate in any reads until all data is restored. Depending on how much data the node has to recover, it will often take longer than any of the other approaches, although since bootstrapping can stream in parallel, it may not.

Steps:

  1. Shut down the node.
  2. Remove all of the files under the Cassandra data directories (data, commitlog, saved_caches, hints), usually under /var/lib/cassandra.
  3. Modify /etc/cassandra/conf/cassandra-env.sh to add the replace_address line shown below.
  4. Start Cassandra. When the server starts with no files, it will connect to one of its seeds, recreate the schema, and request all nodes to stream data to it to replace the data it has lost. It will not select new token ranges unless you try to restart it with a different IP than it had before.
  5. Modify the /etc/cassandra/conf/cassandra-env.sh file to remove the change made in Step 3.

$ nodetool drain

$ sudo pkill java

$ ps -ef |grep java

$ vi /etc/cassandra/conf/cassandra-env.sh

Add this line at the end of the file:

JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=192.168.1.88"

$ systemctl start cassandra

Wait for the node to join the cluster
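While you wait, one simple way to keep an eye on the streaming progress is nodetool netstats, run on the replacing node; a small sketch:

# Refresh the streaming status every 30 seconds
$ watch -n 30 nodetool netstats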

During the bootstrap we see messages like this in the log:

INFO [main] 2018-08-10 13:39:06,780 StreamResultFuture.java:90 - [Stream #47b382f0-9cc4-11e8-a010-51948a7598a1] Executing streaming plan for Bootstrap

INFO [StreamConnectionEstablisher:1] 2018-08-10 13:39:06,784 StreamSession.java:266 - [Stream #47b382f0-9cc4-11e8-a010-51948a7598a1] Starting streaming to /192.168.1.90

Later on we see:

INFO [main] 2018-08-10 14:18:16,133 StorageService.java:1449 - JOINING: Finish joining ring

INFO [main] 2018-08-10 14:18:16,482 SecondaryIndexManager.java:509 - Executing pre-join post-bootstrap tasks for: CFS(Keyspace='keyspace1', ColumnFamily='standard1')

INFO [main] 2018-08-10 14:18:16,484 SecondaryIndexManager.java:509 - Executing pre-join post-bootstrap tasks for: CFS(Keyspace='keyspace1', ColumnFamily='counter1')

INFO [main] 2018-08-10 14:18:16,897 StorageService.java:2292 - Node /192.168.1.88 state jump to NORMAL

WARN [main] 2018-08-10 14:18:16,899 StorageService.java:2324 - Not updating token metadata for /192.168.1.88 because I am replacing it

When we do a nodetool status we see:

$ nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens  Owns (effective)  Host ID                               Rack
UN  192.168.1.88  30.87 GiB  256     100.0%            c92d9374-cf3a-47f6-9bd1-81b827da0c1e  rack1
UN  192.168.1.90  41.72 GiB  256     100.0%            3c9e61ae-8741-4a74-9e89-cfa47768ac60  rack1
UN  192.168.1.92  30.87 GiB  256     100.0%            c36fecad-0f55-4945-a741-568f28a3cd8b  rack1

The node is up and running in less than one hour. Quicker than any of the options. Makes you think about your choices, doesn’t it?

If you have a keyspace with RF=1 then options 3 and 4 are not viable. You will lose data. Although with RF=1 and a corrupted SSTable file you are going to lose some data anyway.

A final look at the list of SSTable files shows this:

773,774,270 mc-10-big-Data.db
17,148,617,040 mc-11-big-Data.db
749,573,440 mc-1-big-Data.db
170,033,435 mc-2-big-Data.db
1,677,053,450 mc-3-big-Data.db
62,245,405 mc-4-big-Data.db
8,426,967,700 mc-5-big-Data.db
229,648,710 mc-6-big-Data.db
103,421,070 mc-7-big-Data.db
1,216,169,275 mc-8-big-Data.db
76,738,970 mc-9-big-Data.db

Conclusion

If you run into corrupted SSTable files, don’t panic. It won’t have any impact on your operations in the short term unless you are using RF=ONE or reading at CL=ONE.

Find out which node has the broken SSTable file.

Then, because it’s the easiest and lowest risk, try the online nodetool scrub command.

If that does not work, then you have three choices. Offline Scrub works but is usually too slow to be useful. Rebuilding the whole node seems to be overkill but it will work, and it will maintain consistency on reads. If you have a lot of data and you want to solve the problem fairly quickly, just remove the offending SSTable file and do a repair.

All approaches have an impact on the other nodes in the cluster.

The first three require a repair, which computes merkle trees and streams data to the node being fixed. The amount to be streamed is largest with the delete, but the total recovery time was less in my example. That may not always be the case. In the bootstrap example, the total time was very similar to the delete case because my test case had only one large table. If there were several large tables, the delete approach would have been the fastest way to get the node back to normal.

Approach        Scrub phase   Repair phase   Total Recovery time
Online Scrub    1:06          1:36           2:42
*Offline Scrub  144:35        1:37           146:22
Delete files    0:05          1:36           1:41
Bootstrap       0:05          1:45           1:50

All sample commands show the user in normal Linux user mode. That is because in my test environment the Cassandra cluster belonged to my user id. Most production Cassandra clusters run as the Cassandra Linux user. In that case, some amount of user id switching or sudo operations would be required to do the work.

The offline scrub time was estimated. I did not want to wait for six days to see if it was really going to take that long.

All sample output provided here was from a three-node cluster running Cassandra 3.11.2 running on Fedora 28 using a vanilla Cassandra install with pretty much everything in cassandra.env defaulted.

I corrupted the SSTable file using this command:

$ printf '\x31\xc0\xc3' | dd of=mc-8-big-Data.db bs=1 seek=0 count=100 conv=notrunc

The Fine Print When Using Multiple Data Directories

One of the longest lived features in Cassandra is the ability to allow a node to store data on more than one directory or disk. This feature can help increase cluster capacity or prevent a node from running out of space if bootstrapping a new one would take too long to complete. Recently I was working on a cluster and saw how this feature has the potential to silently cause problems in a cluster. In this post we will go through some fine print when configuring Cassandra to use multiple disks.

Jay… what?

The feature which allows Cassandra to store data on multiple disks is commonly referred to as JBOD [pronounced jay-bod] which stands for “Just a Bunch Of Disks/Drives”. In Cassandra this feature is controlled by the data_file_directories setting in the cassandra.yaml file. In relation to this setting, Cassandra also allows its behaviour on disk failure to be controlled using the disk_failure_policy setting. For now I will leave the details of the setting alone, so we can focus exclusively on the data_file_directories setting.

Simple drives, simple pleasures

The data_file_directories feature is fairly straightforward in that it allows Cassandra to use multiple directories to store data. To use it, just specify the list of directories you want Cassandra to use for data storage. For example:

data_file_directories:
    - /var/lib/cassandra/data
    - /var/lib/cassandra/data-extra

The feature has been around from day one of Cassandra’s life and the way in which Cassandra uses multiple directories has mostly stayed the same. There are no special restrictions on the directories; they can be on the same volume/disk or a different volume/disk. As far as Cassandra is concerned, the paths specified in the setting are just the directories it has available to read and write data.

At a high level, the way the feature works is Cassandra tries to evenly split data into each of the directories specified in the data_file_directories setting. No two directories will ever have an identical SSTable file name in them. Below is an example of what you could expect to see if you inspected each data directory when using this feature. In this example the node is configured to use two directories: …/data0/ and …/data1/

$ ls .../data0/music/playlists-3b90f8a0a50b11e881a5ad31ff0de720/
backups                      mc-5-big-Digest.crc32  mc-5-big-Statistics.db
mc-5-big-CompressionInfo.db  mc-5-big-Filter.db     mc-5-big-Summary.db
mc-5-big-Data.db             mc-5-big-Index.db      mc-5-big-TOC.txt

$ ls .../data1/music/playlists-3b90f8a0a50b11e881a5ad31ff0de720/
backups                      mc-6-big-Digest.crc32  mc-6-big-Statistics.db
mc-6-big-CompressionInfo.db  mc-6-big-Filter.db     mc-6-big-Summary.db
mc-6-big-Data.db             mc-6-big-Index.db      mc-6-big-TOC.txt

Data resurrection

One notable change which modified how Cassandra uses the data_file_directories setting was CASSANDRA-6696. The change was implemented in Cassandra version 3.2. To explain this problem and how it was fixed, consider the case where a node has two data directories A and B. Prior to this change in Cassandra, you could have a node that had data for a specific token in one SSTable that was on disk A. The node could also have a tombstone associated with that token in another SSTable on disk B. If gc_grace_seconds passed and no compaction had processed the data and the tombstone together, there would be an issue if disk B failed. In that case the tombstone is lost while the data on disk A is still present! Running a repair in this case would resurrect the data by propagating it to other replicas! To fix this issue, CASSANDRA-6696 changed Cassandra so that a token range is always stored on a single disk.

This change did make Cassandra more robust when using the data_file_directories setting; however, it was no silver bullet, and caution still needs to be taken when it is used. Most notably, consider the case where each data directory is mounted to a dedicated disk and the cluster schema results in wide partitions. In this scenario one of the disks could easily reach its maximum capacity due to the wide partitions, while the other disk still has plenty of storage capacity.
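A simple way to spot this kind of imbalance early is to compare the directories directly. A small sketch, assuming two hypothetical data directories each on its own mount:

# Free space per mount (paths are assumptions for the example)
$ df -h /var/lib/cassandra/data /var/lib/cassandra/data-extra

# Data Cassandra has actually written to each directory
$ du -sh /var/lib/cassandra/data /var/lib/cassandra/data-extra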

How to lose a volume and influence a node

For a node running a Cassandra version less than 3.2 and using the data_file_directories setting, there are a number of vulnerabilities to watch out for. If each data directory is mounted to a dedicated disk, and one of the disks dies or the mount disappears, then this can silently cause problems. To explain this problem, consider the case where we installed Cassandra and the data is located in /var/lib/cassandra/data. Say we want to add another directory to store data in, only this time the data will be on another volume. It makes sense to have the data directories in the same location, so we create the directory /var/lib/cassandra/data-extra. We then mount our volume so that /var/lib/cassandra/data-extra points to it. If the disk backing /var/lib/cassandra/data-extra died, or we forgot to put the mount information in fstab and lost the mount on a restart, then we would effectively lose system table data. Cassandra will start because the directory /var/lib/cassandra/data-extra exists, however it will be empty.
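One cheap safeguard, sketched below with the hypothetical paths from this example, is to check that the extra directory really is a mounted volume before letting Cassandra start, for example in whatever wrapper or init script starts the service:

# Abort the start if the extra data directory is not backed by a mounted volume
if ! mountpoint -q /var/lib/cassandra/data-extra; then
    echo "/var/lib/cassandra/data-extra is not mounted, refusing to start Cassandra" >&2
    exit 1
fi
systemctl start cassandra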

Similarly, I have seen cases where a directory was manually added to a node that was managed by chef. In this case the node was running out of disk space and there was no time to wait for a new node to bootstrap. To avoid the node going down, an additional volume was attached and mounted, and the data_file_directories setting in the cassandra.yaml was modified to include the new data directory. Some time later chef was executed on the node to deploy an update, and as a result it reset the cassandra.yaml configuration. Resetting the cassandra.yaml cleared the additional data directory that was listed under the data_file_directories setting. When the node was restarted, the Cassandra process never knew that there was another data directory it had to read from.

Either of these cases can lead to more problems in the cluster. Remember how earlier I mentioned that a complete SSTable file will always be stored in a single data directory when using the data_file_directories setting? This behaviour applies to all data stored by Cassandra, including its system data! So that means, in the above two scenarios, Cassandra could potentially lose system table data. This is a problem because the system tables store information about what data the node owns, what the schema is, and whether the node has bootstrapped. If the system table is lost and the node is restarted, the node will think it is a new node, take on a new identity and new token ranges. This results in a token range movement in the cluster. We have covered this topic in more detail in our auto bootstrapping blog post. This problem gets worse when a seed node loses its system table and comes back as a new node. This is because seed nodes never stream data, so if a cleanup is run cluster-wide, data is then lost.
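A quick sanity check after any restart that touches the data directories is to confirm the node still reports the Host ID you expect; a sketch, comparing against the value recorded before the restart:

# The node's own view of its identity
$ nodetool info | grep ID

# How the rest of the cluster sees this node
$ nodetool status | grep "$(hostname -i)"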

Testing the theory

We can test the above scenarios for different versions of Cassandra using ccm. I created the following script to set up a three node cluster in ccm, with each node configured to be a seed node and to use two data directories. We use seed nodes to show the worst case scenario that can occur when a node using multiple data directories loses one of those directories.

#!/bin/bash

# This script creates a three node CCM cluster to demo the data_file_directories
# feature in different versions of Cassandra.

set -e

CLUSTER_NAME="${1:-TestCluster}"
NUMBER_NODES="3"
CLUSTER_VERSION="${2:-3.0.15}"

echo "Cluster Name: ${CLUSTER_NAME}"
echo "Cluster Version: ${CLUSTER_VERSION}"
echo "Number nodes: ${NUMBER_NODES}"

ccm create ${CLUSTER_NAME} -v ${CLUSTER_VERSION}

# Modifies the configuration of a node in the CCM cluster.
function update_node_config {
  CASSANDRA_YAML_SETTINGS="cluster_name:${CLUSTER_NAME} \
                          num_tokens:32 \
                          endpoint_snitch:GossipingPropertyFileSnitch \
                          seeds:127.0.0.1,127.0.0.2,127.0.0.3"

  for key_value_setting in ${CASSANDRA_YAML_SETTINGS}
  do
    setting_key=$(echo ${key_value_setting} | cut -d':' -f1)
    setting_val=$(echo ${key_value_setting} | cut -d':' -f2)
    sed -ie "s/${setting_key}\:\ .*/${setting_key}:\ ${setting_val}/g" \
      ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra.yaml
  done

  # Create and configure the additional data directory
  extra_data_dir="${HOME}/.ccm/${CLUSTER_NAME}/node${1}/data1"

  sed -ie '/data_file_directories:/a\'$'\n'"- ${extra_data_dir}
    " ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra.yaml

  mkdir ${extra_data_dir}

  sed -ie "s/dc=.*/dc=datacenter1/g" \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-rackdc.properties
  sed -ie "s/rack=.*/rack=rack${1}/g" \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-rackdc.properties

  # Tune Cassandra memory usage so we can run multiple nodes on the one machine
  sed -ie 's/\#MAX_HEAP_SIZE=\"4G\"/MAX_HEAP_SIZE=\"500M\"/g' \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-env.sh
  sed -ie 's/\#HEAP_NEWSIZE=\"800M\"/HEAP_NEWSIZE=\"120M\"/g' \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-env.sh

  # Allow remote access to JMX without authentication. This is for
  # demo purposes only - Never do this in production
  sed -ie 's/LOCAL_JMX=yes/LOCAL_JMX=no/g' \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-env.sh
  sed -ie 's/com\.sun\.management\.jmxremote\.authenticate=true/com.sun.management.jmxremote.authenticate=false/g' \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-env.sh
}

for node_num in $(seq ${NUMBER_NODES})
do
  echo "Adding 'node${node_num}'"
  ccm add node${node_num} \
    -i 127.0.0.${node_num} \
    -j 7${node_num}00 \
    -r 0 \
    -b

  update_node_config ${node_num}

  # Set localhost aliases - Mac only
  echo "ifconfig lo0 alias 127.0.0.${node_num} up"
  sudo ifconfig lo0 alias 127.0.0.${node_num} up
done

sed -ie 's/use_vnodes\:\ false/use_vnodes:\ true/g' \
  ~/.ccm/${CLUSTER_NAME}/cluster.conf

I first tested Cassandra version 2.1.20 using the following process.

Run the script and check the nodes were created.

$ ccm status
Cluster: 'mutli-dir-test'
-------------------------
node1: DOWN (Not initialized)
node3: DOWN (Not initialized)
node2: DOWN (Not initialized)

Start the cluster.

$ for i in $(seq 1 3); do echo "Starting node${i}"; ccm node${i} start; sleep 10; done
Starting node1
Starting node2
Starting node3

Check the cluster is up and note the Host IDs.

$  ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  47.3 KB    32      73.5%             4682088e-4a3c-4fbc-8874-054408121f0a  rack1
UN  127.0.0.2  80.35 KB   32      71.7%             b2411268-f168-485d-9abe-77874eef81ce  rack2
UN  127.0.0.3  64.33 KB   32      54.8%             8b55a1c6-f971-4e01-a34b-bb37dd55bb89  rack3

Insert some test data into the cluster.

$ ccm node1 cqlsh
Connected to TLP-578-2120 at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 2.1.20 | CQL spec 3.2.1 | Native protocol v3]
Use HELP for help.
cqlsh> CREATE KEYSPACE music WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 };
cqlsh> CREATE TABLE music.playlists (
   ...  id uuid,
   ...  song_order int,
   ...  song_id uuid,
   ...  title text,
   ...  artist text,
   ...  PRIMARY KEY (id, song_id));
cqlsh> INSERT INTO music.playlists (id, song_order, song_id, artist, title)
   ...  VALUES (62c36092-82a1-3a00-93d1-46196ee77204, 1,
   ...  a3e64f8f-bd44-4f28-b8d9-6938726e34d4, 'Of Monsters and Men', 'Little Talks');
cqlsh> INSERT INTO music.playlists (id, song_order, song_id, artist, title)
   ...  VALUES (62c36092-82a1-3a00-93d1-46196ee77205, 2,
   ...  8a172618-b121-4136-bb10-f665cfc469eb, 'Birds of Tokyo', 'Plans');
cqlsh> INSERT INTO music.playlists (id, song_order, song_id, artist, title)
   ...  VALUES (62c36092-82a1-3a00-93d1-46196ee77206, 3,
   ...  2b09185b-fb5a-4734-9b56-49077de9edbf, 'Lorde', 'Royals');
cqlsh> exit

Write the data to disk by running nodetool flush on all the nodes.

$ for i in $(seq 1 3); do echo "Flushing node${i}"; ccm node${i} nodetool flush; done
Flushing node1
Flushing node2
Flushing node3

Check we can retrieve data from each node.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

 id          | song_order | song_id     | artist              | title
-------------+------------+-------------+---------------------+--------------
 62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
 62c36092... |          3 | 2b09185b... |               Lorde |       Royals
 62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

  id          | song_order | song_id     | artist              | title
 -------------+------------+-------------+---------------------+--------------
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

  id          | song_order | song_id     | artist              | title
 -------------+------------+-------------+---------------------+--------------
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

Look for a node that has all of the system.local SSTable files in a single directory. In this particular test, there were no SSTable files in data directory data0 of node1.

$ ls .../node1/data0/system/local-7ad54392bcdd35a684174e047860b377/
$
$ ls .../node1/data1/system/local-7ad54392bcdd35a684174e047860b377/
system-local-ka-5-CompressionInfo.db  system-local-ka-5-Summary.db          system-local-ka-6-Index.db
system-local-ka-5-Data.db             system-local-ka-5-TOC.txt             system-local-ka-6-Statistics.db
system-local-ka-5-Digest.sha1         system-local-ka-6-CompressionInfo.db  system-local-ka-6-Summary.db
system-local-ka-5-Filter.db           system-local-ka-6-Data.db             system-local-ka-6-TOC.txt
system-local-ka-5-Index.db            system-local-ka-6-Digest.sha1
system-local-ka-5-Statistics.db       system-local-ka-6-Filter.db

Stop node1 and simulate a disk or volume mount going missing by removing the data1 directory entry from the data_file_directories setting.

$ ccm node1 stop

Before the change the setting entry was:

data_file_directories:
- .../node1/data0
- .../node1/data1

After the change the setting entry was:

data_file_directories:
- .../node1/data0

Start node1 again and check the logs. From the logs we can see the messages where the node has generated a new Host ID and taken ownership of new tokens.

WARN  [main] 2018-08-21 12:34:57,111 SystemKeyspace.java:765 - No host ID found, created c62c54bf-0b85-477d-bb06-1f5d696c7fef (Note: This should happen exactly once per node).
INFO  [main] 2018-08-21 12:34:57,241 StorageService.java:844 - This node will not auto bootstrap because it is configured to be a seed node.
INFO  [main] 2018-08-21 12:34:57,259 StorageService.java:959 - Generated random tokens. tokens are [659824738410799181, 501008491586443415, 4528158823720685640, 3784300856834360518, -5831879079690505989, 8070398544415493492, -2664141538712847743, -303308032601096386, -553368999545619698, 5062218903043253310, -8121235567420561418, 935133894667055035, -4956674896797302124, 5310003984496306717, -1155160853876320906, 3649796447443623633, 5380731976542355863, -3266423073206977005, 8935070979529248350, -4101583270850253496, -7026448307529793184, 1728717941810513773, -1920969318367938065, -8219407330606302354, -795338012034994277, -374574523137341910, 4551450772185963221, -1628731017981278455, -7164926827237876166, -5127513414993962202, -4267906379878550578, -619944134428784565]

Check the cluster status again. From the output we can see that the Host ID for node1 changed from 4682088e-4a3c-4fbc-8874-054408121f0a to c62c54bf-0b85-477d-bb06-1f5d696c7fef

$ ccm node2 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  89.87 KB   32      100.0%            c62c54bf-0b85-477d-bb06-1f5d696c7fef  rack1
UN  127.0.0.2  88.69 KB   32      100.0%            b2411268-f168-485d-9abe-77874eef81ce  rack2
UN  127.0.0.3  106.66 KB  32      100.0%            8b55a1c6-f971-4e01-a34b-bb37dd55bb89  rack3

Check we can retrieve data from each node again.

for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

Cassandra 2.1.20 Results

When we run the above test against a cluster using Apache Cassandra version 2.1.20 and remove the additional data directory data1 from node1, we can see that our cql statement fails when retrieving data from node1. The error produced shows that the song_order column is unknown to the node.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

<stdin>:1:InvalidRequest: code=2200 [Invalid query] message="Undefined name song_order in selection clause"

  id          | song_order | song_id     | artist              | title
 -------------+------------+-------------+---------------------+--------------
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

  id          | song_order | song_id     | artist              | title
 -------------+------------+-------------+---------------------+--------------
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

An interesting side note: if nodetool drain is run on node1 before it is shut down, then the above error never occurs. Instead the following output appears when we run our cql statement to retrieve data from the nodes. As we can see below, the query that previously failed now returns no rows of data.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

 id | song_order | song_id | artist | title
----+------------+---------+--------+-------

(0 rows)

 id          | song_order | song_id     | artist              | title
-------------+------------+-------------+---------------------+--------------
 62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
 62c36092... |          3 | 2b09185b... |               Lorde |       Royals
 62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

 id          | song_order | song_id     | artist              | title
-------------+------------+-------------+---------------------+--------------
 62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
 62c36092... |          3 | 2b09185b... |               Lorde |       Royals
 62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

Cassandra 2.2.13 Results

When we run the above test against a cluster using Apache Cassandra version 2.2.13 and remove the additional data directory data1 from node1, we can see that the CQL statement fails when retrieving data from node1. The error produced is similar to that produced in version 2.1.20, except this time it is the id column name that is unknown.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

<stdin>:1:InvalidRequest: Error from server: code=2200 [Invalid query] message="Undefined name id in selection clause"

  id          | song_order | song_id     | artist              | title
 -------------+------------+-------------+---------------------+--------------
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

  id          | song_order | song_id     | artist              | title
 -------------+------------+-------------+---------------------+--------------
  62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
  62c36092... |          3 | 2b09185b... |               Lorde |       Royals
  62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

Unlike Cassandra version 2.1.20, node1 never generated a new Host ID or calculated new tokens. This is because it replayed the commitlog and recovered most of the writes that had gone missing.

INFO  [main] ... CommitLog.java:160 - Replaying .../node1/commitlogs/CommitLog-5-1534865605274.log, .../node1/commitlogs/CommitLog-5-1534865605275.log
WARN  [main] ... CommitLogReplayer.java:149 - Skipped 1 mutations from unknown (probably removed) CF with id 5bc52802-de25-35ed-aeab-188eecebb090
...
INFO  [main] ... StorageService.java:900 - Using saved tokens [-1986809544993962272, -2017257854152001541, -2774742649301489556, -5900361272205350008, -5936695922885734332, -6173514731003460783, -617557464401852062, -6189389450302492227, -6817507707445347788, -70447736800638133, -7273401985294399499, -728761291814198629, -7345403624129882802, -7886058735316403116, -8499251126507277693, -8617790371363874293, -9121351096630699623, 1551379122095324544, 1690042196927667551, 2403633816924000878, 337128813788730861, 3467690847534201577, 419697483451380975, 4497811278884749943, 4783163087653371572, 5213928983621160828, 5337698449614992094, 5502889505586834056, 6549477164138282393, 7486747913914976739, 8078241138082605830, 8729237452859546461]

Cassandra 3.0.15 Results

When we run the above test against a cluster using Apache Cassandra version 3.0.15 and remove the additional data directory data1 from node1, we can see that the cql statement returns no data from node1.

$ for i in $(seq 1 3); do ccm node${i} cqlsh -e "SELECT id, song_order, song_id, artist, title FROM music.playlists"; done

 id | song_order | song_id | artist | title
----+------------+---------+--------+-------

(0 rows)

 id          | song_order | song_id     | artist              | title
-------------+------------+-------------+---------------------+--------------
 62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
 62c36092... |          3 | 2b09185b... |               Lorde |       Royals
 62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

 id          | song_order | song_id     | artist              | title
-------------+------------+-------------+---------------------+--------------
 62c36092... |          2 | 8a172618... |      Birds of Tokyo |        Plans
 62c36092... |          3 | 2b09185b... |               Lorde |       Royals
 62c36092... |          1 | a3e64f8f... | Of Monsters and Men | Little Talks

(3 rows)

Cassandra 3.11.3 Results

When we run the above test against a cluster using Apache Cassandra version 3.11.3 and remove the additional data directory data1 from node1, the node fails to start and we can see the following error message in the logs.

ERROR [main] 2018-08-21 16:30:53,489 CassandraDaemon.java:708 - Exception encountered during startup
java.lang.RuntimeException: A node with address /127.0.0.1 already exists, cancelling join. Use cassandra.replace_address if you want to replace this node.
    at org.apache.cassandra.service.StorageService.checkForEndpointCollision(StorageService.java:558) ~[apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.StorageService.prepareToJoin(StorageService.java:804) ~[apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:664) ~[apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:613) ~[apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:379) [apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:602) [apache-cassandra-3.11.3.jar:3.11.3]
    at org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:691) [apache-cassandra-3.11.3.jar:3.11.3]

In this case, the cluster reports node1 as down and still shows its original Host ID.

$ ccm node2 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
DN  127.0.0.1  191.96 KiB  32      100.0%            35a3c8ff-fa20-4f10-81cd-7284caeb00bd  rack1
UN  127.0.0.2  191.82 KiB  32      100.0%            2ebe4f0b-dc8f-4f46-93cd-37c410174a49  rack2
UN  127.0.0.3  170.46 KiB  32      100.0%            0384793e-7f59-40aa-a487-97f410dded4b  rack3

After inspecting the SSTables in both data directories we can see that a new Host ID 7d910c98-f69b-41b4-988a-f432b2e54b38 has been assigned to the node even though it failed to start.

$ ./tools/bin/sstabledump .../node1/data0/system/local-7ad54392bcdd35a684174e047860b377/mc-12-big-Data.db | grep host_id
      { "name" : "host_id", "value" : "35a3c8ff-fa20-4f10-81cd-7284caeb00bd", "tstamp" : "2018-08-21T06:24:08.106Z" },


$ ./tools/bin/sstabledump .../node1/data1/system/local-7ad54392bcdd35a684174e047860b377/mc-10-big-Data.db | grep host_id
      { "name" : "host_id", "value" : "7d910c98-f69b-41b4-988a-f432b2e54b38" },

Take away messages

As we have seen from testing, there are potential dangers with using multiple directories in Cassandra. Simply removing one of the data directories from the setting can turn a node into a brand new node and affect the rest of the cluster. The JBOD feature can be useful in emergencies where disk space is urgently needed, however its usage in this case should be temporary.

The use of multiple disks in a Cassandra node is, I feel, better done at the OS or hardware layer. Systems like LVM and RAID were designed to allow multiple disks to be used together to make up a volume. Using something like LVM or RAID rather than Cassandra’s JBOD feature reduces the complexity of the Cassandra configuration and the number of moving parts on the Cassandra side that can go wrong. Using the JBOD feature in Cassandra subtly increases operational complexity and reduces the node’s ability to fail fast. In most cases, I feel it is more useful for a node to fail outright rather than limp on and potentially impact the cluster in a negative way.
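As an illustration of that alternative, below is a minimal LVM sketch that stripes two disks into a single volume presented to Cassandra as one data directory. The device names, volume group name, and filesystem are assumptions for the example:

# Combine two disks into one striped logical volume (hypothetical devices)
$ sudo pvcreate /dev/sdb /dev/sdc
$ sudo vgcreate cassandra_vg /dev/sdb /dev/sdc
$ sudo lvcreate -i 2 -l 100%FREE -n data cassandra_vg

# Format and mount it as the single Cassandra data directory
$ sudo mkfs.xfs /dev/cassandra_vg/data
$ sudo mount /dev/cassandra_vg/data /var/lib/cassandra/data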

As a final thought, I think one handy feature that could be added to Apache Cassandra to help prevent issues associated with JBOD is the ability to check whether the data, commitlog, saved_caches, and hints directories are all empty prior to bootstrapping. If they are empty, then the node proceeds as normal. If they contain data, then perhaps the node could fail to start and print an error message in the logs.

Testing Apache Cassandra 4.0

With the goal of ensuring reliability and stability in Apache Cassandra 4.0, the project’s committers have voted to freeze new features on September 1 to concentrate on testing and validation before cutting a stable beta. Towards that goal, the community is investing in methodologies that can be performed at scale to exercise edge cases in the largest Cassandra clusters. The result, we hope, is to make Apache Cassandra 4.0 the best-tested and most reliable major release right out of the gate.

In the interests of communication (and hopefully more participation), here’s a look at some of the approaches being used to test Apache Cassandra 4.0:


Replay Testing

Workload Recording, Log Replay, and Comparison

Replay testing allows for side-by-side comparison of a workload using two versions of the same database. It is a black-box technique that answers the question, “did anything change that we didn’t expect?”

Replay testing is simple in concept: record a workload, then re-issue it against two clusters – one running a stable release and the second running a candidate build. Replay testing a stateful distributed system is more challenging. For a subset of workloads, we can achieve determinism in testing by grouping writes by CQL partition and ordering them via client-supplied timestamps. This also allows us to achieve parallelism, as recorded workloads can be distributed by partition across an arbitrarily-large fleet of writers. Though linearizing updates within a partition and comparing differences does not allow for validation of all possible workloads (e.g., CAS queries), this subset is very useful.

The suite of Full Query Logging (“FQL”) tools in Apache Cassandra enable workload recording. CASSANDRA-14618 and CASSANDRA-14619 will add fqltool replay and fqltool compare, enabling log replay and comparison. Standard tools in the Apache ecosystem such as Apache Spark and Apache Mesos can also make parallelizing replay and comparison across large clusters of machines straightforward.
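As a rough sketch of the workflow, the recording side is a single nodetool call in 4.0; the replay and comparison steps below are only indicated as comments because their exact options are defined in the tickets above:

# 1. Record the production workload on the source cluster
#    (the log path here is an assumption)
$ nodetool enablefullquerylog --path /var/lib/cassandra/fql

# 2. Replay the recorded log against both the stable and the candidate cluster,
#    then diff the two result sets - see fqltool replay and fqltool compare in
#    CASSANDRA-14618 / CASSANDRA-14619 for the exact arguments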


Fuzz Testing and Property-Based Testing

Dynamic Test Generation and Fuzzing

Fuzz testing dynamically generates input to be passed through a function for validation. We can make fuzz testing smarter in stateful systems like Apache Cassandra to assert that persisted data conforms to the database’s contracts: acknowledged writes are not lost, deleted data is not resurrected, and consistency levels are respected. Fuzz testing of storage systems to validate these properties requires maintaining a record of responses received from the system; the development of a model representing valid legal states of data within the database; and a validation pass to assert that responses reflect valid states according to that model.

Property-based testing combines fuzz testing and assertions to explore a state space using randomly-generated input. These tests provide dynamic input to the system and assert that its fundamental properties are not violated. These properties can range from generic (e.g., “I can write data and read it back”) to specific (“range tombstone bounds synthesized during short-read-protection reads are properly closed”); and from local to distributed (e.g., “replacing every single node in a cluster results in an identical database”). To simplify debugging, property-based testing libraries like QuickTheories also provide a “shrinker,” which attempts to generate the simplest possible failing case after detecting input or a sequence of actions that triggers a failure.

Unlike model checkers, property-based tests don’t exhaust the state space – but explore it until a threshold of examples is reached. This allows for the computation to be distributed across many machines to gain confidence in code and infrastructure that scales with the amount of computation applied to test it.


Distributed Tests and Fault-Injection Testing

Validating Behavior Under Fault Scenarios

All of the above techniques can be combined with fault injection testing to validate that the system maintains availability where expected in fault scenarios, that fundamental properties hold, and that reads and writes conform to the system’s contracts. By asserting series of invariants under fault scenarios using different techniques, we gain the ability to exercise edge cases in the system that may reveal unexpected failures in extreme scenarios. Injected faults can take many forms – network partitions, process pauses, disk failures, and more.
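As a very small illustration, here are two blunt OS-level faults that are easy to inject by hand; addresses and PIDs are placeholders, and dedicated fault-injection tooling is preferable for real test suites:

# Simulate a network partition by dropping traffic from one peer
$ sudo iptables -A INPUT -s 192.168.1.90 -j DROP
# ...run the workload and assertions, then undo the partition...
$ sudo iptables -D INPUT -s 192.168.1.90 -j DROP

# Simulate a long process pause (e.g. a GC stall) by suspending the JVM
$ sudo kill -STOP <cassandra_pid>
$ sleep 30
$ sudo kill -CONT <cassandra_pid>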


Upgrade Testing

Ensuring a Safe Upgrade Path

Finally, it’s not enough to test one version of the database. Upgrade testing allows us to validate the upgrade path between major versions, ensuring that a rolling upgrade can be completed successfully, and that the contents of the resulting upgraded database are identical to the original. To perform upgrade tests, we begin by snapshotting a cluster and cloning it twice, resulting in two identical clusters. One of the clusters is then upgraded. Finally, we perform a row-by-row scan and comparison of all data in each partition to assert that all rows read are identical, logging any deltas for investigation. Like fault injection tests, upgrade tests can also be thought of as an operational scenario all other types of tests can be parameterized against.
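For context, the per-node step of the rolling upgrade that these tests exercise generally looks like the sketch below; the package manager, package, and service names are assumptions and will vary by installation:

# On each node, one at a time
$ nodetool drain                   # flush memtables, stop accepting writes
$ sudo systemctl stop cassandra
$ sudo yum update -y cassandra     # hypothetical: install the candidate 4.0 package
$ sudo systemctl start cassandra
$ nodetool upgradesstables         # rewrite SSTables in the new on-disk format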


Wrapping Up

The Apache Cassandra developer community is working hard to deliver Cassandra 4.0 as the most stable major release to date, bringing a variety of methodologies to bear on the problem. We invite you to join us in the effort, deploying these techniques within your infrastructure and testing the release on your workloads. Learn more about how to get involved here.

The more that join, the better the release we’ll ship together.

Java 11 Support in Apache Cassandra 4.0

At the end of July, support for Java 11 was merged into the Apache Cassandra trunk, which will be shipped in the next major release, Cassandra 4.0. Prior to this, Cassandra 3.0 only ran using Java 8, since there were breaking changes in Java that prevented it from running on later versions. Cassandra now supports both Java 8 and 11.

To run Cassandra on Java 11, you’ll first need to download an early access build of JDK 11, since there is still no official release. I downloaded a build for my Mac and untarred the archive.

Next, you’ll need to set the environment variables. On my mac I’ve set the following variables:

$ export JAVA_HOME="/Users/jhaddad/Downloads/jdk-11.jdk/Contents/Home"
$ export JAVA8_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home"

You can get Cassandra by cloning the git repo and building using ant:

$ git clone https://github.com/apache/cassandra.git
$ cd cassandra
$ ant

You should see the build script finish with something like the following:

write-poms:
   [script] Warning: Nashorn engine is planned to be removed from a future JDK release

init:

maven-ant-tasks-localrepo:

maven-ant-tasks-download:

maven-ant-tasks-init:

maven-declare-dependencies:

_write-poms:

build-test:

jar:
      [jar] Building jar: /Users/jhaddad/dev/cassandra/build/tools/lib/stress.jar

BUILD SUCCESSFUL
Total time: 7 seconds

You can now start Cassandra with the following:

$ bin/cassandra -f

One feature that could be a big deal over time is the new garbage collection algorithm, ZGC. The goal of ZGC is to work on huge heaps while maintaining low latency, 10ms or less. If it delivers on the promise, we could avoid an entire optimization process that many teams struggle with. It can be enabled with these JVM flags.

-XX:+UnlockExperimentalVMOptions
-XX:+UseZGC

To use ZGC in Cassandra 4.0, you can add the JVM flags to the cassandra-env.sh file located in the conf directory of the repository as shown below. Note that the flags are added above the JVM_OPTS="$JVM_OPTS $JVM_EXTRA_OPTS" line at the end of the file.

JVM_OPTS="$JVM_OPTS -XX:+UnlockExperimentalVMOptions"
JVM_OPTS="$JVM_OPTS -XX:+UseZGC"
JVM_OPTS="$JVM_OPTS $JVM_EXTRA_OPTS"

The Cassandra team intends to freeze the trunk branch in September, committing to bug fixes and stability improvements before releasing 4.0. We’d love feedback on the release during this period, especially with regard to performance on Java 11. We appreciate any help testing real-world workloads (in a staging environment!). Bugs can be reported to the Cassandra JIRA. We aim to make the 4.0 release stable on day one. We encourage everyone to get involved early to ensure the high quality of this important release!

Proposal for a New Cassandra Cluster Key Compaction Strategy

Cassandra storage is generally described as a log-structured merge tree (LSM). In general, LSM storage favors the speed of writes, updates, and deletes over reads. As a general rule, a write in Cassandra is an order of magnitude faster than a read. Not that reads are necessarily slow, but rather that the entire design of the server is to do writes very quickly and efficiently.

To manage data written to LSM storage, the files created by the fast writes need to be re-organized to help read efficiency and manage storage space. The process to perform this reorganization is called “compaction.” There are currently three generally available compaction strategies, each designed to optimize certain workloads.

Unfortunately, there are many workloads which don’t necessarily fit well into any of the current compaction strategies. What I hope to do here is present a convincing argument for a fourth compaction strategy which I think will fit the needs of many use cases which today are left out in the cold.

I am calling my proposed compaction strategy: Cluster Key Compaction Strategy (CKCS)

Existing strategies

Size Tiered

Size Tiered Compaction Strategy (STCS) is the default compaction strategy and it has worked for many workloads through the years Cassandra has been in existence. It is recognized as having a relatively low write amplification level and it can generally keep the total number of SSTable files reasonably low, limiting the number of SSTable files that need to be referenced to find all the parts of a partition required by a read. One of its largest drawbacks is the amount of disk space required for a compaction.

Leveled Compaction

Leveled Compaction Strategy (LCS) attempts to address the large amount of disk space required for compaction, and at the same time it also works to drastically limit the number of SSTable files required to fulfill a read from a partition to just one or two SSTable files. Its main drawback is the dramatic increase in write amplification for all data stored in LCS. With LCS, SSTable files are allowed to grow only to a predefined fixed size with the requirement that all columns of a specific partition exist in only one SSTable file at each level. When compacting from one level to the next, many SSTable files are both merged and distributed to many SSTable files.

Time Window

Time Window Compaction Strategy (TWCS) uses STCS inside of a set of predefined windows or buckets based on time to store data. It deliberately keeps partitions spread across many SSTable files. By the use of windows, the space required to perform a compaction can be reduced by up to the number of windows. For example, if the number of windows is 20, then the space required for any TWCS compaction will be no more than 1/20 of the space consumed by the table. It also results in the lowest write amplification of any of the compaction strategies. While this is not enforced, it is strongly recommended that TWCS be used only with data that is known to have a limited lifetime, preferably through the Time To Live (TTL) feature of Cassandra. TWCS was designed to store time series data where the data coming in is dividable into well-defined time chunks.

TWCS does not play well with hinted handoffs, read repairs or regular repairs, all of which can end up putting data which might belong in one window into a different window. This is not usually a problem if the data is short-lived or not of a critical nature. But that is not always the case in the real world.

Limits to existing strategies

As discussed above, each strategy has its strengths and weaknesses. Each needs to be carefully evaluated to decide which is best for your application.

STCS requires a large amount of space to perform compactions and may need many SSTable files read to find all parts of a specific row for a given partition, but it has fairly low write amplification.

LCS dramatically reduces the amount of space required for a compaction and greatly improves the likelihood that all the rows of a partition will be in the same place, but it can produce a huge number of SSTable files and it results in a massive increase in write amplification. It’s best used with workloads where reads are 90% or better of the workload.

TWCS is designed to work with time series data only. It is based on server time, having nothing at all to do with anything stored in the data itself. Like LCS, it greatly reduces the space required for compaction, and it has even better write amplification than STCS. However, it does not work well with Cassandra’s current anti-entropy mechanisms, which makes it unsuitable for some kinds of data that might otherwise fit.

Why a new strategy

In the last four years I have spent time consulting for different organizations which are using or planning to use Cassandra, and I keep finding workloads which would benefit from a compaction strategy that has features of both LCS and TWCS, and yet is still distinct from either one.

In fact, there are really two distinct use cases one could argue belong to separate strategies, but I think a single strategy could be created to fit both.

I would like to propose the Cluster Key Compaction Strategy (CKCS). In CKCS, SSTable files will be grouped together based on their cluster key values: either in a set of moving windows, much like TWCS uses, where a specific number of windows contain data expected to expire over time (limiting the total number of windows), or in a predefined set of partitions covering the entire key range. By basing the window selection on cluster key values, the windows become defined independently of current server time, allowing Cassandra anti-entropy tools to work, although this will increase write amplification and SSTable file counts over traditional TWCS. It will also allow data sets which are not time-based to benefit from the compaction space savings and partition spread that are in the nature of the current TWCS strategy.

Proposed CKCS Details

The proposed CKCS will use the first column of the cluster key to define buckets which will be used to designate groups of SSTable file sets used to store data. In order to make the definition simple, the data type of that first column will need to be fixed in width and the possible key values well understood. Small integer, integer, large integer and timestamps would be the simplest to use and not, in my opinion, an unnecessarily restricted list.

How the CKCS would work

When a table is created with CKCS one of two bucket definition parameter types will be used.

  1. Moving window variation. Two parameters are used: one defines the unit size, much like TWCS, and could be called the unit. A unit can be a timeframe (seconds, minutes, hours, days) or it can be a number scale (ones, tens, hundreds, thousands, millions). The second parameter is the window size in units. With the moving window variation, it is assumed that all data written to the table will eventually expire and the number of windows will therefore be limited based on the lifetime of data stored in the table.
  2. Static window variation. One parameter is used: the static window variation assumes long-lived data which is to be spread into multiple windows, or buckets, based on the value of the entire contents of the cluster key column. With this variation, the window size is not specified by the user. Instead, the number of windows or buckets is specified. Cassandra will compute the “size” by taking the maximum absolute value range of the column and dividing by the number of desired windows or buckets (a small sketch of this follows the list).
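
To make that static window bucket computation concrete, here is a small, purely illustrative sketch. CKCS is only a proposal, so none of this exists in Cassandra; an integer cluster key column is assumed as the example of a fixed-width type.

public class StaticWindowBuckets {

    // Hypothetical: map an integer cluster key column to one of `numBuckets`
    // windows by dividing the column's full value range into equal slices.
    static int bucketFor(int clusterKeyValue, int numBuckets) {
        // Width of each bucket across the whole int range.
        long range = (long) Integer.MAX_VALUE - Integer.MIN_VALUE + 1;
        long bucketWidth = range / numBuckets;
        long offset = (long) clusterKeyValue - Integer.MIN_VALUE;
        return (int) Math.min(offset / bucketWidth, numBuckets - 1);
    }

    public static void main(String[] args) {
        int numBuckets = 20;
        System.out.println(bucketFor(Integer.MIN_VALUE, numBuckets)); // 0
        System.out.println(bucketFor(0, numBuckets));                 // 10
        System.out.println(bucketFor(Integer.MAX_VALUE, numBuckets)); // 19
    }
}

The moving window variation would instead derive its bucket from the cluster key value divided by the unit size, much as TWCS groups SSTables into time windows.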

In both approaches, when an SSTable is flushed to disk, behavior is normal. When enough SSTable files have been flushed, as defined by a compaction threshold variable, instead of compacting the SSTable files together, the data in the existing SSTable files will be distributed to a single SSTable file in each window or bucket. For normal operation of the moving window variation, this will look much like the first compaction in TWCS and probably result in an actual compaction. For the static window variation, this will cause the data in the tables to be distributed out, creating more rather than fewer SSTable files.

After data is distributed to a defined SSTable file window or bucket, compaction proceeds using Size Tiered compaction within that window or bucket. To allow efficient queries based on cluster key ranges, the range of cluster key values for a specific SSTable file will be stored as a part of the SSTable file’s metadata in its statistics file.

Benefits

This new compaction strategy will have benefits over TWCS and may well succeed it as the primary time series compaction strategy, as it avoids many of the current issues with TWCS and its predecessor DTCS. In addition, this new strategy will bring some of the benefits of TWCS to database workloads which are not time series in nature.

Large partitions

Large partitions under both STCS and LCS cause significant extra work during compaction. By spreading the partition data out over a number of windows or buckets, partitions can become significantly larger before having the heap and CPU impact on Cassandra during compaction that large partitions do today.

Dealing with anti-entropy

Currently, Cassandra anti-entropy mechanisms tend to work at cross purposes with both TWCS and DTCS and often make it necessary to turn them off to avoid pushing data into the wrong windows. It is also impossible to reload existing data or add a new DC or even a new host without disrupting the windowing.

CKCS will ensure data gets put into the correct windows even with anti-entropy running. It will also allow maintenance activities, including data reloads and adding a new DC or a new host to an existing DC, to store data into the correct window.

What CKCS won’t be able to do is ensure a final window compaction since there is never a certain final point in time for a given window. A “final” compaction is still likely to be a good idea; it just won’t ensure that all data will be in a single SSTable file for the window.

Compaction space savings

For both modes, moving window and static window, the compaction space savings will be comparable to what can be accomplished with TWCS or DTCS.

Write amplification

Write amplification benefits should be similar to TWCS for the moving window mode, as long as writes take place during the actual time windows and anti-entropy is not generating significant out-of-window writes. In static window mode, write amplification should be similar to standard STCS, but the number of compactions increases while their sizes decrease, making the overall I/O workload somewhat less spiky.

 



Apache Cassandra Performance Tuning - Compression with Mixed Workloads

This is the third post in our series on performance tuning with Apache Cassandra. In our first post, we discussed how we can use Flame Graphs to visually diagnose performance problems. In our second post, we discussed JVM tuning, and how different JVM settings can have an effect on different workloads.

In this post, we’ll dig into a table-level setting which is usually overlooked: compression. Compression options can be specified when creating or altering a table, and compression defaults to enabled if not specified. The default is great for write-heavy workloads, but can become a problem on read-heavy and mixed workloads.

Before we get into optimizations, let’s take a step back to understand the basics of compression in Cassandra. Once we’ve built a foundation of knowledge, we’ll see how to apply it to real world workloads.

How it works

When we create a table in Cassandra, we can specify a variety of table options in addition to our fields. In addition to options such as using TWCS for our compaction strategy, specifying gc grace seconds, and caching options, we can also tell Cassandra how we want it to compress our data. If the compression option is not specified, LZ4Compressor will be used, which is known for its excellent performance and compression rate. In addition to the algorithm, we can specify our chunk_length_in_kb, which is the size of the uncompressed buffer we write our data to as an intermediate step before writing to disk. Here’s an example of a table using LZ4Compressor with a 64KB chunk length:

create table sensor_data ( 
    id text primary key, 
    data text) 
WITH compression = {'sstable_compression': 'LZ4Compressor', 
                    'chunk_length_kb': 64};

We can examine how well compression is working at the table level by checking tablestats:

$ bin/nodetool tablestats tlp_stress

Keyspace : tlp_stress
        Read Count: 89766
        Read Latency: 0.18743983245326737 ms
        Write Count: 8880859
        Write Latency: 0.009023213069816781 ms
        Pending Flushes: 0
                Table: sensor_data
                SSTable count: 5
                Old SSTable count: 0
                Space used (live): 864131294
                Space used (total): 864131294
                Off heap memory used (total): 2472433
                SSTable Compression Ratio: 0.8964684393508305
                Compression metadata off heap memory used: 140544

The SSTable Compression Ratio line above tells us how effective compression is. Compression ratio is calculated by the following:

compressionRatio = (double) compressed/uncompressed;

meaning the smaller the number, the better the compression. In the above example our compressed data is taking up almost 90% of the original data, which isn’t particularly great.

How data is written

I’ve found digging into the codebase, profiling and working with a debugger to be the most effective way to learn how software works.

When data is written to or read from SSTables, we’re not dealing with convenient typed objects; we’re dealing with streams of bytes. Our compressed data is written in the CompressedSequentialWriter class, which extends BufferedDataOutputStreamPlus. This writer uses a temporary buffer. When the data is written out to disk, the buffer is compressed and some metadata about it is recorded in a CompressionInfo file. If there is more data than available space in the buffer, the buffer is filled, flushed, and then starts fresh to be written to (and perhaps flushed) again. You can see this in org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java:

@Override
public void write(byte[] b, int off, int len) throws IOException
{
    if (b == null)
        throw new NullPointerException();

    // avoid int overflow
    if (off < 0 || off > b.length || len < 0
        || len > b.length - off)
        throw new IndexOutOfBoundsException();

    if (len == 0)
        return;

    int copied = 0;
    while (copied < len)
    {
        if (buffer.hasRemaining())
        {
            int toCopy = Math.min(len - copied, buffer.remaining());
            buffer.put(b, off + copied, toCopy);
            copied += toCopy;
        }
        else
        {
            doFlush(len - copied);
        }
    }
}

The size of this buffer is determined by chunk_length_in_kb.

How data is read

The read path in Cassandra is (more or less) the opposite of the write path. We pull chunks out of SSTables, decompress them, and return them to the client. The full path is a little more complex - there’s a ChunkCache (managed by caffeine) that we go through, but that’s beyond the scope of this post.

During the read path, the entire chunk must be read and decompressed. We’re not able to selectively read only the bytes we need. The impact of this is that if we are using 4K chunks, we can get away with only reading 4K off disk. If we use 256KB chunks, we have to read the entire 256K. This might be fine for a handful of requests but when trying to maximize throughput we need to consider what happens when we have requests in the thousands per second. If we have to read 256KB off disk for ten thousand requests a second, we’re going to need to read 2.5GB per second off disk, and that can be an issue no matter what hardware we are using.
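As a quick sanity check, the arithmetic from the paragraph above can be written out directly; the request rate and chunk size below simply mirror that example.

public class ChunkReadMath {
    public static void main(String[] args) {
        long requestsPerSecond = 10_000;
        long chunkBytes = 256 * 1024; // 256 KB chunks

        // Each read decompresses at least one whole chunk, so required disk
        // throughput scales with chunk size times request rate.
        long bytesPerSecond = requestsPerSecond * chunkBytes;
        System.out.printf("%.2f GB/s of reads just to serve requests%n",
                bytesPerSecond / 1e9); // roughly 2.6 GB/s
    }
}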

What about page cache?

Linux will automatically leverage any RAM that’s not being used by applications to keep recently accessed filesystem blocks in memory. We can see how much page cache we’re using by using the free tool:

$ free -mhw
              total        used        free      shared     buffers       cache   available
Mem:            62G        823M         48G        1.7M        261M         13G         61G
Swap:          8.0G          0B        8.0G

Page cache can be a massive benefit if you have a working data set that fits in memory. With smaller data sets this is incredibly useful, but Cassandra was built to solve big data problems. Typically that means having a lot more data than available RAM. If our working data set on each node is 2 TB, and we only have 20-30 GB of free RAM, it’s very possible we’ll serve almost none of our requests out of cache. Yikes.

Ultimately, we need to ensure we use a chunk length that allows us to minimize our I/O. Larger chunks can compress better, giving us a smaller disk footprint, but we end up needing more hardware, so the space savings become meaningless for certain workloads. There’s no perfect setting that we can apply to every workload. Generally, the more reads you do, the smaller the chunk size should be. Even this doesn’t apply uniformly; larger requests will hit more chunks and will benefit from a larger chunk size.

The Benchmarks

Alright - enough with the details! We’re going to run a simple benchmark to test how Cassandra performs with a mix of read and write requests with a simple key value data model. We’ll be doing this using our stress tool, tlp-stress (commit 40cb2d28fde). We will get into the details of this stress tool in a later post - for now all we need to cover is that it includes a key value workload out of the box we can leverage here.

For this test I installed Apache Cassandra 3.11.3 on an AWS c5d.4xlarge instance running Ubuntu 16.04 following the instructions on cassandra.apache.org, and updated all the system packages using apt-get upgrade. I’m only using a single node here in order to isolate the compression settings and not introduce noise from the network overhead of running a full cluster.

The ephemeral NVMe disk uses XFS and is mounted at /var/lib/cassandra. I set readahead to zero using blockdev --setra 0 /dev/nvme1n1 so we can see the impact that compression has on our disk requests and not hide it with page cache.

For each workload, I put the following command in a shell script, and ran tlp-stress from a separate c5d.4xlarge instance (passing the chunk size as the first parameter):

$ bin/tlp-stress run KeyValue -i 10B -p 10M --populate -t 4 \
  --replication "{'class':'SimpleStrategy', 'replication_factor':1}" \
  --field.keyvalue.value='book(100,200)' -r .5  \
  --compression "{'chunk_length_in_kb': '$1', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}" \
  --host 172.31.42.30

This runs a key value workload across 10 million partitions (-p 10M), pre-populating the data (--populate), with 50% reads (-r .5), picking 100-200 words from one of the books included in the stress tool (--field.keyvalue.value='book(100,200)'). We can specify a compression strategy using --compression.

For the test I’ve used slightly modified Cassandra configuration files to reduce the effect of GC pauses by increasing the total heap (12GB) as well as the new gen (6GB). I spent only a small amount of time on this, as optimizing it perfectly isn’t necessary. I also set compaction throughput to 160.

For the test, I monitored the JVM’s allocate rate using the Swiss Java Knife (sjk-plus) and disk / network / cpu usage with dstat.

Default 64KB Chunk Size

The first test used the default of 64KB chunk length. I started the stress command and walked away to play with my dog for a bit. When I came back, I was through about 35 million requests:

stress 64kb

You can see in the above screenshot our 5 minute rate is about 22K writes / second and 22K reads / second. Looking at the output of dstat at this time, we can see we’re doing between 500 and 600 MB / second of reads:

DStat 64KB

Memory allocation fluctuated a bit, but it hovered around 1GB/s:

sjk 4kb

Not the most amazing results in the world. Of the disk reads, some of that throughput can be attributed to compaction, which we’ll always have to contend with in the real world. That’s capped at 160MB/s, leaving around 400MB/s to handle reads. That’s a lot considering we’re only sending 25MB across the network. That means we’re doing over 15x as much disk I/O as network I/O. We are very much disk bound in this workload.

4KB Chunk Size

Let’s see if the 4KB chunk size does any better. Before the test I shut down Cassandra, cleared the data directory, and started things back up. I ran the same stress test above using the above shell script, passing 4 as the chunk size. I once again played fetch with my dog for a bit and came back after around the same time as the previous test.

Looking at the stress output, it’s immediately obvious there’s a significant improvement:

stress

In almost every single metric reported by the metric library the test with 4KB outperforms the 64KB test. Our throughput is better (62K ops / second vs 44K ops / second in the 1 minute rate), and our p99 for reads is better (13ms vs 24ms).

If we’re doing less I/O on each request, how does that impact our total disk and network I/O?

dstat 4kb

As you can see above, there’s a massive improvement. Disk I/O is significantly reduced from making smaller (but more) requests to disk, and our network I/O is significantly higher from responding to more requests.

sjk 4kb

It was initially a small surprise to see an increased heap allocation rate (because we’re reading WAY less data into memory), but this is simply the result of doing a lot more requests. There are a lot of objects created in order to satisfy a request; far more than the number created to read the data off disk. More requests results in higher allocation. We’d want to ensure those objects don’t make it into the Old Gen as we go through JVM tuning.

Off Heap Memory Usage

The final thing to consider here is off-heap memory usage. Alongside each compressed SSTable is compression metadata. The compression files have names like na-9-big-CompressionInfo.db. The compression metadata is stored in memory, off the Cassandra heap. The size of the off-heap usage is directly proportional to the number of chunks used. More chunks = more space used. More chunks are used when a smaller chunk size is used, hence more off-heap memory is used to store the metadata for each chunk. It’s important to understand this trade-off. A table using 4KB chunks will use 16 times as much memory for this metadata as one using 64KB chunks.
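A rough back-of-the-envelope sketch shows how quickly the chunk count, and therefore the metadata that the paragraph above says is proportional to it, grows as the chunk size shrinks; the 1 TB figure is just an assumed example.

public class ChunkMetadataEstimate {
    public static void main(String[] args) {
        long dataBytes = 1_000L * 1024 * 1024 * 1024; // ~1 TB of SSTable data

        long chunks64k = dataBytes / (64 * 1024);
        long chunks4k  = dataBytes / (4 * 1024);

        // Off-heap compression metadata is proportional to the chunk count,
        // so shrinking chunks from 64 KB to 4 KB multiplies it by 16.
        System.out.println("64 KB chunks: " + chunks64k);       // 16,384,000
        System.out.println("4 KB chunks:  " + chunks4k);        // 262,144,000
        System.out.println("ratio: " + (chunks4k / chunks64k)); // 16
    }
}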

In the example I used above the memory usage can be seen as follows:

Compression metadata off heap memory used: 140544 

Changing Existing Tables

Now that you can see how a smaller chunk size can benefit read heavy and mixed workloads, it’s time to try it out. If you have a table you’d like to change the compression setting on, you can do the following at the cqlsh shell:

cqlsh:tlp_stress> alter table keyvalue with compression = {'sstable_compression': 'LZ4Compressor', 'chunk_length_kb': 4};

New SSTables that are written after this change is applied will use this setting, but existing SSTables won’t be rewritten automatically. Because of this, you shouldn’t expect an immediate performance difference after applying this setting. If you want to rewrite every SSTable immediately, you’ll need to do the following:

nodetool upgradesstables -a tlp_stress keyvalue

Conclusion

The above is a single test demonstrating how tuning compression settings can affect Cassandra performance in a significant way. Using out-of-the-box settings for compression on read-heavy or mixed workloads will almost certainly put unnecessary strain on your disk while hurting your read performance. I highly recommend taking the time to understand your workload and analyze your system resources to understand where your bottleneck is, as there is no absolute correct setting to use for every workload.

Keep in mind the tradeoff between memory and chunk size as well. When working with a memory constrained environment it may seem tempting to use 4KB chunks everywhere, but it’s important to understand that it’ll use more memory. In these cases, it’s a good idea to start with smaller tables that are read from the most.

Hardware-bound Zero Copy Streaming in Apache Cassandra 4.0

Streaming in Apache Cassandra powers host replacement, range movements, and cluster expansions. Streaming plays a crucial role in the cluster, and as such its performance is key not only to the speed of the operations it’s used in but to the cluster’s health generally. In Apache Cassandra 4.0, we have introduced an improved streaming implementation that reduces GC pressure and increases throughput severalfold; in some cases it is now limited only by disk / network I/O (see CASSANDRA-14556).

Fig 1. Cassandra Streaming

To get an understanding of the impact of these changes, let’s first have a look at the current streaming code path. The diagram above illustrates the stream session setup when a node attempts to stream data from a peer. Let’s say we have a 3 node cluster (nodes A, B, C). Node C is being rebuilt and has to stream all data that it is responsible for from A & B. C sets up a streaming session with each of its peers (see CASSANDRA-4560 for how Cassandra applies Ford-Fulkerson to optimize streaming peers). It exchanges messages to request ranges and begins streaming data from the selected nodes.

During the streaming phase, A collects all SSTables that have partitions in the requested ranges. It streams each SSTable by serializing individual partitions. Upon receiving the partition, node C reifies the data in memory and then writes it to disk. This is necessary to accurately transfer partitions from all possible SSTables for the requested ranges. This streaming path generates garbage and could be avoided in scenarios where all partitions within the SSTable need to be transmitted. This is common when you’re using LeveledCompactionStrategy or have enabled partitioning SSTables by token range (See: CASSANDRA-6696), etc.

To solve this problem CASSANDRA-14556 adds a Zero Copy streaming path. This significantly speeds up the transfer of SSTables and reduces garbage and unnecessary object creation. It modifies the streaming path to add additional information into the streaming header and uses ZeroCopy APIs to transfer bytes to and from the network and disk. So now, an SSTable may be transferred using this strategy when Cassandra detects that a complete SSTable needs to be transferred.
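For readers curious what a zero-copy transfer looks like at the JDK level, the snippet below illustrates the underlying primitive, FileChannel.transferTo. It is only an illustration of that primitive with placeholder paths and addresses, not Cassandra’s actual streaming code.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        // Placeholder SSTable path and peer address, for illustration only.
        try (FileChannel sstable = FileChannel.open(
                     Paths.get("/var/lib/cassandra/data/ks/tbl/na-1-big-Data.db"),
                     StandardOpenOption.READ);
             SocketChannel peer = SocketChannel.open(
                     new InetSocketAddress("10.0.0.2", 7000))) {

            // transferTo asks the kernel to move bytes from the file to the
            // socket directly, avoiding copies into JVM heap buffers (and the
            // garbage those copies would generate).
            long position = 0;
            long size = sstable.size();
            while (position < size) {
                position += sstable.transferTo(position, size - position, peer);
            }
        }
    }
}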

How do I use this feature?

It just works. This feature is controlled using stream_entire_sstables in cassandra.yaml and is enabled by default. Even though this feature is enabled, it will respect the throttling limits as defined by stream_throughput_outbound_megabits_per_sec.

Impact

Cassandra can now stream SSTables bounded only by hardware limitations (network and disk I/O). With this optimization, we hope to make Cassandra more performant and reliable.

Microbenchmarking this feature shows a marked improvement (higher is better). Block Stream Writers are the ZeroCopy writers and Partial Stream Writers are the existing writers.

Benchmark Mode Cnt Score Error Units
ZeroCopyStreamingBenchmark.blockStreamReader thrpt 10 20.119 ± 1.300 ops/s
ZeroCopyStreamingBenchmark.blockStreamWriter thrpt 10 1339.672 ± 352.242 ops/s
ZeroCopyStreamingBenchmark.partialStreamReader thrpt 10 0.590 ± 0.135 ops/s
ZeroCopyStreamingBenchmark.partialStreamWriter thrpt 10 17.556 ± 0.323 ops/s

Conclusion

If you’re a Cassandra user, we would love to hear back from you. Please send us feedback via user Mailing List, Jira, or IRC (or any combination of the three).

How to migrate data from Cassandra to Elassandra in Docker containers

A client recently asked us to migrate a Cassandra cluster running in Docker containers to Elassandra, with the data directory persisted via a bind mount. Elassandra is a fork of Cassandra integrated closely with Elasticsearch, to allow for a highly scalable search infrastructure.

To prepare the maintenance plan, we tested some of the methods as shown below.

The following are the commands used if you would like to test the process locally. Docker commands are used on one node at a time throughout the process to execute test statements. The Cassandra container is named my_cassandra_container, and the test Elassandra container is called my_elassandra_container. Replace the local directory /Users/youruser below as appropriate.

Start the Cassandra Container

First, start a container with the latest Cassandra version (3.11.2), binding the data volume locally as datadir.

In our use case, variables such as data center were pre-determined, but note that Cassandra and Elassandra have different default values in the container startup scripts for some of the variables. In the example below, data center, rack, snitch, and token number will be set explicitly via environment variable flags (-e), but you can alternatively adjust these in the configuration files before starting Elassandra.

It will take about 15 seconds for this to start up before Cassandra is ready to accept the write statement following this. If you're following the logs, look for "Created default superuser role 'cassandra'" before proceeding.

docker run --name my_cassandra_container -e CASSANDRA_DC=DC1 -e CASSANDRA_RACK=RAC1 -e CASSANDRA_ENDPOINT_SNITCH=SimpleSnitch  -e CASSANDRA_NUM_TOKENS=8 -v /Users/youruser/mytest/datadir:/var/lib/cassandra -d cassandra:latest

Copy Configuration Files

Copy the Cassandra configuration files to a local location for ease of editing.

docker cp my_cassandra_container:/etc/cassandra/ /Users/youruser/mytest/cassandra

Create and Validate Test Data

Next, create some data in Cassandra using cassandra-stress as a data generator.

docker exec -it my_cassandra_container cassandra-stress write n=20000 -pop seq=1..20000 -rate threads=4

For comparison later, do a simple validation of the data by executing count and sample queries.

docker exec -it my_cassandra_container cqlsh -e "select count(*) from keyspace1.standard1"
docker exec -it my_cassandra_container cqlsh -e "select * from keyspace1.standard1 limit 1"

Stop and Remove the Cassandra Container

To prepare for the migration, stop Cassandra and remove the container.

docker exec -it my_cassandra_container nodetool flush
docker stop my_cassandra_container
docker rm my_cassandra_container

Install Elassandra Container

On a new container, install the latest Elassandra version using the same local data and configuration file paths as above. Again, it will take 15 seconds or so before the next statement can be run. If you are following the logs, look for “Elassandra started.”

docker run --name my_elassandra_container -e CASSANDRA_DC=DC1 -e CASSANDRA_RACK=RAC1 -e CASSANDRA_ENDPOINT_SNITCH=SimpleSnitch  -e CASSANDRA_NUM_TOKENS=8 -v /Users/youruser/mytest/datadir:/var/lib/cassandra -d strapdata/elassandra:latest

Validate Data

Now that Elassandra is running, re-validate the data. Note that at this point, only the fork of Cassandra is running, not integrated yet with Elasticsearch.

docker exec -it my_elassandra_container cqlsh -e "select count(*) from keyspace1.standard1"
docker exec -it my_elassandra_container cqlsh -e "select * from keyspace1.standard1 limit 1"

Repeat the above steps on remaining nodes.

Enable Elasticsearch

To enable the Elasticsearch part of Elassandra, stop Cassandra on all nodes. A rolling update does not work for this step. Enable Elasticsearch by updating the elasticsearch.yml configuration file as below. (Note that you have linked it to your local filesystem via the cp statement, so edit it directly on your local machine.)

docker stop my_elassandra_container
docker cp my_elassandra_container:/opt/elassandra-6.2.3.1/conf/elasticsearch.yml /Users/youruser/mytest/cassandra

vi /Users/youruser/mytest/cassandra/elasticsearch.yml

cluster.name: Test Cluster  ## Name of cluster
network.host: 172.17.0.2  ## Listen address
http.port: 9200

Restart and Validate Elassandra

Finally, restart and test the Elassandra container.

docker start my_elassandra_container

docker exec -it my_elassandra_container curl -X GET http://localhost:9200/

Sample output:

Elassandra GET Output

Thank you to Valerie Parham-Thompson for assistance in testing.

Cassandra CQL Cheatsheet

Every now and then I find myself looking for a couple of commands I do often. In some other software/technologies we sometimes find a thing called a “cheatsheet” that displays the more used (and some more obscure commands) of that software/technology.

I tried to find one for CQL and since I didn’t find one… I created a CQL one! This is not an extensive, exhaustive list; it’s just the commands I tend to use the most, and related ones.

Suggestions are accepted! Leave your suggestions in the comments below!

Also, Printable version coming soon!

DISCLAIMER: This is for the latest Cassandra version (3.11.2)

Without further conversation, here it is:

CQLSH Specific

Login

$ cqlsh [node_ip] -u username -p password

Use Color

$ cqlsh [node_ip] -C -u username -p password

Execute command

$ cqlsh -e 'describe cluster'

Execute from file

$ cqlsh -f cql_commands.cql

Set consistency

$cqlsh> CONSISTENCY QUORUM ;

Run commands from file

$cqlsh> SOURCE '/home/cjrolo/cql_commands.cql' ;

Capture output to file

$cqlsh> CAPTURE '/home/cjrolo/cql_output.cql' ;

Enable Tracing

$cqlsh> TRACING ON;

Vertical Printing of Rows

$cqlsh> EXPAND ON;

Print tracing session

$cqlsh> SHOW SESSION 898de000-6d83-11e8-9960-3d86c0173a79;

Full Reference:

https://cassandra.apache.org/doc/latest/tools/cqlsh.html

 

CQL Commands

Create Keyspace

CREATE KEYSPACE carlos WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};

Alter Keyspace

ALTER KEYSPACE carlos WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

Drop Keyspace

DROP KEYSPACE carlos;

Create Table

CREATE TABLE carlos.foobar (

  foo int PRIMARY KEY,

  bar int

);

Alter Table

ALTER TABLE carlos.foobar WITH compaction = { 'class' : 'LeveledCompactionStrategy'} AND read_repair_chance = 0;

Drop Table

DROP TABLE foobar;

Create Role

CREATE ROLE admins WITH LOGIN = true AND SUPERUSER = true;

Create User

CREATE USER carlos WITH PASSWORD 'some_password' NOSUPERUSER;

Assign Role

GRANT admins TO carlos;

Revoke Role

REVOKE admins FROM carlos;

List Roles

LIST ROLES;

Use Keyspace

USE carlos;

Insert

INSERT INTO foobar (foo, bar) VALUES (0, 1);

Insert with TTL

INSERT INTO foobar (foo, bar) VALUES (1, 2) USING TTL 3600;

Update

UPDATE foobar SET bar = 42 WHERE foo = 1;

Select

SELECT * FROM foobar WHERE foo=0;

Delete

DELETE FROM foobar WHERE foo = 1;

Full Reference

https://cassandra.apache.org/doc/latest/cql/index.html

Cassandra backups using nodetool

Cassandra nodetool provides several types of commands to manage your Cassandra cluster. See my previous posts for an orientation to Cassandra nodetool and using nodetool to get Cassandra information. My colleague has provided an in-depth analysis of backup strategies in Cassandra that you can review to learn more about ways to minimize storage cost and time-to-recovery, and to maximize performance. Below I will cover the nodetool commands used in scripting these best practices for managing Cassandra full and incremental backups.

Snapshots

The basic way to backup Cassandra is to take a snapshot. Since sstables are immutable, and since the snapshot command flushes data from memory before taking the snapshot, this will provide a complete backup.

Use nodetool snapshot to take a snapshot of sstables. You can specify a particular keyspace as an optional argument to the command, like nodetool snapshot keyspace1. This will produce a snapshot for each table in the keyspace, as shown in this sample output from nodetool listsnapshots:

Snapshot Details:
Snapshot name Keyspace name Column family name True size Size on disk
1528233451291 keyspace1 standard1 1.81 MiB 1.81 MiB
1528233451291 keyspace1 counter1 0 bytes 864 bytes

The first column is the snapshot name, to refer to the snapshot in other nodetool backup commands. You can also specify tables in the snapshot command.

The output at the end of the list of snapshots — for example, Total TrueDiskSpaceUsed: 5.42 MiB — shows, as the name suggests, the actual size of the snapshot files, as calculated using the walkFileTree Java method. Verify this by adding up the files within each snapshots directory under your data directory keyspace/tablename (e.g., du -sh /var/lib/cassandra/data/keyspace1/standard1*/snapshots).

To make the snapshots more human readable, you can tag them. Running nodetool snapshot -t 2018June05b_DC1C1_keyspace1 keyspace1 results in a more obvious snapshot name as shown in this output from nodetool listsnapshots:

2018June05b_DC1C1_keyspace1 keyspace1 standard1 1.81 MiB 1.81 MiB
2018June05b_DC1C1_keyspace1 keyspace1 counter1 0 bytes 864 bytes

However, if you try to use a snapshot name that exists, you’ll get an ugly error:

error: Snapshot 2018June05b_DC1C1_keyspace1 already exists.
-- StackTrace --
java.io.IOException: Snapshot 2018June05b_DC1C1_keyspace1 already exists....

The default snapshot name is already a timestamp (number of milliseconds since the Unix epoch), but it’s a little hard to read. You could get the best of both worlds by doing something like (depending on your operating system): nodetool snapshot -t keyspace1_$(date +"%s") keyspace1. I like how the results of listsnapshots sort that way, too. In any case, with inevitable snapshot automation, the human-readable factor becomes largely irrelevant.

You may also see snapshots in this listing that you didn’t take explicitly. By default, auto_snapshot is turned on in the cassandra.yaml configuration file, causing a snapshot to be taken anytime a table is truncated or dropped. This is an important safety feature, and it’s recommended that you leave it enabled. Here’s an example of a snapshot created when a table is truncated:

cqlsh> truncate keyspace1.standard1;

root@DC1C1:/# nodetool listsnapshots
Snapshot Details:
Snapshot name Keyspace name Column family name True size Size on disk
truncated-1528291995840-standard1 keyspace1 standard1 3.57 MiB 3.57 MiB

To preserve disk space (or cost), you will want to eventually delete snapshots. Use nodetool clearsnapshot with the -t flag and the snapshot name (recommended, to avoid deleting all snapshots). Specifying -- and the keyspace name will additionally filter the deletion to the keyspace specified. For example, nodetool clearsnapshot -t 1528233451291 -- keyspace1 will remove just the two snapshot files listed above, as reported in this sample output:

Requested clearing snapshot(s) for [keyspace1] with snapshot name [1528233451291]

Note that if you forget the -t flag or the -- delimiter you will get undesired results. Without the -t flag, the command will not read the snapshot name, and without the delimiter, you will end up deleting all snapshots for the keyspace. Check syntax carefully.

The sstables are not tied to any particular instance of Cassandra or server, so you can pass them around as needed. (For example, you may need to populate a test server.) If you put an sstable in your data directory and run nodetool refresh, it will load into Cassandra. Here’s a simple demonstration:

cqlsh> truncate keyspace1.standard1

cp /var/lib/cassandra/data/keyspace1/standard1-60a1a450690111e8823fa55ed562cd82/snapshots/keyspace1_1528236376/* /var/lib/cassandra/data/keyspace1/standard1-60a1a450690111e8823fa55ed562cd82/

cqlsh> select * from keyspace1.standard1 limit 1;

key | C0 | C1 | C2 | C3 | C4
-----+----+----+----+----+----
(0 rows)

nodetool refresh keyspace1 standard1

cqlsh> select count(*) from keyspace1.standard1;
count
7425

This simple command has obvious implications for your backup and restore automation.

Incrementals

Incremental backups are taken automatically — generally more frequently than snapshots are scheduled — whenever sstables are flushed from memory to disk. This provides a more granular point-in-time recovery, as needed.

There’s not as much operational fun to be had with incremental backups. Use nodetool statusbackup to show if they are Running or not. By default, unless you’ve changed the cassandra.yaml configuration file, they will be not running. Turn them on with nodetool enablebackup and turn them off with nodetool disablebackup.

A nodetool listbackups command doesn’t exist, but you can view the incremental backups in the data directory under keyspace/table/backups. The backups/snapshots nomenclature is truly confusing, but you could think of snapshots as something you do, and backups as something that happens.

Restoring from incrementals is similar to restoring from a snapshot — copying the files and running nodetool refresh — but incrementals require a snapshot.

These various nodetool commands can be used in combination in scripts to automate your backup and recovery processes. Don’t forget to monitor disk space and clean up the files created by your backup processes.

Remember that if you’d like to try out these commands locally, you can use the ccm tool or the Cassandra Docker cluster here.

Backup Strategies in Cassandra

Cassandra is a distributed, decentralized, fault-tolerant system. Data is replicated across multiple nodes and, potentially, across multiple data centers. The fact that Cassandra is decentralized means that it can survive single or even multi-node failures without losing any data. With Cassandra, there is no single point of failure, making Cassandra a highly available database.

As long as there is one node containing the data, Cassandra can recover the data without resorting to an external backup. If set up right, Cassandra will be able to handle disk or other hardware failures even in the case of an entire data center going down.

However, Cassandra backups are still necessary to recover from the following scenarios:

  1. Errors made in data updates by client applications
  2. Accidental deletions
  3. Catastrophic failures that require the entire cluster to be rebuilt
  4. Data corruption
  5. A desire to roll back the cluster to a previous known good state

 

Setting up a Backup Strategy

When setting up your backup strategy, you should consider some points:

  • Secondary storage footprint: Backup footprints can be much larger than the live database setup depending on the frequency of backups and retention period. It is therefore vital to create an efficient storage solution that decreases storage CAPEX (capital expenditure) as much as possible.
  • Recovery point objective (RPO):  The maximum targeted period in which data might be lost from service due to a significant incident.
  • Recovery time objective (RTO): The targeted duration of time and a Service Level Agreement within which a backup must be restored after a disaster/disruption to avoid unacceptable consequences associated with a break in business continuity.
  • Backup performance: The backup performance should be sufficient to at least match the data change rate in the Cassandra cluster.

 

 

Backup Alternatives

 

Snapshot-Based Backups

The purpose of a snapshot is to make a copy of all or part of keyspaces and tables in a node and to save it into a separate file. When you take a snapshot, Cassandra first performs a flush to push any data residing in the memtables into the disk (SStables), and then makes a hard link to each SSTable file.

Each snapshot contains a manifest.json file that lists the SSTable files included in the snapshot to make sure that the entire contents of the snapshot are present.

Nodetool snapshot operates at the node level, meaning that you will need to run it at the same time on multiple nodes.

 

Incremental Backups

When incremental backups are enabled, Cassandra creates backups as part of the process of flushing SSTables to disk. The backup consists of a hard link to each data file that is stored in a backup directory. In Cassandra, incremental backups contain only new SSTable files, making them dependent on the last snapshot created. Files created due to compaction are not hard linked.

 

Incremental Backups in Combination with Snapshots

By combining both methods, you can achieve a better granularity of the backups. Data is backed up periodically via the snapshot, and incremental backup files are used to obtain granularity between scheduled snapshots.

 

Commit Log Backup in Combination with Snapshot

This approach is a similar method to the incremental backup with snapshots. Rather than relying on incremental backups to backup newly added SStables, commit logs are archived. As with the previous solution, snapshots provide the bulk of backup data, while the archive of commit log is used for point-in-time backup.

 

CommitLog Backup in Combination with Snapshot and Incremental

In addition to incremental backups, commit logs are archived. This process relies on a feature called Commitlog Archiving. As with the previous solution, snapshots provide the bulk of the backup data, incrementals complement them, and the commit log archive is used for point-in-time backup.

Due to the nature of commit logs, it is not possible to restore commit logs to a different node other than the one it was backed up from. This limitation restricts the scope of restoring commit logs in case of catastrophic hardware failure. (And a node is not fully restored, only its data.)

 

Datacenter Backup

With this setup, Cassandra will stream data to the backup as it is added. This mechanism prevents cumbersome snapshot-based backups requiring files stored on a network. However, this will not protect from a developer mistake (e.g., deletion of data), unless there is a time buffer between both data centers.

 

 

Backup Options Comparison

 

Snapshot-based backups

  Advantages:
    • Simple to manage: Requires a simple scheduled snapshot command to run on each of the nodes. The Cassandra nodetool utility provides the clearsnapshot command that removes the snapshot files. (Auto snapshots on table drops are not visible to this command.)

  Disadvantages:
    • Potentially large RPO: Snapshots require flushing all in-memory data to disk; therefore, frequent snapshot calls will impact the cluster’s performance.
    • Storage footprint: Depending on various factors, such as workload type, compaction strategy, or versioning interval, compaction may cause multi-fold data to be backed up, causing an increase in Capital Expenditure (CapEx).
    • Snapshot storage management overhead: Cassandra admins are expected to remove the snapshot files to a safe location such as AWS S3.

Incremental backups

  Advantages:
    • Better storage utilization: There are no duplicate records in the backup, as compacted files are not backed up.
    • Point-in-time backup: Companies can achieve a better RPO, as backing up from the incremental backup folder is a continuous process.

  Disadvantages:
    • Space management overhead: The incremental backup folders must be emptied after being backed up. Failure to do so may cause severe space issues on the cluster.
    • Spread across many files: Since incremental backups create files every time a flush occurs, they typically produce many small files, making file management and recovery difficult tasks that can have an impact on RTO and the Service Level.

Incremental backups in combination with snapshots

  Advantages:
    • Larger backup files: Only data between snapshots comes from the incremental backups.
    • Point-in-time: It provides point-in-time backup and restores.

  Disadvantages:
    • Space management overhead: Every time a snapshot is backed up, data needs to be cleaned up.
    • Operationally burdensome: Requires DBAs to script solutions.

CommitLog backup in combination with snapshot and incremental

  Advantages:
    • Point in time: It provides the best point-in-time backup and restores.

  Disadvantages:
    • Space management overhead: Every time a snapshot is backed up, data needs to be cleaned up, which increases Operational Expenditure (OpEx).
    • Restore complexity: Restore is more complicated, as part of the restore happens from the commit log replay.
    • Storage overhead: Snapshot-based backup carries storage overhead because of duplication of data due to compaction, resulting in higher CapEx.
    • Highly complex: Due to the nature of dealing with three kinds of backups, plus the streaming and managing of the commit log, it is a highly sophisticated backup solution.

Datacenter backup

  Advantages:
    • Hot backup: It can provide a swift way to restore data.
    • Space management: Using RF = 1, you can avoid data replication.

  Disadvantages:
    • Additional datacenter: Since it requires a new datacenter to be built, it needs higher CapEx as well as OpEx.
    • Prone to developer mistakes: Will not protect from developer mistakes (unless there is a time buffer, as mentioned above).

Cassandra information using nodetool

Cassandra nodetool provides several types of commands to manage your Cassandra cluster. See my previous post about Cassandra nodetool for an orientation to the types of things you can do with this helpful Cassandra administration tool. Here, I am sharing details about one type — getting Cassandra information about your installation using nodetool. These can be used to get different displays of the status and other insights into the Cassandra nodes and full cluster.

Cassandra nodetool is installed along with the database management software, and is used on the command line interface (e.g., inside the Terminal window).

"<yoastmark

Cassandra Information: Nodes

Let’s start with some very basic information about the node.

nodetool version 

This will show the version of Cassandra running on this node. Another way to get similar information is by using cassandra -v.

Example output:

ReleaseVersion: 3.11.2

nodetool info 

In the same way that the popular nodetool status (see below) provides a single-glance overview of the cluster, this command provides a quick overview of the node. It is a convenient way, for example, to see the memory usage.

Example output:

ID                     : 817788af-4209-44df-9ae8-dc345376c946
Gossip active          : true
Thrift active          : false
Native Transport active: true
Load                   : 749.07 KiB
Generation No          : 1526478319
Uptime (seconds)       : 15813
Heap Memory (MB)       : 72.76 / 95.00
Off Heap Memory (MB)   : 0.01
Data Center            : DC1
Rack                   : RAC1
Exceptions             : 0
Key Cache              : entries 35, size 2.8 KiB, capacity 4 MiB, 325 hits, 396 requests, 0.821 recent hit rate, 14400 save period in seconds
Row Cache              : entries 0, size 0 bytes, capacity 0 bytes, 0 hits, 0 requests, NaN recent hit rate, 0 save period in seconds
Counter Cache          : entries 0, size 0 bytes, capacity 2 MiB, 0 hits, 0 requests, NaN recent hit rate, 7200 save period in seconds
Percent Repaired       : 100.0%
Token                  : (invoke with -T/--tokens to see all 256 tokens)
 

Cassandra Information: Cluster

Similarly, nodetool can provide basic information about the full cluster:

nodetool describecluster 

This will show a quick view of some of the important cluster configuration values: name, snitch type, partitioner type, and schema version.

Example output:

Cluster Information:
Name: Dev_Cluster
Snitch: org.apache.cassandra.locator.GossipingPropertyFileSnitch
DynamicEndPointSnitch: enabled
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Schema versions:
414b0faa-ac94-3062-808b-8f1e6776d456: [172.16.238.2, 172.16.238.3, 172.16.238.4, 172.16.238.5, 172.16.238.6, 172.16.238.7]

The “Schema versions” segment is especially important because it identifies any schema disagreements you might have between nodes. Every time a schema is changed, the schema is propagated to the other nodes. Sometimes you might see some persistent schema disagreements that could indicate that one of the nodes is down, and this is often resolved by either restarting the node or doing a rolling restart of the entire cluster.

nodetool status 

If you run just one nodetool command on a server when you log in, this is it: a brief output of node state, address, and location.

Example output:

Datacenter: DC1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load Tokens       Owns (effective) Host ID                               Rack
UN  172.16.238.2  749.07 KiB 256          31.8% 817788af-4209-44df-9ae8-dc345376c946  RAC1
UN  172.16.238.3  471.93 KiB 256          33.6% a2e9a7b2-d665-4272-8327-ae7fbb0cf712  RAC2
UN  172.16.238.4  749.49 KiB 256          34.6% 603b610a-f8e3-476c-9952-5de57418ccff  RAC3
Datacenter: DC2
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load Tokens       Owns (effective) Host ID                               Rack
UN  172.16.238.5  528.98 KiB 256          34.0% f9bbf676-75c9-47ca-8826-1e8f0a3268e4  RAC1
UN  172.16.238.6  566.61 KiB 256          33.9% 93fd527c-e244-4aef-ab3f-2a6ee1f1d917  RAC2
UN  172.16.238.7  809.37 KiB 256          32.1% 25b1f103-85f9-4020-a4b3-8d1912443f55  RAC3
 

Cassandra Information: Backups

Other nodetool status commands can provide information about backups:

nodetool statusbackup 

Use this to view the status of incremental backups. If you have turned on incremental backups (e.g., via nodetool enablebackup), then the status will be running. If the incremental backups are disabled, the status will be not running.

Example output:

running

nodetool listsnapshots 

Use this command to view the schema snapshots on this node. This assumes you have created at least one snapshot (e.g., with nodetool snapshot).

Example output:

Snapshot Details:
Snapshot name Keyspace name      Column family name True size Size on disk
1526402908000 system_distributed parent_repair_history          0 bytes 13 bytes
1526402908000 system_distributed repair_history                 0 bytes 13 bytes
1526402908000 system_distributed view_build_status              0 bytes 13 bytes
1526402908000 keyspace1          standard1 1.51 MiB  1.51 MiB
1526402908000 keyspace1          counter1 0 bytes   864 bytes
1526402908000 system_auth        roles 0 bytes   13 bytes
1526402908000 system_auth        role_members 0 bytes   13 bytes
1526402908000 system_auth        resource_role_permissons_index 0 bytes   13 bytes
1526402908000 system_auth        role_permissions 0 bytes   13 bytes
1526402908000 system_traces      sessions 0 bytes   13 bytes
1526402908000 system_traces      events 0 bytes   13 bytes
Total TrueDiskSpaceUsed: 1.51 MiB
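
If the list comes back empty, a snapshot can be taken first. A minimal sketch, shown for a single keyspace (with no tag supplied, nodetool names the snapshot with a timestamp like the ones above):

nodetool snapshot keyspace1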

Cassandra Information: Data and Schema

Understand the details of your schema and data around the cluster with the following commands:

nodetool ring 

This command will produce a very long output listing every token in the ring (tokens are hashes of partition keys) and the node that owns each one.

Example output:

Datacenter: DC1
==========
Address       Rack Status State   Load Owns         Token
172.16.238.3  RAC2 Up     Normal 421.98 KiB     32.70% -9220489979566737719
172.16.238.2  RAC1 Up     Normal 589.17 KiB     34.48% -9115796826660667716
172.16.238.3  RAC2 Up     Normal 421.98 KiB     32.70% -9100537612334946272
...

nodetool describering 

View detailed information on the token ranges for a given keyspace, including which nodes hold each range. Use a keyspace name along with this command (e.g., nodetool describering keyspace1).

Example output:

Schema Version:d7d68b06-5c21-3aa4-a2e4-f44eff6e25e3
TokenRange: TokenRange(start_token:2266716358050113757, end_token:2267497540130521369, endpoints:[172.16.238.6, 172.16.238.3], rpc_endpoints:[172.16.238.6, 172.16.238.3], endpoint_details:[EndpointDetails(host:172.16.238.6, datacenter:DC1, rack:RAC2), EndpointDetails(host:172.16.238.3, datacenter:DC2, rack:RAC1)]) TokenRange(start_token:-3767342014734755518, end_token:-3764135679630864587, endpoints:[172.16.238.6, 172.16.238.5], rpc_endpoints:[172.16.238.6, 172.16.238.5], endpoint_details:[EndpointDetails(host:172.16.238.6, datacenter:DC1, rack:RAC2), EndpointDetails(host:172.16.238.5, datacenter:DC2, rack:RAC3)]) TokenRange(start_token:-7182326699472165951, end_token:-7168882311135889918, endpoints:[172.16.238.3, 172.16.238.6], rpc_endpoints:[172.16.238.3, 172.16.238.6], endpoint_details:[EndpointDetails(host:172.16.238.3, datacenter:DC2, rack:RAC1), EndpointDetails(host:172.16.238.6, datacenter:DC1, rack:RAC2)]) TokenRange(start_token:-4555990503674633274, end_token:-4543114046836888769, endpoints:[172.16.238.5, 172.16.238.4], rpc_endpoints:[172.16.238.5, 172.16.238.4], endpoint_details:[EndpointDetails(host:172.16.238.5, datacenter:DC2, rack:RAC3), EndpointDetails(host:172.16.238.4, datacenter:DC1, rack:RAC3)])...

nodetool rangekeysample

This command will display a distribution of keys around the cluster.

Example output:

RangeKeySample: 2401899971471489924 8125817615588445820 6180648043275199428 -7666714398617260110 -59419700177700973...

nodetool viewbuildstatus 

Materialized views are populated in the background. This command will show the status of this building process. Specify the keyspace and view name (e.g., nodetool viewbuildstatus keyspace1 mv1). As the process runs, the output will change accordingly. (Note that materialized views are not recommended to be used in production.)

Example output:

keyspace1.mv1 has not finished building; node status is below.
Host          Info
/172.16.238.4 STARTED
/172.16.238.2 STARTED
/172.16.238.7 UNKNOWN

Later:

keyspace1.mv1 has not finished building; node status is below.
Host          Info
/172.16.238.4 SUCCESS
/172.16.238.2 STARTED
/172.16.238.7 SUCCESS

Finally:

keyspace1.mv1 has finished building

nodetool getendpoints 

Use getendpoints to find the node(s) holding a partition key, given the keyspace, table name, and partition key value.

First, retrieve the key:

select key from keyspace1.standard1 where [your search terms];
 key
------------------------
 0x3138324b305033384e30

Then use the key to find the node(s):

nodetool getendpoints keyspace1 standard1 3138324b305033384e30
172.16.238.7
172.16.238.2
 

Cassandra Information: Processes

The inner workings of the Cassandra cluster are made more clear with the following nodetool commands.

nodetool compactionstats 

This command will display active and pending compactions.

Compactions are often I/O- and CPU-intensive, so monitoring pending compactions is useful if you see performance degradation or want to track operations such as repairs, SSTable upgrades, or rebuilds.

Example output:

pending tasks: 0
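
For ongoing monitoring it can be convenient to refresh the output periodically and print sizes in human-readable units; watch is a standard Linux utility, not part of nodetool:

watch -n 10 nodetool compactionstats -H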

nodetool compactionhistory 

Similarly, this command will show completed compactions.

Example output:

4ed95830-5907-11e8-a690-df4f403979ef keyspace1     standard1 2018-05-16T12:47:35.731 1461715  1461715 {1:6357}
2de781b0-5907-11e8-a690-df4f403979ef keyspace1     standard1 2018-05-16T12:46:40.459 1462110  1461715 {1:6357}

nodetool gcstats 

This command shows a summarized view of garbage collection (GC) activity on a given node; the interval column reports the milliseconds elapsed since the statistics were last reset (i.e., since gcstats was last run). This might be useful for a dashboard display or for quick insight into the server.

Example output:

Interval (ms)   Max GC Elapsed (ms)   Total GC Elapsed (ms)   Stdev GC Elapsed (ms)   GC Reclaimed (MB)   Collections   Direct Memory Bytes
     20073305                  9154                   71095                     703          4353376560           305                    -1

nodetool statusgossip 

The node reports briefly whether or not it is communicating metadata to/from other nodes with this command. The default will be on unless it has been turned off (e.g., with nodetool disablegossip), perhaps for maintenance. If it is on, the output will be running. If gossip is disabled, the output will be (as you might guess) not running.

Example output:

Running

nodetool gossipinfo 

Assuming gossip is enabled, this will show what this node is communicating to other nodes in the cluster about itself.

Example output:

/172.16.238.2
  generation:1526415339
  heartbeat:21284
  STATUS:21216:NORMAL,-1122832159022483270
  LOAD:21238:6569245.0
  SCHEMA:12719:d386650c-2a99-336d-a7a8-9c25a4f39801
  DC:8:DC1
  RACK:10:RAC1
  RELEASE_VERSION:4:3.11.2
  INTERNAL_IP:21218:172.16.238.2
  RPC_ADDRESS:3:172.16.238.2
  NET_VERSION:1:11
  HOST_ID:2:817788af-4209-44df-9ae8-dc345376c946
  RPC_READY:58:true
  TOKENS:21215:<hidden>
...

nodetool statushandoff 

Use this to check whether hinted handoff is enabled. Hinted handoff might be turned off manually if you think a node will be down for too long (longer than max_hint_window_ms), or to avoid a surge of hint traffic over the network when the node recovers. If it is running, the output will be Hinted handoff is running. If it has been disabled, the output will be Hinted handoff is not running.

Example output:

Hinted handoff is running

nodetool tpstats 

Usage statistics of the thread pools are shown with this command. The details of thread pool statistics warrant a separate blog entry, but meanwhile, the Cassandra documentation will provide a basic overview.

Example output:

Pool Name                         Active Pending Completed Blocked  All time blocked
ReadStage                              0 0 1789 0              0
MiscStage                              0 0 0 0              0
CompactionExecutor                     0 0 13314 0              0
MutationStage                          0 0 36069 0              0
MemtableReclaimMemory                  0 0 82 0              0
PendingRangeCalculator                 0 0 12 0              0
GossipStage                            0 0 132003 0              0
SecondaryIndexManagement               0 0 0 0              0
HintsDispatcher                        0 0 3 0              0
RequestResponseStage                   0 0 15060 0              0
Native-Transport-Requests              0 0 3390 0              2
ReadRepairStage                        0 0 6 0              0
CounterMutationStage                   0 0 0 0              0
MigrationStage                         0 0 10 0              0
MemtablePostFlush                      0 0 118 0              0
PerDiskMemtableFlushWriter_0           0 0 82 0              0
ValidationExecutor                     0 0 0 0              0
Sampler                                0 0 0 0              0
MemtableFlushWriter                    0 0 82 0              0
InternalResponseStage                  0 0 5651 0              0
ViewMutationStage                      0 0 0 0              0
AntiEntropyStage                       0 0 0 0              0
CacheCleanupExecutor                   0 0 0 0              0

Message type           Dropped
READ                         0
RANGE_SLICE                  0
_TRACE                       0
HINT                         0
MUTATION                   843
COUNTER_MUTATION             0
BATCH_STORE                  0
BATCH_REMOVE                 0
REQUEST_RESPONSE             0
PAGED_RANGE                  0
READ_REPAIR                  0
 

Cassandra Information: Performance Tuning

Finally, the following nodetool commands will provide information to support Cassandra performance tuning efforts.

nodetool proxyhistograms 

This command will give an overall sense of node performance, showing count and latency in microseconds. The values will be calculated as an average of the last 5 minutes.

Example output:

proxy histograms
Percentile      Read Latency   Write Latency   Range Latency   CAS Read Latency   CAS Write Latency   View Write Latency
                    (micros)        (micros)        (micros)           (micros)            (micros)             (micros)
50%                  5839.59         8409.01        30130.99               0.00                0.00                 0.00
75%                 14530.76        20924.30        89970.66               0.00                0.00                 0.00
95%                 52066.35       155469.30       268650.95               0.00                0.00                 0.00
98%                107964.79       223875.79       268650.95               0.00                0.00                 0.00
99%                155469.30       268650.95       268650.95               0.00                0.00                 0.00
Min                   263.21          315.85         5839.59               0.00                0.00                 0.00
Max                386857.37       464228.84       268650.95               0.00                0.00                 0.00

nodetool tablehistograms 

This command displays count and latency as a measure of performance for a particular table. Specify the keyspace and table name when running tablehistograms. These values are also calculated over the last 5 minutes.

Example output:

keyspace1/standard1 histograms
Percentile      SSTables   Write Latency   Read Latency   Partition Size   Cell Count
                                (micros)       (micros)          (bytes)
50%                 0.00          152.32         454.83              NaN          NaN
75%                 0.00          654.95        1629.72              NaN          NaN
95%                 0.00         4866.32       10090.81              NaN          NaN
98%                 0.00         8409.01       25109.16              NaN          NaN
99%                 0.00         8409.01       30130.99              NaN          NaN
Min                 0.00           20.50          51.01              NaN          NaN
Max                 0.00        14530.76      155469.30              NaN          NaN

nodetool tablestats 

This will show the read/write count and latency per keyspace, and detailed count, space, latency, and indexing information per table. Note that this used to be called nodetool cfstats, so you will still see some references to that tool around (and cfstats is still aliased to tablestats).
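
As a sketch, the output can be scoped to a single table (assuming the keyspace1.standard1 table used elsewhere in this post):

nodetool tablestats keyspace1.standard1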

nodetool toppartitions 

This command displays top partitions used for a table during a specified sampling period, in milliseconds. Cardinality is a count of unique operations in the sample. Count is the total number of operations during the sample per partition. The third column in the partition display (+/-) indicates the margin of error; an estimate rather than an exact count is captured to avoid overhead. Specify the keyspace, table name, and desired duration of sample.
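
A hypothetical invocation, sampling the keyspace1.standard1 table for 10 seconds (the duration is given in milliseconds):

nodetool toppartitions keyspace1 standard1 10000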

Example output:

WRITES Sampler:
Cardinality: ~3 (256 capacity)
Top 10 partitions:
Partition  Count +/-
31344f50364f34343430 39 38
314c4c33373737333830 39 38
4b394e304e31334d4c30 39 38
4b32304e4d38304e3730 39 38
5034333737304b324d30 39 38
383335344e4b354c5030 39 38
343037374e33384b3331 39 38
3438504e324d4f345030 39 38
4f4f3834334f33333731 39 38
30353131333531343331 39 38

READS Sampler:
Cardinality: ~2 (256 capacity)
Top 10 partitions:
Partition Count +/-
4b3137304d3435303930 37 36
3431324b32334d354c30 37 36
4e31313739354c393930 37 36
34335039334c50303630 37 36
334e3437503839313131 37 36
314f31303933364c3130 37 36
4d4c36343533354b3431 37 36
4c4d334e4f4e31343530 37 36
4e324d394f3450343130 37 36
39364c30503634333930 37 36

The above has been a review of the informational commands available with Cassandra nodetool. Again, see my previous post for an orientation to Cassandra nodetool, and stay tuned for future posts on combining nodetool commands for administration tasks such as backups, performance tuning, and upgrades.

 

Orientation to Cassandra Nodetool

Nodetool is a broadly useful tool for managing Cassandra clusters. A large percentage of questions concerning Cassandra can easily be answered with a nodetool function.

Having been developed over time by a diverse open source community, the nodetool commands can seem at first glance to follow only a loosely consistent syntax. On closer inspection, the individual commands can be organized into several overlapping buckets.

The first grouping consists of commands to view (get) or change (set) configuration variables. An example pair is getlogginglevels and setlogginglevel. By default, logging is set to INFO, midway in the available range of ALL, TRACE, DEBUG, INFO, WARN, ERROR, and OFF. Running nodetool getlogginglevels will display the currently set value.
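
As a quick sketch (the logger name here is only illustrative; any logger shown by getlogginglevels can be used):

nodetool getlogginglevels
nodetool setlogginglevel org.apache.cassandra.db DEBUG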

Other get/set (sometimes prefixed as enable/disable) commands can be set either at startup or while Cassandra is running. For example, incremental backups can be enabled in the startup configuration file cassandra.yaml by setting incremental_backups=true. Alternatively, they can be started or stopped using nodetool, with the commands nodetool enablebackup and nodetool disablebackup. In general, though, most configuration values are either set in startup configuration files or set dynamically using nodetool; there is little overlap.
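
A quick sketch of toggling incremental backups at runtime and confirming the change:

nodetool enablebackup
nodetool statusbackup
nodetool disablebackup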

Several nodetool commands can be used to get insight into status of the Cassandra node, cluster, or even data. Two very basic informational commands are nodetool status and nodetool info. Nodetool status provides a brief output of node state (up, down, joining cluster, etc.), IP addresses, and datacenter location. Nodetool info provides a less brief output of key status variables. It is a convenient way to see memory utilization, for example.

Although the tool is named nodetool, not all commands apply to nodes. For example, nodetool describecluster provides information about the cluster — snitch and partitioner type, name, and schema versions. For another example, nodetool netstats provides information about communication among nodes.

Nodetool is useful not only for basic configuration and information; it is also a powerful tool for cluster operations and data management. The operations tasks of shutting down a node within a cluster or doing maintenance on a live node are made easier with commands like nodetool drain (flushes writes from memory to disk and shuts off connections, so the commitlog does not need to be replayed on restart) and nodetool disablegossip (makes the node invisible to the cluster). Data management tasks are made easier with commands like nodetool repair to sync data among nodes (perhaps due to missed writes across the cluster) and nodetool garbagecollect to remove deleted data.
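
As a sketch of how these fit together in a planned maintenance window (the keyspace and table names are placeholders):

nodetool drain                               # flush memtables and stop accepting writes before a planned shutdown
nodetool repair -pr                          # after restart, repair only the ranges this node owns as primary
nodetool garbagecollect keyspace1 standard1  # rewrite SSTables for one table to purge deleted data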

Now that I have provided an orientation to nodetool, in future posts I will describe how to combine various information, set/get, and management commands to do common tasks such as backups, performance tuning, and upgrades.


Learn more about Pythian services for Cassandra.

Cassandra open-source log analysis in Kibana, using filebeat, modeled in Docker

I was recently asked to set up a solution for Cassandra open-source log analysis to include in an existing Elasticsearch-Logstash-Kibana (ELK) stack. After some research into the newer capabilities of these technologies, I realized I could use “beats” in place of the heavier Logstash processes for basic monitoring that does not involve extensive log transformation.

The code to run this demo is available to clone or fork at https://github.com/pythian/cassandra-elk. The only other requirement is Docker (I am using Docker version 18.05.0-ce-rc1) — using Docker for Mac or Docker for Windows will be most convenient.

In a typical production system, you would already have Cassandra running, but all the pieces are included in the Docker stack here so you can start from zero. The model here assumes ELK and a Cassandra cluster are running in your environment, and you need to stream the Cassandra logs into your monitoring system. 

In this setup, the Cassandra logs are being ingested into Elasticsearch and visualized via Kibana. I have included some ways to see data at each step of the workflow in the final section below.

Start the containers:

docker-compose up -d 

(Note: The cassandra-env.sh included with this test environment limits the memory used by the setup via MAX_HEAP_SIZE and HEAP_NEWSIZE, allowing it to be run on a laptop with small memory. This would not be the case in production.)
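
To confirm the stack is up before proceeding, a standard Docker Compose listing works (nothing specific to this repository):

docker-compose ps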

Set up the test Cassandra cluster:

As the Docker containers are starting up, it can be convenient to see resource utilization via ctop:

Example of ctop resource monitor for Docker containers in open-source log analysis for Cassandra

Set up the filebeat software

Do the following on each Cassandra node.

1. Download the software

You would likely not need to install curl in your environment, but the Docker images used here are bare-bones by design. The apt update statement is also necessary since typically repos are cleared of files after the requested packages are installed via the Dockerfile.

apt update

apt install curl -y

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.3-amd64.deb

dpkg -i filebeat-6.2.3-amd64.deb

For other operating systems, see: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation.html.

 

2. Configure filebeat

The beats software allows for basic filtering and transformation via this configuration file. Put the below in /etc/filebeat/filebeat.yml.

(This is edited from an example at: https://github.com/thelastpickle/docker-cassandra-bootstrap/blob/master/cassandra/config/filebeat.yml.)

The values in the output.elasticsearch and setup.kibana sections are the respective IP addresses and port numbers. Under filebeat.prospectors (a prospector manages all the log inputs), two log types are configured here: the system log and the debug log. For each, any compressed (.zip) files are excluded. The multiline* settings define how multiple lines in the log files are handled: the log manager finds lines that start with any of the patterns shown and appends the following non-matching lines to them until it reaches a new match. More options are available at: https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html.

output.elasticsearch:

    enabled: true

    hosts: ["172.16.238.31:9200"]

setup.kibana:

    host: "172.16.238.33:5601"

filebeat.prospectors:

    - input_type: log

      paths:

        - "/var/log/cassandra/system.log*"

      document_type: cassandra_system_logs

      exclude_files: ['\.zip$']

      multiline.pattern: '^TRACE|DEBUG|WARN|INFO|ERROR'

      multiline.negate: true

      multiline.match: after

    - input_type: log

      paths:

        - "/var/log/cassandra/debug.log*"

      document_type: cassandra_debug_logs

      exclude_files: ['\.zip$']

      multiline.pattern: '^TRACE|DEBUG|WARN|INFO|ERROR'

      multiline.negate: true

      multiline.match: after
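
Optionally, the configuration and the connection to Elasticsearch can be sanity-checked before starting the service (these test subcommands are available in filebeat 6.x):

filebeat test config -c /etc/filebeat/filebeat.yml

filebeat test output -c /etc/filebeat/filebeat.yml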

 

3. Set up Kibana dashboards

filebeat setup --dashboards

 

Example output:

Loaded dashboards

 

4. Start the beat

service filebeat start

 

Example output:

2018-04-12T20:43:03.798Z INFO instance/beat.go:468 Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]

2018-04-12T20:43:03.799Z INFO instance/beat.go:475 Beat UUID: 2f43562f-985b-49fc-b229-83535149c52b

2018-04-12T20:43:03.800Z INFO instance/beat.go:213 Setup Beat: filebeat; Version: 6.2.3

2018-04-12T20:43:03.801Z INFO elasticsearch/client.go:145 Elasticsearch url: http://172.16.238.31:9200

2018-04-12T20:43:03.802Z INFO pipeline/module.go:76 Beat name: C1

Config OK

 

View the graphs:

Then view the Kibana graphs in a local browser at: http://localhost:5601.

 

Run some sample load against one of the nodes to get more logs to experiment with:

cassandra-stress write n=20000 -pop seq=1..20000 -rate threads=4

Example output from Cassandra-stress being used to populate test data

Here are some sample queries to run in Kibana:

  • message:WARN*
  • message:(ERROR* OR WARN*)
  • message:(ERROR* OR WARN*) AND beat.hostname:DC1C2

 

You can also filter the display by choosing from the available fields on the left.

Kibana dashboard example display

 

If you would like to see what the logs look like at each step of the workflow, view logs within the Cassandra container in /var/log/cassandra like this:

tail /var/log/cassandra/debug.log

Example output:

WARN  [PERIODIC-COMMIT-LOG-SYNCER] 2018-05-07 14:01:09,216 NoSpamLogger.java:94 - Out of 0 commit log syncs over the past 0.00s with average duration of Infinityms, 1 have exceeded the configured commit interval by an average of 80.52ms

 

View this data stored in Elasticsearch (in JSON format) in a browser like this:

http://localhost:9200/_search?q=(message:(ERROR*%20OR%20WARN*)%20AND%20beat.hostname:DC1C2)

Example output:

New Features in the DataStax Node.js Drivers

Version 1.4.0 of the DataStax Enterprise Node.js Driver and version 3.3.0 of the DataStax Node.js Driver for Apache Cassandra are now available.

The main focus of these releases was to add support for speculative query executions. Additionally, we improved the performance of Murmur3 hashing and changed the query preparation logic along with other enhancements.

Speculative query executions

Speculative execution is a way to limit latency at high percentiles by preemptively starting one or more additional executions of the query against different nodes; the driver then yields the first response received and discards the rest.

Speculative executions are disabled by default. Speculative executions are controlled by an instance of SpeculativeExecutionPolicy provided when initializing the Client. This policy defines the threshold after which a new speculative execution is triggered.

The driver provides a ConstantSpeculativeExecutionPolicy that schedules a given number of speculative executions, separated by a fixed delay; the policy is exported under the {root}.policies.speculativeExecution submodule.

const client = new Client({
  contactPoints,
  policies: {
    speculativeExecution: new ConstantSpeculativeExecutionPolicy(
      200, // delay before a new execution is launched
      2) // maximum amount of additional executions
  }
});

Given the configuration above, an idempotent query would be handled this way:

  • Start the initial execution at t0
  • If no response has been received at t0 + 200 milliseconds, start a speculative execution on another node
  • If no response has been received at t0 + 400 milliseconds, start another speculative execution on a third node

As with the rest of the policies in the driver, you can provide your own implementation by extending the SpeculativeExecutionPolicy prototype.

One important aspect to consider is whether queries are idempotent (that is, whether they can be applied multiple times without changing the result beyond the initial application). If a query is not idempotent, the driver never schedules speculative executions for it, because there is no way to guarantee that only one node will apply the mutation. Examples of operations that are not idempotent are: counter increments/decrements; adding items to a list column; using non-idempotent CQL functions, like now() or uuid().

In the driver, query idempotence is determined by the isIdempotent flag in the QueryOptions, which defaults to false. You can set the default when initializing the Client or you can set it manually for each query, for example:

const query = 'SELECT * FROM users WHERE key = ?';
client.execute(query, [ 'usr1' ], { prepare: true, isIdempotent: true });

Note that enabling speculative executions causes the driver to send more individual requests, so throughput does not necessarily improve. You can read how speculative executions affect retries and other practical details in the documentation.

Improved Murmur3 hashing performance

Apache Cassandra uses the Murmur3Partitioner to determine the distribution of the data across cluster partitions. The adapted version of the Murmur3 hashing algorithm used by Cassandra performs several 64-bit integer operations. As there isn't a native int64 representation in ECMAScript, we previously used Google Closure's Long to support those operations.

Performing int64 add and multiply operations with int32 types requires splitting the operands into smaller int16 chunks to handle overflows. Google Closure's Long handles this by creating 4 uint16 chunks from each operand, performing the operations, and creating a new int64 value (composed of 2 int32 values), as Long is immutable.

To improve the performance of the partitioner on Node.js, we created a custom type MutableLong that maintains 4 uint16 fields that are used to apply the operation, modifying the internal state, preventing additional allocations per operation.

Query preparation enhancements

Previously, the driver prepared the query only on the first node selected by the load-balancing policy, taking a lazy approach.

In this revision, we added fine-tuning options for how the driver deals with query preparation, introducing two new options:

  • prepareOnAllHosts: Determines whether the driver should prepare the query on all hosts.
  • rePrepareOnUp: Determines whether, when a node that has been down (unreachable) is considered back up, the driver should re-prepare on it all queries that have been prepared on other nodes.

Both properties are set to true by default. You can change them when creating the Client instance:

const client = new Client({
  contactPoints,
  prepareOnAllHosts: false,
  rePrepareOnUp: false
});

Expose connection pool state

The driver now provides a method to obtain a snapshot of the state of the pool per host. It provides information for all hosts in the cluster: the number of open connections per host and the number of queries currently being executed (in-flight) through a given host.

You can check out the ClientState API docs for more information.

You can also use the string representation, which provides the information condensed in a readable format, useful for debugging or periodic logging in production.

console.log('Pool state: %s', client.getState());

Wrapping up

More detailed information about all the features, improvements and fixes included in this release can be found in the changelogs: DSE driver changelog and Apache Cassandra driver changelog.

New versions of the drivers are available on npm:

Your feedback is important to us and it influences what features we prioritize. To provide feedback use the following:

Writing Scala Codecs for the Java Driver

One of the common grievances Scala developers express when using the DataStax Java driver is the overhead incurred in almost every read or write operation, if the data to be stored or retrieved needs conversion from Java to Scala or vice versa.

This could be avoided by using "native" Scala codecs. This has been occasionally solicited from the Java driver team, but such codecs unfortunately do not exist, at least not officially.

Thankfully, the TypeCodec API in the Java driver can be easily extended. For example, several convenience Java codecs are available in the driver's extras package.

In this post, we are going to piggyback on the existing extra codecs and show how developers can create their own codecs – directly in Scala.

Note: all the examples in this post are available in this Github repository.

Dealing with Nullability

It can be tricky to deal with CQL types in Scala because CQL types are all nullable, whereas most typical representations of CQL scalar types in Scala resort to value classes, and these are non-nullable.

As an example, let's see how the Java driver deserializes, say, CQL ints.

The default codec for CQL ints converts such values to java.lang.Integer instances. From a Scala perspective, this has two disadvantages: first, one needs to convert from java.lang.Integer to Int, and second, Integer instances are nullable, while Scala Ints aren't.

Granted, the DataStax Java driver's Row interface has a pair of methods named getInt that deserialize CQL ints into Java ints, converting null values into zeroes.

But for the sake of this demonstration, let's assume that these methods did not exist, and all CQL ints were being converted into java.lang.Integer. Therefore, developers would yearn to have a codec that could deserialize CQL ints into Scala Ints while at the same time addressing the nullability issue.

Let this be the perfect excuse for us to introduce IntCodec, our first Scala codec:


import java.nio.ByteBuffer
import com.datastax.driver.core.exceptions.InvalidTypeException
import com.datastax.driver.core.{DataType, ProtocolVersion, TypeCodec}
import com.google.common.reflect.TypeToken

object IntCodec extends TypeCodec[Int](DataType.cint(), TypeToken.of(classOf[Int]).wrap()) {

  override def serialize(value: Int, protocolVersion: ProtocolVersion): ByteBuffer = 
    ByteBuffer.allocate(4).putInt(0, value)

  override def deserialize(bytes: ByteBuffer, protocolVersion: ProtocolVersion): Int = {
    if (bytes == null || bytes.remaining == 0) return 0
    if (bytes.remaining != 4) throw new InvalidTypeException("Invalid 32-bits integer value, expecting 4 bytes but got " + bytes.remaining)
    bytes.getInt(bytes.position)
  }

  override def format(value: Int): String = value.toString

  override def parse(value: String): Int = {
    try {
      if (value == null || value.isEmpty || value.equalsIgnoreCase("NULL")) 0
      else value.toInt
    }
    catch {
      case e: NumberFormatException =>
        throw new InvalidTypeException( s"""Cannot parse 32-bits integer value from "$value"""", e)
    }
  }

}

All we did so far is extend TypeCodec[Int] by filling in the superclass constructor arguments (more about that later) and implementing the required methods in a very similar way compared to the driver's built-in codec.

Granted, this isn't rocket science, but it will get more interesting later. The good news is, this template is reproducible enough to make it easy for readers to figure out how to create similar codecs for every AnyVal that is mappable to a CQL type (Boolean, Long, Float, Double, etc... let your imagination run wild or just go for the ready-made solution).

(Tip: because of the automatic boxing/unboxing that occurs under the hood, don't use this codec to deserialize simple CQL ints, and prefer instead the driver's built-in one, which will avoid this overhead; but you can use IntCodec to compose more complex codecs, as we will see below – the more complex the CQL type, the more negligible the overhead becomes.)

Let's see how this piece of code solves our initial problems: as for the burden of converting between Scala and Java, Int values are now written directly with ByteBuffer.putInt, and read directly from ByteBuffer.getInt; as for the nullability of CQL ints, the issue is addressed just as the driver does: nulls are converted to zeroes.

Converting nulls into zeroes might not be satisfying for everyone, but how to improve the situation? The general Scala solution for dealing with nullable integers is to map them to Option[Int]. DataStax Spark Connector for Apache Cassandra®'s CassandraRow class has exactly one such method:

def getIntOption(index: Int): Option[Int] = ...

Under the hood, it reads a java.lang.Integer from the Java driver's Row class, and converts the value to either None if it's null, or to Some(value), if it isn't.

Let's try to achieve the same behavior, but using the composite pattern: we first need a codec that converts from any CQL value into a Scala Option. There is no such built-in codec in the Java driver, but now that we are codec experts, let's roll our own OptionCodec:


class OptionCodec[T](
    cqlType: DataType,
    javaType: TypeToken[Option[T]],
    innerCodec: TypeCodec[T])
  extends TypeCodec[Option[T]](cqlType, javaType)
    with VersionAgnostic[Option[T]] {

  def this(innerCodec: TypeCodec[T]) {
    this(innerCodec.getCqlType, TypeTokens.optionOf(innerCodec.getJavaType), innerCodec)
  }

  override def serialize(value: Option[T], protocolVersion: ProtocolVersion): ByteBuffer =
    if (value.isEmpty) OptionCodec.empty.duplicate else innerCodec.serialize(value.get, protocolVersion)

  override def deserialize(bytes: ByteBuffer, protocolVersion: ProtocolVersion): Option[T] =
    if (bytes == null || bytes.remaining() == 0) None else Option(innerCodec.deserialize(bytes, protocolVersion))

  override def format(value: Option[T]): String =
    if (value.isEmpty) "NULL" else innerCodec.format(value.get)

  override def parse(value: String): Option[T] =
    if (value == null || value.isEmpty || value.equalsIgnoreCase("NULL")) None else Option(innerCodec.parse(value))

}

object OptionCodec {

  private val empty = ByteBuffer.allocate(0)

  def apply[T](innerCodec: TypeCodec[T]): OptionCodec[T] =
    new OptionCodec[T](innerCodec)

  import scala.reflect.runtime.universe._

  def apply[T](implicit innerTag: TypeTag[T]): OptionCodec[T] = {
    val innerCodec = TypeConversions.toCodec(innerTag.tpe).asInstanceOf[TypeCodec[T]]
    apply(innerCodec)
  }

}

And voilà! As you can see, the class body is very simple (its companion object is not very exciting at this point either, but we will see later how it could do more than just mirror the class constructor). Its main purpose when deserializing/parsing is to detect CQL nulls and return None right away, without even having to interrogate the inner codec, and when serializing/formatting, intercept None so that it can be immediately converted back to an empty ByteBuffer (the native protocol's representation of null).

We can now combine our two codecs together, IntCodec and OptionCodec, and compose a TypeCodec[Option[Int]]:

import com.datastax.driver.core._
val codec: TypeCodec[Option[Int]] = OptionCodec(IntCodec)
assert(codec.deserialize(ByteBuffer.allocate(0), ProtocolVersion.V4).isEmpty)
assert(codec.deserialize(ByteBuffer.allocate(4), ProtocolVersion.V4).isDefined)

The problem with TypeTokens

Let's sum up what we've got so far: a TypeCodec[Option[Int]] that is the perfect match for CQL ints. But how to use it?

There is nothing really particular with this codec and it is perfectly compatible with the Java driver. You can use it explicitly, which is probably the simplest way:

import com.datastax.driver.core._
val codec: TypeCodec[Option[Int]] = OptionCodec(IntCodec)
val row: Row = ??? // some CQL query containing an int column
val v: Option[Int] = row.get(0, codec)

But your application is certainly more complex than that, and you would like to register your codec beforehand so that it gets transparently used afterwards:

import com.datastax.driver.core._
// first
val codec: TypeCodec[Option[Int]] = OptionCodec(IntCodec)
cluster.getConfiguration.getCodecRegistry.register(codec)

// then
val row: Row = ??? // some CQL query containing an int column
val v: Option[Int] = row.get(0, ???) // How to get a TypeToken[Option[Int]]?

Well, before we can actually do that, we first need to solve one problem: the Row.get method comes in a few overloaded flavors, and the most flavory ones accept a TypeToken argument; let's learn how to use them in Scala.

The Java Driver API, for historical reasons — but also, let's be honest, due to the lack of alternatives – makes extensive usage of Guava's TypeToken API (if you are not familiar with the type token pattern you might want to stop and read about it first).

Scala has its own interpretation of the same reflective pattern, named type tags. Both APIs pursue identical goals – to convey compile-time type information to the runtime – through very different roads. Unfortunately, traveling from one to the other is anything but easy, simply because there is no easy bridge between java.lang.Type and Scala's Type.

Thankfully, all is not lost. As a matter of fact, creating a full-fledged conversion service between both APIs is not a prerequisite: it turns out that Guava's TypeToken works pretty well in Scala, and most classes get resolved just fine. TypeTokens in Scala are just a bit cumbersome to use, and quite error-prone when instantiated, but that's something that a helper object can facilitate.

We are not going to dive any deeper in the troubled waters of Scala reflection (well, at least not until the last chapter of this tutorial). It suffices to assume that the helper object we mentioned above really exists, and that it does the job of creating TypeToken instances while at the same time sparing the developer the boiler-plate code that this operation usually incurs.

Now we can resume our example and complete our code that reads a CQL int into a Scala Option[Int], in the most transparent way:

import com.datastax.driver.core._
val tt = TypeTokens.optionOf(TypeTokens.int) // creates a TypeToken[Option[Int]]
val row: Row = ??? // some CQL query containing an int column
val v: Option[Int] = row.get(0, tt) 

Dealing with Collections

Another common friction point between Scala and the Java driver is the handling of CQL collections.

Of course, the driver has built-in support for CQL collections; but obviously, these map to typical Java collection types: CQL list maps to java.util.List (implemented by java.util.ArrayList), CQL set to java.util.Set (implemented by java.util.LinkedHashSet) and CQL map to java.util.Map (implemented by java.util.HashMap).

This leaves Scala developers with two inglorious options:

  1. Use the implicit JavaConverters object and deal with – gasp! – mutable collections in their code;
  2. Deal with custom Java-to-Scala conversion in their code, and face the consequences of conversion overhead (this is the choice made by the already-mentioned Spark Connector for Apache Cassandra®, because it has a very rich set of converters available).

All of this could be avoided if CQL collection types were directly deserialized into Scala immutable collections.

Meet SeqCodec, our third Scala codec in this tutorial:


import java.nio.ByteBuffer
import com.datastax.driver.core.CodecUtils.{readSize, readValue}
import com.datastax.driver.core._
import com.datastax.driver.core.exceptions.InvalidTypeException

class SeqCodec[E](eltCodec: TypeCodec[E])
  extends TypeCodec[Seq[E]](
    DataType.list(eltCodec.getCqlType),
    TypeTokens.seqOf(eltCodec.getJavaType))
    with ImplicitVersion[Seq[E]] {

  override def serialize(value: Seq[E], protocolVersion: ProtocolVersion): ByteBuffer = {
    if (value == null) return null
    val bbs: Seq[ByteBuffer] = for (elt <- value) yield {
      if (elt == null) throw new NullPointerException("List elements cannot be null")
      eltCodec.serialize(elt, protocolVersion)
    }
    CodecUtils.pack(bbs.toArray, value.size, protocolVersion)
  }

  override def deserialize(bytes: ByteBuffer, protocolVersion: ProtocolVersion): Seq[E] = {
    if (bytes == null || bytes.remaining == 0) return Seq.empty[E]
    val input: ByteBuffer = bytes.duplicate
    val size: Int = readSize(input, protocolVersion)
    for (_ <- 1 to size) yield eltCodec.deserialize(readValue(input, protocolVersion), protocolVersion)
  }

  override def format(value: Seq[E]): String = {
    if (value == null) "NULL" else '[' + value.map(e => eltCodec.format(e)).mkString(",") + ']'
  }

  override def parse(value: String): Seq[E] = {
    if (value == null || value.isEmpty || value.equalsIgnoreCase("NULL")) return Seq.empty[E]
    var idx: Int = ParseUtils.skipSpaces(value, 0)
    if (value.charAt(idx) != '[') throw new InvalidTypeException( s"""Cannot parse list value from "$value", at character $idx expecting '[' but got '${value.charAt(idx)}'""")
    idx = ParseUtils.skipSpaces(value, idx + 1)
    val seq = Seq.newBuilder[E]
    if (value.charAt(idx) == ']') return seq.result
    while (idx < value.length) {
      val n = ParseUtils.skipCQLValue(value, idx)
      seq += eltCodec.parse(value.substring(idx, n))
      idx = n
      idx = ParseUtils.skipSpaces(value, idx)
      if (value.charAt(idx) == ']') return seq.result
      if (value.charAt(idx) != ',') throw new InvalidTypeException( s"""Cannot parse list value from "$value", at character $idx expecting ',' but got '${value.charAt(idx)}'""")
      idx = ParseUtils.skipSpaces(value, idx + 1)
    }
    throw new InvalidTypeException( s"""Malformed list value "$value", missing closing ']'""")
  }

  override def accepts(value: AnyRef): Boolean = value match {
    case seq: Seq[_] => if (seq.isEmpty) true else eltCodec.accepts(seq.head)
    case _ => false
  }

}

object SeqCodec {

  def apply[E](eltCodec: TypeCodec[E]): SeqCodec[E] = new SeqCodec[E](eltCodec)

}

(Of course, we are talking here about scala.collection.immutable.Seq.)

The code above still vaguely resembles the equivalent Java code, and is not very interesting per se; the parse method in particular is not exactly a feast for the eyes, but there's little we can do about it.

In spite of its modest body, this codec allows us to compose a more interesting TypeCodec[Seq[Option[Int]]] that can convert a CQL list<int> directly into a scala.collection.immutable.Seq[Option[Int]]:

import com.datastax.driver.core._
type Seq[+A] = scala.collection.immutable.Seq[A]
val codec: TypeCodec[Seq[Option[Int]]] = SeqCodec(OptionCodec(IntCodec))
val l = List(Some(1), None)
assert(codec.deserialize(codec.serialize(l, ProtocolVersion.V4), ProtocolVersion.V4) == l)

Some remarks about this codec:

  1. This codec is just for the immutable Seq type. It could be generalized into an AbstractSeqCodec in order to accept other mutable or immutable sequences. If you want to know how it would look, the answer is here.
  2. Ideally, TypeCodec[T] should have been made covariant in T, the type handled by the codec (i.e. TypeCodec[+T]); unfortunately, this is not possible in Java, so TypeCodec[T] is in practice invariant in T. This is a bit frustrating for Scala implementors, as they need to choose the best upper bound for T, and stick to it for both input and output operations, just like we did above.
  3. Similar codecs can be created to map CQL sets to Sets and CQL maps to Maps; again, we leave this as an exercise to the user (and again, it is possible to cheat).

Dealing with Tuples

Scala tuples are an appealing target for CQL tuples.

The Java driver does have a built-in codec for CQL tuples; but it translates them into TupleValue instances, which are unfortunately of little help for creating Scala tuples.

Luckily enough, TupleCodec inherits from AbstractTupleCodec, a class that has been designed exactly with that purpose in mind: to be extended by developers wanting to map CQL tuples to more meaningful types than TupleValue.

As a matter of fact, it is extremely simple to craft a codec for Tuple2 by extending AbstractTupleCodec:


class Tuple2Codec[T1, T2](
    cqlType: TupleType, javaType: TypeToken[(T1, T2)],
    eltCodecs: (TypeCodec[T1], TypeCodec[T2]))
  extends AbstractTupleCodec[(T1, T2)](cqlType, javaType)
    with ImplicitVersion[(T1, T2)] {

  def this(eltCodec1: TypeCodec[T1], eltCodec2: TypeCodec[T2])(implicit protocolVersion: ProtocolVersion, codecRegistry: CodecRegistry) {
    this(
      TupleType.of(protocolVersion, codecRegistry, eltCodec1.getCqlType, eltCodec2.getCqlType),
      TypeTokens.tuple2Of(eltCodec1.getJavaType, eltCodec2.getJavaType),
      (eltCodec1, eltCodec2)
    )
  }

  {
    val componentTypes = cqlType.getComponentTypes
    require(componentTypes.size() == 2, s"Expecting TupleType with 2 components, got ${componentTypes.size()}")
    require(eltCodecs._1.accepts(componentTypes.get(0)), s"Codec for component 1 does not accept component type: ${componentTypes.get(0)}")
    require(eltCodecs._2.accepts(componentTypes.get(1)), s"Codec for component 2 does not accept component type: ${componentTypes.get(1)}")
  }

  override protected def newInstance(): (T1, T2) = null

  override protected def serializeField(source: (T1, T2), index: Int, protocolVersion: ProtocolVersion): ByteBuffer = index match {
    case 0 => eltCodecs._1.serialize(source._1, protocolVersion)
    case 1 => eltCodecs._2.serialize(source._2, protocolVersion)
  }

  override protected def deserializeAndSetField(input: ByteBuffer, target: (T1, T2), index: Int, protocolVersion: ProtocolVersion): (T1, T2) = index match {
    case 0 => Tuple2(eltCodecs._1.deserialize(input, protocolVersion), null.asInstanceOf[T2])
    case 1 => target.copy(_2 = eltCodecs._2.deserialize(input, protocolVersion))
  }

  override protected def formatField(source: (T1, T2), index: Int): String = index match {
    case 0 => eltCodecs._1.format(source._1)
    case 1 => eltCodecs._2.format(source._2)
  }

  override protected def parseAndSetField(input: String, target: (T1, T2), index: Int): (T1, T2) = index match {
    case 0 => Tuple2(eltCodecs._1.parse(input), null.asInstanceOf[T2])
    case 1 => target.copy(_2 = eltCodecs._2.parse(input))
  }

}

object Tuple2Codec {

  def apply[T1, T2](eltCodec1: TypeCodec[T1], eltCodec2: TypeCodec[T2]): Tuple2Codec[T1, T2] =
    new Tuple2Codec[T1, T2](eltCodec1, eltCodec2)

}

A very similar codec for Tuple3 can be found here. Extending this principle to Tuple4, Tuple5, etc. is straightforward and left for the reader as an exercise.

Going incognito with implicits

The careful reader noticed that Tuple2Codec's constructor takes two implicit arguments: CodecRegistry and ProtocolVersion. They are omnipresent in the TypeCodec API and hence, good candidates for implicit arguments – and besides, both have nice default values. To make the code above compile, simply put in your scope something along the lines of:

object Implicits {

  implicit val protocolVersion = ProtocolVersion.NEWEST_SUPPORTED
  implicit val codecRegistry = CodecRegistry.DEFAULT_INSTANCE

}

Speaking of implicits, let's now see how we can simplify our codecs by adding a pinch of those. Let's take a look at our first trait in this tutorial:

trait VersionAgnostic[T] {  this: TypeCodec[T] =>

  def serialize(value: T)(implicit protocolVersion: ProtocolVersion, marker: ClassTag[T]): ByteBuffer = 
    this.serialize(value, protocolVersion)

  def deserialize(bytes: ByteBuffer)(implicit protocolVersion: ProtocolVersion, marker: ClassTag[T]): T = 
    this.deserialize(bytes, protocolVersion)

}

This trait basically creates two overloaded methods, serialize and deserialize, which will infer the appropriate protocol version to use and forward the call to the relevant method (the marker argument is just the usual trick to work around erasure).

We can now mix-in this trait with an existing codec, and then avoid passing the protocol version to every call to serialize or deserialize:

import Implicits._
val codec = new SeqCodec(IntCodec) with VersionAgnostic[Seq[Int]]
codec.serialize(List(1,2,3))

We can now go even further and simplify the way codecs are composed together to create complex codecs. What if, instead of writing SeqCodec(OptionCodec(IntCodec)), we could simply write SeqCodec[Option[Int]]? To achieve that, let's enhance the companion object of SeqCodec with a more sophisticated apply method:

object SeqCodec {

  def apply[E](eltCodec: TypeCodec[E]): SeqCodec[E] = new SeqCodec[E](eltCodec)

  import scala.reflect.runtime.universe._

  def apply[E](implicit eltTag: TypeTag[E]): SeqCodec[E] = {
    val eltCodec = ??? // implicit TypeTag -> TypeCodec conversion
    apply(eltCodec)
  }

}

The second apply method guesses the element type by using implicit TypeTag instances (these are created by the Scala compiler, so you don't need to worry about instantiating them), then locates the appropriate codec for it. We can now write:

val codec = SeqCodec[Option[Int]]

Elegant, huh? Of course, we need some magic to locate the right codec given a TypeTag instance. Here we need to introduce another helper object, TypeConversions. Its method toCodec takes a Scala type and, with the help of some pattern matching, locates the most appropriate codec. We refer the interested reader to TypeConversions code for more details.

With the help of TypeConversions, we can now complete our new apply method:

def apply[E](implicit eltTag: TypeTag[E]): SeqCodec[E] = {
  val eltCodec = TypeConversions.toCodec[E](eltTag.tpe)
  apply(eltCodec)
}

Note: similar apply methods can be added to other codec companion objects as well.

It's now time to go really wild, bearing in mind that the following features should only be used with caution by expert users.

If only we could convert Scala's TypeTag instances into Guava's TypeToken ones, and then make them implicit like we did above, we would be able to completely abstract away these annoying types and write very concise code, such as:

val statement: BoundStatement = ???
statement.set(0, List(1,2,3)) // implicit TypeTag -> TypeToken conversion

val row: Row = ???
val list: Seq[Int] = row.get(0) // implicit TypeTag -> TypeToken conversion

Well, this can be achieved in a few different ways; we are going to explore here the so-called Type Class pattern.

The first step is to create implicit classes containing "get" and "set" methods that take TypeTag instances instead of TypeToken ones; we'll name them getImplicitly and setImplicitly to avoid name clashes. Let's do it for Row and BoundStatement:


implicit class RowOps(val self: Row) {

  def getImplicitly[T](i: Int)(implicit typeTag: TypeTag[T]): T = 
    self.get(i, ???) // implicit TypeTag -> TypeToken conversion

  def getImplicitly[T](name: String)(implicit typeTag: TypeTag[T]): T =
    self.get(name, ???) // implicit TypeTag -> TypeToken conversion

}

implicit class BoundStatementOps(val self: BoundStatement) {

  def setImplicitly[T](i: Int, value: T)(implicit typeTag: TypeTag[T]): BoundStatement =
    self.set(i, value, ???) // implicit TypeTag -> TypeToken conversion

  def setImplicitly[T](name: String, value: T)(implicit typeTag: TypeTag[T]): BoundStatement =
    self.set(name, value, ???) // implicit TypeTag -> TypeToken conversion

}

Remember what we stated at the beginning of this tutorial: "there is no easy bridge between Java types and Scala types"? Well, we will have to lay one now to cross that river.

Our helper object TypeConversions has another method, toJavaType, that does just that. Again, digging into its details is out of the scope of this tutorial, but with this method we can complete our implicit classes as below:

def getImplicitly[T](i: Int)(implicit typeTag: TypeTag[T]): T = {
  val javaType: java.lang.reflect.Type = TypeConversions.toJavaType(typeTag.tpe)
  self.get(i, TypeToken.of(javaType).wrap().asInstanceOf[TypeToken[T]])
}

And we are done!

Now, by simply placing the above implicit classes into scope, we will be able to write code as concise as:

statement.setImplicitly(0, List(1,2,3)) // implicitly converted to statement.setImplicitly(0, List(1,2,3)) (TypeTag[Seq[Int]]), then
                                        // implicitly converted to statement.set          (0, List(1,2,3), TypeToken[Seq[Int]])

When retrieving values, it's a bit more complicated because the Scala compiler needs some help from the developer to be able to fill in the appropriate implicit TypeTag instance; we do so like this:

val list = row.getImplicitly[Seq[Int]](0) // implicitly converted to statement.getImplicitly(0) (TypeTag[Seq[Int]]), then
                                          // implicitly converted to statement.get          (0,  TypeToken[Seq[Int]])

That's it. We hope that with this tutorial, we could demonstrate how easy it is to create codecs for the Java driver that are first-class citizens in Scala. Enjoy!

Bring Your Own Spark

Bring Your Own Spark (BYOS) is a feature of DSE Analytics designed to connect external Apache Spark™ systems to DataStax Enterprise with minimal configuration effort. In this post we introduce how to configure BYOS and show some common use cases.

BYOS extends the DataStax Spark Cassandra Connector with DSE security features such as Kerberos and SSL authentication. It also includes drivers to access the DSE Cassandra File System (CFS) and DSE File System (DSEFS) in 5.1.

There are three parts of the deployment:

  • <dse_home>/clients/dse-byos_2.10-5.0.6.jar is a fat jar. It includes everything you need to connect to the DSE cluster: the Spark Cassandra Connector with dependencies, the DSE security connection implementation, and the CFS driver.
  • The 'dse client-tool configuration byos-export' tool helps configure an external Spark cluster to connect to DSE.
  • The 'dse client-tool spark sql-schema' tool generates SparkSQL-compatible scripts that create external tables for all or a subset of DSE tables in the SparkSQL metastore.

HDP 2.3+ and CDH 5.3+ are the only Hadoop distributions which support Java 8 officially and which have been tested with BYOS in DSE 5.0 and 5.1.

Quick Start Guide

Pre-requisites:

A Hadoop or standalone Spark system is installed and configured, and you have access to at least one host on the cluster with a preconfigured Spark client. Let’s call it spark-host. The Spark installation should be pointed to by $SPARK_HOME.

A DSE cluster is installed and configured, and you have access to it. Let’s call it dse-host. I will assume you have a cassandra_keyspace.exampletable C* table created on it. DSE is located at $DSE_HOME.

DSE supports Java 8 only. Make sure your Hadoop, Yarn and Spark use Java 8. See your Hadoop distro documentation on how to upgrade Java version (CDH, HDP).

Prepare the configuration file

On dse-host run:

$DSE_HOME/bin/dse client-tool configuration byos-export byos.conf

It will store DSE client connection configuration in Spark-compatible format into byos.conf.

Note: if SSL or password authentication is enabled, additional parameters need to be stored. See the dse client-tool documentation for details.

Copy the byos.conf to spark-host.

On spark-host, append the ~/byos.conf file to the Spark default configuration:

cat byos.conf >> $SPARK_HOME/conf/spark-defaults.conf

Note: If you expect conflicts with spark-defaults.conf, the byos-export tool can merge properties itself; refer to the documentation for details.

Prepare C* to SparkSQL mapping (optional)

On dse-host run:

dse client-tool spark sql-schema -all > cassandra_maping.sql

That will create cassandra_maping.sql with spark-sql compatible create table statements.

Copy the file to spark-host.

Run Spark

Copy $DSE_HOME/dse/clients/dse-byos-5.0.0-all.jar to the spark-host

Run Spark with the jar.

$SPARK_HOME/bin/spark-shell --jars dse-byos-5.0.0-all.jar
scala> import com.datastax.spark.connector._
scala> sc.cassandraTable("cassandra_keyspace", "exampletable").collect

Note: External Spark cannot connect to the DSE Spark master and submit jobs. Thus you cannot point it at the DSE Spark master.

SparkSQL

BYOS does not support the legacy Cassandra-to-Hive table mapping format. The spark data frame external table format should be used for mapping: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md

DSE provides a tool to auto generate the mapping for external spark metastore: dse client-tool spark sql-schema

On the dse-host run:

dse client-tool spark sql-schema -all > cassandra_maping.sql

That will create cassandra_maping.sql with spark-sql compatible create table statements

Copy the file to spark-host

Create C* tables mapping in spark meta-store

$SPARK_HOME/bin/spark-sql --jars dse-byos-5.0.0-all.jar -f cassandra_maping.sql

Tables are now ready to use in both SparkSQL and Spark shell.

$SPARK_HOME/bin/spark-sql --jars dse-byos-5.0.0-all.jar
spark-sql> select * from cassandra_keyspace.exampletable;
$SPARK_HOME/bin/spark-shell --jars dse-byos-5.0.0-all.jar
scala> sqlContext.sql("select * from cassandra_keyspace.exampletable");

Access external HDFS from DSE Spark

DSE is built with Hadoop 2.7.1 libraries. So it is able to access any Hadoop 2.x HDFS file system.

To get access, you just need to provide the full path to the file in Spark commands:

scala> sc.textFile("hdfs://<namenode_host>/<path to the file>")

To get a namenode host you can run the following command on the Hadoop cluster:

hdfs getconf -namenodes

If the Hadoop cluster has a custom configuration or Kerberos security enabled, the configuration should be copied into the DSE Hadoop config directory:

cp /etc/hadoop/conf/hdfs-site.xml $DSE_HOME/resources/hadoop2-client/conf/hdfs-site.xml

Make sure that the firewall does not block the following HDFS DataNode and NameNode ports:

NameNode metadata service 8020/9000
DataNode 50010,50020

 

Security configuration

SSL

Start by generating a truststore with the DSE node certificates. If client certificate authentication is enabled (require_client_auth=true), a client keystore will also be needed.

More info on certificate generation:

https://docs.datastax.com/en/cassandra/2.1/cassandra/security/secureSSLCertificates_t.html

Copy both files to each Spark node in the same location. The Spark '--files' parameter can be used for the copying in a Yarn cluster.

Use the byos-export parameters to add the store locations, types and passwords into byos.conf.

dse client-tool configuration byos-export --set-truststore-path .truststore --set-truststore-password password --set-keystore-path .keystore --set-keystore-password password byos.conf

Yarn example:

spark-shell --jars byos.jar --properties-file byos.conf --files .truststore,.keystore

Kerberos

Make sure your Spark client host (where the Spark driver will be running) has Kerberos configured and the C* nodes' DNS entries are configured properly. See the Spark Kerberos documentation for more details.

If the Spark cluster-mode deployment will be used, or Kerberos is not configured on the Spark client host, use "Token based authentication" to access the Kerberized DSE cluster.

The byos.conf file will contain all the necessary Kerberos principal and service names exported from DSE.

A JAAS configuration file with the following options needs to be copied from a DSE node, or created manually, on the Spark client node only, and stored as $HOME/.java.login.config.

DseClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useTicketCache=true
       renewTGT=true;
};

Note: If a custom file location is used, a Spark driver property needs to be set pointing to the location of the file:

--conf 'spark.driver.extraJavaOptions=-Djava.security.auth.login.config=login_config_file'

BYOS authenticates with Kerberos and requests a C* token for executor authentication. Token authentication should be enabled in DSE. The Spark driver will automatically cancel the token on exit.

Note: the CFS root should be passed to Spark to request the token with:

--conf spark.yarn.access.namenodes=cfs://dse_host/

Spark Thrift Server with Kerberos

It is possible to authenticate services with a keytab. Hadoop/YARN services are already preconfigured with keytab files and Kerberos users if Kerberos was enabled in Hadoop, so you need to grant permissions to these users. Here is an example for the hive user:

cqlsh> create role 'hive/hdp0.dc.datastax.com@DC.DATASTAX.COM' with LOGIN = true;

Now you can log in as the hive Kerberos user, merge the configs, and start the Spark Thrift Server. It will be able to query DSE data:

#> kinit -kt /etc/security/keytabs/hive.service.keytab hive/hdp0.dc.datastax.com@DC.DATASTAX.COM
#> cat /etc/spark/conf/spark-thrift-sparkconf.conf byos.conf > byos-thrift.conf
#> start-thriftserver.sh --properties-file byos-thrift.conf --jars dse-byos*.jar

Connect to it with beeline for testing:

#> kinit
#> beeline -u 'jdbc:hive2://hdp0:10015/default;principal=hive/_HOST@DC.DATASTAX.COM'

Token based authentication

Note: This approach is less secure than the Kerberos one; use it only if Kerberos is not enabled on your Spark cluster.

DSE clients use Hadoop-like token-based authentication when Kerberos is enabled on the DSE server.

The Spark driver authenticates to the DSE server with Kerberos credentials, requests a special token, and sends the token to the executors. The executors authenticate to the DSE server with the token, so no Kerberos libraries are needed on the executor nodes.

If the Spark driver node has no Kerberos configured, or the Spark application will be run in cluster mode, the token can be requested during configuration file generation with the --generate-token parameter:

$DSE_HOME/bin/dse client-tool configuration byos-export --generate-token byos.conf

The following property will be added to byos.conf:

spark.hadoop.cassandra.auth.token=NwAJY2Fzc2FuZHJhCWNhc3NhbmRyYQljYXNzYW5kcmGKAVPlcaJsigFUCX4mbIQ7YU_yjEJgRUwQNIzpkl7yQ4inoxtZtLDHQBpDQVNTQU5EUkFfREVMRUdBVElPTl9UT0tFTgA

It is important to manually cancel the token after the task is finished, to prevent a reuse attack:

dse client-tool cassandra cancel-token NwAJY2Fzc2FuZHJhCWNhc3NhbmRyYQljYXNzYW5kcmGKAVPlcaJsigFUCX4mbIQ7YU_yjEJgRUwQNIzpkl7yQ4inoxtZtLDHQBpDQVNTQU5EUkFfREVMRUdBVElPTl9UT0tFTgA

Instead of Conclusion

Open Source Spark Cassandra Connector and Bring Your Own Spark feature comparison:

Feature                                        | OSS | DSE BYOS
DataStax Official Support                      | No  | Yes
Spark SQL Source Tables / Cassandra DataFrames | Yes | Yes
CassandraRDD batch and streaming               | Yes | Yes
C* to Spark SQL table mapping generator        | No  | Yes
Spark Configuration Generator                  | No  | Yes
Cassandra File System Access                   | No  | Yes
SSL Encryption                                 | Yes | Yes
User/password authentication                   | Yes | Yes
Kerberos authentication                        | No  | Yes

 

DSE Advanced Replication in DSE 5.1

DSE Advanced Replication feature in DataStax Enterprise underwent a major refactoring between DSE 5.0 (“V1”) and DSE 5.1 (“V2”), radically overhauling its design and performance characteristics.

DSE Advanced Replication builds on the multi-datacenter support in Apache Cassandra® to facilitate scenarios where selective or "hub and spoke" replication is required. DSE Advanced Replication is specifically designed to tolerate sporadic connectivity that can occur in constrained environments, such as retail, oil-and-gas remote sites and cruise ships.

This blog post provides a broad overview of the main performance improvements and  drills down into how we support CDC ingestion and deduplication to ensure efficient transmission of mutations.

Note: This blog post was written targeting DSE 5.1. Please refer to the DataStax documentation for your specific version of DSE if different.

Overview

Discussion of performance enhancements is split into three broad stages:

  1. Ingestion: Capturing the Cassandra mutations for an Advanced Replication enabled table
  2. Queueing: Sorting and storing the ingested mutations in an appropriate message queue
  3. Replication: Replicating the ingested mutation to the desired destination(s).

Ingestion

In Advanced Replication v1 (included in DSE 5.0), capturing mutations for an Advanced Replication enabled table used Cassandra triggers. Inside the trigger we unbundled the mutation and extracted the various partition updates and key fields for the mutation. By using the trigger in the ingestion transaction, we applied backpressure to ingestion and increased ingestion latency, as the mutations were processed within the ingestion cycle.

In Advanced Replication v2 (included in DSE 5.1), we replaced triggers with the Cassandra Change Data Capture (CDC) feature added in Cassandra version 3.8. CDC is an optional mechanism for extracting mutations from specific tables from the commitlog. This mutation extraction occurs outside the Ingestion transaction, so it adds negligible direct overhead to the ingestion cycle latency.

Post-processing the CDC logs requires CPU and memory. This process competes with DSE for resources, so decoupling of ingestion into DSE and ingestion into Advanced Replication allows us to support bursting for mutation ingestion.

The trigger in v1 was run on a single node (the coordinator) in the source cluster. CDC runs on every node in the source cluster, which means that there are replication factor (RF) copies of each mutation. This change creates the need for deduplication, which we'll explain later on.

Queuing

In Advanced Replication v1, we stored the mutations in a blob of data within a vanilla DSE table, relying on DSE to manage the replication of the queue and maintain the data integrity. The issue was that this insertion was done within the ingestion cycle with a negative impact on ingestion latency, at a minimum doubling the ingestion time. This could increase the latency enough to create a query timeout, causing an exception for the whole Cassandra query.

In Advanced Replication v2 we offloaded the queue outside of DSE and used local files. So for each mutation we have RF copies of that mutation, because the mutations are captured at the replica level via CDC (versus at the coordinator level via triggers in v1) and stored on the same nodes where Cassandra stores the mutation. This change ensures data integrity and redundancy and provides RF copies of the mutation.

We have solved this CDC deduplication problem based on an intimate understanding of token ranges, gossip, and mutation structures to ensure that, on average, each mutation is only replicated once. The goal is to replicate all mutations at least once, and to try to minimize replicating a given mutation multiple times. This solution will be described later.

Replication

Previously, in Advanced Replication v1, replication could be configured only to a single destination. This replication stream was fine for a use case with a net of source clusters storing data and forwarding to a central hub destination, essentially 'edge-to-hub.'

In Advanced Replication v2 we added support for multiple destinations, where data can be replicated to several destinations for distribution or redundancy purposes. As part of this we added the ability to prioritize which destinations and channels (pairs of source table to destination table) are replicated first, and to configure whether channel replication is LIFO or FIFO to ensure the newest or oldest data is replicated first.

CDC Deduplication and its integration into the Message Queue to support replication

With the new implementation of the v2 mutation queue, each mutation is stored in replication factor (RF) number of queues, and the mutations on each node are interleaved depending on which subset of token ranges is stored on that node.

There is no guarantee that the mutations are received on each node in the same order.

With the Advanced Replication v1 trigger implementation there was a single consolidated queue which made it significantly easier to replicate each mutation only once.

Deduplication

In order to minimize the number of times we process each mutation, we triage the mutations that we extract from the CDC log in the following way:

  1. Separate the mutations into their distinct tables.
  2. Separate them into their distinct token ranges.
  3. Collect the mutations in time sliced buckets according to their mutation timestamp (which is the same for that mutation across all the replica nodes.)

Distinct Tables

Separating the mutations into their distinct tables is reflected in the directory structure:

Token range configuration

Assume a three node cluster with a replication factor of 3.

For the sake of simplicity, this is the token-range structure on the nodes:

Primary, Secondary and Tertiary are an arbitrary but consistent way to prioritize the token ranges on a node, based on the token configuration of the keyspace, since Cassandra has no concept of a primary, secondary or tertiary node.

However, it allows us to illustrate that we have three token ranges that we are dealing with in this example. If we have Virtual-Nodes, then naturally there will be more token-ranges, and a node can be ‘primary’ for multiple ranges.

Time slice separation

Assume the following example CDC files for a given table:

As we can see, the mutation timestamps are NOT always received in order (look at the id numbers), but in this example each node contains the same set of mutations.

In this case, all three nodes share the same token ranges, but if we had a 5 node cluster with a replication factor of 3, then the token range configuration would look like this, and the mutations on each node would differ:

Time slice buckets

As we process the mutations from the CDC file, we store them in time slice buckets of one minute's worth of data. We also keep a stack of 5 time slices in memory at a time, which means that we can handle data up to 5 minutes out of order. Any data which is processed more than 5 minutes out of order is put into the out-of-sequence file and treated as exceptional data which will need to be replicated from all replica nodes.
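To make the bucketing concrete, here is a minimal sketch (illustrative only, not the actual DSE implementation) of one-minute buckets with a five-slice in-memory window; mutations that fall outside the window go to an out-of-sequence handler:

import scala.collection.mutable

case class Mutation(timestampMillis: Long, payload: Array[Byte])

class TimeSliceBuffer(outOfSequence: Mutation => Unit, seal: Seq[Mutation] => Unit) {
  private val buckets = mutable.Map.empty[Long, mutable.Buffer[Mutation]]
  private var newestMinute = Long.MinValue

  def ingest(m: Mutation): Unit = {
    val minute = m.timestampMillis / 60000
    newestMinute = math.max(newestMinute, minute)
    if (newestMinute - minute >= 5) {
      // More than five minutes out of order: exceptional data.
      outOfSequence(m)
    } else {
      buckets.getOrElseUpdate(minute, mutable.Buffer.empty[Mutation]) += m
      // Seal (flush) any slice that has now fallen out of the five-minute window.
      buckets.keys.filter(_ <= newestMinute - 5).toList.foreach { k =>
        seal(buckets.remove(k).get)
      }
    }
  }
}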

Example CDC Time Window Ingestion

  • In this example, assume that there are 2 time slices of 30 seconds each.
  • Deltas which are positive are ascending in time, so they are acceptable.
  • IDs 5, 11 and 19 jump backwards in time.
  • As the sliding time window is 30 seconds, IDs 5, 12 & 19 would be processed, whilst ID 11 is a jump back of 45 seconds, so it would not be processed into the correct time slice but placed in the out-of-sequence files.

Comparing Time slices

So we have a time slice of mutations on different replica nodes; they should be identical, but there is no guarantee that they are in the same order. We need to be able to compare the time slices and treat them as identical regardless of order. So we take the CRC of each mutation, and when we have sealed the time slice (rotated it out of memory because the current mutation that we are ingesting is 5 minutes later than this time slice), we sort the CRCs and take a CRC of all of the mutation CRCs.
That [TimeSlice] CRC is comparable between time slices to ensure they are identical.
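As a small illustrative sketch (not the DSE code), the order-independent CRC can be computed like this, assuming each mutation is available as a byte array:

import java.nio.ByteBuffer
import java.util.zip.CRC32

def crcOf(bytes: Array[Byte]): Long = {
  val crc = new CRC32()
  crc.update(bytes)
  crc.getValue
}

// CRC of the sorted per-mutation CRCs: two slices containing the same mutations
// yield the same value, regardless of the order in which the mutations arrived.
def timeSliceCrc(mutations: Seq[Array[Byte]]): Long = {
  val crc = new CRC32()
  mutations.map(crcOf).sorted.foreach { c =>
    crc.update(ByteBuffer.allocate(8).putLong(c).array())
  }
  crc.getValue
}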

The CRCs for each time slice are communicated between nodes in the cluster via the Cassandra table.

Transmission of mutations

In the ideal situation, the time slices are identical and all three nodes are active, so each node is happily ticking away, only transmitting its primary token range segment files.

However, we need to deal with robustness and assume that nodes fail, time slices do not match and we still have the requirement that ALL data is replicated.

We use gossip to monitor which nodes are active or not, and if a node fails, the 'secondary' becomes active for that node's 'primary' token range.

Time slice CRC processing

If a CRC matches for a time slice between two nodes, then when that time slice is fully transmitted (for a given destination), the corresponding time slice (with the matching CRC) can be marked as sent (synch-deleted).

If the CRCs mismatch, and there is no higher-priority active node with a matching CRC, then that time slice is transmitted as well; this ensures that no data is missed and everything is fully transmitted.

Active Node Monitoring Algorithm

Assume that the token ranges are (a,b], (b,c], and (c,a], the entire range of tokens is [a,c], and we have three nodes (n1, n2 and n3) with replication factor 3.

    • On startup the token ranges for the keyspace are determined - we actively listen for token range changes and adjust the schema appropriately.
    • These are remapped so we have the following information:
      • node => [{primary ranges}, {secondary ranges}, {tertiary ranges}]
      • Note: We support vnodes where there may be multiple primary ranges for a node.
    • In our example we have:
      • n1 => [{(a,b]}, {(b,c]}, {(c,a]}]
      • n2 => [{(b,c]}, {(c,a]}, {(a,b]}]
      • n3 => [{(c,a]}, {(a,b]}, {(b,c]}]
    • When all three nodes are live, the active token ranges for the nodes are as follows:
      • n1 => [{(a,b]}, {(b,c]}, {(c,a]}] => {(a,b]}
      • n2 => [{(b,c]}, {(c,a]}, {(a,b]}] => {(b,c]}
      • n3 => [{(c,a]}, {(a,b]}, {(b,c]}] => {(c,a]}
    • Assume that n3 has died; its primary range is then searched for in the secondary replicas of live nodes:
      • n1 => [{(a,b]}, {(b,c]}, {(c,a]}] => {(a,b]}
      • n2 => [{(b,c]}, {(c,a]}, {(a,b]}] => {(b,c], (c,a]}
      • n3 => [{(c,a]}, {(a,b]}, {(b,c]}] => {}
    • Assume that n2 and n3 have died; their primary ranges are then searched for in the secondary replicas of live nodes, and if not found, the tertiary replicas (assuming replication factor 3):
      • n1 => [{(a,b]}, {(b,c]}, {(c,a]}] => {(a,b], (b,c], (c,a]}
      • n2 => [{(b,c]}, {(c,a]}, {(a,b]}] => {}
      • n3 => [{(c,a]}, {(a,b]}, {(b,c]}] => {}
  • This ensures that data is only sent once from each edge node, and that dead nodes do not result in orphaned data which is not sent. A minimal sketch of this reassignment logic is shown below.
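The following hypothetical sketch (not the DSE implementation) captures the reassignment described above: for each token range, the active owner is the highest-priority live replica, so a dead node's primary ranges fall to its secondaries and then its tertiaries:

case class TokenRange(from: Char, to: Char) // stands for (from, to]

// For every range, its replicas in priority order: primary, secondary, tertiary.
def activeOwners(
    replicaOrder: Map[TokenRange, Seq[String]],
    liveNodes: Set[String]): Map[String, Set[TokenRange]] =
  replicaOrder.toSeq
    .flatMap { case (range, replicas) =>
      replicas.find(liveNodes.contains).map(owner => owner -> range)
    }
    .groupBy(_._1)
    .map { case (node, owned) => node -> owned.map(_._2).toSet }

// The example above: (a,b] -> n1, n3, n2; (b,c] -> n2, n1, n3; (c,a] -> n3, n2, n1.
val order = Map(
  TokenRange('a', 'b') -> Seq("n1", "n3", "n2"),
  TokenRange('b', 'c') -> Seq("n2", "n1", "n3"),
  TokenRange('c', 'a') -> Seq("n3", "n2", "n1"))

activeOwners(order, Set("n1", "n2", "n3")) // each node transmits only its primary range
activeOwners(order, Set("n1"))             // n1 takes over all three ranges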

Handling the Node Failure Case

Below illustrates the three stages of a failure case.

  1. Before - where everything is working as expected.
  2. Node 2 fails - so Node 1 becomes active for its token slices, ignores what has already been partially sent for 120-180, and resends from its secondary directory.
  3. Node 2 restarts - this is after Node 1 has sent 3 slices for which Node 2 was primary (but Node 1 was active because it was Node 2's secondary); Node 2 synchronously deletes those because the CRCs match. It ignores what has already been partially sent for 300-360, resends those from its primary directory, and carries on.

Before

Node 2 Dies

Node 2 Restarts

 

Conclusion

The vastly improved and revamped DSE Advanced Replication v2 in DSE 5.1 is more resilient and performant, with support for multiple hubs and clusters.

For more information see our documentation here.

Cassandra Fundamentals & Data Modelling

This course is designed for developers and database administrators who want a rapid, deep-dive, 'hands-on' exploration of core Cassandra theory and data modelling practices.

Continue reading Cassandra Fundamentals & Data Modelling on opencredo.com.

Studio 2.0 Goes Multi-Model with CQL Support

Great news!  In addition to support for DSE Graph and Apache TinkerPop™, DataStax Studio 2.0 introduces support for the Apache Cassandra™ Query Language (CQL).  A big part of that support is an intelligent CQL editor that will give you a productivity boost when working with CQL and DataStax Enterprise (DSE) 5.0+.  In this blog post we'll take a deep dive into what the CQL editor has to offer.

Getting Started with CQL and Studio

CQL support for Studio requires DSE 5.0 or higher and Studio 2.0 or higher.  Both can be downloaded here http://docs.datastax.com/en/latest-dse/ and here http://docs.datastax.com/en/latest-studio/.

As is customary in Studio, you work with CQL in a notebook with one or more notebook cells.  To use CQL, just select it as the language for one of your notebook cells:


Figure 1.a:  Shows where to click to get the drop down menu of language options


Figure 1.b:  Demonstrates selecting CQL as a language which will enable the intelligent editor features

You won't have to make this selection every time, as any new cell automatically inherits the prior cell's language, so you don't need to repeatedly select the language you want to work with.

Keyspaces

If you have worked with CQL in the past the next thing you’ll want to know is how to select a keyspace.  You have a few options with Studio:

  1. Fully qualify schema elements by their keyspace in your CQL statements:
  2. Use a USE statement, which will change the keyspace context for all subsequent statements in the same cell:
  3. Configure the keyspace by selecting one from keyspace drop down for a cell, which will set the keyspace context for all statements in a cell (except for statements following a USE statement):

As with the cell language, if a new cell is created directly below a CQL cell, the keyspace setting will be inherited.

Now that we know how to work with keyspaces, let’s move on.

CQL Validations


Figure 2:  A CQL schema aware domain validation error, indicating a keyspace needs to be specified

In the previous section we showed several ways to work with keyspaces.  But if you don't use any of the above options, how do you know you've made a mistake without executing the statement?  The answer is shown in Figure 2 above: Studio lets us know when a statement has an issue by showing a validation error.

Studio supports both CQL syntax validations and DSE domain-specific validations.  A syntax validation is simply whether or not your statement is valid with respect to the CQL grammar:

Domain validations provide you with errors or warnings that you would get from DSE when executing a statement that violates some constraint.  Most are based on checking whether a statement is valid with regard to your schema, but they can include anything, such as informing you that you've specified an invalid table option:


Figure 3:  Example of a CQL domain validation error; executing the statement gives you similar feedback from DSE

In this case, and many others, you can figure out how to correct your statement by removing the part with an error and invoking content assist with ctrl+space to get a list of proposals.  Let's take a look at content assist now.

Content Assist

Like validations, content assist can help you by proposing the next valid keywords in the grammar, or it can provide domain specific proposals.  Let’s see how we might correct the statement that specified an invalid table option by invoking content assist with ctrl+space:


Figure 4:  Example of proposing valid table options in a CREATE TABLE statement

In Figure 4 above we can see that the table option that we probably wanted before was bloom_filter_fp_chance, which after being selected will be inserted with a valid default value.

There are many places in CQL statements that Studio supports invoking content assist.  Some of the more common are:

  • Proposing table names anywhere a table can be referenced in a statement
  • Proposing column names anywhere a column can be referenced in a statement
  • Proposing the next valid keyword, e.g. CREATE <ctrl+space> should propose the TABLE keyword, among others

CQL Templates

Perhaps the most useful place to invoke content assist is at the very beginning of a statement:


Figure 5:  Invocation of content assist at the beginning of a statement that propose CQL statement templates

What you see in Figure 5 is that the proposals containing placeholder values ({keyspaceName}, {viewName}) are CQL statement templates.  If we select the ALTER TABLE (add column) template, a statement is inserted with each placeholder being a portion of the statement you need to complete.  You can TAB through these placeholders to jump around the statement, and use SHIFT+TAB to move back to the previous placeholder:


Figure 6:  Show the ALTER TABLE(add column) template inserted, with the current placeholder highlighted

In Figure 6 you can see that the placeholders are emphasized, with the current placeholder highlighted.  For this template we need to provide a table name, a column name, and a type for the column.  Templates like these can be very handy when dealing with large, complicated statements that you might not remember the syntax for offhand, such as the CREATE MATERIALIZED VIEW statement:


Figure 7:  Shows how handy templates can be for large complex statements such as CREATE MATERIALIZED VIEW(with clustering order)

When in doubt, give content assist a try!  All you have to do is invoke it with ctrl+space and you will be pleasantly surprised how much Studio can help you with crafting your CQL statements.

Effective Schema

When either validating your statements or making content assist proposals, Studio uses an effective schema for its schema-based domain validations and proposals.  The effective schema is your existing schema combined with the changes each of your DDL statements would make to the database schema; more specifically, the changes from every DDL statement prior to the statement that you are invoking content assist on, or that the editor is validating.

This ensures that if you were to execute your cells one by one from the top down that they would each execute successfully.

To make this clearer, take a look at the following example:


Figure 8:  Example of effective schema in a single cell

In the example above, assume that the database schema does not have the videos table, and that we have not executed this notebook cell.  In this cell we can see the following being demonstrated:

  1. A CREATE TABLE statement applies a change to the effective schema so that the videos table now exists from the perspective of the second statement.
  2. Even though we haven't executed it, the second statement (drop table videos) does not have a validation error, because the videos table exists in the effective schema of the drop table statement.
  3. The third statement tries to select from the videos table.  But the effective schema for that statement no longer has the videos table due to the prior drop statement, so it is flagged with a validation error.

Note that effective schema also carries across cells:


Figure 9:  Example of effective schema across multiple cells

And as mentioned previously, content assist also leverages the effective schema:


Figure 10:  Example of content assist leveraging the effective schema

The example above shows that content assist is aware that the videos2 table exists in the effective schema, but that videos1 has been dropped, so it isn’t proposed as a possible table to drop for the current statement.

Effective schema is a great tool for ensuring you are writing statements that will execute successfully when working in a notebook that contains DDL statements, especially notebooks with many statements.

One last topic for this post is a way for you to view your database schema from the editor itself using Studio’s DESCRIBE statement support.

DESCRIBE Statement Support

Suppose you want to create a new user-defined type (UDT) that is fairly similar to an existing UDT, or you just don't remember the syntax.  One way to do this quickly is to leverage Studio's support for describing CQL schema elements.  Like the CQL shell (cqlsh), executing DESCRIBE statements will produce the equivalent DDL to create that schema element, which is a handy thing to copy and then modify to meet your new type's needs:


Figure 11:  Shows the result of executing a DESCRIBE TYPE statement

In general, Studio's DESCRIBE command support is a great way to inspect parts of your schema quickly without leaving the editor.  However, it's important to note that DESCRIBE commands are not actual CQL statements and don't execute against your DSE cluster. Instead, Studio uses the metadata it knows about your schema to generate output equivalent to what you would get from issuing DESCRIBE commands in cqlsh.

What DESCRIBE commands does Studio support?

  • DESCRIBE CLUSTER
  • DESCRIBE KEYSPACES
  • DESCRIBE KEYSPACE
  • DESCRIBE TABLES
  • DESCRIBE TABLE
  • DESCRIBE INDEX
  • DESCRIBE MATERIALIZED VIEW
  • DESCRIBE TYPES
  • DESCRIBE TYPE
  • DESCRIBE FUNCTIONS
  • DESCRIBE FUNCTION
  • DESCRIBE AGGREGATES
  • DESCRIBE AGGREGATE

 Next Steps

A great place to go next is to download Studio and walk through the Working With CQL tutorial that ships with it.  That tutorial contains even more info about how to work with CQL in Studio, including:

  • Browsing your CQL schema with our fabulous CQL schema viewer
  • Different ways to visualize your CQL results, including a detailed JSON view of nested data in a single column
  • How to create custom CQL execution configurations, including ones that enable tracing and give you a profile view of your query's execution

Thanks!

We hope that Studio will be an extremely productive environment for you to craft your CQL queries to run against DataStax Enterprise.  If you have any feedback or requests, don't hesitate to contact the Studio team at studio-feedback@datastax.com.

Spark Application Dependency Management

This blog post was written for DataStax Enterprise 5.1.0. Refer to the DataStax documentation for your specific version of DSE.

Compiling and executing Apache Spark™ applications with custom dependencies can be a challenging task. Spark beginners can feel overwhelmed by the number of different solutions to this problem. Diversity of library versions, the number of different build tools and finally the build techniques, such as assembling fat JARs and dependency shading, can cause a headache.

In this blog post, we shed light on how to manage compile-time and runtime dependencies of a Spark Application that is compiled and executed against DataStax Enterprise (DSE) or open source Apache Spark (OSS).

Along the way we use a set of predefined bootstrap projects that can be adopted and used as a starting point for developing a new Spark Application. These examples are all about connecting, reading, and writing to and from a DataStax Enterprise or Apache Cassandra(R) system.

Quick Glossary:

Spark Driver: A user application that contains a Spark Context.
Spark Context: A Scala class that functions as the control mechanism for distributed work.
Spark Executor: A remote Java Virtual Machine (JVM) that performs work as orchestrated by the Spark Driver.
Runtime classpath: A list of all dependencies available during execution (in execution environment such as Apache Spark cluster). It's important to note that the runtime classpath of the Spark Driver is not necessarily identical to the runtime classpath of the Spark Executor.
Compile classpath: A full list of all dependencies available during compilation (specified with build tool syntax in a build file).

Choose language and build tool

First, git clone the DataStax repository https://github.com/datastax/SparkBuildExamples that provides the code you are going to work with. Within the cloned directories there are Spark Application bootstrap projects for Java and Scala, and for the most frequently used build tools:

  • Scala Build Tool (sbt)
  • Apache Maven™
  • Gradle

In the context of managing dependencies for the Spark Application, these build tools are equivalent. It is up to you to select the language and build tool that best fits you and your team.

For each build tool, the way the application is built is defined with declarative syntax embedded in files in the application’s directory:

  • Sbt: build.sbt
  • Apache Maven: pom.xml
  • Gradle: build.gradle

From now on we are going to refer to those files as build files.

Choose execution environment

Two different execution environments are supported in the repository: DSE and OSS.

DSE

If you are planning to execute your Spark Application on a DSE cluster, use the dse bootstrap project which greatly simplifies dependency management.

It leverages the dse-spark-dependencies library which instructs a build tool to include all dependency JAR files that are distributed with DSE and are available in the DSE cluster runtime classpath. These JAR files include Apache Spark JARs and their dependencies, Apache Cassandra JARs, Spark Cassandra Connector JAR, and many others. Everything that is needed to build your bootstrap Spark Application is supplied by the dse-spark-dependencies dependency. To view the list of all dse-spark-dependencies dependencies, visit our public repo and inspect the pom files that are relevant to your DSE cluster version.

An example of a DSE build.sbt:

libraryDependencies += "com.datastax.dse" % "dse-spark-dependencies" % "5.1.1" % "provided"

Using this managed dependency will automatically match your compile time dependencies with the DSE dependencies on the runtime classpath. This means there is no possibility in the execution environment for dependency version conflicts, unresolved dependencies etc.

Note: The DSE version must match the one in your cluster, please see “Execution environment version” section for details.

DSE project templates are built with sbt 0.13.13 or later. In case of unresolved dependency errors, update sbt and then clean the Ivy cache (with the rm ~/.ivy2/cache/com.datastax.dse/dse-spark-dependencies/ command).

OSS

If you are planning to execute your Spark Application on an open source Apache Spark cluster, use the oss bootstrap project. For the oss bootstrap project, all compilation classpath dependencies must be manually specified in build files.

An example of an OSS build.sbt:

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion % "provided"
)

For OSS, you must specify these four dependencies for the compilation classpath.

During execution, the Spark runtime classpath already contains the org.apache.spark.* dependencies, so all we need to do is add spark-cassandra-connector as an extra dependency. The DataStax spark-cassandra-connector doesn't exist in the Spark cluster by default. The most common method to include this additional dependency is to use the --packages argument of the spark-submit command. An example of --packages usage is shown in the "Execute" section below.

The Apache Spark versions in the build file must match the Spark version in your Spark cluster. See next section for details.

Execution environment versions

It is possible that your DSE or OSS cluster version is different from the one specified in the bootstrap project.

DSE

If you are a DSE user, check out the SparkBuildExamples version that matches your DSE cluster version, for example:

git checkout <DSE_version>
# example: git checkout 5.0.6

If you are a DSE 4.8.x user, check out 4.8.13 or a newer 4.8.x version.

OSS

If you are planning to execute your application against a Spark cluster different from the one specified in a bootstrap project build file, adjust all dependency versions listed there. Fortunately, the main component versions are variables. See the examples below and adjust them according to your needs.

Sbt

val sparkVersion = "2.0.2"
val connectorVersion = "2.0.0"

 

Maven

<properties>
  <spark.version>2.0.2</spark.version>
  <connector.version>2.0.0</connector.version>
</properties>

 

Gradle

def sparkVersion = "2.0.2"
def connectorVersion = "2.0.0"

Let's say that your Spark cluster is running version 1.5.1. Go to the version compatibility table, where you can see the compatible Apache Cassandra and Spark Cassandra Connector versions. In this example, our Apache Spark 1.5.1 cluster is compatible with the 1.5.x Spark Cassandra Connector, the newest of which is 1.5.2 (the newest versions can be found on the Releases page). Adjust the variables accordingly and you are good to go!
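For instance, under that assumption (a Spark 1.5.1 cluster paired with connector 1.5.2), the sbt variables shown earlier would be adjusted as follows; Maven and Gradle have the matching properties:

val sparkVersion = "1.5.1"
val connectorVersion = "1.5.2"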

Build

The build command differs for each build tool. The bootstrap projects can be built with the following commands.

Sbt

sbt clean assembly
# produces jar in path: target/scala-2.11/writeRead-assembly-0.1.jar

Maven

mvn clean package
# produces jar in path: target/writeRead-0.1.jar

Gradle

gradle clean shadowJar
# produces jar in path: build/libs/writeRead-0.1.jar

Execute

The spark-submit command differs between environments. In DSE environment, the command is simplified to autodetect parameters like --master. In addition, various other Apache Cassandra and DSE specific parameters are added to the default SparkConf. Use the following commands to execute the JAR that you built. Refer to the Spark docs for details about spark-submit command.

DSE

dse spark-submit --class com.datastax.spark.example.WriteRead <path_to_produced_jar>

OSS

spark-submit --conf spark.cassandra.connection.host=<cassandra_host> --class com.datastax.spark.example.WriteRead --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0 --master <master_url> <path_to_produced_jar>

Note the usage of --packages to include the spark-cassandra-connector on the runtime classpath for all application JVMs.

Provide additional dependencies

Now that you have successfully built and executed this simple application, it’s time to see how extra dependencies can be added to your Spark Application.

Let's say your application grows over time and you need to incorporate an external dependency to add functionality to your application. For the sake of argument, let the new dependency be commons-math3.

To supply this dependency to the compilation classpath, we must provide proper configuration entries in build files.

There are two ways to provide additional dependencies to the runtime classpath: assembling them into the application JAR, or manually providing all dependencies with the spark-submit command.

Assembly

Assembling is a way of directly including dependency classes in the resulting JAR file (sometimes called a fat JAR or uber JAR) as if these dependency classes were developed along with your application. When the user code is shipped to the Apache Spark Executors, these dependency classes are included in the application JAR on the runtime classpath. To see an example, uncomment the following sections in any of your build files.

Sbt

libraryDependencies += "org.apache.commons" % "commons-math3" % "3.6.1"

 

Maven

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-math3</artifactId>
  <version>3.6.1</version>
</dependency>

 

Gradle

assembly "org.apache.commons:commons-math3:3.6.1"

Now you can use commons-math3 classes in your application code. When your development is finished, you can create a JAR file using the build command and submit it without any modifications to the spark-submit command. If you are curious where the additional dependency went, use any archive tool to open the produced JAR and see that the commons-math3 classes are included.
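As a hedged illustration (this snippet is not part of the bootstrap projects), the assembled dependency can then be used directly in your application code:

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics

// DescriptiveStatistics comes from the assembled commons-math3 dependency,
// so it is available on the executor classpath without any extra submit flags.
val stats = new DescriptiveStatistics()
Seq(1.0, 2.0, 3.0).foreach(stats.addValue)
println(s"mean = ${stats.getMean}")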

When assembling, you might run into conflicts where multiple JARs attempt to include a file with the same filename but different contents. There are several solutions to this problem; the most common are removing one of the conflicting dependencies or shading (which is described later in this blog post). If all else fails, most plugins have a variety of other merge strategies for handling these situations, for example https://github.com/sbt/sbt-assembly#merge-strategy.

Manually adding JARs to the runtime classpath

If you don't want to assemble a fat JAR (maybe the number of additional dependencies produced a 100MB JAR file and you consider this size unusable), use an alternate way to provide additional dependencies to the runtime classpath.

Mark some of the dependencies with the provided keyword to exclude them from the assembly JAR.

Sbt

libraryDependencies += "org.apache.commons" % "commons-math3" % "3.6.1" % "provided"

 

Maven

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-math3</artifactId>
  <version>3.6.1</version>
  <scope>provided</scope>
</dependency>

 

Gradle

provided "org.apache.commons:commons-math3:3.6.1"

After building the JAR, manually specify the additional dependencies with the spark-submit command during application submission: add to or extend the existing --packages argument. Note that multiple dependencies are separated by commas. For example:

--packages org.apache.commons:commons-math3:3.6.1,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0

User dependencies conflicting with Spark dependencies

What if you want to use different version of a dependency than the version that is present in the execution environment?

For example, a Spark cluster already has commons-csv on its runtime classpath and the developer needs a different version in their application. Maybe the version shipped with Spark is old and doesn't contain all the needed functionality. Maybe the new version is not backward compatible and breaks Spark Application execution.

This is a common problem and there is a solution: shading.

Shading

Shading is a build technique where dependency classes are packaged with the application JAR file (as in assembling), but additionally the package structure of these classes is altered. This process happens at compile time and is transparent to the developer. Shading simply substitutes all dependency references in a Spark Application with the same (functionality-wise) classes located in different packages. For example, the class org.apache.commons.csv.CSVParser for the Spark Application becomes shaded.org.apache.commons.csv.CSVParser.

To see shading in action, uncomment the following sections in the build file of your choice. This will embed the old commons-csv in the resulting JAR, but with the package prefix "shaded".

Sbt

assembly "org.apache.commons:commons-csv:1.0"

and

assemblyShadeRules in assembly := Seq( 
 ShadeRule.rename("org.apache.commons.csv.**" -> "shaded.org.apache.commons.csv.@1").inAll 
)

Maven

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-csv</artifactId>
  <version>1.0</version>
</dependency>

and

<relocations>
  <relocation>
    <pattern>org.apache.commons.csv</pattern>
    <shadedPattern>shaded.org.apache.commons.csv</shadedPattern>
  </relocation>
</relocations>

 

Gradle

assembly "org.apache.commons:commons-csv:1.0"

and

shadowJar {
  relocate 'org.apache.commons.csv', 'shaded.org.apache.commons.csv'
}

After building the JAR, you can look into its contents and see that commons-csv is embedded in the shaded directory.
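As a small hedged sketch of what this means for your sources (again, not code from the bootstrap projects): the application keeps importing the original package, and only the references inside the built JAR are rewritten to the shaded package:

import java.io.StringReader
import org.apache.commons.csv.CSVFormat

// Written against the original package; after the assembly/shadowJar step the
// reference in the JAR points at shaded.org.apache.commons.csv.CSVFormat instead.
val records = CSVFormat.DEFAULT.parse(new StringReader("a,b,c")).getRecords
println(records.get(0).get(1)) // prints "b"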

Summary

In this article, you learned how to manage compile-time and runtime dependencies of a simple Apache Spark application that connects to an Apache Cassandra database by using the Spark Cassandra Connector. You learned how Scala and Java projects are structured with sbt, gradle, and maven build tools. You also learned different ways to provide additional dependencies and how to resolve dependency conflicts with shading.

Testing a Spark Application

This is the third article in our Spark-related blog series, which covers some basic approaches to testing Cassandra/Spark code. It will show you how to restructure your code to be testable, covering unit testing, integration testing, and acceptance testing.

Continue reading Testing a Spark Application on opencredo.com.

Deploy Spark with an Apache Cassandra cluster

This is the second article in our Spark-related blog series, which moves on from demonstrating functionality and describes the details of how to set up an infrastructure capable of running such analytical processing.

Continue reading Deploy Spark with an Apache Cassandra cluster on opencredo.com.

New Blog Series: Spark – The Pragmatic Bits

Interested in learning more about the practical and pragmatic aspects of using Apache Spark for your data processing challenges? Please join us for our new blog and webinar series: Apache Spark - The Pragmatic Bits. The blog series will cover topics including how Spark can be used to get the most out of your Cassandra setup, how to actually deploy a Spark and Cassandra cluster through programmable infrastructure, and how to ensure you write testable Spark code which will play nicely with the rest of your system. The series finishes with a webinar which explores the use case of "Detecting stolen AWS credential usage with Spark".

Continue reading New Blog Series: Spark – The Pragmatic Bits on opencredo.com.