ReversingLabs: Serving File Reputation for Twenty Billion Files

ReversingLabs is in the business of threat intelligence: “Any file. Any location. Any threat.” The only way to keep up with constantly evolving cybersecurity threats is to automate at massive scale and to use the fastest, most scalable solutions possible, which ReversingLabs has done with their Titanium platform.

At Scylla Summit 2019, ReversingLabs’ software architect Goran Cvijanovic explained how they integrated Scylla into their Titanium platform to provide file reputation. In his talk, he focused on long-tail latencies and on how he tuned Scylla’s compression to get the most out of his infrastructure’s storage.

There are files that are known to be benign. Others are known to be malicious. But what about files that are not yet known and classified? Those need to be analyzed swiftly and entered into the ReversingLabs database of over twenty billion known files, and that knowledge then needs to propagate quickly to catch any instances anywhere in a network or attached system: an email server, a source code repository, a website, a laptop.

ReversingLabs Titanium Platform

ReversingLabs’ TitaniumCloud platform maintains a list scoring the individual reputation of billions of goodware and malware files. It can also classify files as “suspicious” (for further investigation) or as unclassified (status not yet known). Their entire database tracks over 20 billion files, and contains detailed analyses for about half of those.

ReversingLabs maintains a database of tens of billions of known “goodware” (benign) and malware files and packages. Its harvesting system then has to determine how to handle new, unknown files using file decomposition and dynamic analysis.

To test files, ReversingLabs does not run them, but performs static analysis — decomposing and reverse engineering them — hence the company name. Beyond that their dynamic analysis can execute files in a secure environment to observe behavior and discern what the file actually does. However, the dynamic analysis is computationally slow — taking minutes or hours of observation in test environments.

On top of these internal analyses, ReversingLabs also aggregates detection results from more than forty antivirus partner companies, along with information about ransomware, Advanced Persistent Threats (APTs), and Common Vulnerabilities and Exposures (CVEs), plus financial and retail information sources.

The reputation analyses and metadata are stored in a highly available database system. Plus the actual files — more than one billion malware-infected files — have to be stored securely at ReversingLabs, currently spanning five petabytes of storage. Goran likened their secure storage system to the Ecto Containment Unit from the original Ghostbusters movie. “Who ya gonna call? You will call our APIs!”

ReversingLabs allows users to call more than 50 APIs providing information, from broad scans to narrow searches, plus feeds about known “malware in the wild.” ReversingLabs can also tailor specific feeds to different types of customers based on their vertical, such as banking.

ReversingLabs TitaniumCloud Services provide more than 50 APIs to users, aggregating analysis of tens of billions of files and allowing them to build secure applications that detect malware in their own applications and networks.

To store the reputation data and metadata, ReversingLabs built their own custom distributed key-value store over seven years ago, based on LSM trees and written in C++. It serves more than 3 billion reads and 1 billion writes a day.

Since developing their original system, ReversingLabs has wanted database independence, so they implemented a connection library that allows their applications to connect transparently to different databases, given the following initial set of requirements:

  • Key-value native protobuf format
  • Lempel-Ziv 4 (LZ4) compression to reduce storage space
  • Latencies <2 milliseconds
  • Support record sizes ranging from 1 KB to 500 MB
  • High availability with replication

Goran noted size was a key issue. Because ReversingLabs deals with records that vary greatly — the “Variety” in the Three V’s of Big Data — they needed a database that could handle widely varying record sizes without significant performance problems.

From Proof of Concept to Production

ReversingLabs began a proof-of-concept (POC) on an 8-node Scylla cluster to test two APIs. They appreciated the SSTable 3.0 “mc” format, which would save them dramatically on storage space. They also had to experiment with their compression chunk size, increasing it to 64 KB for large records, which improved compression enough to save 49% of storage. Finally they tweaked their requirements a bit, to allow for storing data as a blob instead of a protobuf.
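
Chunk size is a table-level compression setting in CQL; as a rough sketch of the kind of change involved (the keyspace and table names here are hypothetical, and the option names vary slightly between Scylla and Cassandra versions):

cqlsh> ALTER TABLE reputation.file_metadata
    WITH compression = {'sstable_compression': 'LZ4Compressor', 'chunk_length_in_kb': 64};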

To improve performance, their inserts/updates and reads are all done at consistency level quorum (CL=QUORUM). To minimize the impact of repairs, their deletes are done instead with consistency level all (CL=ALL).
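
In cqlsh terms (an application would set this through its driver), that policy looks roughly like the following sketch, where the table and key are purely illustrative:

cqlsh> CONSISTENCY QUORUM;
cqlsh> UPDATE reputation.file_metadata SET classification = 'malicious' WHERE sha1 = 0xab01;
cqlsh> CONSISTENCY ALL;
cqlsh> DELETE FROM reputation.file_metadata WHERE sha1 = 0xab01;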

Another aspect of performance ReversingLabs needed to ensure was scaling to meet time-of-day traffic patterns. You can see below how their requests-per-hour more than double, from ~10 million requests per hour in the early morning, to over 26 million in the afternoon peak.

In end-to-end workflow testing (not only the database), average latencies were less than 120 milliseconds, and p99 latencies were 166 milliseconds. This included the user request, authentication, request validation, the round trip to query the database, and formatting and sending the response back to the application, scaled across 32 parallel workers. Here were their performance metrics, with the average (p50) and long-tail (p99) latencies highlighted:

Test duration: 0:05:20
Samples count: 81645, 0.00 % failures
Average times: total 0.120, latency 0.120
Percentiles:
┌───────────────┬───────────────┐
│ Percentile, % │ Resp. Time, s │
├───────────────┼───────────────┤
│           0.0 │         0.080 │
│          50.0 │         0.118 │
│          90.0 │         0.135 │
│          95.0 │         0.142 │
│          99.0 │         0.166 │
│          99.9 │         0.232 │
│         100.0 │         0.331 │
└───────────────┴───────────────┘

Within Scylla itself, Goran was able to show file reputation queries with average write latencies of <6 milliseconds and sub-millisecond average reads. For p99 (long-tail) latencies, Scylla showed <12 millisecond writes, and <7 millisecond reads.

Scylla Monitoring Stack graphs showing average write latencies (top) and reads (bottom), proving Scylla more than capable of serving as a production-ready database for ReversingLabs.

These performance results proved Scylla’s capabilities, and ReversingLabs shifted workloads onto Scylla as a production system.

Lessons Learned

Goran advised users to “know your data,” and to test compression with different chunk_size settings to get optimal storage savings balanced against performance. In their case, they were able to save 49% of storage. (Find out more about compression in Scylla in our blog.)

He also noted that repairs can impact cluster performance. Implementing reads and writes at consistency level quorum kept them fast, while performing deletions at consistency level all meant less of a performance hit when it came time to conduct repairs. (Find out more about repairs in this course in Scylla University.)

Goran also recommended NVMe disks in RAID0 configuration, a point ScyllaDB’s Glauber Costa reinforced in his talk on this and other tips for How to be Successful with Scylla.

Lastly, we encourage you to view Goran’s talk in full below, and check out his slides on our Tech Talks page.

The post ReversingLabs: Serving File Reputation for Twenty Billion Files appeared first on ScyllaDB.

Building a Low-Latency Distributed Stock Broker Application: Part 3

In the third blog of the “Around the World” series, focusing on globally distributed storage, streaming, and search, we build a Stock Broker Application.

1. Place Your Bets!

London Stock Exchange, 1800s

How did Phileas Fogg make his fortune? Around the World in Eighty Days describes Phileas Fogg in this way: 

Was Phileas Fogg rich? Undoubtedly. But those who knew him best could not imagine how he had made his fortune, and Mr. Fogg was the last person to whom to apply for the information.

I wondered if he had made his fortune on the Stock Market, until I read this:

Certainly an Englishman, it was more doubtful whether Phileas Fogg was a Londoner. He was never seen on ‘Change, nor at the Bank, nor in the counting-rooms of the “City.”

Well, even if Fogg wasn’t seen in person at the ‘Change (London Stock Exchange), by 1872 (the year the story is set), it was common to use the telegraph (the internet of the Victorian age, which features regularly in the story) to play the market.

(Not) a real Victorian era Telegraph Office

In fact the ability of the telegraph to send and receive information faster than horses/trains/boats etc. had been used for stock market fraud as early as 1814! (The “Great Stock Exchange Fraud of 1814”). Coincidentally (or not?), the famous London Stock Exchange Forgery, also involving the telegraph, also occurred in 1872! Perhaps this explains the ambiguity around the origin of Fogg’s wealth!

What is certain is that Phileas Fogg became the subject of intense betting, and he was even listed on the London Stock Exchange (Chapter V – IN WHICH A NEW SPECIES OF FUNDS, UNKNOWN TO THE MONEYED MEN, APPEARS ON ‘CHANGE):

Not only the members of the Reform, but the general public, made heavy wagers for or against Phileas Fogg, who was set down in the betting books as if he were a race-horse. Bonds were issued, and made their appearance on ‘Change; “Phileas Fogg bonds” were offered at par or at a premium, and a great business was done in them. But five days after the article in the bulletin of the Geographical Society appeared, the demand began to subside: “Phileas Fogg” declined. They were offered by packages, at first of five, then of ten, until at last nobody would take less than twenty, fifty, a hundred!”

The 1870s also saw the introduction of a new technological innovation in stock trading, the stock ticker machine. Stock tickers were a special type of telegraph receiver designed to print an alphabetical company symbol and the current price of that company’s stock on a paper roll called ticker tape. This enabled stock prices to be communicated closer to real-time across vast distances, and revolutionized trading.

1870s Stock Ticker Machine

2. Let’s Build a Stock Broker Application

Fast forward 128 years from 1872 to 2000 and technology looked a bit different. I’m taking inspiration from an earlier project I worked on from 2000-2003 at CSIRO (Australia’s national science research agency) called “StockOnline”. This was an online Stock Broker application designed to benchmark new component-based middleware technologies, including CORBA and Enterprise Java (J2EE). The original version simulated traders checking their stock holdings and current stock prices, and then buying and selling stocks, resulting in revised stock holdings. The benchmark could be configured with different workload mixes, and the number of concurrent traders could be scaled up to stress the system under test. Metrics captured included the relative number, throughput, and response time of each of the operations.

Some of the technology innovations that the project was designed to give insights into included: 

  • the use of application servers to provide easy to manage and scalable container resourcing (yes, containers are at least 20 years old); 
  • how portable the application was across multiple different vendors’ application servers (sort of);
  • the impact of JVM choice and settings (lots);
  • explicit support for component configuration (e.g. wiring components together); and 
  • deployment into containers, rich container services, and multiple persistence models to manage state and allow database portability (e.g. Container Managed Persistence vs. Bean Managed Persistence). 

At the end of the project we made the StockOnline code and documentation open source, and I recently rediscovered it and made it available on GitHub. I was surprised to learn that Enterprise Java is still going strong and is now run by the Eclipse Foundation and called “Jakarta EE”. Also interesting is that there is support for persistence to Cassandra.

So let’s move on to the present day.

3. High-Frequency Low-Latency Trading

Modern stock markets are fast-moving, ultra-competitive environments. Orders are sent to the market by high speed networks and executed almost instantly. This makes low-latency trading a key to profitability.  Trade related latency is any delay in the time it takes for a trader to interact with the market, and includes distance related network delays, delays in receiving and acting on information, and delays caused by brokers (e.g. queuing of orders, delays interacting with the stock exchange to trade the orders, etc.). Some of the key solutions to reducing latency are broker side hosting of orders (orders are hosted on brokers and automatically traded when conditions are met), and Direct Market Access (brokers are as close as possible to stock exchanges, with super-fast network connections).

A new type of US stock exchange (IEX) was even created to address some of the issues around fairness of stock trading due to latency. Some brokers are able to make huge profits by taking advantage of even small latency differences (“price sniping”), or by using so-called “dark pools,” which fill orders from within a private pool rather than via public stock exchanges. Somewhat oddly, the IEX levelled the playing field by introducing delays to ensure that no one playing the market has more up-to-date information than anyone else.

Latency is partially caused by the global location of stock exchanges. Where are stock exchanges located?  There are 60 exchanges around the world on every continent with a total value of $69 Trillion, and 16 worth more than $1 Trillion each!

4. Initial Application Design

I had already started writing a new version of StockOnline before I rediscovered the original, so the new version doesn’t share any of the original code. However, it does turn out to have similar entities, but with some significant differences to model multiple StockExchanges and Brokers.  Here’s the UML Class diagram of my new prototype code:

The first major difference is that it’s designed to model and simulate distributed stock brokers across multiple global “georegions”. We introduced the concept of georegions in blog 1 (called “latency regions”) and blog 2 (called “georegions”). A georegion is a geographic region that has at least two AWS regions (for data center redundancy), so that applications stay within 100ms latency of each other and of users in the same georegion even if one region fails. Here’s the map from the previous blogs showing the eight sub 100ms latency georegions that we identified (North America, South America, Atlantic, Europe, Middle East, Central Asia, East Asia, Australasia):

This means that I have to explicitly model multiple StockExchanges. Each StockExchange is in a particular location in a georegion.  Each StockExchange is responsible for listing some stocks, providing meta-data about the stocks,  publishing changes to stock prices as StockTickers, and matching buy/sell orders (i.e. executing trades).  For simplicity we assume that each stock is only listed on a single StockExchange. 

Each georegion has one or more StockBrokers which are co-located in the same georegion as some StockExchanges to ensure low latency (and potentially redundancy). The StockBrokers are responsible for discovering StockExchanges and all the stocks listed for trading, obtaining StockTickers to update current price data, and computing trends and longer term Stock Statistics that inform traders making decisions about buying and selling. They are also responsible for keeping track of trader data, updating StockHoldings for Traders, keeping track of orders and trading them on the appropriate StockExchanges, and keeping records of trades (transactions). Also, unlike the original version (which only had a single Market Order type), I wanted to have multiple different order types including Market, Limit, and Stop Orders. This is important for the latency story, as Market Orders are traded “manually” and immediately, but Limit and Stop Orders are traded automatically when they meet specific conditions, so they can be traded very quickly and in larger volumes (this is a good explanation).

We assume that traders connect to their nearest StockBroker (to reduce latency and possibly to satisfy local financial rules). There is a series of operations supported by StockBrokers for traders, and also for interacting with the StockExchanges as follows.  First let’s look at the workflow for trading Market Orders, “Place Market Order”. These are essentially synchronous and immediate trades. The trader connects to the nearest broker, gets their current StockHoldings and current StockStatistics (for their holdings and possibly for other stocks they don’t currently own). Based on this information they decide what stocks to trade, whether to buy or sell, and the quantity of stocks, and create a Market Order. The broker then processes the Market Order (which may involve sending it to another broker), and receives confirmation that the trade occurred (including price, quantity, transaction ID etc.), and finally updates the trader’s StockHoldings for the stock traded. 

The steps to “Process Market Order” are as follows.  The order is sent to a broker in the same Georegion as the StockExchange listing the stock. This broker then immediately executes the trade (buys or sells) with the StockExchange, gets the transaction details, marks the order as filled (so it isn’t processed more than once), and updates the StockHolding amounts for the trader. 

The “Execute Trade with StockExchange” assumes that a trade is always possible (at the current price) and will occur instantaneously and completely, and has the following sub steps:

Market Orders are potentially a slow process due to all the synchronous steps, “think time” for the trader, and cumulative latencies due to the trader to broker, broker to broker, and broker to StockExchange communication paths.

As an alternative we also provide some asynchronous order types: Limit and Stop. These order types are only traded when their conditions are met, but then need to be executed as quickly as possible to prevent losses in a fast moving market.

We assume that the initial steps are mostly the same as “Place Market Order”, but with the added choice of a Limit or Stop Order and the limit price, and the final step (notification of the eventual Trade) is asynchronous:

Once the Order is placed, it is processed by sending it to the correct broker (as for Market Orders), and then that broker is responsible for continuously checking orders to see if they match:

This is done as follows (“Trade Matching Orders”) and relies on each broker receiving a stream of StockTicker updates from the StockExchanges in the same georegion. For each StockTicker the broker finds orders for that stock, and checks which orders meet the buy/sell conditions (the logic depends on the order type, whether the price is rising or falling, and whether the current price is less than or greater than the order limit price). If the matching order(s) are Sell Orders then an extra step is to check that the trader still has sufficient holdings of that stock (they may have already sold some stock due to other orders being executed first). If all conditions are met then the broker initiates an immediate “Market” trade with the StockExchange as before.

The initial prototype application code is written in pure Java and just simulates the expected behaviour of the traders, brokers, and StockExchanges. It creates a specified number of georegions, brokers, StockExchanges, stocks, and traders with initial data. Then the simulation runs lots of rounds (seconds) for a specified period (e.g. a day). Each round results in traders checking their holdings and StockStatistics, and creating orders (about ⅓ of each type, but only if the type matches the specific stock and market conditions). The orders are sent to the correct brokers. Each round the brokers receive simulated StockTickers from the StockExchanges in the same georegion (using a pseudo random walk which keeps the stock direction for the majority of the time, but occasionally changes direction). Some stocks are more volatile than others so change faster. Each round the brokers immediately trade Market Orders, and check the conditions and trade matching Limit or Stop Orders.

5. Initial Simulation Results—“Time Is Money”!

The simulation computes business level metrics including the number of trades, value of trades, etc., and expected end-to-end latency based on deploying brokers in 8 georegions and using the AWS inter-region latencies from blog 1 of this series. This gives us a baseline to eventually compare the real results with. The average latency to get the current stock price from a remote StockExchange and “Process Market Orders” is 700ms (max 1.2s), which includes multiple intra-region and inter-region network hops. The average latency for Limit and Stop “Trade Matching” Orders is shorter at 100ms (max 200ms), as it only includes the time to get StockTicker updates and the time to trade; i.e. it doesn’t include any AWS inter-region latencies, as the operation is asynchronous and processed entirely within the georegion of the broker/StockExchange (we expect this to be slightly higher in practice due to the eventual overhead of database lookups and condition checking on the broker).

So is the saying “Time Is Money!” true in the context of low latency trading, and how much money exactly? I added a calculation to the simulation to compute potential profit losses assuming high volatility in the prices of some stocks, and higher latency times to trade. Due to potentially high volumes of trades, even a small profit loss per trade can add up to big losses in profit very quickly. For one simulation run with 2,101 completed trades, the potential profit loss for the higher latency Market Orders was 0.7% of the trade value (for Market Orders), but for the lower latency Limit and Stop Orders it was significantly less at 0.1% of the trade value (for those order types). For an average order size of $20,000 this corresponds to a $140 profit loss per Market Order, compared with only a $20 profit loss for each Limit and Stop Order. Over hundreds or even thousands of trades per day (typical of HFT) this would quickly add up to significant amounts of money! Moreover, to make a profit, High Frequency Trading (HFT) relies on conducting a high volume of trades to rapidly take advantage of very small movements in prices, with potentially smaller profits per trade. So it’s easy to see why minimizing latency is a worthwhile goal in Fintech applications such as this.

6. What Next?

In the next few blogs we’ll continue our journey “Around the World” and explore how to refine the initial simple design of the application so as to deploy and run it across multiple georegions using multiple AWS regions.

Initially this will involve mapping the components to Cassandra tables on a single data center. Once it’s working correctly with a single Cassandra data center, we’ll extend it to use multiple Cassandra data centers, which will require the use of multiple keyspaces for different replication topologies (e.g. replication across pairs of data centers vs. all data centers). We’ll also work out if, and how, to load-balance the application across multiple data centers in the same georegions, and how to enable redundancy, failover, and recovery at the application level.  It’s possible that a Kubernetes federated microservices mesh framework will help in doing this. We also plan to put Kafka to use to enable streaming StockTickers, so we’ll be investigating Kafka multi data center replication. 

7. Further Resources

IBM also has a StockTrader demonstration application, and an in-depth series about deploying it using Cassandra, Kafka, Redis, and Kubernetes.

There’s an example of stock analysis using Elasticsearch (I’m planning on using Elasticsearch to analyse the stock trends, and provide some map-based visualisation of the data).

This is an interesting article on “Hacking a HFT System”!

The Original CSIRO StockOnline code and documentation is now on github.

The post Building a Low-Latency Distributed Stock Broker Application: Part 3 appeared first on Instaclustr.

Extracting Scylla Cloud Metrics using Prometheus Format

Database and system administrators want to track application and infrastructure metrics and determine how to best utilize their systems. They want to track trends over time and to correlate across multiple systems to identify business impacts and improve upon their base performance.

The ability to use standard formats for metrics makes this correlation easy and empowers everyone familiar with these formats to use them without being locked in with one vendor.

With that in mind, Scylla Cloud, our fully managed NoSQL service with its DynamoDB and Cassandra APIs, supports extracting cluster metrics in Prometheus format. Prometheus is the open source monitoring solution from the Cloud Native Computing Foundation (CNCF).

Scylla’s Extract Metrics feature uses an internally-developed proxy to ship those metrics to customer systems in a secure manner. Customers can then choose how to utilize those metrics, either using the Scylla Monitoring Stack or any other 3rd party platform that supports the Prometheus format (e.g. Datadog, Sensu, New Relic, and many others).

What Scylla Cloud Extract Metrics gathers

By activating the Extract Metrics feature on a cluster, you can extract all ScyllaDB statistics and perform the following tasks:

  • See more dashboards than the default Scylla Cloud monitor delivers. This includes the CQL Optimization dashboard, a tool to help identify potential issues with queries, your data model, and your driver, as well as OS statistics, errors, table statistics, and much more.

Scylla Monitoring Stack CQL optimization dashboard

Scylla Monitoring Stack OS metrics dashboard

  • Create your own dashboards
    Want to organize your dashboards exactly the way you want? We give you all the relevant instructions here: Adding and Modifying Dashboards
  • Define Business-specific alerts
    Define normal behavior for your workload, and create customized alerts to notify you when your cluster exceeds or falls below acceptable bounds of these performance patterns. Read more about setting up your alerts.
  • Export metrics to any 3rd party monitoring tool that supports the Prometheus format
    For example: Datadog, Sensu, New Relic, and many others. Read our blog about using Datadog with Scylla.

Key Metrics

Scylla supports thousands of metrics, and it can be challenging to extract the key metrics one should keep track of. One good way to do so is to use the provided Grafana dashboards, which represent a subset of metrics that Scylla engineers have identified as important.

For example, the Cluster Overview dashboard includes the following metrics (and more); note that dividing each *_latency_sum metric by its matching *_latency_count metric yields an average latency:

  • Node Status: scylla_node_operation_mode
  • CPU (Reactor) load: scylla_reactor_utilization
  • Size of Scylla Data Directory: node_filesystem_size_bytes
  • Requests served: scylla_transport_requests_served
  • Number of write requests: scylla_storage_proxy_coordinator_write_latency_count
  • Write requests latency: scylla_storage_proxy_coordinator_write_latency_sum
  • Write Timeouts: scylla_storage_proxy_coordinator_write_timeouts
  • Number of read requests: scylla_storage_proxy_coordinator_read_latency_count
  • Read requests latency: scylla_storage_proxy_coordinator_read_latency_sum
  • Read timeouts: scylla_storage_proxy_coordinator_read_timeouts
  • Cache Hits: scylla_cache_row_hits
  • Cache Miss: scylla_cache_row_misses

Get Started Today

This feature is now available to all users of Scylla Cloud. To use this feature on existing clusters, follow this user guide: Extract Cluster Metrics in Prometheus format.

For new clusters, check the box Extract Cluster Metrics:

The post Extracting Scylla Cloud Metrics using Prometheus Format appeared first on ScyllaDB.

A Comprehensive Guide to Cassandra Architecture

Introduction

The Apache Cassandra architecture is designed to provide scalability, availability, and reliability to store massive amounts of data. If you are new to Cassandra, we recommend going through the high-level concepts covered in what is Cassandra before diving into the architecture.  

This blog post aims to cover all the architecture components of Cassandra. After reading the post, you will have a basic understanding of the components. This can be used as a basis to learn about the Cassandra Data Model, to design your own Cassandra cluster, or simply for Cassandra knowledge.

Cluster Topology and Design

Cassandra is based on a distributed system architecture. In its simplest form, Cassandra can be installed on a single machine or in a Docker container, and it works well for basic testing. A single Cassandra instance is called a node. Cassandra supports horizontal scalability, achieved by adding more than one node as part of a Cassandra cluster. The scalability works with linear performance improvement if the resources are configured optimally.

Cassandra works with a peer-to-peer architecture, with each node connected to all other nodes. Each Cassandra node performs all database operations and can serve client requests without the need for a master node. A Cassandra cluster does not have a single point of failure as a result of the peer-to-peer distributed architecture.

Nodes in a cluster communicate with each other for various purposes. There are various components used in this process:

  • Seeds: Each node configures a list of seeds which is simply a list of other nodes. A seed node is used to bootstrap a node when it is first joining a cluster. A seed does not have any other specific purpose, and it is not a single point of failure. A node does not require a seed on subsequent restarts after bootstrap. It is recommended to use two to three seed nodes per Cassandra data center (data centers are explained below), and keep the seeds list uniform across all the nodes. 
  • Gossip: Gossip is the protocol used by Cassandra nodes for peer-to-peer communication. Gossip informs a node about the state of all other nodes. A node performs gossip with up to three other nodes every second. The gossip messages follow a specific format and use version numbers to make communication efficient.

A cluster is subdivided into racks and data centers. These terminologies are Cassandra’s representation of a real-world rack and data center. A physical rack is a group of bare-metal servers sharing resources like a network switch, power supply etc. In Cassandra, the nodes can be grouped in racks and data centers with snitch configuration. Ideally, the node placement should follow the node placement in actual data centers and racks. Data replication and placement depends on the rack and data center configuration. 

Cluster subdivided into Racks and Data centers

Multiple Data Centers

A rack in Cassandra is used to hold a complete replica of the data, provided there are enough replicas and the keyspace uses NetworkTopologyStrategy, which is explained later. This configuration allows Cassandra to survive a rack failure without losing a significant level of replication, so it can continue to perform optimally.

There are various scenarios to use multiple data centers in Cassandra. Few common scenarios are:

  • Build a Cassandra cluster with geographically distinct data centers which cater to clients from distinct locations, e.g. a cluster with three data centers in US, EU, and APAC serving local clients with low latency.
  • Separate Cassandra data centers which cater to distinct workloads using the same data, e.g. separate data centers to serve client requests and to run analytics jobs.
  • Active disaster recovery by creating geographically distinct data centers, e.g. a cluster with data centers in each US AWS region to support disaster recovery.

Database Structures

Cassandra stores data in tables where each table is organized in rows and columns, the same as any other database. A Cassandra table was formerly referred to as a column family. Tables are grouped in keyspaces. A keyspace could be used to group tables serving a similar purpose from a business perspective, like all transactional tables, metadata tables, user information tables, etc. Data replication is configured per keyspace in terms of replication factor per data center and the replication strategy. See the replication section for more details.

Each table has a defined primary key. The primary key is divided into partition key and clustering columns. The clustering columns are optional. There is no uniqueness constraint for any of the keys.

The partition key is used by Cassandra to index the data. All rows which share a common partition key make a single data partition which is the basic unit of data partitioning, storage, and retrieval in Cassandra.  

Refer to cassandra-data-partitioning for detailed information about this topic. 

Partitioning

A partition key is converted to a token by a partitioner. There are various partitioner options available in Cassandra, of which Murmur3Partitioner is used by default. The tokens are signed integer values between -2^63 and +2^63-1, and this range is referred to as the token range. Each Cassandra node owns a portion of this range, and it primarily owns data corresponding to the range. A token is used to precisely locate the data among the nodes and on the data storage of the corresponding node.
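
You can see the partitioner at work from CQL with the token() function, which returns the token computed for a given partition key (the query below uses the sample ks.tb table defined later in the Data Storage section):

cqlsh> SELECT id, token(id) FROM ks.tb;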

It is evident that when there is only one node in a cluster, it owns the complete token range. As more nodes are added, the token range ownership is split between the nodes, and each node is aware of the range of all the other nodes. 

Here is a simplified example to illustrate the token range assignment. Suppose there are only 100 tokens used for a Cassandra cluster with three nodes. Each node is assigned approximately 33 tokens:

 node1: 0-33
 node2: 34-66
 node3: 67-99

If there are nodes added or removed, the token range distribution has to be shuffled to suit the new topology. This process requires a lot of calculation and configuration change for each cluster operation.

Virtual nodes/Vnodes

To simplify the token calculation complexity and other token assignment difficulties, Cassandra uses the concept of virtual nodes, referred to as Vnodes. A cluster is divided into a large number of virtual nodes for token assignment. Each physical node is assigned an equal number of virtual nodes. In our previous example, if each node is assigned three Vnodes, and each Vnode roughly ten tokens:

 v1: 0-9, v2: 10-19, v3: 20-29, and so on

Each physical node is assigned these Vnodes as:

 node1: v1, v4, v7
 node2: v2, v5, v8
 node3: v3, v6, v9

Virtual Nodes or Vnodes

The default number of Vnodes owned by a node in Cassandra is 256, which is set by the num_tokens property. When a node is added to a cluster, the token allocation algorithm allocates tokens to the node. The algorithm selects random token values to ensure uniform distribution, and the num_tokens property can be changed to tune the data distribution. The default of 256 Vnodes per physical node is intended to achieve uniform data distribution for clusters of any size and with any replication factor. However, in some large clusters 256 Vnodes do not perform well; please refer to the blog cassandra-vnodes-how-many-should-i-use for more information.

Replication

The data in each keyspace is replicated with a replication factor. The most common replication factor used is three. There is one primary replica of data which resides with the token owner node as explained in the data partitioning section. The remainder of replicas are placed by Cassandra on specific nodes using replica placement strategy. All replicas are equally important for all database operations except for a few cluster mutation operations.

There are two settings which mainly impact replica placement. The first is the snitch, which determines the data center and the rack a Cassandra node belongs to, and it is set at the node level. Snitches inform Cassandra about the network topology so that requests are routed efficiently, and allow Cassandra to distribute replicas by grouping machines into data centers and racks. GossipingPropertyFileSnitch is the go-to snitch for any cluster deployment. It uses a configuration file called cassandra-rackdc.properties on each node, which contains the data center and rack name that host the node (e.g. dc=dc_1 and rack=rack_1). There are cloud-specific snitches available for AWS and GCP.

The second setting is the replication strategy. The replication strategy is set at the keyspace level. There are two strategies: SimpleStrategy and NetworkTopologyStrategy. SimpleStrategy does not consider racks and multiple data centers; it places data replicas on nodes sequentially. NetworkTopologyStrategy is rack aware and data center aware. SimpleStrategy should only be used for temporary and small cluster deployments; for all other clusters NetworkTopologyStrategy is highly recommended. A keyspace definition using NetworkTopologyStrategy specifies the number of replicas per data center as:

cqlsh> create keyspace ks with replication = {'class' : 'NetworkTopologyStrategy', 'dc_1' : 3, 'dc_2' : 1};

Here, the keyspace named ks is replicated in dc_1 with factor three and in dc_2 with factor one.

Consistency and Availability

Each distributed system works on the principle of the CAP theorem. The CAP theorem states that any distributed system can strongly deliver only two out of the three properties: Consistency, Availability, and Partition-tolerance. Cassandra provides flexibility for choosing between consistency and availability while querying data. In other words, data can be highly available with a low consistency guarantee, or it can be highly consistent with lower availability. For example, if there are three data replicas, a query reading or writing data can ask for acknowledgments from one, two, or all three replicas to mark the completion of the request. For a read request, Cassandra requests the data from the required number of replicas and compares their write-timestamps. The replica with the latest write-timestamp is considered to be the correct version of the data. Hence, involving more replicas in a read operation adds to the data consistency guarantee. For write requests, the write is considered complete once the requested number of replicas acknowledge it.

Naturally, the time required to complete an operation is directly proportional to the number of replicas that must acknowledge it. Hence, consistency and availability are traded off against each other. The concept of requesting a certain number of acknowledgements is called tunable consistency, and it can be applied at the individual query level.

There are a few considerations related to data availability and consistency: 

  • The replication factor should ideally be an odd number. The common replication factor used is three, which provides a balance between replication overhead, data distribution, and consistency for most workloads.    
  • The number of racks in a data center should be in multiples of the replication factor. The common number used for nodes is in multiples of three. 
  • There are various terms used to refer to the consistency levels – 
    • One, two, three: Specified number of replicas must acknowledge the operation.
    • Quorum: The strict majority of nodes is called a quorum. The majority is one more than half of the nodes. This consistency level ensures that most of the replicas confirm the operation without having to wait for all replicas. It balances operation efficiency and good consistency, e.g. quorum for a replication factor of three is (3/2)+1=2; for a replication factor of five it is (5/2)+1=3.
    • Local_*: This is a consistency level for a local data center in a multi-data center cluster. The local data center is the one where the client is connected to a coordinator node. The * takes a value such as one or quorum, e.g. local_one, local_quorum.
    • Each_*: This level is also related to multi data center setup. It denotes the consistency to be achieved in each of the data centers independently, e.g. each_quorum means quorum consistency in each data center. 

Data written and read at a low consistency level does not miss out on the advantage of replication. Cassandra keeps the data consistent across all replicas, but it happens in the background. This concept is referred to as eventual consistency. In the three replica example, if a user queries data at consistency level one, the query is acknowledged when the read/write happens for a single replica. In the case of a read operation, this could mean relying on a single data replica as the source of truth for the data. In the case of a write operation, the remaining replicas receive the data later on and are made consistent eventually. In case of a replication failure, the replicas might not get the data at all. Cassandra handles replication shortcomings with a mechanism called anti-entropy, which is covered later in the post.

Query Interface

Cassandra Query Language (CQL) is the interface used to query Cassandra over a binary protocol. Earlier versions of Cassandra supported Thrift, which is now entirely replaced by CQL. CQL is designed to be similar to SQL for a quicker learning curve and familiar syntax. The DDL operations allow creating keyspaces and tables; the CRUD operations are select, insert, update, and delete, where select is a Cassandra read operation and all others are Cassandra write operations.

A table definition includes column definitions and primary, partition, and clustering keys. The table definition also contains several settings for data storage and maintenance. The primary key is a combination of partition key and clustering columns. The clustering columns are optional. The partition key can be a single column or a composite key. 
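
As an illustration, a hypothetical table with a composite partition key and one clustering column could be defined like this:

cqlsh> CREATE TABLE ks.trades (
    account_id int,
    trade_date date,
    trade_id timeuuid,
    amount decimal,
    PRIMARY KEY ((account_id, trade_date), trade_id)
) WITH CLUSTERING ORDER BY (trade_id DESC);

Here (account_id, trade_date) is the composite partition key and trade_id is the clustering column.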

The query set available in CQL is quite limited as compared to SQL. A few highlights: 

  • Cassandra does not support join operations and nested queries. 
  • Each select query should specify a complete partition key. It is possible to query multiple partitions, but not recommended. Refer cassandra scalability 
  • Cassandra supports a limited set of data aggregation operations.
  • The order by clause can be used only for columns in the clustering key, and those must be used in the correct order of precedence (see the example query below).
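
For instance, against the hypothetical ks.trades table sketched above, a valid query supplies the complete partition key and orders only by the clustering column:

cqlsh> SELECT trade_id, amount FROM ks.trades
    WHERE account_id = 42 AND trade_date = '2020-04-14'
    ORDER BY trade_id DESC;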

The reason for the limited query set in Cassandra comes from specific data modelling requirements. The data model for a Cassandra database should aim to create denormalized tables which cater to the select query patterns. Cassandra data modeling is one of the essential activities while designing the database. All the features provided by the Cassandra architecture, like scalability and reliability, are directly subject to an optimum data model. Refer to cassandra-data-modelling for details on the topic.

The Cassandra driver program provides a toolset for connection management, pooling, and querying. The driver creates a connection with a Cassandra node which is then referred to as the coordinator node for the query. The coordinator is responsible for query execution and to aggregate partial results. 

The Datastax Java Driver is the most popular, efficient and feature rich driver available for Cassandra. There are several other technology drivers which provide similar functionality. 

Data Storage

Cassandra uses a commit log for each incoming write request on a node. The commit log is a write-ahead log, and it can be replayed in case of failure. The on-disk data structure is called an SSTable. SSTables are created per table in the database.

Example:

Consider a sample keyspace and table created as follows.

cqlsh> create keyspace ks with replication = {'class' : 'NetworkTopologyStrategy','datacenter_1' : 3};

cqlsh> CREATE TABLE ks.tb (
    id int PRIMARY KEY,
    col1 text);

And insert some data:
cqlsh> insert into ks.tb (id, col1) values (1, 'first_row');
cqlsh> insert into ks.tb (id, col1) values (2, 'second_row');
cqlsh> insert into ks.tb (id, col1) values (3, 'third_row');

The data we inserted looks as given below in an SSTable. 

Note that this representation is obtained by a utility to generate human-readable data from SSTables. The actual data in SSTables is in binary format and compressed for efficiency.

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 33,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:07.756013Z" },
        "cells" : [
          { "name" : "col1", "value" : "first_row" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "2" ],
      "position" : 34
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 71,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:29.923397Z" },
        "cells" : [
          { "name" : "col1", "value" : "second_row" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "3" ],
      "position" : 72
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 108,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:39.282459Z" },
        "cells" : [
          { "name" : "col1", "value" : "third_row" }
        ]
      }
    ]
  }
]

Cassandra maintains immutability for data storage to provide optimal performance. Hence, SSTables are immutable. Updates and deletes to data are handled by writing a new version of the data. This strategy results in multiple versions of data existing at any given time. Cassandra is optimized for write operations as compared to read operations. A read operation consolidates all versions of the data and returns the most recent version. Each data cell is written with a write-timestamp which specifies the time when that particular data was written. This timestamp is used to find the latest version of data while retrieving data for a read operation.

In the above example, we update data for a column of id 1 and see the result:

cqlsh> update ks.tb set col1='updated_row_one' where id=1;

The resulting data in the SSTable for this update looks like:

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 39,
        "cells" : [
          { "name" : "col1", "value" : "updated_row_one", "tstamp" : "2020-04-14T13:38:37.794697Z" }
        ]
      }
    ]
  }
]

The data looks precisely the same as newly inserted data. Cassandra identifies this and considers the updated value to be the current one, as it has the greater timestamp value.
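
The write-timestamp Cassandra uses for this comparison can also be inspected directly from CQL with the writetime() function:

cqlsh> SELECT col1, writetime(col1) FROM ks.tb WHERE id = 1;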

Deletes are handled uniquely in Cassandra to make them compatible with immutable data. Each delete is recorded as a new record which marks the deletion of the referenced data. This special data record is called a tombstone. A Cassandra read operation discards all the information for a row or cell if a tombstone exists, as it denotes deletion of the data. There are various types of tombstones to denote data deletion for each element, e.g. cell, row, partition, range of rows, etc. Cassandra allows setting a Time To Live (TTL) on a data row to expire it after a specified amount of time after insertion. Once data is past its TTL, it is regarded as a tombstone in Cassandra. Refer to managing-tombstones-in-cassandra for operational information about tombstones and their efficiency.
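
A TTL is set per write; for example, using the same sample table, this row would expire (and effectively become a tombstone) one day after insertion:

cqlsh> INSERT INTO ks.tb (id, col1) VALUES (4, 'expiring_row') USING TTL 86400;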

Now with the SSTable example, a cell delete looks like:

cqlsh> delete col1 from ks.tb where id=1;

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 24,
        "cells" : [
          { "name" : "col1", "deletion_info" : { "local_delete_time" : "2020-04-14T13:44:27Z" },
            "tstamp" : "2020-04-14T13:44:27.179254Z"
          }
        ]
      }
    ]
  }
]

The deletion_info indicates that the cell is deleted. This data is the tombstone for the original data and all the data versions. 

Cassandra performs a compaction operation on SSTables which consolidates two or more SSTables to form a new SSTable. This process combines all versions of data in the participating SSTables. The compaction outputs a single version of the data among all obtained versions in the resulting SSTable. Compactions also purge the data associated with a tombstone if all the required conditions for purging are met. There are various strategies to trigger and perform compaction (refer to apache-cassandra-compactions); a configuration example follows the list below.

  • SizeTieredCompactionStrategy (STCS): This is the default compaction strategy. It is triggered using the size of SSTables on-disk. 
  • LeveledCompactionStrategy (LCS): This strategy is used to optimize read performance. This strategy considers the data partitions present in SSTables, and arranges SSTables in levels. Each level has a fixed set of tables and those are compacted with each other.  
  • TimeWindowCompactionStrategy (TWCS): This is a specialized strategy for time series data. It arranges SSTables in time window buckets defined in the table definition. The SSTables within a time window are only compacted with each other. 
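
The compaction strategy is configured per table. As an illustrative sketch, switching the sample table to TWCS with daily buckets would look like this (the window settings are examples only):

cqlsh> ALTER TABLE ks.tb
    WITH compaction = {'class': 'TimeWindowCompactionStrategy',
                       'compaction_window_unit': 'DAYS',
                       'compaction_window_size': 1};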

The other crucial set of operations performed in Cassandra is anti-entropy. The aim of these operations is to keep data as consistent as possible. The anti-entropy enables Cassandra to provide the eventual consistency model.

  • Hinted Handoff: If a node in Cassandra is not available for a short period, the data which is supposed to be replicated on the node is stored on a peer node. This data is called hints. Once the original node becomes available, the hints are transferred to the node, and the node is caught up with missed data. There are time and storage restrictions for hints. If a node is not available for a longer duration than configured, no hints are saved for it. Hints cannot be considered as a primary anti-entropy mechanism.
  • Read Repair: Read operation is used as an opportunity to repair inconsistent data across replicas. The latest write-timestamp is used as a marker for the correct version of data. The read repair operation is performed only in a portion of the total reads to avoid performance degradation. Read repairs are opportunistic operations and not a primary operation for anti-entropy.
  • Repair: Repair is the primary anti-entropy operation to make data consistent across replicas. Repairs are performed by creating specialized data structures called Merkle trees. These are hash values of all data values in a replica. They are transferred to other replicas and compared to detect inconsistencies. The correct data is then streamed across nodes to repair the inconsistencies.

Repairs need to be scheduled manually as these are intensive operations that consume a significant amount of cluster resources. 

Write Path

Cassandra write path is the process followed by a Cassandra node to store data in response to a write operation. A coordinator node initiates a write path and is responsible for the request completion. 

The high-level steps are as follows:

  1. The partitioner applies hash to the partition key of an incoming data partition and generates a token.
  2. The node is identified where the partition belongs to and all the nodes where the replicas reside for the partition.
  3. Write request is forwarded to all replica nodes, and acknowledgement is awaited. 
  4. Once the number of nodes required to fulfil the write consistency level acknowledge the request, the write operation completes. 

An example with a six node cluster, a replication factor of three and a write request consistency of quorum. 

Quorum for RF 3 = (3/2)+1 = 2

Common error scenarios:

  1. If the sufficient number of nodes required to fulfil the request are not available, or do not return the request acknowledgement, coordinator throws an exception.  
  2. Even after satisfying the request with the required number of replica acknowledgements, if an additional node which stores a replica for the data is not available,  the data could be saved as a hint on another node. 

In a multi-data center cluster, the coordinator forwards write requests to all applicable local nodes. For the remote data centers, the write request is forwarded to a single node per data center. That node then replicates the data to the other replica nodes within its data center, so that the required number of nodes can satisfy the consistency level.

The Anatomy of a Write Operation on a Node

This operation involves the commit log, memtable, and SSTables. The commit log is a write-ahead log which is stored on disk. The write operation is recorded in the commit log of a node, and the acknowledgement is returned. The data is then stored in a memtable, which is an in-memory structure representing the SSTable on disk. The memtable is flushed to disk after reaching its memory threshold, which creates a new SSTable. The SSTables are eventually compacted to consolidate the data and optimize read performance.

Read Path 

Cassandra read path is the process followed by a Cassandra node to retrieve data in response to a read operation. The read path has more steps than the write path. Actions performed to serve a read request are as follows:

  1. The coordinator generates a hash using the partition key and gathers the replica nodes which are responsible for storing the data.
  2. The coordinator checks if replicas required to satisfy the read consistency level are available. If not, an exception is thrown, and the read operation ends.
  3. The coordinator then sends a read data request to the fastest responding replica; the fastest replica could be the coordinator itself. The fast replica is determined by dynamic snitch, which keeps track of node latencies dynamically.
  4. The coordinator then sends a digest request to the replicas of data. The digest is a hash calculated over requested data by the replica nodes.
  5. The coordinator compares all the digests to determine whether all the replicas have a consistent version of the data. If those are equal, it returns the result obtained from the fastest replica.

If the digests from all the replicas are not equal, it means some replicas do not have the latest version of the data. In this case, read data requests for all the replicas are issued, and the data with the latest timestamp is returned to the client. Also, read repair requests are issued for the replicas which do not have the latest data version.

Components involved in a read operation on a node:

  • Row cache: This is a cache for frequently read data rows, also referred to as hot data. It stores a complete data row which can be returned directly to the client if requested by a read operation. This is an optional feature and works best if there are a small number of hot rows which can fit in the row cache.
  • Partition key cache: This component caches the partition index entries per table which are frequently used. In other words, it stores the location of partitions which are commonly queried but not the complete rows. This feature is used by default in Cassandra, but it can be optimized more.
  • Bloom filter: A bloom filter is a data structure which indicates if a data partition could be included in a given SSTable. The positive result returned by a bloom filter can be a false signal, but the negative results are always accurate. Hence it saves a lot of seek-time for read operations.
  • Partition index and summary: A partition index contains offset of all partitions for their location in SSTable. The partition summary is a summary of the index. These components enable locating a partition exactly in an SSTable rather than scanning data.
  • Memtable: Memtable is in-memory representation of SSTables. If a data partition is present in memtable, it can be directly read for specific data rows and returned to the client.
  • Compression offset map: This is the map for locating data in SSTables when it is compressed on-disk. 
  • SSTable: The on-disk data structure which holds all the data once flushed from memory. 

Anatomy of Read Operation on a Node

  1. Cassandra checks the row cache for data presence. If present, the data is returned, and the request ends.
  2. The request flow then includes checking bloom filters. If the bloom filter indicates data presence in an SSTable, Cassandra continues to look for the required partition in that SSTable.
  3. The key cache is checked for the partition key. A cache hit provides an offset for the partition in the SSTable. This offset is then used to retrieve the partition, and the request completes.
  4. Otherwise, Cassandra seeks the partition in the partition summary and partition index. These structures also provide the partition offset in an SSTable, which is then used to retrieve the partition and return it. The caches, if enabled, are updated with the latest data read.
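
You can watch these steps on a live node with query tracing in cqlsh; a quick sketch, again assuming a hypothetical table ks.t:

cqlsh> TRACING ON;
cqlsh> SELECT * FROM ks.t WHERE id = 1;
cqlsh> TRACING OFF;

The trace printed after the query lists per-node activities, such as key cache hits, bloom filter checks, and the SSTables consulted, along with their timings.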

Conclusion

Cassandra’s architecture is uniquely designed to provide scalability, reliability, and performance. It is based on distributed systems principles and makes the trade-offs described by the CAP theorem. Cassandra’s unique architecture needs careful configuration and tuning, and it is essential to understand the components in order to use Cassandra efficiently.

Contact us to get expert advice on managing and deploying Apache Cassandra.


Dial C* for Operator - Creating a Cassandra Cluster with Cass Operator

In this post we are going to take a deep dive into provisioning a Cassandra cluster using the DataStax Kubernetes operator for Cassandra, Cass Operator. We will set up a multi-rack cluster with each rack in a different availability zone.

For the examples, I will use a nine node, regional cluster in Google Kubernetes Engine (GKE) that is spread across three zones. Here is what my Kubernetes cluster looks like:

$ kubectl get nodes --label-columns failure-domain.beta.kubernetes.io/region,failure-domain.beta.kubernetes.io/zone | awk {'print $1" "$6" "$7'} | column -t
NAME                                     REGION    ZONE
gke-cass-dev-default-pool-3cab2f1f-3swp  us-east1  us-east1-d
gke-cass-dev-default-pool-3cab2f1f-408v  us-east1  us-east1-d
gke-cass-dev-default-pool-3cab2f1f-pv6v  us-east1  us-east1-d
gke-cass-dev-default-pool-63ec3f9d-5781  us-east1  us-east1-b
gke-cass-dev-default-pool-63ec3f9d-blrh  us-east1  us-east1-b
gke-cass-dev-default-pool-63ec3f9d-g4cb  us-east1  us-east1-b
gke-cass-dev-default-pool-b1ee1c3c-5th7  us-east1  us-east1-c
gke-cass-dev-default-pool-b1ee1c3c-ht20  us-east1  us-east1-c
gke-cass-dev-default-pool-b1ee1c3c-xp2v  us-east1  us-east1-c

Operator Concepts

Without getting into too much detail, I want to quickly cover some fundamental concepts for the things we will discuss in this post. Kubernetes is made up of controllers. A controller manages the state of one or more Kubernetes resource types. The controller executes an infinite loop, continually trying to converge the desired state of resources with their actual state. The controller watches for changes of interest in the Kubernetes cluster, i.e., a resource added, deleted, or updated. When there is a change, a key uniquely identifying the affected resource is added to a work queue. The controller eventually gets the key from the queue and begins whatever work is necessary.

Sometimes a controller has to perform potentially long-running operations like pulling an image from a remote registry. Rather than blocking until the operation completes, the controller usually requeues the key so that it can continue with other work while the operation completes in the background. When there is no more work to do for a resource, i.e. the desired state matches the actual state, the controller removes the key from the work queue.

An operator consists of one or more controllers that manage the state of one or more custom resources. Every controller has a Reconciler object that implements a reconcile loop. The reconcile loop is passed a request, which is the resource key.

A Few Words on Terminology

A Kubernetes worker node, Kubernetes worker, or worker node is a machine that runs services necessary to run and manage pods. These services include:

  • kubelet
  • kube-proxy
  • container runtime, e.g., Docker

A Cassandra node is the Cassandra process running in a container.

A Cassandra container is the container, i.e., Docker container, in which the Cassandra node is running.

A Cassandra pod is a Kubernetes pod that includes one or more containers. One of those containers is running the Cassandra node.

Installing the Operator

Apply the cass-operator-manifests.yaml manifests as follows:

$ kubectl create -f https://raw.githubusercontent.com/datastax/cass-operator/b96bfd77775b5ba909bd9172834b4a56ef15c319/docs/user/cass-operator-manifests.yaml
namespace/cass-operator created
serviceaccount/cass-operator created
secret/cass-operator-webhook-config created
customresourcedefinition.apiextensions.k8s.io/cassandradatacenters.cassandra.datastax.com created
clusterrole.rbac.authorization.k8s.io/cass-operator-cluster-role created
clusterrolebinding.rbac.authorization.k8s.io/cass-operator created
role.rbac.authorization.k8s.io/cass-operator created
rolebinding.rbac.authorization.k8s.io/cass-operator created
service/cassandradatacenter-webhook-service created
deployment.apps/cass-operator created
validatingwebhookconfiguration.admissionregistration.k8s.io/cassandradatacenter-webhook-registration created

Note: The operator is deployed in the cass-operator namespace.

Make sure that the operator has deployed successfully. You should see output similar to this:

$ kubectl -n cass-operator get deployments
NAME            READY   UP-TO-DATE   AVAILABLE   AGE
cass-operator   1/1     1            1           2m8s
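
You can also confirm that the CassandraDatacenter custom resource definition registered by the manifests is in place:

$ kubectl get crd cassandradatacenters.cassandra.datastax.com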

Create a Storage Class

We need to create a StorageClass that is suitable for Cassandra. Place the following in a file named server-storageclass.yaml:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: server-storage
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd
  replication-type: none
volumeBindingMode: WaitForFirstConsumer
reclaimPolicy: Delete

One thing to note here is volumeBindingMode: WaitForFirstConsumer. The default value is Immediate and should not be used. It can prevent Cassandra pods from being scheduled on a worker node. If a pod fails to run and its status reports a message like, had volume node affinity conflict, then check the volumeBindingMode of the StorageClass being used. See Topology-Aware Volume Provisioning in Kubernetes for more details.

Create the StorageClass with:

$ kubectl -n cass-operator apply -f server-storageclass.yaml
storageclass.storage.k8s.io/server-storage created
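
If you want to double-check the volumeBindingMode (or any other setting) before moving on, inspect the StorageClass directly:

$ kubectl get storageclass server-storage -o yaml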

The Spec

Most Kubernetes resources define spec and status properties. The spec declares the desired state of a resource which includes configuration settings provided by the user, default values expanded by the system, and other properties initialized by other internal components after resource creation. We will talk about the status in a little bit.

The manifest below declares a CassandraDatacenter custom resource. It does not include all possible properties. It includes the minimum necessary to create a multi-zone cluster.

apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
  name: multi-rack
spec:
  clusterName: multi-rack
  serverType: cassandra
  serverVersion: 3.11.6
  managementApiAuth:
    insecure: {}
  size: 9
  racks:
  - name: us-east1-b
    zone: us-east1-b
  - name: us-east1-c
    zone: us-east1-c
  - name: us-east1-d
    zone: us-east1-d    
  storageConfig:
    cassandraDataVolumeClaimSpec:
      storageClassName: server-storage
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi

This spec declares a single Cassandra datacenter. Cass Operator does support multi-DC clusters. It requires creating a separate CassandraDatacenter for each datacenter. Discussion of multi-DC clusters is outside the scope of this post.

The size property specifies the total number of Cassandra nodes in the datacenter.

racks is an array of Rack objects which consist of name and zone properties. The zone should be the name of a zone in GCP (or an AWS availability zone if the cluster were running in AWS, for example). The operator will use this to pin Cassandra pods to Kubernetes workers in the zone. More on this later.

Create the CassandraDatacenter

Put the above manifest in a file named multi-rack-cassdc.yaml and then run:

$ kubectl -n cass-operator apply -f multi-rack-cassdc.yaml
cassandradatacenter.cassandra.datastax.com/multi-rack created

This creates a CassandraDatacenter object named multi-rack in the Kubernetes API server. The API server provides a REST API with which clients, like kubectl, interact. The API server maintains state in etcd. Creating a Kubernetes resource ultimately means persisting state in etcd. When the CassandraDatacenter object is persisted, the API server notifies any clients watching for changes, namely Cass Operator. From here the operator takes over. The new object is added to the operator’s internal work queue. The job of the operator is to make sure the desired state, i.e., the spec, matches the actual state of the CassandraDatacenter.

Now that we have created the CassandraDatacenter, it is time to focus our attention on what Cass Operator is doing to build the Cassandra cluster.

Monitor the Progress

We will look at a couple things to monitor the progress of the provisioning or scaling up of the cluster:

  • Changes in the status of the CassandraDatacenter
  • Kubernetes events emitted by the operator

We have already discussed that the spec describes a resource’s desired state. The status on the other hand, describes the object’s current, observed state. Earlier I mentioned that the Kubernetes API server provides a REST API to clients. A Kubernetes object or resource is a REST resource. The status of a Kubernetes resource is typically implemented as a REST subresource that can only be modified by internal, system components. In the case of a CassandraDatacenter, Cass Operator manages the status property.

An event is a Kubernetes resource that is created when objects like pods change state, or when an error occurs. Like other resources, events get stored in the API server. Cass Operator generates a number of events for a CassandraDatacenter.

Understanding both the changes in a CassandraDatacenter’s status and the events emitted by the operator provides valuable insight into what is actually happening during the provisioning process. That understanding also makes it easier to resolve issues when things go wrong. This applies not only to CassandraDatacenter, but to other Kubernetes resources as well.

Monitor Status Updates

We can watch for changes in the status with:

$ kubectl -n cass-operator get -w cassdc multi-rack -o yaml

In the following sections we will discuss each of the status updates that occur while the operator works to create the Cassandra cluster.

Initial Status Update

Here is what the status looks like initially after creating the CassandraDatacenter:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  nodeStatuses: {}

cassandraOperatorProgress can have one of two values, Ready or Updating. It will change to Ready when the operator has no more work to do for the resource. This simple detail is really important, particularly if you are performing any automation with Cass Operator. For example, I have used Cassandra operators to provision clusters for integration tests. With Cass Operator my test setup code could simply poll cassandraOperatorProgress to know when the cluster is ready.
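
A minimal polling sketch in shell, using the same cassdc short name shown above (adjust the namespace and datacenter name for your environment):

$ while [ "$(kubectl -n cass-operator get cassdc multi-rack -o jsonpath='{.status.cassandraOperatorProgress}')" != "Ready" ]; do sleep 10; done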

conditions is an array of DatacenterCondition objects. A lot of Kubernetes resources use conditions in their statuses. Conditions represent the latest observations of an object’s state. They should minimally include type and status fields. The status field can have as its value either True, False, or Unknown. lastTransitionTime is the time the condition transitioned from one status to another. type identifies the condition. CassandraDatacenter currently has the following condition types:

  • Ready
  • Initialized
  • ReplacingNodes
  • ScalingUp
  • Updating
  • Stopped
  • Resuming
  • RollingRestart

Implementing, understanding, and using conditions are often points of confusion. It is intuitive to think of and model a resource’s state as a state machine. Reminding yourself that conditions are observations, not a state machine, will go a long way toward avoiding some of that confusion. It is worth noting there has been a lot of debate in the Kubernetes community about whether conditions should be removed. Some of the latest discussions in this ticket indicate that they will remain.

lastRollingRestart is only updated when a rolling restart is explicitly requested. As we will see its value will remain unchanged, and therefore we will be ignoring it for this post.

nodeStatuses is a map that provides some details for each node. We will see it get updated as nodes are deployed.

Cassandra Node Starting

With the next update we see that a lastServerNodeStarted property has been added to the status:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:41:24Z"
  nodeStatuses: {}

lastServerNodeStarted gets updated when a Cassandra node is starting up. The operator also adds the label cassandra.datastax.com/node-state: Starting to the Cassandra pod. The astute reader may have noted that I said lastServerNodeStarted is updated when the Cassandra node is starting up rather than when the pod is starting up. For Cass Operator, there is an important distinction between the Cassandra node and the Cassandra container. The Cassandra Container section at the end of the post goes over this in some detail.

Cassandra Node Started

In the next update lastServerNodeStarted is modified and another entry is added to nodeStatuses:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:41:50Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5

The entry is keyed by the pod name, multi-rack-multi-rack-us-east1-b-sts-2. The value consists of two fields - the node’s host id and its IP address.

When Cass Operator determines the Cassandra node is up and running, it updates the node-state label to cassandra.datastax.com/node-state: Started. After the label update, the operator uses a label selector query to see which pods have been started and are running. When the operator finds another node running, its host ID and IP address are added to nodeStatuses.
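
You can run the same kind of label selector query yourself at any point to see which pods the operator considers started:

$ kubectl -n cass-operator get pods -l cassandra.datastax.com/node-state=Started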

Remaining Nodes Started

In this section we follow the progression of the rest of the Cassandra cluster being started. lastServerNodeStarted is changed with each of these status updates in addition to nodeStatuses being updated.

multi-rack-multi-rack-us-east1-c-sts-0 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:42:49Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3

Next, multi-rack-multi-rack-us-east1-d-sts-0 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:43:53Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4

Next, multi-rack-multi-rack-us-east1-c-sts-2 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:44:54Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4

Next, multi-rack-multi-rack-us-east1-d-sts-1 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:45:50Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3

With five out of the nine nodes started, now is a good time to point out a couple of things. First, we see that one node at a time is added to nodeStatuses. Based on this it stands to reason that Cass Operator is starting nodes serially. That is precisely what is happening.

Secondly, there is roughly a minute difference between the values of lastServerNodeStarted in each status update. It is taking about a minute or so to start each node, which means it should take somewhere between nine and ten minutes for the cluster to be ready. These times will almost certainly vary depending on a number of factors like the type of disks used, the machine type, etc. It is helpful though, particularly for larger cluster sizes, to be able to gauge how long it will take to get the entire cluster up and running.
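
A convenient way to follow this progression yourself is to watch the node-state labels as the operator updates them:

$ kubectl -n cass-operator get pods -L cassandra.datastax.com/node-state -w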

Next, multi-rack-multi-rack-us-east1-d-sts-2 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:46:51Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Next, multi-rack-multi-rack-us-east1-b-sts-0 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:00Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Next, multi-rack-multi-rack-us-east1-c-sts-1 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Finally, multi-rack-multi-rack-us-east1-b-sts-1 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Although all nine nodes are now started, the operator still has more work to do. This is evident based on the ScalingUp condition still being True and cassandraOperatorProgress still having a value of Updating.

Cassandra Super User Created

With the next update the superUserUpserted property is added to the status:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

superUserUpserted is the timestamp at which the operator creates a super user in Cassandra. We will explore this in a little more detail when we go through the events.

ScalingUp Transition

In this update the ScalingUp condition transitions to False. This condition changes only after all nodes have been started and after the super user has been created.

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "False"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

Add Initialized and Ready Conditions

Next, the operator adds the Initialized and Ready conditions to the status. Initialized means the CassandraDatacenter was successfully created. The transition for this condition should only happen once. Ready means the cluster can start serving client requests. The Ready condition will remain True during a rolling restart for example but will transition to False when all nodes are stopped. See The Cassandra Container section at the end of the post for more details on starting and stopping Cassandra nodes.

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "False"
    type: ScalingUp
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Initialized
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Ready
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

Final Status Update

In the last update, the value of cassandraOperatorProgress is changed to Ready:

status:
  cassandraOperatorProgress: Ready
  conditions:
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "False"
    type: ScalingUp
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Initialized
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Ready
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

We now know the operator has completed its work to scale up the cluster. We also know the cluster is initialized and ready for use. Let’s verify that the desired state of the CassandraDatacenter matches the actual state. We can do this with nodetool status and kubectl get pods.

$ kubectl -n cass-operator exec -it multi-rack-multi-rack-us-east1-b-sts-0 -c cassandra -- nodetool status
Datacenter: multi-rack
======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  10.32.4.4  84.43 KiB  1            4.8%              c7e43757-92ee-4ca3-adaa-46a128045d4d  us-east1-d
UN  10.32.1.4  70.2 KiB   1            7.4%              3b1b60e0-62c6-47fb-93ff-3d164825035a  us-east1-b
UN  10.32.6.3  65.36 KiB  1            32.5%             dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76  us-east1-c
UN  10.32.3.3  103.54 KiB  1            34.0%             785e30ca-5772-4a57-b4bc-4bd7b3b24ebf  us-east1-d
UN  10.32.7.6  70.34 KiB  1            18.1%             a55082ba-0692-4ee9-97a2-a1bb16383d31  us-east1-c
UN  10.32.8.5  65.36 KiB  1            19.8%             facbbaa0-ffa7-403c-b323-e83e4cab8756  us-east1-c
UN  10.32.2.6  65.36 KiB  1            36.5%             d7246bca-ae64-45ec-8533-7c3a2540b5ef  us-east1-b
UN  10.32.0.5  65.36 KiB  1            39.9%             62399b3b-80f0-42f2-9930-6c4f2477c9bd  us-east1-b
UN  10.32.5.4  65.36 KiB  1            7.0%              8e8733ab-6f7b-4102-946d-c855adaabe49  us-east1-d

nodetool status reports nine nodes up across three racks. That looks good. Now let’s verify the pods are running where we expect them to be.

$ kubectl -n cass-operator get pods -l "cassandra.datastax.com/cluster=multi-rack" -o wide | awk {'print $1" "$7'} | column -t
NAME                                    NODE
multi-rack-multi-rack-us-east1-b-sts-0  gke-cass-dev-default-pool-63ec3f9d-5781
multi-rack-multi-rack-us-east1-b-sts-1  gke-cass-dev-default-pool-63ec3f9d-blrh
multi-rack-multi-rack-us-east1-b-sts-2  gke-cass-dev-default-pool-63ec3f9d-g4cb
multi-rack-multi-rack-us-east1-c-sts-0  gke-cass-dev-default-pool-b1ee1c3c-5th7
multi-rack-multi-rack-us-east1-c-sts-1  gke-cass-dev-default-pool-b1ee1c3c-ht20
multi-rack-multi-rack-us-east1-c-sts-2  gke-cass-dev-default-pool-b1ee1c3c-xp2v
multi-rack-multi-rack-us-east1-d-sts-0  gke-cass-dev-default-pool-3cab2f1f-3swp
multi-rack-multi-rack-us-east1-d-sts-1  gke-cass-dev-default-pool-3cab2f1f-408v
multi-rack-multi-rack-us-east1-d-sts-2  gke-cass-dev-default-pool-3cab2f1f-pv6v

Look carefully at the output, and you will see each pod is in fact running on a separate worker node. Furthermore, the pods are running on worker nodes in the expected zones.

Monitor Events

The operator reports a number of events useful for monitoring and debugging the provisioning process. As we will see, the events provide additional insights absent from the status updates alone.

There are some nuances with events that can make working with them a bit difficult. First, events are persisted with a TTL. They expire after one hour. Secondly, events can be listed out of order. The ordering appears to be done on the client side with a sort on the Age column. We will go through the events in the order in which the operator generates them. Lastly, while working on this post, I discovered that some events can get dropped. I created this ticket to investigate the issue. Kubernetes has in place some throttling mechanisms to prevent the system from getting overwhelmed by too many events. We won’t go through every single event as there are a lot. We will however cover enough, including some that may be dropped, in order to get an overall sense of what is going on.

We can list all of the events for the CassandraDatacenter with the describe command as follows:

$ kubectl -n cass-operator describe cassdc multi-rack
Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-b
  Normal  CreatedResource    12m    cass-operator  Created service multi-rack-seed-service
  Normal  CreatedResource    12m    cass-operator  Created service multi-rack-multi-rack-all-pods-service
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-b-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-c-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-d-sts
  Normal  CreatedResource    12m    cass-operator  Created service multi-rack-multi-rack-service
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-c
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-d
  Normal  LabeledPodAsSeed   12m    cass-operator  Labeled pod a seed node multi-rack-multi-rack-us-east1-b-sts-2
  Normal  StartingCassandra  12m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2
  Normal  StartedCassandra   11m    cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2
  Normal  StartingCassandra  11m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-0
  Normal  StartingCassandra  10m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-0
  Normal  StartedCassandra   10m    cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-0
  Normal  LabeledPodAsSeed   10m    cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-c-sts-0
  Normal  LabeledPodAsSeed   9m44s  cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-d-sts-0
  Normal  StartedCassandra   9m43s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-0
  Normal  StartingCassandra  9m43s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-2
  Normal  StartedCassandra   8m43s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-2
  Normal  StartingCassandra  8m43s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  Normal  StartedCassandra   7m47s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  Normal  StartingCassandra  7m46s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-2
  Normal  StartedCassandra   6m45s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-2
  Normal  StartingCassandra  6m45s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-0
  Normal  LabeledPodAsSeed   5m36s  cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-b-sts-0

In the following sections we will go through several of these events as well as some that are missing.

Create Headless Services

The first thing that Cass Operator does during the initial reconciliation loop is create a few headless services:

  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedResource    10m    cass-operator  Created service multi-rack-seed-service
  Normal  CreatedResource    10m    cass-operator  Created service multi-rack-multi-rack-all-pods-service
  Normal  CreatedResource    10m    cass-operator  Created service multi-rack-multi-rack-service    

multi-rack-seed-service exposes all pods running seed nodes. This service is used by Cassandra to configure seed nodes.

multi-rack-multi-rack-all-pods-service exposes all pods that are part of the CassandraDatacenter, regardless of whether they are ready. It is used to scrape metrics with Prometheus.

multi-rack-multi-rack-service exposes ready pods. CQL clients should use this service to establish connections to the cluster.
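
You can list these services in the operator’s namespace to see what was created:

$ kubectl -n cass-operator get services

You should see the seed service, the all-pods service, and the CQL service for the datacenter, in addition to the webhook service created when the operator was installed.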

Create StatefulSets

Next the operator creates three StatefulSets, one for each rack:

  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-b-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-c-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-d-sts

I mentioned earlier the operator will use the zone property specified for each rack to pin pods to Kubernetes workers in the respective zones. The operator uses affinity rules to accomplish this.

Let’s take a look at the spec for multi-rack-multi-rack-us-east1-c-sts to see how this is accomplished:

$ kubectl -n cass-operator get sts multi-rack-multi-rack-us-east1-c-sts -o yaml
...
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: failure-domain.beta.kubernetes.io/zone
                operator: In
                values:
                - us-east1-c
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: cassandra.datastax.com/cluster
                operator: Exists
              - key: cassandra.datastax.com/datacenter
                operator: Exists
              - key: cassandra.datastax.com/rack
                operator: Exists
            topologyKey: kubernetes.io/hostname
...            

The nodeAffinity property constrains the worker nodes on which pods in the StatefulSet can be scheduled. requiredDuringSchedulingIgnoredDuringExecution is a NodeSelector which basically declares a query based on labels. In this case, if a node has the label failure-domain.beta.kubernetes.io/zone with a value of us-east1-c, then pods can be scheduled on that node.

Note: failure-domain.beta.kubernetes.io/zone is one of a number of well known labels that are used by the Kubernetes runtime.

I added emphasis to can be because of the podAntiAffinity property that is declared. It constrains the worker nodes on which the pods can be scheduled based on the labels of pods currently running on the nodes. The requiredDuringSchedulingIgnoredDuringExecution property is a PodAffinityTerm that defines labels that determine which pods cannot be co-located on a particular host. In short, this prevents pods from being scheduled on any node on which pods from a CassandraDatacenter are already running. In other words, no two Cassandra nodes should be running on the same Kubernetes worker node.

Note: You can run multiple Cassandra pods on a single worker node by setting .spec.allowMultipleNodesPerWorker to true.

Scale up the Racks

The next events involve scaling up the racks:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-b
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-c
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-d

The StatefulSets are initially created with zero replicas. They are subsequently scaled up to the desired replica count, which is three (per StatefulSet) in this case.
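
You can watch the replica counts converge for each rack’s StatefulSet:

$ kubectl -n cass-operator get statefulsets -w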

Label the First Seed Node Pod

After the StatefulSet controller starts creating pods, Cass Operator applies the following label to a pod to designate it as a Cassandra seed node:

cassandra.datastax.com/seed-node: "true"

At this stage in the provisioning process, no pods have the seed-node label. The following event indicates that the operator designates the pod to be a seed node:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  LabeledPodAsSeed   12m    cass-operator  Labeled pod a seed node multi-rack-multi-rack-us-east1-b-sts-2   

Note: You can use a label selector to query for all seed node pods, e.g., kubectl -n cass-operator get pods -l cassandra.datastax.com/seed-node="true".

Start the First Seed Node

Next the operator starts the first seed node:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  StartingCassandra  12m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2

The operator applies the label cassandra.datastax.com/node-state: Starting to the pod. The operator then requeues the request with a short delay, allowing time for the Cassandra node to start. Requeuing the request ends the current reconciliation.

If you are familiar with Kubernetes, this step of starting the Cassandra node may seem counter-intuitive because pods/containers cannot exist in a stopped state. See The Cassandra Container section at the end of the post for more information.

Update Status of First Seed Node Pod

In a subsequent reconciliation loop the operator finds that multi-rack-multi-rack-us-east1-b-sts-2 has been started and records the following event:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  StartedCassandra   11m    cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2

Then the cassandra.datastax.com/node-state label is updated to a value of Started to indicate the Cassandra node is now running. The event is recorded and the label is updated only when the Cassandra container’s readiness probe passes. If the readiness probe fails, the operator will requeue the request, ending the current reconciliation loop.

Start One Node Per Rack

After the first node, multi-rack-multi-rack-us-east1-b-sts-2, is running, the operator makes sure there is a node per rack running. Here is the sequence of events for a given node:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  StartingCassandra  8m43s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  Normal  StartedCassandra   7m47s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  Normal  LabeledPodAsSeed   9m44s  cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-d-sts-1

Let’s break down what is happening here.

  • The cassandra.datastax.com/node-state: Starting label is applied to multi-rack-multi-rack-us-east1-d-sts-1
  • Cassandra is started
  • The request is requeued
  • On a subsequent reconciliation loop when Cassandra is running (as determined by the readiness probe), two things happen
    • The cassandra.datastax.com/seed-node="true" label is applied to the pod, making it a seed node
    • The cassandra.datastax.com/node-state label is updated to a value of Started

The operator will repeat this process for another rack which does not yet have a node running.

Now is a good time to discuss how the operator determines how many seeds there should be in total for the datacenter as well as how many seeds there should be per rack.

If the datacenter consists of only one or two nodes, then there will be one or two seeds respectively. If there are more than three racks, then the number of seeds will be set to the number of racks. If neither of those conditions hold, then there will be three seeds.

The seeds per rack are calculated as follows:

seedsPerRack = totalSeedCount / numRacks
extraSeeds = totalSeedCount % numRacks

For the example cluster in this post totalSeedCount will be three. Then seedsPerRack will be one, and extraSeeds will be zero.

Start Remaining Nodes

After we have a Cassandra node up and running in each rack, the operator proceeds to start the remaining non-seed nodes. I will skip over listing events here because they are the same as the previous ones. At this point the operator iterates over the pods without worrying about the racks. For each pod in which Cassandra is not already running, it will start Cassandra following the same process previously described.

Create a PodDisruptionBudget

After all Cassandra nodes have been started, Cass Operator creates a PodDisruptionBudget. It generates an event like this:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedResource    10m6s  cass-operator  Created PodDisruptionBudget multi-rack-pdb

Note: This is one of the dropped events.

A PodDisruptionBudget limits the number of pods that can be down from a voluntary disruption. Examples of voluntary disruptions include accidentally deleting a pod or draining a worker node for upgrade or repair.

All Cassandra pods in the CassandraDatacenter are managed by the disruption budget. When creating the PodDisruptionBudget, Cass Operator sets the .spec.minAvailable property. This specifies the number of pods that must be available after a pod eviction. Cass Operator sets this to the total number of Cassandra nodes minus one.
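
You can inspect the generated budget, whose name appears in the event above, to confirm the minAvailable setting (it should be 8 for our nine-node cluster):

$ kubectl -n cass-operator get pdb multi-rack-pdb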

Create a Cassandra Super User

The final thing that Cass Operator does is to create a super user in Cassandra:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedSuperuser   10m6s  cass-operator  Created superuser

Earlier in the provisioning process, Cass Operator creates the super user credentials and stores them in a secret. The secret name can be specified by setting .spec.superuserSecretName.

The username is set to <.spec.clusterName>-superuser, which will be multi-rack-superuser for our example. The password is a random UTF-8 string of at most 55 characters.

Note: Cass Operator disables the default super user, cassandra.
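
The generated credentials can be read back from the secret. A quick sketch, assuming the default secret name follows the same <clusterName>-superuser pattern (list the secrets in the namespace first if yours differs):

$ kubectl -n cass-operator get secrets | grep superuser
$ kubectl -n cass-operator get secret multi-rack-superuser -o jsonpath='{.data.password}' | base64 --decode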

The Cassandra Container

Each Cassandra pod runs a container named cassandra. We need to talk about sidecars before we talk about the cassandra container. The sidecar pattern is a well-known and widely used architectural pattern in Kubernetes. A pod consists of one or more containers. The containers in a pod share the same volumes and network interfaces. Examples of sidecars include log aggregation, gRPC proxies, and backup/restore agents, to name a few. Cass Operator utilizes the sidecar pattern, but in an unconventional manner.

We can take a look at the spec of one of the Cassandra pods to learn more about the cassandra container. Because we are only focused on this one part, most of the output is omitted.

$ kubectl -n cass-operator get pod multi-rack-multi-rack-us-east1-b-sts-0 -o yaml
apiVersion: v1
kind: Pod
...
spec:
...
      containers:
      - env:
        - name: DS_LICENSE
          value: accept
        - name: DSE_AUTO_CONF_OFF
          value: all
        - name: USE_MGMT_API
          value: "true"
        - name: MGMT_API_EXPLICIT_START
          value: "true"
        - name: DSE_MGMT_EXPLICIT_START
          value: "true"
        image: datastax/cassandra-mgmtapi-3_11_6:v0.1.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /api/v0/probes/liveness
            port: 8080
            scheme: HTTP
          initialDelaySeconds: 15
          periodSeconds: 15
          successThreshold: 1
          timeoutSeconds: 1
        name: cassandra
        ports:
        - containerPort: 9042
          name: native
          protocol: TCP
        - containerPort: 8609
          name: inter-node-msg
          protocol: TCP
        - containerPort: 7000
          name: intra-node
          protocol: TCP
        - containerPort: 7001
          name: tls-intra-node
          protocol: TCP
        - containerPort: 8080
          name: mgmt-api-http
          protocol: TCP
        readinessProbe:
          failureThreshold: 3
          httpGet:
            path: /api/v0/probes/readiness
            port: 8080
            scheme: HTTP
          initialDelaySeconds: 20
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /config
          name: server-config
        - mountPath: /var/log/cassandra
          name: server-logs
        - mountPath: /var/lib/cassandra
          name: server-data
...          

There are two lines in the output on which we want to focus. The first line is:

name: cassandra

This is the name of the container. There are other containers listed in the output, but we are only concerned with the cassandra one.

The second line that we are interested in is:

image: datastax/cassandra-mgmtapi-3_11_6:v0.1.0

The image property specifies the image that the cassandra container is running. This is different from the Cassandra images such as the ones found on Docker Hub. This image is for the Management API Sidecar. There have been lots of discussions on the Cassandra community mailing lists about management sidecars. In fact there is even a Cassandra Enhancement Proposal (CEP) for providing an official, community based sidecar. The Management API Sidecar, or management sidecar for short, was not designed specifically for Kubernetes.

The process started in the cassandra container is the management sidecar rather than the CassandraDaemon process. The sidecar is responsible for starting/stopping the node. In addition to providing lifecycle management, the sidecar also provides configuration management, health checks, and per node actions (i.e., nodetool).
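
The probe endpoints shown in the pod spec above can be hit directly, which is a handy way to poke at the sidecar. A quick sketch using a port-forward (run the curl from a second terminal, or background the port-forward):

$ kubectl -n cass-operator port-forward multi-rack-multi-rack-us-east1-b-sts-0 8080:8080
$ curl -i http://localhost:8080/api/v0/probes/readiness

A 200 response indicates the node is considered ready, which is exactly what the readinessProbe checks.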

There is plenty more to say about the management sidecar, but that is for another post.

Wrapping Up

Hopefully this post gives you a better understanding of Cass Operator and Kubernetes in general. While we covered a lot of ground, there is plenty more to discuss, like multi-DC clusters and the management sidecar. If you want to hear more about Cassandra and Kubernetes, Patrick McFadin put together a series of interviews where he talks to early adopters in the field. Check out “Why Tomorrow’s Cassandra Deployments Will Be on Kubernetes”. It will be available for streaming as part of the DataStax Accelerate online conference: https://dtsx.io/3ex1Eop.

Comparing stress tools for Apache Cassandra

Editor’s Note: The Last Pickle was recently acquired by DataStax, and as part of the new DataStax mission of reorienting to embrace open source Apache Cassandra, this is the first in a series of blog posts that will compare new open source offerings, particularly those now coming out of the new DataStax. In the open source spirit we want to embrace you, the community, in choosing the right tool for the right job.

Benchmarking and stress testing Apache Cassandra are important exercises that both operators and developers do regularly. Approaches come in numerous flavours, from “look how my code is better than everyone else’s”, to “what do I need to run this?” and “how much money will this save me?”, and my favourite, “when this dies a horrible death what will that look like?”.

Knowing what you want to achieve and how to interpret the results of these tests is a big part of the challenge. With a run through of these available tools, hopefully that will become easier.

This blog post will look at and compare the following three stress tools:

  • cassandra-stress
  • tlp-stress
  • nosqlbench

With these three tools we will step through a number of basic use cases:

  1. Just Generate Some Load
  2. Using a Real Cluster
  3. Iteration and Thread Counts
  4. Observability
  5. Batch sizes and Overwrites
  6. Predefined Workloads
  7. Custom Workloads
  8. Client Contention and Server Saturation

The versions of the tools used in these steps are 3.11.6 for cassandra-stress, 4.0.0 for tlp-stress, and 3.12.77 for nosqlbench.


1. Just Generate Some Load

Sometimes all you want to do is generate some load or data. This is good for when all we want is a Cassandra node that is doing something. It can be just to raise the CPU, or to generate some commitlogs, memtables, or SSTables on disk.

Each tool will generate a slightly different load configuration for these tests:

$ cassandra-stress write

      Performs over a million writes (after an initial 50k warmup writes), iterating a number of times while increasing the number of client threads, starting with four threads.

$ tlp-stress run BasicTimeSeries -i 1M

      Performs exactly one million requests with a 9:1 write-to-read ratio.

$ nb cql-iot write_cl=LOCAL_ONE

      Performs ten million writes during a warmup phase and then ten million requests with a 9:1 write-to-read ratio.

All of them execute writes connected to a localhost Cassandra node, using the java-driver and consistency level LOCAL_ONE.

There is a difference in the model, however, as cassandra-stress uses a simple key value table, while tlp-stress and nosqlbench are using time-series data models.


2. Using a Real Cluster

This repeats the exercise of just generating any load or data, but is used when you have an actual cluster you are targeting.

$ cassandra-stress write -node cassandra1

$ tlp-stress run BasicTimeSeries --host cassandra1

$ nb cql-iot host=cassandra1

Note: There is no need to specify multiple hosts with any of these stress tools. These are contact hosts that are passed to the java driver, and unlike a coded application where you would want multiple contact hosts specified for reliability during deployment and startup, with a stress tool invocation it is reasonable to expect the single contact host specified to be up and running.


3. Iteration and Thread Counts

The following shows how to specify the number of iterations and the number of threads to use, in each of the tools.

$ cassandra-stress write n=100000 -rate threads=16

$ tlp-stress run BasicTimeSeries -n 100k -t 16

$ nb cql-iot write_cl=LOCAL_ONE main-cycles=100k threads=16


4. Observability

Even with well-designed workloads there is a lot more to benchmarking than the final throughput numbers. We want to see how the cluster operates over time. This can be from spikes in traffic to the many background operations Cassandra can perform. Taking a closer look at how Cassandra performs helps plan for a healthy and stable cluster over a longer period of time than what we are able to benchmark.


cassandra-stress
$ cassandra-stress write -graph file=example-benchmark.html title=example revision=benchmark-0

      For more information on this, read our previous blog post on Cassandra-Stress and Graphs.  


nosqlbench
$ nb cql-iot write_cl=LOCAL_ONE --docker-metrics

      Then open http://localhost:3000 in your browser. Note this only works on Linux and requires Docker to be running on the host. For more info see here.

 

tlp-stress

      Note: tlp-stress has no similar observability feature, but does export Prometheus metrics on port 9501.
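
      For example, while a run is in progress you can spot-check those exported metrics with a quick curl. This assumes the exporter serves the standard Prometheus /metrics path on the port mentioned above:

$ curl -s http://localhost:9501/metrics | head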

The out-of-the-box generated graphs from cassandra-stress are a nice feature. For any serious benchmarking activity, though, you will want to have metrics from Cassandra graphed and to have insight into the stress tool's behaviour beyond just performance numbers.


5. Batch sizes and Overwrites

The following invocation is of particular interest because it has been a pain point for those using cassandra-stress. In Cassandra, unlogged batches are not normal and are not recommended except for very small groupings (10-20) of rows within the one partition.

cassandra-stress, by default, puts all writes for any partition into single batches, which makes for poor and unrealistic results. It is impossible to get cassandra-stress to not use batches, and quite convoluted to get it to write batches that consist only of single inserts. More on this can be read in the ticket CASSANDRA-11105.

Overwrites and deletes are not something we see a lot of among published Cassandra benchmarks because they are harder to implement. Often this makes sense, as workloads like key-value and time-series are likely not overwrite data models. Yet there are plenty of data models out there that do require these patterns and that we would like to benchmark.


cassandra-stress

      First download batch_too_large.yaml.

$ cassandra-stress user profile=batch_too_large.yaml ops\(insert=1\) -insert visits=FIXED\(10M\)  


tlp-stress

      tlp-stress does not perform unlogged batches by default like cassandra-stress. If unlogged batches are desired you need to write your own workload, see the Custom Workloads section.

      tlp-stress does make deletes very easy, treating them in a similar fashion to the read rate flag. This will make 10% of the operations deletes of previously written data:

$ tlp-stress run KeyValue --deletes 0.1

      tlp-stress does overwrites in a similar way to cassandra-stress. This will write 100k operations over 100 partitions; without clustering keys, that is roughly 1k overwrites on each partition:

$ tlp-stress run KeyValue -p 100 -n 100k  


nosqlbench

      nosqlbench can handle overwrites in the same manner as cassandra-stress and tlp-stress, by providing a smaller partition count than the iteration count. nosqlbench does not currently provide any delete or unlogged batch examples. Logged batches have been implemented with custom workloads, so deletes and unlogged batches are probably possible with a custom implemented workload.


6. Predefined Workloads

cassandra-stress

      cassandra-stress does not have built-in workloads. You need to specify the user mode and supply your own configuration, as shown in the next section.


tlp-stress

      tlp-stress has the most extensive list of workloads. These workloads have been used at TLP to demonstrate real limitations with certain features and to provide a hands-on approach to recommending the best production solutions.

$ tlp-stress list

    Available Workloads:

    AllowFiltering
    BasicTimeSeries
    CountersWide
    KeyValue
    LWT
    Locking
    Maps
    MaterializedViews
    RandomPartitionAccess
    Sets
    UdtTimeSeries

$ tlp-stress run CountersWide  


nosqlbench

      nosqlbench lists the workloads from its predefined yaml workload files. Within these workloads it lists the different phases that are used and that can be combined. This offers us our first glimpse of how complex and specific a workload definition can be. It also lists the sequences workload, which is not based on the cql driver.

$ nb --list-workloads

    from: cql-keyvalue.yaml
        …
    from: cql-iot.yaml
        …
    from: cql-iot-dse.yaml
        …
    from: cql-tabular.yaml
        …
    from: sequences.yaml
        …


$ nb cql-tabular  


7. Custom Workloads

A benchmark that is part of a feasibility or capacity planning exercise for production environments will nearly always require a custom defined workload.  


cassandra-stress

      For cassandra-stress, an example of this was done for the Zipkin project. cassandra-stress cannot benchmark more than one table at a time, so there is a separate workload yaml for each table and these have to be run as separate invocations. Here we see that cassandra-stress does not support Zipkin's original schema, specifically UDTs and collections, so the folder above also contains some cql files to create a schema we can stress.

      Create the Zipkin test schema:

cqlsh -f zipkin2-test-schema.cql

      Fill this schema with some data, throttling as appropriate:

$ cassandra-stress  user profile=span-stress.yaml ops\(insert=1\) no-warmup duration=1m  -rate threads=4 throttle=50/s

      Now benchmark a mixed read and write workload, again throttling as appropriate:

$ cassandra-stress  user profile=span-stress.yaml ops\(insert=1,by_trace=1,by_trace_ts_id=1,by_annotation=1\)  duration=1m  -rate threads=4 throttle=50/s  

As can be seen above, creating custom workloads in cassandra-stress has always been an involved and difficult experience. While tlp-stress and nosqlbench improve on this situation, they each do so in different ways.


nosqlbench

      nosqlbench provides all of its workload configurations via yaml files. Getting the hang of these can be quite daunting for the newcomer, but with the documentation provided, and by first taking and tweaking the predefined workloads, there is a wealth of possibility here.


tlp-stress

      tlp-stress, on the other hand, focuses on writing workloads in code. tlp-stress is written in Kotlin, so if you find Kotlin enjoyable you will find it quick and intuitive to write workloads. The existing workloads can be found here; take a peek and you will see that they are quite simple to write.


8. Client Contention and Server Saturation

Which benchmark tool is faster? That may sound like a weird question, but it raises some real concerns: not just in choosing what hardware to run the client on, or how many clients are required, but in knowing when the results you are getting are nonsense. Understanding the load you want to generate versus what you need to measure is as important to benchmarking as the workload.

It is important to avoid saturating the server. Any benchmark that simply pushes throughput to its limit is largely meaningless. A real-world (and overly simplified) comparison of this is in OLAP clusters, like those paired with Apache Spark, where without appropriate thresholds on the spark-cassandra-connector you can get a yo-yo effect on throughput as the cluster saturates, jams up, and then accepts writes again. With tuning and throughput thresholds in place, higher throughput is sustainable over time. A Responsiveness Under Load (RUL) benchmark is where we apply such throughput limits and observe responsiveness instead.

These problems extend to the client stress tool as well. Unlike the server, which can block or load-shed at the defined throughput threshold, the client's throughput of operations can be either limited or scheduled. This difference can be important, but explaining it goes beyond this blog post. For those interested, I'd recommend reading this post on Fixing Coordinated Omission in Cassandra Stress.


cassandra-stress
$ cassandra-stress write -rate threads=4 fixed=50/s  


nosqlbench

      nosqlbench has no scheduler per se, but deals with reducing coordinated omission via asynchronous requests and a non-fixed thread count. More information on nosqlbench’s timing terminology can be found here.

$ nb cql-iot cyclerate=50 async=256 threads=auto

      Very few production clusters ever demonstrate constant throughput like this, so benchmarking bursts is a real thing. Currently only nosqlbench does this in-process.

$ nb cql-iot cyclerate=50,1.5 async=256 threads=auto

      This specifies a rate threshold of 50 operations per second, with bursts of up to 50%. More information on bursts is available here.


tlp-stress

      tlp-stress does not deal with Coordinated Omission. Its --rate flag relies on Google Guava's RateLimiter and limits the throughput, but does not schedule.
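
      For example, to cap the client at roughly 50 operations per second (assuming the flag takes a maximum operations-per-second value; the workload and number here are illustrative):

$ tlp-stress run KeyValue --rate 50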


Documentation

Looking through the documentation for each of the tools, it is easy to see that nosqlbench offers substantially more. The tlp-stress docs are elegant and easy for the beginner, though they are still missing information on how to implement your own workload (or profile, as tlp-stress refers to them).


Wrap Up

cassandra-stress is an advanced tool for very narrow applications against Cassandra. It quickly becomes a clumsy user experience and often requires adventures into some awkward code to understand it and get things working as expected.

tlp-stress was written as a replacement for cassandra-stress. Apart from not dealing with Coordinated Omission, it succeeds in that goal in every aspect: good documentation, a rich command-line interface, and a codebase that is easy to understand and contribute to.

nosqlbench takes the next step, aiming to be a YCSB replacement. It feels like a power-user's tool and comes with the features and capacity to earn that title. Expect to see more and more workloads made available for testing lots of different technologies in the NoSQL world.

An Introduction to Cassandra Multi-Data Centers: Part 2

In this second blog of the “Around the World in (Approximately) 8 Data Centers” series we catch our first mode of transportation (Cassandra) and explore how it works to get us started on our journey to multiple destinations (Data Centers).

1. What Is a (Cassandra) Data Center?

What does a Data Center (DC) look like? Here are some cool examples of DCs (old and new)!

Arguably the first “electronic” data center was ENIAC, circa 1946. It was, however, just a single (albeit monster) machine! It weighed more than 30 tonnes, occupied 1,800 square feet, consumed 200kW of power, got up to 50C inside, and was rumoured to cause blackouts in neighboring Philadelphia when it was switched on!

first electronic data centre

Jumping forward to the present, a photo of Google DC shows mainly cooling pipes. In common with ENIAC, power and heat are still a feature of modern DCs:

Google Data Center (plumbing)

So what is a Cassandra Data Center?! Ever since I started using Cassandra I’ve been puzzled about Cassandra Data Centers (DCs). When you create a keyspace you typically also specify a DC name, for example:

CREATE KEYSPACE here
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DCHere' : 3};

The NetworkTopologyStrategy is a production-ready replication strategy that allows you to specify an explicit DC name. But why do you need an explicit DC name? The reason is that you can actually have more than one data center in a Cassandra cluster, and each DC can have a different replication factor. For example, here is a keyspace with two DCs:

CREATE KEYSPACE here_and_there
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DCHere' : 3, 'DCThere' : 3};

So what does having multiple DCs achieve? Powerful automatic global replication of data! Essentially you can easily create a globally distributed Cassandra cluster where data written to the local DC is asynchronously replicated to all the other DCs in the keyspace.  You can have multiple keyspaces with different DCs and replication factors depending on how many copies and where you want your data replicated to.

But where do the Cassandra DCs come from? Well, it’s easy to create a cluster in a given location and Data Center name in Instaclustr Managed Cassandra!

When creating a Cassandra cluster using the Instaclustr console, there is a section called “Data Center” where you can select from options including: 

Infrastructure Provider, Region, Custom Name, Data Center Network address block, Node Size, EBS Encryption option, Replication Factor, and number of nodes.

The Custom Name is a logical name you can choose for a data center within Cassandra, and is how you reference the data center when you create a keyspace with NetworkTopologyStrategy.

So that explains the mystery of single Cassandra Data Center creation. What does this look like once it’s provisioned and running? Well, you can use CQLSH to connect to a node in the cluster, and then discover the data center you are connected to as follows:

cqlsh> use system;
cqlsh:system> select data_center from local;

data_center
-------------
DCHere

How about multiple DCs?  Using Instaclustr Managed Cassandra the simplest way of creating multiple DC Cassandra clusters is to create a single DC cluster first (call it ‘DCHere’). Then in the management console for this cluster, click on “Add a DC”.  You can add one DC at a time to create a cluster with the total number of DCs you need, just follow our support documentation here and here.

2. Multi-Data Center Experiments With CQLSH

So, to better understand how Cassandra DCs work I created a test cluster with 3 nodes in each of three DCs, located in Sydney, Singapore, and North Virginia (USA) AWS regions (9 nodes in total) as follows:

Cassandra test cluster with 3 nodes in each of three DCs
For this experiment, I used cqlsh running on my laptop, located in Canberra (close to Sydney). My initial goal was simply to explore latencies and try out DC failures.

To measure latency I turned “tracing on”, and to simulate DC failures I created multiple keyspaces, connected cqlsh to different DCs, and used different consistency levels. 

I created three separate keyspaces, one for each DC location. This doesn't result in data replication across DCs; instead it directs all data written via any local DC to the single DC with RF >= 1. That is, all data will be written to (and read from) the DCSydney DC for the “sydney” keyspace:

Create keyspace "sydney" with replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 0, 'DCSingapore' : 3, 'DCUSA' : 0 }; 

Create keyspace "singapore" with replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 3, 'DCSingapore' : 0, 'DCUSA' : 0 }; 

Create keyspace "usa" with replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 3, 'DCSingapore' : 0, 'DCUSA' : 0 };

I used a fourth keyspace for replication. Because this has multiple DCs with RF >= 1 the data will be replicated across all of the DCs, i.e. data written to any local DC will be written locally as well as to all other DCs:

Create keyspace "everywhere" with replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 3, 'DCSingapore' : 3, 'DCUSA' : 3 };

2.1 Latency

First let’s look at latency.  To run latency tests I connected cqlsh to the Sydney Data Center.
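
As a concrete illustration, the per-request setup in cqlsh looks like the following (the table name is a hypothetical placeholder; with tracing on, cqlsh reports the elapsed time for each request):

cqlsh> TRACING ON;
cqlsh> CONSISTENCY ONE;
cqlsh> INSERT INTO sydney.test_latency (id, value) VALUES (1, 'hello');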

I varied which keyspace I was writing to or reading from (the Keyspace column), and used consistency level ONE for all of these. ONE means that a write must be written to and acknowledged by at least one replica node, in any DC, so we don't expect any write or read errors even when the local DC is different from the DCs in the keyspace. The results show that latency increases from a minimum of 1-2ms (Sydney), to 200ms (Singapore) and 231ms (USA). Compared with the average inter-region network latencies I reported in the previous blog, these latencies are around 14% higher: the Singapore latency is 200ms (c.f. 174ms), and the USA latency is 231ms (c.f. 204ms). Longer times are to be expected as a Cassandra write or read is included in this time, on top of the basic network latency. As expected (using consistency ONE) all of the operations succeeded. This table shows the results:

What does this reveal about how Cassandra keyspaces and DCs work? Cqlsh is connected to the Sydney DC as the local DC. For the keyspaces that have just a single DC, the write or read operation can only use that DC and therefore includes the overhead of network latency for the local DC to communicate with the remote DC (with no network overhead for sydney). However, for the “everywhere” keyspace, which contains all three DCs, the operation behaves as if it is just using the local DC and therefore has a low latency indistinguishable from the results for the “sydney” keyspace. The difference is that the row is also written to all the other DCs asynchronously, which does not impact the operation time. This picture shows the latencies on a map:

3 Cassandra DC's Latencies

2.2 Failures

I also wanted to understand what happens if a DC is unavailable. This was tricky to achieve acting as a typical user of Instaclustr Managed Cassandra (as there is no way for users to stop/start Cassandra nodes), so I simulated it using permutations of local DC, keyspaces, and a consistency level of LOCAL_ONE (a write must be sent to, and successfully acknowledged by, at least one replica node in the local DC). This also allows customers to try it out for themselves. Using LOCAL_ONE means that if cqlsh is connected to the Sydney DC, and the keyspace has a Sydney DC with RF >= 1, then writes and reads will succeed. But if the keyspace only has DCs in other regions (Singapore or USA) then the writes and reads will fail (simulating the failure of remote DCs). This table shows the results of this experiment:

The results are almost identical to before, but with the key difference that we get a NoHostAvailable error (and therefore no latencies) when the keyspace is singapore or usa. The sydney and everywhere keyspaces still work, as the Sydney DC is available in both cases.

Note that Cassandra consistency levels are highly tunable, and there are more options that are relevant to multi-DC Cassandra operation. For example, ALL and EACH_QUORUM (writes only) work across all the DCs, and have stronger consistency, but at the penalty of higher latency and lower availability.

3. Multi-Data Centers Experiments With the Cassandra Java Client

Around the world in 80 days

In our journey “Around the World” it’s important to always have the latest information! As Phileas Fogg discovered, train timetables, like Cassandra documentation, can get out of date very quickly.

I was also interested in testing out the Cassandra Java client with my multi-DC cluster. I had previously read that by default it supports automatic failover across multiple DCs, which I thought would be interesting to see happening in practice. The DCAwareRoundRobinPolicy was recommended in the O'Reilly book “Learning Apache Cassandra” (2nd edition, 2017), which says that “this policy is datacenter aware and routes queries to the local nodes first”. This is also the policy I used in my first attempt to connect with Cassandra way back in 2017!

However, a surprise was lying in wait! It turns out that since version 4.0 of the Java client there is no longer a DCAwareRoundRobinPolicy!  

Instead, the default policy now only connects to a single data center, so naturally there is no failover across DCs. You must provide the local DC name, and this is the only DC the client connects to. This also means that it behaves exactly like the last (LOCAL_ONE) results with cqlsh. It prevents the potentially undesirable data consistency issues that can arise if you are using DC-local consistency levels but transparently fail over to a different DC.

You can either handle failures in the Java client code (e.g. if a DC is unavailable, pick the backup Cassandra DC and connect to that), or, probably the intended approach, have the entire application stack in the DC with the Cassandra failure fail over to a complete copy in the backup region. I tried detecting a failure in the Java client code and then failing over to the backup DC. This worked as expected. However, in the future I will need to explore how to recover from the failure (e.g. how do you detect when the original DC is available and consistent again, and switch back to using it).
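
A minimal sketch of that client-side approach with the 4.x Java driver looks like the following. The hostnames and DC names are placeholders, the connection failure is assumed to surface as AllNodesFailedException, and a real application would do something smarter than a single hard-coded backup:

import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlSession;
import java.net.InetSocketAddress;

public class DcFailoverSketch {

    // Build a session pinned to a single data center (required since Java driver 4.0).
    static CqlSession connect(String contactHost, String localDc) {
        return CqlSession.builder()
                .addContactPoint(new InetSocketAddress(contactHost, 9042))
                .withLocalDatacenter(localDc)
                .build();
    }

    public static void main(String[] args) {
        CqlSession session;
        try {
            session = connect("sydney-node-1", "DCSydney");   // primary DC (placeholder names)
        } catch (AllNodesFailedException e) {
            session = connect("usa-node-1", "DCUSA");         // backup DC (placeholder names)
        }
        // Confirm which DC we ended up connected to.
        String dc = session.execute("SELECT data_center FROM system.local")
                           .one().getString("data_center");
        System.out.println("Connected to DC: " + dc);
        session.close();
    }
}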

3.1 Redundant Low-Latency DCs

This brings us full circle back to the first blog in the series where we discovered that there are 8 georegions in AWS that provide sub 100ms latency to clients in the same georegion:

The 8 AWS georegions with sub 100ms latency for geolocated clients

We then suggested this could be serviced with 8 DCs to provide DC failover in each georegion, as follows:

8 DCs to provide DC failover in each georegion

The matched pairs of DCs to provide High Availability (HA) and failover for each georegion look like this in table form. These are pairs of DCs that the application code will need to have knowledge of and failover between:

Table - Matched pairs of DCs to provide High Availability (HA) and failover for each georegion

In practice, the read load of the application/client would need to be load balanced over both of the data centers for some georegions (e.g. the North America georegion load balances across both West and East Coast DCs). Depending on the data replication strategy (replicating data written in each georegion just to its redundant DC pair, or to all DCs in the cluster; this really depends on the actual use case) and the expected extra load on each DC due to failover, DC cluster sizes will need to be sufficient to cope with the normal read/write loads on each DC, the replication write load (write amplification), and load spikes due to DC failures and failover to another DC.

Based on these limited experiments we are now ready for the next Blog in the series, where we’ll try multi-DC Cassandra out in the context of a realistic globally distributed example application, potentially with multiple keyspaces, data centers and replication factors, to achieve goals including low latency, redundancy, and replication across georegions and even Around The World.

The post An Introduction to Cassandra Multi-Data Centers: Part 2 appeared first on Instaclustr.

Apache Cassandra 4.0 – Audit

Apache Cassandra 4.0 brings a long-awaited feature for tracking and logging database user activity. Primarily aimed at providing a robust set of audit capabilities allowing operators of Cassandra to meet external compliance obligations, it brings yet another enterprise feature into the database. Building on the work for the full query log capability, the audit log capability provides operators with the ability to audit all DML, DDL, and DCL changes to either a binary file or a user-configurable source (including the new Diagnostics notification changes).

This capability will go a long way toward helping Cassandra operators meet their SOX and PCI requirements.  If you are interested in reading about the development of the feature you can follow along here: https://issues.apache.org/jira/browse/CASSANDRA-12151

From a performance perspective, the changes appear to have only a fairly minor hit on throughput and latency when enabled, and no discernible impact when disabled. Expect to see a 10% to 15% impact on mixed workload throughput and p99 latency.

By default audit logs are written in the BinLog format and Cassandra comes with tools for parsing and processing them to human readable formats. Cassandra also supports executing an archive command for simple processing of audit logs. Audited keyspaces, users, and command categories can be whitelisted and blacklisted. Audit logging can be enabled in cassandra.yaml. 
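
As an illustration, here is a hedged sketch of the relevant cassandra.yaml section; the key names reflect the 4.0 configuration at the time of writing and may change between builds, and the filter values are placeholders, so check the cassandra.yaml shipped with your version:

audit_logging_options:
    enabled: true
    logger: BinAuditLogger                 # or a custom IAuditLogger implementation
    # audit_logs_dir: /var/log/cassandra/audit/
    included_keyspaces: myapp_keyspace     # placeholder: audit only this keyspace
    excluded_categories: QUERY             # placeholder: drop plain SELECT traffic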

What’s the Difference Between Audit Logging, Full Query Logging and Diagnostic Events? 

Both Audit logging (BinAuditLogger) and Full Query logging are managed internally by Apache Cassandra’s AuditLogManager. Both implement IAuditLogger, but are predefined in Apache Cassandra. The main difference is that the full query log receives AuditLogEntries before being processed by the AuditLogFilter. Both the FQL and BAL leverage the same BinLog format and share a common implementation of it. 

Diagnostic events are effectively a queue of internal events that happen in the node. There is an IAuditLogger implementation that publishes filtered LogEntries to the Diagnostics queue if users choose to consume audit records this way.

So think of it this way: Cassandra has an audit facility that enables both configurable auditing of actions and a full query log, and you can have as many AuditLoggers enabled as you want. Diagnostic events are a way of pushing events to client drivers using the CQL protocol, and you can pipe audit events to the diagnostics system!

How Is This Different From Cassandra's Change Data Capture (CDC) Mechanism?

Apache Cassandra has supported CDC on tables for some time now; however, the implementation has always been a fairly low-level and hard-to-consume mechanism. CDC in Cassandra is largely just an index into commitlog files that points to data relevant to the tables with CDC enabled. It was then up to the consumer to read the commitlog format and do something with it. It also only captured mutations that were persisted to disk.
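
For context, CDC is enabled per table with a simple table property (the keyspace and table names below are placeholders); the marked commitlog segments then land in the configured cdc_raw directory for a consumer to process:

ALTER TABLE myks.mytable WITH cdc = true;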

The audit logging capability will log all reads, writes, login attempts, schema changes, etc. Both features could be leveraged to build a proper CDC stream. I would hazard a guess that it's probably easier to do with the IAuditLogger interface than by consuming the CDC files!

The post Apache Cassandra 4.0 – Audit appeared first on Instaclustr.

Preview Release of Apache Cassandra 4.0

Instaclustr announces immediate availability of a preview release of Apache Cassandra 4.0 on the Instaclustr Managed Platform. This release is designed to allow Instaclustr customers to easily undertake any application-specific testing of Apache Cassandra 4.0 (alpha 3) in preparation for the forthcoming GA release of Apache Cassandra 4.0.

Apache Cassandra 4.0, as the first major release of Apache Cassandra for more than 2 years, is a major step forward for the Apache Cassandra community. Key features in Apache Cassandra 4.0 include:

  • non-blocking IO for internode communication, which has provided significant performance improvements in both user query tail-end latency and streaming operations;
  • virtual tables, providing the ability to retrieve performance metrics and other metadata directly via CQL rather than requiring external tools;
  • audit logging, the ability to log queries that are executed, for compliance purposes; and
  • many (generally minor but adding up) stability and testing improvements.

While Cassandra 4.0 is still in an alpha release phase and not yet ready for production usage, Instaclustr is making this preview release available for several reasons:

  1. Building it into our automated provisioning system was a prerequisite to beginning our own validation testing of the 4.0 stream using our Certified Cassandra Framework, and thus allowing us to be ready to release a full GA release of Cassandra 4.0 on our managed platform as soon as Cassandra 4.0 is ready.
  2. This preview release provides customers an easy path to start undertaking their own application-specific validation of Cassandra 4.0 and to interact with Instaclustr Support to debug any specific issues they may come across.

Being a preview release (of an alpha release of Apache Cassandra), this release has several limitations:

  • It is not supported for production usage and not covered by SLAs. The release should be used for testing purposes only;
  • It does not support Password Authentication, User Authorization, and Client to Node encryption (TLS);
  • CounterMutationStage, ViewMutationStage and DroppedMessage Monitoring does not report results;
  • The following add-ons are not supported:
    • Zeppelin
    • Spark
    • Lucene Index
    • Continuous Backup

We will be working to remove these limitations prior to a final release of Cassandra 4.0.

We look forward to working with our customers in this validation phase of Cassandra 4.0. Should you have any issues or questions please contact support@instaclustr.com.

The post Preview Release of Apache Cassandra 4.0 appeared first on Instaclustr.

The Last Pickle Joining DataStax

Today is a very emotional day: I’m happy, excited, and extremely proud to announce The Last Pickle has been acquired by DataStax.

I started contributing to the Apache Cassandra project in 2010, working by myself in my spare time. In March 2011, I left my job at Weta Digital and “went pro” becoming one of the first Apache Cassandra Consultants in the world. In 2013, Nate McCall joined me at The Last Pickle and we realised we could have a bigger impact on the world by working together. As the team at TLP grew over the years, so did our impact on the world. And with joining DataStax we are going to have the biggest impact possible.

We are at DataStax because we want to be. Because we have a shared passion to make Apache Cassandra the best database in the world, to make it easier to use, and to make it the first database people choose to use when starting a new project. Cassandra made large scale, highly available, databases an achievable goal for many companies around the world. For the first few years this was enough; it worked well enough if you knew how to take care of it. The addition of the Cassandra Query Language and improvements in operations expanded the user base as things got easier. But there is more work to do. We want to make open source Apache Cassandra easier to use at any scale, from 1 node to 1000, and to make it a realistic choice for every developer in the world.

The great level of technical understanding and kindness that the TLP team has given to the open source community, and importantly our customers, is not going away. The team is going to continue to help our customers as we always have, and will now be able to bring more resources when needed. Our contributions to the community are going to continue, and hopefully increase.

I’m extremely thankful to everyone that has worked at TLP over the years, everyone who has said nice things about us and helped us, and all of the customers who put their trust in my little company from New Zealand.

Thanks, Aaron

Instaclustr Announces PCI-DSS Certification

Instaclustr is very pleased to announce that we have achieved PCI-DSS certification for our Managed Apache Cassandra and Managed Apache Kafka offerings running in AWS. PCI-DSS (Payment Card Industry – Data Security Standard) is a mandated standard for many financial applications and we increasingly see the PCI-DSS controls adopted as the “gold standard” in other industries where the highest standards of security are crucial. PCI-DSS certification adds to our existing SOC2 accreditation to provide the levels of security assurance required by even the most demanding business requirements. 

Overall, this certification effort was the most significant single engineering project in the history of Instaclustr, requiring several person years of engineering effort to implement well over 100 changes touching every aspect of our systems over the course of several months. We’re very proud that, despite this level of change, impact to our customers has been absolutely minimal and we’re able to deliver another very significant piece of background infrastructure, allowing a wider range of customers to focus their efforts on building innovative business applications based on open source data technologies.

While PCI-DSS compliance may not be required by all customers, and is only supported on selected Instaclustr products, most of the security enhancements we have implemented will result in improved levels of security for all our Managed Service customers, regardless of product or platform. The most significant of these changes are:

  • Tightening of our admin access environment with technical controls to prevent egress of data via our admin systems.
  • Improved logging and auditing infrastructure.
  • Tightened operating system hardening and crypto standards.
  • Addition of a WAF (Web Application Firewall) in front of our console and APIs.
  • More automated scanning, and tightened resolution policies, for code dependency vulnerabilities.
  • More frequent security scanning of our central management systems.
  • More developer security training.

Customers wishing to achieve full PCI-DSS compliance will need to opt in when creating a cluster, as achieving PCI compliance will enforce a range of more restrictive security options (for example, password complexity in the Instaclustr console and use of Private Network Clusters), and enabling the required additional logging on the cluster incurs a performance penalty of approximately 5%. There is also a set of customer responsibilities that customers must implement for full compliance. Additional technical controls activated for PCI compliant clusters include:

  • Logging of all user access to the managed applications (Cassandra, Kafka)
  • Locked-down outbound firewall rules
  • Second approver system for sudo access for our admins

For full details please see our support page.

Customers with existing clusters who wish to move to full PCI compliance should contact support@instaclustr.com who will arrange a plan to apply the new controls to your cluster.

We will be publishing more detail on many of these controls in the coming weeks and holding webinars to cover the Cassandra and Kafka specific implementation details which we expect will be of broad interest. In the meantime, should you have any interest in any further information please contact your Instaclustr Customer Success representative or sales@instaclustr.com who will be able to arrange technical briefings.

The post Instaclustr Announces PCI-DSS Certification appeared first on Instaclustr.

Apache Cassandra 4.0 – Netty Transport

In this series of blog posts, we'll take a meandering tour of some of the important changes, cool features, and nice ergonomics that are coming with Apache Cassandra 4.0. In part 1 we focussed on Cassandra 4.0 stability and testing, and part 2 on virtual tables. In this part we will learn about the Netty transport framework.

One of the headline features for Apache Cassandra 4.0 is the refactor of internode messaging to use Java's (non-blocking) NIO capability (https://issues.apache.org/jira/browse/CASSANDRA-8457 and https://issues.apache.org/jira/browse/CASSANDRA-15066) via the Netty library.

This allows Cassandra to move away from an N-threads-per-peer model to a single thread pool for all connections. This dramatically reduces performance issues related to thread signalling, coordination, and context switching.

Moving to the Netty framework has also enabled other features like zero copy streaming for SSTables.

Performance improvements have been apparent from day one, both in terms of throughput and latency. The big win, however, is the significant reduction in tail-end latency, with 40%+ reductions in P99s seen in initial testing.

Of course patches and improvements are being made all the time during the beta process, so benchmarking will need to be revalidated against each workload and within your own environment, but progress is promising!

https://issues.apache.org/jira/browse/CASSANDRA-14746

The post Apache Cassandra 4.0 – Netty Transport appeared first on Instaclustr.

Apache Cassandra 4.0 – Virtual Tables

The last major version release of Apache Cassandra was 3.11.0, and that was more than 2 years ago in 2017. So what has the Cassandra developer community been doing over the last 2 years? Well let me tell you, it's good, real good. It's Apache Cassandra 4.0! It's also close, and with the release of the first alpha version, we now have a pretty solid idea of the features and capabilities that will be included in the final release.

In this series of blog posts, we’ll take a meandering tour of some of the important changes, cool features, and nice ergonomics that are coming with Apache Cassandra 4.0. In part 1 we focussed on Cassandra 4.0 stability and testing, in this part we will learn about Virtual Tables.

Implementation of Virtual Tables

Among the many exciting new features Cassandra 4.0 boasts is the implementation of Virtual Tables. Up until now, JMX access has been required to reveal Cassandra details such as running compactions, metrics, clients, and various configuration settings. With Virtual Tables, users are able to easily query this data as CQL rows from read-only system tables. Let's briefly discuss the changes associated with these Virtual Tables below.

Previously if a user wanted to look up the compaction status of a given node in a cluster, they would first require a JMX connection to be established in order to run nodetool compactionstats on the node. This alone presents a number of considerations: configuring your client for JMX access, configuring your nodes and firewall to allow for JMX access, and ensuring the necessary security and auditing measures are in place, just to name a few. 

Virtual Tables eliminate this overhead by allowing the user to query this information via the driver they already have configured. There are two new keyspaces created for this purpose: system_views and system_virtual_schema. The system_virtual_schema keyspace is as it sounds; it contains the schema information for the Virtual Tables themselves. All of the pertinent information we want is housed in the system_views keyspace which contains a number of useful tables. 

cqlsh> select * from system_virtual_schema.tables;

 keyspace_name         | table_name                | comment
-----------------------+---------------------------+------------------------------
          system_views |                    caches |                system caches
          system_views |                   clients |  currently connected clients
          system_views |  coordinator_read_latency |                             
          system_views |  coordinator_scan_latency |                             
          system_views | coordinator_write_latency |                             
          system_views |                disk_usage |                             
          system_views |         internode_inbound |                             
          system_views |        internode_outbound |                             
          system_views |        local_read_latency |                             
          system_views |        local_scan_latency |                             
          system_views |       local_write_latency |                             
          system_views |        max_partition_size |                             
          system_views |             rows_per_read |                             
          system_views |                  settings |             current settings
          system_views |             sstable_tasks |        current sstable tasks
          system_views |              thread_pools |                             
          system_views |       tombstones_per_read |                             
 system_virtual_schema |                   columns |   virtual column definitions
 system_virtual_schema |                 keyspaces | virtual keyspace definitions
 system_virtual_schema |                    tables |    virtual table definitions

Before looking at an example, it's important to touch upon the scope of these Virtual Tables. All Virtual Tables are restricted in scope to their node, and therefore all queries on these tables return data valid only for the node acting as coordinator, regardless of consistency level. As a result, support for specifying the coordinator node for such queries has been added to several drivers, including the Python and DataStax Java drivers.
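
For instance, with the 4.x DataStax Java driver a statement can be routed to a chosen coordinator node. This is a hedged sketch (the class and method names are mine, and the caller is assumed to already hold a session and a Node from the driver metadata) rather than code from any driver documentation:

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metadata.Node;

public class VirtualTableQuery {

    // Query a node-local virtual table, forcing the given node to act as coordinator.
    static void printSstableTasks(CqlSession session, Node node) {
        SimpleStatement stmt = SimpleStatement.newInstance(
                "SELECT * FROM system_views.sstable_tasks")
            .setNode(node);
        session.execute(stmt)
               .forEach(row -> System.out.println(row.getFormattedContents()));
    }
}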

Let’s take a look at a Virtual Table, in this case sstable_tasks. This table shows all operations on SSTables such as compactions, cleanups, and upgrades. 

cqlsh> select * from system_views.sstable_tasks;

 keyspace_name | table_name  | task_id                              | kind       | progress | total     | unit
---------------+-------------+--------------------------------------+------------+----------+-----------+-------
     keyspace1 |  standard1  | 09e00960-064c-11ea-a48a-87683fec5884 | compaction | 15383452 | 216385920 | bytes

This is the same information we would expect out of running nodetool compactionstats. We can see that there is currently one active compaction on the node, what its progress is, as well as its keyspace and table. Being able to quickly and efficiently view this information is often key in understanding and diagnosing cluster health. 
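
Another quick win is the clients table, which shows which applications are connected to the node you are coordinating on, with columns covering details such as the client address, username, and driver name and version (output omitted here):

cqlsh> select * from system_views.clients;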

While there are still some metrics that can only be queried via JMX, having the ability to use CQL to pull important metrics on a cluster is a very nice feature. With Virtual Tables offering a convenient means of querying metrics, less focus needs to be placed on building JMX tools, such as Reaper, and more time can be spent working within Cassandra. We may start to see a rise in client-side tooling that takes advantage of Virtual Tables as well.

The post Apache Cassandra 4.0 – Virtual Tables appeared first on Instaclustr.

Running tlp-stress in Kubernetes

Performance tuning and benchmarking are key to the successful operation of Cassandra. We have a great tool in tlp-stress that makes benchmarking a lot easier. I have been exploring running Cassandra in Kubernetes for a while now. At one point I thought to myself, it would be nice to be able to utilize tlp-stress in Kubernetes. After a bit of prototyping, I decided that I would write an operator. This article introduces the Kubernetes operator for tlp-stress, stress-operator.

Before getting into the details of stress-operator, let’s consider the following question: What exactly is a Kubernetes operator?

Kubernetes has a well-defined REST API with lots of built-in resource types like Pods, Deployments, and Jobs. The API for creating these built-in objects is declarative. Users typically create objects using the tool kubectl and YAML files. A controller is code that executes a control loop watching one or more of these resource types. A controller’s job is to ensure that an object’s actual state matches its expected state.

An operator extends Kubernetes with custom resource definitions (CRDs) and custom controllers. A CRD provides domain specific abstractions. The custom controller provides automation that is tailored around those abstractions.

If the concept of an operator is still a bit murky, don't worry. It will get clearer as we look at examples of using stress-operator that highlight some of its features including:

  • Configuring and deploying tlp-stress
  • Provisioning Cassandra
  • Monitoring with Prometheus and Grafana

Installing the Operator

You need to have kubectl installed. Check out the official Kubernetes docs if you do not already have it installed.

Lastly, you need access to a running Kubernetes cluster. For local development, my tool of choice is kind.

Download the following manifests:

stress-operator.yaml declares all of the resources necessary to install and run the operator. The other files are optional dependencies.

casskop.yaml installs the Cassandra operator casskop which stress-operator uses to provision Cassandra.

grafana-operator.yaml and prometheus-operator.yaml install grafana-operator and prometheus-operator respectively. stress-operator uses them to install, configure, and monitor tlp-stress.

Install the operator along with the optional dependencies as follows:

$ kubectl apply -f stress-operator.yaml

$ kubectl apply -f casskop.yaml

$ kubectl apply -f grafana-operator.yaml

$ kubectl apply -f prometheus-operator.yaml

The above commands install CRDs as well as the operators themselves. There should be three CRDs installed for stress-operator. We can verify this as follows:

$ kubectl get crds | grep thelastpickle
cassandraclustertemplates.thelastpickle.com   2020-02-26T16:10:00Z
stresscontexts.thelastpickle.com              2020-02-26T16:10:00Z
stresses.thelastpickle.com                    2020-02-26T16:10:00Z

Lastly, verify that each of the operators is up and running:

$ kubectl get deployments
NAME                     READY   UP-TO-DATE   AVAILABLE   AGE
cassandra-k8s-operator   1/1     1            1           6h5m
grafana-deployment       1/1     1            1           4h35m
grafana-operator         1/1     1            1           6h5m
stress-operator          1/1     1            1           4h51m

Note: The prometheus-operator is currently installed with cluster-wide scope in the prometheus-operator namespace.

Configuring and Deploying a Stress Instance

Let’s look at an example of configuring and deploying a Stress instance. First, we create a KeyValue workload in a file named key-value-stress.yaml:

apiVersion: thelastpickle.com/v1alpha1
kind: Stress
metadata:
  name: key-value
spec:
  stressConfig:
    workload: KeyValue
    partitions: 25m
    duration: 60m
    readRate: "0.3"
    consistencyLevel: LOCAL_QUORUM
    replication:
      networkTopologyStrategy:
        dc1: 3
    partitionGenerator: sequence
  cassandraConfig:
    cassandraService: stress

Each property under stressConfig corresponds to a command line option for tlp-stress.

The cassandraConfig section is Kubernetes-specific. When you run tlp-stress (outside of Kubernetes) it will try to connect to Cassandra on localhost by default. You can override the default behavior with the --host option. See the tlp-stress docs for more information about all its options.

In Kubernetes, Cassandra should be deployed using StatefulSets. A StatefulSet requires a headless Service. Among other things, a Service maintains a list of endpoints for the pods to which it provides access.

The cassandraService property specifies the name of the Cassandra cluster headless service. It is needed in order for tlp-stress to connect to the Cassandra cluster.
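
Assuming the headless service really is named stress, as in the manifest above, a quick sanity check that it exists (and therefore that tlp-stress will be able to resolve it) is:

$ kubectl get svc stress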

Now we create the Stress object:

$ kubectl apply -f key-value-stress.yaml
stress.thelastpickle.com/key-value created

# Query for Stress objects to verify that it was created
$ kubectl get stress
NAME       AGE
key-value   4s

Under the hood, stress-operator deploys a Job to run tlp-stress.

$ kubectl get jobs
NAME       COMPLETIONS   DURATION   AGE
key-value   0/1           4s         4s

We can use a label selector to find the pod that is created by the job:

$ kubectl get pods -l stress=key-value,job-name=key-value
NAME             READY   STATUS    RESTARTS   AGE
key-value-pv6kz   1/1     Running   0          3m20s

We can monitor the progress of tlp-stress by following the logs:

$ kubectl logs -f key-value-pv6kz

Note: If you are following the steps locally, the Pod name will have a different suffix.

Later we will look at how we monitor tlp-stress with Prometheus and Grafana.

Cleaning Up

When you are ready to delete the Stress instance, run:

$ kubectl delete stress key-value

The above command deletes the Stress object as well as the underlying Job and Pod.

Provisioning a Cassandra Cluster

stress-operator provides the ability to provision a Cassandra cluster using casskop. This is convenient when you want to quickly spin up a cluster for some testing.

Let’s take a look at another example, time-series-casskop-stress.yaml:

apiVersion: thelastpickle.com/v1alpha1
kind: Stress
metadata:
  name: time-series-casskop
spec:
  stressConfig:
    workload: BasicTimeSeries
    partitions: 50m
    duration: 60m
    readRate: "0.45"
    consistencyLevel: LOCAL_QUORUM
    replication:
      networkTopologyStrategy:
        dc1: 3
    ttl: 300
  cassandraConfig:
    cassandraClusterTemplate:
      metadata:
        name: time-series-casskop
      spec:
        baseImage: orangeopensource/cassandra-image
        version: 3.11.4-8u212-0.3.1-cqlsh
        runAsUser: 1000
        dataCapacity: 10Gi
        imagepullpolicy: IfNotPresent
        deletePVC: true
        maxPodUnavailable: 0
        nodesPerRacks: 3
        resources:
          requests:
            cpu: '1'
            memory: 1Gi
          limits:
            cpu: '1'
            memory: 1Gi
        topology:
          dc:
            - name: dc1
              rack:
                - name: rack1       

This time we are running a BasicTimeSeries workload with a TTL of five minutes.

In the cassandraConfig section we declare a cassandraClusterTemplate instead of a cassandraService. CassandraCluster is a CRD provided by casskop. With this template we are creating a three-node cluster in a single rack.

We won’t go into any more detail about casskop for now. It is beyond the scope of this post.

Here is what happens when we run kubectl apply -f time-series-casskop-stress.yaml:

  • We create the Stress object
  • stress-operator creates the CassandraCluster object specified in cassandraClusterTemplate
  • casskop provisions the Cassandra cluster
  • stress-operator waits for the Cassandra cluster to be ready (in a non-blocking manner)
  • stress-operator creates the Job to run tlp-stress
  • tlp-stress runs against the Cassandra cluster

There is another benefit of this approach in addition to being able to easily spin up a cluster. We do not have to implement any steps to wait for the cluster to be ready before running tlp-stress. The stress-operator takes care of this for us.

Cleaning Up

When you are ready to delete the Stress instance, run:

$ kubectl delete stress time-series-casskop

The deletion does not cascade to the CassandraCluster object. This is by design. If you want to rerun the same Stress instance (or a different one that reuses the same Cassandra cluster), the stress-operator reuses the Cassandra cluster if it already exists.

Run the following to delete the Cassandra cluster:

$ kubectl delete cassandracluster time-series-casskop

Monitoring with Prometheus and Grafana

The stress-operator integrates with Prometheus and Grafana to provide robust monitoring. Earlier we installed grafana-operator and prometheus-operator. They, along with casskop, are optional dependencies. It is entirely possible to integrate with Prometheus and Grafana instances that were installed by means other than the respective operators.

If you want stress-operator to provision Prometheus and Grafana, then the operators must be installed.

There is an additional step that is required for stress-operator to automatically provision Prometheus and Grafana. We need to create a StressContext. Let’s take a look at stresscontext.yaml:

# There should only be one StressContext per namespace. It must be named
# tlpstress; otherwise, the controller will ignore it.
#
apiVersion: thelastpickle.com/v1alpha1
kind: StressContext
metadata:
  name: tlpstress
spec:
  installPrometheus: true
  installGrafana: true

Let’s create the StressContext:

$ kubectl apply -f stresscontext.yaml

Creating the StressContext causes stress-operator to perform several actions including:

  • Configure RBAC settings so that Prometheus can scrape metrics
  • Create a Prometheus custom resource
  • Expose Prometheus with a Service
  • Create a ServiceMonitor custom resource which effectively tells Prometheus to monitor tlp-stress
  • Create a Grafana custom resource
  • Expose Grafana with a Service
  • Create and configure a Prometheus data source in Grafana

Now when we create a Stress instance, stress-operator will also create a Grafana dashboard for the tlp-stress job. We can test this with time-series-casskop-stress.yaml. The dashboard name will be the same as the name of the Stress instance, which in this example is time-series-casskop.

Note: To rerun the job we need to delete and recreate the Stress instance.

$ kubectl delete stress time-series-casskop

$ kubectl apply -f time-series-casskop-stress.yaml

Note: You do not need to delete the CassandraCluster. The stress-operator will simply reuse it.

Now we want to check out the Grafana dashboard. There are different ways of accessing a Kubernetes service from outside the cluster. We will use kubectl port-forward.

Run the following to make Grafana accessible locally:

$ kubectl port-forward svc/grafana-service 3000:3000
Forwarding from 127.0.0.1:3000 -> 3000
Forwarding from [::1]:3000 -> 3000
Handling connection for 3000

Then in your browser go to http://localhost:3000/. It should direct you to the Home dashboard. Click on the Home label in the upper left part of the screen. You should see a row for the time-series-casskop dashboard. Click on it. The dashboard should look something like this:

tlp-stress Grafana dashboard

Cleaning Up

Delete the StressContext with:

$ kubectl delete stresscontext tlpstress

The deletion does not cascade to the Prometheus or Grafana objects. To delete them and their underlying resources run:

$ kubectl delete prometheus stress-prometheus

$ kubectl delete grafana stress-grafana

Wrap Up

This concludes the brief introduction to stress-operator. The project is still in early stages of development and as such undergoing lots of changes and improvements. I hope that stress-operator makes testing Cassandra in Kubernetes a little easier and more enjoyable!

Version 4.0 of tlp-stress for Cassandra released

The Last Pickle is very pleased to announce that version 4.0 of tlp-stress for Apache Cassandra has been released.

Here is what you will find in the release:

  • The biggest addition is support for including DELETE statements in a workload via the --deletes option. This option works similarly to the --reads option, where a value between 0 and 1 is supplied as the argument. Each workload is responsible for creating the delete requests, so they naturally fit in with each of the workload patterns. As a result of this new feature, the BasicTimeSeries workload now works only with Cassandra versions 3.0 and above. See the tlp-stress documentation for more information, and the example command after this list.

  • Support for adjusting the maximum requests per connection via the --max-request option.

  • A new Sets workload to exercise the set collection type. This was useful for investigating the root cause of slow inserts to set<text>. See CASSANDRA-15464.

  • Improved error handling for non-existent workload parameters, non-existent workloads, and when the read rate plus the delete rate is greater than 1.0.

  • Documentation updates to reflect the new option and workload additions.
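
For example, a short run where roughly one in ten operations is a delete (the workload, iteration count, and rate below are illustrative, reusing the flags shown earlier in this post) looks like:

$ tlp-stress run BasicTimeSeries -n 100k --deletes 0.1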

A complete list of the items covered in the release can be found in the tlp-stress GitHub issues under the 4.0 milestone.

As always binaries for the release can be found in The Last Pickle Bintray and Docker Hub repositories.

If you have any questions or suggestions please let us know on the tlp-dev-tools mailing list. We would love to hear from you if you are using the tool!

Apache Cassandra 4.0 – Stability and Testing

The last major version release of Apache Cassandra was 3.11.0, and that was more than 2 years ago in 2017. So what has the Cassandra developer community been doing over the last 2 years? Well let me tell you, it's good, real good. It's Apache Cassandra 4.0! The final release is not here yet, but with the release of the first alpha version, we now have a pretty solid idea of the features and capabilities that will be included in the final release.

In this series of blog posts, we’ll take a meandering tour of some of the important changes, cool features, and nice ergonomics that are coming with Apache Cassandra 4.0.

The first blog of this series focuses on stability and testing.

Apache Cassandra 4.0: Stability and Testing

One of the explicit goals for Apache Cassandra 4.0 was to be the “most stable major release of Cassandra ever” (https://instac.io/37KfiAb).

As those who've run Cassandra in production know, it was generally advisable to wait for up to 5 or 6 minor versions before switching production clusters to a new major version. This resulted in adoption only occurring later in the supported cycle for a given major version. All in all, this was not a great user experience, and frankly a pretty poor look for a database, which is the one piece of your infrastructure that really needs to operate correctly.

In order to support a stable and safe major release, a significant amount of effort was put into improving Apache Cassandra testing. 

The first of these is the ability to run multi-node/coordinator tests in a single JVM (https://issues.apache.org/jira/browse/CASSANDRA-14821). This allows us to test distributed behavior with Java unit tests for quicker, more immediate feedback, rather than having to leverage the longer-running, more intensive DTests. It paid off immediately, identifying typically hard-to-catch distributed bugs such as https://issues.apache.org/jira/browse/CASSANDRA-14807 and https://issues.apache.org/jira/browse/CASSANDRA-14812. It also resulted in a number of folk backporting this to earlier versions to assist in debugging tricky issues.

See here for an example of one of the new tests, checking that a write fails properly during a schema disagreement: https://instac.io/2ulqNQ7

Interestingly, the implementation is a nice use of distinct Java class loaders to get around Cassandra's horrid use of singletons everywhere, and allows it to fire up multiple Cassandra instances in a single JVM.

From the ticket: “In order to be able to pass some information between the nodes, a common class loader is used that loads up Java standard library and several helper classes. Tests look a lot like CQLTester tests would usually look like.

Each Cassandra Instance, with its distinct class loader is using serialization and class loading mechanisms in order to run instance-local queries and execute node state manipulation code, hooks, callbacks etc.”

On top of this, the community has started adopting QuickTheories as a library for introducing property-based testing. Property-based testing is a nice middle ground between unit tests and fuzzing. It allows you to define a range of inputs and then test across that space (and beyond) in a repeatable and reproducible manner.
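
To give a feel for the style, here is the canonical QuickTheories commutativity example (not a test taken from the Cassandra suite):

import static org.quicktheories.QuickTheory.qt;
import static org.quicktheories.generators.SourceDSL.integers;

public class AdditionPropertyTest {

    // Typically run under JUnit with an @Test annotation; omitted here to keep the
    // sketch dependency-free. QuickTheories generates many input pairs, shrinks any
    // failing case, and reports a reproducible seed.
    public void additionIsCommutative() {
        qt().forAll(integers().all(), integers().all())
            .check((a, b) -> a + b == b + a);
    }
}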

Currently, in trunk there are two test classes that have adopted property-based testing: EncodingStatsTest and ChecksummingTransformerTest. However, community members are using it in their own internal validation test frameworks for Cassandra and have been contributing bugs and patches back to the community as well.

Moving beyond correctness testing, a significant amount of effort has gone into performance testing, especially with the change to adopt Netty as the framework for internode messaging.

Probably the best indication of the amount of work that has gone into testing of the Netty rewrite can be seen in CASSANDRA-15066 (https://issues.apache.org/jira/browse/CASSANDRA-15066), which is well worth a read if you are into that kind of thing.

The post Apache Cassandra 4.0 – Stability and Testing appeared first on Instaclustr.