P99 CONF 24 Recap: Heckling on the Shoulders of Giants
As I sit here at the hotel breakfast bar contemplating what a remarkable couple of days it’s been for the fourth annual P99 CONF, I feel quite honored to have helped host it. While the coffee is strong and the presentations are fresh in my mind, let’s recap some of the great content we shared and reveal some of the behind-the-scenes efforts that made it all happen.
Watch P99 CONF Talks On Demand
Day 1
While we warmed up the live hosting stage, we had a fright with Error 1016 origin DNS errors on our event platform. As we scrambled to potentially host live on an alternative platform, Cloudflare saved the day and the show was back on the road. DNS issues weren’t going to stop us from launching P99 CONF!
Felipe Cardeneti Mendes got things started in the lounge with hundreds of people asking great questions about ScyllaDB pre-show. Co-founder of ScyllaDB, Dor Laor, opened the show with his keynote about ScyllaDB tablets. In the first few slides we were looking at assembly, then 128 fully utilized CPU cores not long after that. By the end of the presentation, we had throughput north of 1M ops/sec – complete with ScyllaDB’s now famous predictable low latency.
To help set the scene for P99 CONF, we heard from Pekka Enberg, CTO of Turso (sorry, I’m the one who overlooked the company name mistake in the original video). Pekka dove into the patterns of low latency. This generated more great conversation in chat. If you want all the details, then his book, simply titled Latency, is a must-read.
Since parallel programming is hard, we opened up 3 stages for you to choose from following the keynotes. Felipe returned, this time as a session speaker. Proving that not all benchmarks need to be institutionalized cheating, he paired with Alan “Dormando” of Memcached to see how ScyllaDB stacks up from a caching perspective. We also heard from Luc Lenôtre, a talented engineer, who toyed with a kernel written in Rust. Luc showed us lots of flame graphs and low-level tuning of Maestro. Continuing with the Rust theme was Amos Wenger in a very interesting look at making HTTP faster with io_uring.
There were other great talks from well-known companies. For example, Jason Rahman from Microsoft shared his insights on tracing Linux scheduler behavior using ftrace. Also, Christopher Peck from Uber shared their experience tuning with generational ZGC. This reflects much of the P99 CONF content – real-world, production experience taming P99 latencies at scale.
Another expanding theme at this year’s P99 CONF was eBPF. And who better than Liz Rice from Isovalent to kick it off with her keynote, Zero-overhead Container Networking with eBPF and Netkit. I love listening to Liz explain in technical detail the concepts and benefits of using eBPF, and I will definitely be reading through her book, Learning eBPF, on the long flight home to Australia.
By the way, books! There are now so many authors associated with P99 CONF which, I think, is a testament to the quality and professionalism of the speakers. We were giving away book bundles to members of the community who were top contributors in the chat, answering and asking great questions (huge thank you!). Some of the books on offer – which you can grab for yourself – are:
- Database Performance at Scale – by Felipe Mendes (ScyllaDB) et al.
- Latency – by Pekka Enberg (Turso)
- Think Distributed Systems – by Dominik Tornow (Resonate HQ)
- Writing for Developers: Blogs that Get Read – by Piotr Sarna (poolside)
- ScyllaDB in Action – by Bo Ingram (Discord)
And if you’re truly a tech bookworm, see this blog post for an extensive reading list: 14 Books by P99 CONF Speakers: Latency, Wasm, Databases & More.
By mid-afternoon on day 1, Gunnar Morling from decodable lightened things up with his keynote on creating the 1 billion row challenge. I’m sure you’ve heard of it, and we had another speaker, Shraddha Agrawal, following up with her version in Golang.
We enjoyed lots more great content in the afternoon, including Piotr Sarna (from poolside AI and co-author of Database Performance at Scale + Writing for Developers) taking us back to the long-standing database theme of the conference with performance perspectives on database drivers. Speaking of themes, Wasm returned with book authors Brian Sletten and Ramnivas Laddad looking at WebAssembly on the edge. And the two Adams from Zoo gave us unique insight into building a remote CAD solution that feels local.
And showing that we can finish day 1 just as strong as we started it, Carl Lerche, creator of tokio-rs, returned to P99 CONF for the Day 1 closing keynote. This year, he highlighted how Rust – which is typically used at the infrastructure level for all the features we love, like safety, concurrency, and performance – is also applicable at higher levels in the stack. He also announced the first version of Toasty, an ORM for Rust.
Day 2
The second day kicked off with Andy Pavlo from CMU and his take on the tension between the database and operating system, with a unique research project, Tigger, a database proxy that pushes a database into kernel space using eBPF. Leading on from that, we had Bryan Cantrill, CTO of Oxide and creator of DTrace, reviewing DTrace’s 21-year history with plenty of insights into the origins and evolution of this framework. Bryan has presented at every P99 CONF and is one of the many industry giants that you can…
5X to 40X Lower DynamoDB Costs — with Better P99 Latency
At our recent events, I’ve been fielding a lot of questions about DynamoDB costs. So, I wanted to highlight the cost comparison aspect of a recent benchmark comparing ScyllaDB and DynamoDB. This involved a detailed price-performance comparison analyzing:
- How cost compares across both DynamoDB pricing models under various workload conditions
- How latency compares across this set of workloads
I’ll share details below, but here’s a quick summary: ScyllaDB costs are significantly lower in all but one scenario. In realistic workloads, costs would be 5X to 40X lower – with up to 4X better P99 latency. Here’s a consolidated look at how DynamoDB and ScyllaDB compare on a Uniform distribution (DynamoDB’s sweet spot).
Now, more details on the cost aspect of this comparison. For a deeper dive into the performance aspect, see this DynamoDB vs ScyllaDB price-performance comparison blog as well as the complete benchmark report.
How We Compared DynamoDB Costs vs ScyllaDB Costs
For our cost comparisons, we launched a small 3-node cluster in ScyllaDB Cloud and measured performance on a wide range of workload scenarios. Next, we calculated the cost of running the same workloads on DynamoDB. We used an item size of 1081 bytes, which translates to 2 WCUs per write operation and 1 RCU per read operation on DynamoDB. Our working data set size was 1 TB, with an approximate cost of ~$250/month in DynamoDB. We used the same ScyllaDB cluster through every testing scenario, thus simplifying ScyllaDB Cloud costs. Hourly rates (on-demand) were used. As ScyllaDB scales linearly with the amount of resources, you can predictably adjust costs to match your desired performance outcome. Annual pricing provides significant cost reduction but is out of the scope of this benchmark.
DynamoDB has two modes for non-annual pricing: provisioned and on-demand pricing. Provisioned mode is recommended if your workloads are reasonably predictable. On-demand pricing is significantly more expensive and is a fit for unpredictable, low-throughput workloads. It is possible to combine modes, add auto-scaling, and so forth. DynamoDB provides considerable flexibility around managing the cost and scale of the aforementioned options, but this also results in considerable complexity. For details on how we calculated costs, refer to the Cost Calculations section at the end of this article.
Throughout all tests, we ensured ScyllaDB had spare capacity at all times by keeping its load below 75%. Given that, note that it is possible to achieve higher traffic than the numbers reported here at no additional cost, in turn allowing for additional growth. The number of operations per second that the ScyllaDB cluster performs for each workload is reported under the X axis in the following graphs.
Provisioned Cost Comparison: DynamoDB vs ScyllaDB
Provisioned mode is recommended if your workloads are reasonably predictable. With DynamoDB, you need to be able to predict per-table capacity following the AWS DynamoDB read/write capacity unit pricing model. With just one exception, DynamoDB’s cost estimates were consistently higher than ScyllaDB – and much more so for the most write-heavy workloads. In the 1 out of 15 cases where DynamoDB turned out to be less expensive, ScyllaDB could actually drive more utilization to win over DynamoDB. However, we wanted to keep the results consistent and fair.
This is not surprising, given that DynamoDB charges 5X more for writes than for reads, while ScyllaDB does not differentiate between operations, and its pricing is based on the actual cluster size.
On-Demand Cost Comparison: DynamoDB vs ScyllaDB
On-demand pricing is best when the application’s workload is unclear, the application’s data traffic patterns are unknown, and/or your company prefers a pay-as-you-go option. However, as the results show, the convenience and flexibility of DynamoDB’s on-demand pricing often come at quite a cost. To see how we calculated costs, refer to the Cost Calculations section at the end of this article.
Here, the same general trends hold true. ScyllaDB cost is fixed across the board, and its cost advantage grows as write throughput increases. ScyllaDB’s cost advantage over on-demand DynamoDB tables is significantly greater when compared to provisioned capacity on DynamoDB. Why? Because DynamoDB’s on-demand pricing is significantly higher than its provisioned capacity counterpart. Therefore, workloads with unpredictable traffic spikes (which would justify not using provisioned capacity) may easily end up with runaway bills compared to costs with ScyllaDB Cloud.
Making ScyllaDB Even More Cost Effective
Unlike DynamoDB (where you provision tables), ScyllaDB is provisioned as a cluster, capable of hosting several tables – and therefore consolidating several workloads under a single deployment. Excess hardware capacity may be shared to power more use cases on that cluster. ScyllaDB Cloud and Enterprise users can also use ScyllaDB’s Workload Prioritization to prioritize specific access patterns and further drive consolidation.
For example, assume there are 10 use cases that require 100K OPS each. With DynamoDB, users would be forced to allocate a provisioned workload per table or to use the rather expensive on-demand mode. This introduces a set of caveats:
- If every workload consistently reaches its peak capacity, it will likely get throttled by AWS (provisioned mode), or result in runaway bills (on-demand mode).
- Likewise, the opposite also holds true: if most workloads are consistently idle, provisioned mode results in non-consumed capacity bills.
- On-demand mode doesn’t guarantee immediate capacity to support traffic surges. This, in turn, causes your applications to experience some degree of throttling.
A standard ScyllaDB deployment is not only more cost effective, but also simplifies management. It allows users to consolidate all workloads within a single cluster and share idle capacity among them. With ScyllaDB Cloud and Enterprise, users further have the flexibility to define priorities on a per-workload basis, allowing the database to make more informed decisions when two or more workloads compete against each other for resources.
Cost Calculation Details
Here’s how we calculated costs across the different databases and pricing models.
DynamoDB Cost Calculations
Provisioned Costs for DynamoDB
With DynamoDB’s provisioned capacity mode, planning is required. You specify the number of reads and writes per second that you expect your application to require. You can make use of auto-scaling to automatically adjust your table’s capacity based on a specific utilization target in order to sustain spikes outside of your projected planning. In provisioned mode, you need to provision DynamoDB with the expected throughput.
You set WCUs (Write Capacity Units) and RCUs (Read Capacity Units), which signify the allowed number of write and read operations per second, respectively. They are priced per hour:
- One WCU is $0.00065 per hour
- One RCU is $0.00013 per hour
This yields the following formula for calculating monthly costs (assuming roughly 730 hours per month): monthly provisioned cost ≈ (provisioned WCUs × $0.00065 + provisioned RCUs × $0.00013) × 730. A worked sketch appears at the end of this article.
On-Demand Costs for DynamoDB
With DynamoDB’s on-demand mode, no planning is required. You pay for the actual reads/writes that your application is using (the total number of actual writes or reads, not writes or reads per second). In this mode, you pay by usage and the cost is per request unit (rather than per capacity unit, as in the provisioned mode). AWS charges $1.25 per million write request units (WRU) and $0.25 per million read request units (RRU). Therefore, a single request unit costs one millionth of those per-million prices:
- One WRU is $0.00000125 per write
- One RRU is $0.00000025 per read
This yields the following formula for calculating monthly costs: monthly on-demand cost ≈ total monthly write request units × $0.00000125 + total monthly read request units × $0.00000025. Again, see the worked sketch at the end of this article.
ScyllaDB Cost Calculations
As stated previously, we used ScyllaDB’s on-demand pricing for all cost comparisons in this study. ScyllaDB’s on-demand costs were determined using our online pricing calculator, as follows.
From the ScyllaDB Pricing Calculator
This calculator estimates the size and cost of a ScyllaDB Cloud cluster based on the specified technical requirements around throughput and item/data size. Note that in ScyllaDB, the primary aspect driving costs is the cluster size, unlike DynamoDB’s model based on the volume of reads and writes. Once the cluster size is determined, the deployment can often exceed throughput requirements. For comparison, DynamoDB’s provisioned pricing structure requires users to explicitly specify sustained and peak throughput. Overprovisioning equivalent performance in DynamoDB would be significantly pricier compared to ScyllaDB. Without an annual commitment for cost savings, the estimated annual cost for the ScyllaDB Cloud cluster is $54,528, calculated at a monthly rate of $4,544.
Conclusion
As the results indicate, what might begin at a seemingly reasonable cost can quickly escalate into “bill shock” with DynamoDB – especially at high throughputs, and particularly with write-heavy workloads. This makes DynamoDB a suboptimal choice for data-intensive applications anticipating steady or rapid growth. ScyllaDB’s significantly lower costs – a reflection of ScyllaDB taking full advantage of modern infrastructure for high throughput and low latency – make it a more cost-effective solution for data-intensive applications. ScyllaDB – with its LSM-tree-based storage, unified caching, shard-per-core design, and advanced schedulers – allows you to maximize the advantages of modern hardware, from huge CPU chips to blazing-fast NVMe. Beyond the presented cost savings, ScyllaDB sustains 2X peaks and provides 2X-4X better P99 latency. Additionally, it can further reduce latency when idle – or enable spare resources to be shared across multiple tables. For larger workloads spanning 500K-1M OPS and beyond, this can result in a cost saving in the millions – with better performance and fewer query limitations.
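To make the pricing formulas above concrete, here is a minimal sketch of the monthly cost arithmetic for both DynamoDB pricing modes. The workload numbers are hypothetical placeholders rather than figures from the benchmark; only the per-unit prices quoted above are taken from the article, and the 730 hours/month figure is an approximation.

# Minimal sketch of the DynamoDB monthly cost arithmetic described above.
# The workload numbers below are hypothetical placeholders, not benchmark figures.
HOURS_PER_MONTH = 730          # approximate hours in a month
WCU_PER_HOUR = 0.00065         # provisioned write capacity unit, per hour
RCU_PER_HOUR = 0.00013         # provisioned read capacity unit, per hour
WRU_PRICE = 1.25 / 1_000_000   # on-demand write request unit
RRU_PRICE = 0.25 / 1_000_000   # on-demand read request unit

def provisioned_monthly_cost(writes_per_sec, reads_per_sec,
                             wcu_per_write=2, rcu_per_read=1):
    """Provisioned mode: capacity units are billed per hour, whether used or not."""
    wcus = writes_per_sec * wcu_per_write
    rcus = reads_per_sec * rcu_per_read
    return (wcus * WCU_PER_HOUR + rcus * RCU_PER_HOUR) * HOURS_PER_MONTH

def on_demand_monthly_cost(total_writes, total_reads,
                           wru_per_write=2, rru_per_read=1):
    """On-demand mode: you pay per request unit actually consumed."""
    return total_writes * wru_per_write * WRU_PRICE + total_reads * rru_per_read * RRU_PRICE

if __name__ == "__main__":
    # Hypothetical example: 50K writes/sec and 50K reads/sec sustained for a month,
    # with the 1081-byte item size from the benchmark (2 units/write, 1 unit/read).
    writes_per_sec = reads_per_sec = 50_000
    seconds_per_month = HOURS_PER_MONTH * 3600
    print(f"Provisioned: ${provisioned_monthly_cost(writes_per_sec, reads_per_sec):,.0f}/month")
    print(f"On-demand:   ${on_demand_monthly_cost(writes_per_sec * seconds_per_month, reads_per_sec * seconds_per_month):,.0f}/month")

Running this with the hypothetical numbers illustrates the article's point: the same sustained traffic is several times more expensive in on-demand mode than in provisioned mode.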
Why ScyllaDB’s Shard Per Core Architecture Matters
3 perspectives on what shard-per-core involves and why it matters for teams who care about database performance. Also … puppies!
ScyllaDB is best known for 3 things:
1) Predictable database performance at scale
2) A shard-per-core architecture
3) Cute sea monsters
The monster’s cuteness speaks for itself – especially if you’ve seen the plushie version. But a little more explanation is often required to communicate what’s so special about the shard-per-core architecture, and how it contributes to ScyllaDB’s predictable performance at scale. That’s what we’ll cover in this article. Below are three different perspectives on what shard-per-core involves and why it matters for teams who care about database performance:
- How Dor Laor (ScyllaDB Co-founder and CEO) first introduced the concept back when ScyllaDB launched in 2015
- The ScyllaDB power user perspective by Bo Ingram, author of ScyllaDB in Action and the monstrously popular ScyllaDB migration blog
- A more detailed look under the hood, explained by Tzach Livyatan (VP of Product) earlier this year at ScyllaDB Summit
Bonus: We’ll bring some puppies into the mix since (some) puppies are just as cute as the ScyllaDB monster.
The ScyllaDB Sea Monster with Baunilha
Dor Laor’s 2015 introduction: independent, lock-free processing across server cores
From a 2015 ScyllaDB feature on ZDNet:
“Everybody does sharding in a cluster but usually the granularity is per server. We do it per core. The result is each of the shards within the server is fully independent, so there’s no need for locking. There are no regular threads that you need to protect your data structures. The independence of the shard means there is no contention, with each one managing its own resources, whether that is a networking card, memory or the CPU itself. Each shard has its own CPU, its own memory – it’s local to that node so it’s multi-socket friendly – that’s NUMA-friendly [non-uniform memory access]. It’s expensive to access memory between one core and one socket with memory that belongs to another socket. Within a server, we have lots of such shards – exactly the amount of the x86 cores that exist in that server. The bigger the server grows with the newer CPUs that Intel produces, the better for us because we scale up linearly. In the relations between the cores, everything is independent.”
Bo Ingram’s power user perspective: predictable low latencies
From Bo’s new book, “ScyllaDB in Action”:
“ScyllaDB’s biggest architectural difference is its shard-per-core architecture. Both Cassandra and ScyllaDB shard a data set across its various nodes via placement in a hash ring. ScyllaDB takes this further by leveraging the Seastar framework (https://seastar.io/) to shard data within a node, splitting it up per CPU-core and giving each shard its own CPU, memory, and network bandwidth allocation. Cassandra does not follow this paradigm, however, and limits the sharding to only per node. If a data partition gets a large amount of requests, it can overwhelm the node, leading to cluster-wide struggles. Performance justifies the rewrite.
Both in benchmarks (https://thenewstack.io/benchmarking-apache-cassandra-40-nodes-vs-scylladb-4-nodes/) and in the wild (https://discord.com/blog/how-discord-stores-trillions-of-messages), ScyllaDB is faster, more consistent, and requires fewer servers to operate than Cassandra.”
Read more from ScyllaDB In Action – Free
Tzach Livyatan: Unraveling the threads towards linear scalability
From Tzach Livyatan’s keynote, “A Deep Dive into ScyllaDB’s Architecture”:
“ScyllaDB was designed with performance in mind – in particular, high throughput and low latency. The shard-per-core design is fundamental for that. Systems that aren’t as obsessed with performance tend to use a thread pool, with many threads competing for the same resources. At any given second, threads might be trying to get to memory, trying to get to disk, or trying to do something that requires synchronization – and they get blocked. The CPU will then context switch. When we profiled other databases, we found that this synchronization between the threads is often responsible for consuming the majority of the resources.
ScyllaDB takes a different approach, with a shard-per-core architecture. Each thread is pinned to a specific core. And each thread is assigned its own designated chunk of memory, its own designated chunk of network, and its own designated chunk of storage. As a result, there’s minimal interaction between the cores, and each can run independently in a very efficient way: it never context switches, it never waits. I think this is probably the most important design decision behind the high performance that users get from ScyllaDB.
It also allows ScyllaDB to scale linearly with the number of cores. If you deploy ScyllaDB on 16 cores, and then on 32 cores, you get exactly twice the performance. And if you double the cores again, you again get double the performance. Since each core is an independent shard, there is virtually no interaction between the cores and you can scale with the number of cores in a very efficient way.”
See Tzach’s complete ScyllaDB architecture deep dive here:
And now… puppies
If you’ve ever fed a group of puppies, you’ll recognize the top image here. There are 6 bowls of dog food and 6 puppies. But multiple puppies are fighting over a couple bowls of food, and a few bowls of food are totally empty. Food is spilled all over the ground as a result of the fights. That’s like the shared thread architecture used by other systems, like Cassandra. When a job needs to be done, it’s basically thrown down to a thread pool, and a thread takes the job. However, the threads bump into each other – like our puppies. In the systems case, that could cause problems such as latency spikes.
In the lower image, you can see each puppy is happily eating from its own bowl of food. There’s no fighting and no waste. Similarly, ScyllaDB’s shard-per-core architecture eliminates all that contention for resources. We take all of the system resources and all of the data and split it up evenly, based on the number of cores. Just like each puppy has its own portion of food, each shard has its own dedicated RAM, its own network, its own I/O, and its own piece of the data.
Coming soon… quantifying the impact of a shard-per-core architecture
Almost a year ago to the day, Dor Laor kicked off P99 CONF 23 with a shard-per-core deep dive that people are still talking about. His teaser: Most software isn’t architected to take advantage of modern hardware.
How does a shard-per-core and shared-nothing architecture help – and exactly what impact can it make? I will examine technical opportunities and tradeoffs, as well as share the results of a new benchmark study.
To give that talk the depth it deserves, we’ll write it up in a dedicated article. So stay tuned if you prefer to read. If you can’t wait, we invite you to watch it now.
See Dor’s Shard-Per-Core Keynote
Join us for P99 CONF 24 – Oct 23 and 24
Book Excerpt: ScyllaDB versus Other Databases
How does ScyllaDB compare to other databases? Read what Bo Ingram (Staff Engineer at Discord) has to say – in this excerpt from his new book “ScyllaDB in Action.”
Editor’s note
We’re thrilled to share the following excerpt from Bo Ingram’s informative – and fun! – new book on ScyllaDB: ScyllaDB in Action. It’s available now via Manning and Amazon. You can also access a 3-chapter excerpt for free, compliments of ScyllaDB.
Get the first 3 book chapters, free
You might have already experienced Bo’s expertise and engaging communication style in his blog How Discord Stores Trillions of Messages or his ScyllaDB Summit talks How Discord Migrated Trillions of Messages from Cassandra to ScyllaDB and So You’ve Lost Quorum: Lessons From Accidental Downtime. If not, you should 😉 And if you want to learn more from Bo, join him at our upcoming Data Modeling for Performance Masterclass. We’ve ordered boxes of print books and will be giving them out!
Join Bo at the “Data Modeling for Performance” Masterclass
This blog post shares how ScyllaDB compares to:
- Relational databases
- Apache Cassandra
- Amazon DynamoDB
- Google Cloud Spanner
- MongoDB
- Distributed relational databases (CockroachDB, TiDB, and YugabyteDB)
The following is an excerpt from Chapter 1; it’s reprinted here with permission of the publisher.
ScyllaDB versus relational databases
ScyllaDB runs multiple nodes, making it a distributed system. By spreading its data across its deployment, it uses that distribution to achieve its desired availability and consistency, which, when combined, differentiates the database from other systems. I’ve introduced ScyllaDB by describing its features in comparison with relational databases, but here we’ll examine the differences in closer detail. Relational databases such as PostgreSQL and MySQL are the standard for data storage in software applications, and they’re almost always the default choice for a new developer looking to build an application. Relational databases are a very strong option for many use cases, but that doesn’t mean they’re a strong option for every use case.
ScyllaDB is a distributed NoSQL database. By distributing data across a cluster, ScyllaDB unlocks better availability when nodes go awry than a single-node all-or-nothing relational database. PostgreSQL and MySQL can run in a distributed mode, but that is either powered through extensions or newer storage engines and not the primary native mode of the database. This distribution is native to ScyllaDB and the bedrock of its design. By running as a distributed system, ScyllaDB empowers horizontal scalability. Many relational databases are only vertically scalable – you can only add more resources by running it on a bigger server. With horizontal scalability, you can add additional nodes to a system to increase its capacity. ScyllaDB supports this expansion; administrators can add more nodes, and the cluster will rebalance itself, offloading data to the new cluster member. In a relational database, horizontal scaling is possible, but it’s often manual. Operators need to manually shard data between multiple nodes to achieve this behavior.
ScyllaDB does not provide a relational database’s ACID (atomicity, consistency, isolation, and durability) guarantees, instead opting for a softer model called BASE (Basic Availability, Soft-state, and Eventual consistency), where the database has basic availability and is eventually consistent.
This decision leads to faster writes than a relational database, which has to validate the consistency of the database after every write, whereas ScyllaDB only needs to save the write since it doesn’t promise that degree of correctness. The tradeoff, though, is that developers need to consider ScyllaDB’s weaker consistency. … Ultimately, ScyllaDB versus relational databases is a foundational and philosophical decision. They operate so differently and provide such varying guarantees to their clients that picking one over the other has large effects on an application. If you’re looking for availability and scalability in your database, ScyllaDB is a strong option.
ScyllaDB versus Cassandra
ScyllaDB is a rewrite of Apache Cassandra. It is frequently described as “a more performant Cassandra” or “Cassandra but in C++”. ScyllaDB is designed to be compatible with Cassandra: it uses a compatible API, query language, on-disk storage format, and hash ring architecture. Like Cassandra, but better, is ScyllaDB’s goal; it makes some improvements to accomplish this.
The choice of language in the rewrite immediately unlocks better performance. Cassandra is written in Java, which leverages a garbage collector to perform memory management. Because objects get loaded into memory, at some point, they will need to be removed. Java’s garbage collection algorithms handle this removal, but it comes at the cost of compute. Time spent garbage collecting is time Cassandra can’t spend executing queries. If garbage collection reaches a certain threshold, the Java Virtual Machine will pause all execution for a brief time while it cleans up memory, referred to as a “stop the world” pause. Even if it’s just for milliseconds, that pause can be painful to clients. Although Java exposes many configuration knobs and improves the garbage collector with each release, it’s a tax that all Java-based applications have to pay — whether in garbage collection time or time spent mitigating it. ScyllaDB avoids this tax because it is implemented in C++ and provides more granular controls for memory management. By having full control of memory allocation and cleanup, ScyllaDB doesn’t need to let a garbage collector perform this functionality on an application-wide scale. It avoids “stop the world” pauses and can dedicate its compute time to executing queries.
ScyllaDB’s biggest architectural difference is its shard-per-core architecture (figure 1.9). Both Cassandra and ScyllaDB shard a data set across its various nodes via placement in a hash ring, which you’ll learn more about in chapter 3. ScyllaDB takes this further by leveraging the Seastar framework (https://seastar.io/) to shard data within a node, splitting it up per CPU-core and giving each shard its own CPU, memory, and network bandwidth allocation.
Figure 1.9 ScyllaDB shards data not only within the cluster, but also within each instance. This sharding further limits the blast radius due to hot traffic patterns – the damage is limited to just that shard on that node.
Cassandra does not follow this paradigm, however, and limits the sharding to only per node. If a data partition gets a large amount of requests, it can overwhelm the node, leading to cluster-wide struggles. Performance justifies the rewrite.
Both in benchmarks (https://thenewstack.io/benchmarking-apache-cassandra-40-nodes-vs-scylladb-4-nodes/) and in the wild (https://discord.com/blog/how-discord-stores-trillions-of-messages), ScyllaDB is faster, more consistent, and requires fewer servers to operate than Cassandra.
ScyllaDB versus Amazon Aurora / Google Cloud Spanner / Google AlloyDB
I’ve lumped a few similar systems together here – Amazon Aurora, Amazon DynamoDB, Google Cloud Spanner, and Google AlloyDB. They can be generally described as scalable cloud-hosted databases. They aim to take a relational data model and provide greater scalability than out-of-the-box PostgreSQL or MySQL. This effort accentuates a need in the market for scalable databases, showing the value of ScyllaDB.
These systems have two related drawbacks – cloud vendor lock-in and cost. As cloud providers provide these databases, they run in only that specific vendor’s cloud environment. You can’t run Google Cloud Spanner in Amazon Web Services. If your application is heavily dependent on one of these systems, there can be a high engineering cost if you decide to switch cloud providers, as you’ll need to migrate data into a different system with a potentially different storage paradigm. If you’re not using that provider (or any provider), these options aren’t even on the table for you. And by using a cloud provider, companies pay money for these services. Operating and maintaining a database is challenging (which is partly why you’re reading this book), and although these cloud vendors provide solutions to make it potentially simpler, that can get quite expensive for clients. Of course, operating a database yourself can also be costly. ScyllaDB, however, can be run anywhere. Companies are running it on-premises or within various cloud providers. It provides a scalable and fault-tolerant database that you can take to any hosting solution.
ScyllaDB versus document stores
I’m not talking about Google Drive here, but instead, databases that store unstructured documents by a given key, such as MongoDB. These systems support querying these documents, allowing users to access arbitrary document fields without defining a database schema. ScyllaDB eschews this flexibility to provide (relatively) predictable performance. By requiring users to define their schema up front, it clarifies to both users and the system how data is distributed across the cluster. By forcing users to query data in patterns that match this distribution, ScyllaDB can limit the number of nodes involved in a query, preventing surprisingly expensive queries. Document stores, on the other hand, tend to bias toward initial ease of use. In MongoDB, no schema definition is required, but users still need to consider the design of their data to query it effectively. MongoDB runs as a distributed system, but unlike ScyllaDB, it doesn’t out-of-the-box attempt to minimize inefficient queries that hit more than the expected number of nodes, leading to potential performance surprises. In the CAP theorem, MongoDB is a CP (consistent and partition-tolerant) system. Writes require the presence of a primary node and are blocked until a new primary is elected in the event of a network partition. ScyllaDB, however, prioritizes availability in its query path, keeping the system up and relying on its tunable consistency.
ScyllaDB versus distributed relational databases
One interesting development for databases over the past few years has been the growth of distributed transactional databases.
These systems — such as CockroachDB, TiDB, and YugabyteDB — focus on improving the availability of a traditional relational database like PostgreSQL while still offering strong consistency. In the CAP theorem’s classifications, they’re CP systems; they prefer consistency over availability. By emphasizing correctness, they need a quorum of nodes to respond to successfully complete a query; if quorum is lost, the database loses availability. ScyllaDB, however, provides tunable consistency to dodge this problem. By allowing weaker consistency levels, such as ONE, Scylla can handle a greater loss of availability to preserve functionality. In a relational database, writes are the computationally intensive operation. The database needs to validate its consistency on every write. Scylla, on the other hand, skips this verification, opting for speed and simplicity when writing data. The tradeoff, however, is that reads in Scylla will be slower than writes, as you need to gather data from multiple nodes that have data stored in different places on disk. You’ll learn a lot more about this behavior in chapters 6 and 7, but the big takeaway is that writes in Scylla will be faster than in these systems.
When to prefer other databases
I’ve described ScyllaDB’s benefits relative to other databases, but sometimes, I admit, it’s not the best tool for the job. I can’t describe it as a unique database because of the Cassandra rewrite approach, but it does trade operational and design complexity for more graceful failure modes. Choosing Scylla requires you to design applications differently because it has specific data-modeling needs to best use its capabilities and adds more complexity than something like a cloud-hosted PostgreSQL server. If you don’t need ScyllaDB’s horizontal scalability and nuanced availability, the increased operational overhead might not be worth it. If your application is small, makes few requests, and isn’t expected to grow over time, ScyllaDB might be overkill. A database backing comments on your blog probably doesn’t need a ScyllaDB cluster, unless, like many of us, you’re wanting that as an excuse to try it out.
Operating and maintaining a ScyllaDB cluster isn’t a hands-off exercise. If you can’t dedicate time to operating and maintaining a cluster, that is another signal that a managed offering might be preferable for you. Teams must choose wisely about how they spend their time and their money on what they do; choosing a less hands-on option is a valid decision.
One thing you’ll see about Scylla in upcoming chapters is that, with data modeling, it can be inflexible to change your database’s design. Adding new query patterns that don’t fit in with your initial design can be challenging. While there are ways to work around it, other databases can potentially give you more flexibility when you’re in the prototyping and learning stage of building features for an application.
Lastly, some use cases might prefer a stronger transactional model like ACID. If you’re working with financial data, you might want to use a relational database so that you can have isolation in your operations. One popular example to demonstrate the importance of ACID transactions is concurrent access to bank accounts. Without isolation, you run the risk of concurrent operations causing a mismatch between how much money the database thinks you have and how much money you actually have.
Accountants traditionally prefer accuracy in these areas, so you might prefer a relational database when working with something that needs stronger database transactions. While scaling a relational database has its challenges, they might be preferable to take on than surrendering ACID’s guarantees. Scylla can get closer to ACID through careful design and usage of some more advanced features you’ll learn about in chapter 6, but it’s not quite as “out-of-the-box” an experience as a relational database.
Get the first 3 book chapters, free
Join Bo at the “Data Modeling for Performance” Masterclass
Introducing Netflix’s TimeSeries Data Abstraction Layer
By Rajiv Shringi, Vinay Chella, Kaidan Fullerton, Oleksii Tkachuk, Joey Lynch
Introduction
As Netflix continues to expand and diversify into various sectors like Video on Demand and Gaming, the ability to ingest and store vast amounts of temporal data — often reaching petabytes — with millisecond access latency has become increasingly vital. In previous blog posts, we introduced the Key-Value Data Abstraction Layer and the Data Gateway Platform, both of which are integral to Netflix’s data architecture. The Key-Value Abstraction offers a flexible, scalable solution for storing and accessing structured key-value data, while the Data Gateway Platform provides essential infrastructure for protecting, configuring, and deploying the data tier.
Building on these foundational abstractions, we developed the TimeSeries Abstraction — a versatile and scalable solution designed to efficiently store and query large volumes of temporal event data with low millisecond latencies, all in a cost-effective manner across various use cases.
In this post, we will delve into the architecture, design principles, and real-world applications of the TimeSeries Abstraction, demonstrating how it enhances our platform’s ability to manage temporal data at scale.
Note: Contrary to what the name may suggest, this system is not built as a general-purpose time series database. We do not use it for metrics, histograms, timers, or any such near-real-time analytics use case. Those use cases are well served by the Netflix Atlas telemetry system. Instead, we focus on addressing the challenge of storing and accessing extremely high-throughput, immutable temporal event data in a low-latency and cost-efficient manner.
Challenges
At Netflix, temporal data is continuously generated and utilized, whether from user interactions like video-play events, asset impressions, or complex micro-service network activities. Effectively managing this data at scale to extract valuable insights is crucial for ensuring optimal user experiences and system reliability.
However, storing and querying such data presents a unique set of challenges:
- High Throughput: Managing up to 10 million writes per second while maintaining high availability.
- Efficient Querying in Large Datasets: Storing petabytes of data while ensuring primary key reads return results within low double-digit milliseconds, and supporting searches and aggregations across multiple secondary attributes.
- Global Reads and Writes: Facilitating read and write operations from anywhere in the world with adjustable consistency models.
- Tunable Configuration: Offering the ability to partition datasets in either a single-tenant or multi-tenant datastore, with options to adjust various dataset aspects such as retention and consistency.
- Handling Bursty Traffic: Managing significant traffic spikes during high-demand events, such as new content launches or regional failovers.
- Cost Efficiency: Reducing the cost per byte and per operation to optimize long-term retention while minimizing infrastructure expenses, which can amount to millions of dollars for Netflix.
TimeSeries Abstraction
The TimeSeries Abstraction was developed to meet these requirements, built around the following core design principles:
- Partitioned Data: Data is partitioned using a unique temporal partitioning strategy combined with an event bucketing approach to efficiently manage bursty workloads and streamline queries.
- Flexible Storage: The service is designed to integrate with various storage backends, including Apache Cassandra and Elasticsearch, allowing Netflix to customize storage solutions based on specific use case requirements.
- Configurability: TimeSeries offers a range of tunable options for each dataset, providing the flexibility needed to accommodate a wide array of use cases.
- Scalability: The architecture supports both horizontal and vertical scaling, enabling the system to handle increasing throughput and data volumes as Netflix expands its user base and services.
- Sharded Infrastructure: Leveraging the Data Gateway Platform, we can deploy single-tenant and/or multi-tenant infrastructure with the necessary access and traffic isolation.
Let’s dive into the various aspects of this abstraction.
Data Model
We follow a unique event data model that encapsulates all the data we want to capture for events, while allowing us to query them efficiently.
Let’s start with the smallest unit of data in the abstraction and work our way up.
- Event Item: An event item is a key-value pair that users use to store data for a given event. For example: {"device_type": "ios"}.
- Event: An event is a structured collection of one or more such event items. An event occurs at a specific point in time and is identified by a client-generated timestamp and an event identifier (such as a UUID). This combination of event_time and event_id also forms part of the unique idempotency key for the event, enabling users to safely retry requests.
- Time Series ID: A time_series_id is a collection of one or more such events over the dataset’s retention period. For instance, a device_id would store all events occurring for a given device over the retention period. All events are immutable, and the TimeSeries service only ever appends events to a given time series ID.
- Namespace: A namespace is a collection of time series IDs and event data, representing the complete TimeSeries dataset. Users can create one or more namespaces for each of their use cases. The abstraction applies various tunable options at the namespace level, which we will discuss further when we explore the service’s control plane.
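To make these concepts concrete, here is a minimal sketch of the event data model in Python. The class and field names are illustrative assumptions based on the descriptions above, not the abstraction's actual types.

# A minimal sketch of the TimeSeries event data model described above.
# Class and field names are illustrative, not the service's actual types.
from dataclasses import dataclass, field
from typing import Dict, List
import uuid

@dataclass
class Event:
    event_time_ms: int                        # client-generated timestamp
    event_id: str                             # e.g. a UUID
    items: Dict[str, bytes] = field(default_factory=dict)   # event items (key-value pairs)

@dataclass
class TimeSeries:
    time_series_id: str                       # e.g. a device_id
    events: List[Event] = field(default_factory=list)       # append-only, immutable events

@dataclass
class Namespace:
    name: str                                 # one namespace per use case
    series: Dict[str, TimeSeries] = field(default_factory=dict)

# Example: one event for a device, keyed by its time_series_id.
event = Event(event_time_ms=1727990663988, event_id=str(uuid.uuid4()),
              items={"device_type": b"ios"})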
API
The abstraction provides the following APIs to interact with the event data.
WriteEventRecordsSync: This endpoint writes a batch of events and sends back a durability acknowledgement to the client. This is used in cases where users require a guarantee of durability.
WriteEventRecords: This is the fire-and-forget version of the above endpoint. It enqueues a batch of events without the durability acknowledgement. This is used in cases like logging or tracing, where users care more about throughput and can tolerate a small amount of data loss.
{
  "namespace": "my_dataset",
  "events": [
    {
      "timeSeriesId": "profile100",
      "eventTime": "2024-10-03T21:24:23.988Z",
      "eventId": "550e8400-e29b-41d4-a716-446655440000",
      "eventItems": [
        {
          "eventItemKey": "deviceType",
          "eventItemValue": "aW9z"
        },
        {
          "eventItemKey": "deviceMetadata",
          "eventItemValue": "c29tZSBtZXRhZGF0YQ=="
        }
      ]
    },
    {
      "timeSeriesId": "profile100",
      "eventTime": "2024-10-03T21:23:30.000Z",
      "eventId": "123e4567-e89b-12d3-a456-426614174000",
      "eventItems": [
        {
          "eventItemKey": "deviceType",
          "eventItemValue": "YW5kcm9pZA=="
        }
      ]
    }
  ]
}
ReadEventRecords: Given a combination of a namespace, a timeSeriesId, a timeInterval, and optional eventFilters, this endpoint returns all the matching events, sorted descending by event_time, with low millisecond latency.
{
  "namespace": "my_dataset",
  "timeSeriesId": "profile100",
  "timeInterval": {
    "start": "2024-10-02T21:00:00.000Z",
    "end": "2024-10-03T21:00:00.000Z"
  },
  "eventFilters": [
    {
      "matchEventItemKey": "deviceType",
      "matchEventItemValue": "aW9z"
    }
  ],
  "pageSize": 100,
  "totalRecordLimit": 1000
}
SearchEventRecords: Given a search criteria and a time interval, this endpoint returns all the matching events. These use cases are fine with eventually consistent reads.
{
  "namespace": "my_dataset",
  "timeInterval": {
    "start": "2024-10-02T21:00:00.000Z",
    "end": "2024-10-03T21:00:00.000Z"
  },
  "searchQuery": {
    "booleanQuery": {
      "searchQuery": [
        {
          "equals": {
            "eventItemKey": "deviceType",
            "eventItemValue": "aW9z"
          }
        },
        {
          "range": {
            "eventItemKey": "deviceRegistrationTimestamp",
            "lowerBound": {
              "eventItemValue": "MjAyNC0xMC0wMlQwMDowMDowMC4wMDBa",
              "inclusive": true
            },
            "upperBound": {
              "eventItemValue": "MjAyNC0xMC0wM1QwMDowMDowMC4wMDBa"
            }
          }
        }
      ],
      "operator": "AND"
    }
  },
  "pageSize": 100,
  "totalRecordLimit": 1000
}
AggregateEventRecords: Given a search criteria and an aggregation mode (e.g., DistinctAggregation), this endpoint performs the given aggregation within a given time interval. Similar to the Search endpoint, users can tolerate eventual consistency and a potentially higher latency (in seconds).
{
  "namespace": "my_dataset",
  "timeInterval": {
    "start": "2024-10-02T21:00:00.000Z",
    "end": "2024-10-03T21:00:00.000Z"
  },
  "searchQuery": {...some search criteria...},
  "aggregationQuery": {
    "distinct": {
      "eventItemKey": "deviceType",
      "pageSize": 100
    }
  }
}
In the subsequent sections, we will talk about how we interact with this data at the storage layer.
Storage Layer
The storage layer for TimeSeries comprises a primary data store and an optional index data store. The primary data store ensures data durability during writes and is used for primary read operations, while the index data store is utilized for search and aggregate operations. At Netflix, Apache Cassandra is the preferred choice for storing durable data in high-throughput scenarios, while Elasticsearch is the preferred data store for indexing. However, similar to our approach with the API, the storage layer is not tightly coupled to these specific data stores. Instead, we define storage API contracts that must be fulfilled, allowing us the flexibility to replace the underlying data stores as needed.
Primary Datastore
In this section, we will talk about how we leverage Apache Cassandra for TimeSeries use cases.
Partitioning Scheme
At Netflix’s scale, the continuous influx of event data can quickly overwhelm traditional databases. Temporal partitioning addresses this challenge by dividing the data into manageable chunks based on time intervals, such as hourly, daily, or monthly windows. This approach enables efficient querying of specific time ranges without the need to scan the entire dataset. It also allows Netflix to archive, compress, or delete older data efficiently, optimizing both storage and query performance. Additionally, this partitioning mitigates the performance issues typically associated with wide partitions in Cassandra. By employing this strategy, we can operate at much higher disk utilization, as it reduces the need to reserve large amounts of disk space for compactions, thereby saving costs.
Here is what it looks like:
Time Slice: A time slice is the unit of data retention and maps directly to a Cassandra table. We create multiple such time slices, each covering a specific interval of time. An event lands in one of these slices based on the event_time. These slices are joined with no time gaps in between, with operations being start-inclusive and end-exclusive, ensuring that all data lands in one of the slices. By utilizing these time slices, we can efficiently implement retention by dropping entire tables, which reduces storage space and saves on costs.
Why not use row-based Time-To-Live (TTL)?
Using TTL on individual events would generate a significant number of tombstones in Cassandra, degrading performance, especially during range scans. By employing discrete time slices and dropping them, we avoid the tombstone issue entirely. The tradeoff is that data may be retained slightly longer than necessary, as an entire table’s time range must fall outside the retention window before it can be dropped. Additionally, TTLs are difficult to adjust later, whereas TimeSeries can extend the dataset retention instantly with a single control plane operation.
Time Buckets: Within a time slice, data is further partitioned into time buckets. This facilitates effective range scans by allowing us to target specific time buckets for a given query range. The tradeoff is that if a user wants to read the entire range of data over a large time period, we must scan many partitions. We mitigate potential latency by scanning these partitions in parallel and aggregating the data at the end. In most cases, the advantage of targeting smaller data subsets outweighs the read amplification from these scatter-gather operations. Typically, users read a smaller subset of data rather than the entire retention range.
Event Buckets: To manage extremely high-throughput write operations, which may result in a burst of writes for a given time series within a short period, we further divide the time bucket into event buckets. This prevents overloading the same partition for a given time range and also reduces partition sizes further, albeit with a slight increase in read amplification.
Note: With Cassandra 4.x onwards, we notice a substantial improvement in the performance of scanning a range of data in a wide partition. See Future Enhancements at the end to see the Dynamic Event bucketing work that aims to take advantage of this.
Storage Tables
We use two kinds of tables:
- Data tables: These are the time slices that store the actual event data.
- Metadata table: This table stores information about how each time slice is configured per namespace.
Data tables
The partition key enables splitting events for a time_series_id over a range of time_bucket(s) and event_bucket(s), thus mitigating hot partitions, while the clustering key allows us to keep data sorted on disk in the order we almost always want to read it. The value_metadata column stores metadata for the event_item_value such as compression.
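The table definition itself is not reproduced in this post. As a rough sketch of what such a data table could look like, here is a CQL statement (kept as a Python string so it can sit alongside the other snippets); the column names and types are assumptions derived from the description above, and the actual Netflix schema may differ.

# A sketch (not the actual Netflix schema) of a data-table layout implied by the text:
# the partition key spreads a time_series_id across time_bucket and event_bucket,
# and the clustering key keeps events sorted for reverse-chronological reads.
DATA_TABLE_CQL = """
CREATE TABLE IF NOT EXISTS ts_slice_example (
    time_series_id   text,
    time_bucket      bigint,
    event_bucket     int,
    event_time       timestamp,
    event_id         timeuuid,
    event_item_key   text,
    event_item_value blob,
    value_metadata   blob,
    PRIMARY KEY ((time_series_id, time_bucket, event_bucket), event_time, event_id, event_item_key)
) WITH CLUSTERING ORDER BY (event_time DESC, event_id ASC, event_item_key ASC);
"""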
Writing to the data table:
User writes will land in a given time slice, time bucket, and event bucket as a function of the event_time attached to the event. This mapping is dictated by the control plane configuration of a given namespace.
For example:
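The original post illustrates this with a diagram; as a stand-in, here is a minimal sketch of how an event_time could be mapped to a time slice, time bucket, and event bucket. The configuration values are taken from the sample namespace configuration later in this post, while the hash-based event bucketing is an assumption for illustration, not the exact production logic.

# Sketch of mapping an event to its partition, based on the partitioning scheme above.
# The config values and the hashing choice are illustrative assumptions.
import hashlib

SECONDS_PER_TIME_SLICE = 129_600   # width of a time slice (from the sample config)
SECONDS_PER_TIME_BUCKET = 3_600    # width of a time bucket
EVENT_BUCKETS = 4                  # event buckets per time bucket

def locate(event_time_ms: int, event_id: str):
    event_time_s = event_time_ms // 1000
    time_slice_start = event_time_s - (event_time_s % SECONDS_PER_TIME_SLICE)    # which table
    time_bucket_start = event_time_s - (event_time_s % SECONDS_PER_TIME_BUCKET)  # partition's time part
    # Spread bursty writes for the same time range across a few partitions.
    digest = hashlib.md5(event_id.encode()).digest()
    event_bucket = digest[0] % EVENT_BUCKETS
    return time_slice_start, time_bucket_start, event_bucket

print(locate(1727990663988, "550e8400-e29b-41d4-a716-446655440000"))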
During this process, the writer makes decisions on how to handle the data before writing, such as whether to compress it. The value_metadata column records any such post-processing actions, ensuring that the reader can accurately interpret the data.
Reading from the data table:
At a high level, we scatter-gather the reads from multiple partitions in parallel and join the result sets at the end to return the final result.
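Here is a minimal sketch of that scatter-gather read path, assuming a hypothetical async helper read_partition() that queries one (time_bucket, event_bucket) partition; this is illustrative only, not the service's actual code.

# Sketch of the scatter-gather read described above (helper names are hypothetical).
import asyncio

async def read_partition(time_series_id, time_bucket, event_bucket):
    # Placeholder for a single-partition query against the primary data store.
    return []

async def read_events(time_series_id, buckets):
    # Fan out one query per (time_bucket, event_bucket) partition, in parallel.
    results = await asyncio.gather(
        *(read_partition(time_series_id, tb, eb) for tb, eb in buckets))
    # Join the partial result sets and return events sorted descending by event_time.
    events = [event for partial in results for event in partial]
    return sorted(events, key=lambda e: e["event_time"], reverse=True)

# Example: scan 8 partitions in parallel across two time buckets.
buckets = [(tb, eb) for tb in (1727989200, 1727992800) for eb in range(4)]
print(asyncio.run(read_events("profile100", buckets)))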
Metadata table
This table stores the configuration data about the time slices for a given namespace.
Note the following:
- No Time Gaps: The end_time of a given time slice overlaps with the start_time of the next time slice, ensuring all events find a home.
- Retention: The status indicates which tables fall inside and outside of the retention window.
- Flexible: This metadata can be adjusted per time slice, allowing us to tune the partition settings of future time slices based on observed data patterns in the current time slice.
There is a lot more information that can be stored into the metadata column (e.g., compaction settings for the table), but we only show the partition settings here for brevity.
Index Datastore
To support secondary access patterns via non-primary key attributes, we index data into Elasticsearch. Users can configure a list of attributes per namespace that they wish to search and/or aggregate data on. The service extracts these fields from events as they stream in, indexing the resultant documents into Elasticsearch. Depending on the throughput, we may use Elasticsearch as a reverse index, retrieving the full data from Cassandra, or we may store the entire source data directly in Elasticsearch.
Note: Again, users are never directly exposed to Elasticsearch, just like they are not directly exposed to Cassandra. Instead, they interact with the Search and Aggregate API endpoints that translate a given query to that needed for the underlying datastore.
In the next section, we will talk about how we configure these data stores for different datasets.
Control Plane
The data plane is responsible for executing the read and write operations, while the control plane configures every aspect of a namespace’s behavior. The data plane communicates with the TimeSeries control stack, which manages this configuration information. In turn, the TimeSeries control stack interacts with a sharded Data Gateway Platform Control Plane that oversees control configurations for all abstractions and namespaces.
Separating the responsibilities of the data plane and control plane helps maintain the high availability of our data plane, as the control plane takes on tasks that may require some form of schema consensus from the underlying data stores.
Namespace Configuration
The below configuration snippet demonstrates the immense flexibility of the service and how we can tune several things per namespace using our control plane.
"persistence_configuration": [
{
"id": "PRIMARY_STORAGE",
"physical_storage": {
"type": "CASSANDRA", // type of primary storage
"cluster": "cass_dgw_ts_tracing", // physical cluster name
"dataset": "tracing_default" // maps to the keyspace
},
"config": {
"timePartition": {
"secondsPerTimeSlice": "129600", // width of a time slice
"secondPerTimeBucket": "3600", // width of a time bucket
"eventBuckets": 4 // how many event buckets within
},
"queueBuffering": {
"coalesce": "1s", // how long to coalesce writes
"bufferCapacity": 4194304 // queue capacity in bytes
},
"consistencyScope": "LOCAL", // single-region/multi-region
"consistencyTarget": "EVENTUAL", // read/write consistency
"acceptLimit": "129600s" // how far back writes are allowed
},
"lifecycleConfigs": {
"lifecycleConfig": [ // Primary store data retention
{
"type": "retention",
"config": {
"close_after": "1296000s", // close for reads/writes
"delete_after": "1382400s" // drop time slice
}
}
]
}
},
{
"id": "INDEX_STORAGE",
"physicalStorage": {
"type": "ELASTICSEARCH", // type of index storage
"cluster": "es_dgw_ts_tracing", // ES cluster name
"dataset": "tracing_default_useast1" // base index name
},
"config": {
"timePartition": {
"secondsPerSlice": "129600" // width of the index slice
},
"consistencyScope": "LOCAL",
"consistencyTarget": "EVENTUAL", // how should we read/write data
"acceptLimit": "129600s", // how far back writes are allowed
"indexConfig": {
"fieldMapping": { // fields to extract to index
"tags.nf.app": "KEYWORD",
"tags.duration": "INTEGER",
"tags.enabled": "BOOLEAN"
},
"refreshInterval": "60s" // Index related settings
}
},
"lifecycleConfigs": {
"lifecycleConfig": [
{
"type": "retention", // Index retention settings
"config": {
"close_after": "1296000s",
"delete_after": "1382400s"
}
}
]
}
}
]
Provisioning Infrastructure
With so many different parameters, we need automated provisioning workflows to deduce the best settings for a given workload. When users want to create their namespaces, they specify a list of workload desires, which the automation translates into concrete infrastructure and related control plane configuration. We highly encourage you to watch this ApacheCon talk by one of our stunning colleagues, Joey Lynch, on how we achieve this. We may go into detail on this subject in one of our future blog posts.
Once the system provisions the initial infrastructure, it then scales in response to the user workload. The next section describes how this is achieved.
Scalability
Our users may operate with limited information at the time of provisioning their namespaces, resulting in best-effort provisioning estimates. Further, evolving use-cases may introduce new throughput requirements over time. Here’s how we manage this:
- Horizontal scaling: TimeSeries server instances can auto-scale up and down as per attached scaling policies to meet the traffic demand. The storage server capacity can be recomputed to accommodate changing requirements using our capacity planner.
- Vertical scaling: We may also choose to vertically scale our TimeSeries server instances or our storage instances to get greater CPU, RAM and/or attached storage capacity.
- Scaling disk: We may attach EBS to store data if the capacity planner prefers infrastructure that offers larger storage at a lower cost rather than SSDs optimized for latency. In such cases, we deploy jobs to scale the EBS volume when the disk storage reaches a certain percentage threshold.
- Re-partitioning data: Inaccurate workload estimates can lead to over or under-partitioning of our datasets. TimeSeries control-plane can adjust the partitioning configuration for upcoming time slices, once we realize the nature of data in the wild (via partition histograms). In the future we plan to support re-partitioning of older data and dynamic partitioning of current data.
Design Principles
So far, we have seen how TimeSeries stores, configures and interacts with event datasets. Let’s see how we apply different techniques to improve the performance of our operations and provide better guarantees.
Event Idempotency
We prefer to bake idempotency into all mutation endpoints, so that users can safely retry or hedge their requests. Hedging is when the client sends an identical competing request to the server if the original request does not return a response within an expected amount of time; the client then uses whichever response arrives first. This keeps an application's tail latencies relatively low, and it can only be done safely if the mutations are idempotent. For TimeSeries, the combination of event_time, event_id, and event_item_key forms the idempotency key for an event within a given time_series_id.
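As a minimal sketch (not the actual TimeSeries client code), the idempotency key can be modeled as a composite of those three fields, so that a retried or hedged write for the same logical event simply overwrites the earlier attempt:

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: writes keyed on (event_time, event_id, event_item_key) are naturally
// idempotent, so retries and hedged duplicates collapse into a single entry.
final class IdempotentEventWriter {

    record IdempotencyKey(long eventTimeMillis, String eventId, String eventItemKey) {}

    // Stands in for the underlying datastore upsert within a single time_series_id.
    private final Map<IdempotencyKey, Map<String, String>> events = new ConcurrentHashMap<>();

    void writeEvent(long eventTimeMillis, String eventId, String eventItemKey,
                    Map<String, String> attributes) {
        var key = new IdempotencyKey(
                eventTimeMillis,
                Objects.requireNonNull(eventId, "event_id is required"),
                Objects.requireNonNull(eventItemKey, "event_item_key is required"));
        events.put(key, attributes); // upsert: repeating the same write is harmless
    }
}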
SLO-based Hedging
We assign Service Level Objectives (SLO) targets for different endpoints within TimeSeries, as an indication of what we think the performance of those endpoints should be for a given namespace. We can then hedge a request if the response does not come back in that configured amount of time.
"slos": {
"read": { // SLOs per endpoint
"latency": {
"target": "0.5s", // hedge around this number
"max": "1s" // time-out around this number
}
},
"write": {
"latency": {
"target": "0.01s",
"max": "0.05s"
}
}
}
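A minimal sketch of how such hedging could be wired up (illustrative only, not the actual TimeSeries client): fire a second identical request once the SLO target elapses, take whichever response arrives first, and fail the call at the SLO max:

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of SLO-based hedging around the "target" and "max" values above.
final class SloHedger {

    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor();

    static <T> CompletableFuture<T> hedge(Supplier<CompletableFuture<T>> request,
                                          Duration sloTarget,  // e.g. 0.5s for reads
                                          Duration sloMax) {   // e.g. 1s for reads
        CompletableFuture<T> primary = request.get();
        CompletableFuture<T> hedged = new CompletableFuture<>();

        // Only send the competing request if the primary is still outstanding at the SLO target.
        SCHEDULER.schedule(() -> {
            if (!primary.isDone()) {
                request.get().whenComplete((value, error) -> {
                    if (error == null) hedged.complete(value);
                    else hedged.completeExceptionally(error);
                });
            }
        }, sloTarget.toMillis(), TimeUnit.MILLISECONDS);

        // Whichever response arrives first wins; the overall call still times out at the SLO max.
        return primary.applyToEither(hedged, response -> response)
                      .orTimeout(sloMax.toMillis(), TimeUnit.MILLISECONDS);
    }
}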
Partial Return
Sometimes, a client may be sensitive to latency and willing to accept a partial result set. A real-world example of this is real-time frequency capping. Precision is not critical in this case, but if the response is delayed, it becomes practically useless to the upstream client. Therefore, the client prefers to work with whatever data has been collected so far rather than timing out while waiting for all of it. The TimeSeries client supports partial returns around SLOs for this purpose. Importantly, we still maintain the latest ordering of events in this partial fetch.
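One way to picture this behavior (a hedged, illustrative sketch rather than the real client): fan out the bucket scans, wait only up to the SLO budget, and return whatever has completed so far instead of failing the whole read:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a partial-return read; each future represents one partition-bucket scan.
final class PartialReturnReader {

    static <T> List<T> readWithPartialReturn(List<CompletableFuture<List<T>>> bucketScans,
                                             Duration sloTarget) {
        List<T> results = new ArrayList<>();
        long deadline = System.nanoTime() + sloTarget.toNanos();
        for (CompletableFuture<List<T>> scan : bucketScans) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) break; // budget exhausted: return what we have so far
            try {
                results.addAll(scan.get(remaining, TimeUnit.NANOSECONDS));
            } catch (TimeoutException e) {
                break; // this scan missed the SLO; return the partial result set
            } catch (Exception e) {
                break; // simplification: treat failures like timeouts in this sketch
            }
        }
        return results; // events from completed buckets keep their ordering
    }
}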
Adaptive Pagination
All reads start with a default fanout factor, scanning 8 partition buckets in parallel. However, if the service layer determines that the time_series dataset is dense (i.e., most reads are satisfied by reading the first few partition buckets), then it dynamically adjusts the fanout factor of future reads to reduce read amplification on the underlying datastore. Conversely, if the dataset is sparse, we may increase the fanout, up to a reasonable upper bound.
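A rough sketch of that feedback loop (illustrative only, with hypothetical bounds): track how many buckets recent reads actually needed and nudge the fanout of future reads toward that number:

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of adaptive fanout: start at 8 parallel bucket scans and adjust based on
// how many buckets recent reads actually needed.
final class AdaptiveFanout {

    private static final int MIN_FANOUT = 1;
    private static final int MAX_FANOUT = 32; // hypothetical upper bound

    private final AtomicInteger fanout = new AtomicInteger(8); // default fanout factor

    int currentFanout() {
        return fanout.get();
    }

    // Called after each read with the number of buckets that were actually required.
    void recordRead(int bucketsNeeded) {
        fanout.updateAndGet(current ->
                bucketsNeeded < current
                        ? Math.max(MIN_FANOUT, current - 1)   // dense: shrink fanout, less read amplification
                        : Math.min(MAX_FANOUT, current + 1)); // sparse: widen fanout, up to a bound
    }
}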
Limited Write Window
In most cases, the active range for writing data is smaller than the range for reading data; that is, we want a range of time to become immutable as soon as possible so that we can apply optimizations on top of it. We control this with a configurable "acceptLimit" parameter that prevents users from writing events older than this time limit. For example, an accept limit of 4 hours means that users cannot write events older than now() - 4 hours. We sometimes raise this limit for backfilling historical data, but it is tuned back down for regular write operations. Once a range of data becomes immutable, we can safely do things like caching, compressing, and compacting it for reads.
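As a simple sketch of that guard (not the actual service code), an accept limit of 4 hours just rejects any event whose timestamp falls before now() - 4 hours:

import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the acceptLimit check on the write path.
final class AcceptLimitGuard {

    private final Duration acceptLimit; // e.g. Duration.ofHours(4), or the 129600s configured above

    AcceptLimitGuard(Duration acceptLimit) {
        this.acceptLimit = acceptLimit;
    }

    void validate(Instant eventTime) {
        Instant oldestAccepted = Instant.now().minus(acceptLimit);
        if (eventTime.isBefore(oldestAccepted)) {
            throw new IllegalArgumentException(
                    "event_time " + eventTime + " is older than the accept limit (" + acceptLimit + ")");
        }
    }
}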
Buffering Writes
We frequently leverage this service for handling bursty workloads. Rather than overwhelming the underlying datastore with this load all at once, we aim to distribute it more evenly by allowing events to coalesce over short durations (typically seconds). These events accumulate in in-memory queues running on each instance. Dedicated consumers then steadily drain these queues, grouping the events by their partition key, and batching the writes to the underlying datastore.
The queues are tailored to each datastore since their operational characteristics depend on the specific datastore being written to. For instance, the batch size for writing to Cassandra is significantly smaller than that for indexing into Elasticsearch, leading to different drain rates and batch sizes for the associated consumers.
While using in-memory queues does increase JVM garbage collection pressure, we have seen substantial improvements by transitioning to JDK 21 with ZGC; to illustrate the impact, ZGC reduced our tail latencies by an impressive 86%.
Because we use in-memory queues, we are prone to losing events if an instance crashes. As such, these queues are only used for use cases that can tolerate some amount of data loss, e.g., tracing and logging. For use cases that need guaranteed durability and/or read-after-write consistency, these queues are effectively disabled and writes are flushed to the datastore almost immediately.
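A stripped-down sketch of this buffering (illustrative only; the batch sizes, drain cadence, and datastore write are placeholders, and grouping by partition key is omitted): events accumulate in an in-memory queue and a consumer drains them in batches on a fixed coalesce interval:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of write buffering: coalesce events in memory and flush them in batches.
final class BufferedWriter<E> {

    private final LinkedBlockingQueue<E> queue;
    private final ScheduledExecutorService drainer = Executors.newSingleThreadScheduledExecutor();

    BufferedWriter(int capacity,                     // like "bufferCapacity" above (here: number of events, not bytes)
                   long coalesceMillis,              // like the "coalesce": "1s" setting above
                   int maxBatchSize,                 // smaller for Cassandra, larger for Elasticsearch
                   Consumer<List<E>> batchWriter) {  // the actual datastore write, not shown here
        this.queue = new LinkedBlockingQueue<>(capacity);
        drainer.scheduleAtFixedRate(() -> {
            List<E> batch = new ArrayList<>(maxBatchSize);
            while (queue.drainTo(batch, maxBatchSize) > 0) {
                batchWriter.accept(batch);
                batch = new ArrayList<>(maxBatchSize);
            }
        }, coalesceMillis, coalesceMillis, TimeUnit.MILLISECONDS);
    }

    // Non-blocking enqueue; returns false when the buffer is full so the caller can back off.
    boolean offer(E event) {
        return queue.offer(event);
    }
}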
Dynamic Compaction
Once a time slice exits the active write window, we can leverage the immutability of the data to optimize it for read performance. This process may involve re-compacting immutable data using optimal compaction strategies, dynamically shrinking and/or splitting shards to optimize system resources, and other similar techniques to ensure fast and reliable performance.
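As one hedged illustration of what such a job might do against Cassandra (the keyspace and table names and the strategy choice below are hypothetical, and the real process may differ), a closed time slice's table could be re-compacted under a strategy better suited to immutable, read-heavy data:

// Hypothetical sketch: build the CQL a background job could run once a time slice is immutable.
// The keyspace/table names and the compaction strategy are illustrative choices only.
final class DynamicCompactionJob {

    static String recompactStatement(String keyspace, String timeSliceTable) {
        return "ALTER TABLE " + keyspace + "." + timeSliceTable
                + " WITH compaction = {'class': 'LeveledCompactionStrategy'}";
    }
}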
The following section provides a glimpse into the real-world performance of some of our TimeSeries datasets.
Real-world Performance
The service can write data with latencies on the order of low single-digit milliseconds while consistently maintaining stable point-read latencies.
At the time of writing this blog, the service was processing close to 15 million events/second across all the different datasets at peak globally.
Time Series Usage @ Netflix
The TimeSeries Abstraction plays a vital role across key services at Netflix. Here are some impactful use cases:
- Tracing and Insights: Captures traces across all apps and microservices within Netflix to understand service-to-service communication, aid in debugging issues, and answer support requests.
- User Interaction Tracking: Tracks millions of user interactions — such as video playbacks, searches, and content engagement — providing insights that enhance Netflix’s recommendation algorithms in real-time and improve the overall user experience.
- Feature Rollout and Performance Analysis: Tracks the rollout and performance of new product features, enabling Netflix engineers to measure how users engage with features, which powers data-driven decisions about future improvements.
- Asset Impression Tracking and Optimization: Tracks asset impressions ensuring content and assets are delivered efficiently while providing real-time feedback for optimizations.
- Billing and Subscription Management: Stores historical data related to billing and subscription management, ensuring accuracy in transaction records and supporting customer service inquiries.
and more…
Future Enhancements
As the use cases evolve and the need to make the abstraction even more cost-effective grows, we aim to make many improvements to the service in the upcoming months. Some of them are:
- Tiered Storage for Cost Efficiency: Support moving older, less frequently accessed data into cheaper object storage with a higher time to first byte, potentially saving Netflix millions of dollars.
- Dynamic Event Bucketing: Support real-time partitioning of keys into optimally-sized partitions as events stream in, rather than having a somewhat static configuration at the time of provisioning a namespace. This strategy has the huge advantage of not partitioning time_series_ids that don't need it, thus reducing the overall cost of read amplification. Also, with Cassandra 4.x, we have noted major improvements in reading a subset of data in a wide partition, which could allow us to be less aggressive about partitioning the entire dataset ahead of time.
- Caching: Take advantage of immutability of data and cache it intelligently for discrete time ranges.
- Count and other Aggregations: Some users are only interested in counting events in a given time interval rather than fetching all the event data for it.
Conclusion
The TimeSeries Abstraction is a vital component of Netflix’s online data infrastructure, playing a crucial role in supporting both real-time and long-term decision-making. Whether it’s monitoring system performance during high-traffic events or optimizing user engagement through behavior analytics, TimeSeries Abstraction ensures that Netflix operates seamlessly and efficiently on a global scale.
As Netflix continues to innovate and expand into new verticals, the TimeSeries Abstraction will remain a cornerstone of our platform, helping us push the boundaries of what’s possible in streaming and beyond.
Stay tuned for Part 2, where we’ll introduce our Distributed Counter Abstraction, a key element of Netflix’s Composite Abstractions, built on top of the TimeSeries Abstraction.
Acknowledgments
Special thanks to our stunning colleagues who contributed to the TimeSeries Abstraction's success: Tom DeVoe, Mengqing Wang, Kartik Sathyanarayanan, Jordan West, Matt Lehman, Cheng Wang, and Chris Lohfink.