Improving Apache Cassandra’s Front Door and Backpressure

As part of CASSANDRA-15013, we have improved Cassandra’s ability to handle high throughput workloads, while adding enough safeguards to protect it from potentially running out of memory. In order to better explain the change we have made, let us first understand, at a high level, how an incoming request was processed by Cassandra before the fix, followed by what we changed, and the new relevant configuration knobs available.

How inbound requests were handled before

Let us take the scenario of a client application sending requests to a C* cluster. For the purpose of this blog, let us focus on one of the C* coordinator nodes.

Below is a microscopic view of the client-server interaction at the C* coordinator node. Each client connection to a Cassandra node happens over a Netty channel, and for efficiency, each Netty eventloop thread is responsible for more than one Netty channel.

The eventloop threads read requests coming off of netty channels and enqueue them into a bounded inbound queue in the Cassandra node.

A thread pool dequeues requests from the inbound queue, processes them asynchronously, and enqueues the responses into an outbound queue. There are multiple outbound queues, one per eventloop thread, to avoid races.

The same eventloop threads that are responsible for enqueuing incoming requests into the inbound queue are also responsible for dequeuing responses from the outbound queues and shipping them back to the client.

Issue with this workflow

Let us take a scenario where there is a spike in operations from the client. The eventloop threads are now enqueuing requests at a much higher rate than the native transport thread pool can process them. Eventually, the inbound queue reaches its limit and cannot accept any more requests.

Consequently, the eventloop threads get into a blocked state as they try to enqueue requests into an already full inbound queue. They block until they can successfully enqueue the request in hand.

As noted earlier, these blocked eventloop threads are also supposed to dequeue responses from the outbound queue. Since they are blocked, the outbound queue (which is unbounded) grows endlessly with responses, eventually causing C* to run out of memory. This is a vicious cycle: because the eventloop threads are blocked, there is no one to ship responses back to the client; eventually client-side timeouts trigger, and clients may send even more requests due to retries. This is an unfortunate situation to be in, since Cassandra is doing all the work of processing these requests as fast as it can, but there is no one to ship the produced responses back to the client.

So far, we have built a fair understanding of how the front door of C* works with regard to handling client requests, and how blocked eventloop threads can affect Cassandra.

What we changed

Backpressure

The essential root cause of the issue is that the eventloop threads get blocked. So let us not block them: we made the bounded inbound queue unbounded. If we are not careful here, though, we could face an out of memory situation, this time because of the unbounded inbound queue. So we defined an overloaded state for the node based on the memory usage of the inbound queue.

We introduced two levels of thresholds: one at the node level, and another, more granular one, per client IP. The per-client-IP threshold helps isolate rogue client IPs without affecting well-behaved clients.

These thresholds can be set in the cassandra.yaml file:

native_transport_max_concurrent_requests_in_bytes_per_ip
native_transport_max_concurrent_requests_in_bytes

These thresholds can be further changed at runtime (CASSANDRA-15519).

Configurable server response to the client as part of backpressure

If C* happens to be in overloaded state (as defined by the thresholds mentioned above), C* can react in one of the following ways:

  • Apply backpressure by setting “autoread” to false on the netty channel in question (default behavior).
  • Respond to the client with an OverloadedException (if the client sets the “THROW_ON_OVERLOAD” connection startup option to “true”).

Let us look at the client request-response workflow again, in both these cases.

THROW_ON_OVERLOAD = false (default)

If the inbound queue is full (i.e. the thresholds are met):

C* sets autoread to false on the netty channel, which means it will stop reading bytes off of the netty channel.

Consequently, the kernel socket inbound buffer becomes full since no bytes are being read off of it by netty eventloop.

Once the Kernel Socket Inbound Buffer is full on the server side, data starts piling up in the Kernel Socket Outbound Buffer on the client side, and once that buffer is full, the client starts experiencing backpressure.

THROW_ON_OVERLOAD = true

If the inbound queue is full (i.e. the thresholds are met), eventloop threads do not enqueue the request into the Inbound Queue. Instead, the eventloop thread creates an OverloadedException response message and enqueues it into the flusher queue, which will then be shipped back to the client.

This way, Cassandra is able to sustain very high throughput while protecting itself from memory starvation. This patch has been vetted through thorough performance benchmarking. Detailed performance analysis can be found here.

Apache Cassandra vs DynamoDB

In this post, we’ll look at some of the key differences between Apache Cassandra (hereafter just Cassandra) and DynamoDB.

Both are distributed databases with similar architectures, and both offer incredible scalability, reliability, and resilience. However, there are also differences, and understanding the differences and cost benefits can help you determine the right solution for your application.

Apache Cassandra is an open source database available at no cost from the Apache Foundation. Installing and configuring Cassandra can be challenging and there is more than one pitfall along the way. However, Cassandra can be installed on any cloud service or at a physical location you choose.

The typical Cassandra installation is a cluster which is a collection of nodes (a node is a single instance of Cassandra installed on a computer or in a Docker container). Nodes can then be grouped in racks and data centers which can be in different locations (cloud zones and regions or physical collocations). You must scale Cassandra as your demand grows and are responsible for the ongoing management tasks such as backups, replacing bad nodes, or adding new nodes to meet demand.

Amazon DynamoDB is a fully managed database as a service. All implementation details are hidden; from the user’s viewpoint, DynamoDB is serverless. DynamoDB automatically scales throughput capacity to meet workload demands, partitions and repartitions your data as your table size grows, and distributes data across multiple availability zones. However, the service is available only through Amazon Web Services (AWS).

Replica Configuration and Placement

NoSQL data stores like Cassandra and DynamoDB use multiple replicas (copies of data) to ensure high availability and durability. The number of replicas and their placement determines the availability of your data.

With Cassandra, the number of replicas to have per cluster—the replication factor—and their placement is configurable. A cluster can be subdivided into two or more data centers which can be located in different cloud regions or physical collocations. The nodes in a data center can be assigned to different racks that can be assigned to different zones or to different physical racks.

In contrast, with DynamoDB, Amazon makes these decisions for you. By default, data is located in a single region and is replicated to three (3) availability zones in that region. Replication to different AWS regions is available as an option, and DynamoDB Streams must be enabled for multi-region replication.

Data Model

The top level data structure in Cassandra is the keyspace which is analogous to a relational database. The keyspace is the container for the tables and it is where you configure the replica count and placement. Keyspaces contain tables (formerly called column families) composed of rows and columns. A table schema must be defined at the time of table creation.

The top level structure for DynamoDB is the table which has the same functionality as the Cassandra table. Rows are items, and cells are attributes. In DynamoDB, it’s possible to define a schema for each item, rather than for the whole table.

Both databases store data in sparse rows: for a given row, they store only the columns present in that row. Each table must have a primary key that uniquely identifies rows or items, and the primary key has two components:

  • A partition key that determines the placement of the data by subsetting the table rows into partitions. This key is required.
  • A key that sorts the rows within a partition. In Cassandra, this is called the clustering key while DynamoDB calls it the sort key. This key is optional.

Taken together, the primary key ensures that each row in a table is unique. 
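
To make the two-part key concrete, here is a hedged CQL sketch (the table and column names are illustrative, loosely matching the Music example used later in this post); in DynamoDB the equivalent table would simply declare Artist as the partition key and SongTitle as the sort key at creation time, with every other attribute left schemaless.

CREATE TABLE Music (
    Artist     text,    -- partition key: determines which partition (and node) stores the row
    SongTitle  text,    -- clustering key (DynamoDB: sort key): orders rows within the partition
    AlbumTitle text,    -- regular columns, assumed for illustration
    Year       int,
    PRIMARY KEY ((Artist), SongTitle)
);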

 Differences 

  • DynamoDB limits the number of tables in an AWS region to 256.  If you need more tables, you must contact AWS support. There are no hard limits in Cassandra. The practical limit is around 500 tables.
  • DynamoDB is schemaless. Only the primary key attributes need to be defined at table creation. 
  • DynamoDB charges for read and write throughput and requires you to manage capacity for each table. Read and write throughput and associated costs must be considered when designing tables and applications. 
  • The maximum size of an item in DynamoDB is 400KB. With Cassandra, the hard limit is 2GB; the practical limit is a few megabytes.
  • In DynamoDB, the primary key can have only one attribute as the partition key and one attribute as the sort key. Cassandra allows composite partition keys and multiple clustering columns.
  • Cassandra supports counter, time, timestamp, uuid, and timeuuid data types not found in DynamoDB.

Allocating Table Capacity

Both Cassandra and DynamoDB require capacity planning before setting up a cluster. However, the approaches are different. 

To create a performant Cassandra cluster, you must first make reasonably accurate estimates of your future workloads. Capacity is allocated by creating a good data model, choosing the right hardware, and properly sizing the cluster. Increasing workloads are met by adding nodes.

With DynamoDB, capacity planning is determined by the read/write capacity mode you choose. On-demand capacity requires no capacity planning other than setting an upper limit on each table. You pay only for the read and write requests on the table; capacity is measured in read request units and write request units. On-demand mode is best when you have an unknown workload, unpredictable application traffic, or you prefer the ease of paying for only what you use.

With provisioned capacity, you must specify read and write throughput limits for each table at the time of creation. If you exceed these limits for a table, DynamoDB will throttle queries until usage is below the defined capacity. Auto scaling can adjust your table’s provisioned capacity automatically in response to traffic changes, although there is a lag between the time throttling starts and the time the increased capacity is applied.

The throughput limits are provisioned in units called Read Capacity Units (RCU) and Write Capacity Units (WCU); queries are throttled whenever these limits are exceeded. One read capacity unit represents one strongly consistent read per second, or two eventually consistent reads per second, for an item up to 4 KB in size. Transactional read requests require two read capacity units to perform one read per second for items up to 4 KB. If you need to read an item that is larger than 4 KB, DynamoDB must consume additional read capacity units. One write capacity unit represents one write per second for an item up to 1 KB in size. If you need to write an item that is larger than 1 KB, DynamoDB must consume additional write capacity units. Transactional write requests require 2 write capacity units to perform one write per second for items up to 1 KB. For more information, see Managing Settings on DynamoDB Provisioned Capacity Tables.
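
As a quick worked example of these rules (the item size is hypothetical): a strongly consistent read of a 6 KB item costs ceil(6 / 4) = 2 RCU, an eventually consistent read of the same item costs half of that (1 RCU), and a transactional read costs twice as much (4 RCU). Writing the same 6 KB item costs ceil(6 / 1) = 6 WCU, or 12 WCU as a transactional write.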

Provisioned mode is a good option if any of the following are true: 

  • You have predictable application traffic.
  • Application traffic is consistent or ramps gradually.
  • You can forecast capacity requirements to control costs.

Partitions

Both Cassandra and DynamoDB group and distribute data based on the hashed value of the partition key. Both call these groupings partitions, but they have very different definitions.

In DynamoDB, a partition is a storage unit that has a maximum size of 10 GB. When a partition fills, DynamoDB creates a new partition and the user has no control over the process. A partition can contain items with different partition key values. When a sort key is used, all the items with the same partition key value are stored physically close together, ordered by sort key value.

DynamoDB partitions have capacity limits of 3,000 RCU or 1,000 WCU even for on-demand tables. Furthermore, these limits cannot be increased. If you exceed the partition limits, your queries will be throttled even if you have not exceeded the capacity of the table. See Throttling and Hot Keys (below) for more information.

A Cassandra partition is a set of rows that share the same hashed partition key value.  Rows with the same partition key are stored on the same node. Rows within the partition are sorted by the clustering columns. If no clustering column was specified, the partition holds a single row. While it would not be desirable, it would be possible for an application to drive tens of thousands of reads/writes to a single partition. 

See Cassandra Data Partitioning.

Query Language 

Cassandra provides a SQL-like language called Cassandra Query Language (CQL) to access data, while DynamoDB uses a JSON-based syntax. The following shows each syntax for the query “return all information from the Music table for the song title ‘Lullaby of Broadway’ and the artist ‘Tommy Dorsey’”:

CQL:

SELECT *
FROM Music
WHERE Artist = 'Tommy Dorsey' AND SongTitle = 'Lullaby of Broadway';

DynamoDB:

get-item {
    TableName: "Music",
    Key: {
        "Artist": "Tommy Dorsey",
        "SongTitle": "Lullaby of Broadway"
    }
}


Secondary Indexes

By default, Cassandra and DynamoDB queries can use only the primary key columns in the search condition which must include all partition key columns. Non-key columns can be used in a search by creating an index on that column.

Cassandra supports creating an index on most columns, including a clustering column of a compound primary key or the partition key itself. Creating an index on a collection or on the keys of a collection map is also supported. However, when used incorrectly a secondary index can hurt performance. A general rule of thumb is to index only columns with low cardinality (a small number of distinct values) and to use the index only together with a partition key restriction in the search clause. Because the index is stored locally on each node in a cluster, a query that relies only on a secondary index can degrade performance if multiple nodes have to be accessed.
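
A minimal CQL sketch of this advice, with hypothetical table, column, and index names:

CREATE TABLE orders (
    customer_id uuid,
    order_id    timeuuid,
    status      text,      -- low-cardinality column (a handful of possible states)
    total       decimal,
    PRIMARY KEY ((customer_id), order_id)
);

CREATE INDEX orders_by_status ON orders (status);

-- Recommended usage: restrict by the partition key as well as the indexed column,
-- so the query is served by a single node rather than fanning out across the cluster.
SELECT order_id, total
FROM orders
WHERE customer_id = 62c36092-82a1-3a00-93d1-46196ee77204
  AND status = 'SHIPPED';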

DynamoDB has local secondary indexes. This index uses the same partition key as the base table but a different sort key; queries on the index are scoped to the base table partition with the same partition key value. Local secondary indexes must be created at the same time the table is created, and a maximum of 5 local secondary indexes may be created per table.

Materialized Views versus Global Secondary Indexes 

In Cassandra, a Materialized View (MV) is a table built from the results of a query against another table, with a new primary key and new properties. Queries are optimized by the primary key definition. The purpose of a materialized view is to support additional query patterns on a single base table. It is an alternative to the standard practice of creating a new table with the same data whenever a different query is needed. Data in the materialized view is updated automatically by changes to the source table. However, materialized views are an experimental feature and should not be used in production.
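
For illustration, a hedged CQL sketch that re-keys the hypothetical orders table from the previous section by status (keeping in mind the experimental status of materialized views):

CREATE MATERIALIZED VIEW orders_by_status_mv AS
    SELECT customer_id, order_id, status, total
    FROM orders
    WHERE status IS NOT NULL
      AND customer_id IS NOT NULL
      AND order_id IS NOT NULL
    PRIMARY KEY ((status), customer_id, order_id);

-- The view can now answer "all orders with a given status" directly:
SELECT customer_id, order_id, total FROM orders_by_status_mv WHERE status = 'SHIPPED';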

A similar object in DynamoDB is the Global Secondary Index (GSI), which creates an eventually consistent replica of a table keyed differently from the base table. GSIs can be defined at table creation time or added later, and each table has a limit of 20. A GSI must be provisioned for reads and writes, and there are storage costs as well.

Time To Live

Time To Live (TTL) is a feature that automatically removes items from your table after a period of time has elapsed.

  • Cassandra specifies TTL as the number of seconds from the time of creating or updating a row, after which the row expires.
  • In DynamoDB, TTL is a timestamp value representing the date and time at which the item expires.
  • DynamoDB applies TTL at the item level; Cassandra applies it at the column level (see the CQL sketch after this list).
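
A minimal sketch of the difference, using a hypothetical sensor_readings table. In CQL the TTL is a number of seconds supplied at write time (a table-wide default_time_to_live can also be set), whereas in DynamoDB you enable TTL on the table and store an expiry timestamp attribute on each item:

INSERT INTO sensor_readings (sensor_id, reading_time, value)
VALUES (42, toTimestamp(now()), 21.5)
USING TTL 86400;   -- the columns written here expire 24 hours after the write

-- Optional table-wide default, applied whenever no per-write TTL is given:
ALTER TABLE sensor_readings WITH default_time_to_live = 86400;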

Consistency

Both Cassandra and DynamoDB are distributed data stores. In a distributed system there is a tradeoff between consistency (every read receives the most recent write or an error) and availability (every request receives a non-error response, without the guarantee that it contains the most recent write). In such a system there are two possible levels of consistency:

  • Eventual consistency. This implies that all updates reach all replicas eventually. A read with eventual consistency may return stale data until all replicas are reconciled to a consistent state.
  • Strong consistency returns up-to-date data for all prior successful writes but at the cost of slower response time and decreased availability.

DynamoDB supports eventually consistent and strongly consistent reads on a per query basis. The default is eventual consistency. How it is done is hidden from the user.

 Strongly consistent reads in DynamoDB have the following issues: 

  • The data might not be available if there is a network delay or outage.
  • The operation may have higher latency.
  • Strongly consistent reads are not supported on global secondary indexes.
  • Strongly consistent reads use more throughput capacity than eventually consistent reads and are therefore more expensive. See Throttling and Hot Keys (below).

Cassandra offers tunable consistency for any given read or write operation that is configurable by adjusting consistency levels. The consistency level is defined as the minimum number of Cassandra nodes that must acknowledge a read or write operation before the operation can be considered successful. You are able to configure strong consistency for both reads and writes at a tradeoff of increasing latency.
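
For example, in cqlsh the consistency level can be set for the session before issuing statements (drivers expose the same setting per statement); QUORUM reads and writes together give strongly consistent behaviour, while ONE favours latency and availability. The table and key value below are hypothetical:

CONSISTENCY QUORUM;   -- cqlsh command: subsequent operations require a quorum of replicas
SELECT total FROM orders WHERE customer_id = 62c36092-82a1-3a00-93d1-46196ee77204;

CONSISTENCY ONE;      -- trade consistency for lower latency and higher availability
SELECT total FROM orders WHERE customer_id = 62c36092-82a1-3a00-93d1-46196ee77204;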

Conflicts Resolution

In a distributed system, there is the possibility that a query may return inconsistent data from the replicas. Both Cassandra and DynamoDB resolve any inconsistencies with a “last write wins” solution but with Cassandra, every time a piece of data is written to the cluster, a timestamp is attached. Then, when Cassandra has to deal with conflicting data, it simply chooses the data with the most recent timestamp.

For DynamoDB, “last write wins” applies only to global tables and strongly consistent reads.

Security Features

Cassandra and DynamoDB provide methods for user authentication and authorization and data access permissions. Both use encryption for client and inter-node communication. DynamoDB also offers encryption at rest. Instaclustr offers encryption at rest for its Cassandra Managed Services.

Performance Issues

Consistency and Read Speed

Choosing strong consistency requires more nodes to respond to a request which increases latency.

Scans

Scans are expensive for both systems. Scans in Cassandra are slow because Cassandra has to scan all nodes in the cluster. Scans are faster in DynamoDB but are expensive because resource use is based on the amount of data read, not the amount returned to the client. If the scan exceeds your provisioned read capacity, DynamoDB will generate errors.

DynamoDB’s Issues

Auto Scaling

Amazon DynamoDB auto scaling uses the AWS Application Auto Scaling Service to dynamically adjust provisioned throughput capacity on your behalf, in response to actual traffic patterns. This enables a table or a global secondary index to increase its provisioned read and write capacity to handle sudden increases in traffic, without throttling. When the workload decreases, application auto scaling decreases the throughput so that you don’t pay for unused provisioned capacity.

  • Auto scaling does not work well with varying and bursty workloads. The table scales up only based on consumption, triggering the underlying scaling alarms repeatedly until it reaches the desired level.
  • There can be a lag (5-15 minutes) between the time capacity is exceeded and autoscaling takes effect.
  • Capacity decreases are limited. The maximum is 27 per day (A day is defined according to the GMT time zone).
  • Tables scale based on the number of requests made, not the number of successful requests.
  • Autoscaling cannot exceed hard I/O limits for tables.

Throttling and Hot Keys

In DynamoDB, the provisioned capacity of a table is shared evenly across all the partitions in the table, with the consequence that each partition has a capacity less than the table capacity. For example, for a table provisioned with 400 WCU and 100 RCU that has 4 partitions, each partition would have a write capacity of 100 WCU and a read capacity of 25 RCU.

So if a query sends most read/write requests to a single partition (a hot partition) and that throughput exceeds the shared capacity of that partition, your queries will be throttled even though you have unused capacity in the table. If your application creates a hot partition, your only recourse is either to redesign the data model or to over-allocate capacity to the table.

Adaptive capacity can provide up to 5 minutes of grace time by allocating unused capacity from other partitions to the “hot” one provided unused capacity is available and hard limits are not reached. The hard limits on a partition are 3,000 RCU or 1,000 WCU.

Cross-Region Replication

If you want a DynamoDB table to span regions, you’ll need to use global tables, which require careful planning and implementation. The tables in all regions must use on-demand allocation or have auto scaling enabled, and the tables in each region must be configured identically: time to live, the number of global and local secondary indexes, and so on.

Global tables support only eventual consistency across regions, which can lead to conflicting writes. In such a conflict, DynamoDB applies a last writer wins (lww) policy: the data with the most recent timestamp is used.

Migrating an existing local table to a global table requires additional steps. First, DynamoDB Streams is enabled on the original local table to create a changelog. Then an AWS Lambda function is configured to copy the corresponding change from the original table to the new global table.

Cost Explosion 

As highlighted in Managed Cassandra Versus DynamoDB, DynamoDB’s pricing model can easily make it expensive for a fast growing company:

  • The pricing model is based on throughput, therefore costs increase as IO increases. 
  • A hot partition may require you to overprovision a table.
  • Small reads and writes are penalized because measured throughput is rounded up to the nearest 1 KB boundary for writes and 4 KB boundary for reads.
  • Writes are four times more expensive than reads.
  • Strongly consistent reads are twice the cost of eventually consistent reads. 
  • Workloads performing scans or queries can be costly because the read capacity units are calculated on the number of bytes read rather than the amount of data returned.
  • Read heavy workloads may require DynamoDB Accelerator (DAX) which is priced per node-hour consumed. A partial node-hour consumed is billed as a full hour.
  • Cross region replication incurs additional charges for the amount of data replicated by the Global Secondary Index and the data read from DynamoDB Streams.
  • DynamoDB pricing does not distinguish between customer-facing production tables and tables used for development, testing, and staging.

Summary 

DynamoDB’s advantages are: an easy start; absence of the database management burden; sufficient flexibility, availability, and auto scaling; built-in metrics for monitoring; and encryption of data at rest.

Cassandra’s main advantages are: fast speed of writes and reads; constant availability; a SQL-like Cassandra Query Language instead of DynamoDB’s complex API; reliable cross data center replication; linear scalability and high performance. 

However, the mere technical details of the two databases shouldn’t be the only aspect to analyze before making a choice.

Cassandra versus DynamoDB costs must be compared carefully. If you have an application that mostly writes or one that relies on strong read consistency, Cassandra may be a better choice.

You will also need to consider which technologies you need to supplement the database. If you need open source technologies like Elasticsearch, Apache Spark, or Apache Kafka, Cassandra is your choice. If you plan to make extensive use of AWS products, then it’s DynamoDB.

If you use a cloud provider other than AWS or run your own data centers, then you would need to choose Cassandra.

Benchmarking Cassandra with Local Storage on Azure

Continuing our efforts in adding support for the latest generation of Azure Virtual Machines, Instaclustr is pleased to announce support for the D15v2 and L8v2 types with local storage support for Apache Cassandra clusters on the Instaclustr Managed Platform. So let’s begin by introducing the two forms of storage.

Local storage is a non-persistent (i.e. volatile) storage that is internal to the physical Hyper-V host that a VM is running on at any given time. As soon as the VM is moved to another physical host as a result of a deallocate (stop) and start command, hardware or VM crash, or Azure maintenance, the local storage is wiped clean and any data stored on it is lost.

Remote storage is persistent and comes in the form of managed (or unmanaged) disks. These disks can have different performance and redundancy characteristics. Remote storage is generally used for a VM’s OS disk, or for data disks that are intended to permanently store important data. A remote disk can be detached from a VM on one physical host and attached to another in order to preserve the data. It lives in an Azure storage account, independent of the physical Hyper-V host running the VM, and is attached over the network, which means any Hyper-V host or VM can mount a remotely stored disk at different times.

Remote storage has the critical advantage of being persistent, while local storage has the advantage of being faster (because it is local to the VM, on the same physical host) and included in the VM cost. Remote disks are billed separately from the VM, whereas local storage is included in the price of the VM and often provides a much cheaper storage alternative to remote storage.

The Azure Node Types

D15_v2s

D15_v2s are the latest addition to Azure’s Dv2 series, featuring more powerful CPU and optimal CPU-to-memory configuration making them suitable for most production workloads. The Dv2-series is about 35% faster than the D-series. D15_v2s are a memory optimized VM size offering a high memory-to-CPU ratio that is great for distributed database servers like Cassandra, medium to large caches, and in-memory data stores such as Redis.

Instaclustr customers can now leverage these benefits with the release of the D15_v2 VMs, which provide 20 virtual CPU cores and 140 GB RAM backed with 1000 GB of local storage.

L8_v2s

L8_v2s are part of the Lsv2 series, featuring high throughput, low latency, and directly mapped local NVMe storage. L8s, like the rest of the series, were designed to cater for high throughput and high IOPS workloads including big data applications, SQL and NoSQL databases, data warehousing, and large transactional databases. Examples include Cassandra, Elasticsearch, and Redis. In general, applications that can benefit from large in-memory databases are a good fit for these VMs.

L8s offer 8 virtual CPU cores and 64 GB of RAM backed by 1388 GB of local storage. L8s provide roughly equivalent compute capacity (cores and memory) to a D13v2 at about ¾ of the price, offering speed and efficiency at the cost of forgoing persistent storage.

Benchmarking

Prior to the public release of these new instances on Instaclustr, we conducted Cassandra benchmarking for:

VM Type | CPU Cores | RAM | Storage Type | Disk Type
DS13v2 | 8 | 56 GB | remote | 2046 GiB (SSD)
D15v2 | 20 | 140 GB | local | 1000 GiB (SSD)
L8s w/local | 8 | 64 GB | local | 1388 GiB (SSD)
L8s w/remote | 8 | 64 GB | remote | 2046 GiB (SSD)

All tests were conducted using Cassandra 3.11.6

The results are designed to outline the performance of local storage and the benefits it has over remote storage. Testing is split into two groups, fixed and variable, and each group runs two different payload sizes:

  1. Small – smaller, more frequent payloads which are quick to execute but are requested in large numbers.
  2. Medium – larger payloads which take longer to execute and are potentially capable of slowing Cassandra’s ability to process requests.

The fixed testing procedure is:

  1. Insert data to fill disks to ~30% full.
  2. Wait for compactions to complete.
  3. Run a fixed rate of operations.
  4. Run a sequence of tests consisting of read, write and mixed read/write operations.
  5. Run the tests and measure the operational latency for a fixed rate of operations. 

The variable testing procedure is:

  1. Insert data to fill disks to ~30% full.
  2. Wait for compactions to complete.
  3. Target a median latency of 10ms.
  4. Run a sequence of tests comprising read, write and mixed read/write operations. These tests were broken down into multiple types.
  5. Run the tests and measure the results including operations per second and median operation latency. 
  6. Quorum consistency for all operations.
  7. We incrementally increase the thread count and re-run the tests until we have hit the target median latency and are under the allowable pending compactions.
  8. Wait for compactions to complete after each test.

As with any generic benchmarking results the performance may vary from the benchmark for different data models or application workloads. However, we have found this to be a reliable test for comparison of relative performance that will match many practical use cases.

Comparison

When comparing performance between instance types with local and remote storage, it is important to take note of the latency of operations. The latency of a read operation indicates how long a request takes to retrieve information from the disk and return it to the client.

In the fixed small and medium read tests, local storage offered better latency results in comparison to instances with remote storage. This is more noticeable in the medium read tests, where the tests had a larger payload and required more interaction with the locally available disks. Both L8s (with local storage) and D15_v2s offered a lower latency to match the operation rate given to the cluster.

When running variable-small-read tests under a target latency, the operation rate for D15v2s reached nearly 3 times the ops/s of DS13v2s with remote storage. Likewise, L8s with local storage outperformed L8s with remote storage, delivering twice the performance at half the latency.

variable-read-small | L8s w/local | L8s w/remote | DS13_v2 | DS15_v2
Operation Rate | 19,974 | 7,198 | 23,985 | 61,489
Latency mean (avg) | 22.1 | 48.3 | 21.9 | 19.4
Latency median | 20.1 | 40.3 | 19.9 | 17.7
Latency 95th percentile | 43.6 | 123.5 | 43.3 | 39.2
Latency 99th percentile | 66.9 | 148.9 | 74.3 | 58.9

In the medium read tests, instances with local storage outperformed instances with remote storage. Both L8s (with local storage) and D15s had a far better ops/s and latency result, even with a significantly higher operation rate. This makes a very convincing argument for local storage over remote storage when seeking optimal performance.

variable-read-medium | L8s w/local | L8s w/remote | DS13_v2 | DS15_v2
Operation Rate | 235 | 76 | 77 | 368
Latency mean (avg) | 4.2 | 13 | 12.9 | 2.7
Latency median | 3.2 | 8.4 | 9.5 | 2.5
Latency 95th percentile | 4.9 | 39.7 | 39.3 | 3.4
Latency 99th percentile | 23.5 | 63.4 | 58.8 | 4.9

Looking at the writes, on the other hand, D15s outperformed the rest due to their large pool of CPU cores. While the differences were less obvious in the small tests, they were much clearer in the medium tests. Further investigation will be conducted to determine why this is the case.

Benchmarking Cassandra

Based on the graph for variable medium testing, D15s outperformed in all categories. D15v2s have both a larger number of cores, which helps with heavy write loads, and local storage, which offers faster disk intensive reads. This was additionally supported by strong performance in the mixed medium testing results.

L8s with local storage took second place, performing better than DS13v2s in the read and mixed tests, while DS13v2 nodes slightly edged out L8s in writes for larger payloads. The mixed results showed a substantial difference in performance between the two, with L8s taking the lead thanks to local storage providing faster disk intensive reads.

Conclusion

Based on the results from this comparison, we find that local storage offers amazing performance results for disk intensive operations such as reads. D15v2 nodes, with their large number of cores to perform CPU intensive writes and local storage to help with disk intensive reads, offer top tier performance for any production environment.

Furthermore, L8s with local storage offer a cost efficient solution at around ¾ of the price of D13v2s, with better price-performance notably in read operations. This is especially beneficial for a production environment that prioritizes reads over writes.

To assist customers in upgrading their Cassandra clusters from currently used VM types to D15v2 or L8 nodes, Instaclustr’s technical operations team has built several tried and tested node replacement strategies to provide zero-downtime, non-disruptive migrations for our customers. Reach out to our support team if you are interested in these new instance types for your Cassandra cluster.

You can access pricing details through the Instaclustr Console when you log in, or contact our Support team.

Cassandra and Kubernetes: SIG Update and Survey

Five operators for Apache Cassandra have been created that make it easier to run containerized Cassandra on Kubernetes. Recently the major contributors to these operators came together to discuss the creation of a single community-based operator that makes it easy to run C* on K8s. One of the project’s organizational goals is that the end result will eventually become part of the Apache Software Foundation or the Apache Cassandra project.

The community created a special interest group (SIG) to set goals for what the operator should do at different levels to find a path for creating a standard community-based operator. The Operator Framework suggests five maturity levels for operator capabilities starting from basic installation to auto-pilot.

Operator Capability Maturity Levels

(Source: OperatorFramework.io)

The five Cassandra Kubernetes operators all come from different backgrounds, so the first major goal is to develop a common understanding as to what an operator needs to do and at which level. This first step involves collaborating on a Custom Resource Definition (CRD) that will set the syntax / schema which will be used to create Cassandra clusters on Kubernetes. Once this is done, a software extension can be developed in a variety of languages including Go, Java, or using the Operator SDK in Helm or Ansible without making changes to Kubernetes.

We’re not starting from zero, as the creators of the five operators are actively participating in the SIG, and hopefully much of the agreed-upon CRD can be implemented with code leveraged from the existing projects. The major operators publicly available today are those by Sky UK, Orange Telecom, Instaclustr, Elassandra, and DataStax (list sourced from the awesome-cassandra project):

  • CassKop - Cassandra Kubernetes Operator - This Kubernetes operator by Orange automates Cassandra operations such as deploying a new rack-aware cluster, adding/removing nodes, configuring C* and JVM parameters, and upgrading JVM and C* versions. Written in Go. This was also one of the first operators released and is the only one that can support multiple Kubernetes clusters using Multi-CassKop.
  • Cassandra Operator - A Kubernetes operator by SkyUK that manages Cassandra clusters inside Kubernetes. Well designed and organized. This was among the first operators to be released.
  • Instaclustr - Kubernetes Operator for Cassandra - The Cassandra operator by Instaclustr manages Cassandra clusters deployed to Kubernetes and automates tasks related to operating a Cassandra cluster.
  • Cass Operator - DataStax’s Kubernetes Operator supports Apache Cassandra as well as DSE containers on Kubernetes. Cassandra configuration is managed directly in the CRD, and Cassandra nodes are managed via a RESTful management API.
  • Elassandra Operator - The Elassandra Kubernetes Operator automates the deployment and management of Elassandra clusters deployed in multiple Kubernetes clusters.

If you’re interested in catching up on what the SIG has been talking about, you can watch the YouTube videos of the sessions and read up on the working documents.

As with any Kubernetes operator, the goal is to create a robot that makes the manual work of setting up complex configurations of containers in Kubernetes easier. An operator can also be seen as a translator between the logical concepts of the software and the concrete Kubernetes resources such as nodes, pods, and services. Combined with controllers, operators can abstract out operations such that the human operators can focus on problems related to their industry or domain. As mentioned above, the different operator capability levels offer a roadmap to creating a robust operator for Cassandra users that is easy to use, set up, maintain, and upgrade, and that can expand a cluster.

When a platform needs Cassandra, it’s probably exhausted the other potential datastores available because it needs high availability and fault tolerance, at high speeds, around the world. Kubernetes is a technology that can match well with Cassandra’s capabilities because it shares the features of being linearly scalable, vendor neutral, and cloud agnostic. There is a healthy debate about whether Cassandra belongs in Kubernetes — and whether databases belong in Kubernetes at all — because other orchestration tools are good enough, though the growing user base of Kubernetes in hobby and commercial realms suggests that we need to provide an operator that can keep up with the demand.

Most likely if someone is thinking about moving Cassandra workloads from public cloud, on-premises VMs, or even on-premises bare metal servers to either a public or private cloud hosted K8s, they’ll want to evaluate whether or not the existing architecture could run and be performant.

As part of the SIG, we’re also coming up with reference architectures on which to test the operator. Here are some of the common and most basic reference architectures that are likely candidates.

  • Single Workload in Single Region
    • 1 DC in 1 region, with 3 nodes (3 total)
    • DC expands to 6 (6 total)
    • DC contracts to 3 (3 total)

Single Workload / Datacenter in a Single Region

  • Multi-Workload in Single Region
    • 2 DCs, both in the same region, with 3 nodes in each DC (6 total)
    • Both DCs expand to 6 each (12 total)
    • Both DCs contract to 3 each ( 6 total)
    • Add a third DC in the same region with 3 nodes (9 nodes)
    • Remove third DC

Multiple Workloads / Datacenters in a Single Region

  • Single Workload in Multi-Regions
    • 2 DCs, 1 in each region, with 3 nodes in each DC (6 total)
    • Both DCs expand to 6 each (12 total)
    • Both DCs contract to 3 each ( 6 total)
    • Add a third DC in a 3rd region with 3 nodes (9 total)
    • Remove third DC

Although each organization is different, these scenarios, or combinations of them, account for roughly 80% of pure Apache Cassandra use cases. The SIG would love to know more about Cassandra users’ use cases for Kubernetes. Please take this short survey, which will remain open through September 17, 2020.

Join the biweekly meetings to stay informed.

Apache Cassandra 4.0 Beta Released

The beta release of Apache Cassandra 4.0 is finally here; it has been two years in the making. We’ve had a preview release available to customers since March for testing, and a wide range of improvements have been made.

Stability

The explicit goal of this release has been to be “the most stable major release ever” in order to accelerate adoption within the release cycle, which I blogged about in January. For this release a series of new testing frameworks focusing on stability and performance were implemented, and they have paid off handsomely. The feeling of the team at Instaclustr is that we have never been more confident about a release (and we’ve seen quite a few!).

Performance

This release integrates Netty’s async, event-driven networking code for communication between nodes. I blogged about Netty in February but it’s worth reiterating what has been achieved with this upgrade. It has enabled Cassandra 4.0 to use a single thread pool for all connections to other nodes instead of maintaining N threads per peer, which was cramping performance by causing lots of context switching. It has also facilitated zero copy streaming for SSTables, which is now up to 5x faster than before.

This complete overhaul of the networking infrastructure has delivered some serious gains.

  • Tail latency (P99) has been reduced by 40%+ in initial testing
  • Node recovery time has been vastly reduced
  • Scaling large clusters is easier and faster

Auditing and Observability

Cassandra 4.0 introduces a powerful new set of enterprise-class audit capabilities that I covered here in March. These help Cassandra operators meet their SOX and PCI requirements with a robust, high level interface. Audit logs are saved on the node, outside of the database, with configurable log rollover. Audit logging can be configured to attend to particular keyspaces, commands, or users, and the logs can be inspected with the auditlogviewer utility.

Full Query Logging is also supported and the fqltool allows inspection of these logs.

Virtual Tables

Virtual tables, which I covered here in February, enable a series of metrics to be pulled from a node via CQL from read-only tables. This is a more elegant mechanism than JMX access as it avoids the additional configuration required. JMX access is not going anywhere soon, but this presents a really solid improvement to a number of metric monitoring tasks.
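
For example, virtual tables live under the system_views keyspace in Cassandra 4.0 and are queried with plain CQL (table names as shipped in 4.0; exact columns may vary by version):

SELECT * FROM system_views.clients;             -- currently connected client sessions
SELECT * FROM system_views.thread_pools;        -- thread pool metrics for this node
SELECT name, value FROM system_views.settings;  -- effective configuration settings

Note that virtual tables are node-local: the results describe the node the query is executed on.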

Community

Our community is our most powerful feature in all our releases and I can’t think of a better validation of open source under the Apache Foundation community model than this release. I just want to take this opportunity to congratulate and thank everyone in the community who have taken both Cassandra and its release processes to the next level with this beta release. 

As always, you can spin up a free trial of Cassandra on our platform. Even with the performance gains delivered in this release our popular Cassandra Data Modeling Guide to Best Practices is always worth a read to get the most out of Cassandra.

Introducing Apache Cassandra 4.0 Beta: Battle Tested From Day One

This is the most stable Apache Cassandra in history; you should start using Apache Cassandra 4.0 Beta today in your test and QA environments. Head to the downloads site to get your hands on it. The Cassandra community is on a mission to deliver a 4.0 GA release that is ready to be deployed to production. You can help guarantee this holds true by running your application workloads against the Beta release and contributing to the community’s validation effort to get Cassandra 4.0 to GA.

With over 1,000 bug fixes, improvements, and new features; a wholehearted project focus on quality, with replay, fuzz, property-based, fault-injection, and performance tests on clusters as large as 1,000 nodes; and hundreds of real world use cases and schemas tested, Cassandra 4.0 redefines what users should expect from any open or closed source database. With software, hardware, and QA testing donations from the likes of Instaclustr, iland, Amazon, and DataStax, this release has seen an unprecedented cross-industry collaboration towards releasing a battle-tested database with enterprise security features and an understanding of what it takes to deliver scale in the cloud.

There will be no new features or breaking API changes in future Beta or GA builds. You can expect the time you put into the beta to translate into transitioning your production workloads to 4.0 in the near future.

Quality in distributed infrastructure software takes time and this release is no exception. Open source projects are only as strong as the community of people that build and use them, so your feedback is a critical part of making this the best release in project history; share your thoughts on the user or dev mailing lists or in the #cassandra ASF slack channel.

Redefining the elasticity you should expect from your distributed systems with Zero Copy Streaming

5x faster scaling operations

Cassandra streams data between nodes during scaling operations such as adding a new node or datacenter during peak traffic times. Thanks to the new Zero Copy Streaming functionality in 4.0, this critical operation is now up to 5x faster without vnodes compared to previous versions, which means a more elastic architecture particularly in cloud and Kubernetes environments.

Globally distributed systems have unique consistency caveats and Cassandra keeps the data replicas in sync through a process called repair. Many of the fundamentals of the algorithm for incremental repair were rewritten to harden and optimize incremental repair for a faster and less resource intensive operation to maintain consistency across data replicas.

Giving you visibility and control over what’s happening in your cluster with real time Audit Logging and Traffic Replay

Enterprise-grade security & observability

To ensure regulatory and security compliance with SOX, PCI or GDPR, it’s critical to understand who is accessing data and when they are accessing it. Cassandra 4.0 delivers a long awaited audit logging feature for operators to track the DML, DDL, and DCL activity with minimal impact to normal workload performance. Built on the same underlying implementation, there is also a new fqltool that allows the capture and replay of production workloads for analysis.

There are new controls to enable use cases that require data access on a per data center basis. For example, if you have a data center in the United States and a data center in Europe, you can now configure a Cassandra role to only have access to a single data center using the new CassandraNetworkAuthorizer.
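
A hedged CQL sketch of what that looks like (the role and data center names are made up, and this assumes network authorization is enabled in cassandra.yaml via network_authorizer: CassandraNetworkAuthorizer):

CREATE ROLE eu_analyst
  WITH PASSWORD = 'example-password'
   AND LOGIN = true
   AND ACCESS TO DATACENTERS {'eu-west'};

-- Later, the role can be widened to every data center:
ALTER ROLE eu_analyst WITH ACCESS TO ALL DATACENTERS;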

For years, the primary way to observe Cassandra clusters has been through JMX and open source tools such as Instaclustr’s Cassandra Exporter and DataStax’s Metrics Collector. In this most recent version of Cassandra you can selectively expose system metrics or configuration settings via Virtual Tables that are consumed like any other Cassandra table. This delivers flexibility for operators to ensure that they have the signals in place to keep their deployments healthy.

Looking to the future with Java 11 support and ZGC

One of the most exciting features of Java 11 is the new Z Garbage Collector (ZGC), which aims to reduce GC pause times to a maximum of a few milliseconds with no latency degradation as heap sizes increase. These improvements can significantly improve a cluster’s node availability profile by reducing garbage collection pauses, which is why Java 11 support has been included in the Cassandra 4.0 release. The feature is still experimental, however, and thorough testing should be performed before deploying it to production.

Part of a vibrant and healthy ecosystem

The third-party ecosystem has their eyes on this release and a number of utilities have already added support for Cassandra 4.0. These include the client driver libraries, Spring Boot and Spring Data, Quarkus, the DataStax Kafka Connector and Bulk Loader, The Last Pickle’s Cassandra Reaper tool for managing repairs, Medusa for handling backup and restore, the Spark Cassandra Connector, The Definitive Guide for Apache Cassandra, and the list goes on.

Get started today

There’s no doubt that open source drives innovation and the Cassandra 4.0 Beta exemplifies the value in a community of contributors that run Cassandra in some of the largest deployments in the world.

To put it in perspective, if you use a website or a smartphone today, you’re probably touching a Cassandra-backed system.

To download the Beta, head to the Apache Cassandra downloads site.

Resources:

Apache Cassandra Blog: Even Higher Availability with 5x Faster Streaming in Cassandra 4.0

The Last Pickle Blog: Incremental Repair Improvements in Cassandra 4

Apache Cassandra Blog: Audit Logging in Apache Cassandra 4.0

The Last Pickle Blog: Cassandra 4.0 Data Center Security Enhancements

The Last Pickle Blog: Virtual tables are coming in Cassandra 4.0

The Last Pickle Blog: Java 11 Support in Apache Cassandra 4.0

Apache Cassandra Infographic

Cassandra Data Modeling Best Practices Guide

Apache Cassandra is an open source non-relational, or NoSQL, distributed database that enables continuous availability, tremendous scale, and data distribution across multiple data centers and cloud availability zones. Simply put, it provides a highly reliable data storage engine for applications requiring immense scale.

Data modeling is a process used to analyze, organize, and understand the data requirements for a product or service. Data modeling creates the structure your data will live in. It defines how things are labeled and organized, and determines how your data can and will be used. The process of data modeling is similar to designing a house. You start with a conceptual model and add detail to produce the final blueprint. 

The ultimate goal of Cassandra data modeling and analysis is to develop a complete, well organized, and high performance Cassandra cluster. Following the five Cassandra data modeling best practices outlined below will hopefully help you meet that goal:

  1. Cassandra is not a relational database, don’t try to model it like one
  2. Design your model to meet 3 fundamental goals for data distribution
  3. Understand the importance of the Primary Key in the overall data structure 
  4. Model around your queries but don’t forget about your data
  5. Follow a six step structured approach to building your model. 

Cassandra Is Not a Relational Database

Do not try to design a Cassandra data model like you would with a relational database. 

Query first design: You must define how you plan to access the data tables at the beginning of the data modeling process not towards the end. 

No joins or derived tables: Tables cannot be joined so if you need data from more than one table, the tables must be merged into a denormalized table.   

Denormalization: Cassandra does not support joins or derived tables so denormalization is a key practice in Cassandra table design.

Designing for optimal storage: For relational databases this is usually transparent to the designer. With Cassandra, an important goal of the design is to optimize how data is distributed around the cluster. 

Sorting is a Design Decision: In Cassandra, sorting can be done only on the clustering columns specified in the PRIMARY KEY.

The Fundamental Goals of the Cassandra Data Model

Distributed data systems, such as Cassandra, distribute incoming data into chunks called partitions. Cassandra groups data into distinct partitions by hashing a data attribute called the partition key and distributes these partitions among the nodes in the cluster.

(A detailed explanation can be found in Cassandra Data Partitioning.)

A good Cassandra data model is one that: 

  1. Distributes data evenly across the nodes in the cluster
  2. Places limits on the size of a partition
  3. Minimizes the number of partitions returned by a query.

Distributes Data Evenly Around the Cassandra Cluster

Choose a partition key that has a high cardinality to avoid hot spots—a situation where one or a few nodes are under heavy load while others are idle.   

Limits the Size of Partitions

For performance reasons choose partition keys whose number of possible values is bounded. For optimal performance, keep the size of a partition between 10 and 100MB. 

Minimize the Number of Partitions Read by a Single Query 

Ideally, each of your queries will read a single partition. Reading many partitions at one time is expensive because each partition may reside on a different node. The coordinator (this is the node in the cluster that first receives the request) will generally need to issue separate commands to separate nodes for each partition you request. This adds overhead and increases the variation in latency. Unless the data set is small, attempting to read an entire table, that is all the partitions, fails due to a read timeout. 

Understand the Importance of the Primary Key

Every table in Cassandra must have a set of columns called the primary key. (In older versions of Cassandra, tables were called column families.) In addition to determining the uniqueness of a row, the primary key also shapes the data structure of a table. The Cassandra primary key has two parts:

Partition key: The first column or set of columns in the primary key. This is required. The hashed value of the partition key value determines where the partition will reside within the cluster.

Clustering key (aka clustering columns): The columns after the partition key. The clustering key is optional and determines the default sort order of rows within a partition.

A very important part of the design process is to make sure a partition key will: 

  1. Distribute data evenly across all nodes in a cluster.  Avoid using keys that have a very small domain of possible values, such as gender, status, school grades, and the like.  The minimum number of possible values should always be greater  than the number of nodes in the cluster.  Also, avoid using keys where the distribution of possible values is highly skewed. Using such a key will create “hotspots” on the cluster. 
  2. Have a bounded range of values. Large partitions can increase read latency and cause stress on a node during a background process called compaction. Try to keep the size of partitions under 100MB. 
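
A short CQL sketch of the primary key structure described above (all names are illustrative). The double parentheses declare a composite partition key, which also bounds partition size by bucketing each sensor’s data per day; the column after it is a clustering column:

CREATE TABLE readings_by_sensor_day (
    sensor_id    uuid,
    reading_date date,        -- second partition key column: one partition per sensor per day
    reading_time timestamp,   -- clustering column: default sort order within the partition
    value        double,
    PRIMARY KEY ((sensor_id, reading_date), reading_time)
);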

Model Around Your Queries

The Cassandra Query Language (CQL) is the primary language used to communicate with a Cassandra database. In syntax and function, CQL resembles SQL which makes it easy for those who know the latter to quickly learn how to write queries for Cassandra. But there are some important differences that affect your design choices. 

A well known one is that Cassandra does not support joins or derived tables. Whenever you require data from two or more tables, you must denormalize. 

Search conditions have restrictions that also impact the design; they are illustrated in the CQL sketch that follows the list below.

  • Only primary key columns can be used as query predicates. (Note: a predicate is an operation on expressions that evaluates to TRUE or FALSE.)
  • Partition key columns are limited to equality searches. Range searches can only be done on clustering columns.
  • If there are multiple partition key columns (i.e. a composite partition key), all partition columns must be included in the search condition.
  • Not all clustering columns need to be included in the search condition. But there are some restrictions: 
    • When omiting columns you must start with the rightmost column listed in the primary key definition;  
    • An equality search cannot follow a range search.
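
A sketch of how these restrictions play out against the hypothetical orders_by_customer table above (the literal values are illustrative only):

-- Valid: equality on the full partition key, range on the first clustering column
SELECT * FROM orders_by_customer
 WHERE customer_id = 123e4567-e89b-12d3-a456-426614174000
   AND order_time >= '2020-01-01';

-- Invalid: range predicate on a partition key column
-- SELECT * FROM orders_by_customer
--  WHERE customer_id > 123e4567-e89b-12d3-a456-426614174000;

-- Invalid: skips the clustering column order_time, so order_id cannot be restricted
-- SELECT * FROM orders_by_customer
--  WHERE customer_id = 123e4567-e89b-12d3-a456-426614174000
--    AND order_id = 246e4567-e89b-12d3-a456-426614174000;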

Don’t Forget About the Data

Creating a complete Cassandra data model involves more than knowing your queries. You can identify all the queries correctly, but if you miss some data, your model will not be complete. Attempting to refactor a mature Cassandra data model can be an arduous task.

Developing a good conceptual model (see below) will help identify the data your application needs. 

Take a Structured Approach to Design 

In order to create a data model that is complete and high performing, it helps to follow a big data modeling methodology for Apache Cassandra that can be summarized as: 

  1. Data Discovery (DD). This is a high level view of the data your application needs and identifies the entities (things), the attributes of the entities, and which attributes are the identifiers. This may be an iterative process as development proceeds. 
  2. Identify the Access Patterns (AP). Identify and list the queries your application will want to perform. You need to answer: What data needs to be retrieved together, what are the search criteria, and what are the update patterns? This also may be an iterative process. 
  3. Map data and queries (MDQ). Map the queries to the data identified in steps 1 and 2 to create logical tables, which are high level representations of Cassandra tables.
  4. Create the physical tables (PT). Convert the logical data model to a physical data model (PDM) by using CQL CREATE TABLE statements. 
  5. Review and refine the physical data model. Confirm that the physical tables will meet the three basic goals for a Cassandra data model.

Structured approach to Cassandra data model design

A more detailed examination of these steps can be found in an earlier Instaclustr whitepaper: 6 Step Guide to Apache Cassandra Data Modelling.

If you have worked with relational database design, some steps will be familiar because they are also in the entity-relationship (ER) model.  At the conceptual stage, it can be useful to visually represent the data model by ER diagrams using either the Chen or Information Engineering (IE) notation. The Chebotko diagram uses a notation developed by Artem Chebotko to represent data and queries at the logical and physical modeling stages. 

Cassandra Model Example

Let’s assume that we have a simple logging system with two entities: LogSource and LogMessage.  For LogSource the key attribute is sourceName.  For the entity LogMessage, the key attribute is messageID.  

The query we want to execute is:  Q1) show the message information about the 10 most recent messages for a given source. 

The primary access entity is LogSource because it contains the equality search attribute (sourceName).  We create a logical table named LogMessage_by_Source and push the attribute sourceName into it. That becomes the partition key (indicated by the K).

We need to sort by time so messageTime becomes the clustering column in  LogMessage_by_Source.  (Indicated by C↑) 

The secondary entity is LogMessage. The key attribute messageID becomes a second clustering column of the primary key in LogMessage_by_Source to ensure uniqueness of the row. Finally, we add the remaining columns from the secondary entity to complete the data needed by the query. 

An example of Cassandra data model
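
One possible CQL rendering of this logical table is sketched below; the column names are assumptions based on the entity attributes, and the clustering order is reversed (descending) so that the 10 most recent messages can be fetched with a simple LIMIT:

CREATE TABLE logmessage_by_source (
    source_name  text,       -- K: partition key (equality search attribute)
    message_time timestamp,  -- C: clustering column for time ordering
    message_id   uuid,       -- C: second clustering column for row uniqueness
    message_body text,       -- remaining attributes needed by the query
    PRIMARY KEY ((source_name), message_time, message_id)
) WITH CLUSTERING ORDER BY (message_time DESC, message_id ASC);

-- Q1: show the 10 most recent messages for a given source
SELECT * FROM logmessage_by_source
 WHERE source_name = 'appserver-01'
 LIMIT 10;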

Data Duplication 

Data duplication refers to the number of times data must be duplicated in different tables to satisfy access patterns.   For example, if  we wanted to search for a specific message by its  unique identifier we would duplicate the data by creating a new table called LogMessage_by_ID that uses  messageID as the partition key.

Two issues can arise from duplication: 

  • Increased complexity to maintain  data integrity across multiple tables; 
  • If the data being duplicated is very large it puts size and write pressure on the database.

In a case where data duplication would cause more problems than it solves, an alternative is to duplicate only lookup keys, that is, a lookup table. However, this solution requires the client to perform a second query to read the secondary data. The trade-off between read performance and data maintenance cost needs to be judged in the context of the specific performance requirements of your application, and such a solution would need to be benchmarked to ensure that it is workable.
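
The two options can be sketched in CQL as follows (table and column names continue the hypothetical logging example; neither is prescribed by the guide):

-- Full duplication: a second table partitioned by messageID, carrying all the message data
CREATE TABLE logmessage_by_id (
    message_id   uuid,
    source_name  text,
    message_time timestamp,
    message_body text,
    PRIMARY KEY (message_id)
);

-- Lookup-table alternative: duplicate only the keys; the client then issues a
-- second query against logmessage_by_source to fetch the message itself
CREATE TABLE logmessage_id_lookup (
    message_id   uuid,
    source_name  text,
    message_time timestamp,
    PRIMARY KEY (message_id)
);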

Materialized Views

These are objects created by a query which act as copies of a base table but with a different partition key. The data between the materialized view and the base table is automatically synchronized by Cassandra. Their purpose is to make modeling for new query patterns easier and more flexible.

Instaclustr’s advice is not to use them in Cassandra 3.x because of problems in keeping the view and the base table synchronized. The Apache Cassandra project has classified Materialized Views as an experimental feature for Cassandra 3.x. 
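
For reference only, given the advice above, a materialized view over the hypothetical logmessage_by_source table might be declared as in the sketch below; note that a view's primary key must include every primary key column of the base table, each restricted with IS NOT NULL:

CREATE MATERIALIZED VIEW logmessage_by_id_mv AS
    SELECT * FROM logmessage_by_source
    WHERE message_id IS NOT NULL
      AND source_name IS NOT NULL
      AND message_time IS NOT NULL
    PRIMARY KEY (message_id, source_name, message_time);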

Summary

Cassandra Data modeling is a process used to define and analyze data requirements and access patterns on the data needed to support a business process. 

A data model helps define the problem, enabling you to consider different approaches and choose the best one.  It ensures that all necessary data is captured and stored efficiently. 

Models document important concepts and jargon, providing a basis for long-term maintenance.

Cassandra is a non-relational database: do not design for it as you would for a relational database. Don’t be afraid to denormalize data. Writes in Cassandra are relatively cheap compared to relational databases.

The goals of a successful Cassandra data model are to choose a partition key that (1) distributes data evenly across the nodes in the cluster; (2) minimizes the number of partitions read by one query; and (3) bounds the size of a partition.

Take a structured approach to your model. Your first steps are understanding your data and identifying the access patterns on that data. These are the most critical to developing a complete model.

Contact us to get expert advice on managing and deploying Apache Cassandra.


Cassandra Monitoring: A Best Practice Guide

Introduction to Cassandra Monitoring

Apache Cassandra is a NoSQL database designed to provide scalability, reliability, and availability with linear performance scaling. Cassandra database is designed as a distributed system, and aims to handle big data efficiently. Refer to what-is-apache-cassandra and cassandra-architecture for more information. Note that knowledge of Cassandra architecture and basic terminology is a prerequisite to understanding Cassandra monitoring. 

Cassandra monitoring is an essential area of database operations to ensure the good health of a cluster and optimal performance. Alerting is another crucial area for production systems, and it is complementary to monitoring. Good alerting in Cassandra can be achieved by utilization of the monitoring infrastructure and relevant toolset. Alerting and monitoring help create a robust environment for any Cassandra deployment.

This blog post aims to touch on all the important aspects of Cassandra monitoring. We hope it provides the reader with crucial information about monitoring tools, components, and metrics.

Monitoring Terminologies

JVM Based Monitoring

Cassandra is developed in Java and is a JVM based system. Each Cassandra node runs a single Cassandra process. JVM based systems are enabled with JMX (Java Management Extensions) for monitoring and management. Cassandra exposes various metrics using MBeans which can be accessed through JMX. Cassandra monitoring tools are configured to scrape the metrics through JMX and then filter, aggregate, and render the metrics in the desired format. There are a few performance limitations in the JMX monitoring method, which are referred to later. 

The metrics management in Cassandra is performed using the Dropwizard library. The metrics are collected per node in Cassandra. However, those can be aggregated by the monitoring system. 

Cassandra Monitoring

Metrics 

There are a large number of metrics exposed by Cassandra to cover all possible areas including performance, resources, communication, node, and cluster state etc. The metrics are defined with distinct types, and those can be categorized as well for operational ease.    

Metrics Types

Cassandra metrics are defined with specific data types. These types are designed to represent metrics like latency, counts, and others correctly. 

The metric types are not intuitive, and you might need some time to get familiar with them. 

  • Gauge: A single value representing a metric at a specific point in time, e.g. the value of memory allocated or the number of active tasks. 
  • Counter: Counters are similar to a gauge but are used for value comparisons. Generally, a counter is only incremented, and it is reset when the functionality gets disrupted, e.g. by a node restart. An example is the cache hit count.
  • Histogram: A histogram is a count of data elements from a data stream grouped in fixed intervals, giving a statistical distribution of values. The data elements are provided over min, max, mean, median, 75th, 90th, 95th, 98th, 99th, and 99.9th percentile intervals. 
  • Timer: A timer keeps the rate of execution and a histogram of duration for a metric. 
  • Latency: This is a special type to measure latency. It includes a Timer, and the latency is in microseconds. There is also a TotalLatency with each latency metric; the total latency is the count of latency since the beginning, i.e. since the node started. 
  • Meter: A meter is a unit to measure throughput. It also includes weighted moving averages for the first, fifth, and fifteenth minute.

Metrics Categories

The metrics are categorised based on Cassandra domains, e.g. table, keyspace, storage, communication, JVM etc. Not all metrics should be monitored all the time, but those should be available in case required, i.e. during troubleshooting or application performance testing. 

The metrics are further subdivided in terms of broader areas like resources, network, internals, crucial data elements etc. Metrics can be represented as per topology levels like cluster level, node level, table level etc. to organize all the information. 

The categorization becomes clear as we go through specific metrics and correlate those with specific Cassandra areas.

Metrics Format

The Cassandra dropwizard metrics are specified in format below:

Dropwizard Metric Name: org.apache.cassandra.metrics.<Metric scope>.<Metric type>.<MetricName>

MBean: org.apache.cassandra.metrics:type=<Metric type>,scope=<Metric scope>,name=<MetricName>

Metric Type: This is the category of metrics, e.g. table, keyspace, threadpool. Do not confuse this with the data type of the metric.

Metric scope: This is the metric sub-type for more granularity wherever required, e.g. the table name or keyspace name. The scope is hence optional. 

Metric name: The final metric name like LiveSSTableCount. 

Essential Metrics

Cassandra Metrics

Node Status 

The status of nodes must be monitored and alerted immediately if a node is down. Cassandra cluster’s availability directly depends on the uptime of all the nodes in the cluster. Although the anti-entropy mechanism in Cassandra helps protect data from inconsistency, there is no replacement for lost performance during a node downtime. A down node puts pressure on other nodes in the data center to handle requests and store hints. Hence, downtime for a node should be minimum. 

Cassandra operational activity requires node restarts or downtime, but those can be scheduled at the least busy times for the cluster. This alert helps keep track of any service disruption and the need to run repair on a node. A node should be repaired if it is out of the cluster for more than the hinted handoff window, which is three hours by default. 

Client Request Metrics

The client request metrics provide information about client communication in the form of read and write requests per second between the client and a coordinator node. Other than normal read and write requests, there are special types of read and write operations (CAS, View, and RangeSlice) which have their own set of metrics. These metrics help to track the request count, latency, failures, and a few other statistics. The basic statistics to monitor are the number of requests per second, i.e. throughput, and request latency.

Requests Per Second

The number of requests should be aggregated per data center and per node. There could be some nodes receiving more requests as compared to other nodes. This behaviour creates extra pressure for the nodes receiving more requests. The specific requests like CAS and RangeSlice should be tracked separately for clarity. These operations are resource-intensive and have a unique effect on the nodes. The ratio of read requests to write requests is crucial to understand the type of workload. There are specific configurations to optimize a read-heavy or a write-heavy workload. 

Each cluster can handle a certain number of client requests per second efficiently. If the number of requests exceeds the cluster capacity, it can result in undesirable outcomes like dropped messages, inconsistency, increased latency etc. CAS and RangeSlice requests in particular can cause increased latency. 

Uneven load on a few nodes can be handled with optimal load balancing at the driver side. The read and write latency or throughput issues caused by constant overloading should be addressed by adding more nodes to the data center and revisiting the data model if required.

Alerting: Set alerts on request-count thresholds per node and per data center. 

Client Request Latency

Latency tracked by these metrics is the read and write latency experienced by client applications. There are various percentiles of latency, as mentioned in the latency metric type. These should be tracked individually as well as in aggregate so that there is a clear view of system performance. Production systems generally have latency SLAs; the SLA on a specific or overall latency should be tracked, and alerts raised when client latency breaches it.

There are various factors which affect latency, including the amount of load served by a node or cluster, system resources and tuning, GC settings and behaviour, and the type of requests. Troubleshooting latency issues mainly depends on accurate investigation of the root cause. Correlating latency metrics with other metrics helps to track down root causes. Using a graphing solution like Grafana for visualization is the most efficient way to spot and track issues.

Alerting: Set alerts for latency SLA thresholds if any or expected latency range.

Request Timeout and Failure 

These metrics are the number of client requests that timed out or failed. Failed requests are a clear indication of errors, and those should be addressed immediately. The common causes for request failure are unavailability of data, failure to get a response from the required number of replicas, data inconsistency, and network errors. Troubleshooting errors is performed using the error messages and correlation with other metrics. 

Alerting: Set alerts for more than a few failed requests on production systems.

Compaction Statistics 

This group of metrics includes the amount of data compacted, the number of active/completed compactions, and other relevant details. Compactions consume node resources and can consume disk space quickly. Monitoring compactions provides good insight into the compaction strategy used, as each strategy has a unique operational footprint. Specific Cassandra operations like repairs, high volume data writes, add/remove/replace nodes etc. increase compaction activity. It is important to monitor compactions while performing such operations. 

A common troubleshooting method for high compaction activities and high resource consumption is to throttle the compaction rate. In some scenarios, compactions can be temporarily stopped, but it requires a lot of caution and must be re-enabled at some point to keep the SSTable count low, and read latency optimal.

Alerting: Alerting is not essential for these metrics. However, alerts can be set if a higher number of pending compactions is sustained for longer than the expected time interval.

Garbage Collector Metrics

The Garbage Collector (GC) is yet another crucial area for monitoring. The efficiency of Cassandra throughput and performance depends on the effective use of JVM resources and streamlined GC. GC behaviour mainly depends on these factors: the garbage collector used, the workload served by Cassandra nodes, GC parameter settings, the heap size for the JVM etc. A common issue with garbage collection is a long GC pause, i.e. the time taken to perform a garbage collection. 

The GC works well with Cassandra’s default settings, but those can be tuned if required to suit a specific workload and the available resources. GC parameter tuning is a non-trivial task and requires knowledge of GC internals. However, sometimes GC issues can be resolved by fixing the data model, changing the workload, or changing JVM resources. It is essential to correlate bad GC behaviour with the exact root cause before applying a remedy. Also, any change in parameters impacting GC should be monitored carefully to ensure improvement. 

Alerting: Set alert on GC pauses for more than acceptable thresholds on production systems.

Memory Metrics

The memory metrics provide JVM heap, non-heap, and total memory used by Cassandra. The JVM heap storage is used heavily for a variety of purposes by Cassandra. The non-heap memory is also used a lot by later versions of Cassandra. Monitoring the heap and overall memory gives insight into memory usage. It can be used to correlate with any issues and determine memory requirements. 

Please note, Cassandra cannot scale with an indefinite amount of memory. This boils down to the fact that JVM and GC cannot perform optimally for large heap size. The most common range of heap size for Cassandra is 8GB-32GB where the smaller size is configured with CMS GC and the larger size with G1GC.

Alerting: Set alerts for specific memory usage thresholds, and to validate memory tuning changes.  

Threadpool Metrics

Cassandra works with numerous thread pools internally. This design is aimed at asynchronous task processing, and it also helps to handle backpressure. Monitoring the thread pools makes it easy to understand the internal system behaviour. It also helps to identify specific pools under pressure with active, pending, and blocked tasks. 

The solution for constantly saturated pools is generally to provide more processing capacity to the node or the cluster. Other core issues like a poor data model or query pattern also impact the thread pools. 

Alerting: Set alerts for more than a few blocked tasks on the production system. This helps take preventive action to help avoid performance impact.

Table Metrics 

Table metrics are useful in tracking each table independently. These can be used to monitor a specific set of tables which are performance-critical or host a large volume of data. There are various metrics for each table but some of the most important are discussed here:  

Partition Size

The partition size is a crucial factor in ensuring optimal performance. Cassandra uses partitions of data as a unit of data storage, retrieval, and replication. Hence, larger partitions impact overall performance. The ideal partition size is less than 10MB, with an upper limit of 100MB. These values are derived from operational experience in the Cassandra community. 

The data model and table definition control the partition size. The partition key for a table determines how data is grouped into partitions. A partition key should be designed to accumulate data only up to acceptable size limits. Unfortunately, it is not easy to replace the current partitioning of a table. But if the data model is in the design phase, it is crucial to test all the table definitions for potentially large partition sizes. For existing tables, if large partitions are a major issue, they can be addressed by a complete data rewrite. This operation can be long-running, but it can solve many performance issues, and if configured correctly, it can be performed with minimal or no downtime for the table. 

Alerting: Configure alerts on large partitions for tables with unbounded partitions. An unbounded partition is where the partition grows in size with new data insertion and does not have an upper bound.
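
A common way to bound an otherwise unbounded partition is to add a time bucket to the partition key. The sketch below uses hypothetical sensor tables (names and columns are illustrative only) to show the idea:

-- Unbounded: all readings for a sensor accumulate in one partition forever
CREATE TABLE readings_unbounded (
    sensor_id    text,
    reading_time timestamp,
    value        double,
    PRIMARY KEY ((sensor_id), reading_time)
);

-- Bounded: a day bucket in the partition key caps partition growth
CREATE TABLE readings_by_day (
    sensor_id    text,
    day          date,
    reading_time timestamp,
    value        double,
    PRIMARY KEY ((sensor_id, day), reading_time)
);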

Tombstone Scanned

Tombstones are the deletion markers in Cassandra. Tombstones are produced by data deletion, and it could be performed using various means like delete queries, TTL expiry, null inserts etc. The immutable design of SSTables and compaction operations makes tombstone eviction difficult in some scenarios. Tombstone presence directly impacts read performance; its effect increases with the number of tombstones scanned per operation. This metric provides a histogram of tombstones read for a table’s queries in recent time. 
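
For illustration, each of the operations below writes tombstones against the hypothetical readings_by_day table from the previous sketch:

-- Explicit delete: writes a tombstone for the row
DELETE FROM readings_by_day
 WHERE sensor_id = 'sensor-1' AND day = '2020-06-01'
   AND reading_time = '2020-06-01 10:00:00';

-- TTL expiry: the inserted cells become tombstones once the TTL elapses
INSERT INTO readings_by_day (sensor_id, day, reading_time, value)
VALUES ('sensor-1', '2020-06-01', '2020-06-01 10:05:00', 21.5)
USING TTL 86400;

-- Null insert: explicitly writing null also creates a tombstone
INSERT INTO readings_by_day (sensor_id, day, reading_time, value)
VALUES ('sensor-1', '2020-06-01', '2020-06-01 10:10:00', null);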

The troubleshooting for tombstone eviction can be performed using various options like revisiting the compaction strategy, major compaction, nodetool garbagecollect etc. Note that all the mentioned remedies for tombstone eviction could operate on a large set of SSTables and are non-trivial operations. The operations must be well tested before executing on production. 

Alerting: Set alerts for tombstones-scanned per read metrics for performance-sensitive tables. 

SSTable Per Read

These metrics are related to the immutable design of SSTables and read operation. The SSTables are created per table, and the data is arranged sequentially in the order it is written. This results in multiple SSTable reads to complete a single read operation. The number of SSTables read contributes to the time consumed to complete the read operation. Hence, the number of SSTables per read should be minimized. 

A good number of SSTables per read is a relative value and depends on the data volume and compaction strategy. However, as a general rule, those should be less than 10. The compaction strategy used for a table plays a crucial role in this metric. A table should be configured with optimum compaction strategy as per the table usage. Repair operation plays a role in keeping the SSTables consistent and hence also indirectly impacts this metric. All the data in Cassandra should ideally be repaired once per gc_grace_seconds cycle. 

Alerting: Set alerts for all the read performance-sensitive and high data volume tables for SSTables per read. 

Additional Metrics

It is difficult to cover all the metrics present in Cassandra in this blog post, and it is also difficult to predict the most useful ones in general. I have tried to cover the most used metrics individually. But there are still some crucial metrics which are useful for getting insight in specific Cassandra areas. Let’s look at those briefly:

Dropped Messages

Cassandra handles many forms of messages corresponding to various functions. These messages can get dropped mostly due to load or communication error etc. The dropping of messages causes data inconsistency between nodes, and if those are frequent, it can cause performance issues. It is necessary to identify the cause of dropped messages. If those occur frequently or if those are in large numbers, the system resources and data model should be revisited. Alerts should be set for an unexpected occurrence or number of dropped messages. 

Caches For Tables

Cassandra uses several caches, and those are configurable. The cache metrics are useful to track effective use of a particular cache. A good example is the use of the row cache for frequently accessed rows in a table. If caching hot data in the row cache improves the cache hit rate, it is a successful use of the row cache. 

Data Streaming

Streaming is used while bootstrapping new nodes, during repair operations, and during some other cluster operations. Streaming operations can move a lot of data across a cluster and hence consume network bandwidth. The streaming metrics are useful for monitoring node activities and repairs when planned. The streaming rate can be throttled if required to spare bandwidth for other operations.

Hinted Handoff 

Hints are a part of the anti-entropy mechanism, and they try to protect nodes from data loss while those nodes are offline. Hints are stored and transferred, so metrics related to these attributes and to delivery success, failure, delays, and timeouts are exposed. 

The hints metrics are useful to monitor all hint activities. A large number of hints stored and replayed indicates nodes going offline, while hint delays and failures indicate network or other communication issues.

CQL and Batch 

CQL metrics include the number of statements executed of each type. The batch metrics include the number of batch statements executed. These metrics help to monitor the application activity and the query semantics used. The use of logged and unlogged batches has its caveats in Cassandra, and they can cause a performance penalty if not used correctly. 

System Metrics

These metrics are not exported by Cassandra but are obtained from the OS. They are just as important as the Cassandra metrics for obtaining system insights. 

Disk Usage

Disk usage needs monitoring because Cassandra is optimized to write a lot of data quickly. The real risk of a disk filling up comes from compactions. The default compaction strategy used by Cassandra is SizeTieredCompactionStrategy (STCS). This strategy merges many SSTables and outputs a single SSTable. The resulting SSTable can have a size equal to the combined size of all the SSTables merged into it. Also, until a compaction operation ends, both the old and new SSTables exist on disk. 

The disk space guideline for a cluster where most tables use STCS is to utilise the disk space up to 50% and to leave the rest as room for compactions. Generally, disk space is cheaper than other resources, and there is no harm in keeping vacant space on nodes. However, if there is limited disk space available, disk monitoring becomes even more crucial, as the free disk left for compactions may be lower than the general guideline. 

Remedies for high disk usage include snapshot deletion, as snapshots can consume a considerable amount of space. Another method is to stop a specific compaction operation; this frees the space consumed by the new SSTables. The time until the compaction starts again can be utilized to add more space. 

Alerting: Set alerts for various stages of disk usage. The alerts can be categorized for severity based on the amount of free disk space on a node. 

CPU Usage

CPU is the main processing capacity of a Cassandra cluster. The number of requests served by a node and the amount of data stored are the factors directly proportional to CPU utilization. CPU utilization should be monitored to ensure the nodes are not overloaded. 

A Cassandra cluster or a single data center should have all the nodes of similar size. Those should have an equal number of CPU cores, and the CPU utilization should also be equivalent. A single node or a few nodes with high CPU is an indication of uneven load or request processing across the nodes. It is observed that Cassandra is not CPU bound in most cases. However, a cluster or data center with high CPU utilization at most times should be considered for node size upgrade. 

Alerting: Set alerts for specific levels of CPU utilization on nodes or just for a single threshold. The levels can be defined as per expected CPU load, e.g. 80%, 90%, >95% etc. 

Monitoring tools

There are various tools available to set up Cassandra monitoring. I am describing here a few popular open-source tools used widely across the Cassandra community.

Prometheus

Prometheus is a metrics tool used for time-series based monitoring. It has alerting capability as well, which works on the time-series metrics. Prometheus can be configured to collect Cassandra metrics from nodes as well as the system metrics of the nodes. Prometheus uses exporters, which are installed on the nodes and expose metrics for Prometheus to scrape.  

Prometheus runs with a time-series database to store metrics. The metrics are stored in the database and can be queried using PromQL, the Prometheus query language. Prometheus also runs a web UI which can be used to visualise the actual metrics, graphs, alert rules etc. 

Alertmanager is the extension used for configuring alerts. Alertmanager has various integrations available for alerting, including email, Slack, HipChat, PagerDuty etc. Prometheus has evolved over time, and it integrates well with the Dropwizard metrics library. 

Prometheus - time-series based cassandra monitoring

Grafana

Grafana is a visualisation tool which can be used to visualize any time-series metrics. Grafana has various panels to showcase the data. The most commonly used panel is a graph. A graph is used to plot incoming data against a time-series in two dimensions. 

Grafana integrates with various data sources. These sources are queried in real-time by Grafana to obtain metrics. Each Grafana panel has one or more queries configured to query a data source; the result of the query is rendered on the panel. Grafana uses Prometheus as a well-integrated data source.

Grafana - Time series metrics visualization

Cassandra Exporter

Cassandra exporter is Instaclustr’s open source solution for collecting Cassandra metrics efficiently. It is designed to integrate with the Cassandra JVM to collect and publish metrics. Hence, Cassandra exporter is a replacement for JMX metrics. 

JMX metrics in Cassandra have performance limitations and hence can cause some issues if used on systems with a large number of nodes. The Cassandra exporter has been well tested for optimal monitoring performance. The metrics produced by the Cassandra exporter are also time-series and can be readily consumed by Prometheus. Please refer to the GitHub page for information regarding configuration and usage. 

Conclusion

Cassandra monitoring is essential to get insight into the database internals. Monitoring is a must for production systems to ensure optimal performance, alerting, troubleshooting, and debugging. There are a large number of Cassandra metrics out of which important and relevant metrics can provide a good picture of the system. 

Finally, Instaclustr has the Cassandra monitoring expertise and capability with various options. 

  • Cassandra exporter is an excellent open source tool for optimal monitoring performance on large Cassandra clusters. 
  • Instaclustr’s Cassandra managed service uses a comprehensive monitoring and alerting service with 24×7 support. It is a good option for outsourcing all Cassandra operations, and it comes with a free trial.

Instaclustr Cassandra Consulting services can help you with any monitoring or other Cassandra operations.


Building a Low-Latency Distributed Stock Broker Application: Part 3

In the third blog of the “Around the World” series focusing on globally distributed storage, streaming, and search, we build a Stock Broker Application. 

1. Place Your Bets!

The London Stock Exchange 

How did Phileas Fogg make his fortune? Around the World in Eighty Days describes Phileas Fogg in this way: 

Was Phileas Fogg rich? Undoubtedly. But those who knew him best could not imagine how he had made his fortune, and Mr Fogg was the last person to whom to apply for the information.

I wondered if he had made his fortune on the Stock Market, until I read this:

Certainly an Englishman, it was more doubtful whether Phileas Fogg was a Londoner. He was never seen on ‘Change, nor at the Bank, nor in the counting-rooms of the “City”.

Well, even if Fogg wasn’t seen in person at the ‘Change (London Stock Exchange), by 1872 (the year the story is set), it was common to use the telegraph (the internet of the Victorian age, which features regularly in the story) to play the market.

In fact the ability of the telegraph to send and receive information faster than horses/trains/boats etc. had been used for stock market fraud as early as 1814! (The “Great Stock Exchange Fraud of 1814”). Coincidentally (or not?), the famous London Stock Exchange Forgery, also involving the telegraph, also occurred in 1872! Perhaps this explains the ambiguity around the origin of Fogg’s wealth!

What is certain is that Phileas Fogg became the subject of intense betting, and he was even listed on the London Stock Exchange (Chapter V – IN WHICH A NEW SPECIES OF FUNDS, UNKNOWN TO THE MONEYED MEN, APPEARS ON ‘CHANGE):

Not only the members of the Reform, but the general public, made heavy wagers for or against Phileas Fogg, who was set down in the betting books as if he were a race-horse. Bonds were issued, and made their appearance on ‘Change; “Phileas Fogg bonds” were offered at par or at a premium, and a great business was done in them. But five days after the article in the bulletin of the Geographical Society appeared, the demand began to subside: “Phileas Fogg” declined. They were offered by packages, at first of five, then of ten, until at last nobody would take less than twenty, fifty, a hundred!”

The 1870’s also saw the introduction of a new technological innovation in Stock Trading, the Stock Ticker Machine. Stock tickers were a special type of telegraph receiver designed to print an alphabetical company symbol and the current price of that company’s stock on a paper roll called ticker tape. This enabled stock prices to be communicated closer to real-time across vast distances, and revolutionized trading. 

Vintage Stock Ticker Machine (Source: Shutterstock)

2. Let’s Build a Stock Broker Application

Fast forward 128 years from 1872 to 2000 and technology looked a bit different. I’m taking inspiration from an earlier project I worked with from 2000-2003 at CSIRO (Australia’s national science research agency) called “StockOnline”. This was an online Stock Broker application designed to benchmark new component-based middleware technologies, including Corba and Enterprise Java (J2EE). The original version simulated traders checking their stock holdings and current stock prices, and then buying and selling stocks, resulting in revised stock holdings. The benchmark could be configured with different workload mixes, and the number of concurrent traders could be scaled up to stress the system under test. Metrics captured included the relative number, throughput, and response time of each of the operations. 

Some of the technology innovations that the project was designed to give insights into included: 

  • the use of application servers to provide easy to manage and scalable container resourcing (yes, containers are at least 20 years old); 
  • how portable the application was across multiple different vendors’ application servers (sort of);
  • the impact of JVM choice and settings (lots); 
  • explicit support for component configuration (e.g. wiring components together); and 
  • deployment into containers, rich container services, and multiple persistence models to manage state and allow database portability (e.g. Container Managed Persistence vs. Bean Managed Persistence). 

At the end of the project we made the StockOnline code and documentation open source, and I recently rediscovered it and made it available on GitHub. I was surprised to learn that Enterprise Java is still going strong; it is now run by the Eclipse Foundation and called “Jakarta EE”. Also interesting is that there is support for persistence to Cassandra.

So let’s move on to the present day.

3. High-Frequency Low-Latency Trading

Modern stock markets are fast-moving, ultra-competitive environments. Orders are sent to the market by high speed networks and executed almost instantly. This makes low-latency trading a key to profitability.  Trade related latency is any delay in the time it takes for a trader to interact with the market, and includes distance related network delays, delays in receiving and acting on information, and delays caused by brokers (e.g. queuing of orders, delays interacting with the stock exchange to trade the orders, etc.). Some of the key solutions to reducing latency are broker side hosting of orders (orders are hosted on brokers and automatically traded when conditions are met), and Direct Market Access (brokers are as close as possible to stock exchanges, with super-fast network connections).

A new type of US Stock Exchange (IEX) was even created to address some of the issues around fairness of stock trading due to latency. Some brokers are able to take advantage of even small latency differences (“price sniping”), or of so-called “dark pools” which fill orders from within a pool rather than via public stock exchanges, to make huge profits. Somewhat oddly, the IEX levelled the playing field by introducing delays to ensure that no one playing the market has more up-to-date information than anyone else.

Latency is partially caused by the global location of stock exchanges. Where are stock exchanges located?  There are 60 exchanges around the world on every continent with a total value of $69 Trillion, and 16 worth more than $1 Trillion each!

4. Initial Application Design

I had already started writing a new version of StockOnline before I rediscovered the original, so the new version doesn’t share any of the original code. However, it does turn out to have similar entities, but with some significant differences to model multiple StockExchanges and Brokers.  Here’s the UML Class diagram of my new prototype code:

The first major difference is that it’s designed to model and simulate distributed stock brokers across multiple global “georegions”. We introduced the concept of georegions in blog 1 (called “latency regions”) and blog 2 (called “georegions”). A georegion is a geographic region that has at least two AWS regions (for data center redundancy), and ensures that applications within the same georegion are within 100ms latency of each other and users in the same georegion in case of failure of one region.  Here’s the map from the previous blogs showing the eight sub 100ms latency georegions that we identified (North America, South America, Atlantic, Europe, Middle East, Central Asia, East Asia, Australasia):

This means that I have to explicitly model multiple StockExchanges. Each StockExchange is in a particular location in a georegion.  Each StockExchange is responsible for listing some stocks, providing meta-data about the stocks,  publishing changes to stock prices as StockTickers, and matching buy/sell orders (i.e. executing trades).  For simplicity we assume that each stock is only listed on a single StockExchange. 

Each georegion has one or more StockBrokers which are co-located in the same georegion as some StockExchanges to ensure low latency (and potentially redundancy). The StockBrokers are responsible for discovering StockExchanges, all the stocks listed for trading, obtaining StockTickers to update current price data, and computing trends and longer-term Stock Statistics that inform traders making decisions about buying and selling. They are also responsible for keeping track of trader data, updating StockHoldings for Traders, keeping track of orders and trading them on the appropriate StockExchanges, and keeping records of trades (transactions). Also different to the original version (which only had a single Market Order type), I wanted to have multiple different order types including Market, Limit, and Stop Orders. This is important for the latency story, as Market Orders are traded “manually” and immediately, but Limit and Stop Orders are traded automatically when they meet specific conditions, so they can be traded very quickly and in larger volumes (this is a good explanation).

We assume that traders connect to their nearest StockBroker (to reduce latency and possibly to satisfy local financial rules). There is a series of operations supported by StockBrokers for traders, and also for interacting with the StockExchanges as follows.  First let’s look at the workflow for trading Market Orders, “Place Market Order”. These are essentially synchronous and immediate trades. The trader connects to the nearest broker, gets their current StockHoldings and current StockStatistics (for their holdings and possibly for other stocks they don’t currently own). Based on this information they decide what stocks to trade, whether to buy or sell, and the quantity of stocks, and create a Market Order. The broker then processes the Market Order (which may involve sending it to another broker), and receives confirmation that the trade occurred (including price, quantity, transaction ID etc.), and finally updates the trader’s StockHoldings for the stock traded. 

The steps to “Process Market Order” are as follows.  The order is sent to a broker in the same Georegion as the StockExchange listing the stock. This broker then immediately executes the trade (buys or sells) with the StockExchange, gets the transaction details, marks the order as filled (so it isn’t processed more than once), and updates the StockHolding amounts for the trader. 

The “Execute Trade with StockExchange” assumes that a trade is always possible (at the current price) and will occur instantaneously and completely, and has the following substeps:

Market Orders are potentially a slow process due to all the synchronous steps, “think time” for the trader, and cumulative latencies due to the trader to broker, broker to broker, and broker to StockExchange communication paths.

As an alternative we also provide some asynchronous Order types: Limit and Stop. These order types are only traded when the conditions are met, but then need to be executed as quickly as possible to prevent losses in a fast moving market.

We assume that the initial steps are mostly the same as “Place Market Order”, but with the added choice of a Limit or Stop Order and the limit price, and the final step (notification of the eventual Trade) is asynchronous:

Once the Order is placed, it is processed by sending it to the correct broker (as for Market Orders), and then that broker is responsible for continuously checking orders to see if they match:

This is done as follows (“Trade Matching Orders”) and relies on each broker receiving a stream of StockTicker updates from the StockExchanges in the same georegion. For each StockTicker the broker finds orders for that stock, and checks which orders meet the buy/sell conditions (the logic depends on the order type, if the price is rising or dropping, and if the current price is less than or greater to the order limit price). If the matching order(s) are Sell Orders then an extra step is to check that the trader still has sufficient holdings of that stock (they may have already sold some stock due to other orders being executed first). If all conditions are met then the broker initiates an immediate “Market” trade with the StockExchange as before.

The initial prototype application code is written in pure Java and just simulates the expected behaviour of the traders, brokers, and StockExchanges. It creates a specified number of georegions, brokers, StockExchanges, stocks, and traders with initial data. Then the simulation runs lots of rounds (seconds) for a specified period (e.g. a day).  Each round results in traders checking their holdings and StockStatistics, and creating orders (about ⅓ of each type, but only if the type matches the specific stock and market conditions). The orders are sent to the correct brokers. Each round the brokers receive simulated StockTickers from StockMarkets in the same georegion (using a pseudo-random walk which keeps the stock direction for the majority of the time, but occasionally changing direction). Some stocks are more volatile than others so change faster.  Each round the brokers immediately Trade Market Orders, and check the conditions and trade matching Limit or Stop Orders. 

5. Initial Simulation Results—“Time Is Money”!

The simulation computes business level metrics including number of trades, value of trades, etc., and expected end-to-end latency based on deploying brokers in 8 georegions, and using the AWS inter-region latencies from blog 1 of this series. This gives us a baseline to eventually compare the real results with. The average latency to get the current stock price from a remote StockExchange and “Process Market Orders” is 700ms (max 1.2s), which includes multiple times for intra-region and inter-region networking. The average latency for Limit and Stop “Trade Matching” Orders is shorter at 100ms (max 200ms), as it only includes the time to get StockTicker updates and the time to trade; i.e. it doesn’t include any AWS inter-region latencies, as the operation is asynchronous and processed entirely within the georegion of the broker/StockExchange (we expect this to be slightly higher in practice due to the eventual overhead of database lookups and condition checking on the broker).

So is the saying “Time Is Money!” true in the context of low latency trading, and how much money exactly? I added a calculation to the simulation to compute potential profit losses assuming high volatility in the prices of some stocks, and higher latency times to trade. Due to potentially high volumes of trades, even a small profit loss per trade can add up to big losses in profit very quickly. For one simulation run with 2,101 completed trades, the potential profit loss for the higher latency Market Orders was 0.7% of the trade value (for Market Orders), but for the lower latency Limit and Stop Orders it was significantly less at 0.1% of the trade value (for those order types). For an average order size of $20,000 this corresponds to a $140 profit loss per Market Order, compared with only $20 profit loss for each Limit and Stop Order. Over hundreds or even thousands of trades per day (typical of HFT) this would quickly add up to significant amounts of money! Moreover, to make a profit, High Frequency Trading (HFT) relies on conducting a high volume of trades to rapidly take advantage of very small movements in prices, with potentially smaller profits per trade. So it’s easy to see why minimizing latency is a worthwhile goal in Fintech applications such as this. 

6. What Next?

In the next few blogs we’ll continue our journey “Around the World” and explore how to refine the initial simple design of the application so as to deploy and run it across multiple georegions using multiple AWS regions.

Initially this will involve mapping the components to Cassandra tables on a single data center. Once it’s working correctly with a single Cassandra data center, we’ll extend it to use multiple Cassandra data centers, which will require the use of multiple keyspaces for different replication topologies (e.g. replication across pairs of data centers vs. all data centers). We’ll also work out if, and how, to load-balance the application across multiple data centers in the same georegions, and how to enable redundancy, failover, and recovery at the application level.  It’s possible that a Kubernetes federated microservices mesh framework will help in doing this. We also plan to put Kafka to use to enable streaming StockTickers, so we’ll be investigating Kafka multi data center replication. 

7. Further Resources

IBM also has a StockTrader demonstration application, and an in-depth series about deploying it using Cassandra, Kafka, Redis, and Kubernetes.

There’s an example of stock analysis using Elasticsearch (I’m planning on using Elasticsearch to analyse the stock trends, and provide some map-based visualisation of the data).

This is an interesting article on “Hacking a HFT System”!

The Original CSIRO StockOnline code and documentation is now on github.


Even Higher Availability with 5x Faster Streaming in Cassandra 4.0

Streaming is a process where nodes of a cluster exchange data in the form of SSTables. Streaming can kick in during many situations such as bootstrap, repair, rebuild, range movement, cluster expansion, etc. In this post, we discuss the massive performance improvements made to the streaming process in Apache Cassandra 4.0.

High Availability

As we know Cassandra is a Highly Available, Eventually Consistent database. The way it maintains its legendary availability is by storing redundant copies of data in nodes known as replicas, usually running on commodity hardware. During normal operations, these replicas may end up having hardware issues causing them to fail. As a result, we need to replace them with new nodes on fresh hardware.

As part of this replacement operation, the new Cassandra node streams data from the neighboring nodes that hold copies of the data belonging to this new node’s token range. Depending on the amount of data stored, this process can require substantial network bandwidth, taking some time to complete. The longer these types of operations take, the more we are exposing ourselves to loss of availability. Depending on your replication factor and consistency requirements, if another node fails during this replacement operation, availability will be impacted.

Increasing Availability

To minimize the failure window, we want to make these operations as fast as possible. The faster the new node completes streaming its data, the faster it can serve traffic, increasing the availability of the cluster. Towards this goal, Cassandra 4.0 saw the addition of Zero Copy streaming. For more details on Cassandra’s zero copy implementation, see this blog post and CASSANDRA-14556.

Talking Numbers

To quantify the results of these improvements, we, at Netflix, measured the performance impact of streaming in 4.0 vs 3.0, using our open source NDBench benchmarking tool with the CassJavaDriverGeneric plugin. Though we knew there would be improvements, we were still amazed by the overall results: a fivefold increase in streaming performance. The test setup and operations are all detailed below.

Test Setup

In our test setup, we used the following configurations:

  • 6-node clusters on i3.xl, i3.2xl, i3.4xl and i3.8xl EC2 instances, each on 3.0 and trunk (sha dd7ec5a2d6736b26d3c5f137388f2d0028df7a03).
  • Table schema
CREATE TABLE testing.test (
    key text,
    column1 int,
    value text,
    PRIMARY KEY (key, column1)
) WITH CLUSTERING ORDER BY (column1 ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
    AND compression = {'enabled': 'false'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
  • Data size per node: 500GB
  • No. of tokens per node: 1 (no vnodes)

To trigger the streaming process we used the following steps in each of the clusters:

  • terminate a node
  • add a new node as a replacement
  • measure the time taken to complete streaming of data by the new node replacing the terminated node

For each cluster and version, we repeated this exercise multiple times to collect several samples.

Below is the distribution of streaming times we found across the clusters:

Benchmark results

Interpreting the Results

Based on the graph above, there are many conclusions one can draw. Some of them are:

  • 3.0 streaming times are inconsistent and show high degree of variability (fat distributions across multiple samples)
  • 3.0 streaming is highly affected by the instance type and generally looks CPU bound
  • Zero Copy streaming is approximately 5x faster
  • Zero Copy streaming time shows little variability in its performance (thin distributions across multiple samples)
  • Zero Copy streaming performance is not CPU bound and remains consistent across instance types

It is clear from the performance test results that Zero Copy Streaming has a huge performance benefit over the current streaming infrastructure in Cassandra. But what does it mean in the real world? The following key points are the main takeaways.

MTTR (Mean Time to Recovery): MTTR is a KPI (Key Performance Indicator) that is used to measure how quickly a system recovers from a failure. Zero Copy Streaming has a very direct impact here with a five fold improvement on performance.

Costs: Zero Copy Streaming is ~5x faster. This translates directly into cost savings for some organizations, primarily as a result of reducing the need to maintain spare server or cloud capacity. In other situations where you’re migrating data to larger instance types or moving AZs or DCs, this means that instances that are sending data can be turned off sooner, saving costs. An added cost benefit is that now you don’t have to over-provision the instance. You get a similar streaming performance whether you use an i3.xl or an i3.8xl, provided the bandwidth is available to the instance.

Risk Reduction: There is a great reduction in the risk due to Zero Copy Streaming as well. Since a Cluster’s recovery mainly depends on the streaming speed, Cassandra clusters with failed nodes will be able to recover much more quickly (5x faster). This means the window of vulnerability is reduced significantly, in some situations down to few minutes.

Finally, a benefit that we generally don’t talk about is the environmental benefit of this change. Zero Copy Streaming enables us to move data very quickly through the cluster. It objectively reduces the number and sizes of instances that are used to build Cassandra cluster. As a result not only does it reduce Cassandra’s TCO (Total Cost of Ownership), it also helps the environment by consuming fewer resources!

Introducing Transient Replication

Transient Replication is a new experimental feature soon to be available in 4.0. When enabled, it allows for the creation of keyspaces where replication factor can be specified as a number of copies (full replicas) and temporary copies (transient replicas). Transient replicas retain the data they replicate only long enough for it to be propagated to full replicas, via incremental repair, at which point the data is deleted. Writing to transient replicas can be avoided almost entirely if monotonic reads are not required because it is possible to achieve a quorum of acknowledged writes without them.

This results in savings in disk space, CPU, and IO. By deleting data as soon as it is no longer needed, transient replicas require only a fraction of the disk space of a full replica. By not having to store the data indefinitely, the CPU and IO required for compaction is reduced, and read queries are faster as they have less data to process.

So what are the benefits of not actually keeping a full copy of the data? Well, for some installations and use cases, transient replicas can be almost free if monotonic reads are disabled. In future releases where monotonic reads are supported with Transient Replication, enabling monotonic reads would reduce the savings in CPU and IO, but even then they should still be significant.

Transient Replication is designed to be transparent to applications:

  • Consistency levels continue to produce the same results for queries.
  • The number of replicas that can be lost before data loss occurs is unchanged.
  • The number of replicas that can be unavailable before some queries start to timeout or return unavailable is unchanged (with the exception of ONE).

With Transient Replication, you can go from 3 replicas to 5 replicas, two of which are transient, without adding any hardware.

If you are running an active-passive 2 DC setup with 3 replicas in each DC, you can make one replica in each DC transient and still have four full copies of the data in total.
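
As a sketch, transient replication is configured per keyspace by giving the replication factor in the form '<total replicas>/<transient replicas>'. The keyspace and data center names below are hypothetical, and since the feature is experimental it must also be explicitly enabled in cassandra.yaml before keyspaces like these can be created:

-- 3 replicas per data center, 1 of which is transient (RF = 3/1)
CREATE KEYSPACE stock_ticks
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': '3/1', 'DC2': '3/1'};

-- Going from 3 full replicas to 5 replicas, 2 of which are transient (RF = 5/2)
ALTER KEYSPACE stock_ticks
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': '5/2', 'DC2': '5/2'};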

Feature support

Transient Replication is not intended to fully replace Cassandra’s existing approach to replication. There are features that currently don’t work with transiently replicated keyspaces and features that are unlikely ever to work with them.

You can have keyspaces with and without Transient Replication enabled in the same cluster, so it is possible to use Transient Replication for just the use cases that are a good fit for the currently available functionality.

Currently unsupported but coming:

  • Monotonic reads
  • Batch log
  • LWT
  • Counters

Will never be supported:

  • Secondary indexes
  • Materialized views

How Transient Replication works

Overview

Transient replication extends Cassandra’s existing consistent hashing algorithm to designate some replicas of a point or range on the consistent hash ring as transient and some as full. The following image depicts a consistent hash ring with three replicas A, B, and C. The replicas are located at tokens 5, 10, 15 respectively. A key k hashes to token 3 on the ring.

A consistent hash ring without Transient Replication

Replicas are selected by walking the ring clockwise starting at the point on the ring the key hashes to. At RF=3, the replicas of key k are A, B, C. With Transient Replication, the last N replicas (where N is the configured number of transient replicas) found while walking the ring are designated as transient.

There are no nodes designated as transient replicas or full replicas. All nodes will fully replicate some ranges on the ring and transiently replicate others.

The following image depicts a consistent hash ring at RF=3/1 (three replicas, one of which is transient). The replicas of k are still A, B, and C, but C is now transiently replicating k.

A consistent hash ring with Transient Replication

Normally all replicas of a range receive all writes for that range, as depicted in the following image.

Normal write behavior

Transient replicas do not receive writes in the normal write path.

Transient write behavior

If sufficient full replicas are unavailable, transient replicas will receive writes.

Transient write with unavailable node

This optimization, which is possible with Transient Replication, is called Cheap Quorums. This minimizes the amount of work that transient replicas have to do at write time, and reduces the amount of background compaction they will have to do.

Cheap Quorums and monotonic reads: Cheap Quorums may end up being incompatible with an initial implementation of monotonic reads, and operators will be able to make a conscious trade off between performance and monotonic reads.

Rapid write protection

In keyspaces utilizing Transient Replication, writes are sent to every full replica and enough transient replicas to meet the requested consistency level (to make up for unavailable full replicas). In addition, enough transient replicas are selected to reach a quorum in every datacenter, though unless the consistency level requires it, the write will be acknowledged without ensuring all have been delivered.

Because not all replicas are sent the write, it’s possible that insufficient replicas will respond, causing timeouts. To prevent this, we implement rapid write protection, similar to rapid read protection, that sends writes to additional replicas if sufficient acknowledgements to meet the consistency level are not received promptly.

The following animation shows rapid write protection in action.

Animation of rapid write protection preventing a write timeout

Rapid write protection is configured similarly to rapid read protection, using the table option additional_write_policy. The policy determines how long to wait for acknowledgements before sending additional mutations. The default is to wait for the 99th percentile (P99) of the observed latency.
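
A minimal sketch of tuning this option on a hypothetical table (the exact value literals should be verified against your Cassandra version):


ALTER TABLE foo.bar WITH additional_write_policy = '95p';  -- wait for the 95th percentile of observed latency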

Incremental repair

Incremental repair is used to clean up transient data at transient replicas and propagate it to full replicas.

When incremental repair occurs transient replicas stream out transient data, but don’t receive any. Anti-compaction is used to separate transient and fully replicated data so that only fully replicated data is retained once incremental repair completes.

The result of running an incremental repair is that all full replicas for a range are synchronized and can be used interchangeably to retrieve the repaired data set for a query.

Read path

Reads must always include at least one full replica and can include as many replicas (transient or full) as necessary to achieve the desired consistency level. At least one full replica is required in order to provide the data not available at transient replicas, but it doesn’t matter which full replica is picked because incremental repair synchronizes the repaired data set across full replicas.

Reads at transient replicas are faster than reads at full replicas because reads at transient replicas are unlikely to return any results if monotonic reads are disabled, and they haven’t been receiving writes.

Creating keyspaces with Transient Replication

Transient Replication is supported by SimpleStrategy and NetworkTopologyStrategy. When specifying the replication factor, you can specify the number of transient replicas in addition to the total number of replicas (including transient replicas). The syntax for a replication factor of 3 replicas total with one of them being transient would be “3/1”.

ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'DC1' : '3/1'};
ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : '3/1'};

Monotonic reads are not supported with Transient Replication in 4.0, so any existing tables in the keyspace must have monotonic reads disabled by setting read_repair = 'NONE'.
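
For a hypothetical existing table, that looks like:


ALTER TABLE foo.bar WITH read_repair = 'NONE';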

Once the keyspace has been altered, you will need to run incremental repair and then nodetool cleanup to ensure transient data is cleaned up.
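
A sketch of those two steps (the keyspace name is hypothetical; make sure the repair runs in incremental mode, which recent versions default to):


$ nodetool repair foo
$ nodetool cleanup foo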

Operational matters

Transient replication requires rolling incremental repair to be run regularly in order to move data from transient replicas to full replicas. By default transient replicas will receive 1% of writes for transiently replicated ranges due to rapid write protection. If a node is down for an extended period of time, its transient replicas will receive additional write load and that data should be cleaned up using incremental repair. Running incremental repair regularly will ensure that the size of each repair is small.

It’s also a good idea to run a small number of vnodes with Transient Replication so that when a node goes down the load is spread out over several other nodes that transiently replicate that range. Large numbers of vnodes are known to be problematic, so it’s best to start with a cluster that is already close to or at its maximum size so that a small number of vnodes will be sufficient. If you intend to grow the cluster in the future, you will need to be cognizant of how this will interact with the number of vnodes you select.

While the odds of data loss should multiple nodes be permanently lost remain the same with Transient Replication, the magnitude of the potential loss does not. With 3/1 transient replication, the permanent loss of two nodes could result in the loss of the entirety of the repaired data set. If you are running a multi-DC setup with a high level of replication, such as 2 DCs with 3/1 replicas in each, then you will have 4 full copies in total and the added risk of Transient Replication is minimal.

Experimental features

Experimental features are a relatively new idea for Apache Cassandra. Although we recently voted to make materialized views an experimental feature retroactively, Transient Replication is the first experimental feature to be introduced as such.

The goal of introducing experimental features is to allow for incremental development across multiple releases. In the case of Transient Replication, we can avoid a giant code drop that heavily modifies the code base, and the associated risks with incorporating a new feature that way.

What it means for a feature to be experimental doesn’t have a set definition, but for Transient Replication it’s intended to set expectations. As of 4.0, Transient Replication’s intended audience is expert operators of Cassandra with the ability to write the book on how to safely deploy Transient Replication, debug any issues that result, and if necessary contribute code back to address problems as they are discovered.

It’s expected that the feature set for Transient Replication will not change in minor updates to 4.0, but eventually it should be ready for use by a wider audience.

Next steps for Transient Replication

If increasing availability or saving on capacity sounds good to you, then you can help make Transient Replication production-ready by testing it out or even deploying it. Experience and feedback from the community is one of the things that will drive Transient Replication bug fixing and development.

Step by Step Guide to Installing and Configuring Spark 2.0 to Connect with Cassandra 3.x

In this guide, we will be installing Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector library on the client program. If you have any of these software packages installed and configured already you can skip that step. This guide assumes you have a Cassandra 3.x cluster that is already up and running. For more information on installing and using Cassandra visit http://cassandra.apache.org/. Note: The following steps should be performed on all the nodes in the cluster unless otherwise noted.

Install Scala 2.11

Ensure you have Java installed


$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

If you don't have Java installed, follow this tutorial to get Java 8 installed.

Install Scala 2.11.8


$ wget www.scala-lang.org/files/archive/scala-2.11.8.deb
$ sudo dpkg -i scala-2.11.8.deb
$ scala -version

Install SBT 0.13



$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ sudo apt-get update
$ sudo apt-get install sbt

Install Spark 2.0

Download Spark 2.0 from https://spark.apache.org/downloads.html and unpack the tar file


$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
$ tar zxf spark-2.0.2-bin-hadoop2.7.tgz
$ sudo mv spark-2.0.2-bin-hadoop2.7/ /usr/local/spark/

Update system variables


$ sudo nano /etc/environment

Add an environment variable called SPARK_HOME


export SPARK_HOME=/usr/local/spark

At the end of the PATH variable add $SPARK_HOME/bin


PATH="<previous_entries>:/usr/local/spark/bin"

Refresh the environment


source /etc/environment

Create spark user and make it the owner of the SPARK_HOME directory


$ sudo adduser spark --system --home /usr/local/spark/ --disabled-password
$ sudo chown -R spark:root /usr/local/spark

Create Log and Pid directories


$ sudo mkdir /var/log/spark
$ sudo chown spark:root /var/log/spark
$ sudo -u spark mkdir $SPARK_HOME/run

Create the Spark Configuration files

Create the Spark Configuration files by copying the templates


$ sudo cp /usr/local/spark/conf/spark-env.sh.template /usr/local/spark/conf/spark-env.sh
$ sudo cp /usr/local/spark/conf/spark-defaults.conf.template /usr/local/spark/conf/spark-defaults.conf
$ sudo chown spark:root /usr/local/spark/conf/spark-*

Edit the Spark Environment file spark-env.sh


export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run

Configure Spark nodes to join cluster

If you will not be managing Spark using the Mesos or YARN cluster managers, you'll be running Spark in what is called "Standalone Mode".
In standalone mode, Spark will have a master node (which is the cluster manager) and worker nodes. You should select one of the nodes in your cluster to be the master. Then on every worker node you must edit /usr/local/spark/conf/spark-env.sh to point to the host where the Spark Master runs.


# Options for the daemons used in the standalone deploy mode
export SPARK_MASTER_HOST=<spark_master_ip_or_hostname_here>

You can also change other elements of the default configuration by editing /usr/local/spark/conf/spark-env.sh. Some other configs to consider are:
  • SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
  • SPARK_WORKER_CORES, to set the number of cores to use on this machine
  • SPARK_WORKER_MEMORY, to set how much memory to use (for example 1000MB, 2GB)
  • SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
  • SPARK_WORKER_INSTANCES, to set the number of worker processes per node
  • SPARK_WORKER_DIR, to set the working directory of worker processes

Installing Spark as a service

Run the following commands to create a service for the spark-master and spark-worker


$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-master
$ sudo chmod 0755 /etc/init.d/spark-master
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-worker
$ sudo chmod 0755 /etc/init.d/spark-worker
$ sudo update-rc.d spark-master defaults 99
$ sudo update-rc.d spark-worker defaults 99

Edit the /etc/init.d/spark-worker file. If a variable or function already exists then replace it with the text below.


DESC="Spark Worker"
NAME=spark-worker
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.worker.Worker-1.pid
export SPARK_HOME


# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0


if [ -f $SPARK_HOME/conf/spark-env.sh ];then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi


#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
    CMD_PATT="org.apache.spark.deploy.worker.Worker"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}


#
# Function that starts the daemon/service
#
do_start()
{
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started


        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`


        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE  \
                --exec $SPARK_HOME/sbin/start-slave.sh  \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-slave.sh -- spark://$SPARK_MASTER_HOST:${SPARK_MASTER_PORT:-7077} \
                || return 2


}


#
# Function that stops the daemon/service
#
do_stop()
{
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}


#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}


...
status)
      is_running
      stat=$?
      case "$stat" in
              0) log_success_msg "$DESC is running" ;;
              1) log_failure_msg "could not access pidfile for $DESC" ;;
              *) log_success_msg "$DESC is not running" ;;
      esac
      exit "$stat"
      ;;
...

Edit the /etc/init.d/spark-master file. If a variable or function already exists then replace it with the text below.


DESC="Spark Master"
NAME=spark-master
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.master.Master-1.pid
export SPARK_HOME


# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0


if [ -f $SPARK_HOME/conf/spark-env.sh ];then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi


#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
    CMD_PATT="org.apache.spark.deploy.master.Master"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}


#
# Function that starts the daemon/service
#
do_start()
{
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started


        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`


        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-master.sh \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE --exec $SPARK_HOME/sbin/start-master.sh  \
                || return 2
}


#
# Function that stops the daemon/service
#
do_stop()
{
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE


        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}


#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}


...
status)
      is_running
      stat=$?
      case "$stat" in
              0) log_success_msg "$DESC is running" ;;
              1) log_failure_msg "could not access pidfile for $DESC" ;;
              *) log_success_msg "$DESC is not running" ;;
      esac
      exit "$stat"
      ;;
...

Running Spark as a service

Start the Spark master node first. On whichever node you've selected to be master, run:


$ sudo service spark-master start

On all the other nodes, start the workers:


$ sudo service spark-worker start

To stop Spark, run the following commands on the appropriate nodes


$ sudo service spark-worker stop
$ sudo service spark-master stop

Service logs will be stored in /var/log/spark.

Testing the Spark service

To test the Spark service, start spark-shell on one of the nodes.


$ spark-shell --master spark://<IP>:<Port>

When the prompt comes up, execute the following line of code:


scala> sc.parallelize(1 to 1000).sum()

Get Spark-Cassandra-Connector on the client

The spark-cassandra-connector is a Scala library that exposes Cassandra tables as Spark RDDs, lets you write Spark RDDs to Cassandra tables, and allows you to execute arbitrary computations and CQL queries that are distributed to the Cassandra nodes holding the data, which makes them fast. Your code plus the spark-cassandra-connector and all dependencies are packaged up and sent to the Spark nodes.
If you are writing ad-hoc queries / computations from the spark-shell, start up the shell by running the command:


$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11

The --packages option downloads the connector and all of its dependencies from the Spark Packages site and places them on the classpath of the Spark Driver and all Spark Executors.
If you are writing a Scala application, configure a new Scala project. Your build.sbt file should look something like this:


name := "MySparkProject"


version := "1.0"


scalaVersion := "2.11.8"


val sparkVersion = "2.0.2"


resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"


libraryDependencies ++= Seq(
  "org.apache.spark"      % "spark-core_2.11"  % sparkVersion,
  "org.apache.spark"      % "spark-sql_2.11"   % sparkVersion,
  "datastax"              % "spark-cassandra-connector" % "2.0.0-M2-s_2.11"
)

Testing the connector

To start out, create a simple keyspace and table in Cassandra. Run the following statements in cqlsh:


CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

Then insert some example data:


INSERT INTO test.kv(key, value) VALUES ('key1', 1);
INSERT INTO test.kv(key, value) VALUES ('key2', 2);

For this test, we'll use the spark-shell.


$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11



// import the spark connector namespace
import com.datastax.spark.connector._


// read Cassandra data into an RDD
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)  


// Add two more rows to the table
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))

Diagnosing and Fixing Cassandra Timeouts

We recently brought Cassandra into our infrastructure to take advantage of its robust built-in scalability model. After reading all the "marketing" material, watching/attending talks by evangelists, and reading blog posts by experts and enthusiasts we were excited to embark on our Cassandra journey.

Everything was hunky-dory in the early going. We followed data modeling best practices, provisioned the right-sized servers as recommended by DataStax, and saw great results in the test environment. The issues didn't start until we began moving portions of our production traffic over to our Cassandra production servers. That's when we noticed that we would fairly frequently get read and write timeouts as the volume of traffic got higher.


My first troubleshooting step was to monitor the servers to see if there were any anomalies or resource constraints that led to the timeouts. I'm a fan of Brendan Gregg's USE method for monitoring overall system health and identifying errors and bottlenecks. However, at the time we didn't have a comprehensive monitoring solution like Datadog or Icinga2 set up for those servers. So in its place, I wrote a shell script that every 30 seconds would log memory and CPU utilization, disk I/O, network I/O, Cassandra thread pool stats (nodetool tpstats), GC stats, compaction stats, and the table stats for my most active tables. To my chagrin, my hacked-together monitor did not reveal any anomalies or resource constraints that were the cause of the timeouts. On the contrary, it revealed that resource utilization (with the exception of CPU, which was a bit high) was well under the hardware limits. I did notice that a number of times a query timeout would coincide with the start of SSTable compaction, which led me down the rabbit hole of compaction tweaks; however, I emerged from that hole with no solution to the Cassandra timeouts.


Once hardware constraints had been ruled out, I kicked off my next round of troubleshooting efforts by turning on trace probability and looking at the trace for long running queries. In the traces I noticed high queue times, which is not unexpected during times of high load, but I also noticed something else. There were a number of times when a worker node received the request from the coordinator, completed the read operation for the query, queued up the result to be sent back to the coordinator, but the results were never sent and the coordinator timed out waiting for the response. Bingo! I had found the cause of the timeout! But I still hadn't found the root cause; why were the responses not getting sent back? And I didn't know how to fix it either. I spent a good amount of time looking into tweaking Cassandra thread pools (like the RequestResponseStage pool), reading the Cassandra source code on GitHub and re-verifying network traffic/congestion between the nodes during high loads. But those efforts yielded no solution to the query timeouts which led me to the third round of troubleshooting efforts.


For this round of troubleshooting I turned up the logging level on the org.apache.cassandra logger to DEBUG and I live-tailed both the Cassandra system.log and debug.log. That's when I noticed something weird; several times, one of the nodes would detect the other as being down for a period of time. When I looked at the system logs and Linux process activity in htop for the supposedly downed node, it appeared that the node was up and running. This led me to take a fine-grained look at the system.log for the downed node, and in there I noticed a pretty long GC pause. So I flipped over to the gc.log, examined it, and noticed even more long GC pauses. I also noticed that at times there were 35 second pauses that would occur in succession. Those super long GC pauses caused the node to appear down to its peers. They also explained cases where a coordinator sent a request and timed out because the worker node paused execution and didn't return the response in a timely manner. I had found my problem!


So how did this happen? Well, when we were configuring Cassandra, some doofus (aka me!) noticed that
1. The key cache drastically improved the performance of queries
2. We had a high key cache hit rate and continued to have a high hit rate even as we increased the size of the key cache.
So this doofus (again, me) decided to bump the key cache all the way to 8 GB so he could cache entire tables' keys, essentially turning them into in-memory lookups. To accommodate having such a large key cache, he bumped up the JVM heap to 20 GB, thinking the new G1GC garbage collector could handle very large heaps. News flash: it can't! At least not when dealing with my workload of high volumes of concurrent reads and writes.

The fix was simple: reduce the size of the JVM heap and the key cache accordingly. The recommended heap size is 8 GB. I'm able to have my new max heap size set slightly above that recommendation because I switched my memtables to storing part of their data off-heap (offheap_buffers), which greatly reduced heap memory pressure. I also started using the jemalloc allocator, which is more efficient than the default glibc allocator.
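
As a rough sketch of the kind of settings involved (the values are illustrative, not a recommendation for your hardware):


# cassandra-env.sh
MAX_HEAP_SIZE="10G"      # down from 20 GB, close to the 8 GB recommendation
HEAP_NEWSIZE="2G"

# cassandra.yaml
# memtable_allocation_type: offheap_buffers   # keep memtable buffers off-heap
# key_cache_size_in_mb: 512                   # far smaller than the 8 GB cache that caused trouble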

In conclusion, distributed database systems are complex beasts and tuning them has to be done on many levels. While Cassandra gives you many knobs to tune to get the right performance out of your system, it takes a lot of study, experience or both, to know just how to tune them for your specific hardware and workload. DataStax published this useful Cassandra JVM tuning article to help provide guidance for users. But my advice for you would be to try a number of JVM configs and find out what works for your production workload. If I were doing this exercise over, I would start at the hardware level again but with better monitoring tools and then I would move to the system/debug/gc logs before looking at the traces. There is a good performance tuning tutorial by Kiyu Gabriel on DataStax Academy that you might find useful. Hope this article helps someone be more productive than I was.



Notes from Cassandra Day Atlanta 2016

I attended DataStax's Cassandra Day 2016 in Atlanta and took down a ton of notes on things that I found interesting. After going through those notes it occurred to me that many of the nuggets in them could be useful to someone other than myself. So I’ve published the notes below.

The information below is mostly composed of quotes from DataStax engineers and evangelists. Very little context is contained in these notes. However, if you are a beginning-to-intermediate level Cassandra developer or admin, you’ll likely have the needed context already. I did attempt to organize the notes somewhat coherently in order to allow you to jump to a section you care about and also to provide some context in the grouping.

Data Modeling Tips

General

  • When migrating to C*, don’t just port over your SQL schema. Be query-driven in the design of your schema.
  • If you are planning on using Spark with C*, start with a C* use case / data model 1st and then use Spark on top of it for analytics
  • Patrick McFadin (DataStax evangelist) on not having certain relational DB constructs in C*: “In the past I’ve scaled SQL DBs by removing referential integrity, indexes and denormalizing. I’ve even built a KV database on an Oracle DB that I was paying dollars per core for.” The implication here is that these constructs bound scalability in relational databases, and by explicitly not having them, Cassandra’s scalability is unbounded (well, at least theoretically).
  • You can stop partition hotspots by adding an additional column to the partition key (like getting the modulus of another column when divided by the number of nodes) or by increasing the resolution of the key in the case where the partition key is a time span.
  • Using the “IF NOT EXISTS” clause stops an UPSERT from happening automatically / by default. It also creates a lock on the record while executing, so that multiple writers don’t step on each other trying to insert the same record in a race condition. This is a lightweight transaction (LWT). You can also create an LWT when doing a BATCH UPSERT (see the CQL sketch after this list).
  • You can set a default TTL (Time To Live) on an individual table. This will apply to all data inserted into the table. A CQL insert can also specify a TTL for the inserted data that overrides the default.
  • DTCS (DateTieredCompactionStrategy) compaction is built for time series data. It groups SSTables together by time so that older tables don’t get compacted and can be efficiently dropped if a TTL is set.
  • CQL Maps allow you to create complex types inside your data store
  • One of the reasons for limiting the size of elements that can be in a CQL collection is that on reads the entire collection must be deserialized as a whole in the JVM, which can add a lot of data to the heap.
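
A minimal CQL sketch of the LWT and TTL behaviors mentioned above (the table and values are hypothetical):


CREATE TABLE sensor_events (id text PRIMARY KEY, value int)
    WITH default_time_to_live = 86400;                                    -- table-wide TTL of one day

INSERT INTO sensor_events (id, value) VALUES ('e1', 42) IF NOT EXISTS;    -- LWT: no blind UPSERT
INSERT INTO sensor_events (id, value) VALUES ('e2', 7) USING TTL 3600;    -- per-insert TTL override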

Secondary indexes

  • Secondary indexes are not like you have them in relational DBs. They are not built for speed, they are built for access.
  • Secondary indexes get slower the more nodes you have (because of network latencies)
  • Best thing to do with a secondary index is just to test it out and see its performance, but do it on a cluster not your laptop so you can actually see how it would perform in prod. Secondary indexes are good for low cardinality data.

Development Tips

Querying

  • Use the DataStax drivers, not ODBC drivers, because the DataStax drivers are token-aware and can therefore send queries to the right node, removing the need for the coordinator to make excessive network requests depending on the consistency level.
  • Use PreparedStatements for repeated queries. The performance difference is significant.
  • Use ExecuteAsync with PreparedStatements when bulk loading. You can have callbacks on Future objects and use the callbacks for things like detecting a failure and responding appropriately
  • BATCH is not a performance optimization. It leads to garbage collection and hotspots because the data stays in memory on the coordinator.
  • Use BATCH only to update multiple tables at once atomically. An example is if you have a materialized view / inverted index table that needs to be kept in sync with the main table (see the CQL sketch after this list).
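
A minimal sketch of keeping a main table and its inverted-index table in sync with a logged batch (the tables and values are hypothetical):


BEGIN BATCH
  INSERT INTO videos (video_id, title) VALUES (550e8400-e29b-41d4-a716-446655440000, 'Intro to CQL');
  INSERT INTO videos_by_title (title, video_id) VALUES ('Intro to CQL', 550e8400-e29b-41d4-a716-446655440000);
APPLY BATCH;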

General

  • Updates on collections create range tombstones to mark the old version of the collection (map, set, list) as deleted and create the new one. This is important to know because tombstones affect read performance, and at a certain point having too many tombstones (100K by default) can cause a read to fail. http://www.jsravn.com/2015/05/13/cassandra-tombstones-collections.html
  • Cassandra triggers should be used with care and only in specific use cases because you need to consider the distributed nature of C*.

Ops Tips

Replication Settings

  • SimpleStrategy fails if you have multiple datacenters (DCs), because the roughly 50% of your traffic that goes to the other DC becomes terribly slow. Use NetworkTopologyStrategy instead. You can configure how replication goes to each DC individually, so you can have a table that never gets replicated to the US, for example.
  • If you are using the NetworkTopologyStrategy, then you should use the GossipingPropertyFileSnitch to make C* network-topology aware, instead of the PropertyFileSnitch, because you then don’t have to change the file on every node and restart them.

Hardware Sizing

Recommended Node size
  • 32 GB RAM
  • 8-12 Cores
  • 2 TB SSD
Hardware should be sized appropriately. 64 cores will be hard to use. If you are adding search and/or analytics to the node, you need more RAM: 128+ GB. More memory is needed for search because it keeps its indexes in memory.

Recommendation for Spark & Cassandra on the same node: Spark jobs run in their own process and therefore have their own heap that can be tuned and managed separately. Depending on how much performance you are trying to get out of C*, Cassandra should get its 32 GB of RAM as usual. Anything over should then go to Spark. So for example to get great performance you could have a 64 GB RAM system with 32 GB to C* and 32 GB to Spark. Same thing for cores. You should have 12-16 cores; 8-12 for C* and the rest for Spark. If vertical scaling starts to get too expensive you can alternatively add more nodes to meet performance expectations.

The recommendation is to have no more than 1 TB of data per node. The reason for a 2 TB disk despite a 1 TB recommendation is that once over 60% of your disk is full, you run the risk of not having enough disk space during compaction. This is especially true if you are using size-tiered compaction. With leveled compaction you can use up to 70% without risk.

Use RAID 0 for your storage configuration. C* does replication for you. You can also use JBOD and C* can intelligently handle failures of some of the disks in your JBOD cluster.

Java Heap Size & Garbage Collection

  • As a general rule of thumb; start with defaults and then walk it up.
  • The ParNew/CMS GC works best with 8 GB
  • The G1GC can manage 20 GB of RAM (Note: Another engineer mentioned to me that 32 GB of RAM is no problem for G1GC). Should not be used if the heap is under 8 GB.
    • Use G1GC with Solr / DSE Search nodes

Memory Usage and Caching

  • It’s very important to have ample off-heap RAM. Some C* data structures, such as memtables and bloom filters, are off-heap. You also want to have non-heap RAM for page caching.
  • Row caching can significantly speed up reads because it avoids a disk read (if the row isn’t cached already). However, row caching should be used judiciously. The best use case is for tables with a high density of hotspot data, the reason being that on a large table with varying and disparate data and seemingly random reads, you’ll end up with a lot of cache misses, which defeats the point of having a cache.
  • The row cache is filled on reads. memtables are filled on writes.
  • Memtables remain in memory until there is memory pressure based on configuration in the cassandra.yaml, then they are removed from RAM.

Benchmarking

  • Use the Cassandra Stress program that comes with C*.
  • Cassandra Stress can be configured; you can specify number of columns, data size, data model, queries, types of data, cardinality, etc.
  • To model production, use multiple clients & multiple threads for clients in your Benchmarking
  • When stress testing make sure you run it long enough to run into compactions, GC, repairs. Because when you test you want to know what happens in that situation. You can even stress test and introduce failures and see how it responds. You can/should instrument the JVM during stress testing and then go back and look at it.
  • General recommended stress test times is 24 - 48 hrs run times.
  • DSE has solr-stress for testing the solr integration.
For performance, a starting expectation of 3k - 5k transactions per second per core is reasonable.
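
As an illustrative starting point, a cassandra-stress run might look like the following (the host, duration and thread counts are placeholders; check the option syntax against your version):


$ cassandra-stress write n=1000000 cl=QUORUM -rate threads=200 -node 10.0.0.1
$ cassandra-stress mixed ratio\(write=1,read=3\) duration=24h -rate threads=200 -node 10.0.0.1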

Interesting Note: A DataStax customer once conducted a stress test that ran 24 hrs a day for 6-8 weeks. They were testing to see how changing the compaction strategy impacted their read-heavy workload.

General

  • Turn on user authentication. At least Basic Auth. This is good for security and auditing purposes. Also it allows you to not accidentally drop a production table because you thought you were connected to staging.
  • Use TLS if you are talking between DCs across the public internet
  • Don’t just bump up the heap for greater performance or to solve your problems! You’ll have to pay for it later during GC.
  • If you have crappy latency on 1% of your operations you shouldn’t just ignore it. You should try to understand what happened, is it compaction? Is it GC? That way you can address the issue that caused the high latency. Because that 1% could one day be 5%.
  • Why should we have backups? Backups exist to protect against people, not machines. Data corruption is the primary reason for backups. For example, someone accidentally changes all the '1’s in your DB to 'c’s.
  • There is no built-in way to count the number of rows in a Cassandra table. The only way to do so is to write a Spark job (see the snippet after this list). You can estimate the number of rows if you know the amount of data per row, by dividing the table size by that amount.
  • Use ntpd! C* nodes in a cluster must always be on time because timestamps are important and are used in resolving conflicts. Clock drift cannot be tolerated.
  • Tombstone Hell: queries on partitions with a lot of tombstones require a lot of filtering which can cause performance problems. Compaction gets rid of tombstones.
  • Turn off swap on C* nodes.
  • If C* runs out of memory it just dies. But that’s perfectly OK, because the data is distributed / replicated and you can just bring it back up. In the meantime, data will be read from the other nodes.
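
A minimal sketch of the row-count Spark job mentioned above, using the spark-cassandra-connector shown elsewhere in this document (the keyspace and table names are hypothetical):


// count rows of a Cassandra table from the spark-shell
import com.datastax.spark.connector._

val rowCount = sc.cassandraTable("my_keyspace", "my_table").count
println(rowCount)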

Cluster Management

  • Don’t put a load balancer in front of your C* cluster.
  • Make sure you are running repairs. Repairs are essentially network defrag and help maintain consistency. Run repairs a little at a time, all the time.
  • If you can model your data to have TTLs you can run repairs much less or not at all.
  • If you never delete your data you can set gc_grace_seconds to 0 (see the one-liner after this list).
  • Don’t upgrade your C* versions by replacing an outgoing node with a new node running a newer version of C*. C* is very sensitive when it comes to running mixed versions in production. The older nodes may not be able to stream data to the newer node. Instead you should do an in-place upgrade, i.e. shut down the node (the C* service), upgrade C* and then bring it back up. (https://docs.datastax.com/en/upgrade/doc/upgrade/cassandra/upgradeCassandraDetails.html)
  • When a new node is added in order to increase storage capacity / relieve storage pressure on the existing nodes, ensure you run nodetool cleanup as the final step. This is because C* won’t automatically reclaim the space of the data streamed out to the new node.
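
The gc_grace_seconds change mentioned above is a one-liner on a hypothetical table:


ALTER TABLE foo.events WITH gc_grace_seconds = 0;   -- only safe if you never delete data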

Monitoring, Diagnostic

Monitoring Services for capturing machine level metrics
  • Monit
  • Munin
  • Icinga
  • JMX Metrics
Make sure you are capturing application metrics and deliver them to a dashboard that can integrate app metrics and server metrics

Since you are running on multiple machines it becomes important to aggregate your logs.
  • Logstash
Diagnostic tools
  • htop (a better version of top)
  • iostat
  • dstat
  • strace
  • jstack
  • tcpdump (monitor network traffic; you can even see plain-text queries coming in)
  • nodetool tpstats (can help diagnose performance problems by showing you which thread pools are overwhelmed / blocked. From there you can make hypotheses as to the cause of the blockage / performance problem)

DSE Offering

DSE Max => Cassandra + Support + Solr + Spark

DSE search

  • Solr fixes a couple of rough edges for C* like joins, ad-hoc querying, fuzzy text searching and secondary indexing problems in larger clusters.
  • DSE search has tight Solr integration with C*. C* stores the data, Solr stores the indexes. CQL searches that use the solr_query expression in the WHERE clause search Solr first for the location of the data to fetch and then queries C* for the actual data.
  • You can checkout killrvideo’s Github for an example of DSE search in action (https://github.com/LukeTillman/killrvideo-csharp/blob/master/src/KillrVideo.Search/SearchImpl/DataStaxEnterpriseSearch.cs)
  • Solr requires roughly a 3x multiplication of the CPU and RAM needed for running regular C*. This is because Solr indexes must live in RAM.
  • Solr can do geospatial searches & can do arbitrary time range searches (which is another rough edge that C* cannot do). E.g. “search for all sales in the past 4 mins 30 seconds”

DSE Spark

  • Spark runs over distributed data stores and schedules analytics jobs on workers on those nodes. DSE Max has Spark integration that just requires the flipping of a switch, no additional config.
  • There’s no need for definition files, workers automatically have access to the tables and Spark is data locality aware so jobs go to the right node.
  • Optionally with DSE search integration you can have search running on the same nodes that have the analytics and data and leverage the search indexes for faster querying instead of doing table scans.
  • With DSE analytics, you can create an analytics DC and have 2-way replication between the operations DC and the analytics DC. 2 way is important because it means that the analytics DC can store the result of its computation to the Cassandra table which then gets replicated back to the ops DC.
  • The Spark jobs / workers have access to more than just the C* table data. They can do just about anything you code. They can pull data from anything; open files, read queues, JDBC data stores, HDFS, etc. And write data back out as well.
  • Recommendation for Spark & Cassandra on the same node: appropriate resource allocation is important, since Spark will require more memory. The sizing guidance is the same as in the Hardware Sizing section above: give C* its usual 32 GB of RAM and 8-12 cores, allocate anything above that to Spark (for example, 32 GB each on a 64 GB, 12-16 core box), and if vertical scaling starts to get too expensive, add more nodes to meet performance expectations instead.

Other Notes

Cassandra has a reference implementation called killrvideo. It is an actual website hosted on MS Azure. The address is killrvideo.com. It is written by Luke Tillman in C#. Checkout the source code on Github (https://github.com/LukeTillman/killrvideo-csharp).

Configuring Remote Management and Monitoring on a Cassandra Node with JConsole

In your cassandra-env.sh set

LOCAL_JMX=no

If you want username and password security, keep the default setting for jmxremote.authenticate, which is true. Otherwise set it to false:

JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"

Note: If set to true, enter a username and password for the jmx user in the jmxremote.password.file. Follow the instructions here on how to secure that file. Setting jmxremote.authenticate to true also requires you to pass in the username and password when running a nodetool command, e.g. nodetool status -u cassandra -pw cassandra
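
For reference, the cassandra-env.sh line that points at the password file typically looks something like the following (the path may differ in your installation):


JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.password.file=/etc/cassandra/jmxremote.password"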

Restart the server (if needed).

Connecting to the node using JConsole for monitoring

Find the jconsole executable in your JDK_HOME/bin directory. If you don’t have the Java JDK installed it can be downloaded from the Oracle website.

Double-click the executable to run it (or run jconsole from the command line). Select “Remote Process” and enter the following connection string.

 service:jmx:rmi:///jndi/rmi://<target ip address>:7199/jmxrmi

replacing <target ip address> with the address of the machine you intend to manage or monitor.

Note: JConsole may try (and fail) to connect using ssl first. If it does so it will ask if it can connect over a non-encrypted connection. You should answer this prompt in the affirmative and you are good.

Congrats! You now have everything you need to monitor and manage a cassandra node.

For help with how to monitor Cassandra using JMX and with interpreting the metrics see:

The Right Database for the Right Job - Chattanooga Developer Lunch Presentation

Does this sound like you? "OMG!! PostgreSQL, Neo4j, Elasticsearch, MongoDB, RethinkDB, Cassandra, SQL Server, Riak, InfluxDB, Oracle NoSQL, SQLite, Hive, Couchbase, CouchDB, DynamoDB. I've got an issue with my current database solution or I'm starting a new project and I don't know what to choose!"

This talk is intended to help you match your data storage needs with suitable solutions from a wide field of contenders. Looking at different data types, structures and interaction patterns, we will try to understand what makes certain data stores better suited than others and how to implement polyglot persistence.


Stream Processing With Spring, Kafka, Spark and Cassandra - Part 1

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Part 1 - Overview

Before starting any project I like to make a few drawings, just to keep everything in perspective. My main motivation for this series is to get better acquainted with Apache Kafka. I just didn't have a chance to use it on the projects that I work on in my day-to-day life, but it's this new technology everybody is buzzing about, so I wanted to give it a try. One other thing is that I also didn't get a chance to write Spark Streaming applications, so why not kill two birds with one stone? Here is a 10,000-foot overview of the series:

Avoiding the tl;dr

Part of the motivation for splitting is avoiding the tl;dr effect ;) Now, let's get back to the overview. We'll break down the previous image box by box.

Using Spring Boot

We're basically just prototyping here, but to keep everything flexible and in the spirit of newer architectural paradigms like microservices, the post will be split into 5 parts. The software will also be split, so we won't use any specific container for our applications; we'll just go with Spring Boot. In the posts we won't go much over the basics; you can always look them up in the official documentation.

Apache Kafka

This is the reason why I'm doing this in the first place. It's this new super cool messaging system that all the big players are using and I want to learn how to put it to everyday use.

Spark Streaming

For some time now I've been doing a lot of stuff with Apache Spark, but somehow I didn't get a chance to look into streaming a little bit more closely.

Cassandra

Why not?

What is this series about?

It's a year where everybody is talking about voting ... literally everywhere :) so let's make a voting app. In essence it will be a basic word count on the stream. But let's give some context to it while we're at it. We won't do anything complicated or useful. Basically the end result will be a total count of token occurrences in the stream. We'll also break a lot of best practices in data modeling etc. in this series.

This series is for people oriented toward learning something new. I guess experienced and battle-proven readers will find a ton of flaws in the concept, but again, most of them are deliberate. One thing I sometimes avoid in my posts is including complete source code; my opinion is that a lot more sticks, and learners feel much more comfortable, when they have to face problems in practice. So I'll just copy-paste the crucial code parts. One more assumption on my side is that readers will be using IntelliJ IDEA. Let's go to Part 2 and see how to set up Kafka.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 2

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Setting up Kafka

In this section we'll set up two Kafka brokers. We'll also need ZooKeeper. If you are reading this, my guess is that you don't have one set up already, so we'll use the one bundled with Kafka. We won't cover everything here; do read the official documentation for a more in-depth understanding.

Downloading

Download the latest Apache Kafka. In this tutorial we'll use the binary distribution. Pay attention to the Scala version if you intend to use Kafka with a specific Scala version; in this tutorial we'll concentrate more on Java, but this will be more important in the parts to come. In this section we'll use the tools that ship with the Kafka distribution to test everything out. Once again, download and extract the distribution of Apache Kafka from the official pages.

Configuring brokers

Go into the directory where you downloaded and extracted your Kafka installation. There is a properties file template, and we are going to use properties files to start the brokers. Make two copies of the file:


$ cd your_kafka_installation_dir
$ cp config/server.properties config/server0.properties
$ cp config/server.properties config/server1.properties

Now use your favorite editor to make changes to broker configuration files. I'll just use vi, after all it has been around for 40 years :)

$ vi config/server0.properties

Now make changes to (or check) the following properties:

broker.id=0
listeners=PLAINTEXT://:9092
num.partitions=2
log.dirs=/var/tmp/kafka-logs-0

Make the changes for the second node too:

$ vi config/server1.properties

broker.id=1
listeners=PLAINTEXT://:9093
num.partitions=2
log.dirs=/var/tmp/kafka-logs-1

Starting everything up

First you need to start ZooKeeper; it will be used to store the offsets for topics. There are more advanced setups where you don't need it, but for someone just starting out it's much easier to use the ZooKeeper bundled with the downloaded Kafka. I recommend opening one shell tab where you can keep all of the running processes. We didn't make any changes to the ZooKeeper properties; they are just fine for our example:


$ bin/zookeeper-server-start.sh config/zookeeper.properties &

From the output you'll notice it started ZooKeeper on the default port 2181. You can telnet to this port on localhost just to check if everything is running fine. Now we'll start the two Kafka brokers:

$ bin/kafka-server-start.sh config/server0.properties &
$ bin/kafka-server-start.sh config/server1.properties &

Creating a topic

Before producing and consuming messages we need to create a topic; for now you can think of it as a queue name. We need to give a reference to the ZooKeeper instance. We'll name the topic "votes"; the topic will have 2 partitions and a replication factor of 2. Please read the official documentation for further explanation. You'll see additional output coming from the broker logs because we are running the examples in the background.


$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic votes --partitions 2 --replication-factor 2
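
To verify that the topic was created with the expected partitions and replicas, you can describe it with the same tool:


$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic votes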

Sending and receiving messages with bundled command line tools

Open two additional shell tabs and position yourself in the directory where you installed Kafka. We'll use one tab to produce messages. The second tab will consume the topic and simply print out the stuff that we type in the first tab. Now this might be a bit funny, but imagine you are actually using Kafka already!

In tab for producing messages run:


$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic votes

In tab for consuming messages run:


$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic votes

Next part

We covered a lot here, but writing from one console window to another can be achieved with a far simpler combination of shell commands. In Part 3 we'll make an app that writes to a topic. We'll also use the console reader just to verify that our app is actually sending something to the topic.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 3

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Writing a Spring Boot Kafka Producer

We'll go over the steps necessary to write a simple producer for a Kafka topic by using Spring Boot. The application will essentially be a simple proxy application and will receive a JSON payload containing the value that's going to be sent to the Kafka topic. Pretty simple, but enough to get us going. We'll use IntelliJ IDEA to set everything up. The easiest way to get started is by using Spring Initializr.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: spring-boot-kafka-example
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: spring-boot-kafka-example
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Kafka Example
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Next
  18. Project name: spring-boot-kafka-example
  19. The rest is just fine ...
  20. Finish
  21. After creating the project, check the SDK setting; it should be Java 8

build.gradle dependencies


compile('org.apache.kafka:kafka_2.11:0.9.0.0')
compile('org.apache.zookeeper:zookeeper:3.4.7')

application.properties


brokerList=localhost:9092
sync=sync
topic=votes

SpringBootKafkaProducer

This is the class where all the important stuff is happening


package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Configuration
public class SpringBootKafkaProducer {

    @Value("${brokerList}")
    private String brokerList;

    @Value("${sync}")
    private String sync;

    @Value("${topic}")
    private String topic;

    private Producer<String, String> producer;

    public SpringBootKafkaProducer() {
    }

    @PostConstruct
    public void initIt() {
        Properties kafkaProps = new Properties();

        kafkaProps.put("bootstrap.servers", brokerList);
        kafkaProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("acks", "1");
        kafkaProps.put("retries", "1");
        kafkaProps.put("linger.ms", 5);

        producer = new KafkaProducer<>(kafkaProps);
    }

    public void send(String value) throws ExecutionException, InterruptedException {
        if ("sync".equalsIgnoreCase(sync)) {
            sendSync(value);
        } else {
            sendAsync(value);
        }
    }

    private void sendSync(String value) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        producer.send(record).get();
    }

    private void sendAsync(String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);

        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> {
            if (e != null) {
                e.printStackTrace();
            }
        });
    }
}

SpringBootKafkaExampleApplication

This one will be automatically generated.


package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaExampleApplication.class, args);
    }
}

AppBeans

Setup beans for the controller.


package com.example;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public SpringBootKafkaProducer initProducer() {
        return new SpringBootKafkaProducer();
    }
}

Helper beans

The status to return to clients; we'll just send "ok" every time.


package com.example;

public class Status {
private String status;

public Status(String status) {
this.status = status;
}

public Status() {
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}
}
This will be the input to our app

package com.example;

public class Vote {
private String name;

public Vote(String name) {
this.name = name;
}

public Vote() {
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

SpringBootKafkaController

This is the controller; after starting the app we should have an active endpoint available at http://localhost:8080/vote


package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class SpringBootKafkaController {

@Autowired
SpringBootKafkaProducer springBootKafkaProducer;

@RequestMapping("/vote")
public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException {

springBootKafkaProducer.send(vote.getName());

return new Status("ok");
}

}

Checking everything

There should be an active console consumer from the previous post, so we won't cover that here. After running the SpringBootKafkaExampleApplication, simply open a REST client application like Postman and try to send the following JSON to http://localhost:8080/vote


{
"name": "Test"
}
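If you would rather hit the endpoint from code instead of Postman, a tiny client using nothing but the JDK could look like the following (a sketch, not part of the project itself):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class VoteClient {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8080/vote");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setDoOutput(true);

        // the same JSON payload as above
        String json = "{\"name\": \"Test\"}";
        try (OutputStream out = connection.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }

        // 200 means the controller accepted the vote and answered with {"status":"ok"}
        System.out.println("Response code: " + connection.getResponseCode());
    }
}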
If everything went fine you should see the name that you sent in this JSON in the console consumer. In Part 4 we are going to go over how to pick up the data from Kafka with Spark Streaming, combine it with the data in Cassandra and push the results back to Cassandra.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 4

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Consuming Kafka data with Spark Streaming and Output to Cassandra

In this section we are going to use Spark Streaming to read the data coming in from Kafka. We'll also combine it with the data already in Cassandra, do some computation with it, and put the results back into Cassandra. The best practice would be to have a Spark cluster running, but for the sake of simplicity we are going to launch a local Spark context from a Java application and do the processing there. We won't go into configuring Cassandra; there is plenty of documentation out there and it takes just minutes to set up.

Cassandra

Nothing fancy here, just the name of the entity being voted for and the number of votes:


CREATE KEYSPACE voting
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};

USE voting;

CREATE TABLE votes (name text PRIMARY KEY, votes int);

Let's create a simple java project with gradle for stream processing

  1. File, New Project, Gradle
  2. Project SDK: Java 8
  3. Java
  4. Next
  5. GroupId: spark-kafka-streaming-example
  6. ArtifactId: spark-kafka-streaming-example
  7. Version: 1.0-SNAPSHOT
  8. Next
  9. Use default gradle wrapper
  10. Next
  11. Project name: spark-kafka-streaming-example
  12. The rest is just fine ...
  13. Finish
  14. After creating project check sdk setting, it should be java 8

Let's have a look at the dependencies


group 'spark-kafka-streaming-example'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
mavenCentral()
}

dependencies {
compile('org.apache.spark:spark-core_2.10:1.5.2')
compile('org.apache.spark:spark-streaming_2.10:1.5.2')
compile('org.apache.spark:spark-streaming-kafka_2.10:1.5.2')
compile('com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3')
compile('com.datastax.spark:spark-cassandra-connector-java_2.10:1.5.0-M3')

testCompile group: 'junit', name: 'junit', version: '4.11'
}

Simple Voting Class to go with Cassandra Table

We'll use this class for storing data into cassandra


import java.io.Serializable;

public class Vote implements Serializable {
private String name;
private Integer votes;

public Vote(String name, Integer votes) {
this.name = name;
this.votes = votes;
}

public Vote() {
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getVotes() {
return votes;
}

public void setVotes(Integer votes) {
this.votes = votes;
}
}

Spark streaming with kafka

And finally the code that accepts the tokens coming in, combines them with the data in Cassandra and then writes the updated counts back to Cassandra. I didn't spend much time on making the class configurable with external parameters, but for the example it's good enough:


import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class SparkStreamingExample {

public static JavaSparkContext sc;

public static void main(String[] args) throws IOException {

String brokers = "localhost:9092,localhost:9093";
String topics = "votes";

SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[2]");
sparkConf.setAppName("SparkStreamingExample");
sparkConf.set("spark.cassandra.connection.host",
"127.0.0.1");

JavaStreamingContext jssc = new JavaStreamingContext(
sparkConf,
Durations.seconds(10));

HashSet<String> topicsSet = new HashSet<>(
Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);

JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);

JavaDStream<String> lines =
messages.map(
(Function<Tuple2
<String, String>,
String>) Tuple2::_2);

JavaPairDStream<String, Integer> voteCount = lines
.mapToPair(
(PairFunction<String, String, Integer>) s ->
new Tuple2<>(s, 1)).reduceByKey(
(Function2<Integer, Integer, Integer>)
(i1, i2) ->i1 + i2);

sc = jssc.sparkContext();

voteCount.foreachRDD((v1, v2) -> {
v1.foreach((x) -> {
CassandraTableScanJavaRDD<CassandraRow> previousVotes =
javaFunctions(sc)
.cassandraTable("voting", "votes")
.where("name = '" + x._1() + "'");

Integer oldVotes = 0;
if (previousVotes.count() > 0) {
oldVotes =
previousVotes.first().getInt("votes");
}

Integer newVotes = oldVotes + x._2();

List<Vote> votes = Arrays.asList(
new Vote(x._1(), newVotes));
JavaRDD<Vote> rdd = sc.parallelize(votes);

javaFunctions(rdd)
.writerBuilder("voting", "votes", mapToRow(Vote.class))
.saveToCassandra();
});

return null;
});

voteCount.print();

jssc.start();
jssc.awaitTermination();
}
}

And that's it

You can check how the data changes by running select statements against the votes table. In Part 5 we are going to make a simple Spring Boot project that displays and sorts the voting data.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 5

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Displaying Cassandra Data With Spring Boot

Now that we have our voting data in Cassandra, let's write a simple Spring Boot project that gathers all the data from Cassandra, sorts it and displays it to the user.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: boot-cassandra-data-show
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: boot-cassandra-data-show
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Display Cassandra Data
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Template Engines - Mustache
  18. Next
  19. Project name: boot-cassandra-data-show
  20. The rest is just fine ...
  21. Finish
  22. After creating project check sdk setting, it should be java 8

Cassandra dependencies


compile('com.datastax.cassandra:cassandra-driver-core:2.1.9')

Vote class

We'll use this class to map rows from cassandra.


package com.example;

import java.io.Serializable;

public class Vote implements Serializable {
private String name;
private Integer votes;

public Vote(String name, Integer votes) {
this.name = name;
this.votes = votes;
}

public Vote() {
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getVotes() {
return votes;
}

public void setVotes(Integer votes) {
this.votes = votes;
}
}

application.properties


server.port = 8090
contactPoint = 127.0.0.1
keyspace = voting

CassandraSessionManager

This bean is used to set up the connection to Cassandra.

package com.example;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Configuration
public class CassandraSessionManager {

private Session session;
private Cluster cluster;

@Value("${contactPoint}")
private String contactPoint;

@Value("${keyspace}")
private String keyspace;

public CassandraSessionManager() {

}

public Session getSession() {
return session;
}

@PostConstruct
public void initIt() {
cluster = Cluster.builder().addContactPoint(
contactPoint).build();
session = cluster.connect(keyspace);
}

@PreDestroy
public void destroy() {
if (session != null) {
session.close();
}
if (cluster != null) {
cluster.close();
}
}
}

BootCassandraDataShowApplication

Automatically generated ...

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BootCassandraDataShowApplication {

public static void main(String[] args) {
SpringApplication.run(
BootCassandraDataShowApplication.class, args);
}
}

AppBeans

Bean for holding configured objects.


package com.example;

import com.datastax.driver.core.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

@Bean
public Session session() {
return sessionManager().getSession();
}

@Bean
public CassandraSessionManager sessionManager() {
return new CassandraSessionManager();
}
}

Web Controller


package com.example;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;

@Configuration
@Controller
public class WelcomeController {

@Autowired
Session session;

@RequestMapping("/")
public String welcome(Map<String, Object> model) {

final ResultSet rows = session.execute("SELECT * FROM votes");

ArrayList<Vote> results = new ArrayList<>();

for (Row row : rows.all()) {
results.add(new Vote(
row.getString("name"),
row.getInt("votes")
));
}

Collections.sort(results, (a, b) ->
b.getVotes().compareTo(a.getVotes()));

model.put("results", results);

return "welcome";
}
}

Template to show the results


<!DOCTYPE html>
<html lang="en">
<body>

<h1>Voting results:</h1>
<br/>
{{#results}}
<strong>{{this.name}}</strong> {{this.votes}} <br/>
{{/results}}

</body>
</html>

That's all folks

Now this app might not seem like a lot, but there's a Kafka cluster that receives messages coming in from a Spring Boot app that exposes a REST interface. Messages that come in from Kafka are then processed with Spark Streaming and sent to Cassandra. There is another Spring Boot app that sorts and displays the results to the users. This small tutorial covers most of the cool Java/big data technologies of today. Special thanks to the readers that went through all five parts of this tutorial ;)

Cassandra Time Series Bucketing

Intro

Bucketing is one of the most important techniques when working with time series data in Cassandra. This post has its roots in two very popular blog entries:

The posts are very well written and they pretty much describe all of the standard techniques when it comes to working with time series data in Cassandra. But to be honest, there isn't all that much code in them. This is partly due to the fact that almost every project has its own specifics, and from my experience it often happens that even within a relatively small team there will be multiple implementations of how to bucket and access the time series data.

The Case for Bucketing

For some time now I've been in the world of IoT, and I find that explaining everything with the help of a simple temperature sensor is the best way to discuss the subject. The previously mentioned articles are also a good read. This section is sort of a warm up. Theoretically, in most use cases we'll want to access temperature readings by some sensor id, and we know where this sensor is located. In the simplest case the sensor id becomes the wide row key in Cassandra, and the readings are stored in it and kept sorted by time. However, in some cases the temperature may be read very often, and this could cause the wide row to grow to a size that is not manageable by Cassandra, so the data has to be split among multiple rows. The easiest way to make this split is to derive multiple rows from the measurement timestamp.
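In table form, such a split usually means putting the sensor id and a time bucket together in the partition key. Below is a minimal sketch of what that could look like with the DataStax Java driver; the keyspace, table and column names are assumptions for illustration (the later pseudo code reuses readings and measurement_timestamp):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class CreateReadingsTable {
    public static void main(String[] args) {
        // assumed contact point and an already existing keyspace called "measurements"
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect("measurements")) {
            // sensor id + day bucket form the partition key, so every day of readings
            // for a sensor lands in its own, bounded partition
            session.execute(
                "CREATE TABLE IF NOT EXISTS readings (" +
                "  sensor_id text," +
                "  bucket text," +               // e.g. '2016-01-01', produced by dateBucket(...) below
                "  measurement_timestamp timeuuid," +
                "  temperature decimal," +
                "  PRIMARY KEY ((sensor_id, bucket), measurement_timestamp)" +
                ") WITH CLUSTERING ORDER BY (measurement_timestamp DESC)");
        }
    }
}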

How big should my buckets be?

It may vary from project to project, but it depends on two important factors: how many readings you are storing per single measurement, and how often the measurements happen. For instance, if you are recording a reading once per day, you probably don't even need bucketing. If you are recording it once per hour, the project you are working on probably won't last long enough for you to run into problems. The same applies to seconds, but only for the most trivial case where you are making a single reading. If you go into frequencies where something is happening at the millisecond level, you will most definitely need bucketing. The most complex project I have worked on up until now had time bucketing at the level of a single minute, meaning a new bucket every minute. But that project is not in the IoT world; in the IoT world I'm using partitions on a monthly basis.

10 000 feet Bucketing View

The main problem is how to calculate the bucket from the measurement timestamp. Also keep in mind that there might be differences between timezones; in a distributed system a very advisable practice is to save everything in UTC. If we decide that we need bucketing per day, it could be something as simple as the following:


// FastDateFormat comes from Apache Commons Lang (org.apache.commons.lang3.time.FastDateFormat)
FastDateFormat dateFormat = FastDateFormat.getInstance(
        "yyyy-MM-dd", TimeZone.getTimeZone("UTC"));

public String dateBucket(Date date) {
    return dateFormat.format(date);
}
That's it: combine this with your sensor id and you get buckets on a day-level basis. Now the problem is how to retrieve the measurements from the buckets, especially if you have to fetch measurements across multiple buckets. We'll go over this in the next section.

Anything goes

Bear in mind that you should keep the buckets in time series data easy to maintain. Also try to avoid having multiple implementations of the same thing in your code base. This section will not provide 100% implemented examples, but will be more at the level of pseudo code.

When you are fetching the data from the buckets, you will have two types of queries. One fetches data from the bucket without any restriction on the measurement timestamp. The other starts from a certain position within the bucket. Then there is the question of ordering and sorting the retrieved data. I have worked in systems with all sorts of practices there; most of the time reversing was done with the help of a specific boolean flag, but in my opinion this should be avoided. It's best to stick to the from and to parameters and order the data according to them, i.e.


from: 01/01/2016
to: 02/02/2016
returns: ascending

from: 02/02/2016
to: 01/01/2016
returns: descending
That way you don't have to break your head thinking about various flags passed across the levels in your code.

Here is a bit of pseudo code:


// constructor of your iterator object

startPartition = dateBucket(from);
endPartition = dateBucket(to);

lastFetchedToken = null;

bucketMoveCount = 0;

String statement = "SELECT * FROM readings"

// from past experience, somehow the driver takes out data the fastest
// if it fetches 3000 items at once, would be interesting to drill down
// why is this so :)

int fetchSize = 3000;

if (from.isBefore(to)) {
select = statement + " ORDER BY measurement_timestamp ASC LIMIT " + fetchSize;
selectFromBoundary = statement + " AND measurement_timestamp > ? ORDER BY measurement_timestamp ASC LIMIT " + fetchSize;

partitionDiff = -1f;
} else {
selectNormal = statement + " LIMIT " + fetchSize;
selectFromBoundary = statement + " AND measurement_timestamp < ? LIMIT " + fetchSize;

partitionDiff = 1f;
}
The partition could move by hour, day or minute; it all depends on how you decide to implement it. You will have to do some time based calculations there, and I recommend using Joda-Time for that. Now that we have defined how the iterator is initialized, it's time to do some iterations over it:

public List<Row> getNextPage() {

    List<Row> resultOut = new ArrayList<>();

    boolean continueFromPreviousBucket = false;

    do {
        ResultSet resultSet =
                lastFetchedToken == null ?
                        session.execute(new SimpleStatement(select, currentBucket)) :
                        session.execute(new SimpleStatement(selectFromBoundary, currentBucket, lastFetchedToken));

        List<Row> result = resultSet.all();

        if (result.size() == fetchSize) {
            if (continueFromPreviousBucket) {
                resultOut.addAll(result.subList(0, fetchSize - resultOut.size()));
            } else {
                resultOut = result;
            }

            lastFetchedToken = resultOut.get(resultOut.size() - 1).getUUID("measurement_timestamp");

        } else if (result.size() == 0) {
            currentBucket = calculateNextBucket();
            bucketMoveCount++;

        } else if (result.size() < fetchSize) {
            currentBucket = calculateNextBucket();
            bucketMoveCount++;

            lastFetchedToken = null;

            if (continueFromPreviousBucket) {
                resultOut.addAll(result.subList(0, Math.min(result.size(), fetchSize - resultOut.size())));
            } else {
                resultOut = result;
            }

            continueFromPreviousBucket = true;
        }

        if (resultOut.size() == fetchSize
                || bucketMoveCount >= MAX_MOVE_COUNT
                || Math.signum(currentBucket.compareTo(endPartition)) != partitionDiff) {
            break;
        }

    } while (true);

    return resultOut;
}
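The pseudo code above leaves calculateNextBucket out. For day-level buckets, a minimal sketch with Joda-Time (the field names follow the pseudo code above, everything else is assumed) could look like this:

import org.joda.time.LocalDate;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// the same "yyyy-MM-dd" format that dateBucket(...) produces
private static final DateTimeFormatter BUCKET_FORMAT =
        DateTimeFormat.forPattern("yyyy-MM-dd");

// partitionDiff is -1f when iterating ascending (from is before to) and 1f otherwise,
// exactly as set in the iterator constructor above
private String calculateNextBucket() {
    LocalDate current = BUCKET_FORMAT.parseLocalDate(currentBucket);
    LocalDate next = partitionDiff < 0 ? current.plusDays(1) : current.minusDays(1);
    return BUCKET_FORMAT.print(next);
}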

This is just a high level overview of how to move among the buckets. The actual implementation will differ significantly from project to project. My hope for this post is that you give the problems I faced some thought before you run into them.

Spring Data Cassandra vs. Native Driver

Intro

For some time now Spring Data with Cassandra has been getting more and more popular. My main concern with the framework is its performance characteristics compared to the native CQL driver. After all, with the driver everything is under your control and one can probably squeeze much more juice out of the cluster. O.k., I admit, it's not always about performance. If that were the case we would all be writing software in C or assembler. But still, I think it's good practice to be aware of the drawbacks.

To be honest, Spring Data Cassandra is relatively new to me. I did the performance comparison at the lowest level, without using repositories and the other high level concepts that come with Spring Data Cassandra. My focus in this post is more on the generics that decode the data coming out of the driver. To make the comparison I'm going to use a simple Cassandra table (skinny row), make query after query (5000 and 10000 of them) against Cassandra, and then decode the results. Once again, the focus in this post is not on the performance characteristics of higher order functionality like paged queries etc. I just wanted a rule-of-thumb feeling for what I can expect from Spring Data Cassandra.

Setup


-- simple skinny row
CREATE TABLE activities (
activity_id uuid,
activity_model_id bigint,
activity_state text,
asset_id text,
attrs map<text, text>,
creation_time timestamp,
customer_id text,
end_time timestamp,
last_modified_time timestamp,
person_id text,
poi_id text,
start_time timestamp,
PRIMARY KEY (activity_id)
);

To eliminate all possible effects, I just used single skinny row:

activity_id 72b493f0-e59d-11e3-9bd6-0050568317c1
activity_model_id 66
activity_state DONE
asset_id 8400848739855200000
attrs {
'businessDrive': '1:1',
'customer': '4:test_test_test',
'distance': '3:180',
'endLocation': '6:15.7437466839,15.9846853333,0.0000000000',
'fromAddress': '4:XX1',
'locked': '1:0',
'reason': '4:Some reason 2',
'startLocation':
'6:15.7364385831,15.0071729736,0.0000000000',
'toAddress': '4:YY2'
}
creation_time 2014-05-27 14:50:14+0200
customer_id 8400768435301400000
end_time 2014-05-27 12:15:40+0200
last_modified_time 2014-05-29 21:30:44+0200
person_id 8401111750365200000
poi_id null
start_time 2014-05-27 12:13:05+0200
This row is fetched every time; to detect the differences we'll see how long the iterations last. Network and cluster effects are also out of scope, so everything was tested against a locally running DataStax Cassandra Community (2.0.16) instance.

The code

To rule out all possible interfering effects I used two separate projects. I once had a situation where I used the old Thrift API together with the CQL driver and it significantly affected performance, and it required additional configuration parameters etc. The main code snippets are located on gist. This is not the focus here, but if somebody is interested:

spring-data
native-drivers
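For a flavour of what the native driver test does, here is a simplified sketch of that kind of measurement loop, decoding three of the fields from the row shown above (the contact point and keyspace name are assumptions):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

import java.util.UUID;

public class NativeDriverBenchmark {
    public static void main(String[] args) {
        UUID activityId = UUID.fromString("72b493f0-e59d-11e3-9bd6-0050568317c1");

        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect("test")) {

            long start = System.currentTimeMillis();
            for (int i = 0; i < 5000; i++) {
                Row row = session.execute(
                        "SELECT * FROM activities WHERE activity_id = " + activityId).one();

                // the "3 fields" case from the results below
                String state = row.getString("activity_state");
                String assetId = row.getString("asset_id");
                long modelId = row.getLong("activity_model_id");
            }
            System.out.println("took " + (System.currentTimeMillis() - start) + " ms");
        }
    }
}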

Results in milliseconds


3 fields - 5000 items
spring-data
5381
5282
5385
avg: 5339

driver
4426
4280
4469
avg: 4390

result: driver faster 21.6%

3 fields - 10000 items
spring-data
8560
8133
8144
avg: 8279

driver
6822
6770
6875
avg: 6822

result: driver faster 21.3%

12 fields - 5000 items
spring-data
5911
5920
5928
avg: 5920 - 10.88 % slower than with 3 fields!

driver
4687
4669
4606
avg: 4654 - 6 % slower than with 3 fields

result: driver faster 27%

Conclusions

Spring Data Cassandra may be very interesting if you want to learn something new. It might also have very positive effects on development speed when prototyping or doing something similar. I didn't test the higher order functionality like pagination etc.; this was just a rule-of-thumb test to see what to expect. Basically, the bigger the classes you have to decode, the bigger the deserialization cost. At least this is the effect I'm noticing in my basic tests.

Follow up with Object Mapping available in Cassandra driver 2.1

There was an interesting follow up discussion on reddit. On a proposal from reddit user v_krishna, another candidate was added to the comparison: the Object Mapping API that ships with the driver.
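Just to show how this candidate looks in code, here is a cut-down sketch of an entity and its usage with the driver 2.1 mapping module (only three of the columns are mapped; the keyspace name is an assumption):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.util.UUID;

@Table(keyspace = "test", name = "activities")
public class Activity {

    @PartitionKey
    @Column(name = "activity_id")
    private UUID activityId;

    @Column(name = "activity_state")
    private String activityState;

    @Column(name = "asset_id")
    private String assetId;

    public UUID getActivityId() { return activityId; }
    public void setActivityId(UUID activityId) { this.activityId = activityId; }
    public String getActivityState() { return activityState; }
    public void setActivityState(String activityState) { this.activityState = activityState; }
    public String getAssetId() { return assetId; }
    public void setAssetId(String assetId) { this.assetId = assetId; }

    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {
            // the mapper does the row decoding that was done by hand in the plain driver test
            Mapper<Activity> mapper = new MappingManager(session).mapper(Activity.class);
            Activity activity = mapper.get(UUID.fromString("72b493f0-e59d-11e3-9bd6-0050568317c1"));
            System.out.println(activity.getActivityState());
        }
    }
}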

Let's see the results:


3 fields - 5000 items
spring-data
5438
5453
5576
avg: 5489

object-map
5390
5299
5476
avg: 5388

driver
4382
4410
4249
avg: 4347

conclusion
- driver 26% faster than spring data
- object map just under 2% faster than spring data

3 fields - 10000 items
spring-data
8792
8507
8473
avg: 8591

object-map
8435
8494
8365
avg: 8431

driver
6632
6760
6646
avg: 6679

conclusion
- driver faster 28.6% than spring data
- object mapping just under 2% faster than spring data

12 fields 5000 items
spring-data
6193
5999
5938
avg: 6043

object-map
6062
5936
5911
avg: 5970

driver
4910
4955
4596
avg: 4820

conclusion
- driver 25% faster than spring data
- object mapping 1.2% faster than spring data

To keep everything fair, there was some deviation in test runs when compared to previous test, here are deviations:


comparison with first run:
3 fields - 5000 items
spring-data
avg1: 5339
avg2: 5489
2.7% deviation

driver
avg1: 4390
avg2: 4347
1% deviation

3 fields - 10000 items
spring-data
avg1: 8279
avg2: 8591
3.6% deviation

driver
avg1: 6822
avg2: 6679
2.1% deviation

12 fields 5000 items
spring-data
avg1: 5920
avg2: 6043
2% deviation

driver
avg1: 4654
avg2: 4820
3.4% deviation
Object mapping from Spring Data seems to be just a bit slower than the object mapping available in the new driver. I can't wait to see a comparison of the two in future versions. Initially I was expecting the object mapping approaches to be around 5-10% slower than the plain driver; it surprised me a bit that the difference was more at the level of 25%. So if you are planning on using object mapping capabilities, know that there is a performance penalty.

Enhance Apache Cassandra Logging

Cassandra usually outputs all its logs to a system.log file. It uses the old log4j 1.2 for Cassandra 2.0 and, since 2.1, logback, which of course uses a different syntax :)
Logs can be enhanced with some configuration. These explanations work with Cassandra 2.0.x and Cassandra 2.1.x; I haven't tested other versions yet.

I wanted to split the logs into different files, depending on their “sources” (repair, compaction, tombstones, etc.), to ease debugging, while keeping the system.log as usual.

For example, to declare 2 new files to handle, say, Repair and Tombstones logs:

Cassandra 2.0 :

You need to declare each new log file in the log4j-server.properties file.

[...]
## Repair
log4j.appender.Repair=org.apache.log4j.RollingFileAppender
log4j.appender.Repair.maxFileSize=20MB
log4j.appender.Repair.maxBackupIndex=50
log4j.appender.Repair.layout=org.apache.log4j.PatternLayout
log4j.appender.Repair.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
## Edit the next line to point to your logs directory
log4j.appender.Repair.File=/var/log/cassandra/repair.log

## Tombstones
log4j.appender.Tombstones=org.apache.log4j.RollingFileAppender
log4j.appender.Tombstones.maxFileSize=20MB
log4j.appender.Tombstones.maxBackupIndex=50
log4j.appender.Tombstones.layout=org.apache.log4j.PatternLayout
log4j.appender.Tombstones.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
### Edit the next line to point to your logs directory
log4j.appender.Tombstones.File=/home/log/cassandra/tombstones.log

Cassandra 2.1 :

It is in the logback.xml file.

  <appender name="Repair" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${cassandra.logdir}/repair.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
      <fileNamePattern>${cassandra.logdir}/repair.log.%i.zip</fileNamePattern>
      <minIndex>1</minIndex>
      <maxIndex>20</maxIndex>
    </rollingPolicy>

    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
      <maxFileSize>20MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
      <!-- old-style log format
      <pattern>%5level [%thread] %date{ISO8601} %F (line %L) %msg%n</pattern>
      -->
    </encoder>
  </appender>

  <appender name="Tombstones" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${cassandra.logdir}/tombstones.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
      <fileNamePattern>${cassandra.logdir}/tombstones.log.%i.zip</fileNamePattern>
      <minIndex>1</minIndex>
      <maxIndex>20</maxIndex>
    </rollingPolicy>

    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
      <maxFileSize>20MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
      <!-- old-style log format
      <pattern>%5level [%thread] %date{ISO8601} %F (line %L) %msg%n</pattern>
      -->
    </encoder>
  </appender>

Now that these new files are declared, we need to fill them with logs. To do that, simply redirect some Java classes to the appropriate file. To redirect the class org.apache.cassandra.db.filter.SliceQueryFilter, at loglevel WARN, to the Tombstones file, simply add:

Cassandra 2.0 :

log4j.logger.org.apache.cassandra.db.filter.SliceQueryFilter=WARN,Tombstones

Cassandra 2.1 :

<logger name="org.apache.cassandra.db.filter.SliceQueryFilter" level="WARN">
    <appender-ref ref="Tombstones"/>
</logger>

It’s an on-the-fly configuration, so there is no need to restart Cassandra!
Now you will have dedicated files for each kind of log.

A list of interesting Cassandra classes :

org.apache.cassandra.service.StorageService, WARN : Repair
org.apache.cassandra.net.OutboundTcpConnection, DEBUG : Repair (haha, these fucking stuck repairs)
org.apache.cassandra.repair, INFO : Repair
org.apache.cassandra.db.HintedHandOffManager, DEBUG : Repair
org.apache.cassandra.streaming.StreamResultFuture, DEBUG : Repair 
org.apache.cassandra.cql3.statements.BatchStatement, WARN : Statements
org.apache.cassandra.db.filter.SliceQueryFilter, WARN : Tombstones

You can find out which Java class a log message comes from by adding “%c” to the log4j/logback “ConversionPattern”:

org.apache.cassandra.db.ColumnFamilyStore INFO  [BatchlogTasks:1] 2015-09-18 16:43:48,261 ColumnFamilyStore.java:939 - Enqueuing flush of batchlog: 226172 (0%) on-heap, 0 (0%) off-heap
org.apache.cassandra.db.Memtable INFO  [MemtableFlushWriter:4213] 2015-09-18 16:43:48,262 Memtable.java:347 - Writing Memtable-batchlog@1145616338(195.566KiB serialized bytes, 205 ops, 0%/0% of on/off-heap limit)
org.apache.cassandra.db.Memtable INFO  [MemtableFlushWriter:4213] 2015-09-18 16:43:48,264 Memtable.java:393 - Completed flushing /home/cassandra/data/system/batchlog/system-batchlog-tmp-ka-4267-Data.db; nothing needed to be retained.  Commitlog position was ReplayPosition(segmentId=1442331704273, position=17281204)

You can disable “additivity” (i.e. avoid also adding the messages to system.log) in log4j for a specific class by adding:

log4j.additivity.org.apache.cassandra.db.filter.SliceQueryFilter=false

For logback, you can add additivity="false" to the <logger .../> elements.

To migrate from log4j logs to logback.xml, you can look at http://logback.qos.ch/translator/

Sources :

Note: you can add http://blog.alteroot.org/feed.cassandra.xml to your rss aggregator to follow all my Cassandra posts :)

Analysis of Cassandra powered Greenhouse with Apache Spark

Intro

In the previous post we went over the steps for gathering the data on the Raspberry Pi.

  1. Gather Data on Raspberry Pi with Cassandra and Arduino
  2. Arduino Greenhouse
In this post I'm going to go over the steps necessary to get the data into Cassandra and then process it with Apache Spark.

Cassandra queries


-- we'll keep the data on just one node
CREATE KEYSPACE home
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};

-- create statement, bucketed by date
CREATE TABLE greenhouse (
source text,
day text,
time timestamp,
temperaturein decimal,
temperatureout decimal,
temperaturecheck decimal,
humidity decimal,
light int,
PRIMARY KEY ((source, day), time)
)
WITH CLUSTERING ORDER BY (time DESC);

-- example insert, just to check everything out
INSERT INTO greenhouse (
source, day, time, temperaturein,
temperatureout, temperaturecheck,
humidity, light)
VALUES ('G', '2015-04-04', dateof(now()), 0,
0, 0, 0, 0);

-- check if everything is inserted
SELECT * FROM greenhouse WHERE source = 'G' AND day = '2015-04-19';

Analysis results

I wanted to keep the partitions relatively small because I didn't know how the Raspberry Pi was going to handle the data. A timeout is possible if the rows get too big, so I went with partitioning the data by day. The analysis of April showed that the project paid off. Here are the results of the analysis (a small sketch of this kind of computation is shown at the end of the post):

Total data points (not much, but it's a home DIY solution after all)
172651

First record
Measurement{source='G', day='2015-04-04', time=Sat Apr 04 17:04:41 CEST 2015, temperaturein=11.77, temperatureout=10.43, temperaturecheck=15.0, humidity=46.0, light=57}

Last record
Measurement{source='G', day='2015-05-04', time=Mon May 04 09:37:35 CEST 2015, temperaturein=22.79, temperatureout=20.49, temperaturecheck=23.0, humidity=31.0, light=68}

Cold nights (below 2 °C outside)
2015-04-06
2015-04-07
2015-04-10
2015-04-16
2015-04-17
2015-04-18
2015-04-19
2015-04-20

Lowest In
Measurement{source='G', day='2015-04-06', time=Mon Apr 06 06:22:25 CEST 2015, temperaturein=2.28, temperatureout=2.39, temperaturecheck=4.0, humidity=41.0, light=8}

Highest In
Measurement{source='G', day='2015-04-22', time=Wed Apr 22 14:52:26 CEST 2015, temperaturein=75.53, temperatureout=43.53, temperaturecheck=71.0, humidity=21.0, light=84}

Average In
19.45

Lowest Out
Measurement{source='G', day='2015-04-20', time=Mon Apr 20 04:42:16 CEST 2015, temperaturein=4.48, temperatureout=-2.88, temperaturecheck=6.0, humidity=31.0, light=0}

Highest Out
Measurement{source='G', day='2015-04-22', time=Wed Apr 22 15:58:32 CEST 2015, temperaturein=57.69, temperatureout=45.07, temperaturecheck=56.0, humidity=24.0, light=71}

Average Out
14.71

Average Difference
4.75

Biggest Diff
Measurement{source='G', day='2015-04-20', time=Mon Apr 20 15:11:53 CEST 2015, temperaturein=69.93, temperatureout=28.36, temperaturecheck=62.0, humidity=21.0, light=83}

The code

  1. Spark analysis code
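For a flavour of what that analysis code boils down to, here is a minimal sketch that computes the average inside temperature for one day bucket with the Spark Cassandra connector's Java API (keyspace, table and column names as in the create statement above; the rest is assumed):

import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class GreenhouseAverages {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("GreenhouseAverages")
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // one day bucket of measurements; temperaturein is a decimal column,
        // the connector converts it to Double here
        JavaRDD<Double> tempsIn = javaFunctions(sc)
                .cassandraTable("home", "greenhouse")
                .where("source = 'G' AND day = '2015-04-19'")
                .map((Function<CassandraRow, Double>) row -> row.getDouble("temperaturein"));

        double avgIn = tempsIn.reduce((Function2<Double, Double, Double>) (a, b) -> a + b)
                / tempsIn.count();
        System.out.println("Average inside temperature: " + avgIn);

        sc.stop();
    }
}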

Gather Data on Raspberry Pi with Cassandra and Arduino

Intro

In the previous post we went over the steps necessary to make a sensor for a small greenhouse for the balcony.

  1. Arduino Greenhouse
In this section we are going to concentrate on how to gather the data coming in from the greenhouse. The approach is applicable to any kind of telemetry data or something similar. The parts list is simpler than in the previous section, but as a "concentrator" node we are going to use a Raspberry Pi. Here are the parts:
  • Arduino Uno
  • USB cable
  • Raspberry PI
  • nRF24L01+
  • 7 Wires
To install Arduino libraries please consult the previous post. The wiring for the nRF24 is the same as in the previous post.

Persisting the data

To persist the data I opted for Apache Cassandra. It's a good fit even for a low-powered Raspberry Pi. Cassandra is a Java technology, so before installing Cassandra you have to install Java. It's all written up nicely in the following posts:

  1. Install Java
  2. Installing Cassandra

Overview of the process

The code

  1. Data Gathering in Arduino
  2. Python serial to Cassandra bridge
To be continued ...

How to change Cassandra compaction strategy on a production cluster

I’ll talk about changing the Cassandra CompactionStrategy on a live production cluster.
First of all, an extract from the Cassandra documentation:

Periodic compaction is essential to a healthy Cassandra database because Cassandra does not insert/update in place. As inserts/updates occur, instead of overwriting the rows, Cassandra writes a new timestamped version of the inserted or updated data in another SSTable. Cassandra manages the accumulation of SSTables on disk using compaction. Cassandra also does not delete in place because the SSTable is immutable. Instead, Cassandra marks data to be deleted using a tombstone.

By default, Cassandra uses SizeTieredCompactionStrategy (STCS). This strategy triggers a minor compaction when there are a number of similar sized SSTables on disk, as configured by the table subproperty min_threshold (4 by default).

Another compaction strategy available since Cassandra 1.0 is LeveledCompactionStrategy (LCS) based on LevelDB.
Since 2.0.11, DateTieredCompactionStrategy is also available.

Depending on your needs, you may need to change the compaction strategy on a running cluster. Changing this setting involves rewriting ALL sstables to the new strategy, which may take a long time and can be CPU / I/O intensive.

I needed to change the compaction strategy on my production cluster to LeveledCompactionStrategy because of our workload: lots of updates and deletes, wide rows, etc.
Moreover, with the default STCS, the largest SSTable that is created will progressively not be compacted until the amount of actual data increases four-fold. So it can take a long time before old data is really deleted!

Note: You can test a new compactionStrategy on one new node with the write_survey bootstrap option. See the datastax blogpost about it.

The basic procedure to change the CompactionStrategy is to alter the table via CQL:

cqlsh> ALTER TABLE mykeyspace.mytable  WITH compaction = { 'class' :  'LeveledCompactionStrategy'  };

If you run the alter table to change to LCS like that, all nodes will recompact their data at the same time, so performance problems can occur for hours/days…

A better solution is to migrate node by node!

You need to change the compaction locally, on-the-fly, via JMX, like in write_survey mode.
I use jmxterm for that. I think I’ll write articles about all these JMX things :)
For example, to change to LCS on the mytable table with jmxterm:

~ java -jar jmxterm-1.0-alpha-4-uber.jar --url instance1:7199                                                      
Welcome to JMX terminal. Type "help" for available commands.
$>domain org.apache.cassandra.db
#domain is set to org.apache.cassandra.db
$>bean org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies
#bean is set to org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies
$>get CompactionStrategyClass
#mbean = org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies:
CompactionStrategyClass = org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
$>set CompactionStrategyClass "org.apache.cassandra.db.compaction.LeveledCompactionStrategy" 
#Value of attribute CompactionStrategyClass is set to "org.apache.cassandra.db.compaction.LeveledCompactionStrategy" 

A nice one-liner :

~ echo "set -b org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies CompactionStrategyClass org.apache.cassandra.db.compaction.LeveledCompactionStrategy" | java -jar jmxterm-1.0-alpha-4-uber.jar --url instance1:7199

On the next commitlog flush, the node will start its compaction to rewrite all of its mytable sstables to the new strategy.

You can see the progression with nodetool :

~ nodetool compactionstats
pending tasks: 48
compaction type        keyspace           table       completed           total      unit  progress
Compaction        mykeyspace       mytable      4204151584     25676012644     bytes    16.37%
Active compaction remaining time :   0h23m30s

You need to wait for the node to recompact all its sstables, then change the strategy on instance2, and so on.
The transition will be done in multiple compactions if you have lots of data. By default new sstables will be 160MB large.

You can monitor your table with nodetool cfstats too:

~ nodetool cfstats mykeyspace.mytable
[...]
Pending Tasks: 0
        Table: sort
        SSTable count: 31
        SSTables in each level: [31/4, 0, 0, 0, 0, 0, 0, 0, 0]
[...]

You can see the 31/4: it means that there are 31 sstables in L0, whereas Cassandra tries to have only 4 in L0.

Taken from the code ( src/java/org/apache/cassandra/db/compaction/LeveledManifest.java )

[...]
// L0: 988 [ideal: 4]
// L1: 117 [ideal: 10]
// L2: 12  [ideal: 100]
[...]

When all nodes have the new strategy, let’s go for the global alter table. /!\ If a node restarts before the final alter table, it will recompact to the default strategy (SizeTiered)!

~ cqlsh 
cqlsh> ALTER TABLE mykeyspace.mytable  WITH compaction = { 'class' :  'LeveledCompactionStrategy'  };

Et voilà, I hope this article will help you :)

My latest Cassandra blogpost was one year ago… I have several in mind (jmx things !) so stay tuned !

Cassandra Community Handling 100 000 req per second

Intro

Recently I got an assignment to prove that a Cassandra cluster can hold up to 100 000 requests per second. All of this had to be done on a budget and without much time spent on developing the whole application. The setup had to be as close to the real thing as possible. We will go through the details soon; here is just a basic overview of the experiment:

Amazon

Generating and handling load on this scale requires infrastructure that is usually not available within a personal budget, so I turned to Amazon EC2. I had been hearing about EC2 for quite some time and it turned out to be really easy to use. Basically all you have to do is set up a security group and store the "pem" file for that security group. Really easy, and if anybody hasn't tried it yet, there is a free micro instance available for a whole year after registering. I won't go into the details of how to set up the security group; it's all described in the DataStax documentation. Note that the security definition there is a bit extensive: defining the port range 1024-65535 is sufficient for inter-group communication, and I didn't expose any ports to the public as described in the documentation. The second part is generating the key pair. In the rest of the document I'll reference this file as "cassandra.pem".

Load

Generating load on that scale is not as easy as it might seem. After some searching I stumbled upon the following, and came to the conclusion that the best solution is to use Tsung. I set up the load generating machines with the snippet below. Note that I placed the "cassandra.pem" file on the node from which I'll run Tsung, and read the node addresses from the AWS console. The rest is pretty much here:


# do this only for the machine from which you'll initiate tsung
scp -i cassandra.pem cassandra.pem ec2-user@tsung_machine:~

# connect to every load machine and install erlang and tsung
ssh -i cassandra.pem ec2-user@every_load_machine

# repeat this on every node
sudo yum install erlang

wget http://tsung.erlang-projects.org/dist/tsung-1.5.1.tar.gz
tar -xvzf tsung-1.5.1.tar.gz
cd tsung-1.5.1
./configure
make
sudo make install

# you can close other load nodes now
# go back to the first node. and move cassandra.pem to id_rsa
mv cassandra.pem .ssh/id_rsa

# now make an ssh connection from first tsung node to every
# load generating machine (to add the host key) so that
# the first tsung node won't have any problem connecting to
# other nodes and issuing erlang commands to them
ssh ip-a-b-c-d
exit

# create the basic.xml file on the first tsung node
vi basic.xml

The second part for the load generating machines is editing the basic.xml file. To make it more interesting, we are going to send various kinds of messages with a timestamp. The user list will be predefined in a file called userlist.csv. Note that the password is the same for all users; you can adapt this to your own needs or remove the password completely:


0000000001;pass
0000000002;pass
0000000003;pass
...
...
...

The Tsung tool is well documented; the configuration I used is similar to this:


<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE tsung SYSTEM "/usr/share/tsung/tsung-1.0.dtd" []>
<tsung loglevel="warning">

<clients>
<client host="ip-a-b-c-d0" cpu="8" maxusers="25"/>
<client host="ip-a-b-c-d1" cpu="8" maxusers="25"/>
<client host="ip-a-b-c-d2" cpu="8" maxusers="25"/>
<client host="ip-a-b-c-d3" cpu="8" maxusers="25"/>
</clients>

<servers>
<server host="app-servers-ip-addresses-internal" port="8080" type="tcp"/>
<!-- enter the rest of the app servers here-->
</servers>

<load>
<arrivalphase phase="1" duration="11" unit="minute">
<users maxnumber="100" arrivalrate="100" unit="second"/>
</arrivalphase>
</load>

<options>
<option name="file_server" id='id' value="userlist.csv"/>
</options>

<sessions>
<session probability="100" name="load_session" type="ts_http">
<setdynvars sourcetype="file" fileid="id" delimiter=";" order="iter">
<var name="username" />
<var name="pass" />
</setdynvars>
<setdynvars sourcetype="eval"
code="fun({Pid,DynVars}) -&gt;
{Mega, Sec, Micro} = os:timestamp(),
(Mega*1000000 + Sec)*1000 + round(Micro/1000)
end.
">
<var name="millis" />
</setdynvars>
<for from="1" to="10000000" var="i">
<request subst="true">
<http url="/m?c=%%_username%%%%_millis%%ABC41.7127837,42.71278370000.0" method="GET"/>
</request>
<request subst="true">
<http url="/m?c=%%_username%%%%_millis%%DEF43.7127837,44.71278370000.0" method="GET"/>
</request>
<request subst="true">
<http url="/m?c=%%_username%%%%_millis%%GHI45.7127837,46.71278370000.0" method="GET"/>
</request>
<request subst="true">
<http url="/m?c=%%_username%%%%_millis%%JKL47.7127837,48.71278370000.0" method="GET"/>
</request>
<request subst="true">
<http url="/m?c=%%_username%%%%_millis%%MNO49.7127837,50.71278370000.0" method="GET"/>
</request>
</for>
</session>
</sessions>
</tsung>

Resources

  • 3x c3.xlarge
  • 1x c4.xlarge
Note that I've added a c4 node because I was limited on Amazon in the number of instances I could boot.

App

I spent most of the development time on the app part. The basis for the component handling the requests was a Netty listener. In one of my previous posts I described how to use Netty to handle HTTP requests and acknowledge them with a HELLO message. Here I acknowledged them with OK.

The most complicated part with the messages was sending them to Cassandra as fast as possible. The fastest way to send them is to use executeAsync. Initially I had trouble with it where I was losing messages; some of the issues were due to concurrency, some were due to a poor understanding of the DataStax driver.

Concurrency - basically, I tried to save on instantiating BoundStatement instances for the sake of overall speed. The BoundStatement is not thread safe, and after calling the bind method it returns "this". It took me some time to figure this out, because when used in loops this behavior is not dangerous. Anyway, thanks to a colleague I figured it out.


// always instantiate new in concurrent code
// don't reuse and make multiple calls with .bind()!

BoundStatement bs = new BoundStatement(insertStatement);

Asynchronous execution - also a bit tricky. executeAsync returns a future. Initially I was just registering a callback on it with Futures.


// don't do this under heavy load with the result of executeAsync,
// in Cassandra you will start to lose data

Futures.addCallback(future, ...

After some trial and error I found a pattern where I didn't lose any data:


// here we are going to keep the futures
private ArrayBlockingQueue<ResultSetFuture> queue =
new ArrayBlockingQueue<>(10000);

// in the handling code
queue.add(session.executeAsync(bs));

// when reaching 1000th element in the queue
// start emptying it
if (queue.size() % 1000 == 0) {
ResultSetFuture elem;
do {
elem = queue.poll();
if (elem != null) {
elem.getUninterruptibly();
}
} while (elem != null);
}

// this will make your insertions around
// 4x faster when compared to normal execute
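One thing to keep in mind with this pattern (not shown in the snippet above) is draining whatever is left in the queue once the load stops, otherwise the tail of the messages is never waited on. A small sketch:

// call this once after the load stops and before closing the session
private void drainRemaining() {
    ResultSetFuture elem;
    while ((elem = queue.poll()) != null) {
        // block until the insert is acknowledged by Cassandra
        elem.getUninterruptibly();
    }
}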

App setup

The instances come with OpenJDK installed. This doesn't guarantee the best performance, so I installed Oracle Java. In order not to lose time on firewall setup I simply copied the "cassandra.pem" file to every node.


# copy ".jar" and "cassandra.pem" file to a single app node
# copy the two files from single node to other nodes
# it's a lot faster than uploading to every node (at least on my connection)

# setup the machine
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u71-b14/jdk-7u71-linux-x64.tar.gz"

tar -xvzf jdk-7u71-linux-x64.tar.gz

sudo update-alternatives --install "/usr/bin/java" "java" "/home/ec2-user/jdk1.7.0_71/jre/bin/java" 1

# pick the new java number in this step
sudo update-alternatives --config java

# check with this
java -version

Resources

  • 2x c4.xlarge
  • 2x c4.2xlarge
  • 4x c3.xlarge
Note that I've added c4 nodes because I was limited on Amazon in the number of instances I could boot. I also had to request a limit increase from customer service, and since I couldn't know in advance how many instances of each type I would use, the instances are not of the same type for the load and app servers.

Cassandra

Setting up Cassandra is the easiest part of the whole undertaking. All I did was follow this guide from DataStax.

Resources

  • 7x c3.2xlarge
After hanging at 90 000 req/s for a while, I came to the conclusion that perhaps the replication factor of two might be too much for the resources I had available. I would probably need to further increase the number of Cassandra nodes, but since I couldn't get any more instances up I set the replication factor to 1. Notice that this replication factor does not allow losing nodes in the cluster without losing data. But the goal here is 100 000 req/s on a budget :)

Results

In the end it took me around $30 to reach the 100k limit. I'm afraid to calculate how much this setup would cost on a monthly or yearly basis.

The successful run looked like this:

Total messages: 31 145 914 messages
Checked number: 31 145 914 messages
Average: 103 809 req/s

Don't be afraid to send me an email if you have any questions whatsoever ;)

Setting up Cassandra Cluster in Virtual Machines

Intro

From time to time, having just one Cassandra instance installed on your machine is not enough, because you want to test certain behaviors when a Cassandra cluster is up and running. Having extra spare hardware on the side or processing time on Amazon is not always an option, so it's a good idea to set up a simple cluster on your own machine with the instances in virtual machines. This post is going to show you how to do it with VirtualBox.

Getting VirtualBox Images

The reason why I chose VirtualBox is that there are a lot of free virtual images available. Most of the time you'll be installing Cassandra on a Linux machine; I decided to go with CentOS. Head over to http://virtualboxes.org/images/centos/ and download CentOS-6.6-x86_64-minimal. The default settings are fine for every machine. Create a couple of them and give them names so that you can differentiate between them (Node1, Node2, etc.).

Perhaps the best idea would be to set up one node first and then make copies afterwards. Do not forget to set the network to bridged adapter. The username and password for the virtual machines are probably set to "root/reverse", but check those options when downloading the VirtualBox image. To keep it short I'll just continue using the root user; when doing things in production that's an extremely bad practice.

Setup networking

When importing the .ova file, VirtualBox is going to ask you if you want to reinitialize the MAC address; check that option. There is a certain amount of buggy behavior when it comes to networking, so to prevent those errors run the following command after logging in to the virtual machine (root/reverse):


rm /etc/udev/rules.d/70-persistent-net.rules
When VirtualBox initializes the networking on the virtual machine it puts a new MAC address in a file. There seems to be a bug where this MAC address is not transferred from that file to the virtual machine settings. Run the following command and copy the MAC address.

cat /etc/sysconfig/network-scripts/ifcfg-eth0
Shut down the machine and set the MAC address under Settings > Network > Advanced > MAC Address.

Install Java

Just to make things a bit easier we're going to install wget:


yum install wget
Now we are going to install java:

$ cd /opt/
$ wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u72-b14/jdk-7u72-linux-x64.tar.gz"
$ tar xzf jdk-7u72-linux-x64.tar.gz
$ rm jdk-7u72-linux-x64.tar.gz

$ cd /opt/jdk1.7.0_72/

$ alternatives --install /usr/bin/java java /opt/jdk1.7.0_72/bin/java 2
$ alternatives --config java

$ alternatives --install /usr/bin/jar jar /opt/jdk1.7.0_72/bin/jar 2
$ alternatives --install /usr/bin/javac javac /opt/jdk1.7.0_72/bin/javac 2
$ alternatives --set jar /opt/jdk1.7.0_72/bin/jar
$ alternatives --set javac /opt/jdk1.7.0_72/bin/javac

$ vi /etc/profile.d/java.sh
export JAVA_HOME=/opt/jdk1.7.0_72
export JRE_HOME=/opt/jdk1.7.0_72/jre
export PATH=$PATH:/opt/jdk1.7.0_72/bin:/opt/jdk1.7.0_72/jre/bin
reboot (and check with echo $JAVA_HOME[enter])

Install Cassandra

Cassandra is installed and run by the following commands:


$ cd /opt/
$ wget http://downloads.datastax.com/community/dsc-cassandra-2.1.2-bin.tar.gz
$ tar xzf dsc-cassandra-2.1.2-bin.tar.gz
$ rm dsc-cassandra-2.1.2-bin.tar.gz

[check ip address with ifconfig]

$ cd conf

$ vi cassandra.yaml
rpc_address: ip address of the node
broadcast_address: ip address of the node
- seeds: ip_address of the first node

$ cd ../bin
$ ./cassandra

Firewall settings

The cluster will not work out of the box because of the firewall settings. To start everything you will need to enable the following ports:


$ iptables -I INPUT -p tcp -m tcp --dport 9042 -j ACCEPT
$ iptables -I INPUT -p tcp -m tcp --dport 7000 -j ACCEPT
$ iptables -I INPUT -p tcp -m tcp --dport 7001 -j ACCEPT
$ iptables -I INPUT -p tcp -m tcp --dport 7199 -j ACCEPT

$ /etc/init.d/iptables save

$ service iptables restart
Now make copies of this machine and update the cassandra.yaml file with the IP addresses of the new machines. Also check /var/log/cassandra/system.log to see if the other nodes are joining in.

Installing Cassandra on MINIX NEO X5 mini (Android multimedia player)

Intro

I started doing some DIY home automation projects. Although I have the mega popular Raspberry Pi available, I decided to use the MINIX NEO X5 mini because I felt this device could be put to better use if it served me as some sort of home automation server. The first part of this story is getting a more server-oriented OS on the device; I decided to go with Linux. After a lot of searching and trial and error I decided to deploy an application called Linux Deploy, which I described in my previous blog post. Through the rest of the tutorial I'll assume you managed to install a Linux instance on your MINIX. I am going to gather a lot of telemetry data with the solution I am building, so installing Cassandra seems like a natural choice to me. There will be a lot of writes, and Cassandra is good at writing at an incredible scale.

Installing Java


$ echo "deb http://ppa.launchpad.net/webupd8team/java/ubuntu trusty main" | sudo tee /etc/apt/sources.list.d/webupd8team-java.list
$ echo "deb-src http://ppa.launchpad.net/webupd8team/java/ubuntu trusty main" | sudo tee -a /etc/apt/sources.list.d/webupd8team-java.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys EEA14886
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer
# you'll need to accept license agreement

# set environment variables
$ sudo apt-get install oracle-java8-set-default

# login once again just in case
$ exit

Installing python

Cassandra comes with a very nice tool called cqlsh. The version of Linux we currently have installed will not run it without Python available on the system, so we have to install it first.


$ sudo apt-get install python2.7

Let's start Cassandra

Configuring Cassandra is a chapter of its own. We'll make minimal adjustments before starting: we'll configure Cassandra to respond to queries from other hosts and, while we are at it, enable virtual nodes (which will make it easier to scale later).


$ cd CASSANDRA_INSTALL_DIRECTORY
$ nano conf/cassandra.yaml

# uncomment
num_tokens: 256

# change to 0.0.0.0
# this will enable you to contact the cassandra
# from other computers etc.
rpc_address: 0.0.0.0

#save file

$ cd ..
$ ./bin/cassandra

# after seeing something like
# Startup completed! Now serving reads.
# press ^C (don't be afraid cassandra still runs)

$ bin/cqlsh

Connected to Test Cluster at localhost:9160.
[cqlsh 3.1.8 | Cassandra 1.2.18 | CQL spec 3.0.5 |
Thrift protocol 19.36.2]
Use HELP for help.
cqlsh>

Shutting cassandra down:


# find PID of a cassandra process

$ ps -ef | grep cassandra

# run kill -9 [the PID number ... i.e. 8212]
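
If you prefer a one-liner, the same thing can be done with pgrep (just an alternative to the manual ps/kill above, assuming pgrep is available):

# CassandraDaemon is the main class of the Cassandra JVM process
$ kill -9 $(pgrep -f CassandraDaemon)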

Running Cassandra on an Android multimedia player is fun :)

Cassandra with Node.js and Arduino

Intro

This post continues where a previous post stopped. The Cassandra setup used for this post is more or less the same, so please read that post if you are interested in the Cassandra setup before continuing with the rest of this one.

Arduino

Learning big data stuff is most exciting when the data represents something from the real world, not something generated with a big loop and a bit of randomization. To create data for this example I've used the following components:

  1. arduino uno
  2. Photoresistor GL5528 LDR
  3. 10K OHM NTC Thermistor 5mm
  4. 2x 10k resistor
  5. Protoboard
  6. Wires
A couple of these inexpensive components combined with an Arduino give us a nice big data sensor/generator. It might not seem that complicated, but sampling any data at a one-second level will hit Cassandra's per-partition limitations after about a month if the data model is not done right, so a simple Arduino setup is a fun and motivating way to tackle learning Cassandra. For now, let's concentrate on the Arduino part. The wiring is shown here:


The Arduino sketch will be on GitHub, so we'll concentrate on the important parts. The light level in this example is read on analog pin 0. Reading analog values on an Arduino yields values ranging from 0 to 1023; we'll define the light level as a mapping from 0-1023 to 0-100, and Arduino already has a built-in function for this called map. I also had some trouble in my initial experiments with Arduino serial communication and reading pin values: the data written to the serial port simply got corrupted after a while. I read a couple of forums on the subject and found that it helps to delay execution for 1 ms after reading a pin value. To keep things as stable as possible we also pause execution for 1 second after writing to the serial port, as shown here:


// read the light level on analog pin 0 and map 0-1023 to 0-100
int light = map(analogRead(0), 0, 1023, 0, 100);
// short pause after reading a pin; keeps the serial data from getting corrupted
delay(1);

....

// combine the light level and the temperature reading into one CSV line
// (sOut, sTemp and deblank come from the elided parts of the sketch)
sprintf(sOut, "%d,%s", light, deblank(sTemp));

Serial.println(sOut);
// wait one second between samples
delay(1000);

Node.js and Cassandra

Parsing the messages that come from measuring devices is repetitive work that tends to produce ugly code; I've learned that the hard way. To make parsing these messages as easy as possible I've written a small utility package for it, and it's available on npm.

Using serial ports in Node.js doesn't take many steps to set up:


var serial = require("serialport");
var SerialPort = serial.SerialPort;

var portName = "/dev/tty.usb-something";

var sp = new SerialPort(portName, {
  baudrate: 9600,
  parser: serial.parsers.readline("\n")
});

sp.on("data", function (data) {
  // translator is the message-parsing npm package mentioned above
  var arduinoData = translator.parse(data);
  //...

To make the data handling easier and more in line with Cassandra best practices, the readings will be partitioned by the date on which they were recorded.


CREATE TABLE room_data (
  day text,
  measurementtime timestamp,
  light int,
  temperature float,
  PRIMARY KEY (day, measurementtime)
) WITH CLUSTERING ORDER BY (measurementtime DESC);

The data will most often be fetched for recent timestamps, with limits set on the queries. The clustering order added above makes this kind of fetching easier, since the newest measurements come first. It also means that to get the current light and temperature level we just have to run the following query (no WHERE clause combined with the now() function is needed):


SELECT * FROM room_data LIMIT 1;

After setting up Cassandra, reading the data from the serial port, and parsing it, it's time to write this data into Cassandra. Analyzing the data and doing something useful with it will come in future posts, but for now I'll stop at writing the data into Cassandra:


// client is an already initialized Cassandra driver client
client.execute('INSERT INTO room_data ' +
  '(day, measurementtime, light, temperature)' +
  ' VALUES (?, dateof(now()), ?, ?)',
  [
    moment().format('YYYY-MM-DD'),  // partition key: today's date as a string
    arduinoData.light,
    arduinoData.temperature
  ],
  function (err, result) {
    if (err) {
      console.log('insert failed', err);
    }
  }
);

I've used moment.js (the moment().format('YYYY-MM-DD') call) to turn the current time into the string representation of the current date that is used for partitioning in Cassandra. The rest of the code is pretty much the usual parameterized-query code found in other database environments.
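
For reference, the npm packages used in the snippets can be installed like this. The post does not name its Cassandra client package, so cassandra-driver below is only an assumption; any client exposing client.execute(query, params, callback) will behave the same:

$ npm install serialport moment
# assumed Cassandra client - substitute whichever driver you actually use
$ npm install cassandra-driver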

I recorded a couple of hours' worth of data here, just in case anybody wants a sneak peek without having to set everything up. I've exported the data out of Cassandra through cqlsh using this command:


COPY room_data (day, measurementtime, light, temperature)
TO 'room_data.csv';
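
A quick way to sanity-check the exported file from the shell:

$ wc -l room_data.csv
$ head -5 room_data.csv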

The rest of the example is located on GitHub.

Replace a dead node in Cassandra

Note (June 2020): this article is old and not really relevant anymore. If you use a modern version of Cassandra, look at the -Dcassandra.replace_address_first_boot option!

I want to share some tips about my experiments with Cassandra (version 2.0.x).

I found some documentation on the DataStax website about replacing a dead node, but it is not suitable for our needs, because in case of a hardware crash we will set up a new node with exactly the same IP (a replacement “in place”). Update: the documentation is now up to date on DataStax!

If you try to start the new node with the same IP, Cassandra refuses to start with:

java.lang.RuntimeException: A node with address /10.20.10.2 already exists, cancelling join. Use cassandra.replace_address if you want to replace this node.

So we need to use the “cassandra.replace_address” directive (which is not really well documented, unfortunately). See this commit and this bug report: the directive is available since 1.2.11/2.0.0, it's an easier solution, and it works.

+    - New replace_address to supplant the (now removed) replace_token and
+      replace_node workflows to replace a dead node in place.  Works like the
+      old options, but takes the IP address of the node to be replaced.

It's a JVM directive, so we can add it at the end of /etc/cassandra/cassandra-env.sh (Debian package), for example:

JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=10.20.10.2" 

Of course, 10.20.10.2 is the IP of your dead/new node.

Now start Cassandra, and in the logs you will see:

INFO [main] 2014-03-10 14:58:17,804 StorageService.java (line 941) JOINING: schema complete, ready to bootstrap
INFO [main] 2014-03-10 14:58:17,805 StorageService.java (line 941) JOINING: waiting for pending range calculation
INFO [main] 2014-03-10 14:58:17,805 StorageService.java (line 941) JOINING: calculation complete, ready to bootstrap
INFO [main] 2014-03-10 14:58:17,805 StorageService.java (line 941) JOINING: Replacing a node with token(s): [...]
[...]
INFO [main] 2014-03-10 14:58:17,844 StorageService.java (line 941) JOINING: Starting to bootstrap...
INFO [main] 2014-03-10 14:58:18,551 StreamResultFuture.java (line 82) [Stream #effef960-6efe-11e3-9a75-3f94ec5476e9] Executing streaming plan for Bootstrap

The node is in bootstrapping mode and will retrieve data from the cluster. This may take a lot of time.
If the node is a seed node, a warning will indicate that it did not auto-bootstrap. This is normal; you need to run a nodetool repair on the node.
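
For the seed-node case, the repair itself is just the following, run on the new node once it is up (with no arguments it repairs all keyspaces, which can take a while):

# nodetool repair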

On the new node :

# nodetool netstats

Mode: JOINING
Bootstrap effef960-6efe-11e3-9a75-3f94ec5476e9
    /10.20.10.1
        Receiving 102 files, 17467071157 bytes total
[...]

After some time, you will see some information in the logs!
On the new node :

 INFO [STREAM-IN-/10.20.10.1] 2014-03-10 15:15:40,363 StreamResultFuture.java (line 215) [Stream #effef960-6efe-11e3-9a75-3f94ec5476e9] All sessions completed
 INFO [main] 2014-03-10 15:15:40,366 StorageService.java (line 970) Bootstrap completed! for the tokens [...]
[...]
 INFO [main] 2014-03-10 15:15:40,412 StorageService.java (line 1371) Node /10.20.10.2 state jump to normal
 WARN [main] 2014-03-10 15:15:40,413 StorageService.java (line 1378) Not updating token metadata for /10.20.30.51 because I am replacing it
 INFO [main] 2014-03-10 15:15:40,419 StorageService.java (line 821) Startup completed! Now serving reads.

And on other nodes :

 INFO [GossipStage:1] 2014-03-10 15:15:40,625 StorageService.java (line 1371) Node /10.20.10.2 state jump to normal

Et voilà, the dead node has been replaced!
Don't forget to REMOVE the modification to cassandra-env.sh after the bootstrap has completed!
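
Concretely, that just means removing (or commenting out) this line from /etc/cassandra/cassandra-env.sh, so the option is not applied again on the next restart:

JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=10.20.10.2"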

Enjoy !