All Work and No Play Makes Scylla a Dull Monster

Following a week of Scylla hands-on training, Scylla Summit, and internal meetings, the associates of ScyllaDB had two days to relax, unwind, and have fun in the beautiful city of Sonoma. Over the past year ScyllaDB’s team has grown by 68%, which meant that a large contingent of those coming to Scylla Summit were new! Scylla monsters converged from 13 countries around the world, speaking at least 15 languages among them. The fun days provided the perfect atmosphere to get to know our colleagues and allowed everyone to bond and develop long-lasting friendships.

Our internal event planning team paid attention to every detail, making sure we were fed, entertained, and able to enjoy all that Sonoma had to offer. A big shout out to the Operations team for their hard work and dedication to making sure everyone had a good time. Well done!

The bonding activities started during the two days of internal summit meetings, before the fun days even began. The Operations team provided activities that allowed us to break the ice and get to know all of the new people who had joined over the past year. We played a form of get-to-know-you Bingo, interviewing our co-workers and learning about the talents and interests our associates have beyond the office. Our hobbies are as diverse as our origins. We enjoy activities such as running, ping pong, ice skating, swimming, snowboarding, riding horses, bicycles and motorcycles, and playing billiards, checkers, guitar, and saxophone. Some of us woke up early or stayed up late during the week in order to enjoy them as well. Our Solutions Architect Tomer played the saxophone in one of the local clubs and received wonderful applause. Bravo Tomer!

Everyone had the chance to sample the local cuisine and drink locally grown and bottled wine. The activities over the two days included a choice of two hikes, building bicycles for disadvantaged children, and a scavenger hunt that tested our logical thinking as well as our knowledge of mechanics and physics.

Take the Path Less Traveled

Both hikes were enjoyed by all, given that the weather was sunny and cool. The easier hike made use of the Sonoma Bike Path, a paved path which led to La Casa Grande, the first home of General Mariano Vallejo, who was appointed the first Comandante General of California by Mexico after it gained independence from Spain.

Another highlight of the hike was passing Depot Park, once the site of the Northwestern Pacific Railroad Depot. The park contains original boxcars and a caboose situated in front of the Depot Park Museum (there was no time to go inside). The museum holds many historical artifacts of the Sonoma Valley Railroad Company, which used to operate this railroad line. Also visible from the hike was the Sebastiani Vineyards and Winery. The hike culminated in the beautiful and historic Sonoma Plaza, surrounded by Sonoma City Hall (pictured), numerous shops and restaurants, fountains, benches, and places to rest and relax.

Another hike went to the Montini Open Space Preserve (a state park established in December 2005). The preserve is covered with oak woodland, large rock outcroppings, and open grasslands, and its large trees provided shade that made the hike enjoyable for all. These oak woodlands and grasslands provide homes for animals important to the ecosystem and natural heritage of Sonoma County, including deer, moles, red-shouldered hawks, red-tailed hawks, and both acorn and pileated woodpeckers.

How Many Scylla Monsters Does it Take to Build a Bicycle?

The Scylla Fun Days not only provided opportunities to bond and work together, they also gave us the opportunity to contribute to the local community.

The Scylla monsters broke into teams of 10 and worked to Build-A-Bike® in support of children from low socioeconomic backgrounds who attend Sassarini Elementary in Sonoma County. Once the bicycles are delivered to the school, each is given to a child as a reward for working hard and to celebrate their success in the classroom. Before the teams began, four judges were selected who added humor and reality to the game. The teams went to their stations, created their flags, and then began to build and decorate their bikes. The team names included CoreDump and Unicorns, and each team showed its creativity and enthusiasm. The activity had a twist to make it more competitive and fun. Instead of giving each team the exact parts they needed, the judges held some of the parts back. In order to get these precious packages, each team had to solve a puzzle. However, sometimes solving the puzzle resulted in a judge handing over a part you already had. When this happened, you needed to go to another team and negotiate a trade. Some teams decided to get creative with their engineering efforts (Simon Helberg, watch out!), and in the end 10 complete bikes were made (5 girls’ bikes and 5 boys’ bikes). Of course, if Scylla herself had helped us we would have finished much faster, as her multiple tentacles would have been useful for putting the parts together.

The best bikes are built on a solid frame. So it didn’t surprise us at all that the girls’ bike model was the Huffy Seastar. Coincidence? We think not.

As you can see from the photo we enjoyed this activity tremendously and cannot wait to see the look on the children’s faces when they receive these bikes! We hope the kids are inspired by STEAM and pursue a career in the sciences.

The Race of a Lifetime

The second day of fun featured an Amazing Race activity where teams of 8 or 9 Scylla Monsters each took different colored bandanas, a backpack with clues, and an egg, and ran all over Sonoma to find clues, solve puzzles, and record epic scenes: a victory dance, the twist, an epic Star Wars fight scene, lumberjacks cutting down trees, the macarena, the robot, and more! Points were given for spying on other teams (recording them recording themselves), and extra points were awarded for finding the points of interest indicated on the postcards in the puzzle pack. At the end of the race the teams were given an identical set of items and had to construct a package for their egg so that it would not break when dropped. Only 3 teams succeeded in this task, and one of them, the green team, was the winner! The day culminated with some free time so that everyone could get lunch on their own and make their way back to the hotel before departing for the airport.

The fun days allowed the Scylla team to unwind, relax, recharge, and, most importantly, bond. The bonds of friendship that developed over these two days will create opportunities for better engagement and collaboration in the future. We went to the airport, and when we got home we discovered two things: first, Scylla Monster plushies make great toys for kids and dogs; second, we can’t wait to gather again next year!

Response from all across the company was enthusiastic. As our VP of Sales Rich Nigro said, “The organizing team rocked and the camaraderie was equal to the task!” Shlomi Livne, our VP of R&D added, “Year after year you are raising your bar. I can’t wait to see where you will take us next year.”

Want to be part of the fun? Check out our job openings!

The post All Work and No Play Makes Scylla a Dull Monster appeared first on ScyllaDB.

Maximizing Performance via Concurrency While Minimizing Timeouts in Distributed Databases

Modern database systems are designed to handle a large number of connections, each serving an equally large number of simultaneous requests. This is especially important in databases supporting microservices architectures that scale horizontally and have clients coming and going as demand requires.

However, the fact that modern databases can handle large numbers of connections doesn’t make them immune to overload. At some point, there is such a thing as too many. Moreover, overload will increase latency and eventually generate timeouts. And if one is not careful with retry policies, retries can degrade the server even further.

To help you get the most out of your big data applications, let’s explore the effects of concurrency in distributed databases and look at the tools you need to correctly configure your infrastructure for maximum performance, including client-side parallelism and timeout settings.

Connections, Pools, and Parallelism

One common source of confusion when talking about parallelism is to conflate the number of connections with request-level parallelism. Is increasing the number of connections by itself enough to increase database activity?

Whether synchronous or asynchronous, a request/response exchange between a client and a server presupposes the existence of a network connection between them. One way to see a connection is as a two-way road: it represents a path between two endpoints capable of accommodating traffic in both directions. By the same token, requests and responses can be regarded as vehicles travelling from one point to the other on these roads.

Connection creation and destruction is an expensive operation, both in terms of resources and time, so reusing connections for multiple exchanges is a common practice among database servers. A connection pool does essentially that: it keeps a pool of established client/server connections and dynamically allocates them on demand whenever requests are triggered by the client application. The same reasoning applies to thread pools: reusing threads to dispatch requests and receive responses is more efficient than creating and destroying them on the fly.

With our traffic analogy in mind, it is easy to understand why an excessive number of connections or threads can easily saturate any database server. The optimal size of a client/server connection pool is intrinsically dependent on the server’s physical capacity, just like the number of lanes in a highway depends on the size of the cities it interconnects. Likewise, the number of in-flight requests should be enough to keep the lanes busy, but not to the point of jamming the traffic. The diagram below illustrates these concepts.

Scaling Workloads

Properly sizing the workload that a client should submit to a server can be a tricky task. In this section we introduce an iterative process to assess the server’s ability to handle requests, which helps define upper limits for connections and in-flight requests.

In our examples, we will use Scylla, a real-time NoSQL database that specializes in very high throughput use cases. Scylla’s internal architecture leverages a shared-nothing model that assigns independent shards to individual cores, thanks to the Seastar framework it is built upon. Because of this architecture, the relationship between connections and the server’s physical attributes is even more visible.

The tests are executed against a 3-node cluster running on i3.2xlarge AWS VMs (8 cores, 61 GB RAM and NVMe storage volumes). The workload is generated simultaneously by two c5n.4xlarge AWS VMs (16 cores, 42 GB RAM and EBS storage volumes) running the cassandra-stress tool. The total number of threads shown in the results below is the sum of the threads configured in each VM. As for the results, throughput is aggregated and latency is averaged.

Scaling Threads

The first part consists in fixing the number of connections so that we can observe the effects of parallelism in isolation. We initially set the number of connections to the total number of server cores – 8 per server, remembering that we have two stress tool instances running in parallel – and gradually increase the number of client threads. Since the cassandra-stress tool uses synchronous requests, each execution thread sequentially dispatches one request at a time. Hence, increasing the number of threads effectively increases the number of in-flight requests to the database.

The WRITE workloads are started at each loader respectively with the following commands:

cassandra-stress write n=48000000 -pop seq=1..48000000 -rate threads=<count> -node <IP> -mode native cql3 connectionsPerHost=4
cassandra-stress write n=48000000 -pop seq=48000000..96000000 -rate threads=<count> -node <IP> -mode native cql3 connectionsPerHost=4

The READ workloads are started at each loader VM with the following command:

cassandra-stress read n=60000000 -rate threads=<count> -node <IP> -mode native cql3 connectionsPerHost=4

As can be seen in chart 1, peak throughput happens with about 2664 WRITE threads and 1512 READ threads. Besides peaking with more threads, WRITE operations also present higher peak throughput than READ. Beyond the peak point, cluster performance gradually degrades as an indication of server saturation.

Chart 1: Throughput rates observed while increasing the number of cassandra-stress threads

It is worth noting that more throughput is not always better: Latencies monotonically increase (get worse) with concurrency, both in the average and the high tail of the distribution, as shown in charts 2 and 3.

Chart 2: Latency rates observed while increasing the number of cassandra-stress threads

Chart 3: Read latency rates around the throughput turning point

Therefore, depending on the desired quality of service from the database cluster, concurrency has to be judiciously balanced to reach appropriate throughput and latency values. This relationship can be expressed as the effective queue size, as defined by Little’s Law, which establishes that:

L = λ × W

where λ is the average throughput, W is the average latency, and L represents the total number of requests either being processed or sitting in a queue at any given moment once the cluster reaches steady state. Usually, throughput and average latency are the parts of the service level that the user controls. In other words, you know how much throughput your database system needs to provide, and which average latency it should sustain.

In the example above, if we want the system to serve 500,000 requests per second at 2.5 ms average latency, the best concurrency is around 1,250 in-flight requests. As we approach the saturation limit of the system (around 600,000 requests/s for read requests), additional concurrency no longer increases throughput, since this is the physical limit of the database. Every new in-flight request only translates into increased latency.

In fact, if we take 600,000 requests/s as the physical capacity of this database, we can calculate the expected average latency at a particular concurrency point. For example, at 6,120 in-flight requests, our average latency is expected to be 6,120 / 600,000 ≈ 10 ms. That is indeed very close to what we see in the graphs above.
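
To make the arithmetic concrete, here is a minimal sketch in Java that applies Little’s Law to the numbers used above (the throughput and latency targets are the same assumptions as in the example, not measurements from the charts):

public class LittlesLaw {
    public static void main(String[] args) {
        // Service-level targets from the example above (assumptions)
        double targetThroughput = 500_000;  // requests per second (lambda)
        double targetLatencySec = 0.0025;   // 2.5 ms average latency (W)

        // L = lambda * W: in-flight requests needed to sustain the target
        double concurrency = targetThroughput * targetLatencySec;
        System.out.printf("Target concurrency: %.0f in-flight requests%n", concurrency); // ~1250

        // Past saturation (~600,000 reads/s) the same law gives the expected latency: W = L / lambda
        double saturationThroughput = 600_000;
        double inFlight = 6_120;
        System.out.printf("Expected latency at saturation: %.1f ms%n",
                inFlight / saturationThroughput * 1000); // ~10 ms
    }
}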

Note that, as seen in the throughput graph above, in real-life systems throughput at peak may not stay constant but may instead drop slightly as contention increases.

Unbounded Request Flows

Now that we understand how the number of in-flight requests relates to throughput and latency, let’s take a look at some scenarios that can lead to overload and how to avoid them.

A thread pool is an important mechanism to reuse threads and attenuate the overhead of creating and destroying them recurrently. An important side effect of thread pools is that they offer an out-of-the-box mechanism to constrain client-side load by defining the maximum size a pool should have. Failing to specify a pool size limit can lead to excessive concurrency, which degrades server performance.

Thread-pool parameters can usually be configured (see this example for Scylla and Cassandra), and relying on their default values is often not a good idea. Consider for example the Java code block below:

ExecutorService executor = Executors.newFixedThreadPool(500);
Cluster cluster = Cluster.builder().addContactPoint("host").build();
Session session = cluster.connect("keyspace");
PreparedStatement stmt = session.prepare("INSERT INTO test (id) VALUES (?)");

List<Future<ResultSet>> futures = new ArrayList<>();
futures.add(executor.submit(() -> {
        // synchronous execution: the pool thread blocks until the response arrives
        return session.execute(stmt.bind(123));
}));

Here, a fixed-size thread pool is created and a synchronous task is submitted. If more tasks like this are triggered at the same time, they are executed in parallel by the pool. If more than 500 tasks are active at the same time (the pool size), only 500 are executed by the pool and the remaining ones are queued until a running task completes. This mechanism clearly constrains the load this client subjects the cluster to: at most 500 concurrent requests. We could do the same for whatever concurrency we want this client to generate.

But is this enough to restrict concurrency? Consider the following code block:

ExecutorService executor = Executors.newFixedThreadPool(500);
Cluster cluster = Cluster.builder().addContactPoint("host").build();
Session session = cluster.connect("keyspace");
PreparedStatement stmt = session.prepare("INSERT INTO test (id) VALUES (?)");

List<ResultSetFuture> futures = new ArrayList<>();
executor.submit(() -> {
        // asynchronous execution: executeAsync returns immediately, so the task
        // finishes long before the request does, leaving it in flight with no bound
        futures.add(session.executeAsync(stmt.bind(123)));
});

The differences are subtle, but the functionality is fundamentally different from the previous one. In this case, each task triggers an asynchronous statement execution. The tasks themselves are short-lived and simply signal the client driver to dispatch a new statement to the cluster. From the server’s perspective, this is similar to creating an additional thread with a synchronous statement execution, but on an unbounded pool.
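
Before looking at the server-side knobs, it is worth noting one common client-side remedy. The sketch below is an illustration, not the article’s prescribed solution: it bounds the number of in-flight asynchronous requests with a semaphore that is released when each response arrives (the statement and the limit of 500 are carried over from the snippets above as assumptions):

import java.util.concurrent.Semaphore;
import com.datastax.driver.core.*;
import com.google.common.util.concurrent.MoreExecutors;

public class BoundedAsyncInserts {
    public static void main(String[] args) throws InterruptedException {
        Cluster cluster = Cluster.builder().addContactPoint("host").build();
        Session session = cluster.connect("keyspace");
        PreparedStatement stmt = session.prepare("INSERT INTO test (id) VALUES (?)");

        // Allow at most 500 asynchronous statements in flight at any time
        Semaphore inFlight = new Semaphore(500);

        for (int id = 0; id < 1_000_000; id++) {
            inFlight.acquire(); // blocks once 500 requests are outstanding
            ResultSetFuture future = session.executeAsync(stmt.bind(id));
            // Release the permit when the response (or an error) comes back
            future.addListener(inFlight::release, MoreExecutors.directExecutor());
        }
        cluster.close();
    }
}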

So especially when dealing with asynchronous requests, how can we make sure that the server is always healthy and not overloaded?

Scaling Connections

Let’s take a look at how throughput and latency vary with the number of connections opened to the database. Here, the number of threads is locked to the best configuration found in the previous section — remembering that we have two stress tool instances running — and the number of connections is initially set to half the original configuration:

cassandra-stress write n=48000000 -rate threads= -node <IP> -mode native cql3 connectionsPerHost=<count>
cassandra-stress write n=48000000 -rate threads= -node <IP> -mode native cql3 connectionsPerHost=<count>

Chart 4 illustrates the results. As we can see, we need connections — the lanes — so our requests can flow. If there are too few connections we won’t achieve peak throughput. Having too many is not, on its own, enough to generate overload, as overload is caused by excess in-flight requests. But the extra lanes may very well be the avenue through which an excessive number of in-flight requests can now reach the server.

The biggest benefit of being aware of the number of connections is that we can configure the connection pool properties, like the maximum number of in-flight requests per connection. Take, for instance, this example from the Scylla and Cassandra Java drivers:

.setMaxRequestsPerConnection(HostDistance.LOCAL, 10)

HostDistance.LOCAL is a Scylla- and Cassandra-specific concept indicating that this setting applies to the local datacenter (as opposed to HostDistance.REMOTE). In this example we allow for ten core connections with a maximum of ten — meaning the pool size will never dynamically increase. We then set the maximum number of requests per connection to ten as well, meaning that the maximum number of in-flight requests coming from this client will be one hundred.

Chart 4: Throughput rates observed when increasing the number of connections

If the client generates more than that, either because asynchronous requests are generated too fast or because there are too many threads, the requests will sit in a client-side queue for up to 100 ms — according to setPoolTimeoutMillis — and time out after that.

Careful setting of Connection Pool parameters is a powerful way for application developers to make sure that reasonable concurrency limits to the database are always respected, no matter what happens.
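
For illustration, here is a minimal sketch of how these pool limits could be wired together with the Scylla/Cassandra 3.x Java driver; the numbers simply mirror the ten-connections, ten-requests-per-connection example and the 100 ms pool timeout discussed above, and are not tuning recommendations:

import com.datastax.driver.core.*;

public class PoolLimits {
    public static void main(String[] args) {
        PoolingOptions pooling = new PoolingOptions()
            .setCoreConnectionsPerHost(HostDistance.LOCAL, 10)   // pool starts with ten connections...
            .setMaxConnectionsPerHost(HostDistance.LOCAL, 10)    // ...and never grows beyond ten
            .setMaxRequestsPerConnection(HostDistance.LOCAL, 10) // ten in-flight requests per connection
            .setPoolTimeoutMillis(100);                          // wait up to 100 ms for a free slot

        Cluster cluster = Cluster.builder()
            .addContactPoint("host")            // placeholder contact point
            .withPoolingOptions(pooling)
            .build();

        // Maximum in-flight requests from this client: 10 connections x 10 requests = 100
        Session session = cluster.connect("keyspace");
    }
}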

Timed out Requests and Retries

When dealing with distributed systems, requests may inevitably timeout. Timeouts can happen for a variety of reasons, from intermittent network interruptions to server overload. If and when requests do timeout, is it a good idea to retry them?

To demonstrate that, let’s consider the following scenario: a heavy READ workload with moderately sized payloads (>500 bytes per operation) targeting a 3-node Scylla cluster. In Chart 5 we can see that the CPU load on the system is already high, and yet the average latencies, as seen in Chart 6, are below 5 ms for this workload.

Chart 5: Cluster load with default timeout configurations

Chart 6: Cluster latencies with default timeout configurations

At such high load it is possible that some requests, even at the tail of the latency distribution, may time out. In the specific case of Scylla, the server configuration tells us when requests will time out on the server:

[centos@ip-10-0-0-59 scylla]$ egrep '^[^#]+timeout_in_ms:' /etc/scylla/scylla.yaml
read_request_timeout_in_ms: 5000
write_request_timeout_in_ms: 2000
cas_contention_timeout_in_ms: 1000
range_request_timeout_in_ms: 10000
request_timeout_in_ms: 10000

Because this is a read workload, we can see that the server will time out a request after 5 seconds. Workloads run under an SLA, and oftentimes what we see is that after a certain threshold is crossed, users retry the request even if the server hasn’t replied yet.

We can simulate a scenario in which the client wants an SLA in the order of single-digit milliseconds, and retries requests that take more than that without changing the server configuration. This seems harmless enough if this happens with the occasional request due to intermittent failures. But Chart 7 shows us what happens if the latency is higher than expected due to overload:

Chart 7: Cluster latencies after lowering operation timeout parameter

As you can see, average latencies skyrocketed 15-fold. Also, the shape of the latency curve clearly illustrates a cascading effect. As we saw throughout this article, an excessive amount of in-flight requests can harm the server. We saw in Charts 1 and 2 that, after the saturation point, latency increases without a corresponding increase in throughput.

And if the client-side timeouts are lower than the server-side timeouts, this is exactly what happens here: new requests arrive from the client before the server has had the opportunity to retire the old ones. The increased workload injected by timed-out operations pushes the latencies further up, which in turn results in higher timeout rates, and so on.

For this reason, it is mandatory to use client-side timeouts that are equal to or greater than the server-side timeouts. If performance requirements are stricter and lower timeouts are needed, both servers and clients should be configured accordingly.
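
As a minimal sketch of that advice, assuming the 3.x Java driver and the scylla.yaml values shown above, the client-side read timeout can be set explicitly so it never undercuts the server’s read_request_timeout_in_ms:

import com.datastax.driver.core.*;

public class ClientSideTimeouts {
    public static void main(String[] args) {
        // Server-side read timeout from scylla.yaml above: read_request_timeout_in_ms: 5000
        int serverReadTimeoutMs = 5000;

        SocketOptions socketOptions = new SocketOptions()
            // Keep the client-side timeout at or above the server-side one, so the
            // client never gives up on (and retries) a request the server is still serving
            .setReadTimeoutMillis(serverReadTimeoutMs + 2000);

        Cluster cluster = Cluster.builder()
            .addContactPoint("host")            // placeholder contact point
            .withSocketOptions(socketOptions)
            .build();

        Session session = cluster.connect("keyspace");
    }
}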

Next Steps

In this article you read about some of the main aspects of concurrency control and how they should be taken into account when building and tuning a client application in a client/server architecture. You also learned how to assess the server’s ability to accommodate increasing workloads, and our best recommendations on how to avoid common pitfalls when designing and sizing a client application.

While we used Scylla as our reference database for testing we encourage you to perform your own testing with other databases using similar methodologies.

Designing scalable systems, dealing with multi-threaded programming and managing concurrency control are hard tasks. Hopefully this article will help alleviate some of the burdens when making your applications efficient and scalable.

If you have more questions about how best to take advantage of concurrency and timeouts in Scylla, or if you wish to share the results of your own testing, please contact us or drop by our public Slack channel. We’d love to hear from you!

The post Maximizing Performance via Concurrency While Minimizing Timeouts in Distributed Databases appeared first on ScyllaDB.

Overheard at Scylla Summit 2019

“It was five years ago when Dor Laor and Avi Kivity had a dream…” So began the opening remarks at Scylla Summit 2019. The dreams of ScyllaDB’s founders have since manifested and made their impact all across the Big Data industry.

The world’s leading Scylla practitioners gathered in San Francisco earlier this month to hear about the latest Scylla product developments and production deployments. Once again your intrepid reporter tried to keep up with the goings-on, live tweeting, along with others under the #ScyllaSummit hashtag about the event’s many sessions.

It’s impossible to pack two days and dozens of speakers into a few thousand words, so I’ll give just the highlights. We’ll get the SlideShare links and videos posted for you over the coming days.

Pre-Summit Programs: Training Day and the Seastar Summit

The day before the official opening of Scylla Summit was set aside for our all-day training program. The training program included novice and advanced administration tracks that were well attended. There were courses on data modeling, debugging, migrating, using Scylla Manager, the Scylla Operator for Kubernetes, and more!

Between sessions students flock around ScyllaDB’s Customer Success Manager Tomer Sandler to learn how to migrate to Scylla.

Training Day was held concurrently with a separate-but-related developer conference: the first-ever Seastar Summit, which featured speakers from Red Hat and Lightbits Labs, as well as ScyllaDB. It was a unique opportunity for those building solutions based on the Seastar engine to engage directly with peers.


ScyllaDB CEO Dor Laor

The Tuesday keynotes kicked off with ScyllaDB CEO Dor Laor providing a review of how we got to this point in history in terms of scalability. He recounted the C10k problem — the problem with optimizing network sockets to handle 10,000 concurrent sessions. The C10k barrier has since been shattered by orders of magnitude — it is now the so-called “C10M problem” — with real-world examples of 10 million, 12 million or even 40 million concurrent connections.

In an analogous vein, there is also the so-called $1 million engineering problem, which has at its core the following axiom: “it’s easy to fall into a cycle where the first response to any problem is to spend more money.”

Dor dubbed this issue the “D10M” problem, where customers are spending $10 million or more on their cloud provider bill. He asserted large organizations using Big Data could literally save millions of dollars a year if they just had performant systems that scaled to their needs.

To that point, Dor brought to the stage ScyllaDB’s VP of Field Engineering, Glauber Costa, and Packet’s Solution Architect James Malachowski to reveal a new achievement in scalability. They had created a test scenario simulating 1 million IoT sensors sampling temperature data every minute over the course of a year. That resulted in a data set of 536 billion temperature readings — 1.44 billion data points per day.

Against that data, they created a query to check for average, min and max temperatures every minute across a given timespan.

To give an idea of how large such a dataset is, if you were to analyze it at 1 million rows per second it would take 146 hours — almost a week. Yet organizations don’t want to wait for hours, never mind days, to take action against their data. They want immediacy of insights, and the capability to take action in seconds or minutes.

Packet’s Solution Architect James Malachowski (left) and ScyllaDB VP of Field Engineering, Glauber Costa (right) describe the architecture needed to scale to 1,000,000,000 reads per second

This was why ScyllaDB partnered with Packet to run the bare-metal instances needed to analyze that data as fast as possible. Packet is a global bare metal cloud built for enterprises. Packet ran the Scylla database on a cluster of 83 instances. This cluster was capable of scanning three months of data in less than two minutes at a speed of 1.1 billion rows per second!

Scanning the entire dataset from disk (i.e., without leveraging any caching) took only 9 minutes.

As Glauber put it, “Bare metal plus Scylla are like peanut butter and jelly.”

Packet’s bare-metal cluster that achieved a billion reads-per-second comprised 83 servers with a total of 2800 physical cores, 34 TB of RAM and 314 TB of NVMe.

And while your own loads may be nowhere near as large, the main point was that if Scylla can scale to that kind of load, it can performantly handle pretty much anything you throw at it.

Dor then retook the stage and noted how Jay Kreps, CEO of Confluent, recently scratched out the last word in Nadella’s famous quote, declaring instead “All companies are software.”

Yet even if “all companies are software,” he also noted that many companies’ software isn’t easy at all. So a major goal of ScyllaDB is to make it EASY for software companies to adopt, to use, and to build upon Scylla. He then outlined the building blocks of Scylla that help address this issue.

  • Scylla Cloud allows users to deploy a highly-performant scalable NoSQL database without having to hire a team of experts to administer the back end.
  • Project Alternator, ScyllaDB’s DynamoDB-compatible API, is now available in beta on Scylla Cloud. Dor noted how with provisioned pricing 120,000 operations per second would cost $85 per hour on DynamoDB whereas running Scylla Alternator a user could do the same workload for $7.50 an hour — an order of magnitude cheaper.
  • With workload prioritization, you can now automatically balance workloads across a single cluster, providing greater cost savings by minimizing wasteful overprovisioning.

Beyond that, there are needs for workhorse features coming soon from Scylla, such as

  • Database backup with Scylla Manager
  • Lightweight Transactions (LWT)
  • Change Data Capture (CDC) for database updates and
  • User Defined Functions (UDFs) which will support data transformations

While Dor observed the trend to think of “all companies are software,” he also recognized that companies are still at their heart driven by people, highlighting the case of a Scylla user. He finished by making a bold assertion of his own. If all companies are software, then “Scylla is a damned good choice for your software.”

ScyllaDB CTO Avi Kivity

It was then time for Avi Kivity, ScyllaDB’s CTO to take the stage. Avi emphasized how Scylla was doubling down on density. While today’s cloud servers are capable of scaling to 60 terabytes of storage, he pointed out how features like Scylla’s innovative incremental compaction strategy will allow users to get the most out of those large storage systems. Also, to safeguard your data, Scylla now supports encryption at rest.

What Avi gets most excited about are the plans for new features. For instance User Defined Functions (UDFs) and User Defined Aggregates (UDAs). Also now that Scylla has provided Cassandra and DynamoDB APIs, Avi noted that there’s also work afoot on a Redis API (#5132) that allows for disk-backed persistence.

Avi clarified there are also going to be two implementations for Lightweight Transactions (LWT). First, a Paxos-based implementation for stricter guarantees, and then, in due time, a Raft implementation for higher throughput.

Avi also spoke about the unique nature of Scylla’s Change Data Capture (CDC) implementation. Instead of being a separate interface, it will be a standard CQL-readable table for increased integration with other systems.

He finished with a review of Scylla release roadmaps for 2020.

ScyllaDB CTO Avi Kivity showed the Release Schedule for Scylla Enterprise, Scylla Open Source and Scylla Manager for 2020

Philip Zimich, Comcast X1

Next up to speak was Comcast’s Philip Zimich, who presented the scope and scale of the use cases behind Comcast’s video and DVR services. When Comcast’s X1 platform team began to consider Scylla, they had grown their business to 15 million households and 31 million devices. Their data had grown to 19 terabytes per datacenter spanning 962 nodes of Cassandra. They make 2.4 billion RESTful calls per day, with business logic persisting both recordings and recording instructions: everything from the DVRs and their recording history to back office data, recording intents, reminders, lookup maps, and histories.

Their testing led the Xfinity team to spin up a 200 node cluster, first on Cassandra and then on Scylla, to simulate multiple times the normal peak production load of a single datacenter. Their results were startling. Cassandra is known as a fast-write oriented database. In Comcast’s testing it was able to achieve 22,000 writes per second. Yet Scylla was able to get over 26,500 writes per second — an improvement of 20%. On reads the difference was even more dramatic. Cassandra was able to manage 144,000 reads while Scylla was able to get 553,000 reads — an improvement of over 280%.

Comcast’s testing showed that Scylla could improve their read transactions by 2.8x and lower their long-tail (p999) latencies by over 22x

Differences in latencies were similarly dramatic. Median reads and writes for Scylla were both sub-millisecond. Scylla’s p999s were in the single-digit millisecond range. In all situations latencies for Scylla were far better than for Cassandra — anywhere between 2x and 22x faster.

Latencies (in milliseconds)    Cassandra    Scylla    Improvement
Reads   Median                     3.959     0.523           7.5x
        p90                       22.338     0.982          22.7x
        p99                       84.318     3.839            22x
        p999                     218.15      9.786          22.3x
Writes  Median                     1.248     0.609             2x
        p90                        4.094     0.913           4.5x
        p99                       31.914     2.556          12.5x
        p999                     117.344     7.152          16.4x

With performance testing complete Comcast moved forward with their migration to Scylla. Scylla’s architectural advantages allowed Comcast to scale their servers vertically, minimizing the number of individual nodes they need to administer.

Comcast’s dramatic node count reduction, from 962 nodes of Cassandra to 78 of Scylla

When fully deployed, Comcast will shrink their deployment radically from 962 servers on Cassandra to only 78 nodes on Scylla. This new server topology gives them all the capacity they need to support their user base but without increasing their costs, capping their spending and sustaining their planned growth through 2022.

Martin Strycek

Last year when the global travel booking giant took the stage at Scylla Summit they were still in the middle of their migration and described that stage of their implementation as “taking flight with Scylla.” At this year’s Summit Martin continued the analogy to “reaching cruising altitude” and updated the crowds regarding their progress.

Martin described how they were able to use two specific features of Scylla to make all the difference for their production efficiency.

The first feature that improved performance dramatically was enabling BYPASS CACHE for full table scan queries. With 90 million rows to search, bypassing the cache allowed them to drop the time for a full table scan from 520 seconds down to 330 seconds – a 35% improvement.

The second feature was the SSTable 3.0 (“mc”) format. Enabling it allowed them to shrink their storage needs from 32 TiB of total data down to 22 TiB on disk — a 31% reduction.

The amount of disk used per server shrank once the SSTable 3.0 (“mc”) format was enabled.

Enabling these features was a smooth, error-free operation for the team. Martin finished his presentation by thanking ScyllaDB, especially Glauber, for making the upgrade experience entirely uneventful: “Thank you for a boring database.”

We take that as high praise!

Glauber Costa

After announcing the winners of the Scylla User Awards for 2019 the keynotes continued with Glauber Costa returning to the stage to share tips and tricks for how to be successful with Scylla.

First off he drew distinctions for long-time Cassandra admins between what to remember from their prior experience (data modeling and consistency issues) and what they’ll need to forget — such as trying to tune the system the exact same way as before, because many of the operational aspects of Cassandra work completely differently or don’t even exist in Scylla.

In terms of production hardware, Glauber suggested NVMe if minimizing latency is your main goal. SSD is best if you need high throughput. But forget about using HDDs or any network interface below 1 Gbps if you care about performance. And, generally, never use more, smaller nodes if the result is the same amount of resources. However, in practice it is acceptable to use them to smooth out expansion.

Another key point Glauber touched upon was rack awareness. It is best to “run as many racks as you have replicas.” So if you have a replication factor of three, use three racks for your deployment. This provides perfect balance and perfect resiliency.

These are just two of many topics that Glauber touched upon, and we’ll work to get you the full video of his presentation soon. For now, here are his slides:

Alexys Jacob, Numberly

Known to the developer community as @ultrabug, Alexys is the CTO of Numberly. His presentation was his production experience comparison of MongoDB and Scylla. Numberly uses both of these NoSQL databases in their environment, and Alexys wished to contrast their purpose, utility, and architecture.

For Alexys, it’s not an either-or situation with NoSQL databases. Each is designed for specific data models and use cases. Alexys’ presentation highlighted some of the commonalities between both systems, then drilled down into the differences.

His operations takeaways for the two systems were unsparing but fair. He hammered MongoDB’s claims about their sharding-based clustering, which in his view had both poor TCO and poor operations; replica sets, he said, should be enough, and he gave them a moderate TCO rating (“vertical scaling is not a bad thing!”) and a good operations rating (“Almost no tuning needed”).

He gave Scylla a good rating for TCO due to its clean and simple topology, maximized hardware utilization and capability to scale horizontally (as well as vertically). For operations, what Alexys wanted was even more automation.

Alexys observed there are complex and mandatory background maintenance operations for Scylla. For example, while compactions are seamless, repairs are still only “semi-seamless.”

From Alexys’ perspective, MongoDB favors flexibility over performance, while Scylla favors consistent performance over versatility.

Breakout Sessions

In their session SmartDeployAI spoke about democratizing Machine Learning and AI by making deployment of resources easy through Kubernetes

This year Scylla Summit boasted over thirty breakout sessions, which spanned from the afternoon of the first day to the afternoon of the second. In due time we’ll share the video and slides for each, but for now, a few highlights.

  • JanusGraph was quite prevalent this year, with back-to-back sessions hosted by our friends at Expero and Enharmonic, and use cases presented by Zeotap for adtech and FireEye for cybersecurity.
  • Kubernetes was also top of mind for many, with presentations from Arrikto’s Yannis Zarkadas about the Scylla Operator, as well as SmartDeployAI talking about using Kubernetes to make efficient workflow pipelines for Machine Learning.
  • Streaming is now nearly ubiquitous. We were pleased to have Confluent’s Jeff Bean on hand for our die-hard Kafka fans, as well as CapitalOne’s Glen Gomez Zuazo who gave an overview of streaming technologies that included Kafka but also touched on other options like RabbitMQ and NATS.
  • Scylla continues to make new inroads across various industries from IoT (Augury, Mistaway and Nauto), to retail and delivery services (Fanatics, iFood) to security (Lookout, Reversing Labs, and FireEye), utilities/energy (SkyElectric), to consumer video (Comcast and Tubi).
  • Scylla Cloud user Mistaway showcased how using Scylla’s database-as-a-service allowed them to keep their focus on their application and their customers, not their infrastructure.

SkyElectric spoke about the incredible opportunities of providing renewable energy – solar and wind — to power the world using their solutions built upon Scylla Open Source

There were also many sessions from ScyllaDB’s own engineering team, highlighting our major new features and capabilities, best practices, security, sizing, Scylla Manager, Scylla Monitoring Stack and more. Look for the video and slides from those sessions in the days ahead.

Day Two General Sessions

The second day of the conference reconvened after lunch as a single general session.

Goran Cvijanovic, ReversingLabs — You have twenty billion objects you’ve analyzed. Any of them could be malware (loaded with a virus, a trojan horse, or other malicious payload), or so-called “goodware” (known to be benign). What database will be able to keep up a level of analytics to prevent the next major data breach? That’s the problem facing ReversingLabs. They analyze all those objects for what they call a “file reputation” which determines if it is safe, suspicious, or known to be dangerous. The need to be fast and right every time is why they put Scylla at the heart of their TitaniumCloud offering.

ReversingLabs’ Goran Cvijanovic described the scale and scope of their file reputation system

The ReversingLabs backend data store needed to support protobuf format natively, exhibit extremely low latencies (<2 milliseconds), and allow for highly variable record sizes (ranging anywhere from 1k of data up to 500 MB). On top of it all, it needed to be highly available and support replication.

ReversingLabs results showed average latencies for writes below 6 milliseconds, and average reads below 1 millisecond. Even their p99 latencies were 12 ms for writes and 7 ms for reads

Goran emphasized that one of the key takeaways is to test your chunk size with data compression. In ReversingLabs’ case, it meant they were able to use 49% less storage. (If you want to learn how to take advantage of Scylla’s data compression feature yourself, make sure to read our blog series on compression: Part One and Part Two).

Richard Ney, Lookout — Richard Ney is the principal architect behind Lookout’s ingestion pipeline and query services for the security of tens of millions of mobile devices. Their Common Device Service is receiving data at different intervals for all sorts of attributes related to a mobile device; its software and hardware, its filesystem, settings, permissions and configuration, a binary manifest of what is installed, and various analyses of risk.

Lookout’s Richard Ney shared how a simple mobile device, multiplied tens of millions of times, becomes a Big Data security challenge! His talk was on how Lookout solved that challenge with Scylla.

Their existing design had Spark streaming ingestion jobs flowing through this Common Device Service to DynamoDB and Elasticsearch. But the long-term issue was that, as the system grew, costs increased significantly. Particularly costs associated with DynamoDB. As well, DynamoDB has limits on the primary key and sort key; it was not designed for time series data.

What Lookout is now seeking to do is to replace their current architecture with Scylla plus Kafka, with an eye to leveraging Scylla’s recently announced Change Data Capture (CDC) to flow data through Kafka Connect into Elasticsearch and other downstream services. They also seek to employ incremental compaction with Scylla Enterprise.

Richard described Lookout’s test environment, which emulated 38 million devices generating over 100,000 messages per second. He then noted that the default Kafka partitioner (Murmur2 hash) was not very efficient. The Lookout team implemented their own Murmur3 hash (the same as is used within Scylla) with a consistent jump hash using the Google guava library.

The bottom line for Lookout was that “the cost benefits over the current architecture flow increased significantly as our volume increased.” Richard showed the cost analysis for DynamoDB versus Scylla. In an on-demand scenario, supporting 38 million devices would cost Lookout over $304,000 per month compared to $14,500 for Scylla; a 95% savings over DynamoDB! Even moving to provisioned pricing, the cost of DynamoDB (more than $55,600 per month) would still eclipse Scylla. Scylla would still be 74% cheaper.

Keeping in mind the growth of the mobile market as well as Dor’s allusion to the “million dollar engineering challenge,” Richard extrapolated costs out even further, to a 100,000,000 device scenario. In that case costs for provisioned DynamoDB would rise to $146,000 a month — roughly $1.88 million annually. Whereas for Scylla, at around $38,300 a month, the annualized cost would be less than $460,000. That would mean a savings of over $1.4 million annually.

Shlomi Livne — ScyllaDB’s VP of R&D was up next with a session on writing applications for Scylla. His conceptual process model had a number of steps, each of which could lead back to prior steps. In general:

  1. Think about the queries you are going to run
  2. Only then create a data model
  3. Use cassandra-stress or another tool to validate performance while you…
  4. Develop
  5. Scale test, and finally
  6. Deploy

During this iterative process, Shlomi encouraged the audience to look for opportunities for CQL optimization and to also pay heed to disk access patterns. Look at the amount of I/O operations, and the overall amount of bytes read. There are two elements read off disk: the data stored in the SSTables, and the index. Everything else is read from memory.

Shlomi also proposed paying careful attention to your memory-to-disk ratio when you choose your instance types. For example the AWS EC2 i3 family has a memory-to-disk ratio of 1:30. Whereas the newer i3en family has a memory-to-disk ratio of 1:78. Thus, on the latter instance type you will get more queries served off disk.

Shlomi then led the audience on a comparison of doing a single partition scan versus range scans for performance, and gave examples of how current features such as BYPASS CACHE and PER PARTITION LIMIT, and upcoming features like GROUP BY and LIKE will help users get even more efficiency out of their full table and range scans. The bottom line is that optimized full scans can reduce the overall amount of disk access compared to aggregated single-partition scans.
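
As a rough illustration of the kind of optimized full scan Shlomi described, here is a sketch using the Java driver; the keyspace, table, and column names are made up, and PER PARTITION LIMIT and BYPASS CACHE are the Scylla CQL features mentioned above:

import com.datastax.driver.core.*;

public class RangeScanSketch {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("host").build();
        Session session = cluster.connect();

        // Scan one token range of a hypothetical table; a real scan would issue
        // many such ranges in parallel. BYPASS CACHE keeps this one-off scan from
        // evicting the hot working set, and PER PARTITION LIMIT 1 returns only the
        // first row of each partition.
        Statement scan = new SimpleStatement(
                "SELECT sensor_id, temperature FROM iot.readings " +
                "WHERE token(sensor_id) >= ? AND token(sensor_id) < ? " +
                "PER PARTITION LIMIT 1 BYPASS CACHE",
                Long.MIN_VALUE, 0L)
            .setReadTimeoutMillis(30_000);   // range scans can take longer than point reads

        for (Row row : session.execute(scan)) {
            // process each row of the range
        }
        cluster.close();
    }
}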

Calle Wilund — Change Data Capture is a compelling new feature presented by our engineer Calle Wilund. It is a way to record changes to the tables of a database, which can then be read asynchronously by consumers. Basically a way to track diffs to records in your Scylla database. What is unique about Scylla’s implementation is that CDC is implemented as a standard CQL-readable table. It does not require a separate interface or application to read them.

A table comparing how Change Data Capture (CDC) is implemented in Scylla versus other NoSQL databases.

There are a number of use cases for CDC, including database mirroring and replication, or to enable specific applications like fraud detection or a Kafka pipeline. The way it is implemented is as a per-table log in the form of a CQL-readable table. The CDC log is colocated with the original data and has a default Time To Live (TTL) of 24 hours to ensure it doesn’t bloat your disk use uncontrollably.
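
To make the idea concrete, here is an illustrative sketch only; CDC was still under development at the time, so the option syntax and the log table name below are assumptions based on the design described (a CQL-readable, per-table log colocated with the base table):

import com.datastax.driver.core.*;

public class CdcLogSketch {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("host").build();
        Session session = cluster.connect();

        // Enable CDC on a hypothetical table (option name assumed)
        session.execute("ALTER TABLE ks.orders WITH cdc = {'enabled': true}");

        // The log is just another CQL table, so a plain SELECT reads the changes --
        // no separate interface or application is required
        for (Row change : session.execute("SELECT * FROM ks.orders_scylla_cdc_log")) {
            // each row describes one change to the base table (default TTL: 24 hours)
        }
        cluster.close();
    }
}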

We’ll provide more information about this feature and its implementation in a future blog, as well as our documentation. For now, if you want to keep an eye on our progress, make sure to check out issue #4985 in Github and peek in on all of the related subtasks.

Konstantin Osipov — Lightweight Transactions (LWT) that enable “Compare-and-Set” are another long-awaited feature in Scylla (see #1359). Our Software Team Lead Konstantin “Kostja” Osipov gave the audience an update on our implementation status. The good news from Kostja is that you can now try it yourself using Scylla’s nightly builds. There is even a quickstart guide on how to use them in Docker. However, LWT is an experimental feature (use --experimental) unsuitable for production; it is not yet available in a tested maintenance release.

Kostja also noted that while ScyllaDB is doing its best to make our implementation match Cassandra, there are some differences. For example, Scylla supports per-core partitioning, so you would need to use a shard-aware driver for optimal performance.

Kostja made clear that LWT has a performance cost. Throughput will be lower and latencies higher. Four round trips are very costly. Especially when working across multiple regions. Right now, ScyllaDB is focusing on improving performance with LWT using a Paxos consensus algorithm. There are also plans in the future to implement Raft.

As with CDC, we’ll have a full blog and documentation of our LWT implementation in the future.

Nadav Har’El — Project Alternator, the free and open source DynamoDB-compatible API for Scylla, was presented to our audience by our Distinguished Engineer Nadav Har’el. What was new to those who have been paying close attention to this project was that we are now offering Alternator on Scylla Cloud in beta.

An intensive test of DynamoDB vs. Alternator at 120,000 ops shows that running Scylla on EC2 VMs could be an order of magnitude cheaper than DynamoDB.

Ask Us Anything!

The summit ended with its traditional Ask Me Anything session with Dor, Avi and Glauber onstage. We’d like to thank everyone who attended, and to all our speakers who made the event such a great success.

Even though Scylla Summit is over, we still invite you to ask us anything! Want more details on a feature we announced? Curious to know how to get the same advantages of Scylla’s performance and scalability in your own organization? Feel free to contact us, or drop into our Slack and let us know your thoughts.


The post Overheard at Scylla Summit 2019 appeared first on ScyllaDB.

Winners of the Scylla Summit 2019 User Awards

The Envelope Please…

There are many highlights from our 2019 Scylla Summit this week. One of our favorites from Day 1 was our Scylla User Awards, our opportunity to recognize the impressive things our users are doing with Scylla.

This year we had winners in nine categories. I’m glad for the chance to share them here, along with a bit of color on their use case.

Best Use of Scylla with Spark: Tubi

Streaming company Tubi has a backend service that uses pre-computed machine learning model results to personalize its user home pages. Hundreds of millions of them are generated per day in Spark. Scylla is used for the persistence layer. Tubi chose Scylla because they needed “a NoSQL solution that is very fault-tolerant, fast reads/writes, and easy to set up plus maintain.”

Best Use of Scylla with Kafka: Grab

Southeast Asia’s leading super app, Grab developed a microservices architecture based on data streaming with Apache Kafka. These streams not only power Grab’s business, they provide a vital source of intelligence. Grab’s engineering teams aggregate and republish the streams using a low-latency metadata store built on Scylla Enterprise.

Best Use of Scylla with a Graph Database: FireEye

Cybersecurity company FireEye built its Graph Store using JanusGraph, which uses both ScyllaDB and ElasticSearch for backend storage. They have more than 600 million vertices and over 1.2 billion edges, occupying over 2.5TB of space. After comparing NoSQL databases to serve as a backend, FireEye found Scylla to be roughly 10X faster than other options.

Best Use of Scylla Cloud: Dynamic Yield

Dynamic Yield bills itself as an omnichannel personalization platform built by a team that is known to “eat, sleep and breathe data.” In their industry-leading platform, they combined Scylla with Apache Flink and Apache Spark to power their Unified Customer Profile, which also made GDPR compliance far easier for them to implement.

Greatest Node Reduction: Comcast

Telecommunications and entertainment leader Comcast Xfinity has been migrating from Cassandra to Scylla with tremendous benefits. To date, Comcast has replaced 962 nodes of Cassandra with just 78 nodes of Scylla. That’s a 92% reduction in nodes! The significantly smaller footprint brings great costs savings in both hardware and administration.

Best Real-Time Use Case: Opera

Web browser company Opera lets you synchronize your browsing data (bookmarks, open tabs, passwords, history, etc.) between your devices. After migrating to Scylla, Opera managed to reduce their P99 read latencies from 5 seconds to 4 ms (a 99.92% reduction), and their P99 write latencies from 500 ms to 4 ms (a 99.2% reduction). This helps them push updates to connected browsers much faster than ever before. Beyond latency improvements, Opera shared that “migrating to Scylla helped us to sleep well at night.”

Best Analytics Use Case

The travel leader won in this category for cohabitating analytics and operational stores on the same boxes, and for reducing their footprint 30% by using Bypass Cache. Their data, based on flight bookings, is constantly and rapidly changing; in fact, they experience a 100% turnover in their dataset every ten days. Their need to run analytics against this ever-changing data via full table scans requires a database that can meet their analytics workloads without impacting their live customer transactions.

Community Member of the Year: Yannis Zarkadas, Arrikto

Yannis Zarkadas from data management company Arrikto was winner of Best Scylla Community Member for his work on the Kubernetes Operator for Scylla. He said: “Working on the Kubernetes Operator as part of my diploma thesis at Arrikto was a great experience. The ScyllaDB team was always there if I needed advice or suggestions, and the Operator turned out even better than I’d hoped. Scylla and Kubernetes are a natural fit.”

Most Innovative Use of Scylla: Numberly

AdTech pioneer Numberly has combined Scylla with Kafka Connect, Kafka Streams, Apache Spark and Python Faust, built on Gentoo Linux and deployed on bare-metal across multiple datacenters, all managed with Kubernetes. All of that resulted in reengineering a calculation process that used to take 72 hours but can now be delivered in just 10 seconds.

We congratulate all of our winners, and also thank everyone who has been making Scylla such a vital part of their enterprises and a vibrant open source software community. If you have created your own groundbreaking applications built on Scylla, we’d love to hear more about it! Contact us privately or join us on Slack and tell us all about it!

The post Winners of the Scylla Summit 2019 User Awards appeared first on ScyllaDB.

Medusa - Spotify’s Apache Cassandra backup tool is now open source

Spotify and The Last Pickle (TLP) have collaborated over the past year to build Medusa, a backup and restore system for Apache Cassandra which is now fully open sourced under the Apache License 2.0.

Challenges Backing Up Cassandra

Backing up Apache Cassandra databases is hard, not complicated. You can take manual snapshots using nodetool and move them off the node to another location. There are existing open source tools such as tablesnap, or the manual processes discussed in the previous TLP blog post “Cassandra Backup and Restore - Backup in AWS using EBS Volumes”. However they all tend to lack some features needed in production, particularly when it comes to restoring data - which is the ultimate test of a backup solution.

Providing disaster recovery for Cassandra has some interesting challenges and opportunities:

  • The data in each SSTable is immutable allowing for efficient differential backups that only copy changes since the last backup.
  • Each SSTable contains data that the node is responsible for, and the restore process must make sure it is placed on a node that is also responsible for that data. Otherwise it may be unreachable by clients.
  • Restoring to different cluster configurations, changes in the number of nodes or their tokens, requires that data be re-distributed into the new topology following Cassandra’s rules.

Introducing Medusa

Medusa is a command line backup and restore tool that understands how Cassandra works. The project was initially created by Spotify to replace their legacy backup system. TLP was hired shortly after to take over development, make it production ready and open source it. It has been used on small and large clusters and provides most of the features needed by an operations team.

Medusa supports:

  1. Backup a single node.
  2. Restore a single node.
  3. Restore a whole cluster.
  4. Selective restore of keyspaces and tables.
  5. Support for single token and vnodes clusters.
  6. Purging the backup set of old data.
  7. Full or incremental backup modes.
  8. Automated verification of restored data.

The command line tool uses Python 3.6 and needs to be installed on all the nodes you want to back up. It supports all versions of Cassandra after 2.1.0 and, thanks to the Apache libcloud project, can store backups on a number of storage platforms.

Backup A Single Node With Medusa

Once Medusa is installed and configured, a node can be backed up with a single, simple command:

medusa backup --backup-name=<backup name>

When executed like this Medusa will:

  1. Create a snapshot using the Cassandra nodetool command.
  2. Upload the snapshot to your configured storage provider.
  3. Clear the snapshot from the local node.

Along with the SSTables, Medusa will store three meta files for each backup:

  • The complete CQL schema.
  • The token map, a list of nodes and their token ownership.
  • The manifest, a list of backed up files with their md5 hash.

Full And Differential Backups

All Medusa backups copy only new SSTables from the nodes, reducing the network traffic needed. Medusa then has two ways of managing the files in the backup catalog, which we call Full and Differential backups. With Differential backups, each new backup keeps only references to SSTables, so that only a single instance of each SSTable exists no matter how many backups it appears in. Differential backups are the default and, in operation at Spotify, reduced the backup size for some clusters by up to 80%.

Full backups create a complete copy of all SSTables on the node each time they run. Files that have not changed since the last backup are copied within the backup catalog into the new backup (rather than copied off the node again), in contrast to the differential method, which only creates references to files. Full backups are useful when you need to take a complete copy and have all the files in a single location.

Cassandra Medusa Full Backups
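
If a full backup is required explicitly, the mode is selected at backup time; the flag below is our assumption of how this is expressed on the command line and may differ from the actual CLI (the README is authoritative):

medusa backup --backup-name=<backup name> --mode=full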

Differential backups take advantage of the immutable SSTables created by the Log Structured Merge Tree storage engine used by Cassandra. In this mode Medusa checks whether an SSTable has previously been backed up, and only copies the new files (just like always). However, all SSTables for the node are then stored in a single common folder, and the backup manifest contains only metadata files and references to the SSTables.


Backup A Cluster With Medusa

Medusa currently lacks an orchestration layer to run a backup on all nodes for you. In practice we have been using crontab to do cluster-wide backups. While we consider the best way to automate this (and ask for suggestions), we recommend using techniques such as those below (a minimal crontab sketch follows the list):

  • Scheduled via crontab on each node.
  • Manually on all nodes using pssh.
  • Scripted using cstar.
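
For example, a minimal crontab entry on each node, using the hour-granularity backup-name convention seen in the listing further below, might look like this (the schedule and name format are illustrative):

7 7 * * * medusa backup --backup-name=$(date +\%Y\%m\%d\%H)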

Listing Backups

All backups with the same “backup name” are considered part of the same backup for a cluster. Medusa can provide a list of all the backups for a cluster, when they started and finished, and if all the nodes have completed the backup.

To list all existing backups for a cluster, run the following command on one of the nodes:

$ medusa list-backups
2019080507 (started: 2019-08-05 07:07:03, finished: 2019-08-05 08:01:04)
2019080607 (started: 2019-08-06 07:07:04, finished: 2019-08-06 07:59:08)
2019080707 (started: 2019-08-07 07:07:04, finished: 2019-08-07 07:59:55)
2019080807 (started: 2019-08-08 07:07:03, finished: 2019-08-08 07:59:22)
2019080907 (started: 2019-08-09 07:07:04, finished: 2019-08-09 08:00:14)
2019081007 (started: 2019-08-10 07:07:04, finished: 2019-08-10 08:02:41)
2019081107 (started: 2019-08-11 07:07:04, finished: 2019-08-11 08:03:48)
2019081207 (started: 2019-08-12 07:07:04, finished: 2019-08-12 07:59:59)
2019081307 (started: 2019-08-13 07:07:03, finished: Incomplete [179 of 180 nodes])
2019081407 (started: 2019-08-14 07:07:04, finished: 2019-08-14 07:56:44)
2019081507 (started: 2019-08-15 07:07:03, finished: 2019-08-15 07:50:24)

In the example above the backup called “2019081307” is marked as incomplete because 1 of the 180 nodes failed to complete a backup with that name.

It is also possible to verify that all expected files are present for a backup, and their content matches hashes generated at the time of the backup. All these operations and more are detailed in the Medusa README file.
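
For example, verification can typically be run from the command line along these lines (treat the exact sub-command and flags as an assumption; the README is the authoritative reference):

medusa verify --backup-name=<backup name>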

Restoring Backups

While orchestration is lacking for backups, Medusa coordinates restoring a whole cluster so you only need to run one command. The process connects to nodes via SSH, starting and stopping Cassandra as needed, until the cluster is ready for you to use. The restore process handles three different use cases.

  1. Restore to the same cluster.
  2. Restore to a different cluster with the same number of nodes.
  3. Restore to a different cluster with a different number of nodes.

Case #1 - Restore To The Same Cluster

This is the simplest case: restoring a backup to the same cluster. The topology of the cluster has not changed and all the nodes that were present at the time the backup was created are still running in the cluster.


Use the following command to run an in-place restore:

$ medusa restore-cluster --backup-name=<name of the backup> \
                         --seed-target <seed node address>

The seed target node will be used as a contact point to discover the other nodes in the cluster. Medusa will discover the number of nodes and token assignments in the cluster and check that it matches the topology of the source cluster.

To complete this restore each node will:

  1. Download the backup data into the /tmp directory.
  2. Stop Cassandra.
  3. Delete the commit log, saved caches and data directory including system keyspaces.
  4. Move the downloaded SSTables into the data directory.
  5. Start Cassandra.

The schema does not need to be recreated as it is contained in the system keyspace, and copied from the backup.

Case #2 - Restore To A Different Cluster With Same Number Of Nodes

Restoring to a different cluster with the same number of nodes is a little harder because:

  • The destination cluster may have a different name, which is stored in the system.local table.
  • The nodes may have different names.
  • The nodes may have different token assignments.


Use the following command to run a remote restore:

$ medusa restore-cluster --backup-name=<name of the backup> \
                         --host-list <mapping file>

The host-list parameter tells Medusa how to map from the original backup nodes to the destination nodes in the new cluster, which is assumed to be a working Cassandra cluster. The mapping file must be a Comma Separated Values (CSV) file (without a header row) with the following columns:

  1. is_seed: True or False, indicating whether the destination node is a seed node (so the seed nodes can be restored and started first).
  2. target_node: Host name of a node in the target cluster.
  3. source_node: Host name of a source node to copy the backup data from.

For example:
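
An illustrative mapping for a three-node cluster (hostnames are placeholders) could look like:

True,target-node1.example.com,source-node1.example.com
False,target-node2.example.com,source-node2.example.com
False,target-node3.example.com,source-node3.example.com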


In addition to the steps listed for Case 1 above, when performing a restore to a remote cluster the following steps are taken:

  1. The system.local and system.peers tables are not modified to preserve the cluster name and prevent the target cluster from connecting to the source cluster.
  2. The system_auth keyspace is restored from the backup, unless the --keep-auth flag is passed to the restore command.
  3. Token ownership is updated on the target nodes to match the source nodes by passing the -Dcassandra.initial_token JVM parameter when the node is restarted, which causes ownership to be updated in the local system keyspace.

Case #3 - Restore To A Different Cluster With A Different Number Of Nodes

Restoring to a different cluster with a different number of nodes is the hardest case to deal with because:

  • The destination cluster may have a different name, which is stored in the system.local table.
  • The nodes may have different names.
  • The nodes may have different token assignments.
  • Token ranges can never be the same as there is a different number of nodes.

The last point is the crux of the matter. We cannot get the same token assignments because we have a different number of nodes, and the tokens are assigned to evenly distribute the data between nodes. However the SSTables we have backed up contain data aligned to the token ranges defined in the source cluster. The restore process must ensure the data is placed on the nodes which are replicas according to the new token assignments, or data will appear to have been lost.

To support restoring data into a different topology Medusa uses the sstableloader tool from the Cassandra code base. While slower than copying the files from the backup, sstableloader is able to “repair” data into the destination cluster. It does this by reading the token assignments and streaming the parts of the SSTable that match the new token ranges to all the replicas in the cluster.


Use the following command to run a restore to a cluster with a different topology:

$ medusa restore-cluster --backup-name=<name of the backup> \
                         --seed-target <seed node address>

Restoring data using this technique has some drawbacks:

  1. The restore will take significantly longer.
  2. The amount of data loaded into the cluster will be the size of the backup set multiplied by the Replication Factor. For example, a backup of a cluster with Replication Factor 3 will have 9 copies of the data loaded into it. The extra replicas will be removed by compaction; however, the total on-disk load during the restore process will be higher than it will be at the end of the restore. See below for a further discussion.
  3. The current schema in the cluster will be dropped and a new one created using the schema from the backup. By default Cassandra will take a snapshot when the schema is dropped, a feature controlled by the auto_snapshot configuration setting, which will not be cleaned up by Medusa or Cassandra. If there is an existing schema with data in it, this will take extra disk space. This is a sane safety precaution, and a simple workaround is to manually ensure the destination cluster does not have any data in it.

A few extra words on the amplification of data when restoring using sstableloader. The backup contains the replicated data; let’s say we have a Replication Factor of 3, so roughly speaking there are 3 copies of each partition. Those copies are spread around the SSTables we collected from each node. As we process each SSTable, sstableloader repairs the data back into the cluster, sending it to the 3 new replicas. So the backup contains 3 copies, we process each copy, and we send each copy to the 3 new replicas, which means in this case (a worked example follows the list below):

  • The restore sends nine copies of data to the cluster.
  • Each node gets three copies of data rather than one.
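
For example, if the backup set of an RF 3 cluster totals 1 TB (roughly 333 GB of unique data stored three times), sstableloader will stream approximately 3 TB into the destination cluster, i.e. nine copies of the unique data, and compaction will later reduce that back down to three copies.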

The following sequence of operations will happen when running this type of restore:

  1. Drop the schema objects and re-create them (once for the whole cluster)
  2. Download the backup data into the /tmp directory
  3. Run the sstableloader for each of the tables in the backup

Available Now On GitHub

Medusa is now available on GitHub and can be installed from PyPI. With this blog post and the readme file in the repository you should be able to take a backup within minutes of getting started. As always, if you have any problems create an issue in the GitHub project to get some help. It has been in use for several months at Spotify, storing petabytes of backups in Google Cloud Storage (GCS), and we thank Spotify for donating the software to the community to allow others to have the same confidence that their data is safely backed up.

One last thing, contributions are happily accepted especially to add support for new object storage providers.

Also, we’re looking to hire a Cassandra expert in America.

TLP Is Hiring Another Cassandra Expert

TLP is looking to hire someone in America to work with us on tough problems for some of the biggest and smallest companies in the world. We help our customers get the best out of Apache Cassandra, and we have a really simple approach: be smart, be nice, and try to make it fun. Our customers range from large corporates to well-known internet companies to small startups. They call on TLP to fix and improve the Cassandra clusters they use to deliver products and services to millions of users, and sometimes to hundreds of millions. We’ve been doing this since 2011 so if you have used the internet there is a good chance TLP has worked with at least one company you have visited.

You will be part of the globally-distributed Consulting team. In the team you will use your expert Cassandra experience and knowledge to work directly with our customers to solve their problems, improve their processes, and build their Cassandra skills. Your teammates will rely on you to review their work, question their assumptions, and help them when they get stuck; as you will of them. This is the only way we know works when the team is asked to solve problems no one else can. When given time you will enjoy participating in the Apache Cassandra community and contributing to our open source projects which have wide usage. In short you will be smart, like working with people of all skill levels, and get a buzz out of connecting with other people and helping them.

About The Role

  • You will be part of the Consulting team distributed in 7 countries in Asia-Pacific, America, and Europe with diverse backgrounds.
  • You will get to work with large and small companies from around the world and together we will find the best ways to help their teams be successful.
  • You will occasionally be required to work outside of normal business hours with customers or your team members. We provide limited on-call support during business hours, and occasional extended support for our customers.
  • You will understand the customer comes first, and will be able to get a quick response to them if they need one. Then you will follow-up with the full TLP knowledge dump :)
  • We provide our customers expert advice including troubleshooting, performance tuning, scaling and upgrading, data modelling, monitoring, automation, hardware selection, and training, with the aim of delivering documentation and video so that if the event happens again they are able to solve it by themselves. We work in a highly-collaborative way even though we are widely distributed. Most things we do for customers are checked by a team member, and we are comfortable admitting we are wrong in front of each other and our customers.

About You

  • You will have 3+ years experience using, operating, and working on Cassandra or be able to convince us you know enough already.
  • You will be fluent with Java, and ideally at least comfortable with Python or Bash (or a similar scripting language).
  • You will be able to demonstrate contributions to open source projects and be familiar with the workflow.
  • You will be comfortable speaking to an audience and ideally have previously spoken about Cassandra at an event. At TLP you will be able to speak about complex ideas to the rest of the team, our customers, or a full room at a conference.
  • You will be able to communicate complex ideas in writing or interpretive dance, and have previous examples of either. At TLP you will be able to write effective ticket updates or run books for customers, blog posts for TLP, or participate in a weekend dance workshop on the frailty of the human condition and consistency in partition tolerant distributed databases.

About TLP

The Last Pickle has been helping customers get the most out of Apache Cassandra since March 2011. We are a successful, self-funded startup with a great customer list, and a great team. We get to spend time on open source tools and researching how best to use Cassandra, as we strive to be a research driven consultancy.

If this sounds like the right job for you let us know by emailing

Open Sourcing Mantis: A Platform For Building Cost-Effective, Realtime, Operations-Focused Applications

By Cody Rioux, Daniel Jacobson, Jeff Chao, Neeraj Joshi, Nick Mahilani, Piyush Goyal, Prashanth Ramdas, Zhenzhong Xu

Today we’re excited to announce that we’re open sourcing Mantis, a platform that helps Netflix engineers better understand the behavior of their applications to ensure the highest quality experience for our members. We believe the challenges we face here at Netflix are not necessarily unique to Netflix which is why we’re sharing it with the broader community.

As a streaming microservices ecosystem, the Mantis platform provides engineers with capabilities to minimize the costs of observing and operating complex distributed systems without compromising on operational insights. Engineers have built cost-efficient applications on top of Mantis to quickly identify issues, trigger alerts, and apply remediations to minimize or completely avoid downtime to the Netflix service. Where other systems may take tens of minutes to process metrics accurately, Mantis reduces that to seconds, effectively reducing our Mean-Time-To-Detect. This is crucial because any amount of downtime is brutal and comes with an incredibly high impact to our members — every second counts during an outage.

As the company continues to grow our member base, and as those members use the Netflix service even more, having cost-efficient, rapid, and precise insights into the operational health of our systems is only growing in importance. For example, a five-minute outage today is equivalent to a two-hour outage at the time of our last Mantis blog post.

Mantis Makes It Easy to Answer New Questions

The traditional way of working with metrics and logs alone is not sufficient for large-scale and growing systems. Metrics and logs require that you know what you want to answer ahead of time. Mantis, on the other hand, allows us to sidestep this drawback completely by giving us the ability to answer new questions without having to add new instrumentation. Instead of logs or metrics, Mantis enables a democratization of events where developers can tap into an event stream from any instrumented application on demand. By making consumption on-demand, you’re able to freely publish all of your data to Mantis.

Mantis is Cost-Effective in Answering Questions

Publishing 100% of your operational data so that you’re able to answer new questions in the future is traditionally cost prohibitive at scale. Mantis uses an on-demand, reactive model where you don’t pay the cost for these events until something is subscribed to their stream. To further reduce cost, Mantis reissues the same data for equivalent subscribers. In this way, Mantis is differentiated from other systems by allowing us to achieve streaming-based observability on events while empowering engineers with the tooling to reduce costs that would otherwise become detrimental to the business.

From the beginning, we’ve built Mantis with this exact guiding principle in mind: Let’s make sure we minimize the costs of observing and operating our systems without compromising on required and opportunistic insights.

Guiding Principles Behind Building Mantis

The following are the guiding principles behind building Mantis.

  1. We should have access to raw events. Applications that publish events into Mantis should be free to publish every single event. If we prematurely transform events at this stage, then we’re already at a disadvantage when it comes to getting insight since data in its original form is already lost.
  2. We should be able to access these events in realtime. Operational use cases are inherently time sensitive by nature. The traditional method of publishing, storing, and then aggregating events in batch is too slow. Instead, we should process and serve events one at a time as they arrive. This becomes increasingly important with scale as the impact becomes much larger in far less time.
  3. We should be able to ask new questions of this data without having to add new instrumentation to our applications. It’s not possible to know ahead of time every single possible failure mode our systems might encounter, despite all the rigor built in to make these systems resilient. When these failures do inevitably occur, it’s important that we can derive new insights from this data. You should be able to publish as large an event, with as much context, as you want. That way, when you think of a new question to ask of your systems in the future, the data will be available for you to answer it.
  4. We should be able to do all of the above in a cost-effective way. As our business critical systems scale, we need to make sure the systems in support of these business critical systems don’t end up costing more than the business critical systems themselves.

With these guiding principles in mind, let’s take a look at how Mantis brings value to Netflix.

How Mantis Brings Value to Netflix

Mantis has been in production for over four years. Over this period several critical operational insight applications have been built on top of the Mantis platform.

A few noteworthy examples include:

Realtime monitoring of Netflix streaming health which examines all of Netflix’s streaming video traffic in realtime and accurately identifies negative impact on the viewing experience with fine-grained granularity. This system serves as an early warning indicator of the overall health of the Netflix service and will trigger alerts to relevant teams within seconds.

Contextual Alerting which analyzes millions of interactions between dozens of Netflix microservices in realtime to identify anomalies and provide operators with rich and relevant context. The realtime nature of these Mantis-backed aggregations allows the Mean-Time-To-Detect to be cut down from tens of minutes to a few seconds. Given the scale of Netflix this makes a huge impact.

Raven which allows users to perform ad-hoc exploration of realtime data from hundreds of streaming sources using our Mantis Query Language (MQL).

Cassandra Health check which analyzes rich operational events in realtime to generate a holistic picture of the health of every Cassandra cluster at Netflix.

Alerting on Log data which detects application errors by processing data from thousands of Netflix servers in realtime.

Chaos Experimentation monitoring which tracks user experience during a Chaos exercise in realtime and triggers an abort of the chaos exercise in case of an adverse impact.

Realtime Personally Identifiable Information (PII) data detection samples data across all streaming sources to quickly identify transmission of sensitive data.

Try It Out Today

To learn more about Mantis, you can check out the main Mantis page. You can try out Mantis today by spinning up your first Mantis cluster locally using Docker or using the Mantis CLI to bootstrap a minimal cluster in AWS. You can also start contributing to Mantis by getting the code on Github or engaging with the community on the users or dev mailing list.


A lot of work has gone into making Mantis successful at Netflix. We’d like to thank all the contributors, in alphabetical order by first name, who have been involved with Mantis at various points of its existence:

Andrei Ushakov, Ben Christensen, Ben Schmaus, Chris Carey, Danny Yuan, Erik Meijer, Indrajit Roy Choudhury, Josh Evans, Justin Becker, Kathrin Probst, Kevin Lew, Ram Vaithalingam, Ranjit Mavinkurve, Sangeeta Narayanan, Santosh Kalidindi, Seth Katz, Sharma Podila.

Open Sourcing Mantis: A Platform For Building Cost-Effective, Realtime, Operations-Focused… was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Reduce Operational Costs with AWS EC2 R5 Instance Types on the Instaclustr Managed Platform

AWS EC2 R5 instance types on the Instaclustr Managed Platform

Continuing our efforts in adding support for the latest generation of AWS EC2 instances, Instaclustr is pleased to announce support for the EC2 R5 instance type for Apache Cassandra clusters on the Instaclustr Managed Platform. The R-series instances are optimized for memory intensive applications such as high-performance databases, distributed in-memory caches, in-memory databases, and big data analytics. On the Instaclustr Managed Platform, they are suitable for Cassandra and Kafka clusters that need to deliver very high performance I/O with minimal latency to customer applications.

The latest generation of R-series (R5) instances provides significantly better price-to-performance and price-per-GB than its predecessor. R5 instances are designed to get the most out of the underlying hardware, allowing applications to attain the best value for money. R5s also have EBS-optimized burst support, delivering better peak throughput than R4s. Read more about AWS EC2 R5 instances here.

If you are interested in migrating your existing Cassandra and Kafka cluster on the Instaclustr Managed Platform from R4s to R5s, our highly experienced Technical Operations team can provide all the assistance you need. We have built several tried and tested node replacement strategies to provide zero-downtime, non-disruptive migrations. Read our Advanced Node Replacement blog for more details on one such strategy for Cassandra.

Apache Kafka Benchmarking Results

Our Kafka performance benchmarking methodology aims to find the maximum consumer throughput (messages consumed per second) for a given test set. For this test a large number of producer load generators were established, enough to stress our Kafka cluster. Under this peak load, a consumer group was also established to consume messages from Kafka, essentially achieving the maximum consumer throughput. The results showed a 16% increase in consumer throughput with R5s compared to R4s.

The load test set comprised two 3-node Kafka clusters, one with R4.xlarge and the other with R5.xlarge instances. Each node had 750GB EBS-backed SSDs attached. A 30-partition, 3x-replicated topic was created on both clusters. Performance was measured using small 100-byte messages. Average message consumption was 1,994,499 messages/sec with R4s, and 2,322,540 messages/sec with R5s.

AWS R5 benchmarking result

We are working on benchmarking R5s for Cassandra and will publish the results once testing is completed.

If you want to know more about this benchmarking or need clarification on when to use the R5 instance type for Cassandra or Kafka, reach out to our Support team (if you are an existing customer), or contact our Sales team. You can also refer to our support article that describes the nature of each offered instance type and their likely use cases.

R5 Pricing

The R5 series is slightly cheaper than the R4 series, and the cost savings are passed on to our customers. Considering R5’s improved performance, the overall price-performance gain is significantly higher. You can access pricing details through the Instaclustr Console when you log in, or contact our Support or Sales teams.

The post Reduce Operational Costs with AWS EC2 R5 Instance Types on the Instaclustr Managed Platform appeared first on Instaclustr.

How Netflix microservices tackle dataset pub-sub

By Ammar Khaku


In a microservice architecture such as Netflix’s, propagating datasets from a single source to multiple downstream destinations can be challenging. These datasets can represent anything from service configuration to the results of a batch job, are often needed in-memory to optimize access and must be updated as they change over time.

One example displaying the need for dataset propagation: at any given time Netflix runs a very large number of A/B tests. These tests span multiple services and teams, and the operators of the tests need to be able to tweak their configuration on the fly. There needs to be the ability to detect nodes that have failed to pick up the latest test configuration, and the ability to revert to older versions of configuration when things go wrong.

Another example of a dataset that needs to be disseminated is the result of a machine-learning model: the results of these models may be used by several teams, but the ML teams behind the model aren’t necessarily interested in maintaining high-availability services in the critical path. Rather than each team interested in consuming the model having to build in fallbacks to degrade gracefully, there is a lot of value in centralizing the work to allow multiple teams to leverage a single team’s effort.

Without infrastructure-level support, every team ends up building their own point solution to varying degrees of success. Datasets themselves are of varying size, from a few bytes to multiple gigabytes. It is important to build in observability and fault detection, and to provide tooling to allow operators to make quick changes without having to develop their own tools.

Dataset propagation

At Netflix we use an in-house dataset pub/sub system called Gutenberg. Gutenberg allows for propagating versioned datasets — consumers subscribe to data and are updated to the latest versions when they are published. Each version of the dataset is immutable and represents a complete view of the data — there is no dependency on previous versions of data. Gutenberg allows browsing older versions of data for use cases such as debugging, rapid mitigation of data related incidents, and re-training of machine-learning models. This post is a high level overview of the design and architecture of Gutenberg.

Data model

1 topic -> many versions

The top-level construct in Gutenberg is a “topic”. A publisher publishes to a topic and consumers consume from a topic. Publishing to a topic creates a new monotonically-increasing “version”. Topics have a retention policy that specifies a number of versions or a number of days of versions, depending on the use case. For example, you could configure a topic to retain 10 versions or 10 days of versions.

Each version contains metadata (keys and values) and a data pointer. You can think of a data pointer as special metadata that points to where the actual data you published is stored. Today, Gutenberg supports direct data pointers (where the payload is encoded in the data pointer value itself) and S3 data pointers (where the payload is stored in S3). Direct data pointers are generally used when the data is small (under 1MB) while S3 is used as a backing store when the data is large.

1 topic -> many publish scopes

Gutenberg provides the ability to scope publishes to a particular set of consumers — for example by region, application, or cluster. This can be used to canary data changes with a single cluster, roll changes out incrementally, or constrain a dataset so that only a subset of applications can subscribe to it. Publishers decide the scope of a particular data version publish, and they can later add scopes to a previously published version. Note that this means that the concept of a latest version depends on the scope — two applications may see different versions of data as the latest depending on the publish scopes created by the publisher. The Gutenberg service matches the consuming application with the published scopes before deciding what to advertise as the latest version.

Use cases

The most common use case of Gutenberg is to propagate varied sizes of data from a single publisher to multiple consumers. Often the data is held in memory by consumers and used as a “total cache”, where it is accessed at runtime by client code and atomically swapped out under the hood. Many of these use cases can be loosely grouped as “configuration” — for example Open Connect Appliance cache configuration, supported device type IDs, supported payment method metadata, and A/B test configuration. Gutenberg provides an abstraction between the publishing and consumption of this data — this allows publishers the freedom to iterate on their application without affecting downstream consumers. In some cases, publishing is done via a Gutenberg-managed UI, and teams do not need to manage their own publishing app at all.

Another use case for Gutenberg is as a versioned data store. This is common for machine-learning applications, where teams build and train models based on historical data, see how it performs over time, then tweak some parameters and run through the process again. More generally, batch-computation jobs commonly use Gutenberg to store and propagate the results of a computation as distinct versions of datasets. “Online” use cases subscribe to topics to serve real-time requests using the latest versions of topics’ data, while “offline” systems may instead use historical data from the same topics — for example to train machine-learned models.

An important point to note is that Gutenberg is not designed as an eventing system — it is meant purely for data versioning and propagation. In particular, rapid-fire publishes do not result in subscribed clients stepping through each version; when they ask for an update, they will be provided with the latest version, even if they are currently many versions behind. Traditional pub-sub or eventing systems are suited towards messages that are smaller in size and are consumed in sequence; consumers may build up a view of an entire dataset by consuming an entire (potentially compacted) feed of events. Gutenberg, however, is designed for publishing and consuming an entire immutable view of a dataset.

Design and architecture

Gutenberg consists of a service with gRPC and REST APIs as well as a Java client library that uses the gRPC API.

High-level architecture


The Gutenberg client library handles tasks such as subscription management, S3 uploads/downloads, Atlas metrics, and knobs you can tweak using Archaius properties. It communicates with the Gutenberg service via gRPC, using Eureka for service discovery.


Publishers generally use high-level APIs to publish strings, files, or byte arrays. Depending on the data size, the data may be published as a direct data pointer or it may get uploaded to S3 and then published as an S3 data pointer. The client can upload a payload to S3 on the caller’s behalf or it can publish just the metadata for a payload that already exists in S3.

Direct data pointers are automatically replicated globally. Data that is published to S3 is uploaded to multiple regions by the publisher by default, although that can be configured by the caller.

Subscription management

The client library provides subscription management for consumers. This allows users to create subscriptions to particular topics, where the library retrieves data (eg from S3) before handing off to a user-provided listener. Subscriptions operate on a polling model — they ask the service for a new update every 30 seconds, providing the version with which they were last notified. Subscribed clients will never consume an older version of data than the one they are on unless they are pinned (see “Data resiliency” below). Retry logic is baked in and configurable — for instance, users can configure Gutenberg to try older versions of data if it fails to download or process the latest version of data on startup, often to deal with non-backwards-compatible data changes. Gutenberg also provides a pre-built subscription that holds on to the latest data and atomically swaps it out under the hood when a change comes in — this tackles a majority of subscription use cases, where callers only care about the current value at any given time. It allows callers to specify a default value — either for a topic that has never been published to (a good fit when the topic is used for configuration) or if there is an error consuming the topic (to avoid blocking service startup when there is a reasonable default).

Consumption APIs

Gutenberg also provides high-level client APIs that wrap the low-level gRPC APIs and provide additional functionality and observability. One example of this is to download data for a given topic and version — this is used extensively by components plugged into Netflix Hollow. Another example is a method to get the “latest” version of a topic at a particular time — a common use case when debugging and when training ML models.

Client resiliency and observability

Gutenberg was designed with a bias towards allowing consuming services to be able to start up successfully versus guaranteeing that they start with the freshest data. With this in mind, the client library was built with fallback logic for when it cannot communicate with the Gutenberg service. After HTTP request retries are exhausted, the client downloads a fallback cache of topic publish metadata from S3 and works based off of that. This cache contains all the information needed to decide whether an update needs to be applied, and from where data needs to be fetched (either from the publish metadata itself or from S3). This allows clients to fetch data (which is potentially stale, depending on how current that fallback cache is) without using the service.

Part of the benefit of providing a client library is the ability to expose metrics that can be used to alert on an infrastructure-wide issue or issues with specific applications. Today these metrics are used by the Gutenberg team to monitor our publish-propagation SLI and to alert in the event of widespread issues. Some clients also use these metrics to alert on app-specific errors, for example individual publish failures or a failure to consume a particular topic.


The Gutenberg service is a Governator/Tomcat application that exposes gRPC and REST endpoints. It uses a globally-replicated Cassandra cluster for persistence and to propagate publish metadata to every region. Instances handling consumer requests are scaled separately from those handling publish requests — there are approximately 1000 times more consumer requests than there are publish requests. In addition, this insulates publishing from consumption — a sudden spike in publishing will not affect consumption, and vice versa.

Each instance in the consumer request cluster maintains its own in-memory cache of “latest publishes”, refreshing it from Cassandra every few seconds. This is to handle the large volume of poll requests coming from subscribed clients without passing on the traffic to the Cassandra cluster. In addition, request-pooling low-ttl caches protect against large spikes in requests that could potentially burden Cassandra enough to affect an entire region — we’ve had situations where transient errors coinciding with redeployments of large clusters have caused Gutenberg service degradation. Furthermore, we use an adaptive concurrency limiter bucketed by source application to throttle misbehaving applications without affecting others.

For cases where the data was published to S3 buckets in multiple regions, the server makes a decision on what bucket to send back to the client to download from based on where the client is. This also allows the service to provide the client with a bucket in the “closest” region, and to have clients fall back to another region if there is a region outage.

Before returning subscription data to consumers, the Gutenberg service first runs consistency checks on the data. If the checks fail and the polling client already has consumed some data the service returns nothing, which effectively means that there is no update available. If the polling client has not yet consumed any data (this usually means it has just started up), the service queries the history for the topic and returns the latest value that passes consistency checks. This is because we see sporadic replication delays at the Cassandra layer, where by the time a client polls for new data, the metadata associated with the most recently published version has only been partially replicated. This can result in incomplete data being returned to the client, which then manifests itself either as a data fetch failure or an obscure business-logic failure. Running these consistency checks on the server insulates consumers from the eventual-consistency caveats that come with the service’s choice of a data store.

Visibility on topic publishes and nodes that consume a topic’s data is important for auditing and to gather usage info. To collect this data, the service intercepts requests from publishers and consumers (both subscription poll requests and others) and indexes them in Elasticsearch by way of the Keystone data pipeline. This allows us to gain visibility into topic usage and decommission topics that are no longer in use. We expose deep-links into a Kibana dashboard from an internal UI to allow topic owners to get a handle on their consumers in a self-serve manner.

In addition to the clusters serving publisher and consumer requests, the Gutenberg service runs another cluster that runs periodic tasks. Specifically this runs two tasks:

  1. Every few minutes, all the latest publishes and metadata are gathered up and sent to S3. This powers the fallback cache used by the client as detailed above.
  2. A nightly janitor job purges topic versions which exceed their topic’s retention policy. This deletes the underlying data as well (e.g. S3 objects) and helps enforce a well-defined lifecycle for data.

Data resiliency


In the world of application development bad deployments happen, and a common mitigation strategy there is to roll back the deployment. A data-driven architecture makes that tricky, since behavior is driven by data that changes over time.

Data propagated by Gutenberg influences — and in many cases drives — system behavior. This means that when things go wrong, we need a way to roll back to a last-known good version of data. To facilitate this, Gutenberg provides the ability to “pin” a topic to a particular version. Pins override the latest version of data and force clients to update to that version — this allows for quick mitigation rather than having an under-pressure operator attempt to figure out how to publish the last known good version. You can even apply a pin to a specific publish scope so that only consumers that match that scope are pinned. Pins also override data that is published while the pin is active, but when the pin is removed clients update to the latest version, which may be the latest version when the pin was applied or a version published while the pin was active.

Incremental rollout

When deploying new code, it’s often a good idea to canary new builds with a subset of traffic, roll it out incrementally, or otherwise de-risk a deployment by taking it slow. For cases where data drives behavior, a similar principle should be applied.

One feature Gutenberg provides is the ability to incrementally roll out data publishes via Spinnaker pipelines. For a particular topic, users configure what publish scopes they want their publish to go to and what the delay is between each one. Publishing to that topic then kicks off the pipeline, which publishes the same data version to each scope incrementally. Users are able to interact with the pipeline; for example they may choose to pause or cancel pipeline execution if their application starts misbehaving, or they may choose to fast-track a publish to get it out sooner. For example, for some topics we roll out a new dataset version one AWS region at a time.


Gutenberg has been in use at Netflix for the past three years. At present, Gutenberg stores low tens-of-thousands of topics in production, about a quarter of which have published at least once in the last six months. Topics are published at a variety of cadences — from tens of times a minute to once every few months — and on average we see around 1–2 publishes per second, with peaks and troughs about 12 hours apart.

In a given 24 hour period, the number of nodes that are subscribed to at least one topic is in the low six figures. The largest number of topics a single one of these nodes is subscribed to is north of 200, while the median is 7. In addition to subscribed applications, there are a large number of applications that request specific versions of specific topics, for example for ML and Hollow use cases. Currently the number of nodes that make a non-subscribe request for a topic is in the low hundreds of thousands, the largest number of topics requested is 60, and the median is 4.

Future work

Here’s a sample of work we have planned for Gutenberg:

  • Polyglot support: today Gutenberg only supports a Java client, but we’re seeing an increasing number of requests for Node.js and Python support. Some of these teams have cobbled together their own solutions built on top of the Gutenberg REST API or other systems. Rather than have different teams reinvent the wheel, we plan to provide first-class client libraries for Node.js and Python.
  • Encryption and access control: for sensitive data, Gutenberg publishers should be able to encrypt data and distribute decryption credentials to consumers out-of-band. Adding this feature opens Gutenberg up to another set of use-cases.
  • Better incremental rollout: the current implementation is still in its early days and needs a lot of work to support customization to fit a variety of use cases. For example, users should be able to customize the rollout pipeline to automatically accept or reject a data version based on their own tests.
  • Alert templates: the metrics exposed by the Gutenberg client are used by the Gutenberg team and a few teams that are power users. Instead, we plan to provide leverage to users by building and parameterizing templates they can use to set up alerts for themselves.
  • Topic cleanup: currently topics sit around forever unless they are explicitly deleted, even if no one is publishing to them or consuming from them. We plan on building an automated topic cleanup system based on the consumption trends indexed in Elasticsearch.
  • Data catalog integration: an ongoing issue at Netflix is the problem of cataloging data characteristics and lineage. There is an effort underway to centralize metadata around data sources and sinks, and once Gutenberg integrates with this, we can leverage the catalog to automate tools that message the owners of a dataset.

If any of this piques your interest — we’re hiring!

How Netflix microservices tackle dataset pub-sub was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Delta: A Data Synchronization and Enrichment Platform

Part I: Overview

Andreas Andreakis, Falguni Jhaveri, Ioannis Papapanagiotou, Mark Cho, Poorna Reddy, Tongliang Liu


It is a commonly observed pattern for applications to utilize multiple datastores where each is used to serve a specific need such as storing the canonical form of data (MySQL etc.), providing advanced search capabilities (ElasticSearch etc.), caching (Memcached etc.), and more. Typically when using multiple datastores, one of them acts as the primary store, and the others as derived stores. Now the challenge becomes how to keep these datastores in sync.

We have observed a series of distinct patterns which have tried to address multi-datastore synchronization, such as dual writes, distributed transactions, etc. However, these approaches have limitations in regards to feasibility, robustness, and maintenance. Beyond data synchronization, some applications also need to enrich their data by calling external services.

To address these challenges, we developed Delta. Delta is an eventually consistent, event-driven data synchronization and enrichment platform.

Existing Solutions

Dual Writes

In order to keep two datastores in sync, one could perform a dual write, which executes a write to one datastore followed by a second write to the other. The first write can be retried, and the second can be aborted should the first fail after exhausting retries. However, the two datastores can get out of sync if the write to the second datastore fails. A common solution is to build a repair routine, which can periodically re-apply data from the first to the second store, or do so only if differences are detected.

Implementing the repair routine is typically tailored work which may not be reusable. Also, data between the stores remains out of sync until the repair routine is applied. The solution can become increasingly complicated if more than two datastores are involved. Finally, the repair routine can add substantial stress to the primary data source during its activity.

Change Log Table

When mutations (inserts, updates, and deletes) occur on a set of tables, entries for the changes are added to a change log table as part of the same transaction. Another thread or process constantly polls events from the log table and writes them to one or multiple datastores, optionally removing events from the log table after they are acknowledged by all datastores.

This needs to be implemented as a library and ideally without requiring code changes for the application using it. In a polyglot environment this library implementation needs to be repeated for each supported language and it is challenging to ensure consistent features and behavior across languages.

Another issue exists for the capture of schema changes, where some systems, like MySQL, don’t support transactional schema changes [1][2]. Therefore, the pattern to execute a change (like a schema change) and to transactionally write it to the change log table does not always work.

Distributed Transactions

Distributed transactions can be used to span a transaction across multiple heterogeneous datastores so that a write operation is either committed to all involved stores or to none.

Distributed transactions have proven to be problematic across heterogeneous datastores. By their nature, they can only rely on the lowest common denominator of participating systems. For example, XA transactions block execution if the application process fails during the prepare phase; moreover, XA provides no deadlock detection and no support for optimistic concurrency-control schemes. Also, certain systems like ElasticSearch, do not support XA or any other heterogeneous transaction model. Thus, ensuring the atomicity of writes across different storage technologies remains a challenging problem for applications [3].


Delta has been developed to address the limitations of existing solutions for data synchronization, and it also allows data to be enriched on the fly. Our goal was to abstract those complexities from application developers so they can focus on implementing business features. In the following, we describe “Movie Search”, an actual use case within Netflix that leverages Delta.

In Netflix the microservice architecture is widely adopted and each microservice typically handles only one type of data. The core movie data resides in a microservice called Movie Service, and related data such as movie deals, talents, vendors and so on are managed by multiple other microservices (e.g Deal Service, Talent Service and Vendor Service). Business users in Netflix Studios often need to search by various criteria for movies in order to keep track of productions, therefore, it is crucial for them to be able to search across all data that are related to movies.

Prior to Delta, the movie search team had to fetch data from multiple other microservices before indexing the movie data. Moreover, the team had to build a system that periodically updated their search index by querying others for changes, even if there was no change at all. That system quickly grew very complex and became difficult to maintain.

Figure 1. Polling System Prior to Delta

After on-boarding to Delta, the system is simplified into an event driven system, as depicted in the following diagram. CDC (Change-Data-Capture) events are sent by the Delta-Connector to a Keystone Kafka topic. A Delta application built using the Delta Stream Processing Framework (based on Flink) consumes the CDC events from the topic, enriches each of them by calling other microservices, and finally sinks the enriched data to the search index in Elasticsearch. The whole process is nearly real-time, meaning as soon as the changes are committed to the datastore, the search indexes are updated.

Figure 2. Data Pipeline using Delta

In the following sections, we are going to describe the Delta-Connector that connects to a datastore and publishes CDC events to the Transport Layer, which is a real-time data transportation infrastructure routing CDC events to Kafka topics. And lastly we are going to describe the Delta Stream Processing Framework that application developers can use to build their data processing and enrichment logics.

CDC (Change-Data-Capture)

We have developed a CDC service named Delta-Connector, which is able to capture committed changes from a datastore in real-time and write them to a stream. Real-time changes are captured from the datastore’s transaction log and dumps. Dumps are taken because transaction logs typically do not contain the full history of changes. Changes are commonly serialized as Delta events so that a consumer does not need to be concerned if a change originates from the transaction log or a dump.

Delta-Connector offers multiple advanced features such as:

  • Ability to write into custom outputs beyond Kafka.
  • Ability to trigger manual dumps at any time, for all tables, a specific table, or for specific primary keys.
  • Dumps can be taken in chunks, so that there is no need to repeat from scratch in case of failure.
  • No need to acquire locks on tables, which is essential to ensure that the write traffic on the database is never blocked by our service.
  • High availability, via standby instances across AWS Availability Zones.

We currently support MySQL and Postgres, including when deployed in AWS RDS and its Aurora flavor. In addition, we support Cassandra (multi-master). We will cover the Delta-Connector in more detail in upcoming blog posts.

Kafka & Transport Layer

The transport layer for Delta events was built on top of the Messaging Service in our Keystone platform.

Historically, message publishing at Netflix is optimized for availability instead of durability (see a previous blog). The tradeoff is potential broker data inconsistencies in various edge scenarios. For example, unclean leader election will result in consumers potentially duplicating or losing events.

For Delta, we want stronger durability guarantees in order to make sure CDC events arrive at the derived stores. To enable this, we built a special-purpose Kafka cluster as a first-class citizen. Some of the broker and producer configuration is sketched below.
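
As an illustrative sketch, these are standard Kafka settings matching the description in the following paragraphs, not necessarily the exact values used at Netflix:

# broker / topic settings for the high durability cluster
unclean.leader.election.enable=false
default.replication.factor=3
min.insync.replicas=2

# producer settings
acks=all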

In Keystone Kafka clusters, unclean leader election is usually enabled to favor producer availability. This can result in messages being lost when an out-of-sync replica is elected as a leader. For the new high durability Kafka cluster, unclean leader election is disabled to prevent these messages getting lost.

We’ve also increased the replication factor from 2 to 3 and the minimum insync replicas from 1 to 2. Producers writing to this cluster require acks from all, to guarantee that 2 out of 3 replicas have the latest messages that were written by the producers.

When a broker instance gets terminated, a new instance replaces the terminated broker. However, this new broker will need to catch up on out-of-sync replicas, which may take hours. To improve the recovery time for this scenario, we started using block storage volumes (Amazon Elastic Block Store) instead of local disks on the brokers. When a new instance replaces the terminated broker, it now attaches the EBS volume that the terminated instance had and starts catching up on new messages. This process reduces the catch up time from hours to minutes since the new instance no longer has to replicate from a blank state. In general, the separate life cycles of storage and broker greatly reduce the impact of broker replacement.

To further maximize our delivery guarantee, we used the message tracing system to detect any message loss due to extreme conditions (e.g clock drift on the partition leader).

Stream Processing Framework

The processing layer of Delta is built on top of Netflix SPaaS platform, which provides Apache Flink integration with the Netflix ecosystem. The platform provides a self-service UI which manages Flink job deployments and Flink cluster orchestration on top of our container management platform Titus. The self-service UI also manages job configurations and allows users to make dynamic configuration changes without having to recompile the Flink job.

Delta provides a stream processing framework on top of Flink and SPaaS that uses an annotation driven DSL (Domain Specific Language) to abstract technical details further away. For example, to define a step that enriches events by calling external services, users only need to write the following DSL and the framework will translate it into a model which is executed by Flink.

Figure 3. Enrichment DSL Example in a Delta Application

The processing framework not only reduces the learning curve, but also provides common stream processing functionalities like deduplication, schematization, as well as resilience and fault tolerance to address general operational concerns.

The Delta Stream Processing Framework consists of two key modules: the DSL & API module and the Runtime module. The DSL & API module provides the annotation-based DSL and UDF (User-Defined Function) APIs for users to write custom processing logic (e.g., filters and transformations). The Runtime module provides a DSL parser implementation that builds an internal representation of the processing steps as DAG models. The Execution component interprets the DAG models to initialize the actual Flink operators and eventually run the Flink app. The architecture of the framework is illustrated in Figure 4 below.

Figure 4. Delta Stream Processing Framework Architecture

This approach has several benefits:

  • Users can focus on their business logic without the need of learning the specifics of Flink or the SPaaS framework.
  • Optimization can be made in a way that is transparent to users, and bugs can be fixed without requiring any changes to user code (UDFs).
  • Operating Delta applications is made simple for users as the framework provides resilience and failure tolerance out of the box and collects many granular metrics that can be used for alerts.

Production Usages

Delta has been running in production for over a year and has been playing a crucial role in many Netflix Studio applications. It has helped teams implement use cases such as search indexing, data warehousing, and event driven workflows. Below is a view of the high level architecture of the Delta platform.

Figure 5. High Level Architecture of Delta

Stay Tuned

We will publish follow-up blogs about technical details of the key components such as Delta-Connector and Delta Stream Processing Framework. Please stay tuned. Also feel free to reach out to the authors for any questions you may have.


We would like to thank the following people who have been involved in making Delta successful at Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan, Sandeep Gupta, Steven Wu, Tharanga Gamaethige, Yun Wang, and Zhenzhong Xu.



Delta: A Data Synchronization and Enrichment Platform was originally published in the Netflix TechBlog on Medium.

TypeScript Support in the DataStax Node.js Drivers

TypeScript declarations of the driver are now contained in the same package and repository. These declarations will be maintained and kept in sync along with the JavaScript API. 

Additionally, you can now use DSE-specific types like geo, auth and graph types from TypeScript.

Getting started

To get started with the Node.js driver for Apache Cassandra in a TypeScript project, install the driver package:

npm install cassandra-driver
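Once the package is installed, the bundled declarations type-check your client code directly. Below is a minimal sketch assuming a recent 4.x driver (which requires localDataCenter); the contact point, keyspace, table, and query are placeholders:

import { Client } from 'cassandra-driver';

// Placeholder connection details; adjust for your cluster.
const client = new Client({
  contactPoints: ['127.0.0.1'],
  localDataCenter: 'datacenter1',
  keyspace: 'examples'
});

async function run(): Promise<void> {
  await client.connect();
  // The execute() signature and its ResultSet return type come from the
  // driver's bundled TypeScript declarations, so the compiler checks this call.
  const result = await client.execute(
    'SELECT name FROM users WHERE id = ?',
    [1],
    { prepare: true }
  );
  console.log(result.first());
  await client.shutdown();
}

run().catch(console.error);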

Removing Support for Outdated Encryption Mechanisms

At Instaclustr, security is the foundation of everything we do. We are continually working towards compliance with additional security standards as well as conducting regular risk reviews of our environments. This blog post outlines some technical changes we are making that both increase the security of our managed environment and enable compliance with a wider range of security standards.

From October 9, 2019 (AEST), newly provisioned Instaclustr clusters running recent versions of Cassandra and Kafka will have support for the SSLv3, TLSv1.0, and TLSv1.1 encryption protocols disabled and will thus require the use of TLSv1.2 or above. From this date, we will also begin working with customers to roll this change out to existing clusters.

Instaclustr-managed clusters that will be affected are:

  • Apache Cassandra 3.11+
  • Apache Kafka 2.1+

Why are we doing this?

The protocols we are disabling are out of date, have known vulnerabilities, and are not compliant with a range of public and enterprise security standards. All identified clients that support these versions of Cassandra and Kafka also support TLSv1.2.

How can I test if I will be affected?


The cqlsh CLI configuration will need to be changed to request TLSv1.2 (otherwise it defaults to TLSv1.0). Assuming a cqlshrc file based on the Instaclustr example, the updated entries should be:


certfile = full_path_to_cluster-ca-certificate.pem

validate = true

factory = cqlshlib.ssl.ssl_transport_factory

version = TLSv1_2

Note: If running cqlsh on Mac OS X, the system Python is outdated and will not support the TLSv1_2 option. You should instead manually update your system Python or run cqlsh from a Docker container.

Clients built on the DataStax Apache Cassandra Java driver can create a custom SSLContext that requires TLSv1.2 to be used, e.g.:

// Uses javax.net.ssl.SSLContext with the SunJSSE provider
SSLContext ctx = SSLContext.getInstance("TLSv1.2", "SunJSSE");

If the client is able to successfully connect, then it confirms that your Java environment supports TLSv1.2 (i.e. is recent enough and is not configured to disable it).


If using the official Apache Kafka Java client (or the Instaclustr ic-kafka-topics tool), the client configuration can be updated to allow only TLSv1.2. For example, based on the Instaclustr example configuration, the enabled protocols entry becomes:
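ssl.enabled.protocols=TLSv1.2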


If the client is able to successfully connect, then it confirms that your Java environment supports TLSv1.2 (i.e. is recent enough and is not configured to disable it).


We understand that testing and changing systems is a time-consuming process. Given the widespread support for TLSv1.2, we do not anticipate that this change will actually impact any current systems.

If you have any questions or concerns, please do not hesitate to contact us at


The post Removing Support for Outdated Encryption Mechanisms appeared first on Instaclustr.

ApacheCon 2019: DataStax Announces Cassandra Monitoring Free Tier, Unified Drivers, Proxy for DynamoDB & More for the Community

It’s hard to believe that we’re celebrating the 20th year of the Apache Software Foundation. But here we are—and it’s safe to say open source has come a long way over the last two decades.

We just got back from ApacheCon, where DataStax—one of the major forces behind the powerful open source Apache Cassandra™ database—was a platinum sponsor this year. 

We don’t know about you, but we couldn’t be more excited about what the future holds for software development and open source technology in particular.

During CTO Jonathan Ellis’ keynote, we announced three exciting new developer tools for the Cassandra community:

  • DataStax Insights (Cassandra Performance Monitoring)
  • Unified Drivers
  • DataStax Spring Boot Starter for the Java Driver


While we’re at it, in my talk “Happiness is a hybrid cloud with Apache Cassandra,” I announced our preview release for another open source tool: DataStax Proxy for DynamoDB™ and Apache Cassandra.

This tool enables developers to run their AWS DynamoDB™ workloads on Cassandra. With this proxy, developers can run DynamoDB workloads on-premises to take advantage of the hybrid, multi-model, and scalability benefits of Cassandra.

These tools highlight our commitment to open source and will help countless Cassandra developers build transformative software solutions and modern applications in the months and years ahead. 

Let’s explore each of them briefly.

1. DataStax Insights (Cassandra Performance Monitoring)

Everyone who uses Cassandra—whether they’re developers or operators—stands to benefit from DataStax Insights, a next-generation performance management and monitoring tool that is included with DataStax Constellation, DataStax Enterprise, and open source Cassandra 3.x and higher. 

We’re now offering free sign-ups for DataStax Insights (or, put another way, Cassandra monitoring), giving Cassandra users an at-a-glance health index with a single view of all their clusters. The tool also enables users to optimize their clusters using AI to recommend solutions to issues, highlight anti-patterns, and identify performance bottlenecks, among other things.

DataStax Insights is free for all Cassandra users for up to 50 nodes and includes one week of rolling retention. Interested in joining the DataStax Insights early access program? We’re taking sign-ups now (more on that below). 


2. Unified Drivers

Historically, DataStax has maintained two sets of drivers: one for DataStax Enterprise and one for open source Cassandra users. Moving forward, we are merging these two sets into a single unified DataStax driver for each supported programming language, including C++, C#, Java, Python, and Node.js. As a result, each unified driver will work for both Cassandra and DataStax products.

This move benefits developers by simplifying driver choice, making it easier to determine which driver to use when building applications. At the same time, developers using the open source version of Cassandra will now have free access to advanced features that initially shipped with our premium solutions. Further, developers who have previously used two different sets of drivers will now only need one driver for their applications across any DataStax platform and open source Cassandra. The unified drivers also bring enhancements such as improved load balancing and reactive streams support.

3. Spring Boot Starter 

The DataStax Java Driver Spring Boot Starter, which is now available in DataStax Labs, streamlines the process of building standalone Spring-based applications with Cassandra and DataStax databases.

Developers will enjoy that this tool centralizes familiar configuration in one place while providing easy access to the Java Driver in Spring applications.

It’s just one more way we’re making the application development process easier.

4. DataStax Proxy for DynamoDB™ and Apache Cassandra™

With the DataStax Proxy for DynamoDB and Cassandra, developers can run DynamoDB workloads on-premises, taking advantage of the hybrid, multi-model, and scalability benefits of Cassandra.

The proxy is designed to enable users to back their DynamoDB applications with Cassandra. We determined that the best way to help users leverage this new tool and to help it flourish was to make it an open source Apache 2 licensed project.

The code consists of a scalable proxy layer that sits between your app and the database. It provides compatibility with the DynamoDB SDK, which allows existing DynamoDB applications to read and write data to Cassandra without application changes.


Sign up for the DataStax Insights early access program today!

Are you interested in optimizing your on-premises or cloud-based Cassandra deployments using a platform that lets novices monitor and fine-tune their cluster performance like experts? 

If so, you may want to give DataStax Insights a try. 

We’re currently accepting sign-ups to our early access program. Click the button below to get started!


DataStax Labs

DataStax Labs provides the Apache Cassandra™ and DataStax communities with early access to product previews and enhancements that are being considered for future production software, including tools, aids, and partner software designed to increase developer productivity. When you try out some of our new Labs technologies, we would love your feedback, good or bad; let us know!

Top 5 Reasons to Choose Apache Cassandra Over DynamoDB

Overview – Why Apache Cassandra Over DynamoDB

DynamoDB and Apache Cassandra are both very popular distributed data store technologies. Both are used successfully in many applications and production-proven at phenomenal scale. 

At Instaclustr, we live and breathe Apache Cassandra (and Apache Kafka). We have many customers at all levels of size and maturity who have built successful businesses around Cassandra-based applications. Many of those customers have undertaken significant evaluation exercises before choosing Cassandra over DynamoDB and several have migrated running applications from DynamoDB to Cassandra. 

This blog distills the top reasons that our customers have chosen Apache Cassandra over DynamoDB.

Reason 1: Significant Cost of Writes to DynamoDB

For many use cases, Apache Cassandra can offer a significant cost saving over DynamoDB. This is particularly the case for write-heavy workloads. The cost of a write to DynamoDB is five times the cost of a read (reflected directly in your AWS bill). For Apache Cassandra, writes are several times cheaper than reads (reflected in system resource usage).

Reason 2: Portability

DynamoDB is available in AWS and nowhere else. For multi-tenant SaaS offerings where only a single instance of the application will ever exist, being all-in on AWS is not a major issue. However, many applications, for a lot of good reasons, still need to be installed and managed on a per-customer basis, and many customers (often the largest ones!) will not want to run on AWS. Choosing Cassandra allows your application to run anywhere you can run a Linux box.

Reason 3: Design Without Having to Worry About Pricing Models

DynamoDB’s pricing is complex, with two different pricing models and multiple pricing dimensions. Applying the wrong pricing model or designing your architecture without considering pricing can result in order-of-magnitude differences in costs. This also means that a seemingly innocuous change to your application can dramatically impact cost. With Apache Cassandra, you have your infrastructure costs and you know your management fees; once you have completed performance testing and confirmed that your infrastructure can meet your requirements, you know your costs.

Reason 4: Multi-Region Functionality

Apache Cassandra was the first NoSQL technology to offer active-active multi-region support. While DynamoDB has added Global Tables, these have a couple of key limitations when compared to Apache Cassandra. The most significant in many cases is that you cannot add replicas to an existing global table. So, if you set up in two regions and then decide to add a third you need to completely rebuild from an empty table. With Cassandra, adding a region to a cluster is a normal, and fully online, operation. Another major limitation is that DynamoDB only offers eventual consistency across Global Tables, whereas Apache Cassandra’s tunable consistency levels can enforce strong consistency across multiple regions.

Reason 5: Avoiding Vendor Lock-In

Apache Cassandra is true open source software, owned and governed by the Apache Software Foundation to be developed and maintained for the benefit of the community and able to be run in any cloud or on-premise environment. DynamoDB is an AWS proprietary solution that not only locks you in to DynamoDB but also locks your application to the wider AWS ecosystem. 

While these are the headline reasons that people choose Apache Cassandra over DynamoDB, there are also many advantages at a more detailed functional level, such as:

  • DynamoDB’s capacity is limited per partition, with a maximum of 1,000 write capacity units and 3,000 read capacity units per partition. Cassandra’s capacity is distributed per node, which typically provides per-partition limits orders of magnitude higher than this.
  • Cassandra’s CQL query language provides a simple learning curve for developers familiar with SQL.
  • DynamoDB only allows single-value partition and sort keys (sort keys are called clustering keys in Cassandra), while Cassandra supports multi-part keys. A minor difference, but another way Cassandra reduces application complexity.
  • Cassandra supports aggregate functions which in some use cases can provide significant efficiencies.


The post Top 5 Reasons to Choose Apache Cassandra Over DynamoDB appeared first on Instaclustr.

DataStax Proxy for DynamoDB™ and Apache Cassandra™ – Preview

Yesterday at ApacheCon, our very own Patrick McFadin announced the public preview of an open source tool that enables developers to run their AWS DynamoDB™ workloads on Apache Cassandra. With the DataStax Proxy for DynamoDB and Cassandra, developers can run DynamoDB workloads on premises, taking advantage of the hybrid, multi-model, and scalability benefits of Cassandra.

The Big Picture

Amazon DynamoDB is a key-value and document database which offers developers elasticity and a zero-ops cloud experience. However, the tight AWS integration that makes DynamoDB great for cloud is a barrier for customers that want to use it on premises.

Cassandra has always supported key-value and tabular data sets, so supporting DynamoDB workloads just meant that DataStax customers needed a translation layer on top of their existing storage engine.

Today we are previewing a proxy that provides compatibility with the DynamoDB SDK, allowing existing applications to read/write data to DataStax Enterprise (DSE) or Cassandra without any code changes. It also provides the hybrid + multi-model + scalability benefits of Cassandra to DynamoDB users.

If you’re just here for the code, you can find it on GitHub and in DataStax Labs:

Possible Scenarios

Application Lifecycle Management: Many customers develop on premises and then deploy to the cloud for production. The proxy enables customers to run their existing DynamoDB applications using Cassandra clusters on-prem.

Hybrid Deployments: DynamoDB Streams can be used to enable hybrid workload management and transfers from DynamoDB cloud deployments to on-prem Cassandra-proxied deployments. This is supported in the current implementation and, like DynamoDB Global Tables, it uses DynamoDB Streams to move the data. For hybrid transfer back to DynamoDB, check out the Cassandra CDC improvements that could be leveraged, and stay tuned to the DataStax blog for updates on our Change Data Capture (CDC) capabilities.

What’s in the Proxy?

The proxy is designed to enable users to back their DynamoDB applications with Cassandra. We determined that the best way to help users leverage this new tool and to help it flourish was to make it an open source Apache 2 licensed project.

The code consists of a scalable proxy layer that sits between your app and the database. It provides compatibility with the DynamoDB SDK, which allows existing DynamoDB applications to read and write data to Cassandra without application changes.
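As a rough illustration of what "without application changes" looks like in practice, the only client-side difference is pointing the DynamoDB SDK at the proxy endpoint. The sketch below uses the AWS SDK for JavaScript; the host, port, and credentials are placeholders, not values documented for the proxy:

import * as AWS from 'aws-sdk';

// Point the standard DynamoDB client at the proxy instead of AWS.
// 'http://localhost:8080' is a placeholder for wherever the proxy is deployed.
const dynamodb = new AWS.DynamoDB({
  region: 'us-east-1',
  endpoint: 'http://localhost:8080',
  // The SDK still signs requests, so credentials must be configured;
  // dummy values are used here purely for illustration.
  credentials: new AWS.Credentials('placeholder-access-key', 'placeholder-secret-key')
});

// Existing DynamoDB calls stay the same; the proxy translates them to Cassandra.
dynamodb.listTables({}, (err, data) => {
  if (err) console.error(err);
  else console.log(data.TableNames);
});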

How It Works

A few key decisions were made when designing the proxy. As always, these are in line with the design principles that we use to guide development for both Cassandra and our DataStax Enterprise product.

Why a Separate Process?

We could have built this as a Cassandra plugin that would execute as part of the core process, but we decided to build it as a separate process for the following reasons:

  1. Ability to scale the proxy independently of Cassandra
  2. Ability to leverage k8s / cloud-native tooling
  3. Developer agility and to attract contributors—developers can work on the proxy with limited knowledge of Cassandra internals
  4. Independent release cadence, not tied to the Apache Cassandra project
  5. Better AWS integration story for stateless apps (i.e., leveraging CloudWatch alarms, autoscaling, etc.)

Why Pluggable Persistence?

On quick inspection, DynamoDB’s data model is quite simple. It consists of a hash key[1], a sort key, and a JSON structure which is referred to as an item. Depending on your goals, the DynamoDB data model can be persisted in Cassandra Query Language (CQL) in different ways (one possible mapping is sketched after the list below). To allow for experimentation and pluggability, we have built the translation layer in a way that allows for different translators. We continue to build on this scaffolding to test out multiple data models and determine which are best suited for:

  1. Different workloads
  2. Different support for consistency / linearization requirements
  3. Different performance tradeoffs based on SLAs
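As one hypothetical illustration of such a translation (not necessarily the model the proxy actually uses), a DynamoDB item could be stored in a CQL table with the hash key as the partition key, the sort key as a clustering column, and the item body serialized as JSON text:

CREATE TABLE dynamo_proxy.items (
    hash_key text,   -- DynamoDB hash (partition) key
    sort_key text,   -- DynamoDB sort key, mapped to a clustering column
    item     text,   -- the DynamoDB item serialized as JSON
    PRIMARY KEY ((hash_key), sort_key)
);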


If you have any interest in running DynamoDB workloads on Cassandra, take a look at the project. Getting started is easy and spelled out in the readme and DynamoDB sections. Features supported by the proxy are quickly increasing and collaborators are welcome.

All product and company names are trademarks or registered trademarks of their respective owner. Use of these trademarks does not imply any affiliation with or endorsement by the trademark owner.

[1] Often in the DynamoDB documentation, this key is referred to as a partition key, but since these are not one-to-one with DynamoDB partitions we will use the term hash key instead.

Why Developing Modern Applications Is Getting Easier

Historically, software was monolithic. In most cases, development teams would have to rewrite or rebuild an entire application to fix a bug or add a new feature. Building applications with any sense of speed or agility was largely out of the question, which is why software suites like Microsoft Office were generally released once a year.

Much has changed over the last decade or so. In the age of lightning-fast networks and instant gratification, leading software development teams are adopting DevOps workflows and prioritizing CI/CD so they can pump out stronger software releases much faster and much more frequently. 

Monthly, weekly, or even more frequent releases, for example, are becoming something closer to the norm. 

This accelerated release process is the result of the fact that—over several years—it’s become much easier to develop applications. 

Today, many engineering teams are utilizing new technologies to build better applications in less time, developing software with agility. Let’s take a look at four of the key technologies that have largely transformed the development process in recent years. 

1. Microservices

Microservices enable development teams to build applications that—you guessed it—are made up of several smaller services. 

Compared to the old-school monolithic approach, microservices speed up the development process considerably. Engineers can scale microservices independently of one another; updating or adding a feature no longer requires an entire rewrite of an application. 

Beyond that, microservices also bring more flexibility to developers. For example, developers can use their language of choice, building one service in Java and another in Node.js. 

The speed, flexibility, and agility microservices bring to the table have made it much easier to develop modern applications. Add it all up, and it comes as no surprise that a recent survey found that 91 percent of companies are using or plan to use microservices today.

2. Containers

Containers (think Docker) go hand-in-hand with microservices. Using containers, developers can create, deploy, and run applications in any environment.

At a very basic level, containers let developers “package” an application’s code and dependencies together as one unit. Once that package has been created, it can quickly be moved from a container to a laptop to a virtual server and back again. Containers enable developers to start, create, copy, and spin down applications rapidly.

It’s even easier to build modern applications with containers when you use Kubernetes to manage containerized workloads and services.

3. Open source tools

Docker and Kubernetes are both open source. So are Apache Cassandra™, Prometheus, and Grafana. There’s also Jenkins, which helps developers accelerate CI/CD workflows. With Jenkins, engineering teams can use automation to safely build, test, and deploy code changes, making it easier to integrate new features into any project.

Open source tools simplify the development process considerably. With open source, engineering teams get access to proven technologies that are built collaboratively by developers around the world to improve the coding process.

Not only does open source provide access to these tools, but popular open source projects also have robust user communities that developers can turn to when they get stuck on something.

4. Hybrid cloud

More and more companies are building applications in hybrid cloud environments because it enables them to leverage the best of what both the public and private cloud have to offer. 

For example, with hybrid cloud, you get the scalability of the public cloud while being able to use on-premises or private cloud resources to keep sensitive data secure (e.g., for HIPAA or GDPR compliance). What’s more, hybrid cloud also increases availability. In the event one provider gets knocked offline, application performance remains unchanged—so long as you have the right database in place.

The same sentiment holds true for multi-cloud or intercloud environments where organizations use several different cloud vendors to take advantage of each of their strengths, avoid vendor lock-in, or reduce the risk of service disruption. 

How does your development process compare?

If you’re not using microservices, containers, open source tools, and hybrid cloud environments to build applications, it’s time to reconsider your approach. 

The rise of these new technologies has given development teams the ability to pivot at a moment’s notice, incorporating user feedback to build new features and respond to incidents quickly and effectively.

Give them a try. It’s only a matter of time before you’ll start wondering why you didn’t think of it sooner.

Four Key Technologies That Enable Microservices (white paper)