Creating Flamegraphs with Apache Cassandra in Kubernetes (cass-operator)

In a previous blog post recommending disabling read repair chance, some flamegraphs were generated to demonstrate the effect read repair chance had on a cluster. Let’s go through how those flamegraphs were captured, step-by-step using Apache Cassandra 3.11.6, Kubernetes and the cass-operator, nosqlbench and the async-profiler.

In previous blog posts we would have used the existing tools of tlp-cluster or ccm, tlp-stress or cassandra-stress, and sjk. Here we take a new approach that is a lot more fun, as with k8s the same approach can be used locally or in the cloud. No need to switch between ccm clusters for local testing and tlp-cluster for cloud testing. Nor are you bound to AWS for big instance testing, that’s right: no vendor lock-in. Cass-operator and K8ssandra are getting a ton of momentum from DataStax, so it is only fitting, and exciting, to introduce them to as much of the open source world as we can.

This blog post is not an in-depth dive into using cass-operator, rather a simple teaser to demonstrate how we can grab some flamegraphs as quickly as possible. The blog post is split into three sections:

  • Setting up Kubernetes and getting Cassandra running
  • Getting access to Cassandra from outside Kubernetes
  • Stress testing and creating flamegraphs

Setup

Let’s go through a quick demonstration using Kubernetes, the cass-operator, and some flamegraphs.

First, download four yaml configuration files that will be used. This is not strictly necessary for the latter three, as kubectl may reference them by their URLs, but let’s download them for the sake of having the files locally and being able to make edits if and when desired.

wget https://thelastpickle.com/files/2021-01-31-cass_operator/01-kind-config.yaml
wget https://thelastpickle.com/files/2021-01-31-cass_operator/02-storageclass-kind.yaml
wget https://thelastpickle.com/files/2021-01-31-cass_operator/11-install-cass-operator-v1.1.yaml
wget https://thelastpickle.com/files/2021-01-31-cass_operator/13-cassandra-cluster-3nodes.yaml

The next steps involve kind and kubectl to create a local cluster we can test against. To use kind you need Docker running locally; it is recommended to give it 4 CPUs and 12GB RAM for this exercise.

kind create cluster --name read-repair-chance-test --config 01-kind-config.yaml

kubectl create ns cass-operator
kubectl -n cass-operator apply -f 02-storageclass-kind.yaml
kubectl -n cass-operator apply -f 11-install-cass-operator-v1.1.yaml

# watch and wait until the pod is running
watch kubectl -n cass-operator get pod

# create 3 node C* cluster
kubectl -n cass-operator apply -f 13-cassandra-cluster-3nodes.yaml

# again, wait for pods to be running
watch kubectl -n cass-operator get pod

# test the three nodes are up
kubectl -n cass-operator exec -it cluster1-dc1-default-sts-0 -- nodetool status

Access

For this example we are going to run NoSqlBench from outside the k8s cluster, so we will need access to a pod’s Native Protocol interface via port-forwarding. This approach is practical here because we want the benchmark to connect to just one coordinator. In many situations you would instead run NoSqlBench from a separate dedicated pod inside the k8s cluster.

# get the cql username
kubectl -n cass-operator get secret cluster1-superuser -o yaml | grep " username" | awk -F" " '{print $2}' | base64 -d && echo ""

# get the cql password
kubectl -n cass-operator get secret cluster1-superuser -o yaml | grep " password" | awk -F" " '{print $2}' | base64 -d && echo ""

# port forward the native protocol (CQL)
kubectl -n cass-operator port-forward --address 0.0.0.0 cluster1-dc1-default-sts-0 9042:9042 

The above sets up the k8s cluster, a k8s storageClass, and the cass-operator with a three node Cassandra cluster. For a more in-depth look at this setup, check out this tutorial.
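If you want a quick sanity check of the port-forward before benchmarking, any CQL client will do. For example, a minimal sketch using the DataStax Python driver (this assumes the cassandra-driver package is installed locally; substitute the credentials retrieved above):

# pip install cassandra-driver
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth = PlainTextAuthProvider(username="<cql_username>", password="<cql_password>")
cluster = Cluster(["127.0.0.1"], port=9042, auth_provider=auth)
session = cluster.connect()

# a trivial query against the forwarded coordinator
row = session.execute("SELECT cluster_name, release_version FROM system.local").one()
print(row.cluster_name, row.release_version)

cluster.shutdown()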

Stress Testing and Flamegraphs

With a cluster to play with, let’s generate some load and then go grab some flamegraphs.

Instead of using SJK (Swiss Java Knife), as our previous blog posts have done, we will use the async-profiler. The async-profiler does not suffer from the safepoint bias problem, an issue we see more often than we would like in Cassandra nodes (protip: make sure you configure ParallelGCThreads and ConcGCThreads to the same value).

Open a new terminal window and do the following.

# get the latest NoSqlBench jarfile
wget https://github.com/nosqlbench/nosqlbench/releases/latest/download/nb.jar

# generate some load, use credentials as found above

java -jar nb.jar cql-keyvalue username=<cql_username> password=<cql_password> whitelist=127.0.0.1 rampup-cycles=10000 main-cycles=500000 rf=3 read_cl=LOCAL_ONE

# while the load is still running,
# open a shell in the coordinator pod, download async-profiler and generate a flamegraph
kubectl -n cass-operator exec -it cluster1-dc1-default-sts-0 -- /bin/bash

wget https://github.com/jvm-profiling-tools/async-profiler/releases/download/v1.8.3/async-profiler-1.8.3-linux-x64.tar.gz

tar xvf async-profiler-1.8.3-linux-x64.tar.gz
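
# find the Cassandra process id to use as <CASSANDRA_PID> below,
# e.g. ps -ef | grep CassandraDaemon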

async-profiler-1.8.3-linux-x64/profiler.sh -d 300 -f /tmp/flame_away.svg <CASSANDRA_PID>

exit

# copy the flamegraph out of the pod
kubectl -n cass-operator cp cluster1-dc1-default-sts-0:/tmp/flame_away.svg flame_away.svg

Keep It Clean

After everything is done, it is time to clean up after yourself.

Delete the CassandraDatacenters first, otherwise Kubernetes will block deletion because we use a finalizer. Note, this will delete all data in the cluster.

kubectl delete cassdcs --all-namespaces --all

Remove the operator Deployment, CRD, etc.

# this command can take a while, be patient

kubectl delete -f https://raw.githubusercontent.com/datastax/cass-operator/v1.5.1/docs/user/cass-operator-manifests-v1.16.yaml

# if troubleshooting, to forcibly remove resources, though
# this should not be necessary, and take care as this will wipe all resources

kubectl delete "$(kubectl api-resources --namespaced=true --verbs=delete -o name | tr "\n" "," | sed -e 's/,$//')" --all

To remove the local Kubernetes cluster altogether

kind delete cluster --name read-repair-chance-test

To stop and remove the docker containers that are left running…

docker stop $(docker ps | grep kindest | cut -d" " -f1)
docker rm $(docker ps -a | grep kindest | cut -d" " -f1)

More… the cass-operator tutorials

There is a ton of documentation and tutorials getting released on how to use the cass-operator. If you are keen to learn more, the following is highly recommended: Managing Cassandra Clusters in Kubernetes Using Cass-Operator.

The Impacts of Changing the Number of VNodes in Apache Cassandra

Apache Cassandra’s default value for num_tokens is about to change in 4.0! This might seem like a small edit note in the CHANGES.txt, however such a change can have a profound effect on day-to-day operations of the cluster. In this post we will examine how changing the value for num_tokens impacts the cluster and its behaviour.

There are many knobs and levers that can be modified in Apache Cassandra to tune its behaviour. The num_tokens setting is one of those. Like many settings it lives in the cassandra.yaml file and has a defined default value. That’s where it stops being like many of Cassandra’s settings. You see, most of Cassandra’s settings will only affect a single aspect of the cluster. However, when changing the value of num_tokens there is an array of behaviours that are altered. The Apache Cassandra project has committed and resolved CASSANDRA-13701, which changed the default value for num_tokens from 256 to 16. This change is significant, and to understand the consequences we first need to understand the role that num_tokens plays in the cluster.

Never try this on production

Before we dive into any details it is worth noting that the num_tokens setting on a node should never ever be changed once it has joined the cluster. For one thing, the node will fail on a restart. The value of this setting should be the same for every node in a datacenter. Historically, different values were expected for heterogeneous clusters. While it’s rare to see, and not something we would recommend, you can still in theory double the num_tokens on nodes that are twice as big in terms of hardware specifications. Furthermore, it is common to see the nodes in one datacenter have a value for num_tokens that differs from the nodes in another datacenter. This is partly how changing the value of this setting on a live cluster can be safely done with zero downtime. It is out of scope for this blog post, but details can be found in migration to a new datacenter.

The Basics

The num_tokens setting influences the way Cassandra allocates data amongst the nodes, how that data is retrieved, and how that data is moved between nodes.

Under the hood Cassandra uses a partitioner to decide where data is stored in the cluster. The partitioner is a consistent hashing algorithm that maps a partition key (first part of the primary key) to a token. The token dictates which nodes will contain the data associated with the partition key. Each node in the cluster is assigned one or more unique token values from a token ring. This is just a fancy way of saying each node is assigned a number from a circular number range. That is, “the number” being the token hash, and “a circular number range” being the token ring. The token ring is circular because the next value after the maximum token value is the minimum token value.

An assigned token defines the range of tokens in the token ring the node is responsible for. This is commonly known as a “token range”. The “token range” a node is responsible for is bounded by its assigned token, and the next smallest token value going backwards in the ring. The assigned token is included in the range, and the smallest token value going backwards is excluded from the range. The smallest token value going backwards typically resides on the previous neighbouring node. Having a circular token ring means that the range of tokens a node is responsible for, could include both the minimum and maximum tokens in the ring. In at least one case the smallest token value going backwards will wrap back past the maximum token value in the ring.

For example, in the following Token Ring Assignment diagram we have a token ring with a range of hashes from 0 to 99. Token 10 is allocated to Node 1. The node before Node 1 in the cluster is Node 5. Node 5 is allocated token 90. Therefore, the range of tokens that Node 1 is responsible for is between 91 and 10. In this particular case, the token range wraps around past the maximum token in the ring.
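To make the lookup concrete, here is a minimal Python sketch of it. Only Node 1 (token 10) and Node 5 (token 90) come from the example above; the other token assignments are made up for illustration.

from bisect import bisect_left

# toy ring of hashes 0-99: assigned token -> node
ring = {10: "Node 1", 30: "Node 2", 50: "Node 3", 70: "Node 4", 90: "Node 5"}
tokens = sorted(ring)

def owner(token):
    # the owner is the node with the first assigned token >= the hashed token,
    # wrapping back to the smallest assigned token when we run off the end
    i = bisect_left(tokens, token)
    return ring[tokens[i % len(tokens)]]

print(owner(95))  # wraps past the maximum -> Node 1 (range 91 to 10)
print(owner(3))   # also Node 1
print(owner(42))  # Node 3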

Token ring

Note that the above diagram is for only a single data replica. This is because only a single node is assigned to each token in the token ring. If multiple replicas of the data exist, a node’s neighbours become replicas for the token as well. This is illustrated in the Token Ring Assignment diagram below.

Token ring

The reason the partitioner is defined as a consistent hashing algorithm is that it is just that; no matter how many times you feed in a specific input, it will always generate the same output value. It ensures that every node, coordinator or otherwise, will always calculate the same token for a given partition key. The calculated token can then be used to reliably pinpoint the nodes with the sought after data.

Consequently, the minimum and maximum numbers for the token ring are defined by the partitioner. The default Murmur3Partitioner, based on the Murmur3 hash, has for example a minimum and maximum range of -2^63 to 2^63 - 1. The legacy RandomPartitioner (based on the MD5 hash) on the other hand has a range of 0 to 2^127 - 1. A critical side effect of this system is that once a partitioner for a cluster is picked, it can never be changed. Changing to a different partitioner requires the creation of a new cluster with the desired partitioner and then reloading the data into the new cluster.

Further information on consistent hashing functionality can be found in the Apache Cassandra documentation.

Back in the day…

Back in the pre-1.2 era, nodes could only be manually assigned a single token. This was done and can still be done today using the initial_token setting in the cassandra.yaml file. The default partitioner at that point was the RandomPartitioner. Despite token assignment being manual, the partitioner made the process of calculating the assigned tokens fairly straightforward when setting up a cluster from scratch. For example, if you had a three node cluster you would divide 2^127 - 1 by 3 and the quotient would give you the correct increment amount for each token value. Your first node would have an initial_token of 0, your next node would have an initial_token of (2^127 - 1) / 3, and your third node would have an initial_token of (2^127 - 1) / 3 * 2. Thus, each node will have the same sized token ranges.
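As a rough sketch, the same arithmetic in Python, using the RandomPartitioner range of 0 to 2^127 - 1:

num_nodes = 3
range_max = 2**127 - 1  # RandomPartitioner token range is 0 to 2**127 - 1

for n in range(num_nodes):
    print("[Node {}] initial_token: {}".format(n + 1, range_max // num_nodes * n))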

Dividing the token ranges up evenly makes it less likely individual nodes are overloaded (assuming identical hardware for the nodes, and an even distribution of data across the cluster). Uneven token distribution can result in what is termed “hot spots”. This is where a node is under pressure as it is servicing more requests or carrying more data than other nodes.

Even though setting up a single token cluster can be a very manual process, such deployments are still common. This is especially true for very large Cassandra clusters where the node count typically exceeds 1,000 nodes. One of the advantages of this type of deployment is that you can ensure the token distribution is even.

Although setting up a single token cluster from scratch can result in an even load distribution, growing the cluster is far less straightforward. If you insert a single node into your three node cluster, the result is that two out of the four nodes will have a smaller token range than the other two nodes. To fix this problem and re-balance, you then have to run nodetool move to relocate tokens to other nodes. This is a tedious and expensive task though, involving a lot of streaming around the whole cluster. The alternative is to double the size of your cluster each time you expand it. However, this usually means using more hardware than you need. Much like having an immaculate backyard garden, maintaining an even token range per node in a single token cluster requires time, care, and attention, or alternatively, a good deal of clever automation.

Scaling in a single token world is only half the challenge. Certain failure scenarios also heavily increase the time to recovery. Let’s say for example you had a six node cluster with three replicas of the data in a single datacenter (Replication Factor = 3). Replicas might reside on Node 1 and Node 4, Node 2 and Node 5, and lastly on Node 3 and Node 6. In this scenario each node is responsible for a sixth of each of the three replicas.

Six node cluster and three replicas

In the above diagram, the tokens in the token ring are assigned an alpha character. This is to make tracking the token assignment to each node easier to follow. If the cluster had an outage where Node 1 and Node 6 are unavailable, you could only use Nodes 2 and 5 to recover the unique sixth of the data they each have. That is, only Node 2 could be used to recover the data associated with token range ‘F’, and similarly only Node 5 could be used to recover the data associated with token range ‘E’. This is illustrated in the diagram below.

Six node cluster and three replicas failures scenario
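The replica placement behind that scenario can be sketched in a few lines of Python: with a single rack, the replicas for a range are its owning node plus the next two distinct nodes going clockwise. The node and token letter assignments below follow the diagrams above; the code itself is only illustrative.

# tokens A..F assigned to Node 1..Node 6, in clockwise order
nodes = ["Node 1", "Node 2", "Node 3", "Node 4", "Node 5", "Node 6"]
letters = ["A", "B", "C", "D", "E", "F"]
rf = 3
down = {"Node 1", "Node 6"}

for i, letter in enumerate(letters):
    # replicas of a range: the owning node plus the next rf - 1 nodes clockwise
    replicas = [nodes[(i + r) % len(nodes)] for r in range(rf)]
    alive = [n for n in replicas if n not in down]
    print("range {}: replicas {}, available on {}".format(letter, replicas, alive))

Running this shows ranges E and F are each left with only one available replica (Node 5 and Node 2 respectively), which is why only those nodes can be used to recover that data.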

vnodes to the rescue

To solve the shortcomings of a single token assignment, Cassandra version 1.2 was enhanced to allow a node to be assigned multiple tokens. That is, a node could be responsible for multiple token ranges. This Cassandra feature is known as “virtual nodes”, or vnodes for short. The vnodes feature was introduced via CASSANDRA-4119. As per the ticket description, the goals of vnodes were:

  • Reduced operations complexity for scaling up/down.
  • Reduced rebuild time in event of failure.
  • Evenly distributed load impact in the event of failure.
  • Evenly distributed impact of streaming operations.
  • More viable support for heterogeneity of hardware.

The introduction of this feature gave birth to the num_tokens setting in the cassandra.yaml file. The setting defined the number of vnodes (token ranges) a node was responsible for. By increasing the number of vnodes per node, the token ranges become smaller. This is because the token ring has a finite number of tokens. The more ranges it is divided up into the smaller each range is.

To maintain backwards compatibility with older 1.x series clusters, the num_tokens defaulted to a value of 1. Moreover, the setting was effectively disabled on a vanilla installation. Specifically, the value in the cassandra.yaml file was commented out. The commented line and previous development commits did give a glimpse into the future of where the feature was headed though.

As foretold by the cassandra.yaml file, and the git commit history, when Cassandra version 2.0 was released the vnodes feature was enabled by default. The num_tokens line was no longer commented out, so its effective default value on a vanilla installation was 256. Thus ushering in a new era of clusters that had relatively even token distributions, and were simple to grow.

With nodes consisting of 256 vnodes and the accompanying additional features, expanding the cluster was a dream. You could insert one new node into your cluster and Cassandra would calculate and assign the tokens automatically! The token values were randomly calculated, and so over time as you added more nodes, the cluster would converge on being in a balanced state. This engineering wizardry put an end to spending hours doing calculations and nodetool move operations to grow a cluster. The option was still there though. If you had a very large cluster or another requirement, you could still use the initial_token setting which was commented out in Cassandra version 2.0. In this case, the value for the num_tokens still had to be set to the number of tokens manually defined in the initial_token setting.

Remember to read the fine print

This gave us a feature that was like a personal devops assistant; you handed them a node, told them to insert it, and then after some time it had tokens allocated and was part of the cluster. However, in a similar vein, there is a price to pay for the convenience…

While we get a more even token distribution when using 256 vnodes, the problem is that availability degrades earlier. Ironically, the more we break the token ranges up, the more quickly a multi-node outage can make some data unavailable. Then there is the issue of unbalanced token ranges when using a small number of vnodes. By small, I mean values less than 32. Cassandra’s random token allocation is hopeless when it comes to small vnode values. This is because there are insufficient tokens to balance out the wildly different token range sizes that are generated.
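A rough way to see the availability effect is to simulate it. The sketch below is not how Cassandra allocates tokens or places replicas in detail; it simply assigns random tokens to nodes, takes each range’s replicas as the owner plus the next distinct nodes clockwise (ignoring racks), and counts how many pairs of simultaneously down nodes would leave some range without a LOCAL_QUORUM of replicas.

import random
from itertools import combinations

def quorum_loss_fraction(num_nodes=12, num_tokens=1, rf=3, trials=50):
    broken = total = 0
    for _ in range(trials):
        # random ring of (token, node), one entry per vnode
        ring = sorted((random.random(), n)
                      for n in range(num_nodes) for _ in range(num_tokens))
        # replicas per range: owning node plus next distinct nodes clockwise
        replicas = []
        for i in range(len(ring)):
            reps, j = [], i
            while len(reps) < rf:
                node = ring[j % len(ring)][1]
                if node not in reps:
                    reps.append(node)
                j += 1
            replicas.append(reps)
        # check every possible pair of down nodes for a quorum loss somewhere
        for down in combinations(range(num_nodes), 2):
            total += 1
            if any(sum(r in down for r in reps) >= 2 for reps in replicas):
                broken += 1
    return broken / total

for vnodes in (1, 4, 16, 256):
    print(vnodes, quorum_loss_fraction(num_tokens=vnodes))

As num_tokens grows, the fraction of two-node outages that break quorum for at least some data climbs quickly towards 100%, which is exactly the availability trade-off described above.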

Pics or it didn’t happen

It is very easy to demonstrate the availability and token range imbalance issues, using a test cluster. We can set up a single token range cluster with six nodes using ccm. After calculating the tokens, configuring and starting our test cluster, it looked like this.

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  71.17 KiB  1            33.3%             8d483ae7-e7fa-4c06-9c68-22e71b78e91f  rack1
UN  127.0.0.2  65.99 KiB  1            33.3%             cc15803b-2b93-40f7-825f-4e7bdda327f8  rack1
UN  127.0.0.3  85.3 KiB   1            33.3%             d2dd4acb-b765-4b9e-a5ac-a49ec155f666  rack1
UN  127.0.0.4  104.58 KiB  1            33.3%             ad11be76-b65a-486a-8b78-ccf911db4aeb  rack1
UN  127.0.0.5  71.19 KiB  1            33.3%             76234ece-bf24-426a-8def-355239e8f17b  rack1
UN  127.0.0.6  30.45 KiB  1            33.3%             cca81c64-d3b9-47b8-ba03-46356133401b  rack1

We can then create a test keyspace and populate it using cqlsh.

$ ccm node1 cqlsh
Connected to SINGLETOKEN at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.9 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE test_keyspace WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 };
cqlsh> CREATE TABLE test_keyspace.test_table (
...   id int,
...   value text,
...   PRIMARY KEY (id));
cqlsh> CONSISTENCY LOCAL_QUORUM;
Consistency level set to LOCAL_QUORUM.
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (1, 'foo');
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (2, 'bar');
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (3, 'net');
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (4, 'moo');
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (5, 'car');
cqlsh> INSERT INTO test_keyspace.test_table (id, value) VALUES (6, 'set');

To confirm that the cluster is perfectly balanced, we can check the token ring.

$ ccm node1 nodetool ring test_keyspace


Datacenter: datacenter1
==========
Address    Rack   Status  State   Load        Owns     Token
                                                       6148914691236517202
127.0.0.1  rack1  Up      Normal  125.64 KiB  50.00%   -9223372036854775808
127.0.0.2  rack1  Up      Normal  125.31 KiB  50.00%   -6148914691236517206
127.0.0.3  rack1  Up      Normal  124.1 KiB   50.00%   -3074457345618258604
127.0.0.4  rack1  Up      Normal  104.01 KiB  50.00%   -2
127.0.0.5  rack1  Up      Normal  126.05 KiB  50.00%   3074457345618258600
127.0.0.6  rack1  Up      Normal  120.76 KiB  50.00%   6148914691236517202

We can see in the “Owns” column all nodes have 50% ownership of the data. To make the example easier to follow we can manually add a letter representation next to each token number. So the token ranges could be represented in the following way:

$ ccm node1 nodetool ring test_keyspace


Datacenter: datacenter1
==========
Address    Rack   Status  State   Load        Owns     Token                 Token Letter
                                                       6148914691236517202   F
127.0.0.1  rack1  Up      Normal  125.64 KiB  50.00%   -9223372036854775808  A
127.0.0.2  rack1  Up      Normal  125.31 KiB  50.00%   -6148914691236517206  B
127.0.0.3  rack1  Up      Normal  124.1 KiB   50.00%   -3074457345618258604  C
127.0.0.4  rack1  Up      Normal  104.01 KiB  50.00%   -2                    D
127.0.0.5  rack1  Up      Normal  126.05 KiB  50.00%   3074457345618258600   E
127.0.0.6  rack1  Up      Normal  120.76 KiB  50.00%   6148914691236517202   F

We can then capture the output of ccm node1 nodetool describering test_keyspace and change the token numbers to the corresponding letters in the above token ring output.

$ ccm node1 nodetool describering test_keyspace

Schema Version:6256fe3f-a41e-34ac-ad76-82dba04d92c3
TokenRange:
  TokenRange(start_token:A, end_token:B, endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.4], rpc_endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
  TokenRange(start_token:C, end_token:D, endpoints:[127.0.0.4, 127.0.0.5, 127.0.0.6], rpc_endpoints:[127.0.0.4, 127.0.0.5, 127.0.0.6], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1)])
  TokenRange(start_token:B, end_token:C, endpoints:[127.0.0.3, 127.0.0.4, 127.0.0.5], rpc_endpoints:[127.0.0.3, 127.0.0.4, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1)])
  TokenRange(start_token:D, end_token:E, endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.1], rpc_endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
  TokenRange(start_token:F, end_token:A, endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.3], rpc_endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1)])
  TokenRange(start_token:E, end_token:F, endpoints:[127.0.0.6, 127.0.0.1, 127.0.0.2], rpc_endpoints:[127.0.0.6, 127.0.0.1, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1)])

Using the above output, specifically the end_token, we can determine all the token ranges assigned to each node. As mentioned earlier, the token range is defined by the values after the previous token (start_token) up to and including the assigned token (end_token). The token ranges assigned to each node looked like this:

Six node cluster and three replicas

In this setup, if node3 and node6 were unavailable, we would lose an entire replica. Even if the application is using a Consistency Level of LOCAL_QUORUM, all the data is still available. We still have two other replicas across the other four nodes.

Now let’s consider the case where our cluster is using vnodes. For example purposes we can set num_tokens to 3. It will give us a smaller number of tokens, making for an easier to follow example. Note that for the majority of production deployments where the cluster size is less than 500 nodes, it is recommended that you use a larger value for num_tokens; further information can be found in the Apache Cassandra Production Recommendations.

After configuring and starting the nodes in ccm, our test cluster initially looked like this.

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  71.21 KiB  3       46.2%             7d30cbd4-8356-4189-8c94-0abe8e4d4d73  rack1
UN  127.0.0.2  66.04 KiB  3       37.5%             16bb0b37-2260-440c-ae2a-08cbf9192f85  rack1
UN  127.0.0.3  90.48 KiB  3       28.9%             dc8c9dfd-cf5b-470c-836d-8391941a5a7e  rack1
UN  127.0.0.4  104.64 KiB  3      20.7%             3eecfe2f-65c4-4f41-bbe4-4236bcdf5bd2  rack1
UN  127.0.0.5  66.09 KiB  3       36.1%             4d5adf9f-fe0d-49a0-8ab3-e1f5f9f8e0a2  rack1
UN  127.0.0.6  71.23 KiB  3       30.6%             b41496e6-f391-471c-b3c4-6f56ed4442d6  rack1

Right off the bat we can see signs that the cluster might be unbalanced. Similar to what we did with the single token cluster, here we create the test keyspace and populate it using cqlsh. We then grab a readout of the token ring to see what that looks like. Once again, to make the example easier to follow we manually add a letter representation next to each token number.

$ ccm node1 nodetool ring test_keyspace

Datacenter: datacenter1
==========
Address    Rack   Status  State   Load        Owns    Token                 Token Letter
                                                      8828652533728408318   R
127.0.0.5  rack1  Up      Normal  121.09 KiB  41.44%  -7586808982694641609  A
127.0.0.1  rack1  Up      Normal  126.49 KiB  64.03%  -6737339388913371534  B
127.0.0.2  rack1  Up      Normal  126.04 KiB  66.60%  -5657740186656828604  C
127.0.0.3  rack1  Up      Normal  135.71 KiB  39.89%  -3714593062517416200  D
127.0.0.6  rack1  Up      Normal  126.58 KiB  40.07%  -2697218374613409116  E
127.0.0.1  rack1  Up      Normal  126.49 KiB  64.03%  -1044956249817882006  F
127.0.0.2  rack1  Up      Normal  126.04 KiB  66.60%  -877178609551551982   G
127.0.0.4  rack1  Up      Normal  110.22 KiB  47.96%  -852432543207202252   H
127.0.0.5  rack1  Up      Normal  121.09 KiB  41.44%  117262867395611452    I
127.0.0.6  rack1  Up      Normal  126.58 KiB  40.07%  762725591397791743    J
127.0.0.3  rack1  Up      Normal  135.71 KiB  39.89%  1416289897444876127   K
127.0.0.1  rack1  Up      Normal  126.49 KiB  64.03%  3730403440915368492   L
127.0.0.4  rack1  Up      Normal  110.22 KiB  47.96%  4190414744358754863   M
127.0.0.2  rack1  Up      Normal  126.04 KiB  66.60%  6904945895761639194   N
127.0.0.5  rack1  Up      Normal  121.09 KiB  41.44%  7117770953638238964   O
127.0.0.4  rack1  Up      Normal  110.22 KiB  47.96%  7764578023697676989   P
127.0.0.3  rack1  Up      Normal  135.71 KiB  39.89%  8123167640761197831   Q
127.0.0.6  rack1  Up      Normal  126.58 KiB  40.07%  8828652533728408318   R

As we can see from the “Owns” column above, there are some large token range ownership imbalances. The smallest token range ownership is by node 127.0.0.3 at 39.89%. The largest token range ownership is by node 127.0.0.2 at 66.6%. This is a difference of almost 27%!

Once again, we capture the output of ccm node1 nodetool describering test_keyspace and change the token numbers to the corresponding letters in the above token ring.

$ ccm node1 nodetool describering test_keyspace

Schema Version:4b2dc440-2e7c-33a4-aac6-ffea86cb0e21
TokenRange:
    TokenRange(start_token:J, end_token:K, endpoints:[127.0.0.3, 127.0.0.1, 127.0.0.4], rpc_endpoints:[127.0.0.3, 127.0.0.1, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:K, end_token:L, endpoints:[127.0.0.1, 127.0.0.4, 127.0.0.2], rpc_endpoints:[127.0.0.1, 127.0.0.4, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:E, end_token:F, endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.4], rpc_endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:D, end_token:E, endpoints:[127.0.0.6, 127.0.0.1, 127.0.0.2], rpc_endpoints:[127.0.0.6, 127.0.0.1, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:I, end_token:J, endpoints:[127.0.0.6, 127.0.0.3, 127.0.0.1], rpc_endpoints:[127.0.0.6, 127.0.0.3, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:A, end_token:B, endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.3], rpc_endpoints:[127.0.0.1, 127.0.0.2, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:R, end_token:A, endpoints:[127.0.0.5, 127.0.0.1, 127.0.0.2], rpc_endpoints:[127.0.0.5, 127.0.0.1, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:M, end_token:N, endpoints:[127.0.0.2, 127.0.0.5, 127.0.0.4], rpc_endpoints:[127.0.0.2, 127.0.0.5, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:H, end_token:I, endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.3], rpc_endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:L, end_token:M, endpoints:[127.0.0.4, 127.0.0.2, 127.0.0.5], rpc_endpoints:[127.0.0.4, 127.0.0.2, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:N, end_token:O, endpoints:[127.0.0.5, 127.0.0.4, 127.0.0.3], rpc_endpoints:[127.0.0.5, 127.0.0.4, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:P, end_token:Q, endpoints:[127.0.0.3, 127.0.0.6, 127.0.0.5], rpc_endpoints:[127.0.0.3, 127.0.0.6, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:Q, end_token:R, endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.1], rpc_endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:F, end_token:G, endpoints:[127.0.0.2, 127.0.0.4, 127.0.0.5], rpc_endpoints:[127.0.0.2, 127.0.0.4, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:C, end_token:D, endpoints:[127.0.0.3, 127.0.0.6, 127.0.0.1], rpc_endpoints:[127.0.0.3, 127.0.0.6, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:G, end_token:H, endpoints:[127.0.0.4, 127.0.0.5, 127.0.0.6], rpc_endpoints:[127.0.0.4, 127.0.0.5, 127.0.0.6], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:B, end_token:C, endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.6], rpc_endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.6], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:O, end_token:P, endpoints:[127.0.0.4, 127.0.0.3, 127.0.0.6], rpc_endpoints:[127.0.0.4, 127.0.0.3, 127.0.0.6], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack1)])

Finally, we can determine all the token ranges assigned to each node. The token ranges assigned to each node looked like this:

Six node cluster and three replicas

Using this we can see what happens if we had the same outage as the single token cluster did, that is, node3 and node6 are unavailable. As we can see node3 and node6 are both responsible for tokens C, D, I, J, P, and Q. Hence, data associated with those tokens would be unavailable if our application is using a Consistency Level of LOCAL_QUORUM. To put that in different terms, unlike our single token cluster, in this case 33.3% of our data could no longer be retrieved.

Rack ‘em up

A seasoned Cassandra operator will notice that so far we have run our token distribution tests on clusters with only a single rack. To help increase the availability when using vnodes, racks can be deployed. When racks are used, Cassandra will try to place a single replica in each rack. That is, it will try to ensure no two replicas of the same token range appear in the same rack.

The key here is to configure the cluster so that for a given datacenter the number of racks is the same as the replication factor.
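A heavily simplified sketch of why that works: when racks are defined, the replicas for a range are chosen by walking the ring clockwise from the owning node and taking the first node encountered in each as-yet-unused rack. This glosses over the details of how NetworkTopologyStrategy actually iterates the ring, but it captures the idea. The rack assignment below mirrors the three-rack, six-node test cluster; the ring order is arbitrary.

# clockwise ring order and rack assignment (rack layout mirrors the test cluster)
ring = ["node1", "node4", "node2", "node3", "node5", "node6"]
racks = {"node1": "rack1", "node2": "rack2", "node3": "rack3",
         "node4": "rack1", "node5": "rack2", "node6": "rack3"}

def rack_aware_replicas(start, rf=3):
    # walk clockwise, taking the first node seen in each as-yet-unused rack
    reps, seen_racks = [], set()
    for step in range(len(ring)):
        node = ring[(start + step) % len(ring)]
        if racks[node] not in seen_racks:
            reps.append(node)
            seen_racks.add(racks[node])
        if len(reps) == rf:
            break
    return reps

for start, node in enumerate(ring):
    print("range owned by {}: replicas {}".format(node, rack_aware_replicas(start)))

Every range ends up with exactly one replica in each rack, so even if both nodes of a single rack become unavailable, a LOCAL_QUORUM is still achievable for all data.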

Let’s retry our previous example where we set num_tokens to 3, only this time we’ll define three racks in the test cluster. After configuring and starting the nodes in ccm, our newly configured test cluster initially looks like this:

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  71.08 KiB  3       31.8%             49df615d-bfe5-46ce-a8dd-4748c086f639  rack1
UN  127.0.0.2  71.04 KiB  3       34.4%             3fef187e-00f5-476d-b31f-7aa03e9d813c  rack2
UN  127.0.0.3  66.04 KiB  3       37.3%             c6a0a5f4-91f8-4bd1-b814-1efc3dae208f  rack3
UN  127.0.0.4  109.79 KiB  3      52.9%             74ac0727-c03b-476b-8f52-38c154cfc759  rack1
UN  127.0.0.5  66.09 KiB  3       18.7%             5153bad4-07d7-4a24-8066-0189084bbc80  rack2
UN  127.0.0.6  66.09 KiB  3       25.0%             6693214b-a599-4f58-b1b4-a6cf0dd684ba  rack3

We can still see signs that the cluster might be unbalanced. This is a side issue, as the main point to take from the above is that we now have three racks defined in the cluster with two nodes assigned to each. Once again, similar to the single token cluster, we can create the test keyspace and populate it using cqlsh. We then grab a readout of the token ring to see what it looks like. Same as the previous tests, to make the example easier to follow, we manually add a letter representation next to each token number.

$ ccm node1 nodetool ring test_keyspace


Datacenter: datacenter1
==========
Address    Rack   Status  State   Load        Owns    Token                 Token Letter
                                                      8993942771016137629   R
127.0.0.5  rack2  Up      Normal  122.42 KiB  34.65%  -8459555739932651620  A
127.0.0.4  rack1  Up      Normal  111.07 KiB  53.84%  -8458588239787937390  B
127.0.0.3  rack3  Up      Normal  116.12 KiB  60.72%  -8347996802899210689  C
127.0.0.1  rack1  Up      Normal  121.31 KiB  46.16%  -5712162437894176338  D
127.0.0.4  rack1  Up      Normal  111.07 KiB  53.84%  -2744262056092270718  E
127.0.0.6  rack3  Up      Normal  122.39 KiB  39.28%  -2132400046698162304  F
127.0.0.2  rack2  Up      Normal  121.42 KiB  65.35%  -1232974565497331829  G
127.0.0.4  rack1  Up      Normal  111.07 KiB  53.84%  1026323925278501795   H
127.0.0.2  rack2  Up      Normal  121.42 KiB  65.35%  3093888090255198737   I
127.0.0.2  rack2  Up      Normal  121.42 KiB  65.35%  3596129656253861692   J
127.0.0.3  rack3  Up      Normal  116.12 KiB  60.72%  3674189467337391158   K
127.0.0.5  rack2  Up      Normal  122.42 KiB  34.65%  3846303495312788195   L
127.0.0.1  rack1  Up      Normal  121.31 KiB  46.16%  4699181476441710984   M
127.0.0.1  rack1  Up      Normal  121.31 KiB  46.16%  6795515568417945696   N
127.0.0.3  rack3  Up      Normal  116.12 KiB  60.72%  7964270297230943708   O
127.0.0.5  rack2  Up      Normal  122.42 KiB  34.65%  8105847793464083809   P
127.0.0.6  rack3  Up      Normal  122.39 KiB  39.28%  8813162133522758143   Q
127.0.0.6  rack3  Up      Normal  122.39 KiB  39.28%  8993942771016137629   R

Once again we capture the output of ccm node1 nodetool describering test_keyspace and change the token numbers to the corresponding letters in the above token ring.

$ ccm node1 nodetool describering test_keyspace

Schema Version:aff03498-f4c1-3be1-b133-25503becf208
TokenRange:
    TokenRange(start_token:B, end_token:C, endpoints:[127.0.0.3, 127.0.0.1, 127.0.0.2], rpc_endpoints:[127.0.0.3, 127.0.0.1, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2)])
    TokenRange(start_token:L, end_token:M, endpoints:[127.0.0.1, 127.0.0.3, 127.0.0.5], rpc_endpoints:[127.0.0.1, 127.0.0.3, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2)])
    TokenRange(start_token:N, end_token:O, endpoints:[127.0.0.3, 127.0.0.5, 127.0.0.4], rpc_endpoints:[127.0.0.3, 127.0.0.5, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:P, end_token:Q, endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.4], rpc_endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:K, end_token:L, endpoints:[127.0.0.5, 127.0.0.1, 127.0.0.3], rpc_endpoints:[127.0.0.5, 127.0.0.1, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3)])
    TokenRange(start_token:R, end_token:A, endpoints:[127.0.0.5, 127.0.0.4, 127.0.0.3], rpc_endpoints:[127.0.0.5, 127.0.0.4, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3)])
    TokenRange(start_token:I, end_token:J, endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.1], rpc_endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:Q, end_token:R, endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.4], rpc_endpoints:[127.0.0.6, 127.0.0.5, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:E, end_token:F, endpoints:[127.0.0.6, 127.0.0.2, 127.0.0.4], rpc_endpoints:[127.0.0.6, 127.0.0.2, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:H, end_token:I, endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.1], rpc_endpoints:[127.0.0.2, 127.0.0.3, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:D, end_token:E, endpoints:[127.0.0.4, 127.0.0.6, 127.0.0.2], rpc_endpoints:[127.0.0.4, 127.0.0.6, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2)])
    TokenRange(start_token:A, end_token:B, endpoints:[127.0.0.4, 127.0.0.3, 127.0.0.2], rpc_endpoints:[127.0.0.4, 127.0.0.3, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2)])
    TokenRange(start_token:C, end_token:D, endpoints:[127.0.0.1, 127.0.0.6, 127.0.0.2], rpc_endpoints:[127.0.0.1, 127.0.0.6, 127.0.0.2], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2)])
    TokenRange(start_token:F, end_token:G, endpoints:[127.0.0.2, 127.0.0.4, 127.0.0.3], rpc_endpoints:[127.0.0.2, 127.0.0.4, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3)])
    TokenRange(start_token:O, end_token:P, endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.4], rpc_endpoints:[127.0.0.5, 127.0.0.6, 127.0.0.4], endpoint_details:[EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.6, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:J, end_token:K, endpoints:[127.0.0.3, 127.0.0.5, 127.0.0.1], rpc_endpoints:[127.0.0.3, 127.0.0.5, 127.0.0.1], endpoint_details:[EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1)])
    TokenRange(start_token:G, end_token:H, endpoints:[127.0.0.4, 127.0.0.2, 127.0.0.3], rpc_endpoints:[127.0.0.4, 127.0.0.2, 127.0.0.3], endpoint_details:[EndpointDetails(host:127.0.0.4, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.2, datacenter:datacenter1, rack:rack2), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3)])
    TokenRange(start_token:M, end_token:N, endpoints:[127.0.0.1, 127.0.0.3, 127.0.0.5], rpc_endpoints:[127.0.0.1, 127.0.0.3, 127.0.0.5], endpoint_details:[EndpointDetails(host:127.0.0.1, datacenter:datacenter1, rack:rack1), EndpointDetails(host:127.0.0.3, datacenter:datacenter1, rack:rack3), EndpointDetails(host:127.0.0.5, datacenter:datacenter1, rack:rack2)])

Lastly, we once again determine all the token ranges assigned to each node:

Six node cluster and three replicas

As we can see from the way Cassandra has assigned the tokens, there is now a complete data replica spread across the two nodes in each of our three racks. If we go back to our failure scenario where node3 and node6 become unavailable, we can still service queries using a Consistency Level of LOCAL_QUORUM. The only elephant in the room here is that node3 has a lot more of the token ring allocated to it than the other nodes. Its counterpart in the same rack, node6, is at the opposite end with far less of the ring allocated to it.

Too many vnodes spoil the cluster

Given the token distribution issues with low numbers of vnodes, one would think the best option is to have a large vnode value. However, apart from having a higher chance of some data being unavailable in a multi-node outage, large vnode values also impact streaming operations. To repair data on a node, Cassandra will start one repair session per vnode. These repair sessions need to be processed sequentially. Hence, the larger the vnode value, the longer the repair times and the greater the overhead needed to run a repair.

In an effort to fix slow repair times caused by large vnode values, CASSANDRA-5220 was introduced in 3.0. This change allows Cassandra to group common token ranges for a set of nodes into a single repair session. It increased the size of each repair session, as multiple token ranges were being repaired per session, but reduced the overall number of repair sessions.

We can see the effect that vnodes have on repair by running a simple test on a cluster backed by real hardware. To do this test we first need to create a cluster that uses single tokens and run a repair. Then we can create the same cluster except with 256 vnodes, and run the same repair. We will use tlp-cluster to create a Cassandra cluster in AWS with the following properties.

  • Instance size: i3.2xlarge
  • Node count: 12
  • Rack count: 3 (4 nodes per rack)
  • Cassandra version: 3.11.9 (latest stable release at the time of writing)

The commands to build this cluster are as follows.

$ tlp-cluster init --azs a,b,c --cassandra 12 --instance i3.2xlarge --stress 1 TLP BLOG "Blogpost repair testing"
$ tlp-cluster up
$ tlp-cluster use --config "cluster_name:SingleToken" --config "num_tokens:1" 3.11.9
$ tlp-cluster install

Once we provision the hardware we set the initial_token property for each of the nodes individually. We can calculate the initial tokens for each node using a simple Python command.

Python 2.7.16 (default, Nov 23 2020, 08:01:20)
[GCC Apple LLVM 12.0.0 (clang-1200.0.30.4)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> num_tokens = 1
>>> num_nodes = 12
>>> print("\n".join(['[Node {}] initial_token: {}'.format(n + 1, ','.join([str(((2**64 / (num_tokens * num_nodes)) * (t * num_nodes + n)) - 2**63) for t in range(num_tokens)])) for n in range(num_nodes)]))
[Node 1] initial_token: -9223372036854775808
[Node 2] initial_token: -7686143364045646507
[Node 3] initial_token: -6148914691236517206
[Node 4] initial_token: -4611686018427387905
[Node 5] initial_token: -3074457345618258604
[Node 6] initial_token: -1537228672809129303
[Node 7] initial_token: -2
[Node 8] initial_token: 1537228672809129299
[Node 9] initial_token: 3074457345618258600
[Node 10] initial_token: 4611686018427387901
[Node 11] initial_token: 6148914691236517202
[Node 12] initial_token: 7686143364045646503
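Note that the transcript above uses Python 2, where / on integers is integer division. On Python 3 you would use // to keep the tokens integral; a rough equivalent:

num_tokens = 1
num_nodes = 12

for n in range(num_nodes):
    tokens = [str((2**64 // (num_tokens * num_nodes)) * (t * num_nodes + n) - 2**63)
              for t in range(num_tokens)]
    print("[Node {}] initial_token: {}".format(n + 1, ",".join(tokens)))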

After starting Cassandra on all the nodes, around 3 GB of data per node can be preloaded using the following tlp-stress command. In this command we set our keyspace replication factor to 3 and set gc_grace_seconds to 0. This is done to make hints expire immediately when they are created, which ensures they are never delivered to the destination node.

ubuntu@ip-172-31-19-180:~$ tlp-stress run KeyValue --replication "{'class': 'NetworkTopologyStrategy', 'us-west-2':3 }" --cql "ALTER TABLE tlp_stress.keyvalue WITH gc_grace_seconds = 0" --reads 1 --partitions 100M --populate 100M --iterations 1

Upon completion of the data loading, the cluster status looks like this.

ubuntu@ip-172-31-30-95:~$ nodetool status
Datacenter: us-west-2
=====================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.30.95   2.78 GiB   1            25.0%             6640c7b9-c026-4496-9001-9d79bea7e8e5  2a
UN  172.31.31.106  2.79 GiB   1            25.0%             ceaf9d56-3a62-40be-bfeb-79a7f7ade402  2a
UN  172.31.2.74    2.78 GiB   1            25.0%             4a90b071-830e-4dfe-9d9d-ab4674be3507  2c
UN  172.31.39.56   2.79 GiB   1            25.0%             37fd3fe0-598b-428f-a84b-c27fc65ee7d5  2b
UN  172.31.31.184  2.78 GiB   1            25.0%             40b4e538-476a-4f20-a012-022b10f257e9  2a
UN  172.31.10.87   2.79 GiB   1            25.0%             fdccabef-53a9-475b-9131-b73c9f08a180  2c
UN  172.31.18.118  2.79 GiB   1            25.0%             b41ab8fe-45e7-4628-94f0-a4ec3d21f8d0  2a
UN  172.31.35.4    2.79 GiB   1            25.0%             246bf6d8-8deb-42fe-bd11-05cca8f880d7  2b
UN  172.31.40.147  2.79 GiB   1            25.0%             bdd3dd61-bb6a-4849-a7a6-b60a2b8499f6  2b
UN  172.31.13.226  2.79 GiB   1            25.0%             d0389979-c38f-41e5-9836-5a7539b3d757  2c
UN  172.31.5.192   2.79 GiB   1            25.0%             b0031ef9-de9f-4044-a530-ffc67288ebb6  2c
UN  172.31.33.0    2.79 GiB   1            25.0%             da612776-4018-4cb7-afd5-79758a7b9cf8  2b

We can then run a full repair on each node using the following commands.

$ source env.sh
$ c_all "nodetool repair -full tlp_stress"

The repair times recorded for each node were as follows.

[2021-01-22 20:20:13,952] Repair command #1 finished in 3 minutes 55 seconds
[2021-01-22 20:23:57,053] Repair command #1 finished in 3 minutes 36 seconds
[2021-01-22 20:27:42,123] Repair command #1 finished in 3 minutes 32 seconds
[2021-01-22 20:30:57,654] Repair command #1 finished in 3 minutes 21 seconds
[2021-01-22 20:34:27,740] Repair command #1 finished in 3 minutes 17 seconds
[2021-01-22 20:37:40,449] Repair command #1 finished in 3 minutes 23 seconds
[2021-01-22 20:41:32,391] Repair command #1 finished in 3 minutes 36 seconds
[2021-01-22 20:44:52,917] Repair command #1 finished in 3 minutes 25 seconds
[2021-01-22 20:47:57,729] Repair command #1 finished in 2 minutes 58 seconds
[2021-01-22 20:49:58,868] Repair command #1 finished in 1 minute 58 seconds
[2021-01-22 20:51:58,724] Repair command #1 finished in 1 minute 53 seconds
[2021-01-22 20:54:01,100] Repair command #1 finished in 1 minute 50 seconds

These times give us a total repair time of 36 minutes and 44 seconds.

The same cluster can be reused to test repair times when 256 vnodes are used. To do this we execute the following steps.

  • Shut down Cassandra on all the nodes.
  • Delete the contents in each of the directories data, commitlog, hints, and saved_caches (these are located in /var/lib/cassandra/ on each node).
  • Set num_tokens in the cassandra.yaml configuration file to a value of 256 and remove the initial_token setting.
  • Start up Cassandra on all the nodes.

After populating the cluster with data its status looked like this.

ubuntu@ip-172-31-30-95:~$ nodetool status
Datacenter: us-west-2
=====================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.30.95   2.79 GiB   256          24.3%             10b0a8b5-aaa6-4528-9d14-65887a9b0b9c  2a
UN  172.31.2.74    2.81 GiB   256          24.4%             a748964d-0460-4f86-907d-a78edae2a2cb  2c
UN  172.31.31.106  3.1 GiB    256          26.4%             1fc68fbd-335d-4689-83b9-d62cca25c88a  2a
UN  172.31.31.184  2.78 GiB   256          23.9%             8a1b25e7-d2d8-4471-aa76-941c2556cc30  2a
UN  172.31.39.56   2.73 GiB   256          23.5%             3642a964-5d21-44f9-b330-74c03e017943  2b
UN  172.31.10.87   2.95 GiB   256          25.4%             540a38f5-ad05-4636-8768-241d85d88107  2c
UN  172.31.18.118  2.99 GiB   256          25.4%             41b9f16e-6e71-4631-9794-9321a6e875bd  2a
UN  172.31.35.4    2.96 GiB   256          25.6%             7f62d7fd-b9c2-46cf-89a1-83155feebb70  2b
UN  172.31.40.147  3.26 GiB   256          27.4%             e17fd867-2221-4fb5-99ec-5b33981a05ef  2b
UN  172.31.13.226  2.91 GiB   256          25.0%             4ef69969-d9fe-4336-9618-359877c4b570  2c
UN  172.31.33.0    2.74 GiB   256          23.6%             298ab053-0c29-44ab-8a0a-8dde03b4f125  2b
UN  172.31.5.192   2.93 GiB   256          25.2%             7c690640-24df-4345-aef3-dacd6643d6c0  2c

When we ran the same repair test on the vnode cluster, the following repair times were recorded.

[2021-01-22 22:45:56,689] Repair command #1 finished in 4 minutes 40 seconds
[2021-01-22 22:50:09,170] Repair command #1 finished in 4 minutes 6 seconds
[2021-01-22 22:54:04,820] Repair command #1 finished in 3 minutes 43 seconds
[2021-01-22 22:57:26,193] Repair command #1 finished in 3 minutes 27 seconds
[2021-01-22 23:01:23,554] Repair command #1 finished in 3 minutes 44 seconds
[2021-01-22 23:04:40,523] Repair command #1 finished in 3 minutes 27 seconds
[2021-01-22 23:08:20,231] Repair command #1 finished in 3 minutes 23 seconds
[2021-01-22 23:11:01,230] Repair command #1 finished in 2 minutes 45 seconds
[2021-01-22 23:13:48,682] Repair command #1 finished in 2 minutes 40 seconds
[2021-01-22 23:16:23,630] Repair command #1 finished in 2 minutes 32 seconds
[2021-01-22 23:18:56,786] Repair command #1 finished in 2 minutes 26 seconds
[2021-01-22 23:21:38,961] Repair command #1 finished in 2 minutes 30 seconds

These times give us a total repair time of 39 minutes and 23 seconds.

While the time difference is quite small for 3 GB of data per node (up to an additional 45 seconds per node), it is easy to see how the difference could balloon out when we have data sizes in the order of hundreds of gigabytes per node.

Unfortunately, all data streaming operations like bootstrap and datacenter rebuild fall victim to the same issue repairs have with large vnode values. Specifically, when a node needs to stream data to another node a streaming session is opened for each token range on the node. This results in a lot of unnecessary overhead, as data is transferred via the JVM.

Secondary indexes impacted too

To add insult to injury, the negative effects of large vnode values extend to secondary indexes because of the way the read path works.

When a coordinator node receives a secondary index request from a client, it fans out the request to all the nodes in the cluster or datacenter depending on the locality of the consistency level. Each node then checks the SSTables for each of the token ranges assigned to it for a match to the secondary index query. Matches to the query are then returned to the coordinator node.

Hence, the larger the number of vnodes, the larger the impact on the responsiveness of the secondary index query. Furthermore, the performance impacts on secondary indexes grow exponentially with the number of replicas in the cluster. In a scenario where multiple datacenters have nodes using many vnodes, secondary indexes become even more inefficient.
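
As a hypothetical illustration (the keyspace, table and column names are made up), a query like the following is fanned out across the cluster, and each node consults the index for every token range it owns:

cqlsh -e "CREATE INDEX IF NOT EXISTS ON my_ks.users (country);"
cqlsh -e "SELECT * FROM my_ks.users WHERE country = 'NZ';"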

A new hope

So what we are left with is a feature in Cassandra that really hits the mark in terms of reducing the complexity of resizing a cluster. Unfortunately, its benefits come at the expense of unbalanced token ranges at one end of the num_tokens scale, and degraded operations performance at the other. That being said, the vnodes story is far from over.

Eventually, it became a well-known fact in the Apache Cassandra project that large vnode values had undesirable side effects on a cluster. To combat this issue, clever contributors and committers added CASSANDRA-7032 in 3.0: a replica aware token allocation algorithm. The idea was to allow a low value to be used for num_tokens while maintaining relatively even token ranges. The enhancement includes the addition of the allocate_tokens_for_keyspace setting in the cassandra.yaml file. The new algorithm is used instead of the random token allocator when an existing user keyspace is assigned to the allocate_tokens_for_keyspace setting.

Behind the scenes, Cassandra takes the replication factor of the defined keyspace and uses it when calculating the token values for the node when it first enters the cluster. Unlike the random token generator, the replica aware generator is like an experienced member of a symphony orchestra; sophisticated and in tune with its surroundings. So much so, that the process it uses to generate token ranges involves:

  • Constructing an initial token ring state.
  • Computing candidates for new tokens by splitting all existing token ranges right in the middle.
  • Evaluating the expected improvements from all candidates and forming a priority queue.
  • Iterating through the candidates in the queue and selecting the best combination.
    • During token selection, re-evaluate the candidate improvements in the queue.

While this was a good advancement for Cassandra, there are a few gotchas to watch out for when using the replica aware token allocation algorithm. To start with, it only works with the Murmur3Partitioner. If you started with an old cluster that used another partitioner such as the RandomPartitioner and have upgraded over time to 3.0, the feature is unusable. The second and more common stumbling block is that some trickery is required to use this feature when creating a cluster from scratch. The question was common enough that we wrote a blog post specifically on how to use the new replica aware token allocation algorithm to set up a new cluster with even token distribution.
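
A rough sketch of that trickery is shown below. The keyspace name, datacenter name and values are placeholders, and the blog post mentioned above remains the full procedure.

# 1. Bring up the first seed node(s), then create a keyspace that carries the
#    target replication factor:
cqlsh -e "CREATE KEYSPACE token_allocation_helper WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3};"
# 2. On every node added after that, point the allocator at that keyspace in
#    cassandra.yaml before starting Cassandra:
#      num_tokens: 16
#      allocate_tokens_for_keyspace: token_allocation_helper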

As you can see, Cassandra 3.0 made a genuine effort to address vnode’s rough edges. What’s more, there are additional beacons of light on the horizon with the upcoming Cassandra 4.0 major release. For instance, a new allocate_tokens_for_local_replication_factor setting has been added to the cassandra.yaml file via CASSANDRA-15260. Similar to its cousin the allocate_tokens_for_keyspace setting, the replica aware token allocation algorithm is activated when a value is supplied to it.

However, unlike its close relative, it is more user-friendly. This is because no faffing about is required to create a balanced cluster from scratch. In the simplest case, you can set a value for the allocate_tokens_for_local_replication_factor setting and just start adding nodes. Advanced operators can still manually assign tokens to the initial nodes to ensure the desired replication factor is met. After that, subsequent nodes can be added with the replication factor value assigned to the allocate_tokens_for_local_replication_factor setting.

Arguably, one of the longest-awaited and most significant changes to be released with Cassandra 4.0 is the update to the default value of the num_tokens setting. As mentioned at the beginning of this post, thanks to CASSANDRA-13701 Cassandra 4.0 will ship with num_tokens set to 16 in the cassandra.yaml file. In addition, the allocate_tokens_for_local_replication_factor setting is enabled by default and set to a value of 3.

These changes are much better user defaults. On a vanilla installation of Cassandra 4.0, the replica aware token allocation algorithm kicks in as soon as there are enough hosts to satisfy a replication factor of 3. The result is evenly distributed token ranges for new nodes, with all the benefits that a low vnodes value has to offer.

Conclusion

The consistent hashing and token allocation functionality form part of Cassandra's backbone. Virtual nodes take the guesswork out of maintaining this critical functionality, specifically making cluster resizing quicker and easier. As a rule of thumb, the lower the number of vnodes, the less even the token distribution will be, leading to some nodes being overworked. Conversely, the higher the number of vnodes, the slower cluster-wide operations take to complete and the more likely data will be unavailable if multiple nodes are down. The features added in 3.0, and the enhancements to them in 4.0, allow Cassandra to use a low number of vnodes while still maintaining a relatively even token distribution. Ultimately, this will produce a better out-of-the-box experience for new users when running a vanilla installation of Cassandra 4.0.

Get Rid of Read Repair Chance

Apache Cassandra has a feature called Read Repair Chance that we always recommend our clients disable. It is often an additional ~20% internal read load cost on your cluster that serves little purpose and provides no guarantees.

What is read repair chance?

The feature comes with two schema options at the table level: read_repair_chance and dclocal_read_repair_chance. Each represents the probability that the coordinator node will query the extra replica nodes, beyond the requested consistency level, for the purpose of read repairs.

The original setting read_repair_chance now defines the probability of issuing the extra queries to all replicas in all data centers. And the newer dclocal_read_repair_chance setting defines the probability of issuing the extra queries to all replicas within the current data center.

The default values are read_repair_chance = 0.0 and dclocal_read_repair_chance = 0.1. This means that cross-datacenter asynchronous read repair is disabled and asynchronous read repair within the datacenter occurs on 10% of read requests.
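
A quick way to see what your tables currently have set (a hedged example; the keyspace name is a placeholder and the columns shown are those exposed by the Cassandra 3.x schema tables):

cqlsh -e "SELECT table_name, dclocal_read_repair_chance, read_repair_chance FROM system_schema.tables WHERE keyspace_name = 'my_ks';"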

What does it cost?

Consider the following cluster deployment:

  • A keyspace with a replication factor of three (RF=3) in a single data center
  • The default value of dclocal_read_repair_chance = 0.1
  • Client reads using a consistency level of LOCAL_QUORUM
  • Client is using the token aware policy (default for most drivers)

In this setup, the cluster is going to see ~10% of the read requests result in the coordinator issuing two messaging system queries to two replicas, instead of just one. This results in an additional ~5% read load (on average 2.1 replica reads per request, counting the coordinator's own local read, instead of 2.0).

If the requested consistency level is LOCAL_ONE, which is the default for the java-driver, then ~10% of the read requests result in the coordinator increasing its messaging system queries from zero to two. This equates to a ~20% read load increase (on average 1.2 replica reads per request instead of 1.0).

With read_repair_chance = 0.1 and multiple datacenters the situation is much worse. With three data centers each with RF=3, then 10% of the read requests will result in the coordinator issuing eight extra replica queries. And six of those extra replica queries are now via cross-datacenter queries. In this use-case it becomes a doubling of your read load.

Let’s take a look at this with some flamegraphs…

The first flamegraph shows the default configuration of dclocal_read_repair_chance = 0.1. When the coordinator’s code hits the AbstractReadExecutor.getReadExecutor(..) method, it splits paths depending on the ReadRepairDecision for the table. Stack traces containing either AlwaysSpeculatingReadExecutor, SpeculatingReadExecutor or NeverSpeculatingReadExecutor provide us a hint to which code path we are on, and whether either the read repair chance or speculative retry are in play.

Async Read Repairs in the default configuration

The second flamegraph shows the behaviour when the configuration has been changed to dclocal_read_repair_chance = 0.0. The AlwaysSpeculatingReadExecutor flame is gone, which demonstrates the degree of complexity removed from the runtime. Specifically, read requests from the client are now forwarded only to the replicas required by the consistency level, rather than to every replica.

No Read Repairs

ℹ️   These flamegraphs were created with Apache Cassandra 3.11.9, Kubernetes and the cass-operator, nosqlbench and the async-profiler.

Previously we relied upon the existing tools of tlp-cluster, ccm, tlp-stress and cassandra-stress. This new approach with new tools is remarkably easy, and by using k8s the same approach can be used locally or against a dedicated k8s infrastructure. That is, I don't need to switch between ccm clusters for local testing and tlp-cluster for cloud testing. The same recipe applies everywhere. Nor am I bound to AWS for my cloud testing. It is also worth mentioning that these new tools are gaining a lot of focus and momentum from DataStax, so the introduction of this new approach to the open source community is deserved.

The full approach and recipe to generating these flamegraphs will follow in a [subsequent blog post](/blog/2021/01/31/cassandra_and_kubernetes_cass_operator.html).

What is the benefit of this additional load?

The coordinator returns the result to the client once it has received the response from one of the replicas, per the user’s requested consistency level. This is why we call the feature asynchronous read repairs. This means that read latencies are not directly impacted though the additional background load will indirectly impact latencies.

Asynchronous read repairs also means that there’s no guarantee that the response to the client is repaired data. In summary, 10% of the data you read will be guaranteed to be repaired after you have read it. This is not a guarantee clients can use or rely upon. And it is not a guarantee Cassandra operators can rely upon to ensure data at rest is consistent. In fact it is not a guarantee an operator would want to rely upon anyway, as most inconsistencies are dealt with by hints and nodes down longer than the hint window are expected to be manually repaired.

Furthermore, systems that use strong consistency (i.e. where reads and writes are using quorum consistency levels) will not expose such unrepaired data anyway. Such systems only need repairs and consistent data on disk for lower read latencies (by avoiding the additional digest mismatch round trip between coordinator and replicas) and ensuring deleted data is not resurrected (i.e. tombstones are properly propagated).

So the feature gives us additional load for no usable benefit. This is why disabling the feature is always an immediate recommendation we give everyone.

It is also the rationale for the feature being removed altogether in the next major release, Cassandra version 4.0. And, since 3.0.17 and 3.11.3, if you still have values set for these properties in your table, you may have noticed the following warning during startup:

dclocal_read_repair_chance table option has been deprecated and will be removed in version 4.0

Get Rid of It

For Cassandra clusters not yet on version 4.0, do the following to disable all asynchronous read repairs:

cqlsh -e 'ALTER TABLE <keyspace_name>.<table_name> WITH read_repair_chance = 0.0 AND dclocal_read_repair_chance = 0.0;'
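
If you have many tables, a small shell loop can apply the same ALTER to every table in a keyspace. This is only a sketch: the keyspace name is a placeholder and the cqlsh output parsing is deliberately rough, so review the table list before running it.

KS=my_ks
TABLES=$(cqlsh --no-color -e "SELECT table_name FROM system_schema.tables WHERE keyspace_name = '${KS}';" | sed -n '4,$p' | sed 's/[[:space:]]//g' | grep -v '^$' | grep -v 'rows)')
for t in ${TABLES}; do
    cqlsh -e "ALTER TABLE ${KS}.${t} WITH read_repair_chance = 0.0 AND dclocal_read_repair_chance = 0.0;"
done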

When upgrading to Cassandra 4.0 no action is required, these settings are ignored and disappear.


Step by Step Guide to Installing and Configuring Spark 2.0 to Connect with Cassandra 3.x

In this guide, we will be installing Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector library on the client program. If you have any of these software packages installed and configured already you can skip that step. This guide assumes you have a Cassandra 3.x cluster that is already up and running. For more information on installing and using Cassandra visit http://cassandra.apache.org/.

Note: The following steps should be performed on all the nodes in the cluster unless otherwise noted.

Install Scala 2.11

Ensure you have Java installed

$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

If you don't have Java installed, follow this tutorial to get Java 8 installed.

Install Scala 2.11.8

$ wget www.scala-lang.org/files/archive/scala-2.11.8.deb
$ sudo dpkg -i scala-2.11.8.deb
$ scala -version

Install SBT 0.13


echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list $ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823 $ sudo apt-get update $ sudo apt-get install sbt

Install Spark 2.0

Download Spark 2.0 from https://spark.apache.org/downloads.html and unpack the tar file

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
$ tar zxf spark-2.0.2-bin-hadoop2.7.tgz
$ sudo mv spark-2.0.2-bin-hadoop2.7/ /usr/local/spark/

Update system variables

$ sudo nano /etc/environment

Add an environment variable called SPARK_HOME

export SPARK_HOME=/usr/local/spark

At the end of the PATH variable add
$SPARK_HOME/bin

PATH="<previous_entries>:/usr/local/spark/bin"

Refresh the environment

source /etc/environment

Create spark user and make it the owner of the SPARK_HOME
directory

$ sudo adduser spark --system --home /usr/local/spark/ --disabled-password
$ sudo chown -R spark:root /usr/local/spark

Create Log and Pid directories

$ sudo mkdir /var/log/spark
$ sudo chown spark:root /var/log/spark
$ sudo -u spark mkdir $SPARK_HOME/run

Create the Spark Configuration files

Create the Spark Configuration files by copying the templates

$ sudo cp /usr/local/spark/conf/spark-env.sh.template /usr/local/spark/conf/spark-env.sh
$ sudo cp /usr/local/spark/conf/spark-defaults.conf.template /usr/local/spark/conf/spark-defaults.conf
$ sudo chown spark:root /usr/local/spark/conf/spark-*

Edit the Spark Environment file spark-env.sh

export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run

Configure Spark nodes to join cluster

If you will not be managing Spark using the Mesos or YARN cluster managers, you'll be running Spark in what is called "Standalone Mode".
In standalone mode, Spark will have a master node (which is the cluster manager) and worker nodes. You should select one of the nodes in your cluster to be the master. Then, on every worker node, edit /usr/local/spark/conf/spark-env.sh to point to the host where the Spark master runs.

# Options for the daemons used in the standalone deploy mode
export SPARK_MASTER_HOST=<spark_master_ip_or_hostname_here>

You can also change other elements of the default configuration by editing /usr/local/spark/conf/spark-env.sh. Some other configs to consider are:
  • SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
  • SPARK_WORKER_CORES, to set the number of cores to use on this machine
  • SPARK_WORKER_MEMORY, to set how much memory to use (for example 1000MB, 2GB)
  • SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
  • SPARK_WORKER_INSTANCES, to set the number of worker processes per node
  • SPARK_WORKER_DIR, to set the working directory of worker processes

Installing Spark as a service

Run the following commands to create a service for the spark-master and spark-worker

$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-master
$ sudo chmod 0755 /etc/init.d/spark-master
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-worker
$ sudo chmod 0755 /etc/init.d/spark-worker
$ sudo update-rc.d spark-master defaults 99
$ sudo update-rc.d spark-worker defaults 99

Edit the /etc/init.d/spark-worker file. If a variable
or function already exists then replace it with the text below.

DESC="Spark Worker" NAME=spark-worker SPARK_HOME=/usr/local/spark PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.worker.Worker-1.pid export SPARK_HOME

# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0

if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi

#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running() {
    CMD_PATT="org.apache.spark.deploy.worker.Worker"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}

#
# Function that starts the daemon/service
#
do_start() {
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started

        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`

        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-slave.sh \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-slave.sh -- spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
                || return 2
}

#
# Function that stops the daemon/service
#
do_stop() {
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}

#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}

...
status)
      is_running
      stat=$?
      case "$stat" in
              0) log_success_msg "$DESC is running" ;;
              1) log_failure_msg "could not access pidfile for $DESC" ;;
              *) log_success_msg "$DESC is not running" ;;
      esac
      exit "$stat"
      ;;
...

Edit the /etc/init.d/spark-master file. If a variable
or function already exists then replace it with the text below.

DESC="Spark Master" NAME=spark-master SPARK_HOME=/usr/local/spark PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.master.Master-1.pid export SPARK_HOME

# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0

if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi

#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running() {
    CMD_PATT="org.apache.spark.deploy.master.Master"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}

#
# Function that starts the daemon/service
#
do_start() {
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started

        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`

        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-master.sh \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-master.sh \
                || return 2
}

#
# Function that stops the daemon/service
#
do_stop() {
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}

#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}

...
status)
      is_running
      stat=$?
      case "$stat" in
              0) log_success_msg "$DESC is running" ;;
              1) log_failure_msg "could not access pidfile for $DESC" ;;
              *) log_success_msg "$DESC is not running" ;;
      esac
      exit "$stat"
      ;;
...

Running Spark as a service

Start the Spark master node first. On whichever node you've selected to be master, run:

$ sudo service spark-master start

On all the other nodes, start the workers:

$ sudo service spark-worker start

To stop Spark, run the following commands on the appropriate nodes:

$ sudo service spark-worker stop
$ sudo service spark-master stop

Service logs will be stored in /var/log/spark.

Testing the Spark service

To test the Spark service, start spark-shell on one of the nodes.

$ spark-shell --master spark://<IP>:<Port>

When the prompt comes up, execute the following line of code:

$ scala> sc.parallelize( 1 to 1000 ).sum()

Get Spark-Cassandra-Connector on the client

The spark-cassandra-connector is a Scala library that exposes Cassandra tables as Spark RDDs, lets you write Spark RDDs to Cassandra tables, and allows you to execute arbitrary computations and CQL queries that are distributed to the Cassandra nodes holding the data, which makes them fast. Your code, the spark-cassandra-connector, and all dependencies are packaged up and sent to the Spark nodes.
If you are writing ad-hoc queries / computations from the spark-shell, start up the shell by running the command:

$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11

The --packages option downloads the connector and all of its dependencies from the Spark Packages site and places them on the path of the Spark Driver and all Spark Executors.
If you are writing a Scala application, configure a new Scala project. Your build.sbt file should look something like this:

name := "MySparkProject"

version := "1.0"

scalaVersion := "2.11.8"

val sparkVersion = "2.0.2"

resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % sparkVersion,
  "org.apache.spark" % "spark-sql_2.11"  % sparkVersion,
  "datastax"         % "spark-cassandra-connector" % "2.0.0-M2-s_2.11"
)

Testing the connector

To start out, create a simple keyspace and table in Cassandra. Run the following statements in cqlsh:

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

Then insert some example data:

INSERT INTO test.kv(key, value) VALUES ('key1', 1);
INSERT INTO test.kv(key, value) VALUES ('key2', 2);

For this test, we'll use the spark-shell.

$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11


// import the spark connector namespace
import com.datastax.spark.connector._

// read Cassandra data into an RDD
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)

// Add two more rows to the table
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))

Diagnosing and Fixing Cassandra Timeouts

We recently brought Cassandra into our infrastructure to take advantage of its robust built-in scalability model. After reading all the "marketing" material, watching/attending talks by evangelists, and reading blog posts by experts and enthusiasts we were excited to embark on our Cassandra journey.



Everything was hunky-dory in the early going. We followed data modeling best practices, provisioned the right sized servers as recommended by DataStax, and saw great results in the test environment. The issues didn't start until we started moving over portions of our production traffic to our Cassandra production servers. That's when we noticed that we would fairly frequently get read and write timeouts as the volume of traffic got higher.



My first troubleshooting step was to monitor the servers to see if there were any anomalies or resource constraints that led to the timeouts. I'm a fan of Brendan Gregg's USE method for monitoring overall system health and identifying errors and bottlenecks. However, at the time we didn't have a comprehensive monitoring solution like Datadog or Icinga2 set up for those servers. So in its place, I wrote a shell script that every 30 seconds would log memory and CPU utilization, disk I/O, network I/O, Cassandra thread pool stats (nodetool tpstats), GC stats, compaction stats, and the table stats for my most active tables. To my chagrin, my hacked-together monitor did not reveal any anomalies or resource constraints that were the cause of the timeouts. On the contrary, it revealed that resource utilization (with the exception of CPU, which was a bit high) was well under the hardware limits. I did notice that a number of times a query timeout would coincide with the start of SSTable compaction, which led me down the rabbit hole of compaction tweaks; however, I emerged from that hole with no solution to the Cassandra timeouts.
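
For reference, a minimal sketch of that kind of ad-hoc monitor is below. This is not the original script; the metrics gathered, interval and log location are assumptions.

while true; do
    date
    free -m
    iostat -dxm 1 1
    nodetool tpstats
    nodetool compactionstats
    nodetool gcstats
    sleep 30
done >> /var/tmp/cassandra-monitor.log 2>&1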




Once hardware constraints had been ruled out, I kicked off my next round of troubleshooting efforts by turning on trace probability and looking at the trace for long running queries. In the traces I noticed high queue times, which is not unexpected during times of high load, but I also noticed something else. There were a number of times when a worker node received the request from the coordinator, completed the read operation for the query, queued up the result to be sent back to the coordinator, but the results were never sent and the coordinator timed out waiting for the response. Bingo! I had found the cause of the timeout! But I still hadn't found the root cause; why were the responses not getting sent back? And I didn't know how to fix it either. I spent a good amount of time looking into tweaking Cassandra thread pools (like the RequestResponseStage pool), reading the Cassandra source code on GitHub and re-verifying network traffic/congestion between the nodes during high loads. But those efforts yielded no solution to the query timeouts which led me to the third round of troubleshooting efforts.
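
For anyone retracing these steps, the tracing setup looks roughly like this (a hedged example; the sampling rate is arbitrary and the system_traces queries are just one way to inspect sessions):

nodetool settraceprobability 0.001
cqlsh -e "SELECT session_id, duration, request, coordinator FROM system_traces.sessions LIMIT 20;"
# then drill into a single session (replace <session_id> with a real id):
cqlsh -e "SELECT activity, source, source_elapsed FROM system_traces.events WHERE session_id = <session_id>;"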




For this round of troubleshooting I turned up the logging level on the org.apache.cassandra logger to DEBUG and I live-tailed both the Cassandra system.log and debug.log. That's when I noticed something weird; several times, one of the nodes would detect another as being down for a period of time. When I looked at the system logs and Linux process activity in htop for the supposedly downed node, it appeared that the node was up and running. This led me to take a fine-grained look at the system.log for the downed node, and in there I noticed a pretty long GC pause. So I flipped over to the gc.log, examined it, and noticed even more long GC pauses. I also noticed that at times there were 35 second pauses that would occur in succession. Those super long GC pauses caused the node to appear down to its peers. They also explained cases where a coordinator sent a request and timed out because the worker node paused execution and didn't return the response in a timely manner. I had found my problem!




So how did this happen? Well, when we were configuring Cassandra, some doofus (aka me!) noticed that:

  1. The key cache drastically improved the performance of queries.
  2. We had a high key cache hit rate, and continued to have a high hit rate even as we increased the size of the key cache.

So this doofus (again, me) decided to bump the key cache all the way up to 8 GB so he could cache entire tables' keys, essentially turning them into in-memory lookups. To accommodate having such a large key cache, he bumped up the JVM heap to 20 GB, thinking the new G1GC garbage collector could handle very large heaps. News flash; it can't! At least not when dealing with my workload of high volumes of concurrent reads and writes.

The fix was simple; reduce the size of the JVM heap and the key cache accordingly. The recommended heap size is 8 GB. I'm able to set my new max heap size slightly above that recommendation because I switched my memtables to storing part of their data off-heap (offheap_buffers), which greatly reduced heap memory pressure. I also started using the JEMalloc allocator, which is more efficient than the default glibc allocator.
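
For anyone wanting to make similar changes, a sketch of the relevant knobs is below. The values and file locations are assumptions for a package-based Cassandra 3.x install, not a universal recommendation; test against your own workload.

# cassandra.yaml: move memtable buffers off-heap
#   memtable_allocation_type: offheap_buffers
# cassandra-env.sh: cap the heap near the recommended size
#   MAX_HEAP_SIZE="10G"
#   HEAP_NEWSIZE="2G"
# install jemalloc so the startup scripts can find and preload it
sudo apt-get install -y libjemalloc1
sudo service cassandra restart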

In conclusion, distributed database systems are complex beasts and tuning them has to be done on many levels. While Cassandra gives you many knobs to tune to get the right performance out of your system, it takes a lot of study, experience or both, to know just how to tune them for your specific hardware and workload. DataStax published this useful Cassandra JVM tuning article to help provide guidance for users. But my advice for you would be to try a number of JVM configs and find out what works for your production workload. If I were doing this exercise over, I would start at the hardware level again but with better monitoring tools and then I would move to the system/debug/gc logs before looking at the traces. There is a good performance tuning tutorial by Kiyu Gabriel on DataStax Academy that you might find useful. Hope this article helps someone be more productive than I was.



Notes from Cassandra Day Atlanta 2016

I attended DataStax Cassandra Day 2016 in Atlanta and took down a ton of notes on things that I found interesting. After going through those notes it occurred to me that many of the nuggets in these notes could be useful to someone else other than myself. So I've published the notes below.

The information below is mostly composed of quotes from DataStax engineers and evangelists. Very little context is contained in these notes. However, if you are a beginner-to-intermediate level Cassandra developer or admin you'll likely have the needed context already. I did attempt to organize the notes somewhat coherently in order to allow you to jump to a section you care about and also to provide some context through the grouping.

Data Modeling Tips

General

  • When migrating to C*, don’t just port over your SQL schema. Be query-driven in the design of your schema.
  • If you are planning on using Spark with C*, start with a C* use case / data model 1st and then use Spark on top of it for analytics
  • Patrick McFadin (DataStax evangelist) on not having certain relational DB constructs in C*: “In the past I’ve scaled SQL DBs by removing referential integrity, indexes and denormalizing. I’ve even built a KV database on an Oracle DB that I was paying dollars per core for”. The implication here is that these constructs bound scalability in relational databases, and by explicitly not having them Cassandra’s scalability is unbounded (well, at least theoretically).
  • You can stop partition hotspots by adding an additional column to the partition key (like getting the modulus of another column when divided by the number of nodes) or by increasing the resolution of the key in the case where the partition key is a time span.
  • Using the “IF NOT EXISTS” clause stops an UPSERT from happening automatically / by default. It also creates a lock on the record while executing, so that multiple writers don’t step on each other trying to insert the same record in a race condition. This is a lightweight transaction (LWT). You can also create an LWT when doing a BATCH UPSERT (see the examples after this list).
  • You can set a default TTL (Time To Live) on an individual table. This will apply to all data inserted into the table. A CQL insert can also specify a TTL for the inserted data that overrides the default.
  • DTCS (DateTieredCompactionStrategy) compaction is built for time series data. It groups SSTables together by time so that older tables don’t get compacted and can be efficiently dropped if a TTL is set.
  • CQL Maps allow you to create complex types inside your data store
  • One of the reasons for limiting the size of elements that can be in a CQL collection is because on reads the entire collection must be denormalized as a whole in the JVM so you can add a lot of data to the heap.
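
Hedged cqlsh illustrations of the LWT and TTL points above (the keyspace, table and values are made-up placeholders):

cqlsh -e "INSERT INTO my_ks.users (id, name) VALUES (1, 'alice') IF NOT EXISTS;"
cqlsh -e "CREATE TABLE my_ks.events (id int PRIMARY KEY, payload text) WITH default_time_to_live = 86400;"
cqlsh -e "INSERT INTO my_ks.events (id, payload) VALUES (1, 'x') USING TTL 3600;"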

Secondary indexes

  • Secondary indexes are not like you have them in relational DBs. They are not built for speed, they are built for access.
  • Secondary indexes get slower the more nodes you have (because of network latencies)
  • Best thing to do with a secondary index is just to test it out and see its performance, but do it on a cluster not your laptop so you can actually see how it would perform in prod. Secondary indexes are good for low cardinality data.

Development Tips

Querying

  • Use the datastax drivers not ODBC drivers because datastax drivers are token aware and therefore can send queries to the right node, removing the need for the coordinator to make excessive network requests depending on the consistency level.
  • Use PreparedStatements for repeated queries. The performance difference is significant.
  • Use ExecuteAsync with PreparedStatements when bulk loading. You can have callbacks on Future objects and use the callbacks for things like detecting a failure and responding appropriately
  • BATCH is not a performance optimization. It leads to garbage collection and hotspots because the data stays in memory on the coordinator.
  • Use BATCH only to update multiple tables at once atomically. An example is if you have a materialized view / inverted index table that needs to be kept in sync with the main table.

General

  • Updates on collections create range tombstones to mark the old version of the collection (map, set, list) as deleted and create the new one. This is important to know because tombstones affect read performance, and at a certain point having too many tombstones (100K) can cause a read to fail. http://www.jsravn.com/2015/05/13/cassandra-tombstones-collections.html
  • Cassandra triggers should be used with care and only in specific use cases because you need to consider the distributed nature of C*.

Ops Tips

Replication Settings

  • SimpleStrategy fails if you have multiple datacenters (DCs), because the ~50% of your traffic that goes to the other DC becomes terribly slow. Use NetworkTopologyStrategy instead. You can configure how replication goes to each DC individually, so you can have a keyspace that never gets replicated to the US, for example (see the example after this list).
  • If you are using the NetworkTopologyStrategy then you should use the GossipingPropertyFileSnitch to make C* network topology aware, instead of the plain PropertyFileSnitch, because you then don’t have to change the file on every node and restart them.
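
A hedged example of the per-DC replication described above (the datacenter names and keyspaces are placeholders):

# replicate the main keyspace to both DCs
cqlsh -e "CREATE KEYSPACE app_data WITH replication = {'class': 'NetworkTopologyStrategy', 'us_dc': 3, 'eu_dc': 3};"
# a keyspace whose tables never get replicated to the US
cqlsh -e "CREATE KEYSPACE eu_only_data WITH replication = {'class': 'NetworkTopologyStrategy', 'eu_dc': 3};"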

Hardware Sizing

Recommended Node size
  • 32 GB RAM
  • 8-12 Cores
  • 2 TB SSD
Hardware should be sized appropriately. 64 cores will be hard to use. If you are adding search and/or analytics to the node, you need more RAM: 128+ GB. More memory is needed for search because it keeps its indexes in memory.

Recommendation for Spark & Cassandra on the same node: Spark jobs run in their own process and therefore have their own heap that can be tuned and managed separately. Depending on how much performance you are trying to get out of C*, Cassandra should get its 32 GB of RAM as usual. Anything over should then go to Spark. So for example to get great performance you could have a 64 GB RAM system with 32 GB to C* and 32 GB to Spark. Same thing for cores. You should have 12-16 cores; 8-12 for C* and the rest for Spark. If vertical scaling starts to get too expensive you can alternatively add more nodes to meet performance expectations.

The recommendation is to have no more than 1 TB of data per node. The reason for a 2 TB disk despite a 1 TB recommendation is because once over 60% of your disk is full you run a risk of not having enough disk space during compaction. This is especially true if you are using size tiered compaction. With leveled compaction you can use up to 70% without risk.

Use RAID 0 for your storage configuration. C* does replication for you. You can also use JBOD and C* can intelligently handle failures of some of the disks in your JBOD cluster.

Java Heap Size & Garbage Collection

  • As a general rule of thumb; start with defaults and then walk it up.
  • The ParNew/CMS GC works best with 8 GB
  • The G1GC can manage 20 GB of RAM (Note: Another engineer mentioned to me that 32 GB of RAM is no problem for G1GC). Should not be used if the heap is under 8 GB.
    • Use G1GC with Solr / DSE Search nodes

Memory Usage and Caching

  • It’s very important to have ample off-heap RAM. Some C* data structures such as memtables and bloom filters are off-heap. You also want to have non-heap RAM for page caching.
  • Row caching can significantly speed up reads because it avoids a table scan (if the data isn’t already in the page cache). However row caching should be used judiciously. The best use case is for tables with a high density of hotspot data. The reason being that on a large table with varying and disparate data and seemingly random reads, you’ll end up with a lot of cache misses, which invalidates the point of having a cache.
  • The row cache is filled on reads. memtables are filled on writes.
  • Memtables remain in memory until there is memory pressure based on configuration in the cassandra.yaml, then they are removed from RAM.

Benchmarking

  • Use the Cassandra Stress program that comes with C*.
  • Cassandra Stress can be configured; you can specify number of columns, data size, data model, queries, types of data, cardinality, etc.
  • To model production, use multiple clients & multiple threads for clients in your Benchmarking
  • When stress testing make sure you run it long enough to run into compactions, GC, repairs. Because when you test you want to know what happens in that situation. You can even stress test and introduce failures and see how it responds. You can/should instrument the JVM during stress testing and then go back and look at it.
  • General recommended stress test times is 24 - 48 hrs run times.
  • DSE has solr-stress for testing the solr integration.
For performance, a starting expectation of 3k - 5k transactions per second per core is reasonable.

Interesting Note: A DataStax customer once conducted a stress test that ran for 6-8 weeks for 24 hrs. They were testing to see how changing the compaction strategy impacted their read heavy workload.

General

  • Turn on user authentication. At least Basic Auth. This is good for security and auditing purposes. Also it allows you to not accidentally drop a production table because you thought you were connected to staging.
  • Use TLS if you are talking between DCs across the public internet
  • Don’t just bump up the heap for greater performance or to solve your problems! You’ll have to pay for it later during GC.
  • If you have crappy latency on 1% of your operations you shouldn’t just ignore it. You should try to understand what happened, is it compaction? Is it GC? That way you can address the issue that caused the high latency. Because that 1% could one day be 5%.
  • Why should we have backups? Backups exist to protect against people, not machines. Data corruption is the primary reason for backups. For example, someone accidentally changes all the '1’s in your DB to 'c’s.
  • There is no built in way to count the number of rows in a Cassandra table. The only way to do so is to write a Spark job. You can estimate the table size if you know the amount of data per row and divide the table size by that amount.
  • Use ntpd! C* nodes in a cluster must always be on time because time stamps are important and are used in resolving conflict. Clock drifts cannot be tolerated.
  • Tombstone Hell: queries on partitions with a lot of tombstones require a lot of filtering which can cause performance problems. Compaction gets rid of tombstones.
  • Turn off swap on C* nodes.
  • If C* runs out of memory it just dies. But that’s perfectly ok, because the data is distributed / replicated and you can just bring it back up. In the mean time data will be read from the other nodes.

Cluster Management

  • Don’t put a load balancer in front of your C* cluster.
  • Make sure you are running repairs. Repairs are essentially network defrag and help maintain consistency. Run repairs a little at a time, all the time.
  • If you can model your data to have TTLs you can run repairs much less or not at all.
  • If you never delete your data you can set gc_grace_period to 0.
  • Don’t upgrade your C* versions by replacing an outgoing node with a new node running a newer version of C*. C* is very sensitive when it comes to running mixed versions in production. The older nodes may not be able to stream data to the newer node. Instead you should do an in-place upgrade, i.e. shut down the node (the C* service), upgrade C* and then bring it back up. (https://docs.datastax.com/en/upgrade/doc/upgrade/cassandra/upgradeCassandraDetails.html)
  • When a new node is added in order to increase storage capacity / relieve storage pressure on the existing nodes, ensure you run nodetool cleanup as the final step. This is because C* won’t automatically reclaim the space of the data streamed out to the new node.

Monitoring, Diagnostic

Monitoring Services for capturing machine level metrics
  • Monit
  • Munin
  • Icinga
  • JMX Metrics
Make sure you are capturing application metrics and deliver them to a dashboard that can integrate app metrics and server metrics

Since you are running on multiple machines it becomes important to aggregate your logs.
  • Logstash
Diagnostic tools
  • htop (a better version of top)
  • iostat
  • dstat
  • strace
  • jstack
  • tcpdump (monitor network traffic, can even see plain text queries coming in)
  • nodetool tpstats (can help diagnose performance problems by showing you which thread pools are overwhelmed / blocked. From there you can make hypotheses as to the cause of the blockage / performance problem)

DSE Offering

DSE Max => Cassandra + Support + Solr + Spark

DSE search

  • Solr fixes a couple of rough edges for C* like joins, ad-hoc querying, fuzzy text searching and secondary indexing problems in larger clusters.
  • DSE search has tight Solr integration with C*. C* stores the data, Solr stores the indexes. CQL searches that use the solr_query expression in the WHERE clause search Solr first for the location of the data to fetch and then queries C* for the actual data.
  • You can checkout killrvideo’s Github for an example of DSE search in action (https://github.com/LukeTillman/killrvideo-csharp/blob/master/src/KillrVideo.Search/SearchImpl/DataStaxEnterpriseSearch.cs)
  • Solr is about a 3x multiplication on CPU and RAM needed for a running regular C*. This is because Solr indexes must live in RAM.
  • Solr can do geospatial searches & can do arbitrary time range searches (which is another rough edge that C* cannot do). E.g. “search for all sales in the past 4 mins 30 seconds”

DSE Spark

  • Spark runs over distributed data stores and schedules analytics jobs on workers on those nodes. DSE Max has Spark integration that just requires the flipping of a switch, no additional config.
  • There’s no need for definition files, workers automatically have access to the tables and Spark is data locality aware so jobs go to the right node.
  • Optionally with DSE search integration you can have search running on the same nodes that have the analytics and data and leverage the search indexes for faster querying instead of doing table scans.
  • With DSE analytics, you can create an analytics DC and have 2-way replication between the operations DC and the analytics DC. 2 way is important because it means that the analytics DC can store the result of its computation to the Cassandra table which then gets replicated back to the ops DC.
  • The Spark jobs / workers have access to more than just the C* table data. They can do just about anything you code. They can pull data from anything; open files, read queues, JDBC data stores, HDFS, etc. And write data back out as well.
  • Recommendation for Spark & Cassandra on the same node. Appropriate resource allocation is important. Having Spark will require more memory. Spark jobs run in their own process and therefore have their own heap that can be tuned and managed separately. Depending on how much performance you are trying to get out of C*, Cassandra should get its 32 GB of RAM as usual. Anything over should then go to Spark. So for example to get great performance you could have a 64 GB RAM system with 32 GB to C* and 32 GB to Spark. Same thing for cores. You should have 12-16 cores; 8-12 for C* and the rest for Spark. If vertical scaling starts to get too expensive you can alternatively add more nodes to meet performance expectations.

Other Notes

Cassandra has a reference implementation called killrvideo. It is an actual website hosted on MS Azure. The address is killrvideo.com. It is written by Luke Tillman in C#. Checkout the source code on Github (https://github.com/LukeTillman/killrvideo-csharp).

Configuring Remote Management and Monitoring on a Cassandra Node with JConsole

In your cassandra-env.sh set

LOCAL_JMX=no

If you want username and password security, keep the default setting for jmxremote.authenticate, which is true. Otherwise set it to false:

JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"

Note: If set to true, enter a username and password for the jmx user in the jmxremote.password.file. Follow the instructions here on how to secure that file. Setting jmxremote.authenticate to true also requires you to pass in the username and password when running a nodetool command, e.g. nodetool status -u cassandra -pw cassandra

Restart the server (if needed).

Connecting to the node using JConsole for monitoring

Find the JConsole jar in your JDK_HOME/bin or JDK_HOME/lib directory. If you don’t have the Java JDK installed it can be downloaded from the Oracle Website.

Double-click the executable jar to run it (or run it from the command line). Select “Remote Process” and enter the following connection string.

 service:jmx:rmi:///jndi/rmi://<target ip address>:7199/jmxrmi

replacing <target ip address> with the address of the machine you intend to manage or monitor.

Note: JConsole may try (and fail) to connect using ssl first. If it does so it will ask if it can connect over a non-encrypted connection. You should answer this prompt in the affirmative and you are good.

Congrats! You now have everything you need to monitor and manage a cassandra node.

For help with how to monitor Cassandra using JMX and with interpreting the metrics see:

The Right Database for the Right Job - Chattanooga Developer Lunch Presentation

Does this sound like you? "OMG!! PostgreSQL, Neo4j, Elasticsearch, MongoDB, RethinkDB, Cassandra, SQL Server, Riak, InfluxDB, Oracle NoSQL, SQLite, Hive, Couchbase, CouchDB, DynamoDB. I've got an issue with my current database solution or I'm starting a new project and I don't know what to choose!"

This talk is intended to help you match your data storage needs with suitable solutions from a wide field of contenders. Looking at different data types, structures and interaction patterns, we will try to understand what makes certain data stores better suited than others and how to implement polyglot persistence.


Stream Processing With Spring, Kafka, Spark and Cassandra - Part 1

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Part 1 - Overview

Before starting any project I like to make a few drawings, just to keep everything in perspective. My main motivation for this series is to get better acquainted with Apache Kafka. I just didn't have a chance to use it on the projects that I work on in my day to day life, but it's this new technology everybody is buzzing about so I wanted to give it a try. One other thing is that I also didn't get a chance to write Spark Streaming applications, so why not kill two birds with one stone? Here is a 10,000-foot overview of the series:

Avoiding the tl;dr

Part of the motivation for splitting the series up is avoiding the tl;dr effect ;) Now, let's get back to the overview. We'll break down the previous image box by box.

Using Spring Boot

We're basically just prototyping here, but to keep everything flexible and in the spirit of newer architectural paradigms like microservices, the series will be split into 5 parts. The software will also be split up, so we won't use any specific container for our applications; we'll just go with Spring Boot. In the posts we won't go much over the basics; you can always look them up in the official documentation.

Apache Kafka

This is the reason why I'm doing this in the first place. It's this new super cool messaging system that all the big players are using and I want to learn how to put it to everyday use.

Spark Streaming

For some time now I've been doing a lot of stuff with Apache Spark. But somehow I didn't get a chance to look into streaming more closely.

Cassandra

Why not?

What this series is about?

It's a year when everybody is talking about voting ... literally everywhere :) so let's make a voting app. In essence it will be a basic word count on the stream, but let's give it some context while we're at it. We won't do anything complicated or useful. Basically the end result will be a total count of token occurrences in the stream. We'll also break a lot of best practices in data modeling etc. in this series.

This series is for people oriented toward learning something new. I guess experienced and battle-proven readers will find a ton of flaws in the concept, but again, most of them are deliberate. One thing I sometimes avoid in my posts is including full source code; my opinion is that a lot more is remembered, and learners feel much more comfortable, when they are faced with problems in practice. So I'll just copy-paste the crucial code parts. One more assumption on my side is that the readers will be using IntelliJ IDEA. Let's go to Part 2 and see how to set up Kafka.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 2

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Setting up Kafka

In this section we'll set up two Kafka brokers. We'll also need a ZooKeeper. If you are reading this, my guess is that you don't have one set up already, so we'll use the one bundled with Kafka. We won't cover everything here; do read the official documentation for a more in-depth understanding.

Downloading

Download the latest Apache Kafka. In this tutorial we'll use the binary distribution. Pay attention to the Scala version if you intend to use Kafka with a specific Scala version; in this tutorial we'll concentrate more on Java, but this will become more important in the parts to come. In this section we'll use the tools that ship with the Kafka distribution to test everything out. Once again, download and extract the distribution of Apache Kafka from the official pages.

Configuring brokers

Go into directory where you downloaded and extracted your kafka installation. There is a properties file template and we are going to use properties files to start the brokers. Make two copies of the file:

        $ cd your_kafka_installation_dir
        $ cp config/server.properties config/server0.properties
        $ cp config/server.properties config/server1.properties
    
Now use your favorite editor to make changes to broker configuration files. I'll just use vi, after all it has been around for 40 years :)
        $ vi config/server0.properties
    
Now make changes (check if they are set) to following properties:
        broker.id=0
        listeners=PLAINTEXT://:9092
        num.partitions=2
        log.dirs=/var/tmp/kafka-logs-0
    
Make the changes for the second node too:
        $ vi config/server1.properties
    
        broker.id=1
        listeners=PLAINTEXT://:9093
        num.partitions=2
        log.dirs=/var/tmp/kafka-logs-1
    

Starting everything up

First you need to start ZooKeeper; it will be used to store cluster metadata and the offsets for topics. There are more advanced setups where you don't need it, but for someone just starting out it's much easier to use the ZooKeeper bundled with the downloaded Kafka. I recommend opening one shell tab where you can keep all of the running processes. We didn't make any changes to the ZooKeeper properties, the defaults are just fine for our example:

        $ bin/zookeeper-server-start.sh config/zookeeper.properties &
    
From the output you'll notice it started ZooKeeper on the default port 2181. You can try telnetting to this port on localhost just to check that everything is running fine. Now we'll start the two Kafka brokers:
        $ bin/kafka-server-start.sh config/server0.properties &
        $ bin/kafka-server-start.sh config/server1.properties &
    

Creating a topic

Before producing and consuming messages we need to create a topic; for now you can think of it as a queue name. We need to pass a reference to ZooKeeper. We'll name the topic "votes"; it will have 2 partitions and a replication factor of 2. Please read the official documentation for further explanation. You'll see additional output coming from the broker logs because we are running everything in the background.

        $ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic votes --partitions 2 --replication-factor 2
    

Sending and receiving messages with bundled command line tools

Open two additional shell tabs and position yourself in the directory where you installed Kafka. We'll use one tab to produce messages; the second tab will consume the topic and simply print out whatever we type in the first tab. Now this might look a bit silly, but imagine you are actually using Kafka already!

In the tab for producing messages, run:

        $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic votes
    

In the tab for consuming messages, run:

        $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic votes
    

Next part

We covered a lot here, although writing from one console window to another could be achieved with a far simpler combination of shell commands. In Part 3 we'll make an app that writes to a topic. We'll also use the console consumer just to verify that our app is actually sending something to the topic.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 3

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Writing a Spring Boot Kafka Producer

We'll go over the steps necessary to write a simple producer for a Kafka topic using Spring Boot. The application will essentially be a simple proxy: it receives a JSON payload containing the value that's going to be sent to the Kafka topic. Pretty simple, but enough to get us going. We'll use IntelliJ IDEA to set everything up; the easiest way to get started is with Spring Initializr.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: spring-boot-kafka-example
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: spring-boot-kafka-example
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Kafka Example
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Next
  18. Project name: spring-boot-kafka-example
  19. The rest is just fine ...
  20. Finish
  21. After creating the project, check the SDK setting; it should be Java 8

build.gradle dependencies

        compile('org.apache.kafka:kafka_2.11:0.9.0.0')
        compile('org.apache.zookeeper:zookeeper:3.4.7')
    

application.properties

        brokerList=localhost:9092
        sync=sync
        topic=votes
    

SpringBootKafkaProducer

This is the class where all the important stuff happens.

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Configuration
public class SpringBootKafkaProducer {

    @Value("${brokerList}")
    private String brokerList;

    @Value("${sync}")
    private String sync;

    @Value("${topic}")
    private String topic;

    private Producer<String, String> producer;

    public SpringBootKafkaProducer() {
    }

    @PostConstruct
    public void initIt() {
        Properties kafkaProps = new Properties();

        kafkaProps.put("bootstrap.servers", brokerList);

        kafkaProps.put("key.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("acks", "1");

        kafkaProps.put("retries", "1");
        kafkaProps.put("linger.ms", 5);

        producer = new KafkaProducer<>(kafkaProps);

    }

    public void send(String value) throws ExecutionException, 
            InterruptedException {
        if ("sync".equalsIgnoreCase(sync)) {
            sendSync(value);
        } else {
            sendAsync(value);
        }
    }

    private void sendSync(String value) throws ExecutionException,
            InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        producer.send(record).get();

    }

    private void sendAsync(String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);

        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> {
            if (e != null) {
                e.printStackTrace();
            }
        });
    }
}
    

SpringBootKafkaExampleApplication

This one will be automatically generated.

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaExampleApplication.class, args);
    }
}
    

AppBeans

Set up the beans for the controller.

package com.example;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public SpringBootKafkaProducer initProducer() {
        return new SpringBootKafkaProducer();
    }
}
    

Helper beans

Status to return to clients; we'll just send "ok" every time.

package com.example;

public class Status {
    private String status;

    public Status(String status) {
        this.status = status;
    }

    public Status() {
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}
    
This will be the input to our app:

package com.example;

public class Vote {
    private String name;

    public Vote(String name) {
        this.name = name;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
    

SpringBootKafkaController

This is the controller. After starting the app we should have an active endpoint available at http://localhost:8080/vote

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class SpringBootKafkaController {

    @Autowired
    SpringBootKafkaProducer springBootKafkaProducer;

    @RequestMapping("/vote")
    public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException {

        springBootKafkaProducer.send(vote.getName());

        return new Status("ok");
    }

}
    

Checking everything

There should still be an active console consumer from the previous post, so we won't cover that again. After running SpringBootKafkaExampleApplication, simply open a REST client application like Postman and try to send the following JSON to http://localhost:8080/vote:

{
    "name": "Test"
}
    
If everything went fine you should see the name that you sent in this JSON appear in the console consumer; you can also exercise the endpoint from code instead of Postman if you prefer. In Part 4 we are going to go over how to pick up the data from Kafka with Spark Streaming, combine it with the data in Cassandra and push the results back to Cassandra.
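For instance, a minimal test client using plain java.net.HttpURLConnection (an illustrative sketch of mine, not part of the original project; it just POSTs the same JSON as above) could look like this:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Illustrative test client: POSTs the same JSON as the Postman example above.
public class VoteClient {

    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8080/vote");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setDoOutput(true);

        byte[] body = "{\"name\": \"Test\"}".getBytes(StandardCharsets.UTF_8);
        try (OutputStream out = connection.getOutputStream()) {
            out.write(body);
        }

        // The controller answers with {"status":"ok"} and HTTP 200 if everything went fine.
        System.out.println("Response code: " + connection.getResponseCode());
    }
}

Running it while the console consumer is attached to the votes topic should have the same effect as the Postman request.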

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 4

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Consuming Kafka data with Spark Streaming and Output to Cassandra

In this section we are going to use Spark Streaming to read the data coming in from Kafka, combine it with the data already in Cassandra, do some computation and put the results back into Cassandra. The best practice would be to have a Spark cluster running, but for the sake of simplicity we are going to launch a local Spark context from a Java application and do the processing there. We won't go into setting up Cassandra; there is plenty of documentation out there and it takes just minutes to set up.

Cassandra

Nothing fancy here, just the name of the entity being voted for and the number of votes:

CREATE KEYSPACE voting
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };

USE voting;

CREATE TABLE votes (name text PRIMARY KEY, votes int);
    

Let's create a simple Java project with Gradle for the stream processing

  1. File, New Project, Gradle
  2. Project SDK: Java 8
  3. Java
  4. Next
  5. GroupId: spark-kafka-streaming-example
  6. ArtifactId: spark-kafka-streaming-example
  7. Version: 1.0-SNAPSHOT
  8. Next
  9. Use default gradle wrapper
  10. Next
  11. Project name: spark-kafka-streaming-example
  12. The rest is just fine ...
  13. Finish
  14. After creating the project, check the SDK setting; it should be Java 8

Let's have a look at the dependencies

group 'spark-kafka-streaming-example'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.apache.spark:spark-core_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming-kafka_2.10:1.5.2')
    compile('com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3')
    compile('com.datastax.spark:spark-cassandra-connector-java_2.10:1.5.0-M3')

    testCompile group: 'junit', name: 'junit', version: '4.11'
}
    

Simple Voting Class to go with Cassandra Table

We'll use this class for storing data into cassandra

import java.io.Serializable;

public class Vote implements Serializable {
    private String name;
    private Integer votes;

    public Vote(String name, Integer votes) {
        this.name = name;
        this.votes = votes;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getVotes() {
        return votes;
    }

    public void setVotes(Integer votes) {
        this.votes = votes;
    }
}
    

Spark streaming with kafka

And finally the code that accepts the tokens coming in, combines them with the counts already in Cassandra and then writes the updated counts back to Cassandra. I didn't spend much time making the class configurable through external parameters, but for the example it's good enough:

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class SparkStreamingExample {

    public static JavaSparkContext sc;

    public static void main(String[] args) throws IOException {

        String brokers = "localhost:9092,localhost:9093";
        String topics = "votes";

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("SparkStreamingExample");
        sparkConf.set("spark.cassandra.connection.host",
            "127.0.0.1");

        JavaStreamingContext jssc = new JavaStreamingContext(
            sparkConf,
            Durations.seconds(10));

        HashSet<String> topicsSet = new HashSet<>(
                Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);

        JavaPairInputDStream<String, String> messages =
                KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topicsSet
                );

        JavaDStream<String> lines =
                messages.map(
                        (Function<Tuple2
                        <String, String>,
                        String>) Tuple2::_2);

        JavaPairDStream<String, Integer> voteCount = lines
            .mapToPair(
                (PairFunction<String, String, Integer>) s ->
                        new Tuple2<>(s, 1)).reduceByKey(
                (Function2<Integer, Integer, Integer>)
                    (i1, i2) ->i1 + i2);

        sc = jssc.sparkContext();

        voteCount.foreachRDD((v1, v2) -> {
            v1.foreach((x) -> {
                CassandraTableScanJavaRDD<CassandraRow> previousVotes =
                    javaFunctions(sc)
                        .cassandraTable("voting", "votes")
                        .where("name = '" + x._1() + "'");

                Integer oldVotes = 0;
                if (previousVotes.count() > 0) {
                    oldVotes = 
                        previousVotes.first().getInt("votes");
                }

                Integer newVotes = oldVotes + x._2();

                List<Vote> votes = Arrays.asList(
                    new Vote(x._1(), newVotes));
                JavaRDD<Vote> rdd = sc.parallelize(votes);

                javaFunctions(rdd)
                    .writerBuilder("voting", "votes", mapToRow(Vote.class))
                    .saveToCassandra();
            });

            return null;
        });

        voteCount.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
    

And that's it

You can check how the data changes by running select statements against the votes table in the voting keyspace. In Part 5 we are going to make a simple Spring Boot project that displays and sorts the voting data.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 5

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Displaying Cassandra Data With Spring Boot

Now that we have our voting data in Cassandra, let's write a simple Spring Boot project that gathers all the data from Cassandra, sorts it and displays it to the user.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: boot-cassandra-data-show
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: boot-cassandra-data-show
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Display Cassandra Data
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Template Engines - Mustache
  18. Next
  19. Project name: boot-cassandra-data-show
  20. The rest is just fine ...
  21. Finish
  22. After creating the project, check the SDK setting; it should be Java 8

Cassandra dependencies

compile('com.datastax.cassandra:cassandra-driver-core:2.1.9')
    

Vote class

We'll use this class to map rows from cassandra.

package com.example;

import java.io.Serializable;

public class Vote implements Serializable {
    private String name;
    private Integer votes;

    public Vote(String name, Integer votes) {
        this.name = name;
        this.votes = votes;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getVotes() {
        return votes;
    }

    public void setVotes(Integer votes) {
        this.votes = votes;
    }
}
    

application.properties

server.port = 8090
contactPoint = 127.0.0.1
keyspace = voting
    

CassandraSessionManager

This bean is used to set up the connection to Cassandra:

package com.example;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Configuration
public class CassandraSessionManager {

    private Session session;
    private Cluster cluster;

    @Value("${contactPoint}")
    private String contactPoint;

    @Value("${keyspace}")
    private String keyspace;

    public CassandraSessionManager() {

    }

    public Session getSession() {
        return session;
    }

    @PostConstruct
    public void initIt() {
        cluster = Cluster.builder().addContactPoint(
            contactPoint).build();
        session = cluster.connect(keyspace);
    }

    @PreDestroy
    public void destroy() {
        if (session != null) {
            session.close();
        }
        if (cluster != null) {
            cluster.close();
        }
    }
}

    

BootCassandraDataShowApplication

Automatically generated ...

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BootCassandraDataShowApplication {

    public static void main(String[] args) {
        SpringApplication.run(
        BootCassandraDataShowApplication.class, args);
    }
}
    

AppBeans

Bean for holding configured objects.

package com.example;

import com.datastax.driver.core.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public Session session() {
        return sessionManager().getSession();
    }

    @Bean
    public CassandraSessionManager sessionManager() {
        return new CassandraSessionManager();
    }
}

    

Web Controller

package com.example;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;

@Configuration
@Controller
public class WelcomeController {

    @Autowired
    Session session;

    @RequestMapping("/")
    public String welcome(Map<String, Object> model) {

        final ResultSet rows = session.execute("SELECT * FROM votes");

        ArrayList<Vote> results = new ArrayList<>();

        for (Row row : rows.all()) {
            results.add(new Vote(
                row.getString("name"),
                row.getInt("votes")
            ));
        }

        Collections.sort(results, (a, b) ->
        b.getVotes().compareTo(a.getVotes()));

        model.put("results", results);

        return "welcome";
    }
}
    

Template to show the results

<!DOCTYPE html>
<html lang="en">
<body>

<h1>Voting results:</h1>
<br/>
{{#results}}
    <strong>{{this.name}}</strong> {{this.votes}} <br/>
{{/results}}

</body>
</html>
    

That's all folks

Now this app might not seem like a lot, but there's a Kafka cluster that receives messages coming in from a Spring Boot app exposing a REST interface. Messages that come in from Kafka are then processed with Spark Streaming and sent to Cassandra. There is another Spring Boot app that sorts and displays the results to the users. This small tutorial covers most of the cool Java/big data technologies of today. Special thanks to the readers that went through all five parts of this tutorial ;)

Cassandra Time Series Bucketing

Intro

Bucketing is one of the most important techniques when working with time series data in Cassandra. This post has its roots in two very popular blog entries:

The posts are very well written and they pretty much describe all of the standard techniques when it comes to working with time series data in Cassandra. But to be honest there isn't all that much code in them. This is partly due to the fact that almost every project has its own specifics, and from my experience it often happens that even within a relatively small team there will be multiple implementations of how to bucket and access the time series data.

The Case for Bucketing

For some time now I've been in the world of IoT, and I find that explaining everything with the help of a simple temperature sensor is the best way to discuss the subject. The previously mentioned articles are also a good read; this section is sort of a warm up. In most use cases we'll want to access temperature readings by some sensor id, and we know where this sensor is located. In the simplest case the sensor id becomes the wide row (partition) in Cassandra and the readings are stored in it, kept sorted by time, etc. However, in some cases the temperature may be read very often and this can cause the wide row to grow to a size that is not manageable by Cassandra, so the data has to be split among multiple rows. The easiest way to make this split is to create multiple rows based on the measurement timestamp.

How big should my buckets be?

It may vary from project to project, but it depends on two important factors: how many readings you are storing per single measurement, and how often the measurements happen. For instance, if you are recording a reading once per day you probably don't even need bucketing. If you are recording it once per hour, the project you are working on probably won't last long enough for you to run into a problem. The same applies to seconds, but only for the most trivial case where you are making a single reading. If you go into frequencies where something is happening at the millisecond level you will most definitely need bucketing. The most complex project I have worked on so far had time bucketing at the level of a single minute, meaning every minute, new bucket. But that project is not in the IoT world; in the IoT world I'm using partitions on a per-month basis.
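To get a rough feel for the numbers (an illustrative back-of-the-envelope calculation of mine, not figures from any particular project), assume a single sensor:

        1 reading per second            ->  86 400 rows per day per sensor
        daily buckets                   ->  ~86 400 rows per partition
        monthly buckets                 ->  ~2.6 million rows per partition
        100 readings per second, daily  ->  ~8.6 million rows per partition

At one reading per second a daily bucket stays comfortably small, while at higher frequencies the same arithmetic quickly pushes you toward smaller buckets.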

10 000 feet Bucketing View

The main problem is how to calculate the bucket based on the measurement timestamp. Also keep in mind there might be differences between timezones; in a distributed system a very advisable practice is to save everything in UTC. If we decide that we need bucketing per day, it could be something as simple as the following:

    FastDateFormat dateFormat = FastDateFormat.getInstance(
        "yyyy-MM-dd", TimeZone.getTimeZone("UTC"));

    public String dateBucket(Date date) {
        return dateFormat.format(date);
    }
    
That's it: combine this with your sensor id and you get buckets on a per-day basis. Now the problem is how to retrieve the measurements from the buckets, especially if you have to fetch measurements across multiple buckets. We'll go over this in the next section.
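To make the combination concrete, the write path could look roughly like this (a sketch of mine; the keyspace, table layout and column names are assumptions, not taken from any particular project):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.commons.lang3.time.FastDateFormat;

import java.math.BigDecimal;
import java.util.Date;
import java.util.TimeZone;

// Illustrative sketch: the partition key is (sensor_id, day), where day comes
// from the dateBucket() idea shown above. Assumed table:
//   CREATE TABLE readings (
//       sensor_id text, day text, measurement_timestamp timestamp, value decimal,
//       PRIMARY KEY ((sensor_id, day), measurement_timestamp));
public class ReadingWriter {

    private static final FastDateFormat DAY_FORMAT =
            FastDateFormat.getInstance("yyyy-MM-dd", TimeZone.getTimeZone("UTC"));

    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect("measurements");

        PreparedStatement insert = session.prepare(
            "INSERT INTO readings (sensor_id, day, measurement_timestamp, value) VALUES (?, ?, ?, ?)");

        Date now = new Date();
        session.execute(insert.bind("sensor-42", DAY_FORMAT.format(now), now, new BigDecimal("21.5")));

        cluster.close();
    }
}

The important part is that the partition key is the pair (sensor id, day bucket), so a single day of readings for one sensor always lands in one partition.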

Anything goes

Bear in mind that you should keep the bucketing of your time series data easy to maintain. Also try to avoid having multiple implementations of the same thing in your code base. This section will not provide 100% implemented examples, it stays more at the level of pseudo code.

When you are fetching data from the buckets, you will have two types of query. One fetches data from the bucket without any restriction on the measurement timestamp; the other starts from a certain position within the bucket. Then there is the question of ordering the retrieved data. I have worked in systems with all sorts of practices there; most of the time reversing was done with the help of a dedicated boolean flag, but in my opinion this should be avoided. It's best to stick to the from and to parameters and order the data according to them, i.e.:

        from:   01/01/2016
        to:     02/02/2016
        returns: ascending

        from:   02/02/2016
        to:     01/01/2016
        returns: descending
    
That way you don't have to break your head thinking about various flags passed across the levels of your code.

Here is a bit of pseudo code:

        // constructor of your iterator object

        startPartition = dateBucket(from);
        endPartition = dateBucket(to);

        lastFetchedToken = null;

        bucketMoveCount = 0;

        String statement = "SELECT * FROM readings"

        // from past experience, somehow the driver takes out data the fastest
        // if it fetches 3000 items at once, would be interesting to drill down
        // why is this so :)

        int fetchSize = 3000;

        if (from.isBefore(to)) {
            select = statement + " ORDER BY measurement_timestamp ASC LIMIT " + fetchSize;
            selectFromBoundary = statement + " AND measurement_timestamp > ? ORDER BY measurement_timestamp ASC LIMIT " + fetchSize;

            partitionDiff = -1f;
        } else {
            select = statement + " LIMIT " + fetchSize;
            selectFromBoundary = statement + " AND measurement_timestamp < ? LIMIT " + fetchSize;

            partitionDiff = 1f;
        }
    
The partition could move by hour, day or minute; it all depends on how you decide to implement it. You will have to do some time-based calculations there, and I recommend using Joda-Time for them. Now that we have defined how the iterator is initialized, it's time to iterate over it:
    public List<Row> getNextPage() {

        List<Row> resultOut = new ArrayList<>();

        boolean continueFromPreviousBucket = false;

        do {
            ResultSet resultSet =
                    lastFetchedToken == null ?
                            session.execute(new SimpleStatement(select, currentBucket)) :
                            session.execute(new SimpleStatement(selectFromBoundary, currentBucket, lastFetchedToken));

            List<Row> result = resultSet.all();

            if (result.size() == fetchSize) {
                if (continueFromPreviousBucket) {
                    resultOut.addAll(result.subList(0, fetchSize - resultOut.size()));
                } else {
                    resultOut = result;
                }

                lastFetchedToken = resultOut.get(resultOut.size() - 1).getUUID("measurement_timestamp");

            } else if (result.size() == 0) {
                currentBucket = calculateNextBucket();
                bucketMoveCount++;

            } else if (result.size() < fetchSize) {
                currentBucket = calculateNextBucket();
                bucketMoveCount++;

                lastFetchedToken = null;

                if (continueFromPreviousBucket) {
                    resultOut.addAll(result.subList(0, Math.min(result.size(), fetchSize - resultOut.size())));
                } else {
                    resultOut = result;
                }

                continueFromPreviousBucket = true;
            }

            if (resultOut.size() == fetchSize
                    || bucketMoveCount >= MAX_MOVE_COUNT
                    || Math.signum(currentBucket.compareTo(endPartition)) != partitionDiff) {
                break;
            }

        } while (true);

        return resultOut;
    }
    

This is just a high-level overview of how to move among the buckets; the actual implementation will differ significantly from project to project. My hope for this post is that you give the problems I faced some thought before you run into them.
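One thing the pseudo code above glosses over is calculateNextBucket(). A day-level sketch using Joda-Time, following the partitionDiff convention from the iterator constructor (my assumption, not code from a real project), could look like this:

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Illustrative only: moves the current bucket one day forward or backward,
// matching the partitionDiff convention used in the iterator above.
public class DayBucketCalculator {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC);

    // partitionDiff < 0 means we iterate from older to newer buckets (ascending),
    // partitionDiff > 0 means newer to older (descending).
    public static String calculateNextBucket(String currentBucket, float partitionDiff) {
        DateTime current = FORMAT.parseDateTime(currentBucket);
        DateTime next = partitionDiff < 0 ? current.plusDays(1) : current.minusDays(1);
        return FORMAT.print(next);
    }
}

Swapping plusDays/minusDays for plusHours or plusMonths gives you hour- or month-sized buckets.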

Spring Data Cassandra vs. Native Driver

Intro

For some time now Spring Data for Cassandra has been getting more and more popular. My main concern with the framework is its performance characteristics compared to the native CQL driver. After all, with the driver everything is under your control and one can probably squeeze much more juice out of the cluster. O.k., I admit it's not always about performance; if it were, we would all be writing software in C or assembler. But I still think it's good practice to be aware of the drawbacks.

To be honest, Spring Data Cassandra is relatively new to me. I did the performance comparison at the lowest level, without using repositories and the other high-level concepts that come with Spring Data Cassandra. My focus in this post is more on the generic machinery that decodes the data coming out of the driver. To make the comparison I'm going to use a simple Cassandra table (skinny row), then make query after query (5000 and 10000 of them) against Cassandra and decode the results. Once again, the focus is not on the performance characteristics of higher-order functionality like paged queries; I just wanted a rule-of-thumb idea of what to expect from Spring Data Cassandra.

Setup

    -- simple skinny row
    CREATE TABLE activities (
        activity_id uuid,
        activity_model_id bigint,
        activity_state text,
        asset_id text,
        attrs map<text, text>,
        creation_time timestamp,
        customer_id text,
        end_time timestamp,
        last_modified_time timestamp,
        person_id text,
        poi_id text,
        start_time timestamp,
        PRIMARY KEY (activity_id)
    );

    
To eliminate all possible side effects, I just used a single skinny row:
    activity_id 72b493f0-e59d-11e3-9bd6-0050568317c1
    activity_model_id 66
    activity_state DONE
    asset_id 8400848739855200000
    attrs {
        'businessDrive': '1:1',
        'customer': '4:test_test_test',
        'distance': '3:180', 
        'endLocation': '6:15.7437466839,15.9846853333,0.0000000000',
        'fromAddress': '4:XX1', 
        'locked': '1:0', 
        'reason': '4:Some reason 2', 
        'startLocation': 
        '6:15.7364385831,15.0071729736,0.0000000000', 
        'toAddress': '4:YY2'
        }
    creation_time 2014-05-27 14:50:14+0200
    customer_id 8400768435301400000
    end_time 2014-05-27 12:15:40+0200
    last_modified_time 2014-05-29 21:30:44+0200
    person_id 8401111750365200000
    poi_id null
    start_time 2014-05-27 12:13:05+0200
    
This row is fetched every time; to detect differences we'll see how long the iterations last. Network and cluster effects are also out of scope, so everything was tested on a locally running DataStax Cassandra Community (2.0.16) instance.

The code

To separate out all possible interfering effects I used two separate projects. I once had a situation where I used the old Thrift API together with the CQL driver and it significantly affected performance, and it required additional configuration parameters etc. The main code snippets are located on gist; this is not the focus here, but if somebody is interested:

spring-data
native-drivers
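To give an idea of what the native-driver side of the test does, here is a rough reconstruction (a sketch of mine, not the exact code from the gists; the keyspace name is an assumption):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

import java.util.UUID;

// Rough reconstruction of the native-driver test: fetch the same skinny row N times
// and decode a few fields by hand. The "test" keyspace and the field choice are assumptions.
public class NativeDriverBenchmark {

    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect("test");

        PreparedStatement ps = session.prepare(
                "SELECT activity_state, asset_id, customer_id FROM activities WHERE activity_id = ?");
        UUID id = UUID.fromString("72b493f0-e59d-11e3-9bd6-0050568317c1");

        long start = System.currentTimeMillis();
        for (int i = 0; i < 5000; i++) {
            ResultSet rs = session.execute(ps.bind(id));
            Row row = rs.one();
            String state = row.getString("activity_state");
            String assetId = row.getString("asset_id");
            String customerId = row.getString("customer_id");
        }
        System.out.println("Took " + (System.currentTimeMillis() - start) + " ms");

        cluster.close();
    }
}

The Spring Data variant runs the same queries but maps each row through the framework's mapping machinery instead of reading the columns by hand.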

Results in milliseconds

    3 fields - 5000 items
        spring-data
        5381
        5282
        5385
        avg: 5339

        driver
        4426
        4280
        4469
        avg: 4390

        result: driver faster 21.6%

    3 fields - 10000 items
        spring-data
        8560
        8133
        8144
        avg: 8279

        driver
        6822
        6770
        6875
        avg: 6822
        
        result: driver faster 21.3%

    12 fields - 5000 items
        spring-data
        5911
        5920
        5928
        avg: 5920 - 10.88 % slower than with 3 fields!

        driver
        4687
        4669
        4606
        avg: 4654 - 6 % slower than with 3 fields

        result: driver faster 27%

Conclusions

Spring Data Cassandra may be very interesting if you want to learn something new. It can also speed up development when prototyping or doing something similar. I didn't test the higher-order functionality like pagination etc.; this was just a rule-of-thumb test to see what to expect. Basically, the bigger the classes you have to decode, the bigger the deserialization cost. At least this is the effect I'm noticing in my basic tests.

Follow up with Object Mapping available in Cassandra driver 2.1

There was an interesting follow-up discussion on reddit. On a proposal from reddit user v_krishna, another candidate was added to the comparison: the Object-mapping API.

Let's see the results:

    3 fields - 5000 items
        spring-data
        5438
        5453
        5576
        avg: 5489

        object-map
        5390
        5299
        5476
        avg: 5388

        driver
        4382
        4410
        4249
        avg: 4347

    conclusion
        - driver 26% faster than spring data
        - object map just under 2% faster than spring data

    3 fields - 10000 items
        spring-data
        8792
        8507
        8473
        avg: 8591

        object-map
        8435
        8494
        8365
        avg: 8431

        driver
        6632
        6760
        6646
        avg: 6679

    conclusion
        - driver faster 28.6% than spring data
        - object mapping just under 2% faster than spring data

    12 fields 5000 items
        spring-data
        6193
        5999
        5938
        avg: 6043

        object-map
        6062
        5936
        5911
        avg: 5970

        driver
        4910
        4955
        4596
        avg: 4820

    conclusion
        - driver 25% faster than spring data
        - object mapping 1.2% faster than spring data

To keep everything fair: there was some deviation between these test runs and the previous ones, here are the deviations:

comparison with first run:
    3 fields - 5000 items
        spring-data
        avg1: 5339
        avg2: 5489
        2.7% deviation

        driver
        avg1: 4390
        avg2: 4347
        1% deviation

    3 fields - 10000 items
        spring-data
        avg1: 8279
        avg2: 8591
        3.6% deviation

        driver
        avg1: 6822
        avg2: 6679
        2.1% deviation

    12 fields 5000 items
        spring-data
        avg1: 5920
        avg2: 6043
        2% deviation

        driver
        avg1: 4654
        avg2: 4820
        3.4% deviation

Object mapping from Spring Data seems to be just a bit slower than the object mapping available in the new driver. I can't wait to see a comparison of the two in future versions. Initially I was expecting the plain driver to be only around 5-10% faster than the object mapping capabilities; it surprised me a bit that the difference was more on the level of 25%. So if you are planning on using object mapping capabilities, there is a performance penalty.

Enhance Apache Cassandra Logging

Cassandra usually outputs all its logs into a single system.log file. It uses the old log4j 1.2 for Cassandra 2.0 and, since 2.1, logback, which of course uses a different syntax :)
Logging can be enhanced with some configuration. These explanations work with Cassandra 2.0.x and Cassandra 2.1.x, I haven’t tested other versions yet.

I wanted to split the logs into different files, depending on their “source” (repair, compaction, tombstones, etc.), to ease debugging, while keeping system.log as usual.

For example, to declare 2 new files to handle, say Repair and Tombstones logs :

Cassandra 2.0 :

You need to declare each new log files in log4j-server.properties file.

[...]
## Repair
log4j.appender.Repair=org.apache.log4j.RollingFileAppender
log4j.appender.Repair.maxFileSize=20MB
log4j.appender.Repair.maxBackupIndex=50
log4j.appender.Repair.layout=org.apache.log4j.PatternLayout
log4j.appender.Repair.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
## Edit the next line to point to your logs directory
log4j.appender.Repair.File=/var/log/cassandra/repair.log

## Tombstones
log4j.appender.Tombstones=org.apache.log4j.RollingFileAppender
log4j.appender.Tombstones.maxFileSize=20MB
log4j.appender.Tombstones.maxBackupIndex=50
log4j.appender.Tombstones.layout=org.apache.log4j.PatternLayout
log4j.appender.Tombstones.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
### Edit the next line to point to your logs directory
log4j.appender.Tombstones.File=/home/log/cassandra/tombstones.log

Cassandra 2.1 :

It is in the logback.xml file.

  <appender name="Repair" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${cassandra.logdir}/repair.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
      <fileNamePattern>${cassandra.logdir}/system.log.%i.zip</fileNamePattern>
      <minIndex>1</minIndex>
      <maxIndex>20</maxIndex>
    </rollingPolicy>

    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
      <maxFileSize>20MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
      <!-- old-style log format
      <pattern>%5level [%thread] %date{ISO8601} %F (line %L) %msg%n</pattern>
      -->
    </encoder>
  </appender>

  <appender name="Tombstones" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${cassandra.logdir}/tombstones.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
      <fileNamePattern>${cassandra.logdir}/tombstones.log.%i.zip</fileNamePattern>
      <minIndex>1</minIndex>
      <maxIndex>20</maxIndex>
    </rollingPolicy>

    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
      <maxFileSize>20MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
      <!-- old-style log format
      <pattern>%5level [%thread] %date{ISO8601} %F (line %L) %msg%n</pattern>
      -->
    </encoder>
  </appender>

Now that these new files are declared, we need to fill them with logs. To do that, simply redirect some Java classes to the right file. To redirect the class org.apache.cassandra.db.filter.SliceQueryFilter, log level WARN, to the Tombstones file, simply add :

Cassandra 2.0 :

log4j.logger.org.apache.cassandra.db.filter.SliceQueryFilter=WARN,Tombstones

Cassandra 2.1 :

<logger name="org.apache.cassandra.db.filter.SliceQueryFilter" level="WARN">
    <appender-ref ref="Tombstones"/>
</logger>

It’s an on-the-fly configuration, so no need to restart Cassandra !
Now you will have dedicated files for each kind of log.

A list of interesting Cassandra classes :

org.apache.cassandra.service.StorageService, WARN : Repair
org.apache.cassandra.net.OutboundTcpConnection, DEBUG : Repair (haha, those pesky stuck repairs)
org.apache.cassandra.repair, INFO : Repair
org.apache.cassandra.db.HintedHandOffManager, DEBUG : Repair
org.apache.cassandra.streaming.StreamResultFuture, DEBUG : Repair 
org.apache.cassandra.cql3.statements.BatchStatement, WARN : Statements
org.apache.cassandra.db.filter.SliceQueryFilter, WARN : Tombstones

You can find which Java class a log message comes from by adding “%c” to the log4j/logback “ConversionPattern” :

org.apache.cassandra.db.ColumnFamilyStore INFO  [BatchlogTasks:1] 2015-09-18 16:43:48,261 ColumnFamilyStore.java:939 - Enqueuing flush of batchlog: 226172 (0%) on-heap, 0 (0%) off-heap
org.apache.cassandra.db.Memtable INFO  [MemtableFlushWriter:4213] 2015-09-18 16:43:48,262 Memtable.java:347 - Writing Memtable-batchlog@1145616338(195.566KiB serialized bytes, 205 ops, 0%/0% of on/off-heap limit)
org.apache.cassandra.db.Memtable INFO  [MemtableFlushWriter:4213] 2015-09-18 16:43:48,264 Memtable.java:393 - Completed flushing /home/cassandra/data/system/batchlog/system-batchlog-tmp-ka-4267-Data.db; nothing needed to be retained.  Commitlog position was ReplayPosition(segmentId=1442331704273, position=17281204)

You can disable “additivity” (i.e. avoid also adding the messages to system.log) in log4j for a specific class by adding :

log4j.additivity.org.apache.cassandra.db.filter.SliceQueryFilter=false

For logback, you can add additivity="false" to <logger .../> elements.

To migrate from a log4j configuration to logback.xml, you can look at http://logback.qos.ch/translator/

Note: you can add http://blog.alteroot.org/feed.cassandra.xml to your rss aggregator to follow all my Cassandra posts :)

Analysis of Cassandra powered Greenhouse with Apache Spark

Intro

In the previous post we went over the steps for gathering the data on the Raspberry Pi.

  1. Gather Data on Raspberry Pi with Cassandra and Arduino
  2. Arduino Greenhouse

In this post I'm going to go over the steps necessary to get the data into Cassandra and then process it with Apache Spark.

Cassandra queries

    -- we'll keep the data on just one node
    CREATE KEYSPACE home
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };
    
    -- create statement, bucketed by date
    CREATE TABLE greenhouse (
        source text,
        day text,
        time timestamp,
        temperaturein decimal,
        temperatureout decimal,
        temperaturecheck decimal,
        humidity decimal,
        light int,
        PRIMARY KEY ((source, day), time)
    )
    WITH CLUSTERING ORDER BY (time DESC);
    
    -- example insert, just to check everything out
    INSERT INTO greenhouse (
        source, day, time, temperaturein,
        temperatureout, temperaturecheck,
        humidity, light)
    VALUES ('G', '2015-04-04', dateof(now()), 0,
        0, 0, 0, 0);
    
    -- check if everything is inserted
    SELECT * FROM greenhouse WHERE source = 'G' AND day = '2015-04-19';
    

Analysis results

I wanted to keep the partitions relatively small because I didn't know how the Raspberry Pi was going to handle the data. Timeouts are possible if the rows get too big, so I went with partitioning the data by day. The analysis of the April data showed that the project paid off. Here are the results of the analysis:

Total data points (not much, but it's a home DIY solution after all)
172651

First record
Measurement{source='G', day='2015-04-04', time=Sat Apr 04 17:04:41 CEST 2015, temperaturein=11.77, temperatureout=10.43, temperaturecheck=15.0, humidity=46.0, light=57}

Last record
Measurement{source='G', day='2015-05-04', time=Mon May 04 09:37:35 CEST 2015, temperaturein=22.79, temperatureout=20.49, temperaturecheck=23.0, humidity=31.0, light=68}

Cold nights (below 2 °C outside)
2015-04-06
2015-04-07
2015-04-10
2015-04-16
2015-04-17
2015-04-18
2015-04-19
2015-04-20

Lowest In
Measurement{source='G', day='2015-04-06', time=Mon Apr 06 06:22:25 CEST 2015, temperaturein=2.28, temperatureout=2.39, temperaturecheck=4.0, humidity=41.0, light=8}

Highest In
Measurement{source='G', day='2015-04-22', time=Wed Apr 22 14:52:26 CEST 2015, temperaturein=75.53, temperatureout=43.53, temperaturecheck=71.0, humidity=21.0, light=84}

Average In
19.45

Lowest Out
Measurement{source='G', day='2015-04-20', time=Mon Apr 20 04:42:16 CEST 2015, temperaturein=4.48, temperatureout=-2.88, temperaturecheck=6.0, humidity=31.0, light=0}

Highest Out
Measurement{source='G', day='2015-04-22', time=Wed Apr 22 15:58:32 CEST 2015, temperaturein=57.69, temperatureout=45.07, temperaturecheck=56.0, humidity=24.0, light=71}

Average Out
14.71

Average Difference
4.75

Biggest Diff
Measurement{source='G', day='2015-04-20', time=Mon Apr 20 15:11:53 CEST 2015, temperaturein=69.93, temperatureout=28.36, temperaturecheck=62.0, humidity=21.0, light=83}

The code

  1. Spark analysis code
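For a flavor of what that analysis code looks like, here is a minimal sketch with the spark-cassandra-connector (my reconstruction, not the linked code) that computes the average inside temperature for one day bucket:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

// Illustrative sketch: computes the average inside temperature for one day bucket.
// Table and keyspace names match the CQL above; everything else is an assumption.
public class GreenhouseAverage {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("GreenhouseAverage")
                .set("spark.cassandra.connection.host", "127.0.0.1");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Double> tempsIn = javaFunctions(sc)
                .cassandraTable("home", "greenhouse")
                .where("source = 'G' AND day = '2015-04-19'")
                .map(row -> row.getDecimal("temperaturein").doubleValue());

        double avg = tempsIn.reduce((a, b) -> a + b) / tempsIn.count();
        System.out.println("Average inside temperature: " + avg);
    }
}

Point it at different day buckets (or drop the where clause) to aggregate over longer periods.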

Gather Data on Raspberry Pi with Cassandra and Arduino

Intro

In the previous post we went over the steps necessary to make a sensor for a small greenhouse for the balcony.

  1. Arduino Greenhouse

In this section we are going to concentrate on how to gather the data coming in from the greenhouse. The approach is applicable to any kind of telemetry data or something similar. The parts list is simpler than in the previous section, but as a "concentrator" node we are going to use a Raspberry Pi. Here are the parts:
  • Arduino Uno
  • USB cable
  • Raspberry PI
  • nRF24L01+
  • 7 Wires

To install the Arduino libraries, please consult the previous post. The wiring for the nRF24 is the same as in the previous post.

Persisting the data

To persist the data I opted for Apache Cassandra. It's a good fit even for a low-powered Raspberry Pi. Cassandra is a Java technology, so before installing Cassandra you have to install Java. It's all written up nicely in the following posts:

  1. Install Java
  2. Installing Cassandra

Overview of the process

The code

  1. Data Gathering in Arduino
  2. Python serial to Cassandra bridge
To be continued ...

How to change Cassandra compaction strategy on a production cluster

I’ll talk about changing the Cassandra CompactionStrategy on a live production cluster.
First of all, an extract from the Cassandra documentation :

Periodic compaction is essential to a healthy Cassandra database because Cassandra does not insert/update in place. As inserts/updates occur, instead of overwriting the rows, Cassandra writes a new timestamped version of the inserted or updated data in another SSTable. Cassandra manages the accumulation of SSTables on disk using compaction. Cassandra also does not delete in place because the SSTable is immutable. Instead, Cassandra marks data to be deleted using a tombstone.

By default, Cassandra uses SizeTieredCompactionStrategy (STCS). This strategy triggers a minor compaction when there are a number of similar-sized SSTables on disk, as configured by the table subproperty min_threshold, 4 by default.

Another compaction strategy, available since Cassandra 1.0, is LeveledCompactionStrategy (LCS), based on LevelDB.
Since 2.0.11, DateTieredCompactionStrategy is also available.

Depending on your needs, you may want to change the compaction strategy on a running cluster. Changing this setting involves rewriting ALL sstables to the new strategy, which may take a long time and can be CPU and I/O intensive.

I needed to change the compaction strategy on my production cluster to LeveledCompactionStrategy because of our workload : lots of updates and deletes, wide rows, etc.
Moreover, with the default STCS, the largest SSTable will not be compacted again until the amount of actual data increases four-fold, so it can take a long time before old data is really deleted !

Note: You can test a new compactionStrategy on one new node with the write_survey bootstrap option. See the datastax blogpost about it.

The basic procedure to change the CompactionStrategy is to alter the table via cql :

cqlsh> ALTER TABLE mykeyspace.mytable  WITH compaction = { 'class' :  'LeveledCompactionStrategy'  };

If you run an alter table to change to LCS like that, all nodes will recompact their data at the same time, so performance problems can occur for hours/days…

A better solution is to migrate node by node !

You need to change the compaction locally and on-the-fly, via JMX, like in write_survey mode.
I use jmxterm for that. I think I’ll write articles about all these JMX things :)
For example, to change to LCS on the mytable table with jmxterm :

~ java -jar jmxterm-1.0-alpha-4-uber.jar --url instance1:7199                                                      
Welcome to JMX terminal. Type "help" for available commands.
$>domain org.apache.cassandra.db
#domain is set to org.apache.cassandra.db
$>bean org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies
#bean is set to org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies
$>get CompactionStrategyClass
#mbean = org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies:
CompactionStrategyClass = org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
$>set CompactionStrategyClass "org.apache.cassandra.db.compaction.LeveledCompactionStrategy" 
#Value of attribute CompactionStrategyClass is set to "org.apache.cassandra.db.compaction.LeveledCompactionStrategy" 

A nice one-liner :

~ echo "set -b org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies CompactionStrategyClass org.apache.cassandra.db.compaction.LeveledCompactionStrategy" | java -jar jmxterm-1.0-alpha-4-uber.jar --url instance1:7199

On the next commitlog flush, the node will start compacting to rewrite all its mytable sstables to the new strategy.

You can see the progress with nodetool :

~ nodetool compactionstats
pending tasks: 48
compaction type        keyspace           table       completed           total      unit  progress
Compaction        mykeyspace       mytable      4204151584     25676012644     bytes    16.37%
Active compaction remaining time :   0h23m30s

You need to wait for the node to recompact all its sstables, then change the strategy on instance2, etc.
The transition will be done in multiple compactions if you have lots of data. By default the new sstables will be 160MB large.

You can monitor your table with nodetool cfstats too :

~ nodetool cfstats mykeyspace.mytable
[...]
Pending Tasks: 0
        Table: sort
        SSTable count: 31
        SSTables in each level: [31/4, 0, 0, 0, 0, 0, 0, 0, 0]
[...]

You can see the 31/4 : it means that there are 31 sstables in L0, whereas Cassandra tries to have only 4 in L0.

Taken from the code ( src/java/org/apache/cassandra/db/compaction/LeveledManifest.java )

[...]
// L0: 988 [ideal: 4]
// L1: 117 [ideal: 10]
// L2: 12  [ideal: 100]
[...]

When all nodes have the new strategy, let’s go for the global alter table. /!\ If a node restarts before the final alter table, it will recompact back to the default strategy (SizeTiered)!

~ cqlsh 
cqlsh> ALTER TABLE mykeyspace.mytable  WITH compaction = { 'class' :  'LeveledCompactionStrategy'  };

Et voilà, I hope this article will help you :)

My last Cassandra blog post was one year ago… I have several in mind (JMX things !) so stay tuned !

Cassandra Community Handling 100 000 req per second

Intro

Recently I got an assignment to prove that a Cassandra cluster can handle up to 100 000 requests per second. All of this had to be done on a budget, and without much time spent on developing the whole application. The setup had to be as close to the real thing as possible. We will go through the details soon; here is just the basic overview of the experiment:

Amazon

Generating and handling load at this scale requires infrastructure that is usually not available within a personal budget, so I turned to Amazon EC2. I had heard about EC2 for quite some time, and it turned out really easy to use. Basically, all you have to do is set up a security group and store the "pem" file for that security group. Really easy, and if anybody hasn't tried it yet, there is a free micro instance available for a whole year after registering. I won't go into the details of how to set up the security group; it's all described in the DataStax documentation. Note that the security definition there is a bit extensive: defining the port range 1024-65535 is sufficient for inter-group communication, and I didn't expose any ports to the public as described in the documentation. The second part is generating the key pair. In the rest of the document I'll reference this file as "cassandra.pem".

Load

Generating load at that scale is not as easy as it might seem. After some searching I stumbled upon a few resources and came to the conclusion that the best solution was to use Tsung. I set up the load generating machines with the following snippet. Note that I placed the "cassandra.pem" file on the node from which I'll start running Tsung. Read the node addresses from the AWS console. The rest is pretty much here:

        # do this only for the machine from which you'll initiate tsung
        scp -i cassandra.pem cassandra.pem ec2-user@tsung_machine:~

        # connect to every load machine and install erlang and tsung
        ssh -i cassandra.pem ec2-user@every_load_machine

        # repeat this on every node
        sudo yum install erlang

        wget http://tsung.erlang-projects.org/dist/tsung-1.5.1.tar.gz
        tar -xvzf tsung-1.5.1.tar.gz
        cd tsung-1.5.1
        ./configure
        make
        sudo make install

        # you can close other load nodes now
        # go back to the first node and move cassandra.pem to id_rsa
        mv cassandra.pem .ssh/id_rsa

        # now make an ssh connection from first tsung node to every
        # load generating machine (to add the host key) so that
        # the first tsung node won't have any problem connecting to
        # other nodes and issuing erlang commands to them
        ssh ip-a-b-c-d
        exit

        # create the basic.xml file on the first tsung node
        vi basic.xml
    

The second part with the load generating machines is to edit the basic.xml file. To make it more interesting, we are going to send various kinds of messages with a timestamp. The user list is predefined in a file userlist.csv. Note that the password is the same for all the users; you can adapt this to your own needs or remove the password completely:

        0000000001;pass
        0000000002;pass
        0000000003;pass
        ...
        ...
        ...
    

The Tsung tool is well documented; the configuration I used is similar to this:

        <?xml version="1.0" encoding="utf-8"?>
        <!DOCTYPE tsung SYSTEM "/usr/share/tsung/tsung-1.0.dtd" []>
        <tsung loglevel="warning">

        <clients>
            <client host="ip-a-b-c-d0" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d1" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d2" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d3" cpu="8" maxusers="25"/>
        </clients>

        <servers>
            <server host="app-servers-ip-addresses-internal" port="8080" type="tcp"/>
            <!-- enter the rest of the app servers here-->
        </servers>

        <load>
            <arrivalphase phase="1" duration="11" unit="minute">
                <users maxnumber="100" arrivalrate="100" unit="second"/>
            </arrivalphase>
        </load>

        <options>
            <option name="file_server" id='id' value="userlist.csv"/>
        </options>

        <sessions>
            <session probability="100" name="load_session" type="ts_http">
                <setdynvars sourcetype="file" fileid="id" delimiter=";" order="iter">
                    <var name="username" />
                    <var name="pass" />
                </setdynvars>
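                <!-- compute the current Unix time in milliseconds; substituted as %%_millis%% in the request URLs below -->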
                <setdynvars sourcetype="eval"
                            code="fun({Pid,DynVars}) -&gt;
                            {Mega, Sec, Micro} = os:timestamp(),
                            (Mega*1000000 + Sec)*1000 + round(Micro/1000)
                            end.
                            ">
                    <var name="millis" />
                </setdynvars>
                <for from="1" to="10000000" var="i">
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%ABC41.7127837,42.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%DEF43.7127837,44.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%GHI45.7127837,46.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%JKL47.7127837,48.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%MNO49.7127837,50.71278370000.0"  method="GET"/>
                    </request>
                </for>
            </session>
        </sessions>
        </tsung>
    

Resources

  • 3x c3.xlarge
  • 1x c4.xlarge
Note: I added a c4 node because Amazon limited the number of instances I could boot.

App

I spent most of my development time on the app part. The component handling the requests is based on a Netty listener. In one of my previous posts I described how to use Netty to handle HTTP requests and acknowledge them with a HELLO message; here I acknowledge them with OK.
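
The listener code itself isn't shown in the post, but a minimal sketch of an "acknowledge with OK" handler, assuming Netty 4.x, might look something like this (the class name and the rest of the pipeline are illustrative, not the actual code from the app):

        import io.netty.buffer.Unpooled;
        import io.netty.channel.ChannelFutureListener;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.SimpleChannelInboundHandler;
        import io.netty.handler.codec.http.DefaultFullHttpResponse;
        import io.netty.handler.codec.http.FullHttpRequest;
        import io.netty.handler.codec.http.FullHttpResponse;
        import io.netty.handler.codec.http.HttpResponseStatus;
        import io.netty.handler.codec.http.HttpVersion;
        import io.netty.util.CharsetUtil;

        // Acknowledges every incoming HTTP request with a plain "OK" body.
        // Assumes HttpServerCodec + HttpObjectAggregator sit earlier in the pipeline.
        public class OkHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

            @Override
            protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
                // the /m?c=... query string would be parsed and handed to the Cassandra writer here

                FullHttpResponse response = new DefaultFullHttpResponse(
                        HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
                        Unpooled.copiedBuffer("OK", CharsetUtil.UTF_8));
                response.headers().set("Content-Type", "text/plain");
                response.headers().set("Content-Length", response.content().readableBytes());
                ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
            }
        }
    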

The most complicated part was sending the messages to Cassandra as fast as possible. The fastest way to send them is to use executeAsync. Initially I had trouble with it and was losing messages; some of the issues were due to concurrency, and some were due to a poor understanding of the DataStax driver.

Concurrency - Basically, I tried to save on instantiating BoundStatement instances for the sake of overall speed. But BoundStatement is not thread safe, and calling its bind method returns "this". It took me some time to figure this out, because when used in a single-threaded loop this behavior is not dangerous. Anyway, thanks to a colleague, I figured it out.

        // always instantiate new in concurrent code
        // don't reuse and make multiple calls with .bind()!

        BoundStatement bs = new BoundStatement(insertStatement);
    

Asynchronous execution - also a bit tricky. executeAsync returns a future, and initially I was just handing it to Futures.addCallback:

        // don't do this under heavy load with the result of executeAsync
        // in Cassandra you will start to lose data

        Futures.addCallback(future, ...
    

After some trial and error I found a pattern where I didn't lose any data:

        // here we are going to keep the futures
        private ArrayBlockingQueue<ResultSetFuture> queue = 
            new ArrayBlockingQueue<>(10000);

        // in the handling code
        queue.add(session.executeAsync(bs));

        // when reaching 1000th element in the queue
        // start emptying it
        if (queue.size() % 1000 == 0) {
            ResultSetFuture elem;
            do {
                elem = queue.poll();
                if (elem != null) {
                    elem.getUninterruptibly();
                }
            } while (elem != null);
        }

        // this will make your insertions around
        // 4x faster when compared to normal execute
    
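
Putting the two lessons together, here is a minimal consolidated sketch of the write path described above, in the DataStax Java driver 2.x style of the time; the keyspace, table, and column names are hypothetical, since the post doesn't show the actual schema:

        import java.util.concurrent.ArrayBlockingQueue;

        import com.datastax.driver.core.BoundStatement;
        import com.datastax.driver.core.Cluster;
        import com.datastax.driver.core.PreparedStatement;
        import com.datastax.driver.core.ResultSetFuture;
        import com.datastax.driver.core.Session;

        // Illustrative writer: prepare once, bind per message, executeAsync,
        // and periodically drain the futures so no write goes unchecked.
        public class MessageWriter {

            private final Session session;
            private final PreparedStatement insertStatement;

            // here we are going to keep the futures
            private final ArrayBlockingQueue<ResultSetFuture> queue =
                    new ArrayBlockingQueue<>(10000);

            public MessageWriter(Cluster cluster) {
                // keyspace, table, and column names are hypothetical
                this.session = cluster.connect("messages_ks");
                this.insertStatement = session.prepare(
                        "INSERT INTO messages (username, ts, payload) VALUES (?, ?, ?)");
            }

            public void write(String username, long millis, String payload) {
                // a fresh BoundStatement per call - never share one between threads
                BoundStatement bs = insertStatement.bind(username, millis, payload);
                queue.add(session.executeAsync(bs));

                // roughly every 1000th write, empty the queue and wait on the futures
                if (queue.size() % 1000 == 0) {
                    ResultSetFuture elem;
                    while ((elem = queue.poll()) != null) {
                        elem.getUninterruptibly();
                    }
                }
            }
        }
    

The 1000-element drain threshold and the 10,000-element queue simply mirror the numbers above; the point is that every future returned by executeAsync is eventually waited on, so failed writes no longer slip through unnoticed.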

App setup

The instances come with OpenJDK installed. That doesn't guarantee the best performance, so I installed Oracle Java. In order not to lose time on firewall setup, I simply copied the "cassandra.pem" file to every node.

        # copy ".jar" and "cassandra.pem" file to a single app node
        # copy the two files from single node to other nodes
        # it's a lot faster than uploading to every node (at least on my connection)

        # setup the machine
        wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u71-b14/jdk-7u71-linux-x64.tar.gz"
        
        tar -xvzf jdk-7u71-linux-x64.tar.gz

        sudo update-alternatives --install "/usr/bin/java" "java" "/home/ec2-user/jdk1.7.0_71/jre/bin/java" 1
        
        # pick the new java number in this step
        sudo update-alternatives --config java

        # check with this
        java -version
    

Resources

  • 2x c4.xlarge
  • 2x c4.2xlarge
  • 4x c3.xlarge
Note: I added c4 nodes because Amazon limited the number of instances I could boot. I also had to request a limit increase from customer service, and since I couldn't predict how many instances of each type I would use, the instance types are not the same for the load and app servers.

Cassandra

Setting up Cassandra is the easiest part of the whole undertaking. All I did was follow this guide by DataStax.

Resources

  • 7x c3.2xlarge
After hanging at 90 000 req/s for a while, I came to the conclusion that a replication factor of two might be too much for the resources I had available. I would probably need to further increase the number of Cassandra nodes, but since I couldn't get any more instances up, I set the replication factor to 1. Note that this replication factor does not allow losing nodes in the cluster without losing data. But the goal here is 100 000 req/s on a budget :)

Results

In the end it cost me around $30 to reach the 100k mark. I'm afraid to calculate how much this setup would cost on a monthly or yearly basis.

The successful run looked like this:

Total messages: 31 145 914 messages
Checked number: 31 145 914 messages
Average: 103 809 req/s

Don't be afraid to send me an email if you have any questions whatsoever ;)

Setting up Cassandra Cluster in Virtual Machines

Intro

From time to time, having just one Cassandra instance installed on your machine is not enough, because you want to test certain behaviors of a running Cassandra cluster. Having spare hardware on the side or processing time on Amazon is not always an option, so it's a good idea to set up a simple cluster on your own machine with instances running in virtual machines. This post shows how to do it with VirtualBox.

Getting VirtualBox Images

The reason I chose VirtualBox is that there are a lot of free virtual images available. Most of the time you'll be installing Cassandra on a Linux machine; I decided to go with CentOS. Head over to http://virtualboxes.org/images/centos/ and download CentOS-6.6-x86_64-minimal. The default settings are fine for every machine. Create a couple of them and give them names so that you can differentiate between them (Node1, Node2, etc.).

Perhaps the best approach is to set up one node first and then make copies afterwards. Do not forget to set the network to a bridged adapter. The username and password for the virtual machines are probably "root/reverse", but check those details when downloading the VirtualBox image. To keep it short I'll just continue using the root user; in production that is an extremely bad practice.

Setup networking

When importing the .ova file, VirtualBox will ask whether you want to reinitialize the MAC address. Check that option. There is a certain amount of buggy behavior when it comes to networking, so to prevent those errors run the following command after logging in to the virtual machine (root/reverse):

        rm /etc/udev/rules.d/70-persistent-net.rules
    
When VirtualBox initializes the networking on the virtual machine, it puts a new MAC address into a file. There seems to be a bug where this MAC address is not transferred from that file to the virtual machine settings. Run the following command and copy the MAC address:
        cat /etc/sysconfig/network-scripts/ifcfg-eth0
    
Shut down the machine and set the MAC address under Settings > Network > Advanced > MAC Address.

Install Java

Just to make things a bit easier we're going to install wget:

        yum install wget
    
Now we are going to install java:
        $ cd /opt/
        $ wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u72-b14/jdk-7u72-linux-x64.tar.gz"
        $ tar xzf jdk-7u72-linux-x64.tar.gz
        $ rm jdk-7u72-linux-x64.tar.gz

        $ cd /opt/jdk1.7.0_72/

        $ alternatives --install /usr/bin/java java /opt/jdk1.7.0_72/bin/java 2
        $ alternatives --config java

        $ alternatives --install /usr/bin/jar jar /opt/jdk1.7.0_72/bin/jar 2
        $ alternatives --install /usr/bin/javac javac /opt/jdk1.7.0_72/bin/javac 2
        $ alternatives --set jar /opt/jdk1.7.0_72/bin/jar
        $ alternatives --set javac /opt/jdk1.7.0_72/bin/javac

        $ vi /etc/profile.d/java.sh
        export JAVA_HOME=/opt/jdk1.7.0_72
        export JRE_HOME=/opt/jdk1.7.0_72/jre
        export PATH=$PATH:/opt/jdk1.7.0_72/bin:/opt/jdk1.7.0_72/jre/bin
    
Reboot (and then check with echo $JAVA_HOME).

Install Cassandra

Cassandra is installed and run by the following commands:

        $ cd /opt/
        $ wget http://downloads.datastax.com/community/dsc-cassandra-2.1.2-bin.tar.gz
        $ tar xzf dsc-cassandra-2.1.2-bin.tar.gz
        $ rm dsc-cassandra-2.1.2-bin.tar.gz

        [check ip address with ifconfig]

        $ cd conf

        $ vi cassandra.yaml
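            # every node advertises its own IP; all nodes point to the first node's IP as the seed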
            rpc_address: ip address of the node
            broadcast_address: ip address of the node
            - seeds: ip_address of the first node

        $ cd ../bin
        $ ./cassandra
    

Firewall settings

The cluster will not work out of the box because of the firewall settings. To start everything you will need to enable the following ports:

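        # 9042 = CQL native protocol, 7000/7001 = inter-node communication (plain/SSL), 7199 = JMX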
        $ iptables -I INPUT -p tcp -m tcp --dport 9042 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7000 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7001 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7199 -j ACCEPT

        $ /etc/init.d/iptables save

        $ service iptables restart
    
Now make copies of this machine and update the cassandra.yaml file with the IP addresses of the new machines. Also check /var/log/cassandra/system.log to see whether the other nodes are joining.

Installing Cassandra on MINIX NEO X5 mini (Android multimedia player)

Intro

I started doing some DIY home automation projects. Although I have the hugely popular Raspberry Pi available, I decided to use the MINIX NEO X5 mini because I felt the device would serve me better as some sort of home automation server. The first part of this story is getting a more server-oriented OS on the device. I decided to go with Linux, and after a lot of searching and trial and error I settled on an application called Linux Deploy, which I described in my previous blog post. Through the rest of the tutorial I'll assume you managed to install a Linux instance on your MINIX. I am going to gather a lot of telemetry data with the solution I am building, so installing Cassandra seems like a natural choice: there will be a lot of writes, and Cassandra is good at writing at an incredible scale.

Installing Java

        $ echo "deb http://ppa.launchpad.net/webupd8team/java/ubuntu trusty main" | sudo tee /etc/apt/sources.list.d/webupd8team-java.list
        $ echo "deb-src http://ppa.launchpad.net/webupd8team/java/ubuntu trusty main" | sudo tee -a /etc/apt/sources.list.d/webupd8team-java.list
        $ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys EEA14886
        $ sudo apt-get update
        $ sudo apt-get install oracle-java8-installer
        # you'll need to accept license agreement

        # set environment variables
        $ sudo apt-get install oracle-java8-set-default

        # login once again just in case
        $ exit
    

Installing python

Cassandra comes with a very nice tool called cqlsh. The version of Linux we currently have installed will not run it without Python available on the system, so we have to install it first.

        $ sudo apt-get install python2.7
    

Let's start the Cassandra

Configuring Cassandra is a chapter of its own; we'll make only minimal adjustments before starting. We'll configure Cassandra to respond to queries from other hosts and, while we are at it, enable virtual nodes (it will be easier to scale later).

        $ cd CASSANDRA_INSTALL_DIRECTORY
        $ nano conf/cassandra.yaml

        # uncomment
        num_tokens: 256

        # change to 0.0.0.0
        # this will enable you to contact the cassandra
        # from other computers etc.
        rpc_address: 0.0.0.0

        #save file

        $ cd ..
        $ ./bin/cassandra

        # after seeing something like
        # Startup completed! Now serving reads.
        # press ^C (don't be afraid cassandra still runs)

        $ bin/cqlsh

        Connected to Test Cluster at localhost:9160.
        [cqlsh 3.1.8 | Cassandra 1.2.18 | CQL spec 3.0.5 | 
        Thrift protocol 19.36.2]
        Use HELP for help.
        cqlsh>
    

Shutting cassandra down:

        # find PID of a cassandra process

        $ ps -ef | grep cassandra

        # run kill -9 [the PID number ... i.e. 8212]
    

Running Cassandra on an Android multimedia player is fun :)

Cassandra with Node.js and Arduino

Intro

This post continues where the previous one stopped. The Cassandra setup used here is more or less the same, so please read that post if you are interested in the Cassandra setup before continuing with the rest of this one.

Arduino

Learning big data stuff is most exciting when the data represents something from the real world, not something generated with a big loop and some randomized data in it. To create the data for this example I used the following components:

  1. arduino uno
  2. Photoresistor GL5528 LDR
  3. 10K OHM NTC Thermistor 5mm
  4. 2x 10k resistor
  5. Protoboard
  6. Wires
A couple of these inexpensive components combined with an Arduino give us a nice big data sensor/generator. It might not seem that complicated, but sampling data at one-second granularity will hit Cassandra's limitations after a month of sampling if not done right, so a simple Arduino setup is a fun and motivating way to tackle learning Cassandra. For now let's concentrate on the Arduino part. The wiring is shown here:


The Arduino sketch will be on GitHub, so we'll concentrate on the important parts here. The light level in this example is read on analog pin 0. Reading analog values on an Arduino gives values from 0 to 1023; we'll define the light level as a mapping from 0-1023 to 0-100, and Arduino already has a built-in function for this called map. I also had some trouble in my initial experiments with Arduino serial communication and reading pin values: the data written to the serial port simply got corrupted after a while. I read a couple of forums on the subject and found that it helps to delay execution for 1 ms after reading a pin value. Also, to keep things as stable as possible, we pause execution for 1 second after writing to the serial port, as shown here:

  int light = map(analogRead(0), 0, 1023, 0, 100);
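  // the 1 ms delay after the analog read avoids the serial corruption mentioned above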
  delay(1);

    ....

  sprintf(sOut, "%d,%s", light, deblank(sTemp));

  Serial.println(sOut);
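  // pause for a second after writing to the serial port to keep things stable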
  delay(1000);
 

Node.js and Cassandra

Parsing the messages that come from measuring devices is repetitive stuff that leads to pretty ugly code; I learned that the hard way. To make parsing these messages as easy as possible, I wrote a small utility package for it, and it's available on npm.

Using serial ports in node.js doesn't take a lot of steps to set up:

  var serial = require( "serialport" );
  var SerialPort = serial.SerialPort;

  var portName = "/dev/tty.usb-something";

  var sp = new SerialPort(portName, {
      baudrate:9600,
      parser:serial.parsers.readline("\n")
  });

  sp.on("data", function (data) {
      var arduinoData = translator.parse(data);
      //...
  });
 

To make data handling easier and more in line with Cassandra best practices, the readings are partitioned by the date on which they were recorded.

  CREATE TABLE room_data (
    day text,
    measurementtime timestamp,
    light int,
    temperature float,
    PRIMARY KEY (day, measurementtime)
  ) WITH CLUSTERING ORDER BY (measurementtime DESC);
 

The data will also most often be fetched for recent timestamps, using queries with limits set on them. To make this kind of fetching easier, the clustering order above stores the newest measurements first. To get the current light and temperature level we just have to run the following query (no WHERE clause or now() function needed):

  SELECT * FROM room_data LIMIT 1;
 

After setting up Cassandra, reading the data from the serial port, and parsing it, it's time to write the data into Cassandra. Analyzing the data and doing something useful with it will come in future posts; for now I'll stop at writing the data into Cassandra:

  client.execute('INSERT INTO room_data ' + 
   '(day, measurementtime, light, temperature)' + 
   ' VALUES (?, dateof(now()), ?, ?)',
   [
    moment().format('YYYY-MM-DD'),
    arduinoData.light,
    arduinoData.temperature
   ],
   function(err, result) {
    if (err) {
     console.log('insert failed', err);
    }
   }
  );
 

On the fifth line I used moment.js to format the current time into a string representation of the current date, which is used for partitioning in Cassandra. The rest of the code is pretty much the usual SQL-like stuff found in other database environments.

I recorded a couple of hours' worth of data here, in case anybody wants a sneak peek without having to set everything up. I exported the data out of Cassandra through cqlsh using this command:

  COPY room_data (day, measurementtime, light, temperature) 
   TO 'room_data.csv';
 

The rest of the example is located on GitHub.

Replace a dead node in Cassandra

Note (June 2020): this article is old and not really relevant anymore. If you use a modern version of Cassandra, look at the -Dcassandra.replace_address_first_boot option!

I want to share some tips from my experiments with Cassandra (version 2.0.x).

I found some documentation on the DataStax website about replacing a dead node, but it is not suitable for our needs, because in case of a hardware crash we will set up a new node with exactly the same IP (a replace "in place"). Update: the documentation is now up to date on DataStax!

If you try to start the new node with the same IP, Cassandra refuses to start with:

java.lang.RuntimeException: A node with address /10.20.10.2 already exists, cancelling join. Use cassandra.replace_address if you want to replace this node.

So we need to use the "cassandra.replace_address" directive (which is not really documented :( ). See this commit and this bug report. Available since 1.2.11/2.0.0, it's an easier solution and it works.

+    - New replace_address to supplant the (now removed) replace_token and
+      replace_node workflows to replace a dead node in place.  Works like the
+      old options, but takes the IP address of the node to be replaced.

It’s a JVM directive, so we can add it at the end of /etc/cassandra/cassandra-env.sh (debian package), for example:

JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=10.20.10.2" 

Of course, 10.20.10.2 is the IP of your dead/new node.

Now start Cassandra, and in the logs you will see:

INFO [main] 2014-03-10 14:58:17,804 StorageService.java (line 941) JOINING: schema complete, ready to bootstrap
INFO [main] 2014-03-10 14:58:17,805 StorageService.java (line 941) JOINING: waiting for pending range calculation
INFO [main] 2014-03-10 14:58:17,805 StorageService.java (line 941) JOINING: calculation complete, ready to bootstrap
INFO [main] 2014-03-10 14:58:17,805 StorageService.java (line 941) JOINING: Replacing a node with token(s): [...]
[...]
INFO [main] 2014-03-10 14:58:17,844 StorageService.java (line 941) JOINING: Starting to bootstrap...
INFO [main] 2014-03-10 14:58:18,551 StreamResultFuture.java (line 82) [Stream #effef960-6efe-11e3-9a75-3f94ec5476e9] Executing streaming plan for Bootstrap

The node is in bootstrapping mode and will retrieve data from the cluster. This may take a lot of time.
If the node is a seed node, a warning will indicate that the node did not auto-bootstrap. This is normal; you need to run nodetool repair on the node.

On the new node :

# nodetool netstats

Mode: JOINING
Bootstrap effef960-6efe-11e3-9a75-3f94ec5476e9
    /10.20.10.1
        Receiving 102 files, 17467071157 bytes total
[...]

After some time, you will see more information in the logs!
On the new node:

 INFO [STREAM-IN-/10.20.10.1] 2014-03-10 15:15:40,363 StreamResultFuture.java (line 215) [Stream #effef960-6efe-11e3-9a75-3f94ec5476e9] All sessions completed
 INFO [main] 2014-03-10 15:15:40,366 StorageService.java (line 970) Bootstrap completed! for the tokens [...]
[...]
 INFO [main] 2014-03-10 15:15:40,412 StorageService.java (line 1371) Node /10.20.10.2 state jump to normal
 WARN [main] 2014-03-10 15:15:40,413 StorageService.java (line 1378) Not updating token metadata for /10.20.30.51 because I am replacing it
 INFO [main] 2014-03-10 15:15:40,419 StorageService.java (line 821) Startup completed! Now serving reads.

And on the other nodes:

 INFO [GossipStage:1] 2014-03-10 15:15:40,625 StorageService.java (line 1371) Node /10.20.10.2 state jump to normal

Et voilà, the dead node has been replaced!
Don't forget to REMOVE the modification from cassandra-env.sh once the bootstrap is complete!

Enjoy !

Hello Cassandra in node.js

Intro

Since I started working in a team that deals with big data, I have come into contact with Apache Cassandra. After years in the relational world, it took me some time to get used to the many concepts Cassandra relies on; in the relational world they would actually be heavy anti-patterns. I went over a couple of tutorials, and as an intro to the Cassandra data model I would recommend this video by Patrick McFadin:

C* Summit 2013: The World's Next Top Data Model


Basic setup

The easiest way to get Cassandra is to download it from here: http://planetcassandra.org/Download/StartDownload

I somehow dislike it when applications write to /var/something and require root access to install unless it's absolutely necessary, so I followed this manual to avoid that problem.


cassandra.yaml

Out of the box, Cassandra is set up to support queries coming from the CQL shell ("cqlsh"). The goal of this blog entry is to show how to make a simple connection from node.js to Cassandra, so there is a bit of tweaking to be done in order to get all of this working. The necessary configuration is located in this file:

 install_dir/conf/cassandra.yaml
The properties I had to change were (basically this allows logging in with users other than the default):
 authenticator: PasswordAuthenticator
 authorizer: CassandraAuthorizer
After that, going into the bin directory and running cqlsh will require a username & password:
 ./cqlsh -u cassandra -p cassandra

Cassandra keyspace setup

 CREATE KEYSPACE test WITH REPLICATION = 
  {'class': 'SimpleStrategy', 'replication_factor': 1};

 --check if it's created with this
 DESCRIBE KEYSPACES;

 USE test;

 CREATE TABLE test_table (
  id text,
  test_value text,
  PRIMARY KEY (id)
 );

 INSERT INTO test_table (id, test_value) VALUES ('1', 'a');

 INSERT INTO test_table (id, test_value) VALUES ('2', 'b');

 INSERT INTO test_table (id, test_value) VALUES ('3', 'c');

 SELECT * FROM test_table;
 
If everything is OK you should see something like:
  id  | test_value
 ----+------------
   3 |          c
   2 |          b
   1 |          a

 (3 rows)
Add a testuser to make the hello world example work:
  create user testuser with password 'testuser';

  grant all on test.test_table to testuser;

node-cassandra-cql

I tried several Cassandra connection libraries from GitHub for node.js, and the one I found easiest to work with (and set up) was node-cassandra-cql by jorgebay. The project setup is pretty much standard: go into an empty project directory, initialize it with npm init, and then install the module with npm.

 npm init

 npm install node-cassandra-cql

 #copy hellocassandra.js from
 #https://github.com/msval/hellocassandrainnodejs

 node hellocassandra.js
 

Anyway, here's my example on GitHub.