Planet Cassandra

All your NoSQL Apache Cassandra resources in one place.

TLP Dashboards for Datadog users, out of the box.

We had the pleasure of releasing our monitoring dashboards for Apache Cassandra on Datadog last week. It is a nice occasion to share our thoughts on Cassandra dashboard design, as it is a recurring question in the community.

We wrote a post about this on the Datadog website here.

For people using Datadog, we hope this will give more detail on how the dashboards were designed, and thus on how to use the dashboards we provided. For others, we hope this information will be useful when building and then using your own dashboards, with the technology of your choice.

The Project

Building an efficient, complete, and readable set of dashboards to monitor Apache Cassandra is time-consuming and far from straightforward.

Those who have tried it probably noticed it requires a fair amount of time and knowledge of both the monitoring technology in use (Datadog, Grafana, Graphite or InfluxDB, metrics-reporter, etc.) and of Apache Cassandra. Creating dashboards is about picking the most relevant metrics, aggregations, units, and chart types, then gathering them so that this huge amount of data actually provides usable information. Dashboards need to be readable, understandable, and easy to use for the final operator.

On one hand, creating comprehensive dashboards is a long and complex task. On the other hand, every Apache Cassandra cluster can be monitored in roughly the same way. Most production issues can be detected and analyzed using a common set of charts, organized the same way, for all Apache Cassandra clusters. Each cluster may require additional operator-specific dashboards or charts depending on workload and on metrics merged from outside of Cassandra, but those would supplement the standard dashboards, not replace them. There are some differences depending on the Apache Cassandra versions in use, but they are relatively minor and not subject to rapid change.

In my monitoring presentation at the 2016 Cassandra Summit I announced that we were working on this project.

In December 2017 it was released for Datadog users. If you want to get started with these dashboards and you are using Datadog, see the documentation on the Datadog integration for Cassandra.

Dashboard Design

Our Approach to Monitoring

The dashboards have been designed to allow the operator to do the following:

  1. Easily detect any anomaly (Overview Dashboard)
  2. Be able to efficiently troubleshoot and fix the anomaly (Themed Dashboards)
  3. Find the bottlenecks and optimize the overall performance (Themed Dashboards)

The two latter points above can be seen as the same kind of operation, supported by the same set of dashboards.

Empowering the operator

We strongly believe that showing the metrics to the operator can be a nice entry point for learning about Cassandra. Each themed dashboard monitors a distinct internal process of Cassandra, and most of the metrics related to that process are grouped within the dashboard. We think this makes it easier for the operator to understand Cassandra’s internal processes.

To make this clearer, let’s consider the example of someone completely new to Cassandra. On their first repair, the operator starts an incremental repair without knowing anything about it, and latencies increase substantially after a short while. Classic.

The operator would notice a read latency increase in the ‘Overview Dashboard’, then move to the ‘Read Path Dashboard’. There the operator would notice that the number of SSTables went from 50 to 800 on each node, or for a particular table. With the chart there out of the box, even without knowing what an SSTable is, the operator can understand that something changed and that it relates to the outage somehow. The operator would then search in the right direction, probably solving the issue quickly and possibly learning in the process.

What to Monitor: Dashboards and Charts Detail

Here we focus on chart details and on how to use each chart efficiently. While this post discusses the dashboards available for Datadog, the metrics can be visualized with any tool, and we believe this is a good starting point when setting up monitoring for Cassandra.

In the graphs, the values and percentiles chosen are sometimes quite arbitrary and often depend on the use case or Cassandra setup. The point is to give a reference, a starting point for what ‘normal’ or ‘wrong’ values could be. The Apache Cassandra monitoring documentation, the mailing list archives, or the #cassandra channel on freenode (IRC) are good places to answer questions that might pop up while using the dashboards.

Some charts are deliberately duplicated across dashboards, or within a dashboard, but with a distinct visualisation or aggregation.

Detect anomalies: Overview Dashboard

We don’t try to troubleshoot at this stage. We want to detect outages that might impact the service or check that the Cassandra cluster is globally healthy. To accomplish this, the Overview Dashboard aims to be both complete and minimalist.

Complete, as we want to be warned anytime “something is happening” in the Cassandra cluster. Minimalist, because we don’t want important information to be lost in a flood of non-critical or overly low-level details. These charts aim to answer the question: “Is Cassandra healthy?”.

TLP Dashboards - Overview

Troubleshoot issues and optimize Apache Cassandra: Themed dashboards

The goal here is to divide the information into smaller, more meaningful chunks. An issue will often only affect one of Cassandra's subsystems, so the operator can have all the needed information in one place when working on that specific issue, without irrelevant information (for this specific issue) hiding what matters.

For this reason these dashboards must maximize the information on a specific theme or internal process of Cassandra and show all the low-level details (per table, per host). Charts are often repeated from other dashboards so that, as Cassandra users, we always find the information we need. This is the opposite of the Overview Dashboard mentioned above, which only shows “high level” information.

Read Path Dashboard

In this dashboard we are concerned about any element that could impact a high level client read. In fact, we want to know about everything that could affect the read path in Cassandra by just looking at this dashboard.

TLP Dashboards - Read Path Dashboard

Write Path Dashboard

This dashboard focuses on a comprehensive view of the various metrics which affect write latency and throughput. Long garbage collection pause times will always result in dips in throughput and spikes in latency, so garbage collection is featured prominently on this dashboard.

TLP Dashboards - Write Path Dashboard

SSTable management Dashboard

This dashboard gives a comprehensive view of the various metrics which impact the asynchronous steps the data goes through after a write, from the flush to the data deletion, with all the compaction processes in between. Here we want to keep an eye on disk space evolution and make sure the asynchronous management of SSTables is happening efficiently and as expected.

TLP Dashboards - SSTable Management Dashboard

Alerting: Automated Anomaly Detection

To conclude, once you are happy with the monitoring dashboards, it is a good idea to add some alerting rules.

It is important to detect all the anomalies as quickly as possible. To bring monitoring to the next level of efficiency, it is good to be warned automatically when something goes wrong.

We believe adding alerts on each of the “Overview Dashboard” metrics will be sufficient to detect most issues and any major outage, or at least be a good starting point. For each metric, the alerting threshold should be high enough to avoid false alerts, while still leaving enough time for a mitigating action to be taken. Some alerts should use absolute values (disk space available, CPU, etc.), while others will require relative values. Some alerts, such as those on latencies, will need manual tuning based on configuration and workload.

The biggest risk with alerting is probably being flooded by false alerts, as the natural inclination is to start ignoring them, which leads to missing valid ones. As a general guideline, any alert should trigger an action; if it does not, the alert is relatively useless and just adds noise.

DataStax Now Offering Docker Images for Development

Today we are taking the first step towards providing supported Docker images for all components of the DataStax Enterprise (DSE) data platform. We’ll be delivering these in phases, with the first being aimed at supporting developers in evaluation environments only. If you need help, file an issue in the GitHub repo or email us at techpartner@datastax.com.

The following three DataStax images are now available for non-production environments on Docker Hub and the Docker Store:

  • Docker Store:
    • DataStax Enterprise: The best distribution of Apache Cassandra™ plus integrated Search, Analytics, and multi-model capabilities. DSE also includes development and operations tooling: DSE Studio and DSE OpsCenter. DSE Studio and DSE OpsCenter are available as separate images on Docker Hub; details below.
  • Docker Hub:
    • DSE Studio: A visual, interactive web-based developer’s tool for DataStax Enterprise which is designed to accelerate your DSE development activities.
    • DSE OpsCenter: The web-based visual management and monitoring solution for DSE.

In addition, to make deploying DSE with Docker even simpler we are also providing:

  • Docker Compose scripts to enable you to easily deploy clusters and expose the components (DSE/OpsCenter/Studio) to each other.
  • Access to the GitHub repo for developers who want to customize the images.

We also want to make these images universally applicable to all your key use cases. For simple use cases we’ve exposed common settings as environment variables. For advanced configuration management we’re providing a simple mechanism to let you change or modify configurations without replacing or customizing the containers. You can add any of the approved config files to a mounted host volume and we’ll handle the hard work of mapping them within the container. You can read more about that feature here.

There’s more we’d like to say but we know you’re anxious to get started. Here’s how:

Lastly – we’ll be holding office hours over the next few weeks to answer questions and get feedback. Register to attend the office hours here.

In the meantime, here are a few ways to provide feedback:

Reaper 1.0 Has Been Released!

In celebration of National Pickle Day, we’re proud to announce the 1.0 version of Reaper for Apache Cassandra. This release is a huge milestone for us. We’d like to start by thanking everyone who’s reported bugs and helped us test. We’d especially love to give a huge thank you to the teams that have sponsored development of the project along the way.

To help everyone get started with Reaper, we’ve made a short video that will get you up and running in under 10 minutes.

First, the big items: we’ve massively improved multi-DC support when using the Cassandra backend. It’s now possible to run a Reaper instance in each data center, and Reaper will use Cassandra’s multi-DC replication to coordinate. We’ve updated the documentation with the details on how to set this up.

Based on feedback, we’ve simplified the UI by hiding less-used elements under an advanced panel that’s hidden by default. Setting up basic repairs should be easier with fewer options to consider initially.

Under the new advanced tab we added the ability for Reaper to repair specific nodes, instead of always needing to repair the entire cluster. This is useful when a node has been down for longer than the hint window. We’ve also moved the options for blacklisting, parallelism, repair intensity, and incremental repair.

One of the most frequently asked questions was how many segments to configure for a given repair. The answer was not easy, because it depended on cluster size and vnode count. We’ve simplified this by making the option segments per node instead of total segments.

Table blacklisting is new as well. This is useful for clusters with lots of tables, and a handful that don’t need to be repaired. This is common with tables using TTLs and TWCS.

On the performance side, the Cassandra backend has been improved to avoid excessive tombstone creation, which will help with read performance and decrease the overhead of compaction.

As usual, we strive to do more than just crank out features. We’ve made a considerable effort to reduce technical debt, improve organization, write better documentation, and conform to a consistent code style. We’ve made these investments to ensure a smooth path for future development.

The full list of issues can be found on GitHub.

As always, you can grab the latest version from the downloads section of cassandra-reaper.io.

New Features in the DataStax Node.js Drivers

Version 1.4.0 of the DataStax Enterprise Node.js Driver and version 3.3.0 of the DataStax Node.js Driver for Apache Cassandra are now available.

The main focus of these releases was to add support for speculative query executions. Additionally, we improved the performance of Murmur3 hashing and changed the query preparation logic along with other enhancements.

Speculative query executions

Speculative execution is a way to limit latency at high percentiles by preemptively starting one or more additional executions of the query against different nodes; the driver yields the first response received and discards the following ones.

Speculative executions are disabled by default. They are controlled by an instance of SpeculativeExecutionPolicy provided when initializing the Client; this policy defines the threshold after which a new speculative execution is triggered.

The driver provides a ConstantSpeculativeExecutionPolicy that schedules a given number of speculative executions separated by a fixed delay. The policy is exported under the {root}.policies.speculativeExecution submodule.

const client = new Client({
  contactPoints,
  policies: {
    speculativeExecution: new ConstantSpeculativeExecutionPolicy(
      200, // delay before a new execution is launched
      2) // maximum amount of additional executions
  }
});

Given the configuration above, an idempotent query would be handled this way:

  • Start the initial execution at t0
  • If no response has been received at t0 + 200 milliseconds, start a speculative execution on another node
  • If no response has been received at t0 + 400 milliseconds, start another speculative execution on a third node

As with the rest of the policies in the driver, you can provide your own implementation by extending the SpeculativeExecutionPolicy prototype.

One important aspect to consider is whether queries are idempotent (that is, whether they can be applied multiple times without changing the result beyond the initial application). If a query is not idempotent, the driver never schedules speculative executions for it, because there is no way to guarantee that only one node will apply the mutation. Examples of operations that are not idempotent are: counter increments/decrements; adding items to a list column; using non-idempotent CQL functions, like now() or uuid().

In the driver, query idempotence is determined by the isIdempotent flag in the QueryOptions, which defaults to false. You can set the default when initializing the Client or you can set it manually for each query, for example:

const query = 'SELECT * FROM users WHERE key = ?';
client.execute(query, [ 'usr1' ], { prepare: true, isIdempotent: true });

Note that enabling speculative executions causes the driver to send more individual requests, so throughput does not necessarily improve. You can read how speculative executions affect retries and other practical details in the documentation.

Improved Murmur3 hashing performance

Apache Cassandra uses the Murmur3Partitioner to determine the distribution of the data across cluster partitions. The adapted version of the Murmur3 hashing algorithm used by Cassandra performs several 64-bit integer operations. As there isn't a native int64 representation in ECMAScript, we previously used Google Closure's Long to support those operations.

Performing int64 add and multiply operations with int32 types requires using smaller int16 chunks to handle overflows. Google Closure's Long handles this by creating four uint16 chunks from each operand, performing the operations, and creating a new int64 value (composed of two int32 values), as Long is immutable.

To improve the performance of the partitioner on Node.js, we created a custom type, MutableLong, that maintains four uint16 fields used to apply the operations in place, modifying the internal state and avoiding additional allocations per operation.
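
To illustrate the chunking idea (a simplified sketch of the general technique, written in Python for brevity; this is not the driver's actual MutableLong code), here is 64-bit multiplication performed on 16-bit chunks with explicit carry propagation. The driver applies the same principle while mutating its four uint16 fields in place:

MASK16 = 0xFFFF
MASK64 = 0xFFFFFFFFFFFFFFFF

def chunks16(x):
    # split a 64-bit value into four 16-bit chunks, least significant first
    return [(x >> (16 * i)) & MASK16 for i in range(4)]

def mul64(a, b):
    # schoolbook multiplication on 16-bit chunks, wrapping at 64 bits
    ac, bc = chunks16(a), chunks16(b)
    out, carry = [0, 0, 0, 0], 0
    for i in range(4):  # column i of the result
        total = carry + sum(ac[j] * bc[i - j] for j in range(i + 1))
        out[i] = total & MASK16
        carry = total >> 16  # spills into the next column; the final carry wraps away
    return sum(c << (16 * i) for i, c in enumerate(out)) & MASK64

a, b = 0x0123456789ABCDEF, 0xFEDCBA9876543210
assert mul64(a, b) == (a * b) & MASK64  # matches a wrapped 64-bit multiplication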

Query preparation enhancements

Previously, the driver prepared the query only on the first node selected by the load-balancing policy, taking a lazy approach.

In this revision, we added fine-tuning options for how the driver deals with query preparation, introducing two new options:

  • prepareOnAllHosts: Determines whether the driver should prepare the query on all hosts.
  • rePrepareOnUp: Determines whether, when a node that has been down (unreachable) comes back up, the driver should re-prepare on it all queries that have been prepared on other nodes.

Both properties are set to true by default. You can change them when creating the Client instance:

const client = new Client({
  contactPoints,
  prepareOnAllHosts: false,
  rePrepareOnUp: false
});

Expose connection pool state

The driver now provides a method to obtain a snapshot of the state of the pool per host. It provides the information for all hosts of the cluster: the open connections per host and the number of queries that are currently being executed (in-flight) through a given host.

You can check out the ClientState API docs for more information.

You can also use the string representation, which provides the information condensed in a readable format, useful for debugging or periodic logging in production.

console.log('Pool state: %s', client.getState());

Wrapping up

More detailed information about all the features, improvements and fixes included in this release can be found in the changelogs: DSE driver changelog and Apache Cassandra driver changelog.

New versions of the drivers are available on npm:

Your feedback is important to us and influences which features we prioritize. To provide feedback, use the following:

Phantom Consistency Mechanisms

In this blog post we will take a look at consistency mechanisms in Apache Cassandra. There are three reasonably well documented features serving this purpose:

  • Read repair gives the option to sync data on read requests.
  • Hinted handoff is a buffering mechanism for situations when nodes are temporarily unavailable.
  • Anti-entropy repair (or simply just repair) is a process of synchronizing data across the board.

What is far less known, and what we will explore in detail in this post, is a fourth mechanism Apache Cassandra uses to ensure data consistency. We are going to see Cassandra perform another flavour of read repair, but in a far sneakier way.

Setting things up

In order to see this sneaky repair happening, we need to orchestrate a few things. Let’s just blaze through some initial setup using Cassandra Cluster Manager (ccm, available on GitHub).

# create a cluster of 2x3 nodes
ccm create sneaky-repair -v 2.1.15
ccm updateconf 'num_tokens: 32'
ccm populate --vnodes -n 3:3

# start nodes in one DC only
ccm node1 start --wait-for-binary-proto
ccm node2 start --wait-for-binary-proto
ccm node3 start --wait-for-binary-proto

# create keyspace and table
ccm node1 cqlsh -e "CREATE KEYSPACE sneaky WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3};"
ccm node1 cqlsh -e "CREATE TABLE sneaky.repair (k TEXT PRIMARY KEY , v TEXT);"

# insert some data
ccm node1 cqlsh -e "INSERT INTO sneaky.repair (k, v) VALUES ('firstKey', 'firstValue');"

The familiar situation

At this point, we have a cluster up and running. Suddenly, “the requirements change” and we need to expand the cluster by adding one more data center. So we will do just that and observe what happens to the consistency of our data.

Before we proceed, we need to ensure some determinism and turn off Cassandra’s known consistency mechanisms (we will not be disabling anti-entropy repair as that process must be initiated by an operator anyway):

# disable hinted handoff
ccm node1 nodetool disablehandoff
ccm node2 nodetool disablehandoff
ccm node3 nodetool disablehandoff

# disable read repairs
ccm node1 cqlsh -e "ALTER TABLE sneaky.repair WITH read_repair_chance = 0.0 AND dclocal_read_repair_chance = 0.0"

Now we expand the cluster:

# start nodes
ccm node4 start --wait-for-binary-proto
ccm node5 start --wait-for-binary-proto
ccm node6 start --wait-for-binary-proto

# alter keyspace
ccm node1 cqlsh -e "ALTER KEYSPACE sneaky WITH replication ={'class': 'NetworkTopologyStrategy', 'dc1': 3, 'dc2':3 };"

With these commands, we have effectively added a new DC into the cluster. From this point, Cassandra can start using the new DC to serve client requests. However, there is a catch. We have not populated the new nodes with data. Typically, we would do a nodetool rebuild. For this blog post we will skip that, because this situation allows some sneakiness to be observed.

Sneakiness: blocking read repairs

Without any data being put on the new nodes, we can expect no data to be actually readable from the new DC. We will go to one of the new nodes (node4) and do a read request with LOCAL_QUORUM consistency to ensure only the new DC participates in the request. After the read request we will also check the read repair statistics from nodetool, but we will set that information aside for later:

ccm node4 cqlsh -e "CONSISTENCY LOCAL_QUORUM; SELECT * FROM sneaky.repair WHERE k ='firstKey';"
ccm node4 nodetool netstats | grep -A 3 "Read Repair"

 k | v
---+---

(0 rows)

No rows are returned as expected. Now, let’s do another read request (again from node4), this time involving at least one replica from the old DC thanks to QUORUM consistency:

ccm node4 cqlsh -e "CONSISTENCY QUORUM; SELECT * FROM sneaky.repair WHERE k ='firstKey';"
ccm node4 nodetool netstats | grep -A 3 "Read Repair"

 k        | v
----------+------------
 firstKey | firstValue

(1 rows)

We got a hit! This is quite unexpected, because we did not run a rebuild or repair in the meantime, and hinted handoff and read repairs have been disabled. How come Cassandra went ahead and fixed our data anyway?

In order to shed some light on this issue, let’s examine the nodetool netstats output from before. We should see something like this:

# after first SELECT using LOCAL_QUORUM
ccm node4 nodetool netstats  | grep -A 3 "Read Repair"
Read Repair Statistics:
Attempted: 0
Mismatch (Blocking): 0
Mismatch (Background): 0

# after second SELECT using QUORUM
ccm node4 nodetool netstats  | grep -A 3 "Read Repair"
Read Repair Statistics:
Attempted: 0
Mismatch (Blocking): 1
Mismatch (Background): 0

# after third SELECT using LOCAL_QUORUM
ccm node4 nodetool netstats  | grep -A 3 "Read Repair"
Read Repair Statistics:
Attempted: 0
Mismatch (Blocking): 1
Mismatch (Background): 0

From this output we can tell that:

  • No read repair attempt was recorded (Attempted is 0).
  • One blocking read repair actually did happen (Mismatch (Blocking) is 1).
  • No background read repair happened (Mismatch (Background) is 0).

It turns out there are two read repairs that can happen:

  • A blocking read repair happens when a query cannot complete with the desired consistency level without actually repairing the data. read_repair_chance has no impact on this.
  • A background read repair happens in situations when a query succeeds but inconsistencies are found. This happens with read_repair_chance probability.

The take-away

To sum things up, it is not possible to entirely disable read repairs and Cassandra will sometimes try to fix inconsistent data for us. While this is pretty convenient, it also has some inconvenient implications. The best way to avoid any surprises is to keep the data consistent by running regular repairs.

In situations featuring non-negligible amounts of inconsistent data this sneakiness can cause a lot of unexpected load on the nodes, as well as the cross-DC network links. Having to do cross-DC reads can also introduce additional latency. Read-heavy workloads and workloads with large partitions are particularly susceptible to problems caused by blocking read repair.

One particular situation in which a lot of inconsistent data is guaranteed is when a new data center gets added to the cluster. In these situations, LOCAL_QUORUM is necessary to avoid blocking read repairs until a rebuild or a full repair is done. Using LOCAL_QUORUM is twice as important when the data center expansion happens for the first time: in a single data center scenario, QUORUM and LOCAL_QUORUM have virtually the same semantics, and it is easy to forget which one is actually used.
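
For reference, populating the new data center in the ccm setup used above would look roughly like the following sketch, with each new node streaming its data from the original data center (dc1); this is the rebuild step we deliberately skipped earlier:

# rebuild each node of the new DC from the existing dc1 data
ccm node4 nodetool rebuild -- dc1
ccm node5 nodetool rebuild -- dc1
ccm node6 nodetool rebuild -- dc1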

Reaper has its own site!

We’re delighted to introduce cassandra-reaper.io, the dedicated site for the open source Reaper project! Since we adopted Reaper from the incredible folks at Spotify, we’ve added a significant number of features, expanded the supported versions past 2.0, added support for incremental repair, and added a Cassandra backend to simplify operations.

The road ahead is looking promising. We’re working to improve the Cassandra backend even further, leveraging Cassandra’s multi-DC features to enable multi-DC repair as well as fault tolerance for Reaper itself. We’ve tested this work internally at The Last Pickle, as well as received community feedback. In addition to the site, we’ve also set up a Gitter-based chat to keep development out in the open and help foster the community.

Over time we’re looking to expand the functionality of Reaper beyond handling just repairs. We would love for the Reaper WebUI to be the easiest way to perform all administrative tasks on a Cassandra cluster.

Limiting Nodetool Parallel Threads

A handy feature was silently added to Apache Cassandra’s nodetool just over a year ago: the -j (jobs) option. This little gem controls the number of compaction threads to use when running a scrub, cleanup, or upgradesstables. The option was added to nodetool via CASSANDRA-11179 and is available in Apache Cassandra versions 2.1.14, 2.2.6, and 3.5 onward.

If unspecified, nodetool will use 2 compaction threads. When this value is set to 0 all available compaction threads are used to perform the operation. Note that the total number of available compaction threads is controlled by the concurrent_compactors property in the cassandra.yaml configuration file. Examples of how it can be used are as follows.

$ nodetool scrub -j 3
$ nodetool cleanup -j 1
$ nodetool upgradesstables -j 1 

The option is most useful in situations where disk space is scarce and a limited number of threads needs to be used for the operation to avoid exhausting the disk.

Writing Scala Codecs for the Java Driver

One of the common grievances Scala developers express when using the DataStax Java driver is the overhead incurred in almost every read or write operation, if the data to be stored or retrieved needs conversion from Java to Scala or vice versa.

This could be avoided by using "native" Scala codecs. Such codecs have occasionally been requested from the Java driver team, but they unfortunately do not exist, at least not officially.

Thankfully, the TypeCodec API in the Java driver can be easily extended. For example, several convenience Java codecs are available in the driver's extras package.

In this post, we are going to piggyback on the existing extra codecs and show how developers can create their own codecs – directly in Scala.

Note: all the examples in this post are available in this Github repository.

Dealing with Nullability

It can be tricky to deal with CQL types in Scala because CQL types are all nullable, whereas most typical representations of CQL scalar types in Scala resort to value classes, and these are non-nullable.

As an example, let's see how the Java driver deserializes, say, CQL ints.

The default codec for CQL ints converts such values to java.lang.Integer instances. From a Scala perspective, this has two disadvantages: first, one needs to convert from java.lang.Integer to Int, and second, Integer instances are nullable, while Scala Ints aren't.

Granted, the DataStax Java driver's Row interface has a pair of methods named getInt that deserialize CQL ints into Java ints, converting null values into zeroes.

But for the sake of this demonstration, let's assume that these methods did not exist, and all CQL ints were being converted into java.lang.Integer. Therefore, developers would yearn to have a codec that could deserialize CQL ints into Scala Ints while at the same time addressing the nullability issue.

Let this be the perfect excuse for us to introduce IntCodec, our first Scala codec:


import java.nio.ByteBuffer
import com.datastax.driver.core.exceptions.InvalidTypeException
import com.datastax.driver.core.{DataType, ProtocolVersion, TypeCodec}
import com.google.common.reflect.TypeToken

object IntCodec extends TypeCodec[Int](DataType.cint(), TypeToken.of(classOf[Int]).wrap()) {

  override def serialize(value: Int, protocolVersion: ProtocolVersion): ByteBuffer = 
    ByteBuffer.allocate(4).putInt(0, value)

  override def deserialize(bytes: ByteBuffer, protocolVersion: ProtocolVersion): Int = {
    if (bytes == null || bytes.remaining == 0) return 0
    if (bytes.remaining != 4) throw new InvalidTypeException("Invalid 32-bits integer value, expecting 4 bytes but got " + bytes.remaining)
    bytes.getInt(bytes.position)
  }

  override def format(value: Int): String = value.toString

  override def parse(value: String): Int = {
    try {
      if (value == null || value.isEmpty || value.equalsIgnoreCase("NULL")) 0
      else value.toInt
    }
    catch {
      case e: NumberFormatException =>
        throw new InvalidTypeException( s"""Cannot parse 32-bits integer value from "$value"""", e)
    }
  }

}

All we did so far is extend TypeCodec[Int] by filling in the superclass constructor arguments (more about that later) and implementing the required methods in a very similar way compared to the driver's built-in codec.

Granted, this isn't rocket science, but it will get more interesting later. The good news is, this template is reproducible enough to make it easy for readers to figure out how to create similar codecs for every AnyVal that is mappable to a CQL type (Boolean, Long, Float, Double, etc... let your imagination run wild or just go for the ready-made solution).

(Tip: because of the automatic boxing/unboxing that occurs under the hood, don't use this codec to deserialize simple CQL ints, and prefer instead the driver's built-in one, which will avoid this overhead; but you can use IntCodec to compose more complex codecs, as we will see below – the more complex the CQL type, the more negligible the overhead becomes.)

Let's see how this piece of code solves our initial problems: as for the burden of converting between Scala and Java, Int values are now written directly with ByteBuffer.putInt, and read directly from ByteBuffer.getInt; as for the nullability of CQL ints, the issue is addressed just as the driver does: nulls are converted to zeroes.

Converting nulls into zeroes might not be satisfying for everyone, but how can we improve the situation? The general Scala solution for dealing with nullable integers is to map them to Option[Int]. The DataStax Spark Connector for Apache Cassandra®'s CassandraRow class has just such a method:

def getIntOption(index: Int): Option[Int] = ...

Under the hood, it reads a java.lang.Integer from the Java driver's Row class, and converts the value to either None if it's null, or to Some(value), if it isn't.

Let's try to achieve the same behavior, but using the composite pattern: we first need a codec that converts from any CQL value into a Scala Option. There is no such built-in codec in the Java driver, but now that we are codec experts, let's roll our own OptionCodec:


class OptionCodec[T](
    cqlType: DataType,
    javaType: TypeToken[Option[T]],
    innerCodec: TypeCodec[T])
  extends TypeCodec[Option[T]](cqlType, javaType)
    with VersionAgnostic[Option[T]] {

  def this(innerCodec: TypeCodec[T]) {
    this(innerCodec.getCqlType, TypeTokens.optionOf(innerCodec.getJavaType), innerCodec)
  }

  override def serialize(value: Option[T], protocolVersion: ProtocolVersion): ByteBuffer =
    if (value.isEmpty) OptionCodec.empty.duplicate else innerCodec.serialize(value.get, protocolVersion)

  override def deserialize(bytes: ByteBuffer, protocolVersion: ProtocolVersion): Option[T] =
    if (bytes == null || bytes.remaining() == 0) None else Option(innerCodec.deserialize(bytes, protocolVersion))

  override def format(value: Option[T]): String =
    if (value.isEmpty) "NULL" else innerCodec.format(value.get)

  override def parse(value: String): Option[T] =
    if (value == null || value.isEmpty || value.equalsIgnoreCase("NULL")) None else Option(innerCodec.parse(value))

}

object OptionCodec {

  private val empty = ByteBuffer.allocate(0)

  def apply[T](innerCodec: TypeCodec[T]): OptionCodec[T] =
    new OptionCodec[T](innerCodec)

  import scala.reflect.runtime.universe._

  def apply[T](implicit innerTag: TypeTag[T]): OptionCodec[T] = {
    val innerCodec = TypeConversions.toCodec(innerTag.tpe).asInstanceOf[TypeCodec[T]]
    apply(innerCodec)
  }

}

And voilà! As you can see, the class body is very simple (its companion object is not very exciting at this point either, but we will see later how it could do more than just mirror the class constructor). Its main purpose when deserializing/parsing is to detect CQL nulls and return None right away, without even having to interrogate the inner codec, and when serializing/formatting, intercept None so that it can be immediately converted back to an empty ByteBuffer (the native protocol's representation of null).

We can now combine our two codecs together, IntCodec and OptionCodec, and compose a TypeCodec[Option[Int]]:

import com.datastax.driver.core._
val codec: TypeCodec[Option[Int]] = OptionCodec(IntCodec)
assert(codec.deserialize(ByteBuffer.allocate(0), ProtocolVersion.V4).isEmpty)
assert(codec.deserialize(ByteBuffer.allocate(4), ProtocolVersion.V4).isDefined)

The problem with TypeTokens

Let's sum up what we've got so far: a TypeCodec[Option[Int]] that is the perfect match for CQL ints. But how to use it?

There is nothing really particular with this codec and it is perfectly compatible with the Java driver. You can use it explicitly, which is probably the simplest way:

import com.datastax.driver.core._
val codec: TypeCodec[Option[Int]] = OptionCodec(IntCodec)
val row: Row = ??? // some CQL query containing an int column
val v: Option[Int] = row.get(0, codec)

But your application is certainly more complex than that, and you would like to register your codec beforehand so that it gets transparently used afterwards:

import com.datastax.driver.core._
// first
val codec: TypeCodec[Option[Int]] = OptionCodec(IntCodec)
cluster.getConfiguration.getCodecRegistry.register(codec)

// then
val row: Row = ??? // some CQL query containing an int column
val v: Option[Int] = row.get(0, ???) // How to get a TypeToken[Option[Int]]?

Well, before we can actually do that, we first need to solve one problem: the Row.get method comes in a few overloaded flavors, and the most flavory ones accept a TypeToken argument; let's learn how to use them in Scala.

The Java Driver API, for historical reasons (but also, let's be honest, due to the lack of alternatives), makes extensive use of Guava's TypeToken API (if you are not familiar with the type token pattern you might want to stop and read about it first).

Scala has its own interpretation of the same reflective pattern, named type tags. Both APIs pursue identical goals, conveying compile-time type information to the runtime, through very different roads. Unfortunately, it's anything but an easy path to travel from one to the other, simply because there is no easy bridge between java.lang.Type and Scala's Type.

Fortunately, all is not lost. As a matter of fact, creating a full-fledged conversion service between both APIs is not a prerequisite: it turns out that Guava's TypeToken works pretty well in Scala, and most classes get resolved just fine. TypeTokens in Scala are just a bit cumbersome to use, and quite error-prone when instantiated, but that's something a helper object can facilitate.

We are not going to dive any deeper into the troubled waters of Scala reflection (well, at least not until the last chapter of this tutorial). It suffices to assume that the helper object we mentioned above really exists, and that it does the job of creating TypeToken instances while sparing the developer the boilerplate code that this operation usually incurs.

Now we can resume our example and complete our code that reads a CQL int into a Scala Option[Int], in the most transparent way:

import com.datastax.driver.core._
val tt = TypeTokens.optionOf(TypeTokens.int) // creates a TypeToken[Option[Int]]
val row: Row = ??? // some CQL query containing an int column
val v: Option[Int] = row.get(0, tt) 

Dealing with Collections

Another common friction point between Scala and the Java driver is the handling of CQL collections.

Of course, the driver has built-in support for CQL collections; but obviously, these map to typical Java collection types: CQL list maps to java.util.List (implemented by java.util.ArrayList), CQL set to java.util.Set (implemented by java.util.LinkedHashSet) and CQL map to java.util.Map (implemented by java.util.HashMap).

This leaves Scala developers with two inglorious options:

  1. Use the implicit JavaConverters object and deal with – gasp! – mutable collections in their code;
  2. Deal with custom Java-to-Scala conversion in their code, and face the consequences of conversion overhead (this is the choice made by the already-mentioned Spark Connector for Apache Cassandra®, because it has a very rich set of converters available).

All of this could be avoided if CQL collection types were directly deserialized into Scala immutable collections.

Meet SeqCodec, our third Scala codec in this tutorial:


import java.nio.ByteBuffer
import com.datastax.driver.core.CodecUtils.{readSize, readValue}
import com.datastax.driver.core._
import com.datastax.driver.core.exceptions.InvalidTypeException

class SeqCodec[E](eltCodec: TypeCodec[E])
  extends TypeCodec[Seq[E]](
    DataType.list(eltCodec.getCqlType),
    TypeTokens.seqOf(eltCodec.getJavaType))
    with ImplicitVersion[Seq[E]] {

  override def serialize(value: Seq[E], protocolVersion: ProtocolVersion): ByteBuffer = {
    if (value == null) return null
    val bbs: Seq[ByteBuffer] = for (elt <- value) yield {
      if (elt == null) throw new NullPointerException("List elements cannot be null")
      eltCodec.serialize(elt, protocolVersion)
    }
    CodecUtils.pack(bbs.toArray, value.size, protocolVersion)
  }

  override def deserialize(bytes: ByteBuffer, protocolVersion: ProtocolVersion): Seq[E] = {
    if (bytes == null || bytes.remaining == 0) return Seq.empty[E]
    val input: ByteBuffer = bytes.duplicate
    val size: Int = readSize(input, protocolVersion)
    for (_ <- 1 to size) yield eltCodec.deserialize(readValue(input, protocolVersion), protocolVersion)
  }

  override def format(value: Seq[E]): String = {
    if (value == null) "NULL" else '[' + value.map(e => eltCodec.format(e)).mkString(",") + ']'
  }

  override def parse(value: String): Seq[E] = {
    if (value == null || value.isEmpty || value.equalsIgnoreCase("NULL")) return Seq.empty[E]
    var idx: Int = ParseUtils.skipSpaces(value, 0)
    if (value.charAt(idx) != '[') throw new InvalidTypeException( s"""Cannot parse list value from "$value", at character $idx expecting '[' but got '${value.charAt(idx)}'""")
    idx = ParseUtils.skipSpaces(value, idx + 1)
    val seq = Seq.newBuilder[E]
    if (value.charAt(idx) == ']') return seq.result
    while (idx < value.length) {
      val n = ParseUtils.skipCQLValue(value, idx)
      seq += eltCodec.parse(value.substring(idx, n))
      idx = n
      idx = ParseUtils.skipSpaces(value, idx)
      if (value.charAt(idx) == ']') return seq.result
      if (value.charAt(idx) != ',') throw new InvalidTypeException( s"""Cannot parse list value from "$value", at character $idx expecting ',' but got '${value.charAt(idx)}'""")
      idx = ParseUtils.skipSpaces(value, idx + 1)
    }
    throw new InvalidTypeException( s"""Malformed list value "$value", missing closing ']'""")
  }

  override def accepts(value: AnyRef): Boolean = value match {
    case seq: Seq[_] => if (seq.isEmpty) true else eltCodec.accepts(seq.head)
    case _ => false
  }

}

object SeqCodec {

  def apply[E](eltCodec: TypeCodec[E]): SeqCodec[E] = new SeqCodec[E](eltCodec)

}

(Of course, we are talking here about scala.collection.immutable.Seq.)

The code above is still vaguely reminiscent of the equivalent Java code, and not very interesting per se; the parse method in particular is not exactly a feast for the eyes, but there's little we can do about it.

In spite of its modest body, this codec allows us to compose a more interesting TypeCodec[Seq[Option[Int]]] that can convert a CQL list<int> directly into a scala.collection.immutable.Seq[Option[Int]]:

import com.datastax.driver.core._
type Seq[+A] = scala.collection.immutable.Seq[A]
val codec: TypeCodec[Seq[Option[Int]]] = SeqCodec(OptionCodec(IntCodec))
val l = List(Some(1), None)
assert(codec.deserialize(codec.serialize(l, ProtocolVersion.V4), ProtocolVersion.V4) == l)

Some remarks about this codec:

  1. This codec is just for the immutable Seq type. It could be generalized into an AbstractSeqCodec in order to accept other mutable or immutable sequences. If you want to know how it would look, the answer is here.
  2. Ideally, TypeCodec[T] should have been made covariant in T, the type handled by the codec (i.e. TypeCodec[+T]); unfortunately, this is not possible in Java, so TypeCodec[T] is in practice invariant in T. This is a bit frustrating for Scala implementors, as they need to choose the best upper bound for T, and stick to it for both input and output operations, just like we did above.
  3. Similar codecs can be created to map CQL sets to Sets and CQL maps to Maps; again, we leave this as an exercise to the user (and again, it is possible to cheat).

Dealing with Tuples

Scala tuples are an appealing target for CQL tuples.

The Java driver does have a built-in codec for CQL tuples; but it translates them into TupleValue instances, which are unfortunately of little help for creating Scala tuples.

Luckily enough, TupleCodec inherits from AbstractTupleCodec, a class that has been designed exactly with that purpose in mind: to be extended by developers wanting to map CQL tuples to more meaningful types than TupleValue.

As a matter of fact, it is extremely simple to craft a codec for Tuple2 by extending AbstractTupleCodec:


class Tuple2Codec[T1, T2](
    cqlType: TupleType, javaType: TypeToken[(T1, T2)],
    eltCodecs: (TypeCodec[T1], TypeCodec[T2]))
  extends AbstractTupleCodec[(T1, T2)](cqlType, javaType)
    with ImplicitVersion[(T1, T2)] {

  def this(eltCodec1: TypeCodec[T1], eltCodec2: TypeCodec[T2])(implicit protocolVersion: ProtocolVersion, codecRegistry: CodecRegistry) {
    this(
      TupleType.of(protocolVersion, codecRegistry, eltCodec1.getCqlType, eltCodec2.getCqlType),
      TypeTokens.tuple2Of(eltCodec1.getJavaType, eltCodec2.getJavaType),
      (eltCodec1, eltCodec2)
    )
  }

  {
    val componentTypes = cqlType.getComponentTypes
    require(componentTypes.size() == 2, s"Expecting TupleType with 2 components, got ${componentTypes.size()}")
    require(eltCodecs._1.accepts(componentTypes.get(0)), s"Codec for component 1 does not accept component type: ${componentTypes.get(0)}")
    require(eltCodecs._2.accepts(componentTypes.get(1)), s"Codec for component 2 does not accept component type: ${componentTypes.get(1)}")
  }

  override protected def newInstance(): (T1, T2) = null

  override protected def serializeField(source: (T1, T2), index: Int, protocolVersion: ProtocolVersion): ByteBuffer = index match {
    case 0 => eltCodecs._1.serialize(source._1, protocolVersion)
    case 1 => eltCodecs._2.serialize(source._2, protocolVersion)
  }

  override protected def deserializeAndSetField(input: ByteBuffer, target: (T1, T2), index: Int, protocolVersion: ProtocolVersion): (T1, T2) = index match {
    case 0 => Tuple2(eltCodecs._1.deserialize(input, protocolVersion), null.asInstanceOf[T2])
    case 1 => target.copy(_2 = eltCodecs._2.deserialize(input, protocolVersion))
  }

  override protected def formatField(source: (T1, T2), index: Int): String = index match {
    case 0 => eltCodecs._1.format(source._1)
    case 1 => eltCodecs._2.format(source._2)
  }

  override protected def parseAndSetField(input: String, target: (T1, T2), index: Int): (T1, T2) = index match {
    case 0 => Tuple2(eltCodecs._1.parse(input), null.asInstanceOf[T2])
    case 1 => target.copy(_2 = eltCodecs._2.parse(input))
  }

}

object Tuple2Codec {

  def apply[T1, T2](eltCodec1: TypeCodec[T1], eltCodec2: TypeCodec[T2]): Tuple2Codec[T1, T2] =
    new Tuple2Codec[T1, T2](eltCodec1, eltCodec2)

}

A very similar codec for Tuple3 can be found here. Extending this principle to Tuple4, Tuple5, etc. is straightforward and left for the reader as an exercise.

Going incognito with implicits

The careful reader noticed that Tuple2Codec's constructor takes two implicit arguments: CodecRegistry and ProtocolVersion. They are omnipresent in the TypeCodec API and hence, good candidates for implicit arguments – and besides, both have nice default values. To make the code above compile, simply put in your scope something along the lines of:

object Implicits {

  implicit val protocolVersion = ProtocolVersion.NEWEST_SUPPORTED
  implicit val codecRegistry = CodecRegistry.DEFAULT_INSTANCE

}

Speaking of implicits, let's now see how we can simplify our codecs by adding a pinch of those. Let's take a look at our first trait in this tutorial:

trait VersionAgnostic[T] {  this: TypeCodec[T] =>

  def serialize(value: T)(implicit protocolVersion: ProtocolVersion, marker: ClassTag[T]): ByteBuffer = 
    this.serialize(value, protocolVersion)

  def deserialize(bytes: ByteBuffer)(implicit protocolVersion: ProtocolVersion, marker: ClassTag[T]): T = 
    this.deserialize(bytes, protocolVersion)

}

This trait basically creates two overloaded methods, serialize and deserialize, which will infer the appropriate protocol version to use and forward the call to the relevant method (the marker argument is just the usual trick to work around erasure).

We can now mix-in this trait with an existing codec, and then avoid passing the protocol version to every call to serialize or deserialize:

import Implicits._
val codec = new SeqCodec(IntCodec) with VersionAgnostic[Seq[Int]]
codec.serialize(List(1,2,3))

We can now go even further and simplify the way codecs are composed together to create complex codecs. What if, instead of writing SeqCodec(OptionCodec(IntCodec)), we could simply write SeqCodec[Option[Int]]? To achieve that, let's enhance the companion object of SeqCodec with a more sophisticated apply method:

object SeqCodec {

  def apply[E](eltCodec: TypeCodec[E]): SeqCodec[E] = new SeqCodec[E](eltCodec)

  import scala.reflect.runtime.universe._

  def apply[E](implicit eltTag: TypeTag[E]): SeqCodec[E] = {
    val eltCodec = ??? // implicit TypeTag -> TypeCodec conversion
    apply(eltCodec)
  }

}

The second apply method guesses the element type by using implicit TypeTag instances (these are created by the Scala compiler, so you don't need to worry about instantiating them), then locates the appropriate codec for it. We can now write:

val codec = SeqCodec[Option[Int]]

Elegant, huh? Of course, we need some magic to locate the right codec given a TypeTag instance. Here we need to introduce another helper object, TypeConversions. Its method toCodec takes a Scala type and, with the help of some pattern matching, locates the most appropriate codec. We refer the interested reader to TypeConversions code for more details.

With the help of TypeConversions, we can now complete our new apply method:

def apply[E](implicit eltTag: TypeTag[E]): SeqCodec[E] = {
  val eltCodec = TypeConversions.toCodec[E](eltTag.tpe)
  apply(eltCodec)
}

Note: similar apply methods can be added to other codec companion objects as well.

It's now time to go really wild, bearing in mind that the following features should only be used with caution by expert users.

If only we could convert Scala's TypeTag instances into Guava's TypeToken ones, and then make them implicit like we did above, we would be able to completely abstract away these annoying types and write very concise code, such as:

val statement: BoundStatement = ???
statement.set(0, List(1,2,3)) // implicit TypeTag -> TypeToken conversion

val row: Row = ???
val list: Seq[Int] = row.get(0) // implicit TypeTag -> TypeToken conversion

Well, this can be achieved in a few different ways; we are going to explore here the so-called Type Class pattern.

The first step is to create implicit classes containing "get" and "set" methods that take TypeTag instances instead of TypeToken ones; we'll name them getImplicitly and setImplicitly to avoid name clashes. Let's do it for Row and BoundStatement:


implicit class RowOps(val self: Row) {

  def getImplicitly[T](i: Int)(implicit typeTag: TypeTag[T]): T = 
    self.get(i, ???) // implicit TypeTag -> TypeToken conversion

  def getImplicitly[T](name: String)(implicit typeTag: TypeTag[T]): T =
    self.get(name, ???) // implicit TypeTag -> TypeToken conversion

}

implicit class BoundStatementOps(val self: BoundStatement) {

  def setImplicitly[T](i: Int, value: T)(implicit typeTag: TypeTag[T]): BoundStatement =
    self.set(i, value, ???) // implicit TypeTag -> TypeToken conversion

  def setImplicitly[T](name: String, value: T)(implicit typeTag: TypeTag[T]): BoundStatement = 
    self.set(name, value, ???) // implicit TypeTag -> TypeToken conversion
}

Remember what we stated at the beginning of this tutorial: "there is no easy bridge between Java types and Scala types"? Well, we will have to lay one now to cross that river.

Our helper object TypeConversions has another method, toJavaType, that does just that. Again, digging into its details is out of the scope of this tutorial, but with this method we can complete our implicit classes as below:

def getImplicitly[T](i: Int)(implicit typeTag: TypeTag[T]): T = {
  val javaType: java.lang.reflect.Type = TypeConversions.toJavaType(typeTag.tpe)
  self.get(i, TypeToken.of(javaType).wrap().asInstanceOf[TypeToken[T]])
}

And we are done!

Now, by simply placing the above implicit classes into scope, we will be able to write code as concise as:

statement.setImplicitly(0, List(1,2,3)) // expands to statement.setImplicitly(0, List(1,2,3))(TypeTag[Seq[Int]]), then
                                        // forwards to statement.set(0, List(1,2,3), TypeToken[Seq[Int]])

When retrieving values, it's a bit more complicated because the Scala compiler needs some help from the developer to be able to fill in the appropriate implicit TypeTag instance; we do so like this:

val list = row.getImplicitly[Seq[Int]](0) // expands to row.getImplicitly(0)(TypeTag[Seq[Int]]), then
                                          // forwards to row.get(0, TypeToken[Seq[Int]])

That's it. We hope that with this tutorial, we could demonstrate how easy it is to create codecs for the Java driver that are first-class citizens in Scala. Enjoy!

Cassandra Time Series Data Modeling For Massive Scale

One of the big challenges people face when starting out with Cassandra and time series data is understanding how your write workload will affect your cluster. Writing too quickly to a single partition can create hot spots that limit your ability to scale out. Partitions that get too large can lead to issues with repair, streaming, and read performance. Reading from the middle of a large partition carries a lot of overhead, and results in increased GC pressure. Cassandra 4.0 should improve the performance of large partitions, but it won’t fully solve the other issues I’ve already mentioned. For the foreseeable future, we will need to consider their performance impact and plan for them accordingly.

In this post, I’ll discuss a common Cassandra data modeling technique called bucketing. Bucketing is a strategy that lets us control how much data is stored in each partition as well as spread writes out to the entire cluster. This post will discuss two forms of bucketing. These techniques can be combined when a data model requires further scaling. Readers should already be familiar with the anatomy of a partition and basic CQL commands.

When we first learn about data modeling with Cassandra, we might see something like the following:

CREATE TABLE raw_data (
    sensor text,
    ts timeuuid,
    reading int,
    primary key(sensor, ts)
) WITH CLUSTERING ORDER BY (ts DESC) 
  AND compaction = {'class': 'TimeWindowCompactionStrategy', 
                    'compaction_window_size': 1, 
                    'compaction_window_unit': 'DAYS'};

This is a great first data model for storing some very simple sensor data. Normally the data we collect is more complex than an integer, but in this post we’re going to focus on the keys. We’re leveraging TWCS as our compaction strategy. TWCS will help us deal with the overhead of compacting large partitions, which should keep our CPU and I/O under control. Unfortunately it still has some significant limitations. If we aren’t using a TTL, as we take in more data, our partition size will grow constantly, unbounded. As mentioned above, large partitions carry significant overhead when repairing, streaming, or reading from arbitrary time slices.

To break up this big partition, we’ll leverage our first form of bucketing: we’ll break our partitions into smaller ones based on a time window. The ideal size keeps partitions under 100MB. For example, one partition per sensor per day would be a good choice if we’re storing 50-75MB of data per day. We could just as easily use a week (starting from some epoch), or a month and year, as long as the partitions stay under 100MB. Whatever the choice, leaving a little headroom for growth is a good idea.
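
As a rough back-of-the-envelope check (the per-row cost below is an assumed figure, not a measurement), the arithmetic for a sensor reporting once per second looks like this:

# hypothetical figures: adjust to your actual row size and write rate
bytes_per_reading = 50          # assumed on-disk cost of one (ts, reading) row
readings_per_second = 1         # one reading per sensor per second
seconds_per_day = 86400

partition_mb_per_day = bytes_per_reading * readings_per_second * seconds_per_day / 1024.0 / 1024.0
print(round(partition_mb_per_day, 1))  # ~4.1 MB per sensor per day, well under the 100MB target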

To accomplish this, we’ll add another component to our partition key. Modifying our earlier data model, we’ll add a day field:

CREATE TABLE raw_data_by_day (
sensor text,
day text,
ts timeuuid,
reading int,
primary key((sensor, day), ts)
) WITH CLUSTERING ORDER BY (ts DESC) 
       AND COMPACTION = {'class': 'TimeWindowCompactionStrategy', 
                     'compaction_window_unit': 'DAYS', 
                     'compaction_window_size': 1};

Inserting into the table requires using the date as well as the now() value (you could also generate a TimeUUID in your application code):

INSERT INTO raw_data_by_day (sensor, day, ts, reading) 
VALUES ('mysensor', '2017-01-01', now(), 10);
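
If you would rather generate the TimeUUID in application code, a minimal sketch with the Python driver might look like the following (it assumes the table lives in a keyspace named blog, as in the concurrent-query example further below; uuid_from_time comes from the driver's cassandra.util module):

import datetime

from cassandra.cluster import Cluster
from cassandra.util import uuid_from_time  # builds a TimeUUID from a timestamp or datetime

session = Cluster(["127.0.0.1"]).connect("blog")
insert = session.prepare(
    "INSERT INTO raw_data_by_day (sensor, day, ts, reading) VALUES (?, ?, ?, ?)")

now = datetime.datetime.utcnow()
session.execute(insert, ("mysensor", now.strftime("%Y-%m-%d"), uuid_from_time(now), 10))
# uuid.uuid1() from the standard library also works when "right now" is all you need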

This is one way of limiting the amount of data per partition. For fetching large amounts of data across multiple days, you’ll need to issue one query per day. The nice part about querying like this is we can spread the work over the entire cluster rather than asking a single node to perform a lot of work. We can also issue these queries in parallel by relying on the async calls in the driver. The Python driver even has a convenient helper function for this sort of use case:

from itertools import product
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args

days = ["2017-07-01", "2017-07-12", "2017-07-03"]  # collecting three days worth of data
session  = Cluster(["127.0.0.1"]).connect("blog")
prepared = session.prepare("SELECT day, ts, reading FROM raw_data_by_day WHERE sensor = ? and day = ?")

args = product(["mysensor"], days)
# args: ('mysensor', '2017-07-01'), ('mysensor', '2017-07-12'), ('mysensor', '2017-07-03')

# driver handles concurrency for you
results = execute_concurrent_with_args(session, prepared, args)

# Results:
#[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36750>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36a90>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36550>)]

A variation on this technique is to use a different table per time window. For instance, using a table per month means you’d have twelve tables per year:

CREATE TABLE raw_data_may_2017 (
    sensor text,
    ts timeuuid,
    reading int,
    primary key(sensor, ts)
) WITH COMPACTION = {'class': 'TimeWindowCompactionStrategy', 
                     'compaction_window_unit': 'DAYS', 
                     'compaction_window_size': 1};

The primary benefit of this strategy is that it makes archiving and quickly dropping old data easy. For instance, at the beginning of each month, we could archive last month’s data to HDFS or S3 in Parquet format, taking advantage of cheap storage for analytics purposes. When we don’t need the data in Cassandra anymore, we can simply drop the table. You can probably see there’s a bit of extra maintenance around creating and removing tables, so this method is really only useful if archiving is a requirement. There are other methods to archive data as well, so this style of bucketing may be unnecessary.
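
As a rough sketch of what that maintenance might look like, the month_table helper below (a hypothetical name, not part of any driver) derives the per-month table name an archival job would work with; the DROP would only run once the previous month has been safely archived:

from datetime import datetime

def month_table(base, when):
    # e.g. raw_data_may_2017 for May 2017
    return "{}_{}_{}".format(base, when.strftime("%B").lower(), when.year)

current = month_table("raw_data", datetime(2017, 6, 1))   # raw_data_june_2017
previous = month_table("raw_data", datetime(2017, 5, 1))  # raw_data_may_2017

# once last month's data has been archived to S3/HDFS in Parquet,
# the whole table can simply be dropped:
# session.execute("DROP TABLE {}".format(previous))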

The above strategies focus on keeping partitions from getting too big over a long period of time. This is fine if we have a predictable workload and partition sizes that have very little variance. It’s possible to be ingesting so much information that we can overwhelm a single node’s ability to write data out, or the ingest rate may be significantly higher for a small percentage of objects. Twitter is a great example, where certain people have tens of millions of followers but it’s not the common case. It’s common to have a separate code path for these types of accounts where we need massive scale.

The second technique uses multiple partitions at any given time to fan out inserts to the entire cluster. The nice part about this strategy is we can use a single partition for low volume, and many partitions for high volume.

The tradeoff we make with this design is that reads need to use a scatter-gather, which has significantly higher overhead. This can make pagination more difficult, amongst other things. We need to be able to track how much data we’re ingesting for each gizmo we have. This is to ensure we can pick the right number of partitions to use. If we use too many buckets, we end up doing a lot of really small reads across a lot of partitions. With too few buckets, we end up with really large partitions that don’t compact, repair, or stream well, and have poor read performance.

For this example, we’ll look at a theoretical model for someone who’s following a lot of users on a social network like Twitter. Most accounts would be fine to have a single partition for incoming messages, but some people / bots might follow millions of accounts.

Disclaimer: I have no knowledge of how Twitter is actually storing their data, it’s just an easy example to discuss.

CREATE TABLE tweet_stream (
    account text,
    day text,
    bucket int,
    ts timeuuid,
    message text,
    primary key((account, day, bucket), ts)
) WITH CLUSTERING ORDER BY (ts DESC) 
         AND COMPACTION = {'class': 'TimeWindowCompactionStrategy', 
                       'compaction_window_unit': 'DAYS', 
                       'compaction_window_size': 1};

This data model extends our previous data model by adding bucket into the partition key. Each day can now have multiple buckets to fetch from. When it’s time to read, we need to fetch from all the partitions, and take the results we need. To demonstrate, we’ll insert some data into our partitions:

cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 0, now(), 'hi');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 1, now(), 'hi2');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 2, now(), 'hi3');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 3, now(), 'hi4');
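
From application code, the writer also has to decide which bucket each message lands in. Here is a minimal sketch, assuming a hypothetical buckets_for_account lookup that returns more buckets for high volume accounts, and picking a bucket at random to spread the writes across the cluster:

import random
from datetime import datetime
from cassandra.cluster import Cluster
from cassandra.util import uuid_from_time

session = Cluster(["127.0.0.1"]).connect("blog")
insert = session.prepare(
    "INSERT INTO tweet_stream (account, day, bucket, ts, message) VALUES (?, ?, ?, ?, ?)")

def buckets_for_account(account):
    # hypothetical lookup: accounts we know ingest heavily get more buckets,
    # everyone else stays on a single partition per day
    return 10 if account == "jon_haddad" else 1

def store_message(account, message):
    now = datetime.utcnow()
    day = now.strftime("%Y-%m-%d")
    # pick a bucket at random to fan the writes out across the cluster
    bucket = random.randrange(buckets_for_account(account))
    session.execute(insert, (account, day, bucket, uuid_from_time(now), message))

store_message("jon_haddad", "hi")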

If we want the ten most recent messages, we can do something like this:

from itertools import chain, product
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.util import unix_time_from_uuid1

prepared = session.prepare("SELECT ts, message FROM tweet_stream WHERE account = ? and day = ? and bucket = ? LIMIT 10")
# let's get 10 buckets 
partitions = range(10)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

args = product(["jon_haddad"], ["2017-07-01"], partitions)

result = execute_concurrent_with_args(session, prepared, args)

# [ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e6d0>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d710>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d4d0>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d950>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1db10>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dfd0>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dd90>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d290>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e250>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e490>)]

results = [x.result_or_exc for x in result]

# append all the results together
data = chain(*results)
            
sorted_results = sorted(data, key=lambda x: unix_time_from_uuid1(x.ts), reverse=True)            

# newest stuff first
# [Row(ts=UUID('e1c59e60-7406-11e7-9458-897782c5d96c'), message=u'hi4'),
#  Row(ts=UUID('dd6ddd00-7406-11e7-9458-897782c5d96c'), message=u'hi3'),
#  Row(ts=UUID('d4422560-7406-11e7-9458-897782c5d96c'), message=u'hi2'),
#  Row(ts=UUID('d17dae30-7406-11e7-9458-897782c5d96c'), message=u'hi')]

This example is only using a LIMIT of 10 items, so we can be lazy programmers, merge the lists, and then sort them. If we wanted to grab a lot more elements we’d want to use a k-way merge algorithm. We’ll come back to that in a future blog post when we expand on this topic.
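
In the meantime, here is a quick illustration of the idea: Python’s built-in heapq.merge is a k-way merge, and since each per-bucket ResultSet is already sorted newest-first by the clustering order, merging on the negated timestamp produces a single newest-first stream without materializing everything. This sketch assumes Python 3.5+ (for the key argument) and reuses the result variable from the query above.

import heapq
from itertools import islice
from cassandra.util import unix_time_from_uuid1

# each per-bucket ResultSet is already sorted newest-first (CLUSTERING ORDER BY ts DESC),
# so merging on the negated timestamp yields one newest-first stream
streams = [iter(r.result_or_exc) for r in result if r.success]
merged = heapq.merge(*streams, key=lambda row: -unix_time_from_uuid1(row.ts))

top_ten = list(islice(merged, 10))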

At this point you should have a better understanding of how you can distribute your data and requests around the cluster, allowing it to scale much further than if a single partition were used. Keep in mind each problem is different, and there’s no one size fits all solution.

Bring Your Own Spark

Bring Your Own Spark (BYOS) is a feature of DSE Analytics designed to connect from external Apache Spark™ systems to DataStax Enterprise with minimal configuration efforts. In this post we introduce how to configure BYOS and show some common use cases.

BYOS extends the DataStax Spark Cassandra Connector with DSE security features such as Kerberos and SSL authentication. It also includes drivers to access the DSE Cassandra File System (CFS) and DSE File System (DSEFS) in 5.1.

There are three parts of the deployment:

  • <dse_home>clients/dse-byos_2.10-5.0.6.jar is a fat jar. It includes everything you need to connect to the DSE cluster: the Spark Cassandra Connector with dependencies, the DSE security connection implementation, and the CFS driver.
  • The 'dse client-tool configuration byos-export' tool helps configure an external Spark cluster to connect to DSE.
  • The 'dse client-tool spark sql-schema' tool generates SparkSQL-compatible scripts that create external tables for all or some of the DSE tables in the SparkSQL metastore.

HDP 2.3+ and CDH 5.3+ are the only Hadoop distributions which support Java 8 officially and which have been tested with BYOS in DSE 5.0 and 5.1.

Quick Start Guide

Pre-requisites:

A Hadoop or standalone Spark system is installed and configured, and you have access to at least one host on the cluster with a preconfigured Spark client. Let’s call it spark-host. The Spark installation should be pointed to by $SPARK_HOME.

A DSE cluster is installed and configured, and you have access to it. Let’s call it dse-host. I will assume you have a cassandra_keyspace.exampletable C* table created on it. The DSE installation is located at $DSE_HOME.

DSE supports Java 8 only. Make sure your Hadoop, YARN and Spark use Java 8. See your Hadoop distro documentation on how to upgrade the Java version (CDH, HDP).

Prepare the configuration file

On dse-host run:

$DSE_HOME/bin/dse client-tool configuration byos-export byos.conf

It will store DSE client connection configuration in Spark-compatible format into byos.conf.

Note: if SSL or password authentication is enabled, additional parameters need to be stored. See the dse client-tool documentation for details.

Copy the byos.conf to spark-host.

On spark-host append the ~/byos.conf file to the Spark default configuration

cat byos.conf >> $SPARK_HOME/conf/spark-defaults.conf

Note: If you expect conflicts with spark-defaults.conf, the byos-export tool can merge properties itself; refer to the documentation for details.

Prepare C* to SparkSQL mapping (optional)

On dse-host run:

dse client-tool spark sql-schema -all > cassandra_mapping.sql

That will create cassandra_mapping.sql with Spark SQL compatible CREATE TABLE statements.

Copy the file to spark-host.

Run Spark

Copy $DSE_HOME/dse/clients/dse-byos-5.0.0-all.jar to the spark-host

Run Spark with the jar.

$SPARK_HOME/bin/spark-shell --jars dse-byos-5.0.0-all.jar
scala> import com.datastax.spark.connector._
scala> sc.cassandraTable("cassandra_keyspace", "exampletable").collect

Note: External Spark cannot connect to the DSE Spark master and submit jobs. Thus you cannot point it to the DSE Spark master.

SparkSQL

BYOS does not support the legacy Cassandra-to-Hive table mapping format. The spark data frame external table format should be used for mapping: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md

DSE provides a tool to auto generate the mapping for external spark metastore: dse client-tool spark sql-schema

On the dse-host run:

dse client-tool spark sql-schema -all > cassandra_mapping.sql

That will create cassandra_mapping.sql with Spark SQL compatible CREATE TABLE statements.

Copy the file to spark-host

Create C* tables mapping in spark meta-store

$SPARK_HOME/bin/spark-sql --jars dse-byos-5.0.0-all.jar -f cassandra_mapping.sql

Tables are now ready to use in both SparkSQL and Spark shell.

$SPARK_HOME/bin/spark-sql --jars dse-byos-5.0.0-all.jar
spark-sql> select * from cassandra_keyspace.exampletable;
$SPARK_HOME/bin/spark-shell --jars dse-byos-5.0.0-all.jar
scala> sqlContext.sql("select * from cassandra_keyspace.exampletable");

Access external HDFS from dse spark

DSE is built with Hadoop 2.7.1 libraries. So it is able to access any Hadoop 2.x HDFS file system.

To get access, you just need to provide the full path to the file in Spark commands:

scala> sc.textFile("hdfs://<namenode_host>/<path to the file>")

To get a namenode host you can run the following command on the Hadoop cluster:

hdfs getconf -namenodes

If the Hadoop cluster has a custom configuration or Kerberos security enabled, the configuration should be copied into the DSE Hadoop config directory:

cp /etc/hadoop/conf/hdfs-site.xml $DSE_HOME/resources/hadoop2-client/conf/hdfs-site.xml

Make sure that the firewall does not block the following HDFS DataNode and NameNode ports:

  • NameNode metadata service: 8020/9000
  • DataNode: 50010, 50020

 

Security configuration

SSL

Start with truststore generation using the DSE node certificates. If client certificate authentication is enabled (require_client_auth=true), a client keystore will also be needed.

More info on certificate generation:

https://docs.datastax.com/en/cassandra/2.1/cassandra/security/secureSSLCertificates_t.html

Copy both files to the same location on each Spark node. The Spark '--files' parameter can be used for the copying in a YARN cluster.

Use byos-export parameters to add store locations, type and passwords into byos.conf.

dse client-tool configuration byos-export --set-truststore-path .truststore --set-truststore-password password \
    --set-keystore-path .keystore --set-keystore-password password byos.conf

Yarn example:

spark-shell --jars byos.jar --properties-file byos.conf --files .truststore,.keystore

Kerberos

Make sure your Spark client host (where the Spark driver will be running) has Kerberos configured and the C* nodes’ DNS entries are configured properly. See more details in the Spark Kerberos documentation.

If the Spark cluster mode deployment will be used, or Kerberos is not configured on the Spark client host, use "Token based authentication" to access the Kerberized DSE cluster.

The byos.conf file will contain all the necessary Kerberos principal and service names exported from DSE.

A JAAS configuration file with the following options needs to be copied from a DSE node or created manually on the Spark client node only, and stored as the $HOME/.java.login.config file.

DseClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useTicketCache=true
       renewTGT=true;
};

Note: If a custom file location is used, the Spark driver property needs to be set to point to the location of the file:

--conf 'spark.driver.extraJavaOptions=-Djava.security.auth.login.config=login_config_file'

BYOS authenticates with Kerberos and requests a C* token for executor authentication. Token authentication should be enabled in DSE. The Spark driver will automatically cancel the token on exit.

Note: the CFS root should be passed to Spark to request the token, with:

--conf spark.yarn.access.namenodes=cfs://dse_host/

Spark Thrift Server with Kerberos

It is possible to authenticate services with a keytab. Hadoop/YARN services are already preconfigured with keytab files and Kerberos users if Kerberos was enabled in Hadoop, so you need to grant permissions to these users. Here is an example for the hive user:

cqlsh> create role 'hive/hdp0.dc.datastax.com@DC.DATASTAX.COM' with LOGIN = true;

Now you can log in as the hive Kerberos user, merge the configs and start the Spark Thrift server. It will be able to query DSE data:

#> kinit -kt /etc/security/keytabs/hive.service.keytab hive/hdp0.dc.datastax.com@DC.DATASTAX.COM
#> cat /etc/spark/conf/spark-thrift-sparkconf.conf byos.conf > byos-thrift.conf
#> start-thriftserver.sh --properties-file byos-thrift.conf --jars dse-byos*.jar

Connect to it with beeline for testing:

#> kinit
#> beeline -u 'jdbc:hive2://hdp0:10015/default;principal=hive/_HOST@DC.DATASTAX.COM'

Token based authentication

Note: This approach is less secure than the Kerberos one; use it only if Kerberos is not enabled on your Spark cluster.

DSE clients use Hadoop-like token based authentication when Kerberos is enabled on the DSE server.

The Spark driver authenticates to the DSE server with Kerberos credentials, requests a special token, and sends the token to the executors. The executors authenticate to the DSE server with the token, so no Kerberos libraries are needed on the executor nodes.

If the Spark driver node has no Kerberos configured, or the Spark application will be run in cluster mode, the token can be requested during configuration file generation with the --generate-token parameter.

$DSE_HOME/bin/dse client-tool configuration byos-export --generate-token byos.conf

The following property will be added to the byos.conf:

spark.hadoop.cassandra.auth.token=NwAJY2Fzc2FuZHJhCWNhc3NhbmRyYQljYXNzYW5kcmGKAVPlcaJsigFUCX4mbIQ7YU_yjEJgRUwQNIzpkl7yQ4inoxtZtLDHQBpDQVNTQU5EUkFfREVMRUdBVElPTl9UT0tFTgA

It is important to manually cancel it after the task is finished to prevent reuse attacks.

dse client-tool cassandra cancel-token NwAJY2Fzc2FuZHJhCWNhc3NhbmRyYQljYXNzYW5kcmGKAVPlcaJsigFUCX4mbIQ7YU_yjEJgRUwQNIzpkl7yQ4inoxtZtLDHQBpDQVNTQU5EUkFfREVMRUdBVElPTl9UT0tFTgA

Instead of Conclusion

Open Source Spark Cassandra Connector and Bring Your Own Spark feature comparison:

Feature                                          OSS    DSE BYOS
DataStax Official Support                        No     Yes
Spark SQL Source Tables / Cassandra DataFrames   Yes    Yes
CassandraRDD batch and streaming                 Yes    Yes
C* to Spark SQL table mapping generator          No     Yes
Spark Configuration Generator                    No     Yes
Cassandra File System Access                     No     Yes
SSL Encryption                                   Yes    Yes
User/password authentication                     Yes    Yes
Kerberos authentication                          No     Yes

 

DSE Advanced Replication in DSE 5.1

DSE Advanced Replication feature in DataStax Enterprise underwent a major refactoring between DSE 5.0 (“V1”) and DSE 5.1 (“V2”), radically overhauling its design and performance characteristics.

DSE Advanced Replication builds on the multi-datacenter support in Apache Cassandra® to facilitate scenarios where selective or "hub and spoke" replication is required. DSE Advanced Replication is specifically designed to tolerate sporadic connectivity that can occur in constrained environments, such as retail, oil-and-gas remote sites and cruise ships.

This blog post provides a broad overview of the main performance improvements and  drills down into how we support CDC ingestion and deduplication to ensure efficient transmission of mutations.

Note: This blog post was written targeting DSE 5.1. Please refer to the DataStax documentation for your specific version of DSE if different.

Overview

Discussion of performance enhancements is split into three broad stages:

  1. Ingestion: Capturing the Cassandra mutations for an Advanced Replication enabled table.
  2. Queueing: Sorting and storing the ingested mutations in an appropriate message queue.
  3. Replication: Replicating the ingested mutations to the desired destination(s).

Ingestion

In Advanced Replication v1 (included in DSE 5.0), capturing mutations for an Advanced Replication enabled table used Cassandra triggers. Inside the trigger we unbundled the mutation and extracted the various partition updates and key fields for the mutation. By using the trigger in the ingestion transaction, we provided backpressure to ingestion, reducing throughput and increasing latency, as the mutations were processed in the ingestion cycle.

In Advanced Replication v2 (included in DSE 5.1), we replaced triggers with the Cassandra Change Data Capture (CDC) feature added in Cassandra version 3.8. CDC is an optional mechanism for extracting mutations from specific tables from the commitlog. This mutation extraction occurs outside the Ingestion transaction, so it adds negligible direct overhead to the ingestion cycle latency.

Post-processing the CDC logs requires CPU and memory. This process competes with DSE for resources, so decoupling of ingestion into DSE and ingestion into Advanced Replication allows us to support bursting for mutation ingestion.

The trigger in v1 was previously run on a single node in the source cluster. CDC is run on every node in the source cluster, which means that there are replication factor (RF) number of copies of each mutation. This change creates the need for deduplication which we’ll explain later on.

Queuing

In Advanced Replication v1, we stored the mutations in a blob of data within a vanilla DSE table, relying on DSE to manage the replication of the queue and maintain the data integrity. The issue was that this insertion was done within the ingestion cycle with a negative impact on ingestion latency, at a minimum doubling the ingestion time. This could increase the latency enough to create a query timeout, causing an exception for the whole Cassandra query.

In Advanced Replication v2 we offloaded the queue outside of DSE and used local files. So for each mutation, we have RF copies of that mutation – due to capturing the mutations at the replica level via CDC versus at the coordinator level via triggers in v1 – on the same nodes where the mutation is stored for Cassandra. This change ensures data integrity and redundancy and provides RF copies of the mutation.

We have solved this CDC deduplication problem based on an intimate understanding of token ranges, gossip, and mutation structures to ensure that, on average, each mutation is only replicated once. The goal is to replicate all mutations at least once, and to try to minimize replicating a given mutation multiple times. This solution will be described later.

Replication

Previously in Advanced Replication v1, replication could be configured only to a single destination. This replication stream was fine for a use case which was a net of source clusters storing data and forwarding to a central hub destination, essentially 'edge-to-hub.'

In Advanced Replication v2 we added support for multiple destinations, where data could be replicated to multiple destinations for distribution or redundancy purposes. As part of this we added the ability to prioritize which destinations and channels (pairs of source table to destination table) are replicated first, and to configure whether channel replication is LIFO or FIFO to ensure the newest or oldest data is replicated first.

CDC Deduplication and its integration into the Message Queue to support replication

With the new v2 mutation queue implementation, each mutation is stored in a replication factor number of queues, and the mutations on each node are interleaved depending on which subset of token ranges is stored on that node.

There is no guarantee that the mutations are received on each node in the same order.

With the Advanced Replication v1 trigger implementation there was a single consolidated queue which made it significantly easier to replicate each mutation only once.

Deduplication

In order to minimize the number of times we process each mutation, we triage the mutations that we extract from the CDC log in the following way:

  1. Separate the mutations into their distinct tables.
  2. Separate them into their distinct token ranges.
  3. Collect the mutations in time sliced buckets according to their mutation timestamp (which is the same for that mutation across all the replica nodes.)

Distinct Tables

Separating the mutations into their distinct tables is reflected in the directory structure:

token Range configuration

Assume a three node cluster with a replication factor of 3.

For the sake of simplicity, this is the token-range structure on the nodes:

Primary, Secondary and Tertiary are an arbitrary but consistent way to prioritize the token ranges on the node – they are based on the token configuration of the keyspace – as we know that Cassandra has no concept of a primary, secondary or tertiary node.

However, it allows us to illustrate that we have three token ranges that we are dealing with in this example. If we have Virtual-Nodes, then naturally there will be more token-ranges, and a node can be ‘primary’ for multiple ranges.

Time slice separation

Assume the following example CDC files for a given table:

As we can see, the mutation timestamps are NOT always received in order (look at the id numbers), but in this example each node contains the same set of mutations.

In this case, all three nodes share the same token ranges, but if we had a 5 node cluster with a replication factor of 3, then the token range configuration would look like this, and the mutations on each node would differ:

Time slice buckets

As we process the mutations from the CDC file, we store them in time slice buckets of one minute’s worth of data. We also keep a stack of 5 time slices in memory at a time, which means that we can handle data up to 5 minutes out of order. Any data which is processed more than 5 minutes out of order is put into the out-of-sequence file and treated as exceptional data which will need to be replicated from all replica nodes.
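
As a simplified illustration of that bucketing (not DSE’s actual implementation), the sketch below assigns each mutation to a one minute slice keyed by its timestamp, keeps the five most recent slices in memory, and diverts anything older to a stand-in for the out-of-sequence file:

SLICE_SECONDS = 60          # one minute time slices
MAX_SLICES_IN_MEMORY = 5    # tolerate data up to 5 minutes out of order

slices = {}                 # slice start time -> list of mutations
out_of_sequence = []        # stand-in for the out-of-sequence file

def seal(bucket, mutations):
    """Rotate a slice out of memory: write it to its queue file and record its CRC."""

def ingest(ts, mutation):
    bucket = ts - (ts % SLICE_SECONDS)
    newest = max(slices) if slices else bucket
    if bucket <= newest - MAX_SLICES_IN_MEMORY * SLICE_SECONDS:
        # more than 5 minutes out of order: exceptional data that will need
        # to be replicated from all replica nodes
        out_of_sequence.append(mutation)
        return
    slices.setdefault(bucket, []).append(mutation)
    # keep only the 5 most recent slices in memory, sealing the oldest
    while len(slices) > MAX_SLICES_IN_MEMORY:
        oldest = min(slices)
        seal(oldest, slices.pop(oldest))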

Example CDC Time Window Ingestion

  • In this example, assume that there are 2 time slices of 30 seconds
  • Deltas which are positive are ascending in time so are acceptable.
  • Id’s 5, 11 and 19 jump backwards in time.
  • As the sliding time window is 30 seconds, Id’s 5, 12 & 19 would be processed, whilst ID 11 is a jump back of 45 seconds so would not be processed into the correct Time Slice but placed in the Out Of Sequence files.

Comparing Time slices

So we have a time slice of mutations on different replica nodes; they should be identical, but there is no guarantee that they are in the same order. We need to be able to compare the time slices and treat them as identical regardless of order. So we take the CRC of each mutation, and when we have sealed the time slice (rotated it out of memory because the current mutation that we are ingesting is 5 minutes later than this time slice), we sort the CRCs and take a CRC of all of the mutation CRCs.
That [TimeSlice] CRC is comparable between time slices to ensure they are identical.
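
A minimal sketch of that order-independent comparison, using zlib.crc32 purely for illustration (the checksum DSE actually uses may differ):

import zlib

def mutation_crc(mutation_bytes):
    return zlib.crc32(mutation_bytes) & 0xffffffff

def time_slice_crc(mutations):
    # sort the per-mutation CRCs so the result is independent of arrival order,
    # then take a CRC over the concatenated sorted CRCs
    crcs = sorted(mutation_crc(m) for m in mutations)
    combined = b"".join(c.to_bytes(4, "big") for c in crcs)
    return zlib.crc32(combined) & 0xffffffff

# two replicas holding the same mutations in a different order still agree
assert time_slice_crc([b"m1", b"m2", b"m3"]) == time_slice_crc([b"m3", b"m1", b"m2"])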

The CRCs for each time slice are communicated between nodes in the cluster via the Cassandra table.

Transmission of mutations

In the ideal situation, identical time slices and all three nodes are active – so each node is happily ticking away only transmitting its primary token range segment files.

However, we need to deal with robustness and assume that nodes fail, time slices do not match and we still have the requirement that ALL data is replicated.

We use gossip to monitor which nodes are active and which are not, and if a node fails, the ‘secondary’ becomes active for that node’s ‘primary’ token range.

Time slice CRC processing

If a CRC matches for a time slice between 2 nodes, then when that time slice is fully transmitted (for a given destination), the corresponding time slice (with the matching CRC) can be marked as sent (synch-deleted).

If the CRCs mismatch, and there is no higher priority active node with a matching CRC, then that time slice is transmitted – this is to ensure that no data is missed and everything is fully transmitted.

Active Node Monitoring Algorithm

Assume that the token ranges are (a,b], (b,c], (c,a], and the entire range of tokens is [a,c], we have three nodes (n1, n2 and n3) and replication factor 3.

  • On startup the token ranges for the keyspace are determined - we actively listen for token range changes and adjust the schema appropriately.
  • These are remapped so we have the following information:
    • node => [{primary ranges}, {secondary ranges}, {tertiary ranges}]
    • Note: We support vnodes where there may be multiple primary ranges for a node.
  • In our example we have:
    • n1 => [{(a,b]}, {(b,c]}, {(c,a]}]
    • n2 => [{(b,c]}, {(c,a]}, {(a,b]}]
    • n3 => [{(c,a]}, {(a,b]}, {(b,c]}]
  • When all three nodes are live, the active token ranges for the node are as follows:
    • n1 => [{(a,b]}, {(b,c]}, {(c,a]}] => {(a,b]}
    • n2 => [{(b,c]}, {(c,a]}, {(a,b]}] => {(b,c]}
    • n3 => [{(c,a]}, {(a,b]}, {(b,c]}] => {(c,a]}
  • Assume that n3 has died, its primary range is then searched for in the secondary replicas of live nodes:
    • n1 => [{(a,b]}, {(b,c]}, {(c,a]}] => {(a,b]}
    • n2 => [{(b,c]}, {(c,a]}, {(a,b]}] => {(b,c], (c,a]}
    • n3 => [{(c,a]}, {(a,b]}, {(b,c]}] => {}
  • Assume that n2 and n3 have died, their primary range is then searched for in the secondary replicas of live nodes, and if not found the tertiary replicas (assuming replication factor 3):
    • n1 => [{(a,b]}, {(b,c]}, {(c,a]}] => {(a,b], (b,c], (c,a]}
    • n2 => [{(b,c]}, {(c,a]}, {(a,b]}] => {}
    • n3 => [{(c,a]}, {(a,b]}, {(b,c]}] => {}
  • This ensures that data is only sent once from each edge node, and that dead nodes do not result in orphaned data which is not sent.
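
The behaviour in the list above can be sketched as a small piece of Python (a simplified illustration, not the actual DSE code), using the node => [primary, secondary, tertiary] mapping from the example:

RANGES = {
    "n1": [{"(a,b]"}, {"(b,c]"}, {"(c,a]"}],   # [primary, secondary, tertiary]
    "n2": [{"(b,c]"}, {"(c,a]"}, {"(a,b]"}],
    "n3": [{"(c,a]"}, {"(a,b]"}, {"(b,c]"}],
}
ALL_RANGES = {"(a,b]", "(b,c]", "(c,a]"}

def active_ranges(live_nodes):
    """Assign each token range to the live node holding it at the highest priority."""
    assignment = {node: set() for node in RANGES}
    for rng in ALL_RANGES:
        for priority in range(3):      # primary first, then secondary, then tertiary
            owners = [n for n in sorted(live_nodes) if rng in RANGES[n][priority]]
            if owners:
                assignment[owners[0]].add(rng)
                break
    return assignment

print(active_ranges({"n1", "n2", "n3"}))  # each node sends only its primary range
print(active_ranges({"n1", "n2"}))        # n2 also picks up n3's primary range (c,a]
print(active_ranges({"n1"}))              # n1 sends everything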

Handling the Node Failure Case

Below illustrates the three stages of a failure case.

  1. Before - where everything is working as expected.
  2. Node 2 Fails - so Node 1 becomes active for its token slices, ignores what has already been partially sent for 120-180, and resends from its secondary directory.
  3. Node 2 restarts - this is after Node 1 has sent 3 slices for which Node 2 was primary (but Node 1 was active because it was Node 2’s secondary); Node 2 synchronously deletes those because the CRCs match. It ignores what has already been partially sent for 300-360, resends those from its primary directory, and carries on.

Before

Node 2 Dies

Node 2 Restarts

 

Conclusion

The vastly improved and revamped DSE Advanced Replication v2 in DSE 5.1 is more resilient and performant with support for multi-hubs and multi-clusters.

For more information see our documentation here.

Cassandra Fundamentals & Data Modelling

This course is designed for developers and database administrators who want a rapid, deep-dive, ‘hands on’ exploration of core Cassandra theories and data modelling practices.

Continue reading Cassandra Fundamentals & Data Modelling on opencredo.com.

Studio 2.0 Goes Multi-Model with CQL Support

Great news!  In addition to support for DSE Graph and Apache TinkerPop™, DataStax Studio 2.0 introduces support for the Apache Cassandra™ Query Language (CQL).  A big part of that support is an intelligent CQL editor that will give you a productivity boost when working with CQL and DataStax Enterprise (DSE) 5.0+.  In this blog post we’ll take a deep dive into what the CQL editor has to offer.

Getting Started with CQL and Studio

CQL support for Studio requires DSE 5.0 or higher and Studio 2.0 or higher.  Both can be downloaded here http://docs.datastax.com/en/latest-dse/ and here http://docs.datastax.com/en/latest-studio/.

As is customary in Studio, you work with CQL in a notebook with one or more notebook cells.  To use CQL, just select it as the language for one of your notebook cells:


Figure 1.a:  Shows where to click to get the drop down menu of language options


Figure 1.b:  Demonstrates selecting CQL as a language which will enable the intelligent editor features

You won’t have to make this selection every time, as any new cell automatically inherits the prior cell’s language, so you avoid having to select the language you want to work with repeatedly.

Keyspaces

If you have worked with CQL in the past the next thing you’ll want to know is how to select a keyspace.  You have a few options with Studio:

  1. Fully qualify schema elements by their keyspace in your CQL statements:
  2. Use a USE statement, which will change the keyspace context for all subsequent statements in the same cell:
  3. Configure the keyspace by selecting one from keyspace drop down for a cell, which will set the keyspace context for all statements in a cell (except for statements following a USE statement):

Like with the cell language, if a new cell is created directly below a CQL cell the keyspace setting will be inherited.

Now that we know how to work with keyspaces, let’s move on.

CQL Validations


Figure 2:  A CQL schema aware domain validation error, indicating a keyspace needs to be specified

In the previous section we showed several ways to work with keyspaces.  But if you don’t use any of the above options, how do you know you’ve made a mistake without executing the statement?  The answer is shown in Figure 2 above.  There we can see that Studio lets us know when our statement has an issue by showing a validation error.

Studio supports both CQL syntax and DSE domain specific validations.  A syntax validation is simply whether or not your statement is valid with respect to the CQL grammar:

Domain validations provide you with errors or warnings that you would get from DSE when executing a statement when some constraint is violated.  Most are based on checking if a statement is valid with regards to your schema.  But they can include anything, such as informing you that you’ve specified an invalid table option:


Figure 3:  Example of a CQL domain validation error, that if you execute the statement gives you similar feedback from DSE

In this case, and many others you can figure out how to correct your statement by removing the part of the statement with an error and invoking content assist with ctrl+space to get a list of proposals.  Let’s take a look at content assist now.

Content Assist

Like validations, content assist can help you by proposing the next valid keywords in the grammar, or it can provide domain specific proposals.  Let’s see how we might correct the statement that specified an invalid table option by invoking content assist with ctrl+space:


Figure 4:  Example of proposing valid table options in a CREATE TABLE statement

In Figure 4 above we can see that the table option that we probably wanted before was bloom_filter_fp_chance, which after being selected will be inserted with a valid default value.

There are many places in CQL statements that Studio supports invoking content assist.  Some of the more common are:

  • Proposing table names anywhere a table can be referenced in a statement
  • Proposing column names anywhere a column can be referenced in a statement
  • Proposing the next valid keyword, e.g. CREATE <ctrl+space> should propose the TABLE keyword, among others

CQL Templates

Perhaps the most useful place to invoke content assist is at the very beginning of a statement:


Figure 5:  Invocation of content assist at the beginning of a statement that propose CQL statement templates

What you see in Figure 5 is that the proposals that contain placeholder values ({keyspaceName}, {viewName}) are CQL statement templates.  If we select the ALTER TABLE(add column) template a statement is inserted with each placeholder being a portion of the statement you need to complete.  You can TAB through these placeholders to jump around a statement, as well as use SHIFT+TAB to move back to the previous placeholder:


Figure 6:  Show the ALTER TABLE(add column) template inserted, with the current placeholder highlighted

In Figure 6 you can see that the placeholders are emphasized with the current placeholder being highlighted.  For this template we need to provide a table name, a column name and a type for the column.  Templates like these can be very handy when dealing with large complicated statements that you might not remember the syntax for off hand.  Such as the CREATE MATERIALIZED VIEW statement:


Figure 7:  Shows how handy templates can be for large complex statements such as CREATE MATERIALIZED VIEW(with clustering order)

When in doubt, give content assist a try!  All you have to do is invoke it with ctrl+space and you will be pleasantly surprised how much Studio can help you with crafting your CQL statements.

Effective Schema

When either validating your statements or making content assist proposals, Studio makes schema based domain validations and content assist proposals using an effective schema.  The effective schema is your existing schema combined with changes each of your DDL statements would effectively make to the database schema.  More specifically the changes from every DDL statement prior to the current statement that you are either trying to invoke content assist on, or that the editor is validating.

This ensures that if you were to execute your cells one by one from the top down that they would each execute successfully.

To make this clearer, take a look at the following example:


Figure 8:  Example of effective schema in a single cell

In the example above, assume that the database schema does not have the videos table, and that we have not executed this notebook cell.  In this cell we can see the following being demonstrated:

  1. A CREATE TABLE statement applies a change to the effective schema so that the videos table now exists from the perspective of the second statement
  2. Even though we haven’t executed it, the second statement(drop table videos) does not have a validation error, because the videos table exists in the effective schema of the drop table statement.
  3. The third statement tries to select from the videos table.  But the effective schema for that statement no longer has the videos table due to the prior drop statement, so it is flagged with a validation error.

Note that effective schema also carries across cells:


Figure 9:  Example of effective schema across multiple cells

And as mentioned previously, content assist also leverages effective schema


Figure 10:  Example of content assist leveraging the effective schema

The example above shows that content assist is aware that the videos2 table exists in the effective schema, but that videos1 has been dropped, so it isn’t proposed as a possible table to drop for the current statement.

Effective schema is a great tool to have to ensure you are writing statements that will execute successfully when working on a notebook that contains DDL statements. Especially notebooks with many statements.

One last topic for this post is a way for you to view your database schema from the editor itself using Studio’s DESCRIBE statement support.

DESCRIBE Statement Support

Suppose you want to create a new user defined type (UDT) that is fairly similar to an existing UDT, or you just don’t remember the syntax.  One way to do this quickly is to leverage Studio’s support for describing CQL schema elements.  Like the CQL shell (cqlsh), executing DESCRIBE statements will produce the equivalent DDL to create that schema element, which is a handy thing to copy and then modify to meet your new type’s needs:


Figure 11:  Shows the result of executing a DESCRIBE TYPE statement

In general Studio's DESCRIBE command support is a great way to inspect parts of your schema quickly without leaving the editor.  However, it’s important to note that DESCRIBE commands are not actual CQL statements and don't execute against your DSE cluster. Instead Studio uses the metadata it knows about your schema to generate equivalent output that you would find if issuing DESCRIBE commands using cqlsh.

What DESCRIBE commands does Studio support?

  • DESCRIBE CLUSTER
  • DESCRIBE KEYSPACES
  • DESCRIBE KEYSPACE
  • DESCRIBE TABLES
  • DESCRIBE TABLE
  • DESCRIBE INDEX
  • DESCRIBE MATERIALIZED VIEW
  • DESCRIBE TYPES
  • DESCRIBE TYPE
  • DESCRIBE FUNCTIONS
  • DESCRIBE FUNCTION
  • DESCRIBE AGGREGATES
  • DESCRIBE AGGREGATE

 Next Steps

A great place for you to go next is to download Studio and walk through the Working With CQL tutorial that ships with it.  That tutorial contains even more info about how to work with CQL in Studio, including:

  • Browsing your CQL schema with our fabulous CQL schema viewer
  • Different ways to visualize your CQL results, including a detailed JSON view of nested data in a single column
  • How to create custom CQL execution configurations, including ones that enable tracing and give you a profile view of your queries execution

Thanks!

We hope that Studio will be an extremely productive environment for you to craft your CQL queries to run against Datastax Enterprise.  If you have any feedback or requests, don’t hesitate to contact the Studio team at:  studio-feedback@datastax.com.  

Spark Application Dependency Management

This blog post was written for DataStax Enterprise 5.1.0. Refer to the DataStax documentation for your specific version of DSE.

Compiling and executing Apache Spark™ applications with custom dependencies can be a challenging task. Spark beginners can feel overwhelmed by the number of different solutions to this problem. Diversity of library versions, the number of different build tools and finally the build techniques, such as assembling fat JARs and dependency shading, can cause a headache.

In this blog post, we shed light on how to manage compile-time and runtime dependencies of a Spark Application that is compiled and executed against DataStax Enterprise (DSE) or open source Apache Spark (OSS).

Along the way we use a set of predefined bootstrap projects that can be adopted and used as a starting point for developing a new Spark Application. These examples are all about connecting, reading, and writing to and from a DataStax Enterprise or Apache Cassandra(R) system.

Quick Glossary:

Spark Driver: A user application that contains a Spark Context.
Spark Context: A Scala class that functions as the control mechanism for distributed work.
Spark Executor: A remote Java Virtual Machine (JVM) that performs work as orchestrated by the Spark Driver.
Runtime classpath: A list of all dependencies available during execution (in execution environment such as Apache Spark cluster). It's important to note that the runtime classpath of the Spark Driver is not necessarily identical to the runtime classpath of the Spark Executor.
Compile classpath: A full list of all dependencies available during compilation (specified with build tool syntax in a build file).

Choose language and build tool

First, git clone the DataStax repository https://github.com/datastax/SparkBuildExamples that provides the code that you are going to work with. Within cloned directories there are Spark Application bootstrap projects for Java and Scala, and for the most frequently used build tools:

  • Scala Build Tool (sbt)
  • Apache Maven™
  • Gradle

In the context of managing dependencies for the Spark Application, these build tools are equivalent. It is up to you to select the language and build tool that best fits you and your team.

For each build tool, the way the application is built is defined with declarative syntax embedded in files in the application’s directory:

  • Sbt: build.sbt
  • Apache Maven: pom.xml
  • Gradle: build.gradle

From now on we are going to refer to those files as build files.

Choose execution environment

Two different execution environments are supported in the repository: DSE and OSS.

DSE

If you are planning to execute your Spark Application on a DSE cluster, use the dse bootstrap project which greatly simplifies dependency management.

It leverages the dse-spark-dependencies library which instructs a build tool to include all dependency JAR files that are distributed with DSE and are available in the DSE cluster runtime classpath. These JAR files include Apache Spark JARs and their dependencies, Apache Cassandra JARs, Spark Cassandra Connector JAR, and many others. Everything that is needed to build your bootstrap Spark Application is supplied by the dse-spark-dependencies dependency. To view the list of all dse-spark-dependencies dependencies, visit our public repo and inspect the pom files that are relevant to your DSE cluster version.

An example of a DSE build.sbt:

libraryDependencies += "com.datastax.dse" % "dse-spark-dependencies" % "5.1.1" % "provided"

Using this managed dependency will automatically match your compile time dependencies with the DSE dependencies on the runtime classpath. This means there is no possibility in the execution environment for dependency version conflicts, unresolved dependencies etc.

Note: The DSE version must match the one in your cluster, please see “Execution environment version” section for details.

DSE project templates are built with sbt 0.13.13 or later. In case of unresolved dependency errors, update sbt and then clean the Ivy cache (with the rm ~/.ivy2/cache/com.datastax.dse/dse-spark-dependencies/ command).

OSS

If you are planning to execute your Spark Application on an open source Apache Spark cluster, use the oss bootstrap project. For the oss bootstrap project, all compilation classpath dependencies must be manually specified in build files.

An example of an OSS build.sbt:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion % "provided"
)

For OSS, you must specify these four dependencies for the compilation classpath.

During execution, the Spark runtime classpath already contains the org.apache.spark.* dependencies, so all we need to do is to add spark-cassandra-connector as an extra dependency. The DataStax spark-cassandra-connector doesn’t exist in the Spark cluster by default. The most common method to include this additional dependency is to use --packages argument for the spark-submit command. An example of --packages argument usage is shown in the “Execute” section below.

The Apache Spark versions in the build file must match the Spark version in your Spark cluster. See next section for details.

Execution environment versions

It is possible that your DSE or OSS cluster version is different than the one specified in the bootstrap project.

DSE

If you are a DSE user then checkout the SparkBuildExamples version that matches your DSE cluster version, for example:

git checkout <DSE_version>
# example: git checkout 5.0.6

If you are a DSE 4.8.x user then checkout 4.8.13 or newer 4.8.x version.

OSS

If you are planning to execute your application against a Spark cluster different than the one specified in a bootstrap project build file, adjust all dependency versions listed there. Fortunately, the main component versions are variables. See the example below and adjust the following according to your needs.

Sbt

val sparkVersion = "2.0.2"
val connectorVersion = "2.0.0"

 

Maven

<properties>
  <spark.version>2.0.2</spark.version>
  <connector.version>2.0.0</connector.version>
</properties>

 

Gradle

def sparkVersion = "2.0.2"
def connectorVersion = "2.0.0"

Let’s say that your Spark cluster is version 1.5.1. Go to the version compatibility table, where you can see compatible Apache Cassandra versions and Spark Cassandra Connector versions. In this example, our Apache Spark 1.5.1 cluster is compatible with the 1.5.x Spark Cassandra Connector; the newest one is 1.5.2 (the newest versions can be found on the Releases page). Adjust the variables accordingly and you are good to go!

Build

The build command differs for each build tool. The bootstrap projects can be built with the following commands.

Sbt

sbt clean assembly
# produces jar in path: target/scala-2.11/writeRead-assembly-0.1.jar

Maven

mvn clean package
# produces jar in path: target/writeRead-0.1.jar

Gradle

gradle clean shadowJar
# produces jar in path: build/libs/writeRead-0.1.jar

Execute

The spark-submit command differs between environments. In DSE environment, the command is simplified to autodetect parameters like --master. In addition, various other Apache Cassandra and DSE specific parameters are added to the default SparkConf. Use the following commands to execute the JAR that you built. Refer to the Spark docs for details about spark-submit command.

DSE

dse spark-submit --class com.datastax.spark.example.WriteRead <path_to_produced_jar>

OSS

spark-submit --conf spark.cassandra.connection.host=<cassandra_host> --class com.datastax.spark.example.WriteRead --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0 --master <master_url> <path_to_produced_jar>

Note the usage of --packages to include the spark-cassandra-connector on the runtime classpath for all application JVMs.

Provide additional dependencies

Now that you have successfully built and executed this simple application, it’s time to see how extra dependencies can be added to your Spark Application.

Let’s say your application grows with time and there is a need to incorporate an external dependency to add functionality to your application. For the sake of argument, let the new dependency be commons-math3.

To supply this dependency to the compilation classpath, we must provide proper configuration entries in build files.

There are two ways to provide additional dependencies to the runtime classpath: assembling, or manually providing all dependencies with the spark-submit command.

Assembly

Assembling is a way of directly including dependency classes in the resulting JAR file (sometimes called a fat JAR or uber JAR) as if these dependency classes were developed along with your application. When the user code is shipped to the Apache Spark Executors, these dependency classes are included in the application JAR on the runtime classpath. To see an example, uncomment the following sections in any of your build files.

Sbt

libraryDependencies += "org.apache.commons" %% "commons-math3" % "3.6.1"

 

Maven

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-math3</artifactId>
  <version>3.6.1</version>
</dependency>

 

Gradle

assembly "org.apache.commons:commons-math3:3.6.1"

Now you can use commons-math3 classes in your application code. When your development is finished, you can create a JAR file using the build command and submit it without any modifications to the spark-submit command. If you are curious to see where the additional dependency is, use any archive application to open the produced JAR to see that commons-math3 classes are included.

When assembling, you might run into conflicts where multiple jars attempt to include a file with the same filename but different contents. There are several solutions to this problem; the most common are removing one of the conflicting dependencies or shading (which is described later in this blog post). If all else fails, most plugins have a variety of other merge strategies for handling these situations. For example, see the sbt-assembly merge strategy documentation: https://github.com/sbt/sbt-assembly#merge-strategy.

Manually adding JARs to the runtime classpath

If you don’t want to assemble a fat JAR (maybe the number of additional dependencies produced a 100MB JAR file and you consider this size unusable), use an alternate way to provide additional dependencies to the runtime classpath.

Mark some of the dependencies with the provided keyword to exclude them from the assembly JAR.

Sbt

libraryDependencies += "org.apache.commons" %% "commons-math3" % "3.6.1" % "provided"

 

Maven

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-math3</artifactId>
  <version>3.6.1</version>
  <scope>provided</scope>
</dependency>

 

Gradle

provided "org.apache.commons:commons-math3:3.6.1"

After building a JAR, manually specify additional dependencies with spark-submit command during application submission. Add or extend existing --packages argument of spark-submit command. Note that multiple dependencies are separated by commas. For example:

--packages org.apache.commons:commons-math3:3.6.1,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0

User dependencies conflicting with Spark dependencies

What if you want to use different version of a dependency than the version that is present in the execution environment?

For example, a Spark cluster already has commons-csv in its runtime classpath and the developer needs a different version in their application. Maybe the Spark version is old and doesn’t contain all the needed functionality. Maybe the new version is not backward compatible and breaks Spark Application execution.

This is a common problem and there is a solution: shading.

Shading

Shading is a build technique where dependency classes are packaged with the application JAR files (like in assembling), but additionally the package structure of these classes is altered. This process happens at compile time and is transparent to the developer. Shading simply substitutes all dependency references in a Spark Application with the same (functionality-wise) classes located in different packages. For example, the class org.apache.commons.csv.CSVParser for the Spark Application becomes shaded.org.apache.commons.csv.CSVParser.

To see shading in action, uncomment the following sections in the build file of your choice. This will embed the old commons-csv in the resulting JAR, but with the package prefix “shaded”.

Sbt

assembly "org.apache.commons:commons-csv:1.0"

and

assemblyShadeRules in assembly := Seq( 
 ShadeRule.rename("org.apache.commons.csv.**" -> "shaded.org.apache.commons.csv.@1").inAll 
)

Maven

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-csv</artifactId>
  <version>1.0</version>
</dependency>

and

<relocations>
  <relocation>
    <pattern>org.apache.commons.csv</pattern>
    <shadedPattern>shaded.org.apache.commons.csv</shadedPattern>
  </relocation>
</relocations>

 

Gradle

libraryDependencies += "org.apache.commons" % "commons-csv" % "1.0"

and

shadowJar {
  relocate 'org.apache.commons.csv', 'shaded.org.apache.commons.csv'
}

After building the JAR, you can look into its contents and see that commons-csv is embedded in the shaded directory.

Summary

In this article, you learned how to manage compile-time and runtime dependencies of a simple Apache Spark application that connects to an Apache Cassandra database by using the Spark Cassandra Connector. You learned how Scala and Java projects are structured with sbt, gradle, and maven build tools. You also learned different ways to provide additional dependencies and how to resolve dependency conflicts with shading.

Reaper 0.6.1 released

Since we created our hard fork of Spotify’s great repair tool, Reaper, we’ve been committed to making it the “de facto” community tool for managing repairs of Apache Cassandra clusters.
This required Reaper to support all versions of Apache Cassandra (starting from 1.2) and some features it lacked, like incremental repair.
Another thing we really wanted to bring in was removing the dependency on a Postgres database to store Reaper data. As Apache Cassandra users, it felt natural to store it in our favorite database.

Reaper 0.6.1

We are happy to announce the release of Reaper 0.6.1.

Apache Cassandra as a backend storage for Reaper was introduced in 0.4.0, but it appeared that it was creating a high load on the cluster hosting its data.
While the Postgres backend could rely on indexes to search efficiently for segments to process, the C* backend had to scan all segments and filter afterwards. The initial data model didn’t account for the frequency of those scans, which generated a lot of requests per second once you had repairs with hundreds (if not thousands) of segments.
It also seems Reaper was designed to work on clusters that do not use vnodes: computing the number of possible parallel segment repairs for a job used the number of tokens divided by the replication factor, instead of using the number of nodes divided by the replication factor.
This led to a lot of overhead, with threads trying and failing to repair segments because the nodes were already involved in a repair operation, each attempt generating a full scan of all segments.

Both issues are fixed in Reaper 0.6.1, with a brand new data model which requires a single query to get all segments for a run, the use of timeuuids instead of long ids in order to avoid lightweight transactions when generating repair/segment ids, and a fixed computation of the number of possible parallel repairs.
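
To see why the parallelism fix matters, here is the difference between the two computations on a 3 node cluster with 32 vnodes and replication factor 3, like the test cluster below (a simplified sketch of the formulas described above, not Reaper’s exact code):

nodes = 3
vnodes_per_node = 32
replication_factor = 3

tokens = nodes * vnodes_per_node

# pre-0.6.1: tokens / RF suggests up to 32 "parallel" repairs, most of which
# fail immediately because their replicas are already busy, each failure
# triggering another full scan of the segments
broken = tokens // replication_factor        # 32

# 0.6.1: nodes / RF reflects how many repairs can actually run concurrently
fixed = max(1, nodes // replication_factor)  # 1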

The following graph shows the differences before and after the fix, observed on a 3 node cluster using 32 vnodes:

The load on the nodes is now comparable to running Reaper with the memory backend:

This release makes Apache Cassandra a first class citizen as a Reaper backend!

Upcoming features with the Apache Cassandra backend

On top of not having to administer yet another kind of database on top of Apache Cassandra to run Reaper, we can now better integrate with multi region clusters and handle security concerns related to JMX access.

First, the Apache Cassandra backend allows us to start several instances of Reaper instead of one, bringing it fault tolerance. Instances will share the work on segments using lightweight transactions and metrics will be stored in the database. On multi region clusters, where the JMX port is closed in cross DC communications, it will give the opportunity to start one or more instances of Reaper in each region. They will coordinate together through the backend and Reaper will still be able to apply backpressure mechanisms, by monitoring the whole cluster for running repairs and pending compactions.

Next, comes the “local mode”, for companies that apply strict security policies for the JMX port and forbid all remote access. In this specific case, a new parameter was added in the configuration yaml file to activate the local mode and you will need to start one instance of Reaper on each C* node. Each instance will then only communicate with the local node on 127.0.0.1 and ignore all tokens for which this node isn’t a replica.

Those features are both available in a feature branch that will be merged before the next release.

While the fault tolerance features have been tested in different scenarios and are considered ready for use, the local mode still needs a little bit of work before usage on real clusters.

Improving the frontend too

So far we had focused on the backend and hadn’t touched the frontend.
Now we are giving some love to the UI as well. On top of making it more usable and better looking, we are pushing some new features that will make Reaper “not just a tool for managing repairs”.

The first significant addition is the new cluster health view on the home screen:

One quick look at this screen gives you the individual status (up/down) of each node and the size on disk of each node, rack and datacenter of the clusters Reaper is connected to.

Then we’ve reorganized the other screens, making forms and lists collapsible, and adding a bit of color:

All those UI changes were just merged into master for your testing pleasure, so feel free to build, deploy and be sure to give us feedback on the reaper mailing list!

Bootstrapping Apache Cassandra Nodes

Auto bootstrapping is a handy feature when it comes to growing an Apache Cassandra cluster. There are, however, some subtleties in how this feature works that can lead to data inconsistencies in the cluster. In this post I will go through a bit of the history of the feature, the different knobs and levers available to operate it, and how to resolve some of the common issues that may arise.

Summary

Here are links to the various sections of the post to give you an idea of what I will cover.

Background

The bootstrap feature in Apache Cassandra controls the ability for the data in the cluster to be automatically redistributed when a new node is inserted. The new node joining the cluster is defined as an empty node without system tables or data.

When a new node joins the cluster using the auto bootstrap feature, it will perform the following operations:

  • Contact the seed nodes to learn about gossip state.
  • Transition to Up and Joining state (to indicate it is joining the cluster; represented by UJ in the nodetool status).
  • Contact the seed nodes to ensure schema agreement.
  • Calculate the tokens that it will become responsible for.
  • Stream replica data associated with the tokens it is responsible for from the former owners.
  • Transition to Up and Normal state once streaming is complete (to indicate it is now part of the cluster; represented by UN in the nodetool status).

The above operations can be seen in the logs.

Contact the seed nodes to learn about gossip state

INFO  [HANDSHAKE-/127.0.0.1] 2017-05-12 16:14:45,290 OutboundTcpConnection.java:487 - Handshaking version with /127.0.0.1
INFO  [GossipStage:1] 2017-05-12 16:14:45,318 Gossiper.java:1029 - Node /127.0.0.1 is now part of the cluster
INFO  [GossipStage:1] 2017-05-12 16:14:45,325 Gossiper.java:1029 - Node /127.0.0.2 is now part of the cluster
INFO  [GossipStage:1] 2017-05-12 16:14:45,326 Gossiper.java:1029 - Node /127.0.0.3 is now part of the cluster
INFO  [GossipStage:1] 2017-05-12 16:14:45,328 Gossiper.java:1029 - Node /127.0.0.4 is now part of the cluster
INFO  [SharedPool-Worker-1] 2017-05-12 16:14:45,331 Gossiper.java:993 - InetAddress /127.0.0.1 is now UP
INFO  [HANDSHAKE-/127.0.0.3] 2017-05-12 16:14:45,331 OutboundTcpConnection.java:487 - Handshaking version with /127.0.0.3
INFO  [HANDSHAKE-/127.0.0.2] 2017-05-12 16:14:45,383 OutboundTcpConnection.java:487 - Handshaking version with /127.0.0.2
INFO  [HANDSHAKE-/127.0.0.4] 2017-05-12 16:14:45,387 OutboundTcpConnection.java:487 - Handshaking version with /127.0.0.4
INFO  [SharedPool-Worker-1] 2017-05-12 16:14:45,438 Gossiper.java:993 - InetAddress /127.0.0.3 is now UP
INFO  [SharedPool-Worker-2] 2017-05-12 16:14:45,438 Gossiper.java:993 - InetAddress /127.0.0.4 is now UP
INFO  [SharedPool-Worker-3] 2017-05-12 16:14:45,438 Gossiper.java:993 - InetAddress /127.0.0.2 is now UP
...
INFO  [main] 2017-05-12 16:14:46,289 StorageService.java:807 - Starting up server gossip

Transition to Up and Joining state

INFO  [main] 2017-05-12 16:14:46,396 StorageService.java:1138 - JOINING: waiting for ring information

Contact the seed nodes to ensure schema agreement

Take note of the last entry in this log snippet.

INFO  [GossipStage:1] 2017-05-12 16:14:49,081 Gossiper.java:1029 - Node /127.0.0.1 is now part of the cluster
INFO  [SharedPool-Worker-1] 2017-05-12 16:14:49,082 Gossiper.java:993 - InetAddress /127.0.0.1 is now UP
INFO  [GossipStage:1] 2017-05-12 16:14:49,095 TokenMetadata.java:414 - Updating topology for /127.0.0.1
INFO  [GossipStage:1] 2017-05-12 16:14:49,096 TokenMetadata.java:414 - Updating topology for /127.0.0.1
INFO  [HANDSHAKE-/127.0.0.1] 2017-05-12 16:14:49,096 OutboundTcpConnection.java:487 - Handshaking version with /127.0.0.1
INFO  [GossipStage:1] 2017-05-12 16:14:49,098 Gossiper.java:1029 - Node /127.0.0.2 is now part of the cluster
INFO  [SharedPool-Worker-1] 2017-05-12 16:14:49,102 Gossiper.java:993 - InetAddress /127.0.0.2 is now UP
INFO  [GossipStage:1] 2017-05-12 16:14:49,103 TokenMetadata.java:414 - Updating topology for /127.0.0.2
INFO  [HANDSHAKE-/127.0.0.2] 2017-05-12 16:14:49,104 OutboundTcpConnection.java:487 - Handshaking version with /127.0.0.2
INFO  [GossipStage:1] 2017-05-12 16:14:49,104 TokenMetadata.java:414 - Updating topology for /127.0.0.2
INFO  [GossipStage:1] 2017-05-12 16:14:49,106 Gossiper.java:1029 - Node /127.0.0.3 is now part of the cluster
INFO  [SharedPool-Worker-1] 2017-05-12 16:14:49,111 Gossiper.java:993 - InetAddress /127.0.0.3 is now UP
INFO  [GossipStage:1] 2017-05-12 16:14:49,112 TokenMetadata.java:414 - Updating topology for /127.0.0.3
INFO  [HANDSHAKE-/127.0.0.3] 2017-05-12 16:14:49,195 OutboundTcpConnection.java:487 - Handshaking version with /127.0.0.3
INFO  [GossipStage:1] 2017-05-12 16:14:49,236 TokenMetadata.java:414 - Updating topology for /127.0.0.3
INFO  [GossipStage:1] 2017-05-12 16:14:49,247 Gossiper.java:1029 - Node /127.0.0.4 is now part of the cluster
INFO  [SharedPool-Worker-1] 2017-05-12 16:14:49,248 Gossiper.java:993 - InetAddress /127.0.0.4 is now UP
INFO  [InternalResponseStage:1] 2017-05-12 16:14:49,252 ColumnFamilyStore.java:905 - Enqueuing flush of schema_keyspaces: 1444 (0%) on-heap, 0 (0%) off-heap
INFO  [MemtableFlushWriter:2] 2017-05-12 16:14:49,254 Memtable.java:347 - Writing Memtable-schema_keyspaces@1493033009(0.403KiB serialized bytes, 10 ops, 0%/0% of on/off-heap limit)
INFO  [MemtableFlushWriter:2] 2017-05-12 16:14:49,256 Memtable.java:382 - Completed flushing .../node5/data0/system/schema_keyspaces-b0f2235744583cdb9631c43e59ce3676/system-schema_keyspaces-tmp-ka-1-Data.db (0.000KiB) for commitlog position ReplayPosition(segmentId=1494569684606, position=119856)
INFO  [InternalResponseStage:1] 2017-05-12 16:14:49,367 ColumnFamilyStore.java:905 - Enqueuing flush of schema_columnfamilies: 120419 (0%) on-heap, 0 (0%) off-heap
INFO  [MemtableFlushWriter:1] 2017-05-12 16:14:49,368 Memtable.java:347 - Writing Memtable-schema_columnfamilies@1679976057(31.173KiB serialized bytes, 541 ops, 0%/0% of on/off-heap limit)
INFO  [MemtableFlushWriter:1] 2017-05-12 16:14:49,396 Memtable.java:382 - Completed flushing .../node5/data0/system/schema_columnfamilies-45f5b36024bc3f83a3631034ea4fa697/system-schema_columnfamilies-tmp-ka-1-Data.db (0.000KiB) for commitlog position ReplayPosition(segmentId=1494569684606, position=119856)
...
INFO  [InternalResponseStage:5] 2017-05-12 16:14:50,824 ColumnFamilyStore.java:905 - Enqueuing flush of schema_usertypes: 160 (0%) on-heap, 0 (0%) off-heap
INFO  [MemtableFlushWriter:2] 2017-05-12 16:14:50,824 Memtable.java:347 - Writing Memtable-schema_usertypes@1946148009(0.008KiB serialized bytes, 1 ops, 0%/0% of on/off-heap limit)
INFO  [MemtableFlushWriter:2] 2017-05-12 16:14:50,826 Memtable.java:382 - Completed flushing .../node5/data0/system/schema_usertypes-3aa752254f82350b8d5c430fa221fa0a/system-schema_usertypes-tmp-ka-10-Data.db (0.000KiB) for commitlog position ReplayPosition(segmentId=1494569684606, position=252372)
INFO  [main] 2017-05-12 16:14:50,404 StorageService.java:1138 - JOINING: schema complete, ready to bootstrap

Calculate the tokens that it will become responsible for

INFO  [main] 2017-05-12 16:14:50,404 StorageService.java:1138 - JOINING: waiting for pending range calculation
INFO  [main] 2017-05-12 16:14:50,404 StorageService.java:1138 - JOINING: calculation complete, ready to bootstrap
INFO  [main] 2017-05-12 16:14:50,405 StorageService.java:1138 - JOINING: getting bootstrap token

Stream replica data associated with the tokens it is responsible for from the former owners

Take note of the first and last entries in this log snippet.

INFO  [main] 2017-05-12 16:15:20,440 StorageService.java:1138 - JOINING: Starting to bootstrap...
INFO  [main] 2017-05-12 16:15:20,461 StreamResultFuture.java:86 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3] Executing streaming plan for Bootstrap
INFO  [StreamConnectionEstablisher:1] 2017-05-12 16:15:20,462 StreamSession.java:220 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3] Starting streaming to /127.0.0.1
INFO  [StreamConnectionEstablisher:2] 2017-05-12 16:15:20,462 StreamSession.java:220 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3] Starting streaming to /127.0.0.2
INFO  [StreamConnectionEstablisher:3] 2017-05-12 16:15:20,462 StreamSession.java:220 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3] Starting streaming to /127.0.0.3
INFO  [StreamConnectionEstablisher:1] 2017-05-12 16:15:20,478 StreamCoordinator.java:209 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3, ID#0] Beginning stream session with /127.0.0.1
INFO  [StreamConnectionEstablisher:2] 2017-05-12 16:15:20,478 StreamCoordinator.java:209 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3, ID#0] Beginning stream session with /127.0.0.2
INFO  [StreamConnectionEstablisher:3] 2017-05-12 16:15:20,478 StreamCoordinator.java:209 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3, ID#0] Beginning stream session with /127.0.0.3
INFO  [STREAM-IN-/127.0.0.2] 2017-05-12 16:15:24,339 StreamResultFuture.java:166 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3 ID#0] Prepare completed. Receiving 11 files(10176549820 bytes), sending 0 files(0 bytes)
INFO  [STREAM-IN-/127.0.0.3] 2017-05-12 16:15:27,201 StreamResultFuture.java:180 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3] Session with /127.0.0.3 is complete
INFO  [STREAM-IN-/127.0.0.1] 2017-05-12 16:15:33,256 StreamResultFuture.java:180 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3] Session with /127.0.0.1 is complete
INFO  [StreamReceiveTask:1] 2017-05-12 16:36:31,249 StreamResultFuture.java:180 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3] Session with /127.0.0.2 is complete
INFO  [StreamReceiveTask:1] 2017-05-12 16:36:31,256 StreamResultFuture.java:212 - [Stream #604b5690-36da-11e7-aeb6-9d89ad20c2d3] All sessions completed
INFO  [main] 2017-05-12 16:36:31,257 StorageService.java:1167 - Bootstrap completed! for the tokens [1577102245397509090, -713021257351906154, 5943548853755748481, -186427637333122985, 89474807595263595, -3872409873927530770, 269282297308186556, -2090619435347582830, -7442271648674805532, 1993467991047389706, 3250292341615557960, 3680244045045170206, -6121195565829299067, 2336819841643904893, 8366041580813128754, -1539294702421999531, 5559860204752248078, 4990559483982320587, -5978802488822380342, 7738662906313460122, -8543589077123834538, 8470022885937685086, 7921538168239180973, 5167628632246463806, -8217637230111416952, 7867074371397881074, -6728907721317936873, -5403440910106158938, 417632467923200524, -5024952230859509916, -2145251677903377866, 62038536271402824]

Transition to Up and Normal state once streaming is complete

INFO  [main] 2017-05-12 16:36:31,348 StorageService.java:1715 - Node /127.0.0.5 state jump to NORMAL

During the bootstrapping process, the new node joining the cluster has no effect on the existing data in terms of Replication Factor (RF). However, the new node will accept new writes for the token ranges acquired while existing data from the other nodes is being streamed to it. This ensures that no new writes are missed while data changes hands. In addition, it ensures that Consistency Level (CL) is respected all the time during the streaming process and even in the case of bootstrap failure. Once the bootstrapping process for the new node completes, it will begin to serve read requests (and continue to receive writes). Like the pre-existing nodes in the cluster, it too will then have an effect on the data in terms of RF and CL.

While the bootstrapping feature can be a time saver when expanding a cluster, there are some “gotchas” that are worth noting. But before we get to those, we first need to revisit some basics.

Back to basics

Cassandra uses a token system to work out which nodes will hold which partition keys for the primary replica of data. To work out where data is stored in the cluster, Cassandra first applies a hashing function to the partition key. The resulting hash is the token value, and the hashing function used is determined by the cluster’s partitioner; most commonly Murmur3Partitioner or RandomPartitioner.
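
You can see this mapping directly from cqlsh with the built-in token() function; the keyspace, table and column names below are placeholders, and the token values returned depend on the partitioner the cluster is configured with.

# Placeholder keyspace/table/column names: token() returns the partitioner's hash
# of the partition key, i.e. the value used to place the row on the ring.
cqlsh -e "SELECT some_key, token(some_key) FROM my_keyspace.my_table LIMIT 3;"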

As seen from the log snippets, when a new node is added to the cluster it will calculate the tokens of the different data replicas that it is to be responsible for. This process, where tokens are calculated and acquired by the new node, is often referred to as a range movement, i.e. token ranges are being moved between nodes. Once the range movement has completed, the node will by default begin the bootstrapping process where it streams data for the acquired tokens from other nodes.

Gotchas

Range movements

Whilst range movements may sound simple, the process has implications for maintaining data consistency. A number of patches have been added over time to help maintain data consistency during range movements. A fairly well known issue was CASSANDRA-2434, where it was highlighted that range movements violated consistency for Apache Cassandra versions below 2.1.x using vnodes.

A fix was added for the issue CASSANDRA-2434 to ensure range movements between nodes were consistent when using vnodes. Prior to this patch inconsistencies could be caused during bootstrapping as per the example Jeff Jirsa gave on the dev mailing list.

Consider the case of a cluster containing three nodes A, B and D with a RF of 3. If node B was offline and a key ‘foo’ was written with CL of QUORUM, the value for key ‘foo’ would go to nodes A and D.

At a later point in time node B is resurrected and added back into the cluster. Around the same time a node C is added to the cluster and begins bootstrapping.

One of the tokens node C calculates and acquires during the bootstrap process is for key ‘foo’. Node B is the closest node with data for the newly acquired token and thus node C begins streaming from the neighbouring node B. This process violates the consistency guarantees of Cassandra. This is because the data on node C will be the same as node B, and both are missing the value for key ‘foo’.

Thus, a query with a CL of QUORUM may query nodes B and C and return no data which is incorrect, despite there being data for ‘foo’ on node A. Node D previously had the correct data, but it stopped being a replica after C was inserted into the cluster.

The above issue was solved in CASSANDRA-2434 by changing the default behaviour to always try to perform a consistent range movement. That is, when node C is added (in the previous example), data is streamed from the correct replica it is replacing, node D. In this case all queries with CL of QUORUM for the key ‘foo’ would always return the correct value.

The JVM option cassandra.consistent.rangemovement was added as part of this patch. The option allows consistent range movements during bootstrapping to be disabled should the user desire this behaviour. This fix is no silver bullet though, because it requires that the correct node be available for a consistent range movement during a bootstrap. This may not always be possible, and in such cases there are two options:

  1. Get the required node back online (preferred option).
  2. If the required node is unrecoverable, set JVM_OPTS="$JVM_OPTS -Dcassandra.consistent.rangemovement=false" in the cassandra-env.sh file to perform inconsistent range movements when auto bootstrapping. Once bootstrapping is complete, a repair will need to be run using the following command on the node. This is to ensure the data it streamed is consistent with the rest of the replicas.
nodetool repair -full

Adding multiple nodes

Another common cause of grief for users was bootstrapping multiple nodes simultaneously; captured in CASSANDRA-7069. Adding two new nodes simultaneously to a cluster could potentially be harmful, given the operations performed by a new node when joining. Waiting two minutes for the gossip state to propagate before adding a new node is possible; however, as noted in CASSANDRA-9667, there is no coordination between nodes during token selection. For example, consider the case where Node A is bootstrapped and then, two minutes later, Node B is bootstrapped. Node B could potentially pick token ranges already selected by Node A.

The above issue was solved in CASSANDRA-7069 by changing the default behaviour such that adding a node would fail if another node was already bootstrapping in a cluster. Similar to CASSANDRA-2434, this behaviour could be disabled by setting the JVM option JVM_OPTS="$JVM_OPTS -Dcassandra.consistent.rangemovement=false" in the cassandra-env.sh file on the bootstrapping node. This means that if cassandra.consistent.rangemovement=false is set to allow multiple nodes to bootstrap, the cluster runs the risk of violating consistency guarantees because of CASSANDRA-2434.

Changes made by CASSANDRA-7069 mean that the default behaviour forces a user to add nodes one at a time when expanding the cluster. This is the safest way of expanding a cluster and ensuring that the correct amount of data is streamed between nodes.
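
In practice that workflow looks something like the sketch below; the node names, IP addresses and use of ccm mirror the test cluster later in this post, and on a real cluster the same sequence applies with your own service manager.

# Rough sketch: add nodes one at a time, waiting for each to reach UN before
# starting the next (ccm convention assumed: node4 -> 127.0.0.4, node5 -> 127.0.0.5).
for ip in 127.0.0.4 127.0.0.5; do
    node="node${ip##*.}"
    ccm ${node} start
    # Wait until the new node reports Up/Normal in nodetool status.
    until ccm node1 nodetool status | grep -q "^UN  ${ip}"; do
        sleep 30
    done
done
# Reclaim space on the pre-existing nodes once the whole expansion is finished;
# the last node added does not need it (see the cleanup section below).
for node in node1 node2 node3 node4; do
    ccm ${node} nodetool cleanup
done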

Data streaming

To further add to the confusion, there is a misconception about what the auto_bootstrap property does in relation to a node being added to the cluster. Despite its name, this property controls only the data streaming step of the bootstrap process. The boolean property is set to true by default; when true, the data streaming step will be performed during the bootstrap process.
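
The property lives in cassandra.yaml; because it defaults to true it is normally absent from the file altogether (more on that below). A minimal sketch of setting it explicitly, with a placeholder path for wherever cassandra.yaml lives in your environment:

# Absent from cassandra.yaml, the property behaves as if it were set to:
#   auto_bootstrap: true
# Adding the line explicitly (here disabling streaming -- see the warning below):
echo "auto_bootstrap: false" | sudo tee -a /etc/cassandra/cassandra.yaml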

Setting auto_bootstrap to false when bootstrapping a new node exposes the cluster to huge inconsistencies. This is because all the other steps in the process are carried out but no data is streamed to the node. Hence, the node would be in the UN state without having any data for the token ranges it has been allocated! Furthermore, the new node without data will be serving reads and nodes that previously owned the tokens will no longer be serving reads. Effectively, the token ranges for that replica would be replaced with no data.

It is worth noting that the other danger of setting auto_bootstrap to false is that no IP address collision check occurs. As per CASSANDRA-10134, if a new node has auto_bootstrap set to false and has the same address as an existing down node, the new node will take over the token range of the old node. No error is thrown, only a warning message such as the one below is written to the logs of the other nodes in the cluster. At the time of writing this post, the fix for this issue only appears in Apache Cassandra version 3.6 and above.

WARN  [GossipStage:1] 2017-05-19 17:35:10,994 TokenMetadata.java:237 - Changing /127.0.0.3's host ID from 1938db5d-5f23-46e8-921c-edde18e9c829 to c30fbbb8-07ae-412c-baea-90865856104e

The behaviour of auto_bootstrap: false can lead to data inconsistencies in the following way. Consider the case of a cluster containing three nodes A, B and D with a RF of 3. If node B was offline and a key ‘foo’ was written with CL of QUORUM, the value for key ‘foo’ would go to nodes A and D. In this scenario Node D is the owner of the token relating to the key ‘foo’.

At a later point in time node B is resurrected and added back into the cluster. Around the same time a node C is added to the cluster with auto_bootstrap set to false and begins the joining process.

One of the tokens node C calculates and acquires during the bootstrap process is for key ‘foo’. Now node D is no longer the owner and hence its data for the key ‘foo’ will no longer be used during reads/writes. This process causes inconsistencies in Cassandra because both nodes B and C contain no data for key ‘foo’.

Thus, a query with a CL of QUORUM may query nodes B and C and return no data which is incorrect, despite there being data for ‘foo’ on node A. Node D previously had data, but it stopped being a replica after C was inserted.

This confusing behaviour is one of the reasons why if you look into the cassandra.yaml file you will notice that the auto_bootstrap configuration property is missing. Exposure of the property in the cassandra.yaml was short lived, as it was removed via CASSANDRA-2447 in version 1.0.0. As a result, the property is hidden and its default value of true means that new nodes will stream data when they join the cluster.

Adding a replacement node

So far we have examined various options that control the default bootstrapping behaviour when a new node is added to a cluster. Adding a new node is just one case where bootstrapping is performed; what about replacing a node in the cluster if one goes down?

Should an existing node go down and needs to be replaced, the JVM option cassandra.replace_address can be used. Note that this option is only available for Apache Cassandra versions 2.x.x and higher. This feature has been around for a while and blogged about by other users in the past.

As the name suggests, it effectively replaces a down or dead node in the cluster with a new node. It is because of this that the replace address option should only be used if the node is in a Down and Normal state (represented by DN in the nodetool status). Furthermore, there are no range movements when using this feature; the new replacement node will simply inherit the old dead node’s token ranges. This is simpler than decommissioning the dead node and bootstrapping a fresh one, which would involve two range movements and two streaming phases. Yuck! To use the option, simply add JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=<IP_ADDRESS>" to the cassandra-env.sh file of the new node that will be replacing the old node. Where <IP_ADDRESS> is the IP address of the node to be replaced.

Once the node completes bootstrapping and joins the cluster, the JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=<IP_ADDRESS>" option must be removed from the cassandra-env.sh file or the node will fail to start on a restart. This is a shortcoming of the cassandra.replace_address feature. Many operators will understandably be focused on the dead node being replaced and, as a result, forget to update the cassandra-env.sh file after the job is complete. It was for this reason that CASSANDRA-7356 was raised and resulted in a new option being added: cassandra.replace_address_first_boot. This option works once, when Cassandra is first started and the replacement node inserted into the cluster. After that, the option is ignored for all subsequent restarts. It works in the same way as its predecessor; simply add JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address_first_boot=<IP_ADDRESS>" to the cassandra-env.sh and the new node is ready to be inserted.
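
Putting the pieces together, a minimal replacement procedure might look like the sketch below; the dead node’s IP address, the file paths and the service commands are placeholders for your environment.

# On the new, empty replacement node, before its very first start
# (10.0.0.12 is a placeholder for the dead node's IP address):
echo 'JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address_first_boot=10.0.0.12"' \
    | sudo tee -a /etc/cassandra/cassandra-env.sh
# Start Cassandra; the node inherits the dead node's tokens and streams their data.
sudo service cassandra start
# Watch the streaming progress, then confirm the old address is gone and the
# new node has reached the UN state.
nodetool netstats
nodetool status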

Hang on! What about replacing a seed node?

Ok, so you need to replace a seed node. Seed nodes are just like every other node in the cluster. As per the Apache Cassandra documentation, the only difference is that seed nodes are the go-to nodes when a new node joins the cluster.

There are a few extra steps to replace a seed node and bootstrap a new one in its place. Before adding the replacement seed node, the IP address of the seed node will need to be removed from the seed_provider list in the cassandra.yaml file and replaced with another node in the cluster. This needs to be done for all the nodes in the cluster. Naturally, a rolling restart will need to be performed for the changes to take effect. Once the change is complete the replacement node can be inserted as described in the previous section of this post.
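
As a rough sketch of the seed list change (the IP addresses and path are placeholders): on every node, swap the dead seed out of the seed_provider list in cassandra.yaml, then perform the rolling restart before inserting the replacement node as described in the previous section.

# On every node: replace the dead seed 10.0.0.12 with an existing node 10.0.0.13
# in the seed_provider "seeds" list of cassandra.yaml (placeholder IPs and path).
sudo sed -i 's/10\.0\.0\.12/10.0.0.13/' /etc/cassandra/cassandra.yaml
# Restart the nodes one at a time so the new seed list takes effect, waiting for
# each node to come back up as UN before moving on to the next.
sudo service cassandra restart
nodetool status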

What to do after it completes successfully

Once your node has successfully completed the bootstrapping process, it will transition to the Up and Normal state (represented by UN in the nodetool status) to indicate it is now part of the cluster. At this point it is time to cleanup the nodes in your cluster. Yes, your nodes are dirty and need to be cleaned. “Why?” you ask; well, the reason is that the data acquired by the newly added node still remains on the nodes that previously owned it. Whilst the nodes that previously owned the data have streamed it to the new node and relinquished the associated tokens, the data that was streamed still remains on the original nodes. This “orphaned” data is consuming valuable disk space, and in the case of large data sets, probably a significant amount of it.

However, before running off to the console to remove the orphaned data from the nodes, make sure it is done as the last step of a cluster expansion. If the expansion of the cluster requires only one node to be added, perform the cleanup after the node has successfully completed bootstrapping and joined the cluster. If the expansion requires three nodes to be added, perform the cleanup after all three nodes have successfully completed bootstrapping and joined the cluster. This is because the cleanup will need to be executed on all nodes in the cluster, except for the last node that was added to the cluster. The last node added to the cluster will contain only the data it needed for the tokens acquired, whereas other nodes may contain data for tokens they no longer own. It is still ok to run cleanup on the last node; it will likely return immediately after it is called.

The cleanup can be executed on each node using the following command.

nodetool cleanup -j <COMPACTION_SLOTS>

Where <COMPACTION_SLOTS> is the number of compaction slots to use for cleanup. By default this is 2. If set to 0 it will use all available compaction threads.

It is probably worth limiting the number of compaction slots used by cleanup, otherwise it could potentially block compactions.

Help! It failed

The bootstrap process for a joining node can fail. Bootstrapping puts extra load on the network, so should the bootstrap fail, you could try tweaking streaming_socket_timeout_in_ms. Set streaming_socket_timeout_in_ms in the cassandra.yaml file to 24 hours (60 * 60 * 24 * 1000 = 86,400,000 ms). Having a socket timeout set is crucial for catching streams that hang and reporting them via an exception in the logs, as per CASSANDRA-11286.
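
A sketch of that change, assuming a package-style cassandra.yaml path and that the property already appears (commented out or not) in the file:

# Set the streaming socket timeout to 24 hours and restart the node afterwards
# for the change to take effect. Path and pattern are placeholders.
sudo sed -i 's/^#\{0,1\} *streaming_socket_timeout_in_ms:.*/streaming_socket_timeout_in_ms: 86400000/' /etc/cassandra/cassandra.yaml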

If the bootstrap process fails in Cassandra version 2.1.x, the process will need to be restarted all over again. This can be done using the following steps; a shell sketch of them follows the list.

  1. Stop Cassandra on the node.
  2. Delete all files and directories from the data, commitlog and saved caches directories, but leave the directories themselves there.
  3. Wait about two minutes.
  4. Start Cassandra on the node.
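
A rough shell sketch of those steps for a ccm-managed node like the ones in the test below; the cluster name and directory layout are placeholders and will differ on a real deployment.

# Stop the node, wipe data/commitlog/saved caches contents (keep the directories),
# wait a couple of minutes for gossip state to settle, then start it again.
ccm node4 stop
rm -rf ~/.ccm/<CLUSTER_NAME>/node4/data0/*
rm -rf ~/.ccm/<CLUSTER_NAME>/node4/commitlogs/*
rm -rf ~/.ccm/<CLUSTER_NAME>/node4/saved_caches/*
sleep 120
ccm node4 start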

If the bootstrap process fails in Cassandra 2.2.x, the process can easily be resumed using the following command, thanks to CASSANDRA-8942.

nodetool bootstrap resume

Testing the theory

We have gone through a lot of theory in this post, so I thought it would be good to test some of it out to demonstrate what can happen when bootstrapping multiple nodes at the same time.

Setup

In my test I used a three node local cluster running Apache Cassandra 2.1.14 which was created with the ccm tool. Each node was configured to use vnodes; specifically num_tokens was set to 32 in the cassandra.yaml file. The cluster was loaded with around 20 GB of data generated from the killrweather dataset. Data loading was performed in batches using cdm. Prior to starting the test the cluster looked like this.

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  19.19 GB   32      29.1%             cfb50e13-52a4-4821-bca2-4dba6061d38a  rack1
UN  127.0.0.2  9.55 GB    32      37.4%             5176598f-bbab-4165-8130-e33e39017f7e  rack1
UN  127.0.0.3  19.22 GB   32      33.5%             d261faaf-628f-4b86-b60b-3825ed552aba  rack1

It was not the most well balanced cluster, however it was good enough for testing. It should be noted that the node with IP address 127.0.0.1 was set to be the only seed node in the cluster. Taking a quick peek at the keyspace configuration using cqlsh, we can see that it was using replication_factor: 1, i.e. RF = 1.

cqlsh> describe killrweather

CREATE KEYSPACE killrweather WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

Adding a new node

A new node (node4) was added to the cluster.

$ ccm node4 start

After a minute or so node4 was in the UJ state and began the bootstrap process.

$ ccm node1 nodetool status


Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  19.19 GB   32      29.1%             cfb50e13-52a4-4821-bca2-4dba6061d38a  rack1
UN  127.0.0.2  9.55 GB    32      37.4%             5176598f-bbab-4165-8130-e33e39017f7e  rack1
UN  127.0.0.3  19.22 GB   32      33.5%             d261faaf-628f-4b86-b60b-3825ed552aba  rack1
UJ  127.0.0.4  14.44 KB   32      ?                 ae0a26a6-fab5-4cab-a189-697818be3c95  rack1

It was observed that node4 had started streaming data from node1 (IP address 127.0.0.1) and node2 (IP address 127.0.0.2).

$ ccm node4 nodetool netstats

Mode: JOINING
Bootstrap f4e54a00-36d9-11e7-b18e-9d89ad20c2d3
    /127.0.0.1
        Receiving 9 files, 10258729018 bytes total. Already received 2 files, 459059994 bytes total
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-3-Data.db 452316846/452316846 bytes(100%) received from idx:0/127.0.0.1
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-2-Data.db 6743148/6743148 bytes(100%) received from idx:0/127.0.0.1
    /127.0.0.3
    /127.0.0.2
        Receiving 11 files, 10176549820 bytes total. Already received 1 files, 55948069 bytes total
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-1-Data.db 55948069/55948069 bytes(100%) received from idx:0/127.0.0.2
Read Repair Statistics:
Attempted: 0
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name                    Active   Pending      Completed
Commands                        n/a         0              6
Responses                       n/a         0            471

Adding another new node

A few minutes later another new node (node5) was added to the cluster. To add this node to the cluster while node4 was still bootstrapping, the JVM option JVM_OPTS="$JVM_OPTS -Dcassandra.consistent.rangemovement=false" was added to the node’s cassandra-env.sh file. The node was then started.

$ ccm node5 start

After about a minute node5 was in the UJ state and it too began the bootstrap process.

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  19.19 GB   32      29.1%             cfb50e13-52a4-4821-bca2-4dba6061d38a  rack1
UN  127.0.0.2  9.55 GB    32      37.4%             5176598f-bbab-4165-8130-e33e39017f7e  rack1
UN  127.0.0.3  19.22 GB   32      33.5%             d261faaf-628f-4b86-b60b-3825ed552aba  rack1
UJ  127.0.0.4  106.52 KB  32      ?                 ae0a26a6-fab5-4cab-a189-697818be3c95  rack1
UJ  127.0.0.5  14.43 KB   32      ?                 a71ed178-f353-42ec-82c8-d2b03967753a  rack1

It was observed that node5 had started streaming data from node2 as well; the same node that node4 was streaming data from.

$ ccm node5 nodetool netstats

Mode: JOINING
Bootstrap 604b5690-36da-11e7-aeb6-9d89ad20c2d3
    /127.0.0.3
    /127.0.0.2
        Receiving 11 files, 10176549820 bytes total. Already received 1 files, 55948069 bytes total
            .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-1-Data.db 55948069/55948069 bytes(100%) received from idx:0/127.0.0.2
    /127.0.0.1
Read Repair Statistics:
Attempted: 0
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name                    Active   Pending      Completed
Commands                        n/a         0              8
Responses                       n/a         0            255

The interesting point to note when looking at the netstats was that both node4 and node5 were each streaming a Data.db file of exactly 55948069 bytes from node2.

Data streaming much

It appeared that both node4 and node5 were streaming the same data from node2. This continued as the bootstrapping process progressed; the sizes of the files being streamed from node2 were the same for both node4 and node5. Checking the netstats on node4 produced the following.

$ ccm node4 nodetool netstats

Bootstrap f4e54a00-36d9-11e7-b18e-9d89ad20c2d3
    /127.0.0.1
        Receiving 9 files, 10258729018 bytes total. Already received 6 files, 10112487796 bytes total
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-13-Data.db 1788940555/1788940555 bytes(100%) received from idx:0/127.0.0.1
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-5-Data.db 7384377358/7384377358 bytes(100%) received from idx:0/127.0.0.1
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-12-Data.db 27960312/27960312 bytes(100%) received from idx:0/127.0.0.1
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-3-Data.db 452316846/452316846 bytes(100%) received from idx:0/127.0.0.1
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-11-Data.db 452149577/452149577 bytes(100%) received from idx:0/127.0.0.1
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-2-Data.db 6743148/6743148 bytes(100%) received from idx:0/127.0.0.1
    /127.0.0.3
    /127.0.0.2
        Receiving 11 files, 10176549820 bytes total. Already received 10 files, 10162463079 bytes total
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-1-Data.db 55948069/55948069 bytes(100%) received from idx:0/127.0.0.2
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-9-Data.db 55590043/55590043 bytes(100%) received from idx:0/127.0.0.2
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-6-Data.db 901588743/901588743 bytes(100%) received from idx:0/127.0.0.2
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-15-Data.db 14081154/14081154 bytes(100%) received from idx:0/127.0.0.2
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-16-Data.db 1450179/1450179 bytes(100%) received from idx:0/127.0.0.2
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-8-Data.db 901334951/901334951 bytes(100%) received from idx:0/127.0.0.2
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-10-Data.db 3622476547/3622476547 bytes(100%) received from idx:0/127.0.0.2
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-17-Data.db 56277615/56277615 bytes(100%) received from idx:0/127.0.0.2
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-4-Data.db 3651310715/3651310715 bytes(100%) received from idx:0/127.0.0.2
            .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-7-Data.db 902405063/902405063 bytes(100%) received from idx:0/127.0.0.2
Read Repair Statistics:
Attempted: 0
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name                    Active   Pending      Completed
Commands                        n/a         0              6
Responses                       n/a         0           4536

Then checking netstats on node5 produced the following.

$ ccm node5 nodetool netstats

Mode: JOINING
Bootstrap 604b5690-36da-11e7-aeb6-9d89ad20c2d3
    /127.0.0.1
    /127.0.0.3
    /127.0.0.2
        Receiving 11 files, 10176549820 bytes total. Already received 9 files, 10106185464 bytes total
            .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-2-Data.db 3651310715/3651310715 bytes(100%) received from idx:0/127.0.0.2
            .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-1-Data.db 55948069/55948069 bytes(100%) received from idx:0/127.0.0.2
            .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-9-Data.db 1450179/1450179 bytes(100%) received from idx:0/127.0.0.2
            .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-3-Data.db 901588743/901588743 bytes(100%) received from idx:0/127.0.0.2
            .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-6-Data.db 55590043/55590043 bytes(100%) received from idx:0/127.0.0.2
            .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-4-Data.db 902405063/902405063 bytes(100%) received from idx:0/127.0.0.2
            .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-8-Data.db 14081154/14081154 bytes(100%) received from idx:0/127.0.0.2
            .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-5-Data.db 901334951/901334951 bytes(100%) received from idx:0/127.0.0.2
            .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-7-Data.db 3622476547/3622476547 bytes(100%) received from idx:0/127.0.0.2
Read Repair Statistics:
Attempted: 0
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name                    Active   Pending      Completed
Commands                        n/a         0              8
Responses                       n/a         0           4383

To be absolutely sure about what was being observed, I ran a command to order the netstats output by file size for both node4 and node5.

$ for file_size in $(ccm node4 nodetool netstats  | grep '(100%)\ received' | grep '127.0.0.2' | tr -s ' ' | cut -d' ' -f3 | cut -d'/' -f1 | sort -g); do ccm node4 nodetool netstats | grep ${file_size} | tr -s ' '; done
 .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-16-Data.db 1450179/1450179 bytes(100%) received from idx:0/127.0.0.2
 .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-15-Data.db 14081154/14081154 bytes(100%) received from idx:0/127.0.0.2
 .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-9-Data.db 55590043/55590043 bytes(100%) received from idx:0/127.0.0.2
 .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-1-Data.db 55948069/55948069 bytes(100%) received from idx:0/127.0.0.2
 .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-17-Data.db 56277615/56277615 bytes(100%) received from idx:0/127.0.0.2
 .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-8-Data.db 901334951/901334951 bytes(100%) received from idx:0/127.0.0.2
 .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-6-Data.db 901588743/901588743 bytes(100%) received from idx:0/127.0.0.2
 .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-7-Data.db 902405063/902405063 bytes(100%) received from idx:0/127.0.0.2
 .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-10-Data.db 3622476547/3622476547 bytes(100%) received from idx:0/127.0.0.2
 .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-4-Data.db 3651310715/3651310715 bytes(100%) received from idx:0/127.0.0.2

$ for file_size in $(ccm node5 nodetool netstats  | grep '(100%)\ received' | grep '127.0.0.2' | tr -s ' ' | cut -d' ' -f3 | cut -d'/' -f1 | sort -g); do ccm node5 nodetool netstats | grep ${file_size} | tr -s ' '; done
 .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-9-Data.db 1450179/1450179 bytes(100%) received from idx:0/127.0.0.2
 .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-8-Data.db 14081154/14081154 bytes(100%) received from idx:0/127.0.0.2
 .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-6-Data.db 55590043/55590043 bytes(100%) received from idx:0/127.0.0.2
 .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-1-Data.db 55948069/55948069 bytes(100%) received from idx:0/127.0.0.2
 .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-5-Data.db 901334951/901334951 bytes(100%) received from idx:0/127.0.0.2
 .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-3-Data.db 901588743/901588743 bytes(100%) received from idx:0/127.0.0.2
 .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-4-Data.db 902405063/902405063 bytes(100%) received from idx:0/127.0.0.2
 .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-7-Data.db 3622476547/3622476547 bytes(100%) received from idx:0/127.0.0.2
 .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-tmp-ka-2-Data.db 3651310715/3651310715 bytes(100%) received from idx:0/127.0.0.2

With the exception of one file being streamed by node4, killrweather-raw_weather_data-tmp-ka-17-Data.db (size 56277615 bytes), node4 and node5 looked to be streaming the same data from node2. This was the first confirmation that node5 had stolen the tokens that were originally calculated by node4. Furthermore, it looked like node4 was performing unnecessary streaming from node2. I noted down the file sizes displayed by node5’s netstats output to help track down data files on each node.

$ ccm node5 nodetool netstats | grep '(100%)\ received' | grep '127.0.0.2' | tr -s ' ' | cut -d' ' -f3 | cut -d'/' -f1 | sort -g > file_sizes.txt; cat file_sizes.txt
1450179
14081154
55590043
55948069
901334951
901588743
902405063
3622476547
3651310715

Token and the thief

Once both nodes had finished bootstrapping and had successfully joined the cluster it looked like this.

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  19.19 GB   32      14.8%             cfb50e13-52a4-4821-bca2-4dba6061d38a  rack1
UN  127.0.0.2  9.55 GB    32      22.0%             5176598f-bbab-4165-8130-e33e39017f7e  rack1
UN  127.0.0.3  19.22 GB   32      23.6%             d261faaf-628f-4b86-b60b-3825ed552aba  rack1
UN  127.0.0.4  19.17 GB   32      17.5%             ae0a26a6-fab5-4cab-a189-697818be3c95  rack1
UN  127.0.0.5  9.55 GB    32      22.1%             a71ed178-f353-42ec-82c8-d2b03967753a  rack1

Using the file sizes I captured earlier from node5 netstats, I checked the data directories of node4 and node5 to confirm both nodes contained files of those sizes.

$ for file_size in $(cat file_sizes.txt); do ls -al .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/ | grep ${file_size}; done
-rw-r--r--    1 anthony  staff     1450179 12 May 16:33 killrweather-raw_weather_data-ka-16-Data.db
-rw-r--r--    1 anthony  staff    14081154 12 May 16:33 killrweather-raw_weather_data-ka-15-Data.db
-rw-r--r--    1 anthony  staff    55590043 12 May 16:33 killrweather-raw_weather_data-ka-9-Data.db
-rw-r--r--    1 anthony  staff    55948069 12 May 16:33 killrweather-raw_weather_data-ka-1-Data.db
-rw-r--r--    1 anthony  staff   901334951 12 May 16:33 killrweather-raw_weather_data-ka-8-Data.db
-rw-r--r--    1 anthony  staff   901588743 12 May 16:33 killrweather-raw_weather_data-ka-6-Data.db
-rw-r--r--    1 anthony  staff   902405063 12 May 16:33 killrweather-raw_weather_data-ka-7-Data.db
-rw-r--r--    1 anthony  staff  3622476547 12 May 16:33 killrweather-raw_weather_data-ka-10-Data.db
-rw-r--r--    1 anthony  staff  3651310715 12 May 16:33 killrweather-raw_weather_data-ka-4-Data.db

$ for file_size in $(cat file_sizes.txt); do ls -al  .../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/ | grep ${file_size}; done
-rw-r--r--    1 anthony  staff     1450179 12 May 16:36 killrweather-raw_weather_data-ka-9-Data.db
-rw-r--r--    1 anthony  staff    14081154 12 May 16:36 killrweather-raw_weather_data-ka-8-Data.db
-rw-r--r--    1 anthony  staff    55590043 12 May 16:36 killrweather-raw_weather_data-ka-6-Data.db
-rw-r--r--    1 anthony  staff    55948069 12 May 16:36 killrweather-raw_weather_data-ka-1-Data.db
-rw-r--r--    1 anthony  staff   901334951 12 May 16:36 killrweather-raw_weather_data-ka-5-Data.db
-rw-r--r--    1 anthony  staff   901588743 12 May 16:36 killrweather-raw_weather_data-ka-3-Data.db
-rw-r--r--    1 anthony  staff   902405063 12 May 16:36 killrweather-raw_weather_data-ka-4-Data.db
-rw-r--r--    1 anthony  staff  3622476547 12 May 16:36 killrweather-raw_weather_data-ka-7-Data.db
-rw-r--r--    1 anthony  staff  3651310715 12 May 16:36 killrweather-raw_weather_data-ka-2-Data.db

So both nodes contained files of the same size. I then decided to check if the files on each node that were the same size had the same data content. This check was done by performing an MD5 check of file pairs that were the same size.

$ BASE_DIR=...; DATA_DIR=data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d; for file_size in $(cat file_sizes.txt); do node_4_file=$(ls -al ${BASE_DIR}/node4/${DATA_DIR}/ | grep ${file_size} | tr -s ' ' | cut -d' ' -f9); node_5_file=$(ls -al ${BASE_DIR}/node5/${DATA_DIR}/ | grep ${file_size} | tr -s ' ' | cut -d' ' -f9); md5 ${BASE_DIR}/node4/${DATA_DIR}/${node_4_file} ${BASE_DIR}/node5/${DATA_DIR}/${node_5_file}; echo; done
MD5 (.../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-16-Data.db) = a9edb85f70197c7f37aa021c817de2a2
MD5 (.../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-9-Data.db) = a9edb85f70197c7f37aa021c817de2a2

MD5 (.../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-15-Data.db) = 975f184ae36cbab07a9c28b032532f88
MD5 (.../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-8-Data.db) = 975f184ae36cbab07a9c28b032532f88

MD5 (.../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-9-Data.db) = f0160cf8e7555031b6e0835951e1896a
MD5 (.../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-6-Data.db) = f0160cf8e7555031b6e0835951e1896a

MD5 (.../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-1-Data.db) = 7789b794bb3ef24338282d4a1a960903
MD5 (.../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-1-Data.db) = 7789b794bb3ef24338282d4a1a960903

MD5 (.../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-8-Data.db) = 1738695bb6b4bd237b3592e80eb785f2
MD5 (.../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-5-Data.db) = 1738695bb6b4bd237b3592e80eb785f2

MD5 (.../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-6-Data.db) = f7d1faa5c59a26a260038d61e4983022
MD5 (.../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-3-Data.db) = f7d1faa5c59a26a260038d61e4983022

MD5 (.../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-7-Data.db) = d791179432dcdbaf9a9b315178fb04c7
MD5 (.../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-4-Data.db) = d791179432dcdbaf9a9b315178fb04c7

MD5 (.../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-10-Data.db) = 3e6623c2f06bcd3f5caeacee1917898b
MD5 (.../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-7-Data.db) = 3e6623c2f06bcd3f5caeacee1917898b

MD5 (.../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-4-Data.db) = 8775f5df08882df353427753f946bf10
MD5 (.../node5/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-2-Data.db) = 8775f5df08882df353427753f946bf10

Now I had absolute proof that both nodes did in fact stream the same data from node2. It did look as though, when node5 joined the cluster, it had taken tokens calculated by node4. If this were the case, it would mean that the data files on node4 that are the same as those on node5 would no longer be needed. One way to prove that there is “orphaned” data on node4, i.e. data not associated with any of node4’s tokens, would be to run cleanup on the cluster. If there is orphaned data on node4, the cleanup would technically delete all or some of those files. Before running cleanup on the cluster, I took note of the files on node4 which were the same as the ones on node5.

$ for file_size in $(cat file_sizes.txt); do ls -al .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/ | grep ${file_size} | tr -s ' ' | cut -d' ' -f9; done > node4_orphaned_files.txt; cat node4_orphaned_files.txt
killrweather-raw_weather_data-ka-16-Data.db
killrweather-raw_weather_data-ka-15-Data.db
killrweather-raw_weather_data-ka-9-Data.db
killrweather-raw_weather_data-ka-1-Data.db
killrweather-raw_weather_data-ka-8-Data.db
killrweather-raw_weather_data-ka-6-Data.db
killrweather-raw_weather_data-ka-7-Data.db
killrweather-raw_weather_data-ka-10-Data.db
killrweather-raw_weather_data-ka-4-Data.db

I then ran a cleanup on all the nodes in the cluster.

$ ccm node1 nodetool cleanup
$ ccm node2 nodetool cleanup
$ ccm node3 nodetool cleanup
$ ccm node4 nodetool cleanup
$ ccm node5 nodetool cleanup

$ ccm node1 nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  9.57 GB    32      14.8%             cfb50e13-52a4-4821-bca2-4dba6061d38a  rack1
UN  127.0.0.2  138.92 KB  32      22.0%             5176598f-bbab-4165-8130-e33e39017f7e  rack1
UN  127.0.0.3  19.22 GB   32      23.6%             d261faaf-628f-4b86-b60b-3825ed552aba  rack1
UN  127.0.0.4  9.62 GB    32      17.5%             ae0a26a6-fab5-4cab-a189-697818be3c95  rack1
UN  127.0.0.5  9.55 GB    32      22.1%             a71ed178-f353-42ec-82c8-d2b03967753a  rack1

From this output it was obvious that node4 contained orphaned data. Earlier I had run nodetool status just after both nodes completed bootstrapping and moved to the UN state, and prior to running cleanup. The output produced at that point showed that node4 had a load of 19.17 GB. Now, after cleanup, it showed a load of 9.62 GB. As a final verification, I iterated through the list of files on node4 which were the same as the ones on node5 (node4_orphaned_files.txt) and checked whether they were still present on node4.

$ for file_name in $(cat node4_orphaned_files.txt); do ls .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/${file_name}; done
ls: .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-16-Data.db: No such file or directory
ls: .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-15-Data.db: No such file or directory
ls: .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-9-Data.db: No such file or directory
ls: .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-1-Data.db: No such file or directory
ls: .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-8-Data.db: No such file or directory
ls: .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-6-Data.db: No such file or directory
ls: .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-7-Data.db: No such file or directory
ls: .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-10-Data.db: No such file or directory
ls: .../node4/data0/killrweather/raw_weather_data-32f23d1015cb11e79d0fa90042a0802d/killrweather-raw_weather_data-ka-4-Data.db: No such file or directory

As can be seen, the files were deleted as part of the cleanup on node4. This means that during bootstrap node4 originally calculated tokens for that data. It then asked node2 for a list of files that related to those tokens and began streaming them. A little while later node5 was added to the cluster while node4 was still bootstrapping. It then calculated tokens that overlapped with node4’s tokens. Node5 then asked node2 for a list of files that related to those tokens and started streaming data for them as well. The issue here is that node4 was never notified that it was no longer required to stream those files from node2. Hence, unnecessary resources were consumed as a result of bootstrapping two nodes at the same time.

Conclusion

Auto bootstrapping combined with vnodes is probably one of the most handy features in Cassandra. It takes the pain out of having to manually move data around, ensuring continuous availability while expanding the cluster in a reliable and efficient way. There are a number of knobs and levers for controlling the default behaviour of bootstrapping.

Configuration properties

  • auto_bootstrap - controls whether data is streamed to the new node when inserted.
  • streaming_socket_timeout_in_ms - sets socket timeout for streaming operations.

JVM options

  • cassandra.consistent.rangemovement - controls consistent range movements and multiple node bootstrapping.
  • cassandra.replace_address_first_boot=<IP_ADDRESS> - allows a down node to be replaced with a new node.

As demonstrated, setting the JVM option cassandra.consistent.rangemovement=false puts the cluster at risk of over streaming data and, worse still, of violating consistency. For users new to Cassandra, the safest way to add multiple nodes to a cluster is to add them one at a time. Stay tuned as I will be following up with another post on bootstrapping.

Testing a Spark Application

This is the third article in our Spark-related blog series, which covers some basic approaches for testing Cassandra/Spark code. It will show you how to restructure your code to be testable, covering unit testing, integration testing, and acceptance testing.

Continue reading Testing a Spark Application on opencredo.com.

The Last Pickle Is Hiring

The Last Pickle (TLP) intends to hire a project manager in the US to work directly with customers in the US and around the world. You will be part of the TLP team, coordinating and managing delivery of high-quality consulting services including expert advice, documentation and run books, diagnostics and troubleshooting, and proof-of-concept code.

This role reports to the COO, and works closely with our CTO and technical team of Apache Cassandra consultants.

Responsibilities Include

  • Managing project budgets, resources, and schedules to ensure on-time and on-budget delivery of services.
  • Enabling the Consulting team to be as effective and efficient as possible, taking a bias towards action to remove barriers to delivery as needed.
  • Providing the highest level of customer service to multiple projects simultaneously.
  • Taking the lead on day-to-day client communication and/or coordination.
  • Assisting with gathering business requirements, estimating, and scoping new projects.
  • Ensuring timely response to customer requests.
  • Managing the expectations of the internal team and clients.
  • Escalating and coordinating resolution of issues internally as needed.

Skills and Experience

  • Excellent written and oral communication skills, ensuring zero ambiguity and clear direction in a manner suiting the audience.
    • Ability to communicate in an open and ongoing way.
    • Must be comfortable asking questions.
  • Comfortable with spreadsheets and project management tools.
  • 3 years of relevant experience with similar responsibilities.
  • Experience working remotely with a high-level of autonomy.
  • Solid understanding of consulting business operations.
  • Strong organizational skills with the ability to “see at least three steps ahead” of everyone else.
  • Great attention to detail. Please address your cover letter to “Cristen.”
  • Ability to anticipate and manage risk.

Bonus points for

  • Experience with open source and/or big data platforms.
  • Ability to problem solve and think intuitively about managing teams and clients (you are not a robot).
  • Technical aptitude and the ability to help translate between business needs and technical delivery requirements.
  • Currently living in Austin, Texas.

In return we offer

  • Being part of a globally recognised team of experts.
  • Flexible workday and location.
  • Time to work on open source projects and support for public speaking.
  • As much or as little business travel as you want.
  • No on-call roster.
  • A great experience helping companies big and small be successful.

Apply

If this sounds like the right job for you, let us know by emailing careers@thelastpickle.com.

About

The Last Pickle was born out of our passion for the open source community and firmly held belief that Cassandra would become the ubiquitous database platform of the next generation. We have maintained our focus on Apache Cassandra since starting in March 2011 as the first pure Apache Cassandra consultancy in the world. In the last six years we have been part of the success of customers big and small around the globe as they harness the power of Cassandra to grow their companies.

Now we are looking to grow our own company by expanding our team in the US. As a profitable, self-funded start-up, TLP is able to place people at the heart of what we do. After years of working in a globally distributed team, with staff in New Zealand, America, Australia, and France, we realise happiness is the most important element in everything we do. We offer flexible work days with staff working from a mix of home and shared offices, while still finding time in their day to pick up kids from school, go running, or check the surf conditions. With the help of our dedicated Happiness Coordinator, we work together to create a work-life balance that is mutually beneficial.

Deploy Spark with an Apache Cassandra cluster

This is the second article in our Spark-related blog series, which moves on from demonstrating functionality and describes the details of how to set up an infrastructure capable of running such analytical processing.

Continue reading Deploy Spark with an Apache Cassandra cluster on opencredo.com.

New Blog Series: Spark – The Pragmatic Bits

Interested in learning more about the practical and pragmatic aspects of using Apache Spark for your data processing challenges? Please join us for our new blog and webinar series: Apache Spark - The Pragmatic Bits. The blog series will aim to cover topics including how Spark can be used to get the most out of your Cassandra setup, how to actually deploy a Spark and Cassandra cluster through programmable infrastructure, and how to ensure you write testable Spark code which will play nicely with the rest of your system. The series finishes with a webinar which explores the use case of “Detecting stolen AWS credential usage with Spark”.

Continue reading New Blog Series: Spark – The Pragmatic Bits on opencredo.com.