What’s New for DataStax Enterprise Analytics 6.7

The past several releases of DSE have included pivotal analytics innovations, most notably the popular Always On Spark SQL and DataStax Enterprise File System (DSEFS) features. With the latest release of DSE, the analytics story continues: we’ve made DSE Analytics more secure, faster, and easier to use.

In DSE 6.7, we’ve rounded out the DSE Analytics Kerberos authentication story by adding support for DSEFS REST API users. We’ve also added convenience methods to the DSEFS API to enable recursive commands, and we’ve improved logging in the DataStax Spark Shell, a popular choice for developers working with DSE Analytics. Finally, we’ve implemented intelligent throttling mechanisms that optimize performance between the Apache Cassandra™ and Spark™ components of DSE Analytics.

DSEFS Kerberos

Users of the DSEFS REST API can now access DSEFS data securely with DSE’s Unified Authentication functionality. In DSE 6.7, the DSEFS REST API has been integrated with Kerberos, providing peace of mind for architects, developers, and operators of applications built on DataStax’s distributed file system.

Setting up and configuring a secured DSEFS REST endpoint is easy thanks to the integration of SPNEGO and Kerberos token delegation in DSE Analytics. Users simply choose the method they prefer to obtain a delegation token and then register that token with the REST endpoint. From then on, all DSEFS REST API calls to that endpoint are secured through Kerberos authentication.
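As a rough sketch of that flow, assuming a WebHDFS-style REST surface on the default DSEFS port (the host name, endpoint path, and token-extraction step below are assumptions, not documented DSE commands; check the DSE documentation for your release):

```shell
# Authenticate to Kerberos, obtain a delegation token over SPNEGO,
# then present the token on subsequent DSEFS REST calls.
kinit alice@EXAMPLE.COM
TOKEN=$(curl -s --negotiate -u : \
  "http://dsefs-host:5598/webhdfs/v1/?op=GETDELEGATIONTOKEN" | jq -r '.Token.urlString')
curl -s "http://dsefs-host:5598/webhdfs/v1/data?op=LISTSTATUS&delegation=$TOKEN"
```

The delegation token lets non-interactive jobs reuse the initial Kerberos handshake rather than re-authenticating on every call.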

DSEFS Recursive Commands

Early releases of DSEFS lacked a few convenience methods for bulk file-management operations. With DSE 6.7, DSEFS users can make bulk changes using the recursive flag (-R) for operations such as copy (cp), permissions management (chmod), and owner management (chown). No configuration is needed to take advantage of this feature; simply upgrade to DSE 6.7 and you can start using the recursive flag.
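As a quick sketch (the paths and owner name below are illustrative; see the DSEFS shell reference for the full command syntax), the flag works with the familiar DSEFS commands:

```shell
# Copy, re-permission, and re-own a directory tree recursively in one pass.
dse fs 'cp -R /incoming /archive'
dse fs 'chmod -R 755 /archive'
dse fs 'chown -R analytics /archive'
```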

Spark Shell Logging

Users who come to DSE Analytics from a Spark background have noticed that the logging and messages in the DSE Analytics shell differ from those of the open source Spark shell. These differences have been removed, and the messages in both environments are now in sync, providing a familiar experience for native Spark users in DSE Analytics.

Predictable Performance

At DataStax we’re constantly looking for ways to improve the lives of developers building applications against DSE. One small but impactful improvement included for DSE Analytics in 6.7 is the introduction of a rate-limiting feature for DSE Analytics read operations. This new feature ensures that developers don’t inadvertently overload DSE clusters by issuing overly aggressive queries that can consume all available DSE Analytics resources.

Working with this new feature is simple: set the newly introduced spark.cassandra.input.throughputMBPerSec property, available per DSE Analytics task; once it is set, DSE Analytics ensures that reads are limited to the specified rate.

This new property is especially useful in situations where a DSE Analytics task leverages the joinWithCassandraTable method.
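For example, the cap can be supplied when submitting a job (the 2 MB/s value and jar name below are placeholders, not recommendations):

```shell
# Throttle Cassandra reads for this job to roughly 2 MB/s per task.
dse spark-submit \
  --conf spark.cassandra.input.throughputMBPerSec=2 \
  my-analytics-job.jar
```

The same property can also be set in spark-defaults.conf to apply a cluster-wide default.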

More to Come

The features contained in DSE 6.7 help strengthen the usability of DSE for analytics users. DataStax will continue to listen to our users and provide meaningful advances in DSE Analytics for years to come.

You can download DSE 6.7 now and read through our updated documentation for more information.



Scylla and Confluent Integration for IoT Deployments



The Internet is not just connecting people around the world. Through the Internet of Things (IoT), it is also connecting humans to the machines all around us and directly connecting machines to other machines. In this blog post we’ll share an emerging machine-to-machine (M2M) architecture pattern in which MQTT, Apache Kafka and Scylla all work together to provide an end-to-end IoT solution. We’ll also provide demo code so you can try it out for yourself.


IoT Scale

IoT is a fast-growing market, already valued at over $1.2 trillion in 2017 and anticipated to grow to over $6.5 trillion by 2024. The explosive number of devices generating, tracking, and sharing data across a variety of networks overwhelms most data management solutions. With more than 25 billion connected devices in 2018 and internet penetration up a staggering 1,066% since 2000, the opportunity in the IoT market is significant.

There’s a wide variety of IoT applications, like data center and physical plant monitoring, manufacturing (a multibillion-dollar sub-category known as Industrial IoT, or IIoT), smart meters, smart homes, security monitoring systems and public safety, emergency services, smart buildings (both commercial and industrial), healthcare, logistics & cargo tracking, retail, self-driving cars, ride sharing, navigation and transport, gaming and entertainment… the list goes on.

A significant dependency for this growth is the overall reliability and scalability of IoT deployments. As Internet of Things projects go from concepts to reality, one of the biggest challenges is how the data created by devices will flow through the system. How many devices will create information? What protocols do the devices use to communicate? How will they send that information back? Will you need to capture that data in real time, or in batches? What role will analytics play in the future? What follows is an example of such a system, using existing best-in-class technologies.

An End-to-End Architecture for the Internet of Things

IoT-based applications (both B2C and B2B) are typically built in the cloud as microservices with similar characteristics. It is helpful to think about the data created by the devices and the applications in three stages:

  • Stage one is the initial creation — where data is created on the device and then sent over the network.
  • Stage two is how the central system collects and organizes that data.
  • Stage three is the ongoing use of that data stored in a persistent storage system.

Typically, when sensors or smart devices are actuated, they create data. This information can then be sent over the network back to the central application. At this point, one must decide what format the data will be created in and how it will be sent over the network.

One widely used protocol for delivering this data is Message Queuing Telemetry Transport (MQTT), a lightweight pub-sub messaging protocol typically used for M2M communication. Apache Kafka® is not a replacement for MQTT; rather, since MQTT is not built for high scalability, long-term storage, or easy integration with legacy systems, the two complement each other well.

In an IoT solution, devices can be classified into sensors and actuators. Sensors generate data points, while actuators are mechanical components that may be controlled through commands; for example, the ambient lighting in a room may be used to adjust the brightness of an LED bulb. MQTT is optimized for sensor networks and M2M communication, but because it is designed for low-power, coin-cell-operated devices, it cannot handle the ingestion of massive datasets.

Apache Kafka, on the other hand, can handle high-velocity data ingestion but not M2M communication. Scalable IoT solutions therefore use MQTT for explicit device communication while relying on Apache Kafka for ingesting sensor data. While it is possible to bridge Kafka and MQTT for ingestion, it is recommended to keep them separate, configuring the devices or gateways as Kafka producers while they still participate in the M2M network managed by an MQTT broker.

At stage two, data typically lands as streams in Kafka and is arranged in the corresponding topics that various IoT applications consume for real-time decision making. Various options like KSQL and Single Message Transforms (SMT) are available at this stage.

At stage three this data, which typically has a shelf life, is streamed into a long-term store like Scylla using the Kafka Connect framework. A scalable, distributed, peer-to-peer NoSQL database, Scylla is a perfect fit for consuming the variety, velocity and volume of data (often time-series) coming directly from users, devices and sensors spread across geographic locations.

What is Apache Kafka?

Apache Kafka is an open source distributed message queuing and streaming platform capable of handling a high volume and velocity of events. Since being created and open sourced by LinkedIn in 2011, Kafka has quickly evolved from a message queuing system to a full-fledged streaming platform.

Enterprises typically accumulate large amounts of data over time from different sources and data types, such as IoT devices and microservices applications. Traditionally, businesses derived insights from this data using data warehousing strategies built on Extract, Transform, Load (ETL) operations that are batch-driven and run at a specific cadence. This leads to an unmanageable situation, as custom scripts move data from sources to destinations as one-offs. It also creates many single points of failure and does not permit analysis of the data in real time.

Kafka provides a platform that can arrange all these messages by topics and streams. Kafka is enterprise-ready, with features like high availability (HA) and replication on commodity hardware. Kafka resolves the impedance mismatch between the sources and the downstream systems that need to perform business-driven actions on the data.

Apache Kafka Integrations

What is Scylla?

Scylla is a scalable, distributed, peer-to-peer NoSQL database. It is a drop-in replacement for Apache Cassandra™ that delivers as much as 10X better throughput and more consistent low latencies. It also provides better cluster resource utilization while building upon the existing Apache Cassandra ecosystem and APIs.

Most microservices developed in the cloud prefer a distributed database native to the cloud that can scale linearly. Scylla fits that use case well, harnessing modern multi-core/multi-CPU architectures to produce low, predictable latencies. Scylla is written in C++, which results in significant improvements in TCO and ROI and an overall better user experience.

Scylla is a perfect complement to Kafka because it inherits the best of Apache Cassandra: high availability, fault tolerance, and a rich ecosystem. Kafka is not an end data store itself, but a system that feeds a number of downstream storage systems from the sources generating the data.

Demo of Scylla and Confluent Integration

This demo presents an end-to-end use case in which sensors emit temperature and brightness readings to Kafka, and the messages are then processed and stored in Scylla. To accomplish this, we use the Kafka MQTT proxy (part of the Confluent Enterprise package), which acts as a broker for all the sensors emitting the readings.

We also use the Kafka Connect Cassandra connector, which spins up the necessary consumers to stream the messages into Scylla. Scylla supports both the Cassandra data format (SSTable) and all relevant external interfaces, which is why we can use the out-of-the-box Kafka Connect Cassandra connector.

The load from various sensors is simulated as MQTT messages via the Mosquitto MQTT client, which publishes to the Kafka MQTT proxy. All generated messages are published to the corresponding topics, where a Scylla consumer picks them up and stores them in Scylla.


  1. Download Confluent Enterprise

  2. Once the tarball is downloaded, extract it and change into the resulting directory.

  3. Set the $PATH variable

    For the demo we choose to run the Kafka cluster locally, but to run this in production we would modify a few files to include the actual IP addresses of the cluster:

    • Zookeeper – /etc/kafka/zookeeper.properties
    • Kafka – /etc/kafka/server.properties
    • Schema Registry – /etc/schema-registry/schema-registry.properties

  4. Now we need to start the Kafka and ZooKeeper services. This should start both; to start them manually, provide each service with its properties file from the previous step.

  5. Configuring the MQTT proxy

    Inside the directory /etc/confluent-kafka-mqtt there is a kafka-mqtt-dev.properties file that comes with the Confluent distribution and lists all the available configuration options for MQTT Proxy. Modify these parameters as needed.

  6. Create Kafka topics

    The simulated MQTT devices will be publishing to the topics temperature and brightness, so let’s create those topics in Kafka manually.

  7. Start the MQTT proxy

    Start the configured MQTT proxy, pointing it at the properties file edited in step 5.

  8. Installing the Mosquitto framework

  9. Publish MQTT messages

    We publish messages with QoS 2, the highest quality-of-service level supported by the MQTT protocol.

  10. Verify messages in Kafka

    Make sure the messages have been published to the Kafka topic.

  11. To produce a continuous feed of MQTT messages (optional)

    Run this on the terminal

  12. Let’s start a Scylla cluster and make it a Kafka Connect sink

    Note: If you choose to run Scylla in a different environment, start from here: https://www.scylladb.com/download/

    Once the cluster comes up with three nodes, SSH into each node, uncomment the broadcast address in /etc/scylla/scylla.yaml, and change it to the public address of the node. This is necessary if you are running the demo locally on a laptop, or if the Kafka Connect framework runs in a different data center than the Scylla cluster.

  13. Let’s create a file cassandra-sink.properties

    This will enable us to start the Connect framework with the necessary properties.

    Add these lines to the properties file

  14. Next we need to download the binaries for the stream reactor.

    Now change the plugin.path property in the Kafka Connect worker configuration to:

    ABSOLUTE_PATH/confluent-5.0.0/lib/stream-reactor-1.1.0-1.1.0/libs/

  15. Now let’s start the Connect framework in distributed mode
  16. Make sure that the cassandra-sink.properties file is updated with the necessary contact points of the Scylla nodes, i.e., their external IP addresses.

    Make sure the necessary keyspace and tables with the appropriate schema are created; you can do this with cqlsh against the Scylla nodes.

    Then to start the sink connector

    After you run the above command, you should see Scylla registered as a Cassandra sink, and any messages published using the instructions in step 9 will be written to Scylla as a downstream system.

  17. Now, let’s run a script that simulates the activity of an MQTT device. You can do this by cloning this repo: https://github.com/mailmahee/MQTTKafkaConnectScyllaDB

    And then running

    This script simulates MQTT sensor activity and publishes messages to the corresponding topics. The Connect framework then drains the messages from the topics into the corresponding tables in Scylla.
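The command listings for the steps above did not survive formatting, so here is a consolidated sketch of what they describe. All paths, ports, topic names, and connector property names are assumptions based on Confluent 5.0 and stream-reactor 1.x conventions; consult the Confluent and stream-reactor documentation before running any of this against a real cluster.

```shell
# Steps 3-7: put the Confluent binaries on the PATH, start ZooKeeper and
# Kafka, create the two topics the simulated devices publish to, and
# start the MQTT proxy.
export PATH="$PATH:$HOME/confluent-5.0.0/bin"
zookeeper-server-start -daemon /etc/kafka/zookeeper.properties
kafka-server-start -daemon /etc/kafka/server.properties
kafka-topics --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic temperature
kafka-topics --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic brightness
kafka-mqtt-start /etc/confluent-kafka-mqtt/kafka-mqtt-dev.properties

# Steps 9-10: publish a QoS 2 reading via Mosquitto, then verify it landed.
mosquitto_pub -h localhost -p 1883 -q 2 -t temperature -m "sensor-1,25"
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic temperature --from-beginning --max-messages 1

# Step 13: a minimal cassandra-sink.properties (property names follow the
# stream-reactor Cassandra sink conventions; the keyspace name is made up).
cat > cassandra-sink.properties <<'EOF'
name=cassandra-sink
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=temperature,brightness
connect.cassandra.contact.points=<scylla-external-ips>
connect.cassandra.port=9042
connect.cassandra.key.space=iot
connect.cassandra.kcql=INSERT INTO temperature SELECT * FROM temperature;INSERT INTO brightness SELECT * FROM brightness
EOF

# Steps 15-16: start the Connect framework in distributed mode, then load
# the sink connector from the properties file.
connect-distributed -daemon /etc/schema-registry/connect-avro-distributed.properties
confluent load cassandra-sink -d cassandra-sink.properties
```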

You did it!

If you followed the instructions above, you should now be able to connect Kafka and Scylla using the Connect framework. In addition, you should be able to generate MQTT workloads that publish messages to the corresponding Kafka topics, which are then used for both real-time and batch analytics via Scylla.
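The workload generation can be sketched with a simple loop; in a real run each emitted line would be piped to mosquitto_pub (the device naming and payload format below are assumptions for illustration, not the repo's actual script):

```shell
# Emit a batch of simulated sensor readings, one per line, in the form
# "<device>,<topic>,<value>". The topic names match the demo's
# "temperature" and "brightness" Kafka topics.
for i in 1 2 3 4 5; do
  temp=$((18 + RANDOM % 10))        # temperature in degrees C
  bright=$((200 + RANDOM % 600))    # brightness in lumens
  echo "sensor-$i,temperature,$temp"
  echo "sensor-$i,brightness,$bright"
done
```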

Given that applications in IoT are by and large based on streaming data, the alignment between MQTT, Kafka and Scylla makes a great deal of sense. With the new Scylla connector, application developers can easily build solutions that harness IoT-scale fleets of devices, as well as store the data from them in Scylla tables for real-time as well as analytic use cases.

Many of ScyllaDB’s IoT customers like General Electric, Grab, Nauto and Meshify use Scylla and Kafka as the backend for handling their application workloads. Whether a customer is rolling out an IoT deployment for commercial fleets, consumer vehicles, remote patient monitoring or a smart grid, our single-minded focus on the IoT market has led to scalable service offerings that are unmatched in cost efficiency, quality of service and reliability.



The post Scylla and Confluent Integration for IoT Deployments appeared first on ScyllaDB.

Share Your Knowledge at the Premier Apache Cassandra™ Conference

Here at DataStax we’re already looking forward to DataStax Accelerate, the world’s premier Apache Cassandra™ conference. The conference is being held May 21-23, 2019 in National Harbor, Maryland. Register as soon as possible to secure your spot!

Of course, a conference is only as good as the quality of its content, and that’s where you come in. The Cassandra community would love to hear from you. Yes, you! The Call for Papers (CFP) closes Feb. 15, 2019, but we highly encourage you to get your submission in before then.

Why Should You Submit a Paper?

Speaking at an event like DataStax Accelerate can be great for your career, the community, and for your budget. Here’s why:

It’s great for your career

There is no doubt that being good at public speaking, especially when it comes to clearly articulating technical topics, is great for your career. Being good at presenting will set you apart from many in your company and industry; it means you can deliver sales presentations, give influential talks, and engage audiences at group events. Even if you’re not actively seeking career advancement, speaking at conferences can open doors you never knew existed.

It’s great for the community

A key indicator of the health of an open source community is the quality and quantity of its collective set of knowledge. By sharing your knowledge and expertise with your peers, you will only make the Apache Cassandra community stronger. A stronger community means higher-quality and more innovative contributions—a win-win for everyone.

You get a complimentary pass

If your CFP is accepted, you get a FREE pass! Each session is allowed one primary speaker and one co-speaker. The primary speaker will receive a complimentary conference pass and the co-speaker will receive a discounted conference pass.

What Should You Speak About?

If you’re still a bit stumped on what to speak about, here are some potential topics to get you started:

  • Cassandra fundamentals – Apache Cassandra fundamentals and how they apply to building applications in a highly distributed fashion.
  • Modern applications – Examples of how and why Cassandra enables us to rethink what’s possible when it comes to data management for modern applications.
  • Hybrid and multi-cloud – The technical challenges and business impact of implementing hybrid and multi-cloud.
  • Application development – Use cases and best practices for advanced application development with Cassandra.
  • Cassandra operations – Best practices for configuring and maintaining Cassandra.
  • Graph – Application use cases for identifying and analyzing hidden relationships between connected data using Graph.

Tips For Submitting

If you’re not sure what to talk about, have never written a CFP before, or just don’t know where to start, here are some tips for you:

1. Tell your story

Tell your story and what you learned along the way. Not every talk has to be groundbreaking. Have you ever been stuck on a problem and come up with an innovative solution? Sometimes it’s just a little thing that comes up that starts you thinking in a completely different way. These lessons we learn on a day-to-day basis can also be valuable to others in the community. Of course, if you come up with something completely revolutionary, we’d love to hear about that as well!

2. Choose your audience

When outlining your talk, decide who your audience is. For example, are they developers, architects, or IT executives? Think of what that attendee might want to hear about. In your submission, if you have a specific audience in mind, describe who they are and what you expect them to gain from your presentation.

3. Include sufficient detail in your abstracts

The Review Committee has the task of reading every submission and identifying the best talks. Oftentimes submissions include very little detail. Unfortunately, without sufficient detail we won’t be able to accurately assess whether your session is a good fit for the conference. You don’t need to write a book, but some detail on what the talk is about and what value attendees will get from it goes a long way. Don’t forget: you are selling your talk and trying to make it stand out.

4. Pick a focus area and stick with it

When you have been involved in a large project, it’s easy to want to talk about all the details that made it successful. These are 40-minute talks (35 minutes plus a 5-minute Q&A), so narrowing the focus to a few points will really elevate the quality of your talk. This is a technical conference and the people attending will want to know the fine details. Diagrams and small code snippets are always welcome. And avoid the dreaded wall of text! Slides with lots of text and a small font size can be difficult to follow while sitting in the audience.

5. Give the talk you would want to attend

Think about when you were first getting started with Apache Cassandra. What would you have wanted to learn? If you are giving a more advanced talk, think about the kind of information that would be helpful. This is a conference of peers. We talk to each other and learn. Most likely you have seen talks that you have loved and talks that left you wanting more. Think about the great talks that you loved and do that!

6. Avoid sales pitches

We definitely do not expect every presentation to have code snippets and technical deep-dives, but there is one thing that you should avoid when preparing your paper because it takes away from the integrity of the event and is rarely well-received by conference attendees: a sales pitch. There are plenty of ways to give a presentation about projects and technologies without focusing on company-specific efforts. Think of ways to make it interesting for attendees while sharing your experiences, educating the community about an issue, or generating interest in a project.


We understand that getting that paper submitted can be daunting. So, if you need help or just want to talk about your idea, feel free to reach out to us at accelerate@datastax.com.



What’s New for Search in DSE 6.7

DataStax Enterprise (DSE) 6.7 is built on the best distribution of Apache Cassandra™ and DSE 6.7 Search is built with a production-certified version of Apache Solr™ 6.

We are continuing the themes of the previous release: improving performance and stability and eliminating complexity for users.

Here’s a quick tour of some of the enhancements and features found in DSE 6.7 Search.

New Shard Selection Algorithm

A significant feature that will benefit all DSE Search users is the introduction of a new search shard selection algorithm. One of the huge benefits of DSE Search in general is the ability to leverage search indexes to execute vastly more flexible queries than native CQL allows, such as full table scans and querying tables by non-primary-key values. The algorithm that determines which search shards or token ranges to query from the replicas is what enables this capability.

The new algorithm greatly reduces the amount of shard and token filtering the coordinating node must do. This not only reduces the resources required to perform operations, but also translates into performance gains, since this is the main engine behind distributed search queries. The new algorithm also no longer imposes the suggested limit of 8 tokens when configuring DSE Search with virtual nodes.

Geospatial Search

Almost all modern cloud applications have geospatial requirements, and it has been reported that roughly 80% of enterprises manage location data. DSE 6.7 elevates the native geospatial functionality while making it easier to configure, execute, and manage geospatial queries and data types.

Java Topology Suite

With the recent license changes to the Java Topology Suite, the standard Java library for creating and manipulating vector geometry, DataStax is now able to package this library directly with DSE, eliminating the need to manually extend DSE’s capabilities.

Data Types

With geospatial functionality becoming more standard, working with the right data types is as important as the platform’s query capabilities. DSE can automatically create a search index by inferring the data types in the CQL schema and generating a corresponding Solr schema. DSE 6.7 also handles the now-native spatial data types LineStringType, PointType, and PolygonType automatically, reducing the steps a user must go through to enable spatial analysis in search applications.

Extra Support for Facet Heatmap

Finally, for our geospatial users, DSE 6.7 adds support for Solr’s facet heatmap functionality through our CQL solr_query API, allowing users to leverage this powerful Solr feature to generate heatmaps from location data through the native DSE protocol.
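For illustration, the underlying Solr heatmap-faceting parameters look like this when issued against a search core's HTTP endpoint (the core name ks.tbl and the location field are placeholders; the same parameters can be embedded in a CQL solr_query string):

```shell
# Request a heatmap facet over a spatial field named "location" covering
# the whole globe, returning counts-per-grid-cell rather than documents.
curl "http://localhost:8983/solr/ks.tbl/select" \
  --data-urlencode 'q=*:*' \
  --data-urlencode 'rows=0' \
  --data-urlencode 'facet=true' \
  --data-urlencode 'facet.heatmap=location' \
  --data-urlencode 'facet.heatmap.geom=["-180 -90" TO "180 90"]' \
  --data-urlencode 'wt=json'
```

The response contains a 2D grid of counts that a client can render directly as a heatmap overlay.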

Download DSE 6.7 today.



New Major Versions of the DataStax Node.js Drivers

We've just released version 4.0 of the DataStax Node.js Driver for Apache Cassandra and version 2.0 of the DataStax Enterprise Node.js Driver.

Let's have a look at some of the noteworthy features and changes in these releases.

SaaS vs OSS – Fight or flight, round #2

Inspecting Software Licensing (AGPL, SSPL, Confluent Community)

To quote Bob Dylan, “the times they are a changin’.” Microsoft loves Linux, IBM buys Red Hat, RedisLabs changes their module license to Commons Clause, Mongo invents Server Side Public License (SSPL) and moves from AGPL, AWS open sources Firecracker and releases a Kafka service, and the hot news from Friday, Confluent changes its license for components of the Confluent Platform from Apache 2.0 to the Confluent Community License.

A few weeks ago I wrote about MongoDB’s SSPL, which is similar to Confluent’s new license. You could say the Confluent Community License is to the Apache license as MongoDB SSPL is to AGPL. Both of these new licenses take the same position in order to protect their assets from cloud providers.

It is hard to blame Confluent for responding to pressure from the AWS Kafka service, and perhaps fear that other cloud vendors will follow. Rightfully, Confluent wishes to enjoy the fruits of its investment in KSQL while, in parallel, still keeping it open source. It’s important to note that KSQL isn’t part of the Apache Kafka project, which remains under the ASL2 license; KSQL was fully developed by Confluent. To the best of my knowledge, AWS has not provided KSQL as a service, but perhaps this is a means of preventing future abuse. AWS’s adoption of Kafka also signals a victory for OSS Kafka over the proprietary Kinesis technology.

Although more and more OSS vendors are going down the path of more restrictive licenses, and they might even eventually make these standard, overall this is a step in the wrong direction for our industry. These more restrictive licenses have the regrettable consequence of creating silos where once there was sharing.

Remember that just the opposite was true for large community projects like Linux, KVM, and Hadoop, which saw contributors line up by the hundreds. Unfortunately, pressure from SaaS/Cloud providers is turning OSS vendors toward the undesirable (but perhaps logical) path of more restrictive usage licenses. This is a lose-lose scenario, one that goes against the very spirit of open source.

In a perfect world, customers would not use OSS software-based services from vendors who did not participate in the creation of that software. This is the flag that OSS vendors should carry. On the other hand, it’s hard to compete with an 800-pound gorilla that owns a lot of the mindshare of the industry.

My prediction is that vendors like AWS will receive more criticism and, in response, will lean more and more toward an open source play. Were this a game of chess, a contribution of streaming SQL query functionality to Apache Kafka by AWS would be a call of check. Let’s stay tuned and see how it evolves.

Another class of companies that will be hurt by this licensing trend is smaller as-a-service vendors that provide MongoDB/Elastic/Kafka/etc. Despite their contributions back to the OSS, these smaller companies will be hampered by the more restrictive licenses, which will keep them from running the very technologies they’ve aided. Examples include mLab with MongoDB, Instaclustr with Kafka/Confluent, and IBM Compose with a variety of offerings.

This trend is neither healthy nor ideal. We should all want to see OSS vendors and IaaS vendors complement each other — either by contributing to the same OSS project and sharing the commercial benefits or by allowing the OSS vendor to monetize on top of the IaaS market place. As an end user, you should support this cause by directing your buying power toward not only the vendors providing the best services, of course, but also the ones making a difference by aligning themselves with the overall long term value you receive from the ecosystem.

Dor is the co-founder and CEO of ScyllaDB, which develops the AGPL-licensed Scylla database and Seastar, its Apache-licensed core engine.

The post SaaS vs OSS – Fight or flight, round #2 appeared first on ScyllaDB.

Implementing the Netflix Media Database

In the previous blog posts in this series, we introduced the Netflix Media DataBase (NMDB) and its salient “Media Document” data model. In this post we will provide details of the NMDB system architecture beginning with the system requirements — these will serve as the necessary motivation for the architectural choices we made. A fundamental requirement for any lasting data system is that it should scale along with the growth of the business applications it wishes to serve. NMDB is built to be a highly scalable, multi-tenant, media metadata system that can serve a high volume of write/read throughput as well as support near real-time queries. At any given time there could be several applications that are trying to persist data about a media asset (e.g., image, video, audio, subtitles) and/or trying to harness that data to solve a business problem.

Some of the essential elements of such a data system are (a) reliability and availability — under varying load conditions as well as a wide variety of access patterns; (b) scalability — persisting and serving large volumes of media metadata and scaling in the face of bursty requests to serve critical backend systems like media encoding; (c) extensibility — supporting a demanding list of features with a growing list of Netflix business use cases; and (d) consistency — data access semantics that guarantee repeatable data read behavior for client applications. The following section enumerates the key traits of NMDB and how the design aims to address them.

System Requirements

Support for Structured Data

The growth of NoSQL databases has broadly been accompanied with the trend of data “schemalessness” (e.g., key value stores generally allow storing any data under a key). A schemaless system appears less imposing for application developers that are producing the data, as it (a) spares them from the burden of planning and future-proofing the structure of their data and, (b) enables them to evolve data formats with ease and to their liking. However, schemas are implicit in a schemaless system as the code that reads the data needs to account for the structure and the variations in the data (“schema-on-read”). This places a burden on applications that wish to consume that supposed treasure trove of data and can lead to strong coupling between the system that writes the data and the applications that consume it. For this reason, we have implemented NMDB as a “schema-on-write” system — data is validated against schema at the time of writing to NMDB. This provides several benefits including (a) schema is akin to an API contract, and multiple applications can benefit from a well defined contract, (b) data has a uniform appearance and is amenable to defining queries, as well as Extract, Transform and Load (ETL) jobs, (c) facilitates better data interoperability across myriad applications and, (d) optimizes storage, indexing and query performance thereby improving Quality of Service (QoS). Furthermore, this facilitates high data read throughputs as we do away with complex application logic at the time of reading data.

A critical component of a “schema-on-write” system is the module that ensures sanctity of the input data. Within the NMDB system, the Media Data Validation Service (MDVS) is the component that makes sure the data being written to NMDB complies with the aforementioned schema. MDVS also serves as the storehouse and manager for the data schema itself. As was noted in the previous post, the data schema could itself evolve over time, but all data ingested hitherto has to remain compliant with the latest schema. MDVS ensures this by applying meticulous treatment to schema modifications, ensuring that any schema update is fully compatible with the data already in the system.
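To make the schema-on-write idea concrete, here is a minimal sketch of the two checks a validation service like MDVS performs: validating a document against the current schema, and vetting a proposed schema update for compatibility with already-ingested data. The field layout, the `required` flag, and the compatibility rule (updates may only add optional fields) are illustrative assumptions, not the MDVS implementation.

```python
def validate(document: dict, schema: dict) -> list:
    """Return a list of violations; an empty list means the document conforms."""
    errors = []
    for field, spec in schema["fields"].items():
        if field not in document:
            if spec.get("required", False):
                errors.append(f"missing required field: {field}")
            continue
        if not isinstance(document[field], spec["type"]):
            errors.append(f"wrong type for field: {field}")
    return errors

def is_compatible_update(old: dict, new: dict) -> bool:
    """A new schema version must keep every existing field (same type) so
    that all previously ingested data remains valid under the new schema."""
    for field, spec in old["fields"].items():
        new_spec = new["fields"].get(field)
        if new_spec is None or new_spec["type"] is not spec["type"]:
            return False
        # a previously optional field cannot become required retroactively
        if new_spec.get("required", False) and not spec.get("required", False):
            return False
    return True

schema_v1 = {"fields": {"title": {"type": str, "required": True},
                        "loudness_lufs": {"type": float, "required": False}}}
print(validate({"title": "some_asset"}, schema_v1))  # -> []

# a compatible update: only adds a new optional field
schema_v2 = {"fields": {**schema_v1["fields"],
                        "algorithm_version": {"type": str, "required": False}}}
print(is_compatible_update(schema_v1, schema_v2))  # -> True
```

Dropping a field or tightening a requirement fails the compatibility check, which is what lets all data written so far remain valid under the latest schema.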

Multi-tenancy and Access Control

We envision NMDB as a system that helps foster innovation in different areas of Netflix business. Media data analyses created by an application developed by one team could be used by another application developed by another team without friction. This makes multi-tenancy as well as access control of data important problems to solve. All NMDB APIs are authenticated (AuthN) so that the identity of an accessing application is known up front. Furthermore, NMDB applies authorization (AuthZ) filters that whitelists applications or users for certain actions, e.g., a user or application could be whitelisted for read/write/query or a more restrictive read-only access to a certain media metadata.

In NMDB we think of the media metadata universe in units of “DataStores”. A specific media analysis that has been performed on various media assets (e.g., loudness analysis for all audio files) would typically be stored within the same DataStore (DS), while different types of media analyses (e.g., video shot boundary and video face detection) for the same media asset typically would be persisted in different DataStores. A DS helps us achieve two very important purposes: (a) it serves as a logical namespace for the same media analysis for various media assets in the Netflix catalog, and (b) it serves as a unit of access control — an application (or equivalently a team) that defines a DataStore also configures access permissions to the data. Additionally, as was described in the previous blog article, every DS is associated with a schema for the data it stores. As such, a DS is characterized by the three-tuple of (1) a namespace, (2) a media analysis type (e.g., video shot boundary data), and (3) a version of the media analysis type (different versions of a media analysis correspond to different data schemas). This is depicted in Figure 1.
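The three-tuple identity of a DS, and its role as the unit of access control, can be sketched as follows. The class names, the writer/reader sets, and the rule that writers implicitly get read access are assumptions for illustration, not NMDB APIs.

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class DataStoreKey:
    namespace: str        # chosen to match an LDAP group name
    analysis_type: str    # e.g. "video_shot_boundary"
    version: int          # each version implies a distinct data schema

@dataclass
class DataStore:
    key: DataStoreKey
    writers: set = field(default_factory=set)
    readers: set = field(default_factory=set)

    def can_write(self, principal: str) -> bool:
        return principal in self.writers

    def can_read(self, principal: str) -> bool:
        # in this sketch, writers implicitly get read access
        return principal in self.readers or principal in self.writers

ds = DataStore(DataStoreKey("media-algo-team", "video_shot_boundary", 1),
               writers={"shot-detector-app"}, readers={"qc-dashboard"})
print(ds.can_write("qc-dashboard"))      # -> False
print(ds.can_read("shot-detector-app"))  # -> True
```

Because the key is frozen, two DataStores with the same namespace, analysis type, and version are the same logical namespace, matching the three-tuple semantics above.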

Figure 1: NMDB DataStore semantics

We have chosen the namespace portion of a DS definition to correspond to an LDAP group name. NMDB uses this to bootstrap the self-servicing process, wherein members of the LDAP group are granted “admin” privileges and may perform various operations (like creating a DS, deleting a DS) and managing access control policies (like adding/removing “writers” and “readers”). This allows for a seamless self-service process for creating and managing a DS. The notion of a DS is thus key to the ways we support multi-tenancy and fine grained access control.

Integration with other Netflix Systems

In the Netflix microservices environment, different business applications serve as the system of record for different media assets. For example, while playable media assets such as video, audio and subtitles for a title could be managed by a “playback service”, promotional assets such as images or video trailers could be managed by a “promotions service”. NMDB introduces the concept of a “MediaID” (MID) to facilitate integration with these disparate asset management systems. We think of MID as a foreign key that points to a Media Document instance in NMDB. Multiple applications can bring their domain specific identifiers/keys to address a Media Document instance in NMDB. We implement MID as a map from strings to strings. Just like the media data schema, an NMDB DS is also associated with a single MID schema. However, unlike the media data schema, the MID schema is immutable. At the time of the DS definition, a client application could define a set of (name, value) pairs against which all of the Media Document instances would be stored in that DS. A MID handle could be used to fetch documents within a DS in NMDB, offering convenient access to the most recent or all documents for a particular media asset.
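The MID mechanics described above can be sketched with an in-memory stand-in for a DS: a string-to-string map whose key set is fixed per DataStore, used to look up the most recent (or all) Media Document instances for an asset. The field names and the dict-backed store are invented for illustration.

```python
MID_SCHEMA = frozenset({"title_id", "asset_id"})  # immutable per DataStore

store = {}  # canonical MID -> list of Media Document instances (append-only)

def mid_key(mid: dict) -> tuple:
    """Canonicalize a MID; reject maps that do not match the MID schema."""
    if set(mid) != MID_SCHEMA:
        raise ValueError("MID does not match the DataStore MID schema")
    return tuple(sorted(mid.items()))

def put_document(mid: dict, document: dict) -> None:
    # immutable writes: always append, never overwrite
    store.setdefault(mid_key(mid), []).append(document)

def latest_document(mid: dict) -> dict:
    return store[mid_key(mid)][-1]

put_document({"title_id": "t1", "asset_id": "a1"}, {"loudness": -23.0})
put_document({"title_id": "t1", "asset_id": "a1"}, {"loudness": -24.0})
# key order in the MID map does not matter:
print(latest_document({"asset_id": "a1", "title_id": "t1"}))  # -> {'loudness': -24.0}
```

Sorting the (name, value) pairs makes the lookup independent of the order in which a client supplies the MID entries.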

SLA Guarantees

NMDB serves different logically tiered business applications some of which are deemed to be more business critical than others. The Netflix media transcoding sub-system is an example of a business critical application. Applications within this sub-system have stringent consistency, durability and availability needs as a large swarm of microservices are at work generating content for our customers. A failure to serve data with low latency would stall multiple pipelines potentially manifesting as a knock-on impact on secondary backend services. These business requirements motivated us to incorporate immutability and read-after-write consistency as fundamental precepts while persisting data in NMDB.

We have chosen the high data capacity and high performance Cassandra (C*) database as the backend implementation that serves as the source of truth for all our data. A front-end service, known as Media Data Persistence Service (MDPS), manages the C* backend and serves data at blazing speeds (latency in the order of a few tens of milliseconds) to power these business critical applications. MDPS uses local quorum for reads and writes to guarantee read-after-write consistency. Data immutability helps us sidestep any conflict issues that might arise from concurrent updates to C* while allowing us to perform IO operations at a very fast clip. We use a UUID as the primary key for C*, thus giving every write operation (a MID + a Media Document instance) a unique key and thereby avoiding write conflicts when multiple documents are persisted against the same MID. This UUID (also called as DocumentID) also serves as the primary key for the Media Document instance in the context of the overall NMDB system. We will touch upon immutability again in later sections to show how we also benefited from it in some other design aspects of NMDB.
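The conflict-avoidance scheme described above, a fresh UUID primary key per write, can be sketched in a few lines; the in-memory dict stands in for the C* table, and the function name is invented.

```python
import uuid

table = {}  # DocumentID -> (MID, media_document)

def persist(mid: dict, media_document: dict) -> str:
    """Every write gets a unique DocumentID, so two concurrent writes
    against the same MID can never conflict with each other."""
    document_id = str(uuid.uuid4())   # unique primary key per write
    table[document_id] = (mid, media_document)
    return document_id

mid = {"title_id": "t1"}
id_a = persist(mid, {"analysis": "run-1"})
id_b = persist(mid, {"analysis": "run-2"})   # same MID, no write conflict
print(id_a != id_b)   # -> True
print(len(table))     # -> 2
```

Both documents survive side by side under the same MID, which is also what makes retries harmless: a retried write simply lands under a new key instead of clobbering an earlier one.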

Flexibility of Queries

The pivotal benefit of data modeling and a “schema-on-write” system is query-ability. Technical metadata residing in NMDB is invaluable to develop new business insights in the areas of content recommendations, title promotion, machine assisted content quality control (QC), as well as user experience innovations. One of the primary purposes of NMDB is that it can serve as a data warehouse. This brings the need for indexing the data and making it available for queries, without a priori knowledge of all possible query patterns.

In principle, a graph database can answer arbitrary queries and promises optimal query performance for joins. For that reason, we explored a graph-like data model to address our query use cases. However, we quickly learnt that our primary use case, spatio-temporal queries on the media timeline, made limited use of database joins. And in those queries where joins were used, the degree of connectedness was small. In other words, the power of the graph-like model was underutilized. We concluded that for the limited join query use cases, application-side joins might provide satisfactory performance and could be handled by an application we called Media Data Query Service (MDQS). Further, another pattern of queries emerged — searching unstructured textual data, e.g., mining movie script data and subtitle search. It became clear to us that a document database with search capabilities would address most of our requirements, such as allowing a plurality of metadata, fast paced algorithm development, and serving unstructured as well as structured queries even when the query patterns are not known a priori.
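An application-side join of the kind a service like MDQS could perform is easy to sketch: for a low degree of connectedness, a hash join over two result sets keyed on a shared identifier is usually enough. The two datasets, their fields, and the key name below are invented for illustration.

```python
shots = [  # rows from a hypothetical shot-boundary DataStore
    {"asset_id": "a1", "shot_start_ms": 0, "shot_end_ms": 4200},
    {"asset_id": "a2", "shot_start_ms": 0, "shot_end_ms": 1800},
]
faces = [  # rows from a hypothetical face-detection DataStore
    {"asset_id": "a1", "face_count": 3},
]

def join_on_asset(left, right, key="asset_id"):
    """Hash join: index the smaller side, then probe it from the other."""
    index = {row[key]: row for row in right}
    return [{**l, **index[l[key]]} for l in left if l[key] in index]

joined = join_on_asset(shots, faces)
print(joined)
# -> [{'asset_id': 'a1', 'shot_start_ms': 0, 'shot_end_ms': 4200, 'face_count': 3}]
```

This inner-join semantics (assets present on both sides survive) is one of several possible choices; the point is only that for small join fan-out, doing this in the service layer is cheap.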

Elasticsearch (ES), a highly performant, scalable document database implementation, fitted our needs really well. ES supports a wide range of possibilities for queries and in particular shines at unstructured textual search, e.g., searching for a culturally sensitive word in a subtitle asset based on a stem of the word. At its core ES uses Lucene — a powerful and feature rich indexing and searching engine. A front-end service, known as Media Data Analysis Service (MDAS), manages the NMDB ES backend for write and query operations. MDAS implements several optimizations for answering queries and indexing data to meet the demands of storing documents that have varying characteristics and sizes. This is described in more depth later in this article.

A Data System from Databases

As indicated above, business requirements mandated that NMDB be implemented as a system with multiple microservices that manage a polyglot of databases (DBs). The different constituent DBs serve complementary purposes. We are however presented with the challenge of keeping the data consistent across them in the face of the classic distributed systems shortcomings — sometimes dependency services can fail, sometimes service nodes go down, or more nodes are added to meet bursty demand. This motivates the need for a robust orchestration service that can (a) maintain and execute a state machine, (b) retry operations in the event of transient failures, and (c) support asynchronous (possibly long running) operations such as queries. We use the Conductor orchestration framework to coordinate and execute workflows related to the NMDB Create, Read, Update, Delete (CRUD) operations and for other asynchronous operations such as querying. Conductor helps us achieve a high degree of service availability and data consistency across different storage backends. However, given the collection of systems and services that work in unison, it is not possible to provide strong guarantees on data consistency and yet remain highly available for certain use cases, implying data read skews are not entirely avoidable. This is true in particular for query APIs — these rely on successful indexing of Media Document instances, which is done as an asynchronous, background operation in ES. Hence queries on NMDB are expected to be eventually consistent.

Figure 2: Block diagram of NMDB system

Figure 2 shows the NMDB system block diagram. A front end service that shares its name with the NMDB system serves as the gateway to all CRUD and query operations. Read APIs are performed synchronously while write and long running query APIs are managed asynchronously through Conductor workflows. Circling back to the point of data immutability that was discussed previously — another one of its benefits is that it preserves all writes that could occur, e.g., when a client or the Conductor framework retries a write, perhaps because of transient connection issues. While this does add to the data footprint, the benefits, such as (a) allowing for lockless retries, (b) eliminating the need for resolving write conflicts and (c) mitigating data loss, far outweigh the storage costs.

Included in Figure 2 is a component named Object Store that is a part of the NMDB data infrastructure. Object Store is a highly available, web-scale, secure storage service such as Amazon’s Simple Storage Service (S3). This component ensures that all data being persisted is chunked and encrypted for optimal performance. It is used in both write and read paths. This component serves as the primary means for exchanging Media Document instances between the various components of NMDB. Media Document instances can be large in size (several hundreds of MBs — perhaps because a media analysis could model metadata e.g., about every frame in a video file. Further, the per frame data could explode in size due to some modeling of spatial attributes such as bounding boxes). Such a mechanism optimizes bandwidth and latency performance by ensuring that Media Document instances do not have to travel over the wire between the different microservices involved in the read or the write path and can be downloaded only where necessary.

NMDB in Action

While the previous sections discussed the key architectural traits, in this section we dive deeper into the NMDB implementation.

Writing data into NMDB

Figure 3: Writing a Media Document Instance to NMDB

The animation shown in Figure 3 details the machinery that is set in action when we write into NMDB. The write process begins with a client application that communicates its intent to write a Media Document instance. NMDB accepts the write request by submitting the job to the orchestration framework (Conductor) and returns a unique handle to identify the request. This could be used by the client to query on the status of the request. Following this, the schema validation, document persistence and document indexing steps are performed in that order. Once the document is persisted in C* it becomes available for read with strong consistency guarantees and is ready to be used by read-only applications. Indexing a document into ES can be a high latency operation since it is a relatively more intensive procedure that requires multiple processes coordinating to analyze the document contents, and update several data structures that enable efficient search and queries.

Also, noteworthy is the use of an Object store to optimize IO across service components (as was discussed earlier). NMDB leverages a cloud storage service (e.g., AWS S3 service) to which a client first uploads the Media Document instance data. For each write request to NMDB, NMDB generates a Type-IV UUID that is used to compose a key. The key in turn is used to compose a unique URL to which the client uploads the data it wishes to write into NMDB. This URL is then passed around as a reference for the Media Document instance data.
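The key and URL composition in the write path can be sketched as below. The bucket name and path layout are invented for illustration; in practice the URL a client uploads to would be generated by the object store service (e.g., an S3 pre-signed URL) rather than composed by hand.

```python
import uuid

BUCKET = "nmdb-media-documents"   # hypothetical bucket name

def make_upload_url(datastore: str) -> tuple:
    """Generate a Type-IV (random) UUID per write request, compose an
    object key from it, and compose the upload URL from the key."""
    request_id = str(uuid.uuid4())
    key = f"{datastore}/{request_id}.json"
    url = f"https://{BUCKET}.s3.amazonaws.com/{key}"
    return request_id, url

req_id, url = make_upload_url("video_shot_boundary_v1")
print(url.startswith("https://nmdb-media-documents.s3.amazonaws.com/"))  # -> True
```

From this point on, only the URL (a reference) travels between microservices; the Media Document payload itself stays in the object store until a component actually needs it.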

Scaling Strategies

From the perspective of writing to NMDB, some of the NMDB components are compute heavy while others are IO heavy. For example, the bottleneck for MDVS is CPU as well as memory (as it needs to work with large documents for validation). On the other hand, MDAS is bound by network IO as well (Media Document instances need to be downloaded from the NMDB Object Store to MDAS so that they can be indexed). Different metrics can be used to configure a continuous deployment platform, such as Spinnaker, for load balancing and auto-scaling of NMDB. For example, “requests-per-second” (RPS) is commonly used to auto-scale microservices to serve increased reads or queries. While RPS or CPU usage could be useful metrics for scaling synchronous services, asynchronous APIs (like storing a document in NMDB) bring in the requirement of monitoring queue depth to anticipate work build-up and scale accordingly.
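The queue-depth signal mentioned above amounts to a simple capacity calculation: provision enough workers to drain the backlog within a target window. The drain-rate model and all numbers below are invented for illustration and are not Spinnaker defaults.

```python
def desired_workers(current_workers: int, queue_depth: int,
                    drain_rate_per_worker: int,
                    target_drain_minutes: int = 10) -> int:
    """Enough workers to drain the current backlog within the target window.
    Uses ceiling division; never scales below the current count here
    (scale-down would be handled by a separate, slower policy)."""
    window_capacity = drain_rate_per_worker * target_drain_minutes
    needed = -(-queue_depth // window_capacity)   # ceil division
    return max(current_workers, needed)

# 90,000 queued documents, each worker indexes 300 docs/minute:
print(desired_workers(current_workers=10, queue_depth=90_000,
                      drain_rate_per_worker=300))  # -> 30
```

Unlike an RPS trigger, this reacts to work that has already accumulated, which is exactly what an asynchronous write API needs.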

Figure 4: Scaling the NMDB service plane

The strategy discussed above gives us a good way to auto-scale the NMDB microservices layer (identified as “Service Plane” in Figure 4) quasi-linearly. However, as seen in Figure 4, the steady state RPS that the system can support eventually plateaus, at which point scaling the Service Plane does not help improve the SLA. At this point it should be amply clear that the data nodes (identified as “Data Backend”) have reached their peak performance limits and need to be scaled. However, distributed DBs do not scale as quickly as services, and horizontal or vertical scaling may take a few hours to days, depending on data footprint size. Moreover, while scaling the Service Plane can be an automated process, adding more data nodes (C* or ES) to scale the Data Backend is typically done manually. Note, however, that once the Data Backend is scaled up (horizontally and/or vertically), the effects of scaling the Service Plane manifest as an increased steady state RPS, as seen in Figure 4.

An important point related to scaling data nodes, which is worth mentioning, is the key hashing strategy that each DB implements. C* employs consistent key hashing, and hence adding a node distributes the data uniformly across nodes. However, ES deploys a modulus based distributed hashing. Here, adding a data node improves the distribution of shards across the available nodes, which does help alleviate query/write bottlenecks to an extent. However, as the size of shards grows over time, horizontal scaling might not help improve query/write performance, as shown in Figure 5.
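The practical difference between the two hashing strategies can be demonstrated with a toy simulation: under modulus routing, changing the divisor remaps most keys, while on a consistent-hash ring, adding a node moves only the keys that fall on the new node's arc. The hash function and the simplified ring (no virtual nodes) are illustrative choices.

```python
import hashlib
from bisect import bisect

def h(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

def modulus_route(key: str, num_shards: int) -> int:
    return h(key) % num_shards   # changing the shard count remaps most keys

class Ring:
    """Consistent hashing, simplified: one point per node, no vnodes."""
    def __init__(self, nodes):
        self.points = sorted((h(n), n) for n in nodes)
        self.hashes = [p for p, _ in self.points]
    def route(self, key: str) -> str:
        # first node clockwise from the key's hash, wrapping around
        return self.points[bisect(self.hashes, h(key)) % len(self.points)][1]

keys = [f"doc-{i}" for i in range(1000)]
moved_mod = sum(modulus_route(k, 5) != modulus_route(k, 6) for k in keys)
ring5 = Ring([f"node-{i}" for i in range(5)])
ring6 = Ring([f"node-{i}" for i in range(6)])
moved_ring = sum(ring5.route(k) != ring6.route(k) for k in keys)
print(f"modulus, 5 -> 6 shards: {moved_mod}/1000 keys remapped")
print(f"consistent hashing, one node added: {moved_ring}/1000 keys remapped")
```

Roughly five in six keys move under modulus routing versus about one in six on the ring, which is why ES fixes its shard count at index creation rather than rehashing documents when nodes are added.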

Figure 5: ES scaling strategy

ES mandates choosing the number of shards for every index at the time of creating an index, which cannot be modified without going through a reindexing step which is expensive and time consuming for large amounts of data. A fixed pre-configured shard size strategy could be used for timed data such as logs, where new shards could be created while older shards are discarded. However, this strategy cannot be employed by NMDB since multiple business critical applications could be using the data, in other words data in NMDB needs to be durable and may not ever be discarded. However, as discussed above large shard sizes affect query performance adversely. This calls for some application level management for relocating shards into multiple indices as shown in Figure 6.

Figure 6: Creating new ES indices over time

Accordingly, once an index grows beyond a threshold, MDAS creates a different index for the same NMDB DS, thereby allowing indices to grow over time while keeping the shard size within a bound for optimal write/query performance. ES has a feature called index aliasing that is particularly helpful for alleviating the performance degradation caused by large shard sizes, which suits the scenario explained above. An index alias can point to multiple indices and serve queries by aggregating search results across all the indices within the alias.
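The rollover-plus-alias behavior can be sketched with a toy model: when the current index for a DS exceeds a size threshold, a new index is started, and the alias answers queries by fanning out across all member indices. Sizes are document counts and the threshold is absurdly small, purely for illustration.

```python
MAX_DOCS_PER_INDEX = 3  # toy threshold; real thresholds bound shard size

class AliasedIndices:
    """One alias per NMDB DS, backed by a growing list of indices."""
    def __init__(self, datastore: str):
        self.datastore = datastore
        self.indices = [[]]           # the alias is the union of these

    def index_document(self, doc: dict) -> None:
        if len(self.indices[-1]) >= MAX_DOCS_PER_INDEX:
            self.indices.append([])   # roll over to a fresh index
        self.indices[-1].append(doc)

    def search(self, predicate) -> list:
        # queries fan out across every index behind the alias
        return [d for idx in self.indices for d in idx if predicate(d)]

alias = AliasedIndices("video_shot_boundary_v1")
for i in range(7):
    alias.index_document({"doc": i})
print(len(alias.indices))                              # -> 3
print(len(alias.search(lambda d: d["doc"] % 2 == 0)))  # -> 4
```

Clients only ever address the alias, so rollover is invisible to them: writes land in the newest index while queries still cover everything written so far.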

Indexing Data in NMDB at Scale

A single Media Document instance could be large ranging from hundreds of MBs to several GBs. Many document databases (including ES) have a limit on the size of a document after which DB performance degrades significantly. Indexing large documents can present other challenges on a data system such as requiring high network I/O connections, increased computation and memory costs, high indexing latencies as well as other adverse effects.

In principle, we could apply the ES parent-child relationship at the various levels of the Media Document hierarchy and split up a Media Document instance into several smaller ES documents. However, the ES parent-child relationship is a two-level relationship, and query performance suffers when multiple such relationships are chained together to represent a deeply nested model (the NMDB Media Document model exhibits up to five levels of nesting). Alternatively, we could consider modeling it as a two-level relationship with the high cardinality entities (“Event” and “Region”) on the “child” side of the relationship. However, a Media Document could contain a huge number of “Event” and “Region” entities (hundreds of thousands of Events and tens of Regions per Event are typical for an hour of content), which would result in a very large number of child documents. This could also adversely impact query performance.

To address these opposing limitations, we came up with the idea of using “data denormalization”. Adopting this needs more thought, since data denormalization can potentially lead to data explosion. Through a process referred to as “chunking”, we split up large document payloads into multiple smaller documents prior to indexing them in ES. The smaller chunked documents can be indexed using multiple threads of computation (on a single service node) or multiple service nodes — this results in better workload distribution, efficient memory usage, avoidance of hot spots and improved indexing latencies (because we are processing smaller chunks of data concurrently). We used this approach together with careful decisions about which data we denormalize in order to provide optimal indexing and querying performance. More details of our implementation follow.

Chunking Media Document Instances

The hierarchical nature of the Media Document model (as explained in the previous blog post) requires careful consideration while chunking as it contains relationships between its entities. Figure 7 depicts the pre-processing we perform on a Media Document instance prior to indexing it in ES.

Figure 7: An efficient strategy for indexing Media Document Instances in ES
  • Each Media Document instance is evenly split into multiple chunks with smaller size (of the order of a few MBs).
  • Asset, Track and Component level information is denormalized across all the chunks and a parent document per chunk with this information is indexed in ES. This denormalization of parent document across different chunks also helps us to overcome a major limitation with ES parent-child relationship, that is the parent document and all the children documents must belong to same shard.
  • At the level of an event, data is denormalized across all the regions and a child document per region is indexed in ES.
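The steps in the bullets above can be sketched as a pre-processing function: Asset/Track/Component metadata is denormalized into every chunk's parent document, and each event's regions become child documents. The field names, the flat dict shapes, and the chunk size are illustrative assumptions about the Media Document model.

```python
CHUNK_EVENTS = 2  # events per chunk; real chunks target a few MBs

def chunk_for_indexing(media_doc: dict) -> list:
    """Split one Media Document instance into many small ES documents."""
    header = {k: media_doc[k] for k in ("asset", "track", "component")}
    es_docs = []
    events = media_doc["events"]
    for start in range(0, len(events), CHUNK_EVENTS):
        chunk_id = start // CHUNK_EVENTS
        # asset/track/component info denormalized into each chunk's parent
        es_docs.append({**header, "chunk": chunk_id, "relation": "parent"})
        for event in events[start:start + CHUNK_EVENTS]:
            for region in event["regions"]:   # one child document per region
                es_docs.append({"chunk": chunk_id, "relation": "child",
                                "event_time_ms": event["time_ms"], **region})
    return es_docs

doc = {"asset": "a1", "track": "video", "component": "h264",
       "events": [{"time_ms": t, "regions": [{"box": [0, 0, 10, 10]}]}
                  for t in (0, 40, 80)]}
es_docs = chunk_for_indexing(doc)
parents = sum(d["relation"] == "parent" for d in es_docs)
children = sum(d["relation"] == "child" for d in es_docs)
print(parents, children)  # -> 2 3
```

Because every chunk carries its own parent document, each parent-child pair is self-contained and can live in whatever shard its chunk lands on, sidestepping the ES restriction that a parent and its children share a shard.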

This architecture allows distribution of Media Document instances across multiple nodes and speeds up indexing as well as query performance. At query time, MDAS uses a combination of different strategies, depending on the query patterns, for serving queries efficiently:

  • ES parent-child join queries are used to speed up query performance where needed.
  • In another query pattern, the parent documents are queried followed by children documents and application side joins are performed in MDAS to create search results.

Serving Queries & Analytics

As noted earlier, NMDB has a treasure trove of indexed media metadata and lots of interesting insight could be developed by analyzing it. The MDAS backend with ES forms the backbone of analytical capabilities of NMDB. In a typical analytics usage, NMDB users are interested in two types of queries:

  1. A DS level query to retrieve all documents that match the specified query. This is similar to filtering records using a SQL ‘WHERE’ clause. Filtering can be done on any of the entities in a Media Document instance using condition operators such as ‘=’, ‘>’ or ‘<’. Conditions can also be grouped using logical operators like OR, AND or NOT.
  2. A more targeted query on a Media Document instance using a Document ID handle to retrieve specific portions of the document. In this query type, users can apply conditional filtering on each of the entities of a Media Document instance and retrieve matching entities.
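The semantics of the first query type, condition operators grouped by logical operators, returning only the coordinates of matching documents, can be sketched with a tiny expression-tree evaluator. The query shape below is purely illustrative and is not the NMDB query language.

```python
import operator

OPS = {"=": operator.eq, ">": operator.gt, "<": operator.lt}

def matches(doc: dict, query: dict) -> bool:
    """Evaluate a nested query of condition and logical operators."""
    if "AND" in query:
        return all(matches(doc, q) for q in query["AND"])
    if "OR" in query:
        return any(matches(doc, q) for q in query["OR"])
    if "NOT" in query:
        return not matches(doc, query["NOT"])
    return OPS[query["op"]](doc.get(query["field"]), query["value"])

docs = [{"mid": "m1", "doc_id": "d1", "loudness": -23.0},
        {"mid": "m2", "doc_id": "d2", "loudness": -31.0}]
query = {"AND": [{"field": "loudness", "op": "<", "value": -25.0}]}

# Only the coordinates (DocumentID and MID) of matching documents come back:
hits = [(d["doc_id"], d["mid"]) for d in docs if matches(d, query)]
print(hits)  # -> [('d2', 'm2')]
```

A client would then feed one of these DocumentIDs into the second query type to pull filtered portions of the specific Media Document instance.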

The two query types target different use cases. Queries of the first type span an entire NMDB DS and can provide insights into which documents in a DS match the specified query. Considering the huge payload of data corresponding to Media Document instances that match a query of the first type, NMDB only returns the coordinates (DocumentID and MID) of the matching documents. The second query type can be used to target a specific Media Document instance using DocumentID and retrieve portions of the document with conditional filtering applied. For example, only a set of events that satisfy a specified query could be retrieved, along with Track and Component level metadata. While it is typical to use the two types of queries in succession, in the event where a document handle is already known one could glean more insights into the data by directly executing the second query type on a specific Media Document instance.

As explained earlier, chunking Media Document instances at the time of indexing comes very handy in optimizing queries. Since relationships between the different entities of a Media Document instance are preserved, cross-entity queries can be handled at the ES layer. For example, a Track can be filtered out based on the number of Events it contains or if it contains Events matching the specified query. The indexing strategy as explained earlier can be contrasted with the nested document approach of ES. Indexing Event and Region level information as children documents helps us output the search results more efficiently.

What’s next

As explained in the previous blog post, the Media Document model has a hierarchical structure and offers a logical way of modeling media timeline data. However, such a hierarchical structure is not optimal for parallel processing. In particular validation (MDVS) and indexing (MDAS) services could benefit immensely by processing a large Media Document instance in parallel thereby reducing write latencies. A compositional structure for Media Document instances would be more amenable to parallel processing and therefore go a long way in alleviating the challenges posed by large Media Document instances. Briefly, such a structure implies a single media timeline is composed of multiple “smaller” media timelines, where each media timeline is represented by a corresponding “smaller” Media Document instance. Such a model would also enable targeted reads that do not require reading the entire Media Document instance.

On the query side, we anticipate a growing need for performing joins across different NMDB DataStores — this could be computationally intensive in some scenarios. This, along with the high storage costs associated with ES, is motivating us to look for other “big-data” storage solutions. As NMDB continues to be the media metadata platform of choice for applications across Netflix, we will continue to carefully consider new use cases that might need to be supported and evaluate technologies that we will need to onboard to address them. Some interesting areas of future work could involve exploring Map-Reduce frameworks such as Apache Hadoop for distributed compute and query processing, relational databases for their transactional support, and other Big Data technologies. Opportunities abound in the area of media-oriented data systems at Netflix, especially with the anticipated growth in business applications and associated data.

— by Shinjan Tiwary, Sreeram Chakrovorthy, Subbu Venkatrav, Arsen Kostenko, Yi Guo and Rohit Puri

Implementing the Netflix Media Database was originally published in Netflix TechBlog on Medium.

Introducing DataStax Node.js Mapper for Apache Cassandra

The new Node.js Object Mapper for Apache Cassandra lets you interact with your data as you would interact with a set of documents.

Going Head-to-Head: Scylla vs Amazon DynamoDB

“And now for our main event! Ladies and gentlemen, in this corner, weighing in at 34% of the cloud infrastructure market, the reigning champion and leader of the public cloud…. Amazon!” Amazon has unparalleled expertise at maximizing scalability and availability for a vast array of customers using a plethora of software products. While Amazon offers software products like DynamoDB, its database-as-a-service is only one of its many offerings.

“In the other corner is today’s challenger — young, lightning quick and boasting low-level Big Data expertise… ScyllaDB!” Unlike Amazon, our company focuses exclusively on creating the best database for distributed data solutions.

A head-to-head database battle between Scylla and DynamoDB is a real David versus Goliath situation. It’s Rocky Balboa versus Apollo Creed. Is it possible Scylla could deliver an unexpected knockout punch against DynamoDB? [SPOILER ALERT: Our results will show Scylla has 1/4th the latencies and is only 1/7th the cost of DynamoDB — and this is in the most optimized case for Dynamo. Watch closely as things go south for Dynamo in Round 6. Please keep reading to see how diligent we were in creating a fair test case and other surprise outcomes from our benchmark battle royale.]

To be clear, Scylla is not a competitor to AWS at all. Many of our customers deploy Scylla to AWS, we ourselves find it to be an outstanding platform, and on more than one occasion we’ve blogged about its unique bare metal instances. Here’s further validation — our Scylla Cloud service runs on top of AWS. But we do think we might know a bit more about building a real-time big data database, so we limited the scope of this competitive challenge solely to Scylla versus DynamoDB, database-to-database.

Scylla is a drop-in replacement for Cassandra, implemented from scratch in C++. Cassandra itself was a reimplementation of concepts from the Dynamo paper. So, in a way, Scylla is the “granddaughter” of Dynamo. That means this is a family fight, where a younger generation rises to challenge an older one. It was inevitable for us to compare ourselves against our “grandfather,” and perfectly in keeping with the traditions of Greek mythology behind our name.

If you compare Scylla and Dynamo, each has pros and cons, but they share a common class of NoSQL database: Column family with wide rows and tunable consistency. Dynamo and its Google counterpart, Bigtable, were the first movers in this market and opened up the field of massively scalable services — very impressive by all means.

Scylla is a much younger opponent, just 4.5 years of age. Though Scylla is modeled on Cassandra, Cassandra was never our end goal, only a starting point. While we stand on the shoulders of giants in terms of existing design, our proven systems programming abilities have come heavily into play and driven performance to the level of a million operations per second per server. We recently announced feature parity (minus transactions) with Cassandra, and also our own database-as-a-service offering, Scylla Cloud.

But for now we’ll focus on the question of the day: Can we take on DynamoDB?

Rules of the Game

With our open source roots, our culture compels us to be as fair as possible. So we picked a reasonable benchmark scenario that mimics the requirements of a real application, and we judged the two databases from the user’s perspective. For the benchmark we used the Yahoo! Cloud Serving Benchmark (YCSB), since it is a cross-platform tool and an industry standard. The goal was to meet a Service Level Agreement of 120K operations per second with a 50:50 read/write split (YCSB’s workload A) with a latency under 10ms at the 99th percentile. Each database would provision the minimal amount of resources/money to meet this goal. Each DB was first populated with 1 billion rows using the default 10-column schema of YCSB.

We conducted our tests using Amazon DynamoDB and Amazon Web Services EC2 instances as loaders. Scylla also used Amazon Web Services EC2 instances for servers, monitoring tools and the loaders.

These tests were conducted on Scylla Open Source 2.1, which is the code base for Scylla Enterprise 2018.1. Thus performance results for these tests will hold true across both Open Source and Enterprise. However, we use Scylla Enterprise for comparing Total Cost of Ownership.

DynamoDB is known to be tricky when the data distribution isn’t uniform, so we selected uniform distribution to test Dynamo within its sweet spot. We set up 3 nodes of i3.8xl for Scylla, with replication of 3 and quorum consistency level, loaded the 1 TB dataset (replicated 3 times), and after 2.5 hours the load was complete, ready for the test to begin.

Scylla Enterprise: Scylla Cluster
  • i3.8xlarge | 32 vCPU | 244 GiB | 4 x 1.9TB NVMe
  • 3-node cluster in a single DC | RF=3
  • Dataset: ~1.1TB (1B partitions, ~1.1KB each)
  • Total used storage: ~3.3TB

Amazon DynamoDB: Provisioned Capacity
  • 160K write | 80K read (strong consistency)
  • Dataset: ~1.1TB (1B partitions, ~1.1KB each)
  • Storage size: ~1.1TB (DynamoDB table metrics)

Common test setup
  • Workload A: 90 min, using 8 YCSB clients; every client runs on its own data range (125M partitions)
  • Loaders: 4 x m4.2xlarge (8 vCPU | 32 GiB RAM), 2 loaders per machine
  • Scylla workloads ran with Consistency Level = QUORUM for writes and reads
  • Scylla started with a cold cache in all workloads
  • DynamoDB workloads ran with dynamodb.consistentReads = true
  • Sadly for DynamoDB, each item weighed ~1.1KB (the YCSB default schema), so each write consumed two write capacity units
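To make that last point concrete, here is a small Python sketch (our own, not an AWS tool) of DynamoDB’s documented capacity-unit accounting: a standard write consumes one WCU per 1KB of item size (rounded up), and a strongly consistent read consumes one RCU per 4KB, which is why each ~1.1KB YCSB item cost two WCUs per write:

```python
import math

def write_capacity_units(item_size_kb: float) -> int:
    """DynamoDB charges one WCU per 1KB (rounded up) for a standard write."""
    return math.ceil(item_size_kb / 1.0)

def read_capacity_units(item_size_kb: float, strongly_consistent: bool = True) -> float:
    """One RCU covers one strongly consistent read of up to 4KB;
    an eventually consistent read costs half as much."""
    units = math.ceil(item_size_kb / 4.0)
    return units if strongly_consistent else units / 2

# The ~1.1KB YCSB items used in this benchmark:
print(write_capacity_units(1.1))  # 2 WCUs per write
print(read_capacity_units(1.1))   # 1 RCU per strongly consistent read
```

In other words, the 160K WCU reservation translates to only ~80K item writes per second for this schema.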

Let the Games Begin!

We started to populate DynamoDB with the dataset. However, not so fast…

High Rate of InternalServerError

Turns out the population stage is hard on DynamoDB. We had to slow down the population rate time and again, despite staying well within the reserved IOPS. Sometimes we managed to populate up to 0.5 billion rows before the errors started again.

Each time we had to start over to make sure the entire dataset was saved. We believe DynamoDB needs to split its 10GB partitions during population and cannot do so in parallel with additional load without throwing errors. The gory details:

  • Started population with provisioned capacity: 180K WR | 120K RD.
    • ⚠ Errors killed ~50% of the YCSB threads whenever we used ≥50% of the provisioned write capacity.
    • For example, this happened at the following throughputs:
      • 55 threads per YCSB client = ~140K throughput (78% of capacity used)
      • 45 threads per YCSB client = ~130K throughput (72% of capacity used)
      • 35 threads per YCSB client = ~96K throughput (54% of capacity used)

After multiple attempts with various provisioned capacities and throughputs, we eventually found an ingestion rate that permitted a complete database population. Here are the results of the population stage:

YCSB Workload: 100% Write, 1B partitions (~1.1KB each)

Scylla Open Source 2.1 (3x i3.8xlarge), 8 YCSB clients
  • Overall throughput (ops/sec): 104K
  • Avg load (scylla-server): ~85%
  • INSERT operations (avg): 125M
  • Avg 95th percentile latency (ms): 8.4
  • Avg 99th percentile latency (ms): 11.3

DynamoDB (160K WR | 80K RD), 8 YCSB clients
  • Overall throughput (ops/sec): 51.7K
  • Max consumed capacity: WR 75%
  • INSERT operations (avg): 125M
  • Avg 95th percentile latency (ms): 7.5
  • Avg 99th percentile latency (ms): 11.6

Scylla completed the population at twice the speed but, more importantly, it worked out of the box, without any errors or pitfalls.

YCSB Workload A, Uniform Distribution

Finally, we began the main test, the one that gauges our potential user workload with an SLA of 120,000 operations per second. This scenario is supposed to be DynamoDB’s sweet spot: the partitions are well balanced and the load isn’t too high for DynamoDB to handle. Let’s see the results:

YCSB Workload A: 50% Read / 50% Write, 1B partitions (~1.1KB each), uniform distribution, duration 90 min

Scylla Open Source 2.1 (3x i3.8xlarge), 8 YCSB clients
  • Overall throughput (ops/sec): 119.1K
  • Avg load (scylla-server): ~58%
  • READ operations (avg): ~39.93M
    • Avg 95th percentile latency (ms): 5.0
    • Avg 99th percentile latency (ms): 7.2
  • UPDATE operations (avg): ~39.93M
    • Avg 95th percentile latency (ms): 3.4
    • Avg 99th percentile latency (ms): 5.6

DynamoDB (160K WR | 80K RD), 8 YCSB clients
  • Overall throughput (ops/sec): 120.1K
  • Consumed capacity: WR ~76% | RD ~76%
  • READ operations (avg): ~40.53M
    • Avg 95th percentile latency (ms): 12.0
    • Avg 99th percentile latency (ms): 18.6
  • UPDATE operations (avg): ~40.53M
    • Avg 95th percentile latency (ms): 13.2
    • Avg 99th percentile latency (ms): 20.2

After all the effort of loading the data, DynamoDB was finally able to demonstrate its value: it met the throughput SLA (120K OPS). However, it failed to meet the latency SLA of 10ms at the 99th percentile, though after the population difficulties we were happy it got this far.

Scylla, on the other hand, easily met the throughput SLA at only 58% load, with latencies 3x-4x better than DynamoDB’s and well below the requested SLA. (Also, what you don’t see here is the huge cost difference, but we’ll get to that in a bit.)

We won’t let DynamoDB off easy, however. Now that we’ve seen how DynamoDB performs with its ideal uniform distribution, let’s have a look at how it behaves with a real life use-case.

Real Life Use-case: Zipfian Distribution

A good schema-design goal is a perfectly uniform distribution of your primary keys. In real life, however, some keys are accessed more than others. For example, it’s common practice to use a UUID for a customer or product ID and to look records up by it. Some customers will be more active than others, and some products will be more popular than others, so access frequencies can differ by 10x-1000x. Developers cannot fix this in the general case: adding another column to the primary key to improve the distribution may speed up specific accesses, but at the cost of extra complexity when you retrieve the full information about the product or customer.

Keep in mind what you store in a database: data such as how many people use Quora or how many likes NBA teams have, where popularity varies wildly from item to item.

With that in mind, let’s see how ScyllaDB and DynamoDB behave given a Zipfian distribution access pattern. We went back to the test case of 1 billion keys spanning 1TB of pre-replicated dataset and queried it again using YCSB Zipfian accesses. It is possible to define the hot set of partitions in terms of volume (how much data it contains) and to define what percentage of all accesses go to this hot set out of the overall 1TB dataset.
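To see how sharply a Zipfian pattern concentrates traffic, here is a small, self-contained Python sketch (not the YCSB generator itself) that computes the share of accesses landing on the most popular keys under a Zipf law with exponent s = 1; the key counts below are illustrative, not the benchmark’s exact parameters:

```python
def zipf_weights(n_keys: int, s: float = 1.0):
    """Unnormalized Zipf weights: the key at popularity rank r
    is accessed proportionally to 1/r^s."""
    return [1.0 / (r ** s) for r in range(1, n_keys + 1)]

def hot_set_share(n_keys: int, hot: int, s: float = 1.0) -> float:
    """Fraction of all accesses that land on the `hot` most popular keys."""
    w = zipf_weights(n_keys, s)
    return sum(w[:hot]) / sum(w)

# With 1M keys, the top 1% of keys draw roughly two thirds of the traffic:
print(round(hot_set_share(1_000_000, 10_000), 2))  # → 0.68
```

A uniform workload would give that same top 1% of keys only 1% of the traffic, which is exactly the gap the next test probes.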

We tried a variety of parameters for the hot set and the results were pretty consistent: DynamoDB could not meet the SLA for the Zipfian distribution. It stayed well below its reserved capacity (only 42% utilization), yet it could not execute 120K OPS; in fact, it managed only 65K OPS. The YCSB client experienced multiple, recurring ProvisionedThroughputExceededException (code: 400) errors as DynamoDB imposed throttling.

YCSB Workload A: 50% Read / 50% Write, 1B partitions, Zipfian distribution, duration 90 min
Hot set: 10K partitions | hot set access: 90%

Scylla 2.1 (3x i3.8xlarge), 8 YCSB clients
  • Overall throughput (ops/sec): 120.2K
  • Avg load (scylla-server): ~55%
  • READ operations (avg): ~40.56M
    • Avg 95th percentile latency (ms): 6.1
    • Avg 99th percentile latency (ms): 8.6
  • UPDATE operations (avg): ~40.56M
    • Avg 95th percentile latency (ms): 4.4
    • Avg 99th percentile latency (ms): 6.6

DynamoDB (160K WR | 80K RD), 8 YCSB clients
  • Overall throughput (ops/sec): 65K
  • Consumed capacity: WR ~42% | RD ~42%
  • READ operations (avg): ~21.95M
    • Avg 95th percentile latency (ms): 6.0
    • Avg 99th percentile latency (ms): 9.2
  • UPDATE operations (avg): ~21.95M
    • Avg 95th percentile latency (ms): 7.3
    • Avg 99th percentile latency (ms): 10.8

Why can’t DynamoDB meet the SLA in this case? The answer lies in the Dynamo model: the global throughput reservation is divided among multiple partitions, each no more than 10GB in size.

DynamoDB partition equations

Thus, when such a partition is accessed more often than others, it can hit its throttling cap even though, overall, you are well within your global reservation. In the example above, when reserving 200 writes per second across 10 partitions, no single partition can sustain more than 20 writes/s.
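A small Python sketch of this arithmetic, based on AWS’s previously published (pre-adaptive-capacity) partition sizing guidance: a partition is capped at roughly 3,000 RCU, 1,000 WCU, and 10GB of data, and a table’s provisioned throughput is split evenly across its partitions. The numbers plugged in below are this benchmark’s table:

```python
import math

def dynamodb_partitions(rcu: int, wcu: int, table_size_gb: float) -> int:
    """Estimate partition count: a partition sustains at most
    ~3,000 RCU / ~1,000 WCU and holds at most 10GB."""
    by_throughput = math.ceil(rcu / 3000 + wcu / 1000)
    by_size = math.ceil(table_size_gb / 10)
    return max(by_throughput, by_size)

def per_partition_limits(rcu, wcu, table_size_gb):
    """Provisioned throughput is divided evenly across partitions."""
    n = dynamodb_partitions(rcu, wcu, table_size_gb)
    return n, rcu / n, wcu / n

# This benchmark's table: 80K RCU, 160K WCU, ~1.1TB of data
n, rcu_each, wcu_each = per_partition_limits(80_000, 160_000, 1100)
print(n, round(rcu_each), round(wcu_each))  # → 187 428 856
```

So a hot partition in this table tops out at roughly 430 reads and 860 writes per second, no matter how much capacity the table as a whole has reserved.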

The Dress that Broke DynamoDB

If you asked yourself, “Hmmm, is 42% utilization the worst I’d see from DynamoDB?” we’re afraid we have some bad news for you. Remember the dress that broke the internet? What if an item in your database becomes extremely hot? To explore this, we tested access to a single hot partition and compared the two databases.

The Dress that Broke the Internet

We ran a single YCSB client working on a single partition in a 110MB dataset (100K partitions). During our tests, we observed a DynamoDB limitation when a specific partition key exceeded 3,000 read capacity units (RCU) and/or 1,000 write capacity units (WCU).

Even when using only ~0.6% of the provisioned capacity (857 OPS), the YCSB client experienced ProvisionedThroughputExceededException (code: 400) errors, and throttling was imposed by DynamoDB (see screenshots below).

This is not to say you shouldn’t plan for the best possible data model; however, there will always be cases where reality diverges from the plan. In Scylla’s case, a single partition still performed reasonably well: 20,200 OPS with good 99th-percentile latency.

Scylla vs DynamoDB – Single (Hot) Partition

YCSB Workload A: 50% Read / 50% Write, single partition (~1.1KB), uniform distribution, duration 90 min

Scylla 2.1 (3x i3.8xlarge), 8 YCSB clients
  • Overall throughput (ops/sec): 20.2K
  • Avg load (scylla-server): ~5%
  • READ operations (avg): ~50M
    • Avg 95th percentile latency (ms): 7.3
    • Avg 99th percentile latency (ms): 9.4
  • UPDATE operations (avg): ~50M
    • Avg 95th percentile latency (ms): 2.7
    • Avg 99th percentile latency (ms): 4.5

DynamoDB (160K WR | 80K RD), 8 YCSB clients
  • Overall throughput (ops/sec): 857
  • Consumed capacity: WR ~0.6% | RD ~0.6%
  • READ operations (avg): ~2.3M
    • Avg 95th percentile latency (ms): 5.4
    • Avg 99th percentile latency (ms): 10.7
  • UPDATE operations (avg): ~2.3M
    • Avg 95th percentile latency (ms): 7.7
    • Avg 99th percentile latency (ms): 607.8

Screenshot 1: Single partition. Consumed capacity: ~0.6% -> Throttling imposed by DynamoDB

Additional Factors

Cross-region Replication and Global Tables

We also compared replication speed between datacenters: a simple comparison showed that DynamoDB replicated to a remote DC in 370ms on average, while Scylla’s average was 82ms. Since DynamoDB’s cross-region replication is built on its Streams API, we believe that under congestion the gap would grow to multiple seconds, though we have not yet tested this.

Beyond replication propagation, there is a more burning functional difference — Scylla can easily add regions on demand at any point in the process with a single command:

ALTER KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 4 };

In DynamoDB, on the other hand, you must define your global tables ahead of time. This imposes a serious usability problem, and a major cost one, since you may need to grow the number of deployed datacenters over time.


Explicit Caching is Expensive and Bad for You

DynamoDB performance can be improved, and its high cost reduced in some cases, by using DAX. However, Scylla has a much smarter and more efficient embedded cache (the database nodes have memory, don’t they?), and the outcome is far better, for reasons we described in a recent blog post.


This is another major advantage of Scylla: DynamoDB locks you into the AWS cloud, significantly decreasing your chances of ever moving out. Data gravity is significant. No wonder they’re going after Oracle!

Scylla is an open source database. You have the freedom to choose between our community version, an Enterprise version and our new fully managed service. Scylla runs on all major cloud providers and opens the opportunity for you to run some datacenters on one provider and others on another provider within the same cluster. One of our telco customers is a great example of the hybrid model — they chose to run some of their datacenters on-premise and some on AWS.

Our approach to “locking in” users is quite different: we do it solely by means of delivering quality and value, such that you won’t want to move away from us. As of today, we have experienced exactly zero customer churn.

No Limits

DynamoDB imposes various size limits; for example, an item can be no larger than 400KB. In Scylla you can effectively store megabytes. One of our customers built a distributed storage system using Scylla, keeping large blobs in it with single-digit millisecond latency for them too.

Another problematic limit is on the amount of data sharing a single partition key: a DynamoDB partition cannot hold more than 10GB. While huge partitions aren’t a recommended pattern in Scylla either, we have customers who keep 130GB in a single partition. The effect of these higher limits is more freedom in data modeling and fewer reasons to worry.

Total Cost of Ownership (TCO)

We’re confident the judges would award every round of this battle to Scylla so far, and we haven’t even gotten to comparing the total cost of ownership. The DynamoDB setup, which didn’t even meet the required SLA and which we struggled repeatedly just to get working, costs 7 times more than the comparable Scylla setup.

Scylla Enterprise (3 x i3.8xlarge + Scylla Enterprise license)
  • Year-term estimated cost: ~$71K

Amazon DynamoDB (160K write | 80K read + Business-level support)
  • Year-term estimated cost: ~$524K
    • DynamoDB 1-year reserved term: ~$288K
    • Monthly fee: ~$19.7K/month (~$236K annually)
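The annual figures above reduce to simple arithmetic; this sketch merely reproduces the totals from the table (figures rounded as in the post):

```python
# Rough annual-cost arithmetic behind the TCO comparison. The Scylla number
# bundles the EC2 instances and the Enterprise license into one estimate.
dynamodb_reserved_1yr = 288_000   # upfront 1-year reserved-capacity term (~$288K)
dynamodb_monthly_fee = 19_700     # ongoing monthly fee (~$19.7K/month)
dynamodb_annual = dynamodb_reserved_1yr + 12 * dynamodb_monthly_fee

scylla_annual = 71_000            # ~$71K year-term estimate

print(round(dynamodb_annual / 1000))           # ~$524K per year for DynamoDB
print(round(dynamodb_annual / scylla_annual))  # ~7x more expensive than Scylla
```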

Note that only 3 machines were needed for Scylla; not much of a challenge in terms of administration. And, as we mentioned earlier, you can offload all your database administration with our new fully managed cloud service, Scylla Cloud. (By the way, Scylla Cloud comes in at 4-6x less expensive than DynamoDB, depending on the plan.)

Final Decision: A Knockout!

Uniform distribution: 99th percentile latency (ms)
Zipfian distribution: throughput
  • DynamoDB failed to achieve the required SLA multiple times, especially during the population phase.
  • DynamoDB had 3x-4x the latency of Scylla, even under ideal conditions.
  • DynamoDB is 7x more expensive than Scylla.
  • DynamoDB was extremely inefficient with a real-life Zipfian distribution; you’d have to buy 3x your capacity, making it 20x more expensive than Scylla.
  • Scylla demonstrated up to 20x better throughput in the hot-partition test, with better latency numbers.
  • Last but not least, Scylla gives you freedom of choice with no cloud vendor lock-in, as Scylla can be run on various cloud vendors or even on-premises.

Still not convinced? Listen to what our users have to say.

If you’d like to try your own comparison, remember that our product is open source. Feel free to download now. We’d love to hear from you if you have any questions about how we stack up or if you’d like to share your own results. And we’ll end with a final reminder that our Scylla Cloud (now available in Early Access) is built on Scylla Enterprise, delivering similar price-performance advantages while eliminating administrative overhead.

The post Going Head-to-Head: Scylla vs Amazon DynamoDB appeared first on ScyllaDB.

Scylla Manager 1.3 Release Announcement

Scylla Manager Release

The Scylla Enterprise team is pleased to announce the release of Scylla Manager 1.3, a production-ready release of Scylla Manager for Scylla Enterprise customers.

Scylla Manager 1.3 adds a new Health Check, which works as follows. Scylla nodes already report their status through “nodetool status” and via the Scylla Monitoring Stack dashboards, but in some cases that is not enough: a node might report an Up-Normal (UN) status while in fact it is slow to respond, or not responding, to CQL requests. This might result from an internal problem in the node or from an external issue (for example, a blocked CQL port somewhere between the application and the Scylla node).

Scylla Manager’s new Health Check functionality helps identify such issues as soon as possible, playing a similar role to an application querying the CQL interface from outside the Scylla cluster.

Scylla Manager 1.3 automatically adds a new task to each newly managed cluster: a health check that sends a CQL OPTIONS command to each Scylla node and measures the response time. If the node responds within 250ms, it is considered ‘up’. If there is no response, or the response takes longer than 250ms, the node is considered ‘down’. The results are available using the “sctool status” command.
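For illustration, here is a minimal Python sketch of the same probing idea: send a CQL binary-protocol v4 OPTIONS frame over a raw socket and time the reply against the 250ms threshold. This is our own hypothetical sketch, not Scylla Manager’s actual implementation:

```python
import socket
import struct
import time

# CQL binary protocol v4 frame header: version, flags, stream id, opcode, body length.
# Opcode 0x05 is OPTIONS; a healthy node answers with a SUPPORTED frame.
OPTIONS_FRAME = struct.pack('>BBhBi', 0x04, 0x00, 0, 0x05, 0)

def cql_ping(host: str, port: int = 9042, threshold: float = 0.25) -> bool:
    """Return True ('up') if the node answers a CQL OPTIONS request
    within `threshold` seconds, False ('down') otherwise."""
    start = time.monotonic()
    try:
        with socket.create_connection((host, port), timeout=threshold) as sock:
            sock.settimeout(threshold)
            sock.sendall(OPTIONS_FRAME)
            sock.recv(9)  # any reply header (SUPPORTED) counts as a response
    except OSError:
        return False
    return time.monotonic() - start <= threshold
```

A probe like this exercises the full CQL path from outside the node, which is exactly why it catches problems that “nodetool status” misses.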

Scylla Manager 1.3 architecture, including the Monitoring Stack and the new CQL-based Health Check interface to Scylla nodes.

If you have enabled the Scylla Monitoring Stack, the Monitoring Stack 2.0 Manager dashboard includes the same cluster status report. A new alert was defined in the Prometheus Alert Manager to report when a Scylla node’s health check fails and the node is considered ‘down’.

Example of Manager 1.3 Dashboard, including an active repair running, and Health Check reports of all nodes responding to CQL.

Related links:

Upgrade to Scylla Manager 1.3

Read the upgrade guide carefully. In particular, you will need to redefine scheduled repairs. Please contact the Scylla Support team for help installing and upgrading Scylla Manager.


Scylla Grafana Monitoring 2.0 now includes the Scylla Manager 1.3 dashboard

About Scylla Manager

Scylla Manager adds centralized cluster administration and recurrent task automation to Scylla Enterprise. Scylla Manager 1.x includes automation of periodic repair. Future releases will provide rolling upgrades, recurrent backup, and more. With time, Scylla Manager will become the focal point of Scylla Enterprise cluster management, including a GUI front end. Scylla Manager is available for all Scylla Enterprise customers. It can also be downloaded from scylladb.com for a 30-day trial.

The post Scylla Manager 1.3 Release Announcement appeared first on ScyllaDB.

Rolling Reboots with cstarpar

Welcome to the third post in our cstar series. So far, the first post gave an introduction to cstar, while the second post explained how to extend cstar with custom commands. In this post we will look at cstar’s cousin cstarpar. Both utilities deliver the same topology-aware orchestration, yet cstarpar executes commands locally, allowing operations cstar is not capable of.

Using ssh

cstarpar relies heavily on ssh working smoothly and without any user prompts. When we run a command with cstar, it takes the command, opens an ssh connection to each remote host, and executes the command there on our behalf. For example, we can run hostname on each node of a 3-node cluster:

$ cstar run --seed-host ${SEED_IP} --command hostname
$ cat ~/.cstar/jobs/8ff6811e-31e7-4975-bec4-260eae885ef6/ec2-*/out

If we switch to cstarpar, it will execute the hostname command locally and we will see something different:

$ cstarpar --seed-host ${SEED_IP} hostname
$ cat ~/.cstar/jobs/a1735406-ae58-4e44-829b-9e8d4a90fd06/ec2-*/out

To make cstarpar execute commands on remote machines we just need to make the command explicitly use ssh:

$ cstarpar --seed-host ${SEED_IP} "ssh {} hostname"
$ cat ~/.cstar/jobs/2c54f7a1-8982-4f2e-ada4-8b45cde4c4eb/ec2-*/out

Here we can see the hostname command was executed on the remote hosts.


The true advantage of local execution is that there is no need for interaction with the remote host. This approach allows operations that would normally prevent that interaction, such as reboots. For example, the following command reboots the entire cluster in a topology-aware fashion, albeit very roughly because it gracelessly kills all processes, including Cassandra:

$ cstarpar --seed-host ${SEED_IP} -- "ssh {} sudo reboot &"

Note that this example used the sudo reboot & command. The reboot command on its own triggers the reboot immediately, which is so drastic that it causes Python’s subprocess module to think an error occurred. Placing the & after the command, so that it runs in the background, lets the shell return control to Python cleanly. Once the host is down, cstarpar will mark the host as such in the job status report.

It is important to ensure the hosts are configured to start the Cassandra process automatically after the reboot because, just like cstar, cstarpar will only proceed to the next hosts once all hosts are up, and will otherwise wait indefinitely for the rebooted host to come back.

Since cstarpar can execute local commands and scripts, it need not support complex commands in the same way cstar does. To run a complex command with cstarpar, we can use a script file. To illustrate this, the script below will add a graceful shutdown of Cassandra before executing the actual reboot:

$ cat ~/gentle_reboot.sh
#!/bin/bash
# The target host's FQDN is passed as the first argument (cstarpar's {})
FQDN=$1

echo "Draining Cassandra"
ssh ${FQDN} nodetool drain && sleep 5

echo "Stopping Cassandra process"
ssh ${FQDN} sudo service cassandra stop && sleep 5

echo "Rebooting"
ssh ${FQDN} sudo reboot &

The reboot command then runs like this:

$ cstarpar --seed-host ${SEED_IP} -- "bash /absolute/path/to/gentle_reboot.sh {}"

Replication and Conclusion

For this post, I used a simple three-node cluster provisioned with tlp-cluster. cstarpar relies heavily on ssh working smoothly and without user prompts. Initially, I attempted the connection without any specific ssh configuration on my laptop or the AWS hosts, so the ssh calls looked like this:

$ cstarpar --seed-host ${SEED_IP} --ssh-identity-file=${PATH_TO_KEY}  --ssh-username ubuntu "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@{} hostname"

In the gentle_reboot.sh I also had to add some options:

ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i ${PATH_TO_KEY} ubuntu@${FQDN} sudo reboot &

Once configured, I was able to harness the full power of cstarpar, which supplements cstar by executing commands locally. This proved useful for operations to which cstar’s mode of operation is not well suited, such as reboots. To get the most value from cstarpar, it is critical to have ssh configured to run smoothly and without any user prompts.

Worry-Free Ingestion: Flow Control of Writes in Scylla

Scylla Flow Control

This blog post is based on a talk I gave last month at the third annual Scylla Summit in San Francisco. It explains how Scylla ensures that ingestion of data proceeds as quickly as possible, but not quicker. It looks into the existing flow-control mechanism for tables without materialized views, and into the new mechanism for tables with materialized views, which is introduced in the upcoming Scylla Open Source 3.0 release.


In this post we look into ingestion of data into a Scylla cluster. What happens when we make a large volume of update (write) requests?

We would like the ingestion to proceed as quickly as possible but without overwhelming the servers. An over-eager client may send write requests faster than the cluster can complete earlier requests. If this is only a short burst of requests, Scylla can absorb the excess requests in a queue or numerous queues distributed throughout the cluster (we’ll look at the details of these queues below). But had we allowed the client to continue writing at this excessive rate, the backlog of uncompleted writes would continue to grow until the servers run out of memory and possibly crash. So as the backlog grows, we need to find a way for the server to tell the client to slow down its request rate. If we can’t slow down the client, we have to start failing new requests.

Cassandra’s CQL protocol does not offer any explicit flow-control mechanisms for the server to slow down a client which is sending requests faster than the server can handle them. We only have two options to work with: delaying replies to the client’s requests, and failing them. How we can use these two options depends on what drives the workload: We consider two different workload models — a batch workload with bounded concurrency, and an interactive workload with unbounded concurrency:

  1. In a batch workload, a client application wishes to drive the server at 100% utilization for a long time, to complete some predefined amount of work. There is a fixed number of client threads, each running a request loop: preparing some data, making a write request, and waiting for its response. The server can fully control the request rate by rate-limiting (delaying) its replies: If the server only sends N replies per second, the client will only send N new requests per second. We call this rate-limiting of replies, or throttling.

  2. In an interactive workload, the client sends requests driven by some external events (e.g., activity of real users). These requests can come at any rate, which is unrelated to the rate at which the server completes previous requests. For such a workload, if the request rate is at or below the cluster’s capacity, everything is fine and the request backlog will be mostly empty. But if the request rate is above the cluster’s capacity, the server has no way of slowing down these requests and the backlog grows and grows. If we don’t want to crash the server (and of course, we don’t), we have no choice but to return failure for some of these requests.

    When we do fail requests, it’s also important how we fail: we should fail fresh new client requests that have not yet been handled. It’s a bad idea to fail requests to which we have already devoted significant work; if the server spends valuable CPU time on requests which will end up failing anyway, throughput suffers. We use the term admission control for a mechanism that fails a new request when it believes the server will not have the resources needed to handle the request to completion.

For these reasons Scylla utilizes both throttling and admission control. Both are necessary. Throttling is a necessary part of handling normal batch workloads, and admission control is needed for unexpected overload situations. In this post, we will focus on the throttling part.

We sometimes use the term backpressure to describe throttling, which metaphorically takes the memory “pressure” (growing queues) which the server is experiencing, and feeds it back to the client. However, this term may be confusing, as historically it was used for other forms of flow control, not for delaying replies as a mechanism to limit the request rate. In the rest of this document I’ll try to avoid the term “backpressure” in favor of other terms like throttling and flow control.

Above we defined two workload models: interactive and batch. We can, of course, be faced with a combination of both. Moreover, even batch workloads may involve several independent batch clients, starting at different times and working with different concurrencies. The sum of several such batch workloads can be represented as one batch workload with changing client concurrency, e.g., a workload can start with concurrency 100 for one minute, then go to concurrency 200 for another minute, and so on. Our flow-control algorithms need to handle this case reasonably as well, and react to a client’s changing concurrency. As an example, consider that the client doubles the number of threads. Since the total number of writes the server can handle per second remains the same, each client thread will now need to send requests at half the rate it sent earlier, when there were half as many threads.
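The closing observation reduces to simple division; a tiny sketch of the bounded-concurrency arithmetic (illustrative numbers only):

```python
def per_thread_rate(server_ops_per_sec: float, client_threads: int) -> float:
    """With a bounded-concurrency batch client, each thread's achievable
    request rate is the server's fixed capacity split across threads."""
    return server_ops_per_sec / client_threads

# Doubling the thread count halves each thread's rate; the total stays the same:
print(per_thread_rate(10_000, 100))  # 100.0 requests/sec per thread
print(per_thread_rate(10_000, 200))  # 50.0 requests/sec per thread
```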

The problem of background writes

Let’s first look at writes to regular Scylla tables which do not have materialized views. Later we can see how materialized views further complicate matters.

A client sends an update (a write request) to a coordinator node, which sends the update to RF replicas (RF is the replication factor, e.g., 3). The coordinator then waits for the first CL (consistency level, e.g., 2) of those writes to complete, at which point it sends a reply to the client, saying that the desired consistency level has been achieved. The remaining RF−CL ongoing replica writes (one remaining write, in the above example) then continue “in the background,” i.e., after the response to the client and without the client waiting for them to finish.

The problem with these background writes is that a batch workload, upon receiving the server’s reply, will send a new request before these background writes finish. So if new writes come in faster than we can finish background writes, the number of these background writes can grow without bound. But background writes take memory, so we cannot allow them to grow without bound. We need to apply some throttling to slow the workload down.

The slow node example

Before we explain how Scylla does this throttling, it is instructive to look at one concrete — and common — case where background writes pile up and throttling becomes necessary.

This is the case where one of the nodes happens to be, for some reason, consistently slower than the others. It doesn’t have to be much slower — even a tiny bit slower can cause problems:

Consider, for example, three nodes and a table with RF=3, i.e., all data is replicated on all three nodes, so all writes need to go to all three. Consider that one node is just 1% slower: two of the nodes can complete 10,000 replica writes per second, while the third can only complete 9,900 replica writes per second. If we do CL=2 writes, then every second 10,000 of these writes can complete after nodes 1 and 2 have done their work. But since node 3 can only finish 9,900 writes in that second, we will have added 100 new “background writes” waiting for the write to node 3 to complete. We will continue to accumulate 100 additional background writes each second, so after 100 seconds, for example, we will have accumulated 10,000 background writes. And this will continue until we run out of memory, unless we slow the client down to only 9,900 writes per second (in a moment, we’ll explain how).

It is possible to demonstrate this and similar situations in real-life Scylla clusters. But to make it easier to play with different scenarios and flow-control algorithms, we wrote a simple simulator. In the simulator we can exactly control the client’s concurrency and the rate at which each replica completes write requests, then graph the lengths of the various queues and the overall write performance, and investigate how they respond to different throttling algorithms.
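In the same spirit as that simulator, though far cruder, here is a toy per-second Python model of the slow-node scenario; the node rates and duration below match the example in the text:

```python
def simulate_backlog(node_rates, cl, client_rate, seconds):
    """Toy per-second model of background-write accumulation: with CL-of-RF
    writes, the reply rate tracks the CL-th fastest node, while the slowest
    node falls behind by (reply_rate - slowest_rate) writes each second."""
    rates = sorted(node_rates, reverse=True)
    reply_rate = min(client_rate, rates[cl - 1])  # CL-th fastest node gates replies
    slowest = rates[-1]
    backlog = 0
    for _ in range(seconds):
        backlog += max(0, reply_rate - slowest)
    return backlog

# Three replicas, one 1% slower; CL=2; client pushing 10,000 writes/sec:
print(simulate_backlog([10_000, 10_000, 9_900], cl=2, client_rate=10_000, seconds=100))
# → 10000 background writes accumulated after 100 seconds
```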

In our simple “slow node” example, we see the following results from the simulator:

Simulator Results, Figure 1

Simulator Results 2

In the top graph, we see that a client with fixed concurrency (arbitrarily chosen as 50 threads) writing with CL=2 will, after a short burst, get 10,000 replies each second, i.e., the speed of the two fastest nodes. But while staying at that speed, we see in the bottom graph that the backlog of background writes grows continuously — 100 every second, as we suspected. We need to slow down the client to curb this growth.

It’s obvious from the description above that any consistent difference in node performance, even much smaller than 1%, will eventually require throttling to avoid filling the entire memory with backlogged writes. In real life, such small performance differences do happen in clouds, e.g., because some VMs have busier “neighbors” than others.

Throttling to limit background writes

Scylla applies a simple, but effective, throttling mechanism: When the total amount of memory that background writes are currently using goes over some limit — currently 10% of the shard’s memory — the coordinator starts throttling the client by no longer moving writes from foreground to background mode. This means that the coordinator will only reply when all RF replica writes have completed, with no additional work left in the background. When this throttling is on, the backlog of background writes does not continue to grow, and replies are only sent at the rate we can complete all the work, so a batch workload will slow down its requests to the same rate.

It is worth noting that when throttling is needed, the queue of background writes will typically hover around its threshold size (e.g., 10% of memory). When a flow-control algorithm always keeps a full queue, it is said to suffer from the bufferbloat problem. The typical bufferbloat side-effect is increased latency, but happily in our case this is not an issue: The client does not wait for the background writes (since the coordinator has already returned a reply), so the client will experience low latency even when the queue of background writes is full. Nevertheless, the full queue does have downsides: it wastes memory and it prevents the queue from absorbing writes to a node that temporarily goes down.

Let’s return to our “slow node” simulation from above, and see how this throttling algorithm indeed helps to curb the growth of the backlog of background writes:

Simulator Results 3

Simulator Results 4

As before, we see in the top graph that the server starts by sending 10,000 replies per second, which is the speed of the two fastest nodes (remember we asked for CL=2). At that rate, the bottom graph shows we are accruing a backlog of 100 background writes per second, until at time 3, the backlog has grown to 300 items. In this simulation we chose 300 as background write limit (representing the 10% of the shard’s memory in real Scylla). So at that point, as explained above, the client is throttled by having its writes wait for all three replica writes to complete. Those will only complete at rate of 9,900 per second (the rate of the slowest node), so the client will slow down to this rate (top graph, starting from time 3), and the background write queue will stop growing (bottom graph). If the same workload continues, the background write queue will remain full (at the threshold 300) — if it temporarily goes below the threshold, throttling is disabled and the queue will start growing back to the threshold.

The problem of background view updates

After understanding how Scylla throttles writes to ordinary tables, let’s look at how Scylla throttles writes to materialized views. Materialized views were introduced in Scylla 2.0 as an experimental feature — please refer to this blog post if you are not familiar with them. They are officially supported in Scylla Open Source Release 3.0, which also introduces the throttling mechanism we describe now, to slow down ingestion to the rate at which Scylla can safely write the base table and all its materialized views.

As before, a client sends write requests to a coordinator, and the coordinator sends them to RF (e.g., 3) replica nodes and waits for CL (e.g., 2) of them to complete, or for all of them to complete if the backlog of background writes has reached the limit. But when the table (also known as the base table) has associated materialized views, each of the base replicas now also sends updates to one or more paired view replicas — other nodes holding the relevant rows of the materialized views.

The exact details of which updates we send, where, and why are beyond the scope of this post. But what is important to know here is that the sending of the view updates always happens asynchronously — i.e., the base replica doesn’t wait for it, and therefore the coordinator does not wait for it either — only the completion of enough writes to the base replicas will determine when the coordinator finally replies to the client.

The fact that the client does not wait for the view updates to complete has been a topic for heated debate ever since the materialized view feature was first designed for Cassandra. The problem is that if a base replica waits for updates to several view replicas to complete, this hurts high availability which is a cornerstone of Cassandra’s and Scylla’s design.

Because the client does not wait for outstanding view updates to complete, their number may grow without bound and use unbounded amounts of memory on the various nodes involved — the coordinator, the RF base replicas and all the view replicas involved in the write. As in the previous section, here too we need to slow down the client until the system completes background work at the same rate that new background work is generated.

To illustrate the problem Scylla needed to solve, let’s use our simulator again to look at a concrete example, continuing the same scenario we used above. Again we have three nodes, RF=3, client with 50 threads writing with CL=2. As before two nodes can complete 10,000 base writes per second, and the third only 9,900. But now we introduce a new constraint: the view updates add considerable work to each write, to the point that the cluster can now only complete 3,000 writes per second, down from the 9,900 it could complete without materialized views. The simulator shows us (top graph below) that, unsurprisingly, without a new flow-control mechanism for view writes the client is only slowed down to 9,900 requests per second, not to 3,000. The bottom graph shows that at this request rate, the memory devoted to incomplete view writes just grows and grows, by as many as 6,900 (=9,900-3,000) updates per second:

Simulator Results 5

Simulator Results 6

So, what we need now is to find a mechanism for the coordinator to slow down the client to exactly 3,000 requests per second. But how do we slow down the client, and how does the coordinator know that 3,000 is the right request rate?

Throttling to limit background view updates

Let us now explain how Scylla 3.0 throttles the client to limit the backlog of view updates. We begin with two key insights:

  1. To slow down a batch client (with bounded concurrency), we can add an artificial delay to every response. The longer the delay is, the lower the client’s request rate will become.
  2. The chosen delay influences the size of the view-update backlog: Picking a higher delay slows down the client and slows the growth of the view update backlog, or even starts reducing it. Picking a lower delay speeds up the client and increases the growth of the backlog.

Basically, our plan is to devise a controller, which changes the delay based on the current backlog, trying to keep the length of the backlog in a desired range. The simplest imaginable controller, a linear function, works amazingly well:

(1) delay = α ⋅ backlog

Here α is any constant. Why does this deceptively-simple controller work?

Remember that if delay is too small, backlog starts increasing, and if delay is too large, the backlog starts shrinking. So there is some “just right” delay, where the backlog size neither grows nor decreases. The linear controller converges on exactly this just-right delay:

  1. If delay is lower than the just-right one, the client is too fast, the backlog increases, so according to our formula (1), we will increase delay.
  2. If delay is higher than the just-right one, the client is too slow, the backlog shrinks, so according to (1), we will decrease delay.

Let’s add to our simulator the ability to delay responses by a given delay amount, and to vary this delay according to the view update backlog in the base replicas, using formula (1). The result of this simulation looks like this:

Simulator Results 7

Simulator Results 8

In the top graph, we see the client’s request rate gradually converging to exactly the request rate we expected: 3,000 requests per second. In the bottom graph, the backlog length settles on about 1,600 updates. The backlog then stops growing, which was our goal.
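The convergence argument can be reproduced with a tiny discrete-time model (the constants here, such as thread count, base latency, and cluster capacity, are invented for illustration; this is a toy model, not the simulator used for the graphs):

```python
def simulate_linear_controller(alpha, threads=50, base_latency=0.005,
                               capacity=3000.0, dt=0.01, steps=200_000):
    """Toy model of formula (1): the client's request rate is
    threads / (base_latency + delay), the view-update backlog grows by
    (rate - capacity) * dt, and the coordinator sets delay = alpha * backlog."""
    backlog = 0.0
    for _ in range(steps):
        delay = alpha * backlog
        rate = threads / (base_latency + delay)
        backlog = max(0.0, backlog + (rate - capacity) * dt)
    return rate, backlog
```

Whatever value of α is chosen, the rate converges to the cluster's sustainable capacity, while the steady-state backlog is inversely proportional to α: doubling α halves the backlog.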

But why did the backlog settle on 1600, and not on 100 or 1,000,000? Remember that the linear control function (1) works for any α. In the above simulation, we took α = 1.0 and the result was convergence on backlog=1600. If we change α, the delay to which we converge will still have to be the same, so (1) tells us that, for example, if we double α to 2.0, the converged backlog will halve, to 800. In this manner, if we gradually change α we can reach any desired backlog length. Here is an example, again from our simulator, where we gradually changed α with the goal of reaching a backlog length of 200:

Simulator Results 9

Simulator Results 10

Indeed, we can see in the lower graph that after over-shooting the desired queue length of 200 and reaching 700, the controller continues to increase the delay in order to shrink the backlog, until the backlog settles on exactly the desired length — 200. In the top graph we see that, as expected, the client is indeed slowed down to 3,000 requests per second. Interestingly, this graph also shows a “dip”: a short period where the client was slowed down even further, to just 2,000 requests per second. The reason is easy to understand: the client starts too fast, and a backlog forms, reaching 700 at some point. Because we want to shrink this backlog (to 200), we must have a period where the client sends fewer than 3,000 requests per second, so that the backlog can shrink.

In controller-theory lingo, the controller with the changing α is said to have an integral term: the control function depends not just on the current value of the variable (the backlog) but also on the previous history of the controller.
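The “changing α” idea can be sketched with a toy discrete-time model (all constants here are invented for illustration, and this is not Scylla's code): an inner loop applies the linear controller delay = α ⋅ backlog, while an outer, much slower loop nudges α up when the backlog exceeds the target and down when it falls below.

```python
def converge_to_target(target=200.0, threads=50, base_latency=0.005,
                       capacity=3000.0, alpha=1e-5, gain=5e-4,
                       dt=0.01, steps=200_000):
    """Inner loop: delay = alpha * backlog throttles the client.
    Outer (integral) loop: alpha drifts until the backlog sits at `target`."""
    backlog = 0.0
    for _ in range(steps):
        delay = alpha * backlog
        rate = threads / (base_latency + delay)        # client request rate
        backlog = max(0.0, backlog + (rate - capacity) * dt)
        alpha *= 1.0 + gain * (backlog - target) * dt  # slow alpha adaptation
    return rate, backlog
```

The client still settles at the sustainable rate, but the backlog now settles at the chosen target instead of wherever a fixed α happens to put it.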

In (1), we considered the simplest possible controller — a linear function. But the proof above that it converges on the correct solution did not rely on this linearity. The delay can be set to any other monotonically-increasing function of the backlog:

(2) delay = f(backlog / backlog0) ⋅ delay0

(where backlog0 is a constant with backlog units, and delay0 is a constant with time units).

In Scylla 3.0 we chose this function to be a polynomial, selected to allow relatively-high delays to be reached without requiring very long backlogs in the steady state. But we do plan to continue improving this controller in future releases.
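For illustration only (the exact polynomial and the constants below are made up, not the ones Scylla uses), a polynomial f in formula (2) might look like:

```python
def delay_seconds(backlog, backlog0=1000.0, delay0=0.001):
    """Formula (2) with a polynomial f: the cubic term makes the delay climb
    steeply once the backlog passes backlog0, so large delays are reachable
    without a very long steady-state backlog. Constants are illustrative."""
    x = backlog / backlog0
    return delay0 * (x + x ** 3)
```

Since this f is monotonically increasing, the convergence argument for the linear controller still applies.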


A common theme in Scylla’s design, which we covered in many previous blog posts, is the autonomous database, a.k.a. zero configuration. In this post we covered another aspect of this theme: When a user unleashes a large writing job on Scylla, we don’t want them to have to configure the client to use a certain speed or risk overrunning Scylla. We also don’t want the user to have to configure Scylla to limit an over-eager client. Rather, we want everything to happen automatically: The write job should just run normally without any artificial limits, and Scylla should automatically slow it down to exactly the right pace — not so fast that queues pile up until we run out of memory, but not so slow that available resources go to waste.

In this post, we explained how Scylla throttles (slows down) the client by delaying its responses, and how it arrives at exactly the right pace. We started by describing how throttling works for writes to ordinary tables — a feature that has been in Scylla for well over a year. We then described the more elaborate mechanisms introduced in Scylla 3.0 for throttling writes to tables with materialized views. For demonstration purposes, we used a simulator of the different flow-control mechanisms to better illustrate how they work. However, these same algorithms have also been implemented in Scylla itself — so go ahead and ingest some data! Full steam ahead!

Flow Control Finish

The post Worry-Free Ingestion: Flow Control of Writes in Scylla appeared first on ScyllaDB.

Step by Step Monitoring Cassandra with Prometheus and Grafana

In this blog, I’m going to give a detailed guide on how to monitor a Cassandra cluster with Prometheus and Grafana.

For this, I’m using a new VM which I’m going to call “Monitor VM”. In this blog post, I’m going to work on how to install the tools. In a second one, I’m going to go through the details on how to use and configure Grafana dashboards to get the most out of your monitoring!

High level plan

Monitor VM

  1. Install Prometheus
  2. Configure Prometheus
  3. Install Grafana

Cassandra VMs

  1. Download prometheus JMX-Exporter
  2. Configure JMX-Exporter
  3. Configure Cassandra
  4. Restart Cassandra

Detailed Plan

Monitor VM

Step 1. Install Prometheus

  $ wget https://github.com/prometheus/prometheus/releases/download/v2.3.1/prometheus-2.3.1.linux-amd64.tar.gz
  $ tar xvfz prometheus-*.tar.gz
  $ cd prometheus-*

Step 2. Configure Prometheus

  $ vim /etc/prometheus/prometheus.yaml

  global:
    scrape_interval: 15s

  scrape_configs:
    # Cassandra config
    - job_name: 'cassandra'
      scrape_interval: 15s
      static_configs:
        - targets: ['cassandra01:7070', 'cassandra02:7070', 'cassandra03:7070']

Step 3. Create storage and start Prometheus

  $ mkdir /data
  $ chown prometheus:prometheus /data
  $ prometheus --config.file=/etc/prometheus/prometheus.yaml --storage.tsdb.path=/data

Step 4. Install Grafana

  $ wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana_5.1.4_amd64.deb
  $ sudo apt-get install -y adduser libfontconfig
  $ sudo dpkg -i grafana_5.1.4_amd64.deb

Step 5. Start Grafana

  $ sudo service grafana-server start

Cassandra Nodes

Step 1. Download JMX-Exporter:

  $ mkdir /opt/jmx_prometheus
  $ cd /opt/jmx_prometheus
  $ wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.3.0/jmx_prometheus_javaagent-0.3.0.jar

Step 2. Configure JMX-Exporter

  $ vim /opt/jmx_prometheus/cassandra.yml

  lowercaseOutputName: true
  lowercaseOutputLabelNames: true
  rules:
    - pattern: org.apache.cassandra.metrics<type=(Connection|Streaming), scope=(\S*), name=(\S*)><>(Count|Value)
      name: cassandra_$1_$3
      labels:
        address: "$2"
    - pattern: org.apache.cassandra.metrics<type=(ColumnFamily), name=(RangeLatency)><>(Mean)
      name: cassandra_$1_$2_$3
    - pattern: org.apache.cassandra.net<type=(FailureDetector)><>(DownEndpointCount)
      name: cassandra_$1_$2
    - pattern: org.apache.cassandra.metrics<type=(Keyspace), keyspace=(\S*), name=(\S*)><>(Count|Mean|95thPercentile)
      name: cassandra_$1_$3_$4
      labels:
        "$1": "$2"
    - pattern: org.apache.cassandra.metrics<type=(Table), keyspace=(\S*), scope=(\S*), name=(\S*)><>(Count|Mean|95thPercentile)
      name: cassandra_$1_$4_$5
      labels:
        keyspace: "$2"
        table: "$3"
    - pattern: org.apache.cassandra.metrics<type=(ClientRequest), scope=(\S*), name=(\S*)><>(Count|Mean|95thPercentile)
      name: cassandra_$1_$3_$4
      labels:
        type: "$2"
    - pattern: org.apache.cassandra.metrics<type=(\S*)(?:, ((?!scope)\S*)=(\S*))?(?:, scope=(\S*))?, name=(\S*)><>(Count|Value)
      name: cassandra_$1_$5
      labels:
        "$1": "$4"
        "$2": "$3"

Step 3. Configure Cassandra

  echo 'JVM_OPTS="$JVM_OPTS -javaagent:/opt/jmx_prometheus/jmx_prometheus_javaagent-0.3.0.jar=7070:/opt/jmx_prometheus/cassandra.yml"' >> conf/cassandra-env.sh

Step 4. Restart Cassandra

  $ nodetool flush
  $ nodetool drain
  $ sudo service cassandra restart

And now, if you have no errors (and you shouldn’t!) your Prometheus is ingesting your Cassandra metrics!

Wait for the next blog post where I will guide you through a good Grafana configuration!

Introducing Transient Replication

Transient Replication is a new experimental feature soon to be available in 4.0. When enabled, it allows for the creation of keyspaces where replication factor can be specified as a number of copies (full replicas) and temporary copies (transient replicas). Transient replicas retain the data they replicate only long enough for it to be propagated to full replicas, via incremental repair, at which point the data is deleted. Writing to transient replicas can be avoided almost entirely if monotonic reads are not required because it is possible to achieve a quorum of acknowledged writes without them.

This results in a savings in disk space, CPU, and IO. By deleting data as soon as it is no longer needed, transient replicas require only a fraction of the disk space of a full replica. By not having to store the data indefinitely, the CPU and IO required for compaction is reduced, and read queries are faster as they have less data to process.

So what are the benefits of not actually keeping a full copy of the data? Well, for some installations and use cases, transient replicas can be almost free if monotonic reads are disabled. In future releases where monotonic reads are supported with Transient Replication, enabling monotonic reads would reduce the savings in CPU and IO, but even then they should still be significant.

Transient Replication is designed to be transparent to applications:

  • Consistency levels continue to produce the same results for queries.
  • The number of replicas that can be lost before data loss occurs is unchanged.
  • The number of replicas that can be unavailable before some queries start to timeout or return unavailable is unchanged (with the exception of ONE).

With Transient Replication, you can go from 3 replicas to 5 replicas, two of which are transient, without adding any hardware.

If you are running an active-passive 2 DC setup with 3 replicas in each DC, you can make one replica in each DC transient and still have four full copies of the data in total.

Feature support

Transient Replication is not intended to fully replace Cassandra’s existing approach to replication. There are features that currently don’t work with transiently replicated keyspaces and features that are unlikely ever to work with them.

You can have keyspaces with and without Transient Replication enabled in the same cluster, so it is possible to use Transient Replication for just the use cases that are a good fit for the currently available functionality.

Currently unsupported but coming:

  • Monotonic reads
  • Batch log
  • LWT
  • Counters

Will never be supported:

  • Secondary indexes
  • Materialized views

How Transient Replication works


Transient replication extends Cassandra’s existing consistent hashing algorithm to designate some replicas of a point or range on the consistent hash ring as transient and some as full. The following image depicts a consistent hash ring with three replicas A, B, and C. The replicas are located at tokens 5, 10, 15 respectively. A key k hashes to token 3 on the ring.

A consistent hash ring without Transient Replication

Replicas are selected by walking the ring clockwise starting at the point on the ring the key hashes to. At RF=3, the replicas of key k are A, B, C. With Transient Replication, the last N replicas (where N is the configured number of transient replicas) found while walking the ring are designated as transient.

There are no nodes designated as transient replicas or full replicas. All nodes will fully replicate some ranges on the ring and transiently replicate others.
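The selection rule above can be sketched in a few lines (a simplified model with one token per node; invented helper names, not Cassandra's implementation):

```python
from bisect import bisect_right

def replicas_for(key_token, ring, rf, transient):
    """Walk the ring clockwise from the key's token. The first `rf` nodes
    found are the replicas; the last `transient` of them replicate the key
    only transiently. `ring` is a sorted list of (token, node) pairs."""
    tokens = [token for token, _ in ring]
    start = bisect_right(tokens, key_token)
    picked = [ring[(start + i) % len(ring)][1] for i in range(rf)]
    return picked[:rf - transient], picked[rf - transient:]

# The ring from the post: A, B, C at tokens 5, 10, 15; key k hashes to 3.
ring = [(5, "A"), (10, "B"), (15, "C")]
full, transient = replicas_for(3, ring, rf=3, transient=1)
# full == ["A", "B"], transient == ["C"]
```

For a key hashing elsewhere on the ring, the same node set plays different roles, which is why no node is wholly "transient": each node fully replicates some ranges and transiently replicates others.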

The following image depicts a consistent hash ring at RF=3/1 (three replicas, one of which is transient). The replicas of k are still A, B, and C, but C is now transiently replicating k.

A consistent hash ring with Transient Replication

Normally all replicas of a range receive all writes for that range, as depicted in the following image.

Normal write behavior

Transient replicas do not receive writes in the normal write path.

Transient write behavior

If sufficient full replicas are unavailable, transient replicas will receive writes.

Transient write with unavailable node

This optimization, which is possible with Transient Replication, is called Cheap Quorums. This minimizes the amount of work that transient replicas have to do at write time, and reduces the amount of background compaction they will have to do.

Cheap Quorums and monotonic reads: Cheap Quorums may end up being incompatible with an initial implementation of monotonic reads, and operators will be able to make a conscious trade off between performance and monotonic reads.

Rapid write protection

In keyspaces utilizing Transient Replication, writes are sent to every full replica and enough transient replicas to meet the requested consistency level (to make up for unavailable full replicas). In addition, enough transient replicas are selected to reach a quorum in every datacenter, though unless the consistency level requires it, the write will be acknowledged without ensuring all have been delivered.
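A rough sketch of that selection (an invented helper, simplified to a single datacenter; the real logic also handles per-datacenter quorums):

```python
def write_targets(full_replicas, transient_replicas, required, available):
    """Send the mutation to every available full replica, plus just enough
    available transient replicas to make up for unavailable full ones.
    Single-DC simplification of the behavior described above."""
    live_full = [n for n in full_replicas if n in available]
    shortfall = max(0, required - len(live_full))
    live_transient = [n for n in transient_replicas if n in available]
    return live_full + live_transient[:shortfall]
```

With all full replicas up, transient replicas receive no writes; when a full replica is down, a transient replica steps in to keep the quorum reachable.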

Because not all replicas are sent the write, it’s possible that insufficient replicas will respond, causing timeouts. To prevent this, we implement rapid write protection, similar to rapid read protection, that sends writes to additional replicas if sufficient acknowledgements to meet the consistency level are not received promptly.

The following animation shows rapid write protection in action.

Animation of rapid write protection preventing a write timeout

Rapid write protection is configured similarly to rapid read protection using the table option additional_write_policy. The policy determines how long to wait for acknowledgements before sending additional mutations. The default is to wait for P99 of the observed latency.
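A toy timeline model of this behavior (illustrative names and numbers, not Cassandra's code): additional mutations go out only if the required acknowledgements have not arrived by the p99 latency threshold.

```python
def ack_time(initial, extras, latency, p99, required):
    """Return when the `required`-th acknowledgement arrives. `latency` maps
    each node to its ack latency after a mutation is sent to it. If the
    initial recipients don't produce enough acks by `p99`, additional
    mutations are sent to `extras` at t = p99 (rapid write protection)."""
    arrivals = sorted(latency[n] for n in initial)
    if len(arrivals) >= required and arrivals[required - 1] <= p99:
        return arrivals[required - 1]
    # Escalate: additional mutations are sent at t = p99.
    arrivals += [p99 + latency[n] for n in extras]
    arrivals.sort()
    return arrivals[required - 1]
```

In this model a single slow replica delays the write only until the p99 threshold, after which a healthy extra replica supplies the missing acknowledgement.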

Incremental repair

Incremental repair is used to clean up transient data at transient replicas and propagate it to full replicas.

When incremental repair occurs transient replicas stream out transient data, but don’t receive any. Anti-compaction is used to separate transient and fully replicated data so that only fully replicated data is retained once incremental repair completes.

The result of running an incremental repair is that all full replicas for a range are synchronized and can be used interchangeably to retrieve the repaired data set for a query.

Read path

Reads must always include at least one full replica and can include as many replicas (transient or full) as necessary to achieve the desired consistency level. At least one full replica is required in order to provide the data not available at transient replicas, but it doesn’t matter which full replica is picked because incremental repair synchronizes the repaired data set across full replicas.

Reads at transient replicas are faster than reads at full replicas because, with monotonic reads disabled, transient replicas have not been receiving writes in the normal write path and are therefore unlikely to return any results.

Creating keyspaces with Transient Replication

Transient Replication is supported by SimpleStrategy and NetworkTopologyStrategy. When specifying the replication factor, you can specify the number of transient replicas in addition to the total number of replicas (including transient replicas). The syntax for a replication factor of 3 replicas total with one of them being transient would be “3/1”.

ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'DC1' : '3/1'};
ALTER KEYSPACE foo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : '3/1'};

Monotonic reads are not supported with Transient Replication in 4.0, so any existing tables in the keyspace must have monotonic reads disabled by setting read_repair = 'NONE'.

Once the keyspace has been altered, you will need to run incremental repair and then nodetool cleanup to ensure transient data is cleaned up.

Operational matters

Transient replication requires rolling incremental repair to be run regularly in order to move data from transient replicas to full replicas. By default transient replicas will receive 1% of writes for transiently replicated ranges due to rapid write protection. If a node is down for an extended period of time, its transient replicas will receive additional write load and that data should be cleaned up using incremental repair. Running incremental repair regularly will ensure that the size of each repair is small.

It’s also a good idea to run a small number of vnodes with transient replication so that when a node goes down the load is spread out over several other nodes that transiently replicate that range. Large numbers of vnodes are known to be problematic, so it’s best to start with a cluster that is already close to or at its maximum size so that a small number of vnodes will suffice. If you intend to grow the cluster in the future, you will need to be cognizant of how this will interact with the number of vnodes you select.

While the odds of data loss in the event that multiple nodes are permanently lost remain the same with transient replication, the magnitude of the potential loss does not. With 3/1 transient replication, the permanent loss of two nodes could result in the loss of the entirety of the repaired data set. If you are running a multi-DC setup with a high level of replication, such as 2 DCs with 3/1 replicas in each, then you will have 4 full copies in total and the added risk of transient replication is minimal.

Experimental features

Experimental features are a relatively new idea for Apache Cassandra. Although we recently voted to make materialized views an experimental feature retroactively, Transient Replication is the first experimental feature to be introduced as such.

The goal of introducing experimental features is to allow for incremental development across multiple releases. In the case of Transient Replication, we can avoid a giant code drop that heavily modifies the code base, and the associated risks with incorporating a new feature that way.

What it means for a feature to be experimental doesn’t have a set definition, but for Transient Replication it’s intended to set expectations. As of 4.0, Transient Replication’s intended audience is expert operators of Cassandra with the ability to write the book on how to safely deploy Transient Replication, debug any issues that result, and if necessary contribute code back to address problems as they are discovered.

It’s expected that the feature set for Transient Replication will not change in minor updates to 4.0, but eventually it should be ready for use by a wider audience.

Next steps for Transient Replication

If increasing availability or saving on capacity sounds good to you, then you can help make transient replication production-ready by testing it out or even deploying it. Experience and feedback from the community is one of the things that will drive transient replication bug fixing and development.

Reaper 1.3 Released

Cassandra Reaper 1.3 was released a few weeks ago, and it’s time to cover its highlights.

Configurable pending compactions threshold

Reaper protects clusters from being overloaded by repairs by not submitting new segments that involve a replica with more than 20 pending compactions. This usually means that nodes aren’t keeping up with streaming triggered by repair and running more repair could end up seriously harming the cluster.

While the default value is a good one, there may be cases where one would want to tune it to match specific needs. Reaper 1.3.0 allows this by adding a new configuration setting to your YAML file:

maxPendingCompactions: 50

Use this setting with care and for specific cases only.

More nodes metrics in the UI

Following up on the effort to expand the capabilities of Reaper beyond repair, the following information is now available in the UI (and through the REST API) when clicking on a node in the cluster view.

Progress of incoming and outgoing streams is now displayed:
Streaming progress

Compactions can be tracked:
Streaming progress

And both thread pool stats and client latency metrics are displayed in tables:
Streaming progress

Streaming progress

What’s next for Reaper?

As shown in a previous blog post, we’re working on an integration with Spotify’s cstar in order to run and schedule topology aware commands on clusters managed by Reaper.

We are also looking to add a sidecar mode, which would collocate an instance of Reaper with each Cassandra node, allowing Reaper to be used on clusters where JMX access is restricted to localhost.

The upgrade to 1.3 is recommended for all Reaper users. The binaries are available from yum, apt-get, Maven Central, Docker Hub, and are also downloadable as tarball packages. Remember to backup your database before starting the upgrade.

All instructions to download, install, configure, and use Reaper 1.3 are available on the Reaper website.