Introducing Netflix’s TimeSeries Data Abstraction Layer
By Rajiv Shringi, Vinay Chella, Kaidan Fullerton, Oleksii Tkachuk, Joey Lynch
Introduction
As Netflix continues to expand and diversify into various sectors like Video on Demand and Gaming, the ability to ingest and store vast amounts of temporal data — often reaching petabytes — with millisecond access latency has become increasingly vital. In previous blog posts, we introduced the Key-Value Data Abstraction Layer and the Data Gateway Platform, both of which are integral to Netflix’s data architecture. The Key-Value Abstraction offers a flexible, scalable solution for storing and accessing structured key-value data, while the Data Gateway Platform provides essential infrastructure for protecting, configuring, and deploying the data tier.
Building on these foundational abstractions, we developed the TimeSeries Abstraction — a versatile and scalable solution designed to efficiently store and query large volumes of temporal event data with low millisecond latencies, all in a cost-effective manner across various use cases.
In this post, we will delve into the architecture, design principles, and real-world applications of the TimeSeries Abstraction, demonstrating how it enhances our platform’s ability to manage temporal data at scale.
Note: Contrary to what the name may suggest, this system is not built as a general-purpose time series database. We do not use it for metrics, histograms, timers, or any such near-real-time analytics use case. Those use cases are well served by the Netflix Atlas telemetry system. Instead, we focus on addressing the challenge of storing and accessing extremely high-throughput, immutable temporal event data in a low-latency and cost-efficient manner.
Challenges
At Netflix, temporal data is continuously generated and utilized, whether from user interactions like video-play events, asset impressions, or complex micro-service network activities. Effectively managing this data at scale to extract valuable insights is crucial for ensuring optimal user experiences and system reliability.
However, storing and querying such data presents a unique set of challenges:
- High Throughput: Managing up to 10 million writes per second while maintaining high availability.
- Efficient Querying in Large Datasets: Storing petabytes of data while ensuring primary key reads return results within low double-digit milliseconds, and supporting searches and aggregations across multiple secondary attributes.
- Global Reads and Writes: Facilitating read and write operations from anywhere in the world with adjustable consistency models.
- Tunable Configuration: Offering the ability to partition datasets in either a single-tenant or multi-tenant datastore, with options to adjust various dataset aspects such as retention and consistency.
- Handling Bursty Traffic: Managing significant traffic spikes during high-demand events, such as new content launches or regional failovers.
- Cost Efficiency: Reducing the cost per byte and per operation to optimize long-term retention while minimizing infrastructure expenses, which can amount to millions of dollars for Netflix.
TimeSeries Abstraction
The TimeSeries Abstraction was developed to meet these requirements, built around the following core design principles:
- Partitioned Data: Data is partitioned using a unique temporal partitioning strategy combined with an event bucketing approach to efficiently manage bursty workloads and streamline queries.
- Flexible Storage: The service is designed to integrate with various storage backends, including Apache Cassandra and Elasticsearch, allowing Netflix to customize storage solutions based on specific use case requirements.
- Configurability: TimeSeries offers a range of tunable options for each dataset, providing the flexibility needed to accommodate a wide array of use cases.
- Scalability: The architecture supports both horizontal and vertical scaling, enabling the system to handle increasing throughput and data volumes as Netflix expands its user base and services.
- Sharded Infrastructure: Leveraging the Data Gateway Platform, we can deploy single-tenant and/or multi-tenant infrastructure with the necessary access and traffic isolation.
Let’s dive into the various aspects of this abstraction.
Data Model
We follow a unique event data model that encapsulates all the data we want to capture for events, while allowing us to query them efficiently.
Let’s start with the smallest unit of data in the abstraction and work our way up.
- Event Item: An event item is a key-value pair that holds one piece of data for a given event. For example: {"device_type": "ios"}.
- Event: An event is a structured collection of one or more such event items. An event occurs at a specific point in time and is identified by a client-generated timestamp and an event identifier (such as a UUID). This combination of event_time and event_id also forms part of the unique idempotency key for the event, enabling users to safely retry requests.
- Time Series ID: A time_series_id identifies a collection of one or more such events over the dataset’s retention period. For instance, a device_id would store all events occurring for a given device over the retention period. All events are immutable, and the TimeSeries service only ever appends events to a given time series ID.
- Namespace: A namespace is a collection of time series IDs and event data, representing the complete TimeSeries dataset. Users can create one or more namespaces for each of their use cases. The abstraction applies various tunable options at the namespace level, which we will discuss further when we explore the service’s control plane.
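To make the hierarchy above concrete, here is a minimal Python sketch of the four levels. The class and field names are hypothetical illustrations, not the service's actual types, and the in-memory dictionary only stands in for real storage.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class EventItem:
    """A single key-value pair attached to an event."""
    key: bytes
    value: bytes


@dataclass(frozen=True)
class Event:
    """An immutable event identified by (event_time, event_id)."""
    time_series_id: str
    event_time: datetime
    event_id: str
    items: tuple[EventItem, ...]


@dataclass
class Namespace:
    """A dataset: maps each time_series_id to the events appended to it."""
    name: str
    series: dict[str, list[Event]] = field(default_factory=dict)

    def append(self, event: Event) -> None:
        # Events are only ever appended, never updated in place.
        self.series.setdefault(event.time_series_id, []).append(event)


ns = Namespace("my_dataset")
ns.append(Event(
    time_series_id="device-123",
    event_time=datetime(2024, 10, 3, 21, 24, 23, 988000, tzinfo=timezone.utc),
    event_id="550e8400-e29b-41d4-a716-446655440000",
    items=(EventItem(b"device_type", b"ios"),),
))
```

Marking the event classes as frozen mirrors the immutability described above: once created, an event can be appended to a series but not modified.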
API
The abstraction provides the following APIs to interact with the event data.
WriteEventRecordsSync: This endpoint writes a batch of events and sends back a durability acknowledgement to the client. This is used in cases where users require a guarantee of durability.
WriteEventRecords: This is the fire-and-forget version of the above endpoint. It enqueues a batch of events without the durability acknowledgement. This is used in cases like logging or tracing, where users care more about throughput and can tolerate a small amount of data loss.
{
"namespace": "my_dataset",
"events": [
{
"timeSeriesId": "profile100",
"eventTime": "2024-10-03T21:24:23.988Z",
"eventId": "550e8400-e29b-41d4-a716-446655440000",
"eventItems": [
{
"eventItemKey": "ZGV2aWNlVHlwZQ==",
"eventItemValue": "aW9z"
},
{
"eventItemKey": "ZGV2aWNlTWV0YWRhdGE=",
"eventItemValue": "c29tZSBtZXRhZGF0YQ=="
}
]
},
{
"timeSeriesId": "profile100",
"eventTime": "2024-10-03T21:23:30.000Z",
"eventId": "123e4567-e89b-12d3-a456-426614174000",
"eventItems": [
{
"eventItemKey": "ZGV2aWNlVHlwZQ==",
"eventItemValue": "YW5kcm9pZA=="
}
]
}
]
}
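The event item keys and values in the request above decode as Base64 (for instance, ZGV2aWNlVHlwZQ== is deviceType and aW9z is ios). The following Python sketch assembles a payload of the same shape. The helper names b64 and build_event are hypothetical, and sending the payload (transport, client library) is outside what this post describes, so it is left out.

```python
import base64
import json
import uuid
from datetime import datetime, timezone


def b64(text: str) -> str:
    """Base64-encode a UTF-8 string, matching the encoded items shown above."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def build_event(time_series_id, event_time, items, event_id=None):
    """Build one event dict shaped like the entries in the request body above."""
    # ISO-8601 UTC with millisecond precision, e.g. 2024-10-03T21:24:23.988Z
    event_time_text = (
        event_time.astimezone(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )
    return {
        "timeSeriesId": time_series_id,
        "eventTime": event_time_text,
        # Generate the id once on the client and reuse it on every retry.
        "eventId": event_id or str(uuid.uuid4()),
        "eventItems": [
            {"eventItemKey": b64(k), "eventItemValue": b64(v)}
            for k, v in items.items()
        ],
    }


request = {
    "namespace": "my_dataset",
    "events": [
        build_event(
            "profile100",
            datetime(2024, 10, 3, 21, 24, 23, 988000, tzinfo=timezone.utc),
            {"deviceType": "ios"},
            event_id="550e8400-e29b-41d4-a716-446655440000",
        )
    ],
}
print(json.dumps(request, indent=2))
```

Keeping the event_id stable across retries is what lets the server treat a repeated write as the same event.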
ReadEventRecords: Given a combination of a namespace, a timeSeriesId, a timeInterval, and optional eventFilters, this endpoint returns all the matching events, sorted descending by event_time, with low millisecond latency.
{
"namespace": "my_dataset",
"timeSeriesId": "profile100",
"timeInterval": {
"start": "2024-10-02T21:00:00.000Z",
"end": "2024-10-03T21:00:00.000Z"
},
"eventFilters": [
{
"matchEventItemKey": "ZGV2aWNlVHlwZQ==",
"matchEventItemValue": "aW9z"
}
],
"pageSize": 100,
"totalRecordLimit": 1000
}
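The read semantics can be illustrated with a small in-memory sketch in Python. It is not the service implementation: the half-open time interval, the meaning of pageSize as a per-page cap, and totalRecordLimit as a cap across all pages are assumptions made for illustration, and events are plain dictionaries with already-decoded items.

```python
from datetime import datetime, timezone


def ts(text):
    """Parse an ISO-8601 timestamp (without offset) as UTC."""
    return datetime.fromisoformat(text).replace(tzinfo=timezone.utc)


def read_event_records(events, start, end, filters, page_size, total_record_limit):
    """Yield pages of matching events, newest first."""
    matching = [
        e for e in events
        if start <= e["event_time"] < end          # assumed half-open interval
        and all(e["items"].get(k) == v for k, v in filters.items())
    ]
    matching.sort(key=lambda e: e["event_time"], reverse=True)  # descending by event_time
    matching = matching[:total_record_limit]       # assumed cap across all pages
    for i in range(0, len(matching), page_size):
        yield matching[i:i + page_size]


events = [
    {"event_time": ts("2024-10-03T20:00:00"), "items": {"deviceType": "ios"}},
    {"event_time": ts("2024-10-02T22:00:00"), "items": {"deviceType": "ios"}},
    {"event_time": ts("2024-10-03T10:00:00"), "items": {"deviceType": "android"}},
    {"event_time": ts("2024-10-03T21:24:23"), "items": {"deviceType": "ios"}},  # outside the interval
]
pages = list(read_event_records(
    events,
    start=ts("2024-10-02T21:00:00"),
    end=ts("2024-10-03T21:00:00"),
    filters={"deviceType": "ios"},
    page_size=1,
    total_record_limit=1000,
))
```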
SearchEventRecords: Given search criteria and a time interval, this endpoint returns all the matching events. These use cases can tolerate eventually consistent reads.
{
"namespace": "my_dataset",
"timeInterval": {
"start": "2024-10-02T21:00:00.000Z",
"end": "2024-10-03T21:00:00.000Z"
},
"searchQuery": {
"booleanQuery": {
"searchQuery": [
{
"equals": {
"eventItemKey": "ZGV2aWNlVHlwZQ==",
"eventItemValue": "aW9z"
}
},
{
"range": {
"eventItemKey": "ZGV2aWNlUmVnaXN0cmF0aW9uVGltZXN0YW1w",
"lowerBound": {
"eventItemValue": "MjAyNC0xMC0wMlQwMDowMDowMC4wMDBa",
"inclusive": true
},
"upperBound": {
"eventItemValue": "MjAyNC0xMC0wM1QwMDowMDowMC4wMDBa"
}
}
}
],
"operator": "AND"
}
},
"pageSize": 100,
"totalRecordLimit": 1000
}
AggregateEventRecords: Given search criteria and an aggregation mode (e.g., DistinctAggregation), this endpoint performs the given aggregation within a given time interval. As with the Search endpoint, users can tolerate eventual consistency and potentially higher latency (in seconds).
{
"namespace": "my_dataset",
"timeInterval": {
"start": "2024-10-02T21:00:00.000Z",
"end": "2024-10-03T21:00:00.000Z"
},
"searchQuery": {...some search criteria...},
"aggregationQuery": {
"distinct": {
"eventItemKey": "deviceType",
"pageSize": 100
}
}
}
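As a rough illustration of what a distinct aggregation computes, the Python sketch below collects the distinct values of one event item key over a set of already-matched events. The function name, the sorted ordering, and the single-page return are assumptions for illustration. The real endpoint runs against the index datastore rather than in memory.

```python
def distinct_aggregation(events, event_item_key, page_size):
    """Return the first page of distinct values seen for one event item key."""
    values = sorted({e[event_item_key] for e in events if event_item_key in e})
    return values[:page_size]


matched_events = [
    {"deviceType": "ios"},
    {"deviceType": "android"},
    {"deviceType": "ios"},
    {"deviceMetadata": "some metadata"},   # no deviceType item, so it is skipped
]
result = distinct_aggregation(matched_events, "deviceType", page_size=100)
```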
In the subsequent sections, we will talk about how we interact with this data at the storage layer.
Storage Layer
The storage layer for TimeSeries comprises a primary data store and an optional index data store. The primary data store ensures data durability during writes and is used for primary read operations, while the index data store is utilized for search and aggregate operations. At Netflix, Apache Cassandra is the preferred choice for storing durable data in high-throughput scenarios, while Elasticsearch is the preferred data store for indexing. However, similar to our approach with the API, the storage layer is not tightly coupled to these specific data stores. Instead, we define storage API contracts that must be fulfilled, allowing us the flexibility to replace the underlying data stores as needed.
Primary Datastore
In this section, we will talk about how we leverage Apache Cassandra for TimeSeries use cases.
Partitioning Scheme
At Netflix’s scale, the continuous influx of event data can quickly overwhelm traditional databases. Temporal partitioning addresses this challenge by dividing the data into manageable chunks based on time intervals, such as hourly, daily, or monthly windows. This approach enables efficient querying of specific time ranges without the need to scan the entire dataset. It also allows Netflix to archive, compress, or delete older data efficiently, optimizing both storage and query performance. Additionally, this partitioning mitigates the performance issues typically associated with wide partitions in Cassandra. By employing this strategy, we can operate at much higher disk utilization, as it reduces the need to reserve large amounts of disk space for compactions, thereby saving costs.
The scheme partitions data at three levels: time slices, time buckets, and event buckets.
Time Slice: A time slice is the unit of data retention and maps directly to a Cassandra table. We create multiple such time slices, each covering a specific interval of time. An event lands in one of these slices based on the event_time. These slices are joined with no time gaps in between, with operations being start-inclusive and end-exclusive, ensuring that all data lands in one of the slices.
Why not use row-based Time-To-Live (TTL)?
Using TTL on individual events would generate a significant number of tombstones in Cassandra, degrading performance, especially during range scans. By employing discrete time slices and dropping them, we avoid the tombstone issue entirely. The tradeoff is that data may be retained slightly longer than necessary, as an entire table’s time range must fall outside the retention window before it can be dropped. Additionally, TTLs are difficult to adjust later, whereas TimeSeries can extend the dataset retention instantly with a single control plane operation.
Time Buckets: Within a time slice, data is further partitioned into time buckets. This facilitates effective range scans by allowing us to target specific time buckets for a given query range. The tradeoff is that if a user wants to read the entire range of data over a large time period, we must scan many partitions. We mitigate potential latency by scanning these partitions in parallel and aggregating the data at the end. In most cases, the advantage of targeting smaller data subsets outweighs the read amplification from these scatter-gather operations. Typically, users read a smaller subset of data rather than the entire retention range.
Event Buckets: To manage extremely high-throughput write operations, which may result in a burst of writes for a given time series within a short period, we further divide the time bucket into event buckets. This prevents overloading the same partition for a given time range and also reduces partition sizes further, albeit with a slight increase in read amplification.
Note: From Cassandra 4.x onwards, we have noticed a substantial improvement in the performance of scanning a range of data in a wide partition. See Future Enhancements at the end for the Dynamic Event Bucketing work that aims to take advantage of this.
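The three partitioning levels can be sketched as a single function that maps an event to its location. This is a minimal Python illustration under stated assumptions: the widths are borrowed from the namespace configuration sample later in this post, slices are assumed to be aligned to the Unix epoch, and choosing the event bucket by hashing the event_id with CRC32 is a hypothetical stand-in for whatever the service actually does.

```python
import zlib
from datetime import datetime, timezone

SECONDS_PER_TIME_SLICE = 129_600   # width of a time slice (from the config sample)
SECONDS_PER_TIME_BUCKET = 3_600    # width of a time bucket (from the config sample)
EVENT_BUCKETS = 4                  # event buckets per time bucket (from the config sample)


def locate(event_time: datetime, event_id: str):
    """Return (time_slice_start, time_bucket, event_bucket) for one event."""
    epoch = int(event_time.timestamp())
    # Slices are start-inclusive and end-exclusive, so flooring picks exactly one slice.
    slice_start = epoch - epoch % SECONDS_PER_TIME_SLICE
    # Time bucket index counted from the start of the slice.
    time_bucket = (epoch - slice_start) // SECONDS_PER_TIME_BUCKET
    # Hypothetical: spread a burst of events over buckets with a stable hash.
    event_bucket = zlib.crc32(event_id.encode("utf-8")) % EVENT_BUCKETS
    return slice_start, time_bucket, event_bucket


slice_start, time_bucket, event_bucket = locate(
    datetime(2024, 10, 3, 21, 24, 23, 988000, tzinfo=timezone.utc),
    "550e8400-e29b-41d4-a716-446655440000",
)
```

A stable hash matters here: the same event must land in the same event bucket on every retry, otherwise idempotent writes would create duplicates in different partitions.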
Storage Tables
We use two kinds of tables:
- Data tables: These are the time slices that store the actual event data.
- Metadata table: This table stores information about how each time slice is configured per namespace.
Data tables
The partition key enables splitting events for a time_series_id over a range of time_bucket(s) and event_bucket(s), thus mitigating hot partitions, while the clustering key allows us to keep data sorted on disk in the order we almost always want to read it. The value_metadata column stores metadata for the event_item_value such as compression.
Writing to the data table:
User writes will land in a given time slice, time bucket, and event bucket as a function of the event_time attached to the event. This mapping is dictated by the control plane configuration of a given namespace.
During this process, the writer makes decisions on how to handle the data before writing, such as whether to compress it. The value_metadata column records any such post-processing actions, ensuring that the reader can accurately interpret the data.
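The role of value_metadata can be shown with a small writer/reader pair in Python. This is a sketch under assumptions: zlib, the 64-byte threshold, and the metadata layout are hypothetical choices for illustration. The post only states that the column records post-processing such as compression.

```python
import zlib

COMPRESSION_THRESHOLD_BYTES = 64   # hypothetical cutoff below which compression is skipped


def encode_value(raw: bytes):
    """Writer side: return (stored_value, value_metadata)."""
    if len(raw) >= COMPRESSION_THRESHOLD_BYTES:
        compressed = zlib.compress(raw)
        if len(compressed) < len(raw):
            return compressed, {"compression": "zlib"}
    # Small or incompressible values are stored unchanged.
    return raw, {"compression": "none"}


def decode_value(stored: bytes, value_metadata: dict) -> bytes:
    """Reader side: undo whatever the metadata says the writer did."""
    if value_metadata["compression"] == "zlib":
        return zlib.decompress(stored)
    return stored


small = b"ios"
large = b"some metadata " * 100
stored_small, meta_small = encode_value(small)
stored_large, meta_large = encode_value(large)
```

Because the decision is recorded next to each value, the writer can change its policy over time without breaking readers of older rows.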
Reading from the data table:
At a high level, we scatter-gather the reads from multiple partitions and join the result sets at the end to return the final result.
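A scatter-gather read of this kind can be sketched in Python with a thread pool and a k-way merge. The in-memory PARTITIONS dictionary and the scan_partition function are hypothetical stand-ins for Cassandra partitions and queries. The sketch assumes each partition already returns rows sorted by event time descending, mirroring the clustering order described above.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

# Hypothetical stand-in for partitions keyed by (time_bucket, event_bucket).
# Rows are (event_time, event_id), already sorted newest first.
PARTITIONS = {
    (0, 0): [(105, "e5"), (101, "e1")],
    (0, 1): [(103, "e3")],
    (1, 0): [(204, "e9"), (202, "e7")],
}


def scan_partition(key):
    """Scan a single partition; a real implementation would query the datastore."""
    return PARTITIONS.get(key, [])


def scatter_gather(keys, limit, fanout=8):
    # Scatter: scan the target partitions in parallel.
    with ThreadPoolExecutor(max_workers=fanout) as pool:
        results = list(pool.map(scan_partition, keys))
    # Gather: merge the per-partition results, keeping newest-first order.
    merged = heapq.merge(*results, key=lambda row: row[0], reverse=True)
    return list(islice(merged, limit))


rows = scatter_gather(list(PARTITIONS), limit=4)
```

Using a merge instead of concatenating and re-sorting takes advantage of the fact that every partition is already sorted.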
Metadata table
This table stores the configuration data about the time slices for a given namespace.
Note the following:
- No Time Gaps: The end_time of a given time slice is the same as the start_time of the next time slice, ensuring all events find a home.
- Retention: The status indicates which tables fall inside and outside of the retention window.
- Flexible: This metadata can be adjusted per time slice, allowing us to tune the partition settings of future time slices based on observed data patterns in the current time slice.
There is a lot more information that can be stored in the metadata column (e.g., compaction settings for the table), but we only show the partition settings here for brevity.
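The "no time gaps" and retention properties can be sketched in Python as follows. The TimeSlice class, the table naming, and the helper functions are hypothetical. The sketch only illustrates back-to-back, start-inclusive and end-exclusive slices, plus the rule that a slice is dropped only once its whole range is outside the retention window.

```python
from dataclasses import dataclass


@dataclass
class TimeSlice:
    table: str   # hypothetical table name for this slice
    start: int   # epoch seconds, inclusive
    end: int     # epoch seconds, exclusive


def build_slices(first_start, seconds_per_slice, count):
    """Create back-to-back slices: each one starts where the previous one ends."""
    return [
        TimeSlice(f"data_{i}",
                  first_start + i * seconds_per_slice,
                  first_start + (i + 1) * seconds_per_slice)
        for i in range(count)
    ]


def find_slice(slices, event_time):
    """Return the single slice whose [start, end) range contains event_time."""
    for s in slices:
        if s.start <= event_time < s.end:
            return s
    return None


def droppable(slices, now, retention_seconds):
    """A slice can be dropped only when its entire range is older than the retention window."""
    cutoff = now - retention_seconds
    return [s for s in slices if s.end <= cutoff]


slices = build_slices(first_start=0, seconds_per_slice=100, count=3)
```

Dropping a whole table this way avoids per-row deletes, at the cost of keeping some data slightly longer than the retention period.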
Index Datastore
To support secondary access patterns via non-primary key attributes, we index data into Elasticsearch. Users can configure a list of attributes per namespace that they wish to search and/or aggregate data on. The service extracts these fields from events as they stream in, indexing the resultant documents into Elasticsearch. Depending on the throughput, we may use Elasticsearch as a reverse index, retrieving the full data from Cassandra, or we may store the entire source data directly in Elasticsearch.
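The field extraction step can be illustrated with a short Python sketch. The field names and types mirror the fieldMapping shown in the namespace configuration sample later in this post. The to_index_document function and its type converters are hypothetical, and the resulting dictionary is only a stand-in for the document that would be indexed.

```python
def to_index_document(event_items: dict, field_mapping: dict) -> dict:
    """Extract only the configured fields from an event and coerce them to the mapped type."""
    converters = {
        "KEYWORD": str,
        "INTEGER": int,
        "BOOLEAN": lambda v: str(v).lower() == "true",
    }
    document = {}
    for field_name, field_type in field_mapping.items():
        if field_name in event_items:
            document[field_name] = converters[field_type](event_items[field_name])
    return document


field_mapping = {
    "tags.nf.app": "KEYWORD",
    "tags.duration": "INTEGER",
    "tags.enabled": "BOOLEAN",
}
event_items = {
    "tags.nf.app": "playback",
    "tags.duration": "42",
    "tags.enabled": "true",
    "payload": "not indexed",      # not in the mapping, so it is left out
}
document = to_index_document(event_items, field_mapping)
```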
Note: Again, users are never directly exposed to Elasticsearch, just like they are not directly exposed to Cassandra. Instead, they interact with the Search and Aggregate API endpoints that translate a given query to that needed for the underlying datastore.
In the next section, we will talk about how we configure these data stores for different datasets.
Control Plane
The data plane is responsible for executing the read and write operations, while the control plane configures every aspect of a namespace’s behavior. The data plane communicates with the TimeSeries control stack, which manages this configuration information. In turn, the TimeSeries control stack interacts with a sharded Data Gateway Platform Control Plane that oversees control configurations for all abstractions and namespaces.
Separating the responsibilities of the data plane and control plane helps maintain the high availability of our data plane, as the control plane takes on tasks that may require some form of schema consensus from the underlying data stores.
Namespace Configuration
The configuration snippet below demonstrates the flexibility of the service and how we can tune several settings per namespace using our control plane.
"persistence_configuration": [
{
"id": "PRIMARY_STORAGE",
"physical_storage": {
"type": "CASSANDRA", // type of primary storage
"cluster": "cass_dgw_ts_tracing", // physical cluster name
"dataset": "tracing_default" // maps to the keyspace
},
"config": {
"timePartition": {
"secondsPerTimeSlice": "129600", // width of a time slice
"secondPerTimeBucket": "3600", // width of a time bucket
"eventBuckets": 4 // how many event buckets within
},
"queueBuffering": {
"coalesce": "1s", // how long to coalesce writes
"bufferCapacity": 4194304 // queue capacity in bytes
},
"consistencyScope": "LOCAL", // single-region/multi-region
"consistencyTarget": "EVENTUAL", // read/write consistency
"acceptLimit": "129600s" // how far back writes are allowed
},
"lifecycleConfigs": {
"lifecycleConfig": [ // Primary store data retention
{
"type": "retention",
"config": {
"close_after": "1296000s", // close for reads/writes
"delete_after": "1382400s" // drop time slice
}
}
]
}
},
{
"id": "INDEX_STORAGE",
"physicalStorage": {
"type": "ELASTICSEARCH", // type of index storage
"cluster": "es_dgw_ts_tracing", // ES cluster name
"dataset": "tracing_default_useast1" // base index name
},
"config": {
"timePartition": {
"secondsPerSlice": "129600" // width of the index slice
},
"consistencyScope": "LOCAL",
"consistencyTarget": "EVENTUAL", // how should we read/write data
"acceptLimit": "129600s", // how far back writes are allowed
"indexConfig": {
"fieldMapping": { // fields to extract to index
"tags.nf.app": "KEYWORD",
"tags.duration": "INTEGER",
"tags.enabled": "BOOLEAN"
},
"refreshInterval": "60s" // Index related settings
}
},
"lifecycleConfigs": {
"lifecycleConfig": [
{
"type": "retention", // Index retention settings
"config": {
"close_after": "1296000s",
"delete_after": "1382400s"
}
}
]
}
}
]
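To make the numbers in this sample easier to read, the following Python snippet converts them into hours, days, and bucket counts. The partition count is an upper bound that assumes one partition per (time_series_id, time_bucket, event_bucket) combination, as the partition key description earlier suggests. A series with few events would touch fewer partitions.

```python
SECONDS_PER_TIME_SLICE = 129_600
SECONDS_PER_TIME_BUCKET = 3_600
EVENT_BUCKETS = 4
CLOSE_AFTER_SECONDS = 1_296_000
DELETE_AFTER_SECONDS = 1_382_400

slice_hours = SECONDS_PER_TIME_SLICE / 3_600                                  # 36.0 hours per slice
time_buckets_per_slice = SECONDS_PER_TIME_SLICE // SECONDS_PER_TIME_BUCKET    # 36 hourly buckets
max_partitions_per_series = time_buckets_per_slice * EVENT_BUCKETS            # 144 per slice
close_after_days = CLOSE_AFTER_SECONDS / 86_400                               # 15.0 days
delete_after_days = DELETE_AFTER_SECONDS / 86_400                             # 16.0 days
slices_until_close = CLOSE_AFTER_SECONDS // SECONDS_PER_TIME_SLICE            # 10 slices

print(slice_hours, time_buckets_per_slice, max_partitions_per_series)
print(close_after_days, delete_after_days, slices_until_close)
```

In other words, this sample keeps data readable for about 15 days, drops each slice a day later, and splits every 36-hour slice into hourly buckets with four event buckets each.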
Provisioning Infrastructure
With so many different parameters, we need automated provisioning workflows to deduce the best settings for a given workload. When users want to create their namespaces, they specify a list of workload desires, which the automation translates into concrete infrastructure and related control plane configuration. We highly encourage you to watch this ApacheCon talk, by one of our stunning colleagues Joey Lynch, on how we achieve this. We may go into detail on this subject in one of our future blog posts.
Once the system provisions the initial infrastructure, it then scales in response to the user workload. The next section describes how this is achieved.
Scalability
Our users may operate with limited information at the time of provisioning their namespaces, resulting in best-effort provisioning estimates. Further, evolving use-cases may introduce new throughput requirements over time. Here’s how we manage this:
- Horizontal scaling: TimeSeries server instances can auto-scale up and down as per attached scaling policies to meet the traffic demand. The storage server capacity can be recomputed to accommodate changing requirements using our capacity planner.
- Vertical scaling: We may also choose to vertically scale our TimeSeries server instances or our storage instances to get greater CPU, RAM and/or attached storage capacity.
- Scaling disk: We may attach EBS to store data if the capacity planner prefers infrastructure that offers larger storage at a lower cost rather than SSDs optimized for latency. In such cases, we deploy jobs to scale the EBS volume when the disk storage reaches a certain percentage threshold.
- Re-partitioning data: Inaccurate workload estimates can lead to over- or under-partitioning of our datasets. The TimeSeries control plane can adjust the partitioning configuration for upcoming time slices once we observe the nature of the data in the wild (via partition histograms). In the future, we plan to support re-partitioning of older data and dynamic partitioning of current data.
Design Principles
So far, we have seen how TimeSeries stores, configures and interacts with event datasets. Let’s see how we apply different techniques to improve the performance of our operations and provide better guarantees.
Event Idempotency
We prefer to bake idempotency into all mutation endpoints so that users can retry or hedge their requests safely. Hedging is when the client sends an identical competing request to the server if the original request does not come back with a response within an expected amount of time. The client then responds with whichever request completes first. This keeps the tail latencies for an application relatively low, and it can only be done safely if the mutations are idempotent. For TimeSeries, the combination of event_time, event_id, and event_item_key forms the idempotency key for a given time_series_id event.
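Why an idempotency key makes retries and hedges safe can be shown with a tiny in-memory store in Python. The IdempotentStore class is a hypothetical illustration. It keys every row by the combination named above, so replaying the same write cannot create a duplicate.

```python
class IdempotentStore:
    """In-memory stand-in for a table keyed by the idempotency key."""

    def __init__(self):
        self._rows = {}

    def write(self, time_series_id, event_time, event_id, event_item_key, value):
        key = (time_series_id, event_time, event_id, event_item_key)
        is_new = key not in self._rows
        # A retried or hedged request lands on the same key and rewrites the same value.
        self._rows[key] = value
        return is_new

    def __len__(self):
        return len(self._rows)


store = IdempotentStore()
args = ("profile100", "2024-10-03T21:24:23.988Z",
        "550e8400-e29b-41d4-a716-446655440000", "deviceType", "ios")
first_attempt = store.write(*args)
retry_attempt = store.write(*args)   # e.g. a client retry after a timeout
```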
SLO-based Hedging
We assign Service Level Objective (SLO) targets to different endpoints within TimeSeries as an indication of what we think the performance of those endpoints should be for a given namespace. We can then hedge a request if the response does not come back within that configured amount of time.
"slos": {
"read": { // SLOs per endpoint
"latency": {
"target": "0.5s", // hedge around this number
"max": "1s" // time-out around this number
}
},
"write": {
"latency": {
"target": "0.01s",
"max": "0.05s"
}
}
}
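The "hedge around target, time out around max" behavior can be sketched with asyncio in Python. This is an illustrative client-side sketch, not the TimeSeries client. The hedged_call function and its parameter names are hypothetical, and the demo uses shorter delays than the SLO values above so that it runs quickly.

```python
import asyncio


async def hedged_call(make_request, hedge_after, timeout):
    """Send a request, send one competing copy after `hedge_after` seconds,
    and give up after `timeout` seconds overall."""
    tasks = [asyncio.ensure_future(make_request())]

    async def race():
        done, _ = await asyncio.wait(tasks, timeout=hedge_after)
        if not done:
            # No response within the SLO target: send an identical competing request.
            tasks.append(asyncio.ensure_future(make_request()))
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        return next(iter(done)).result()

    try:
        return await asyncio.wait_for(race(), timeout=timeout)
    finally:
        for task in tasks:
            task.cancel()   # no-op for requests that already finished


async def demo():
    calls = 0

    async def flaky_request():
        nonlocal calls
        calls += 1
        attempt = calls
        # The first attempt is slow; the hedge is fast.
        await asyncio.sleep(0.5 if attempt == 1 else 0.01)
        return f"attempt-{attempt}"

    return await hedged_call(flaky_request, hedge_after=0.05, timeout=1.0)


result = asyncio.run(demo())
```

Hedging like this is only safe when the request is idempotent, since both copies may reach the server.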
Partial Return
Sometimes, a client may be sensitive to latency and willing to accept a partial result set. A real-world example of this is real-time frequency capping. Precision is not critical in this case, but if the response is delayed, it becomes practically useless to the upstream client. Therefore, the client prefers to work with whatever data has been collected so far rather than timing out while waiting for all the data. The TimeSeries client supports partial returns around SLOs for this purpose. Importantly, we still maintain the latest order of events in this partial fetch.
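A partial return can be sketched as a deadline-bounded scan in Python. The sketch is a simplification: it scans buckets sequentially from newest to oldest instead of in parallel, and the function name, the injected clock, and the returned completeness flag are hypothetical. Scanning newest first is what keeps the partial result a contiguous prefix of the latest events.

```python
import time


def read_with_deadline(buckets_newest_first, scan_bucket, deadline_seconds,
                       clock=time.monotonic):
    """Scan buckets newest first; stop at the deadline and return what was collected."""
    started = clock()
    events, complete = [], True
    for bucket in buckets_newest_first:
        if clock() - started >= deadline_seconds:
            complete = False     # out of time: return a partial result
            break
        events.extend(scan_bucket(bucket))
    return events, complete


# Deterministic demo with a fake clock instead of real waiting.
data = {3: ["c2", "c1"], 2: ["b1"], 1: ["a1"]}
ticks = iter([0.0, 0.1, 0.4, 0.7])
events, complete = read_with_deadline(
    buckets_newest_first=[3, 2, 1],
    scan_bucket=lambda bucket: data[bucket],
    deadline_seconds=0.5,
    clock=lambda: next(ticks),
)
```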
Adaptive Pagination
All reads start with a default fanout factor, scanning 8 partition buckets in parallel. However, if the service layer determines that the time_series dataset is dense — i.e., most reads are satisfied by reading the first few partition buckets — then it dynamically adjusts the fanout factor of future reads in order to reduce the read amplification on the underlying datastore. Conversely, if the dataset is sparse, we may want to increase this limit with a reasonable upper bound.
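One possible adjustment policy is sketched below in Python. Only the default fanout of 8 comes from the text above. The halving and doubling rule, the bounds, and the next_fanout function are assumptions for illustration, not the service's actual heuristic.

```python
DEFAULT_FANOUT = 8                 # from the text above
MIN_FANOUT, MAX_FANOUT = 1, 32     # hypothetical bounds


def next_fanout(current, buckets_scanned, buckets_needed):
    """Pick the fanout for future reads from how many scanned buckets were actually needed."""
    if buckets_needed <= buckets_scanned // 2:
        # Dense data: the first few buckets satisfied the read, so scan fewer next time.
        return max(MIN_FANOUT, current // 2)
    if buckets_needed >= buckets_scanned:
        # Sparse data: every scanned bucket was needed, so scan more next time.
        return min(MAX_FANOUT, current * 2)
    return current


dense = next_fanout(DEFAULT_FANOUT, buckets_scanned=8, buckets_needed=2)
sparse = next_fanout(DEFAULT_FANOUT, buckets_scanned=8, buckets_needed=8)
```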
Limited Write Window
In most cases, the active range for writing data is smaller than the range for reading data. That is, we want a range of time to become immutable as soon as possible so that we can apply optimizations on top of it. We control this with a configurable “acceptLimit” parameter that prevents users from writing events older than this time limit. For example, an accept limit of 4 hours means that users cannot write events older than now() - 4 hours. We sometimes raise this limit for backfilling historical data, but it is tuned back down for regular write operations. Once a range of data becomes immutable, we can safely do things like caching, compressing, and compacting it for reads.
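The accept limit check itself is a one-line comparison, sketched here in Python. The function name is hypothetical, and treating an event exactly at the boundary as accepted is an assumption.

```python
from datetime import datetime, timedelta, timezone


def is_within_accept_limit(event_time, accept_limit, now=None):
    """Return True if the event is not older than now minus the accept limit."""
    now = now or datetime.now(timezone.utc)
    return event_time >= now - accept_limit   # boundary treated as accepted (assumption)


now = datetime(2024, 10, 3, 21, 0, 0, tzinfo=timezone.utc)
accept_limit = timedelta(hours=4)
recent = is_within_accept_limit(now - timedelta(hours=3), accept_limit, now=now)
too_old = is_within_accept_limit(now - timedelta(hours=5), accept_limit, now=now)
```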
Buffering Writes
We frequently leverage this service for handling bursty workloads. Rather than overwhelming the underlying datastore with this load all at once, we aim to distribute it more evenly by allowing events to coalesce over short durations (typically seconds). These events accumulate in in-memory queues running on each instance. Dedicated consumers then steadily drain these queues, grouping the events by their partition key, and batching the writes to the underlying datastore.
The queues are tailored to each datastore since their operational characteristics depend on the specific datastore being written to. For instance, the batch size for writing to Cassandra is significantly smaller than that for indexing into Elasticsearch, leading to different drain rates and batch sizes for the associated consumers.
While using in-memory queues does increase JVM garbage collection pressure, we have experienced substantial improvements by transitioning to JDK 21 with ZGC. To illustrate the impact, ZGC has reduced our tail latencies by an impressive 86%.
Because we use in-memory queues, we are prone to losing events if an instance crashes. As such, these queues are only used for use cases that can tolerate some amount of data loss, e.g., tracing or logging. For use cases that need guaranteed durability and/or read-after-write consistency, these queues are effectively disabled and writes are flushed to the data store almost immediately.
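The buffering approach can be sketched in Python with an in-memory queue and a drain step that groups events by partition key. The WriteBuffer class and its batch size are hypothetical. A real consumer would call drain on a timer (the coalesce interval) and hand each batch to the datastore, and anything still queued when the process dies is lost, as noted above.

```python
import queue
from collections import defaultdict


class WriteBuffer:
    """Coalesces writes in memory and drains them as per-partition batches."""

    def __init__(self, max_batch_size):
        self._queue = queue.Queue()
        self._max_batch_size = max_batch_size

    def enqueue(self, partition_key, event):
        self._queue.put((partition_key, event))

    def drain(self):
        """Drain everything queued so far into batches grouped by partition key."""
        grouped = defaultdict(list)
        while True:
            try:
                partition_key, event = self._queue.get_nowait()
            except queue.Empty:
                break
            grouped[partition_key].append(event)
        batches = []
        for partition_key, events in grouped.items():
            # Split large groups so that no batch exceeds the datastore-specific size.
            for i in range(0, len(events), self._max_batch_size):
                batches.append((partition_key, events[i:i + self._max_batch_size]))
        return batches


buffer = WriteBuffer(max_batch_size=2)
for partition_key, event in [("p1", "e1"), ("p2", "e4"), ("p1", "e2"), ("p1", "e3")]:
    buffer.enqueue(partition_key, event)
batches = buffer.drain()
```

Using a different max_batch_size per datastore reflects the point above that Cassandra and Elasticsearch call for different batch sizes.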
Dynamic Compaction
Once a time slice exits the active write window, we can leverage the immutability of the data to optimize it for read performance. This process may involve re-compacting immutable data using optimal compaction strategies, dynamically shrinking and/or splitting shards to optimize system resources, and other similar techniques to ensure fast and reliable performance.
The following section provides a glimpse into the real-world performance of some of our TimeSeries datasets.
Real-world Performance
The service can write data with latencies on the order of low single-digit milliseconds while consistently maintaining stable point-read latencies.
At the time of writing this blog, the service was processing close to 15 million events/second across all the different datasets at peak globally.
Time Series Usage @ Netflix
The TimeSeries Abstraction plays a vital role across key services at Netflix. Here are some impactful use cases:
- Tracing and Insights: Logs traces across all apps and micro-services within Netflix, to understand service-to-service communication, aid in debugging of issues, and answer support requests.
- User Interaction Tracking: Tracks millions of user interactions — such as video playbacks, searches, and content engagement — providing insights that enhance Netflix’s recommendation algorithms in real-time and improve the overall user experience.
- Feature Rollout and Performance Analysis: Tracks the rollout and performance of new product features, enabling Netflix engineers to measure how users engage with features, which powers data-driven decisions about future improvements.
- Asset Impression Tracking and Optimization: Tracks asset impressions ensuring content and assets are delivered efficiently while providing real-time feedback for optimizations.
- Billing and Subscription Management: Stores historical data related to billing and subscription management, ensuring accuracy in transaction records and supporting customer service inquiries.
and more…
Future Enhancements
As the use cases evolve, and the need to make the abstraction even more cost effective grows, we aim to make many improvements to the service in the upcoming months. Some of them are:
- Tiered Storage for Cost Efficiency: Support moving older, less frequently accessed data into cheaper object storage that has a higher time to first byte, potentially saving Netflix millions of dollars.
- Dynamic Event Bucketing: Support real-time partitioning of keys into optimally-sized partitions as events stream in, rather than having a somewhat static configuration at the time of provisioning a namespace. This strategy has a huge advantage of not partitioning time_series_ids that don’t need it, thus saving the overall cost of read amplification. Also, with Cassandra 4.x, we have noted major improvements in reading a subset of data in a wide partition that could lead us to be less aggressive with partitioning the entire dataset ahead of time.
- Caching: Take advantage of immutability of data and cache it intelligently for discrete time ranges.
- Count and other Aggregations: Some users are only interested in counting events in a given time interval rather than fetching all the event data for it.
Conclusion
The TimeSeries Abstraction is a vital component of Netflix’s online data infrastructure, playing a crucial role in supporting both real-time and long-term decision-making. Whether it’s monitoring system performance during high-traffic events or optimizing user engagement through behavior analytics, TimeSeries Abstraction ensures that Netflix operates seamlessly and efficiently on a global scale.
As Netflix continues to innovate and expand into new verticals, the TimeSeries Abstraction will remain a cornerstone of our platform, helping us push the boundaries of what’s possible in streaming and beyond.
Stay tuned for Part 2, where we’ll introduce our Distributed Counter Abstraction, a key element of Netflix’s Composite Abstractions, built on top of the TimeSeries Abstraction.
Acknowledgments
Special thanks to our stunning colleagues who contributed to TimeSeries Abstraction’s success: Tom DeVoe, Mengqing Wang, Kartik Sathyanarayanan, Jordan West, Matt Lehman, Cheng Wang, Chris Lohfink.
Introducing Netflix’s TimeSeries Data Abstraction Layer was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.
We Compared ScyllaDB and Memcached and… We Lost?
An in-depth look at database and cache internals, and the tradeoffs in each.
ScyllaDB would like to publicly acknowledge dormando (Memcached maintainer) and Danny Kopping for their contributions to this project, as well as thank them for their support and patience.
Engineers behind ScyllaDB (the database for predictable performance at scale) joined forces with Memcached maintainer dormando to compare both technologies head-to-head, in a collaborative vendor-neutral way.
The results reveal that:
- Both Memcached and ScyllaDB maximized disks and network bandwidth while being stressed under similar conditions, sustaining similar performance overall.
- While ScyllaDB required data modeling changes to fully saturate the network throughput, Memcached required additional IO threads to saturate disk I/O.
- Although ScyllaDB showed better latencies when compared to Memcached pipelined requests to disk, Memcached latencies were better for individual requests.
This document explains our motivation for these tests, provides a summary of the tested scenarios and results, then presents recommendations for anyone who might be deciding between ScyllaDB and Memcached. Along the way, we analyze the architectural differences behind these two solutions and discuss the tradeoffs involved in each.
There’s also a detailed Gitbook for this project, with a more extensive look at the tests and results and links to the specific configurations you can use to perform the tests yourself.
Bonus: dormando and I will be discussing this project at P99 CONF, a highly technical conference on performance and low latency engineering. It’s free and virtual, October 23 and 24. I invite you to join, and bring your questions for us!
Why have we done this?
First and foremost, ScyllaDB invested lots of time and engineering resources optimizing our database to deliver predictable low latencies for real-time data-intensive applications.
ScyllaDB’s shard-per-core, shared-nothing architecture, userspace I/O scheduler and internal cache implementation (fully bypassing the Linux page cache) are some notable examples of such optimizations.
Second: performance converges over time. In-memory caches have been (for a long time) regarded as one of the fastest infrastructure components around. Yet, it’s been a few years now since caching solutions started to look into the realm of flash disks. These initiatives obviously pose an interesting question: If an in-memory cache can rely on flash storage, then why can’t a persistent database also work as a cache?
Third: We previously discussed 7 Reasons Not to Put a Cache in Front of Your Database and recently explored how specific teams have successfully replaced their caches with ScyllaDB.
Fourth: At last year’s P99 CONF, Danny Kopping gave us an enlightening talk, Cache Me If You Can, where he explained how Memcached Extstore helped Grafana Labs scale their cache footprint 42x while driving down costs.
And finally, despite the (valid) criticism that performance benchmarks receive, they still play an important role in driving innovation. Benchmarks are a useful resource for engineers seeking in-house optimization opportunities.
Now, on to the comparison.
Setup
Instances
Tests were carried out using the following AWS instance types:
- Loader: c7i.16xlarge (64 vCPUs, 128GB RAM)
- Memcached: i4i.4xlarge (16 vCPUs, 128GB RAM, 3.75TB NVMe)
- ScyllaDB: i4i.4xlarge (16 vCPUs, 128GB RAM, 3.75TB NVMe)
All instances can deliver up to 25Gbps of network bandwidth. Keep in mind that, especially during tests maxing out the promised network capacity, we noticed throttling that shrank the bandwidth down to the instances’ baseline capacity.
Optimizations and Settings
To overcome potential bottlenecks, the following optimizations and settings were applied:
AWS side: All instances used a Cluster placement strategy, following the AWS Docs: “This strategy enables workloads to achieve the low-latency network performance necessary for tightly-coupled node-to-node communication that is typical of high-performance computing (HPC) applications.”
Memcached: Version 1.6.25, compiled with Extstore enabled. Except where denoted, run with 14 threads, pinned to specific CPUs. The remaining 2 vCPUs were assigned to CPU 0 (core & HT sibling) to handle Network IRQs, as specified by the sq_split mode in seastar perftune.py. CAS operations were disabled to save space on per-item overhead. The full command line arguments were:
taskset -c 1-7,9-15 /usr/local/memcached/bin/memcached -v -A -r -m 114100 -c 4096 --lock-memory --threads 14 -u scylla -C
ScyllaDB: Default settings as configured by ScyllaDB Enterprise 2024.1.2 AMI (ami-id: ami-018335b47ba6bdf9a) in an i4i.4xlarge. This includes the same CPU pinning settings as described above for Memcached.
Stressors
For Memcached loaders, we used mcshredder, part of memcached’s official testing suite. The applicable stressing profiles are in the fee-mendes/shredders GitHub repository. For ScyllaDB, we used cassandra-stress, as shipped with ScyllaDB, and specified workloads comparable to the ones used for Memcached.
Tests and Results
The following is a summary of the tests we conducted and their results. If you want a more detailed description and analysis, go to the extended writeup of this project.
RAM Caching Efficiency
The more items you can fit into RAM, the better your chance of getting cache hits. More cache hits result in significantly faster access than going to disk. Ultimately, that improves latency. This project began by measuring how many items we could store to each datastore. Throughout our tests, the key was between 4 and 12 bytes (key0 .. 
keyN) for Memcached, and 12 bytes for ScyllaDB. The value was fixed to 1000 bytes.
Memcached
Memcached stored roughly 101M items until eviction started. It’s memory efficient. Out of Memcached’s 114G assigned memory, this is approximately 101G worth of values, without considering the key size and other flags:
[Figure: Memcached stored 101M items in memory before evictions started]
ScyllaDB
ScyllaDB stored between 60M and 61M items before evictions started. This is no surprise, given that its protocol requires more data to be stored as part of a write (such as the write timestamp since epoch, row liveness, etc). ScyllaDB also persists data to disk as you go, which means that Bloom Filters (and optionally Indexes) need to be stored in memory for subsequent disk lookups.
[Figure: With ScyllaDB, eviction starts under memory pressure while trying to load 61M rows]
Takeaways
Memcached stored approximately 65% more in-memory items than ScyllaDB.
ScyllaDB rows have higher per-item overhead to support a wide-column orientation.
In ScyllaDB, Bloom Filters, Index Caching, and other components are also stored in-memory to support efficient disk lookups, contributing to yet another layer of overhead.
Read-only In-Memory Workload
The ideal (though unrealistic) workload for a cache is one where all the data fits in RAM – so that reads don’t require disk accesses and no evictions or misses occur. Both ScyllaDB and Memcached employ LRU (Least Recently Used) logic for freeing up memory: When the system runs under pressure, items get evicted from the LRU’s tail; these are typically the least active items.
Taking evictions and cache misses out of the picture helps measure and set a performance baseline for both datastores. It places the focus on what matters most for these kinds of workloads: read throughput and request latency.
In this test, we first warmed up both stores with the same payload sizes used during the previous test. Then, we initiated reads against their respective ranges for 30 minutes. 
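To make the LRU behavior described above concrete, here is a minimal Python sketch. It is an illustration only, not how Memcached or ScyllaDB implement their caches: the class name LRUCache, the item-count capacity, and the evictions counter are all assumptions made for this example (real systems evict based on memory pressure, not a fixed item count).

```python
from collections import OrderedDict


class LRUCache:
    """Toy LRU cache (hypothetical): evicts the least recently used key when full."""

    def __init__(self, capacity):
        self.capacity = capacity    # max number of items (assumption: count-based limit)
        self.items = OrderedDict()  # least recently used first, most recent last
        self.evictions = 0

    def get(self, key):
        if key not in self.items:
            return None             # cache miss
        self.items.move_to_end(key)  # a hit makes the key the most recently used
        return self.items[key]

    def set(self, key, value):
        if key in self.items:
            self.items.move_to_end(key)
        self.items[key] = value
        if len(self.items) > self.capacity:
            self.items.popitem(last=False)  # drop the entry at the LRU tail
            self.evictions += 1


cache = LRUCache(capacity=3)
for i in range(3):
    cache.set(f"key{i}", b"x" * 1000)  # 1000-byte values, as in the tests
cache.get("key0")                      # touch key0 so key1 becomes the oldest
cache.set("key3", b"x" * 1000)         # exceeds capacity, evicts key1
remaining = list(cache.items)          # ['key2', 'key0', 'key3']
```

A read-only workload whose working set fits within capacity never reaches the eviction branch, which is why this test isolates throughput and latency from eviction effects.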
Memcached
Memcached achieved an impressive 3 Million Gets per second, fully maximizing AWS NIC bandwidth (25 Gbps)!
[Figure: Memcached kept a steady 3M rps, fully maximizing the NIC throughput]
The parsed results show that p99.999 responses completed below 1ms:
stat: cmd_get : Total Ops: 5503513496 Rate: 3060908/s
=== timer mg ===
1-10us 0 0.000%
10-99us 343504394 6.238%
100-999us 5163057634 93.762%
1-2ms 11500 0.00021%
ScyllaDB
To read more rows in ScyllaDB, we needed to devise a better data model for client requests due to protocol characteristics (in particular, no pipelining). With a clustering key, we could fully maximize ScyllaDB’s cache, resulting in a significant improvement in the number of cached rows. We ingested 5M partitions, each with 16 clustering keys, for a total of 80M cached rows.
As a result, the number of records within the cache significantly improved compared to the key-value numbers shown previously.
As dormando correctly pointed out (thanks!), this configuration is significantly different than the previous Memcached set-up. While the Memcached workload always hits an individual key-value pair, a single request in ScyllaDB results in several rows being returned. Notably, the same results could be achieved using Memcached by feeding the entire payload as the value under a single key, with the results scaling accordingly.
We explained the reasons for these changes in the detailed writeup. There, we covered characteristics of the CQL protocol (such as the per-item overhead [compared to memcached] and no support for pipelining) which make wide-partitions more efficient on ScyllaDB than single-key fetches.
With these adjustments, our loaders ran a total of 187K read ops/second over 30 minutes. Each operation resulted in 16 rows getting retrieved.
Similarly to memcached, ScyllaDB also maximized the NIC throughput. 
It served roughly 3M rows/second solely from in-memory data:
[Figure: ScyllaDB Server Network Traffic as reported by node_exporter]
[Figure: Number of read operations (left) and rows being hit (right) from cache during the exercise]
ScyllaDB exposes server-side latency information, which is useful for analyzing latency without the network. During the test, ScyllaDB’s server-side p99 latency remained within 1ms bounds:
[Figure: Latency and Network traffic from ScyllaDB matching the adjustments done]
The client-side percentiles are, unsurprisingly, higher than the server-side latency with a read P99 of 0.9ms.
[Figure: cassandra-stress P99 latency histogram]
Takeaways
Both Memcached and ScyllaDB fully saturated the network; to prevent saturating the maximum network packets per second, Memcached relied on request pipelining whereas ScyllaDB was switched to a wide-column orientation.
ScyllaDB’s cache showed considerable gains following a wide-column schema, able to store more items compared to the previous simple key-value orientation.
On the protocol level, Memcached’s protocol is simpler and lightweight, whereas ScyllaDB’s CQL provides richer features but can be heavier.
Adding Disks to the Picture
Measuring flash storage performance introduces its own set of challenges, which makes it almost impossible to fully characterize a given workload realistically.
For disk-related tests, we decided to measure the most pessimistic situation: Compare both solutions serving data (mostly) from block storage, knowing that:
The likelihood of realistic workloads doing this is somewhere close to zero
Users should expect numbers in between the previous optimistic cache workload and the pessimistic disk-bound workload in practice
Memcached Extstore
The Extstore wiki page provides extensive detail into the solution’s inner workings. At a high-level, it allows memcached to keep its hash table and keys in memory, but store values onto external storage. 
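The split described above (keys and hash table in memory, values on external storage) can be sketched in a few lines of Python. This is a deliberately simplified toy, not Extstore's actual design: the class TinyExtStore, its append-only file layout, and the (offset, length) index entries are assumptions made for illustration, and it omits everything that makes the real thing fast (write buffering, paging, compaction, IO threads).

```python
import os
import tempfile


class TinyExtStore:
    """Toy illustration (hypothetical): key index in RAM, values appended to a file."""

    def __init__(self, path):
        self.index = {}                # key -> (offset, length); stays in memory
        self.data = open(path, "w+b")  # values live on external storage

    def set(self, key, value):
        self.data.seek(0, os.SEEK_END)  # always append at the end of the file
        offset = self.data.tell()
        self.data.write(value)
        self.index[key] = (offset, len(value))

    def get(self, key):
        entry = self.index.get(key)
        if entry is None:
            return None                 # miss is answered from memory, no disk I/O
        offset, length = entry
        self.data.seek(offset)
        return self.data.read(length)   # a hit costs one read from storage

    def close(self):
        self.data.close()


with tempfile.TemporaryDirectory() as tmp:
    store = TinyExtStore(os.path.join(tmp, "values.dat"))
    store.set("key0", b"a" * 1000)
    store.set("key1", b"b" * 1000)
    value = store.get("key1")
    index_entries = len(store.index)
    store.close()
```

The point of the sketch is the trade-off: each stored item still consumes a small amount of RAM for its index entry, so the number of items is bounded by memory even when disk space remains.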
During our tests, we populated memcached with 1.25B items with a value size of 1KB and a keysize of up to 14 bytes:
[Figure: Evictions started as soon as we hit approximately 1.25B items, despite free disk space]
With Extstore, we stored around 11X the number of items compared to the previous in-memory workload until evictions started to kick in (as shown in the right hand panel in the image above). Even though 11X is an already impressive number, the total data stored on flash was only 1.25TB out of the total 3.5TB provided by the AWS instance.
Read-Only Performance
For the actual performance tests, we stressed Extstore against item sizes of 1KB and 8KB. The table below summarizes the results:
Test Type | Items per GET | Payload Size | IO Threads | GET Rate | P99
perfrun_metaget_pipe | 16 | 1KB | 32 | 188K/s | 4~5 ms
perfrun_metaget | 1 | 1KB | 32 | 182K/s | <1ms
perfrun_metaget_pipe | 16 | 1KB | 64 | 261K/s | 5~6 ms
perfrun_metaget | 1 | 1KB | 64 | 256K/s | 1~2ms
perfrun_metaget_pipe | 16 | 8KB | 16 | 92K/s | 5~6 ms
perfrun_metaget | 1 | 8KB | 16 | 90K/s | <1ms
perfrun_metaget_pipe | 16 | 8KB | 32 | 110K/s | 3~4 ms
perfrun_metaget | 1 | 8KB | 32 | 105K/s | <1ms
ScyllaDB
We populated ScyllaDB with the same number of items as used for memcached. Although ScyllaDB showed higher GET rates than memcached, it did so under slightly higher tail latencies compared to memcached’s non-pipelining workloads. This is summarized below:
Test Type | Items per GET | Payload Size | GET Rate | Server-side P99 | Client-side P99
1KB Read | 1 | 1KB | 268.8K/s | 2ms | 2.4ms
8KB Read | 1 | 8KB | 156.8K/s | 1.54ms | 1.9ms
Takeaways
Extstore required considerable tuning to its settings in order to fully saturate flash storage I/O.
Due to Memcached architecture, smaller payloads are unable to fully utilize the available disk space, providing smaller gains compared to ScyllaDB.
ScyllaDB rates were overall higher than Memcached in a key-value orientation, especially under higher payload sizes. Latencies were better than pipelined requests, but slightly higher than individual GETs in Memcached. 
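As a back-of-the-envelope check on the Extstore capacity figures above, the short Python calculation below reproduces the 1.25TB of data on flash and estimates how much RAM was available per stored item. It uses only numbers quoted in this post; treating 1KB as 1000 bytes and reading the -m 114100 setting as MiB are assumptions, and the per-item figure is an upper bound on the memory budget rather than a measured overhead.

```python
ITEMS_ON_FLASH = 1.25e9  # items stored before evictions started
VALUE_BYTES = 1000       # value size used in the tests (assumes 1KB = 1000 bytes)
MEMORY_LIMIT_MB = 114_100  # memcached -m setting (assumption: interpreted as MiB)

# Total value data that ended up on flash, in terabytes.
flash_tb = ITEMS_ON_FLASH * VALUE_BYTES / 1e12  # 1.25

# RAM budget per stored item: memory limit divided by item count.
# Keys and per-item metadata must fit in this budget, so item count
# is capped by memory even when free disk space remains.
ram_bytes_per_item = MEMORY_LIMIT_MB * 1024 * 1024 / ITEMS_ON_FLASH  # roughly 96 bytes

print(f"data on flash: {flash_tb:.2f} TB")
print(f"RAM budget per item: {ram_bytes_per_item:.1f} bytes")
```

Under these assumptions, larger values would fill more of the disk for the same number of in-memory index entries, which is consistent with the observation that small payloads cannot fully utilize the available disk space.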
Overwrite Workload
Following our previous Disk results, we then compared both solutions in a read-mostly workload targeting the same throughput (250K ops/sec). The workload in question is a slight modification of memcached’s ‘basic’ test for Extstore, with 10% random overwrites. It is considered a “semi-worst case scenario.”
Memcached
Memcached achieved a rate of slightly under 249K during the test. Although the write rates remained steady during the duration of the test, we observed that reads fluctuated slightly throughout the run:
[Figure: Memcached: Read-mostly workload metrics]
We also observed slightly high extstore_io_queue metrics despite the lowered read ratios, but latencies still remained low. These results are summarized below:
Operation | IO Threads | Rate | P99 Latency
cmd_get | 64 | 224K/s | 1~2 ms
cmd_set | 64 | 24.8K/s | <1ms
ScyllaDB
The ScyllaDB test was run using 2 loaders, each with half of the target rate. ScyllaDB achieved a slightly higher throughput (249.5K). Write latencies were kept low throughout the run, while read latencies were higher (similar to memcached):
[Figure: ScyllaDB: Read-mostly workload metrics]
The table below summarizes the client-side run results across the two loaders:
Loader | Rate | Write P99 | Read P99
loader1 | 124.9K/s | 1.4ms | 2.6 ms
loader2 | 124.6K/s | 1.3ms | 2.6 ms
Takeaways
Both Memcached and ScyllaDB write rates were steady, with reads slightly fluctuating throughout the run
ScyllaDB writes still account for the commitlog overhead, which sits in the hot write path
ScyllaDB server-side latencies were similar to those observed in Memcached results, although client-side latencies were slightly higher
Read a more detailed analysis in the Gitbook for this project
Wrapping Up
Both memcached and ScyllaDB managed to maximize the underlying hardware utilization across all tests and keep latencies predictably low. So which one should you pick? The real answer: It depends. 
If your existing workload can accommodate a simple key-value model and it benefits from pipelining, then memcached should be more suitable to your needs. On the other hand, if the workload requires support for complex data models, then ScyllaDB is likely a better fit. Another reason for sticking with Memcached: it easily delivers traffic far beyond what a NIC can sustain. In fact, in this Hacker News thread, dormando mentioned that he could scale it up past 55 million read ops/sec for a considerably larger server. Given that, you could make use of smaller and/or cheaper instance types to sustain a similar workload, provided the available memory and disk footprint suffice your workload needs. A different angle to consider is the data set size. Even though Extstore provides great cost savings by allowing you to store items beyond RAM, there’s a limit to how many keys can fit per GB of memory. Workloads with very small items should observe smaller gains compared to those with larger items. That’s not the case with ScyllaDB, which allows you to store billions of items irrespective of their sizes. It’s also important to consider whether data persistence is required. If it is, then running ScyllaDB as a replicated distributed cache provides you greater resilience and non-stop operations, with the tradeoff being (and as memcached correctly states) that replication halves your effective cache size. Unfortunately Extstore doesn’t support warm restarts and thus the failure or maintenance of a single node is prone to elevating your cache miss ratios. Whether this is acceptable or not depends on your application semantics: If a cache miss corresponds to a round-trip to the database, then the end-to-end latency will be momentarily higher. With regards to consistent hashing, memcached clients are responsible for distributing keys across your distributed servers. 
This may introduce some hiccups, as different client configurations will cause keys to be assigned differently, and some implementations may not be compatible with each other. These details are outlined in Memcached’s ConfiguringClient wiki. ScyllaDB takes a different approach: consistent hashing is done at the server level and propagated to clients when the connection is first established. This ensures that all connected clients always observe the same topology as you scale.
So who won (or who lost)? Well… This does not have to be a competition, nor an exhaustive list outlining every single consideration for each solution. Both ScyllaDB and memcached use different approaches to efficiently utilize the underlying infrastructure. When configured correctly, both of them have the potential to provide great cost savings.
We were pleased to see ScyllaDB matching the numbers of the industry-recognized Memcached. Of course, we had no expectations of our database being “faster.” In fact, as we approach microsecond latencies at scale, the definition of faster becomes quite subjective. 🙂
Continuing the Discussion at P99 CONF
Reminder: dormando (Alan Kasindorf) and I will be discussing this project at P99 CONF, a highly technical conference on performance and low latency engineering. It’s free and virtual, October 23 and 24. I invite you to join, and bring your questions for us!
Apache Cassandra 5.0 and DataStax: The Benefits of Staying in Sync
As an Apache Cassandra® committer and long-time advocate, I'd like to walk you through the relationship between the open-source Cassandra project and DataStax. With the recent release of Cassandra 5.0, it's the perfect time to explore how this collaboration drives innovation and benefits the entire...
Low-Latency Database Strategies Featured at P99 CONF 24
Obsessed with high-performance low-latency data systems? See the 20+ data-related tech talk sessions we’re hosting at P99 CONF 2024.
P99 CONF is a (free + online) highly-technical conference for engineers who obsess over P99 percentiles and long-tail latencies. The open source, community-focused event is hosted by ScyllaDB, the company behind the monstrously fast and scalable NoSQL database (and the adorable one-eyed sea monster).
Since database performance is so near and dear to ScyllaDB, we quite eagerly reached out to our friends and colleagues across the community to ensure a wide spectrum of distributed data systems, approaches, and challenges would be represented at P99 CONF. As you can see on our agenda, the response was overwhelming. This year’s attendees get to hear from – and interact with – the creators of Postgres, ScyllaDB, Turso, SlateDB, SpiceDB, Arroyo, Responsive, FerretDB, and Percona. We’re also looking forward to sessions with engineers from Redis, Oracle, TigerBeetle, AWS, QuestDB, and more. There’s a series of keynotes focused on rethinking the database, deep dives into database internals, and case studies of database engineering feats at organizations like Uber, Shopify, ShareChat, and LinkedIn.
If you share our obsession with high-performance low-latency data systems, here’s a rundown of sessions to consider joining at P99 CONF 24.
Just In Time LSM Compaction
Aleksei Kladov (TigerBeetle)
TigerBeetle is a reliable and fast accounting database. Its primary on-disk data structure is a log-structured merge-tree. This talk is a deep dive into TigerBeetle’s compaction algorithm — “garbage collection” for LSM, which achieves several unusual goals:
Full storage determinism across replicas, enabling recovery from disk faults.
Absence of dynamic memory allocation.
Highly concurrent implementation, utilizing all available CPU and disk bandwidth. 
Perfect pacing: resources are carefully balanced between compaction and normal transaction processing, avoiding starvation and guaranteeing bounded P100.
Time-Series and Analytical Databases Walk in a Bar…
Andrei Pechkurov (QuestDB)
A good time-series database also has to be a decent analytical database. This implies both SQL features and efficient query processing. That’s why we recently added many optimizations to QuestDB’s SQL engine, featuring better on-disk data layout, specialized data structures, SIMD and SWAR-based code, scalable aggregation algorithms, and parallel execution pipelines. Many of these additions can be found in popular databases; some are unique to QuestDB. In this talk, we will go through the most important optimizations, and discuss what’s yet to be added and where we are when compared with the fastest analytical databases.
The Next Chapter in the Sordid Love/Hate Relationship Between DBs and OSes
Andy Pavlo (Carnegie Mellon)
Database management systems (DBMSs) are beautiful, free-spirited software that want nothing more than to help users store and access data as quickly as possible. To achieve this goal, DBMSs have spent decades trying to avoid operating systems (OSs) at all costs. Such avoidance is necessary because OSs always try to impose their will on DBMSs and stifle their ambitions through disingenuous syscall semantics, unscalable kernel-level data structures, and excessive data copying. The many attempts to avoid the OS through kernel-bypass methods or custom hardware have such high engineering/R&D costs that few DBMSs support them. In the end, DBMSs are stuck in an abusive relationship: they need the OS to run their software and provide them with basic functionalities (e.g., memory allocation), but they do not like how the OS treats them. However, new technologies like eBPF, which allow DBMSs to run custom code safely inside the OS kernel to override its functionality, are poised to upend this power struggle. 
In this talk, I will present a new design approach called “user-bypass” for building high-performance database systems and services with eBPF. I will discuss recent developments in eBPF relevant to the DBMS community and what parts of a DBMS are most amenable to using it. We will also present the design of BPF-DB, an embedded DBMS written in eBPF that provides ACID transactions over multi-versioned data and runs entirely in the Linux kernel.
Designing a Query Queue for ScyllaDB
Avi Kivity (ScyllaDB)
Database queries can be highly variable. Some are served immediately from cache, return a single row, and are done in milliseconds. Others return gigabytes or terabytes of data, take minutes or hours, and require plenty of disk I/O and compute. Deciding what concurrency to use when running these queries is a delicate balance of CPU consumption, memory consumption, and the queue designer’s nerves. A bad design can mean high latency, under-utilization of available resources, or crashing when one runs out of memory. This talk will cover the decisions we made while designing ScyllaDB’s query queue.
Reliable Data Replication
Cameron Morgan (Shopify)
Data replication is required to make data highly available to users. Highly available in this context means users can access data in a reliable, consistent and timely fashion. Because it is so important, if this problem has not come up in your work, you have certainly used such a system. This talk focuses on the hard problems of data replication, the ones that are usually skipped in talks. What is a backfill and why do I need them to be reliable, non-blocking and often? How do you handle schema changes? How do you validate the data is correct? How can you be resistant to failure? How can you write in parallel? This talk is about the hard problems Shopify solved replicating Shopify stores to the Edge and reaching ~5M rows replicated per second with < 1s replication lag p99. 
Rust: A Productive Language for Writing Database Applications
Carl Lerche (AWS)
When you think about Rust, you might think of performance, safety, and reliability, but what about productivity? Last year, I recommended considering Rust for developing high-level applications. Rust showed great promise, but its library ecosystem needed to mature. What has changed since then? Many higher-level applications sit on top of a database. In this talk, I will explore the current state of Rust libraries for database access, focusing on ergonomics and ease of use—two crucial factors in high-level database application development.
Building a Cloud Native LSM on Object Storage
Chris Riccomini (Materialized View) and Rohan Desai (Responsive)
This talk discusses the design and implementation of SlateDB, an open source cloud native storage engine built as a log-structured merge-tree (LSM) on top of an object store like S3, Google Cloud Storage (GCS), or Azure Blob Store (ABS). LSMs are traditionally built assuming data will reside on local storage. Building an LSM on object storage allows SlateDB to benefit from object storage replication and durability guarantees while presenting unique latency and cost challenges. We’ll discuss the design decisions and tradeoffs we faced when building SlateDB.
Taming Tail Latencies in Apache Pinot with Generational ZGC
Christopher Peck (Uber)
The introduction of Generational ZGC mostly eliminated concerns around pause time for Java applications. This session will cover a real world application of Generational ZGC and the effects. The session will also cover how application level configs/features can be used to offset some of the trade-offs we encountered when switching to Generational ZGC. Apache Pinot is an OLAP database with an emphasis on low latency. We’ll walk through how we solved large scatter gather induced tail latencies in our Pinot clusters by switching to Generational ZGC, uncovering the low latency query potential of Pinot. 
We’ll also cover a couple of Pinot’s features that made using Generational ZGC possible.
Elevating PostgreSQL: Benchmarking Vector Search Performance
Daniel Seybold (benchANT)
The database market is constantly evolving with new database systems addressing specific use cases such as time series data or vector search. But there is one open source database system that has been around for nearly three decades and has gained strong momentum in recent years: PostgreSQL. Due to its pure open source approach and strong community, PostgreSQL is continuously improving its features, performance and extensions, which enable PostgreSQL to also handle specific use cases such as vector search. Over the last years, multiple native vector database systems have been established and many NoSQL and relational database systems have released vector extensions for their database systems. The same goes for PostgreSQL with two available vector search extensions, pgvector and pgvecto.rs. And since vector search performance is a crucial differentiating factor for every vector search database, we report on the latest vector search benchmark results for PostgreSQL with pgvector and pgvecto.rs. The benchmarking study covers multiple data sets of varying vector dimensions, also considering different PostgreSQL configurations, from a baseline configuration to tuned configurations.
Overcoming Distributed Databases Scaling Challenges with Tablets
Dor Laor (ScyllaDB)
Getting fantastic performance cannot stop at the server level. Even after rewriting your code in assembly, you would need multiple servers to run at scale and to provide availability. Clusters are often sharded to achieve good performance. In this session, I will cover tablets, a new dynamic sharding design we applied at ScyllaDB in order to maximize CPU and storage utilization dynamically and to maximize elasticity speed. 
Why Databases Cache, but Caches Go to Disk
Felipe Mendes (ScyllaDB) and Alan Kasindorf (Cache Forge)
Caches and Databases are different animals. Yet, databases have always cached data and caches are exploring disks. To quantify the strengths and tradeoffs of each, ScyllaDB joined forces with Memcached’s maintainer to compare both across different scenarios. Join us as we discuss how the results trace back to each underlying architecture’s design decisions. Specifically, we’ll compare ScyllaDB’s row-based cache with Memcached’s in-memory hash table, and look at how Memcached’s External Flash Storage compares to ScyllaDB’s userspace I/O scheduler and asynchronous AIO/DIO.
Feature Store Evolution Under Cost Constraints: When Cost is Part of the Architecture
Ivan Burmistrov and David Malinge (ShareChat)
At P99 CONF 23, the ShareChat team presented the scaling challenges for the ML Feature Store so it could handle 1 billion features per second. Once the system was scaled to handle the load, the next challenge the team faced was extreme cost constraints: it was required to make the same quality system much cheaper to run. This year, we will talk about approaches the team implemented in order to optimize for cost in the Cloud environment while maintaining the same SLA for the service. The talk will touch on such topics as advanced optimizations on various levels to bring down the compute, minimizing the waste when running on Kubernetes, autoscaling challenges for stateful Apache Flink jobs, and others. The talk should be useful for those who are either interested in building or optimizing an ML Feature Store or in general looking into cost optimizations in the cloud environment.
Running Low-Latency Workloads on Kubernetes
Jimmy Zelinskie (authzed)
Configuring Kubernetes to optimally run a particular workload is best described as a continuous journey. Depending on your requirements, best practices might not only no longer apply, but actively harm performance. 
In this session, we document what we’ve found to work best in our journey running SpiceDB, a low-latency authorization system.
Cheating the Cloud: 50% Savings with Compression Dictionaries
Łukasz Paszkowski (ScyllaDB)
Discover how to slash networking costs by up to 50% with advanced compression techniques. This session covers a real-world case where the default LZ4 compression in Cassandra, with its limited 25% efficiency, was causing high costs in inter-zone replication. We’ll introduce a custom RPC compressor with external dictionary support that samples RPC traffic, trains optimized dictionaries, and seamlessly switches connections to these new dictionaries. Learn how this approach dramatically improves compression ratios, reduces cloud expenses, and enhances data transfer efficiency across distributed systems. It’s perfect for those looking to optimize cloud infrastructure.
Latency, Throughput & Fault Tolerance: Designing the Arroyo Streaming Engine
Micah Wylde (Arroyo)
Arroyo is a distributed, stateful stream processing engine written in Rust. It combines predictable millisecond-latency processing with the throughput of a high-performance batch query engine—on top of a distributed checkpointing implementation that provides fault tolerance and exactly-once processing. These design goals are often in tension: increasing throughput generally comes at the expense of latency, and consistent checkpointing can introduce periodic latency spikes while we wait for alignment and IO. In this talk, I will cover the distributed architecture and implementation of Arroyo including the core Arrow-based dataflow engine, algorithms for stateful windowing and aggregates, and the Chandy-Lamport inspired distributed checkpointing system.
You’re Doing It All Wrong
Michael Stonebraker (MIT, Postgres creator)
In this talk, we consider business data processing applications, which have historically been written for a three-tier architecture. Two ideas totally upset this applecart. 
Idea #1: The Cloud
All enterprises are moving everything possible to the cloud as quickly as possible. In this new environment, you are highly encouraged to use a cloud-native architecture, whereby your system is composed of distributed functions, working in parallel, and running on a serverless (and stateless) platform like AWS Lambda or Azure Functions. You program your application as a workflow of “steps.” To make systems resilient to failures you require a separate state machine and workflow manager (e.g., AWS Step Functions, Airflow, etc.). If you use this architecture, then you don’t pay for resources when your application is idle, often a major benefit. Depending on the platform, you may also get automatic resource elasticity and load balancing; additional major benefits.
Idea #2: Leverage the DBMS
Obviously, your data belongs in a DBMS. However, by extension, so does the state of your application. Keeping track of application state in the DBMS allows one to provide once-and-only-once execution semantics for your workflow. One can also use the database concept of “sagas” to allow multi-transaction applications to be done to completion or not at all. Furthermore, to go an order of magnitude faster than AWS Lambda, you need to collocate your application and the DBMS. The fastest alternative is to run your application inside the DBMS using stored procedures (SPs). However, it is imperative to overcome SP weaknesses, specifically the requirement of a different language (e.g., PL/SQL) and the absence of a debugging environment. The latter can be accomplished by persisting the database log and allowing “time travel debugging” for SPs. The former can be supported by coding SPs in a conventional language such as TypeScript. Extending this idea to the operating environment, one can time travel the entire system, thereby allowing recovery to a previous point in time when disasters happen (errant programs, adversary intrusions, ransomware, etc.). 
I will discuss one such platform (DBOS) with all of the above features. In my opinion, this is an example of why “you are doing it all wrong.”
Taming Discard Latency Spikes
Patryk Wróbel (ScyllaDB)
Discover an interesting lesson related to the impact of discarding files on read and write latency on modern NVMe SSDs, learned while fixing a real-world problem in ScyllaDB. Dive into how TRIM requests are issued when online discard is enabled for the XFS file system, the problems that may occur, and possible solutions.
Redis Alternatives Compared
Peter Zaitsev (Percona)
In my talk, I will delve into a variety of Redis alternatives, providing an unbiased analysis that encompasses emerging forks like Valkey and Redict, established contenders such as DragonflyDB and KeyDB, and unique options like Microsoft Garnet and Redka. My presentation will cover critical aspects of these alternatives, including licensing models and their implications, comparisons of feature sets and functionality, governance and community support structures, and performance considerations tailored to different use cases. You will leave with a clearer understanding of how each alternative could meet specific needs, insights into open source compliance and licensing, and an appreciation of the importance of performance and support options in choosing the right solution. Join me to clarify your options and strategize your approach in the ever-changing world of Redis alternatives.
Database Drivers: Performance Perspectives
Piotr Sarna (poolside)
This talk explains how to get the most out of database drivers by understanding their design and potential. It’s a technical deep dive into how database drivers work underneath, and how to adjust their performance to your expectations. 
Using eBPF Off-CPU Sampling to See What Your Databases Are Really Waiting For
Tanel Poder
At P99 CONF 23, I introduced the general concept of using eBPF-populated Task State Arrays to keep track of all Linux applications’ (including database engines) thread states and activity without relying on the built-in instrumentation of the application. For example, the “wait events” built into database engines are not perfect; some voluntary waits (system calls) are not properly instrumented in all database engines. There are also other involuntary waits caused by OS-level issues, like memory allocation stalls, CPU queuing, and task scheduler glitches. This year, I will show the latest eBPF-based “xcapture” tool in practical use, measuring where MySQL, Postgres, and DuckDB really spend their time, both when on CPU and sleeping. All this can be done without having to change any source code of the database engine or applications running on it.
Java Heap Memory Optimization to Improve P99 Query Latency at LinkedIn Scale
Vivek Iyer Vaidyanathan Iyer (LinkedIn)
Apache Pinot is a real-time, distributed, OLAP database designed to serve low-latency SQL queries at high throughput. It was built and open-sourced by LinkedIn and powers many site-facing use cases for low-latency real-time analytics. Pinot Servers, the work-horses of SQL query processing, store table shards on local SSDs and memory map the columnar data buffers (data, indexes, etc.). In some specialized use cases where we have a P99 query SLA under 100ms, the column buffers are loaded on the Java heap as opposed to off heap (memory map). The data in these on-heap column buffers is characterized by high cardinality, featuring a high number of unique objects alongside a notable abundance of DUPLICATE objects. Duplicate Objects waste almost 20% of the JVM heap per host.
The memory-intensive nature of our OnHeap dictionary indexes leads to high Java Heap usage resulting in spiky P99 latencies due to the unpredictable nature of Java Garbage Collection. This talk will challenge the conventional notion that discourages the use of Interning methodologies and showcase how the Pinot production deployments at LinkedIn saw 20% heap savings per host while also improving P99 query latencies by 35% using a home-grown, efficient strategy of FALF Interning – Fixed-Size Array Lock-Free Interning. Designed as a small, fixed-size, open-hashmap-based object cache that deduplicates objects opportunistically, these Interners work on all object types and are 17% faster than the traditional Interners. In this talk, we will present how we used the JXRAY memory analysis to discover the problem, along with the design, implementation, and the P99 query latency improvements we observed in production at LinkedIn scale. We will discuss the general challenges of solving the duplicate objects problem for Java-based systems where the traditional methods of tuning JVM parameters or employing native or Guava Interners don’t work.
Understanding Distributed System Performance… from the Grocery Store
Learn essential steps for boosting distributed system performance, explained with grocery store checkout analogies.
I visited a small local grocery store which happens to be in a touristy part of my neighborhood. If you’ve ever traveled abroad, then you’ve probably visited a store like that to stock up on bottled water without purchasing the overpriced hotel equivalent. This was one of these stores. To my misfortune, my visit happened to coincide with a group of tourists arriving all at once to buy beverages and warm up (it’s winter!). It just so happens that selecting beverages is often much faster than buying fruit – the reason for my visit. So after I had selected some delicious apples and grapes, I ended up waiting in line behind 10 people. And there was a single cashier to serve us all. The tourists didn’t seem to mind the wait (they were all chatting in line), but I sure wish that the store had more cashiers so I could get on with my day faster.
What does this have to do with system performance? You’ve probably experienced a similar situation yourself and have your own tale to tell. It happens so frequently that sometimes we forget how applicable these situations can be to other domain areas, including distributed systems.
Sometimes when you evaluate a new solution, the results don’t meet your expectations. Why is latency high? Why is the throughput so low? Those are two of the top questions that pop up every now and then. Many times, the challenges can be resolved by optimizing your performance testing approach, as well as better maximizing your solution’s potential. As you’ll realize, improving the performance of a distributed system is a lot like ensuring speedy checkouts in a grocery store. This blog covers 7 performance-focused steps for you to follow as you evaluate distributed systems performance.
Step #1: Measure Time
With groceries, the first step towards doing any serious performance optimization is to precisely measure how long it takes for a single cashier to scan a barcode. Some goods, like bulk fruits that require weighing, may take longer to scan than products in industrial packaging. A common misconception is that processing happens in parallel. It does not (note: we’re not referring to capabilities like SIMD and pipelining here). Cashiers do not service more than a single person at a time, nor do they scan your products’ barcodes simultaneously. Likewise, a single CPU in a system will process one work unit at a time, no matter how many requests are sent to it. In a distributed system, consider all the different work units you have and execute them in an isolated way against a single shard. Execute your different items with single-threaded execution and measure how many requests per second the system can process. Eventually, you may learn that different requests get processed at different rates. For example, if the system is able to process a thousand 1 KB requests/sec, the average latency is 1 ms. Similarly, if throughput is 500 requests/sec for a larger payload size, then the average latency is 2 ms.
Step #2: Find the Saturation Point
A cashier is never scanning barcodes all the time. Sometimes, they will be idle waiting for customers to place their items onto the checkout counter, or waiting for payment to complete. This introduces delays you’ll typically want to avoid. Likewise, every request your client submits against a system incurs, for example, network round trip time – and you will always pay a penalty under low concurrency. To eliminate this idleness and further increase throughput, simply increase the concurrency. Do it in small increments until you observe that the throughput saturates and the latency starts to grow. Once you reach that point, congratulations! You effectively reached the system’s limits.
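As a rough illustration of Steps #1 and #2, the sketch below converts single-threaded throughput into average latency and picks a saturation point from a concurrency sweep. The function names, the 5% gain threshold, and the sample measurements are hypothetical and do not come from any real benchmark.

```python
def avg_latency_ms(requests_per_sec):
    """Average latency when requests are issued one at a time (single-threaded)."""
    return 1000.0 / requests_per_sec


def saturation_point(sweep, min_gain=0.05):
    """Return the first concurrency level after which throughput stops growing.

    `sweep` is a list of (concurrency, throughput) pairs sorted by concurrency.
    `min_gain` is an assumed threshold: a step that adds less than 5% throughput
    is treated as "saturated".
    """
    for (conc, tput), (_, next_tput) in zip(sweep, sweep[1:]):
        if next_tput < tput * (1 + min_gain):
            return conc
    return sweep[-1][0]


# Hypothetical measurements from increasing concurrency in small increments.
sweep = [(1, 1000), (2, 1950), (4, 3700), (8, 4900), (16, 5000), (32, 5010)]
```

With these made-up numbers, throughput barely moves between concurrency 8 and 16, so 8 is reported as the saturation point.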
In other words, unless you manage to get your work items processed faster (for example, by reducing the payload size) or tune the system to work more efficiently with your workload, you won’t achieve gains past that point. You definitely don’t want to find yourself in a situation where you are constantly pushing the system against its limits, though. Once you reach the saturation area, fall back to lower concurrency numbers to account for growth and unpredictability.
Step #3: Add More Workers
If you live in a busy area, grocery store demand might be beyond what a single cashier can sustain. Even if the store happened to hire the fastest cashier in the world, they would still be busy as demand/concurrency increases. Once the saturation point is reached it is time to hire more workers. In the distributed systems case, this means adding more shards to the system to scale throughput under the latency you’ve previously measured. This leads us to the following formula:
Number of Workers = Target Throughput / Single worker limit
You already discovered the performance limits of a single worker in the previous exercise. To find the total number of workers you need, simply divide your target throughput by how much a single worker can sustain under your defined latency requirements. Distributed systems like ScyllaDB provide linear scale, which simplifies the math (and total cost of ownership [TCO]). In fact, as you add more workers, chances are that you’ll achieve even higher rates than under a single worker. The reason relates to network IRQ handling and is out of scope for this write-up (but see the perftune docs page for some details).
Step #4: Increase Parallelism
Think about it. The total time to check out an order is driven by the number of items in a cart divided by the speed of a single cashier.
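The two calculations above can be written down as a small sketch. The function names, the rounding up to whole workers, and the sample numbers are assumptions made for illustration.

```python
import math


def workers_needed(target_throughput, single_worker_limit):
    """Number of Workers = Target Throughput / Single worker limit.

    Rounded up (an assumption), since a fraction of a worker cannot be hired.
    """
    return math.ceil(target_throughput / single_worker_limit)


def checkout_time_sec(items, items_per_sec, cashiers=1):
    """Time until the busiest cashier finishes when items are split evenly."""
    items_per_cashier = math.ceil(items / cashiers)
    return items_per_cashier / items_per_sec


# Hypothetical numbers: a 100K ops/sec target with shards that sustain 5K ops/sec.
shards = workers_needed(100_000, 5_000)
# Hypothetical cashier speed of 2 items/sec and a cart of 100 items.
serial = checkout_time_sec(100, 2)
```

Here `serial` is the time a single cashier needs for the whole cart; passing `cashiers=5` shows how the same cart finishes sooner when the items are spread across checkout counters.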
Instead of adding all the pressure on a single cashier, wouldn’t it be far more efficient to divide the items in your shopping cart (our work) and distribute them among friends who could then check out in parallel? Sometimes the number of work items you need to process might not be evenly split across all available cashiers. For example, if you have 100 items to check out, but there are only 5 cashiers, then you would route 20 items per counter. You might wonder: “Why shouldn’t I instead route only 5 customers with 20 items each?” That’s a great question – and you probably should do that, rather than having the store’s security kick you out. When designing real-time low latency OLTP systems, however, you mostly care about the time it takes for a single work unit to get processed. Although it is possible to “batch” multiple requests against a single shard, it is far more difficult (though not impossible) to consistently accomplish that task in such a way that every item is owned by that specific worker. The solution is to always ensure you dispatch individual requests one at a time. Keep concurrency high enough to overcome external delays like client processing time and network RTT, and introduce more clients for higher parallelism.
Step #5: Avoid Hotspots
Even after multiple cashiers get hired, it sometimes happens that a long line of customers queue behind a handful of them. More often than not you should be able to find less busy – or even totally free – cashiers simply by walking through the hallway. This is known as a hotspot, and it often gets triggered due to unbounded concurrency. It manifests in multiple ways. A common situation is when you have a traffic spike to a few popular items (load). That momentarily causes a single worker to queue a considerable number of requests. Another example: low cardinality (uneven data distribution) prevents you from fully benefiting from the increased workforce.
There’s also another commonly overlooked situation that frequently arises. It’s when you dispatch too much work against a single worker to coordinate, and that single worker depends on other workers to complete that task. Let’s get back to the shopping analogy: Assume you’ve found yourself on a blessed day as you approach the checkout counters. All cashiers are idle and you can choose any of them. After most of your items get scanned, you say “Dear Mrs. Cashier, I want one of those whiskies sitting in your locked closet.” The cashier then calls for another employee to pick up your order. A few minutes later, you realize: “Oops, I forgot to pick up my toothpaste,” and another idling cashier nicely goes and picks it up for you.
This approach introduces a few problems. First, your payment needs to be aggregated by a single cashier – the one you ran into when you approached the checkout counter. Second, although we parallelized, the “main” cashier will be idle waiting for their completion, adding delays. Third, further delays may be introduced in between each additional and individual request completion: for example, when the keys of the locked closet are only held by a single employee, so the total latency will be driven by the slowest response.
Consider the following pseudocode:
See that? Don’t do that. The previous pattern works nicely when there is a single work unit (or shard) to route requests to. Key-value caches are a great example of how multiple requests can get pipelined all together for higher efficiency. As we introduce sharding into the picture, this becomes a great way to undermine your latencies given the previously outlined reasons.
Step #6: Limit Concurrency
When more clients are introduced, it’s like customers inadvertently ending up at the supermarket during rush hour. Suddenly, they can easily end up in a situation where many clients all decide to queue under a handful of cashiers.
You previously discovered the maximum concurrency at which a single shard can service requests. These are hard numbers and – as you observed during small scale testing – you won’t see any benefits if you try to push requests further. The formula goes like this:
Concurrency = Throughput * Latency
If a single shard sustains up to 5K ops/second under an average latency of 1 ms, then you can execute up to 5 concurrent in-flight requests at all times. Later you added more shards to scale that throughput. Say you scaled to 20 shards for a total throughput goal of 100K ops/second. Intuitively, you would think that your maximum useful concurrency would become 100. But there’s a problem. Introducing more shards to a distributed system doesn’t increase the maximum concurrency that a single shard can handle. To continue the shopping analogy, a single cashier will continue to scan barcodes at a fixed rate – and if several customers line up waiting to get serviced, their wait time will increase.
To mitigate (though not necessarily prevent) that situation, divide the maximum useful concurrency among the number of clients. For example, if you’ve got 10 clients and a maximum useful concurrency of 100, then each client should be able to queue up to 10 requests across all available shards. This generally works when your requests are evenly distributed. However, it can still backfire when you have a certain degree of imbalance. Say all 10 clients decided to queue at least one request under the same shard. At a given point in time, that shard’s concurrency climbed to 10, double our initially discovered maximum concurrency. As a result, latency increases, and so does your P99.
There are different approaches to prevent that situation. The right one to follow depends on your application and use case semantics. One option is to limit your client concurrency even further to minimize its P99 impact.
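The concurrency arithmetic above, together with the first option (capping how many requests one client keeps in flight), can be sketched as follows. This is a minimal illustration: the helper names and the semaphore-based cap are assumptions, not part of any particular driver.

```python
import threading


def useful_concurrency(throughput_ops_per_sec, latency_ms):
    """Concurrency = Throughput * Latency (latency given in milliseconds)."""
    return throughput_ops_per_sec * latency_ms / 1000


def per_client_limit(max_useful_concurrency, num_clients):
    """Split the maximum useful concurrency evenly among clients (at least 1 each)."""
    return max(1, int(max_useful_concurrency // num_clients))


class ClientConcurrencyCap:
    """Hypothetical helper that caps the in-flight requests issued by one client."""

    def __init__(self, limit):
        self._slots = threading.BoundedSemaphore(limit)

    def run(self, request_fn, *args):
        # Blocks while `limit` requests from this client are already in flight.
        with self._slots:
            return request_fn(*args)


single_shard = useful_concurrency(5_000, 1)    # one shard: 5K ops/sec at 1 ms
whole_system = useful_concurrency(100_000, 1)  # 20 shards: 100K ops/sec at 1 ms
cap = ClientConcurrencyCap(per_client_limit(whole_system, 10))
```

Lowering the value passed to `ClientConcurrencyCap` is the "limit client concurrency even further" knob: it trades some peak throughput for a smaller chance that many clients pile onto the same shard.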
Another strategy is to throttle at the system level, allowing each shard to shed requests as soon as it queues past a certain threshold.
Step #7: Consider Background Operations
Cashiers do not work at their maximum speed at all times. Sometimes, they inevitably slow down. They drink water, eat lunch, go to the restroom, and eventually change shifts. That’s life! It is now time for real-life production testing. Apply what you’ve learned so far and observe how the system behaves over long periods of time. Distributed systems often need to run background maintenance activities (like compactions and repairs) to keep things running smoothly. In fact, that’s precisely the reason why I recommended that you stay away from the saturation area at the beginning of this article. Background tasks inevitably consume system resources, and are often tricky to diagnose. I commonly receive reports like “We observed a latency increase due to compactions”, only to find out later the actual cause was something else – for example, a spike in queued requests to a given shard. Irrespective of the cause, don’t try to “throttle” system tasks. They exist and need to run for a reason. Throttling their execution will likely backfire on you eventually. Yes, background tasks slow down a given shard momentarily (that’s normal!). Your application should simply prefer other less busy replicas (or cashiers) when it happens. For a great detailed discussion of these points, see Brian Taylor’s insights during his How to Maximize Database Concurrency talk.
Applying These Steps
Hopefully, you are now empowered to address questions like “why latency is high”, or “why throughput is so low”. As you start evaluating performance, start small. This minimizes costs, and gives you fine-grained control during each step. If latencies are sub-optimal under small scale, it either means you are pushing a single shard too hard, or that your expectations are off.
Do not engage in larger scale testing until you are happy with the performance a single shard gives you. Once you feel comfortable with the performance of a single shard, scale capacity accordingly. Keep an eye on concurrency at all times and watch out for imbalances, mitigating or preventing them as needed. When you find yourself in a situation where throughput no longer increases but the system is idling, add more clients to increase parallelism. These concepts generally apply to every distributed system out there, including ScyllaDB. Our shard-per-core architecture scales linearly, making it easy for you to follow through the steps we discussed here. If you’d like to know more on how we can help, book a technical session with us.
Introducing Netflix’s Key-Value Data Abstraction Layer
Vidhya Arvind, Rajasekhar Ummadisetty, Joey Lynch, Vinay Chella
Introduction
At Netflix our ability to deliver seamless, high-quality, streaming experiences to millions of users hinges on robust, global backend infrastructure. Central to this infrastructure is our use of multiple online distributed databases such as Apache Cassandra, a NoSQL database known for its high availability and scalability. Cassandra serves as the backbone for a diverse array of use cases within Netflix, ranging from user sign-ups and storing viewing histories to supporting real-time analytics and live streaming.
Over time as new key-value databases were introduced and service owners launched new use cases, we encountered numerous challenges with datastore misuse. First, developers struggled to reason about consistency, durability, and performance in this complex global deployment across multiple stores. Second, developers had to constantly re-learn new data modeling practices and common yet critical data access patterns. These include challenges with tail latency and idempotency, managing “wide” partitions with many rows, handling single large “fat” columns, and slow response pagination. Additionally, the tight coupling with multiple native database APIs (APIs that continually evolve and sometimes introduce backward-incompatible changes) resulted in org-wide engineering efforts to maintain and optimize our microservices’ data access.
To overcome these challenges, we developed a holistic approach that builds upon our Data Gateway Platform. This approach led to the creation of several foundational abstraction services, the most mature of which is our Key-Value (KV) Data Abstraction Layer (DAL). This abstraction simplifies data access, enhances the reliability of our infrastructure, and enables us to support the broad spectrum of use cases that Netflix demands with minimal developer effort.
In this post, we dive deep into how Netflix’s KV abstraction works, the architectural principles guiding its design, the challenges we faced in scaling diverse use cases, and the technical innovations that have allowed us to achieve the performance and reliability required by Netflix’s global operations.
The Key-Value Service
The KV data abstraction service was introduced to solve the persistent challenges we faced with data access patterns in our distributed databases. Our goal was to build a versatile and efficient data storage solution that could handle a wide variety of use cases, ranging from the simplest hashmaps to more complex data structures, all while ensuring high availability, tunable consistency, and low latency.
Data Model
At its core, the KV abstraction is built around a two-level map architecture. The first level is a hashed string ID (the primary key), and the second level is a sorted map of a key-value pair of bytes. This model supports both simple and complex data models, balancing flexibility and efficiency.
HashMap<String, SortedMap<Bytes, Bytes>>
For complex data models such as structured Records or time-ordered Events, this two-level approach handles hierarchical structures effectively, allowing related data to be retrieved together. For simpler use cases, it also represents flat key-value Maps (e.g. id → {"" → value}) or named Sets (e.g. id → {key → ""}). This adaptability allows the KV abstraction to be used in hundreds of diverse use cases, making it a versatile solution for managing both simple and complex data models in large-scale infrastructures like Netflix.
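To make the two-level map concrete, here is a minimal in-memory sketch in Python. It is only an illustration of the shape HashMap<String, SortedMap<Bytes, Bytes>>; the `Record` class and `put_item` helper are hypothetical names, not part of the KV service.

```python
import bisect


class Record:
    """Second level: a map of bytes -> bytes kept sorted by item key."""

    def __init__(self):
        self._keys = []    # item keys in sorted order
        self._values = {}  # item key -> value

    def put(self, key: bytes, value: bytes):
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value  # upsert: overwrite if the key exists

    def items(self):
        return [(k, self._values[k]) for k in self._keys]


store = {}  # first level: record id (string) -> Record


def put_item(record_id: str, key: bytes, value: bytes):
    store.setdefault(record_id, Record()).put(key, value)


# Time-ordered events: items come back sorted by key regardless of write order.
put_item("events:42", b"2024-01-02", b"play")
put_item("events:42", b"2024-01-01", b"pause")
# Flat key-value map: id -> {"" -> value}
put_item("profile:7", b"", b"flat value")
# Named set: id -> {key -> ""}
put_item("tags:7", b"comedy", b"")
```

The same structure covers all three shapes mentioned above; only the choice of item keys and values differs.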
The KV data can be visualized at a high level, as shown in the diagram below, where three records are shown.
message Item (
Bytes key,
Bytes value,
Metadata metadata,
Integer chunk
)
Database Agnostic Abstraction
The KV abstraction is designed to hide the implementation details of the underlying database, offering a consistent interface to application developers regardless of the optimal storage system for that use case. While Cassandra is one example, the abstraction works with multiple data stores like EVCache, DynamoDB, RocksDB, etc…
For example, when implemented with Cassandra, the abstraction leverages Cassandra’s partitioning and clustering capabilities. The record ID acts as the partition key, and the item key as the clustering column:
The corresponding Data Definition Language (DDL) for this structure in Cassandra is:
CREATE TABLE IF NOT EXISTS <ns>.<table> (
id text,
key blob,
value blob,
value_metadata blob,
PRIMARY KEY (id, key))
WITH CLUSTERING ORDER BY (key <ASC|DESC>)
Namespace: Logical and Physical Configuration
A namespace defines where and how data is stored, providing logical and physical separation while abstracting the underlying storage systems. It also serves as central configuration of access patterns such as consistency or latency targets. Each namespace may use different backends: Cassandra, EVCache, or combinations of multiple. This flexibility allows our Data Platform to route different use cases to the most suitable storage system based on performance, durability, and consistency needs. Developers just provide their data problem rather than a database solution!
In this example configuration, the ngsegment namespace is backed by both a Cassandra cluster and an EVCache caching layer, allowing for highly durable persistent storage and lower-latency point reads.
"persistence_configuration":[
{
"id":"PRIMARY_STORAGE",
"physical_storage": {
"type":"CASSANDRA",
"cluster":"cassandra_kv_ngsegment",
"dataset":"ngsegment",
"table":"ngsegment",
"regions": ["us-east-1"],
"config": {
"consistency_scope": "LOCAL",
"consistency_target": "READ_YOUR_WRITES"
}
}
},
{
"id":"CACHE",
"physical_storage": {
"type":"CACHE",
"cluster":"evcache_kv_ngsegment"
},
"config": {
"default_cache_ttl": "180s"
}
}
]
Key APIs of the KV Abstraction
To support diverse use-cases, the KV abstraction provides four basic CRUD APIs:
PutItems — Write one or more Items to a Record
The PutItems API is an upsert operation: it can insert new data or update existing data in the two-level map structure.
message PutItemRequest (
IdempotencyToken idempotency_token,
String namespace,
String id,
List<Item> items
)
As you can see, the request includes the namespace, Record ID, one or more items, and an idempotency token to ensure retries of the same write are safe. Chunked data can be written by staging chunks and then committing them with appropriate metadata (e.g. number of chunks).
GetItems — Read one or more Items from a Record
The GetItems API provides a structured and adaptive way to fetch data using ID, predicates, and selection mechanisms. This approach balances the need to retrieve large volumes of data while meeting stringent Service Level Objectives (SLOs) for performance and reliability.
message GetItemsRequest (
String namespace,
String id,
Predicate predicate,
Selection selection,
Map<String, Struct> signals
)
The GetItemsRequest includes several key parameters:
- Namespace: Specifies the logical dataset or table
- Id: Identifies the entry in the top-level HashMap
- Predicate: Filters the matching items and can retrieve all items (match_all), specific items (match_keys), or a range (match_range)
- Selection: Narrows returned responses, for example page_size_bytes for pagination, item_limit for limiting the total number of items across pages, and include/exclude to include or exclude large values from responses
- Signals: Provides in-band signaling to indicate client capabilities, such as supporting client compression or chunking.
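The three predicate kinds can be illustrated against a sorted list of (key, value) items. This is a sketch only: the function signatures are hypothetical, and treating the range as start-inclusive and end-exclusive is an assumption, since the post does not specify the bounds.

```python
import bisect


def match_all(items):
    """Return every item in the record."""
    return list(items)


def match_keys(items, keys):
    """Return only the items whose key is in `keys`, preserving sorted order."""
    wanted = set(keys)
    return [(k, v) for k, v in items if k in wanted]


def match_range(items, start, end):
    """Return items with start <= key < end (bounds are an assumption).

    Because items are sorted by key, the range is a contiguous slice.
    """
    keys = [k for k, _ in items]
    lo = bisect.bisect_left(keys, start)
    hi = bisect.bisect_left(keys, end)
    return items[lo:hi]


# A record's items, already sorted by key as in the second-level sorted map.
items = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3"), (b"d", b"4")]
```

The sorted second level is what makes range predicates cheap: a range maps to one contiguous slice rather than a scan with per-item filtering.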
The GetItemResponse message contains the matching data:
message GetItemResponse (
List<Item> items,
Optional<String> next_page_token
)
- Items: A list of retrieved items based on the Predicate and Selection defined in the request.
- Next Page Token: An optional token indicating the position for subsequent reads if needed, essential for handling large data sets across multiple requests. Pagination is a critical component for efficiently managing data retrieval, especially when dealing with large datasets that could exceed typical response size limits.
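From the caller's side, pagination is a loop that keeps passing the returned token back until none is returned. The sketch below uses a hypothetical in-memory stand-in for the service (`make_fake_get_items`) and a hypothetical call shape; the real client API and token format are not described in this post.

```python
def make_fake_get_items(data, page_size=2):
    """Build a hypothetical stand-in for GetItems that pages through `data`."""

    def get_items(namespace, record_id, page_token=None):
        start = int(page_token) if page_token else 0
        page = data[start:start + page_size]
        next_start = start + page_size
        # No token is returned once the data is exhausted.
        next_token = str(next_start) if next_start < len(data) else None
        return page, next_token

    return get_items


def read_all(get_items, namespace, record_id):
    """Follow next_page_token until the service stops returning one."""
    items, token = [], None
    while True:
        page, token = get_items(namespace, record_id, token)
        items.extend(page)
        if token is None:
            return items
```

In the real service the token is opaque to the client; here it is a plain offset only to keep the example self-contained.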
DeleteItems — Delete one or more Items from a Record
The DeleteItems API provides flexible options for removing data, including record-level, item-level, and range deletes — all while supporting idempotency.
message DeleteItemsRequest (
IdempotencyToken idempotency_token,
String namespace,
String id,
Predicate predicate
)
Just like in the GetItems API, the Predicate allows one or more Items to be addressed at once:
- Record-Level Deletes (match_all): Removes the entire record in constant latency regardless of the number of items in the record.
- Item-Range Deletes (match_range): Deletes a range of items within a Record. This is useful for keeping the “n-newest” items or for prefix path deletion.
- Item-Level Deletes (match_keys): Deletes one or more individual items.
Some storage engines (any store which defers true deletion) such as Cassandra struggle with high volumes of deletes due to tombstone and compaction overhead. Key-Value optimizes both record and range deletes to generate a single tombstone for the operation — you can learn more about tombstones in About Deletes and Tombstones.
Item-level deletes create many tombstones but KV hides that storage engine complexity via TTL-based deletes with jitter. Instead of immediate deletion, item metadata is updated as expired with randomly jittered TTL applied to stagger deletions. This technique maintains read pagination protections. While this doesn’t completely solve the problem it reduces load spikes and helps maintain consistent performance while compaction catches up. These strategies help maintain system performance, reduce read overhead, and meet SLOs by minimizing the impact of deletes.
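A minimal sketch of the jitter idea follows. The metadata field names (`expired`, `purge_at`) and the TTL values are hypothetical; the post does not describe the actual metadata layout or the jitter range.

```python
import random


def jittered_ttl_seconds(base_ttl, max_jitter, rng=random):
    """Base TTL plus a random extra delay so deletions do not all land at once."""
    return base_ttl + rng.uniform(0, max_jitter)


def mark_expired(item_metadata, now, base_ttl=3600, max_jitter=600, rng=random):
    """Return updated metadata marking the item expired, with a staggered purge time.

    Reads can treat the item as deleted immediately (the `expired` flag), while
    the storage engine removes it later, at `purge_at`.
    """
    updated = dict(item_metadata)
    updated["expired"] = True
    updated["purge_at"] = now + jittered_ttl_seconds(base_ttl, max_jitter, rng)
    return updated
```

Because each item gets a slightly different `purge_at`, a burst of item-level deletes turns into a spread of expirations instead of one spike of tombstones.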
Complex Mutate and Scan APIs
Beyond simple CRUD on single Records, KV also supports complex multi-item and multi-record mutations and scans via MutateItems and ScanItems APIs. PutItems also supports atomic writes of large blob data within a single Item via a chunked protocol. These complex APIs require careful consideration to ensure predictable, linear, low latency, and we will share details on their implementation in a future post.
Design Philosophies for reliable and predictable performance
Idempotency to fight tail latencies
To ensure data integrity the PutItems and DeleteItems APIs use idempotency tokens, which uniquely identify each mutative operation and guarantee that operations are logically executed in order, even when hedged or retried for latency reasons. This is especially crucial in last-write-wins databases like Cassandra, where ensuring the correct order and de-duplication of requests is vital.
In the Key-Value abstraction, idempotency tokens contain a generation timestamp and random nonce token. Either or both may be required by backing storage engines to de-duplicate mutations.
message IdempotencyToken (
Timestamp generation_time,
String token
)
At Netflix, client-generated monotonic tokens are preferred due to their reliability, especially in environments where network delays could impact server-side token generation. This combines a client-provided monotonic generation_time timestamp with a 128-bit random UUID token. Although clock-based token generation can suffer from clock skew, our tests on EC2 Nitro instances show drift is minimal (under 1 millisecond). In some cases that require stronger ordering, regionally unique tokens can be generated using tools like Zookeeper, or globally unique tokens such as transaction IDs can be used.
The following graphs illustrate the observed clock skew on our Cassandra fleet, suggesting the safety of this technique on modern cloud VMs with direct access to high-quality clocks. To further maintain safety, KV servers reject writes bearing tokens with large drift, preventing both silent write discard (the write has a timestamp far in the past) and immutable doomstones (the write has a timestamp far in the future) in storage engines vulnerable to those.
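The token scheme and the drift check can be sketched as below. This is an illustration under assumptions: the `TokenGenerator` class, the non-decreasing interpretation of "monotonic", and the 5-second drift threshold are all hypothetical, since the post does not give the actual limits.

```python
import time
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class IdempotencyToken:
    generation_time_ms: int  # client-provided timestamp
    token: str               # random 128-bit UUID rendered as a string


class TokenGenerator:
    """Client-side generator whose timestamps never go backwards."""

    def __init__(self):
        self._last_ms = 0

    def next(self, now_ms=None):
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        # Keep timestamps non-decreasing even if the wall clock steps back.
        self._last_ms = max(self._last_ms, now_ms)
        return IdempotencyToken(self._last_ms, str(uuid.uuid4()))


MAX_DRIFT_MS = 5_000  # hypothetical threshold


def accept_write(token, server_now_ms, max_drift_ms=MAX_DRIFT_MS):
    """Server-side check: reject tokens too far in the past or in the future."""
    return abs(server_now_ms - token.generation_time_ms) <= max_drift_ms
```

A retry or hedge of the same write reuses the same token, which is what lets the storage layer de-duplicate it.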
Handling Large Data through Chunking
Key-Value is also designed to efficiently handle large blobs, a common challenge for traditional key-value stores. Databases often face limitations on the amount of data that can be stored per key or partition. To address these constraints, KV uses transparent chunking to manage large data efficiently.
For items smaller than 1 MiB, data is stored directly in the main backing storage (e.g. Cassandra), ensuring fast and efficient access. However, for larger items, only the id, key, and metadata are stored in the primary storage, while the actual data is split into smaller chunks and stored separately in chunk storage. This chunk storage can also be Cassandra but with a different partitioning scheme optimized for handling large values. The idempotency token ties all these writes together into one atomic operation.
By splitting large items into chunks, we ensure that latency scales linearly with the size of the data, making the system both predictable and efficient. A future blog post will describe the chunking architecture in more detail, including its intricacies and optimization strategies.
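A simplified sketch of the split follows. The 1 MiB threshold comes from the text above; the 64 KiB chunk size, the dictionary layout, and the function names are assumptions, and the idempotency-token handling that ties the writes together is omitted.

```python
CHUNK_THRESHOLD = 1024 * 1024  # 1 MiB: smaller items are stored inline
CHUNK_SIZE = 64 * 1024         # hypothetical chunk size


def plan_write(key, value, chunk_size=CHUNK_SIZE):
    """Decide what goes to primary storage and what goes to chunk storage."""
    if len(value) < CHUNK_THRESHOLD:
        return {"key": key, "value": value, "metadata": {"num_chunks": 0}, "chunks": []}
    chunks = [value[i:i + chunk_size] for i in range(0, len(value), chunk_size)]
    # Primary storage keeps only the key and metadata; the data lives in chunks.
    return {"key": key, "value": None, "metadata": {"num_chunks": len(chunks)}, "chunks": chunks}


def reassemble(plan):
    """Rebuild the original value from an inline value or from its chunks."""
    if plan["value"] is not None:
        return plan["value"]
    return b"".join(plan["chunks"])
```

Since every chunk has a bounded size, the number of chunk reads or writes grows in proportion to the value size, which is the linear scaling described above.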
Client-Side Compression
The KV abstraction leverages client-side payload compression to optimize performance, especially for large data transfers. While many databases offer server-side compression, handling compression on the client side reduces expensive server CPU usage, network bandwidth, and disk I/O. In one of our deployments, which helps power Netflix’s search, enabling client-side compression reduced payload sizes by 75%, significantly improving cost efficiency.
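As an illustration of the idea, the sketch below compresses a payload on the client before sending it and skips compression when it would not help. The use of zlib, the 256-byte minimum, and the function names are assumptions; the post does not name the codec KV uses.

```python
import zlib


def compress_payload(value: bytes, min_size=256):
    """Return (payload, is_compressed). Small or incompressible values pass through."""
    if len(value) < min_size:
        return value, False
    compressed = zlib.compress(value)
    if len(compressed) >= len(value):
        return value, False
    return compressed, True


def decompress_payload(payload: bytes, is_compressed: bool) -> bytes:
    """Inverse of compress_payload, run by the reading client."""
    return zlib.decompress(payload) if is_compressed else payload
```

The `is_compressed` flag would travel with the item (for example in its metadata) so that readers know whether to decompress.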
Smarter Pagination
We chose payload size in bytes as the limit per response page rather than the number of items because it allows us to provide predictable operation SLOs. For instance, we can provide a single-digit millisecond SLO on a 2 MiB page read. Conversely, using the number of items per page as the limit would result in unpredictable latencies due to significant variations in item size. A request for 10 items per page could result in vastly different latencies if each item was 1 KiB versus 1 MiB.
Using bytes as a limit poses challenges as few backing stores support byte-based pagination; most data stores use the number of results e.g. DynamoDB and Cassandra limit by number of items or rows. To address this, we use a static limit for the initial queries to the backing store, query with this limit, and process the results. If more data is needed to meet the byte limit, additional queries are executed until the limit is met, the excess result is discarded and a page token is generated.
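The static-limit loop described above can be sketched as follows. The `query_rows(offset, limit)` callback, the offset used as a page token, and the rule that a page always contains at least one item are assumptions made to keep the example self-contained.

```python
def fetch_page(query_rows, page_size_bytes, static_limit=10, offset=0):
    """Fill one byte-limited page from a store that only limits by row count.

    Returns (page, next_offset, queries_issued); next_offset is None when the
    backing store has no more rows.
    """
    page, used, queries = [], 0, 0
    while True:
        rows = query_rows(offset, static_limit)
        queries += 1
        for key, value in rows:
            if page and used + len(value) > page_size_bytes:
                # Byte limit met: the rest of this batch is discarded.
                return page, offset, queries
            page.append((key, value))
            used += len(value)
            offset += 1
        if len(rows) < static_limit:
            return page, None, queries  # backing store exhausted


# Hypothetical backing store: 25 rows of 100 bytes each.
rows = [(str(i).encode(), b"x" * 100) for i in range(25)]
page, next_offset, queries = fetch_page(lambda off, lim: rows[off:off + lim], 1000, static_limit=4)
```

In this run, three queries of four rows each are needed to fill a 1000-byte page, and the last two rows fetched are thrown away.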
This static limit can lead to inefficiencies: one large item in the result may cause us to discard many results, while small items may require multiple iterations to fill a page, resulting in read amplification. To mitigate these issues, we implemented adaptive pagination, which dynamically tunes the limits based on observed data.
Adaptive Pagination
When an initial request is made, a query is executed in the storage engine, and the results are retrieved. As the consumer processes these results, the system tracks the number of items consumed and the total size used. This data helps calculate an approximate item size, which is stored in the page token. For subsequent page requests, this stored information allows the server to apply the appropriate limits to the underlying storage, reducing unnecessary work and minimizing read amplification.
While this method is effective for follow-up page requests, what happens with the initial request? In addition to storing item size information in the page token, the server also estimates the average item size for a given namespace and caches it locally. This cached estimate helps the server set a more optimal limit on the backing store for the initial request, improving efficiency. The server continuously adjusts this limit based on recent query patterns or other factors to keep it accurate. For subsequent pages, the server uses both the cached data and the information in the page token to fine-tune the limits.
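The limit calculation can be sketched as follows. This is an illustration under stated assumptions rather than the service's actual logic: the exponentially weighted moving average, the `alpha` parameter, and the names `SizeEstimate` and `row_limit` are all hypothetical. The text above only says that an approximate item size is tracked and used to choose a limit.

```rust
/// Running estimate of item size for one namespace (hypothetical cache entry).
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct SizeEstimate {
    pub avg_item_bytes: f64,
}

impl SizeEstimate {
    /// Fold in one observation (items consumed, bytes used) with a moving average.
    /// `alpha` in (0, 1] controls how quickly new observations override old ones.
    pub fn observe(&mut self, items: usize, total_bytes: usize, alpha: f64) {
        if items == 0 {
            return; // nothing to learn from an empty result
        }
        let observed = total_bytes as f64 / items as f64;
        self.avg_item_bytes = alpha * observed + (1.0 - alpha) * self.avg_item_bytes;
    }
}

/// Translate a byte budget into a row limit for the backing store.
pub fn row_limit(byte_limit: usize, avg_item_bytes: f64, max_rows: usize) -> usize {
    let max_rows = max_rows.max(1);
    if avg_item_bytes <= 0.0 {
        return max_rows; // no estimate yet: fall back to the cap
    }
    let rows = (byte_limit as f64 / avg_item_bytes).ceil() as usize;
    rows.clamp(1, max_rows)
}
```

For a 2 MiB page, an estimate of 1 KiB per item yields a limit of 2048 rows, while an estimate of 1 MiB per item yields a limit of 2. This avoids both the discarded results and the repeated queries that a single static limit causes.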
In addition to adaptive pagination, a mechanism is in place to send a response early if the server detects that processing the request is at risk of exceeding the request’s latency SLO.
For example, let us assume a client submits a GetItems request with a per-page limit of 2 MiB and a maximum end-to-end latency limit of 500ms. While processing this request, the server retrieves data from the backing store. This particular record has thousands of small items so it would normally take longer than the 500ms SLO to gather the full page of data. If this happens, the client would receive an SLO violation error, causing the request to fail even though there is nothing exceptional. To prevent this, the server tracks the elapsed time while fetching data. If it determines that continuing to retrieve more data might breach the SLO, the server will stop processing further results and return a response with a pagination token.
This approach ensures that requests are processed within the SLO, even if the full page size isn’t met, giving clients predictable progress. Furthermore, if the client is a gRPC server with proper deadlines, the client is smart enough not to issue further requests, reducing useless work.
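A simplified sketch of the early-response check is shown below. It is not the server's implementation: fetch times are passed in as data so the example stays deterministic, whereas a real server would read a clock, and `safety_margin` and `read_with_budget` are hypothetical names introduced here.

```rust
use std::time::Duration;

/// Outcome of one server-side page read.
#[derive(Debug, PartialEq)]
pub struct PageResult {
    pub bytes: usize,
    pub batches_read: usize,
    pub more: bool, // true when a pagination token would be returned
}

/// Gather batches until the byte limit is met or the latency budget is at risk.
/// Each entry in `batches` is (bytes in the batch, time spent fetching it).
pub fn read_with_budget(
    batches: &[(usize, Duration)],
    byte_limit: usize,
    slo: Duration,
    safety_margin: Duration,
) -> PageResult {
    let mut bytes = 0;
    let mut elapsed = Duration::ZERO;
    let mut read = 0;
    for &(batch_bytes, cost) in batches {
        // Stop before the next fetch if the page is full or if the time
        // already spent leaves less than the safety margin before the SLO.
        if bytes >= byte_limit || elapsed + safety_margin >= slo {
            break;
        }
        elapsed += cost;
        bytes += batch_bytes;
        read += 1;
    }
    PageResult { bytes, batches_read: read, more: read < batches.len() }
}
```

With ten batches that each take 100 ms, a 500 ms SLO, and a 100 ms margin, the function returns after four batches with `more` set, so the client receives a partial page and a token instead of an SLO violation error.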
If you want to know more, the How Netflix Ensures Highly-Reliable Online Stateful Systems article talks in further detail about these and many other techniques.
Signaling
KV uses in-band messaging we call signaling that allows the dynamic configuration of the client and enables it to communicate its capabilities to the server. This ensures that configuration settings and tuning parameters can be exchanged seamlessly between the client and server. Without signaling, the client would need static configuration — requiring a redeployment for each change — or, with dynamic configuration, would require coordination with the client team.
For server-side signals, when the client is initialized, it sends a handshake to the server. The server responds back with signals, such as target or max latency SLOs, allowing the client to dynamically adjust timeouts and hedging policies. Handshakes are then made periodically in the background to keep the configuration current. For client-communicated signals, the client, along with each request, communicates its capabilities, such as whether it can handle compression, chunking, and other features.
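The two directions of signaling might be modeled roughly as below. Everything here is an assumption made for illustration: the struct and field names, the mapping from SLOs to a timeout and a hedge delay, and the `should_compress` helper are hypothetical and do not come from the KV client or server.

```rust
use std::time::Duration;

/// Signals a server might return from the handshake (field names are hypothetical).
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ServerSignals {
    pub target_latency: Duration,
    pub max_latency: Duration,
}

/// Client-side settings derived from the server's signals.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ClientPolicy {
    pub request_timeout: Duration,
    pub hedge_after: Duration,
}

impl ClientPolicy {
    /// Assumed mapping: time out at the max latency SLO and send a hedged
    /// request once the target latency SLO has passed.
    pub fn from_signals(s: ServerSignals) -> Self {
        ClientPolicy {
            request_timeout: s.max_latency,
            hedge_after: s.target_latency.min(s.max_latency),
        }
    }
}

/// Capabilities the client attaches to each request.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct Capabilities {
    pub compression: bool,
    pub chunking: bool,
}

/// Server-side choice: compress a response only if the client can decode it.
pub fn should_compress(caps: Capabilities, payload_len: usize, min_len: usize) -> bool {
    caps.compression && payload_len >= min_len
}
```

Because the policy is recomputed whenever a handshake returns new signals, a change to an SLO on the server would reach clients without a redeployment, which is the benefit the section describes.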
KV Usage @ Netflix
The KV abstraction powers several key Netflix use cases, including:
- Streaming Metadata: High-throughput, low-latency access to streaming metadata, ensuring personalized content delivery in real-time.
- User Profiles: Efficient storage and retrieval of user preferences and history, enabling seamless, personalized experiences across devices.
- Messaging: Storage and retrieval of the push registry for messaging needs, enabling millions of requests to flow through.
- Real-Time Analytics: Persisting large-scale impressions and providing insights into user behavior and system performance, moving data from offline to online and vice versa.
Future Enhancements
Looking forward, we plan to enhance the KV abstraction with:
- Lifecycle Management: Fine-grained control over data retention and deletion.
- Summarization: Techniques to improve retrieval efficiency by summarizing records with many items into fewer backing rows.
- New Storage Engines: Integration with more storage systems to support new use cases.
- Dictionary Compression: Further reducing data size while maintaining performance.
Conclusion
The Key-Value service at Netflix is a flexible, cost-effective solution that supports a wide range of data patterns and use cases, from low to high traffic scenarios, including critical Netflix streaming use-cases. The simple yet robust design allows it to handle diverse data models like HashMaps, Sets, Event storage, Lists, and Graphs. It abstracts the complexity of the underlying databases from our developers, which enables our application engineers to focus on solving business problems instead of becoming experts in every storage engine and their distributed consistency models. As Netflix continues to innovate in online datastores, the KV abstraction remains a central component in managing data efficiently and reliably at scale, ensuring a solid foundation for future growth.
Acknowledgments: Special thanks to our stunning colleagues who contributed to Key Value’s success: William Schor, Mengqing Wang, Chandrasekhar Thumuluru, Rajiv Shringi, John Lu, George Cambell, Ammar Khaku, Jordan West, Chris Lohfink, Matt Lehman, and the whole online datastores team (ODS, f.k.a CDE).
Introducing Netflix’s Key-Value Data Abstraction Layer was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.
ScyllaDB’s Rust Developer Workshop: What We Learned
A recap of the recent Rust developer workshop, where we built and refactored a high-performance Rust app for real-time data streaming (with ScyllaDB and Redpanda).
Felipe Cardeneti Mendes (ScyllaDB Technical Director) and I recently got together with a couple thousand curious Rustaceans for a ScyllaDB Rust Developer Workshop. The agenda: walk through how we built and refactored a high-performance Rust app for real-time data streaming. We promised to show developers, engineers, and architects how to:
- Create and compile a sample social media app with Rust
- Connect the application to ScyllaDB (NoSQL data store) and Redpanda (streaming data)
- Negotiate tradeoffs related to data modeling and querying
- Manage and monitor the database for consistently low latencies
This blog post is a quick recap of what we covered. Hopefully, it’s a nice wrapup for those who joined us live. If you missed it, you can still watch the recording (uncut – complete with a little cat chasing!). And feel free to ping me or Felipe with any questions you have.
First Things First
First, I wanted to cover how I approach an existing, legacy codebase that Felipe so kindly generated for the workshop. I think it’s really important to respect everyone who interacts with code – past, present and future. That mindset helps foster good collaboration and leads to more maintainable and high quality code. Who knows, you might even have a laugh along the way.
You probably spotted me using an Integrated Development Environment (IDE). Depending on your budget (from free to perhaps a couple hundred dollars), an IDE will really help streamline your coding process, especially when working with complex projects. The eagle-eyed among you may have spotted some AI in there as well from our friends at GitHub. Every bit helps!
Dealing with Dependencies
In the code walkthrough, I first tackled the structure of the code, and showed how to organize workspace members. This helps me resolve dependencies efficiently and start to test the binaries in isolation:
    [workspace]
    members = ["backend", "consumer", "frontend"]
    resolver = "1"
Then I could just run the consumer after stopping it in docker-compose with:
    cargo run --package consumer --bin consumer
Updating the Driver
Another thing I did was update the driver. It’s important to keep things in check with releases from ScyllaDB, so we upgraded the Rust driver for the whole project. I did a quick walkthrough of application functionality and decided to write a quick smoke test that simulated traffic on the front end in terms of messaging between users. If you’re interested, I used a great load testing tool called k6 to simulate that load. Here’s the script:
    import http from 'k6/http';

    // Each virtual-user iteration posts one message to the front end.
    export default function () {
      http.post('http://localhost:3001/new_post', JSON.stringify({
        content: 'bar',
        subject: 'foo',
        id: '8d8712fc-786f-4d72-98ea-3669e56f7012'
      }), {
        headers: {
          'Content-Type': 'application/json',
        },
      });
    }
Dealing with an Offset Bug
Once we had some messages flowing (perhaps way too many, as it turned out), I discovered a potential bug, where the offset was not being persisted between application restarts. This meant every time we restarted the application, all of the messages would be read from the topic and then re-written to the database. Without understanding functionality like the ability to parse consumer offsets in Redpanda, I went for a more naive approach by storing the offset in ScyllaDB instead. I’m sure I’m not the first dev to go down the wrong path, and I fully blame Felipe for not intercepting earlier 😉
Refactoring Time
In any case, it was fun to see how we might approach the topic of refactoring code. It’s always easier to start with small, manageable tasks when making improvements or refactoring code. The first thing I did was decide what the table (and ultimately query) might look like. This “query first design” is an important design concept in ScyllaDB. Be sure to check out some ScyllaDB University courses on this. I decided the table would look something like this to store my offset value:
    CREATE TABLE IF NOT EXISTS ks.offset (consumer text PRIMARY KEY, count BigInt)
We briefly touched on why I chose a BigInt primitive instead of a Counter value. The main reason is that we can’t arbitrarily set the latter to a value, only increment or decrement it. We then tackled how we might write to that table and came up with the following:
    async fn update_offset(
        offset: i64,
        session: &Session,
        update_counter: &PreparedStatement,
        consumer: &str,
    ) -> Result<()> {
        session.execute(update_counter, (offset, consumer)).await?;
        Ok(())
    }
You’ll notice here that I’m passing it a prepared statement, which is an important concept to grasp when making your code perform well with ScyllaDB. Be sure to read the docs on that if you’re unsure. I also recall writing a TODO to move some existing prepared query statements outside a for loop. The main reason: you only need to prepare a statement once for your app, not over and over. So watch out for that mistake. I also stored my query as a constant:
    const UPDATE_OFFSET: &str = "UPDATE ks.offset SET count = ? WHERE consumer = ?";
There are different ways to skin this, like maybe some form of model approach, but this was a simple way to keep the queries in one place within the consumer code. We restarted the app and checked the database using cqlsh to see if the offsets were being written – and they weren’t! But first, a quick tip from other webinars: if you’re running ScyllaDB in a docker container, you can simply exec into it and run the tool:
    docker exec -it scylla cqlsh
Back to my mistake: why no writes to the table? If you recall, I write the offset after the consumer has finished processing records from the topic:
    offset = consumer(&postclient, "posts", offset, &session).await;
    update_offset(offset, &session, &update_counter, "posts").await.expect("Failed to update offset");
    tokio::time::sleep(Duration::from_secs(5)).await;
Since I had written a load test with something like 10K records, that consumer takes some time to complete, so update_offset wasn’t getting called straight away. By the end of the webinar, it actually finished reading from the topic and wrote the offset to the table. Another little change I snuck in there was on:
    tokio::time::sleep(Duration::from_secs(5)).await;
Felipe spoke to the benefits of using tokio, an asynchronous runtime for Rust. The previous thread sleep would in fact do nothing, hence the change. Hooray for quick fixes! Once we had writes, we needed to read from the table, so I added another function that looked like this:
    async fn fetch_offset(session: &Session, consumer: &str) -> Result<i64> {
        let query = "SELECT count FROM ks.offset WHERE consumer = ?";
        let values = (consumer,);
        let result = session.query(query, values).await.expect("Failed to execute query");
        // Return the stored offset, or 0 if this consumer has no row yet.
        if let Some(row) = result.maybe_first_row_typed::<(i64,)>().expect("Failed to get row") {
            Ok(row.0)
        } else {
            Ok(0)
        }
    }
I spoke about some common gotchas here, like misunderstanding how query values work, with different types, and whether to use a slice &[] or a tuple (). Query text is constant, but the values might change. You can pass changing values to a query by specifying a list of variables as bound values. Don’t forget the parentheses! I also highlighted some of the convenience methods in query result, like maybe_first_row_typed. That returns Option<RowT> containing the first row from the result – which is handy when you just want the first row or None. Once again, you can play around with types, and even use custom structs if you prefer for the output. In my case, it was just a tuple with an i64. The complete consumer code for posts looked something like this:
    tokio::spawn(async move {
        use std::time::Duration;
        info!("Posts Consumer Started");
        let session = db_session().await;
        let update_counter = session.prepare(UPDATE_OFFSET).await.expect("Failed to prepare query");
        loop {
            let mut offset = fetch_offset(&session, "posts").await.expect("Failed to fetch offset");
            offset = consumer(&postclient, "posts", offset, &session).await;
            update_offset(offset, &session, &update_counter, "posts").await.expect("Failed to update offset");
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    });
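To see why persisting the offset stops messages from being replayed after a restart, here is a standalone simulation of the fetch, consume, and update cycle. It is only a sketch: it uses no ScyllaDB or Redpanda client, and `OffsetStore`, `consume`, and `run_once` are hypothetical in-memory stand-ins for the table, the topic consumer, and one loop iteration.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the ks.offset table (consumer -> count).
#[derive(Default)]
pub struct OffsetStore {
    rows: HashMap<String, i64>,
}

impl OffsetStore {
    /// Return the stored offset, or 0 if the consumer has no row yet.
    pub fn fetch(&self, consumer: &str) -> i64 {
        *self.rows.get(consumer).unwrap_or(&0)
    }

    /// Overwrite the stored offset (possible with a BigInt column, not with a Counter).
    pub fn update(&mut self, consumer: &str, offset: i64) {
        self.rows.insert(consumer.to_string(), offset);
    }
}

/// Process every message at or after `offset` and return the next offset to read.
pub fn consume(topic: &[&str], offset: i64, sink: &mut Vec<String>) -> i64 {
    for msg in topic.iter().skip(offset.max(0) as usize) {
        sink.push(msg.to_string());
    }
    topic.len() as i64
}

/// One loop iteration: fetch the offset, consume the topic, persist the new offset.
pub fn run_once(store: &mut OffsetStore, consumer: &str, topic: &[&str], sink: &mut Vec<String>) {
    let offset = store.fetch(consumer);
    let next = consume(topic, offset, sink);
    store.update(consumer, next);
}
```

Calling `run_once` twice against the same store and an unchanged topic writes each message only once, because the second call starts from the persisted offset. Note that, as in the workshop code, the offset is saved only after `consume` returns, so a crash partway through a long run would still replay that run.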
You can see I prepare the statement before the loop, then I fetch the offset from the database, consume the topic, write the offset to the database and sleep. Keep doing that forever!
What We Didn’t Have Time to Cover
There were a few things that I wanted to cover, but ran out of time. If you wanted to write results to a custom struct, the code might look something like:
    use scylla::IntoTypedRows;

    #[derive(Default, FromRow)]
    pub struct Offset {
        consumer: String,
        count: i64,
    }

    async fn fetch_offset_type(session: &Session, consumer: &str) -> Offset {
        let query = "SELECT * FROM ks.offset WHERE consumer = ?";
        let values = (consumer,);
        let result = session.query(query, values).await.expect("Failed to execute query");
        if let Some(rows) = result.rows {
            if let Some(row) = rows.into_typed::<Offset>().next() {
                let offset: Offset = row.expect("Failed to parse row");
                return offset;
            }
        }
        // No row yet for this consumer: start from zero.
        Offset { consumer: consumer.to_string(), count: 0 }
    }
There are some custom values you’ll come across like CqlTimestamps and Counter… so you should be aware of the ways to handle these different data types. For example, rather than convert everything to and from millisecond timestamps, you can add the chrono feature flag on the crate to interact with time. You can also improve logging with the driver’s support of the tracing crate for your logs. If you add that, you can use a tracing subscriber as follows:
    #[tokio::main]
    async fn main() {
        tracing_subscriber::fmt::init();
        …
Wrapping Up
I personally find refactoring code enjoyable. I’d encourage you to have a patient, persistent approach to coding, testing and refactoring. When it comes to ScyllaDB, it’s a product where it really pays to read the documentation, as many of the foot guns are well documented. If you still find yourself stuck, feel free to ask questions on the ScyllaDB forum and learn from your peers. And remember, small, continuous improvements lead to long-term benefits. Have fun!
Training Updates, Forum, Upcoming ScyllaDB University LIVE Event
The ScyllaDB “sea monster” community is growing fast, so we’re excited to announce lots of new resources to get you on the fast track to ScyllaDB success. In this post, I’ll update you on our next ScyllaDB Labs and ScyllaDB University LIVE training events, introduce new lessons on ScyllaDB University, and summarize some interesting discussions from the community forum.
ScyllaDB University Updates
For those not (yet) familiar with ScyllaDB University, it’s an online learning and training center for ScyllaDB. The self-paced lessons include theory and hands-on labs that you can run yourself. To get started, just create a (free) account.
As the product evolves, we enhance and update the training material. One of the lessons we recently updated is the “How to Write Better Apps” lesson. Many of the issues new users commonly face can be avoided by using best practices and straightforward built-in debugging mechanisms. This lesson covers important metrics users should track, and how to keep track of them using the Monitoring stack. Other topics include prepared statements, token-aware drivers, denormalizing data, working with multiple data centers, and data modeling best practices (partition sizing, distribution, retries, and batching).
Another lesson we recently updated is the “Materialized Views, Secondary Indexes, and Filtering” lesson. ScyllaDB offers three indexing options: Materialized Views, Global Secondary Indexes, and Local Secondary Indexes. I often get questions from users about the differences between them and how to use them. They are all covered in this lesson, along with a comparison, examples of when to use each, quizzes, and hands-on labs. By the end of the lesson, users will have an understanding of the different index types in ScyllaDB, how to use them, and when to use each one. Additionally, they’ll gain some hands-on experience by creating and using these indexes in the labs.
Additionally, we are embedding interactive, hands-on labs within the lessons, as you can see in the Quick Wins lab. Having the lab embedded within the browser means that you can run it regardless of your operating system – and without any prerequisites.
In addition to the on-demand ScyllaDB University portal, we periodically host live online training sessions. The next two we have scheduled are ScyllaDB Labs and ScyllaDB University LIVE.
ScyllaDB Labs – 17 September, 2024
The interactive workshop will be held in an Asia-friendly time zone. Its focus is on providing hands-on experience building and interacting with high-performance applications using ScyllaDB. The labs introduce NoSQL strategies used by top teams, guiding participants through the creation of sample applications. We cover key topics such as determining if ScyllaDB is suitable for your use case, achieving low-latency NoSQL at scale, application performance optimization, data modeling, and making important decisions when getting started with ScyllaDB.
I will be hosting it together with my colleague Tim Koopmans. He will start with an introduction to ScyllaDB, covering some core concepts and a getting started lab. After that, my talk will focus on Data Modeling basics, including some architecture and another hands-on lab. Finally, attendees will have some time to run labs independently while we are there to answer questions.
ScyllaDB University Live – 24 September, 2024
ScyllaDB University LIVE is an instructor-led, half-day online NoSQL database training event. It includes live sessions conducted by ScyllaDB’s lead engineers and architects and has two parallel tracks:
- Essentials: ScyllaDB architecture, key components, data modeling, and building your first ScyllaDB-powered app (in Rust!).
- Advanced Topics: Deep dives into cluster topology, advanced data modeling, optimizing elasticity and efficiency, and power user tips and tricks.
Participants can switch between tracks or attend specific sessions of interest. After the training, there will be an expert panel to answer user questions. Attendees will also have the chance to complete quizzes, participate in hands-on labs, earn certificates, and receive exclusive ScyllaDB swag. The sessions are live, and there’s no on-demand option.
What’s Trending on the ScyllaDB Community Forum
The community forum is the place to discuss all things NoSQL related. You can ask questions, learn what your peers are doing, and share how you’re using ScyllaDB. Here is a summary of some of the top topics.
GUI Visualization for ScyllaDB
Many new users ask about using a GUI with ScyllaDB. Some relevant tools are the ScyllaDB Manager and the Monitoring Stack. One user suggested using JetBrains Rider, a paid tool that he found useful. Additionally, DBeaver, in some versions, now supports ScyllaDB. It’s a universal database manager that allows users to run CQL queries and view result tables directly within the interface.
Kafka: How to extract nested field in JSON structure?
Here, a user who is migrating from MySQL to ScyllaDB, and using ElasticSearch, is using the scylladb-cdc-source-connector to publish CDC messages to a Kafka topic and is facing issues with accessing nested fields in the JSON message structure. Existing Single Message Transformations (SMTs) don’t support accessing nested fields, so some workaround options are discussed.
Best way to Fetch N rows in ScyllaDB: Count, Limit or Paging
This topic discusses different ways of fetching N rows while using the TTL feature, optimizing for performance and efficiency. Three options are mentioned: using count(), using LIMIT, and using Paging. Some suggestions were to change the clustering key to include the timestamp and allow for more efficient queries, as well as using a Counter Table. Another point that was brought up was the performance difference between COUNT and LIMIT.
Read_concurrency_semaphore & p99 read latency
The user is experiencing high P99 read latency in an application that queries time-series data using ScyllaDB, despite low average latency. The application uses a ScyllaDB cluster with 3 nodes, each with 16 cores, 128 GB RAM, and 3 TB RAID0 SSDs. The schema is designed for time-series data with a composite primary key and TimeWindowCompactionStrategy for compaction. While the ScyllaDB dashboard shows P99 read latency as low (1-10ms), the gocql latency report shows occasional spikes in P99 latency (700ms to 1s). The user has tried profiling and tracing but cannot identify the root cause.
ScyllaDB vs Aerospike
The user read a whitepaper comparing ScyllaDB’s and Aerospike’s performance. The paper shows that ScyllaDB outperforms Aerospike by 30-40%. The user has several questions about the methodology of the tests used, versions, configurations, and so on.
Say Hello to the Community