Two Accelerate Talks Certain to Cause ‘Sparks’

There are going to be so many amazing talks this year at DataStax Accelerate. But one of the talks I’m particularly excited for is “To Spark or Not to Spark” by Russell Spitzer.

I feel like this talk is the perfect pairing with my talk, “Lightening A Spark with Machine Learning”. While my talk will focus on the practical “what and how” of machine learning with Apache Spark™ and Apache Cassandra™, Russell’s will focus mostly on the “why”. Russell will cover the best and most accessible use cases for using distributed analytics with Spark and DataStax Analytics.

Russell will also talk about advanced use cases using Spark’s streaming service. I will be sitting in the front row to make sure I get to ask the first question: “How is Spark streaming different than Apache Kafka™!?” Russell will also be covering some of the “what and how” that my talk will not be covering, such as using Spark to load data, modify tables, and move data from cluster to cluster. These are the topics I am frequently asked about, so I am excited to finally get my own questions answered!

I cannot wait to hear this talk, and I think it’s the perfect pairing to my talk, like peanut butter and jelly! And of course the answer is: “TO SPARK!”

Come check out both our talks at Accelerate!

DataStax Accelerate

REGISTER NOW

Why Anyone into Apache Cassandra Needs to Attend Accelerate

If you are building modern applications that scale in all sorts of ways, then you’re probably into Apache Cassandra. If you’re attracted to multi-data center replication, fault tolerance, tunable consistency, and the distribution of data across clusters in a masterless architecture, then you are definitely into Cassandra. As a Cassandra enthusiast, you probably need to attend DataStax Accelerate, the world’s premier Cassandra conference, which is taking place May 21–23 near Washington, D.C. This is a great opportunity to spend a couple of focused days learning from others and meeting people just like yourself.

Cassandra has come a long way since it was open sourced more than 10 years ago. I myself have a long history with Cassandra, some of it painful in a “nobody gets it” kind of way. But we all know where the story is headed!

We have collectively pushed through the fear, uncertainty, and doubt to arrive at an interesting point in the history of NoSQL databases and databases in general. The real value behind the transformational technology in Cassandra is leading the migration, intentional or not, to hybrid and multi-cloud computing environments.

Hybrid cloud popularity is building as organizations become more aware of the growing technical debt around data usage and their databases. What’s your cloud strategy? How has that radically changed in the past few years? As of today, it’s hardly ever a bulk migration to one cloud. It’s a mix of old and new, which has put a lot of burden on engineers struggling to make it work.

As an open source project built with scaling in mind, Cassandra is uniquely positioned to be the go-to database of the hybrid cloud future, and DataStax expertise will play a key role in allowing companies to take full advantage of it. We get what you are trying to do and we are here to help.

As a part of that commitment, we are hosting a gathering place for the community of engineers trying to build that future. At Accelerate, we’re looking forward to bringing together all kinds of Cassandra enthusiasts to meet one another, share ideas, and learn how some of today’s leading enterprises are using Cassandra to change the world.

You do not want to miss this event. Just take a quick peek at some Accelerate sessions to give you an idea of what to expect:

1. Cassandra at Yahoo! Japan

Learn how Yahoo! Japan uses Cassandra at scale, spinning up as many as 5,000 servers at any time. Yahoo! Japan has been a driving force for Cassandra in Japan and hosts many awesome community events. They bring a deep expertise you won’t want to miss.

2. Two Years in the Making: What’s New with Apache Cassandra 4.0?

Apache Cassandra™ 4.0 is almost here! This session explores the new features and performance improvements you can expect, as well as the thinking that inspired the architecture of the release. There is no better source of 4.0 information than Cassandra committer Dinesh Joshi.

3. 10 Easy Ways to Tune Your Cassandra Cluster

Are you getting the most out of your Cassandra cluster? Probably not. Discover how to pinpoint bottlenecks and learn 10 simple ways to boost performance while cutting costs. This is a talk by Jon Haddad, who has been a leader in Cassandra performance for years. You may see him on the user mailing list talking about this topic often. Here is your chance to meet him in person and get some firsthand knowledge.

4. Cassandra Traffic Management at Instagram

Instagram has been using Cassandra for years, and their deployment is still growing fast. Learn how the company has worked to improve its infrastructure over the years, increasing the efficiency and reliability of their clusters along the way.

5. Modern Data Architectures with Kafka and Cassandra

Modern companies are connecting Cassandra and Kafka to stream data from microservices, databases, IoT events, and other critical systems to answer real-time questions and make better decisions. Find out why Cassandra and Kafka should be core components of any modern data architecture.

6. Speed Up Your Apache Cassandra™ Applications: A Practical Guide to Reactive Programming

Everything is an event. To speed up applications, you need to think reactive. The DataStax team has been developing Cassandra drivers for years, and in our latest version of the enterprise driver, we introduced reactive programming. Learn how to migrate a CRUD Java service into reactive and bring home a working project.

7. Cassandra Use Cases for Large-Scale Family History

See how FamilySearch uses DataStax Enterprise to solve large-scale family history problems, handling three unique workloads in production.

And that’s just the tip of the iceberg. Check out other Cassandra sessions you won’t want to miss here.

See you in D.C.!

Going Head-to-Head: Scylla Cloud vs. Google Cloud Bigtable

In this article, we will compare Scylla Cloud and Google Cloud Bigtable, two different managed solutions. The TL;DR is the following: We show that Scylla Cloud is 1/5th the cost of Cloud Bigtable under optimal conditions (perfect uniform distribution) and that when applied with real-world, unoptimized data distribution, Scylla performs 26x better than Cloud Bigtable. You’ll see that Scylla manages to maintain its SLA while Cloud Bigtable fails to do so. Finally, we investigate the worst case for both offerings, where a single hot row is queried. In this case, both databases fail to meet the 90,000 ops SLA but Scylla processes 195x more requests than Cloud Bigtable.

In this benchmark study we will simulate a scenario in which the user has a predetermined workload with the following SLA: 90,000 operations per second (ops), half of those being reads and half being updates, and the deployment needs to survive zone failures. We set business requirements so that 95% of the reads need to be completed in 10ms or less, and we will determine the minimum cost of running such a cluster in each of the offerings.

A Nod to Our Lineage

Competing against Cloud Bigtable is a pivotal moment for us, as Scylla is, in a way, also a descendant of Bigtable. Scylla was designed to be a drop-in-replacement for Cassandra, which, in turn, was inspired by the original Bigtable and Dynamo papers. In fact, Cassandra is described as the “daughter” of Dynamo and Bigtable. We’ve already done a comparison of Scylla versus Amazon DynamoDB. Now, in this competitive benchmark, Scylla is tackling Google Cloud Bigtable, the commercially available version of Bigtable, the database that is still used internally at Google to power their apps and services. You can see the full “family tree” in Figure 1 below.

Figure 1: The “Family Tree” for Scylla.

The Comparison

Our goal was to perform 90,000 operations per second in the cluster (50% updates, 50% reads) while keeping read latencies at 10ms or lower for 95% of the requests. We want the cluster to be present in three different zones, leading to higher availability and lower, local latencies. This is important not only to protect against entire-zone failures, which are rare, but also to reduce latency spikes and timeouts of Bigtable. This article does a good job of describing how Cloud Bigtable zone maintenance jobs can impact latencies for the application. Both offerings allow for tunable consistency settings, and we use eventual consistency.

For Google Cloud Bigtable, increasing the replication factor means adding replication clusters. Cloud Bigtable claims that each group node (per replica cluster) should be able to do 10,000 operations per second at 6ms latencies, although it does not specify at which percentile and rightfully makes the disclaimer that those numbers are workload-dependent. Still, we use them as a basis for analysis and will start our tests by provisioning three clusters of 9 nodes each. We will then increase the total size of the deployment by adding a node to each cluster until the desired SLAs are met.
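One way to read the starting point of 9 nodes per cluster is as simple arithmetic on the per-node guideline quoted above. The sketch below is only that reading; the function name is hypothetical, and the 10,000 ops figure is the workload-dependent guideline, not a measured value.

```python
import math

TARGET_OPS = 90_000      # SLA used throughout this benchmark
OPS_PER_NODE = 10_000    # Cloud Bigtable's stated guideline (workload-dependent)
REPLICA_CLUSTERS = 3     # one replica cluster per zone


def initial_nodes_per_cluster(target_ops, ops_per_node):
    """Smallest node count whose nominal capacity covers the target."""
    return math.ceil(target_ops / ops_per_node)


nodes = initial_nodes_per_cluster(TARGET_OPS, OPS_PER_NODE)
total_nodes = nodes * REPLICA_CLUSTERS
print(f"{nodes} nodes per cluster, {total_nodes} nodes in total")
```

From there, the procedure simply adds one node to each cluster and reruns the workload until the SLA holds.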

For Scylla, we will select a 3-node cluster of AWS i3.4xlarge instances. The selection is based on ScyllaDB’s recommendation of leaving 50% free space per instance and the fact that this is the smallest instance capable of holding the data generated in the population phase of the benchmark— each i3.4xlarge can store a total of 3.8TB of data and should be able to comfortably hold 1TB at 50% utilization. We will follow the same procedure as Cloud Bigtable and keep adding nodes (one per zone) until our SLA is met.
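The capacity reasoning above can be checked in a few lines. This is a minimal sketch using only the figures quoted in the text; with three replicas on three nodes, each node holds a full copy of the roughly 1TB dataset.

```python
INSTANCE_CAPACITY_TB = 3.8   # i3.4xlarge storage, as stated above
MAX_UTILIZATION = 0.5        # recommendation: leave 50% of the disk free
DATASET_TB = 1.0             # approximate dataset size after the population phase

# Space we allow ourselves to use on each node.
usable_tb = INSTANCE_CAPACITY_TB * MAX_UTILIZATION

# With RF 3 on 3 nodes, every node stores the whole dataset.
fits = DATASET_TB <= usable_tb
print(f"usable per node: {usable_tb:.1f} TB, dataset fits: {fits}")
```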

Test Results

The results are summarized in Table 1. As we can see, Cloud Bigtable is not able to meet the desired number of 90,000 operations per second with the initial setup of 9 nodes per cluster. We report the latencies for completeness, but they are uninteresting since the cluster was clearly at a bottleneck and operating over capacity. During this run, we can verify that this is indeed the case by looking at Cloud Bigtable’s monitoring dashboard. Figure 2 shows the average CPU utilization among the nodes, already way above the recommended threshold of 70%. Figure 3 shows CPU utilization at the hottest node: for those, we are already at the limit.

Google Cloud Bigtable is still unable to meet the desired amount of operations with clusters of 10 nodes, and is finally able to do so with 11 nodes. However, the 95th percentile for reads is above the desired goal of 10 ms so we take an extra step. With clusters of 12 nodes each, Cloud Bigtable is finally able to achieve the desired SLA.

Scylla Cloud had no issues meeting the SLA at its first attempt.

The above was actually a fast-forward version of what we encountered. Originally, we didn’t start with the perfect uniform distribution and chose the real-life-like Zipfian distribution. Over and over we received only 3,000 operations per second instead of the desired 90,000. We thought something was wrong with the test until we cross-checked everything and switched to uniform distribution testing. Since Zipfian test results mimic real-life behaviour, we ran additional tests and received the same original poor result (as described in the Zipfian section below).

For uniform distribution, the results are shown in Table 1, and the average and hottest node CPU utilizations are shown just below in Figures 2 and 3, respectively.

| Configuration | OPS | Maximum P95 read latency (microseconds) | Maximum P95 update latency (microseconds) | Cost per replica/AZ per year (USD) | Total cost for 3 replicas/AZ per year (USD) |
|---|---|---|---|---|---|
| Cloud Bigtable, 3×9 nodes (total 27 nodes) | 79,890 | 27,679 | 27,183 | $53,334.98 | $160,004.88 |
| Cloud Bigtable, 3×10 nodes (total 30 nodes) | 87,722 | 21,199 | 21,423 | $59,028.96 | $177,086.88 |
| Cloud Bigtable, 3×11 nodes (total 33 nodes) | 90,000 | 12,847 | 12,479 | $64,772.96 | $184,168.88 |
| Cloud Bigtable, 3×12 nodes (total 36 nodes) | 90,000 | 8,487 | 8,059 | $70,416.96 | $211,250.88 |
| Scylla, 3×1 nodes (total 3 nodes) | 90,000 | 5,871 | 2,042 | $14,880 | $44,640 |

Table 1: Scylla Cloud is able to meet the desired SLA with just one instance per zone (for a total of three). For Cloud Bigtable 12 instances per cluster (total of 36) are needed to meet the performance characteristics of our workload. For both Scylla Cloud and Cloud Bigtable, costs exclude network transfers. Cloud Bigtable price was obtained from Google Calculator and used as-is, and for Scylla Cloud prices were obtained from the official pricing page, with the network and backup rates subtracted. For Scylla Cloud, the price doesn’t vary with the amount of data stored up to the instance’s limit. For Cloud Bigtable, price depends on data that is actually stored up until the instance’s maximum. We use 1TB in the calculations in this table.
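The pass/fail decision behind Table 1 can be expressed as a small check. The sketch below copies the throughput and read-latency figures from the table; the `Run` class and `meets_sla` helper are illustrative names, not part of any benchmark tooling.

```python
from dataclasses import dataclass

TARGET_OPS = 90_000
MAX_READ_P95_US = 10_000  # 10 ms, expressed in microseconds like the table


@dataclass
class Run:
    name: str
    ops: int
    read_p95_us: int


# Throughput and P95 read latency as reported in Table 1.
runs = [
    Run("Cloud Bigtable 3x9", 79_890, 27_679),
    Run("Cloud Bigtable 3x10", 87_722, 21_199),
    Run("Cloud Bigtable 3x11", 90_000, 12_847),
    Run("Cloud Bigtable 3x12", 90_000, 8_487),
    Run("Scylla Cloud 3x1", 90_000, 5_871),
]


def meets_sla(run):
    """True when the run sustains the target rate within the read latency bound."""
    return run.ops >= TARGET_OPS and run.read_p95_us <= MAX_READ_P95_US


passing = [run.name for run in runs if meets_sla(run)]
print(passing)
```

Only the 12-node Cloud Bigtable clusters and the 3-node Scylla Cloud cluster satisfy both conditions, which is why those two configurations are the ones compared on cost.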

Figure 2: Average CPU load on a 3-cluster 9-node Cloud Bigtable instance, running Workload A

Figure 3: Hottest node CPU load on a 3-cluster 9-node Cloud Bigtable instance, running Workload A

Behavior under Real-life, Non-uniform Workloads

Both Scylla Cloud and Cloud Bigtable behave better under uniform data and request distribution, and all users are advised to strive for that. But no matter how much work is put into guaranteeing good partitioning, real-life workloads often behave differently, either permanently or temporarily, and that affects performance in practice.

For example, a user profile application can see patterns in time where groups of users are more active than others. An IoT application tracking data for sensors can have sensors that end up accumulating more data than others, or having time periods in which data gets clustered. A famous case is the dress that broke the internet, where a lively discussion among tens of millions of Internet users about the color of a particular dress led to issues in handling traffic for the underlying database.

In this section we will keep the cluster size determined in the previous phase constant, and study how both offerings behave under such scenarios.

Zipfian Distribution

To simulate real-world conditions, we changed the request distribution in the YCSB loaders to a Zipfian distribution. We have kept all other parameters the same, so the loaders are still trying to send the same 90,000 requests per second (with 50% reads, 50% updates).

Zipfian distribution was originally used to describe word frequency in languages. However, this distribution curve, known as Zipf’s Law, has also shown correlation to many other real-life scenarios. It can often indicate a form of “rich-get-richer” self-reinforcing algorithm, such as a bandwagon or network effect, where the distribution of results is heavily skewed and disproportionately weighted. For instance, in searching over time, once a certain search result becomes popular, more people click on it, and thus, it becomes an increasingly popular search result. Examples include the number of “likes” different NBA teams have on social media, as shown in Figure 4, or the activity distribution among users of the Quora website.

When this sort of skew occurs in a database, it can lead to highly unequal access across the data and, as a result, poor performance. For database testing, this means keys are accessed at random but with a heavily skewed distribution, which lets us see how the database in question handles hotspot scenarios.
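To get a feel for how skewed such a request stream is, the sketch below draws keys uniformly and from a Zipfian-style distribution, then measures how much traffic the hottest 1% of keys receive. It is a toy illustration rather than YCSB's generator; the exponent of 0.99, the key count, and the request count are assumptions chosen for the example.

```python
import random
from collections import Counter


def zipfian_weights(n_keys, s):
    """Unnormalized Zipf weights: the key of rank r gets weight 1 / r**s."""
    return [1.0 / (rank ** s) for rank in range(1, n_keys + 1)]


def top_share(samples, top_n):
    """Fraction of all requests that hit the top_n most requested keys."""
    counts = Counter(samples)
    hottest = sum(count for _, count in counts.most_common(top_n))
    return hottest / len(samples)


rng = random.Random(42)                 # fixed seed for repeatability
N_KEYS, N_REQUESTS = 10_000, 100_000    # illustrative sizes only
keys = range(N_KEYS)

uniform = rng.choices(keys, k=N_REQUESTS)
zipfian = rng.choices(keys, weights=zipfian_weights(N_KEYS, s=0.99), k=N_REQUESTS)

uniform_share = top_share(uniform, 100)
zipfian_share = top_share(zipfian, 100)
print(f"uniform: hottest 1% of keys get {uniform_share:.1%} of requests")
print(f"zipfian: hottest 1% of keys get {zipfian_share:.1%} of requests")
```

Under the uniform draw the hottest keys receive only a few percent of the traffic, while under the skewed draw they receive a large share of it, which is the hotspot pattern this test is designed to provoke.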

Figure 4: The number of “likes” NBA teams get on Facebook follows a Zipfian distribution.

The results of our Zipfian distribution test are summarized in Table 2. Cloud Bigtable is not able to sustain answering the 90,000 requests per second that the clients send. It can process only 3,450 requests per second. Scylla Cloud, on the other hand, actually gets slightly better in its latencies. This result is surprising at first and deserves a deeper explanation. But to understand why that is the case it will be helpful to first look at our next proposed test scenario: a single hot row. We’ll then address these surprising results in the section “Why Did Zipfian Latencies Go Down?”.

Zipfian Distribution

| Offering | Overall throughput (ops/second) | P95 read latency (ms) | P95 update latency (ms) |
|---|---|---|---|
| Google Cloud Bigtable | 3,450 | 1,601 | 122 |
| Scylla Cloud | 90,000 | 3 | 1 |

Table 2: Results of Scylla Cloud and Cloud Bigtable under the Zipfian distribution. Cloud Bigtable is not able to sustain the full 90,000 requests per second. Scylla Cloud was able to sustain 26x the throughput, and with read latencies 1/800th and write latencies less than 1/100th of Cloud Bigtable.

A Few Words about Consistency/Cost

Scylla Cloud leverages instance-local ephemeral storage for its data, meaning parts of the dataset will not survive hardware failures in the case of a single replica. This means that running a single replica is not acceptable for data durability reasons— and that’s not even a choice in Scylla Cloud’s interface.

Due to its choice of network storage, Cloud Bigtable, on the other hand, does not lose any local data when an individual node fails, meaning it is reasonable to run it without any replicas. Still, if an entire zone fails service availability will be disrupted. Also, databases under the hood have to undergo node-local maintenance, which can temporarily disrupt availability in single-replica setups as this article does a good job of explaining.

Still, it’s fair to say that not all users require such a high level of availability and could run single-zone Cloud Bigtable clusters. These users would be forced to run more zones in Scylla Cloud in order to withstand node failure without data loss, even if availability is not a concern. However, even if we compare the total Scylla Cloud cost of $44,640.00 per year with the cost of running Cloud Bigtable in a single zone (sacrificing availability) of $70,416.96, or $140,833.92 for two zones, Scylla Cloud is still a fraction of the cost of Cloud Bigtable.
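The cost ratios behind this comparison are easy to reproduce. The sketch below uses only the annual prices quoted above; the helper name is illustrative.

```python
SCYLLA_TOTAL = 44_640.00        # Scylla Cloud, 3 replicas, USD per year
BIGTABLE_PER_ZONE = 70_416.96   # one 12-node Cloud Bigtable cluster, USD per year


def bigtable_cost_ratio(zones):
    """Cloud Bigtable cost for the given zone count, relative to Scylla Cloud."""
    return (BIGTABLE_PER_ZONE * zones) / SCYLLA_TOTAL


for zones in (1, 2, 3):
    cost = BIGTABLE_PER_ZONE * zones
    print(f"{zones} zone(s): ${cost:,.2f} per year, "
          f"{bigtable_cost_ratio(zones):.2f}x the Scylla Cloud cost")
```

Even the single-zone Cloud Bigtable deployment, which gives up zone-level availability, costs more than the triple-replicated Scylla Cloud cluster.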

Note that we did not conduct tests to see precisely how many Cloud Bigtable nodes would be needed to achieve the same required 90,000 ops/second throughput under Zipfian distribution. We had already scaled Cloud Bigtable to 36 nodes (versus the 3 for Scylla) to simply achieve 90,000 ops/second under uniform distribution at the required latencies. However, Cloud Bigtable’s throughput under Zipfian distribution was 1/26th that of Scylla.

Theoretically, presuming Cloud Bigtable throughput continued to scale linearly under a Zipfian distribution, it would have required over 300 nodes (as a single replica/AZ) to more than 900 nodes (triple-replicated) to achieve the 90,000 ops the SLA required. The cost of a single-replica cluster of that scale would have been approximately $1.8 million annually, or nearly $5.5 million if triple-replicated. That would be 41x or 123x the cost of Scylla, respectively. Presuming a middle-ground situation where Cloud Bigtable was deployed in two availability zones, the cost would be approximately $3.6 million annually, or 82x the cost of Scylla Cloud running triple-replicated. We understand these are only hypothetical projections and welcome others to run these Zipfian tests themselves to see how many nodes would be required to meet the 90,000 ops/second test requirement on Cloud Bigtable.
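The projection can be reproduced with the numbers already given. The sketch below assumes, as the text does, that Cloud Bigtable throughput would scale linearly with node count under the Zipfian workload; that is a hypothetical assumption, not a measured result.

```python
OBSERVED_ZIPFIAN_OPS = 3_450    # Cloud Bigtable throughput with 12 nodes per cluster
TARGET_OPS = 90_000
NODES_PER_CLUSTER = 12
COST_PER_CLUSTER = 70_416.96    # USD per year for one 12-node cluster
SCYLLA_TOTAL = 44_640.00        # USD per year, triple-replicated

# Linear-scaling assumption: we need about this many times more nodes.
scale = round(TARGET_OPS / OBSERVED_ZIPFIAN_OPS)


def project(replicas):
    """Return (nodes, annual cost, cost relative to Scylla Cloud)."""
    nodes = NODES_PER_CLUSTER * scale * replicas
    cost = COST_PER_CLUSTER * scale * replicas
    return nodes, cost, cost / SCYLLA_TOTAL


for replicas in (1, 2, 3):
    nodes, cost, ratio = project(replicas)
    print(f"{replicas} replica(s): {nodes} nodes, ${cost:,.0f} per year, {ratio:.0f}x Scylla")
```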

A Single Hot Row

We want to understand how each offering will behave under the extreme case of a single hot row being accessed over a certain period of time. To do that, we kept the same YCSB parameters, meaning the client still tries to send 90,000 requests per second, but set the key population of the workload to a single row. Results are summarized in Table 3.

Single Hot Row

| Offering | Overall throughput (ops/second) | P95 read latency (ms) | P95 update latency (ms) |
|---|---|---|---|
| Google Cloud Bigtable | 180 | 7,733 | 5,365 |
| Scylla Cloud | 35,400 | 73 | 13 |

Table 3: Scylla Cloud and Cloud Bigtable results when accessing a single hot row. Both databases see their throughput reduced for this worst-case scenario, but Scylla Cloud is still able to achieve 35,400 requests per second. Given the data anomaly, neither Scylla Cloud nor Cloud Bigtable was able to meet the SLA, but Scylla was able to perform much better, with nearly 200 times Cloud Bigtable’s anemic throughput and multi-second latency. This Scylla environment is 1/5th the cost of Cloud Bigtable.

We see that Cloud Bigtable is capable of processing only 180 requests per second, and the latencies, not surprisingly, shoot up. As we can see in Figure 5, from Cloud Bigtable monitoring, although the overall CPU utilization is low, the hottest node is already bottlenecked.

Figure 5: While handling a single hot row, Cloud Bigtable shows its hottest node at 100% utilization even though overall utilization is low.
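The pattern in Figure 5, one saturated node alongside low overall utilization, follows from the fact that a single row has a single owner within a cluster. The toy model below illustrates this with hash-based placement, which is an assumption made for the example (Cloud Bigtable shards by key range, but a single row still lands on one node). The node and request counts are arbitrary.

```python
import hashlib
from collections import Counter


def owner(key, n_nodes):
    """Map a row key to a node by hashing (a stand-in for real partitioning)."""
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % n_nodes


def load_per_node(keys, n_nodes):
    """Count how many requests each node has to serve."""
    counts = Counter(owner(key, n_nodes) for key in keys)
    return [counts.get(node, 0) for node in range(n_nodes)]


N_NODES, N_REQUESTS = 12, 12_000

# Many distinct rows: requests spread across all nodes.
spread = load_per_node((f"user{i}" for i in range(N_REQUESTS)), N_NODES)

# One hot row: every request lands on the same node.
hot = load_per_node(("user0" for _ in range(N_REQUESTS)), N_NODES)
busy_nodes = sum(1 for requests in hot if requests > 0)

print("spread workload, busiest node:", max(spread))
print("hot row, busiest node:", max(hot), "with", busy_nodes, "busy node(s)")
```

With the hot row, the busiest node handles every request while the other nodes sit idle, so adding nodes does not raise the ceiling for that row.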

Scylla Cloud is also not capable of processing requests at the rate the client sends. However, its rate drops only to a baseline of 35,400 requests per second.

This result clearly shows that Scylla Cloud has a far more efficient engine underneath. While higher throughput can always be achieved by stacking processing units side by side, a more efficient engine guarantees better behavior during worst-case scenarios.

Why Did Zipfian Latencies Go Down?

We draw the reader’s attention to the fact that while the uniform distribution provides the best case scenario for resource distribution, if the dataset is larger than memory (which it clearly is in our case), it also provides the worst-case scenario for the database internal cache as most requests have to be fetched from disk.

In the Zipfian distribution case, the number of keys being queried gets reduced and the ability of the database to cache those values improves and, as a result, the latencies get better. Serving requests from cache is not only cheaper in general, but also easier on the CPU as well.

With each request being cheaper due to their placement in the cache, the number of requests that each processing unit can serve also increases (note that the Single Hot Row scenario provides the best case for cache access, with a 100% hit rate). As a combination of those two effects, the busiest CPU in the Scylla Cloud nodes is working less than in the uniform case and is actually further away from the bottleneck than it was in the uniform case.
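The caching effect described above can be illustrated with a small simulation. The sketch below runs uniform, Zipfian-style, and single-hot-row request streams through a plain LRU cache and compares hit rates. It is a toy model, not Scylla's cache; the cache size, key count, and the 0.99 exponent are assumptions chosen for the example.

```python
import random
from collections import OrderedDict


def lru_hit_rate(requests, capacity):
    """Replay requests through an LRU cache and return the hit rate."""
    cache = OrderedDict()
    hits = 0
    for key in requests:
        if key in cache:
            hits += 1
            cache.move_to_end(key)          # mark as most recently used
        else:
            cache[key] = True
            if len(cache) > capacity:
                cache.popitem(last=False)   # evict the least recently used key
    return hits / len(requests)


rng = random.Random(7)                      # fixed seed for repeatability
N_KEYS, N_REQUESTS, CACHE_SIZE = 10_000, 50_000, 1_000
keys = range(N_KEYS)
weights = [1.0 / (rank ** 0.99) for rank in range(1, N_KEYS + 1)]

uniform_rate = lru_hit_rate(rng.choices(keys, k=N_REQUESTS), CACHE_SIZE)
zipfian_rate = lru_hit_rate(rng.choices(keys, weights=weights, k=N_REQUESTS), CACHE_SIZE)
hot_row_rate = lru_hit_rate([0] * N_REQUESTS, CACHE_SIZE)

print(f"uniform: {uniform_rate:.1%}, zipfian: {zipfian_rate:.1%}, hot row: {hot_row_rate:.1%}")
```

With a cache that holds a tenth of the keys, the uniform stream hits roughly a tenth of the time, the skewed stream hits far more often, and the single hot row hits on every request after the first.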

We don’t mean to imply that Scylla Cloud will always perform better in Zipfian cases than uniform. The results will depend on which set of many, many real-life competing factors wins. But we do believe that our results for both the Zipfian and Single Hot Row scenario clearly show that Scylla Cloud performs better under real-life conditions than the Cloud Bigtable offering.

Optimize Your Data Model

All distributed databases will prefer near-perfect access distribution. In some cases, it may be impossible, like the hot row case, and some of them are hard to plan for. However, it is possible to optimize your data model by making the primary key a composite key and adding another column to it. For instance, in the case where your primary key is a customer ID, you can make a composite key of the ID and another field: Location or Date or sub department. In case your key is an IoT device, add the date to it with year/month/day/hour granularity of your choice.

Data model optimization comes at a price. Apart from the development time, when you need to scan all of the items that belong to a customer ID, or all of the events from an IoT device, you will need to scan multiple composite keys. This increases read times, weakens consistency, and complicates development.
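A minimal sketch of the composite-key idea, and of the read-side cost it brings, might look like the following. The key format, the hourly granularity, and the function names are hypothetical choices for illustration, not a prescribed schema.

```python
from datetime import datetime, timedelta


def composite_key(device_id, ts):
    """Partition key made of the device ID plus an hourly time bucket."""
    return f"{device_id}#{ts:%Y-%m-%d-%H}"


def keys_for_range(device_id, start, end):
    """All composite keys a reader must query to cover the range [start, end]."""
    current = start.replace(minute=0, second=0, microsecond=0)
    keys = []
    while current <= end:
        keys.append(composite_key(device_id, current))
        current += timedelta(hours=1)
    return keys


# Writes for one device are spread over one partition per hour...
write_key = composite_key("sensor-42", datetime(2019, 5, 21, 22, 30))

# ...but reading under three hours of history now touches four partitions.
scan_keys = keys_for_range(
    "sensor-42",
    datetime(2019, 5, 21, 22, 30),
    datetime(2019, 5, 22, 1, 10),
)
print(write_key)
print(scan_keys)
```

The write path gets better distribution, while the read path has to fan out over every bucket in the requested range and merge the results.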

Of course, it is much better when the database can offload these complexities on your behalf!

Conclusion

When both Scylla Cloud and Google Cloud Bigtable are exposed to a synthetic, well-behaved, uniform lab workload, Scylla Cloud is 1/5th the cost of Cloud Bigtable. However, under the more likely real-world Zipfian distribution of data, which we see in our own customer experience as well as in academic research and empirical validation, Cloud Bigtable is nowhere near meeting the desired SLA and is therefore not practical.

To recap our results, as shown below, Scylla Cloud is 1/5th the expense while providing 26x the performance of Cloud Bigtable, better latency and better behavior in the face of hot rows. Scylla met its SLA with 90,000 OPS and low latency response times while Bigtable couldn’t do more than 3,450 OPS. As an exercise to the reader, try to execute or compute the total cost of ownership of a Bigtable, real-life workload at 90,000 OPS. Would it be 5x (the Scylla savings) multiplied by 26x? Even if you decrease the Bigtable replication to two zones or even a single zone, the difference is enormous.

We explained how to optimize the data model with composite keys as a workaround to Bigtable’s limitation. However, that approach requires development time and makes scans more expensive, less consistent and more complicated. You may still hit a hot row that cannot serve more than 180 requests per second with Bigtable.

Scylla Cloud vs. Google Cloud Bigtable: Bottom Line
As you can see, Scylla has a notable advantage on every metric and in every test we conducted. We invite you to judge how much better it is in a scenario similar to your needs.

Cloud Bigtable is an offering available exclusively on the Google Cloud Platform (GCP), which locks the user in to Google as both the database provider and infrastructure service provider — in this case, to its own public cloud offering.

Scylla Cloud is a managed offering of the Scylla Enterprise database. It is available on Amazon Web Services (AWS) at the moment, and is soon coming to GCP and other public clouds. Beyond Scylla Cloud, Scylla also offers Open Source and Enterprise versions, which can be deployed to private clouds, on-premises, or co-location environments. While Scylla Cloud is ScyllaDB’s own public cloud offering, Scylla provides greater flexibility and does not lock users in to any particular deployment option.

Yet while vendors like us can make claims, the best judge of performance is your own assessment based on your specific use case. Give us a try on Scylla Cloud and please feel free to drop in to ask questions via our Slack channel.


Appendix

In the following sections, we will provide a detailed discussion of what is behind the choices made in this comparison.

Google Cloud Bigtable Replication Settings and Test Setup

In Cloud Bigtable terminology, replication is achieved by adding additional clusters in different zones. To withstand the loss of two of them, we set three replicas, as shown below in Figure 7:

Figure 7: Example setting of Google Cloud Bigtable in three zones. A single zone is enough to guarantee that node failures will not lead to data loss, but is not enough to guarantee service availability/data-survivability if the zone is down.

Cloud Bigtable allows for different consistency models as described in their documentation according to the cluster routing-model. By default, data will be eventually consistent and multi-cluster routing is used, which makes for highest availability of data. Since eventual consistency is our goal, Cloud Bigtable had their settings kept at the defaults and multi-cluster routing is used. Figure 8 shows the explanation about routing that can be seen in Cloud Bigtable’s routing selection interface.

Figure 8: Google Cloud Bigtable settings for availability. In this test, we want to guarantee maximum availability across three zones.

We verified that the cluster is, indeed, set up in this mode, as can be seen in Figure 9 below:

Figure 9: Google Cloud Bigtable is configured in multi-cluster mode.

Scylla Cloud Replication Settings and Test Setup

For Scylla, consistency is set on a per-request basis and is not a cluster-wide property, as described in Scylla’s documentation (referred to as “tunable consistency”). Unlike Cloud Bigtable, in Scylla’s terminology all nodes are part of the same cluster. Replication across availability zones is achieved by adding nodes present in different racks and setting the replication factor to match. We set up the Scylla Cluster in a single datacenter (us-east-1), and set the number of replicas to three (RF 3), placing them in three different availability zones within the us-east-1 region for AWS. This setup can be seen in Figure 10 below:

Figure 10: Scylla Cloud will be set up in 3 availability zones. To guarantee data durability, Scylla Cloud requires replicas, but that also means that any Scylla Cloud setup is able to maintain availability of service when availability zones go down.

Eventual consistency is achieved by setting the consistency of the requests to LOCAL_ONE for both reads and writes. This will cause Scylla to acknowledge write requests as successful when one of the replicas responds, and to serve reads from a single replica. Mechanisms such as hinted handoff and periodic repairs are used to guarantee that data will eventually be consistent among all replicas.

Client Setup

We used the YCSB benchmark running across multiple zones in the same region as the cluster. To achieve the desired distribution of 50% reads and 50% updates, we will use YCSB’s Workload A, which is already pre-configured to have that ratio. We will keep the workload’s default record size (1kB), and add one billion records— enough to generate approximately 1TB of data on each database.
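The dataset size follows from the record count and record size. The sketch below assumes the YCSB core workload defaults of 10 fields of 100 bytes each, which is where the 1kB record size comes from. It counts raw field payload only and ignores keys, storage overhead, and compression.

```python
RECORD_COUNT = 1_000_000_000   # recordcount used in this benchmark
FIELD_COUNT = 10               # assumed YCSB default, left unchanged
FIELD_LENGTH = 100             # bytes per field, assumed YCSB default

record_bytes = FIELD_COUNT * FIELD_LENGTH
total_tb = RECORD_COUNT * record_bytes / 1e12   # decimal terabytes

print(f"{record_bytes} bytes per record, about {total_tb:.1f} TB of raw payload")
```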

We will then run the load for a total time of 1.5 hours. At the end of the run, YCSB produces a latency report that includes the 95th-percentile latency for each client. Aggregating percentiles is a challenge on its own. Throughout this report, we will use the client that reported the highest 95th-percentile as our number. While we understand this is not the “real” 95th-percentile of the distribution, it at least maps well to a real-world situation where the clients are independent and we want to guarantee that no client sees a percentile higher than the desired SLA.
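The difference between the number we report and the “real” pooled percentile can be seen in a small example. The latencies below are synthetic values generated only for illustration, and the percentile uses the simple nearest-rank method, which may differ from how YCSB computes it internally.

```python
import random


def p95(samples):
    """95th percentile using the nearest-rank method."""
    ordered = sorted(samples)
    rank = max(1, -(-95 * len(ordered) // 100))   # ceil(0.95 * n) in integers
    return ordered[rank - 1]


rng = random.Random(1)   # fixed seed for repeatability

# Synthetic per-client read latencies in milliseconds (not benchmark data).
clients = {
    "client-a": [rng.gauss(4.0, 1.0) for _ in range(1000)],
    "client-b": [rng.gauss(5.0, 1.0) for _ in range(1000)],
    "client-c": [rng.gauss(8.0, 2.0) for _ in range(1000)],
}

per_client = {name: p95(latencies) for name, latencies in clients.items()}

# What this report uses: the worst per-client P95.
reported = max(per_client.values())

# The P95 over all samples pooled together.
pooled = p95([x for latencies in clients.values() for x in latencies])

print(f"reported (max of per-client P95): {reported:.2f} ms")
print(f"pooled P95 over all samples:      {pooled:.2f} ms")
```

Taking the maximum is the conservative choice: with this percentile definition it can never be lower than the pooled value, so an SLA that holds for the reported number holds for every individual client.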

We will start the SLA investigation by using a uniform distribution, since this guarantees good data distribution across all processing units of each database while being adversarial to caching. This allows us to make sure that both databases are exercising their I/O subsystems and not relying only on in-memory behavior.

1. Google Cloud Bigtable Clients

Our instance has 3 clusters all in the same region. We spawned 12 small (4 CPUs, 8GB RAM) GCE machines, 4 in each of the 3 zones where Cloud Bigtable clusters are located. Each machine ran 1 YCSB client with 50 threads and a target of 7,500 ops per second, for a total of 90,000 operations per second. The command used is:

~/YCSB/bin/ycsb run googlebigtable -p requestdistribution=uniform -p columnfamily=cf -p google.bigtable.project.id=$PROJECT_ID -p google.bigtable.instance.id=$INSTANCE_ID -p google.bigtable.auth.json.keyfile=auth.json -p recordcount=1000000000 -p operationcount=125000000 -p maxexecutiontime=5400 -s -P ~/YCSB/workloads/$WORKLOAD -threads 50

2. Scylla Cloud Clients

Our Scylla cluster has 3 nodes spread across 3 different Availability Zones. We spawned 12 c5.xlarge machines, 4 in each AZ, running one YCSB client with 50 threads and a target of 7,500 ops per second; for a total of 90,000 ops per second. For this benchmark, we used a YCSB version that handles prepared statements, which means all queries will be compiled only once and then reused. We also used the Scylla-optimized Java driver. Although Scylla is compatible with Apache Cassandra drivers, it ships with optimized drivers that increase performance through Scylla-specific features.

~/YCSB/bin/ycsb run cassandra-cql -p hosts=$HOSTS -p recordcount=1000000000 -p operationcount=125000000 -p requestdistribution=uniform -p cassandra.writeconsistencylevel=LOCAL_ONE -p cassandra.readconsistencylevel=LOCAL_ONE -p maxexecutiontime=5400 -s -P workloads/$WORKLOAD -threads 50 -target 7500
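As a quick sanity check on the client setup, the sketch below recomputes the aggregate target rate and compares the operations a single client can issue within the time limit against its operationcount. It assumes each client holds its 7,500 ops/s target for the whole run and that operationcount applies per YCSB process; the constant and function names are illustrative only.

```python
# Values taken from the client descriptions and command lines above.
CLIENTS = 12
TARGET_OPS_PER_CLIENT = 7_500      # ops per second per client
MAX_EXECUTION_SECONDS = 5_400      # -p maxexecutiontime (1.5 hours)
OPERATION_COUNT = 125_000_000      # -p operationcount


def total_target_ops(clients, per_client_ops):
    """Aggregate target throughput across all clients, in ops per second."""
    return clients * per_client_ops


def ops_within_time_limit(per_client_ops, seconds):
    """Operations one client issues if it sustains its target rate."""
    return per_client_ops * seconds


aggregate = total_target_ops(CLIENTS, TARGET_OPS_PER_CLIENT)
per_client_total = ops_within_time_limit(TARGET_OPS_PER_CLIENT,
                                         MAX_EXECUTION_SECONDS)

print(f"aggregate target: {aggregate:,} ops/s")
print(f"ops per client in 1.5 hours: {per_client_total:,}")
# True means the time limit, not operationcount, ends the run.
print("run is time-limited:", per_client_total < OPERATION_COUNT)
```

Under these assumptions each client issues about 40.5 million operations, well below the 125 million operation count, so the 1.5 hour limit is what ends the run.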

Availability and Consistency Models

When comparing different database offerings, it is important to make sure they provide similar data placement, availability, and consistency guarantees. Stronger consistency and higher availability are often available but come at a cost, so benchmarks should take this into consideration.

Both Cloud Bigtable and Scylla have flexible settings for replication and consistency, so the first step is to understand how those are set for each offering. Cloud Bigtable does not lose any local data when an individual node fails, meaning it is reasonable to run it without any replicas. Still, a single cluster cannot survive the failure of an entire zone, so for disaster recovery it is highly recommended to replicate to at least one more availability zone.

Scylla Cloud, on the other hand, uses instance-local ephemeral storage for its data, meaning local-node data will not survive hardware failures. Running a single replica is therefore not acceptable for data durability reasons. As a nice side effect, any standard Scylla Cloud setup is already replicated across availability zones, and the service remains available in the face of availability zone failures.

To properly compare such different solutions, we will start with a user-driven set of requirements and compare the cost of both solutions. We will simulate a scenario in which the user wants the data available in three different zones, with all zones in the same region. This ensures that all communication remains low latency, while the deployment can withstand the failure of up to two zones without becoming unavailable, as long as the region is still available.

Among different zones, we will assume a scenario in which the user aims for eventual consistency. This should lead to the best-performing setup possible for both offerings, given the restriction of maintaining three replicas that we imposed above.

The post Going Head-to-Head: Scylla Cloud vs. Google Cloud Bigtable appeared first on ScyllaDB.

Scylla Summit 2019 Call for Speakers

Got a Real-Time Big Data Story to Share?

Scylla Summit 2019 is coming to San Francisco this November 5-6. Our sessions will cover real-world Scylla use cases, roadmap plans, technical product demonstrations, integrations with NoSQL and other Big Data ecosystem components, training sessions, the latest Scylla news, and much more.

Everyone has a story about their journey to find the best database solution for their specific use case. What’s yours? Are you a developer, architect, designer, or product manager leveraging the close-to-the-hardware design of Scylla or Seastar? Are you an executive, entrepreneur, or innovator faced with navigating the impact of NoSQL databases on your organization? If so, come and share your experiences to help build community and your own profile within it.

Best of all, speakers receive complimentary admission to both the 2-day summit and the pre-summit training!

What Our Attendees Most Want to Hear

Our user and customer talks are the key aspect of Scylla Summit! We’re looking for compelling case studies, technical sessions, technical and organizational best practices, and more. See below for a list of suggested topics, and feel free to recommend others. The deadline for submissions is Friday, June 28th, 2019.

Real-world Use Cases

Everything from design and architectural considerations to POCs, to production deployment and operational lessons learned, including…

  • Practical examples for vertical markets — Ad Tech, FinTech, fraud detection, customer/user data, e-commerce, media/multimedia, social, B2B or B2C apps, IoT, and more.
  • Migrations — How did you get from where you were in the past to where you are today? What were the limitations of other systems you needed to overcome? How did Scylla address those issues?
  • Hard numbers — Clusters, nodes, CPUs, RAM and disk, data size, growth, OPS, throughput, latencies, benchmark and stress test results (think graphs, charts and tables)
  • War stories — What were the hardest Big Data challenges you faced? How did you solve them? What lessons can you pass along?

Tips & Tricks

From getting started to performance optimization to disaster management, tell us your devops secrets, or unleash your chaos monkey!

  • Computer languages and dev environments — What are your favorite languages and tools? Are you a Pythonista? Doing something interesting in Golang or Rust?
  • Is this Open Source? — Got a GitHub repo to share? Our attendees would love to walk through your code.
  • Integrations into Big Data ecosystem — Share your stack! Kafka, Spark, other SQL and NoSQL systems; time series, graph, in-memory integrations (think “block diagrams”).
  • Seastar — The Seastar infrastructure, which is the heart of our Scylla database, can be used for other projects as well. What sort of systems architecture challenges are you tackling with Seastar?

Event Details and Call for Speakers Schedule

  • Dates: Tuesday-Wednesday, November 5-6, 2019 (Pre-Summit Training Day on November 4)
  • Location: Parc 55 Hotel, 55 Cyril Magnin Street, San Francisco, CA
  • Speaker Submission Deadline: Friday, June 28th, 2019, 5:00 PM
  • Speakers receive complimentary admission to the 2-day summit and 1-day pre-summit training (on Monday, November 4, 2019)
  • Code of Conduct

Required Information

You’ll be asked to include the following information in your proposal:

  • Proposed title
  • Description
  • Suggested main topic
  • Audience information
    • Who is this presentation for?
    • What will the audience take away?
    • What prerequisite knowledge would they need?
  • Videos or live demos included in presentation?
  • Length of the presentation (30-45 minutes recommended)

Eight Tips for a Successful Speaker Proposal

Help us understand why your presentation is the right one for Scylla Summit 2019. Please keep in mind this event is made by and for deeply technical professionals. All presentations and supporting materials must be respectful and inclusive.

  1. Be authentic — Your peers need original ideas with real-world scenarios, relevant examples, and knowledge transfer
  2. Be catchy — Give your proposal a simple and straightforward but catchy title
  3. Be interesting — Make sure the subject will be of interest to others; explain why people will want to attend and what they’ll take away from it
  4. Be complete — Include as much detail about the presentation as possible
  5. Don’t be “pitchy” — Keep proposals free of marketing and sales. We tend to ignore proposals submitted by PR agencies and require that we can reach the suggested participant directly.
  6. Be understandable — While you can certainly cite industry terms, try to write a jargon-free proposal that contains clear value for attendees
  7. Be deliverable — Sessions have a fixed length, and you will not be able to cover everything. The best sessions are concise and focused. Overviews aren’t great in this format; the narrower your topic is, the deeper you can dive into it, giving the audience more to take home
  8. Be cautious — Live demos sometimes don’t go as planned, so we don’t recommend them

SUBMIT A SPEAKER PROPOSAL

Super Early Bird Registration for Scylla Summit 2019 is open!

Follow us on Twitter or subscribe to our blog to stay up to date!

The post Scylla Summit 2019 Call for Speakers appeared first on ScyllaDB.

Anomalia Machina 10 – Final Results: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra

In this tenth and final blog of the Anomalia Machina series we tune the anomaly detection system and succeed in scaling the application out from 3 to 48 Cassandra nodes, and get some impressive numbers: 574 CPU cores (across Cassandra, Kafka, and Kubernetes clusters), 2.3 Million writes/s into Kafka (peak), 220,000 anomaly checks per second (sustainable), which is a massive 19 Billion anomaly checks a day.

1. The Scaling Journey

Odysseus’s final challenge was to regain his throne! Odysseus finally reached his homeland of Ithaca only to find his palace overrun with a crowd of 108 suitors who were drinking his wine, slaughtering his cattle, and courting his wife, Penelope. If he had rushed in without thinking, it would probably have ended badly. Instead, disguised as a beggar, he planned to recapture his throne. Penelope announced to the suitors that she would marry the man who could string the bow of Odysseus and shoot an arrow through 12 axes placed in a row. The suitors all tried and failed. When the “beggar” tried, he accomplished the feat! Throwing off his disguise, Odysseus fought and eliminated all the suitors.

Likewise, rather than attempting to jump straight to the end of the story and run the Anomalia Machina Application at a massive scale we thought it prudent to scale it out on increasingly bigger clusters. The aim was to (a) discover how to scale it before committing a larger amount of resources, and (b) document how well it scales with increasing cluster sizes.

Odysseus eliminates Penelope’s suitors

1.1 Preparations

For the initial scalability testing, I simplified the approach to focus on Cassandra scalability and application tuning on my Kubernetes cluster. To do this I used a small production Kafka cluster (3 nodes with 4 cores each) to “replay” the same events that I had previously sent to it.  Event reprocessing is a rich use case for Kafka that we explored in the blog Exploring the Apache Kafka “Castle” Part B: Event Reprocessing. Kafka consumers also place a very low load on Kafka clusters compared with producers, so this ensured that Kafka was not a bottleneck and the results were repeatable as I scaled the rest of the system.

To get the Anomalia Machina application ready to scale there were a few things I improved from the previous blog.  Given the likely increase in the number of Pods to monitor with Prometheus, the previous approach of running the Prometheus server on a laptop and manually adding each Pod IP address to the configuration file was no longer workable.  To fix this I deployed Prometheus to the Kubernetes cluster and automated Pod monitoring using the Prometheus Operator.

Prometheus Kubernetes Operator

The first step is to install the Prometheus Operator and get it running. I did this by copying the yaml file:

https://github.com/coreos/prometheus-operator/blob/master/bundle.yaml

to my local machine, and running:

kubectl apply -f bundle.yaml

The Prometheus operator works by dynamically detecting and monitoring Pods with labels that match a selector.  Some assembly is required.

  1. Service Objects monitor Pods (with labels that match the selector)
  2. ServiceMonitors discover Service Objects
  3. Prometheus objects specify which ServiceMonitors should be included
  4. To access the Prometheus instance it must be exposed to the outside world (e.g. by using a Service of type NodePort)
  5. When it doesn’t work the first time, you most likely need to create Role-based access control rules for both Prometheus and Prometheus Operator

See this documentation for all the steps and examples.  

But now that Prometheus was running in the Kubernetes cluster, it was not easy for it to monitor the Kafka load generator (Kafka producer application) running on standalone AWS EC2 instances. I also wanted to ensure sufficient resources for the Kafka producer, so an obvious solution was to deploy it into the Kubernetes cluster. This turned out to be easy: a simple copy/edit of the existing Kubernetes deployment artefacts to create a new Deployment type from the producer jar file. The Kafka producer load can now be easily increased by scaling the number of Pods.  This also enables unified monitoring of both the producer application and the detector pipeline application by the Prometheus Operator. We’re now ready to continue with the scaling journey.

1.2 Pre-tuning

Pre-tuning: “La Jamais Contente”, first automobile to reach 100 km/h in 1899 (electric, 68hp)

There are a few knobs to twiddle to tune the anomaly detector pipeline part of the application. Each Pod has 2 Kafka consumers (in a thread pool) reading events from the Kafka cluster and another thread pool which performs Cassandra writes and reads and runs the anomaly detection algorithm. A single Cassandra connection is initially started per Pod, but this can be increased automatically by the Cassandra Java driver, if the connection becomes overloaded (it didn’t).

There are therefore 2 parameters that are critical for tuning the application with increasing scale: (1) the number of Kafka consumers, and (2) the number of detector threads per Pod.   The number of partitions for the Kafka topic must be greater than or equal to the number of Kafka consumers to ensure that every consumer receives data (any consumers in excess of the number of partitions will be idle). The number of detector threads per Pod is critical, as throughput peaks at a “magic” number, and drops off if there are more or fewer threads.
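The partition constraint can be expressed as a small check. The following is a sketch with hypothetical helper names, assuming all consumers belong to a single consumer group and that each Pod runs 2 consumers as described above.

```python
def consumers_for_pods(pods, consumers_per_pod=2):
    """Total Kafka consumers when every Pod runs the same number of them."""
    return pods * consumers_per_pod


def idle_consumers(num_partitions, num_consumers):
    """Consumers that receive no data.

    Within one consumer group a partition is assigned to at most one
    consumer, so consumers beyond the partition count sit idle.
    """
    return max(0, num_consumers - num_partitions)


consumers = consumers_for_pods(pods=100)
print(consumers, "consumers,",
      idle_consumers(num_partitions=200, num_consumers=consumers), "idle")
```

A check like this is a cheap guard to run before scaling the number of Pods, since idle consumers still use Pod resources without adding throughput.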

I initially assumed that I could scale-out by the relatively simple process of (1) tuning the detector thread pool at low load (1 Pod) for a 3 node cluster, and then (2) increasing the number of Pods for each bigger cluster until maximum throughput was obtained.  I used this approach for increasing cluster sizes, doubling the number of nodes from 3 to 6, 12, and 24 nodes. Surprisingly this gave sub-linear scalability as shown by this graph.

Anomalia Machina 10 - TPS(Pre-tuning)

Given the theoretical perfect linear scalability of Cassandra with more nodes, something was obviously wrong with this application tuning approach.

1.3 Post-tuning

To gain more visibility into what was actually going on, I made more metrics visible in Prometheus, including the throughput for the Kafka consumers, the detector thread pool, and different detector events including detector run or not run (>= 50 rows returned from Cassandra or < 50 rows), and the number of rows of data read from Cassandra. This enabled confirmation that the business metric was correct (only reporting when >= 50 rows for every ID), and checking/tuning the thread pool so that the consumer/detector rates are steady state (one is not getting ahead of or behind the other), as an imbalance can be suboptimal.

I then reran the benchmark, and using the increased visibility the extra metrics gave, improved the tuning approach for smaller iterations of cluster sizes (adding only 3 nodes at a time rather than doubling the number as above). Adding extra nodes to an existing Cassandra cluster is easy with an Instaclustr managed Cassandra cluster, as there is a button on the console to automatically add extra nodes.  This also sped up the benchmarking process as I didn’t have to reload the data into Cassandra at the start of each run (as I did for a new cluster each time), as it was already in place.

Note that I kept the Kafka cluster the same size for these experiments (as load from the Kafka consumers was minimal, < 10% CPU), but I did increase the size of the Kubernetes cluster by increasing the number of Kubernetes worker nodes each time CPU utilisation went over 50% to ensure the application resources weren’t a bottleneck.

The following graphs show the post-tuning results for clusters from 3 to 21 nodes, with linear extrapolation to 50 nodes.  The number of expected Pods for 50 nodes is approximately 100.

Anomalia Machina 10 - Worker Pods

A similar graph but this time showing the predicted number of Kafka consumers (2 x Pods) for a 50 node cluster to be around 200.

Anomalia Machina 10 - Kafka Consumers

This graph shows the results of tuning the number of detector threads per pod with increasing Cassandra nodes, and linear extrapolation predicts more than 250 threads per Pod for a 50 node Cassandra cluster.

Anomalia Machina 10 - Detector Threads Per Pod

Note that there was up to +/-10% variation in results across runs with identical configurations, and that the tuning may not be 100% optimal for some of the middle-sized clusters (i.e. slightly too many pods and insufficient threads per pod). However, it is likely that extrapolation over 7 data points gives reasonable predictions for bigger clusters.
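The linear extrapolation used for these predictions can be reproduced with an ordinary least-squares line fit. The sketch below is an illustration only: the measured tuning values are not given in the text, so the y-values are synthetic placeholders chosen to lie on a straight line, and the function names are hypothetical.

```python
def fit_line(xs, ys):
    """Least-squares fit of y = slope * x + intercept."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need at least two (x, y) pairs of equal length")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    if sxx == 0:
        raise ValueError("x values must not all be identical")
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


def extrapolate(xs, ys, x_new):
    """Predict y at x_new from a line fitted to the observed points."""
    slope, intercept = fit_line(xs, ys)
    return slope * x_new + intercept


# Cluster sizes from the text: 3 to 21 nodes in steps of 3 (7 data points).
nodes = [3, 6, 9, 12, 15, 18, 21]
# Placeholder Pod counts on a perfect line (NOT the measured values).
pods = [2 * n for n in nodes]
print(extrapolate(nodes, pods, 50))
```

With real measurements the points will scatter around the line, so the +/-10% run-to-run variation mentioned above carries over to the prediction.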

Did all the tuning help? Yes, from 3 to 21 node Cassandra clusters we now have significantly better scalability compared with the first attempt, now close to perfect linear scalability (within the +/-10% error variation) as shown in this graph.

Anomalia Machina 10 - TPS (Pre vs Post Tuning)

Observation: Apache Cassandra scales linearly (as it’s a shared-nothing architecture, there are no shared resources that can become bottlenecks as nodes are added), but you need to put some effort into application optimisation. Cassandra will handle large numbers of connections, but for good scalability try to minimise the total number of Cassandra connections by optimising the use of each connection.

Anomalia Machina 10 - Post Tuning, Fast forward 120 years

Post-tuning: Fast-forward 120 years… “Pininfarina Battista” the fastest car in the world, 0-100 kph in 2 seconds, top speed 350 kph (electric, 1,900hp).

2. Results at Massive Scale

  • 2.3 Million writes/s into Kafka (peak)
  • 220,000 anomaly checks per second (sustainable)
    • 400 million events checked for anomalies in 30 minutes
    • 19 Billion anomaly checks a day
  • 6.4 Million events/s total “system” throughput (peak)
  • 574 CPU cores across Cassandra, Kafka, Kubernetes

For the final run, we revisited the original “Kafka as a buffer” use case (to decouple event producers from consumers). We want a Kafka cluster that will process at least 2 Million writes/s for a few minutes to cope with load spikes while enabling the rest of the anomaly detection pipeline to scale and run at maximum capacity to process the backlog of events as fast as possible.

Based on the experience of tuning the application up to a 21 node Cassandra cluster we hopefully have sufficient experience to tackle the final challenge and scale up to something even bigger – a 48 node cluster.

2.1 Kafka Cluster Sizing 

Based on the predictions above, it looked like we needed 200 partitions on the Kafka side. I therefore spun up some different sized Kafka clusters and experimented with increasing producer throughputs and number of partitions.

Using a 6 node (4 CPU cores per node) Kafka cluster as a starting point, it’s apparent that the write throughput drops significantly with increasing partitions. This turns out to be due to the number of partitions being written to, rather than just in existence, as writing to only 6 out of 600 partitions results in the same throughput as if there were only 6 partitions.

Anomalia Machina 10 - Partitions vs Max Write:s

Using bigger Kafka node sizes (8 cores per node) gets us into the target (>= 2M writes/s) range for 200 partitions (right hand orange bar), but the cluster is close to maxed out, so we decided to use a 9 node (8 CPU cores per node) Kafka cluster, as we don’t want the Kafka cluster to be a bottleneck.

Anomalia Machina 10 - Kafka maximum write throughput

Initial testing revealed that 9 Kafka Producer Pods were sufficient to exceed the target of 2M writes/s for the 9×8 Kafka cluster with 200 partitions.

2.2 48 Node Cassandra Cluster

To get the final results we spun up a 48 node Cassandra cluster on AWS using the Instaclustr console. We then tuned the application thread pool (thread pool 2 in the diagram in section 2.4) and increased the number of Pods while monitoring the application metrics in Prometheus, and the Kafka and Cassandra cluster metrics using the Instaclustr console. We reached the maximum anomaly checks/s with 100 Pods, with 300 detector threads per Pod (slightly more than predicted, giving a total of 30,000 detector pipeline application threads), and with the Cassandra cluster running close to flat out at 97% CPU (higher than recommended for a production cluster), and Kafka with some headroom at 66% CPU.

To test the Kafka as a buffer use case we switched from replaying existing Kafka events to reading new events, ramped up the Kafka producer load over 2 minutes, and held the load at maximum for a further 2 minutes before terminating.

After a few false starts, I found it useful to use an open source Kubernetes tool, Weaveworks Scope, to see that everything was working as expected. It is easy to connect to the Kubernetes cluster and supports different views and filtering of nodes. This view shows the main Services (some of which I’d had problems with configuring previously) and shows that Prometheus is correctly deployed and monitoring 100 Consumer Pods and 9 Producer Pods via the Prometheus operator.

Anomalia Machina 10 - Weavescope

Here are the specifications of the final system.

Cluster Details (all running in AWS, US East (N. Virginia))

Instaclustr managed Kafka – EBS: high throughput 1500, 9 x r4.2xlarge-1500 (1,500 GB Disk, 61 GB RAM, 8 cores), Apache Kafka 2.1.0, Replication Factor=3

Instaclustr managed Cassandra – Extra Large, 48 x i3.2xlarge (1769 GB (SSD), 61 GB RAM, 8 cores), Apache Cassandra 3.11.3, Replication Factor=3

AWS EKS Kubernetes Worker Nodes – 2 x c5.18xlarge (72 cores, 144 GB RAM, 25 Gbps network), Kubernetes Version 1.10, Platform Version eks.3

2.3 Raw Results in Prometheus and Grafana

Here are the raw results. The average latency of the detector pipeline (from reading an event from Kafka, to deciding if it is an anomaly or not) was under 460ms for this test as shown in this Prometheus graph.

Anomalia Machina 10 - Raw results in Prometheus

The next graph shows the Kafka producer ramping up (from 1 to 9 Kubernetes Pods), with 2 minutes load time, peaking at 2.3M events/s (this time in Grafana). Note that because each metric was being retrieved from multiple Pods I had to view them as stacked graphs to get the total metric value for all the Pods.

Anomalia Machina 10 - Total Metric value for all the pods

This graph shows the anomaly check rate reaching 220,000 events/s and continuing (until all the events are processed). Prometheus is gathering this metric from 100 Kubernetes Pods.

Anomalia Machina 10 - Prometheus gathering metric from 100 Kubernetes Pods

2.4 Is this a good result?

Ten years ago it was considered “impractical to present an entire series of transactions” to an anomaly detection system. Instead, they recommended using aggregated historical data. However, we’ve demonstrated that current technology is more than up to the task of detecting anomalies from the raw transactions, rather than having to rely on aggregated data.

How do our results compare with more recent results? Results published in 2018, for a similar system, achieved 200 anomaly checks/s using 240 cores. They used supervised anomaly detection which required training of the classifiers (once a day), so they used Apache Spark (for ML, feature engineering, and classification), as well as Kafka and Cassandra.  Taking into account resource differences, our result is around 500 times higher throughput, and with faster real-time latency. They had more overhead due to the “feature engineering” phase, and their use of Spark to run the classifier introduced up to 200s latency, making it unsuitable for real-time use.  With a detection latency under 1s (average 500ms), our solution is fast enough to provide real-time anomaly detection and blocking. If the incoming load exceeds the capacity of the pipeline for brief periods of time, the processing time increases, and potentially anomalous transactions detected then may need to be handled differently.
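One way to read “taking into account resource differences” is to normalise throughput by CPU cores. The sketch below does that with the figures quoted in this post; normalising per core is an assumption about how the comparison was made, and the names are illustrative.

```python
def checks_per_core(checks_per_second, cores):
    """Anomaly checks per second delivered by each CPU core."""
    return checks_per_second / cores


# Figures quoted in the text.
ours = checks_per_core(220_000, 574)   # this system, 48 Cassandra nodes
theirs = checks_per_core(200, 240)     # 2018 published result

ratio = ours / theirs
print(f"ours: {ours:.0f}/s per core, theirs: {theirs:.2f}/s per core")
print(f"throughput ratio per core: about {ratio:.0f}x")
```

Under this assumption the ratio comes out at roughly 460, which is in line with the rounded “around 500 times” figure. The per-core value of roughly 380 is also in line with the approximately 400 checks per second per core reported for the system.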

2.5 Analysis

To summarise, the maximum Kafka writes/s reached 2.3M/s, while the rest of the pipeline managed a sustainable 220,000 anomaly checks/s.

Anomalia Machina 10 - Analysis-Final Results

The numbers are actually bigger than this if we take into account all the events in the complete system (i.e. all the events flowing between the distributed systems). In the previous blog we showed that for every anomaly check decision, there are many other events contributing to it. For the load spike scenario, we need to take into account the bigger Kafka load spike (Y, blue line, 2.3M/s) and the smaller detector pipeline rate (Z, orange line, 220,000/s):

Anomalia Machina 10 - Analysis - Load Spike

The peak total system throughput calculation is slightly different from the previous steady-state calculation as the load spike (Y) is produced by the Kafka load generator (step 1) and written to the Kafka cluster (step 2) until the rest of the pipeline catches up and processes them (steps 3-10, at maximum rate Z).

Anomalia Machina 10 - Analysis - Peak system throughput

The peak system throughput is therefore (2 x peak Kafka load) + (8 x anomaly checks/s) = (2 x 2.3M) + (8 x 0.22M) = 6.4 Million events/s as shown in this graph:

Anomalia Machina 10 - Analysis-Peak total system throughput
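The peak throughput formula can be checked directly. This is a small sketch using the two rates from the text; the function name is hypothetical, and the multipliers 2 and 8 are the step counts described above (steps 1-2 at the Kafka spike rate, steps 3-10 at the pipeline rate).

```python
def peak_system_throughput(peak_kafka_load, anomaly_checks_per_s):
    """Total events/s across all steps during a load spike.

    Steps 1-2 (load generator and Kafka writes) run at the spike rate;
    steps 3-10 (the remaining 8 steps) run at the pipeline's maximum rate.
    """
    return 2 * peak_kafka_load + 8 * anomaly_checks_per_s


peak = peak_system_throughput(2_300_000, 220_000)
print(f"{peak:,} events/s (about {peak / 1e6:.1f} Million)")
```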

Here are some more big numbers. The 4 minutes of events (ramp up and load) into Kafka produces 400 million events to be checked, and it takes the anomaly detection pipeline 30 minutes to process all the events.  The following graph shows this scenario (note that rates are in Millions per minute):

Anomalia Machina 10 - Analysis - No background load

A more realistic scenario is to assume that on top of the load spike, there is an average background load of say 50% of the pipeline capacity running continuously (110,000 events/s). It then takes 60 minutes to clear all the events due to the load spike as shown in this graph:

Anomalia Machina 10 - Analysis - 50% detector pipeline capacity background load
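Both drain times follow from dividing the backlog by the spare pipeline capacity. The sketch below is a simplification: it treats the 400 million events as a fixed backlog and ignores events already processed while the spike is still arriving. The function name is illustrative.

```python
def drain_minutes(backlog_events, pipeline_capacity, background_load=0.0):
    """Minutes needed to clear a backlog, given rates in events per second.

    Only capacity not consumed by the background load is available
    for working through the backlog.
    """
    spare = pipeline_capacity - background_load
    if spare <= 0:
        raise ValueError("background load leaves no spare capacity")
    return backlog_events / spare / 60


no_background = drain_minutes(400e6, 220_000)
half_background = drain_minutes(400e6, 220_000, background_load=110_000)
print(f"no background load: {no_background:.1f} minutes")
print(f"50% background load: {half_background:.1f} minutes")
```

Halving the spare capacity doubles the drain time, which is why the 50% background load scenario takes about an hour rather than 30 minutes.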

Under what circumstances would this be useful? Imagine we have an SLA in place to process say 99% of events per week in under 1s, with an upper bound of 1-hour latency. Assuming load spike events like this are relatively infrequent (e.g. once a week) then these scenarios can satisfy a 99.4% SLA (1 hour is 0.6% of a week).
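The SLA figure is simple arithmetic over the hours in a week. A minimal sketch, assuming one spike per week that delays events for at most one hour (names are illustrative):

```python
HOURS_PER_WEEK = 7 * 24  # 168


def sla_percent(delayed_hours_per_week):
    """Percentage of the week in which events are processed on time."""
    delayed_fraction = delayed_hours_per_week / HOURS_PER_WEEK
    return 100 * (1 - delayed_fraction)


print(f"delayed share of the week: {100 * 1 / HOURS_PER_WEEK:.1f}%")
print(f"achievable SLA: {sla_percent(1):.1f}%")
```

This treats the share of time as a proxy for the share of events, which holds only if events arrive at a roughly even rate outside the spike.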

And for our final and Biggest Number, the following graph shows that our largest Anomalia Machina system with 48 Cassandra nodes has more than sufficient capability to process 19 Billion anomaly checks a day.

Anomalia Machina 10 - Analysis - Transactions per day
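The daily figure is the sustained rate multiplied by the seconds in a day, assuming the pipeline runs at its sustainable rate around the clock:

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def checks_per_day(checks_per_second):
    """Anomaly checks per day at a constant sustained rate."""
    return checks_per_second * SECONDS_PER_DAY


daily = checks_per_day(220_000)
print(f"{daily:,} checks/day (about {daily / 1e9:.0f} Billion)")
```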

2.6 How Big is Anomalia Machina?

How big is our final Anomalia Machina “machine”?  Here’s a graph showing the business metric vs the number of cores used for the Cassandra, Kafka, and Kubernetes clusters and the total system.

Anomalia Machina 10 - Throughput vs CPU Cores Used

The complete machine for the biggest result (48 Cassandra nodes) has 574 cores in total.  This is a lot of cores! Managing the provisioning and monitoring of a system of this size by hand would be an enormous effort. With the combination of the Instaclustr managed Cassandra and Kafka clusters (automated provisioning and monitoring), and the Kubernetes (AWS EKS) managed cluster for the application deployment, it was straightforward to spin up clusters on demand, run the application for a few hours, and delete the resources when finished for significant cost savings. Monitoring over 100 Pods running the application using the Prometheus Kubernetes operator worked smoothly and gave enhanced visibility into the application and the necessary access to the benchmark metrics for tuning and reporting of results.

The system (irrespective of size) was delivering an approximately constant 400 anomaly checks per second per core.

It is worth noting that the Cassandra cluster is more than 5 times bigger than the Kafka cluster, even though the Kafka cluster is processing an order of magnitude larger load spike (2.3M/s) than the Cassandra cluster (220,000/s). It is obviously more efficient (easier, cheaper, more elastic) to use “Kafka as a buffer” to cope with load spikes rather than to increase the size of the Cassandra cluster by an order of magnitude (i.e. from 48 to 480 nodes!) in a hurry.   However, it is possible to dynamically resize a Cassandra cluster given sufficient warning. Instaclustr’s dynamic resizing for Apache Cassandra enables vertical scaling up or down in minutes (20-30 minutes for a complete cluster, but the capacity starts to increase almost immediately).   The biggest increase in capacity is from r4.large (2 cores) to r4.4xlarge (16 cores), giving a capacity increase of 8 times. This would be sufficient for this scenario if used in conjunction with Kafka as a buffer, and would result in significantly faster processing of the event backlog.  I tried this on a smaller cluster with resizing one node at a time (concurrent resizing is also an option), and it worked flawlessly. For this to work you need to have (1) created a resizable Instaclustr Cassandra cluster, (2) with sufficient nodes to enable vertical scaling to satisfy the target load, and (3) enabled elastic scaling of the application on Kubernetes (this is another challenge).

2.7 Affordability at Scale

We have proven that our system can scale well to process 19 Billion events a day, more than adequate for even a large business. So, what is the operational cost to run an anomaly detection system of this size? This graph shows that it only costs around $1,000 a day for the basic infrastructure using on-demand AWS instances.  

Anomalia Machina 10 - Analysis - Anomalia Machina Cost $:day

This graph also shows that the system can easily be scaled up or down to match different business requirements, and the infrastructure costs will scale proportionally. For example, the smallest system we ran still checked 1.5 Billion events per day, for a cost of only $100/day for the AWS infrastructure.

 

Admittedly, the total cost of ownership would be higher (including R&D of the anomaly detection application, ongoing maintenance of the application, Managed service costs, etc). Assuming a more realistic $10,000 a day total cost (x10 the infrastructure cost), the system can run anomaly checks on 1.9 Million events per dollar spent.  
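The cost-effectiveness figures can be recomputed from the daily volumes and daily costs quoted above. The sketch uses the rounded numbers from the text, and the function name is illustrative.

```python
def events_per_dollar(events_per_day, dollars_per_day):
    """Anomaly checks obtained for each dollar of daily spend."""
    return events_per_day / dollars_per_day


# Largest system, assumed total cost of ownership ($10,000/day).
print(f"{events_per_dollar(19e9, 10_000):,.0f} events per dollar")
# Largest system, AWS infrastructure only (around $1,000/day).
print(f"{events_per_dollar(19e9, 1_000):,.0f} events per dollar")
# Smallest system, AWS infrastructure only ($100/day).
print(f"{events_per_dollar(1.5e9, 100):,.0f} events per dollar")
```

On infrastructure cost alone, the largest and smallest systems deliver a similar number of events per dollar (19 Million versus 15 Million), which is consistent with costs scaling roughly in proportion to throughput.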

 

___________________________________________________________________

 

Epilogue

Just as Homer’s epic was an exercise in unbounded imagination (e.g. Heroes, gods, monsters such as Polyphemus the one-eyed cannibal Cyclops, the alluring Sirens, Scylla the six-headed sea snake, Calypso the Nymph), the size and use cases of a scalable Anomaly Detector system are only limited by your imagination!  In this series we have demonstrated that a combination of open source technologies (Kafka, Cassandra, Kubernetes, Prometheus) and Instaclustr managed Kafka and Cassandra clusters can scale to detect anomalies hidden in Billions of events a day, provides a significant return/cost ratio and actual dollar savings, and is applicable across many application areas. Do you have a Homeric scale imagination?

 

Postscript

I had a chance encounter with “Kubernetes” (helmsman) related technology recently. The Fijian museum in Suva has the last Fijian ocean going double-hulled canoe (Drua – the Ratu Finau, 14m hull). Here’s a photo with its steering oar (uli), which is over 3m long, but could be managed by one helmsman.

Anomalia Machina 10 - Postscript

The steering oars of the older bigger canoes (36m hulls) were even more massive (as long as this canoe) and needed up to 4 helmsmen to handle them (with the help of ropes) and keep them on course.

Anomalia Machina 10 - Fijian Canoe

To find out more about our Managed Platform for Open Source Technologies, contact us or sign up for a free trial.

The post Anomalia Machina 10 – Final Results: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra appeared first on Instaclustr.

What’s New in Scylla Monitoring Stack 2.3

New CQL Optimization Dashboard

We just released Scylla Monitoring Stack version 2.3. The new version comes with dashboards to support the coming Scylla Enterprise 2019.1 release and for the Scylla Manager 1.4 release.

Making the Scylla Monitoring Stack more robust

Templates

Scylla Monitoring Stack 2.3 improves the way Scylla Monitoring works with templates and makes some of the magic of dashboard generation more visible and explicit.

Scylla Monitoring Stack uses Grafana for its front end dashboards. Grafana dashboards definitions are verbose and hard to maintain. To make dashboard maintenance easier we use a hierarchical template mechanism. You can read more about it in the blog post here.

We use a python script to generate the dashboards from the template. This created a dependency on python for the solution to work.

As of Scylla Monitoring Stack 2.3, the dashboards are available pre-generated with the release, which means that by default you no longer have a python dependency.

Making changes to the dashboards

If you are making changes to the dashboards, you will need to run generate-dashboards.sh for the changes to take effect. Note that generate-dashboards.sh changes the dashboards in place and does depend on python. The grafana server picks up the changes without a restart.

Docker and Permissions

Using Docker Containers is an easy way to install the different servers of the Scylla Monitoring Stack. Containers bring a layer of isolation with the intent to provide additional security. In practice, many users run into issues when a process inside the container needs to access and modify files outside of the container. This happens with the Prometheus data directory, and now, with Grafana dashboards and plugins.

When facing this problem, many users resort to workarounds that bypass Linux security, for example running as root (with sudo), opening directory permissions to all users, or disabling SELinux.

None of these workarounds is advised. We made multiple changes to the way we run the containers so that they are no longer necessary.

Best Practices for using Scylla Monitoring Stack:

  • Do not use root, use the same user for everything (e.g. centos)
  • Add that user to the docker group (See here)
  • Use the same user when downloading and extracting Scylla Monitoring Stack
  • If you did use sudo in the past, it is preferable to change the directory and file ownership instead of granting excessive root permissions.
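
The first two practices can be checked before starting the stack. The sketch below is in Python and is not part of Scylla Monitoring Stack; audit_user and current_user_groups are hypothetical helper names, and the check assumes a Linux host where the Docker group is called docker.

```python
import grp
import os


def audit_user(euid, group_names):
    """Return warnings for a user about to start the monitoring stack.

    euid: effective user id (0 means root).
    group_names: names of the groups the user belongs to.
    """
    warnings = []
    if euid == 0:
        warnings.append("running as root; use a regular user instead")
    elif "docker" not in group_names:
        warnings.append("user is not in the docker group")
    return warnings


def current_user_groups():
    """Group names of the current process, primary and supplementary."""
    gids = set(os.getgroups()) | {os.getegid()}
    names = []
    for gid in gids:
        try:
            names.append(grp.getgrgid(gid).gr_name)
        except KeyError:
            pass  # gid without a name entry; ignore it
    return names


if __name__ == "__main__":
    for line in audit_user(os.geteuid(), current_user_groups()) or ["ok"]:
        print(line)
```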

Controlling the alerts configuration files from the command line

It is now possible to override the Prometheus alert file and the alertmanager configuration files from the command line.

The Prometheus alert file describes what alerts will be triggered and when. To specify the Prometheus alert file, use the -R command line option with start-all.sh

For example: ./start-all.sh -R prometheus.rules.yml

The Alertmanager configuration describes how to handle alerts. To specify the Alertmanager config file, use the -r command line option with start-all.sh

For example: ./start-all.sh -r rules.yml

As of Scylla Monitoring Stack 2.3, all directories and files can be passed either as a relative path or as an absolute path.

Generating Prometheus configuration with genconfig.py

genconfig.py is a utility that can generate the scylla_server files for Prometheus. Scylla Monitoring Stack 2.3 brings several enhancements to it.

  • It now supports dc and cluster name.
  • You can use the output from nodetool status as input, which makes sure that your datacenters are configured correctly.
  • It no longer creates the node_exporter file that was deprecated.
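
To illustrate the idea of deriving Prometheus targets from nodetool status output, here is a rough sketch in Python. It is not the genconfig.py implementation. The function name, the sample output, the cluster and dc label names and the metrics port 9180 are all assumptions made for the example.

```python
import re

# Assumption: Scylla exposes Prometheus metrics on port 9180.
SCYLLA_METRICS_PORT = 9180

# Node rows start with a two-letter status/state code such as UN or DN.
NODE_LINE = re.compile(r"^[UD][NLJM]\s+(\S+)")


def targets_from_nodetool_status(text, cluster_name):
    """Group node addresses by datacenter from `nodetool status` output."""
    by_dc = {}
    current_dc = None
    for line in text.splitlines():
        line = line.strip()
        if line.startswith("Datacenter:"):
            current_dc = line.split(":", 1)[1].strip()
            by_dc.setdefault(current_dc, [])
            continue
        match = NODE_LINE.match(line)
        if match and current_dc is not None:
            by_dc[current_dc].append(f"{match.group(1)}:{SCYLLA_METRICS_PORT}")
    # One entry per datacenter, in the shape Prometheus file-based
    # service discovery expects: a list of targets plus shared labels.
    return [
        {"targets": nodes, "labels": {"cluster": cluster_name, "dc": dc}}
        for dc, nodes in by_dc.items()
    ]


# Made-up sample output, for illustration only.
SAMPLE = """\
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns  Host ID  Rack
UN  10.0.0.1   1.2 GB     256     ?     aaaa     rack1
UN  10.0.0.2   1.1 GB     256     ?     bbbb     rack1
Datacenter: dc2
===============
--  Address    Load       Tokens  Owns  Host ID  Rack
DN  10.0.1.1   1.3 GB     256     ?     cccc     rack1
"""

groups = targets_from_nodetool_status(SAMPLE, "my-cluster")
for group in groups:
    print(group)
```

Taking the datacenter names from the cluster itself avoids typing them by hand, which is the kind of misconfiguration the nodetool input is meant to prevent.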

New panels to existing dashboards

CQL Optimization: Cross shard

Scylla uses a shared-nothing model that shards all requests onto individual cores. Scylla runs one application thread-per-core, and depends on explicit message passing, not shared memory between threads. This design avoids slow, unscalable lock primitives and cache bounces.

Ideally, each request to a Scylla node reaches the right core (shard), avoiding internal communication between cores. This is not always the case, for example, when using a non-shard-aware Scylla driver (see more here).

New panels in the cql optimization dashboard were added to help identify cross-shard traffic.

Cross-shard traffic
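
To get a feel for how much cross-shard traffic a non-shard-aware driver can cause, consider a toy model in Python. It assumes that each request arrives at a shard chosen uniformly at random, independently of the shard that owns the data. That is a simplification for illustration, not a description of Scylla internals.

```python
def expected_cross_shard_fraction(shards):
    """Fraction of requests that land on the wrong shard when the
    receiving shard is chosen uniformly at random (toy model)."""
    if shards < 1:
        raise ValueError("a node has at least one shard")
    return (shards - 1) / shards


for shards in (1, 2, 8, 32):
    print(shards, f"{expected_cross_shard_fraction(shards):.1%}")
```

Under this assumption, the more cores a node has, the larger the share of requests that need an extra hop between cores.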

Per-machine Dashboard: Disk Usage Over time

In response to user requests to show disk usage as a graph over time, the disk size panel now shows the aggregated disk usage (by instance, dc or cluster).

Storage usage over time

Conclusion

Now you’ve seen the changes that were made in Scylla Monitoring Stack 2.3 to make it easier to run and more secure. The next step is yours! Download Scylla Monitoring Stack 2.3 directly from Github. It’s free and open source. If you try it, we’d love to hear your feedback, either by contacting us privately or sharing your experience with your fellow users on our Slack channel.

The post What’s New in Scylla Monitoring Stack 2.3 appeared first on ScyllaDB.

Scylla Monitoring Stack Release 2.3

Scylla Monitoring Stack Release Notes

The Scylla team is pleased to announce the release of Scylla Monitoring Stack 2.3.

Scylla Monitoring Stack is an open source stack for monitoring Scylla Enterprise and Scylla Open Source, based on Prometheus and Grafana. Scylla Monitoring Stack 2.3 supports:

  • Scylla Open Source versions 2.3 and 3.0
  • Scylla Enterprise versions 2018.x and 2019.x (upcoming release)
  • Scylla Manager 1.3.x, 1.4.x (upcoming release)

New in Scylla Monitoring Stack 2.3

  • Scylla enterprise dashboards for 2019.1 (#538)
    Getting ready for Scylla Enterprise 2019.1, the dashboards for Scylla Enterprise 2019.1 are included.
  • Scylla manager dashboard for 1.4 (#557)
    Getting ready for Scylla Manager 1.4, the dashboard for Scylla Manager 1.4 is included.
  • Dashboards are precompiled in the release
    Scylla Monitoring Stack uses templates for simpler dashboard representation. A python script is used to generate the dashboards from the templates. As of Scylla Monitoring version 2.3, the dashboards are packaged pre-compiled as part of the release. This removes the dependency on python for users that do not make changes to the dashboards. In addition, the starting script will not generate new dashboards on change; it will only issue a warning that a dashboard was changed and ask the user to run the generation script.
  • Add cross_shard_ops panel to cql optimization (#553)
    Scylla uses a shared-nothing model that shards all requests onto individual cores. Scylla runs one application thread-per-core, and depends on explicit message passing, not shared memory between threads. This design avoids slow, unscalable lock primitives and cache bounces. Ideally, each request to a Scylla node reaches the right core (shard), avoiding internal communication between cores. This is not always the case, for example, when using a non-shard-aware Scylla driver (see more here). New panels in the cql optimization dashboard were added to help identify cross-shard traffic.
  • Cluster name in use is shown in the dashboard (#533)
    To simplify monitoring multi-cluster installations, the cluster name is now shown in the dashboard header.
  • genconfig.py with multi dc support (#513)
    The genconfig.py utility can accept a datacenter name. It can also be used with nodetool output for simpler configuration.
  • Add a storage usage over time panel (#466)
    Add a panel to the per-machine dashboard that shows the disk usage over time.
  • Upgrade Prometheus to 2.7.2 (#456)
    Prometheus container now uses Prometheus version 2.7.2, see Prometheus releases for more information.
  • Show more information about compaction (#491)
    Two panels were added to the per-server dashboard, one that shows the percentage of CPU used by compaction, and one that shows the compaction shares over time.
  • Alertmanager and Prometheus alerts can be configured from the command line
    It is now possible to override the Prometheus alert file and the alertmanager configuration files from the command line. To specify the alert manager config file use the -r command line argument with start-all.sh

    For example: ./start-all.sh -r rules.yml

    To specify the Prometheus alert file use the -R command line argument with start-all.sh

    For example: ./start-all.sh -R prometheus.rules.yml
  • Warn users when starting docker as root and make grafana volume sharable
    Users should avoid running Docker containers as root. When a user starts the Monitoring stack as root, a warning is issued.
  • Prometheus data directory to accept relative path (#527)
    No need to specify the full path to the Prometheus data directory.

Bug Fixes

  • Prometheus.rules.yaml: Fix the alert rule that warns when there is no CQL connectivity (#541)
  • Not all 2018.1 uses cluster and dc (#540)

The post Scylla Monitoring Stack Release 2.3 appeared first on ScyllaDB.

Veramine Turns to Scylla to Manage Big Data for Enterprise Cybersecurity

Veramine

Cybersecurity threats are real, pervasive and omnipresent. Real-time global cybersecurity attack maps, such as those maintained by Kaspersky Labs and Fortinet, show that there are no breaks, vacations or downtime allowances in the world of online and computer security.

The scale, variety, complexity, velocity and ferocity of attacks have compounded year-over-year. The costs associated with such attacks also continue to increase, and the cost of cybercrime is estimated to exceed $6 trillion annually by 2021. Beyond the criminal costs, there are also risks that range from national security to personal safety.

Veramine is one company tackling the national security threat. Awarded contracts from the U.S. Air Force and U.S. Department of Homeland Security to defend against cyberthreats, Veramine is also a commercially available service for enterprises.

About Veramine

Veramine provides advanced capabilities for reactive intrusion response and proactive threat detection. Using endpoint telemetry to feed a central server, Veramine scours huge amounts of network data to identify attacker activity. With its advanced detection engine, advanced rule-based and machine-learning algorithms, Veramine can identify Mimikatz-style password dumping, kernel-mode exploitation (local EoP), process injection, unauthorized lateral movement, and other attacker activity.

The Challenge

For Veramine, cybersecurity analysis begins with good data. As such, the company’s goal is to collect as much data from the enterprise network as possible, including both servers and desktops. Events collected by the platform are enriched with context information from the system. For example, every network connection that’s recorded is associated with its originating process, user, time, and associated metadata. The end result is a huge and ever-growing data set.

“Even if the performance were only as good as Cassandra, and in fact it’s much better, Scylla would still be a significant improvement.”

— Jonathan Ness, CEO, Veramine

According to Jonathan Ness, CEO of Veramine, “We’re trying to collect everything you would ever want to know about what goes on on a computer and centralize that data and run algorithms on it.”

All that data needs to be stored somewhere. Since the data is so sensitive, very few Veramine customers permit it to be stored in the cloud. Veramine needs to provide low-latency big data capabilities in on-premises datacenters. Given the sensitive nature of the data collected, Veramine personnel were unable to directly access databases to help with support.

Veramine began using Postgres, but quickly realized that a NoSQL database was more appropriate to their use case. They switched to Cassandra, but soon realized that it was not up to the task.

“The problem was every week it was crashing, so we created all this infrastructure just to keep Cassandra alive,” said Ness. Veramine went so far as to parse Cassandra logs in an attempt to predict when garbage collection would happen, and then apply throttling to avoid crashing the database. Without direct access to customer environments, Cassandra soon became a nightmare. The team set out to find a replacement.

The Solution

What was needed was a low-latency NoSQL database that provided extremely low administrative overhead and high stability. Initial attempts to use PostgreSQL did not meet the challenge. And while Cassandra was able to scale, it had operational management problems. That was when the Veramine team turned to Scylla.

Veramine saw instant results from using Scylla. “We started using Scylla two years ago,” said Ness. “We fell in love with Scylla because it doesn’t crash and we don’t have to manage it.” Since Scylla is a feature-complete, drop-in replacement for Cassandra, the migration was quick and painless. “Our code didn’t change much going from Cassandra to Scylla.”

According to Ness, a big benefit of Scylla is developer productivity. Scylla lets the team focus on business logic rather than on custom code around the datastore. Veramine’s Scylla clusters that are running in production are surprisingly small compared to Cassandra.

Ness summed up Veramine’s Scylla journey: “Even if the performance were only as good as Cassandra, and in fact it’s much better, Scylla would still be a significant improvement due to its stability and lower administrative overhead.”



The post Veramine Turns to Scylla to Manage Big Data for Enterprise Cybersecurity appeared first on ScyllaDB.

Install All the THINGS! But especially Apache Cassandra, Apache Spark and Jupyter

Installing Apache Cassandra

Download bits

http://cassandra.apache.org/download/

Untar and Start

Even Higher Availability with 5x Faster Streaming in Cassandra 4.0

Streaming is a process where nodes of a cluster exchange data in the form of SSTables. Streaming can kick in during many situations such as bootstrap, repair, rebuild, range movement, cluster expansion, etc. In this post, we discuss the massive performance improvements made to the streaming process in Apache Cassandra 4.0.

High Availability

As we know Cassandra is a Highly Available, Eventually Consistent database. The way it maintains its legendary availability is by storing redundant copies of data in nodes known as replicas, usually running on commodity hardware. During normal operations, these replicas may end up having hardware issues causing them to fail. As a result, we need to replace them with new nodes on fresh hardware.

As part of this replacement operation, the new Cassandra node streams data from the neighboring nodes that hold copies of the data belonging to this new node’s token range. Depending on the amount of data stored, this process can require substantial network bandwidth, taking some time to complete. The longer these types of operations take, the more we are exposing ourselves to loss of availability. Depending on your replication factor and consistency requirements, if another node fails during this replacement operation, availability will be impacted.

Increasing Availability

To minimize the failure window, we want to make these operations as fast as possible. The faster the new node completes streaming its data, the faster it can serve traffic, increasing the availability of the cluster. Towards this goal, Cassandra 4.0 saw the addition of Zero Copy streaming. For more details on Cassandra’s zero copy implementation, see this blog post and CASSANDRA-14556.
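
For readers unfamiliar with the term, "zero copy" generally means letting the operating system move file bytes straight to a socket without copying them through the application. The sketch below shows that general technique in Python using socket.sendfile(). It is not Cassandra code (Cassandra is written in Java), and the temporary file standing in for an SSTable is an assumption made for the demo.

```python
import os
import socket
import tempfile


def stream_file(path, sock):
    """Send a whole file over a connected socket.

    socket.sendfile() uses os.sendfile() where the platform supports it,
    so the kernel moves the bytes without copying them through this
    process; elsewhere it falls back to ordinary send() calls.
    """
    with open(path, "rb") as f:
        return sock.sendfile(f)


if __name__ == "__main__":
    payload = os.urandom(16 * 1024)  # stand-in for an SSTable file
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(payload)
    sender, receiver = socket.socketpair()
    try:
        sent = stream_file(tmp.name, sender)
        sender.close()  # signals end of stream to the receiver
        received = b""
        while chunk := receiver.recv(65536):
            received += chunk
        print(sent, received == payload)
    finally:
        receiver.close()
        os.unlink(tmp.name)
```

Because the sending process never has to read, decode and re-encode the data, the transfer is limited mostly by disk and network bandwidth rather than by CPU.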

Talking Numbers

To quantify the results of these improvements, we, at Netflix, measured the performance impact of streaming in 4.0 vs 3.0, using our open source NDBench benchmarking tool with the CassJavaDriverGeneric plugin. Though we knew there would be improvements, we were still amazed by the overall results: a fivefold increase in streaming performance. The test setup and operations are all detailed below.

Test Setup

In our test setup, we used the following configurations:

  • 6-node clusters on i3.xl, i3.2xl, i3.4xl and i3.8xl EC2 instances, each on 3.0 and trunk (sha dd7ec5a2d6736b26d3c5f137388f2d0028df7a03).
  • Table schema
CREATE TABLE testing.test (
    key text,
    column1 int,
    value text,
    PRIMARY KEY (key, column1)
) WITH CLUSTERING ORDER BY (column1 ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
    AND compression = {'enabled': 'false'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
  • Data size per node: 500GB
  • No. of tokens per node: 1 (no vnodes)

To trigger the streaming process we used the following steps in each of the clusters:

  • terminate a node
  • add a new node as a replacement
  • measure the time taken to complete streaming data by the new node replacing the terminated node

For each cluster and version, we repeated this exercise multiple times to collect several samples.

Below is the distribution of streaming times we found across the clusters.

Benchmark results

Interpreting the Results

Several conclusions can be drawn from the graph above:

  • 3.0 streaming times are inconsistent and show a high degree of variability (fat distributions across multiple samples)
  • 3.0 streaming is highly affected by the instance type and generally looks CPU bound
  • Zero Copy streaming is approximately 5x faster
  • Zero Copy streaming time shows little variability in its performance (thin distributions across multiple samples)
  • Zero Copy streaming performance is not CPU bound and remains consistent across instance types

It is clear from the performance test results that Zero Copy Streaming has a huge performance benefit over the current streaming infrastructure in Cassandra. But what does it mean in the real world? The following key points are the main takeaways.

MTTR (Mean Time to Recovery): MTTR is a KPI (Key Performance Indicator) that is used to measure how quickly a system recovers from a failure. Zero Copy Streaming has a very direct impact here with a fivefold improvement in performance.

Costs: Zero Copy Streaming is ~5x faster. For some organizations this translates directly into cost savings, primarily by reducing the need to maintain spare server or cloud capacity. In other situations where you’re migrating data to larger instance types or moving AZs or DCs, this means that instances that are sending data can be turned off sooner, saving costs. An added cost benefit is that you no longer have to over-provision the instance. You get similar streaming performance whether you use an i3.xl or an i3.8xl, provided the bandwidth is available to the instance.

Risk Reduction: Zero Copy Streaming also greatly reduces risk. Since a cluster’s recovery mainly depends on the streaming speed, Cassandra clusters with failed nodes will be able to recover much more quickly (5x faster). This means the window of vulnerability is reduced significantly, in some situations down to a few minutes.
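
A back-of-the-envelope calculation in Python shows what a 5x speedup does to the recovery window. The 500GB data size comes from the test setup above. The baseline throughput of 100 MB/s is a hypothetical value chosen only to make the arithmetic concrete; it is not a measured result.

```python
def streaming_minutes(data_gb, throughput_mb_per_s):
    """Minutes needed to stream data_gb at a sustained throughput."""
    seconds = data_gb * 1000 / throughput_mb_per_s
    return seconds / 60


DATA_GB = 500             # data size per node in the test setup
BASELINE_MB_PER_S = 100   # hypothetical 3.0 throughput, not a measured value
SPEEDUP = 5               # approximate improvement reported in the post

before = streaming_minutes(DATA_GB, BASELINE_MB_PER_S)
after = streaming_minutes(DATA_GB, BASELINE_MB_PER_S * SPEEDUP)
print(f"before: {before:.0f} min, after: {after:.0f} min")
```

Whatever the real baseline is on a given cluster, the time during which a second node failure could hurt availability shrinks by the same factor.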

Finally, a benefit that we generally don’t talk about is the environmental benefit of this change. Zero Copy Streaming enables us to move data very quickly through the cluster. It objectively reduces the number and sizes of instances that are used to build a Cassandra cluster. As a result, not only does it reduce Cassandra’s TCO (Total Cost of Ownership), it also helps the environment by consuming fewer resources!

Scylla: four ways to optimize your disk space consumption

We recently had to face free disk space outages on some of our scylla clusters, and we learnt some very interesting things while outlining to the ScyllaDB team some improvements that could be made.

100% disk space usage?

First of all I wanted to give a bit of a heads up about what happened when some of our scylla nodes reached (almost) 100% disk space usage.

Basically they:

  • stopped listening to client requests
  • complained in the logs
  • wouldn’t flush commitlog (expected)
  • aborted their compaction work (which actually gave back a few GB of space)
  • stayed in a stuck / unable to stop state (unexpected, this has been reported)

After restarting your scylla server, the first and obvious thing you can try to do to get out of this situation is to run the nodetool clearsnapshot command which will remove any data snapshot that could be lying around. That’s a handy command to reclaim space usually.

Reminder: depending on your compaction strategy, it is usually not advised to allow your data to grow over 50% of disk space...

But that’s only a patch so let’s go down the rabbit hole and look at the optimization options we have.


Optimize your schemas

Schema design and the types you choose for your columns have a huge impact on disk space usage! And in our case we indeed overlooked some of the optimizations that we could have done from the start and that did cost us a lot of wasted disk space. Fortunately it was easy and fast to change.

To illustrate this, I’ll take a sample of 100,000 rows of a simple and naive schema associating readings of 50 integers to a user ID:

Note: all those operations were done using Scylla 3.0.3 on Gentoo Linux.

CREATE TABLE IF NOT EXISTS test.not_optimized
(
uid text,
readings list<int>,
PRIMARY KEY(uid)
) WITH compression = {};

Once inserted on disk, this takes about 250MB of disk space:

250M    not_optimized-00cf1500520b11e9ae38000000000004

Now depending on your use case, if those readings are not meant to be updated, for example, you could use a frozen list instead, which allows a huge storage optimization:

CREATE TABLE IF NOT EXISTS test.mid_optimized
(
uid text,
readings frozen<list<int>>,
PRIMARY KEY(uid)
) WITH compression = {};

With this frozen list we now consume 54MB of disk space for the same data!

54M     mid_optimized-011bae60520b11e9ae38000000000004

There’s another optimization that we could do since our user ID are UUIDs. Let’s switch to the uuid type instead of text:

CREATE TABLE IF NOT EXISTS test.optimized
(
uid uuid,
readings frozen<list<int>>,
PRIMARY KEY(uid)
) WITH compression = {};

By switching to uuid, we now consume 50MB of disk space: that’s an 80% reduction in disk space consumption compared to the naive schema for the same data!

50M     optimized-01f74150520b11e9ae38000000000004
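
The gain from the uuid type comes from its encoding. A UUID written as text takes 36 characters, while the binary form is 16 bytes. This can be verified quickly in Python (a small illustration, independent of Scylla):

```python
import uuid

uid = uuid.uuid4()

as_text = str(uid).encode("utf-8")  # what a text column has to store
as_uuid = uid.bytes                 # what a uuid column has to store

print(len(as_text), len(as_uuid))
```

That is 20 bytes saved per key before any storage overhead is counted.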

Enable compression

None of those examples used compression. If your workload latencies allow it, you should probably enable compression on your sstables.

Let’s see its impact on our tables:

ALTER TABLE test.not_optimized WITH compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'};
ALTER TABLE test.mid_optimized WITH compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'};
ALTER TABLE test.optimized WITH compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'};

Then we run a nodetool compact test to force a (re)compaction of all the sstables and we get:

63M     not_optimized-00cf1500520b11e9ae38000000000004
28M     mid_optimized-011bae60520b11e9ae38000000000004
24M     optimized-01f74150520b11e9ae38000000000004

Compression is really a great gain here, allowing another 50% reduction in disk space usage on our optimized table!

Switch to the new “mc” sstable format

Since the Scylla 3.0 release you can use the latest “mc” sstable storage format on your scylla clusters. It promises greater efficiency and usually a much lower disk space consumption!

It is not enabled by default: you have to add the enable_sstables_mc_format: true parameter to your scylla.yaml for it to be taken into account.

Since it’s backward compatible, you have nothing else to do as new compactions will start being made using the “mc” storage format and the scylla server will seamlessly read from old sstables as well.

But in our case of immediate disk space outage, we switched to the new format one node at a time, dropped the data from it and ran a nodetool rebuild to reconstruct the whole node using the new sstable format.

Let’s demonstrate its impact on our test tables: we add the option to the scylla.yaml file, restart scylla-server and run nodetool compact test again:

49M     not_optimized-00cf1500520b11e9ae38000000000004
26M     mid_optimized-011bae60520b11e9ae38000000000004
22M     optimized-01f74150520b11e9ae38000000000004

That’s a pretty cool gain of disk space, even more for the not optimized version of our schema!

So if you’re in great need of disk space or it is hard for you to change your schemas, switching to the new “mc” sstable format is a simple and efficient way to free up some space without effort.
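
To put all the measurements side by side, here is a small Python summary that computes the savings from the sizes reported above. The dictionary layout and step labels are just for illustration.

```python
# Sizes in MB as reported by the measurements above.
sizes = {
    "not_optimized": {"plain": 250, "lz4": 63, "lz4+mc": 49},
    "mid_optimized": {"plain": 54, "lz4": 28, "lz4+mc": 26},
    "optimized": {"plain": 50, "lz4": 24, "lz4+mc": 22},
}


def reduction(before, after):
    """Percentage of disk space saved going from `before` to `after`."""
    return round(100 * (before - after) / before)


baseline = sizes["not_optimized"]["plain"]
for table, steps in sizes.items():
    for step, size in steps.items():
        print(f"{table:14} {step:7} {size:4d}M  -{reduction(baseline, size)}% vs naive")
```

Combining the optimized schema, compression and the “mc” format brings the 250MB naive table down to 22MB.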

Consider using secondary indexes

While denormalization is the norm (yep.. legitimate pun) in the NoSQL world this does not mean we have to duplicate everything all the time. A good example lies in the internals of secondary indexes if your workload can compromise with its moderate impact on latency.

Secondary indexes on scylla are built on top of Materialized Views that basically store an up to date pointer from your indexed column to your main table partition key. That means that secondary index MVs do not duplicate all the columns (and thus the data) from your main table as you would have to do when denormalizing a table to query by another column: this saves disk space!

This of course comes with a latency drawback: if your workload is interested in columns other than the partition key of the main table, the coordinator node will actually issue two queries to get all your data:

  1. query the secondary index MV to get the pointer to the partition key of the main table
  2. query the main table with the partition key to get the rest of the columns you asked for

This has been an effective trick to avoid duplicating a table and save disk space for some of our workloads!
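
The two-step read described above can be pictured with plain Python dictionaries. This is a toy model of the idea, not of Scylla’s internals; the table contents and the query_by_city helper are made up for the example.

```python
# Main table: partition key -> full row.
main_table = {
    "user-1": {"email": "a@example.com", "name": "Ada", "city": "Paris"},
    "user-2": {"email": "b@example.com", "name": "Bob", "city": "Lyon"},
    "user-3": {"email": "c@example.com", "name": "Cy", "city": "Paris"},
}

# Index view on "city": indexed value -> partition keys only.
# Note that none of the other columns are duplicated here.
city_index = {}
for key, row in main_table.items():
    city_index.setdefault(row["city"], []).append(key)


def query_by_city(city):
    """Two-step read: index lookup, then main-table lookups."""
    keys = city_index.get(city, [])       # step 1: pointers from the index
    return [main_table[k] for k in keys]  # step 2: fetch the full rows


print(city_index)
print([row["name"] for row in query_by_city("Paris")])
```

A denormalized copy keyed by city would answer in a single lookup but would store every column twice, which is exactly the disk space trade-off discussed here.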

(not a tip) Move the commitlog to another disk / partition?

This should only be considered as a sort of emergency procedure or for cost efficiency (cheap disk tiering) on non critical clusters.

While this is possible even if the disk is not formatted using XFS, it is not advised to separate the commitlog from data on modern SSD/NVMe disks but… you technically can do it (as we did) on non production clusters.

Switching is simple, you just need to change the commitlog_directory parameter in your scylla.yaml file.

Speculative Query Executions with GoCql

Speculative query executions in Cassandra are an interesting solution for some use cases, such as faulty/slow/unresponsive nodes or network issues. This kind of execution allows a client to launch a DB request for multiple endpoints at the same time and let these requests compete for the winning response. This would only work if the query itself is defined as idempotent, that is, it renders the same result no matter how many times it is run. We’ve written about idempotence and speculative query execution before, so head over to https://www.instaclustr.com/query-idempotence-in-gocql-driver/ to refresh your memory if needed.

In most cases, using speculative query execution would not lead to performance improvement in normal execution (even though in some edge cases it might), but improve the reliability of queries to get a response from the server. In other cases, it might improve overall execution time for queries, as getting the response from the fastest node does save time. One should also remember, that while this can improve the reliability of the queries or overall execution time, it will be at the cost of using more resources (mostly CPU/Network) to do so.

A few use cases for using speculative queries:

  1. A node you are querying is down and your SLAs require a response from the server within a timeframe that is lower than Cassandra timeout.
  2. A node is flaky, leading to inconsistent response times or dropped queries.
  3. A node is returning timeout errors requiring a client application to retry the query on another node.

Speculative query execution has been a feature of the Java driver for quite some time now, and we have recently included similar functionality into the GoCql driver. As mentioned above, it allows a user to run the same query on multiple hosts at the same time, letting the executions compete for the winning spot. The first query to get a response will win and be returned to the client.
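
The racing behaviour can be illustrated without a cluster. The following sketch is a simulation in Python using threads and sleeps, not GoCql code. The host names, latencies and the speculative_execute helper are all hypothetical.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def speculative_execute(request, hosts, extra_attempts, delay):
    """Run `request(host)` on the first host; each time `delay` seconds
    pass without an answer, start one more attempt on the next host.
    Returns (host, result) of the first attempt to finish."""
    pool = ThreadPoolExecutor(max_workers=extra_attempts + 1)
    pending = {}
    try:
        for host in hosts[: extra_attempts + 1]:
            pending[pool.submit(request, host)] = host
            done, _ = wait(list(pending), timeout=delay, return_when=FIRST_COMPLETED)
            if done:
                winner = done.pop()
                return pending[winner], winner.result()
        # All attempts launched; wait for whichever finishes first.
        done, _ = wait(list(pending), return_when=FIRST_COMPLETED)
        winner = done.pop()
        return pending[winner], winner.result()
    finally:
        pool.shutdown(wait=False)  # do not block on the losing attempts


# Simulated round-trip times in seconds; node1 plays the slow node.
LATENCY = {"node1": 0.25, "node2": 0.01, "node3": 0.01}


def fake_request(host):
    time.sleep(LATENCY[host])  # stand-in for a network round trip
    return f"response from {host}"


result = speculative_execute(fake_request, ["node1", "node2", "node3"], 1, 0.05)
print(result)
```

The original request goes to the slow node, the extra attempt starts after the delay, and the faster node’s answer is the one returned. Both requests still run to completion, which is why the query must be idempotent and why the technique costs extra resources.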

In order to use the speculative query execution with this change, one must define the query as Idempotent and provide an execution policy:

...
    cluster := gocql.NewCluster("192.168.1.1", "192.168.1.2", "192.168.1.3")
    sp := &gocql.SimpleSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond}
    session, err := cluster.CreateSession()
    // Build the query
    qry := session.Query("speculative").SetSpeculativeExecutionPolicy(sp).Idempotent(true)

As can be seen from the example above, we’ve used a SimpleSpeculativeExecution policy, which is the one implemented in the driver. It is a very simple policy, one that defines the number of additional executions (that is, in addition to the original request) and the constant delay between these executions. One could easily implement their own policy. For example, to have a policy that pauses incrementally longer between additional executions, one could build the following policy:

type IncreasingSpeculativeExecution struct {
        NumAttempts  int
        TimeoutDelay time.Duration
}

func (sp *IncreasingSpeculativeExecution) Attempts() int        { return sp.NumAttempts }
func (sp *IncreasingSpeculativeExecution) Delay() time.Duration {
    sp.TimeoutDelay += 50 * time.Millisecond
    return sp.TimeoutDelay
}

And then use it in the query execution:

...
    cluster := gocql.NewCluster("192.168.1.1", "192.168.1.2", "192.168.1.3")
    sp := &IncreasingSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond}
    session, err := cluster.CreateSession()
    // Build the query
    qry := session.Query("speculative").SetSpeculativeExecutionPolicy(sp).Idempotent(true)
    …..

To show an example for using speculative query executions, we’ll use a 3-node Cassandra cluster. The use case that we’re going to explore is going to be a slow node, which we are going to simulate using a simple tc tool that comes as a part of the iproute2 package. Our example is going to be a bit extreme, but hopefully, it conveys the idea of when speculative queries might be useful.

To simulate a slow node, run the following command on one of the nodes:

sudo tc qdisc add dev eth0 root netem delay 250ms

This adds a 250ms delay to all outbound packets for the given physical device (eth0 in the above case). Then we use the following client code to run the tests:

 

/* Before you execute the program, Launch `cqlsh` and execute:
create keyspace example with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
create table example.tweet(timeline text, id UUID, data int, PRIMARY KEY(id));
create index on example.tweet(timeline);
*/
package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/gocql/gocql"
)

type hostMetrics struct {
    attempts int
    latency  int
}

// The observer type to watch the queries data
type testQueryObserver struct {
    metrics map[string]*hostMetrics
    verbose bool
}

func (o *testQueryObserver) ObserveQuery(ctx context.Context, q gocql.ObservedQuery) {
    host := q.Host.ConnectAddress().String()
    curMetric := o.metrics[host]
    curAttempts := 0
    curLatency := 0
    if curMetric != nil {
        curAttempts = curMetric.attempts
        curLatency = curMetric.latency
    }
    if q.Err == nil {
        o.metrics[host] = &hostMetrics{attempts: q.Metrics.Attempts + curAttempts, latency: curLatency + int(q.Metrics.TotalLatency/1000000)}
    }
    if o.verbose {
        fmt.Printf("Observed query %q. Returned %v rows, took %v on host %q with %v attempts and total latency %v. Error: %q\n",
            q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Metrics.Attempts, q.Metrics.TotalLatency, q.Err)
    }
}

func (o *testQueryObserver) GetMetrics() {
    for h, m := range o.metrics {
        fmt.Printf("Host: %s, Attempts: %v, Avg Latency: %vms\n", h, m.attempts, m.latency/m.attempts)
    }
}

// Simple retry policy for attempting the connection to 1 host only per query
type RT struct {
    num int
}

func (rt *RT) Attempt(q gocql.RetryableQuery) bool {
    return q.Attempts() <= rt.num
}

func (rt *RT) GetRetryType(err error) gocql.RetryType {
    return gocql.Rethrow
}

func main() {

    specExec := flag.Bool("specExec", false, "Speculative execution")
    flag.Parse()

    // the number of entries to insert
    cycles := 10000

    // connect to the cluster
    cluster := gocql.NewCluster("...")
    cluster.Keyspace = "example"

    // the timeout of one of the nodes is very high, so let’s make sure we wait long enough
    cluster.Timeout = 10 * time.Second
    cluster.RetryPolicy = &RT{num: 3}
    session, err := cluster.CreateSession()
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()

    observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), verbose: false}
    rand.Seed(time.Now().UnixNano())
    for i := 0; i < cycles; i = i + 1 {
        r := rand.Intn(10000)
        u, _ := gocql.RandomUUID()
        query := session.Query(`INSERT INTO example.tweet (id, timeline, data) VALUES (?, 'me', ?)`, u, r).Observer(observer)
        // Create speculative execution policy with the timeout delay between following executions set to 10ms
        sp := &gocql.SimpleSpeculativeExecution{NumAttempts: 2, TimeoutDelay: 10 * time.Millisecond}
        // Explicitly set idempotence to true or false to control speculative/normal execution
        query.SetSpeculativeExecutionPolicy(sp).Idempotent(*specExec)
        if err := query.Exec(); err != nil {
            log.Println(err)
        }
    }

    // wait a sec before everything finishes
    <-time.After(1 * time.Second)

    // Print results
    fmt.Println("\n==========\n")
    observer.GetMetrics()
}

This code is also available at https://github.com/instaclustr/sample-GoCql-Speculative-Execution.

This client code will insert 10000 entries into the cluster. As we’re using random UUIDs for the key column (id), all the queries are expected to be distributed more or less evenly among the nodes. Now, when we start the cluster and execute the client, we notice the following:

admin@ip-10-0-17-222:~/go$ time go run spectest.go

==========

Host1: <ip>, Attempts: 3334, Avg Latency: 502ms
Host2: <ip>, Attempts: 3333, Avg Latency: 2ms
Host3: <ip>, Attempts: 3333, Avg Latency: 2ms

real    28m21.859s
user    0m2.920s
sys     0m1.828s

So it takes about half an hour to run the queries because one of the nodes has a constant delay of about half a second. When running with speculative execution mode, we get:

admin@ip-10-0-17-222:~/go$ time go run spectest.go --specExec

==========

Host2: <ip>, Attempts: 5000, Avg Latency: 1ms
Host3: <ip>, Attempts: 4999, Avg Latency: 2ms

real    1m24.493s
user    0m3.900s
sys     0m3.072s

Not only do we never see the “problematic” node responding to the query, but all queries are also split between the 2 other “fast” nodes, taking only about 1.5 minutes to complete.

As for the overhead of this improvement, there’s a jump in open sockets while running in speculative mode vs non-speculative:

[Figure: Open sockets while running in speculative mode vs non-speculative]

As can be seen, the number of open sockets jumps from about 147 to 153 (an increase of about 4%). There are no significant increases in CPU, memory or I/O utilisation.

This might be considered a good trade-off, but remember that this is what happens on a single, very simple client on a performant host (m4.2xlarge) that only ran this test. In other use cases, such as a smaller instance, a busy client, or a client running many queries in parallel, this may have a more significant effect on system resource utilisation. As a result, it is highly recommended to test the benefits of using speculative queries in your pre-production environment before deploying in production.

After this feature was released, a few issues were identified and have already been fixed, thanks to the community of contributors. It is in good shape now and is ready for general use. Enhancements planned for this feature include running the query observer (useful for monitoring query executions) in the main goroutine, which will make it more convenient to debug failures in the client code.

 

The post Speculative Query Executions with GoCql appeared first on Instaclustr.

Anomalia Machina 9 – Anomaly Detection at Scale: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra

In the previous blog, we deployed the Anomalia Machina application in a production-like Kubernetes environment. In this blog, we test it out and see how many anomalies it can detect at scale on small Kafka and Cassandra Instaclustr production clusters.

Coming towards the end of our epic odyssey we now have a final challenge to overcome. Can we ride out the storm and scale the application?

[Figure: Poseidon stirs up a storm]

Kubernetes Resource Management (Replicas, Pods, Nodes)

I ended my initial Kubernetes experiments (Anomalia Machina 7) by accidentally exploding my Kubernetes cluster – I overloaded the Kubernetes worker nodes with too many Pods. This time we’ll try and scale correctly. As we discovered, Kubernetes by default assumes Pods require no resources, so it will try to create an unlimited number of Pods on a finite number of Nodes. Here’s the Kubernetes resource management documentation. In the Kubernetes configuration file, you can specify the minimum (requests) and maximum (limits) resources a Pod requires to run in terms of CPU and/or memory resources. This is how Pods are scheduled to Nodes:

When you create a Pod, the Kubernetes scheduler selects a node for the Pod to run on. Each node has a maximum capacity for each of the resource types: the amount of CPU and memory it can provide for Pods. The scheduler ensures that, for each resource type, the sum of the resource requests of the scheduled Containers is less than the capacity of the node.

If a container exceeds its memory limit it might be terminated, and CPU usage above a CPU limit is throttled. We won’t use limits.
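The scheduling rule quoted above can be sketched in a few lines. This is only an illustration of the arithmetic for a single resource (CPU, in millicores), not the scheduler’s actual code. The function name fits and the 2000m capacity are assumptions chosen for the example; real nodes usually have somewhat less allocatable CPU than their core count suggests.

```go
package main

import "fmt"

// fits reports whether a Pod requesting newMilli millicores can be placed
// on a node whose already scheduled Pods requested scheduledMilli, given
// the node's CPU capacity. Only CPU is modelled here.
func fits(capacityMilli int, scheduledMilli []int, newMilli int) bool {
	sum := newMilli
	for _, r := range scheduledMilli {
		sum += r
	}
	return sum <= capacityMilli
}

func main() {
	// No requests set: every Pod "fits", however many are already there.
	fmt.Println(fits(2000, []int{0, 0, 0, 0}, 0)) // true
	// A 2000m request fills an empty 2000m node...
	fmt.Println(fits(2000, nil, 2000)) // true
	// ...so a second such Pod has to go elsewhere.
	fmt.Println(fits(2000, []int{2000}, 2000)) // false
}
```

The first call shows why Pods without requests can pile up on a node: a request of zero never exhausts the capacity.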

It’s therefore easy enough to force Kubernetes to allocate only one Pod per Node, and at the same time prevent accidental overloading of the nodes, by setting the resource request to the number of cores each EC2 instance provides. If we pick a small instance size, with say 2 cores, and use a resource request of 2 (cpu: "2000m" below), this gives us the flexibility to scale up in small increments:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-deployment
  labels:
    app: consumer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
      - name: consumer
        image: myimages/consumer:latest
        ports:
        - containerPort: 1235
        resources:
          requests:
            cpu: "2000m"

This also gives us the opportunity to tune the application so it’s scalable in units of 2 cores. Our anomaly detection application pipeline consists of a Kafka consumer and a Cassandra client (and detector algorithm) in two separate thread pools. After some experimentation, we settled on 1 thread for the Kafka consumer and 20 threads for the Cassandra client as optimal. Kafka consumers are fast and return many events per poll, and Kafka scales best with fewer consumers. However, the writes and reads to/from Cassandra and the detector algorithm are slower, so more threads are needed to keep up with the consumer.

This also means that each Pod has 1 Kafka consumer and at least one Cassandra connection (Cassandra uses connection pooling by default, so the 20 threads can use 1 or more connections depending on configuration). As we scale further, we need to ensure that the number of Kafka Partitions on the topic is >= the number of Pods (otherwise any excess Kafka consumers, and therefore Pods, will be idle). Cassandra doesn’t have a maximum connection limit; each Cassandra node will easily handle 100s of connections, and the Java client automatically manages the connection pool.
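The partition constraint is easy to check before scaling. The sketch below assumes one Kafka consumer per Pod, all in the same consumer group, and relies on the fact that a partition is read by at most one consumer in a group. The function name activeAndIdle and the partition count are made up for the example.

```go
package main

import "fmt"

// activeAndIdle splits a consumer group into consumers that receive at
// least one partition and consumers left idle, given that a partition is
// read by at most one consumer in the group.
func activeAndIdle(partitions, consumers int) (active, idle int) {
	if consumers <= partitions {
		return consumers, 0
	}
	return partitions, consumers - partitions
}

func main() {
	// 10 Pods (one consumer each) reading a topic with only 8 partitions.
	active, idle := activeAndIdle(8, 10)
	fmt.Println(active, idle) // 8 2
}
```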

Anomaly Detection at Scale

Checking a few events for anomalies is easy, but what happens when there are many events to check?

[Figure: Sheep]

In order to scale the application, we need sufficient worker nodes/EC2 instances. As discovered in the last blog, Kubernetes by default doesn’t automatically manage the instances.

Unlike pods and services, a node is not inherently created by Kubernetes: it is created externally by cloud providers.

In AWS EKS the simplest way of increasing worker nodes is to manually increase the desired and maximum values in the worker nodes auto scaling group.  Is it possible to actually get autoscaling working? Yes, in theory. There are two aspects to Kubernetes autoscaling: Pods and Nodes.

The Horizontal Pod Autoscaler scales the number of Pods based on triggers such as CPU utilisation. As such it’s probably not a good fit for this benchmarking use case where we want each Pod to run flat out and use close to 100% of the resources available to it.  

The Cluster Autoscaler scales the worker nodes based on pending Pods.  Although the Cluster Autoscaler is the de facto standard for automatic scaling in Kubernetes, it is not part of the main release.  Here are some blogs on how to run the Cluster Autoscaler on AWS. And here’s a good blog explaining the differences between the scaling approaches.

One reason I ran into problems initially with pod scaling was that I naively assumed Kubernetes was more intelligent than it actually is (by default). I thought it could somehow automatically work out resource usage for Pods. Well, it turns out that I wasn’t completely crazy. There’s also a second Pod autoscaler called the Vertical Pod Autoscaler which sets resource requests on pod containers automatically based on historical usage. This is a good blog which covers all three autoscalers.

[Figure: Odysseus’ nasty encounter with Polyphemus]

Odysseus had a nasty encounter with the giant shepherd Polyphemus but used the flock of sheep to escape the blinded Cyclops.

Results: 18,000 Anomaly Checks per second

Finally, we can reveal some results. I spun up (small) production-sized Cassandra (version 3.11.3) and Kafka (version 2.1.0) clusters in the same region as my AWS EKS cluster. Here are the details of the clusters, worker nodes and producer instance:

 

Cassandra Cluster: 3 nodes x 8 cores = 24 cores (i3.2xlarge, 1900GB SSD, 61GB RAM, 8 cores)
Kafka Cluster: 3 nodes x 4 cores = 12 cores (r4.xlarge-750, 750GB Disk, 30.5GB RAM, 4 cores)
Kubernetes Worker Nodes: 10 nodes x 2 cores = 20 cores (c5.large)
Kafka Producer: 1 node x 8 cores = 8 cores (m4.2xlarge)

 

For simplicity, I ran the Kafka producer flat out (around 2 million TPS) for long enough to create sufficient data (billions of events) in Kafka to run the rest of the pipeline and produce consistent results. I then gradually increased the number of Pods from 1 to the maximum number of worker nodes available, 10, by increasing the number of replicas with this command:

kubectl scale deployment consumer-deployment --replicas=10

Using the Prometheus monitoring metrics, the maximum throughput obtained for the business level metric (Anomaly Checks per second) was 18,000 TPS, with all the worker nodes and Cassandra cluster at 100% CPU Utilisation. Not surprisingly, the throughput didn’t increase further with more worker nodes. The Kafka cluster had plenty of spare CPU capacity. Here’s a graph showing these results:

[Figure: Cassandra Cluster results]

What’s also informative is to understand the throughput (events in and out) for each subsystem (Kafka load generator, Kafka cluster, Kubernetes cluster, and Cassandra cluster), and compute the total system throughput as the sum of the subsystem throughputs. This will give us extra insights, such as how much of the total workload each subsystem handles and how many events in total are required to perform each anomaly check.

We assume a Kafka load generator rate of Z events per second, and a steady state (i.e. all subsystems keep up with this event rate). Specifically, this means that the business metric, Anomaly Checks per second (step 9 below) is also equal to Z.

The subsystem throughputs are: Kafka load generator (step 1: 1 x Z), Kafka cluster (steps 2, 3, 10: 3 x Z), Kubernetes cluster (steps 4, 5, 8, 9: 4 x Z), and Cassandra cluster (steps 6, 7: 2 x Z). The total system throughput is, therefore: (1 + 3 + 4 + 2) x Z = 10 x Z, i.e. the system throughput is an order of magnitude higher than the business metric.
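The same arithmetic as a small sketch, using the step counts listed above and the measured 18,000 anomaly checks per second as Z. The names stepsPerSubsystem and systemThroughput are invented for this illustration.

```go
package main

import "fmt"

// stepsPerSubsystem is the number of pipeline steps each subsystem handles
// per event, as listed above.
var stepsPerSubsystem = map[string]int{
	"Kafka load generator": 1,
	"Kafka cluster":        3,
	"Kubernetes cluster":   4,
	"Cassandra cluster":    2,
}

// systemThroughput returns the per-subsystem throughputs and their total
// for a business metric of z events per second.
func systemThroughput(z int) (map[string]int, int) {
	bySubsystem := make(map[string]int, len(stepsPerSubsystem))
	total := 0
	for name, steps := range stepsPerSubsystem {
		bySubsystem[name] = steps * z
		total += steps * z
	}
	return bySubsystem, total
}

func main() {
	bySubsystem, total := systemThroughput(18000)
	fmt.Println(bySubsystem["Kafka cluster"], bySubsystem["Cassandra cluster"], total) // 54000 36000 180000
}
```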

[Figure: Kubernetes Cluster Throughput]

For the initial small production cluster this gives a system throughput of: 18,000 x 10 = 180,000 events/s. This stacked bar graph shows the breakdown for the subsystems and the total. The load generator rate on the bottom is the same value as Anomaly Checks per second (18,000):

[Figure: System events]

In the next blog, we’ll keep scaling the system and see how far we can go.

 

The post Anomalia Machina 9 – Anomaly Detection at Scale: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra appeared first on Instaclustr.

Virtual tables are coming in Cassandra 4.0

One of the exciting features coming in Cassandra 4.0 is the addition of Virtual Tables. They will expose elements like configuration settings, metrics, or running compactions through the CQL interface instead of JMX for more convenient access. This post explains what Virtual Tables are and walks through the various types that will be available in version 4.0.

Virtual Tables

The term “Virtual Tables” can be confusing, as a quick Google search may leave one under the impression that they are views that can be created through a DDL statement. In the context of Cassandra, however, Virtual Tables will be created and managed by Cassandra itself, with no possibility of creating custom ones through CQL.

They are not to be confused with Materialized Views either, which persist data from a base table into another table with a different primary key.

For Cassandra 4.0, virtual tables will be read only, trivially exposing data as CQL rows. Such data was (and will still be) accessible through JMX, which can be cumbersome to interact with and secure.

Two new keyspaces were added in Cassandra 4.0 to support Virtual Tables: system_views and system_virtual_schema.
The latter will contain schema information on the Virtual Tables, while the former will contain the actual tables.

cqlsh> select * from system_virtual_schema.tables;

 keyspace_name         | table_name    | comment
-----------------------+---------------+------------------------------
          system_views |        caches |                system caches
          system_views |       clients |  currently connected clients
          system_views |      settings |             current settings
          system_views | sstable_tasks |        current sstable tasks
          system_views |  thread_pools |                             
 system_virtual_schema |       columns |   virtual column definitions
 system_virtual_schema |     keyspaces | virtual keyspace definitions
 system_virtual_schema |        tables |    virtual table definitions

Neither of these keyspaces can be described through the DESCRIBE KEYSPACE command, so listing the rows in system_virtual_schema.tables is the only way to discover the Virtual Tables.

The tables themselves can be described as shown here:

cqlsh> describe table system_views.caches

CREATE TABLE system_views.caches (
    capacity_bytes bigint PRIMARY KEY,
    entry_count int,
    hit_count bigint,
    hit_ratio double,
    name text,
    recent_hit_rate_per_second bigint,
    recent_request_rate_per_second bigint,
    request_count bigint,
    size_bytes bigint
) WITH compaction = {'class': 'None'}
    AND compression = {};

Available Tables in 4.0

Since Apache Cassandra 4.0 was feature frozen in September 2018, we already have the definitive list of Virtual Tables that will land in that release.

caches

The caches virtual table displays the list of caches involved in Cassandra’s read path. It contains all the necessary information to get an overview of their settings, usage, and efficiency:

cqlsh> select * from system_views.caches;

 name     | capacity_bytes | entry_count | hit_count | hit_ratio | recent_hit_rate_per_second | recent_request_rate_per_second | request_count | size_bytes
----------+----------------+-------------+-----------+-----------+----------------------------+--------------------------------+---------------+------------
   chunks |       95420416 |          16 |       134 |  0.864516 |                          0 |                              0 |           155 |    1048576
 counters |       12582912 |           0 |         0 |       NaN |                          0 |                              0 |             0 |          0
     keys |       25165824 |          18 |        84 |  0.792453 |                          0 |                              0 |           106 |       1632
     rows |              0 |           0 |         0 |       NaN |                          0 |                              0 |             0 |          0

This information is currently available through the nodetool info command.

clients

The clients virtual table will list all connected clients, with information such as the number of requests each has issued or the username it is using:

cqlsh> select * from system_views.clients;

 address   | port  | connection_stage | driver_name | driver_version | hostname  | protocol_version | request_count | ssl_cipher_suite | ssl_enabled | ssl_protocol | username
-----------+-------+------------------+-------------+----------------+-----------+------------------+---------------+------------------+-------------+--------------+-----------
 127.0.0.1 | 61164 |            ready |        null |           null | localhost |                4 |           146 |             null |       False |         null | anonymous
 127.0.0.1 | 61165 |            ready |        null |           null | localhost |                4 |           155 |             null |       False |         null | anonymous 

settings

The settings virtual table will list all the configuration settings that can be exposed from the cassandra.yaml config file:

cqlsh> select * from system_views.settings limit 100;

@ Row 1
-------+-----------------------------------------------------------------------------------------------------------------------------------------------
 name  | allocate_tokens_for_keyspace
 value | null

@ Row 2
-------+-----------------------------------------------------------------------------------------------------------------------------------------------
 name  | audit_logging_options_audit_logs_dir
 value | /Users/adejanovski/.ccm/trunk/node1/logs/audit/

@ Row 3
-------+-----------------------------------------------------------------------------------------------------------------------------------------------
 name  | audit_logging_options_enabled
 value | false

@ Row 4
-------+-----------------------------------------------------------------------------------------------------------------------------------------------
 name  | audit_logging_options_excluded_categories
 value | 

@ Row 5
-------+-----------------------------------------------------------------------------------------------------------------------------------------------
 name  | audit_logging_options_excluded_keyspaces
 value | 
...
...
...
@ Row 17
-------+-----------------------------------------------------------------------------------------------------------------------------------------------
 name  | back_pressure_strategy
 value | org.apache.cassandra.net.RateBasedBackPressure{high_ratio=0.9, factor=5, flow=FAST}

@ Row 18
-------+-----------------------------------------------------------------------------------------------------------------------------------------------
 name  | batch_size_fail_threshold_in_kb
 value | 50

@ Row 19
-------+-----------------------------------------------------------------------------------------------------------------------------------------------
 name  | batch_size_warn_threshold_in_kb
 value | 5

@ Row 20
-------+-----------------------------------------------------------------------------------------------------------------------------------------------
 name  | batchlog_replay_throttle_in_kb
 value | 1024
...
...

Here, I’ve truncated the output, as there are 209 settings exposed currently. There are plans to make this table writeable so that some settings can be changed at runtime, as can currently be done through JMX. Such changes, of course, would need to be persisted in cassandra.yaml to survive a restart of the Cassandra process.

sstable_tasks

The sstable_tasks virtual table will expose currently running operations on SSTables like compactions, upgradesstables, or cleanup. For example:

cqlsh> select * from system_views.sstable_tasks ;

 keyspace_name | table_name  | task_id                              | kind       | progress | total     | unit
---------------+-------------+--------------------------------------+------------+----------+-----------+-------
    tlp_stress | sensor_data | f6506ec0-3064-11e9-95e2-b3ac36f635bf | compaction | 17422218 | 127732310 | bytes

This information is currently available through the nodetool compactionstats command.

thread_pools

The thread_pools virtual table will display the metrics for each thread pool in Cassandra:

cqlsh> select * from system_views.thread_pools ;

 name                         | active_tasks | active_tasks_limit | blocked_tasks | blocked_tasks_all_time | completed_tasks | pending_tasks
------------------------------+--------------+--------------------+---------------+------------------------+-----------------+---------------
             AntiEntropyStage |            0 |                  1 |             0 |                      0 |               0 |             0
         CacheCleanupExecutor |            0 |                  1 |             0 |                      0 |               0 |             0
           CompactionExecutor |            0 |                  2 |             0 |                      0 |            3121 |             0
         CounterMutationStage |            0 |                 32 |             0 |                      0 |               0 |             0
                  GossipStage |            0 |                  1 |             0 |                      0 |           17040 |             0
              HintsDispatcher |            0 |                  2 |             0 |                      0 |               0 |             0
        InternalResponseStage |            0 |                  8 |             0 |                      0 |               0 |             0
          MemtableFlushWriter |            0 |                  2 |             0 |                      0 |              20 |             0
            MemtablePostFlush |            0 |                  1 |             0 |                      0 |              21 |             0
        MemtableReclaimMemory |            0 |                  1 |             0 |                      0 |              20 |             0
               MigrationStage |            0 |                  1 |             0 |                      0 |               0 |             0
                    MiscStage |            0 |                  1 |             0 |                      0 |               0 |             0
                MutationStage |            0 |                 32 |             0 |                      0 |               8 |             0
    Native-Transport-Requests |            1 |                128 |             0 |                      0 |             717 |             0
       PendingRangeCalculator |            0 |                  1 |             0 |                      0 |               6 |             0
 PerDiskMemtableFlushWriter_0 |            0 |                  2 |             0 |                      0 |              20 |             0
              ReadRepairStage |            0 |                  8 |             0 |                      0 |               0 |             0
                    ReadStage |            0 |                 32 |             0 |                      0 |              22 |             0
                  Repair-Task |            0 |         2147483647 |             0 |                      0 |               0 |             0
         RequestResponseStage |            0 |                  8 |             0 |                      0 |              22 |             0
                      Sampler |            0 |                  1 |             0 |                      0 |               0 |             0
     SecondaryIndexManagement |            0 |                  1 |             0 |                      0 |               0 |             0
           ValidationExecutor |            0 |         2147483647 |             0 |                      0 |               0 |             0
            ViewBuildExecutor |            0 |                  1 |             0 |                      0 |               0 |             0
            ViewMutationStage |            0 |                 32 |             0 |                      0 |               0 |             0

This information is currently available through the nodetool tpstats command.

Locality

Virtual Tables, regardless of the type, contain data that is specific to each node. They are not replicated, have no associated SSTables, and querying them will return the values of the coordinator (the node that the driver chooses to coordinate the request). They will also ignore the consistency level of the queries they are sent.

When interacting with Virtual Tables through cqlsh, results will come from the node that cqlsh connected to:

cqlsh> consistency ALL
Consistency level set to ALL.
cqlsh> select * from system_views.caches;

 name     | capacity_bytes | entry_count | hit_count | hit_ratio | recent_hit_rate_per_second | recent_request_rate_per_second | request_count | size_bytes
----------+----------------+-------------+-----------+-----------+----------------------------+--------------------------------+---------------+------------
   chunks |       95420416 |          16 |       134 |  0.864516 |                          0 |                              0 |           155 |    1048576
 counters |       12582912 |           0 |         0 |       NaN |                          0 |                              0 |             0 |          0
     keys |       25165824 |          18 |        84 |  0.792453 |                          0 |                              0 |           106 |       1632
     rows |              0 |           0 |         0 |       NaN |                          0 |                              0 |             0 |          0

(4 rows)

Tracing session: 06cb2100-3060-11e9-95e2-b3ac36f635bf

 activity                                                                 | timestamp                  | source    | source_elapsed | client
--------------------------------------------------------------------------+----------------------------+-----------+----------------+-----------
                                                       Execute CQL3 query | 2019-02-14 14:54:20.048000 | 127.0.0.1 |              0 | 127.0.0.1
 Parsing select * from system_views.caches; [Native-Transport-Requests-1] | 2019-02-14 14:54:20.049000 | 127.0.0.1 |            390 | 127.0.0.1
                        Preparing statement [Native-Transport-Requests-1] | 2019-02-14 14:54:20.049000 | 127.0.0.1 |            663 | 127.0.0.1
                                                         Request complete | 2019-02-14 14:54:20.049424 | 127.0.0.1 |           1424 | 127.0.0.1

When interacting through the driver, there is no simple way of selecting a single node as coordinator. The load balancing policy is responsible for this and it is set on the Cluster object, not on a per-query basis.
For the DataStax Java Driver, a new feature was introduced through JAVA-1917 to ease access to Virtual Tables by selecting a specific node. It adds a setNode(Node node) method to the Statement class in order to forcefully designate the node responsible for the query, and “voilà”.
For the record, the same feature was added to the Python driver.

Beyond Apache Cassandra 4.0

The data currently missing from Virtual Tables is global and table-level metrics such as latencies and throughputs (Cassandra exposes A LOT of table-specific metrics beyond those two).
Rest assured that these are being worked on from two different approaches in CASSANDRA-14670 and CASSANDRA-14572, which were not ready in time for the feature freeze.

It will probably take some time for Virtual Tables to match the amount of data available through JMX, but we are confident they will catch up eventually.
Convenient and secure CQL access to runtime metrics in Apache Cassandra will tremendously ease building tools like Reaper which currently rely on JMX.

SLAs – Watch Out for the Shades of Gray

The Service Level Agreement (SLA) is an integral part of an MSP’s (Managed Service Provider) business. Its purpose is to define the scope of services that the MSP offers, including:

  • guarantees on metrics relevant to their business and technology
  • customer responsibilities
  • issue management
  • compensation commitment when MSPs fail to deliver on the SLA.

It should set the customer’s expectations, be realistic and be crystal clear, with no scope for misinterpretation. Well, that’s how they are meant to be. But unfortunately, in the quest for more sales, some MSPs tend to commit themselves to unrealistic SLAs. It’s tempting to buy into a service when an MSP offers you 100% availability. It is even more tempting when you see a compensation clause that gives you confidence in going ahead with that MSP. But hold on! Have you checked out the exclusion clauses? Have you checked out your responsibilities in order to get what you are entitled to in the SLA? Just as it is the MSP’s responsibility to define a crystal-clear SLA, it is the customer’s responsibility to thoroughly understand the SLA and be aware of every clause in it. That is how you will notice the shades of gray!

We have put together a list of things to look for in an SLA so that customers are aware of the nuances involved and avoid unpleasant surprises after signing on.

Priced SLAs

Some MSPs provide a baseline SLA for their service, and customers wishing to receive higher levels of commitment may need to fork out extra money.

At Instaclustr, we have a different take on this. We have four tiers of SLA — not to have customers pay more but because our SLA approach is based on the capability and limitations of the underlying technology we are servicing. The SLA tiers are based on the number of nodes in a cluster and a set of responsibilities that customers are prepared to commit to. Our customers do not pay extra for a higher level of SLA. They pay for the number of nodes in the cluster. With more nodes come the higher levels of SLA.

 

Compensation

While MSPs do their best to deliver on the SLA commitment, sometimes things go south. In scenarios where an SLA metric is not delivered, MSPs provide compensation. Typically, it is paid as a credit or a cut in the monthly bill. Customers should look into how the compensation is calculated. Some MSPs compensate based on the length of downtime, while others compensate with a flat reduction in the monthly bill irrespective of the length of downtime.

Instaclustr compensates its customers fairly, with a flat reduction in their monthly bill no matter how small the downtime is. The exact rate varies based on the SLA tier the customer’s cluster falls in. For example, a customer with a cluster of 12 production nodes (Critical tier) is entitled to credits of up to 100% of their monthly bill, with each violation of the availability SLA capped at 30% and each violation of the latency SLA capped at 10%. Every time we fail to deliver on an SLA metric, there is a big impact on Instaclustr’s business, but we like the challenge. Continuously improving uptime and performance is at the core of our business.

Issue Management

Another important factor customers have to look at in an SLA is how the MSP handles issues. Issue management commonly includes communication touchpoints, named contacts, escalation procedure, issue impact and severity levels, first-response time and, in some cases, resolution time. Customers should familiarize themselves with each of these aspects and make their internal teams aware of them.

Instaclustr customers can get this information in our Support Policy document, which gives clear details on issue management and sets the right expectations.

Customer Responsibilities

Although the primary purpose of an SLA is for MSPs to provide guarantees on key metrics such as availability and latency, MSPs usually add a “help us help you” clause. It basically means: in order for MSPs to uphold those SLA guarantees, customers have certain responsibilities that they have to commit to. If your SLA doesn’t have customer responsibilities recorded, talk to your MSP and get it clarified first thing.

The Instaclustr SLA has a list of customer responsibilities that must be met in order for us to deliver on those SLA guarantees. Each SLA tier, which is based on the number of nodes in a cluster, has different levels of responsibilities: customers take on more responsibilities to receive a higher level of SLA guarantees. For example, the “Small” tier for Kafka requires customers to maintain a minimum of 3 replicas per Kafka topic to get the SLA guarantees that the tier promises, while the “Critical” tier for Kafka requires customers not only to maintain 3 replicas per topic but also to maintain separate testing and production clusters. This transparency upfront avoids uncertainty and unpleasant surprises if an issue arises.

Service/Technology Dependent SLA

Instaclustr’s SLA guarantees are defined on the basis of the technology under management and its cluster configuration. For instance, a customer with a Kafka cluster comprising 5 production nodes will be guaranteed 99.95% availability for writes but is given no guarantee for latency. However, if the same customer has another Kafka cluster with 12 production nodes (and meets other documented conditions), they will be guaranteed 99.999% availability and 99th percentile latency. Similarly, Cassandra clusters of different sizes come with different tiers of SLA: basically, the larger the cluster (more nodes), the higher the availability of data. This guarantee is backed by our experience in providing massive-scale technology as a fully managed service. Instaclustr simply offers the best SLA realistically possible for the technology and the cluster configuration.
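To make these percentages concrete, it can help to convert an availability target into the downtime it permits. The snippet below is a minimal sketch of that arithmetic, not Instaclustr’s own method for measuring SLA compliance; the 30-day month and the function name are assumptions made for illustration.

```python
# Convert an availability percentage into the downtime budget it allows.
MINUTES_PER_30_DAY_MONTH = 30 * 24 * 60  # assumption: a 30-day billing month


def allowed_downtime_minutes(availability_percent,
                             period_minutes=MINUTES_PER_30_DAY_MONTH):
    """Return the minutes of downtime permitted over the period."""
    if not 0 <= availability_percent <= 100:
        raise ValueError("availability must be between 0 and 100")
    return period_minutes * (100 - availability_percent) / 100


# Availability targets mentioned in this post.
for target in (99.9, 99.95, 99.99, 99.999):
    minutes = allowed_downtime_minutes(target)
    print(f"{target}% -> {minutes:.2f} minutes per 30-day month")
```

Under that assumption, a 99.95% target leaves roughly 21.6 minutes of downtime per month, while 99.999% leaves less than half a minute, which is why the higher tiers depend on cluster size and configuration.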

If your MSP is promising a 100% availability SLA irrespective of the technology under management, its size and its configuration, that is simply not realistic. Be sure to check for exclusions and also review the compensation clause to make sure it is substantial.

Hold Harmless Clauses

This is probably the most important section to watch out for in an SLA. MSPs need to protect themselves from situations where an SLA guarantee isn’t met because of conditions outside their control. MSPs operate in several autonomous environments with several technologies and integrations. Even with best practices in place, sometimes something will go wrong due to no fault on the part of an MSP.

For example, Instaclustr has this clause: “All service levels exclude outages caused by non-availability of service at the underlying cloud provider region level or availability zone level in regions which only support two availability zones”. Clauses like this are critical from an MSP’s perspective, as they ensure the MSP isn’t vulnerable to conditions outside its control: we simply can’t do anything about such large-scale disruptions in the underlying cloud infrastructure. However, an unreasonable exclusion would be to exclude failures of virtual machines (VMs) in the cloud. With hundreds or thousands of VMs running on a cloud, it is normal to expect VM failures. The large-scale technologies we operate can easily be designed to handle VM (node) failures without hurting availability. If your MSP has a VM failure exclusion clause, it is time to have a conversation.

Another necessary exclusion is significant, unexpected changes in application behavior. An MSP has limited visibility into changes in your application environment (either business-level or technical) that may impact the delivered services. However, we do have the shared objective with our customers of maximising their cluster’s performance and availability. Communicating, and in some cases testing, significant changes before deploying on production is necessary to ensure the service can cope with the change. Instaclustr SLAs for latency exclude “unusual/unplanned workload on cluster caused by customer’s application”, and generally exclude “issues caused by customer actions including but not limited to attempting to operate a cluster beyond available processing or storage capacity”. Customers can avoid being impacted by these clauses by contacting our technical operations team ahead of significant changes to manage the risks.

It is common for MSPs to add a third-party exclusion clause. This means: if an issue is found to be in a third-party application they integrate with, and where they have no control, they are safeguarded. But Instaclustr does not have any third-party application related exclusion. Given that we operate in the open source technology space, adding a clause like this would be protecting ourselves in relation to any issue in Kafka or Cassandra—which is what our customers are paying us to manage. If your MSP has third-party application related exclusions, it is time to closely review them if you haven’t done so already.

Customers should understand these exclusions in depth and make sure they are not unreasonable. This will need thorough review from technical and legal teams.

It’s Not Just About Uptime

Over the years, uptime or availability has become the default term when discussing an SLA with MSPs. It is definitely the most important metric you would want covered. However, there are many other metrics that could be very valuable to a customer’s business. It is important to make sure that the SLA covers the right metrics—the ones that matter to the customer based on the application and the technology under management.

Instaclustr has always included availability and latency in SLAs, as they are two key metrics for the technologies we manage. We are glad to announce the recent addition of the Recovery Point Objective (RPO) metric to our SLA. We take daily backups, with an option of continuous backup (5-minute intervals), and our technical operations team has always treated data recovery as a top-priority incident, as it impacts the customer’s business. So it just made sense to add an RPO guarantee to our SLA.

SLAs Are Negotiable

SLAs are a mutual contract—in fact, a legal agreement between two parties. Although it is primarily the MSP’s responsibility to draft the initial SLA, customers have the right to negotiate. Customers should look into it thoroughly and negotiate if something important to their business is missing before signing up for it. Instaclustr Sales and Customer success teams handle most of these discussions and we welcome any SLA requirements from customers and are open to negotiation.

SLAs Aren’t Everything

SLAs are a quantitative, contractual promise with financial penalties for failures. They are important for covering the most important aspects of service delivery. However, SLAs can never capture every aspect of delivery of a high-quality service that meets the customer’s needs. At Instaclustr, meeting our SLAs is a core focus but just the first step in providing a service that meets and exceeds our customers’ expectations. For example, ticket response time guarantees tell you how quickly you will get a first response to a ticket but not the quality of response. At Instaclustr we have expert engineers as the first responders to all tickets to provide the best quality response possible.

Instaclustr SLA Update

As part of our commitment to keep our SLAs relevant and realistic, we regularly review them on the basis of quantitative evidence. We have recently updated our SLAs to strengthen our commitments. Here is a summary of the changes.

  • Cassandra
    • Recovery Point Objective (RPO) is added as a new metric, stating: “We will maintain backups to allow a restoration of data with less than 24 hours data loss for standard backups and less than 5 minutes data loss for our Continuous Back-ups option. Should we fail to meet this Recovery Point Objective, you will be eligible for SLA credits of 100% of monthly fees for the relevant cluster. If you have undertaken restore testing of your cluster in the last 6 months (using our automated restore functionality) and can demonstrate that data loss during an emergency restore is outside target RPO and your verification testing, then you will be eligible for SLA credits of 500% of monthly fees”.
    • Starter plan: increase from 99.9% to 99.95% for availability target with best effort.
    • Small plan: 99.9% availability for consistency ONE has been increased to 99.95% availability for consistency LOCAL_QUORUM.
    • Enterprise plan: 99.95% availability has been increased to 99.99%.
  • Kafka
    • Starter plan: increase from 99.9% to 99.95% for availability target with best effort.
    • Small plan: increase from 99.9% availability to 99.95%.
    • Enterprise plan: increase from 99.95% availability to 99.99%.
    • Critical plan: increase from 99.99% availability to 99.999%.

 

More details and the complete SLA can be found on the Policy documentation page on the Instaclustr website.

The post SLAs – Watch Out for the Shades of Gray appeared first on Instaclustr.

Upgraded Compute Intensive Node Sizes for AWS Clusters

Last year, Instaclustr upgraded its support for high throughput, compute intensive nodes on top of the AWS platform. This involved switching away from the aging c3 instance types and onto the newer c5d instance type. The new c5d instances provide better performance, better compute power and are better value for money than the c3 instances.

The new Instaclustr node type is based on the c5d.2xlarge compute image and provisioned with 8 vCPUs, 16GiB RAM and 200GB NVMe SSD storage. These nodes will be suitable for high throughput, low latency deployments of Apache Cassandra.

But how much better are the c5d instances? We ran our standard Cassandra benchmarking tests on c3 and c5d instances running Cassandra clusters to find out. These tests cover a combination of pure read, pure write and mixed read/write operations, with varying data sizes to evaluate Cassandra’s performance on these node sizes.

Below are the results for our basic latency and throughput tests on clusters with 3 nodes, running Cassandra version 3.11.3.

Latency

We measure latency by performing tests with a static number of operations per second across all test clusters. This number of operations is calibrated to represent a moderate load on the least powerful cluster under test. This shows a difference in the amount of time taken to process the same number of requests.

Small Operations

For small sized operations, the c3s showed 87% higher median latency on insert operations, and 136% higher median latency on read operations.

Latency - Small Operations

Medium Operations

Medium sized operations show 33% higher latency on inserts on c3s, stretching out to 90% higher latency for read operations.

Latency - Medium Operations

Throughput

Next, we measure a cluster’s throughput by starting at a small number of operations per second and increasing it until the cluster starts to become overloaded (indicated by median latency exceeding 20ms for reads or 5ms for writes, or by pending compactions exceeding 20 one minute after the load completes). This tells us whether a cluster is able to perform more or fewer requests in the same amount of time.
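The ramp-up procedure described above can be sketched as a simple loop. This is an illustration only, not Instaclustr’s benchmarking harness: `run_load` is a hypothetical callable standing in for whatever tool applies the load and collects the metrics, and the function names are assumptions. Only the three thresholds come from the text.

```python
# Overload thresholds quoted in the text above.
MAX_MEDIAN_READ_LATENCY_MS = 20
MAX_MEDIAN_WRITE_LATENCY_MS = 5
MAX_PENDING_COMPACTIONS = 20  # measured one minute after the load completes


def is_overloaded(median_read_ms, median_write_ms, pending_compactions):
    """Return True if any of the three overload indicators is exceeded."""
    return (median_read_ms > MAX_MEDIAN_READ_LATENCY_MS
            or median_write_ms > MAX_MEDIAN_WRITE_LATENCY_MS
            or pending_compactions > MAX_PENDING_COMPACTIONS)


def find_max_throughput(run_load, start_ops, step_ops, limit_ops):
    """Ramp the target rate until run_load reports an overloaded cluster.

    run_load is a hypothetical callable: it applies `ops` operations per
    second and returns (median_read_ms, median_write_ms, pending_compactions).
    Returns the highest rate that did not overload the cluster, or None if
    even the starting rate was too much.
    """
    best = None
    ops = start_ops
    while ops <= limit_ops:
        if is_overloaded(*run_load(ops)):
            break
        best = ops
        ops += step_ops
    return best
```

The highest sustainable rate found this way for each node type is what the throughput comparisons below refer to.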

Small Operations

For small sized operations, we can see that c5d nodes allow for an 80% improvement in insert operations per second and a huge 125% improvement in read operations per second.

Throughput - Small Operations

Medium Operations

For medium data sizes, we see a slightly more modest 33% improvement in insert operations per second and a still significant 93% improvement in read operations per second.

Throughput - Medium Operations

Conclusion

Comparing those performance figures against the modest additional price of c5ds (c5ds are approximately 15% more expensive, with some variance depending on the region), we can see that c5ds represent a strong value-for-money improvement over the c3s.
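One rough way to quantify that value-for-money claim is to divide the throughput gain by the price increase. The sketch below uses the approximate 15% price difference and some of the throughput figures quoted in this post; the function name and the idea of an "operations per dollar" ratio are assumptions for illustration, and real pricing varies by region.

```python
PRICE_INCREASE = 0.15  # c5d is approximately 15% more expensive (from the text)

# Throughput improvements of c5d over c3 quoted in the benchmark sections.
THROUGHPUT_GAINS = {
    "small insert": 0.80,
    "medium insert": 0.33,
    "medium read": 0.93,
}


def ops_per_dollar_gain(throughput_gain, price_increase=PRICE_INCREASE):
    """Relative change in operations per dollar for a given throughput gain."""
    return (1 + throughput_gain) / (1 + price_increase) - 1


for workload, gain in THROUGHPUT_GAINS.items():
    print(f"{workload}: {ops_per_dollar_gain(gain):.0%} more operations per dollar")
```

With these inputs, an 80% throughput gain at 15% higher cost works out to roughly 57% more operations per dollar, and even the more modest 33% gain still comes out ahead.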

Instaclustr supports the c5d instances in the following AWS regions:

  • US-East-1 (N. Virginia)
  • US-East-2 (Ohio)
  • US-West-1 (Northern California)
  • US-West-2 (Oregon)
  • Europe-Central-1 (Frankfurt)
  • Europe-West-1 (Ireland)
  • Europe-West-2 (London)
  • Asia-Pacific-Southeast-1 (Singapore)

If you already have a c3 cluster with Instaclustr, don’t worry, these will continue to be supported as normal.

Instaclustr supports a range of machine size configurations across all its cloud providers to make sure there is a size perfect for your needs. To see what we’re offering with the new c5d nodes or any of our other existing node sizes, visit our console or contact sales@instaclustr.com.

The post Upgraded Compute Intensive Node Sizes for AWS Clusters appeared first on Instaclustr.

Apache Cassandra - Data Center Switch

Did you ever wonder how to change the hardware efficiently on your Apache Cassandra cluster? Or did you maybe read this blog post from Anthony last week about how to set up a balanced cluster, found it as exciting as we do, but are still unsure how to change the number of vnodes?

A Data Center “Switch” might be what you need.

We will go through this process together hereafter. Additionally, if you’re just looking at adding or removing a data center from an existing cluster, these operations are described as part of the data center switch. I hope the structure of the post will make it easy for you to find the part you need!

Warning/Note: If you are unable to get hardware easily, this won’t work for you. The Data Center (DC) Switch is well adapted to cloud environments or for teams with spare/available hardware, as we will need to create a new data center before we can actually free up the previously used machines. Generally we use the same number of nodes in the new data center, but this is not mandatory. If the hardware changes, you’ll want to adjust the number of machines proportionally to keep performance unchanged.

Definition

First things first, what is a “Data Center Switch” in our Apache Cassandra context?

The idea is to transition to a new data center, freshly added for this operation, and then to remove the old one. In between, clients need to be switched to the new data center.

Logical isolation / topology between data centers in Cassandra helps keep this operation safe and allows you to roll back the operation at almost any stage and with little effort. This technique does not generate any downtime when it is well executed.

Use Cases

A DC Switch can be used for changes that cannot be easily or safely performed within an existing data center, through JMX, or even with a rolling restart. It can also allow you to make changes such as modifying the number of vnodes in use for a cluster, or to change the hardware without creating unwanted transitional states where some nodes would have distinct hardware (or operating system).

It could also be used for a very critical upgrade. Note that with an upgrade it’s important to keep in mind that streaming in a cluster running mixed versions of Cassandra is not recommended. In this case, it’s better to add the new data center using the same Cassandra version, feed it with the data from the old data center, and only then upgrade the Cassandra version in the new data center. When the new data center is considered stable, the clients can be routed to the new data center and the old one can be removed.

It might even fit your needs in some cases I cannot think of right now. Once you know and understand this process, you might find yourself making original use of it.

Limitations and Risks

  • Again, this is a good fit for a cloud environment or when getting new servers. In some cases it might not be possible (or is not worth it) to double the hardware in use, even for a short period of time.
  • You cannot use the DC Switch to change global settings like the cluster_name, as this value is unique for the whole cluster.

How-to - DC Switch

This section provides a detailed runbook to perform a Data Center Switch.

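We will cover the following aspects of a data center switch:

  • Phase 0 - Prepare Configuration for Multiple Data Centers
  • Phase 1 - Add a New Data Center
  • Phase 2 - Switch Clients to the new DC
  • Phase 3 - Remove the old DC

The phases described here are mostly independent, and at the end of each phase the cluster should be in a stable state. You can run only phase 1 (add a DC) or phase 3 (remove a DC), for example. You should always read and pay attention to phase 0, though.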

Rollback

Before we start, it is important to note that until the very last step, anything can be easily rolled back. You can always safely and quickly go back to the previous state during the procedure. Simply stated, you should be able to run the opposite commands to the one shown here, in the reverse order until the cluster is in an acceptable state. It’s good to check (and store!) the value for the configuration we are about to change, before any step.

Keep in mind that after each phase below the cluster is in a stable state and can remain like this for a long time, or forever, without problem.

Phase 0 - Prepare Configuration for Multiple Data Centers

First, the preparation phase. We want to make sure Cassandra and Clients will react as expected in a Multi-DC environment.

Server Side (Cassandra)

Step 1: All the keyspaces use NTS

  • Confirm that each user keyspace is using the NetworkTopologyStrategy:
$ cqlsh -e "DESCRIBE KEYSPACES;"
$ cqlsh -e "DESCRIBE KEYSPACE <my_ks>;" | grep replication
$ cqlsh -e "DESCRIBE KEYSPACE <my_other_ks>;" | grep replication
...

If not, change it to be NetworkTopologyStrategy.

  • Confirm that all the keyspaces (including system_*, possibly opscenter, …) also use either the NetworkTopologyStrategy or the LocalStrategy. To be clear:
    • keyspaces using SimpleStrategy (i.e. possibly system_auth, system_distributed, …) should be switched to NetworkTopologyStrategy.
    • the system keyspace, or any other keyspace using LocalStrategy, should not be changed.

Note: SimpleStrategy is not good in most cases and none of the keyspaces should use it in a Multi-DC context. This is because client operations would touch the distinct data centers to answer reads, breaking the expected isolation between the data centers.

$ cqlsh -e "DESCRIBE KEYSPACE system_auth;" | grep replication
...
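The checks above boil down to a small decision rule per keyspace. The following sketch expresses that rule over a replication map such as the one printed by DESCRIBE KEYSPACE. The function name and the sample keyspaces are hypothetical, and how you collect the replication maps (cqlsh output, a driver, and so on) is left out.

```python
def replication_action(replication):
    """Classify a keyspace replication map for a multi-DC setup."""
    # Accept both short and fully qualified class names.
    strategy = replication["class"].split(".")[-1]
    if strategy == "NetworkTopologyStrategy":
        return "ok"
    if strategy == "LocalStrategy":
        return "leave unchanged"
    if strategy == "SimpleStrategy":
        return "switch to NetworkTopologyStrategy"
    return "review manually"


# Hypothetical example input, not taken from a real cluster.
keyspaces = {
    "tlp_labs": {"class": "SimpleStrategy", "replication_factor": "3"},
    "system_auth": {
        "class": "org.apache.cassandra.locator.SimpleStrategy",
        "replication_factor": "1",
    },
    "system": {"class": "LocalStrategy"},
}

for name, replication in sorted(keyspaces.items()):
    print(f"{name}: {replication_action(replication)}")
```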

In case you need to change this, do something like:

-- Custom keyspace
ALTER KEYSPACE tlp_labs WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'old-dc': '3'
};
[...]

-- system_* keyspaces from SimpleStrategy to NetworkTopologyStrategy
ALTER KEYSPACE system_auth WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'old-dc': '3' -- Or more... But it's not today's topic ;-)
};
ALTER KEYSPACE system_distributed WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'old-dc': '3'
};
ALTER KEYSPACE system_traces WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'old-dc': '3'
};
[...]

Warning:

This change might affect data availability. Be sure to understand its consequences on token ownership so that you do not break service availability.

You can avoid this problem by mirroring the previous distribution SimpleStrategy was producing. Using NetworkTopologyStrategy in combination with GossipingPropertyFileSnitch copying the previous SimpleStrategy/SimpleSnitch behaviour (data center and rack names), you should be able to make this change a ‘non-event’, where actually nothing happens from the Cassandra topology perspective.

In other words, if both topologies result in the same logical placement of the nodes, then there is no movement and no risk. If the operation results in a topology change (e.g. two clusters previously considered as one), it’s good to consider the consequences ahead of time and to run a full repair after the transition.

Step 2:

Make sure the existing nodes are not using the SimpleSnitch.

Instead, the snitch must be one that considers the data center (and racks).

For example:

 endpoint_snitch: Ec2Snitch

or

 endpoint_snitch: GossipingPropertyFileSnitch

If your cluster is using SimpleSnitch at the moment, be careful changing this value, as you might induce a change in the topology or where the data belongs. It is worth reading in detail about this specific topic if that is the case.

That’s it for now on Cassandra, we are done configuring the server side.

Client Side (Cassandra)

Because there are many clients out there, you might have to adapt this to your specific case or driver. I take the DataStax Java driver as an example here.

All the clients using the cluster should go through the checks/changes below.

Step 4: Use a DCAware policy and disable remote connections

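import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

// Connect through the old data center only; never fall back to remote hosts.
Cluster cluster = Cluster.builder()
        .addContactPoints(<ip_contact_list_from_old_dc>)
        .withLoadBalancingPolicy(
                DCAwareRoundRobinPolicy.builder()
                        .withLocalDc("<old_dc_name>")
                        .withUsedHostsPerRemoteDc(0)
                        .build())
        .build();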

Step 5: Pin connections to the existing data center

It’s important to use a consistency level that aims at retrieving the data from the current data center and not across the whole cluster. In general, use a consistency of the form: LOCAL_*. If you were using QUORUM before, change it to LOCAL_QUORUM for example.
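When auditing client code for this step, the rule can be written down as a small lookup. This is only a sketch of the mapping between standard consistency levels and their LOCAL_* counterparts; the function name is an assumption, and levels without a local equivalent (for example ALL or EACH_QUORUM) are deliberately rejected so that they get reviewed by hand.

```python
# Consistency levels that have a direct data-center-local counterpart.
LOCAL_EQUIVALENT = {
    "ONE": "LOCAL_ONE",
    "QUORUM": "LOCAL_QUORUM",
    "SERIAL": "LOCAL_SERIAL",
}


def pin_to_local_dc(consistency_level):
    """Return the LOCAL_* form of a consistency level, if one exists."""
    level = consistency_level.upper()
    if level.startswith("LOCAL_"):
        return level  # already pinned to the local data center
    if level in LOCAL_EQUIVALENT:
        return LOCAL_EQUIVALENT[level]
    raise ValueError(
        f"{level} has no LOCAL_* equivalent; review this query manually"
    )
```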

At this point, all the clients should be ready to receive the new data center. Clients should now ignore the new data center’s nodes and only contact the local data center as defined above.

Phase 1 - Add a New Data Center

Step 6: Create and configure new Cassandra nodes

Choose the right hardware and number of nodes for the new data center, then bring the machines up.

Configure the new Cassandra nodes exactly like the old nodes, except for the settings you intend to change with the new DC and the data center name itself. How the data center name is defined depends on the snitch you picked: it can be determined by the IP address, a file, or the AWS region name.

To perform this change using GossipingPropertyFileSnitch, edit the cassandra-rackdc.properties file on all nodes:

dc=<new_dc>
...

To create a new data center in the same region in AWS, you have to set the dc_suffix option in the cassandra-rackdc.properties file on all nodes:

# to have us-east-1-awesome-cluster for example
dc_suffix=-awesome-cluster

Step 7: Add new nodes to the cluster

Nodes can now be added, one at a time. Just start Cassandra using the service start method that is specific to your operating system. For example, on my Linux systems I can run:

service cassandra start

Notes:

  • Start with the seed nodes for this new data center. Using two or three nodes as seeds per DC is a standard recommendation.
    - seeds: "<old_ip1>, <old_ip2>, <old_ip3>, <new_ip1>, <new_ip2>, <new_ip3>"
    
  • There should be no streaming, so adding a node should be quick. Check the logs to make sure of it:
    tail -fn 100 /var/log/cassandra/system.log
  • Due to the previous point, the nodes should join quickly as part of the new data center. Check nodetool status, and make sure a node appears as UN before moving to the next node.
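If you script this step, the "wait for UN" check can be automated by scanning the nodetool status output. The sketch below assumes the usual layout, where each node row starts with a two-letter status/state code; the sample output is illustrative and abbreviated, and the function name is hypothetical.

```python
def nodes_not_up_normal(status_output):
    """Return (state, address) pairs for node rows whose state is not UN."""
    problems = []
    for line in status_output.splitlines():
        fields = line.split()
        if len(fields) < 2 or len(fields[0]) != 2:
            continue  # headers, separators and blank lines
        status, state = fields[0][0], fields[0][1]
        # Node rows start with U/D (up/down) followed by N/L/J/M.
        if status in "UD" and state in "NLJM" and fields[0] != "UN":
            problems.append((fields[0], fields[1]))
    return problems


# Illustrative, abbreviated output; real host IDs and loads will differ.
SAMPLE = """\
Datacenter: new-dc
==================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load      Tokens  Owns  Host ID  Rack
UN  192.168.1.41  1.2 MiB   16      ?     aaaa     rack1
UJ  192.168.1.42  80 KiB    16      ?     bbbb     rack1
"""

print(nodes_not_up_normal(SAMPLE))
```

An empty list means every listed node is Up and Normal, so it is safe to start the next one.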

Step 8: Start accepting writes on the new data center

The next step is to accept writes for this data center by changing the topology so the new DC is also part of replication strategy:

ALTER KEYSPACE <my_ks> WITH replication = {
        'class': 'NetworkTopologyStrategy',
        '<old_dc_name>': '<replication_factor>',
        '<new_dc_name>': '<replication_factor>'
};
ALTER KEYSPACE <my_other_ks> WITH replication = {
        'class': 'NetworkTopologyStrategy',
        '<old_dc_name>': '<replication_factor>',
        '<new_dc_name>': '<replication_factor>'
};
[...]

Include system keyspaces that should now be using the NetworkTopologyStrategy for replication:

ALTER KEYSPACE system_auth WITH replication = {
  'class': 'NetworkTopologyStrategy',
  '<old_dc_name>': '<replication_factor>',
  '<new_dc_name>': '<replication_factor>'
};
ALTER KEYSPACE system_distributed WITH replication = {
  'class': 'NetworkTopologyStrategy',
  '<old_dc_name>': '<replication_factor>',
  '<new_dc_name>': '<replication_factor>'
};
ALTER KEYSPACE system_traces WITH replication = {
  'class': 'NetworkTopologyStrategy',
  '<old_dc_name>': '<replication_factor>',
  '<new_dc_name>': '<replication_factor>'
};
[...]

Note: Here again, do not modify keyspaces using LocalStrategy.
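With many keyspaces, writing these statements by hand is error-prone. A minimal sketch of generating them is shown below; the function name, keyspace list and data center names are placeholders chosen for illustration, and the generated statements should be reviewed before being run in cqlsh.

```python
def alter_keyspace_cql(keyspace, dc_replication):
    """Build an ALTER KEYSPACE statement using NetworkTopologyStrategy.

    dc_replication maps data center name -> replication factor.
    """
    options = ["'class': 'NetworkTopologyStrategy'"]
    options += [f"'{dc}': '{rf}'" for dc, rf in dc_replication.items()]
    body = ", ".join(options)
    return "ALTER KEYSPACE %s WITH replication = {%s};" % (keyspace, body)


# Placeholder names: adapt to your own keyspaces and data centers.
keyspaces = ["tlp_labs", "system_auth", "system_distributed", "system_traces"]
replication = {"old-dc": 3, "new-dc": 3}

for ks in keyspaces:
    print(alter_keyspace_cql(ks, replication))
```

The same helper covers the later removal step: pass a map that contains only the new data center.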

To make sure that the keyspaces were altered as expected, you can see the ownership with:

nodetool status <my_ks>

Note: this is a good moment to detect any imbalances in ownership and fix any issue there, before actually streaming the data to the new data center. We detailed this part in the post “How To Set Up A Cluster With Even Token Distribution” that Anthony wrote. You should really check this if you plan to use a low number of vnodes; if not, you might go through an operational nightmare trying to handle imbalances.

Step 9: Stream historical data to the new data center

Stream the historical data to the new data center to fill the gap, as the new data center is now receiving writes but is still missing all the past data. This is done by running this on all the nodes of the new data center:

nodetool rebuild <old_dc_name>

The output (in the logs) should look like:

INFO  [RMI TCP Connection(8)-192.168.1.31] ... - rebuild from dc: ..., ..., (All tokens)
INFO  [RMI TCP Connection(8)-192.168.1.31] ... - [Stream ...] Executing streaming plan for Rebuild
INFO  [StreamConnectionEstablisher:1] ... - [Stream ...] Starting streaming to ...
...
INFO  [StreamConnectionEstablisher:2] ... - [Stream ...] Beginning stream session with ...

Note:

  • nodetool setstreamthroughput X can help reduce the burden that streaming places on the nodes answering requests or, the other way around, make the transfer faster.
  • A rebuild can take a long time, so run the command above from screen or tmux (for example screen -R rebuild). This keeps it running if your session drops and lets you see when it has finished.

Phase 2 - Switch Clients to the new DC

At this point the new data center can be tested and should be a mirror of the previous one, except for the things you changed of course.

Step 10: Client Switch

The clients can now be routed to the new data center. To do so, change the contact points and the data center name. Doing this one client at a time, while observing impacts, is probably the safest way when there are many clients connected to a single cluster. Back to our Java driver example, it would now look like this:

// Same policy as before, now pinned to the new data center.
Cluster cluster = Cluster.builder()
        .addContactPoints(<ip_contact_list_from_new_dc>)
        .withLoadBalancingPolicy(
                DCAwareRoundRobinPolicy.builder()
                        .withLocalDc("<new_dc_name>")
                        .withUsedHostsPerRemoteDc(0)
                        .build())
        .build();

Note: Before going forward you can (and probably should) make sure that no client is connected to old nodes anymore. You can do this in a number of ways:

  • Look at netstat for open (native or thrift) connections:
    netstat -tupawn | grep -e 9042 -e 9160
    
  • Check that the node does not receive local reads (i.e. ReadStage should not increase) and does not act as a coordinator (i.e. RequestResponseStage should not increase).
    watch -d "nodetool tpstats"
    
  • Monitoring system/Dashboards: Ensure there are no local reads heading to the old data center.

Phase 3 - Remove the old DC

Step 11: Stop replicating data in the old data center

Alter the keyspaces so they no longer reference the old data center:

ALTER KEYSPACE <my_ks> WITH replication = {
        'class': 'NetworkTopologyStrategy',
        '<new_dc_name>': '<replication_factor>'
};
ALTER KEYSPACE <my_other_ks> WITH replication = {
        'class': 'NetworkTopologyStrategy',
        '<new_dc_name>': '<replication_factor>'
};

[...]

ALTER KEYSPACE system_auth WITH replication = {
  'class': 'NetworkTopologyStrategy',
  '<new_dc_name>': '<replication_factor>'
};
ALTER KEYSPACE system_distributed WITH replication = {
  'class': 'NetworkTopologyStrategy',
  '<new_dc_name>': '<replication_factor>'
};
ALTER KEYSPACE system_traces WITH replication = {
  'class': 'NetworkTopologyStrategy',
  '<new_dc_name>': '<replication_factor>'
};

Step 12: Decommission old nodes

Finally we need to get rid of the old nodes. Stopping the nodes is not enough as Cassandra will continue to expect them to come back to life anytime. We want them out, safely, but once and for all. This is what a decommission does. To cleanly remove the old data center we need to decommission all the nodes in this data center, one by one.

On the bright side, the operation should be almost instantaneous (at least very quick), because this datacenter is not owning any data anymore from a Cassandra perspective. Thus, there should be no data to stream to other nodes. If streaming happens, you probably forgot about a keyspace using SimpleStrategy or NetworkTopologyStrategy that still uses the old data center.
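Before decommissioning, it can be worth checking that no keyspace was forgotten. The sketch below flags exactly the two cases named above, given a mapping of keyspace name to replication map. The function name and sample data are hypothetical, and gathering the replication maps from the cluster is not shown.

```python
def keyspaces_blocking_removal(keyspaces, old_dc):
    """List keyspaces that would still place data in the old data center."""
    blocking = []
    for name, replication in sorted(keyspaces.items()):
        strategy = replication["class"].split(".")[-1]
        if strategy == "SimpleStrategy":
            # SimpleStrategy ignores data centers, so old nodes still own data.
            blocking.append((name, "uses SimpleStrategy"))
        elif strategy == "NetworkTopologyStrategy" and old_dc in replication:
            blocking.append((name, f"still replicates to {old_dc}"))
    return blocking


# Hypothetical example input.
keyspaces = {
    "tlp_labs": {"class": "NetworkTopologyStrategy", "new-dc": "3"},
    "system_auth": {"class": "NetworkTopologyStrategy",
                    "old-dc": "3", "new-dc": "3"},
    "legacy_ks": {"class": "SimpleStrategy", "replication_factor": "1"},
    "system": {"class": "LocalStrategy"},
}

for name, reason in keyspaces_blocking_removal(keyspaces, "old-dc"):
    print(f"{name}: {reason}")
```

If the list is empty, the decommission should involve no streaming.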

Sequentially, on each node of the old data center, run:

$ nodetool decommission

This should be fast, if not immediate, as this command should trigger no streaming at all thanks to the changes we made to the keyspace replication configuration. The old data center should no longer own any token ranges, because we removed it from all the keyspaces in the previous step.

Step 13: Remove old nodes from the seeds

To remove any reference to the old data center, we need to update the cassandra.yaml file.

- seeds: "<new_ip1>, <new_ip2>, <new_ip3>"

And that’s it! You have now successfully switched over to a new Data Center. Of course during the process, ensure that the changes you just made are actually acknowledged in this new data center.

How To Set Up A Cluster With Even Token Distribution

Apache Cassandra is fantastic for storing large amounts of data and being flexible enough to scale out as the data grows. This is all fun and games until the data that is distributed in the cluster becomes unbalanced. In this post we will go through how to set up a cluster with predictive token allocation using the allocate_tokens_for_keyspace setting, which will help to evenly distribute the data as it grows.

Unbalanced clusters are bad mkay

An unbalanced load on a cluster means that some nodes will contain more data than others. An unbalanced cluster can be caused by the following:

  • Hot spots - by random chance one node ends up responsible for a higher percentage of the token space than the other nodes in the cluster.
  • Wide rows - due to data modelling issues, for example a partition row which grows significantly larger than the other rows in the data.

The above issues can have a number of impacts on individual nodes in the cluster, however this is a completely different topic and requires a more detailed post. In summary though, a node that contains disproportionately more tokens and/or data than other nodes in the cluster may experience one or more of the following issues:

  • Run out of storage more quickly than the other nodes.
  • Serve more requests than the other nodes.
  • Suffer from higher read and write latencies than the other nodes.
  • Time to run repairs is longer than other nodes.
  • Time to run compactions is longer than other nodes.
  • Time to replace the node if it fails is longer than other nodes.

What about vnodes, don’t they help?

Both issues that cause data imbalance in the cluster (hot spots, wide rows) can be prevented by manual control. That is, specify the tokens using the initial_token setting in the cassandra.yaml file for each node and ensure your data model evenly distributes data across the cluster. The second control measure (data modelling) is something we always need to do when adding data to Cassandra. The first point, however, defining the tokens manually, is cumbersome to do when maintaining a cluster, especially when growing or shrinking it. As a result, token management was automated early on in Cassandra (version 1.2 - CASSANDRA-4119) through the introduction of Virtual Nodes (vnodes).

Vnodes break up the available range of tokens into smaller ranges, defined by the num_tokens setting in the cassandra.yaml file. The vnode ranges are randomly distributed across the cluster and are generally non-contiguous. If we use a large number for num_tokens to break up the token ranges, the random distribution means it is less likely that we will have hot spots. Using statistical computation, the point where all clusters of any size always had a good token range balance was when 256 vnodes were used. Hence, the num_tokens default value of 256 was the value recommended by the community to prevent hot spots in a cluster. The problem here is that the performance for operations requiring token-range scans (e.g. repairs, Spark operations) will tank big time. It can also cause problems with bootstrapping due to the large number of SSTables generated. Furthermore, as Joseph Lynch and Josh Snyder pointed out in a paper they wrote, the higher the value of num_tokens in large clusters, the higher the risk of data unavailability.
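To get a feel for why random allocation needs many vnodes to even out, we can simulate it. The sketch below is a simplification written in Python 3, not Cassandra's own code: the function name is made up, every node simply picks num_tokens random tokens on the Murmur3 ring, and only primary range ownership is measured (replication and racks are ignored).

```python
import random

RING_SIZE = 2**64  # Murmur3Partitioner token space: -2**63 .. 2**63 - 1


def random_ownership(num_nodes, num_tokens, seed=0):
    """Return the fraction of the ring owned by each node with random tokens."""
    rng = random.Random(seed)
    # Every node picks num_tokens random tokens on the ring.
    tokens = sorted(
        (rng.randrange(-2**63, 2**63), node)
        for node in range(num_nodes)
        for _ in range(num_tokens)
    )
    owned = [0] * num_nodes
    # A token owns the range from the previous token (exclusive) up to itself;
    # the modulo handles the wrap-around for the first token in the ring.
    for i, (token, node) in enumerate(tokens):
        previous_token = tokens[i - 1][0]
        owned[node] += (token - previous_token) % RING_SIZE
    return [o / RING_SIZE for o in owned]


for num_tokens in (4, 256):
    shares = random_ownership(num_nodes=9, num_tokens=num_tokens)
    print(num_tokens, "vnodes: largest/smallest share =",
          round(max(shares) / min(shares), 2))
```

Changing the seed and the number of nodes gives a rough idea of how much the largest and smallest shares can differ for a given num_tokens.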

Token allocation gets smart

This paints a pretty grim picture of vnodes, and as far as operators are concerned, they are caught between a rock and a hard place when selecting a value for num_tokens. That was until Cassandra version 3.0 was released, which brought with it a more intelligent token allocation algorithm thanks to CASSANDRA-7032. Using a ranking system, the algorithm feeds in the replication factor of a keyspace, the number of tokens, and the partitioner, to derive token ranges that are evenly distributed across the cluster of nodes.

The algorithm is configured by settings in the cassandra.yaml configuration file. Prior to this algorithm being added, the configuration file contained the necessary settings to configure the algorithm with the exception of the one to specify the keyspace name. When the algorithm was added, the allocate_tokens_for_keyspace setting was introduced into the configuration file. The setting allows a keyspace name to be specified so that during the bootstrap of a node we query the keyspace for its replication factor and pass that to the token allocation algorithm.

However, therein lies the problem. For existing clusters, updating this setting is easy, as a keyspace already exists, but for a cluster starting from scratch we have a chicken-and-egg situation. How do we specify a keyspace that doesn’t exist!? And there are other caveats, too…

  • It works for only a single replication factor. As long as all the other keyspaces are using the same replication as the one specified for allocate_tokens_for_keyspace all is fine. However, if you have keyspaces with a different replication factor they can potentially cause hot spots.
  • It works when nodes are only added to the cluster. The process for token distribution when a node is removed from the cluster remains unchanged, and hence can cause hot spots.
  • It works with only the default partitioner, Murmur3Partitioner.

Additionally, this is no silver bullet for all unbalanced clusters; we still need to make sure we have a data model that evenly distributes data across partitions. Wide partitions can still be an issue and no amount of token shuffling will fix this.

Despite these drawbacks, this feature gives us the ability to allocate tokens in a more predictable way whilst leveraging the advantage of vnodes. This means we can specify a small value for num_tokens (e.g. 4) and still be able to avoid hot spots. The question then becomes, in the case of starting a brand new cluster from scratch, which comes first, the chicken or the egg?

One does not simply start a cluster… with evenly distributed tokens

While it might be possible to rectify an unbalanced cluster caused by unfortunate token allocations, it is better for the token allocation to be set up correctly when the cluster is created. To set up a brand new cluster that takes advantage of the allocate_tokens_for_keyspace setting we need to use the following steps. The method below takes into account a cluster with nodes that are spread across multiple racks. The examples used in each step assume that our cluster will be configured as follows:

  • 4 vnodes (num_tokens = 4).
  • 3 racks with a single seed node in each rack.
  • A replication factor of 3, i.e. one replica per rack.

1. Calculate and set tokens for the seed node in each rack

We will need to set the tokens for the seed nodes in each rack manually. This is to prevent each node from randomly calculating its own token ranges. We can calculate the token ranges that we will use for the initial_token setting using the following python code:

$ python

Python 2.7.13 (default, Dec 18 2016, 07:03:39)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> num_tokens = 4
>>> num_racks = 3
>>> print "\n".join(['[Node {}] initial_token: {}'.format(r + 1, ','.join([str(((2**64 / (num_tokens * num_racks)) * (t * num_racks + r)) - 2**63) for t in range(num_tokens)])) for r in range(num_racks)])
[Node 1] initial_token: -9223372036854775808,-4611686018427387905,-2,4611686018427387901
[Node 2] initial_token: -7686143364045646507,-3074457345618258604,1537228672809129299,6148914691236517202
[Node 3] initial_token: -6148914691236517206,-1537228672809129303,3074457345618258600,7686143364045646503
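The one-liner above relies on the Python 2 print statement and on Python 2 integer division. For readers on Python 3, a roughly equivalent sketch is shown below; the function name initial_tokens is ours, and floor division (//) is used so that the arithmetic stays in exact integers.

```python
def initial_tokens(num_tokens, num_racks):
    """Return one list of evenly spaced initial tokens per rack seed node."""
    # Floor division keeps the arithmetic in exact integers on Python 3.
    step = 2**64 // (num_tokens * num_racks)
    return [
        [step * (t * num_racks + r) - 2**63 for t in range(num_tokens)]
        for r in range(num_racks)
    ]


for node, tokens in enumerate(initial_tokens(num_tokens=4, num_racks=3), start=1):
    print("[Node {}] initial_token: {}".format(node, ",".join(str(t) for t in tokens)))
```

With num_tokens = 4 and num_racks = 3 this produces the same three token lists as the Python 2 session above.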

We can then uncomment the initial_token setting in the cassandra.yaml file on each of the seed nodes, set it to the value generated by our python command, and set the num_tokens setting to the number of vnodes. When the node first starts, the value for the initial_token setting will be used; subsequent restarts will use the num_tokens setting.

Note that we need to manually calculate and specify the initial tokens for only the seed node in each rack. All other nodes will be configured differently.

2. Start the seed node in each rack

We can start the seed nodes one at a time using the following command:

$ sudo service cassandra start

When we watch the logs, we should see messages similar to the following appear:

...
INFO  [main] ... - This node will not auto bootstrap because it is configured to be a seed node.
INFO  [main] ... - tokens manually specified as [-9223372036854775808,-4611686018427387905,-2,4611686018427387901]
...

After starting the first of the seed nodes, we can use nodetool status to verify that 4 tokens are being used:

$ nodetool status
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.36.11  99 KiB     4            100.0%            5d7e200d-ba1a-4297-a423-33737302e4d5  rack1

We will wait for this message to appear in the logs, then start the next seed node in the cluster.

INFO  [main] ... - Starting listening for CQL clients on ...

Once all seed nodes in the cluster are up, we can use nodetool ring to verify the token assignments in the cluster. It should look something like this:

$ nodetool ring

Datacenter: dc1
==========
Address        Rack        Status State   Load            Owns                Token
                                                                              7686143364045646503
172.31.36.11   rack1       Up     Normal  65.26 KiB       66.67%              -9223372036854775808
172.31.36.118  rack2       Up     Normal  65.28 KiB       66.67%              -7686143364045646507
172.31.43.239  rack3       Up     Normal  99.03 KiB       66.67%              -6148914691236517206
172.31.36.11   rack1       Up     Normal  65.26 KiB       66.67%              -4611686018427387905
172.31.36.118  rack2       Up     Normal  65.28 KiB       66.67%              -3074457345618258604
172.31.43.239  rack3       Up     Normal  99.03 KiB       66.67%              -1537228672809129303
172.31.36.11   rack1       Up     Normal  65.26 KiB       66.67%              -2
172.31.36.118  rack2       Up     Normal  65.28 KiB       66.67%              1537228672809129299
172.31.43.239  rack3       Up     Normal  99.03 KiB       66.67%              3074457345618258600
172.31.36.11   rack1       Up     Normal  65.26 KiB       66.67%              4611686018427387901
172.31.36.118  rack2       Up     Normal  65.28 KiB       66.67%              6148914691236517202
172.31.43.239  rack3       Up     Normal  99.03 KiB       66.67%              7686143364045646503
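As a quick sanity check, we can confirm that the tokens listed by nodetool ring are evenly spaced. The sketch below (Python 3) uses the twelve tokens from the output above, typed in by hand; the helper name token_gaps is ours.

```python
RING_SIZE = 2**64  # size of the Murmur3Partitioner token space


def token_gaps(tokens):
    """Return the distance from each token to the next one around the ring."""
    ordered = sorted(tokens)
    return [
        (ordered[(i + 1) % len(ordered)] - token) % RING_SIZE
        for i, token in enumerate(ordered)
    ]


# Tokens copied from the nodetool ring output above.
ring_tokens = [
    -9223372036854775808, -7686143364045646507, -6148914691236517206,
    -4611686018427387905, -3074457345618258604, -1537228672809129303,
    -2, 1537228672809129299, 3074457345618258600,
    4611686018427387901, 6148914691236517202, 7686143364045646503,
]

gaps = token_gaps(ring_tokens)
print("smallest gap:", min(gaps))
print("largest gap: ", max(gaps))
```

The smallest and largest gaps should be practically identical; the only difference comes from the integer rounding in the token calculation, which ends up in the wrap-around range.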

We can then move to the next step.

3. Create only the keyspace for the cluster

On any one of the seed nodes we will use cqlsh to create the cluster keyspace using the following commands:

$ cqlsh NODE_IP_ADDRESS -u ***** -p *****

Connected to ...
[cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cassandra@cqlsh>
cassandra@cqlsh> CREATE KEYSPACE keyspace_with_replication_factor_3
    WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3}
    AND durable_writes = true;

Note that this keyspace can have any name; it can even be the keyspace that contains the tables we will use for our data.

4. Set the number of tokens and the keyspace for all remaining nodes

We will set the num_tokens and allocate_tokens_for_keyspace settings in the cassandra.yaml file on all of the remaining nodes as follows:

num_tokens: 4
...
allocate_tokens_for_keyspace: keyspace_with_replication_factor_3

We have set the allocate_tokens_for_keyspace value to the name of the keyspace created in the previous step. Note that at this point the Cassandra service on all other nodes is still down.

5. Start the remaining nodes in the cluster, one at a time

We can start the remaining nodes in the cluster using the following command:

$ sudo service cassandra start

When we watch the logs we should see messages similar to the following appear to say that we are using the new token allocation algorithm:

INFO  [main] ... - JOINING: waiting for ring information
...
INFO  [main] ... - Using ReplicationAwareTokenAllocator.
WARN  [main] ... - Selected tokens [...]
...
INFO  ... - JOINING: Finish joining ring

As per step 2 when we started the seed nodes, we will wait for this message to appear in the logs before starting the next node in the cluster.

INFO  [main] ... - Starting listening for CQL clients on ...
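Waiting for that log line can be scripted when there are many nodes to start. The following Python 3 sketch is just one way to do it: the function names are ours, and the log location passed to wait_until_ready (for example /var/log/cassandra/system.log) is an assumption that depends on how Cassandra was installed. Note that it matches any occurrence of the message, so an old entry from a previous start of the same node would also count.

```python
import time

READY_MARKER = "Starting listening for CQL clients"


def node_is_ready(log_lines):
    """Return True if any log line reports that CQL clients are being served."""
    return any(READY_MARKER in line for line in log_lines)


def wait_until_ready(log_path, poll_seconds=5, timeout_seconds=600):
    """Poll the log file until the ready message shows up or the timeout expires."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        with open(log_path) as log_file:
            if node_is_ready(log_file):
                return True
        time.sleep(poll_seconds)
    return False


# Example with in-memory log lines instead of a real log file.
sample_log = [
    "INFO  [main] ... - JOINING: Finish joining ring",
    "INFO  [main] ... - Starting listening for CQL clients on ...",
]
print(node_is_ready(sample_log))
```

On a real node we would call wait_until_ready with the path to the system log and only start the next node once it returns True.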

Once all the nodes are up, our shiny, new, evenly-distributed-tokens cluster is ready to go!

Proof is in the token allocation

While we can learn a fair bit from talking about the theory for the allocate_tokens_for_keyspace setting, it is still good to put it to the test and see what difference it makes when used in a cluster. I decided to create two clusters running Apache Cassandra 3.11.3 and compare the load distribution after inserting some data. For this test, I provisioned both clusters with 9 nodes using tlp-cluster and generated load using tlp-stress. Both clusters used 4 vnodes, but one of the clusters was set up using the even token distribution method described above.

Cluster using random token allocation

I started with a cluster that uses the traditional random token allocation system. For this cluster I set num_tokens: 4 and endpoint_snitch: GossipingPropertyFileSnitch in the cassandra.yaml on all the nodes. Nodes were split across three racks by specifying the rack in the cassandra-rackdc.properties file.

Once the cluster instances were up and Cassandra was installed, I started each node one at a time. After all nodes were started, the cluster looked like this:

ubuntu@ip-172-31-39-54:~$ nodetool status
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.36.95   65.29 KiB  4            16.1%             4ada2c52-0d1b-45cd-93ed-185c92038b39  rack1
UN  172.31.39.79   65.29 KiB  4            20.4%             c282ef62-430e-4c40-a1d2-47e54c5c8685  rack2
UN  172.31.47.155  65.29 KiB  4            21.2%             48d865d7-0ad0-4272-b3c1-297dce306a34  rack1
UN  172.31.43.170  87.7 KiB   4            24.5%             27aa2c78-955c-4ea6-9ea0-3f70062655d9  rack1
UN  172.31.39.54   65.29 KiB  4            30.8%             bd2d745f-d170-4fbf-bf9c-be95259597e3  rack3
UN  172.31.35.165  70.36 KiB  4            25.5%             056e2472-c93d-4275-a334-e82f87c4b53a  rack3
UN  172.31.35.149  70.37 KiB  4            24.8%             06b0e1e4-5e73-46cb-bf13-626eb6ce73b3  rack2
UN  172.31.35.33   65.29 KiB  4            23.8%             137602f0-3248-459f-b07c-c0b3e647fa48  rack2
UN  172.31.37.129  99.03 KiB  4            12.9%             cd92c974-b32e-4181-9e14-fb52dd27b09e  rack3

I ran tlp-stress against the cluster using the command below. This generated a write-only load that randomly inserted 10 million unique key-value pairs into the cluster. tlp-stress inserted data into a newly created keyspace and table called tlp_stress.keyvalue.

tlp-stress run KeyValue --replication "{'class':'NetworkTopologyStrategy','dc1':3}" --cl LOCAL_QUORUM --partitions 10M --iterations 100M --reads 0 --host 172.31.43.170

After running tlp-stress the cluster load distribution for the tlp_stress keyspace looked like this:

ubuntu@ip-172-31-39-54:~$ nodetool status tlp_stress
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.36.95   1.29 GiB   4            20.8%             4ada2c52-0d1b-45cd-93ed-185c92038b39  rack1
UN  172.31.39.79   2.48 GiB   4            39.1%             c282ef62-430e-4c40-a1d2-47e54c5c8685  rack2
UN  172.31.47.155  1.82 GiB   4            35.1%             48d865d7-0ad0-4272-b3c1-297dce306a34  rack1
UN  172.31.43.170  3.45 GiB   4            44.1%             27aa2c78-955c-4ea6-9ea0-3f70062655d9  rack1
UN  172.31.39.54   2.16 GiB   4            54.3%             bd2d745f-d170-4fbf-bf9c-be95259597e3  rack3
UN  172.31.35.165  1.71 GiB   4            29.1%             056e2472-c93d-4275-a334-e82f87c4b53a  rack3
UN  172.31.35.149  1.14 GiB   4            26.2%             06b0e1e4-5e73-46cb-bf13-626eb6ce73b3  rack2
UN  172.31.35.33   2.61 GiB   4            34.7%             137602f0-3248-459f-b07c-c0b3e647fa48  rack2
UN  172.31.37.129  562.15 MiB  4            16.6%             cd92c974-b32e-4181-9e14-fb52dd27b09e  rack3

I verified the data load distribution by checking the disk usage on all nodes using pssh (parallel ssh).

ubuntu@ip-172-31-39-54:~$ pssh -ivl ... -h hosts.txt "du -sh /var/lib/cassandra/data"
[1] ... [SUCCESS] 172.31.35.149
1.2G    /var/lib/cassandra/data
[2] ... [SUCCESS] 172.31.43.170
3.5G    /var/lib/cassandra/data
[3] ... [SUCCESS] 172.31.36.95
1.3G    /var/lib/cassandra/data
[4] ... [SUCCESS] 172.31.39.79
2.5G    /var/lib/cassandra/data
[5] ... [SUCCESS] 172.31.35.33
2.7G    /var/lib/cassandra/data
[6] ... [SUCCESS] 172.31.35.165
1.8G    /var/lib/cassandra/data
[7] ... [SUCCESS] 172.31.37.129
564M    /var/lib/cassandra/data
[8] ... [SUCCESS] 172.31.39.54
2.2G    /var/lib/cassandra/data
[9] ... [SUCCESS] 172.31.47.155
1.9G    /var/lib/cassandra/data

As we can see from the above results, there was a large variation in load distribution across nodes. Node 172.31.37.129 held the smallest amount of data (roughly 560 MB), whilst node 172.31.43.170 held about six times that amount (roughly 3.5 GB). Effectively, the difference between the smallest and largest data load was roughly 2.9 GB!

Cluster using predictive token allocation

I then moved on to setting up the cluster with predictive token allocation. Similar to the previous cluster, I set num_tokens: 4 and endpoint_snitch: GossipingPropertyFileSnitch in the cassandra.yaml on all the nodes. These settings were common to all nodes in this cluster. Nodes were again split across three racks by specifying the rack in the cassandra-rackdc.properties file.

I set the initial_token setting for each of the seed nodes and started the Cassandra process on them one at a time. One seed node was allocated to each rack in the cluster.

The initial keyspace that would be specified in the allocate_tokens_for_keyspace setting was created via cqlsh using the following command:

CREATE KEYSPACE keyspace_with_replication_factor_3 WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'} AND durable_writes = true;

I then set allocate_tokens_for_keyspace: keyspace_with_replication_factor_3 in the cassandra.yaml file for the remaining non-seed nodes and started the Cassandra process on them one at a time. After all nodes were started, the cluster looked like this:

ubuntu@ip-172-31-36-11:~$ nodetool status keyspace_with_replication_factor_3
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.36.47   65.4 KiB   4            32.3%             5ece457c-a3af-4173-b9c8-e937f8b63d3b  rack2
UN  172.31.43.239  117.45 KiB  4            33.3%             55a94591-ad6a-48a5-b2d7-eee7ea06912b  rack3
UN  172.31.37.44   70.49 KiB  4            33.3%             93054390-bc83-487c-8940-b99e7b85e5c2  rack3
UN  172.31.36.11   104.3 KiB  4            35.4%             5d7e200d-ba1a-4297-a423-33737302e4d5  rack1
UN  172.31.39.186  65.41 KiB  4            31.2%             ecd00ff5-a90a-4d33-b7ab-bdd22e3e50b8  rack1
UN  172.31.38.137  65.39 KiB  4            33.3%             64802174-885a-4c04-b530-a9b4685b1b96  rack1
UN  172.31.40.56   65.39 KiB  4            33.3%             0846effa-e4ac-4a19-845e-2162cd2b7680  rack3
UN  172.31.36.118  104.32 KiB  4            35.4%             5ad47bc0-9bcc-4fc5-b5b0-0a15ad63345f  rack2
UN  172.31.41.196  65.4 KiB   4            32.3%             4128ca20-b4fa-4173-88b2-aac62539a6d8  rack2

I ran tlp-stress against the cluster using the same command that was used to test the cluster with random token allocation. After running tlp-stress the cluster load distribution for the tlp_stress keyspace looked like this:

ubuntu@ip-172-31-36-11:~$ nodetool status tlp_stress
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.36.47   2.16 GiB   4            32.3%             5ece457c-a3af-4173-b9c8-e937f8b63d3b  rack2
UN  172.31.43.239  2.32 GiB   4            33.3%             55a94591-ad6a-48a5-b2d7-eee7ea06912b  rack3
UN  172.31.37.44   2.32 GiB   4            33.3%             93054390-bc83-487c-8940-b99e7b85e5c2  rack3
UN  172.31.36.11   1.84 GiB   4            35.4%             5d7e200d-ba1a-4297-a423-33737302e4d5  rack1
UN  172.31.39.186  2.01 GiB   4            31.2%             ecd00ff5-a90a-4d33-b7ab-bdd22e3e50b8  rack1
UN  172.31.38.137  2.32 GiB   4            33.3%             64802174-885a-4c04-b530-a9b4685b1b96  rack1
UN  172.31.40.56   2.32 GiB   4            33.3%             0846effa-e4ac-4a19-845e-2162cd2b7680  rack3
UN  172.31.36.118  1.83 GiB   4            35.4%             5ad47bc0-9bcc-4fc5-b5b0-0a15ad63345f  rack2
UN  172.31.41.196  2.16 GiB   4            32.3%             4128ca20-b4fa-4173-88b2-aac62539a6d8  rack2

I again verified the data load distribution by checking the disk usage on all nodes using pssh.

ubuntu@ip-172-31-36-11:~$ pssh -ivl ... -h hosts.txt "du -sh /var/lib/cassandra/data"
[1] ... [SUCCESS] 172.31.36.11
1.9G    /var/lib/cassandra/data
[2] ... [SUCCESS] 172.31.43.239
2.4G    /var/lib/cassandra/data
[3] ... [SUCCESS] 172.31.36.118
1.9G    /var/lib/cassandra/data
[4] ... [SUCCESS] 172.31.37.44
2.4G    /var/lib/cassandra/data
[5] ... [SUCCESS] 172.31.38.137
2.4G    /var/lib/cassandra/data
[6] ... [SUCCESS] 172.31.36.47
2.2G    /var/lib/cassandra/data
[7] ... [SUCCESS] 172.31.39.186
2.1G    /var/lib/cassandra/data
[8] ... [SUCCESS] 172.31.40.56
2.4G    /var/lib/cassandra/data
[9] ... [SUCCESS] 172.31.41.196
2.2G    /var/lib/cassandra/data

As we can see from the above results, there was little variation in the load distribution across nodes compared to the cluster that used random token allocation. Node 172.31.36.118 held the smallest amount of data (roughly 1.83 GB) and nodes 172.31.43.239, 172.31.37.44, 172.31.38.137, and 172.31.40.56 held the largest amount of data (roughly 2.32 GB each). The difference between the smallest and largest data load was roughly 500 MB, which is significantly less than the data size difference in the cluster that used random token allocation.
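To put the two test runs side by side, the Load column from the two nodetool status tlp_stress outputs can be compared with a few lines of Python 3. The values below are typed in by hand from those outputs (the single MiB value is converted to GiB); the helper name load_spread is ours.

```python
MIB_PER_GIB = 1024.0

# Load per node in GiB, copied from the nodetool status tlp_stress outputs.
random_allocation_gib = [
    1.29, 2.48, 1.82, 3.45, 2.16, 1.71, 1.14, 2.61, 562.15 / MIB_PER_GIB,
]
predictive_allocation_gib = [
    2.16, 2.32, 2.32, 1.84, 2.01, 2.32, 2.32, 1.83, 2.16,
]


def load_spread(loads):
    """Return (largest - smallest, largest / smallest) for a list of loads."""
    return max(loads) - min(loads), max(loads) / min(loads)


for name, loads in (
    ("random", random_allocation_gib),
    ("predictive", predictive_allocation_gib),
):
    spread, ratio = load_spread(loads)
    print("{}: spread {:.2f} GiB, max/min ratio {:.2f}".format(name, spread, ratio))
```

For these numbers the spread works out to about 2.90 GiB for the randomly allocated cluster and about 0.49 GiB for the cluster that used predictive allocation.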

Conclusion

Having a perfectly balanced cluster takes a bit of work and planning. While there are some steps to set up and caveats to using the allocate_tokens_for_keyspace setting, the predictive token allocation is a definite must use when setting up a new cluster. As we have seen from testing, it allows us to take advantage of num_tokens being set to a low value without having to worry about hot spots developing in the cluster.

Testing Cassandra compatible APIs

In this quick blog post, I’m going to assess how the databases that advertise themselves as “Cassandra API-compatible” fare in the compatibility department.

But that is all I will do: API testing only, and not extensive testing at that, just the APIs I see used often. Based on this, you can start making an informed decision on whether or not to change databases.

The contenders:

  • Apache Cassandra 4.0
    • Installation: Build from Source
  • Yugabyte – https://www.yugabyte.com/
    • “YCQL is a transactional flexible-schema API that is compatible with the Cassandra Query Language (CQL).”
    • Installation: Docker
  • ScyllaDB – https://www.scylladb.com/
    • “Apache Cassandra’s wire protocol, a rich polyglot of drivers, and integration with Spark, Presto, and Graph tools make for resource-efficient and performance-effective coding.”
    • Installation: Docker
  • Azure Cosmos – https://azure.microsoft.com/en-us/services/cosmos-db/
    • “Azure Cosmos DB provides native support for NoSQL and OSS APIs including MongoDB, Cassandra, Gremlin and SQL”
    • Installation: Azure Portal Wizard

All installations were done with the containers as they are provided. Cosmos DB used all defaults as they were provided by the wizard interface.

The CQL script used to test was this one: https://gist.github.com/cjrolo/f5f3cc02699c06ed1f4909d632d90f8f

What I’m not doing in this blog post: performance testing, feature comparison, and everything else that is not testing the API. Those might all be more or less important for other use cases, but they are outside the scope of this post.

What was tested

In this test, the following CQL APIs were tested:

  1. Keyspace Creation
  2. Table Creation
  3. Adding a Column to a table (Alter table)
  4. Data Insert
  5. Data Insert with TTL (Time-to-live)
  6. Data Insert with LWT (Lightweight Transactions)
  7. Select Data
  8. Select data with a full table scan (ALLOW FILTERING)
  9. Creating a Secondary Index (2I)
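To make the list above more concrete, here is one example statement per tested API, kept in a small Python structure so they can be printed or fed to a driver. These are illustrative statements against a hypothetical demo.users table and are not the contents of the gist linked above.

```python
# One illustrative CQL statement per tested API. The keyspace "demo" and the
# table "users" are hypothetical and not taken from the original test script.
CQL_EXAMPLES = [
    ("Keyspace creation",
     "CREATE KEYSPACE IF NOT EXISTS demo WITH replication = "
     "{'class': 'SimpleStrategy', 'replication_factor': 1};"),
    ("Table creation",
     "CREATE TABLE demo.users (id int PRIMARY KEY, name text);"),
    ("Adding a column",
     "ALTER TABLE demo.users ADD email text;"),
    ("Data insert",
     "INSERT INTO demo.users (id, name) VALUES (1, 'alice');"),
    ("Data insert with TTL",
     "INSERT INTO demo.users (id, name) VALUES (2, 'bob') USING TTL 60;"),
    ("Data insert with LWT",
     "INSERT INTO demo.users (id, name) VALUES (3, 'carol') IF NOT EXISTS;"),
    ("Select data",
     "SELECT * FROM demo.users WHERE id = 1;"),
    ("Select with full table scan",
     "SELECT * FROM demo.users WHERE name = 'alice' ALLOW FILTERING;"),
    ("Secondary index creation",
     "CREATE INDEX ON demo.users (name);"),
]

for number, (api, statement) in enumerate(CQL_EXAMPLES, start=1):
    print("{}. {}: {}".format(number, api, statement))
```

Running statements like these one by one against each database and noting which ones are rejected is all the compatibility check amounts to.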

Cassandra 4.0

  • All statements worked (as expected)

CosmosDB

  • LWT not supported
  • ALLOW FILTERING not supported
  • 2i not supported

Scylla

  • LWT not supported

Yugabyte

  • 2i not supported

Results Table

So, with these results, which are not a full comparison (I have left out other parts offered in these systems), you can decide if it is compatible enough for you.

Reaper 1.4 Released

Cassandra Reaper 1.4 was just released with security features that now expand to the whole REST API.

Security Improvements

Reaper 1.2.0 integrated Apache Shiro to provide authentication capabilities in the UI. The REST API remained fully open though, which was a security concern. With Reaper 1.4.0, the REST API is now fully secured and managed by the very same Shiro configuration as the Web UI. JSON Web Tokens (JWT) were introduced to avoid sending credentials over the wire too often. In addition, spreaper, Reaper’s command line tool, has been updated to provide a login operation and manipulate JWTs.

The documentation was updated with all the necessary information to handle authentication in Reaper and even some samples on how to connect LDAP directories through Shiro.

Note that Reaper doesn’t support authorization features and it is impossible to create users with different rights.
Authentication is now enabled by default for all new installs of Reaper.

Configurable JMX port per cluster

One of the annoying things with Reaper was that it was impossible to use a different port for JMX communications than the default one, 7199.
You could define specific ports per IP, but that was really for testing purposes with CCM.
That long overdue feature has now landed in 1.4.0 and a custom JMX port can be passed when declaring a cluster in Reaper:

Configurable JMX port

TWCS/DTCS tables blacklisting

In general, it is best to avoid repairing DTCS tables, as it can generate lots of small SSTables that could stay out of the compaction window and generate performance problems. We tend to recommend not to repair TWCS tables either, to avoid replicating timestamp overlaps between nodes that can delay the deletion of fully expired SSTables.

When using the auto-scheduler though, it is impossible to specify blacklists, as all keyspaces and all tables get automatically scheduled by Reaper.

Based on the initial PR of Dennis Kline that was then re-worked by our very own Mick, a new configuration setting allows automatic blacklisting of TWCS and DTCS tables for all repairs:

blacklistTwcsTables: false

When set to true, Reaper will discover the compaction strategy for all tables in the keyspace and remove any table with either DTCS or TWCS, unless they are explicitly passed in the list of tables to repair.
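The selection rule described above can be sketched in a few lines of Python 3. This is only our reading of the behaviour, not Reaper's actual implementation; the function name, its parameters, and the table names are hypothetical.

```python
# Compaction strategies that the blacklist setting is meant to skip.
TIME_WINDOW_STRATEGIES = (
    "TimeWindowCompactionStrategy",
    "DateTieredCompactionStrategy",
)


def tables_to_repair(compaction_by_table, explicit_tables=(), blacklist_twcs_tables=False):
    """Pick the tables of a keyspace that a repair should cover."""
    if explicit_tables:
        # Tables named explicitly are always repaired, whatever their strategy.
        return sorted(t for t in explicit_tables if t in compaction_by_table)
    if not blacklist_twcs_tables:
        return sorted(compaction_by_table)
    # Skip TWCS and DTCS tables; str.endswith accepts a tuple of suffixes.
    return sorted(
        table
        for table, strategy in compaction_by_table.items()
        if not strategy.endswith(TIME_WINDOW_STRATEGIES)
    )


# Hypothetical keyspace with three tables.
compaction = {
    "events": "org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy",
    "metrics": "org.apache.cassandra.db.compaction.DateTieredCompactionStrategy",
    "users": "org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy",
}
print(tables_to_repair(compaction, blacklist_twcs_tables=True))
print(tables_to_repair(compaction, explicit_tables=["events"], blacklist_twcs_tables=True))
```

In this sketch only users is selected when the blacklist is on, while events is still repaired when it is requested explicitly.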

Web UI improvements

The Web UI reported decommissioned nodes that still appeared in the Gossip state of the cluster, with a Left state. This has been fixed and such nodes are not displayed anymore.
Another bug was the number of tokens reported in the node detail panel, which was nowhere near matching reality. We now display the correct number of tokens and clicking on this number will open a popup containing the list of tokens the node is responsible for:

Tokens

Work in progress

Work in progress will introduce the Sidecar Mode, which will collocate a Reaper instance with each Cassandra node and support clusters where JMX access is restricted to localhost.
This mode is being actively worked on currently and the branch already has working repairs.
We’re now refactoring the code and porting other features to this mode like snapshots and metric collection.
This mode will also allow for adding new features and permit Reaper to better scale with the clusters it manages.

Upgrade to Reaper 1.4.0

The upgrade to 1.4 is recommended for all Reaper users. The binaries are available from yum, apt-get, Maven Central, Docker Hub, and are also downloadable as tarball packages. Remember to back up your database before starting the upgrade.

All instructions to download, install, configure, and use Reaper 1.4 are available on the Reaper website.

How to build your very own Cassandra 4.0 release

Over the last few months, I have been seeing references to Cassandra 4.0 and some of its new features. When that happens with a technology I am interested in, I go looking for the preview releases to download and test. Unfortunately, so far, there are no such releases. But, I am still interested, so I’ve found it necessary to build my own Cassandra 4.0 release. This is in my humble opinion not the most desirable way to do things since there is no Cassandra 4.0 branch yet. Instead, the 4.0 code is on the trunk. So if you do two builds a commit or two apart, and there are typically at least three or four commits a week right now, you get a slightly different build. It is, in essence, a moving target.

All that said and done, I decided if I could do it, then the least I could do is write about how to do it and let everyone who wants to try it learn how to avoid a couple of dumb things I did when I first tried it.

Building your very own Cassandra 4.0 release is actually pretty easy. It consists of five steps:

  1. Make sure you have your prerequisites
    1. Java SDK 1.8 or Java 11 (OpenJDK or Oracle)
    2. Ant 1.8
    3. Git CLI client
    4. Python >= 2.7, < 3.0
  2. Download the Git repository
    1. git clone http://git-wip-us.apache.org/repos/asf/cassandra.git
  3. Build your new Cassandra release
    1. cd cassandra
    2. ant
  4. Run Cassandra
    1. cd ./bin
    2. ./cassandra
  5. Have fun
    1. ./nodetool status
    2. ./cqlsh

I will discuss each step in a little bit more detail:

Step 1) Verify, and if necessary, install your prerequisites

For Java, you can confirm the JDK presence by typing in:


john@Lenny:~$ javac -version
javac 1.8.0_191

For ant:


john@Lenny:~$ ant -version
Apache Ant(TM) version 1.9.6 compiled on July 20 2018

For git:


john@Lenny:~$ git --version
git version 2.7.4

For Python:


john@Lenny:~$ python --version
Python 2.7.12

If you have all of the right versions, you are ready for the next step. If not, you will need to install the required software which I am not going to go into here.

Step 2) Clone the repository

Verify you do not already have an older copy of the repository:


john@Lenny:~$ ls -l cassandra
ls: cannot access 'cassandra': No such file or directory

If you found a cassandra directory, you will want to delete it, move it, or run the clone from a different directory. Otherwise:


john@Lenny:~$ git clone http://git-wip-us.apache.org/repos/asf/cassandra.git
Cloning into 'cassandra'...
remote: Counting objects: 316165, done.
remote: Compressing objects: 100% (51450/51450), done.
remote: Total 316165 (delta 192838), reused 311524 (delta 189005)
Receiving objects: 100% (316165/316165), 157.78 MiB | 2.72 MiB/s, done.
Resolving deltas: 100% (192838/192838), done.
Checking connectivity... done.
Checking out files: 100% (3576/3576), done.

john@Lenny:~$ du -sh *
294M cassandra

At this point, you have used up 294 MB and you have an honest-to-goodness clone of the git repo on your host – in my case, a Lenovo laptop running the Windows 10 Linux subsystem.

And your repository looks something like this:


  john@Lenny:~$ ls -l cassandra
  total 668
  drwxrwxrwx 1 john john    512 Feb  6 15:54 bin
  -rw-rw-rw- 1 john john    260 Feb  6 15:54 build.properties.default
  -rw-rw-rw- 1 john john 101433 Feb  6 15:54 build.xml
  -rw-rw-rw- 1 john john   4832 Feb  6 15:54 CASSANDRA-14092.txt
  -rw-rw-rw- 1 john john 390460 Feb  6 15:54 CHANGES.txt
  drwxrwxrwx 1 john john    512 Feb  6 15:54 conf
  -rw-rw-rw- 1 john john   1169 Feb  6 15:54 CONTRIBUTING.md
  drwxrwxrwx 1 john john    512 Feb  6 15:54 debian
  drwxrwxrwx 1 john john    512 Feb  6 15:54 doc
  -rw-rw-rw- 1 john john   5895 Feb  6 15:54 eclipse_compiler.properties
  drwxrwxrwx 1 john john    512 Feb  6 15:54 examples
  drwxrwxrwx 1 john john    512 Feb  6 15:54 ide
  drwxrwxrwx 1 john john    512 Feb  6 15:54 lib
  -rw-rw-rw- 1 john john  11609 Feb  6 15:54 LICENSE.txt
  -rw-rw-rw- 1 john john 123614 Feb  6 15:54 NEWS.txt
  -rw-rw-rw- 1 john john   2600 Feb  6 15:54 NOTICE.txt
  drwxrwxrwx 1 john john    512 Feb  6 15:54 pylib
  -rw-rw-rw- 1 john john   3723 Feb  6 15:54 README.asc
  drwxrwxrwx 1 john john    512 Feb  6 15:54 redhat
  drwxrwxrwx 1 john john    512 Feb  6 15:54 src
  drwxrwxrwx 1 john john    512 Feb  6 15:54 test
  -rw-rw-rw- 1 john john  17215 Feb  6 15:54 TESTING.md
  drwxrwxrwx 1 john john    512 Feb  6 15:54 tools

Step 3) Build your new Cassandra 4.0 release

Remember what I said in the beginning? There is no branch for Cassandra 4.0 at this point, so building from the trunk is quite simple:


john@Lenny:~$ cd cassandra
john@Lenny:~/cassandra$ ant
Buildfile: /home/john/cassandra/build.xml

BUILD SUCCESSFUL
Total time: 1 minute 4 seconds

That went quickly enough. Let’s take a look and see how much larger the directory has gotten:


john@Lenny:~$ du -sh *
375M cassandra

Our directory grew by 81 MB, pretty much all of it in the new build directory, which now has 145 new files, including ./build/apache-cassandra-4.0-SNAPSHOT.jar. I am liking that version 4.0 right in the middle of the filename.

Step 4) Start Cassandra up. This one is easy if you do the sensible thing:


john@Lenny:~/cassandra$ cd ..
john@Lenny:~$ cd cassandra/bin
john@Lenny:~/cassandra/bin$ ./cassandra
john@Lenny:~/cassandra/bin$ CompilerOracle: dontinline org/apache/cassandra/db/Columns$Serializer.deserializeLargeSubset (Lorg/apache/cassandra/io/util/DataInputPlus;Lorg/apache/cassandra/db/Columns;I)Lorg/apache/cassandra/db/Columns;
CompilerOracle: dontinline org/apache/cassandra/db/Columns$Serializer.serializeLargeSubset (Ljava/util/Collection;ILorg/apache/cassandra/db/Columns;ILorg/apache/cassandra/io/util/DataOutputPlus;)V
CompilerOracle: dontinline org/apache/cassandra/db/Columns$Serializer.serializeLargeSubsetSize (Ljava/util/Collection;ILorg/apache/cassandra/db/Columns;I)I

INFO [MigrationStage:1] 2019-02-06 21:26:26,222 ColumnFamilyStore.java:407 - Initializing system_auth.role_members
INFO [MigrationStage:1] 2019-02-06 21:26:26,234 ColumnFamilyStore.java:407 - Initializing system_auth.role_permissions
INFO [MigrationStage:1] 2019-02-06 21:26:26,244 ColumnFamilyStore.java:407 - Initializing system_auth.roles

We seem to be up and running. It’s time to try some things out:

Step 5) Have fun

We will start out making sure we are up and running by using nodetool to connect and display a cluster status. Then we will go into the CQL shell to see something new. It is important to note that since you are likely to have nodetool and cqlsh already installed on your host, you need to use the ./ in front of your commands to ensure you are using the 4.0 version. I have learned the hard way that forgetting the ./ can result in some very real confusion.


  john@Lenny:~/cassandra/bin$ ./nodetool status
  Datacenter: datacenter1
  =======================
  Status=Up/Down
  |/ State=Normal/Leaving/Joining/Moving
  --  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
  UN  127.0.0.1  115.11 KiB  256          100.0%            f875525b-3b78-49b4-a9e1-2ab0cf46b881  rack1
            
  john@Lenny:~/cassandra/bin$ ./cqlsh
  Connected to Test Cluster at 127.0.0.1:9042.
  [cqlsh 5.0.1 | Cassandra 4.0-SNAPSHOT | CQL spec 3.4.5 | Native protocol v4]
  Use HELP for help.
  cqlsh> desc keyspaces;

  system_traces  system_auth  system_distributed     system_views
  system_schema  system       system_virtual_schema

  cqlsh>

We got a nice cluster with one node and we see the usual built-in keyspaces. Well, um… not exactly. We see two new keyspaces: system_virtual_schema and system_views. Those look very interesting.

In my next blog, I’ll be talking more about Cassandra’s new virtual table facility and how very useful it is going to be someday soon. I hope.

Anomalia Machina 7 – Kubernetes Cluster Creation and Application Deployment

Kubernetes – Greek: κυβερνήτης = Helmsman

If you are a Greek hero about to embark on an epic aquatic quest (encountering one-eyed, rock-throwing monsters, unpleasant weather, a detour to the underworld, tempting sirens, angry gods, etc.), then having a trusty helmsman is mandatory (even though the helmsman survived the Cyclops, like all of Odysseus’s companions he eventually came to a sticky end).

Anomalia Machina 7 - Odysseus and Polyphemus

Anomalia Machina 7 - Kubernetes

Introduction

A few months ago we decided to try Kubernetes to see how it would help with our quest to scale our Anomalia Machina application (a massively scalable Anomaly Detection application using Apache Kafka and Cassandra). This blog is a recap of our initial experiences (reported in this webinar) creating a Kubernetes cluster on AWS and deploying the application.

Kubernetes is an automation system for the management, scaling and deployment of containerized applications. Why is Kubernetes interesting?

  • It’s Open Source, and cloud provider (multi cloud) and programming language (polyglot programming) agnostic
  • You can develop and test code locally, then deploy at scale
  • It helps with resource management: you can deploy an application to Kubernetes and it manages application scaling
  • More powerful frameworks built on the Kubernetes APIs are becoming available (e.g. Istio)

Kubernetes and AWS EKS

Our goal was to try Kubernetes out on AWS using the newish AWS EKS, the Amazon Elastic Container Service for Kubernetes. Kubernetes is a sophisticated four-year-old technology, and it isn’t trivial to learn or deploy on a production cloud platform. To understand, set up, and use it, you need familiarity with a mixture of Docker, AWS, security, networking, Linux, systems administration, grid computing, resource management and performance engineering! To get up to speed faster, I accepted an invitation to attend an AWS Get-it-done-athon in Sydney for a few days last year to try out AWS EKS. Here’s the documentation I used: EKS user guide, step by step guide, and a useful EKS workshop which includes some extensions.

 

At a high level, Kubernetes has a Master (controller) and multiple (worker) Nodes.

Anomalia Machina 7 - Kubernetes Master (controller) and multiple (worker) Nodes.

Kubernetes has a lot of layers, a bit like Russian nesting dolls. The smallest doll is the code, the Java classes. Then you have to package it as a Java JAR file, then turn the JAR into a Docker image and upload it to the Docker Hub. The Kubernetes layers include nodes (EC2 instances), pods which run on nodes, and deployments which have 1 or more pods. A Kubernetes Deployment (one of many controller types) declaratively tells the control plane what state the Pods should be in, and how many of them there should be. Finally there is the control plane (the master), which runs everything. This is the focus of AWS EKS.

Anomalia Machina 7 - Kubernetes has a lot of layers, a bit like Russian nesting dolls

This is a high level overview of AWS EKS. It looks simple enough, but there are a few complications to solve before you can get your application running.

Anomalia Machina 7 - High level overview of AWS EKS

Regions

Anomalia Machina 7 - Toto, I have feeling we're not in Kansas anymore

“Toto, I’ve a feeling we’re not in Kansas anymore”

Each AWS EKS cluster runs in an AWS region, and the worker nodes must run in the same region. When I created my EKS cluster last year there were only 3 supported regions. There are now lots more EKS regions. Unfortunately my Kafka and Cassandra test clusters were in Ohio, which was not one of the EKS regions at the time. Virginia was the nearest EKS region. Ohio is only 500 km away, so I hoped the application would work ok (at least for testing) across regions. However, running EKS in a different region to other parts of your application may be undesirable due to increased complexity, latency, and extra data charges. It also introduced some unnecessary complications for this experiment.

Kubernetes Snakes and Ladders

It turns out that there are lots of steps to get AWS EKS running, and they didn’t all work smoothly (the “snakes”), particularly with the cross region complications. Let’s roll the dice and test our luck.

Anomalia Machina 7 - Rolling Dice

Anomalia Machina 7 - Snakes and Ladder

First we have to prepare things:

1 – Create Amazon EKS service role (5 steps)

2 – Create your Amazon EKS Cluster VPC (12 steps)

3 – Install and Configure kubectl for Amazon EKS (5 steps)

4 – Install aws-iam-authenticator for Amazon EKS (10 steps)

5 – Download and Install the Latest AWS CLI (6 steps)

 

Many (38) steps and a few hours later we were ready to create our AWS EKS cluster.

 

6 – Create Your Amazon EKS Cluster (12 steps)

7 – Configure kubectl for Amazon EKS (3 steps)

 

Next, more steps to launch and configure worker nodes:

 

8 – Add key pairs to EKS region (extra steps due to multiple regions, down a snake)

9 – Launch and Configure Amazon EKS Worker Nodes (29 steps)

 

We finally got 3 worker nodes started.

 

10 – Deploying the Kubernetes Web GUI dashboard is a good idea if you want to see what’s going on (it runs on worker nodes). But then down another snake as the documentation wasn’t 100% clear (12 steps).

 

11 – Deploy the application (5 steps).  Does it work?

12 – Not yet… down another snake. Configure, debug, test application (30 steps)

 

After 100+ steps (and 2 days) we could finally ramp up our application on the Kubernetes cluster and test it out. It worked (briefly). The completed “game”, with steps (1-12) numbered, looked like this:

Anomalia Machina 7 - Snakes and ladder completed game with Kubernetes

Here’s the Kubernetes GUI dashboard with the 3 worker nodes and the anomaly detection application pipeline deployed. You can kill pods, and EKS magically replaces them. You can ask for more pods and EKS creates more – just don’t ask for too many!

Anomalia Machina 7 - Kubernetes GUI Dashboard

(Mis)understanding Kubernetes Scaling

Anomalia Machina 7 - (Mis)understanding Kubernetes Scaling

https://www.earthstoriez.com/turkey-nasreddin-tree-stories/

 

It’s useful to understand the basics of AWS EKS/Kubernetes scaling (I initially didn’t). When you create a Kubernetes deployment you specify how many replicas (pods) you want. Pods are designed to run a single instance of an application and are the basic unit of concurrency in Kubernetes. They are also designed to be ephemeral – they can be started, killed, and restarted on different nodes. Once running, you can request more or fewer replicas. I started out with 1 replica, which worked ok.

 

I then wanted to see what happened if I requested more replicas. I had 3 worker nodes (EC2 instances), so was curious to see how many pods Kubernetes would allow – 1, 2, 3, or more? (Note that some of the worker node resources are already used by Kubernetes for the GUI and monitoring.) I soon found out that by default Kubernetes doesn’t limit the number of pods at all! That is, it assumes pods use no resources, and allows more pods to be created even when all the worker nodes have run out of spare resources. As a result the worker nodes were soon overloaded, and the whole system became unresponsive – the Kubernetes GUI froze and I couldn’t delete pods. To try to get things back to a working state I manually stopped the EC2 worker instances from the AWS console.
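To make the overload easier to picture, here is a toy Python model of pod placement. It is only a sketch and not the real Kubernetes scheduler: the first-fit rule, the node sizes (2000 millicores each) and the function name are all assumptions for illustration. The point it shows is that a pod declaring a CPU request of zero always "fits", while a non-zero request caps how many pods the nodes will accept.

```python
def schedule(pod_count, node_millicpu, request_millicpu):
    """Toy first-fit placement; returns (placed, pending) pod counts."""
    free = list(node_millicpu)          # remaining unreserved CPU per node
    placed = 0
    for _ in range(pod_count):
        for index, available in enumerate(free):
            if available >= request_millicpu:
                free[index] = available - request_millicpu
                placed += 1
                break                   # pod placed, move on to the next pod
    return placed, pod_count - placed


# Three hypothetical worker nodes with 2 vCPUs (2000 millicores) each.
nodes = [2000, 2000, 2000]
print(schedule(20, nodes, 0))      # no request declared: every pod is placed
print(schedule(20, nodes, 500))    # 500m requested per pod: the rest stay pending
```

In real Kubernetes the request is declared per container under resources.requests in the pod spec, and pods that do not fit stay in the Pending state rather than overloading the nodes.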

 

Did this work? No. What I hadn’t realised was that the worker nodes are created as part of an auto scaling group.  

Anomalia Machina 7 - Auto Scaling Groups

To change the number of instances (and therefore nodes) you need to change the auto scaling group minimum and maximum sizes. The number of instances will scale elastically within these limits. After I manually stopped the worker instances I checked later and I found that (1) they had all started again, and (2) they were all saturated again (as Kubernetes was still trying to achieve the desired number of replicas for the deployment).

 

Therefore, another useful thing to understand about AWS EKS and Kubernetes is that Kubernetes resources are controlled declaratively by the higher layers, but AWS EC2 instances are controlled by AWS. If you need to kill off all the pods you have to change the number of desired replicas to 0 in the Kubernetes deployment. But if you need more worker nodes you have to change the max/desired sizes in the workers auto scaling group, as Kubernetes cannot override these values. Apparently Kubernetes does not by default enable elastic node auto scaling using the workers auto scaling group. You need the Kubernetes Cluster Autoscaler configured for this to work.
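The two independent control loops described above can be sketched in a few lines of Python. This is a simplified model under my own assumptions (the function names and the numbers are invented, and real controllers do far more), but it shows why stopping instances by hand achieved nothing: each loop just drives the actual count back to its own declared target.

```python
def asg_instances_to_launch(running, desired, minimum, maximum):
    """Auto scaling group: clamp desired to [minimum, maximum], return the delta."""
    target = max(minimum, min(desired, maximum))
    return target - running


def deployment_pods_to_start(running_pods, replicas):
    """Deployment: return how many pods to start (+) or stop (-)."""
    return replicas - running_pods


# All 3 workers stopped by hand, group still sized 3/3/3: AWS starts 3 again.
print(asg_instances_to_launch(running=0, desired=3, minimum=3, maximum=3))

# The only way to get rid of the pods is to declare 0 replicas.
print(deployment_pods_to_start(running_pods=40, replicas=0))
```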

Further Resources

Here are a couple of other AWS EKS reviews:

 

In the next blog(s) I’ll provide an update on using Kubernetes to scale the production version of the Anomalia Machina Application (with Prometheus and OpenTracing) and the results at scale.

The post Anomalia Machina 7 – Kubernetes Cluster Creation and Application Deployment appeared first on Instaclustr.

14 Things To Do When Setting Up a New Cassandra Cluster

Over the last few years, we’ve spent a lot of time reviewing clusters, answering questions on the Cassandra User mailing list, and helping folks out on IRC. During this time, we’ve seen the same issues come up again and again. While we regularly go into detail on individual topics, and encourage folks to read up on the fine print later, sometimes teams just need a starting point to help them get going. This is our basic tuning checklist for those teams who just want to get a cluster up and running and avoid some early potholes.

  1. Adjust The Number of Tokens Per Node

    When the initial work on vnodes in Cassandra was started on the mailing list, 256 tokens was chosen in order to ensure even distribution. This carried over into CASSANDRA-4119 and became part of the implementation. Unfortunately, there were several unforeseen (and unpredictable) consequences of choosing a number this high. Without spending a lot of time on the subject, 256 tokens can cause issues when bootstrapping new nodes (lots of SSTables), make repair take longer, and raise overall CPU usage. Since Cassandra 3.0, we’ve had the ability to allocate tokens in a more predictable manner that avoids hotspots and leverages the advantage of vnodes. We thus recommend using a value of 4 here, which gives a significant improvement in token allocation. In order to use this, please ensure you’re using the allocate_tokens_for_keyspace setting in the cassandra.yaml file. It’s important to do this up front, as there’s no easy way to change it without setting up a new data center and doing a migration.

    Note: Using 4 tokens won’t allow you to add a single node to a cluster of 100 nodes and have each node get an additional 1% capacity. In practice there’s not much benefit from doing that, as you would always want to expand a cluster by roughly 25% in order to have a noticeable improvement, and 4 tokens allows for that.

  2. Configure Racks, Snitch, and Replication

    Cassandra has the ability to place data around the ring in a manner that ensures we can survive losing a rack, an availability zone, or entire data center. In order to do this correctly, it needs to know where each node is placed, so that it can place copies of data in a fault-tolerant manner according to the replication strategy. Production workloads should ALWAYS use the NetworkTopologyStrategy, which takes the racks and data centers into account when writing data. We recommend using GossipingPropertyFileSnitch as a default. If you’re planning on staying within a cloud provider it’s probably easier to use a dedicated snitch, such as Ec2Snitch, as these figure out their rack and data center automatically. These should all be set up before using the cluster for production, as changing them later is extremely difficult.

  3. Set up Internode Encryption

    We discuss how to do this in our guide on setting up internode encryption. I won’t go deep into the details here since Nate did a good job of that in his post. This falls in the “do it up front or you’ll probably never do it” list with #1 and #2.

  4. Set up client authentication

    Using authentication for your database is a good standard practice, and pretty easy to set up initially. We recommend disabling the default cassandra user altogether once auth is set up, and increasing the replication factor (RF) of the system_auth keyspace to a few nodes per rack. For example, if you have 3 racks, use RF=9 for system_auth. It’s common to read folks advising increasing the system_auth RF to match the cluster size no matter how big it is. I’ve found this to be unnecessary, and it’s also problematic when you increase your cluster size. If you’re using 1000 nodes per data center, you’ll need a higher setting, but it’s very unlikely your first cluster will be that big. Be sure to bump up the roles_validity_in_ms, permissions_validity_in_ms, and credentials_validity_in_ms settings (try 30 seconds) to avoid hammering those nodes with auth requests.

    Using authentication is a great start, but adding a layer of authorization to specific tables is a good practice as well. Lots of teams use a single Cassandra cluster for multiple purposes, which isn’t great in the long term but is fine for starting out. Authorization lets you limit what each user can do on each keyspace and that’s a good thing to get right initially. Cassandra 4.0 will even have the means of restricting users to specific data centers which can help segregate different workloads and reduce human error.

    The Security section of the documentation is worth reading for more details.
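
    As a rough illustration of the system_auth rule of thumb above (a few replicas per rack), here is a small Python helper that builds the matching ALTER KEYSPACE statement. The function name, the default of 3 replicas per rack and the data center name dc1 are assumptions for the example; check the generated statement against your own topology before running it.

```python
def system_auth_replication_cql(racks_per_dc, replicas_per_rack=3):
    """Build an ALTER KEYSPACE statement giving system_auth a few replicas per rack.

    racks_per_dc maps a data center name to its number of racks.
    """
    options = ["'class': 'NetworkTopologyStrategy'"]
    for dc in sorted(racks_per_dc):
        options.append("'%s': %d" % (dc, racks_per_dc[dc] * replicas_per_rack))
    return "ALTER KEYSPACE system_auth WITH replication = {%s};" % ", ".join(options)


# One hypothetical data center with 3 racks gives RF=9, as in the text.
print(system_auth_replication_cql({"dc1": 3}))
```

    After changing the replication of system_auth you would normally repair that keyspace so the new replicas actually receive the auth data.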

  5. Disable Dynamic Snitch

    Dynamic snitch is a feature that was intended to improve the performance of reads by preferring nodes which are performing better. Logically, it makes quite a bit of sense, but unfortunately it doesn’t behave that way in practice. In reality, we suffer from a bit of the Observer Effect, where the observation of the situation affects the outcome. As a result, the dynamic snitch generates quite a bit of garbage, so much in fact that using it makes everything perform significantly worse. By disabling it, we make the cluster more stable overall and end up with a net reduction in performance related problems. A fix for this is actively being worked on for Cassandra 4.0 in CASSANDRA-14459.

  6. Set up client encryption

    If you’re going to set up authentication, it’s a good idea to set up client encryption as well, or else everything (including authentication credentials) is sent over cleartext.

    Again, the Security section of the documentation is worth reading for more details on this, so I won’t rehash what’s there.

  7. Increase counter cache (if using counters)

    Counters do a read before write operation. The counter cache allows us to skip reading the value off disk. In counter heavy clusters we’ve seen a significant performance improvement by increasing the counter cache. We’ll dig deeper into this in a future post and include some performance statistics as well as graphs.

  8. Set up sub range repair (with Reaper)

    Incremental repair is a great idea but unfortunately has been the cause of countless issues, which we discussed in our blog previously. Cassandra 4.0 should fix the remaining issues with incremental repair by changing the way anti compaction works.

    Read up on the details in CASSANDRA-9143

  9. Set up Monitoring

    Without good monitoring it’s just not possible to make good decisions on a single server, and the problem is compounded when dealing with an entire cluster. There are a number of reasonable monitoring solutions available. At the moment we’re fans of Prometheus, if you choose to self-host, and Datadog if you prefer hosted, but this isn’t meant to be an exhaustive list. We recommend aggregating metrics from your application and your databases into a single monitoring system.

    Once you’ve got all your metrics together, be sure to create useful dashboards that expose:

    • Throughput
    • Latency
    • Error rate

    These metrics must be monitored from all layers to truly understand what’s happening when there’s an issue in production.

    Going an extra step or two past normal monitoring is distributed tracing, made popular by the Google Dapper paper. The open source equivalent is Zipkin, which Mick here at The Last Pickle is a contributor to and wrote a bit about some time back.
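
    For readers who have not computed the three dashboard metrics listed above by hand, here is a minimal Python sketch. It assumes you already have a list of (latency in ms, success flag) samples for a time window; the function name and the nearest-rank p99 are my choices for illustration, and a real monitoring system will compute these for you from histograms or counters.

```python
def summarize(requests, window_seconds):
    """Summarize (latency_ms, ok) samples from one window; requests must be non-empty."""
    count = len(requests)
    latencies = sorted(latency for latency, _ in requests)
    errors = sum(1 for _, ok in requests if not ok)
    rank = (99 * count + 99) // 100     # nearest-rank p99, i.e. ceil(0.99 * count)
    return {
        "throughput_per_s": count / window_seconds,
        "p99_latency_ms": latencies[rank - 1],
        "error_rate": errors / count,
    }


# 100 synthetic requests over 10 seconds, latencies 1..100 ms, one failure.
samples = [(latency, latency != 100) for latency in range(1, 101)]
print(summarize(samples, window_seconds=10))
```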

  10. Backups

    Cassandra’s fault tolerance makes it easy to (incorrectly) disregard the usual advice of having a solid backup strategy. Just because data is replicated doesn’t mean we might not need a way to recover data later on. This is why we always recommend having a backup strategy in place.

    There are hosted solutions like Datos, Cassandra Snapshots, volume snapshots (LVM, EBS Volumes, etc), incremental Cassandra backups, home rolled tools, etc. It’s not possible to recommend a single solution for every use case. The right solution is going to be dependent on your requirements.

  11. Basic GC Tuning

    The default Cassandra JVM arguments are, for the most part, unoptimized for virtually every workload. We’ve written a post on GC tuning so we won’t rehash here. Most people assume GC tuning is an exercise in premature optimization and are quite surprised when they can get a 2-5x improvement in both throughput (queries per second) and p99 latency.

  12. Disable Materialized Views

    When materialized views (MVs) were added to Cassandra 3, everyone, including me, was excited. It was one of the most interesting features added in a long time and the possibility of avoiding manual denormalization was very exciting. Unfortunately, writing such a feature to work correctly turned out to be extremely difficult as well.

    Since the release of the feature, materialized views have retroactively been marked as experimental, and we don’t recommend them for normal use. Their use makes it very difficult (or impossible) to repair and bootstrap new nodes into the cluster. We’ll dig deeper into this issue in a later post. For now, we recommend setting the following in the cassandra.yaml file:

    enable_materialized_views: false

  13. Configure Compression

    We wrote a post a while back discussing tuning compression and how it can help improve performance, especially on read heavy workloads.

  14. Dial Back Read Ahead

    Generally speaking, whenever data is read off disk, it’s put in the page cache (there are exceptions). Accessing data in the page cache is significantly faster than accessing data off disk, since it’s reading from memory. Read ahead simply reads extra data. The logic is that adding extra data to a read is cheap, since the request is already being made, so we might as well get as much data in the page cache as we can.

    There’s a big problem with this.

    First, if read ahead is requesting information that’s not used, it’s a waste of resources to pull extra data off disk. Pulling extra data off disk means the disk is doing more work. If we had a small amount of data, that might be fine. The more data we have, the higher the ratio of disk space to memory, and that means it keeps getting less and less likely we’ll actually have the data in memory. In fact, we end up finding it likely we’ll only use some of the data, some of the time.

    The second problem has to do with page cache churn. If you have 30GB of page cache available and you’re accessing 3TB of data, you end up pulling a lot of data into your page cache. The old data needs to be evicted. This process is an incredible waste of resources. You can tell if this is happening on your system if you see a kswapd process taking up a lot of CPU, even if you have swap disabled.

    We recommend setting this to 0 or 8 sectors (4KB) on local SSD, and 16KB if you’re using EBS in AWS and you’re performing reads off large partitions.

    blockdev --setra 8 /dev/<device>

    You’ll want to experiment with the read ahead setting to find out what works best for your environment.
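
    Because blockdev --setra counts 512-byte sectors rather than kilobytes, it is easy to set the wrong value. The small Python helpers below (names are mine, purely for illustration) convert between the two units used in the recommendation above.

```python
SECTOR_BYTES = 512  # blockdev --setra is expressed in 512-byte sectors


def kib_to_sectors(kib):
    """Convert a read-ahead size in KiB to the sector count blockdev expects."""
    return kib * 1024 // SECTOR_BYTES


def sectors_to_kib(sectors):
    """Convert a sector count (as reported by blockdev --getra) to KiB."""
    return sectors * SECTOR_BYTES / 1024


print(kib_to_sectors(4))    # the 4KB setting for local SSD
print(kib_to_sectors(16))   # the 16KB setting for EBS
```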

There’s quite a bit to digest, so I’ll stop here. This isn’t a comprehensive list of everything that can be adjusted by any means, but it’s a good start if you’re looking to do some basic tuning. We’ll be going deeper into some of the topics in later posts!

Handling a Cassandra transactional workload

Overview of Cassandra

As previously mentioned in my notes on lightweight transactions, Cassandra does not support ACID transactions. Cassandra was built to support a brisk ingest of writes while being distributed for availability. Follow the link to my previous post above to learn more specifics about LWTs and the Paxos algorithm.

Here I’ll cover some other ways to handle a transactional workload in Cassandra.

Batch mode

The standard write path for Cassandra runs from the client to the commit log and memtable, and from the memtable to an SSTable when it is flushed. The write is stored in the memtable and commit log of the replica nodes (as configured using the replication factor) before it is considered complete.

The batch write path includes, in addition, a batch log which is used to group updates that are then considered complete (or not) together. This is an expensive operation unless batch writes affect a single partition key.

As with lightweight transactions, time coordination among nodes remains important with batched writes.
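
Since the cost of a batch depends on how many partitions it touches, it can help to check that before sending one. The Python sketch below does not talk to Cassandra at all: it assumes each write in the batch is described by a hypothetical (table, partition key) pair and simply reports whether they all land on the same partition.

```python
def partitions_touched(statements):
    """Return the distinct (table, partition_key) pairs written by a batch."""
    return {(table, key) for table, key in statements}


def is_single_partition(statements):
    """True when every write in the batch targets the same partition."""
    return len(partitions_touched(statements)) == 1


cheap = [("user_events", "alice"), ("user_events", "alice")]
costly = [("user_events", "alice"), ("user_events", "bob")]
print(is_single_partition(cheap), is_single_partition(costly))
```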

Workarounds

To avoid expensive lightweight transactions or batched writes, software can be installed beside Cassandra to manage writes that need to be done together. Applications coordinate with the software to introduce locks to the write path to ensure atomicity and isolation of updates; the software used manages these locks.

Two software tools that can be used for this type of workaround are Apache Zookeeper and Hashicorp Consul. Both of these tools are typically used to manage distributed configuration but can be leveraged to assist with Cassandra transactions. Whereas Zookeeper was originally created as an in-memory data store, Consul was built to be a configuration manager.

Zookeeper

Because Zookeeper is essentially a data store, several libraries were created for the locking functionality. Two of these are Google’s Cages and Netflix’s Curator (now maintained as an Apache project). Note that Zookeeper and the Cages/Curator libraries have not been updated in several years. There is no reason application developers could not write similar functionality within their main application to interact with Zookeeper, perhaps using these as references.

Cages

Cages is a Java library used to synchronize the movement of data among distributed machines, making Cassandra transactional workloads an ideal use case.

Cages includes several classes for reading and writing data. A pertinent one for transactional workloads is ZkWriteLock, used to wrap statements inside a lock stored in Zookeeper. Note that this lock stored in Zookeeper has nothing to do with the underlying Cassandra functionality, and must be adhered to by all parts of the application. Indeed, the application or another user could bypass the lock and interact directly with Cassandra.

Curator

Curator was created specifically to manage Zookeeper, resulting in a tighter integration. Curator works similarly to Cages, though, wrapping statements in a mutex and requiring the application to observe the locks to ensure data consistency.

Consul

Consul is also a distributed storage system used to manage configuration and similar data. It was developed more recently and remains up-to-date.

The distribution of Consul storage is highly flexible and performant, making it a great alternative to Zookeeper. The basic interaction from the application remains the same: the application would store a lock as a key-value in Consul, and all writes from the application would need to respect the lock.
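
The locking pattern is the same whichever tool holds the locks, so here is a minimal Python sketch of it. Everything in it is a stand-in: InMemoryLockService plays the role of Zookeeper or Consul, a plain dict plays the role of a Cassandra table, and the names are hypothetical. It shows the shape of the interaction (take the lock, do the related writes, release the lock), not a production recipe.

```python
import threading
from contextlib import contextmanager


class InMemoryLockService:
    """Stand-in for Zookeeper or Consul: hands out one lock per key."""

    def __init__(self):
        self._guard = threading.Lock()
        self._locks = {}

    @contextmanager
    def write_lock(self, key, timeout=5.0):
        with self._guard:
            lock = self._locks.setdefault(key, threading.Lock())
        if not lock.acquire(timeout=timeout):
            raise TimeoutError("could not lock %r" % (key,))
        try:
            yield
        finally:
            lock.release()


def transfer(locks, accounts, source, target, amount):
    """Move amount between two distinct rows while holding both row locks."""
    first, second = sorted((source, target))   # fixed lock order avoids deadlock
    with locks.write_lock(first), locks.write_lock(second):
        if accounts[source] < amount:
            raise ValueError("insufficient funds")
        accounts[source] -= amount              # these two writes now succeed
        accounts[target] += amount              # or fail together


locks = InMemoryLockService()
accounts = {"a": 100, "b": 0}
transfer(locks, accounts, "a", "b", 10)
print(accounts)
```

As the Cages discussion points out, nothing stops a writer that ignores the lock service from updating the table directly, so every code path that touches these rows has to go through the same locking helper.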

Performance

Introducing extra steps in the write path is not free with regard to performance.

In addition to the lag inherent to locking, Zookeeper can become a bottleneck. This can be avoided by scaling the Zookeeper clusters. A feature called Observer helps to reduce time spent getting a quorum from the Zookeeper cluster.

Regardless, there is an upper limit of about 5-10K operations per second that you can perform against Zookeeper, so take this into consideration when planning an architecture.
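
To turn that limit into a planning number, a back-of-the-envelope calculation is enough. The Python sketch below assumes, purely for illustration, that each locked write costs two Zookeeper operations (one to acquire the lock and one to release it); real lock recipes may need more, so treat the result as an optimistic ceiling.

```python
def max_locked_writes_per_second(zk_ops_per_second, ops_per_locked_write=2):
    """Upper bound on locked writes given a Zookeeper operation budget."""
    return zk_ops_per_second // ops_per_locked_write


for budget in (5000, 10000):
    print(budget, "->", max_locked_writes_per_second(budget))
```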

Recommendations

If the transactional workload is infrequent and minimal, lightweight transactions should suffice. However, if transactions are a core function of the application, we recommend using Zookeeper or Consul to manage write locks. Zookeeper has a longer history, but Consul is more up-to-date and provides great flexibility and performance, so we prefer Consul.