Innovative data compression for time series: An open source solution
Introduction
There’s no escaping the role that monitoring plays in our everyday lives, whether it’s monitoring the weather, the number of steps we take in a day, computer systems, or ever-popular IoT devices.
Practically any activity can be monitored in one form or another these days. This generates increasing amounts of data to be pored over and analyzed–but storing all this data adds significant costs over time. Given this huge amount of data that only increases with each passing day, efficient compression techniques are crucial.
Here at NetApp® Instaclustr we saw a great opportunity to improve the current compression techniques for our time series data. That’s why we created the Advanced Time Series Compressor (ATSC) in partnership with University of Canberra through the OpenSI initiative.
ATSC is a groundbreaking compressor designed to address the challenges of efficiently compressing large volumes of time-series data. Internal tests with production data from our database metrics showed that ATSC compresses, averaged across the dataset, ~10x more than LZ4 and ~30x more than the default Prometheus compression. Check out ATSC on GitHub.
There are so many compressors already, so why develop another one?
While other compression methods like LZ4, DoubleDelta, and ZSTD are lossless, most of our timeseries data is already lossy. Timeseries data can be lossy from the beginning due to under-sampling or insufficient data collection, or it can become lossy over time as metrics are rolled over or averaged. Because of this, the idea of a lossy compressor was born.
ATSC is a highly configurable, lossy compressor that leverages the characteristics of time-series data to create function approximations. ATSC finds a fitting function and stores the parametrization of that function—no actual data from the original timeseries is stored. When the data is decompressed, it isn’t identical to the original, but it is still sufficient for the intended use.
Here’s an example: for a temperature change metric—which mostly varies slowly (as do a lot of system metrics!)—instead of storing all the points that have a small change, we fit a curve (or a line) and store that curve/line achieving significant compression ratios.
Image 1: ATSC data for temperature
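The idea of storing a fitted function instead of the points can be illustrated with a minimal Python sketch. This is not ATSC's implementation (ATSC is written in Rust and uses its own function set); the temperature readings, the helper names `fit_line` and `reconstruct`, and the choice of a least-squares line are all assumptions made for illustration.

```python
def fit_line(values):
    """Least-squares line through (0, v0), (1, v1), ...; returns (slope, intercept)."""
    n = len(values)
    mean_x = (n - 1) / 2
    mean_y = sum(values) / n
    sxx = sum((x - mean_x) ** 2 for x in range(n))
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(values))
    slope = sxy / sxx if sxx else 0.0
    return slope, mean_y - slope * mean_x


def reconstruct(slope, intercept, n):
    """Rebuild n approximate samples from the two stored parameters."""
    return [slope * x + intercept for x in range(n)]


# Hypothetical, slowly varying temperature readings (degrees C).
temps = [20.0, 20.1, 20.2, 20.2, 20.3, 20.5, 20.5, 20.6, 20.8, 20.9]

slope, intercept = fit_line(temps)
approx = reconstruct(slope, intercept, len(temps))

# Largest per-point deviation, relative to the original value.
max_rel_err = max(abs(a - t) / abs(t) for a, t in zip(approx, temps))

print(f"stored 2 parameters instead of {len(temps)} points")
print(f"max relative error: {max_rel_err:.4%}")
```

Only the slope and intercept would need to be stored; the reconstructed values differ slightly from the originals, which is the lossy trade-off described above.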
How does ATSC work?
ATSC looks at the actual time series, in whole or in parts, to find how to better calculate a function that fits the existing data. For that, a quick statistical analysis is done, but if the results are inconclusive a sample is compressed with all the functions and the best function is selected.
By default, ATSC will segment the data—this guarantees better local fitting, more and smaller computations, and less memory usage. It also ensures that decompression targets a specific block instead of the whole file.
In each fitting frame, ATSC will create a function from a pre-defined set and calculate the parametrization of said function.
ATSC currently uses one of the following functions per frame:
- FFT (Fast Fourier Transforms)
- Constant
- Interpolation – Catmull-Rom
- Interpolation – Inverse Distance Weight
Image 2: Polynomial fitting vs. Fast-Fourier Transform fitting
These methods allow ATSC to compress data with a fitting error within 1% (configurable!) of the original time-series.
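The frame-by-frame approach can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not ATSC's actual algorithm: the candidate functions here are only a constant and a least-squares line (stand-ins for ATSC's function set), the error metric is the maximum per-point relative error, the frame size of 8 is arbitrary, and the fallback to raw storage is an invention of this sketch.

```python
def fit_constant(frame):
    """Model the frame as its mean value (1 stored parameter)."""
    return ("constant", (sum(frame) / len(frame),))


def fit_linear(frame):
    """Least-squares line over sample indexes (2 stored parameters)."""
    n = len(frame)
    mean_x = (n - 1) / 2
    mean_y = sum(frame) / n
    sxx = sum((x - mean_x) ** 2 for x in range(n))
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(frame))
    slope = sxy / sxx if sxx else 0.0
    return ("linear", (slope, mean_y - slope * mean_x))


def evaluate(model, n):
    """Rebuild n samples from a stored (kind, params) model."""
    kind, params = model
    if kind == "constant":
        return [params[0]] * n
    if kind == "linear":
        slope, intercept = params
        return [slope * x + intercept for x in range(n)]
    return list(params)  # "raw": the samples themselves


def max_relative_error(frame, approx):
    return max(abs(a - v) / max(abs(v), 1e-12) for a, v in zip(approx, frame))


def compress(series, frame_size=8, max_error=0.01):
    """Per frame, keep the cheapest candidate that stays within max_error."""
    frames = []
    for start in range(0, len(series), frame_size):
        frame = series[start:start + frame_size]
        for fit in (fit_constant, fit_linear):  # cheapest candidate first
            model = fit(frame)
            if max_relative_error(frame, evaluate(model, len(frame))) <= max_error:
                break
        else:
            # No candidate met the bound: this sketch stores the frame as-is.
            model = ("raw", tuple(frame))
        frames.append((len(frame), model))
    return frames


def decompress(frames):
    out = []
    for n, model in frames:
        out.extend(evaluate(model, n))
    return out


# Hypothetical series: a flat stretch, a ramp, then a noisy stretch.
series = [50.0] * 8 + [50.0 + i for i in range(8)] + [10.0, 90.0] * 4
frames = compress(series)
restored = decompress(frames)
print([model[0] for _, model in frames])
```

Because each frame is self-describing, a reader can rebuild a single frame without touching the rest, which mirrors the block-level decompression described above.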
For a more detailed insight into ATSC internals and operations check our paper!
Use cases for ATSC and results
ATSC draws inspiration from established compression and signal analysis techniques, achieving compression ratios ranging from 46x to 880x with a fitting error within 1% of the original time-series. In some cases, ATSC can produce highly compressed data without losing any meaningful information, making it a versatile tool for various applications (please see use cases below).
Our internal tests comparing ATSC to LZ4 and the default Prometheus compression yielded the following results:
Method | Compressed size (bytes) | Compression Ratio |
Prometheus | 454,778,552 | 1.33 |
LZ4 | 141,347,821 | 4.29 |
ATSC | 14,276,544 | 42.47 |
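As a quick consistency check, the relative gains quoted earlier (~10x over LZ4, ~30x over Prometheus) can be recomputed from the table. The sketch below uses only the figures in the table; the uncompressed size is not stated in the post, so the value it derives is an inference from size times ratio, not a reported number.

```python
# Compressed sizes (bytes) and compression ratios, copied from the table above.
sizes = {"Prometheus": 454_778_552, "LZ4": 141_347_821, "ATSC": 14_276_544}
ratios = {"Prometheus": 1.33, "LZ4": 4.29, "ATSC": 42.47}

# How many times smaller the ATSC output is than each other method's output.
gain_vs = {m: sizes[m] / sizes["ATSC"] for m in sizes if m != "ATSC"}

# Uncompressed size implied by each row (compressed size * ratio).
implied_original = {m: sizes[m] * ratios[m] for m in sizes}

for method, gain in gain_vs.items():
    print(f"ATSC output is {gain:.1f}x smaller than {method}")
for method, size in implied_original.items():
    print(f"{method}: implied uncompressed size ~{size / 1e6:.0f} MB")
```

All three rows imply roughly the same uncompressed size (about 605 to 606 MB), so the table is internally consistent.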
Another characteristic is the trade-off between slower compression and faster decompression: compression is about 30x slower than decompression. This is acceptable because time series are expected to be compressed once but decompressed several times.
Image 3: A better fitting (purple) vs. a loose fitting (red). Purple takes twice as much space.
ATSC is versatile and can be applied in various scenarios where space reduction is prioritized over absolute precision. Some examples include:
- Rolled-over time series: ATSC can offer significant space savings without meaningful loss in precision for metrics data that are rolled over and stored long term. ATSC provides the same or greater space savings, but with minimal information loss.
- Under-sampled time series: Increase sample rates without using more space. In systems with very low sampling rates (one sample every 30 seconds or more), it is very difficult to identify actual events. ATSC provides the space savings and keeps the information about the events.
- Long, slow-moving data series: Ideal for patterns that are easy to fit, such as weather data.
- Human visualization: Data meant for human analysis, with minimal impact on accuracy, such as historic views into system metrics (CPU, Memory, Disk, etc.)
Image 4: ATSC data (green) with an 88x compression vs. the original data (yellow)
Using ATSC
ATSC is written in Rust and is available on GitHub. You can build and run it yourself by following these instructions.
Future work
Currently, we are planning to evolve ATSC in two ways (check our open issues):
- Adding features to the core compressor focused on these functionalities:
  - Frame expansion for appending new data to existing frames
  - Dynamic function loading to add more functions without altering the codebase
  - Global and per-frame error storage
  - Improved error encoding
- Integrations with additional technologies (e.g. databases):
  - We are currently looking into integrating ATSC with ClickHouse® and Apache Cassandra®
CREATE TABLE sensors_poly (
    sensor_id UInt16,
    location UInt32,
    timestamp DateTime,
    pressure Float64 CODEC(ATSC('Polynomial', 1)),
    temperature Float64 CODEC(ATSC('Polynomial', 1)),
)
ENGINE = MergeTree
ORDER BY (sensor_id, location, timestamp);
Image 5: Currently testing ClickHouse integration
Sound interesting? Try it out and let us know what you think.
ATSC represents a significant advancement in time-series data compression, offering high compression ratios with a configurable accuracy loss. Whether for long-term storage or efficient data visualization, ATSC is a powerful open source tool for managing large volumes of time-series data.
But don’t just take our word for it—download and run it!
Check our documentation for any information you need and submit ideas for improvements or issues you find using GitHub issues. We also have easy first issues tagged if you’d like to contribute to the project.
Want to integrate this with another tool? You can build and run our demo integration with ClickHouse.
The post Innovative data compression for time series: An open source solution appeared first on Instaclustr.
ScyllaDB 2024.2 Introduces New Efficiency & Elasticity via “Tablets”
New capabilities introduced in ScyllaDB Enterprise make scaling operations with Tablets up to 30X faster while reducing network costs by up to 50%
ScyllaDB just released ScyllaDB 2024.2, the first enterprise release featuring ScyllaDB’s new “tablets” replication architecture. This new architecture, which builds upon a multiyear project to implement and extend the Raft consensus protocol, enables new levels of elasticity, speed, simplicity, and efficiency.
The enterprise release offers new capabilities designed to help you reduce infrastructure costs and streamline operations:
- Tablets, a dynamic data distribution architecture that significantly improves elasticity and scalability (limited availability – details in release notes)
- File-based streaming for tablets, which further speeds up scaling operations (e.g., adding and removing nodes)
- Strongly consistent topology updates, authentication updates, and service levels (workload prioritization)
- ZSTD-based network compression for inter-node RPC, with a shared dictionary for improved performance
- DynamoDB API (Alternator) enhancements such as role-based access control
See the release notes for details.
The ScyllaDB Enterprise 2024.2 release is based on ScyllaDB 6.0; it includes *all* the features available in ScyllaDB 6.0, such as:
- Tablets, a dynamic way to distribute data across nodes that significantly improves scalability
- Strongly consistent topology, Auth, and Service Level updates
In addition, 2024.2 includes enterprise-only features such as:
- Improved network compression (see below)
- File-based streaming for tablets
- A new FIPS enabled Docker Image
Related links:
- Read more about ScyllaDB Enterprise
- Get ScyllaDB Enterprise 2024.2 (customers only, or 30-day evaluation)
- Upgrade from ScyllaDB Enterprise 2024.1.x to 2024.2.y
- Upgrade from ScyllaDB Open Source 6.0 to ScyllaDB Enterprise 2024.2.x
ScyllaDB Enterprise customers are encouraged to upgrade to ScyllaDB Enterprise 2024.2, and are welcome to contact our Support Team with questions.
Tablets
In this release, ScyllaDB enabled Tablets, a new data distribution algorithm as a better alternative to the legacy vNodes approach inherited from Apache Cassandra. While the vNodes approach statically distributes all tables across all nodes and shards based on the token ring, the Tablets approach dynamically distributes each table to a subset of nodes and shards based on its size. In the future, distribution will use CPU, OPS, and other information to further optimize the distribution.
In particular, Tablets provide the following:
- Faster scaling and topology changes. New nodes can start serving reads and writes as soon as the first Tablet is migrated. Together with Strongly Consistent Topology Updates (below), this also allows users to add multiple nodes simultaneously.
- Automatic support for mixed clusters with different core counts. Manual vNode updates are not required.
- More efficient operations on small tables, since such tables are placed on a small subset of nodes and shards.
Note that you can run a cluster with some of the Keyspaces with Tablets disabled, and some with Tablets enabled, for as long as you wish. The scaling improvement will be partial, and limited to Keyspaces with Tablets enabled.
Read the Tablets documentation
Currently, tablets are ideal for new clusters that you frequently scale out or in and that have one main large table and potentially many tiny ones. ScyllaDB Support can help you determine if Tablets in release 2024.2 are a good solution for your use case.
Learn more about tablets:
- Smooth Scaling: Why ScyllaDB Moved to “Tablets” Data Distribution: The rationale behind ScyllaDB’s new “tablets” replication architecture
- ScyllaDB Fast Forward: True Elastic Scale: Introducing major architectural shifts that enable new levels of elasticity and operational simplicity
- Elasticity, Speed & Simplicity: Get the Most Out of New ScyllaDB Capabilities: A technical walkthrough of exactly what’s changed from the user/operator perspective
Monitoring Tablets
To monitor Tablets in real time, upgrade ScyllaDB Monitoring Stack to release 4.8, and use the new dynamic Tablet panels, below.
Driver Support
The following drivers support Tablets:
- Java driver 4.x, from 4.18.0.2
- Java driver 3.x, from 3.11.5.2
- Python driver, from 3.26.6
- Gocql driver, from 1.13.0
- Rust driver, from 0.13.0
Legacy ScyllaDB and Apache Cassandra drivers will continue to work with ScyllaDB but will be less efficient when working with tablet-based Keyspaces.
Read the blog post “How We Updated ScyllaDB Drivers for Tablets Elasticity”
File-based streaming for Tablets
File-based streaming is a ScyllaDB Enterprise-only feature that optimizes tablet migration. In ScyllaDB Open Source, migrating tablets is performed by streaming mutation fragments, which involves deserializing SSTable files into mutation fragments and re-serializing them back into SSTables on the other node. In ScyllaDB Enterprise, migrating tablets is performed by streaming entire SSTables, which does not require (de)serializing or processing mutation fragments. As a result, less data is streamed over the network, and less CPU is consumed, especially for data models that contain small cells. File-based streaming is used for tablet migration in all keyspaces created with tablets enabled.
Read the file-based streaming documentation
Consistent Topology and Metadata
Strongly Consistent Topology Updates
With Raft-managed topology enabled, all topology operations are internally sequenced consistently. A centralized coordination process ensures that topology metadata is synchronized across the nodes on each step of a topology change procedure. This makes topology updates fast and safe, as the cluster administrator can trigger many topology operations concurrently, and the coordination process will safely drive all of them to completion. For example, multiple nodes can be bootstrapped concurrently, which couldn’t be done with the previous gossip-based topology. Strongly Consistent Topology Updates is now the default for new clusters, and should be enabled after upgrade for existing clusters.
Read the blog post “ScyllaDB’s Safe Topology and Schema Changes on Raft”
Strongly Consistent Auth Updates
System-auth-2 is a reimplementation of the Authentication and Authorization systems in a strongly consistent way on top of the Raft sub-system. This means that Role-Based Access Control (RBAC) commands like create role or grant permission are safe to run in parallel without a risk of getting out of sync with themselves and other metadata operations, like schema changes. As a result, there is no need to update system_auth RF or run repair when adding a Data Center.
Strongly Consistent Service Levels
Service Levels allow you to define attributes like timeout per workload. Service levels are now strongly consistent using Raft, like Schema, Topology and Auth.
Improved network compression for inter-node RPC
This release adds new Enterprise-only RPC compression improvements for node-to-node communication:
- Using zstd instead of lz4
- Using a shared dictionary re-trained periodically on the traffic, instead of message-by-message compression
Below is a comparison of compression algorithms on different types of data. Note that dictionary-based compression can be used with either lz4 or zstd. The actual compression is very much workload-dependent and can vary between use cases.
Alternator Role-Based Access Control
Alternator supports Role-Based Access Control (RBAC) for authorization. Control is done via the CQL API.
Native Nodetool
The nodetool utility provides simple command-line interface operations and attributes. ScyllaDB inherited the Java-based nodetool from Apache Cassandra. In this release, the Java implementation was replaced with a backward-compatible native nodetool. The native nodetool works much faster. Unlike the Java version, the native nodetool is part of the ScyllaDB repo, and allows easier and faster updates. With the native nodetool, the JMX server has become redundant and will no longer be part of the default ScyllaDB installation or image.
If you are using the JMX server directly, not via nodetool, you can either:
- Work directly with the ScyllaDB REST API (recommended)
- Install the JMX server yourself. See https://github.com/scylladb/scylla-jmx for instructions.
As part of moving to native tooling and away from Java tools, we will deprecate SSTableloader in future versions of ScyllaDB Enterprise. You can use Load and Stream to upload SSTables directly to ScyllaDB, either from Apache Cassandra or other ScyllaDB clusters. We are also deprecating the Java version of nodetool, which was replaced by a compatible native version (see above).
Maintenance
Maintenance mode is a new mode in which the node does not communicate with clients or other nodes and only listens to the local maintenance socket and the REST API. It can be used to fix damaged nodes – for example, by using nodetool compact or nodetool scrub. In maintenance mode, ScyllaDB skips loading tablet metadata if it is corrupted to allow an administrator to fix it. Also, the Maintenance Socket provides a new way to interact with ScyllaDB from within the node it runs on. It is mainly for debugging. You can use CQLSh with the Maintenance Socket as described in the Maintenance Socket docs.
Improvements and Bug Fixes
The latest release also features extensive improvements to stability, performance, monitoring and more. For details, see the release notes on the ScyllaDB Community Forum.
See the detailed release notes
Database Internals: Working with IO
Explore the tradeoffs of different Linux I/O methods and learn how databases can take advantage of a modern SSD’s unique characteristics
The following blog is an excerpt from Chapter 3 of the Database Performance at Scale book, which is available for free. This book sheds light on often overlooked factors that impact database performance at scale.
Unless the database engine is an in-memory one, it will have to keep the data on external storage. There can be many options to do that, including local disks, network-attached storage, distributed file- and object- storage systems, etc. The term “I/O” typically refers to accessing data on local storage – disks or file systems (that, in turn, are located on disks as well). And in general, there are four choices for accessing files on a Linux server: read/write, mmap, direct I/O (DIO) read/write, and asynchronous I/O (AIO/DIO, because this I/O is rarely used in cached mode).
Traditional read/write
The traditional method is to use the read(2) and write(2) system calls. In a modern implementation, the read system call (or one of its many variants – pread, readv, preadv, etc) asks the kernel to read a section of a file and copy the data into the calling process address space. If all of the requested data is in the page cache, the kernel will copy it and return immediately; otherwise, it will arrange for the disk to read the requested data into the page cache, block the calling thread, and when the data is available, it will resume the thread and copy the data. A write, on the other hand, will usually just copy the data into the page cache; the kernel will write-back the page cache to disk some time afterward.
mmap
An alternative and more modern method is to memory-map the file into the application address space using the mmap(2) system call. This causes a section of the address space to refer directly to the page cache pages that contain the file’s data. After this preparatory step, the application can access file data using the processor’s memory read and memory write instructions. If the requested data happens to be in cache, the kernel is completely bypassed and the read (or write) is performed at memory speed. If a cache miss occurs, then a page-fault happens and the kernel puts the active thread to sleep while it goes off to read the data for that page. When the data is finally available, the memory-management unit is programmed so the newly read data is accessible to the thread which is then awoken.
Direct I/O (DIO)
Both traditional read/write and mmap involve the kernel page cache and defer the scheduling of I/O to the kernel. When the application wishes to schedule I/O itself (for reasons that we will explain later), it can use direct I/O, shown in Figure 3-4. This involves opening the file with the O_DIRECT flag; further activity will use the normal read and write family of system calls. However, their behavior is now altered: instead of accessing the cache, the disk is accessed directly, which means that the calling thread will be put to sleep unconditionally. Furthermore, the disk controller will copy the data directly to userspace, bypassing the kernel.
Figure 3-4: Direct IO involves opening the file with the O_DIRECT flag; further activity will use the normal read and write family of system calls, but their behavior is now altered
Asynchronous I/O (AIO/DIO)
A refinement of direct I/O, asynchronous direct I/O behaves similarly but prevents the calling thread from blocking (see Figure 3-5). Instead, the application thread schedules direct I/O operations using the io_submit(2) system call, but the thread is not blocked; the I/O operation runs in parallel with normal thread execution. A separate system call, io_getevents(2), is used to wait for and collect the results of completed I/O operations. Like DIO, the kernel’s page cache is bypassed, and the disk controller is responsible for copying the data directly to userspace.
Figure 3-5: A refinement of direct I/O, asynchronous direct I/O behaves similarly but prevents the calling thread from blocking
Note: io_uring
The API to perform asynchronous I/O appeared in
Linux long ago, and it was warmly met by the community. However, as
it often happens, real-world usage quickly revealed many
inefficiencies such as blocking under some circumstances (despite
the name), the need to call the kernel too often, and poor support
for canceling the submitted requests. Eventually, it became clear
that the updated requirements are not compatible with the existing
API and the need for a new one arose. This is how the io_uring()
API appeared. It provides the same facilities as what AIO does, but
in a much more convenient and performant way (also it has notably
better documentation). Without diving into implementation details,
let’s just say that it exists and is preferred over the legacy AIO.
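To make the first two access methods described above more concrete, here is a minimal Python sketch that reads the same 16 bytes from a file once through a positional read (Python's `os.pread`, a thin wrapper over the pread system call) and once through a memory mapping. The temporary file, payload, and offset are arbitrary choices for illustration; direct and asynchronous I/O are left out because they depend on filesystem support and alignment rules.

```python
import mmap
import os
import tempfile

# Arbitrary 16 KiB payload so the file spans several 4 KiB pages.
payload = b"0123456789abcdef" * 1024

with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)
    path = tmp.name

fd = os.open(path, os.O_RDONLY)
try:
    # read/write style: the kernel copies the requested bytes from the
    # page cache into a buffer owned by this process.
    via_pread = os.pread(fd, 16, 4096)

    # mmap style: the file's pages are mapped into the address space and
    # accessed like ordinary memory (slicing copies out the bytes we want).
    with mmap.mmap(fd, 0, access=mmap.ACCESS_READ) as mapped:
        via_mmap = mapped[4096:4096 + 16]
finally:
    os.close(fd)
    os.unlink(path)

print(via_pread == via_mmap)
```

Both paths go through the kernel page cache; they differ in who copies the data and when the calling thread can block, which is what the tradeoff discussion that follows is about.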
Understanding the tradeoffs
The different access methods share some
characteristics and differ in others. Table 3-1 summarizes these
characteristics, which are elaborated on below.
Copying and MMU activity
One of the benefits of the mmap method is that if the data
is in cache, then the kernel is bypassed completely. The kernel
does not need to copy data from the kernel to userspace and back,
so fewer processor cycles are spent on that activity. This benefits
workloads that are mostly in cache (for example, if the ratio of
storage size to RAM size is close to 1:1). The downside of mmap,
however, occurs when data is not in the cache. This usually happens
when the ratio of storage size to RAM size is significantly higher
than 1:1. Every page that is brought into the cache causes another
page to be evicted. Those pages have to be inserted into and
removed from the page tables; the kernel has to scan the page
tables to isolate inactive pages, making them candidates for
eviction, and so forth. In addition, mmap requires memory for the
page tables. On x86 processors, this requires 0.2% of the size of
the mapped files. This seems low, but if the application has a
100:1 ratio of storage to memory, the result is that 20% of memory
(0.2% * 100) is devoted to page tables.
I/O scheduling
One of the
problems with letting the kernel control caching (with the mmap and
read/write access methods) is that the application loses control of
I/O scheduling. The kernel picks whichever block of data it deems
appropriate and schedules it for write or read. This can result in
the following problems:
- A write storm. When the kernel schedules large amounts of writes, the disk will be busy for a long while and impact read latency
- The kernel cannot distinguish between “important” and “unimportant” I/O. I/O belonging to background tasks can overwhelm foreground tasks, impacting their latency
By bypassing the kernel page cache, the
application takes on the burden of scheduling I/O. This doesn’t
mean that the problems are solved, but it does mean that the
problems can be solved – with sufficient attention and effort. When
using Direct I/O, each thread controls when to issue I/O. However,
the kernel controls when the thread runs, so responsibility for
issuing I/O is shared between the kernel and the application. With
AIO/DIO, the application is in full control of when I/O is issued.
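As a side calculation, the page-table overhead quoted in the mmap discussion above (about 0.2% of the mapped size, hence about 20% of RAM at a 100:1 storage-to-memory ratio) can be reproduced with a back-of-the-envelope sketch. It assumes 4 KiB pages and 8-byte page-table entries, which is the common x86-64 configuration, and it counts only leaf entries; the function name is hypothetical.

```python
PAGE_SIZE = 4096  # bytes per page (assumed: 4 KiB pages)
PTE_SIZE = 8      # bytes per page-table entry (assumed: x86-64)


def page_table_fraction_of_ram(storage_to_ram_ratio):
    """Fraction of RAM used by leaf page-table entries if all storage is mapped."""
    overhead_per_mapped_byte = PTE_SIZE / PAGE_SIZE  # ~0.2%
    return overhead_per_mapped_byte * storage_to_ram_ratio


for ratio in (1, 10, 100):
    share = page_table_fraction_of_ram(ratio)
    print(f"storage:RAM = {ratio}:1 -> {share:.1%} of RAM in page tables")
```

Upper levels of the page-table hierarchy add a little more on top of this, so the result is a lower bound that lands close to the rounded 20% figure.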
Thread scheduling
An I/O intensive application using mmap or
read/write cannot guess what its cache hit rate will be. Therefore
it has to run a large number of threads (significantly larger than
the core count of the machine it is running on). With too few
threads, they may all be waiting for the disk, leaving the processor
underutilized. Since each thread usually has at most one disk I/O
outstanding, the number of running threads must be around the
concurrency of the storage subsystem multiplied by some small
factor in order to keep the disk fully occupied. However, if the
cache hit rate is sufficiently high, then these large numbers of
threads will contend with each other for the limited number of
cores. When using Direct I/O, this problem is somewhat mitigated.
The application knows exactly when a thread is blocked on I/O and
when it can run, so the application can adjust the number of
running threads according to runtime conditions. With AIO/DIO, the
application has full control over both running threads and waiting
I/O (the two are completely divorced), so it can easily adjust to
in-memory or disk-bound conditions or anything in between.
I/O alignment
Storage devices have a block size; all I/O must be
performed in multiples of this block size which is typically 512 or
4096 bytes. Using read/write or mmap, the kernel performs the
alignment automatically; a small read or write is expanded to the
correct block boundary by the kernel before it is issued. With DIO,
it is up to the application to perform block alignment. This incurs
some complexity, but also provides an advantage: the kernel will
usually over-align to a 4096 byte boundary even when a 512-byte
boundary suffices. However, a user application using DIO can issue
512-byte aligned reads, which results in saving bandwidth on small
items.
Application complexity
While the previous discussions
favored AIO/DIO for I/O intensive applications, that method comes
with a significant cost: complexity. Placing the responsibility of
cache management on the application means it can make better
choices than the kernel and make those choices with less overhead.
However, those algorithms need to be written and tested. Using
asynchronous I/O requires that the application is written using
callbacks, coroutines, or a similar method, and often reduces the
reusability of many available libraries.
Choosing the Filesystem and/or Disk
Beyond performing the I/O itself, the database design
must consider the medium against which this I/O is done. In many
cases, the choice is often between a filesystem or a raw block
device, which in turn can be a choice of a traditional spinning
disk or an SSD drive. In cloud environments, however, there can be
the third option because local drives are always ephemeral – which
imposes strict requirements on the replication.
Filesystems vs raw disks
This decision can be approached from two angles: management
costs and performance. If accessing the storage as a raw block
device, all the difficulties with block allocation and reclamation
are on the application side. We touched on this topic slightly
earlier when we talked about memory management. The same set of
challenges apply to RAM as well as disks. A connected, though very
different, challenge is providing data integrity in case of
crashes. Unless the database is purely in-memory, the I/O should be
done in a way that avoids losing data or reading garbage from disk
after a restart. Modern file systems, however, provide both and are
mature enough to be trusted for efficient allocation and data
integrity. Accessing raw block devices unfortunately lacks those
features (so they will need to be implemented at the same quality
on the application side). From the performance point of view, the
difference is not that drastic. On one hand, writing data to a file
is always accompanied by associated metadata updates. This consumes
both disk space and I/O bandwidth. However, some modern file
systems provide a very good balance of performance and efficiency,
almost eliminating the IO latency. (One of the most prominent
examples is XFS. Another really good and mature piece of software
is Ext4). The great ally in this camp is the fallocate(2) system
call that makes the filesystem pre-allocate space on disk. When
used, filesystems also have a chance to make full use of the
extents mechanisms, thus bringing the QoS of using files to the
same performance level as when using raw block devices.
Appending writes
The database may have a heavy reliance on appends to files
or require in-place updates of individual file blocks. Both
approaches need special attention from the system architect because
they call for different properties from the underlying system. On
one hand, appending writes requires careful interaction with the
filesystem so that metadata updates (file size, in particular) do
not dominate the regular I/O. On the other hand, appending writes
(being sort of cache-oblivious algorithms) handle the disk
overwriting difficulties in a natural manner. Contrary to this,
in-place updates cannot happen at random offsets and sizes because
disks may not tolerate this kind of workload, even if used in a raw
block device manner (not via a filesystem). That being said, let’s
dive even deeper into the stack and descend into the hardware
level.
How modern SSDs work
Like other computational resources,
disks are limited in the speed they can provide. This speed is
typically measured as a 2-dimensional value with Input/Output
Operations per Second (IOPS) and bytes per second
(throughput). Of course, these parameters are not set in stone even
for each particular disk, and the maximum number of requests or
bytes greatly depends on the requests’ distribution, queuing and
concurrency, buffering or caching, disk age, and many other
factors. So when performing I/O, a disk must always balance between
two inefficiencies — overwhelming the disk with requests and
underutilizing it. Overwhelming the disk should be avoided because
when the disk is full of requests it cannot distinguish between the
criticality of certain requests over others. Of course, all
requests are important, but it makes sense to prioritize
latency-sensitive requests. For example, ScyllaDB serves real-time
queries that need to be completed in single-digit milliseconds or
less and, in parallel, it processes terabytes of data for
compaction, streaming, decommission, and so forth. The former have
strong latency sensitivity; the latter are less so. Good I/O
maintenance that tries to maximize the I/O bandwidth while keeping
latency as low as possible for latency-sensitive tasks is
complicated enough to become a standalone component called the
IO Scheduler. When evaluating a disk, one would most likely
be looking at its 4 parameters — read/write IOPS and read/write
throughput (such as in MB/s). Comparing these numbers to one
another is a popular way of claiming one disk is better than the
other and estimating the aforementioned “bandwidth capacity” of the
drive by applying Little’s Law. With that, the IO Scheduler’s job
is to provide a certain level of concurrency inside the disk to get
maximum bandwidth from it, but not to make this concurrency too
high in order to prevent the disk from queueing requests internally
for longer than needed. For instance, Figure 3-6 illustrates how
read request latency depends on the intensity of small reads
(challenging disk IOPS capacity) vs the intensity of large writes
(pursuing the disk bandwidth). The latency value is color-coded,
and the “interesting area” is painted in cyan — this is where the
latency stays below 1 millisecond. The drive measured is the NVMe
disk that comes with the AWS EC2 i3en.3xlarge instance.
Figure 3-6: Bandwidth/latency graphs showing how read request latency depends on the intensity of small reads (challenging disk IOPS capacity) vs the intensity of large writes (pursuing the disk bandwidth)
This drive demonstrates almost perfect half-duplex
behavior — increasing the read intensity several times requires
roughly the same reduction in write intensity to keep the disk
operating at the same speed. Tip: How to measure your own
disk behavior under load The better you understand how
your own disks perform under load, the better you can tune them to
capitalize on their “sweet spot.” One way to do this is with
Diskplorer, an open-source disk latency/bandwidth exploring
toolset. By using Linux fio under the hood it runs a battery of
measurements to discover performance characteristics for a specific
hardware configuration, giving you an at-a-glance view of how
server storage I/O will behave under load. For a walkthrough of how
to use this tool, see this Linux Foundation video, “Understanding
Storage I/O Under Load.”
Streamlining Data Queries with Amazon Q and Cassandra Query Language
Managing massive datasets is a priority for today’s developers, and Apache Cassandra® stands out as a powerful, scalable solution for distributed data management. With CQL (Cassandra Query Language), developers can interact seamlessly with these large-scale databases. But what if there were a way...
When ScyllaDB is Overkill vs. DynamoDB
ScyllaDB isn’t for everyone. In some cases, migrating from DynamoDB won’t reduce your costs at all.
ScyllaDB isn’t for everyone. It’s designed specifically for teams that need predictable ultra-low latency with high throughput. And since one size never fits all, there are inevitably situations where you’d be much better served with a different database. For teams that need relatively high performance and want something simple to use, that “different database” is often DynamoDB.
We’re the first to admit that there are situations where you’d be quite fine with DynamoDB. Sometimes moving to ScyllaDB just doesn’t make sense from a pure cost perspective. Other times, it could lower costs 5-40X. And there are other situations when one database is just a better fit from a technical perspective, regardless of costs.
So where is the tipping point if you’re looking at the decision from a cost perspective? Often, it’s somewhere around 10,000 OPS. But the precise answer varies depending on your workload characteristics (read/write ratios) and the amount of data under management. That’s what we’ll focus on in this article.
ScyllaDB and DynamoDB are a lot alike
But first, let’s step back. Why are we comparing ScyllaDB and DynamoDB in the first place? For the past couple of years, we’ve worked with a lot of users moving from DynamoDB to ScyllaDB. That makes sense, given that ScyllaDB and DynamoDB share common goals of high performance and low hassle. But they’re fundamentally built differently. ScyllaDB’s close-to-the-metal architecture handles millions of ops/sec with predictable single-digit millisecond latencies and lower, predictable costs. Still, you can use the same data model in ScyllaDB as in DynamoDB. And an open source DynamoDB API (“Alternator”) simplifies migration. You redirect your application to ScyllaDB instead of DynamoDB and it just works like magic (actually, we listen on a specific port that understands the DynamoDB API).
In most cases, minimal code changes are required. Moreover, both databases provide high performance, high availability, and multi-region support, with a fully managed “database-as-a-service” option.
Can you make use of (at least) a minimally viable ScyllaDB cluster?
One key difference between ScyllaDB and DynamoDB is provisioning clusters versus provisioning tables. When you work with DynamoDB, you provision tables and assign a different billing mode or capacity to each. In ScyllaDB, you provision a group of nodes (VMs, containers, pods, etc.) that collectively manage and distribute data as a cluster. You can route traffic to that cluster with great performance, as long as there is enough capacity to sustain your workload’s needs.
Even for applications with minimal traffic, you still want to provision a full ScyllaDB cluster. The smallest ScyllaDB Cloud cluster runs with 3 nodes. That’s 1) to ensure high availability and 2) to serve strongly consistent [or quorum] reads even if one node fails or goes out for maintenance.
But in some cases, even that minimal ScyllaDB cluster might provide more power than you really need. ScyllaDB’s sweet spot is throughput-heavy workloads, given its ability to sustain massive parallel processing at scale. If your throughput wouldn’t really benefit from that, ScyllaDB is quite possibly overkill.
The amount of data under management is another factor to consider. ScyllaDB is optimized for high throughput and low latencies and assumes that most of your data is frequently queried and requires low latencies. We achieve this by relying on local SSDs, which provide high concurrency and low latencies compared to other storage mediums. If you have a lot of data under management, your ScyllaDB cluster might require more, or larger, VMs. However, if your throughput isn’t high enough to make good use of that infrastructure because most of your data set is read infrequently, then ScyllaDB is probably overkill from a cost perspective.
Discovering your specific tipping point
With that reasoning in mind, let’s get more specific. You’re probably curious about what makes most sense for your own workload and storage requirements. There’s a calculator for that! We built a cost estimation calculator that compares ScyllaDB’s on-demand pricing vs DynamoDB’s on-demand pricing, using the same math as AWS. If you’re debating between selecting DynamoDB and ScyllaDB, or thinking of migrating from DynamoDB, these on-demand cost estimates are a fast way to see if ScyllaDB could be worth exploring.
First off, note the minimums. The lowest number of operations per second (OPS) you can specify is 10K and the minimal storage set size is 1TB. That’s a pretty big hint: ScyllaDB is probably overkill for anything under that.
Second, be careful when you’re entering your storage utilization. If you’re already using DynamoDB, don’t just copy over your DynamoDB storage utilization. Keep in mind that the reported utilization there refers to uncompressed RAW data. As you move to another database, your storage utilization will be less because you will typically be able to achieve at least 50% compression, if not more.
Before you go and plug in your own numbers, let’s walk through how it plays out across two very different scenarios.
Scenario 1: Storage Bound
First, let’s consider a “storage-bound” case. For example, say you’re consuming 250TB of storage with DynamoDB. You would need 18 nodes of i3en.24xlarge (which are VERY large instances) to support 250TB in ScyllaDB. But if you consider that ScyllaDB’s compression typically provides a 50% gain, that would take us down to 125TB and require 9 nodes of i3en.24xlarge. If you look at the cluster capacity area, you see that this cluster can easily sustain close to 3M operations/second (as a rather conservative estimate). Now, if we click the DynamoDB button, you’ll see that our calculator tries to make a ScyllaDB and DynamoDB cost comparison analysis for you.
But, unfortunately, it is telling us to talk to Sales. This indicates ScyllaDB is likely more expensive than DynamoDB here because you have so much storage for so few operations. Still, if you have other non-cost-related reasons to move, it might be worth a discussion.
However, now assume that you decided to store only your most frequently accessed data on ScyllaDB. Also assume that’s 10% of the total 125TB (thus 13TB). In this case, the ScyllaDB pricing also drops, by a factor of ~10.
Scenario 2: Write-Heavy
Now, let’s increase the rate of writes, which are more expensive than reads with DynamoDB. If you bump up the throughput, note that the ScyllaDB estimates are the same for anywhere from 200K to 2M writes per second. That’s because as you move from 200K to 2M with ScyllaDB, you already have enough hardware to sustain this level of operations. In contrast, DynamoDB pricing keeps increasing when you increase the write throughput.
Get your own cost estimation now
Other factors to consider
Of course, cost isn’t everything. Even when ScyllaDB seems like a more cost-effective option, moving from DynamoDB doesn’t always make sense. If your use case is heavily dependent on the AWS ecosystem, moving from DynamoDB to ScyllaDB might require considerable refactoring work. I once met with a DynamoDB user who started looking at ScyllaDB to replace DynamoDB and they had over a thousand Lambdas connected to DynamoDB. That’s a thousand Lambdas you would need to refactor to connect to ScyllaDB.
Also consider the feature set you currently require from DynamoDB. There is not a one-to-one mapping for every DynamoDB feature: features like multi-item transactions (TransactWriteItems, TransactGetItems) and accounting/capping are not (yet) available in ScyllaDB. For example, accounting and capping is an interesting one. I once met with a user who used DynamoDB’s provisioned throughput to their benefit, and throttled and billed their own customers according to it.
Currently, there’s no such thing in ScyllaDB since we don’t impose any throughput limits by default. In this case, ScyllaDB didn’t make sense.
On the flip side, there are two key situations where it makes sense to consider ScyllaDB even if there’s not an impressive cost advantage vs. DynamoDB: when you need better latency and more deployment flexibility. If DynamoDB can’t achieve the latency you need for whatever reason, or you want to keep latencies low without the hassles/cost of DAX, ScyllaDB could likely help you address that. And if you need to move your DynamoDB workloads to another cloud or on-prem, ScyllaDB can help you move fast with just minimal application refactoring, often just a one-line change. I talk a lot more about both of these situations in the blog post, DynamoDB: When to Move Out.
Rule of thumb
So the bottom line truly is “it depends.” But ScyllaDB could very well be overkill if your workload is under 10K OPS and:
- You don’t expect much throughput growth,
- You’re satisfied with how DynamoDB meets your latency SLAs, and
- You have no foreseeable need to move beyond AWS
Bonus: DynamoDB Cost Optimization Masterclass
If you want more opinions, more strategies, and more options about DynamoDB cost optimization, I encourage you to take a look at this masterclass I recently participated in with Alex DeBrie and Miles Ward…
Netflix’s Distributed Counter Abstraction
By: Rajiv Shringi, Oleksii Tkachuk, Kartik Sathyanarayanan
Introduction
In our previous blog post, we introduced Netflix’s TimeSeries Abstraction, a distributed service designed to store and query large volumes of temporal event data with low millisecond latencies. Today, we’re excited to present the Distributed Counter Abstraction. This counting service, built on top of the TimeSeries Abstraction, enables distributed counting at scale while maintaining similar low latency performance. As with all our abstractions, we use our Data Gateway Control Plane to shard, configure, and deploy this service globally.
Distributed counting is a challenging problem in computer science. In this blog post, we’ll explore the diverse counting requirements at Netflix, the challenges of achieving accurate counts in near real-time, and the rationale behind our chosen approach, including the necessary trade-offs.
Note: When it comes to distributed counters, terms such as ‘accurate’ or ‘precise’ should be taken with a grain of salt. In this context, they refer to a count very close to accurate, presented with minimal delays.
Use Cases and Requirements
At Netflix, our counting use cases include tracking millions of user interactions, monitoring how often specific features or experiences are shown to users, and counting multiple facets of data during A/B test experiments, among others.
These use cases can be classified into two broad categories:
- Best-Effort: For this category, the count doesn’t have to be very accurate or durable. However, this category requires near-immediate access to the current count at low latencies, all while keeping infrastructure costs to a minimum.
- Eventually Consistent: This category needs accurate and durable counts, and is willing to tolerate a slight delay in accuracy and a slightly higher infrastructure cost as a trade-off.
Both categories share common requirements, such as high throughput and high availability. The table below provides a detailed overview of the diverse requirements across these two categories.
Distributed Counter Abstraction
To meet the outlined requirements, the Counter Abstraction was designed to be highly configurable. It allows users to choose between different counting modes, such as Best-Effort or Eventually Consistent, while considering the documented trade-offs of each option. After selecting a mode, users can interact with APIs without needing to worry about the underlying storage mechanisms and counting methods.
Let’s take a closer look at the structure and functionality of the API.
API
Counters are organized into separate namespaces that users set up for each of their specific use cases. Each namespace can be configured with different parameters, such as Type of Counter, Time-To-Live (TTL), and Counter Cardinality, using the service’s Control Plane.
The Counter Abstraction API resembles Java’s AtomicInteger interface:
AddCount/AddAndGetCount: Adjusts the count for the specified counter by the given delta value within a dataset. The delta value can be positive or negative. The AddAndGetCount counterpart also returns the count after performing the add operation.
{
"namespace": "my_dataset",
"counter_name": "counter123",
"delta": 2,
"idempotency_token": {
"token": "some_event_id",
"generation_time": "2024-10-05T14:48:00Z"
}
}
The idempotency token can be used for counter types that support it. Clients can use this token to safely retry or hedge their requests. Failures in a distributed system are a given, and having the ability to safely retry requests enhances the reliability of the service.
GetCount: Retrieves the count value of the specified counter within a dataset.
{
"namespace": "my_dataset",
"counter_name": "counter123"
}
ClearCount: Effectively resets the count to 0 for the specified counter within a dataset.
{
"namespace": "my_dataset",
"counter_name": "counter456",
"idempotency_token": {...}
}
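To make the request shapes above concrete, here is a minimal in-memory sketch of the three operations in Python. It is not the service's implementation: the class name, method names, and the way tokens are remembered are all assumptions made for illustration, and it ignores TTLs, namespaces' configuration, and distribution entirely.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryCounterStore:
    """Toy stand-in for the counter API; every name here is hypothetical."""
    counts: dict = field(default_factory=dict)
    seen_tokens: set = field(default_factory=set)

    def add_and_get_count(self, namespace, counter_name, delta, token=None):
        key = (namespace, counter_name)
        # A token we have already seen means this request is a retry,
        # so the delta must not be applied a second time.
        if token is not None:
            if (key, token) in self.seen_tokens:
                return self.counts.get(key, 0)
            self.seen_tokens.add((key, token))
        self.counts[key] = self.counts.get(key, 0) + delta
        return self.counts[key]

    def get_count(self, namespace, counter_name):
        return self.counts.get((namespace, counter_name), 0)

    def clear_count(self, namespace, counter_name):
        self.counts[(namespace, counter_name)] = 0


store = InMemoryCounterStore()
first = store.add_and_get_count("my_dataset", "counter123", 2, token="some_event_id")
# The retried request carries the same token and is not counted again.
retried = store.add_and_get_count("my_dataset", "counter123", 2, token="some_event_id")
```

The point of the sketch is only the role of the token: a client that times out can resend the same request without inflating the count.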
Now, let’s look at the different types of counters supported within the Abstraction.
Types of Counters
The service primarily supports two types of counters: Best-Effort and Eventually Consistent, along with a third experimental type: Accurate. In the following sections, we’ll describe the different approaches for these types of counters and the trade-offs associated with each.
Best Effort Regional Counter
This type of counter is powered by EVCache, Netflix’s distributed caching solution built on the widely popular Memcached. It is suitable for use cases like A/B experiments, where many concurrent experiments are run for relatively short durations and an approximate count is sufficient. Setting aside the complexities of provisioning, resource allocation, and control plane management, the core of this solution is remarkably straightforward:
// counter cache key
counterCacheKey = <namespace>:<counter_name>
// add operation
return delta > 0
? cache.incr(counterCacheKey, delta, TTL)
: cache.decr(counterCacheKey, Math.abs(delta), TTL);
// get operation
cache.get(counterCacheKey);
// clear counts from all replicas
cache.delete(counterCacheKey, ReplicaPolicy.ALL);
EVCache delivers extremely high throughput at low millisecond latency or better within a single region, enabling a multi-tenant setup within a shared cluster, saving infrastructure costs. However, there are some trade-offs: it lacks cross-region replication for the increment operation and does not provide consistency guarantees, which may be necessary for an accurate count. Additionally, idempotency is not natively supported, making it unsafe to retry or hedge requests.
Edit: A note on probabilistic data structures:
Probabilistic data structures like HyperLogLog (HLL) can be useful for tracking an approximate number of distinct elements, like distinct views or visits to a website, but are not ideally suited for implementing distinct increments and decrements for a given key. Count-Min Sketch (CMS) is an alternative that can be used to adjust the values of keys by a given amount. Data stores like Redis support both HLL and CMS. However, we chose not to pursue this direction for several reasons:
- We chose to build on top of data stores that we already operate at scale.
- Probabilistic data structures do not natively support several of our requirements, such as resetting the count for a given key or having TTLs for counts. Additional data structures, including more sketches, would be needed to support these requirements.
- On the other hand, the EVCache solution is quite simple, requiring minimal lines of code and using natively supported elements. However, it comes at the trade-off of using a small amount of memory per counter key.
Eventually Consistent Global Counter
While some users may accept the limitations of a Best-Effort counter, others opt for precise counts, durability and global availability. In the following sections, we’ll explore various strategies for achieving durable and accurate counts. Our objective is to highlight the challenges inherent in global distributed counting and explain the reasoning behind our chosen approach.
Approach 1: Storing a Single Row per Counter
Let’s start simple by using a single row per counter key within a table in a globally replicated datastore.
Let’s examine some of the drawbacks of this approach:
- Lack of Idempotency: There is no idempotency key baked into the storage data model, which prevents users from safely retrying requests. Implementing idempotency would likely require using an external system for such keys, which can further degrade performance or cause race conditions.
- Heavy Contention: To update counts reliably, every writer must perform a Compare-And-Swap operation for a given counter using locks or transactions. Depending on the throughput and concurrency of operations, this can lead to significant contention, heavily impacting performance.
Secondary Keys: One way to reduce contention in this approach would be to use a secondary key, such as a bucket_id, which allows for distributing writes by splitting a given counter into buckets, while enabling reads to aggregate across buckets. The challenge lies in determining the appropriate number of buckets. A static number may still lead to contention with hot keys, while dynamically assigning the number of buckets per counter across millions of counters presents a more complex problem.
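The secondary-key idea can be sketched as follows in Python. This is an illustration under assumptions, not a description of any real schema: the static `NUM_BUCKETS` value, the random bucket choice, and the dictionary standing in for table rows are all hypothetical.

```python
import random

NUM_BUCKETS = 4  # hypothetical static bucket count per counter

# rows[(counter_key, bucket_id)] -> partial count; stands in for table rows
rows = {}


def add(counter_key, delta):
    # Spread writes for one counter across several rows to reduce contention.
    bucket_id = random.randrange(NUM_BUCKETS)
    row_key = (counter_key, bucket_id)
    rows[row_key] = rows.get(row_key, 0) + delta


def get(counter_key):
    # Reads pay for the spread by aggregating across every bucket.
    return sum(rows.get((counter_key, b), 0) for b in range(NUM_BUCKETS))


for _ in range(100):
    add("counter123", 1)
total = get("counter123")
```

With a fixed bucket count, a sufficiently hot counter still concentrates writes on a handful of rows, which is the limitation the paragraph above describes.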
Let’s see if we can iterate on our solution to overcome these drawbacks.
Approach 2: Per Instance Aggregation
To address issues of hot keys and contention from writing to the same row in real-time, we could implement a strategy where each instance aggregates the counts in memory and then flushes them to disk at regular intervals. Introducing sufficient jitter to the flush process can further reduce contention.
However, this solution presents a new set of issues:
- Vulnerability to Data Loss: The solution is vulnerable to data loss for all in-memory data during instance failures, restarts, or deployments.
- Inability to Reliably Reset Counts: Due to counting requests being distributed across multiple machines, it is challenging to establish consensus on the exact point in time when a counter reset occurred.
- Lack of Idempotency: Similar to the previous approach, this method does not natively guarantee idempotency. One way to achieve idempotency is by consistently routing the same set of counters to the same instance. However, this approach may introduce additional complexities, such as leader election, and potential challenges with availability and latency in the write path.
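A small Python sketch shows both the mechanism and the data-loss drawback listed above. The class and its methods are hypothetical, a plain dictionary stands in for the durable store, and flush scheduling and jitter are left out.

```python
from collections import Counter


class InstanceAggregator:
    """Buffers increments in memory and flushes them in one write per counter."""

    def __init__(self, store):
        self.store = store        # shared durable store (a plain dict here)
        self.pending = Counter()  # in-memory, lost if the instance dies

    def add(self, key, delta):
        self.pending[key] += delta

    def flush(self):
        for key, delta in self.pending.items():
            self.store[key] = self.store.get(key, 0) + delta
        self.pending.clear()


durable = {}
instance_a = InstanceAggregator(durable)
instance_b = InstanceAggregator(durable)
instance_a.add("counter123", 3)
instance_b.add("counter123", 2)
instance_a.flush()
# instance_b "crashes" before its flush: its two buffered increments are gone.
instance_b = None
```

After this run the durable store only reflects the increments from the instance that managed to flush.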
That said, this approach may still be suitable in scenarios where these trade-offs are acceptable. However, let’s see if we can address some of these issues with a different event-based approach.
Approach 3: Using Durable Queues
In this approach, we log counter events into a durable queuing system like Apache Kafka to prevent any potential data loss. By creating multiple topic partitions and hashing the counter key to a specific partition, we ensure that the same set of counters are processed by the same set of consumers. This setup simplifies facilitating idempotency checks and resetting counts. Furthermore, by leveraging additional stream processing frameworks such as Kafka Streams or Apache Flink, we can implement windowed aggregations.
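The routing step, hashing a counter key to a fixed partition, can be illustrated with a few lines of Python. CRC32 is used here only as a convenient stable hash; it is an assumption for the sketch and not how any particular Kafka client selects partitions, and `NUM_PARTITIONS` is a made-up value.

```python
import zlib

NUM_PARTITIONS = 8  # hypothetical number of topic partitions


def partition_for(counter_key: str) -> int:
    # crc32 is stable across processes, unlike Python's built-in hash() for
    # strings, so every producer maps the same counter to the same partition.
    return zlib.crc32(counter_key.encode("utf-8")) % NUM_PARTITIONS


p1 = partition_for("my_dataset:counter123")
p2 = partition_for("my_dataset:counter123")
```

Because the mapping is deterministic, all events for one counter are seen by the same consumer, which is what makes per-counter idempotency checks and resets simpler.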
However, this approach comes with some challenges:
- Potential Delays: Having the same consumer process all the counts from a given partition can lead to backups and delays, resulting in stale counts.
- Rebalancing Partitions: This approach requires auto-scaling and rebalancing of topic partitions as the cardinality of counters and throughput increases.
Furthermore, all approaches that pre-aggregate counts make it challenging to support two of our requirements for accurate counters:
- Auditing of Counts: Auditing involves extracting data to an offline system for analysis to ensure that increments were applied correctly to reach the final value. This process can also be used to track the provenance of increments. However, auditing becomes infeasible when counts are aggregated without storing the individual increments.
- Potential Recounting: Similar to auditing, if adjustments to increments are necessary and recounting of events within a time window is required, pre-aggregating counts makes this infeasible.
Barring those few requirements, this approach can still be effective if we determine the right way to scale our queue partitions and consumers while maintaining idempotency. However, let’s explore how we can adjust this approach to meet the auditing and recounting requirements.
Approach 4: Event Log of Individual Increments
In this approach, we log each individual counter increment along with its event_time and event_id. The event_id can include the source information of where the increment originated. The combination of event_time and event_id can also serve as the idempotency key for the write.
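In its simplest form this amounts to the following Python sketch, where a nested dictionary stands in for the event table. The function names and the sample event IDs are hypothetical.

```python
# events[counter] maps the idempotency key (event_time, event_id) to a delta
events = {}


def log_increment(counter, event_time, event_id, delta):
    # Writing the same (event_time, event_id) again overwrites the same
    # entry, so a retried write cannot be counted twice.
    events.setdefault(counter, {})[(event_time, event_id)] = delta


def read_count(counter):
    # The naive read path scans and sums every stored increment.
    return sum(events.get(counter, {}).values())


log_increment("counter123", "2024-10-05T14:48:00Z", "evt-1", 2)
log_increment("counter123", "2024-10-05T14:48:00Z", "evt-1", 2)   # retry
log_increment("counter123", "2024-10-05T14:48:05Z", "evt-2", -1)
count = read_count("counter123")
```

The retry is absorbed by the key, but every read still walks the full history of the counter.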
However, in its simplest form, this approach has several drawbacks:
- Read Latency: Each read request requires scanning all increments for a given counter, potentially degrading performance.
- Duplicate Work: Multiple threads might duplicate the effort of aggregating the same set of counters during read operations, leading to wasted effort and subpar resource utilization.
- Wide Partitions: If using a datastore like Apache Cassandra, storing many increments for the same counter could lead to a wide partition, affecting read performance.
- Large Data Footprint: Storing each increment individually could also result in a substantial data footprint over time. Without an efficient data retention strategy, this approach may struggle to scale effectively.
The combined impact of these issues can lead to increased infrastructure costs that may be difficult to justify. However, adopting an event-driven approach seems to be a significant step forward in addressing some of the challenges we’ve encountered and meeting our requirements.
How can we improve this solution further?
Netflix’s Approach
We use a combination of the previous approaches, where we log each counting activity as an event, and continuously aggregate these events in the background using queues and a sliding time window. Additionally, we employ a bucketing strategy to prevent wide partitions. In the following sections, we’ll explore how this approach addresses the previously mentioned drawbacks and meets all our requirements.
Note: From here on, we will use the words “rollup” and “aggregate” interchangeably. They essentially mean the same thing, i.e., collecting individual counter increments/decrements and arriving at the final value.
TimeSeries Event Store:
We chose the TimeSeries Data Abstraction as our event store, where counter mutations are ingested as event records. Some of the benefits of storing events in TimeSeries include:
High-Performance: The TimeSeries abstraction already addresses many of our requirements, including high availability and throughput, reliable and fast performance, and more.
Reducing Code Complexity: We reduce a lot of code complexity in Counter Abstraction by delegating a major portion of the functionality to an existing service.
TimeSeries Abstraction uses Cassandra as the underlying event store, but it can be configured to work with any persistent store. Here is what it looks like:
Handling Wide Partitions: The time_bucket and event_bucket columns play a crucial role in breaking up a wide partition, preventing high-throughput counter events from overwhelming a given partition. For more information regarding this, refer to our previous blog.
No Over-Counting: The event_time, event_id and event_item_key columns form the idempotency key for the events for a given counter, enabling clients to retry safely without the risk of over-counting.
Event Ordering: TimeSeries orders all events in descending order of time, allowing us to leverage this property for events like count resets.
Event Retention: The TimeSeries Abstraction includes retention policies to ensure that events are not stored indefinitely, saving disk space and reducing infrastructure costs. Once events have been aggregated and moved to a more cost-effective store for audits, there’s no need to retain them in the primary storage.
Now, let’s see how these events are aggregated for a given counter.
Aggregating Count Events:
As mentioned earlier, collecting all individual increments for every read request would be cost-prohibitive in terms of read performance. Therefore, a background aggregation process is necessary to continually converge counts and ensure optimal read performance.
But how can we safely aggregate count events amidst ongoing write operations?
This is where the concept of Eventually Consistent counts becomes crucial. By intentionally lagging behind the current time by a safe margin, we ensure that aggregation always occurs within an immutable window.
Let’s see what that looks like:
Let’s break this down:
- lastRollupTs: This represents the most recent time when the counter value was last aggregated. For a counter being operated for the first time, this timestamp defaults to a reasonable time in the past.
- Immutable Window and Lag: Aggregation can only occur safely within an immutable window that is no longer receiving counter events. The “acceptLimit” parameter of the TimeSeries Abstraction plays a crucial role here, as it rejects incoming events with timestamps beyond this limit. During aggregations, this window is pushed slightly further back to account for clock skews.
This does mean that the counter value will lag behind its most recent update by some margin (typically on the order of seconds). This approach does leave the door open for missed events due to cross-region replication issues. See the “Future Work” section at the end.
- Aggregation Process: The rollup process aggregates all events in the aggregation window since the last rollup to arrive at the new value.
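The three pieces above can be put together in a short Python sketch. It rests on assumptions the text does not spell out: that the immutable window ends at `now - accept_limit - skew_margin`, that timestamps are plain seconds, and that count resets can be ignored. The names `Rollup`, `roll_up`, `accept_limit_s`, and `skew_margin_s` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Rollup:
    last_rollup_count: int = 0
    last_rollup_ts: float = 0.0


def roll_up(rollup, events, now, accept_limit_s, skew_margin_s):
    """Aggregate events (event_time, delta) that fall in the immutable window."""
    # Events newer than window_end may still be arriving, so they are skipped.
    window_end = now - accept_limit_s - skew_margin_s
    if window_end <= rollup.last_rollup_ts:
        return rollup  # nothing new is safely aggregatable yet
    delta = sum(d for t, d in events if rollup.last_rollup_ts < t <= window_end)
    return Rollup(rollup.last_rollup_count + delta, window_end)


events = [(100.0, 1), (105.0, 2), (119.0, 5)]
r1 = roll_up(Rollup(), events, now=120.0, accept_limit_s=5.0, skew_margin_s=1.0)
r2 = roll_up(r1, events, now=130.0, accept_limit_s=5.0, skew_margin_s=1.0)
```

In the first pass the event at 119.0 is still inside the mutable region and is left out; the second pass starts from the saved checkpoint and picks it up, which is how the count converges with a lag of a few seconds.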
Rollup Store:
We save the results of this aggregation in a persistent store. The next aggregation will simply continue from this checkpoint.
We create one such Rollup table per dataset and use Cassandra as our persistent store. However, as you will soon see in the Control Plane section, the Counter service can be configured to work with any persistent store.
LastWriteTs: Every time a given counter receives a write, we also log a last-write-timestamp as a columnar update in this table. This is done using Cassandra’s USING TIMESTAMP feature to predictably apply Last-Write-Wins (LWW) semantics. This timestamp is the same as the event_time for the event. In the subsequent sections, we’ll see how this timestamp is used to keep some counters in active rollup circulation until they have caught up to their latest value.
Rollup Cache
To optimize read performance, these values are cached in EVCache for each counter. We combine the lastRollupCount and lastRollupTs into a single cached value per counter to prevent potential mismatches between the count and its corresponding checkpoint timestamp.
But how do we know which counters to trigger rollups for? Let’s explore our Write and Read paths to understand this better.
Add/Clear Count:
An add or clear count request writes durably to the TimeSeries Abstraction and updates the last-write-timestamp in the Rollup store. If the durability acknowledgement fails, clients can retry their requests with the same idempotency token without the risk of overcounting. Upon durability, we send a fire-and-forget request to trigger the rollup for the request counter.
GetCount:
We return the last rolled-up count as a quick point-read operation, accepting the trade-off of potentially delivering a slightly stale count. We also trigger a rollup during the read operation to advance the last-rollup-timestamp, enhancing the performance of subsequent aggregations. This process also self-remediates a stale count if any previous rollups had failed.
With this approach, the counts continually converge to their latest value. Now, let’s see how we scale this approach to millions of counters and thousands of concurrent operations using our Rollup Pipeline.
Rollup Pipeline:
Each Counter-Rollup server operates a rollup pipeline to efficiently aggregate counts across millions of counters. This is where most of the complexity in Counter Abstraction comes in. In the following sections, we will share key details on how efficient aggregations are achieved.
Light-Weight Roll-Up Event: As seen in our Write and Read paths above, every operation on a counter sends a light-weight event to the Rollup server:
rollupEvent: {
"namespace": "my_dataset",
"counter": "counter123"
}
Note that this event does not include the increment. This is only an indication to the Rollup server that this counter has been accessed and now needs to be aggregated. Knowing exactly which specific counters need to be aggregated prevents scanning the entire event dataset for the purpose of aggregations.
In-Memory Rollup Queues: A given Rollup server instance runs a set of in-memory queues to receive rollup events and parallelize aggregations. In the first version of this service, we settled on using in-memory queues to reduce provisioning complexity, save on infrastructure costs, and make rebalancing the number of queues fairly straightforward. However, this comes with the trade-off of potentially missing rollup events in case of an instance crash. For more details, see the “Stale Counts” section in “Future Work.”
Minimize Duplicate Effort: We use a fast non-cryptographic hash like XXHash to ensure that the same set of counters end up on the same queue. Further, we try to minimize the amount of duplicate aggregation work by having a separate rollup stack that chooses to run fewer, beefier instances.
Availability and Race Conditions: Having a single Rollup server instance can minimize duplicate aggregation work but may create availability challenges for triggering rollups. If we choose to horizontally scale the Rollup servers, we allow threads to overwrite rollup values while avoiding any form of distributed locking mechanisms to maintain high availability and performance. This approach remains safe because aggregation occurs within an immutable window. Although the concept of now() may differ between threads, causing rollup values to sometimes fluctuate, the counts will eventually converge to an accurate value within each immutable aggregation window.
Rebalancing Queues: If we need to scale the number of queues, a simple Control Plane configuration update followed by a re-deploy is enough to rebalance the number of queues.
"eventual_counter_config": {
"queue_config": {
"num_queues" : 8, // change to 16 and re-deploy
...
Handling Deployments: During deployments, these queues shut down gracefully, draining all existing events first, while the new Rollup server instance starts up with potentially new queue configurations. There may be a brief period when both the old and new Rollup servers are active, but as mentioned before, this race condition is managed since aggregations occur within immutable windows.
Minimize Rollup Effort: Receiving multiple events for the same counter doesn’t mean rolling it up multiple times. We drain these rollup events into a Set, ensuring a given counter is rolled up only once during a rollup window.
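The de-duplication step can be sketched as follows, assuming each rollup event is a dictionary shaped like the `rollupEvent` shown earlier. The `drain_unique` helper and its `max_events` bound are hypothetical names introduced only for this illustration.

```python
from queue import Empty, Queue


def drain_unique(rollup_queue: Queue, max_events: int = 10_000) -> set:
    """Drain pending rollup events into a set of (namespace, counter) keys."""
    pending = set()
    for _ in range(max_events):
        try:
            event = rollup_queue.get_nowait()
        except Empty:
            break  # queue is fully drained
        # Repeated events for the same counter collapse into one set entry.
        pending.add((event["namespace"], event["counter"]))
    return pending


q = Queue()
for name in ["counter123", "counter123", "counter456", "counter123"]:
    q.put({"namespace": "my_dataset", "counter": name})
unique = drain_unique(q)
```

Four events go in, but only two distinct counters come out, so each is rolled up once in that window.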
Efficient Aggregation: Each rollup consumer processes a batch of counters simultaneously. Within each batch, it queries the underlying TimeSeries abstraction in parallel to aggregate events within specified time boundaries. The TimeSeries abstraction optimizes these range scans to achieve low millisecond latencies.
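To illustrate the idea of aggregating a batch of counters in parallel within fixed time boundaries, here is a small sketch. The in-memory `EVENTS` dictionary is a hypothetical stand-in for the TimeSeries abstraction, and `aggregate` and `rollup_batch` are assumed names, not the real API.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory event store: counter -> list of (event_time, delta).
EVENTS = {
    "counter123": [(100, 1), (105, 2), (130, 5)],
    "counter456": [(101, -1), (110, 5)],
}


def aggregate(counter: str, start: int, end: int) -> int:
    """Sum the deltas of one counter with start <= event_time < end."""
    return sum(d for t, d in EVENTS.get(counter, []) if start <= t < end)


def rollup_batch(counters, start: int, end: int, parallelism: int = 4) -> dict:
    """Aggregate every counter in the batch in parallel; return counter -> sum."""
    counters = list(counters)
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        sums = pool.map(lambda c: aggregate(c, start, end), counters)
        return dict(zip(counters, sums))


result = rollup_batch(["counter123", "counter456"], start=100, end=120)
```

The event at time 130 falls outside the window and is left for a later rollup.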
Dynamic Batching: The Rollup server dynamically adjusts the number of time partitions that need to be scanned based on the cardinality of counters in order to prevent overwhelming the underlying store with many parallel read requests.
Adaptive Back-Pressure: Each consumer waits for one batch to complete before issuing the rollups for the next batch. It adjusts the wait time between batches based on the performance of the previous batch. This approach provides back-pressure during rollups to prevent overwhelming the underlying TimeSeries store.
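One way to picture this adjustment is a simple policy that lengthens the wait after a slow batch and shortens it after a fast one. The doubling and halving rule, the target latency, and the bounds below are all assumptions chosen for illustration; the post does not specify the actual policy.

```python
def next_wait_ms(
    current_wait_ms: float,
    batch_latency_ms: float,
    target_latency_ms: float = 50.0,
    min_wait_ms: float = 1.0,
    max_wait_ms: float = 1000.0,
) -> float:
    """Return the wait before the next batch (assumed policy).

    Double the wait when the previous batch exceeded the target latency,
    halve it otherwise, and clamp the result to [min_wait_ms, max_wait_ms].
    """
    if batch_latency_ms > target_latency_ms:
        proposed = current_wait_ms * 2
    else:
        proposed = current_wait_ms / 2
    return max(min_wait_ms, min(max_wait_ms, proposed))


slower = next_wait_ms(10.0, batch_latency_ms=120.0)  # previous batch was slow
faster = next_wait_ms(10.0, batch_latency_ms=20.0)   # previous batch was fast
```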
Handling Convergence: In order to prevent low-cardinality counters from lagging behind too much and subsequently scanning too many time partitions, they are kept in constant rollup circulation. For high-cardinality counters, continuously circulating them would consume excessive memory in our Rollup queues. This is where the last-write-timestamp mentioned previously plays a crucial role. The Rollup server inspects this timestamp to determine if a given counter needs to be re-queued, ensuring that we continue aggregating until it has fully caught up with the writes.
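The re-queue decision can be sketched as a small predicate. The function name `should_requeue`, the integer timestamps, and the exact comparison are assumptions for illustration; the source only states that the last-write-timestamp is inspected to decide whether a counter needs more rollups.

```python
def should_requeue(cardinality: str, last_write_ts: int, last_rollup_ts: int) -> bool:
    """Decide whether a counter goes back onto a rollup queue (assumed logic)."""
    if cardinality == "LOW":
        # Low-cardinality counters stay in constant rollup circulation.
        return True
    # High-cardinality counters are re-queued only while writes are newer
    # than the last rollup, i.e. until the rollup has caught up.
    return last_write_ts > last_rollup_ts


still_behind = should_requeue("HIGH", last_write_ts=205, last_rollup_ts=200)
caught_up = should_requeue("HIGH", last_write_ts=200, last_rollup_ts=200)
```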
Now, let’s see how we leverage this counter type to provide an up-to-date current count in near-realtime.
Experimental: Accurate Global Counter
We are experimenting with a slightly modified version of the Eventually Consistent counter. Again, take the term ‘Accurate’ with a grain of salt. The key difference between this type of counter and its counterpart is that the delta, representing the counts since the last-rolled-up timestamp, is computed in real-time.
And then, currentAccurateCount = lastRollupCount + delta
Aggregating this delta in real-time can impact the performance of this operation, depending on the number of events and partitions that need to be scanned to retrieve this delta. The same principle of rolling up in batches applies here to prevent scanning too many partitions in parallel. Conversely, if the counters in this dataset are accessed frequently, the time gap for the delta remains narrow, making this approach of fetching current counts quite effective.
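As a worked sketch of the formula above, the snippet below computes the delta from events newer than the last rollup and adds it to the rolled-up count. The event tuples, the function name, and the choice of an exclusive lower and inclusive upper time bound are assumptions made for this example.

```python
def current_accurate_count(
    last_rollup_count: int,
    last_rollup_ts: int,
    events: list,
    now_ts: int,
) -> int:
    """Return lastRollupCount + delta, with delta computed at read time.

    `events` is a list of (event_time, delta) pairs. Only events with
    last_rollup_ts < event_time <= now_ts contribute (assumed bounds).
    """
    delta = sum(d for t, d in events if last_rollup_ts < t <= now_ts)
    return last_rollup_count + delta


events = [(90, 3), (101, 2), (110, -1)]
count = current_accurate_count(
    last_rollup_count=10, last_rollup_ts=100, events=events, now_ts=120
)
```

The event at time 90 is already reflected in the rolled-up count, so only the last two events contribute to the delta.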
Now, let’s see how all this complexity is managed by having a unified Control Plane configuration.
Control Plane
The Data Gateway Platform Control Plane manages control settings for all abstractions and namespaces, including the Counter Abstraction. Below is an example of a control plane configuration for a namespace that supports eventually consistent counters with low cardinality:
"persistence_configuration": [
{
"id": "CACHE", // Counter cache config
"scope": "dal=counter",
"physical_storage": {
"type": "EVCACHE", // type of cache storage
"cluster": "evcache_dgw_counter_tier1" // Shared EVCache cluster
}
},
{
"id": "COUNTER_ROLLUP",
"scope": "dal=counter", // Counter abstraction config
"physical_storage": {
"type": "CASSANDRA", // type of Rollup store
"cluster": "cass_dgw_counter_uc1", // physical cluster name
"dataset": "my_dataset_1" // namespace/dataset
},
"counter_cardinality": "LOW", // supported counter cardinality
"config": {
"counter_type": "EVENTUAL", // Type of counter
"eventual_counter_config": { // eventual counter type
"internal_config": {
"queue_config": { // adjust w.r.t cardinality
"num_queues" : 8, // Rollup queues per instance
"coalesce_ms": 10000, // coalesce duration for rollups
"capacity_bytes": 16777216 // allocated memory per queue
},
"rollup_batch_count": 32 // parallelization factor
}
}
}
},
{
"id": "EVENT_STORAGE",
"scope": "dal=ts", // TimeSeries Event store
"physical_storage": {
"type": "CASSANDRA", // persistent store type
"cluster": "cass_dgw_counter_uc1", // physical cluster name
"dataset": "my_dataset_1" // keyspace name
},
"config": {
"time_partition": { // time-partitioning for events
"buckets_per_id": 4, // event buckets within
"seconds_per_bucket": "600", // smaller width for LOW card
"seconds_per_slice": "86400" // width of a time slice table
},
"accept_limit": "5s" // boundary for immutability
},
"lifecycleConfigs": {
"lifecycleConfig": [
{
"type": "retention", // Event retention
"config": {
"close_after": "518400s",
"delete_after": "604800s" // 7 day count event retention
}
}
]
}
}
]
Using such a control plane configuration, we compose multiple abstraction layers using containers deployed on the same host, with each container fetching configuration specific to its scope.
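For readers who want to sanity-check the raw numbers in the example configuration, the short sketch below converts them into friendlier units. The `parse_seconds` helper and the assumption that values are plain seconds with an optional `s` suffix are illustrative only.

```python
from datetime import timedelta


def parse_seconds(value: str) -> timedelta:
    """Parse values such as "604800s" or "600" into a timedelta (assumed format)."""
    return timedelta(seconds=int(value.rstrip("s")))


delete_after = parse_seconds("604800s")  # retention before deletion
close_after = parse_seconds("518400s")   # retention before closing
slice_width = parse_seconds("86400")     # seconds_per_slice
bucket_width = parse_seconds("600")      # seconds_per_bucket

# Dividing two timedeltas with // yields an integer count.
time_buckets_per_slice = slice_width // bucket_width
queue_capacity_mib = 16777216 / (1024 * 1024)  # capacity_bytes in MiB
```

With these values, events are deleted after 7 days and closed after 6, a one-day time slice spans 144 ten-minute buckets, and each queue is allocated 16 MiB.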
Provisioning
As with the TimeSeries abstraction, our automation uses a bunch of user inputs regarding their workload and cardinalities to arrive at the right set of infrastructure and related control plane configuration. You can learn more about this process in a talk given by one of our stunning colleagues, Joey Lynch: How Netflix optimally provisions infrastructure in the cloud.
Performance
At the time of writing this blog, this service was processing close to 75K count requests/second globally across the different API endpoints and datasets, while providing single-digit millisecond latencies for all its endpoints.
Future Work
While our system is robust, we still have work to do in making it more reliable and enhancing its features. Some of that work includes:
- Regional Rollups: Cross-region replication issues can result in missed events from other regions. An alternate strategy involves establishing a rollup table for each region, and then tallying them in a global rollup table. A key challenge in this design would be effectively communicating the clearing of the counter across regions.
- Error Detection and Stale Counts: Excessively stale counts can occur if rollup events are lost or if a rollup fails and isn’t retried. This isn’t an issue for frequently accessed counters, as they remain in rollup circulation. This issue is more pronounced for counters that aren’t accessed frequently. Typically, the initial read for such a counter will trigger a rollup, self-remediating the issue. However, for use cases that cannot accept potentially stale initial reads, we plan to implement improved error detection, rollup handoffs, and durable queues for resilient retries.
Conclusion
Distributed counting remains a challenging problem in computer science. In this blog, we explored multiple approaches to implement and deploy a Counting service at scale. While there may be other methods for distributed counting, our goal has been to deliver blazing fast performance at low infrastructure costs while maintaining high availability and providing idempotency guarantees. Along the way, we make various trade-offs to meet the diverse counting requirements at Netflix. We hope you found this blog post insightful.
Stay tuned for Part 3 of Composite Abstractions at Netflix, where we’ll introduce our Graph Abstraction, a new service being built on top of the Key-Value Abstraction and the TimeSeries Abstraction to handle high-throughput, low-latency graphs.
Acknowledgments
Special thanks to our stunning colleagues who contributed to the Counter Abstraction’s success: Joey Lynch, Vinay Chella, Kaidan Fullerton, Tom DeVoe, Mengqing Wang, Varun Khaitan
Netflix’s Distributed Counter Abstraction was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.