Implementing the Netflix Media Database

In the previous blog posts in this series, we introduced the Netflix Media DataBase (NMDB) and its salient “Media Document” data model. In this post we will provide details of the NMDB system architecture beginning with the system requirements — these will serve as the necessary motivation for the architectural choices we made. A fundamental requirement for any lasting data system is that it should scale along with the growth of the business applications it wishes to serve. NMDB is built to be a highly scalable, multi-tenant, media metadata system that can serve a high volume of write/read throughput as well as support near real-time queries. At any given time there could be several applications that are trying to persist data about a media asset (e.g., image, video, audio, subtitles) and/or trying to harness that data to solve a business problem.

Some of the essential elements of such a data system are (a) reliability and availability — under varying load conditions as well as a wide variety of access patterns; (b) scalability — persisting and serving large volumes of media metadata and scaling in the face of bursty requests to serve critical backend systems like media encoding; (c) extensibility — supporting a demanding list of features with a growing list of Netflix business use cases; and (d) consistency — data access semantics that guarantee repeatable data read behavior for client applications. The following section enumerates the key traits of NMDB and how the design aims to address them.

System Requirements

Support for Structured Data

The growth of NoSQL databases has broadly been accompanied with the trend of data “schemalessness” (e.g., key value stores generally allow storing any data under a key). A schemaless system appears less imposing for application developers that are producing the data, as it (a) spares them from the burden of planning and future-proofing the structure of their data and, (b) enables them to evolve data formats with ease and to their liking. However, schemas are implicit in a schemaless system as the code that reads the data needs to account for the structure and the variations in the data (“schema-on-read”). This places a burden on applications that wish to consume that supposed treasure trove of data and can lead to strong coupling between the system that writes the data and the applications that consume it. For this reason, we have implemented NMDB as a “schema-on-write” system — data is validated against schema at the time of writing to NMDB. This provides several benefits including (a) schema is akin to an API contract, and multiple applications can benefit from a well defined contract, (b) data has a uniform appearance and is amenable to defining queries, as well as Extract, Transform and Load (ETL) jobs, (c) facilitates better data interoperability across myriad applications and, (d) optimizes storage, indexing and query performance thereby improving Quality of Service (QoS). Furthermore, this facilitates high data read throughputs as we do away with complex application logic at the time of reading data.

A critical component of a “schema-on-write” system is the module that ensures sanctity of the input data. Within the NMDB system, the Media Data Validation Service (MDVS) is the component that makes sure the data being written to NMDB is in compliance with the aforementioned schema. MDVS also serves as the storehouse and the manager for the data schema itself. As was noted in the previous post, the data schema could itself evolve over time, but all data ingested hitherto has to remain compliant with the latest schema. MDVS ensures this by applying meticulous treatment to schema modification, ensuring that any schema update is fully compatible with the data already in the system.
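As a minimal sketch of the schema-on-write idea, consider a validator that rejects a document at write time if it does not conform to the declared schema. The schema shape, field names and function name below are hypothetical illustrations, not the actual MDVS API:

```python
# Minimal sketch of schema-on-write validation in the spirit of MDVS.
# The schema format here (required fields mapped to Python types) is a
# made-up illustration, not the real NMDB schema language.

def validate_document(doc, schema):
    """Reject a document at write time if it does not conform to the schema."""
    for name, expected_type in schema["required"].items():
        if name not in doc:
            raise ValueError(f"missing required field: {name}")
        if not isinstance(doc[name], expected_type):
            raise ValueError(f"field {name!r} must be of type {expected_type.__name__}")
    return True

# A toy schema for a shot-boundary analysis document.
shot_schema = {"required": {"startFrame": int, "endFrame": int, "confidence": float}}

validate_document({"startFrame": 0, "endFrame": 120, "confidence": 0.97}, shot_schema)
```

Because the write is rejected before persistence, every document that readers later see is guaranteed to match the contract, which is what enables the simple, fast read path described above.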

Multi-tenancy and Access Control

We envision NMDB as a system that helps foster innovation in different areas of Netflix business. Media data analyses created by an application developed by one team could be used by another application developed by another team without friction. This makes multi-tenancy as well as access control of data important problems to solve. All NMDB APIs are authenticated (AuthN) so that the identity of an accessing application is known up front. Furthermore, NMDB applies authorization (AuthZ) filters that whitelist applications or users for certain actions; e.g., a user or application could be whitelisted for read/write/query access, or for more restrictive read-only access to certain media metadata.

In NMDB we think of the media metadata universe in units of “DataStores”. A specific media analysis that has been performed on various media assets (e.g., loudness analysis for all audio files) would typically be stored within the same DataStore (DS), while different types of media analyses (e.g., video shot boundary and video face detection) for the same media asset typically would be persisted in different DataStores. A DS helps us achieve two very important purposes: (a) it serves as a logical namespace for the same media analysis for various media assets in the Netflix catalog, and (b) it serves as a unit of access control — an application (or equivalently a team) that defines a DataStore also configures access permissions to the data. Additionally, as was described in the previous blog article, every DS is associated with a schema for the data it stores. As such, a DS is characterized by the three-tuple (1) a namespace, (2) a media analysis type (e.g., video shot boundary data), and (3) a version of the media analysis type (different versions of a media analysis correspond to different data schemas). This is depicted in Figure 1.

Figure 1: NMDB DataStore semantics

We have chosen the namespace portion of a DS definition to correspond to an LDAP group name. NMDB uses this to bootstrap the self-servicing process, wherein members of the LDAP group are granted “admin” privileges and may perform various operations (like creating a DS, deleting a DS) and managing access control policies (like adding/removing “writers” and “readers”). This allows for a seamless self-service process for creating and managing a DS. The notion of a DS is thus key to the ways we support multi-tenancy and fine grained access control.
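The DS three-tuple and its role as an access-control unit can be sketched as follows. The class and field names are illustrative only; they are not the actual NMDB definitions:

```python
from dataclasses import dataclass, field

# Hypothetical sketch of the DataStore three-tuple and its access-control
# lists. Names are illustrative, not the actual NMDB data model.

@dataclass(frozen=True)
class DataStoreKey:
    namespace: str        # chosen to correspond to an LDAP group name
    analysis_type: str    # e.g. "video-shot-boundary"
    version: int          # each version is bound to a distinct data schema

@dataclass
class DataStore:
    key: DataStoreKey
    admins: set = field(default_factory=set)   # LDAP group members, may manage the DS
    writers: set = field(default_factory=set)
    readers: set = field(default_factory=set)

    def can_read(self, user: str) -> bool:
        return user in self.readers or user in self.writers or user in self.admins

    def can_write(self, user: str) -> bool:
        return user in self.writers or user in self.admins

ds = DataStore(DataStoreKey("media-algos", "video-shot-boundary", 2))
ds.admins.add("alice")   # an LDAP group member, bootstrapped as admin
ds.readers.add("bob")    # granted read-only access by an admin
```

In this sketch the frozen `DataStoreKey` captures the namespace/type/version tuple, while the per-DS `admins`/`writers`/`readers` sets model the self-serviced AuthZ policies.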

Integration with other Netflix Systems

In the Netflix microservices environment, different business applications serve as the system of record for different media assets. For example, while playable media assets such as video, audio and subtitles for a title could be managed by a “playback service”, promotional assets such as images or video trailers could be managed by a “promotions service”. NMDB introduces the concept of a “MediaID” (MID) to facilitate integration with these disparate asset management systems. We think of MID as a foreign key that points to a Media Document instance in NMDB. Multiple applications can bring their domain specific identifiers/keys to address a Media Document instance in NMDB. We implement MID as a map from strings to strings. Just like the media data schema, an NMDB DS is also associated with a single MID schema. However unlike the media data schema, MID schema is immutable. At the time of the DS definition, a client application could define a set of (name, value) pairs against which all of the Media Document instances would be stored in that DS. A MID handle could be used to fetch documents within a DS in NMDB, offering convenient access to the most recent or all documents for a particular media asset.
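A MID and its immutable schema can be illustrated with a small sketch. The key names (`titleId`, `assetType`, `assetId`) are invented for this example and are not the actual identifiers any Netflix service uses:

```python
# Sketch of the MediaID (MID) concept: a string-to-string map that is
# validated against an immutable per-DataStore MID schema. The key names
# below are hypothetical examples.

MID_SCHEMA = frozenset({"titleId", "assetType", "assetId"})  # fixed at DS creation time

def validate_mid(mid: dict) -> dict:
    """A MID must supply exactly the keys declared in the immutable MID schema."""
    if set(mid) != MID_SCHEMA:
        raise ValueError(f"MID keys {sorted(mid)} do not match schema {sorted(MID_SCHEMA)}")
    if not all(isinstance(v, str) for v in mid.values()):
        raise ValueError("MID values must be strings")
    return mid

mid = validate_mid({"titleId": "80100172", "assetType": "video", "assetId": "v-123"})
```

Because the MID schema cannot change after DS creation, every Media Document instance in a DS is addressable by the same fixed set of foreign-key fields.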

SLA Guarantees

NMDB serves different logically tiered business applications some of which are deemed to be more business critical than others. The Netflix media transcoding sub-system is an example of a business critical application. Applications within this sub-system have stringent consistency, durability and availability needs as a large swarm of microservices are at work generating content for our customers. A failure to serve data with low latency would stall multiple pipelines potentially manifesting as a knock-on impact on secondary backend services. These business requirements motivated us to incorporate immutability and read-after-write consistency as fundamental precepts while persisting data in NMDB.

We have chosen the high data capacity and high performance Cassandra (C*) database as the backend implementation that serves as the source of truth for all our data. A front-end service, known as Media Data Persistence Service (MDPS), manages the C* backend and serves data at blazing speeds (latency in the order of a few tens of milliseconds) to power these business critical applications. MDPS uses local quorum for reads and writes to guarantee read-after-write consistency. Data immutability helps us sidestep any conflict issues that might arise from concurrent updates to C* while allowing us to perform IO operations at a very fast clip. We use a UUID as the primary key for C*, thus giving every write operation (a MID + a Media Document instance) a unique key and thereby avoiding write conflicts when multiple documents are persisted against the same MID. This UUID (also known as the DocumentID) also serves as the primary key for the Media Document instance in the context of the overall NMDB system. We will touch upon immutability again in later sections to show how we also benefited from it in some other design aspects of NMDB.
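The conflict-free, immutable write pattern can be sketched with an in-memory stand-in for the C* store. This is an illustration of the idea, not the MDPS implementation:

```python
import uuid

# In-memory stand-in for the C* backing store, illustrating why immutable,
# UUID-keyed writes avoid conflicts: every write (MID + document) gets a
# fresh DocumentID, so concurrent writers (or retries) against the same MID
# never collide and nothing is ever overwritten. A sketch only, not MDPS.

store = {}    # DocumentID -> (MID, document)
by_mid = {}   # frozen MID -> [DocumentID, ...] in write order

def persist(mid: dict, document: dict) -> str:
    doc_id = str(uuid.uuid4())        # unique primary key per write operation
    store[doc_id] = (mid, document)   # insert-only; no update-in-place path exists
    by_mid.setdefault(frozenset(mid.items()), []).append(doc_id)
    return doc_id

def latest(mid: dict):
    """Read-after-write: the most recent document persisted against a MID."""
    ids = by_mid.get(frozenset(mid.items()), [])
    return store[ids[-1]][1] if ids else None

mid = {"titleId": "80100172", "assetId": "v-123"}
persist(mid, {"loudness": -23.0})
persist(mid, {"loudness": -22.5})   # a newer analysis or a retried write; both survive
```

Note how a retried write simply produces a second row rather than a conflict, which is exactly the property that makes lockless retries safe.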

Flexibility of Queries

The pivotal benefit of data modeling and a “schema-on-write” system is query-ability. Technical metadata residing in NMDB is invaluable to develop new business insights in the areas of content recommendations, title promotion, machine assisted content quality control (QC), as well as user experience innovations. One of the primary purposes of NMDB is that it can serve as a data warehouse. This brings the need for indexing the data and making it available for queries, without a priori knowledge of all possible query patterns.

In principle, a graph database can answer arbitrary queries and promises optimal query performance for joins. For that reason, we explored a graph-like data model so as to address our query use cases. However, we quickly learnt that our primary use case, which is spatio-temporal queries on the media timeline, made limited use of database joins. And in those queries where joins were used, the degree of connectedness was small. In other words, the power of the graph-like model was underutilized. We concluded that for the limited join query use-cases, application side joins might provide satisfactory performance and could be handled by an application we called Media Data Query Service (MDQS). Further, another pattern of queries emerged — searching unstructured textual data, e.g., mining movie scripts data and subtitle search. It became clear to us that a document database with search capabilities would address most of our requirements, such as allowing a plurality of metadata, fast paced algorithm development, serving unstructured queries and also structured queries even when the query patterns are not known a priori.

Elasticsearch (ES), a highly performant, scalable document database implementation, fit our needs really well. ES supports a wide range of possibilities for queries and in particular shines at unstructured textual search, e.g., searching for a culturally sensitive word in a subtitle asset that needs searching based on a stem of the word. At its core ES uses Lucene — a powerful and feature rich indexing and searching engine. A front-end service, known as Media Data Analysis Service (MDAS), manages the NMDB ES backend for write and query operations. MDAS implements several optimizations for answering queries and indexing data to meet the demands of storing documents that have varying characteristics and sizes. This is described more in-depth later in this article.

A Data System from Databases

As indicated above, business requirements mandated that NMDB be implemented as a system with multiple microservices that manage a polyglot of DataBases (DBs). The different constituent DBs serve complementary purposes. We are however presented with the challenge of keeping the data consistent across them in the face of the classic distributed systems shortcomings — sometimes the dependency services can fail, sometimes service nodes can go down, or more nodes may be added to meet a bursty demand. This motivates the need for a robust orchestration service that can (a) maintain and execute a state machine, (b) retry operations in the event of transient failures, and (c) support asynchronous (possibly long running) operations such as queries. We use the Conductor orchestration framework to coordinate and execute workflows related to the NMDB Create, Read, Update, Delete (CRUD) operations and for other asynchronous operations such as querying. Conductor helps us achieve a high degree of service availability and data consistency across different storage backends. However, given the collection of systems and services that work in unison, it is not possible to provide strong guarantees on data consistency and yet remain highly available for certain use cases, implying data read skews are not entirely avoidable. This is true in particular for query APIs — these rely on successful indexing of Media Document instances, which is done as an asynchronous, background operation in ES. Hence queries on NMDB are expected to be eventually consistent.

Figure 2: Block diagram of NMDB system

Figure 2 shows the NMDB system block diagram. A front end service that shares its name with the NMDB system serves as the gateway to all CRUD and query operations. Read APIs are performed synchronously while write and long running query APIs are managed asynchronously through Conductor workflows. Circling back to the point of data immutability that was discussed previously — another one of its benefits is that it preserves all writes that could occur, e.g., when a client or the Conductor framework retries a write, perhaps because of transient connection issues. While this does add to the data footprint, the benefits, namely (a) allowing for lockless retries, (b) eliminating the need for resolving write conflicts and (c) mitigating data loss, far outweigh the storage costs.

Included in Figure 2 is a component named Object Store that is a part of the NMDB data infrastructure. Object Store is a highly available, web-scale, secure storage service such as Amazon’s Simple Storage Service (S3). This component ensures that all data being persisted is chunked and encrypted for optimal performance. It is used in both write and read paths. This component serves as the primary means for exchanging Media Document instances between the various components of NMDB. Media Document instances can be large in size (several hundreds of MBs, perhaps because a media analysis could model metadata about, e.g., every frame in a video file; further, the per-frame data could explode in size due to some modeling of spatial attributes such as bounding boxes). Such a mechanism optimizes bandwidth and latency performance by ensuring that Media Document instances do not have to travel over the wire between the different microservices involved in the read or the write path and can be downloaded only where necessary.

NMDB in Action

While the previous sections discussed the key architectural traits, in this section we dive deeper into the NMDB implementation.

Writing data into NMDB

Figure 3: Writing a Media Document Instance to NMDB

The animation shown in Figure 3 details the machinery that is set in action when we write into NMDB. The write process begins with a client application that communicates its intent to write a Media Document instance. NMDB accepts the write request by submitting the job to the orchestration framework (Conductor) and returns a unique handle to identify the request. This could be used by the client to query on the status of the request. Following this, the schema validation, document persistence and document indexing steps are performed in that order. Once the document is persisted in C* it becomes available for read with strong consistency guarantees and is ready to be used by read-only applications. Indexing a document into ES can be a high latency operation since it is a relatively more intensive procedure that requires multiple processes coordinating to analyze the document contents, and update several data structures that enable efficient search and queries.

Also noteworthy is the use of an Object Store to optimize IO across service components (as was discussed earlier). NMDB leverages a cloud storage service (e.g., AWS S3 service) to which a client first uploads the Media Document instance data. For each write request to NMDB, NMDB generates a Type-IV UUID that is used to compose a key. The key in turn is used to compose a unique URL to which the client uploads the data it wishes to write into NMDB. This URL is then passed around as a reference for the Media Document instance data.
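The UUID-to-key-to-URL composition might look something like the following. The bucket name, key prefix and URL layout are invented for this sketch; the text above does not specify them:

```python
import uuid

# Hypothetical illustration of composing an Object Store key and upload URL
# from the per-request UUID. The bucket name and key layout are made-up
# assumptions, not NMDB's actual conventions.

BUCKET = "nmdb-object-store"   # assumed bucket name

def upload_url_for(request_id: uuid.UUID) -> str:
    key = f"media-documents/{request_id}"          # key composed from the UUID
    return f"https://{BUCKET}.s3.amazonaws.com/{key}"

rid = uuid.uuid4()             # one fresh Type-IV UUID per write request
url = upload_url_for(rid)      # the client uploads the document body here
```

The returned URL is then the only representation of the (potentially huge) document body that needs to travel between microservices.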

Scaling Strategies

From the perspective of writing to NMDB, some of the NMDB components are compute heavy while some others are IO heavy. For example, the bottleneck for MDVS is CPU as well as memory (as it needs to work with large documents for validation). On the other hand, MDAS is bound by network IO as well (Media Document instances need to be downloaded from the NMDB Object Store to MDAS so that they can be indexed). Different metrics can be used to configure a continuous deployment platform, such as Spinnaker, for load balancing and auto-scaling for NMDB. For example, “requests-per-second” (RPS) is commonly used to auto-scale micro services to serve increased reads or queries. While RPS or CPU usage could be useful metrics for scaling synchronous services, asynchronous APIs (like storing a document in NMDB) bring in the requirement of monitoring queue depth to anticipate work build up and scale accordingly.

Figure 4: Scaling the NMDB service plane

The strategy discussed above gives us a good way to auto-scale the NMDB micro services layer (identified as “Service Plane” in Figure 4) quasi-linearly. However as seen in Figure 4, the steady state RPS that the system can support eventually plateaus, at which point scaling the Service Plane does not help improve SLA. At this point it should be amply clear that the data nodes (identified as “Data Backend”) have reached their peak performance limits and need to be scaled. However, distributed DBs do not scale as quickly as services, and horizontal or vertical scaling may take a few hours to days, depending on data footprint size. Moreover, while scaling the Service Plane can be an automated process, adding more data nodes (C* or ES) to scale the Data Backend is typically done manually. However, note that once the Data Backend is scaled up (horizontally and/or vertically), the effects of scaling the Service Plane manifest as an increased steady state RPS as seen in Figure 4.

An important point related to scaling data nodes, which is worth mentioning, is the key hashing strategy that each DB implements. C* employs consistent key hashing, and hence adding a node distributes the data uniformly across nodes. However, ES deploys a modulus-based distributed hashing. Here adding a data node improves the distribution of shards across the available nodes, which does help alleviate query/write bottlenecks to an extent. However, as the size of shards grows over time, horizontal scaling might not help improve query/write performance, as shown in Figure 5.

Figure 5: ES scaling strategy
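The contrast between the two key-distribution schemes can be made concrete with a small, self-contained sketch (not tied to the actual C* or ES internals; the hash function, node names and vnode count are assumptions for illustration): under modulus placement, adding a fifth node remaps most keys, while a consistent-hash ring with virtual nodes moves only roughly one key in five.

```python
import bisect
import hashlib

# Sketch: modulus-based placement (hash(key) % n, in the spirit of ES shard
# routing) versus a consistent-hash ring with virtual nodes (in the spirit
# of C*). We count how many keys change owner when a 5th node is added.

def h(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

keys = [f"doc-{i}" for i in range(10_000)]

def modulus_placement(n):
    return {k: h(k) % n for k in keys}

def consistent_placement(node_ids, vnodes=100):
    # each node owns many points on the ring; a key belongs to the first
    # node point at or after the key's hash (wrapping around the ring)
    ring = sorted((h(f"node-{n}-vnode-{v}"), n) for n in node_ids for v in range(vnodes))
    points = [p for p, _ in ring]

    def owner(k):
        i = bisect.bisect_left(points, h(k))
        return ring[i % len(ring)][1]

    return {k: owner(k) for k in keys}

moved_mod = sum(modulus_placement(4)[k] != modulus_placement(5)[k] for k in keys)
moved_ch = sum(consistent_placement(range(4))[k] != consistent_placement(range(5))[k] for k in keys)
# moved_mod is typically around 80% of keys; moved_ch around 20% (~1/5)
```

With modulus placement a key keeps its owner only when `h % 4 == h % 5`, which happens for a small fraction of hash values; with the ring, only the keys landing on the new node's arcs move.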

ES mandates choosing the number of shards for every index at the time of creating an index, which cannot be modified without going through a reindexing step that is expensive and time consuming for large amounts of data. A fixed, pre-configured shard size strategy could be used for time-bound data such as logs, where new shards could be created while older shards are discarded. However, this strategy cannot be employed by NMDB since multiple business critical applications could be using the data; in other words, data in NMDB needs to be durable and may not ever be discarded. However, as discussed above, large shard sizes affect query performance adversely. This calls for some application level management for relocating shards into multiple indices as shown in Figure 6.

Figure 6: Creating new ES indices over time

Accordingly, once an index grows beyond a threshold, MDAS creates a different index for the same NMDB DS, thereby allowing indices to grow over time and yet keeping the shard size within a bound for optimal write/query performance. ES has a feature called index aliasing that is particularly helpful for alleviating the performance degradation caused by large shard sizes, which suits the scenario explained above. An index alias could point to multiple indices and serve queries by aggregating search results across all the indices within the alias.
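The rollover-plus-alias idea can be sketched with an in-memory model, where each inner list stands in for one ES index and a query fans out across all of them. The naming scheme and threshold are illustrative, not the actual MDAS logic:

```python
# Sketch of application-level index rollover with an alias spanning all the
# per-DataStore indices. Each inner list stands in for one ES index; the
# naming scheme and tiny threshold are made up for illustration.

MAX_DOCS_PER_INDEX = 3   # stand-in for a shard-size threshold

class RollingIndexAlias:
    def __init__(self, datastore: str):
        self.datastore = datastore
        self.indices = [[]]          # start with a single empty index

    @property
    def alias(self):
        # the alias covers every index created so far for this DataStore
        return [f"{self.datastore}-{i:04d}" for i in range(len(self.indices))]

    def index_doc(self, doc):
        if len(self.indices[-1]) >= MAX_DOCS_PER_INDEX:
            self.indices.append([])  # roll over to a fresh index
        self.indices[-1].append(doc)

    def search(self, predicate):
        # queries aggregate results across all indices behind the alias
        return [d for idx in self.indices for d in idx if predicate(d)]

ds_index = RollingIndexAlias("video-shot-boundary-v2")
for i in range(7):
    ds_index.index_doc({"docId": i})
```

Each individual index stays bounded in size while the alias presents one logical, ever-growing DataStore to queries.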

Indexing Data in NMDB at Scale

A single Media Document instance could be large, ranging from hundreds of MBs to several GBs. Many document databases (including ES) have a limit on the size of a document, beyond which DB performance degrades significantly. Indexing large documents can present other challenges for a data system, such as requiring high network I/O connections, increased computation and memory costs, and high indexing latencies, along with other adverse effects.

In principle, we could apply the ES parent-child relationship at the various levels of the Media Document hierarchy and split up a Media Document instance into several smaller ES documents. However, the ES parent-child relationship is a two-level relationship and query performance suffers when multiple such relationships are chained together to represent a deeply nested model (the NMDB Media Document model exhibits up to five levels of nesting). Alternately, we could consider modeling it as a two-level relationship with the high cardinality entities (“Event” and “Region”) on the “child” side of the relationship. However, a Media Document could contain a huge number of “Event” and “Region” entities (hundreds of thousands of Events and tens of Regions per Event are typical for an hour of content), which would result in a very large number of child documents. This could also adversely impact query performance.

To address these opposing limitations, we came up with the idea of using “data denormalization”. Adopting this needed more thought, since data denormalization can potentially lead to data explosion. Through a process referred to as “chunking”, we split up large document payloads into multiple smaller documents prior to indexing them in ES. The smaller chunked documents can be indexed using multiple threads of computation (on a single service node) or multiple service nodes — this results in better workload distribution, efficient memory usage, avoidance of hot spots and improved indexing latencies (because we are processing smaller chunks of data concurrently). We utilized this approach simultaneously with some careful decisions around what data we denormalize in order to provide optimal indexing and querying performance. More details of our implementation are presented as follows.

Chunking Media Document Instances

The hierarchical nature of the Media Document model (as explained in the previous blog post) requires careful consideration while chunking as it contains relationships between its entities. Figure 7 depicts the pre-processing we perform on a Media Document instance prior to indexing it in ES.

Figure 7: An efficient strategy for indexing Media Document Instances in ES
  • Each Media Document instance is evenly split into multiple chunks with smaller size (of the order of a few MBs).
  • Asset, Track and Component level information is denormalized across all the chunks, and a parent document per chunk with this information is indexed in ES. This denormalization of the parent document across different chunks also helps us to overcome a major limitation with the ES parent-child relationship, namely that the parent document and all the children documents must belong to the same shard.
  • At the level of an event, data is denormalized across all the regions and a child document per region is indexed in ES.
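The chunking and denormalization steps above can be sketched as a pre-processing function. The field names, document shapes and chunk size are illustrative assumptions, not the actual MDAS format:

```python
# Sketch of the chunking/denormalization pre-processing: a Media Document
# instance is split into fixed-size chunks of events; Asset/Track/Component
# metadata is denormalized into every chunk's parent document, and each
# region becomes a child document. Field names and the tiny chunk size are
# made up for illustration.

CHUNK_SIZE = 2  # events per chunk

def chunk_for_indexing(media_doc: dict):
    # top-level metadata, copied (denormalized) into every chunk's parent
    header = {k: media_doc[k] for k in ("asset", "track", "component")}
    events = media_doc["events"]
    es_docs = []
    for i in range(0, len(events), CHUNK_SIZE):
        chunk_events = events[i:i + CHUNK_SIZE]
        parent = {**header, "chunk": i // CHUNK_SIZE, "relation": "parent"}
        # event-level data is denormalized across regions: one child per region
        children = [
            {"event": e["offset"], "region": r, "relation": "child"}
            for e in chunk_events for r in e["regions"]
        ]
        es_docs.append((parent, children))
    return es_docs

doc = {
    "asset": "a1", "track": "video", "component": "c1",
    "events": [{"offset": t, "regions": [{"box": [0, 0, 1, 1]}]} for t in range(5)],
}
chunks = chunk_for_indexing(doc)
```

Each `(parent, children)` pair is independently indexable, which is what lets multiple threads or nodes process one large document concurrently.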

This architecture allows distribution of Media Document instances across multiple nodes and speeds up indexing as well as query performance. At query time, MDAS uses a combination of different strategies, depending on the query patterns, for serving queries efficiently:

  • ES parent-child join queries are used to speed up query performance where needed.
  • In another query pattern, the parent documents are queried followed by children documents and application side joins are performed in MDAS to create search results.
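The second pattern, an application-side join, can be sketched as follows. The data shapes and field names are illustrative, not the actual MDAS result format:

```python
# Sketch of an application-side join: query parent documents first, then
# fetch the matching children, and stitch the results together in the
# service layer. Data shapes are made up for illustration.

parents = [
    {"docId": "d1", "asset": "a1", "track": "video"},
    {"docId": "d2", "asset": "a2", "track": "audio"},
]
children = [
    {"parentId": "d1", "event": 0, "label": "face"},
    {"parentId": "d1", "event": 42, "label": "face"},
    {"parentId": "d2", "event": 7, "label": "loudness"},
]

def search(track: str, label: str):
    hits = [p for p in parents if p["track"] == track]   # query 1: parents
    wanted = {p["docId"]: p for p in hits}
    return [                                             # query 2 + in-app join
        {**wanted[c["parentId"]], **c}
        for c in children
        if c["parentId"] in wanted and c["label"] == label
    ]

results = search("video", "face")
```

Because the parent metadata was denormalized at indexing time, the join key (`parentId` here) is all that is needed to stitch the two result sets together.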

Serving Queries & Analytics

As noted earlier, NMDB has a treasure trove of indexed media metadata, and lots of interesting insights could be developed by analyzing it. The MDAS backend with ES forms the backbone of the analytical capabilities of NMDB. In a typical analytics usage, NMDB users are interested in two types of queries:

  1. A DS level query to retrieve all documents that match the specified query. This is similar to filtering of records using a SQL ‘WHERE’ clause. Filtering can be done on any of the entities in a Media Document instance using various condition operators such as ‘=’, ‘>’ or ‘<’. Conditions can also be grouped using logical operators like OR, AND or NOT.
  2. A more targeted query on a Media Document instance using a Document ID handle to retrieve specific portions of the document. In this query type, users can apply conditional filtering on each of the entities of a Media Document instance and retrieve matching entities.

The two query types target different use cases. Queries of the first type span an entire NMDB DS and can provide insights into which documents in a DS match the specified query. Considering the huge payload of data corresponding to Media Document instances that match a query of the first type, NMDB only returns the coordinates (DocumentID and MID) of the matching documents. The second query type can be used to target a specific Media Document instance using DocumentID and retrieve portions of the document with conditional filtering applied. For example, only a set of events that satisfy a specified query could be retrieved, along with Track and Component level metadata. While it is typical to use the two types of queries in succession, in the event where a document handle is already known one could glean more insights into the data by directly executing the second query type on a specific Media Document instance.

As explained earlier, chunking Media Document instances at the time of indexing comes in very handy in optimizing queries. Since relationships between the different entities of a Media Document instance are preserved, cross-entity queries can be handled at the ES layer. For example, a Track can be filtered out based on the number of Events it contains or on whether it contains Events matching the specified query. The indexing strategy as explained earlier can be contrasted with the nested document approach of ES. Indexing Event and Region level information as children documents helps us output the search results more efficiently.

What’s next

As explained in the previous blog post, the Media Document model has a hierarchical structure and offers a logical way of modeling media timeline data. However, such a hierarchical structure is not optimal for parallel processing. In particular validation (MDVS) and indexing (MDAS) services could benefit immensely by processing a large Media Document instance in parallel thereby reducing write latencies. A compositional structure for Media Document instances would be more amenable to parallel processing and therefore go a long way in alleviating the challenges posed by large Media Document instances. Briefly, such a structure implies a single media timeline is composed of multiple “smaller” media timelines, where each media timeline is represented by a corresponding “smaller” Media Document instance. Such a model would also enable targeted reads that do not require reading the entire Media Document instance.

On the query side, we anticipate a growing need for performing joins across different NMDB DataStores — this could be computationally intensive in some scenarios. This along with the high storage costs associated with ES is motivating us to look for other “big-data” storage solutions. As NMDB continues to be the media metadata platform of choice for applications across Netflix, we will continue to carefully consider new use cases that might need to be supported and evaluate technologies that we will need to onboard to address them. Some interesting areas of future work could involve exploring Map-Reduce frameworks such as Apache Hadoop, for distributed compute, query processing, relational databases for their transactional support, and other Big Data technologies. Opportunities abound in the area of media-oriented data systems at Netflix especially with the anticipated growth in business applications and associated data.

— by Shinjan Tiwary, Sreeram Chakrovorthy, Subbu Venkatrav, Arsen Kostenko, Yi Guo and Rohit Puri

Implementing the Netflix Media Database was originally published in the Netflix TechBlog on Medium.

Going Head-to-Head: Scylla vs Amazon DynamoDB

“And now for our main event! Ladies and gentlemen, in this corner, weighing in at 34% of the cloud infrastructure market, the reigning champion and leader of the public cloud…. Amazon!” Amazon has unparalleled expertise at maximizing scalability and availability for a vast array of customers using a plethora of software products. While Amazon offers software products like DynamoDB, its database-as-a-service is only one of their many offerings.

“In the other corner is today’s challenger — young, lightning quick and boasting low-level Big Data expertise… ScyllaDB!” Unlike Amazon, our company focuses exclusively on creating the best database for distributed data solutions.

A head-to-head database battle between Scylla and DynamoDB is a real David versus Goliath situation. It’s Rocky Balboa versus Apollo Creed. Is it possible Scylla could deliver an unexpected knockout punch against DynamoDB? [SPOILER ALERT: Our results will show Scylla has 1/4th the latencies and is only 1/7th the cost of DynamoDB — and this is in the most optimized case for Dynamo. Watch closely as things go south for Dynamo in Round 6. Please keep reading to see how diligent we were in creating a fair test case and other surprise outcomes from our benchmark battle royale.]

To be clear, Scylla is not a competitor to AWS at all. Many of our customers deploy Scylla to AWS, we ourselves find it to be an outstanding platform, and on more than one occasion we’ve blogged about its unique bare metal instances. Here’s further validation — our Scylla Cloud service runs on top of AWS. But we do think we might know a bit more about building a real-time big data database, so we limited the scope of this competitive challenge solely to Scylla versus DynamoDB, database-to-database.

Scylla is a drop-in replacement for Cassandra, implemented from scratch in C++. Cassandra itself was a reimplementation of concepts from the Dynamo paper. So, in a way, Scylla is the “granddaughter” of Dynamo. That means this is a family fight, where a younger generation rises to challenge an older one. It was inevitable for us to compare ourselves against our “grandfather,” and perfectly in keeping with the traditions of Greek mythology behind our name.

If you compare Scylla and Dynamo, each has pros and cons, but they share a common class of NoSQL database: Column family with wide rows and tunable consistency. Dynamo and its Google counterpart, Bigtable, were the first movers in this market and opened up the field of massively scalable services — very impressive by all means.

Scylla is a much younger opponent, just 4.5 years in age. Though Scylla is modeled on Cassandra, Cassandra was never our end goal, only a starting point. While we stand on the shoulders of giants in terms of existing design, our proven systems programming abilities have come heavily into play, leading to performance at the level of a million operations per second per server. We recently announced feature parity (minus transactions) with Cassandra, and also our own database-as-a-service offering, Scylla Cloud.

But for now we’ll focus on the question of the day: Can we take on DynamoDB?

Rules of the Game

With our open source roots, our culture forces us to be as fair as possible. So we picked a reasonable benchmark scenario that mimics the requirements of a real application, and we judged the two databases from the user's perspective. For the benchmark we used the Yahoo! Cloud Serving Benchmark (YCSB), since it's a cross-platform tool and an industry standard. The goal was to meet a Service Level Agreement of 120K operations per second with a 50:50 read/write split (YCSB's workload A), with latency under 10ms at the 99th percentile. Each database would provision the minimal amount of resources/money to meet this goal. Each DB would first be populated with 1 billion rows using YCSB's default 10-column schema.
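As a rough sketch of how such a population can be partitioned, the YCSB load phase can be driven per client with explicit key ranges. The binding name, paths, and host below are placeholders, not the exact invocation from this benchmark:

```shell
# Split the 1B-row population across 8 YCSB clients, each owning its own
# contiguous key range (125M records per client, as in the setup above).
RECORDS=1000000000
CLIENTS=8
PER_CLIENT=$((RECORDS / CLIENTS))   # 125,000,000 records per client
echo "each client loads $PER_CLIENT records"
for i in $(seq 0 $((CLIENTS - 1))); do
  # Placeholder invocation: binding name and host are illustrative only.
  echo "bin/ycsb load cassandra-cql -P workloads/workloada -p recordcount=$RECORDS -p insertstart=$((i * PER_CLIENT)) -p insertcount=$PER_CLIENT -p hosts=<scylla-node>"
done
```

YCSB's `insertstart`/`insertcount` properties are what let each client own a disjoint range, so the eight loaders never contend on the same keys.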

We conducted our tests using Amazon DynamoDB and Amazon Web Services EC2 instances as loaders. Scylla also used Amazon Web Services EC2 instances for servers, monitoring tools and the loaders.

These tests were conducted on Scylla Open Source 2.1, which is the code base for Scylla Enterprise 2018.1. Thus, the performance results for these tests hold true across both Open Source and Enterprise. However, we use Scylla Enterprise for comparing Total Cost of Ownership (TCO).

DynamoDB is known to be tricky when the data distribution isn't uniform, so we selected a uniform distribution to test Dynamo within its sweet spot. We set up 3 nodes of i3.8xl for Scylla, with a replication factor of 3 and quorum consistency level, and loaded the 1TB dataset (replicated 3 times); after 2.5 hours the load was done and the cluster sat waiting for the test to begin.

Test Setup: Scylla Enterprise vs. Amazon DynamoDB

Scylla Cluster
  • i3.8xlarge | 32 vCPU | 244 GiB | 4 x 1.9TB NVMe
  • 3-node cluster in a single DC | RF=3
  • Dataset: ~1.1TB (1B partitions, ~1.1KB each)
  • Total used storage: ~3.3TB

DynamoDB Provisioned Capacity
  • 160K write | 80K read (strong consistency)
  • Dataset: ~1.1TB (1B partitions, ~1.1KB each)
  • Storage size: ~1.1TB (DynamoDB table metrics)

Common test conditions
  • Workload A: 90 min, using 8 YCSB clients; every client runs on its own data range (125M partitions)
  • Loaders: 4 x m4.2xlarge (8 vCPU | 32 GiB RAM), 2 loaders per machine
  • Scylla workloads ran with Consistency Level = QUORUM for writes and reads
  • Scylla started with a cold cache in all workloads
  • DynamoDB workloads ran with dynamodb.consistentReads = true
  • Unluckily for DynamoDB, each item weighed ~1.1KB (the YCSB default schema), so each write consumed two write capacity units

Let the Games Begin!

We started to populate Dynamo with the dataset. However, not so fast…

High Rate of InternalServerErrors

Turns out the population stage is hard on DynamoDB. We had to slow down the population rate time and again, despite it being well within the reserved IOPS. Sometimes we managed to populate up to 0.5 billion rows before we started to receive the errors again.

Each time we had to start over to make sure the entire dataset was saved. We believe DynamoDB needs to split its 10GB partitions during population and cannot do so in parallel with additional load without producing errors. The gory details:

  • Started population with Provisioned capacity: 180K WR | 120K RD.
    • ⚠ We hit errors on ~50% of the YCSB threads causing them to die when using ≥50% of write provisioned capacity.
    • For example, it happened when we ran with the following throughputs:
      • 55 threads per YCSB client = ~140K throughput (78% used capacity)
      • 45 threads per YCSB client = ~130K throughput (72% used capacity)
      • 35 threads per YCSB client = ~96K throughput (54% used capacity)

After multiple attempts with various provisioned capacities and throughputs, we eventually found a streaming rate that permitted a complete database population. Here are the results of the population stage:

Population stage: 100% Write, 1B partitions (~1.1KB)

Scylla Open Source 2.1 (3x i3.8xlarge), 8 YCSB clients:
  • Overall throughput (ops/sec): 104K
  • Avg load (scylla-server): ~85%
  • INSERT operations per client (avg): 125M
  • Avg 95th percentile latency (ms): 8.4
  • Avg 99th percentile latency (ms): 11.3

DynamoDB (160K WR | 80K RD), 8 YCSB clients:
  • Overall throughput (ops/sec): 51.7K
  • Max consumed capacity: WR 75%
  • INSERT operations per client (avg): 125M
  • Avg 95th percentile latency (ms): 7.5
  • Avg 99th percentile latency (ms): 11.6

Scylla completed the population at twice the speed and, more importantly, worked out of the box without any errors or pitfalls.

YCSB Workload A, Uniform Distribution

Finally, we began the main test, the one that gauges our potential user workload with an SLA of 120,000 operations per second. This scenario is supposed to be DynamoDB's sweet spot. The partitions are well balanced and the load isn't too high for DynamoDB to handle. Let's see the results:

Workload A: 50% Read / 50% Write, 1B partitions (~1.1KB), uniform distribution, 90 min.

Scylla Open Source 2.1 (3x i3.8xlarge), 8 YCSB clients:
  • Overall throughput (ops/sec): 119.1K
  • Avg load (scylla-server): ~58%
  • READ operations (avg): ~39.93M | 95th pct latency: 5.0 ms | 99th pct latency: 7.2 ms
  • UPDATE operations (avg): ~39.93M | 95th pct latency: 3.4 ms | 99th pct latency: 5.6 ms

DynamoDB (160K WR | 80K RD), 8 YCSB clients:
  • Overall throughput (ops/sec): 120.1K
  • Consumed capacity: ~WR 76% | RD 76%
  • READ operations (avg): ~40.53M | 95th pct latency: 12.0 ms | 99th pct latency: 18.6 ms
  • UPDATE operations (avg): ~40.53M | 95th pct latency: 13.2 ms | 99th pct latency: 20.2 ms

After all the effort of loading the data, DynamoDB was finally able to demonstrate its value. DynamoDB met the throughput SLA (120K OPS). However, it failed to meet the latency SLA of 10ms at the 99th percentile, though after the population difficulties we were happy to get to this point.

Scylla, on the other hand, easily met the throughput SLA at only 58% load, with latency 3x-4x better than DynamoDB's and well below our requested SLA. (Also, what you don't see here is the huge cost difference, but we'll get to that in a bit.)

We won’t let DynamoDB off easy, however. Now that we’ve seen how DynamoDB performs with its ideal uniform distribution, let’s have a look at how it behaves with a real life use-case.

Real Life Use-case: Zipfian Distribution

A good schema design goal is a perfect, uniform distribution of your primary keys. In real life, however, some keys are accessed more than others. For example, it's common practice to use a UUID for the customer or product ID and to look entities up by it. Some customers will be more active than others and some products will be more popular than others, so access rates can differ by 10x-1000x. Developers cannot fix this in the general case: adding an additional column to the primary key may improve the distribution for one access pattern, but at the cost of complexity whenever you retrieve the full information about the product or customer.

Keep in mind what you store in a database. It's data such as how many people use Quora or how many likes NBA teams have.

With that in mind, let's see how ScyllaDB and DynamoDB behave given a Zipfian distribution access pattern. We went back to the test case of 1 billion keys spanning a 1TB pre-replicated dataset and queried it again using YCSB's Zipfian accesses. It is possible to define the hot set of partitions in terms of volume — how much data is in it — and to define the percentage of accesses that hit this hot set out of the overall 1TB set.

We set a variety of parameters for the hot set and the results were pretty consistent – DynamoDB could not meet the SLA for Zipfian distribution. It performed well below its reserved capacity — only 42% utilization — but it could not execute 120k OPS. In fact, it could do only 65k OPS. The YCSB client experienced multiple, recurring ProvisionedThroughputExceededException (code: 400) errors, and throttling was imposed by DynamoDB.

Workload A: 50% Read / 50% Write, 1B partitions, Zipfian distribution, 90 min.
Hot set: 10K partitions; hot set access: 90%.

Scylla Open Source 2.1 (3x i3.8xlarge), 8 YCSB clients:
  • Overall throughput (ops/sec): 120.2K
  • Avg load (scylla-server): ~55%
  • READ operations (avg): ~40.56M | 95th pct latency: 6.1 ms | 99th pct latency: 8.6 ms
  • UPDATE operations (avg): ~40.56M | 95th pct latency: 4.4 ms | 99th pct latency: 6.6 ms

DynamoDB (160K WR | 80K RD), 8 YCSB clients:
  • Overall throughput (ops/sec): 65K
  • Consumed capacity: ~WR 42% | RD 42%
  • READ operations (avg): ~21.95M | 95th pct latency: 6.0 ms | 99th pct latency: 9.2 ms
  • UPDATE operations (avg): ~21.95M | 95th pct latency: 7.3 ms | 99th pct latency: 10.8 ms

Why can’t DynamoDB meet the SLA in this case? The answer lies within the Dynamo model. The global reservation is divided among multiple partitions, each no more than 10GB in size.

DynamoDB partition equations

Thus, when such a partition is accessed more often, it may reach its throttling cap even though, overall, you are well within your global reservation. In the example above, when reserving 200 writes, each of the 10 partitions cannot serve more than 20 writes/s.
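A back-of-the-envelope sketch of that arithmetic, using the partition-count formula from AWS's DynamoDB documentation (partitions = max(ceil(size / 10GB), ceil(RCU/3000 + WCU/1000))); the table size here is illustrative:

```shell
# Reproduce the example above: 200 reserved writes spread over a table
# large enough to be split into 10 partitions by size.
SIZE_GB=100  # illustrative table size
WCU=200      # reserved write capacity units from the example
RCU=0
BY_SIZE=$(( (SIZE_GB + 9) / 10 ))   # ceil(size / 10GB) = 10 partitions
# ceil(RCU/3000 + WCU/1000), computed with awk to handle the fractional part
BY_TPUT=$(awk -v r="$RCU" -v w="$WCU" 'BEGIN { x = r/3000 + w/1000; v = (x > int(x)) ? int(x) + 1 : int(x); print v }')
PARTITIONS=$(( BY_SIZE > BY_TPUT ? BY_SIZE : BY_TPUT ))
PER_PARTITION_WCU=$(( WCU / PARTITIONS ))
echo "partitions=$PARTITIONS, per-partition write cap=$PER_PARTITION_WCU writes/s"
```

With these numbers the per-partition cap comes out to 20 writes/s: any single partition that attracts more than its 1/10th share of the traffic gets throttled, even though the table as a whole is far below its 200-write reservation.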

The Dress that Broke DynamoDB

If you asked yourself, “Hmmm, is 42% utilization the worst I’d see from DynamoDB?” we’re afraid we have some bad news for you. Remember the dress that broke the internet? What if an item in your database becomes extremely hot? To explore this, we tested access to a single hot partition and compared the results.

The Dress that Broke the Internet

We ran a single YCSB, working on a single partition on a 110MB dataset (100K partitions). During our tests, we observed a DynamoDB limitation when a specific partition key exceeded 3000 read capacity units (RCU) and/or 1000 write capacity units (WCU).

Even when using only ~0.6% of the provisioned capacity (857 OPS), the YCSB client experienced ProvisionedThroughputExceededException (code: 400) errors, and throttling was imposed by DynamoDB (see screenshots below).

This is not to say you shouldn't plan for the best possible data model. However, there will always be cases when your plan is far from reality. In the Scylla case, a single partition still performed reasonably well: 20,200 OPS with good 99th percentile latency.

Scylla vs DynamoDB – Single (Hot) Partition

Workload A: 50% Read / 50% Write, single partition (~1.1KB), uniform distribution, 90 min.

Scylla Open Source 2.1 (3x i3.8xlarge), 8 YCSB clients:
  • Overall throughput (ops/sec): 20.2K
  • Avg load (scylla-server): ~5%
  • READ operations (avg): ~50M | 95th pct latency: 7.3 ms | 99th pct latency: 9.4 ms
  • UPDATE operations (avg): ~50M | 95th pct latency: 2.7 ms | 99th pct latency: 4.5 ms

DynamoDB (160K WR | 80K RD), 8 YCSB clients:
  • Overall throughput (ops/sec): 857
  • Consumed capacity: ~WR 0.6% | RD 0.6%
  • READ operations (avg): ~2.3M | 95th pct latency: 5.4 ms | 99th pct latency: 10.7 ms
  • UPDATE operations (avg): ~2.3M | 95th pct latency: 7.7 ms | 99th pct latency: 607.8 ms

Screenshot 1: Single partition. Consumed capacity: ~0.6% -> Throttling imposed by DynamoDB

Additional Factors

Cross-region Replication and Global Tables

We compared the replication speed between datacenters, and a simple comparison showed that DynamoDB replicated to a remote DC in 370ms on average, while Scylla’s average was 82ms. Since DynamoDB’s cross-region replication is built on its Streams API, we believe that when congestion happens the gap will widen to multiple seconds, though we haven’t yet tested it.

Beyond replication propagation, there is a more burning functional difference — Scylla can easily add regions on demand at any point in the process with a single command:

ALTER KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 4 };

In DynamoDB, on the other hand, you must define your global tables ahead of time. This imposes a serious usability problem, and a major cost one, as you may need to grow the number of deployed datacenters over time.


Explicit Caching is Expensive and Bad for You

DynamoDB performance can improve and its high cost can be reduced in some cases when using DAX. However, Scylla has a much smarter and more efficient embedded cache (the database nodes have memory, don’t they?) and the outcome is far better for various reasons we described in a recent blog post.


This is another major advantage of Scylla — DynamoDB locks you into the AWS cloud, significantly decreasing your chances of ever moving out. Data gravity is significant. No wonder they’re going after Oracle!

Scylla is an open source database. You have the freedom to choose between our community version, an Enterprise version and our new fully managed service. Scylla runs on all major cloud providers and opens the opportunity for you to run some datacenters on one provider and others on another provider within the same cluster. One of our telco customers is a great example of the hybrid model — they chose to run some of their datacenters on-premise and some on AWS.

Our approach to “locking in” users is quite different — we do it solely by delivering quality and value, such that you won’t want to move away from us. As of today, we have experienced exactly zero customer churn.

No Limits

DynamoDB imposes a strict limit on item size — only 400KB. In Scylla you can effectively store megabytes. One of our customers built a distributed storage system using Scylla, keeping large blobs in Scylla with single-digit millisecond latency for them, too.

Another problematic limit concerns the data stored under a single partition key: DynamoDB cannot hold more than 10GB of items there. While this isn’t a recommended pattern in Scylla either, we have customers who keep 130GB in a single partition. The effect of these higher limits is more freedom in data modeling and fewer reasons to worry.

Total Cost of Ownership (TCO)

We’re confident the judges would award every round of this battle to Scylla so far, and we haven’t even gotten to comparing the total cost of ownership. The DynamoDB setup, which didn’t even meet the required SLA and which caused us to struggle multiple times to even get working, costs 7 times more than the comparable Scylla setup.

Scylla Enterprise (3 x i3.8xlarge + Scylla Enterprise license):
  • Year-term estimated cost: ~$71K

Amazon DynamoDB (160K write | 80K read + business-level support):
  • Year-term estimated cost: ~$524K
    • DynamoDB 1-year reserved term: ~$288K
    • Monthly fee: ~$19.7K/month (~$236K annual)

Note that only 3 machines were needed for Scylla; not much of a challenge in terms of administration. And, as we mentioned earlier, you can offload all your database administration with our new fully managed cloud service, Scylla Cloud. (By the way, Scylla Cloud comes in at 4-6x less expensive than DynamoDB, depending on the plan.)

Final Decision: A Knockout!

Chart: Uniform distribution — 99th percentile latency (ms)
Chart: Zipfian distribution — throughput
  • DynamoDB failed to achieve the required SLA multiple times, especially during the population phase.
  • DynamoDB has 3x-4x the latency of Scylla, even under ideal conditions
  • DynamoDB is 7x more expensive than Scylla
  • Dynamo was extremely inefficient in a real-life Zipfian distribution. You’d have to buy 3x your capacity, making it 20x more expensive than Scylla
  • Scylla demonstrated up to 20x better throughput in the hot-partition test with better latency numbers
  • Last but not least, Scylla provides you freedom of choice with no cloud vendor lock-in (as Scylla can be run on various cloud vendors, or even on-premises).

Still not convinced? Listen to what our users have to say.

If you’d like to try your own comparison, remember that our product is open source. Feel free to download now. We’d love to hear from you if you have any questions about how we stack up or if you’d like to share your own results. And we’ll end with a final reminder that our Scylla Cloud (now available in Early Access) is built on Scylla Enterprise, delivering similar price-performance advantages while eliminating administrative overhead.

The post Going Head-to-Head: Scylla vs Amazon DynamoDB appeared first on ScyllaDB.

Instaclustr Releases Three Open Source Projects That Facilitate Cassandra-Kubernetes Integration and LDAP/Kerberos Authentication

An open source Cassandra operator for Kubernetes, LDAP authenticator, and Kerberos authenticator are all now freely available via GitHub


Palo Alto, California – December 12, 2018 – Instaclustr, the leading provider of completely managed solutions for scalable open source technologies, today announced the availability of three open source projects purpose-built to expand developers’ capabilities with Apache Cassandra and address common pain points. These projects include an open source Cassandra operator for more seamlessly running and operating Cassandra within Kubernetes, and open source LDAP and Kerberos authenticator plug-ins for Cassandra.

With Kubernetes emerging as the dominant container orchestration solution – and with Cassandra recognized for providing scalable applications with highly reliable data storage – more and more developers are seeking to use these powerful solutions in tandem. While running Cassandra on Kubernetes can be relatively simple to begin with, Kubernetes provides only a limited understanding of database functionality: it’s blind to key details of the database being written to, and has incomplete capabilities for managing stateful data.

With the open source Cassandra operator, Instaclustr and partner contributors have now introduced a solution that functions as a Cassandra-as-a-Service on Kubernetes, specifically designed to alleviate persistent challenges developers face when combining these technologies. The Cassandra operator takes deployment and operations duties completely off of developers’ plates. It provides a consistent environment and set of operations founded in best practices, and is reproducible across production clusters and development, staging, and QA environments. These advantages allow developers to focus their attention on product development – with the full strengths of Kubernetes and Cassandra at their disposal. The Cassandra operator is now ready to use in development environments through GitHub. The project continues to undergo improvements, with plans to add components and capabilities that will expand the operator’s value for developers.

To meet developers’ increasing demand for LDAP integration into Cassandra, Instaclustr has also created and released an open source LDAP authenticator plug-in that works closely with the existing CassandraAuthorizer implementation. This plug-in enables developers to much more quickly reap the benefits of secure LDAP authentication without any need to write their own solutions, and to transition to using the authenticator with zero downtime. The LDAP authenticator is freely available on GitHub, along with setup and usage instructions.

Additionally, Instaclustr has released an open source Kerberos authenticator that makes Kerberos’ industry-leading secure authentication and true single sign-on capabilities available to developers using Apache Cassandra. This project also includes a Kerberos authenticator plugin for the Cassandra Java driver.

Instaclustr customers with Apache Cassandra Enterprise Support can receive assistance implementing and running the LDAP and Kerberos authenticators. Enterprise support for the Cassandra operator will start next year.

“Our commitment to delivering 100% open source data-layer solutions for our managed platform extends to the development of new open source tools – especially when we recognize specific needs felt by our customers,” said Ben Bromhead, CTO, Instaclustr. “With these open source projects, we’ve set out to empower any developer who wishes to pair Cassandra with Kubernetes, or take advantage of LDAP or Kerberos authentication within their Cassandra deployments. We invite anyone interested to join our community of contributors, and suggest or offer improvements to these open source projects.”

About Instaclustr

Instaclustr is the Open Source-as-a-Service company, delivering reliability at scale. We operate an automated, proven, and trusted managed environment, providing database, analytics, search, and messaging. We enable companies to focus internal development and operational resources on building cutting edge customer-facing applications.

For more information, visit and follow us @Instaclustr.

The post Instaclustr Releases Three Open Source Projects That Facilitate Cassandra-Kubernetes Integration and LDAP/Kerberos Authentication appeared first on Instaclustr.

Scylla Manager 1.3 Release Announcement

Scylla Manager Release

The Scylla Enterprise team is pleased to announce the release of Scylla Manager 1.3, a production-ready release of Scylla Manager for Scylla Enterprise customers.

Scylla Manager 1.3 adds a new Health Check, which works as follows. Scylla nodes already report their status through “nodetool status” and via Scylla Monitoring Stack dashboards, but in some cases that is not enough. A node might report an Up-Normal (UN) status while, in fact, it is slow or not responding to CQL requests. This might be the result of an internal problem in the node, or of an external issue (for example, a blocked CQL port somewhere between the application and the Scylla node).

Scylla Manager’s new Health Check functionality helps identify such issues as soon as possible, playing a similar role to an application querying the CQL interface from outside the Scylla cluster.

Scylla Manager 1.3 automatically adds a new task to each newly managed cluster: a health check that sends a CQL OPTIONS command to each Scylla node and measures the response time. If the node responds within 250ms, it is considered to be ‘up’. If there is no response, or the response takes longer than 250ms, the node is considered to be ‘down’. The results are available using the “sctool status” command.
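The decision rule itself is simple enough to sketch. The snippet below is a hypothetical illustration, not Scylla Manager code: it omits the actual CQL OPTIONS probe and just applies the 250ms threshold to measured response times:

```shell
# Sketch of the health-check classification rule (threshold from the text).
classify() {
  local ms="$1"   # measured response time in ms; empty string = no response
  if [ -n "$ms" ] && [ "$ms" -le 250 ]; then
    echo up
  else
    echo down
  fi
}
classify 120    # a healthy node
classify 400    # slow CQL: reported down even if nodetool shows UN
classify ""     # no response at all
```

This is exactly why the check catches nodes that “nodetool status” misses: a node can answer gossip (UN) yet still fall into the second or third branch for CQL traffic.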

Scylla Manager 1.3 Architecture, including the Monitoring Stack, and the new CQL base Health Check interface to Scylla nodes.

If you have enabled the Scylla Monitoring Stack, the Monitoring Stack 2.0 Manager dashboard includes the same cluster status report. A new alert was defined in the Prometheus Alert Manager to report when a Scylla node health check fails and the node is considered ‘down’.

Example of Manager 1.3 Dashboard, including an active repair running, and Health Check reports of all nodes responding to CQL.

Related links:

Upgrade to Scylla Manager 1.3

Read the upgrade guide carefully. In particular, you will need to redefine scheduled repairs. Please contact the Scylla Support team for help installing and upgrading Scylla Manager.


Scylla Grafana Monitoring 2.0 now includes the Scylla Manager 1.3 dashboard

About Scylla Manager

Scylla Manager adds centralized cluster administration and recurrent task automation to Scylla Enterprise. Scylla Manager 1.x includes automation of periodic repair. Future releases will provide rolling upgrades, recurrent backup, and more. With time, Scylla Manager will become the focal point of Scylla Enterprise cluster management, including a GUI front end. Scylla Manager is available for all Scylla Enterprise customers. It can also be downloaded from for a 30-day trial.

The post Scylla Manager 1.3 Release Announcement appeared first on ScyllaDB.

Introducing the DataStax Apache Kafka™ Connector

Built by the team that authors the DataStax Drivers for Apache Cassandra™, the DataStax Apache Kafka Connector capitalizes on the best practices of ingesting to DataStax Enterprise (DSE) while delivering enterprise-grade resiliency and security.

Modern architectures are made up of a diverse landscape of technologies, each serving its purpose within the data ecosystem. Apache Kafka fits naturally as a distributed queue for event-driven architectures, serving as a buffer layer to transport the messages to the database and surrounding technologies.

There is no better solution in the market to complement Apache Kafka than DSE. As an operational data layer and hybrid cloud database, DSE delivers a multi-model persistent data store that never goes down and scales horizontally to deliver real-time access that is needed to serve enriched, personalized applications.

Automatic Ingest from Kafka to DSE

The DataStax Apache Kafka Connector is the bridge that allows data to seamlessly move from Apache Kafka to DSE in event-driven architectures. Known in the Kafka Connect framework as a sink, the key features of this connector are its market-leading performance, flexibility, security, and visibility. All of this is offered with DataStax Basic, DSE, and DataStax Distribution of Apache Cassandra subscriptions at no additional cost.

As mentioned, the DataStax Apache Kafka Connector is built by the experts that develop and maintain Apache Cassandra’s drivers. Without going into the weeds, the same techniques used in the DataStax Bulk Loader that proved to outperform all other bulk loading solutions for Cassandra are also leveraged in the connector.


The design of this sink considers the varying data structures that are found in Apache Kafka, and the selective mapping functionality in the connector allows the user to specify the Kafka fields that should be written to DSE columns. This allows for a single connector instance to read from multiple Apache Kafka topics and write to many DSE tables, thereby removing the burden of managing several connector instances. Whether the Apache Kafka data is in Avro, JSON, or string format, the DataStax Apache Kafka Connector extends advanced parsing to account for the wide range of data inputs.
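As an illustration of that mapping idea, a sink configuration might look roughly like the following. Topic, keyspace, table, and column names are hypothetical placeholders; consult the connector documentation for the exact property keys:

```properties
# One connector instance feeding two DSE tables from two Kafka topics.
name=dse-sink
connector.class=com.datastax.kafkaconnector.DseSinkConnector
topics=orders,customers
contactPoints=dse-node1.example.com
# Map selected Kafka record fields (key/value paths) to DSE columns.
topic.orders.my_ks.orders_by_id.mapping=id=key, amount=value.amount
topic.customers.my_ks.customers.mapping=id=key, name=value.name
```

The per-topic mapping entries are what let a single connector instance fan records out to many tables while ignoring Kafka fields it doesn't need.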


One of the core value propositions of DSE is its enterprise-grade security. With built-in SSL, LDAP/Active Directory, and Kerberos integration, DSE contains the tools needed to achieve strict compliance regulations for the connection from client to server. These security features are also included in the DataStax Apache Kafka Connector, ensuring that the connection between the connector and the data store is secure.


When it comes to visibility and error handling, we know that in complex distributed environments things are bound to hit points of failure. The engineering team at DataStax took special care to account for these error scenarios, and all of the intelligence of the DataStax Drivers is applied in the DataStax Apache Kafka Connector. Additionally, included metrics give the operator visibility into failure rates and latency indicators as messages pass from Kafka to DSE.

Available Now

We are excited to release this connector and improve the interoperability of DSE in the data ecosystem for DSE versions 5.0 and above. Stay tuned for coming blogs that will detail advanced usage of this sink, visit our documentation and examples for more information, and download the new connector today to try out in your own environment.

Learn about the DataStax Apache Kafka Connector in this short course.

White paper: DataStax Enterprise and Apache Kafka™ for Modern Architectures


Rolling Reboots with cstarpar

Welcome to the third post in our cstar series. So far, the first post gave an introduction to cstar, while the second post explained how to extend cstar with custom commands. In this post we will look at cstar’s cousin cstarpar. Both utilities deliver the same topology-aware orchestration, yet cstarpar executes commands locally, allowing operations cstar is not capable of.

Using ssh

cstarpar relies heavily on ssh working smoothly and without any user prompts. When we run a command with cstar, it will take the command, ssh into the remote host, and execute the command on our behalf. For example, we can run hostname on each node of a 3-node cluster:

$ cstar run --seed-host --command hostname
$ cat ~/.cstar/jobs/8ff6811e-31e7-4975-bec4-260eae885ef6/ec2-*/out

If we switch to cstarpar, it will execute the hostname command locally and we will see something different:

$ cstarpar --seed-host hostname
$ cat ~/.cstar/jobs/a1735406-ae58-4e44-829b-9e8d4a90fd06/ec2-*/out

To make cstarpar execute commands on remote machines we just need to make the command explicitly use ssh:

$ cstarpar --seed-host "ssh {} hostname"
$ cat ~/.cstar/jobs/2c54f7a1-8982-4f2e-ada4-8b45cde4c4eb/ec2-*/out

Here we can see the hostname was executed on the remote hosts.


The true advantage of local execution is that there is no need for interaction with the remote host. This approach allows operations that would normally prevent that interaction, such as reboots. For example, the following command reboots the entire cluster in a topology-aware fashion, albeit very roughly because it gracelessly kills all processes, including Cassandra:

$ cstarpar --seed-host -- "ssh {} sudo reboot &"

Note that this example used the sudo reboot & command. The reboot command on its own causes the reboot immediately. This is so drastic that it causes Python’s subprocess module to think an error occurred. Placing the & after the command, directing the shell to run it in the background, allows execution to return to Python cleanly. Once the host is down, cstarpar will mark the host as such in the job status report.

It is important to ensure the hosts are configured to start the Cassandra process automatically after the reboot because, just like cstar, cstarpar will proceed to the next hosts only once all hosts are up, and will otherwise wait indefinitely for the rebooted host to come back.

Since cstarpar can execute local commands and scripts, it need not support complex commands in the same way cstar does. To run a complex command with cstarpar, we can use a script file. To illustrate this, the script below will add a graceful shutdown of Cassandra before executing the actual reboot:

$ cat ~/

#!/bin/bash
# the target node's FQDN arrives as the first argument (cstarpar's {} placeholder)
FQDN=$1

echo "Draining Cassandra"
ssh ${FQDN} nodetool drain && sleep 5

echo "Stopping Cassandra process"
ssh ${FQDN} sudo service cassandra stop && sleep 5

echo "Rebooting"
ssh ${FQDN} sudo reboot &

The reboot command then runs like this:

$ cstarpar --seed-host -- "bash /absolute/path/to/ {}"

Replication and Conclusion

For this post, I used a simple three-node cluster provisioned with tlp-cluster. cstarpar relies heavily on ssh working smoothly and without user prompts. Initially, I attempted the connection without any specific ssh configuration on my laptop or the AWS hosts; the ssh calls looked like this:

$ cstarpar --seed-host ${SEED_IP} --ssh-identity-file=${PATH_TO_KEY}  --ssh-username ubuntu "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@{} hostname"

In the reboot script I also had to add some options:

ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i ${PATH_TO_KEY} ubuntu@${FQDN} sudo reboot &

Once configured, I was able to harness the full power of cstarpar, which supplements cstar functionality by executing commands locally. This proved useful for operations for which cstar’s mode of operation is not well suited, such as reboots. Importantly, to get the most value from cstarpar, it is critical to have ssh configured to run smoothly without any user prompts.

DZone: Kerberos Authenticator for Apache Cassandra

Coming right on the heels of announcing an open source LDAP authenticator for Apache Cassandra, we’re proud to now release an open source Kerberos authenticator. The new project makes Kerberos’ celebrated single sign-on and secure authentication capabilities available to all Apache Cassandra users.

Read more about the open source Kerberos authenticator here.

The post DZone: Kerberos Authenticator for Apache Cassandra appeared first on Instaclustr.

iTWire: Why Open Source makes sense for cloud deployments

Instaclustr is a 100% open-source business, using Cassandra (“one of the most scalable databases in the world”) for data storage, Spark for analytics, Elasticsearch for search, and Kafka for messaging, among other pieces of software.

Instaclustr‘s proposition is that organisations need to be able to massively and reliably scale cloud applications, and if Instaclustr looks after the data layer, its clients can concentrate on their applications, chief executive Peter Nichol told iTWire.

Read more here.

The post iTWire: Why Open Source makes sense for cloud deployments appeared first on Instaclustr.

Announcing DataStax Enterprise 6.7 (And More!)

We’re excited to announce DSE 6.7 today, along with OpsCenter 6.7 and updates for our DataStax Studio developer tool and DataStax drivers. If you’re thinking, “Hey, 6.7? Wasn’t your last major release 6.0?”, you’ve been paying attention and are correct. We decided to sync the major/minor version numbers of our software solutions—hence the jump in the minor version number.

Release numbering aside, there are some real gems in DSE 6.7 that have been driven by our tight customer-engineering relationships. Let’s dive in and see what’s in the 6.7 box.

Analytics Everyone?

I’m sure it surprises no one that our DSE Analytics is one of the most popular enterprise components found in DSE. Operational analytics are paramount in driving the types of real-time analysis needed by today’s constantly-connected cloud applications that personalize every customer encounter.

In DSE 6.7, we’ve enhanced our enterprise Apache Spark™ connectivity layer so that performance purrs along in a predictably consistent way, which delivers greater stability and less resource overhead on a DSE cluster.

One of the great things about DSE is that you have flexibility in how you store and access data. If you want a real operational database, we have that for you with our certified distribution of Apache Cassandra™.

We also have DSEFS (DataStax Enterprise file system), our enterprise-class file system that gives you all the niceties of HDFS with none of its thorns. DSEFS is great for analytics use cases where file system storage is all that’s needed vs. a true database. In 6.7, the security muscles of DSEFS get bigger with its support for Kerberos, plus we’ve made your experience working with many files and nested folders stored in DSEFS much simpler. Regarding scale, we’ve seen DSEFS scale to 40TB per node with no query performance degradation.

Finally, we round out our analytics enhancements with faster graph database analytical performance and a better high-availability Spark engine.

Boosting Geospatial Search

DSE Analytics and DSE Search typically run neck-and-neck in popularity, which makes sense given that today’s smart transactions usually involve analytical and search workloads.

In DSE 6.7 we’ve better enabled use cases that need geospatial search (e.g., logistics, IoT, financial network management) with automatic handling of key geospatial data types in Apache Solr™ cores, packaged installation of the needed geospatial software on every DSE node, and support for geospatial heatmaps in CQL—all of which make building apps dependent on geospatial data much easier. This is important given that an article in CIO magazine a while back stated that roughly 80% of enterprises rely on geospatial data to help run their business.

In addition, 6.7 includes improved diagnostic monitoring of search performance and better speed/reliability for deployments of DSE Search that use vnodes.

Improved Data Protection and Insights

Every second counts when a critical database restore is needed, so in DSE 6.7 we’ve optimized our restore engine to get a database back on its feet orders of magnitude faster than in prior releases. We also now support backups to Microsoft Azure and generic S3 platforms.

Responding to customer requests for more sophisticated performance troubleshooting capabilities, we’re rolling out the first phase of our DSE Insights engine, which is designed (in a no-agent way) to continually collect key metrics on cluster performance and make them available to our administration tools as well as easily integrate with third-party monitoring software often found in IT shops.

DSE Now Kafka and Docker Ready

Along with DSE 6.7, we’re excited to announce that we’re releasing our own Apache Kafka™ Connector. Almost every IT pro knows that messages, queues, and streams give modern apps the ability to act on events as they arrive, something that’s greatly needed in fraud/anomaly detection, financial systems, IoT, and time-series use cases that cut across nearly every industry.

Our new Kafka connector reads from a Kafka topic and automatically saves those events to DSE. For situations that don’t need heavier lifting with Spark Streaming, the new Kafka connector provides an easy way to integrate data coming from Kafka into a real database.

With respect to Docker, we’ve had images up on Docker Store and Hub for development use, but now we’ve certified images for production usage.

DSE 6.7, along with updated versions of OpsCenter, Studio, and DSE Drivers are all available for download, and our updated documentation is available to guide you through installation, upgrading, and all the new features. Give DSE 6.7 a try and shoot us your feedback.

Also, keep an eye out for more blog posts in the coming weeks that delve into all of DSE 6.7’s new features in more detail.

White paper: DataStax Enterprise and Apache Kafka™ for Modern Architectures


Worry-Free Ingestion: Flow Control of Writes in Scylla

Scylla Flow Control

This blog post is based on a talk I gave last month at the third annual Scylla Summit in San Francisco. It explains how Scylla ensures that ingestion of data proceeds as quickly as possible, but not quicker. It looks into the existing flow-control mechanism for tables without materialized views, and into the new mechanism for tables with materialized views, which is introduced in the upcoming Scylla Open Source 3.0 release.


In this post we look into ingestion of data into a Scylla cluster. What happens when we make a large volume of update (write) requests?

We would like the ingestion to proceed as quickly as possible but without overwhelming the servers. An over-eager client may send write requests faster than the cluster can complete earlier requests. If this is only a short burst of requests, Scylla can absorb the excess requests in a queue or numerous queues distributed throughout the cluster (we’ll look at the details of these queues below). But had we allowed the client to continue writing at this excessive rate, the backlog of uncompleted writes would continue to grow until the servers run out of memory and possibly crash. So as the backlog grows, we need to find a way for the server to tell the client to slow down its request rate. If we can’t slow down the client, we have to start failing new requests.

Cassandra’s CQL protocol does not offer any explicit flow-control mechanisms for the server to slow down a client which is sending requests faster than the server can handle them. We only have two options to work with: delaying replies to the client’s requests, and failing them. How we can use these two options depends on what drives the workload: We consider two different workload models — a batch workload with bounded concurrency, and an interactive workload with unbounded concurrency:

  1. In a batch workload, a client application wishes to drive the server at 100% utilization for a long time, to complete some predefined amount of work. There is a fixed number of client threads, each running a request loop: preparing some data, making a write request, and waiting for its response. The server can fully control the request rate by rate-limiting (delaying) its replies: If the server only sends N replies per second, the client will only send N new requests per second. We call this rate-limiting of replies, or throttling.

  2. In an interactive workload, the client sends requests driven by some external events (e.g., activity of real users). These requests can come at any rate, which is unrelated to the rate at which the server completes previous requests. For such a workload, if the request rate is at or below the cluster’s capacity, everything is fine and the request backlog will be mostly empty. But if the request rate is above the cluster’s capacity, the server has no way of slowing down these requests and the backlog grows and grows. If we don’t want to crash the server (and of course, we don’t), we have no choice but to return failure for some of these requests.

    When we do fail requests, it’s also important how we fail: we should fail fresh, not-yet-handled client requests. It’s a bad idea to fail requests to which we have already devoted significant work — if the server spends valuable CPU time on requests that will end up failing anyway, throughput drops. We use the term admission control for a mechanism that fails a new request when it believes the server will not have the resources needed to handle the request to completion.
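The idea above — reject fresh requests up front rather than abandon partially processed ones — can be sketched in a few lines. This is an illustrative toy model (the Server class, method names, and threshold are all invented for this sketch), not Scylla’s actual implementation:

```python
from collections import deque

class Server:
    """Toy model of admission control: fail brand-new requests when the
    backlog suggests we lack the resources to complete them."""

    def __init__(self, max_backlog):
        self.max_backlog = max_backlog
        self.backlog = deque()

    def submit(self, request):
        # Admission control: reject fresh requests before any CPU time
        # is spent on them, instead of failing them mid-flight later.
        if len(self.backlog) >= self.max_backlog:
            return "overloaded"   # client should back off and retry
        self.backlog.append(request)
        return "accepted"

    def complete_one(self):
        if self.backlog:
            self.backlog.popleft()

server = Server(max_backlog=2)
print(server.submit("w1"))  # accepted
print(server.submit("w2"))  # accepted
print(server.submit("w3"))  # overloaded: queue full, fail fast
```

The key property is that the rejected request consumed almost no work — the check happens before the server invests anything in it.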

For these reasons Scylla utilizes both throttling and admission control. Both are necessary. Throttling is a necessary part of handling normal batch workloads, and admission control is needed for unexpected overload situations. In this post, we will focus on the throttling part.

We sometimes use the term backpressure to describe throttling, which metaphorically takes the memory “pressure” (growing queues) which the server is experiencing, and feeds it back to the client. However, this term may be confusing, as historically it was used for other forms of flow control, not for delaying replies as a mechanism to limit the request rate. In the rest of this document I’ll try to avoid the term “backpressure” in favor of other terms like throttling and flow control.

Above we defined two workload models — interactive and batch. We can, of course, be faced with a combination of both. Moreover, even batch workloads may involve several independent batch clients, starting at different times and working with different concurrencies. The sum of several such batch workloads can be represented as one batch workload with a changing client concurrency. E.g., a workload can start with concurrency 100 for one minute, then go to concurrency 200 for another minute, etc. Our flow-control algorithms need to handle this case as well, and react to a client’s changing concurrency. As an example, consider that the client doubles the number of threads. Since the total number of writes the server can handle per second remains the same, each client thread will now need to send requests at half the rate it did when there were half as many threads.

The problem of background writes

Let’s first look at writes to regular Scylla tables which do not have materialized views. Later we can see how materialized views further complicate matters.

A client sends an update (a write request) to a coordinator node, which sends the update to RF replicas (RF is the replication factor — e.g., 3). The coordinator then waits for the first CL (consistency level — e.g., 2) of those writes to complete, at which point it sends a reply to the client, saying that the desired consistency level has been achieved. The remaining ongoing writes to replicas (RF − CL, i.e., 1 remaining write in the above example) will then continue “in the background”, i.e., after the response to the client, and without the client waiting for them to finish.
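The coordinator’s decision — reply after CL acks, let RF − CL writes run on — can be modeled deterministically. This is a simplified sketch (the function name and the use of per-replica latencies are assumptions for illustration; real coordinators track asynchronous futures, not sorted lists):

```python
# Toy model of the coordinator's write path: reply to the client once CL
# replica writes complete; the remaining RF - CL writes keep running in
# the "background" after the reply.

def coordinator_write(replica_latencies_ms, cl):
    """replica_latencies_ms: completion time of each replica's write."""
    rf = len(replica_latencies_ms)
    finished_order = sorted(replica_latencies_ms)
    reply_at = finished_order[cl - 1]   # client reply after the CL-th ack
    background_writes = rf - cl         # still in flight after the reply
    return reply_at, background_writes

# RF=3 replicas; the third is slower. With CL=2 we reply at 12 ms and
# leave one write running in the background.
reply_at, background = coordinator_write([10, 12, 30], cl=2)
print(reply_at, background)  # 12 1
```

The client observes only `reply_at`; the background write is invisible to it, which is exactly why its backlog can grow unnoticed.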

The problem with these background writes is that a batch workload, upon receiving the server’s reply, will send a new request before these background writes finish. So if new writes come in faster than we can finish background writes, the number of these background writes can grow without bound. But background writes take memory, so we cannot allow them to grow without bound. We need to apply some throttling to slow the workload down.

The slow node example

Before we explain how Scylla does this throttling, it is instructive to look at one concrete — and common — case where background writes pile up and throttling becomes necessary.

This is the case where one of the nodes happens to be, for some reason, consistently slower than the others. It doesn’t have to be much slower — even a tiny bit slower can cause problems:

Consider, for example, three nodes and a table with RF=3, i.e., all data is replicated on all three nodes, so all writes need to go to all three. Suppose that one node is just 1% slower: two of the nodes can complete 10,000 replica writes per second, while the third can only complete 9,900. If we do CL=2 writes, then every second 10,000 of these writes can complete after nodes 1 and 2 have done their work. But since node 3 can only finish 9,900 writes in that second, we will have added 100 new “background writes” waiting for the write to node 3 to complete. We will continue to accumulate 100 additional background writes each second, so after 100 seconds, for example, we will have accumulated 10,000 background writes. This will continue until we run out of memory, unless we slow down the client to only 9,900 writes per second (in a moment, we’ll explain how).

It is possible to demonstrate this and similar situations in real-life Scylla clusters. But to make it easier to play with different scenarios and flow-control algorithms, we wrote a simple simulator. In the simulator we can exactly control the client’s concurrency and the rate at which each replica completes write requests, then graph the lengths of the various queues and the overall write performance, and investigate how those respond to different throttling algorithms.
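The arithmetic of the slow-node example is worth making concrete. A minimal sketch (rates and function name are taken from the example above, but the code itself is illustrative):

```python
# Two replicas complete 10,000 writes/s, the third only 9,900/s. With
# CL=2 the client keeps getting replies at 10,000/s, so unfinished
# background writes accumulate at the difference between the reply rate
# and the slowest node's completion rate.

def backlog_after(seconds, fast_rate=10_000, slow_rate=9_900):
    backlog = 0
    for _ in range(seconds):
        backlog += fast_rate - slow_rate  # 100 new background writes/s
    return backlog

print(backlog_after(100))  # 10000 background writes after 100 seconds
```

Even a 1% speed difference produces unbounded growth, because the surplus is added every single second and nothing drains it.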

In our simple “slow node” example, we see the following results from the simulator:

Simulator Results, Figure 1

Simulator Results 2

In the top graph, we see that a client with fixed concurrency (arbitrarily chosen as 50 threads) writing with CL=2 will, after a short burst, get 10,000 replies each second, i.e., the speed of the two fastest nodes. But while staying at that speed, we see in the bottom graph that the backlog of background writes grows continuously — 100 every second, as we suspected. We need to slow down the client to curb this growth.

It’s obvious from the description above that any consistent difference in node performance, even much smaller than 1%, will eventually cause throttling to be needed to avoid filling the entire memory with backlogged writes. In real-life such small performance differences do happen in clouds, e.g., because some of the VMs have busier “neighbors” than others.

Throttling to limit background writes

Scylla applies a simple, but effective, throttling mechanism: When the total amount of memory that background writes are currently using goes over some limit — currently 10% of the shard’s memory — the coordinator starts throttling the client by no longer moving writes from foreground to background mode. This means that the coordinator will only reply when all RF replica writes have completed, with no additional work left in the background. When this throttling is on, the backlog of background writes does not continue to grow, and replies are only sent at the rate we can complete all the work, so a batch workload will slow down its requests to the same rate.
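The rule just described amounts to a single branch in the coordinator: below the memory limit, reply after CL acks; at or above it, require all RF acks so no new background work is created. A hedged sketch (the constant and function name are invented; the real limit is 10% of the shard’s memory, not a count of 300):

```python
BACKGROUND_LIMIT = 300   # stands in for "10% of the shard's memory"

def acks_needed(background_backlog, rf, cl):
    """How many replica acks the coordinator waits for before replying."""
    if background_backlog >= BACKGROUND_LIMIT:
        return rf   # throttled: wait for every replica, defer nothing
    return cl       # normal: reply after CL, defer RF - CL writes

print(acks_needed(100, rf=3, cl=2))  # 2 (normal mode)
print(acks_needed(300, rf=3, cl=2))  # 3 (throttling on)
```

In throttled mode, replies arrive at the rate of the slowest replica, so a bounded-concurrency client automatically slows to exactly that rate.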

It is worth noting that when throttling is needed, the queue of background writes will typically hover around its threshold size (e.g., 10% of memory). When a flow-control algorithm always keeps a full queue, it is said to suffer from the bufferbloat problem. The typical bufferbloat side-effect is increased latency, but happily in our case this is not an issue: The client does not wait for the background writes (since the coordinator has already returned a reply), so the client will experience low latency even when the queue of background writes is full. Nevertheless, the full queue does have downsides: it wastes memory and it prevents the queue from absorbing writes to a node that temporarily goes down.

Let’s return to our “slow node” simulation from above, and see how this throttling algorithm indeed helps to curb the growth of the backlog of background writes:

Simulator Results 3

Simulator Results 4

As before, we see in the top graph that the server starts by sending 10,000 replies per second, which is the speed of the two fastest nodes (remember we asked for CL=2). At that rate, the bottom graph shows we are accruing a backlog of 100 background writes per second, until at time 3 the backlog has grown to 300 items. In this simulation we chose 300 as the background-write limit (representing the 10% of the shard’s memory in real Scylla). At that point, as explained above, the client is throttled by having its writes wait for all three replica writes to complete. Those will only complete at a rate of 9,900 per second (the rate of the slowest node), so the client will slow down to this rate (top graph, starting from time 3), and the background-write queue will stop growing (bottom graph). If the same workload continues, the background-write queue will remain full (at the threshold of 300) — if it temporarily dips below the threshold, throttling is disabled and the queue starts growing back toward it.

The problem of background view updates

After understanding how Scylla throttles writes to ordinary tables, let’s look at how Scylla throttles writes to materialized views. Materialized views were introduced in Scylla 2.0 as an experimental feature — please refer to this blog post if you are not familiar with them. They are officially supported in Scylla Open Source Release 3.0, which also introduces the throttling mechanism we describe now, to slow down ingestion to the rate at which Scylla can safely write the base table and all its materialized views.

As before, a client sends a write request to a coordinator, and the coordinator sends it to RF (e.g., 3) replica nodes and waits for CL (e.g., 2) of them to complete, or for all of them if the backlog of background writes has reached the limit. But when the table (also known as the base table) has associated materialized views, each of the base replicas now also sends updates to one or more paired view replicas — other nodes holding the relevant rows of the materialized views.

The exact details of which updates we send, where, and why is beyond the scope of this post. But what is important to know here is that the sending of the view updates always happens asynchronously — i.e., the base replica doesn’t wait for it, and therefore the coordinator does not wait for it either — only the completion of enough writes to the base replicas will determine when the coordinator finally replies to the client.

The fact that the client does not wait for the view updates to complete has been a topic for heated debate ever since the materialized view feature was first designed for Cassandra. The problem is that if a base replica waits for updates to several view replicas to complete, this hurts high availability which is a cornerstone of Cassandra’s and Scylla’s design.

Because the client does not wait for outstanding view updates to complete, their number may grow without bound and consume unbounded amounts of memory on the various nodes involved — the coordinator, the RF base replicas, and all the view replicas involved in the write. As in the previous section, here too we need to slow down the client until the system completes background work at the same rate as new background work is generated.

To illustrate the problem Scylla needed to solve, let’s use our simulator again to look at a concrete example, continuing the same scenario we used above. Again we have three nodes, RF=3, client with 50 threads writing with CL=2. As before two nodes can complete 10,000 base writes per second, and the third only 9,900. But now we introduce a new constraint: the view updates add considerable work to each write, to the point that the cluster can now only complete 3,000 writes per second, down from the 9,900 it could complete without materialized views. The simulator shows us (top graph below) that, unsurprisingly, without a new flow-control mechanism for view writes the client is only slowed down to 9,900 requests per second, not to 3,000. The bottom graph shows that at this request rate, the memory devoted to incomplete view writes just grows and grows, by as many as 6,900 (=9,900-3,000) updates per second:

Simulator Results 5

Simulator Results 6

So, what we need now is to find a mechanism for the coordinator to slow down the client to exactly 3,000 requests per second. But how do we slow down the client, and how does the coordinator know that 3,000 is the right request rate?

Throttling to limit background view updates

Let us now explain how Scylla 3.0 throttles the client to limit the backlog of view updates. We begin with two key insights:

  1. To slow down a batch client (with bounded concurrency), we can add an artificial delay to every response. The longer the delay is, the lower the client’s request rate will become.
  2. The chosen delay influences the size of the view-update backlog: Picking a higher delay slows down the client and slows the growth of the view update backlog, or even starts reducing it. Picking a lower delay speeds up the client and increases the growth of the backlog.

Basically, our plan is to devise a controller, which changes the delay based on the current backlog, trying to keep the length of the backlog in a desired range. The simplest imaginable controller, a linear function, works amazingly well:

(1) delay = α ⋅ backlog

Here α is any positive constant. Why does this deceptively simple controller work?

Remember that if delay is too small, backlog starts increasing, and if delay is too large, the backlog starts shrinking. So there is some “just right” delay, where the backlog size neither grows nor decreases. The linear controller converges on exactly this just-right delay:

  1. If delay is lower than the just-right one, the client is too fast, the backlog increases, so according to our formula (1), we will increase delay.
  2. If delay is higher than the just-right one, the client is too slow, the backlog shrinks, so according to (1), we will decrease delay.
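The convergence argument above can be checked numerically with a toy discrete-time model. Everything here is an assumption for illustration — the constants, the bounded-concurrency rate model `concurrency / (base_latency + delay)`, and the time step are all invented, chosen so the unthrottled client would run at 10,000 req/s while view updates complete at only 3,000/s:

```python
# Toy simulation of the linear controller: delay = alpha * backlog.

def simulate(alpha=1e-5, seconds=1000, dt=0.1):
    concurrency = 50          # client threads (bounded concurrency)
    base_latency = 0.005      # seconds per request with no added delay
    view_capacity = 3000.0    # view updates completed per second
    backlog = 0.0
    rate = 0.0
    for _ in range(int(seconds / dt)):
        delay = alpha * backlog                       # the controller
        rate = concurrency / (base_latency + delay)   # client's request rate
        # backlog grows by the surplus of new work over completed work
        backlog = max(0.0, backlog + (rate - view_capacity) * dt)
    return rate, backlog

rate, backlog = simulate()
# converges to roughly 3000 req/s with a backlog of roughly 1167
print(round(rate), round(backlog))
```

Note the fixed point: at equilibrium, α·backlog equals exactly the “just-right” delay (concurrency/capacity − base_latency), so doubling α halves the converged backlog, as the next paragraphs discuss.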

Let’s add to our simulator the ability to delay responses by a given delay amount, and to vary this delay according to the view update backlog in the base replicas, using formula (1). The result of this simulation looks like this:

Simulator Results 7

Simulator Results 8

In the top graph, we see the client’s request rate gradually converging to exactly the request rate we expected: 3,000 requests per second. In the bottom graph, the backlog length settles on about 1600 updates. The backlog then stops growing any more — which was our goal.

But why did the backlog settle on 1600, and not on 100 or 1,000,000? Remember that the linear control function (1) works for any α. In the above simulation, we took α = 1.0 and the result was convergence on backlog=1600. If we change α, the delay to which we converge will still have to be the same, so (1) tells us that, for example, if we double α to 2.0, the converged backlog will halve, to 800. In this manner, if we gradually change α we can reach any desired backlog length. Here is an example, again from our simulator, where we gradually changed α with the goal of reaching a backlog length of 200:

Simulator Results 9

Simulator Results 10

Indeed, we can see in the lower graph that after overshooting the desired queue length of 200 and reaching 700, the controller continues to increase the delay in order to shrink the backlog, until the backlog settles on exactly the desired length — 200. In the top graph we see that, as expected, the client is indeed slowed down to 3,000 requests per second. Interestingly, in this graph we also see a “dip”, a short period where the client was slowed down even further, to just 2,000 requests per second. The reason for this is easy to understand: the client starts too fast, and a backlog starts forming. At some point the backlog reaches 700. Because we want to decrease this backlog (to 200), we must have a period where the client sends fewer than 3,000 requests per second, so that the backlog can shrink.

In controller-theory lingo, the controller with the changing α is said to have an integral term: the control function depends not just on the current value of the variable (the backlog) but also on the previous history of the controller.
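The integral term can be sketched by letting α itself drift toward whatever value puts the backlog at the target. This is a self-contained toy model, not Scylla’s code — the constants, the rate model, and the adaptation gain are all illustrative assumptions:

```python
# Toy simulation of the controller with an integral term: alpha is
# slowly adjusted so the backlog settles on a chosen target length.

def simulate(target=200, seconds=5000, dt=0.1):
    concurrency, base_latency, view_capacity = 50, 0.005, 3000.0
    alpha, backlog = 1e-5, 0.0
    gain = 1e-9               # how fast alpha adapts (the integral gain)
    rate = 0.0
    for _ in range(int(seconds / dt)):
        delay = alpha * backlog
        rate = concurrency / (base_latency + delay)
        backlog = max(0.0, backlog + (rate - view_capacity) * dt)
        # integral action: backlog above target -> raise alpha (more
        # delay per queued item); below target -> lower it
        alpha += gain * (backlog - target) * dt
    return rate, backlog

rate, backlog = simulate()
# settles near 3000 req/s with the backlog near the 200-item target
print(round(rate), round(backlog))
```

The throughput still converges to the cluster’s capacity; only the steady-state queue length changes, which is the whole point of adapting α.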

In (1), we considered the simplest possible controller — a linear function. But the proof above that it converges on the correct solution did not rely on this linearity. The delay can be set to any other monotonically-increasing function of the backlog:

(2) delay = f(backlog / backlog0) ⋅ delay0

(where backlog0 is a constant with backlog units, and delay0 is a constant with time units).

In Scylla 3.0 we chose this function to be a polynomial, selected to allow relatively-high delays to be reached without requiring very long backlogs in the steady state. But we do plan to continue improving this controller in future releases.
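To make formula (2) concrete, here is a hedged sketch with one possible monotonically increasing f — an illustrative cubic polynomial chosen for this example, not Scylla’s actual function, and with invented values for backlog0 and delay0:

```python
# Generalized controller (2): delay = f(backlog / backlog0) * delay0,
# for any monotonically increasing f.

def delay(backlog, backlog0=1000.0, delay0=0.001):
    x = backlog / backlog0
    f = x + x**3          # monotonically increasing in x (illustrative)
    return f * delay0

# Longer backlogs always mean longer delays:
samples = [delay(b) for b in (0, 500, 1000, 2000)]
assert samples == sorted(samples)
print(samples)
```

The cubic term lets the delay climb steeply once the backlog passes backlog0, which matches the stated goal of reaching relatively high delays without very long steady-state backlogs.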


A common theme in Scylla’s design, which we covered in many previous blog posts, is the autonomous database, a.k.a. zero configuration. In this post we covered another aspect of this theme: when a user unleashes a large writing job on Scylla, we don’t want them to have to configure the client to use a certain speed, or risk overrunning Scylla. We also don’t want the user to have to configure Scylla to limit an over-eager client. Rather, we want everything to happen automatically: the write job should just run normally without any artificial limits, and Scylla should automatically slow it down to exactly the right pace — neither so fast that queues pile up until we run out of memory, nor so slow that available resources go to waste.

In this post, we explained how Scylla throttles (slows down) the client by delaying its responses, and how we arrive at exactly the right pace. We started with describing how throttling works for writes to ordinary tables — a feature that had been in Scylla for well over a year. We then described the more elaborate mechanisms we introduce in Scylla 3.0 for throttling writes to tables with materialized views. For demonstration purposes, we used a simulator for the different flow-control mechanisms to better illustrate how they work. However, these same algorithms have also been implemented in Scylla itself — so go ahead and ingest some data! Full steam ahead!


The post Worry-Free Ingestion: Flow Control of Writes in Scylla appeared first on ScyllaDB.