Planet Cassandra

All your NoSQL Apache Cassandra resources in one place.

Scylla Release 2.1

The Scylla team is pleased to announce the release of Scylla 2.1, a production-ready Scylla Open Source minor release.

Scylla is an open source NoSQL database compatible with Apache Cassandra, with superior performance and consistently low latency. Starting from this release, critical bugs will be fixed in the Scylla 2.1 and 2.0 release series only. If you are still using open source Scylla 1.7 or prior – you are encouraged to upgrade. We will continue to fix bugs and add features to the master branch towards Scylla 2.2 and beyond.

Related Links

Installation updates

Starting from Scylla 2.1, Scylla packages for Ubuntu 16 and Debian 8 are signed. You can find the keys and instructions per Linux distribution here:

New features in Scylla 2.1

  • Time Window Compaction Strategy (TWCS). An improvement on, and simplification of, Date Tiered Compaction Strategy (DTCS) as the go-to strategy for time series data. TWCS uses STCS on “time windowed” groups of SSTables, using the maximum timestamp of each SSTable to decide which time window that SSTable will be part of. #1432
  • CQL additions:
    • Cast functions between types, for example: SELECT avg(cast(count as double)) FROM myTable.
    • Support Duration data type #2240.
    • CQL prepared statements are now properly evicted, so a server can no longer be OOMed just by preparing many unique statements. See #2474.
    • Support non-reserved keywords as columns (without quotes) #2507, for example, a column named “frozen”.
  • CompressionInfo.db files, part of the SSTable format, are stored more efficiently in memory, allowing higher disk:RAM ratios #1946.
  • Transitional Auth – enables rolling out access control without downtime: transitional modes temporarily support applications and users that do not yet have accounts, so services are not interrupted while authentication is phased in.
  • New REST API /storage_service/force_terminate_repair allows aborting a running repair and all the data streams related to it. #2105.
  • Secondary Indexing – experimental. Scylla’s Secondary Index implementation is based on Materialized Views, introduced in Scylla 2.0, and allows the creation of a Secondary Index using an Apache Cassandra compatible syntax: CREATE INDEX ON ks.users (email); More on Scylla’s Secondary Indexing usage and advantages can be found here.
    Experimental Secondary Indexing (SI) limitations in Scylla 2.1:
    • Only data inserted after the index is created is indexed (as per MV limitations).
    • SI only works for regular columns – not for partition keys or their components, clustering key columns, static columns, or collection columns.
    • SI queries don’t support paging and will perform badly for low cardinality indexes (e.g. few unique indexed values that map to a large number of primary keys).
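To illustrate the TWCS feature above: a table opts into the strategy through its compaction settings at creation time. A minimal sketch, assuming a hypothetical ks.sensor_readings table (the one-day window is illustrative, not a recommendation):

```sql
-- Hypothetical time-series table using Time Window Compaction Strategy.
-- SSTables are grouped into one-day windows; STCS runs within each window.
CREATE TABLE ks.sensor_readings (
    sensor_id    uuid,
    reading_time timestamp,
    value        double,
    PRIMARY KEY (sensor_id, reading_time)
) WITH compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_unit': 'DAYS',
    'compaction_window_size': 1
};
```

Because data for a given time range ends up confined to a few SSTables, expired time-series data can also be dropped efficiently whole-SSTable at a time.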

Metrics updates from 2.1

Scylla Monitoring stack now includes Scylla 2.1 dashboard.

The following metric names have changed:

Old Metric Name                                New Metric Name
scylla_database_active_reads_streaming         scylla_database_active_reads {reads=streaming}
scylla_database_active_reads_system_keyspace   scylla_database_active_reads {reads=system_keyspace}
scylla_database_queued_reads_streaming         scylla_database_queued_reads {reads=streaming}
scylla_database_queued_reads_system_keyspace   scylla_database_queued_reads {reads=system_keyspace}

The following metrics are new in Scylla 2.1:


For a full list of metrics and their descriptions, run curl against the metrics endpoint from a Scylla node.

The post Scylla Release 2.1 appeared first on ScyllaDB.

Understanding the Java Virtual Machine (JVM) Architecture Part 1

What is garbage collection?

Garbage collection (GC) is a form of automatic memory management. In essence, garbage collection attempts to reclaim garbage, that is, memory occupied by objects that are no longer relevant to the running program, while allowing the developer to focus on the application without having to free memory manually. My particular interest is how the Java Virtual Machine (JVM) affects Apache Cassandra, since Cassandra is built in Java and garbage collection can have a big impact on its performance.

The method used by the Java Virtual Machine (JVM) to track down all the live objects and to make sure that memory from non-reachable objects can be reclaimed is called the Mark and Sweep Algorithm. It comprises two steps:

  • Marking scans through all reachable objects and keeps a ledger in native memory of all such objects.
  • Sweeping makes sure the memory addresses allocated to non-reachable objects are reclaimed so that they can be used for new objects.

Different GC algorithms within the JVM, such as CMS or G1, implement these phases differently, but the concept explained above remains the same for all.
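The collectors in use, and how often each has run, can be inspected at runtime through the standard java.lang.management API. A minimal sketch (collector names such as “G1 Young Generation” depend on the JVM and the flags in use):

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcInspector {
    public static void main(String[] args) {
        // One MXBean per collector; on a G1 JVM these are typically
        // "G1 Young Generation" and "G1 Old Generation".
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println(gc.getName()
                    + ": collections=" + gc.getCollectionCount()
                    + ", totalTimeMs=" + gc.getCollectionTime());
        }
    }
}
```

Cassandra operators often watch exactly these counters (also exposed over JMX) to spot GC pressure on a node.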

A crucial thing to consider is that in order for the garbage collection to happen the application threads need to be stopped, as you cannot count references to objects if they keep changing during the process. The temporary pause so that the JVM can perform “housekeeping” activities is called a Stop The World pause. These pauses can happen for multiple reasons, with garbage collection being the principal one.

Garbage collection in Java

Whenever sweeping occurs, and blocks of memory are reclaimed, fragmentation ensues. Memory fragmentation behaves much like disk fragmentation and can lead to multiple problems:

  • Writing operations become more inefficient since finding the next suitable block of sufficient size is no longer a trivial operation.
  • When creating new objects, the JVM allocates memory in contiguous blocks. When fragmentation increases to the point where no single available block is big enough to accommodate the newly created object, an allocation error occurs.

To avoid these problems, the JVM performs memory defragmentation during garbage collection. This process moves all live objects close to each other, thereby reducing fragmentation.

Block compaction during a garbage collection

Weak Generational Hypothesis

Garbage collectors make assumptions about the way applications use objects. The most important of these assumptions is the weak generational hypothesis, which states that most objects survive for only a short period of time.

While naïve garbage collection examines every live object in the heap, generational collection exploits several empirically observed properties of most applications to minimize the work required to reclaim unused (garbage) objects:

  • Most objects soon become unused.
  • Only a small number of references exist from old objects to young objects.

Some objects live longer than others, with lifetimes following a distribution close to the one below. An efficient collection is made possible by exploiting the fact that a majority of objects “die young.”

Weak Generational Hypothesis – Object life cycle

Based on this hypothesis, the JVM memory is divided into generations: memory pools holding objects of different ages.

Most objects exist and die in the pool dedicated to young objects (the Young Generation pool). When the Young Generation pool fills up, a minor collection is triggered in which only the young generation is collected. The cost of such collections is proportional to the number of live objects being collected, and since the weak generational hypothesis states that most objects die young, the result is a very efficient garbage collection.

A fraction of the objects that survive a minor collection get promoted to the Old Generation (also called the Tenured Generation), which is significantly larger than the Young Generation and holds objects that are less likely to be garbage. Eventually, the Tenured Generation will fill up and a major collection will ensue, in which the entire heap is collected. Major collections usually last much longer than minor collections because a significantly larger number of objects are involved.
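The generational pools described above are visible through the same management API; a minimal sketch (pool names such as “G1 Eden Space” or “PS Old Gen” vary with the collector in use):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;

public class PoolInspector {
    public static void main(String[] args) {
        // HEAP pools correspond to the generations: an Eden space,
        // Survivor space(s), and an Old/Tenured generation.
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() == MemoryType.HEAP) {
                System.out.println(pool.getName()
                        + ": used=" + pool.getUsage().getUsed() + " bytes");
            }
        }
    }
}
```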

This approach also has some problems though:

  • Objects from different generations may contain references to each other.
  • Since GC algorithms are optimized for objects that either ‘die young’ or ‘will live a long time’, the JVM behaves poorly with objects of ‘medium’ life expectancy.

Memory Pools

Heap memory pools


Young Generation

The Young Generation comprises three spaces:

  • Eden space
  • Two Survivor spaces


Eden is the memory region where objects are allocated when they are created. Since we are usually talking about multi-threaded environments, where multiple threads create many objects simultaneously, Eden is further divided into one or more Thread Local Allocation Buffers (TLABs). These buffers allow each thread to allocate objects in its own buffer, avoiding expensive lock-contention issues.

When allocation is not possible inside a TLAB (not enough room), the allocation is done in a shared Eden space. If no room exists in either the TLAB or the shared Eden space, a Young Generation garbage collection is triggered to free up space. If the garbage collection does not free enough memory inside the Eden pool, the object is allocated in the Old Generation.

After the marking phase of the garbage collection identifies all live objects within Eden, all of them are copied to one of the Survivor spaces, and the entire Eden is cleaned so that it can be used for new object allocation. This approach is called “Mark and Copy”: the live objects are marked, and then copied (not moved) to a Survivor space.

Eden spaces

Survivor Spaces

Adjacent to the Eden space are two Survivor spaces. An important thing to notice is that one of the two Survivor spaces is always empty. Every time a Young Generation garbage collection happens, all of the live objects from the Eden space and from the occupied Survivor space (that is, all live Young Generation objects) are copied to the other Survivor space, leaving the former Survivor space empty.

Survivor Spaces cycle

This garbage collection cycle of copying live objects between the two Survivor spaces is repeated multiple times (up to 15 by default) until objects are deemed old enough to be promoted to the Old Generation, as they are expected to remain in use for a long time.

To determine if an object is deemed “old enough” to be considered ready for promotion to the Old Space, whenever an object survives a GC cycle it has its age incremented. When the age exceeds a certain tenuring threshold, the object will be promoted.

The actual tenuring threshold is dynamically adjusted by the JVM, but one can adjust its upper limit by changing -XX:MaxTenuringThreshold.

  • Setting -XX:MaxTenuringThreshold=0 results in immediate promotion, without copying objects between Survivor spaces.
  • On modern JVMs, -XX:MaxTenuringThreshold is set to 15 GC cycles by default. This is also the maximum value in HotSpot.

Promotion may also happen prematurely if the Survivor space is not large enough to hold all of the live objects in the Young Generation.

Tenured Generation / Old Generation

The Old Generation is usually much larger than the Young Generation, and it holds the objects that are less likely to become unused.

Garbage collection in the Old Generation pool happens less frequently than in the Young Generation, but it also takes more time to complete, resulting in longer Stop the World pauses.

Since objects are expected to live a long time in the Old Generation pool, its cleaning algorithm is slightly different: instead of the Mark and Copy used in the Young Generation, it uses a Mark and Move approach, compacting live objects in place to minimize fragmentation.

PermGen & Metaspace


As of Java 8, the Permanent Generation (PermGen) space, which was created at startup and used to store metadata such as classes and interned strings, is gone. Metadata information has moved to native memory, to an area known as the Metaspace.

The move was necessary because PermGen was really hard to tune, and it was difficult to size correctly. This created a lot of issues for Java developers, since it is very difficult to predict how much space all that metadata will require, resulting in many java.lang.OutOfMemoryError: PermGen space exceptions. The usual workaround was to increase the maximum PermGen size, for example to 256 MB:

JVM configuration
java -XX:MaxPermSize=256m com.mycompany.MyApplication
Move from PermGen to Metaspace in Java 8


With the Metaspace, metadata (such as class definitions) is now located in native memory and does not interfere with regular heap objects. By default, the Metaspace size is limited only by the amount of native memory available to the Java process, saving developers from the memory errors described earlier. The downside is that you still need to worry about the Metaspace footprint.

  • By default, class metadata allocation is limited by the amount of available native memory (capacity will, of course, depend on whether you use a 32-bit or 64-bit JVM, along with OS virtual memory availability).
  • The -XX:MaxMetaspaceSize flag allows you to limit the amount of native memory used for class metadata. If you don’t set it, the Metaspace will dynamically resize depending on current demand.
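The Metaspace footprint can be watched through the same MemoryPoolMXBean interface used for the heap pools; a minimal sketch (on HotSpot the relevant pools are named “Metaspace” and, when compressed class pointers are enabled, “Compressed Class Space”):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;

public class MetaspaceInspector {
    public static void main(String[] args) {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getName().contains("Metaspace")) {
                // getMax() returns -1 when no MaxMetaspaceSize limit is set.
                System.out.println(pool.getName()
                        + ": used=" + pool.getUsage().getUsed()
                        + ", max=" + pool.getUsage().getMax());
            }
        }
    }
}
```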

The Four Things Companies Are Still Getting Wrong About Customer Experience

The digital age has shaped customers to expect applications to be relevant to them, always available, instantly responsive, and accessible when and where they need them.

This “always on, always there, always relevant to each customer” way of thinking has redefined how enterprises do business. The best enterprises and brands are not only focusing on delivering hyper-personalized customer experiences at every touchpoint, they are taking the lead in reconfiguring their organizations to operate around “customer experience.”

But with all the focus on customer experience being the new competitive differentiator, there are still areas where companies are missing the mark. These blind spots can have a damaging impact on the customer experience even if brands are doing everything else right:

1) Customer data is siloed

Marketing, customer service, R&D, corporate communications, IT, and sales are still far too often siloed, as far as their data is concerned. Each owns data around the customer, but that data is not centralized with one view of the customer, and that data is not shared between departments. The customer belongs to the brand, but the relationship with the customer is owned by too many disparate functions.

Customer data needs to be centralized and shareable to create a 360-degree view. Everyone that touches the customer experience has ownership in the relationship. Like a multidisciplinary healthcare team that provides care for a patient, the customer should feel that they are interacting with a single, integrated, seamless whole, not with separate, non-communicating functions.

2) CSAT is the primary measure of loyalty

CSAT surveys provide critical data on customer satisfaction and an important metric to gauge how well the brand is doing on customer experience. But, CSAT scores alone do not equate to loyalty and retention.

Today’s customers are loyal to their experiences, not to their brands. They stay with a brand as long as they continue to have the experiences they desire. ThinkJar reports that 67% of customers cite bad experiences as a reason for churn. However, the absence of negative feedback is not a sign of satisfaction. ThinkJar also reported that only one of 26 unhappy customers will complain — the rest simply leave. Which means brands must focus on the unseen, unspoken drivers of loyalty and retention and not rely solely on surveys for an accurate measure.

3) Not every employee is empowered to be a customer experience champion (and they should be)

For an enterprise to truly become customer-centric, everyone must personally own the customer experience in the work they do each day. Companies must be able to clearly articulate what defines their standards for customer experience and share it widely from the boardroom on down to IT, sales associates, and influencers. Customer experience education, training, and accountability are essential for everyone if the customer is to have exceptional levels of customer experience replicated each time they interact with the brand. Remember, replication and consistency are what drive retention.

4) Data is not used for real-time insights

The ability to deliver hyper-personalization depends on your ability to read, interact with, and respond to customer behavior in real time. If your data platform can’t provide a scalable, flexible foundation on which to build amazing customer experience applications, then your enterprise will not be able to generate customer experiences that keep pace with the demands of the Right-Now Customer.

A customer experience-focused data management platform enables real-time personalization that delivers an amazing, tailored customer experience anywhere, on any device, seamlessly across touchpoints. It should scale to handle high volumes and be capable of creating a hyper-personalized, responsive, consistent experience that drives retention and loyalty.

Drive consistent customer experience with real-time data management

To avoid disruption in the customer experience arena, enterprises must be able to create replicable experiences that are highly personalized to each customer in the moment it matters most to that customer. A consistent customer experience is what builds trust between brands and the customer, and what ultimately drives retention. Data management platforms that deliver in real time are key to building a customer-centric organization in today’s digital age.

Read Now

Introducing DSE Analytics Solo

Today, DataStax is announcing the introduction of DataStax Enterprise (DSE) Analytics Solo, a new offering to enable more flexible and cost-effective analytics processing of data stored in DataStax Enterprise.  

DSE Analytics Solo delivers all of the powerful features of DSE Analytics that have allowed numerous companies to blend the functionality of a continuously available, scalable, distributed data layer with a powerful analytic processing engine. Also, it is designed to cover some of the new deployment modes our customers have been leveraging as of late.

DSE Analytics Solo allows customers to deploy DSE Analytics processing on hardware configurations segregated from the DataStax database, ensuring consistent behavior of both engines in a configuration that does not compete for compute resources. This separation of compute and storage is good for processing-intensive analytic workloads. DSE’s traditional collocated configuration, which allows easy addition of analytic processing to the DataStax database without additional hardware, is good when the analysis is less intensive or the database is not as heavily in use.

DSE Analytics Solo enables customers to:

  • Leverage the same highly available, scalable, secure Apache Spark™ deployment that is included with DSE Analytics, including faster overall performance than open source Spark/Cassandra, a fault-tolerant resource manager with secured communications, the ability to create pools of resources grantable to particular users, and a continuously available, scalable, HDFS-compatible distributed file system.
  • Deploy on dedicated hardware, ensuring segregated resources for both the database and the processing engine, and predictable performance of both.
  • Have the flexibility to quickly and cost-effectively add more, or fewer, analytic processing nodes than database nodes, as the use case requires.
  • Deploy analytic processing nodes via the same OpsCenter management suite that manages the database nodes.


DataStax started a new era of big data processing in DSE with the introduction of Apache Spark to the DataStax platform, replacing Apache Hadoop™, in DSE 4.5. Since then, DataStax has invested in improving the integration of Spark in every DSE release, including:

  • A highly-available Spark Resource Manager allowing applications to be submitted at all times to any node, with minimal impact during failures;
  • Secured communications between all Spark processes (Master, Worker, Driver, and Executor) leveraging the security of the DataStax database drivers for all communications;
  • Continuously available, scalable, HDFS-compatible, distributed file system (DSEFS) with no single point of failure, no Zookeeper dependencies, etc;
  • Ability to define pools of resources and grant permissions to specific users to specific workpools, restricting which users can run applications on which resources;
  • Optimizations to more efficiently read from the DataStax database, up to 3x faster;
  • Capability to leverage DSE Search indices for increased performance; and
  • Spark Jobserver for REST submission and management of Spark applications, including the ability to share cached data between applications.

In DSE 5.1, the DSE Spark Resource Manager is able to support a variety of deployment configurations, including:

  • Collocated — The traditional deployment mode where all nodes in the Analytics data center are running both Spark and the DataStax database, and a copy of the data has been replicated to this data center;
  • DSE Analytics-Only Data Center —  In this mode a data center is DSE Analytics enabled but no data is replicated to the data center. So, while the data uses the same set of users and permissions, it is not replicated locally and is read from another data center in the same cluster. One advantage of this configuration is that scaling up or down the size of this DSE Analytics-Only Data Center does not require moving the database data, making those operations much quicker;
  • DSE Analytics-Only Cluster — This takes the DSE Analytics-Only Data Center scenario one step further in decoupling. In this mode, the Spark cluster is completely separate from the DSE cluster. Data is read remotely, as in the DSE Analytics-Only Data Center scenario, but, since it is a separate cluster, the users are not necessarily the same.

The benefits of these non-collocated deployment configurations include:

  • Segregated hardware to remove resource contention between the analytics engine and the database;
  • Allowing configurations with more, or fewer, analytic processing nodes than database nodes;
  • Easier addition, or removal, of analytic processing nodes to the cluster, requiring no database data movement when changing cluster size; and
  • The ability to add multiple DSE Analytics-Only Data Centers to allocate separate analytic processing resources to different sets of users.

These new deployment configuration options allow administrators to choose a scenario that best suits their needs. If the analytic needs are light or the database is not overly busy, then the collocated configuration is probably suitable and is the simplest option.  

If analytic workloads are heavier or more consistent, such as with stream processing scenarios, then a DSE Analytics-Only Data Center configuration is a good choice, since it will protect the database and analytic engine from competing with each other for resources, but will still retain the same user management. If the analytic need is very sporadic and as such the administrator would like it to be very lightly coupled, then a DSE Analytics-Only Cluster configuration could be a good choice.

As an example, a user with a streaming application that analyzes streaming data, processes the incoming records and filters out 99.9% of the data, only persisting 0.1% of the incoming data, may choose a DSE Analytics-Only Data Center configuration with more analytic processing nodes than database nodes. This allows the user to deploy more analytic processing nodes than database nodes, since only 0.1% of the data is persisted. It also allows the user to segregate the processing nodes from the database nodes to remove resource contention, and to use the same set of users in the database and for the Spark cluster.

As another example, a user requiring a weekly report on the data stored in their DataStax database in a cloud deployment could deploy a DSE Analytics-Only Data Center configuration.  This would allow the user to use the analytic engine to produce the reports, and then reduce the size of the analytic cluster (potentially removing it) during the week to reduce cloud hardware instance costs, spinning them back up to run the next report.

It should be noted that the DSE Analytics-Only Data Center configuration is usually suitable for most scenarios that cover the DSE Analytics-Only Cluster configuration, and has some benefits for simpler management, including using the same users and permissions as the database nodes. For more technical information, see the DataStax documentation.

Analytics on Dedicated Nodes

To enable DSE Analytics-Only Data Center and DSE Analytics-Only Cluster deployments, the new DSE Analytics Solo option allows customers to add analytics processing nodes to their existing database nodes, but not store user data on those nodes. DSE Analytics Solo nodes use the same installation methods and configuration mechanisms as DSE Analytics, including OpsCenter, as well as advanced security features such as LDAP, but are licensed to only use DSE’s production-certified Spark processing engine and DSEFS and not to store DataStax database data or to use DSE Search or DSE Graph.  

For more information, please consult the DataStax documentation for DSE Analytics Solo and if you haven’t already, download DSE and check out the new DSE Analytics Solo option for yourself.

Numberly Shares 7 Lessons for Evaluating Scylla

As word about Scylla continues to spread, we’re seeing more and more downloads of our open source software. We’re not always privy to our users’ experiences, but we’re very glad when we have the opportunity to share their results. A recent example of this is from Alexys Jacob of Numberly, who shared his experience evaluating Scylla for production on his personal blog. In the first installment of a two-part series, he describes his preparation for a successful POC-based evaluation with the help of the ScyllaDB team.

Numberly began with MongoDB but found that MongoDB’s primary/secondary architecture hinders performance because of a loss of write throughput. Also, when you want to set up a cluster, it appears to be an abstraction added on top of replica-sets. As Alexys summarizes his MongoDB experience, “To say the least, it is inefficient and cumbersome to operate and maintain.”

Alexys also took a look at using Apache Cassandra for Numberly but was apprehensive about using a Java solution because of the heap sizes and tuning, Java overhead, and garbage collection that can temporarily stop the service. He heard about Scylla, a NoSQL database built in C++, which allows users to get more out of their machines, and decided to put it to the test.

Alexys openly provided feedback on the entire process, which is helpful to any organization evaluating a new technology. If an organization is not clear in what they are trying to achieve during a POC, problems can occur down the road when evaluating a product. Alexys also had good feedback from his experience working with the ScyllaDB staff. Here at ScyllaDB, we love feedback and always want to do better for our users.

Alexy’s blog post covers these seven lessons for evaluating Scylla:

  1. Have some background
  2. Work with a shared reference document
  3. Have monitoring in place
  4. Know when to stop
  5. Plan some high availability tests
  6. POC != production
  7. Make time

Get all the details in the blog post.

We’re looking forward to part two of Alexys’s series when he covers the technical aspects and details of the Scylla POC.

To learn more about Alexys’s journey with evaluating Scylla, check out his blog post. If you are feeling eager to test out Scylla’s performance, you can take it for a Test Drive here.

The post Numberly Shares 7 Lessons for Evaluating Scylla appeared first on ScyllaDB.

DataStax and Oracle: Making “The Moment” Possible

The cloud has transformed customer engagement.  

From geo-locations to shopping history to clicks, likes, and taps, enterprises are using data from every possible touchpoint and channel to solve the greater equation of how to deliver meaningful, individualized experiences that create unwavering customer loyalty.

But most customers aren’t thinking about any of this.

They are not thinking about where their personal data is going or how it is likely being processed through legacy systems, accessed through a variety of applications, sent across a network, or stored in the cloud, making it vulnerable to hackers or fraudsters.

In short, they are not thinking about what is going on under the hood of their car… They are thinking about driving the car and capitalizing on “the moment”.

The moment they deposit their paycheck into their account.

The moment they buy a new bike for their kid.

The moment they hit “like” on an interesting article about hot-air ballooning.

What the collective sum of these moments ultimately translates to is intimacy. The feeling of being known. The feeling that their chosen brands can anticipate their needs and deliver exactly what they want, when and how they want it, regardless of where in the world they are or how many other customers may be expecting the same thing at the same time.

DataStax Enterprise, a data management platform built on the best distribution of Apache Cassandra™ NoSQL database, delivers this seamless experience for enterprises around the world in countless different verticals. From innovators, to digital natives, to industry stalwarts needing to reinvent themselves, DataStax provides the always-on, real-time data layer that makes “the moment” possible for every customer and every company.  McDonald’s, eBay, Sony, UBS, Walmart, and hundreds of other established enterprises rely on DataStax to deliver impactful digital experiences while protecting their data at its most raw and fundamental level.

It is in this spirit that we happily announce our partnership with Oracle Cloud Infrastructure, Oracle’s next-generation, internet-scale infrastructure service. Oracle Cloud Infrastructure is designed from the ground up to help modern enterprises do more while experiencing new levels of speed and flexibility, with substantial savings over on-premises or cloud alternatives.

And the great thing is: you don’t have to worry about uprooting everything to start using it. Oracle can help customers extend their already-leveraged technology investments, tools, and processes by easily moving mission-critical workloads to Oracle Cloud Infrastructure without the time and cost of re-architecture.

Combining our technology with Oracle will undoubtedly further enhance our ability to be the power behind the moment for enterprises seeking to thrive in the right-now economy.

Go to the Oracle Cloud Jump Start page for a free, hands-on lab on DataStax Enterprise (DSE) Graph and CQL on Oracle Cloud Infrastructure. The lab will familiarize you with basic CQL concepts, DSE Graph, and Gremlin, and also provide you with a strong understanding of how DSE can help your organization deliver real-time customer experiences to capitalize on “the moment”.

Scaling Time Series Data Storage — Part I

by Ketan Duvedi, Jinhua Li, Dhruv Garg, Philip Fisher-Ogden


The growth of internet-connected devices has led to a vast amount of easily accessible time series data. Increasingly, companies are interested in mining this data to derive useful insights and make data-informed decisions. Recent technology advancements have improved the efficiency of collecting, storing, and analyzing time series data, spurring an increased appetite to consume this data. However, this explosion of time series data can overwhelm most initial time series data architectures.

Netflix, being a data-informed company, is no stranger to these challenges and over the years has enhanced its solutions to manage the growth. In this 2-part blog post series, we will share how Netflix has evolved a time series data storage architecture through multiple increases in scale.

Time Series Data — Member Viewing History

Netflix members watch over 140 million hours of content per day. Each member provides several data points while viewing a title, and these are stored as viewing records. Netflix analyzes the viewing data and provides accurate, real-time bookmarks and personalized recommendations, as described in these posts:

Viewing history data increases along the following 3 dimensions:

  1. As time progresses, more viewing data is stored for each member.
  2. As member count grows, viewing data is stored for more members.
  3. As member monthly viewing hours increase, more viewing data is stored for each member.

As Netflix streaming has grown to 100M+ global members over its first 10 years, there has been a massive increase in viewing history data. In this blog post we will focus on how we approached the big challenge of scaling storage of viewing history data.

Start Simple

The first cloud-native version of the viewing history storage architecture used Cassandra for the following reasons:

In the initial approach, each member’s viewing history was stored in Cassandra in a single row with row key: CustomerId. This horizontal partitioning enabled effective scaling with member growth and made the common use case of reading a member’s entire viewing history very simple and efficient. However, as member count increased and, more importantly, each member streamed more and more titles, the row sizes as well as the overall data size increased. Over time, this resulted in high storage and operational cost as well as slower performance for members with large viewing histories.

The following figure illustrates the read and write flows of the initial data model:

Figure 1: Single Table Data Model

Write Flow

One viewing record was inserted as a new column when a member started playing a title. That record was updated after the member paused or stopped the title. This single-column write was fast and efficient.

Read Flows

Whole row read to retrieve all viewing records for one member: The read was efficient when the number of records per member was small. As a member watched more titles, the number of viewing records increased. Reading rows with a large number of columns put additional stress on Cassandra that negatively impacted read latencies.

Time range query to read a time slice of a member’s data: This resulted in the same inconsistent performance as above depending on the number of viewing records within the specified time range.

Whole row read via pagination for large viewing history: This was better for Cassandra as it wasn’t waiting for all the data to be ready before sending it back. This also avoided client timeouts. However, it increased overall latency to read the whole row as the number of viewing records increased.

Slowdown Reasons

Let’s look at some of the Cassandra internals to understand why our initial simple design slowed down. As the data grew, the number of SSTables increased accordingly. Since only recent data was in memory, in many cases both the memtables and SSTables had to be read to retrieve viewing history. This had a negative impact on read latency. Similarly, compaction took more IO and time as the data size increased. Read repair and full column repair became slower as rows got wider.

Caching Layer

Cassandra performed very well writing viewing history data, but there was a need to improve the read latencies. To optimize read latencies, at the expense of increased work during the write path, we added an in-memory sharded caching layer (EVCache) in front of Cassandra storage. The cache was a simple key-value store, with the key being CustomerId and the value being the compressed binary representation of viewing history data. Each write to Cassandra incurred an additional cache lookup, and on a cache hit the new data was merged with the existing value. Viewing history reads were serviced by the cache first. On a cache miss, the entry was read from Cassandra, compressed, and then inserted into the cache.
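The read-through and write-merge behavior described above can be sketched in a few lines of Python. The dictionaries standing in for EVCache and Cassandra, the JSON/zlib encoding, and the function names are illustrative assumptions, not Netflix’s actual implementation:

```python
import json
import zlib

cache = {}      # stand-in for EVCache: CustomerId -> compressed viewing history
cassandra = {}  # stand-in for the Cassandra table: CustomerId -> list of records

def write_record(customer_id, record):
    """Write path: persist to Cassandra, then merge into the cache on a hit."""
    cassandra.setdefault(customer_id, []).append(record)
    compressed = cache.get(customer_id)
    if compressed is not None:  # cache hit: merge the new record into the cached value
        records = json.loads(zlib.decompress(compressed))
        records.append(record)
        cache[customer_id] = zlib.compress(json.dumps(records).encode())

def read_history(customer_id):
    """Read path: cache first; on a miss, read Cassandra and populate the cache."""
    compressed = cache.get(customer_id)
    if compressed is None:
        records = cassandra.get(customer_id, [])
        compressed = zlib.compress(json.dumps(records).encode())
        cache[customer_id] = compressed
    return json.loads(zlib.decompress(compressed))
```

Note the trade-off the post describes: every write pays for an extra cache lookup (and possibly a decompress/merge/recompress cycle) so that the common read can be served entirely from memory.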

With the addition of the caching layer, this single-table Cassandra storage approach worked very well for many years. Partitioning based on CustomerId scaled well in the Cassandra cluster. By 2012, the viewing history Cassandra cluster was one of the biggest dedicated Cassandra clusters at Netflix. To scale further, the team needed to double the cluster size. This meant venturing into uncharted territory for Netflix’s usage of Cassandra. Meanwhile, the Netflix business continued to grow rapidly, including an increasing international member base and forthcoming ventures into original content.

Redesign: Live and Compressed Storage Approach

It became clear that a different approach was needed to scale for growth anticipated over the next 5 years. The team analyzed the data characteristics and usage patterns, and redesigned viewing history storage with two main goals in mind:

  1. Smaller Storage Footprint.
  2. Consistent Read/Write Performance as viewing per member grows.

For each member, viewing history data is divided into two sets:

  • Live or Recent Viewing History (LiveVH): Small number of recent viewing records with frequent updates. The data is stored in uncompressed form as in the simple design detailed above.
  • Compressed or Archival Viewing History (CompressedVH): Large number of older viewing records with rare updates. The data is compressed to reduce storage footprint. Compressed viewing history is stored in a single column per row key.

LiveVH and CompressedVH are stored in different tables and are tuned differently to achieve better performance. Since LiveVH has frequent updates and a small number of viewing records, compactions are run frequently and gc_grace_seconds is small to reduce the number of SSTables and the data size. Read repair and full column family repair are run frequently to improve data consistency. Since updates to CompressedVH are rare, manual and infrequent full compactions are sufficient to reduce the number of SSTables. Data is checked for consistency during the rare updates. This obviates the need for read repair as well as full column family repair.

Write Flow

New viewing records are written to LiveVH using the same approach as described earlier.

Read Flows

To get the benefit of the new design, the viewing history API was updated with an option to read recent or full data:

  • Recent Viewing History: For most cases this results in reading from LiveVH only, which limits the data size resulting in much lower latencies.
  • Full Viewing History: Implemented as parallel reads of LiveVH and CompressedVH. Due to data compression and CompressedVH having fewer columns, less data is read thereby significantly speeding up reads.

CompressedVH Update Flow

While reading viewing history records from LiveVH, if the number of records is over a configurable threshold then the recent viewing records are rolled up, compressed, and stored in CompressedVH via a background task. Rolled-up data is stored in a new row with row key: CustomerId. The new rollup is versioned, and after being written it is read back to check for consistency. Only after the consistency of the new version is verified is the old version of rolled-up data deleted. For simplicity there is no locking during rollup, and Cassandra takes care of resolving very rare duplicate writes (i.e., the last writer wins).
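A minimal sketch of this rollup task follows. The threshold values (`ROLLUP_THRESHOLD`, `KEEP_RECENT_DAYS`), the dictionary-backed tables, and the JSON/zlib encoding are all assumptions for illustration; the post does not publish the real values or storage encoding:

```python
import json
import zlib

ROLLUP_THRESHOLD = 10   # assumed configurable threshold on LiveVH record count
KEEP_RECENT_DAYS = 2    # "last couple of days" kept in LiveVH after rollup

live_vh = {}        # CustomerId -> list of recent (uncompressed) records
compressed_vh = {}  # CustomerId -> {version: compressed blob}

def rollup(customer_id, now):
    """Roll older LiveVH records into a new, versioned CompressedVH row."""
    records = live_vh.get(customer_id, [])
    if len(records) <= ROLLUP_THRESHOLD:
        return  # common case: nothing to do
    cutoff = now - KEEP_RECENT_DAYS * 86400
    to_roll = [r for r in records if r["ts"] < cutoff]
    keep = [r for r in records if r["ts"] >= cutoff]
    versions = compressed_vh.setdefault(customer_id, {})
    old_version = max(versions) if versions else 0
    # merge the older records with the previous rollup, if one exists
    merged = to_roll
    if old_version:
        merged = json.loads(zlib.decompress(versions[old_version])) + to_roll
    new_version = old_version + 1
    versions[new_version] = zlib.compress(json.dumps(merged).encode())
    # read back and verify before deleting the previous version (no locking)
    if json.loads(zlib.decompress(versions[new_version])) == merged:
        versions.pop(old_version, None)
        live_vh[customer_id] = keep
```

The read-back verification before deleting the old version mirrors the post’s point: correctness is enforced by checking the new rollup rather than by locking, with last-writer-wins resolving rare races.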

Figure 2: Live and Compressed Data Model

As shown in figure 2, the rolled-up row in CompressedVH also stores metadata such as the latest version, object size, and chunking information (more on that later). The version column stores a reference to the latest version of rolled-up data, so that reads for a CustomerId always return only the latest rolled-up data. The rolled-up data is stored in a single column to reduce compaction pressure. To minimize the frequency of rollups for members with frequent viewing patterns, just the last couple of days’ worth of viewing history records are kept in LiveVH after rollup; the rest are merged with the records in CompressedVH during rollup.

Auto Scaling via Chunking

For the majority of members, storing their entire viewing history in a single row of compressed data resulted in good performance during the read flows. For a small percentage of members with very large viewing history, reading CompressedVH from a single row started to slow down, for reasons similar to those described for the first architecture. There was a need for an upper bound on the read and write latencies for this rare case without negatively impacting the read and write latencies for the common case.

To solve this, we split the rolled-up compressed data into multiple chunks if the data size is greater than a configurable threshold. These chunks are stored on different Cassandra nodes. Parallel reads and writes of these chunks result in an upper bound on the read and write latencies, even for very large viewing data.

Figure 3: Auto Scale via Chunking

Write Flow

As figure 3 indicates, rolled-up compressed data is split into multiple chunks based on a configurable chunk size. All chunks are written in parallel to different rows with row key: CustomerId$Version$ChunkNumber. Metadata is written to its own row with row key: CustomerId after a successful write of the chunked data. This bounds the write latency to two writes for rollups of very large viewing data. In this case the metadata row has an empty data column to enable fast reads of metadata.

To make the common case (compressed viewing data is smaller than the configurable threshold) fast, metadata is combined with the viewing data in the same row to eliminate metadata lookup overhead as shown in figure 2.

Read Flow

The metadata row is first read using CustomerId as the key. For the common case, the chunk count is 1 and the metadata row also has the most recent version of rolled up compressed viewing data. For the rare case, there are multiple chunks of compressed viewing data. Using the metadata information like version and chunk count, different row keys for the chunks are generated and all chunks are read in parallel. This bounds the read latency to two reads.
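The chunked write and read flows can be sketched as follows. The `CustomerId$Version$ChunkNumber` key scheme follows the post, but the `CHUNK_SIZE` value and the dictionary standing in for the Cassandra table are assumptions, and the real writes and reads happen in parallel rather than in a loop:

```python
CHUNK_SIZE = 64 * 1024  # assumed configurable chunk size in bytes

table = {}  # row key -> value; stand-in for the CompressedVH table

def write_rollup(customer_id, version, blob):
    """Split the compressed blob into chunks; write chunks first, metadata last."""
    chunks = [blob[i:i + CHUNK_SIZE] for i in range(0, len(blob), CHUNK_SIZE)] or [b""]
    if len(chunks) == 1:
        # common case: metadata and data share one row, so reads need one lookup
        table[customer_id] = {"version": version, "chunks": 1, "data": blob}
        return
    for n, chunk in enumerate(chunks, start=1):
        table["%s$%d$%d" % (customer_id, version, n)] = chunk
    # metadata row written only after the chunk writes succeed; empty data column
    table[customer_id] = {"version": version, "chunks": len(chunks), "data": b""}

def read_rollup(customer_id):
    """Read the metadata row, then fetch all chunks (in parallel in practice)."""
    meta = table[customer_id]
    if meta["chunks"] == 1:
        return meta["data"]
    version = meta["version"]
    return b"".join(table["%s$%d$%d" % (customer_id, version, n)]
                    for n in range(1, meta["chunks"] + 1))
```

Writing the metadata row last is what bounds the flow to two writes and two reads: readers never see a metadata row pointing at chunks that have not yet landed.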

Caching Layer Changes

The in-memory caching layer was enhanced to support chunking for large entries. For members with large viewing history, it was not possible to fit the entire compressed viewing history in a single EVCache entry. So similar to the CompressedVH model, each large viewing history cache entry is broken into multiple chunks and the metadata is stored along with the first chunk.


By leveraging parallelism, compression, and an improved data model, the team was able to meet all of the goals:

  1. Smaller Storage Footprint via compression.
  2. Consistent Read/Write Performance via chunking and parallel reads/writes. Latency is bound to one read and one write for the common case, and to two reads and two writes for the rare case.
Figure 4: Results

The team achieved ~6X reduction in data size, ~13X reduction in system time spent on Cassandra maintenance, ~5X reduction in average read latency and ~1.5X reduction in average write latency. More importantly, it gave the team a scalable architecture and headroom to accommodate rapid growth of Netflix viewing data.

In the next part of this blog post series, we will explore the latest scalability challenges motivating the next iteration of viewing history storage architecture. If you are interested in solving similar problems, join us.

Scaling Time Series Data Storage — Part I was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Analyzing Cassandra Performance with Flame Graphs

One of the challenges of running large scale distributed systems is being able to pinpoint problems. It’s all too common to blame a random component (usually a database) whenever there’s a hiccup, even when there’s no evidence to support the claim. We’ve already discussed the importance of monitoring tools, graphing and alerting on metrics, and using distributed tracing systems like Zipkin to correctly identify the source of a problem in a complex system.

Once you’ve narrowed down the problem to a single system, what do you do? To figure this out, it’s going to depend on the nature of the problem, of course. Some issues are temporary, like a dead disk. Some are related to a human-introduced change, like a deployment or a wrong configuration setting. These have relatively straightforward solutions. Replace the disk, or rollback the deployment.

What about problems that are outside the scope of a simple change? One external factor that we haven’t mentioned so far is growth. Scale can be a difficult problem to understand because reproducing the issue is often nuanced and complex. These challenges are sometimes measured in throughput (requests per second), size (terabytes), or latency (5ms p99). For instance, if a database server is able to serve every request out of memory, it may get excellent throughput. As the size of the dataset increases, random lookups are more and more likely to go to disk, decreasing throughput. Time Window Compaction Strategy is a great example of a solution to a scale problem that’s hard to understand unless the numbers are there. The pain of compaction isn’t felt until you’re dealing with a large enough volume of data to cause performance problems.

During times of failure we all too often find ourselves thinking of the machine and its processes as a black box: billions of instructions executing every second, with no ability to peer inside and understand its mysteries.

Fortunately, we’re not completely blind as to what a machine is doing. For years we’ve had tools like debuggers and profilers available to us. Oracle’s JDK offers us Java Flight Recorder, which we can use to analyze running processes locally or in production:

mission control

Profiling with Flight Recorder is straightforward, but interpreting the results takes a little bit of work. Expanding the list of nested tables and looking for obvious issues is a bit more mental work than I’m interested in; it would be a lot easier if we could visualize the information. Flight Recorder also requires a commercial license for production use, and only works with the Oracle JDK.

That brings us back to the subject of this post: a way of generating useful visual information called a flame graph. A flame graph lets us quickly identify the performance bottlenecks in a system. They were invented by Brendan Gregg. This is also part one of a very long series of performance tuning posts; we will be referring back to it as we dive deeper into the internals of Cassandra.

Swiss Java Knife

The approach we’ll examine in this post is utilizing the Swiss Java Knife, usually referred to as SJK, to capture the data from the JVM and generate the flame graphs. SJK is a fantastic collection of tools. Aside from generating flame graphs, we can inspect garbage collection statistics, watch threads, and do a variety of other diagnostic tasks. It works on Mac, Linux, and both the Oracle JDK and the OpenJDK.

I’ve downloaded the JAR, put it in my $HOME/bin and set up a shell function to call it easily:

sjk () {
        java -jar ~/bin/sjk-plus-0.8.jar "$@"
}

On my laptop I’m running a workload with cassandra-stress. I’ve prepopulated the database, and started the stress workload with the following command:

cassandra-stress read n=1000000

For the first step of our analysis, we need to capture the stack frames of our running Java application using the stcap feature of SJK. To do this, we need to pass in the process id and the file to which we will dump the data. The dumps are written in a binary format that we’ll be able to query later:

sjk stcap -p 92541 -i 10ms -o dump.std

Then we can analyze the data. If all we have is a terminal, we can print out a histogram of the analysis. This can be pretty useful on its own if there’s an obvious issue. In this case, we can see that a lot of time is spent in sun.misc.Unsafe.park, meaning threads are just waiting around, parked:

$ sjk ssa -f dump.std --histo
Trc     (%)  Frm  N  Term    (%)  Frame
372447  96%  372447       0   0%
309251  80%  309251  309251  80%  sun.misc.Unsafe.park(Native Method)
259376  67%  259376       0   0%  java.util.concurrent.locks.LockSupport.park(
254388  66%  254388       0   0%
 55709  14%  55709        0   0%  java.util.concurrent.ThreadPoolExecutor$
 52374  13%  52374        0   0%  org.apache.cassandra.concurrent.NamedThreadFactory$$Lambda$6/ Source)
 52374  13%  52374        0   0%  org.apache.cassandra.concurrent.NamedThreadFactory.lambda$threadLocalDeallocator$0(
 44892  11%  44892        0   0%  io.netty.util.concurrent.DefaultThreadFactory$
 44887  11%  44887        0   0%  java.util.concurrent.ThreadPoolExecutor.runWorker(
 42398  11%  42398        0   0%
 42398  11%  42398        0   0%  io.netty.util.concurrent.SingleThreadEventExecutor$
 42398  11%  42398        0   0%
 42398  11%  42398        0   0%
 42398  11%  42398        0   0%
 42398  11%  42398    42398  11% Method)
 42398  11%  42398        0   0%
 42398  11%  42398        0   0%

Now that we have our stcap dump, we can generate a flame graph with the following command:

sjk ssa --flame -f dump.std > flame-sjk.svg

When you open the SVG in a browser, you should end up with an image which looks something like this:


If you open the flame graph on your machine you can mouse over the different sections to see the method call and the percentage of time it’s taking. The wider the bar, the more frequently it appears in the captured stacks. It’s very easy to glance at the graph to understand where the time is spent in our program.

This is not the only technique for generating flame graphs. Brendan Gregg has a long list of links and references I recommend reading at the bottom of his FlameGraph page. I intend to write a utility to export the SJK format to the format that Brendan uses on his blog, as it’s a little nicer to look at, has better mouseover behavior, supports drill-down, and has search. That format also supports differential flame graphs, which are nice if you’re doing performance comparisons across different builds.

I hope you’ve enjoyed this post on visualizing Cassandra’s performance using FlameGraphs. We’ve used this tool several times with the teams we’ve worked with to tune Cassandra’s configurations and optimize performance. In the next post in this series we’ll be examining how to tune garbage collection parameters to maximize throughput while keeping latency to a minimum.

Meltdown's Impact on Cassandra Latency

What impact on latency should you expect from applying the kernel patches for the Meltdown security vulnerability?

TL;DR expect a latency increase of at least 20% for both reads and writes.


The Meltdown vulnerability, formally CVE-2017-5754, allows rogue processes to access kernel memory. Simple demonstrations have already appeared online showing how to expose passwords and SSH private keys from memory. The consequences of this, in particular on shared hosts (i.e., cloud), are considered “catastrophic” by security analysts. Initially discovered in early 2017, the vulnerability was planned to be publicly announced on 9th January 2018. However, due to the attention generated by the frequency of Linux kernel ‘page-table isolation’ (KPTI) patches committed late in 2017, the news broke early, on 3rd January 2018.


Without updated hardware, the Linux kernel patches impact CPU usage. While userspace programs are not directly affected, anything that triggers a lot of interrupts to the CPU, such as a database’s use of IO and network, will suffer. Early reports are showing evidence of CPU usage taking a hit of between 5% and 70%. Because of the potential CPU performance hit and the lack of evidence available, The Last Pickle spent a little time to see what impacts we could record for ourselves.


The hardware used for testing was a Lenovo X1 Carbon (gen 5) laptop. This machine runs an Intel Core i7-5600U CPU with 8GB of RAM. Running on it is Ubuntu 17.10 Artful. The unpatched kernel was version 4.13.0-21, and the patched kernel version 4.13.0-25. A physical machine was used to avoid the performance variances encountered in the different cloud environments.

The Ubuntu kernel was patched according to instructions here and the ppa:canonical-kernel-team/pti repository.


A simple schema, but one typical of many Cassandra usages, was used on top of Cassandra-3.11.1 via a 3 node ccm cluster. The stress execution ran with 32 threads. Running stress, three nodes, and a large number of threads on one piece of hardware was intentional, so as to increase thread/context switching and kernel overhead.

The stress run was limited to 5k requests per second so as to avoid saturation, which occurred around 7k/s. The ratio of writes to reads was 1:1, with reads being split between whole partitions and single rows. The table used TWCS, tuned down to 10-minute windows so as to ensure compactions ran during an otherwise short stress run. The stress ran for an hour against both the unpatched and patched kernels.

ccm stress user profile=stress.yaml ops\(insert=2,by_partition=1,by_row=1\) duration=1h -rate threads=32 throttle=5000/s -graph file=meltdown.html title=Meltdown revision=cassandra-3.11.1-unpatched


The following graphs show that over every percentile a 20%+ latency increase occurs. Sometimes the increase is up around 50%.

Meltdown Cassandra median

Meltdown Cassandra 95th

Meltdown Cassandra 99th

Meltdown Cassandra stats


The full stress results are available here.

Should you use incremental repair?

After seeing a lot of questions surrounding incremental repair on the mailing list, and after observing several outages caused by it, we figured it would be good to write down our advice in a blog post.

Repair in Apache Cassandra is a maintenance operation that restores data consistency throughout a cluster. It is advised to run repair operations at least once every gc_grace_seconds to ensure that tombstones get replicated consistently, avoiding zombie records if you perform DELETE statements on your tables.

Repair also facilitates recovery from outages that last longer than the hint window, or in case hints were dropped. For those operators already familiar with the repair concepts, there were a few back-to-basics moments when the behavior of repair changed significantly in the release of Apache Cassandra 2.2. The introduction of incremental repair as the default along with the generalization of anti-compaction created a whole new set of challenges.

How does repair work?

To perform repairs without comparing all data between all replicas, Apache Cassandra uses Merkle trees, comparing trees of hashed values instead.

Merkle tree

During a repair, each replica will build a Merkle tree, using what is called a “validation compaction”. It is basically a compaction without the write phase, the output being a tree of hashes.

Validation compaction

Merkle trees will then be compared between replicas to identify mismatching leaves, each leaf covering several partitions. No difference check is made on a per-partition basis: if one partition in a leaf is not in sync, then all partitions in the leaf are considered out of sync. Sending over more data than is required is typically called overstreaming. Gigabytes of data can be streamed, even for a single bit of difference. To mitigate overstreaming, people started performing subrange repairs, specifying the start/end tokens to repair in smaller chunks, which results in fewer partitions per leaf.
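To make the overstreaming point concrete, here is a toy Python model of leaf-level comparison. The token-to-leaf mapping and the hashing scheme are gross simplifications, not Cassandra’s actual Merkle tree implementation:

```python
import hashlib

def leaf_hashes(partitions, leaf_count):
    """Bucket partitions into leaves by token, then hash each leaf's contents."""
    leaves = [hashlib.sha256() for _ in range(leaf_count)]
    for token in sorted(partitions):
        leaf = token % leaf_count  # toy token-to-leaf mapping
        leaves[leaf].update(repr((token, partitions[token])).encode())
    return [h.hexdigest() for h in leaves]

def partitions_to_stream(local, remote, leaf_count):
    """Any mismatching leaf forces *all* of its partitions to be streamed."""
    mismatched = {i for i, (a, b) in
                  enumerate(zip(leaf_hashes(local, leaf_count),
                                leaf_hashes(remote, leaf_count))) if a != b}
    return sorted(t for t in local if t % leaf_count in mismatched)
```

With few leaves, a single differing partition drags every other partition in its leaf into the stream; increasing the leaf count over a smaller token range (as subrange repair effectively does) shrinks the set down toward just the partition that actually differs.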

With clusters growing in size and density, performing repairs within gc_grace_seconds started to get more and more challenging, with repairs sometimes lasting for tens of days. Some clever folks leveraged the immutable nature of SSTables and introduced incremental repair in Apache Cassandra 2.1.

What is incremental repair?

The plan with incremental repair was that once some data had been repaired, it would be marked as such and would never need to be repaired again.
Since SSTables can contain tokens from multiple token ranges, and repair is performed by token range, it was necessary to be able to separate repaired data from unrepaired data. That process is called anticompaction.


Once a repair session ends, each repaired SSTable will be split into 2 SSTables: one that contains the data that was repaired in the session (i.e., data that belonged to the repaired token range) and another with the remaining unrepaired data. The newly created SSTable containing repaired data will be marked as such by setting its repairedAt timestamp to the time of the repair session.
When performing validation compaction during the next incremental repair, Cassandra will skip SSTables with a repairedAt timestamp higher than 0, and thus only compare data that is unrepaired.
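A toy sketch of anticompaction and the subsequent validation filtering; the dictionary-based SSTable representation and function names are assumptions for illustration, not Cassandra internals:

```python
def anticompact(sstable, repaired_range, repaired_at):
    """Split one SSTable into a repaired and an unrepaired SSTable after a session."""
    lo, hi = repaired_range
    repaired = {t: v for t, v in sstable["data"].items() if lo <= t < hi}
    unrepaired = {t: v for t, v in sstable["data"].items() if not (lo <= t < hi)}
    out = []
    if repaired:
        out.append({"data": repaired, "repairedAt": repaired_at})
    if unrepaired:
        out.append({"data": unrepaired, "repairedAt": 0})
    return out

def sstables_for_validation(sstables):
    """The next incremental repair's validation skips SSTables with repairedAt > 0."""
    return [s for s in sstables if s.get("repairedAt", 0) == 0]
```

Note that anticompaction rewrites the whole SSTable just to segregate the two sets, which is exactly the overhead the rest of this post complains about.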

Incremental repair

Incremental repair was actually promising enough that it was promoted as the default repair mode in C* 2.2, and anticompaction was since then also performed during full repairs.
To say the least, this was a bit of a premature move from the community as incremental repair has a few very annoying drawbacks and caveats that would make us consider it an experimental feature instead.

The problems of incremental repair

The most nasty one is filed in the Apache Cassandra JIRA as CASSANDRA-9143 with a fix ready for the unplanned 4.0 release. Between validation compaction and anticompaction, an SSTable that is involved in a repair can be compacted away as part of the standard compaction process on one node and not on the others. Such an SSTable will not get marked as repaired on that specific node while the rest of the cluster will consider the data it contained as repaired.
Thus, on the next incremental repair run, all the partitions contained in that SSTable will be seen as inconsistent, which can generate a fairly large amount of overstreaming. This is a particularly nasty bug when incremental repair is used in conjunction with Leveled Compaction Strategy (LCS). LCS is a very intensive strategy in which SSTables get compacted far more often than with STCS or TWCS. LCS creates fixed-size SSTables, which can easily lead to thousands of SSTables for a single table. The way streaming occurs in Apache Cassandra during repair means that overstreaming of LCS tables can create tens of thousands of small SSTables in L0, which can ultimately bring nodes down and affect the whole cluster. This is particularly true when nodes use a large number of vnodes.
We have seen this happen on several customers’ clusters, and it then requires a lot of operational expertise to bring the cluster back to a sane state.

In addition to the bugs related to incorrectly marked SSTables, there is the significant overhead of anticompaction. It came as quite a surprise to users upgrading from 2.0/2.1 to 2.2 when they first tried to run repair. If there is already a lot of data on disk, the first incremental repair can take a lot of time (if not forever) and create a situation similar to the one above, with a lot of SSTables being created due to anticompaction. Keep in mind that anticompaction will rewrite all SSTables on disk to separate repaired and unrepaired data.
While it is no longer necessary to “prepare” the migration to incremental repair, we would strongly advise against running it on a cluster with a lot of unrepaired data without first marking SSTables as repaired. This would require running a full repair first to make sure data is actually repaired, but now even full repair performs anticompaction, so… you see the problem.

For valid reasons, a safety measure has been put in place to prevent SSTables that are undergoing anticompaction from being compacted. The problem is that it also prevents those SSTables from going through validation compaction, which causes repair sessions to fail if an SSTable is being anticompacted. Given that anticompaction also occurs with full repairs, this creates the following limitation: you cannot run repair on more than one node at a time without risking failed sessions due to concurrent access to SSTables. This is true for incremental repair but also for full repair, and it changes a lot about how you have to run repair compared to previous versions.

The only way to perform repair without anticompaction in “modern” versions of Apache Cassandra is subrange repair, which fully skips anticompaction. To perform a subrange repair correctly, you have three options:

Regardless, it is extremely important to note that repaired and unrepaired SSTables can never be compacted together. If you stop performing incremental repairs once you started, you could end up with outdated data not being cleaned up on disk due to the presence of the same partition in both states. So if you want to continue using incremental repair, make sure it runs very regularly, and if you want to move back to full subrange repairs you will need to mark all SSTables as unrepaired using sstablerepairedset.

Note that because subrange repair does not perform anticompaction, it is not possible to perform subrange repair in incremental mode.

Repair: state of the art in late 2017

Here’s our advice at the time of writing this blog post, based on our experience with customers: perform full subrange repairs exclusively for now, and do not run incremental repair. Just pretend that feature does not exist for now.

While the idea behind incremental repair is brilliant, the implementation still has flaws that can cause severe damage to a production cluster, especially when using LCS and DTCS. The improvements and fixes planned for 4.0 will need to be thoroughly tested to prove they fixed incremental repair and allow it to be safely used as a daily routine.

We are confident that future releases will make incremental repair better, allowing the operation to be safe and blazing fast compared to full repairs.