New cassandra_latest.yaml configuration for a top-performing Apache Cassandra®

Welcome to our deep dive into the latest advancements in Apache Cassandra® 5.0, specifically focusing on the cassandra_latest.yaml configuration that is available for new Cassandra 5.0 clusters.  

This blog post will walk you through the motivation behind these changes, how to use the new configuration, and the benefits it brings to your Cassandra clusters. 

Motivation 

The primary motivation for introducing cassandra_latest.yaml is to bridge the gap between maintaining backward compatibility and leveraging the latest features and performance improvements. The new YAML file addresses the varying needs of the following groups working with new Cassandra 5.0 clusters: 

  1. Cassandra Developers: who want to push new features but face challenges due to backward compatibility constraints. 
  2. Operators: who prefer stability and minimal disruption during upgrades. 
  3. Evangelists and New Users: who seek the latest features and performance enhancements without worrying about compatibility. 

Using cassandra_latest.yaml 

Using cassandra_latest.yaml is straightforward. It involves copying the cassandra_latest.yaml content to your cassandra.yaml or pointing the cassandra.config JVM property to the cassandra_latest.yaml file.  

This configuration is designed for new Cassandra 5.0 clusters (or those evaluating Cassandra), ensuring they get the most out of the latest features and performance improvements in Cassandra 5.0. 
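For example, the JVM property approach looks roughly like this on a typical package installation (the file locations below are illustrative; adjust them to wherever your distribution keeps its configuration):

# jvm-server.options (illustrative location: /etc/cassandra/jvm-server.options)
-Dcassandra.config=file:///etc/cassandra/cassandra_latest.yaml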

Key changes and features 

Key Cache Size 

  • Old: Calculated as the minimum of 5% of the heap and 100MB
  • Latest: Explicitly set to 0

Impact: Setting the key cache size to 0 in the latest configuration avoids performance degradation with the new SSTable format. This change is particularly beneficial for clusters using the new SSTable format, which doesn’t require key caching in the same way as the old format. Key caching was used to reduce the time it takes to find a specific key in Cassandra storage. 
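In the YAML itself this is a one-line change. The entry below uses the newer configuration syntax introduced in Cassandra 4.1; double-check the exact spelling against your build's copy of the file:

# cassandra_latest.yaml (excerpt)
key_cache_size: 0MiB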

Commit Log Disk Access Mode 

  • Old: Set to legacy
  • Latest: Set to auto

Impact: The auto setting optimizes the commit log disk access mode based on the available disks, potentially improving write performance. It can automatically choose the best mode (e.g., direct I/O) depending on the hardware and workload, leading to better performance without manual tuning.
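The corresponding entry is a single line (shown as it appears in recent builds; verify against your own copy of the file):

# cassandra_latest.yaml (excerpt)
commitlog_disk_access_mode: auto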

Memtable Implementation 

  • Old: Skiplist-based
  • Latest: Trie-based

Impact: The trie-based memtable implementation reduces garbage collection overhead and improves throughput by moving more metadata off-heap. This change can lead to more efficient memory usage and higher write performance, especially under heavy load.

CREATE TABLE … WITH memtable = 'trie';   -- 'trie' names a TrieMemtable configuration defined in the YAML (see below)
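The named configuration referenced in the CQL comes from the memtable section of the YAML. Roughly, cassandra_latest.yaml wires the default to the trie implementation like this (a sketch; the exact layout may differ between builds):

# cassandra_latest.yaml (excerpt)
memtable:
  configurations:
    skiplist:
      class_name: SkipListMemtable
    trie:
      class_name: TrieMemtable
    default:
      inherits: trie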

Memtable Allocation Type 

  • Old: Heap buffers 
  • Latest: Off-heap objects 

Impact: Using off-heap objects for memtable allocation reduces the pressure on the Java heap, which can improve garbage collection performance and overall system stability. This is particularly beneficial for large datasets and high-throughput environments. 
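In the YAML this is controlled by a single setting:

# cassandra_latest.yaml (excerpt)
memtable_allocation_type: offheap_objects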

Trickle Fsync 

  • Old: False 
  • Latest: True 

Impact: Enabling trickle fsync improves performance on SSDs by periodically flushing dirty buffers to disk, which helps avoid sudden large I/O operations that can impact read latencies. This setting is particularly useful for maintaining consistent performance in write-heavy workloads. 
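The relevant entries look roughly like this (the interval shown is the long-standing default and is illustrative; tune it for your disks):

# cassandra_latest.yaml (excerpt)
trickle_fsync: true
trickle_fsync_interval: 10240KiB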

SSTable Format 

  • Old: big 
  • Latest: bti (trie-indexed structure) 

Impact: The new BTI format is designed to improve read and write performance by using a trie-based indexing structure. This can lead to faster data access and more efficient storage management, especially for large datasets. 

sstable:
  selected_format: bti
  default_compression: zstd
  compression:
    zstd:
      enabled: true
      chunk_length: 16KiB
      max_compressed_length: 16KiB

Default Compaction Strategy 

  • Old: STCS (Size-Tiered Compaction Strategy) 
  • Latest: Unified Compaction Strategy 

Impact: The Unified Compaction Strategy (UCS) is more efficient and can handle a wider variety of workloads compared to STCS. UCS can reduce write amplification and improve read performance by better managing the distribution of data across SSTables. 

default_compaction:
  class_name: UnifiedCompactionStrategy
  parameters:
    scaling_parameters: T4
    max_sstables_to_compact: 64
    target_sstable_size: 1GiB
    sstable_growth: 0.3333333333333333
    min_sstable_size: 100MiB

Concurrent Compactors 

  • Old: Defaults to the smaller of the number of disks and cores
  • Latest: Explicitly set to 8

Impact: Setting the number of concurrent compactors to 8 ensures that multiple compaction operations can run simultaneously, helping to maintain read performance during heavy write operations. This is particularly beneficial for SSD-backed storage where parallel I/O operations are more efficient. 
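The corresponding entry:

# cassandra_latest.yaml (excerpt)
concurrent_compactors: 8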

Default Secondary Index 

  • Old: legacy_local_table
  • Latest: sai

Impact: SAI is a new index implementation that builds on the advancements made with the SSTable-Attached Secondary Index (SASI). It provides a solution that enables users to index multiple columns on the same table without suffering scaling problems, especially at write time. 
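In the YAML this is a single setting; once set, a plain CREATE INDEX statement builds an SAI index (setting name as of Cassandra 5.0; check your build):

# cassandra_latest.yaml (excerpt)
default_secondary_index: sai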

Stream Entire SSTables 

  • Old: implicitly set to True
  • Latest: explicitly set to True

Impact: When enabled, this permits Cassandra to zero-copy stream entire eligible SSTables between nodes, including every component. This speeds up network transfer significantly, subject to the throttling specified by

entire_sstable_stream_throughput_outbound

and

entire_sstable_inter_dc_stream_throughput_outbound

for inter-DC transfers. 
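Roughly, the relevant YAML entries look like this (the throughput values shown are the usual defaults and are illustrative; tune them for your network):

# cassandra_latest.yaml (excerpt)
stream_entire_sstables: true
entire_sstable_stream_throughput_outbound: 24MiB/s
entire_sstable_inter_dc_stream_throughput_outbound: 24MiB/s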

UUID SSTable Identifiers 

  • Old: False
  • Latest: True

Impact: Enabling UUID-based SSTable identifiers ensures that each SSTable has a unique name, simplifying backup and restore operations. This change reduces the risk of name collisions and makes it easier to manage SSTables in distributed environments. 
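The corresponding entry:

# cassandra_latest.yaml (excerpt)
uuid_sstable_identifiers_enabled: true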

Storage Compatibility Mode 

  • Old: Cassandra 4
  • Latest: None

Impact: Setting the storage compatibility mode to none enables all new features by default, allowing users to take full advantage of the latest improvements in Cassandra, such as the new SSTable format. This setting is ideal for new clusters or those that do not need to maintain backward compatibility with older versions. 
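The corresponding entry (value casing as it appears in recent builds; verify against your own copy of the file):

# cassandra_latest.yaml (excerpt)
storage_compatibility_mode: NONE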

Testing and validation 

The cassandra_latest.yaml configuration has undergone rigorous testing to ensure it works seamlessly. Currently, the Cassandra project CI pipeline tests both the standard (cassandra.yaml) and latest (cassandra_latest.yaml) configurations, ensuring compatibility and performance. This includes unit tests, distributed tests, and DTests. 

Future improvements 

Future improvements may include enforcing password strength policies and other security enhancements. The community is encouraged to suggest features that could be enabled by default in cassandra_latest.yaml. 

Conclusion 

The cassandra_latest.yaml configuration for new Cassandra 5.0 clusters is a significant step forward in making Cassandra more performant and feature-rich while maintaining the stability and reliability that users expect. Whether you are a developer, an operations professional, or an evangelist/end user, cassandra_latest.yaml offers something valuable for everyone. 

Try it out 

Ready to experience the incredible power of the cassandra_latest.yaml configuration on Apache Cassandra 5.0? Spin up your first cluster with a free trial on the Instaclustr Managed Platform and get started today with Cassandra 5.0!


P99 CONF 24 Recap: Heckling on the Shoulders of Giants

As I sit here at the hotel breakfast bar contemplating what a remarkable couple of days it’s been for the fourth annual P99 CONF, I feel quite honored to have helped host it. While the coffee is strong and the presentations are fresh in my mind, let’s recap some of the great content we shared and reveal some of the behind-the-scenes efforts that made it all happen. Watch P99 CONF Talks On Demand Day 1 While we warmed up the live hosting stage, we had a fright with Error 1016 origin DNS errors on our event platform. As we scrambled to potentially host live on an alternative platform, Cloudflare saved the day and the show was back on the road. DNS issues weren’t going to stop us from launching P99 CONF! Felipe Cardeneti Mendes got things started in the lounge with hundreds of people asking great questions about ScyllaDB pre-show. Co-founder of ScyllaDB, Dor Laor, opened the show with his keynote about ScyllaDB tablets. In the first few slides we were looking at assembly, then 128 fully utilized CPU cores not long after that. By the end of the presentation, we had throughput north of 1M ops/sec – complete with ScyllaDB’s now famous predictable low latency. To help set the scene of P99 CONF, we heard from Pekka Enberg, CTO of Turso (sorry, I’m the one who overlooked the company name mistake in the original video). Pekka dove into the patterns of low latency. This generated more great conversation in chat. If you want all the details, then his book simply titled Latency is a must-read. Since parallel programming is hard, we opened up 3 stages for you to choose from following the keynotes. Felipe returned, this time as a session speaker. Proving that not all benchmarks need to be institutionalized cheating, he paired with Alan “Dormando” of Memcached to see how ScyllaDB stacks up from a caching perspective. We also heard from Luc Lenôtre, a talented engineer, who toyed with a kernel written in Rust. Luc showed us lots of flame graphs and low-level tuning of Maestro. Continuing with the Rust theme was Amos Wenger in a very interesting look at making HTTP faster with io_uring. There were other great talks from well-known companies. For example, Jason Rahman from Microsoft shared his insights on tracing Linux scheduler behavior using ftrace. Also, Christopher Peck from Uber shared their experience tuning with generational ZGC. This reflects much of the P99 CONF content – real world, production experience taming P99 latencies at scale. Another expanding theme at this year’s P99 CONF eBPF. And who better than Liz Rice from Isovalent to kick it off with her keynote, Zero-overhead Container Networking with eBPF and Netkit. I love listening to Liz explain in technical detail the concepts and benefits of using eBPF and will definitely be reading through her book–Learning eBPF on the long flight home to Australia. By the way, books! There are now so many authors associated with P99 CONF which, I think, is a testament to the quality and professionalism of the speakers. We were giving away book bundles to members of the community who were top contributors in the chat, answering and asking great questions (huge thank you!). Some of the books on offer – which you can grab for yourself – are: Database Performance at Scale – by Felipe Mendes (ScyllaDB) et. al. 
Latency – by Pekka Enberg (Turso) Think Distributed Systems – by Dominik Tornow (Resonate HQ) Writing for Developers: Blogs that Get Read – by Piotr Sarna (poolside) ScyllaDB in Action – by Bo Ingram (Discord) And if you’re truly a tech bookworm, see this blog post for an extensive reading list: 14 Books by P99 CONF Speakers: Latency, Wasm, Databases & More. By mid-afternoon, day 1, Gunnar Morling from decodable lightened things up with his keynote on creating the 1 billion row challenge. I’m sure you’ve heard of it, and we had another speaker, Shraddha Agrawal following up with her version in Golang. We enjoyed lots more great content in the afternoon, including Piotr Sarna (from poolside AI and co-author of Database Performance at Scale + Writing for Developers) taking us back to the long-standing database theme of the conference with performance perspectives on database drivers. Speaking of themes, Wasm returned with book authors Brian Sletten and Ramnivas Laddad looking at WebAssembly on the edge. And the two Adams from Zoo gave us unique insight into building a remote CAD solution that feels local. And showing that we can finish day 1 just as strong as we started it, Carl Lerche, creator of tokio-rs, returned to P99 CONF for the Day 1 closing keynote. This year, he highlighted how Rust – which is typically used at the infrastructure level for all the features we love, like safety, concurrency, and performance – is also applicable at higher levels in the stack. He also announced the first version of Toasty, an ORM for Rust. Day 2 The second day kicked off with Andy Pavlo from CMU and his take on the tension between the database and operating system, with a unique research project, Tigger, a database proxy that pushes a database into kernel space using eBPF. Leading on from that, we had Bryan Cantrill, CTO of Oxide and creator of DTrace, reviewing DTrace’s 21-year history with plenty of insights into the origins and evolution of this framework. Bryan has presented at every P99 CONF and is one of the many industry giants, that you can stand on the shoulders of heckle in chat. We love the dynamism that Bryan brings each year to P99 CONF. More great talks followed with Cameron from Shopify, Cary from Oracle, Vivkek from Linkedin and Richard from Datadog. We were also honored to host the creator of Postgres and Turing Award winner Michael Stonebraker. I also enjoyed Tanel Poder’s take on using eBPF off-CPU sampling to get to the bottom of database wait time. There was also more database content from Lukasz at ScyllaDB with advanced compression techniques. Avi Kivity led the afternoon’s keynote with his fascinating asymptotic performance journey through optimization and tradeoffs he’s made as CTO for ScyllaDB. Also on ScyllaDB, the Sharechat team described the evolution of their feature store. First they made it fast, as described at P99 CONF 23. Now, they’re making it cheaper to run with this impressive performance. Another blast of technical content in the afternoon with Benjamin and Tyler from American Express building card payment systems with ultra-low latency. A P99 favorite, Aleksei from TigerBeetle looked at just-in-time compaction with incredible detail. Dominik from Resonate to us through distributed async await patterns in the cloud. Peter from Percona dove into Redis alternatives. Ash from Unum Cloud gave us insights to AI powered real-time searches taking advantage of SIMD and GPU accelerated implementations. 
And Cristian from Uber was all about improving p99 latency in third-party APIs. Jose Fernandez from Netflix capped off the second day with his keynote on noisy neighbor detection with eBPF, which was a strong finish to another fantastic conference. What you didn’t see on stage, but happened in the recently renamed “Champagne Room,” was a mini happy birthday celebration for Cynthia – complete with a near-miss disaster champagne opening (no electrical equipment was harmed). Conference chat was lively right until the end. As I finish my second cup of coffee, I think that this is really what makes P99 CONF so special. Not only the ability to watch people from around the world share their real-life P99-related experiences, but having the chance to chat with them too. I look forward to hosting this again next year. Watch P99 CONF Talks On Demand

5X to 40X Lower DynamoDB Costs— with Better P99 Latency

At our recent events, I’ve been fielding a lot of questions about DynamoDB costs. So, I wanted to highlight the cost comparison aspect of a recent benchmark comparing ScyllaDB and DynamoDB. This involved a detailed price-performance comparison analyzing: How cost compares across both DynamoDB pricing models under various workload conditions How latency compares across this set of workloads I’ll share details below, but here’s a quick summary: ScyllaDB costs are significantly lower in all but one scenario. In realistic workloads, costs would be 5X to 40X lower — with up to 4X better P99 latency. Here’s a consolidated look at how DynamoDB and ScyllaDB compare on a Uniform distribution (DynamoDB’s sweet spot). Now, more details on the cost aspect of this comparison. For a deeper dive into the performance aspect, see this DynamoDB vs ScyllaDB price-performance comparison blog as well as the complete benchmark report. How We Compared DynamoDB Costs vs ScyllaDB Costs For our cost comparisons, we launched a small 3-node cluster in ScyllaDB Cloud and measured performance on a wide range of workload scenarios. Next, we calculated the cost of running the same workloads on DynamoDB. We used an item size of 1081 bytes, which translates to 2 WCUs per write operation and 1 RCU per read operation on DynamoDB. Our working data set size was 1 TB, with an approximate cost of ~$250/month in DynamoDB. We used the same ScyllaDB cluster through every testing scenario, thus simplifying ScyllaDB Cloud costs. Hourly rates (on-demand) were used. As ScyllaDB linearly scales with the amount of resources, you can predictably adjust costs to match your desired performance outcome. Annual pricing provides significant cost reduction but is out of the scope of this benchmark. DynamoDB has two modes for non-annual pricing: provisioned and on-demand pricing. Provisioned mode is recommended if your workloads are reasonably predictable. On-demand pricing is significantly more expensive and is a fit for unpredictable, low-throughput workloads. It is possible to combine modes, add auto-scaling, and so forth. DynamoDB provides considerable flexibility around managing the cost and scale of the aforementioned options, but this also results in considerable complexity. For details on how we calculated costs, refer to the Cost Calculations section at the end of this article. Throughout all tests, we ensured ScyllaDB had spare capacity at all times by keeping its load below 75%. Given that, note that it is possible to achieve higher traffic than the numbers reported here at no additional cost, in turn allowing for additional growth. The number of operations per second that the ScyllaDB cluster performs for each workload is reported under the X axis in the following graphs. Provisioned Cost Comparison: DynamoDB vs ScyllaDB Provisioned mode is recommended if your workloads are reasonably predictable. With DynamoDB, you need to be able to predict per-table capacity following AWS DynamoDB read/write capacity unit pricing model. With just one exception, DynamoDB’s cost estimates were consistently higher than ScyllaDB – and much more so for the most write-heavy workloads. In the 1 out of 15 cases where DynamoDB turned out to be less expensive, ScyllaDB could actually drive more utilization to win over DynamoDB. However, we wanted to keep the results consistent and fair. 
This is not surprising, given that DynamoDB charges 5X more for writes than for reads, while ScyllaDB does not differentiate between operations, and its pricing is based on the actual cluster size. On-Demand Cost Comparison: DynamoDB vs ScyllaDB On-demand pricing is best when the application’s workload is unclear, the application’s data traffic patterns are unknown, and/or your company prefers a pay-as-you-go option. However, as the results show, the convenience and flexibility of DynamoDB’s on-demand pricing often come at quite a cost. To see how we calculated costs, refer to the Cost Calculations section at the end of this article.     Here, the same general trends hold true. ScyllaDB cost is fixed across the board, and its cost advantage grows as write throughput increases. ScyllaDB’s cost advantage over on-demand DynamoDB tables is significantly greater when compared to provisioned capacity on DynamoDB. Why? Because DynamoDB’s on-demand pricing is significantly higher than its provisioned capacity counterpart. Therefore, workloads with unpredictable traffic spikes (which would justify not using provisioned capacity) may easily end up with runaway bills compared to costs with ScyllaDB Cloud. Making ScyllaDB Even More Cost Effective Unlike DynamoDB (where you provision tables), ScyllaDB is provisioned as a cluster, capable of hosting several tables – and therefore consolidating several workloads under a single deployment. Excess hardware capacity may be shared to power more use cases on that cluster. ScyllaDB Cloud and Enterprise users can also use ScyllaDB’s Workload Prioritization to prioritize specific access patterns and further drive consolidation. For example, assume there are 10 use cases that require 100K OPS each. With DynamoDB, users would be forced to allocate a provisioned workload per table or to use the rather expensive on-demand mode. This introduces a set of caveats: If every workload consistently reaches its peak capacity, it will likely get throttled by AWS (provisioned mode), or result in runaway bills (on-demand mode). Likewise, the opposite also holds true: If most workloads are consistently idle, provisioned mode results in non-consumed capacity bills. On-demand mode doesn’t guarantee immediate capacity to support traffic surges. This, in turn, causes your applications to experience some degree of throttling. A standard ScyllaDB deployment is not only more cost effective, but also simplifies management. It allows users to consolidate all workloads within a single cluster and share idle capacity among them. With ScyllaDB Cloud and Enterprise, users further have the flexibility to define priorities on a per-workload basis, allowing the database to make more informed decisions when two or more workloads compete against each other for resources. Cost Calculation Details Here’s how we calculated costs across the different databases and pricing models. DynamoDB Cost Calculations Provisioned Costs for DynamoDB With DynamoDB’s provisioned capacity mode, planning is required. You specify the number of reads and writes per second that you expect your application to require. You can make use of auto-scaling to automatically adjust your table’s capacity based on a specific utilization target in order to sustain spikes outside of your projected planning. In provisioned mode, you need to provision DynamoDB with the expected throughput. 
You set WCUs (Write Capacity Units) and RCUs (Read Capacity Units) which signify the allowed number of write and read operations per second, respectively. They are priced per hour. One WCU is $0.00065 per hour One RCU is $0.00013 per hour This yields the following formulas for calculating monthly costs:     On-Demand Costs for DynamoDB With DynamoDB’s on-demand mode, no planning is required. You pay for the actual reads/writes that your application is using (the total number of actual writes or reads, not writes or reads per second). In this mode, you pay by usage and the cost is per request unit (rather than per capacity unit, as in the provisioned mode). AWS charges $1.25 per million write request units (WRU) and $0.25 per million read request units (RRU). Therefore, a single request unit costs 1 millionth of the actual write/read operation, as follows: One WRU is $0.00000125 per write One RRU is $0.00000025 per read This yields the following formulas for calculating monthly costs:   ScyllaDB Cost Calculations As stated previously, we used ScyllaDB’s on-demand pricing for all cost comparisons in this study. ScyllaDB’s on-demand costs were determined using our online pricing calculator as follows: From the ScyllaDB Pricing Calculator This calculator estimates the size and cost of a ScyllaDB Cloud cluster based on the specified technical requirements around throughput and item/data size. Note that in ScyllaDB, the primary aspect driving costs is the cluster size, unlike DynamoDB’s model on the volume of reads and writes. Once the cluster size is determined, the deployment can often exceed throughput requirements. For comparison, DynamoDB’s provisioned pricing structure requires users to explicitly specify sustained and peak throughput. Overprovisioning equivalent performance in DynamoDB would be significantly pricier compared to ScyllaDB. Without an annual commitment for cost savings, the estimated annual cost for the ScyllaDB Cloud cluster is $54,528, calculated at a monthly rate of $4,544. Conclusion As the results indicate, what might begin at a seemingly reasonable cost can quickly escalate into “bill shock” with DynamoDB – especially at high throughputs, and particularly with write-heavy workloads. This makes DynamoDB a suboptimal choice for data-intensive applications anticipating steady or rapid growth. ScyllaDB’s significantly lower costs – a reflection of ScyllaDB taking full advantage of modern infrastructure for high throughput and low latency – make it a more cost-effective solution for data-intensive applications. ScyllaDB – with its LSM-tree-based storage, unified caching, shard-per-core design, and advanced schedulers – allows you to maximize the advantages of modern hardware, from huge CPU chips to blazing-fast NVMe. Beyond the presented cost savings, ScyllaDB sustains 2X peaks and provides 2X-4X better P99 latency. Additionally, it can further reduce latency when idle – or enable spare resources to be shared across multiple tables. For larger workloads spanning 500K-1M OPS and beyond, this can result in a cost saving in the millions – with better performance and fewer query limitations.

Why ScyllaDB’s Shard Per Core Architecture Matters

3 perspectives on what shard-per-core involves and why it matters for teams who care about database performance. Also …  puppies! ScyllaDB is best known for 3 things: 1) Predictable database performance at scale 2) A shard-per-core architecture 3) Cute sea monsters The monster’s cuteness speaks for itself – especially if you’ve seen the plushie version. But a little more explanation is often required to communicate what’s so special about the shard-per-core architecture, and how it contributes to ScyllaDB’s predictable performance at scale. That’s what we’ll cover in this article. Below are three different perspectives on what shard-per-core involves and why it matters for teams who care about database performance: How Dor Laor (ScyllaDB Co-founder and CEO) first introduced the concept back when ScyllaDB launched in 2015 The ScyllaDB power user perspective by Bo Ingram, author of ScyllaDB in Action and the monstrously popular ScyllaDB migration blog A more detailed look under the hood, explained by Tzach Livyatan (VP of Product) earlier this year at ScyllaDB Summit Bonus: We’ll bring some puppies into the mix since (some) puppies are just as cute as the ScyllaDB monster. The ScyllaDB Sea Monster with Baunilha Dor Laor’s 2015 introduction: independent, lock-free processing across server cores From a 2015 ScyllaDB feature on ZDNet “Everybody does sharding in a cluster but usually the granularity is per server. We do it per core. The result is each of the shards within the server is fully independent, so there’s no need for locking. there are no regular threads that you need to protect your data structures. The independence of the shard means there is no contention, with each one managing its own resources, whether that is a networking card, memory or the CPU itself. Each shard has its own CPU, its own memory – it’s local to that node so it’s multi-socket friendly – that’s NUMA-friendly [non-uniform memory access]. It’s expensive to access memory between one core and one socket with memory that belongs to another socket. Within a server, we have lots of such shards – exactly the amount of the x86 cores that exist in that server. The bigger the server grows with the newer CPUs that Intel produces, the better for us because we scale up linearly. In the relations between the cores, everything is independent.” Bo Ingram’s power user’s perspective: predictable low latencies From Bo’s new book, “ScyllaDB in Action” “ScyllaDB’s biggest architectural difference is its shard-per-core architecture. Both Cassandra and ScyllaDB shard a data set across its various nodes via placement in a hash ring. ScyllaDB takes this further by leveraging the Seastar framework (https://seastar.io/) to shard data within a node, splitting it up per CPU-core and giving each shard its own CPU, memory, and network bandwidth allocation. Cassandra does not follow this paradigm, however, and limits the sharding to only per node. If a data partition gets a large amount of requests, it can overwhelm the node, leading to cluster-wide struggles. Performance justifies the rewrite. 
Both in benchmarks (https://thenewstack.io/benchmarking-apache-cassandra-40-nodes-vs-scylladb-4-nodes/) and in the wild (https://discord.com/blog/how-discord-stores-trillions-of-messages), ScyllaDB is faster, more consistent, and requires fewer servers to operate than Cassandra.” Read more from ScyllaDB In Action – Free Tzach Livyatan: Unraveling the threads towards linear scalability From Tzach Livyatan’s keynote, “A Deep Dive into ScyllaDB’s Architecture” “ScyllaDB was designed with performance in mind – in particular, high throughput and low latency. The shard-per-core design is fundamental for that. Systems that aren’t as obsessed with performance tend to use a thread pool, with many threads competing for the same resources. At any given second, threads might be trying to get to memory, trying to get to disk, or trying to do something that requires synchronization – and they get blocked. The CPU will then context switch. When we profiled other databases, we found that this synchronization between the threads is often responsible for consuming the majority of the resources. ScyllaDB takes a different approach, with a shard per core architecture. Each thread is pinned to a specific core. And each thread is assigned its own designated chunk of memory, its own designated chunk of network, and its own designated chunk of storage. As a result, there’s minimal interaction between the cores, and each can run independently in a very efficient way: it never context switches, it never waits. I think this is probably the most important design decision behind the high performance that users get from ScyllaDB. It also allows ScyllaDB to scale linearly with the number of cores. If you deploy ScyllaDB on 16 cores, and then on 32 cores, you get exactly twice the performance. And if you double the cores again, you again get double the performance. Since each core is an independent shard, there is virtually no interaction between the cores and you can scale with the number of cores in a very efficient way.” See Tzach’s complete ScyllaDB architecture deep dive here:   And now… puppies If you’ve ever fed a group of puppies, you’ll recognize the top image here. There are 6 bowls of dog food and 6 puppies. But multiple puppies are fighting over a couple bowls of food, and a few bowls of food are totally empty. Food is spilled all over the ground as a result of the fights. That’s like the shared thread architecture used by other systems, like Cassandra. When a job needs to be done, it’s basically thrown down to a thread pool, and a thread takes the job. However, the threads bump into each other – like our puppies. In the systems case, that could cause problems such as latency spikes. In the lower image, you can see each puppy is happily eating from its own bowl of food. There’s no fighting and no waste. Similarly, ScyllaDB’s shard-per core architecture eliminates all that contention for resources. We take all of the system resources and all of the data and split it up evenly, based on the number of cores. Just like each puppy has its own portion of food, each shard has its own dedicated RAM, its own network, its own I/O, and its own piece of the data. Coming soon…quantifying the impact of a shard-per-core architecture Almost a year ago to the day, Dor Laor kicked off P99 CONF 23 with a shard-per-core deep dive that people are still talking about. His teaser: Most software isn’t architected to take advantage of modern hardware. 
How does a shard-per-core and shared-nothing architecture help – and exactly what impact can it make? I will examine technical opportunities and tradeoffs, as well as share the results of a new benchmark study. To give that talk the depth it deserves, we’ll write it up in a dedicated article. So stay tuned if you prefer to read. If you can’t wait, we invite you to watch it now. See Dor’s Shard-Per-Core Keynote Join us for P99 CONF 24 – Oct 23 and 24

Book Excerpt: ScyllaDB versus Other Databases

How does ScyllaDB compare to other databases? Read what Bo Ingram (Staff Engineer at Discord) has to say – in this excerpt from his new book “ScyllaDB in Action.” Editor’s note We’re thrilled to share the following excerpt from Bo Ingram’s informative – and fun! – new book on ScyllaDB: ScyllaDB in Action. It’s available now via Manning and Amazon. You can also access a 3-chapter excerpt for free, compliments of ScyllaDB. Get the first 3 book chapters, free You might have already experienced Bo’s expertise and engaging communication style in his blog How Discord Stores Trillions of Messages or ScyllaDB Summit talks How Discord Migrated Trillions of Messages from Cassandra to ScyllaDB and  So You’ve Lost Quorum: Lessons From Accidental Downtime  If not, you should 😉 And if you want to learn more from Bo, join him at our upcoming Masterclass: Data Modeling for Performance Masterclass. We’ve ordered boxes of print books and will be giving them out! Join Bo at the “Data Modeling for Performance” Masterclass This blog post shares how ScyllaDB compares to: Relational databases Apache Cassandra Amazon DynamoDB Google Cloud Spanner MongoDB Distributed relational databases (CockroachDB, TiDB, and YugabyteDB)   The following is an excerpt from Chapter 1; it’s reprinted here with permission of the publisher. ScyllaDB versus relational databases ScyllaDB runs multiple nodes, making it a distributed system. By spreading its data across its deployment, it uses that to achieve its desired availability and consistency, which, when combined, differentiates the database from other systems. I’ve introduced ScyllaDB by describing its features in comparison with relational databases, but we’ll examine in closer detail here the differences. Relational databases such as PostgreSQL and MySQL are the standard for data storage in software applications, and they’re almost always the default choice for a new developer looking to build an application. Relational databases are a very strong option for many use cases, but that doesn’t mean they’re a strong option for every use case. ScyllaDB is a distributed NoSQL database. By distributing data across a cluster, ScyllaDB unlocks better availability when nodes go awry than a single-node all-or-nothing relational database. PostgreSQL and MySQL can run in a distributed mode, but that is either powered through extensions or newer storage engines and not the primary native mode of the database. This distribution is native to ScyllaDB and the bedrock of its design. By running as a distributed system, ScyllaDB empowers horizontal scalability. Many relational databases are only vertically scalable – you can only add more resources by running it on a bigger server. With horizontal scalability, you can add additional nodes to a system to increase its capacity. ScyllaDB supports this expansion; administrators can add more nodes, and the cluster will rebalance itself, offloading data to the new cluster member. In a relational database, horizontal scaling is possible, but it’s often manual. Operators need to manually shard data between multiple nodes to achieve this behavior. ScyllaDB does not provide a relational database’s ACID (atomicity, consistency, isolation, and durability) guarantees, instead opting for a softer model called BASE (Basic Availability, Soft-state, and Eventual consistency), where the database has basic availability and is eventually consistent. 
This decision leads to faster writes than a relational database, which has to validate the consistency of the database after every write, whereas ScyllaDB only needs to save the write since it doesn’t promise that degree of correctness. The tradeoff, though, is that developers need to consider ScyllaDB’s weaker consistency. … Ultimately, ScyllaDB versus relational databases is a foundational and philosophical decision. They operate so differently and provide such varying guarantees to their clients that picking one over the other has large effects on an application. If you’re looking for availability and scalability in your database, ScyllaDB is a strong option. ScyllaDB versus Cassandra ScyllaDB is a rewrite of Apache Cassandra. It is frequently described as “a more performant Cassandra” or “Cassandra but in C++”. ScyllaDB is designed to be compatible with Cassandra: it uses a compatible API, query language, on-disk storage format, and hash ring architecture. Like Cassandra, but better, is ScyllaDB’s goal; it makes some improvements to accomplish this. The choice of language in the rewrite immediately unlocks better performance. Cassandra is written in Java, which leverages a garbage collector to perform memory management. Because objects get loaded into memory, at some point, they will need to be removed. Java’s garbage collection algorithms handle this removal, but it comes at the cost of compute. Time spent garbage collecting is time Cassandra can’t spend executing queries. If garbage collection reaches a certain threshold, the Java Virtual Machine will pause all execution for a brief time while it cleans up memory, referred to as a “stop the world” pause. Even if it’s just for milliseconds, that pause can be painful to clients. Although Java exposes many configuration knobs and improves the garbage collector with each release, it’s a tax that all Java-based applications have to pay — whether in garbage collection time or time spent mitigating it. ScyllaDB avoids this tax because it is implemented in C++ and provides more granular controls for memory management. By having full control of memory allocation and cleanup, ScyllaDB doesn’t need to let a garbage collector perform this functionality on an application-wide scale. It avoids “stop the world” pauses and can dedicate its compute time to executing queries. ScyllaDB’s biggest architectural difference is its shard-per-core architecture (figure 1.9). Both Cassandra and ScyllaDB shard a data set across its various nodes via placement in a hash ring, which you’ll learn more about in chapter 3. ScyllaDB takes this further by leveraging the Seastar framework (https://seastar.io/) to shard data within a node, splitting it up per CPU-core and giving each shard its own CPU, memory, and network bandwidth allocation. Figure 1.9 ScyllaDB shards data not only within the cluster, but also within each instance. This sharding further limits the blast radius due to hot traffic patterns – the damage is limited to just that shard on that node. Cassandra does not follow this paradigm, however, and limits the sharding to only per node. If a data partition gets a large amount of requests, it can overwhelm the node, leading to cluster-wide struggles. Performance justifies the rewrite. 
Both in benchmarks (https://thenewstack.io/benchmarking-apache-cassandra-40-nodes-vs-scylladb-4-nodes/) and in the wild (https://discord.com/blog/how-discord-stores-trillions-of-messages), Scylladb is faster, more consistent, and requires fewer servers to operate than Cassandra. ScyllaDB versus Amazon Aurora / Google Cloud Google / Spanner AlloyDB I’ve lumped a few similar systems together here – Amazon Aurora, Amazon DynamoDB, Google Cloud Spanner, and Google AlloyDB. They can be generally described as scalable cloud-hosted databases. They aim to take a relational data model and provide greater scalability than out-of-the-box PostgreSQL or MySQL. This effort accentuates a need in the market for scalable databases, showing the value of ScyllaDB. These systems have two related drawbacks – cloud vendor lock-in and cost. As cloud providers provide these databases, they run in only that specific vendor’s cloud environment. You can’t run Google Cloud Spanner in Amazon Web Services. If your application is heavily dependent on one of these systems, there can be a high engineering cost if you decide to switch cloud providers, as you’ll need to migrate data into a different system with a potentially different storage paradigm. If you’re not using that provider (or any provider), these options aren’t even on the table for you. And by using a cloud provider, companies pay money for these services. Operating and maintaining a database is challenging (which is partly why you’re reading this book), and although these cloud vendors provide solutions to make it potentially simpler, that can get quite expensive for clients. Of course, operating a database yourself can also be costly. ScyllaDB, however, can be run anywhere. Companies are running it on-premises or within various cloud providers. It provides a scalable and fault-tolerance database that you can take to any hosting solution. ScyllaDB versus document stores I’m not talking about Google Drive here, but instead, databases that store unstructured documents by a given key, such as MongoDB. These systems support querying these documents, allowing users to access arbitrary document fields without defining a database schema. ScyllaDB eschews this flexibility to provide (relatively) predictable performance. By requiring users to define their schema up front, it clarifies to both users and the system how data is distributed across the cluster. By forcing users to query data in patterns that match this distribution, ScyllaDB can limit the number of nodes involved in a query, preventing surprisingly expensive queries. Document stores, on the other hand, tend to bias toward initial ease of use. In MongoDB, no schema definition is required, but users still need to consider the design of their data to query it effectively. MongoDB runs as a distributed system, but unlike ScyllaDB, it doesn’t out-of-the-box attempt to minimize inefficient queries that hit more than the expected number of nodes, leading to potential performance surprises. In the CAP theorem, MongoDB is a CP (consistent and partition-tolerant) system. Writes require the presence of a primary node and are blocked until a new primary is elected in the event of a network partition. ScyllaDB, however, prioritizes availability in its query path, keeping the system up and relying on its tunable consistency. ScyllaDB versus distributed relational databases One interesting development for databases over the past few years has been the growth of distributed transactional databases. 
These systems — such as CockroachDB, TiDB, and YugabyteDB — focus on improving the availability of a traditional relational database like PostgreSQL while still offering strong consistency. In the CAP theorem’s classifications, they’re CP systems; they prefer consistency over availability. By emphasizing correctness, they need a quorum of nodes to respond to successfully complete a query; if quorum is lost, the database loses availability. ScyllaDB, however, provides tunable consistency to dodge this problem. By allowing weaker consistency levels, such as ONE, Scylla can handle a greater loss of availability to preserve functionality. In a relational database, writes are the computationally intensive operation. The database needs to validate its consistency on every write. Scylla, on the other hand, skips this verification, opting for speed and simplicity when writing data. The tradeoff, however, is that reads in Scylla will be slower than writes, as you need to gather data from multiple nodes that have data stored in different places on disk. You’ll learn a lot more about this behavior in chapters 6 and 7, but the big takeaway is that writes in Scylla will be faster than in these systems. When to prefer other databases I’ve described ScyllaDB’s benefits relative to other databases, but sometimes, I admit, it’s not the best tool for the job. I can’t describe it as a unique database because of the Cassandra rewrite approach, but it does trade operational and design complexity for more graceful failure modes. Choosing Scylla requires you to design applications differently because it has specific data-modeling needs to best use its capabilities and adds more complexity than something like a cloud-hosted PostgreSQL server. If you don’t need ScyllaDB’s horizontal scalability and nuanced availability, the increased operational overhead might not be worth it. If your application is small, makes few requests, and isn’t expected to grow over time, ScyllaDB might be overkill. A database backing comments on your blog probably doesn’t need a ScyllaDB cluster, unless, like many of us, you’re wanting that as an excuse to try it out. Operating and maintaining a ScyllaDB cluster isn’t a hands-off exercise. If you can’t dedicate time to operating and maintaining a cluster, that is another signal that a managed offering might be preferable for you. Teams must choose wisely about how they spend their time and their money on what they do; choosing a less hands-on is a valid decision. One thing you’ll see about Scylla in upcoming chapters is that, with data modeling, it can be inflexible to change your database’s design. Adding new query patterns that don’t fit in with your initial design can be challenging. While there are ways to work around it, other databases can potentially give you more flexibility when you’re in the prototyping and learning stage of building features for an application. Lastly, some use cases might prefer a stronger transactional model like ACID. If you’re working with financial data, you might want to use a relational database so that you can have isolation in your operations. One popular example to demonstrate the importance of ACID transactions is concurrent access to bank accounts. Without isolation, you run the risk of concurrent operations causing a mismatch between how much money the database thinks you have and how much money you actually have. 
Accountants traditionally prefer accuracy in these areas, so you might prefer a relational database when working with something that needs stronger database transactions. While scaling a relational database has its challenges, those challenges might be preferable to surrendering ACID’s guarantees. Scylla can get closer to ACID through careful design and usage of some more advanced features you’ll learn about in chapter 6, but it’s not quite the “out-of-the-box” experience that a relational database offers. Get the first 3 book chapters, free. Join Bo at the “Data Modeling for Performance” Masterclass

Introducing Netflix’s TimeSeries Data Abstraction Layer

By Rajiv Shringi, Vinay Chella, Kaidan Fullerton, Oleksii Tkachuk, Joey Lynch

Introduction

As Netflix continues to expand and diversify into various sectors like Video on Demand and Gaming, the ability to ingest and store vast amounts of temporal data — often reaching petabytes — with millisecond access latency has become increasingly vital. In previous blog posts, we introduced the Key-Value Data Abstraction Layer and the Data Gateway Platform, both of which are integral to Netflix’s data architecture. The Key-Value Abstraction offers a flexible, scalable solution for storing and accessing structured key-value data, while the Data Gateway Platform provides essential infrastructure for protecting, configuring, and deploying the data tier.

Building on these foundational abstractions, we developed the TimeSeries Abstraction — a versatile and scalable solution designed to efficiently store and query large volumes of temporal event data with low millisecond latencies, all in a cost-effective manner across various use cases.

In this post, we will delve into the architecture, design principles, and real-world applications of the TimeSeries Abstraction, demonstrating how it enhances our platform’s ability to manage temporal data at scale.

Note: Contrary to what the name may suggest, this system is not built as a general-purpose time series database. We do not use it for metrics, histograms, timers, or any such near-real time analytics use case. Those use cases are well served by the Netflix Atlas telemetry system. Instead, we focus on addressing the challenge of storing and accessing extremely high-throughput, immutable temporal event data in a low-latency and cost-efficient manner.

Challenges

At Netflix, temporal data is continuously generated and utilized, whether from user interactions like video-play events, asset impressions, or complex micro-service network activities. Effectively managing this data at scale to extract valuable insights is crucial for ensuring optimal user experiences and system reliability.

However, storing and querying such data presents a unique set of challenges:

  • High Throughput: Managing up to 10 million writes per second while maintaining high availability.
  • Efficient Querying in Large Datasets: Storing petabytes of data while ensuring primary key reads return results within low double-digit milliseconds, and supporting searches and aggregations across multiple secondary attributes.
  • Global Reads and Writes: Facilitating read and write operations from anywhere in the world with adjustable consistency models.
  • Tunable Configuration: Offering the ability to partition datasets in either a single-tenant or multi-tenant datastore, with options to adjust various dataset aspects such as retention and consistency.
  • Handling Bursty Traffic: Managing significant traffic spikes during high-demand events, such as new content launches or regional failovers.
  • Cost Efficiency: Reducing the cost per byte and per operation to optimize long-term retention while minimizing infrastructure expenses, which can amount to millions of dollars for Netflix.

TimeSeries Abstraction

The TimeSeries Abstraction was developed to meet these requirements, built around the following core design principles:

  • Partitioned Data: Data is partitioned using a unique temporal partitioning strategy combined with an event bucketing approach to efficiently manage bursty workloads and streamline queries.
  • Flexible Storage: The service is designed to integrate with various storage backends, including Apache Cassandra and Elasticsearch, allowing Netflix to customize storage solutions based on specific use case requirements.
  • Configurability: TimeSeries offers a range of tunable options for each dataset, providing the flexibility needed to accommodate a wide array of use cases.
  • Scalability: The architecture supports both horizontal and vertical scaling, enabling the system to handle increasing throughput and data volumes as Netflix expands its user base and services.
  • Sharded Infrastructure: Leveraging the Data Gateway Platform, we can deploy single-tenant and/or multi-tenant infrastructure with the necessary access and traffic isolation.

Let’s dive into the various aspects of this abstraction.

Data Model

We follow a unique event data model that encapsulates all the data we want to capture for events, while allowing us to query them efficiently.

Let’s start with the smallest unit of data in the abstraction and work our way up.

  • Event Item: An event item is a key-value pair that users use to store data for a given event. For example: {“device_type”: “ios”}.
  • Event: An event is a structured collection of one or more such event items. An event occurs at a specific point in time and is identified by a client-generated timestamp and an event identifier (such as a UUID). This combination of event_time and event_id also forms part of the unique idempotency key for the event, enabling users to safely retry requests.
  • Time Series ID: A time_series_id is a collection of one or more such events over the dataset’s retention period. For instance, a device_id would store all events occurring for a given device over the retention period. All events are immutable, and the TimeSeries service only ever appends events to a given time series ID.
  • Namespace: A namespace is a collection of time series IDs and event data, representing the complete TimeSeries dataset. Users can create one or more namespaces for each of their use cases. The abstraction applies various tunable options at the namespace level, which we will discuss further when we explore the service’s control plane.

API

The abstraction provides the following APIs to interact with the event data.

WriteEventRecordsSync: This endpoint writes a batch of events and sends back a durability acknowledgement to the client. This is used in cases where users require a guarantee of durability.

WriteEventRecords: This is the fire-and-forget version of the above endpoint. It enqueues a batch of events without the durability acknowledgement. This is used in cases like logging or tracing, where users care more about throughput and can tolerate a small amount of data loss.

{
  "namespace": "my_dataset",
  "events": [
    {
      "timeSeriesId": "profile100",
      "eventTime": "2024-10-03T21:24:23.988Z",
      "eventId": "550e8400-e29b-41d4-a716-446655440000",
      "eventItems": [
        {
          "eventItemKey": "deviceType",
          "eventItemValue": "aW9z"
        },
        {
          "eventItemKey": "deviceMetadata",
          "eventItemValue": "c29tZSBtZXRhZGF0YQ=="
        }
      ]
    },
    {
      "timeSeriesId": "profile100",
      "eventTime": "2024-10-03T21:23:30.000Z",
      "eventId": "123e4567-e89b-12d3-a456-426614174000",
      "eventItems": [
        {
          "eventItemKey": "deviceType",
          "eventItemValue": "YW5kcm9pZA=="
        }
      ]
    }
  ]
}

ReadEventRecords: Given a combination of a namespace, a timeSeriesId, a timeInterval, and optional eventFilters, this endpoint returns all the matching events, sorted descending by event_time, with low millisecond latency.

{
  "namespace": "my_dataset",
  "timeSeriesId": "profile100",
  "timeInterval": {
    "start": "2024-10-02T21:00:00.000Z",
    "end": "2024-10-03T21:00:00.000Z"
  },
  "eventFilters": [
    {
      "matchEventItemKey": "deviceType",
      "matchEventItemValue": "aW9z"
    }
  ],
  "pageSize": 100,
  "totalRecordLimit": 1000
}

SearchEventRecords: Given search criteria and a time interval, this endpoint returns all the matching events. These use cases are fine with eventually consistent reads. 

{
  "namespace": "my_dataset",
  "timeInterval": {
    "start": "2024-10-02T21:00:00.000Z",
    "end": "2024-10-03T21:00:00.000Z"
  },
  "searchQuery": {
    "booleanQuery": {
      "searchQuery": [
        {
          "equals": {
            "eventItemKey": "deviceType",
            "eventItemValue": "aW9z"
          }
        },
        {
          "range": {
            "eventItemKey": "deviceRegistrationTimestamp",
            "lowerBound": {
              "eventItemValue": "MjAyNC0xMC0wMlQwMDowMDowMC4wMDBa",
              "inclusive": true
            },
            "upperBound": {
              "eventItemValue": "MjAyNC0xMC0wM1QwMDowMDowMC4wMDBa"
            }
          }
        }
      ],
      "operator": "AND"
    }
  },
  "pageSize": 100,
  "totalRecordLimit": 1000
}

AggregateEventRecords: Given search criteria and an aggregation mode (e.g., DistinctAggregation), this endpoint performs the given aggregation within a given time interval. As with the Search endpoint, users can tolerate eventual consistency and a potentially higher latency (in seconds). 

{
  "namespace": "my_dataset",
  "timeInterval": {
    "start": "2024-10-02T21:00:00.000Z",
    "end": "2024-10-03T21:00:00.000Z"
  },
  "searchQuery": {...some search criteria...},
  "aggregationQuery": {
    "distinct": {
      "eventItemKey": "deviceType",
      "pageSize": 100
    }
  }
}

In the subsequent sections, we will talk about how we interact with this data at the storage layer.

Storage Layer

The storage layer for TimeSeries comprises a primary data store and an optional index data store. The primary data store ensures data durability during writes and is used for primary read operations, while the index data store is utilized for search and aggregate operations. At Netflix, Apache Cassandra is the preferred choice for storing durable data in high-throughput scenarios, while Elasticsearch is the preferred data store for indexing. However, similar to our approach with the API, the storage layer is not tightly coupled to these specific data stores. Instead, we define storage API contracts that must be fulfilled, allowing us the flexibility to replace the underlying data stores as needed.

Primary Datastore

In this section, we will talk about how we leverage Apache Cassandra for TimeSeries use cases.

Partitioning Scheme

At Netflix’s scale, the continuous influx of event data can quickly overwhelm traditional databases. Temporal partitioning addresses this challenge by dividing the data into manageable chunks based on time intervals, such as hourly, daily, or monthly windows. This approach enables efficient querying of specific time ranges without the need to scan the entire dataset. It also allows Netflix to archive, compress, or delete older data efficiently, optimizing both storage and query performance. Additionally, this partitioning mitigates the performance issues typically associated with wide partitions in Cassandra. By employing this strategy, we can operate at much higher disk utilization, as it reduces the need to reserve large amounts of disk space for compactions, thereby saving costs.

Here is what it looks like:

Time Slice: A time slice is the unit of data retention and maps directly to a Cassandra table. We create multiple such time slices, each covering a specific interval of time. An event lands in one of these slices based on the event_time. These slices are joined with no time gaps in between, with operations being start-inclusive and end-exclusive, ensuring that all data lands in one of the slices. By utilizing these time slices, we can efficiently implement retention by dropping entire tables, which reduces storage space and saves on costs.

Why not use row-based Time-To-Live (TTL)?

Using TTL on individual events would generate a significant number of tombstones in Cassandra, degrading performance, especially during range scans. By employing discrete time slices and dropping them, we avoid the tombstone issue entirely. The tradeoff is that data may be retained slightly longer than necessary, as an entire table’s time range must fall outside the retention window before it can be dropped. Additionally, TTLs are difficult to adjust later, whereas TimeSeries can extend the dataset retention instantly with a single control plane operation.

Time Buckets: Within a time slice, data is further partitioned into time buckets. This facilitates effective range scans by allowing us to target specific time buckets for a given query range. The tradeoff is that if a user wants to read the entire range of data over a large time period, we must scan many partitions. We mitigate potential latency by scanning these partitions in parallel and aggregating the data at the end. In most cases, the advantage of targeting smaller data subsets outweighs the read amplification from these scatter-gather operations. Typically, users read a smaller subset of data rather than the entire retention range.

Event Buckets: To manage extremely high-throughput write operations, which may result in a burst of writes for a given time series within a short period, we further divide the time bucket into event buckets. This prevents overloading the same partition for a given time range and also reduces partition sizes further, albeit with a slight increase in read amplification.

Note: From Cassandra 4.x onwards, we have noticed a substantial improvement in the performance of scanning a range of data in a wide partition. See Future Enhancements at the end for the Dynamic Event bucketing work that aims to take advantage of this.

Storage Tables

We use two kinds of tables:

  • Data tables: These are the time slices that store the actual event data.
  • Metadata table: This table stores information about how each time slice is configured per namespace.

Data tables

The partition key enables splitting events for a time_series_id over a range of time_bucket(s) and event_bucket(s), thus mitigating hot partitions, while the clustering key allows us to keep data sorted on disk in the order we almost always want to read it. The value_metadata column stores metadata for the event_item_value such as compression.
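To make this concrete, here is a sketch of what such a data table could look like; the table name, column names, and types below are assumptions inferred from the description above, not the actual Netflix schema.

// Sketch only: an assumed CQL layout for one data-table time slice, inferred from the
// prose above. Netflix's real column names and types may differ.
public final class DataTableSchemaSketch {
    public static final String CREATE_DATA_TABLE = """
        CREATE TABLE IF NOT EXISTS my_dataset.ts_events_slice_0001 (
            time_series_id   text,
            time_bucket      bigint,     -- derived from event_time and namespace config
            event_bucket     int,        -- spreads write bursts within a time bucket
            event_time       timestamp,
            event_id         text,
            event_item_key   blob,
            event_item_value blob,
            value_metadata   blob,       -- e.g. whether event_item_value was compressed
            PRIMARY KEY ((time_series_id, time_bucket, event_bucket),
                         event_time, event_id, event_item_key)
        ) WITH CLUSTERING ORDER BY (event_time DESC, event_id ASC, event_item_key ASC);
        """;
}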

Writing to the data table:

User writes land in a given time slice, time bucket, and event bucket as a function of the event_time attached to the event. This mapping is dictated by the control plane configuration of the given namespace.

For example:
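The following sketch (illustrative only, not the actual implementation) shows how an event's event_time might be mapped onto a time slice, time bucket, and event bucket, using partition settings like the ones shown later in the namespace configuration:

import java.time.Instant;

// Illustrative sketch: derive the time slice (table), time bucket, and event bucket for an
// event. The 129600s slice width, 3600s bucket width, and 4 event buckets mirror the sample
// control plane configuration shown later; the hashing choice here is an assumption.
public final class BucketingSketch {
    record Placement(long timeSliceStartEpochSeconds, long timeBucket, int eventBucket) {}

    static Placement place(Instant eventTime, String eventId,
                           long secondsPerTimeSlice, long secondsPerTimeBucket, int eventBuckets) {
        long epochSeconds = eventTime.getEpochSecond();
        // The slice start identifies which table (time slice) the event lands in.
        long timeSliceStart = (epochSeconds / secondsPerTimeSlice) * secondsPerTimeSlice;
        // The time bucket becomes part of the partition key within that table.
        long timeBucket = (epochSeconds / secondsPerTimeBucket) * secondsPerTimeBucket;
        // The event bucket spreads a burst of events for the same series across partitions.
        int eventBucket = Math.floorMod(eventId.hashCode(), eventBuckets);
        return new Placement(timeSliceStart, timeBucket, eventBucket);
    }

    public static void main(String[] args) {
        System.out.println(place(Instant.parse("2024-10-03T10:15:30.000Z"),
                                 "event-123", 129_600L, 3_600L, 4));
    }
}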

During this process, the writer makes decisions on how to handle the data before writing, such as whether to compress it. The value_metadata column records any such post-processing actions, ensuring that the reader can accurately interpret the data.

Reading from the data table:

The illustration below depicts, at a high level, how we scatter-gather the reads from multiple partitions and join the result set at the end to return the final result.
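As a rough sketch of the same pattern (assumed shape, not the actual service code), the reads for all candidate partitions can be issued in parallel and the partial results merged while preserving the event_time descending order:

import java.util.*;
import java.util.concurrent.*;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Sketch of the scatter-gather read path: query every (time_bucket, event_bucket) partition
// in the requested range in parallel, then merge the partial results sorted by event_time desc.
public final class ScatterGatherSketch {
    record Event(long eventTimeMillis, String eventId) {}

    static List<Event> read(List<Long> timeBuckets, int eventBuckets,
                            BiFunction<Long, Integer, List<Event>> readPartition,
                            ExecutorService pool) {
        List<CompletableFuture<List<Event>>> futures = new ArrayList<>();
        for (long timeBucket : timeBuckets) {
            for (int eventBucket = 0; eventBucket < eventBuckets; eventBucket++) {
                final long tb = timeBucket;
                final int eb = eventBucket;
                futures.add(CompletableFuture.supplyAsync(() -> readPartition.apply(tb, eb), pool));
            }
        }
        return futures.stream()
                .flatMap(f -> f.join().stream())                                    // gather
                .sorted(Comparator.comparingLong(Event::eventTimeMillis).reversed())
                .collect(Collectors.toList());
    }
}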

Metadata table

This table stores the configuration data about the time slices for a given namespace.

Note the following:

  • No Time Gaps: The end_time of a given time slice overlaps with the start_time of the next time slice, ensuring all events find a home.
  • Retention: The status indicates which tables fall inside and outside of the retention window.
  • Flexible: This metadata can be adjusted per time slice, allowing us to tune the partition settings of future time slices based on observed data patterns in the current time slice.

There is a lot more information that can be stored in the metadata column (e.g., compaction settings for the table), but we only show the partition settings here for brevity.

Index Datastore

To support secondary access patterns via non-primary key attributes, we index data into Elasticsearch. Users can configure a list of attributes per namespace that they wish to search and/or aggregate data on. The service extracts these fields from events as they stream in, indexing the resultant documents into Elasticsearch. Depending on the throughput, we may use Elasticsearch as a reverse index, retrieving the full data from Cassandra, or we may store the entire source data directly in Elasticsearch.

Note: Again, users are never directly exposed to Elasticsearch, just like they are not directly exposed to Cassandra. Instead, they interact with the Search and Aggregate API endpoints, which translate a given query into the query needed by the underlying datastore.
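For instance, the booleanQuery from the SearchEventRecords example above might translate into an Elasticsearch bool query roughly like the sketch below; the index field names and the bound inclusivity are assumptions for illustration, since the real mapping is namespace-specific.

// Assumed translation of the earlier SearchEventRecords request (deviceType equals "ios",
// deviceRegistrationTimestamp within a range) into an Elasticsearch bool query. Field names
// such as "deviceType" and "eventTime" are hypothetical index fields.
public final class SearchTranslationSketch {
    public static final String ES_QUERY = """
        {
          "query": {
            "bool": {
              "must": [
                { "term":  { "deviceType": "ios" } },
                { "range": { "deviceRegistrationTimestamp": {
                    "gte": "2024-10-02T00:00:00.000Z",
                    "lte": "2024-10-03T00:00:00.000Z" } } }
              ],
              "filter": [
                { "range": { "eventTime": {
                    "gte": "2024-10-02T21:00:00.000Z",
                    "lt":  "2024-10-03T21:00:00.000Z" } } }
              ]
            }
          },
          "sort": [ { "eventTime": "desc" } ],
          "size": 100
        }
        """;
}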

In the next section, we will talk about how we configure these data stores for different datasets.

Control Plane

The data plane is responsible for executing the read and write operations, while the control plane configures every aspect of a namespace’s behavior. The data plane communicates with the TimeSeries control stack, which manages this configuration information. In turn, the TimeSeries control stack interacts with a sharded Data Gateway Platform Control Plane that oversees control configurations for all abstractions and namespaces.

Separating the responsibilities of the data plane and control plane helps maintain the high availability of our data plane, as the control plane takes on tasks that may require some form of schema consensus from the underlying data stores.

Namespace Configuration

The below configuration snippet demonstrates the immense flexibility of the service and how we can tune several things per namespace using our control plane.

"persistence_configuration": [
{
"id": "PRIMARY_STORAGE",
"physical_storage": {
"type": "CASSANDRA", // type of primary storage
"cluster": "cass_dgw_ts_tracing", // physical cluster name
"dataset": "tracing_default" // maps to the keyspace
},
"config": {
"timePartition": {
"secondsPerTimeSlice": "129600", // width of a time slice
"secondPerTimeBucket": "3600", // width of a time bucket
"eventBuckets": 4 // how many event buckets within
},
"queueBuffering": {
"coalesce": "1s", // how long to coalesce writes
"bufferCapacity": 4194304 // queue capacity in bytes
},
"consistencyScope": "LOCAL", // single-region/multi-region
"consistencyTarget": "EVENTUAL", // read/write consistency
"acceptLimit": "129600s" // how far back writes are allowed
},
"lifecycleConfigs": {
"lifecycleConfig": [ // Primary store data retention
{
"type": "retention",
"config": {
"close_after": "1296000s", // close for reads/writes
"delete_after": "1382400s" // drop time slice
}
}
]
}
},
{
"id": "INDEX_STORAGE",
"physicalStorage": {
"type": "ELASTICSEARCH", // type of index storage
"cluster": "es_dgw_ts_tracing", // ES cluster name
"dataset": "tracing_default_useast1" // base index name
},
"config": {
"timePartition": {
"secondsPerSlice": "129600" // width of the index slice
},
"consistencyScope": "LOCAL",
"consistencyTarget": "EVENTUAL", // how should we read/write data
"acceptLimit": "129600s", // how far back writes are allowed
"indexConfig": {
"fieldMapping": { // fields to extract to index
"tags.nf.app": "KEYWORD",
"tags.duration": "INTEGER",
"tags.enabled": "BOOLEAN"
},
"refreshInterval": "60s" // Index related settings
}
},
"lifecycleConfigs": {
"lifecycleConfig": [
{
"type": "retention", // Index retention settings
"config": {
"close_after": "1296000s",
"delete_after": "1382400s"
}
}
]
}
}
]

Provisioning Infrastructure

With so many different parameters, we need automated provisioning workflows to deduce the best settings for a given workload. When users want to create their namespaces, they specify a list of workload desires, which the automation translates into concrete infrastructure and related control plane configuration. We highly encourage you to watch this ApacheCon talk by one of our stunning colleagues, Joey Lynch, on how we achieve this. We may go into detail on this subject in one of our future blog posts.

Once the system provisions the initial infrastructure, it then scales in response to the user workload. The next section describes how this is achieved.

Scalability

Our users may operate with limited information at the time of provisioning their namespaces, resulting in best-effort provisioning estimates. Further, evolving use-cases may introduce new throughput requirements over time. Here’s how we manage this:

  • Horizontal scaling: TimeSeries server instances can auto-scale up and down as per attached scaling policies to meet the traffic demand. The storage server capacity can be recomputed to accommodate changing requirements using our capacity planner.
  • Vertical scaling: We may also choose to vertically scale our TimeSeries server instances or our storage instances to get greater CPU, RAM and/or attached storage capacity.
  • Scaling disk: We may attach EBS to store data if the capacity planner prefers infrastructure that offers larger storage at a lower cost rather than SSDs optimized for latency. In such cases, we deploy jobs to scale the EBS volume when the disk storage reaches a certain percentage threshold.
  • Re-partitioning data: Inaccurate workload estimates can lead to over or under-partitioning of our datasets. TimeSeries control-plane can adjust the partitioning configuration for upcoming time slices, once we realize the nature of data in the wild (via partition histograms). In the future we plan to support re-partitioning of older data and dynamic partitioning of current data.

Design Principles

So far, we have seen how TimeSeries stores, configures and interacts with event datasets. Let’s see how we apply different techniques to improve the performance of our operations and provide better guarantees.

Event Idempotency

We prefer to bake idempotency into all mutation endpoints, so that users can retry or hedge their requests safely. Hedging is when the client sends an identical competing request to the server if the original request does not come back with a response in an expected amount of time; the client then uses whichever response completes first. This keeps the tail latencies for an application relatively low, and it can only be done safely if the mutations are idempotent. For TimeSeries, the combination of event_time, event_id and event_item_key forms the idempotency key for a given time_series_id event.
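A minimal sketch of such an idempotency key, assuming a simple value object rather than whatever representation the service actually uses:

import java.time.Instant;
import java.util.Objects;

// Sketch: the idempotency key for a mutation on a given time_series_id combines event_time,
// event_id and event_item_key. Retried or hedged writes carrying the same key map to the same
// row, so they overwrite rather than duplicate the original event.
public record IdempotencyKey(Instant eventTime, String eventId, String eventItemKey) {
    public IdempotencyKey {
        Objects.requireNonNull(eventTime);
        Objects.requireNonNull(eventId);
        Objects.requireNonNull(eventItemKey);
    }
}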

SLO-based Hedging

We assign Service Level Objectives (SLO) targets for different endpoints within TimeSeries, as an indication of what we think the performance of those endpoints should be for a given namespace. We can then hedge a request if the response does not come back in that configured amount of time.

"slos": {
"read": { // SLOs per endpoint
"latency": {
"target": "0.5s", // hedge around this number
"max": "1s" // time-out around this number
}
},
"write": {
"latency": {
"target": "0.01s",
"max": "0.05s"
}
}
}
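A rough sketch of what SLO-based hedging could look like on the client side, assuming an asynchronous read API (illustrative, not the actual TimeSeries client): fire the original request, launch one identical hedge request if it has not completed within the SLO target, and bound the whole call by the max timeout.

import java.util.concurrent.*;
import java.util.function.Supplier;

// Sketch of SLO-based hedging: hedge after the SLO "target" (e.g. 0.5s for reads) and time out
// at "max" (e.g. 1s). Safe because reads are side-effect free and mutations are idempotent.
public final class HedgingSketch {
    static <T> CompletableFuture<T> hedged(Supplier<CompletableFuture<T>> call,
                                           long targetMillis, long maxMillis,
                                           ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        call.get().whenComplete((value, error) -> { if (error == null) result.complete(value); });

        // After the SLO target elapses with no response, send an identical competing request.
        scheduler.schedule(() -> {
            if (!result.isDone()) {
                call.get().whenComplete((value, error) -> { if (error == null) result.complete(value); });
            }
        }, targetMillis, TimeUnit.MILLISECONDS);

        // Whichever request completes first wins; give up entirely at the "max" SLO.
        return result.orTimeout(maxMillis, TimeUnit.MILLISECONDS);
    }
}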

Partial Return

Sometimes, a client may be sensitive to latency and willing to accept a partial result set. A real-world example of this is real-time frequency capping. Precision is not critical in this case, but if the response is delayed, it becomes practically useless to the upstream client. Therefore, the client prefers to work with whatever data has been collected so far rather than timing out while waiting for all the data. The TimeSeries client supports partial returns around SLOs for this purpose. Importantly, we still maintain the latest order of events in this partial fetch.
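A minimal sketch of a partial return around an SLO, assuming per-partition futures similar to the scatter-gather sketch earlier (again, not the actual client code):

import java.util.*;
import java.util.concurrent.*;

// Sketch: wait up to the SLO deadline, then return whatever per-partition results have
// already arrived, still ordered by event_time descending.
public final class PartialReturnSketch {
    record Event(long eventTimeMillis, String eventId) {}

    static List<Event> readWithPartialReturn(List<CompletableFuture<List<Event>>> partitionReads,
                                             long sloMillis) {
        try {
            CompletableFuture.allOf(partitionReads.toArray(CompletableFuture[]::new))
                    .get(sloMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException deadlineHit) {
            // Deadline reached: fall through and use whatever has completed so far.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException ignored) {
            // A partition read failed; the loop below simply skips it.
        }
        List<Event> events = new ArrayList<>();
        for (CompletableFuture<List<Event>> partition : partitionReads) {
            if (partition.isDone() && !partition.isCompletedExceptionally()) {
                events.addAll(partition.getNow(List.of()));
            }
        }
        events.sort(Comparator.comparingLong(Event::eventTimeMillis).reversed());
        return events;
    }
}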

Adaptive Pagination

All reads start with a default fanout factor, scanning 8 partition buckets in parallel. However, if the service layer determines that the time_series dataset is dense — i.e., most reads are satisfied by reading the first few partition buckets — then it dynamically adjusts the fanout factor of future reads in order to reduce the read amplification on the underlying datastore. Conversely, if the dataset is sparse, we may want to increase this limit with a reasonable upper bound.
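A sketch of one possible heuristic for adjusting the fanout factor; the thresholds and upper bound here are assumptions, not the algorithm TimeSeries actually uses:

// Sketch of adaptive fanout: start at the default of 8 parallel buckets, shrink when most reads
// are satisfied by the first few buckets (dense dataset), and grow, up to a cap, when they are
// not (sparse dataset). Updates are best-effort; races are tolerable for a heuristic like this.
public final class AdaptiveFanoutSketch {
    private static final int DEFAULT_FANOUT = 8;
    private static final int MAX_FANOUT = 32; // assumed upper bound
    private volatile int fanout = DEFAULT_FANOUT;

    /** Record how many buckets actually contributed results to the last read. */
    void recordRead(int bucketsNeeded) {
        if (bucketsNeeded <= fanout / 2) {
            fanout = Math.max(1, fanout - 1);          // dense: reduce read amplification
        } else if (bucketsNeeded >= fanout) {
            fanout = Math.min(MAX_FANOUT, fanout + 1); // sparse: scan more buckets in parallel
        }
    }

    int currentFanout() { return fanout; }
}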

Limited Write Window

In most cases, the active range for writing data is smaller than the range for reading data — i.e., we want a range of time to become immutable as soon as possible so that we can apply optimizations on top of it. We control this by having a configurable “acceptLimit” parameter that prevents users from writing events older than this time limit. For example, an accept limit of 4 hours means that users cannot write events older than now() - 4 hours. We sometimes raise this limit for backfilling historical data, but it is tuned back down for regular write operations. Once a range of data becomes immutable, we can safely do things like caching, compressing, and compacting it for reads.
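A minimal sketch of the accept-limit check, assuming the configured limit is surfaced to the writer as a simple Duration:

import java.time.Duration;
import java.time.Instant;

// Sketch: reject writes whose event_time is older than now() minus the configured accept limit
// (e.g. the "129600s" acceptLimit in the namespace configuration, or 4 hours in the example above).
public final class AcceptLimitSketch {
    static void validate(Instant eventTime, Duration acceptLimit) {
        Instant oldestAccepted = Instant.now().minus(acceptLimit);
        if (eventTime.isBefore(oldestAccepted)) {
            throw new IllegalArgumentException(
                "event_time " + eventTime + " is older than the accept limit of " + acceptLimit);
        }
    }
}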

Buffering Writes

We frequently leverage this service for handling bursty workloads. Rather than overwhelming the underlying datastore with this load all at once, we aim to distribute it more evenly by allowing events to coalesce over short durations (typically seconds). These events accumulate in in-memory queues running on each instance. Dedicated consumers then steadily drain these queues, grouping the events by their partition key, and batching the writes to the underlying datastore.

The queues are tailored to each datastore since their operational characteristics depend on the specific datastore being written to. For instance, the batch size for writing to Cassandra is significantly smaller than that for indexing into Elasticsearch, leading to different drain rates and batch sizes for the associated consumers.
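A simplified sketch of this buffering scheme, assuming a bounded in-memory queue drained on the coalesce interval; the real implementation, sizing, and failure handling are more involved:

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.BiConsumer;

// Sketch: events coalesce in an in-memory queue for a short interval (the "coalesce" setting,
// e.g. 1s), then a consumer drains the queue, groups events by partition key, and hands each
// group to the datastore-specific batch writer.
public final class WriteBufferSketch<E> {
    private final BlockingQueue<Map.Entry<String, E>> queue = new LinkedBlockingQueue<>(100_000);

    public boolean offer(String partitionKey, E event) {
        return queue.offer(Map.entry(partitionKey, event)); // caller flushes directly if full
    }

    /** Drain periodically; batchWriter receives (partitionKey, batch of events). */
    public void startDraining(Duration coalesce, BiConsumer<String, List<E>> batchWriter,
                              ScheduledExecutorService scheduler) {
        scheduler.scheduleAtFixedRate(() -> {
            List<Map.Entry<String, E>> drained = new ArrayList<>();
            queue.drainTo(drained);
            Map<String, List<E>> byPartition = new HashMap<>();
            drained.forEach(e -> byPartition.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                                            .add(e.getValue()));
            byPartition.forEach(batchWriter); // e.g. smaller batches for Cassandra than for ES
        }, coalesce.toMillis(), coalesce.toMillis(), TimeUnit.MILLISECONDS);
    }
}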

While using in-memory queues does increase JVM garbage collection pressure, we have experienced substantial improvements by transitioning to JDK 21 with ZGC. To illustrate the impact, ZGC reduced our tail latencies by an impressive 86%.

Because we use in-memory queues, we are prone to losing events in case of an instance crash. As such, these queues are only used for use cases that can tolerate some amount of data loss, e.g., tracing and logging. For use cases that need guaranteed durability and/or read-after-write consistency, these queues are effectively disabled and writes are flushed to the data store almost immediately.

Dynamic Compaction

Once a time slice exits the active write window, we can leverage the immutability of the data to optimize it for read performance. This process may involve re-compacting immutable data using optimal compaction strategies, dynamically shrinking and/or splitting shards to optimize system resources, and other similar techniques to ensure fast and reliable performance.

The following section provides a glimpse into the real-world performance of some of our TimeSeries datasets.

Real-world Performance

The service writes data with latencies in the order of low single-digit milliseconds, while consistently maintaining stable point-read latencies.

At the time of writing this blog, the service was processing close to 15 million events/second across all the different datasets at peak globally.

Time Series Usage @ Netflix

The TimeSeries Abstraction plays a vital role across key services at Netflix. Here are some impactful use cases:

  • Tracing and Insights: Logs traces across all apps and micro-services within Netflix to help understand service-to-service communication, aid in debugging issues, and answer support requests.
  • User Interaction Tracking: Tracks millions of user interactions — such as video playbacks, searches, and content engagement — providing insights that enhance Netflix’s recommendation algorithms in real-time and improve the overall user experience.
  • Feature Rollout and Performance Analysis: Tracks the rollout and performance of new product features, enabling Netflix engineers to measure how users engage with features, which powers data-driven decisions about future improvements.
  • Asset Impression Tracking and Optimization: Tracks asset impressions ensuring content and assets are delivered efficiently while providing real-time feedback for optimizations.
  • Billing and Subscription Management: Stores historical data related to billing and subscription management, ensuring accuracy in transaction records and supporting customer service inquiries.

and more…

Future Enhancements

As the use cases evolve, and the need to make the abstraction even more cost effective grows, we aim to make many improvements to the service in the upcoming months. Some of them are:

  • Tiered Storage for Cost Efficiency: Support moving older, lesser-accessed data into cheaper object storage that has higher time to first byte, potentially saving Netflix millions of dollars.
  • Dynamic Event Bucketing: Support real-time partitioning of keys into optimally-sized partitions as events stream in, rather than having a somewhat static configuration at the time of provisioning a namespace. This strategy has a huge advantage of not partitioning time_series_ids that don’t need it, thus saving the overall cost of read amplification. Also, with Cassandra 4.x, we have noted major improvements in reading a subset of data in a wide partition that could lead us to be less aggressive with partitioning the entire dataset ahead of time.
  • Caching: Take advantage of immutability of data and cache it intelligently for discrete time ranges.
  • Count and other Aggregations: Some users are only interested in counting events in a given time interval rather than fetching all the event data for it.

Conclusion

The TimeSeries Abstraction is a vital component of Netflix’s online data infrastructure, playing a crucial role in supporting both real-time and long-term decision-making. Whether it’s monitoring system performance during high-traffic events or optimizing user engagement through behavior analytics, TimeSeries Abstraction ensures that Netflix operates seamlessly and efficiently on a global scale.

As Netflix continues to innovate and expand into new verticals, the TimeSeries Abstraction will remain a cornerstone of our platform, helping us push the boundaries of what’s possible in streaming and beyond.

Stay tuned for Part 2, where we’ll introduce our Distributed Counter Abstraction, a key element of Netflix’s Composite Abstractions, built on top of the TimeSeries Abstraction.

Acknowledgments

Special thanks to our stunning colleagues who contributed to TimeSeries Abstraction’s success: Tom DeVoe, Mengqing Wang, Kartik Sathyanarayanan, Jordan West, Matt Lehman, Cheng Wang, and Chris Lohfink.


Introducing Netflix’s TimeSeries Data Abstraction Layer was originally published in the Netflix TechBlog on Medium.

We Compared ScyllaDB and Memcached and… We Lost?

An in-depth look at database and cache internals, and the tradeoffs in each.

ScyllaDB would like to publicly acknowledge dormando (Memcached maintainer) and Danny Kopping for their contributions to this project, as well as thank them for their support and patience.

Engineers behind ScyllaDB – the database for predictable performance at scale – joined forces with Memcached maintainer dormando to compare both technologies head-to-head, in a collaborative, vendor-neutral way. The results reveal that:

  • Both Memcached and ScyllaDB maximized disks and network bandwidth while being stressed under similar conditions, sustaining similar performance overall.
  • While ScyllaDB required data modeling changes to fully saturate the network throughput, Memcached required additional IO threads to saturate disk I/O.
  • Although ScyllaDB showed better latencies when compared to Memcached pipelined requests to disk, Memcached latencies were better for individual requests.

This document explains our motivation for these tests, provides a summary of the tested scenarios and results, then presents recommendations for anyone who might be deciding between ScyllaDB and Memcached. Along the way, we analyze the architectural differences behind these two solutions and discuss the tradeoffs involved in each. There’s also a detailed Gitbook for this project, with a more extensive look at the tests and results and links to the specific configurations you can use to perform the tests yourself.

Bonus: dormando and I recently discussed this project at P99 CONF, a highly technical conference on performance and low latency engineering. Watch on demand.

Why have we done this?

First and foremost, ScyllaDB invested lots of time and engineering resources optimizing our database to deliver predictable low latencies for real-time data-intensive applications. ScyllaDB’s shard-per-core, shared-nothing architecture, userspace I/O scheduler and internal cache implementation (fully bypassing the Linux page cache) are some notable examples of such optimizations.

Second: performance converges over time. In-memory caches have been (for a long time) regarded as one of the fastest infrastructure components around. Yet, it’s been a few years now since caching solutions started to look into the realm of flash disks. These initiatives obviously pose an interesting question: If an in-memory cache can rely on flash storage, then why can’t a persistent database also work as a cache?

Third: We previously discussed 7 Reasons Not to Put a Cache in Front of Your Database and recently explored how specific teams have successfully replaced their caches with ScyllaDB.

Fourth: At last year’s P99 CONF, Danny Kopping gave us an enlightening talk, Cache Me If You Can, where he explained how Memcached Extstore helped Grafana Labs scale their cache footprint 42x while driving down costs.

And finally, despite the (valid) criticism that performance benchmarks receive, they still play an important role in driving innovation. Benchmarks are a useful resource for engineers seeking in-house optimization opportunities.

Now, on to the comparison.

Setup

Instances

Tests were carried out using the following AWS instance types:

  • Loader: c7i.16xlarge (64 vCPUs, 128GB RAM)
  • Memcached: i4i.4xlarge (16 vCPUs, 128GB RAM, 3.75TB NVMe)
  • ScyllaDB: i4i.4xlarge (16 vCPUs, 128GB RAM, 3.75TB NVMe)

All instances can deliver up to 25Gbps of network bandwidth.
Keep in mind that, especially during tests maxing out the promised network capacity, we noticed throttling shrinking the bandwidth down to the instances’ baseline capacity.

Optimizations and Settings

To overcome potential bottlenecks, the following optimizations and settings were applied:

  • AWS side: All instances used a Cluster placement strategy, following the AWS Docs: “This strategy enables workloads to achieve the low-latency network performance necessary for tightly-coupled node-to-node communication that is typical of high-performance computing (HPC) applications.”
  • Memcached: Version 1.6.25, compiled with Extstore enabled. Except where denoted, run with 14 threads, pinned to specific CPUs. The remaining 2 vCPUs were assigned to CPU 0 (core & HT sibling) to handle Network IRQs, as specified by the sq_split mode in seastar perftune.py. CAS operations were disabled to save space on per-item overhead. The full command line arguments were: taskset -c 1-7,9-15 /usr/local/memcached/bin/memcached -v -A -r -m 114100 -c 4096 --lock-memory --threads 14 -u scylla -C
  • ScyllaDB: Default settings as configured by ScyllaDB Enterprise 2024.1.2 AMI (ami-id: ami-018335b47ba6bdf9a) in an i4i.4xlarge. This includes the same CPU pinning settings as described above for Memcached.

Stressors

For Memcached loaders, we used mcshredder, part of memcached’s official testing suite. The applicable stressing profiles are in the fee-mendes/shredders GitHub repository. For ScyllaDB, we used cassandra-stress, as shipped with ScyllaDB, and specified workloads comparable to the ones used for Memcached.

Tests and Results

The following is a summary of the tests we conducted and their results. If you want a more detailed description and analysis, go to the extended writeup of this project.

RAM Caching Efficiency

The more items you can fit into RAM, the better your chance of getting cache hits. More cache hits result in significantly faster access than going to disk. Ultimately, that improves latency. This project began by measuring how many items we could store in each datastore. Throughout our tests, the key was between 4 and 12 bytes (key0 .. keyN) for Memcached, and 12 bytes for ScyllaDB. The value was fixed to 1000 bytes.

Memcached

Memcached stored roughly 101M items until eviction started. It’s memory efficient. Out of Memcached’s 114G assigned memory, this is approximately 101G worth of values, without considering the key size and other flags:

Memcached stored 101M items in memory before evictions started

ScyllaDB

ScyllaDB stored between 60 and 61M items before evictions started. This is no surprise, given that its protocol requires more data to be stored as part of a write (such as the write timestamp since epoch, row liveness, etc). ScyllaDB also persists data to disk as you go, which means that Bloom Filters (and optionally Indexes) need to be stored in memory for subsequent disk lookups.

With ScyllaDB, eviction starts under memory pressure while trying to load 61M rows

Takeaways

  • Memcached stored approximately 65% more in-memory items than ScyllaDB.
  • ScyllaDB rows have higher per-item overhead to support a wide-column orientation.
  • In ScyllaDB, Bloom Filters, Index Caching, and other components are also stored in-memory to support efficient disk lookups, contributing to yet another layer of overhead.

Read-only In-Memory Workload

The ideal (though unrealistic) workload for a cache is one where all the data fits in RAM – so that reads don’t require disk accesses and no evictions or misses occur.
Both ScyllaDB and Memcached employ LRU (Least Recently Used) logic for freeing up memory: When the system runs under pressure, items get evicted from the LRU’s tail; these are typically the least active items. Taking evictions and cache misses out of the picture helps measure and set a performance baseline for both datastores. It places the focus on what matters most for these kinds of workloads: read throughput and request latency.

In this test, we first warmed up both stores with the same payload sizes used during the previous test. Then, we initiated reads against their respective ranges for 30 minutes.

Memcached

Memcached achieved an impressive 3 Million Gets per second, fully maximizing AWS NIC bandwidth (25 Gbps)!

Memcached kept a steady 3M rps, fully maximizing the NIC throughput

The parsed results show that p99.999 responses completed below 1ms:

stat: cmd_get : Total Ops: 5503513496 Rate: 3060908/s

=== timer mg ===
1-10us      0            0.000%
10-99us     343504394    6.238%
100-999us   5163057634   93.762%
1-2ms       11500        0.00021%

ScyllaDB

To read more rows in ScyllaDB, we needed to devise a better data model for client requests due to protocol characteristics (in particular, no pipelining). With a clustering key, we could fully maximize ScyllaDB’s cache, resulting in a significant improvement in the number of cached rows. We ingested 5M partitions, each with 16 clustering keys, for a total of 80M cached rows. As a result, the number of records within the cache significantly improved compared to the key-value numbers shown previously.

As dormando correctly pointed out (thanks!), this configuration is significantly different than the previous Memcached set-up. While the Memcached workload always hits an individual key-value pair, a single request in ScyllaDB results in several rows being returned. Notably, the same results could be achieved using Memcached by feeding the entire payload as the value under a single key, with the results scaling accordingly.

We explained the reasons for these changes in the detailed writeup. There, we covered characteristics of the CQL protocol (such as the per-item overhead [compared to memcached] and no support for pipelining) which make wide-partitions more efficient on ScyllaDB than single-key fetches.

With these adjustments, our loaders ran a total of 187K read ops/second over 30 minutes. Each operation resulted in 16 rows getting retrieved. Similarly to memcached, ScyllaDB also maximized the NIC throughput. It served roughly 3M rows/second solely from in-memory data:

ScyllaDB Server Network Traffic as reported by node_exporter

Number of read operations (left) and rows being hit (right) from cache during the exercise

ScyllaDB exposes server-side latency information, which is useful for analyzing latency without the network. During the test, ScyllaDB’s server-side p99 latency remained within 1ms bounds:

Latency and Network traffic from ScyllaDB matching the adjustments done

The client-side percentiles are, unsurprisingly, higher than the server-side latency with a read P99 of 0.9ms.

cassandra-stress P99 latency histogram

Takeaways

  • Both Memcached and ScyllaDB fully saturated the network; to prevent saturating the maximum network packets per second, Memcached relied on request pipelining whereas ScyllaDB was switched to a wide-column orientation.
  • ScyllaDB’s cache showed considerable gains following a wide-column schema, able to store more items compared to the previous simple key-value orientation.
  • On the protocol level, Memcached’s protocol is simpler and lightweight, whereas ScyllaDB’s CQL provides richer features but can be heavier.

Adding Disks to the Picture

Measuring flash storage performance introduces its own set of challenges, which makes it almost impossible to fully characterize a given workload realistically. For disk-related tests, we decided to measure the most pessimistic situation: compare both solutions serving data (mostly) from block storage, knowing that:

  • The likelihood of realistic workloads doing this is somewhere close to zero
  • Users should expect numbers in between the previous optimistic cache workload and the pessimistic disk-bound workload in practice

Memcached Extstore

The Extstore wiki page provides extensive detail on the solution’s inner workings. At a high level, it allows memcached to keep its hash table and keys in memory, but store values onto external storage.

During our tests, we populated memcached with 1.25B items with a value size of 1KB and a keysize of up to 14 bytes:

Evictions started as soon as we hit approximately 1.25B items, despite free disk space

With Extstore, we stored around 11X the number of items compared to the previous in-memory workload until evictions started to kick in (as shown in the right hand panel in the image above). Even though 11X is an already impressive number, the total data stored on flash was only 1.25TB out of the total 3.5TB provided by the AWS instance.

Read-Only Performance

For the actual performance tests, we stressed Extstore against item sizes of 1KB and 8KB. The table below summarizes the results:

Test Type               Items per GET   Payload Size   IO Threads   GET Rate   P99
perfrun_metaget_pipe    16              1KB            32           188K/s     4~5 ms
perfrun_metaget         1               1KB            32           182K/s     <1 ms
perfrun_metaget_pipe    16              1KB            64           261K/s     5~6 ms
perfrun_metaget         1               1KB            64           256K/s     1~2 ms
perfrun_metaget_pipe    16              8KB            16           92K/s      5~6 ms
perfrun_metaget         1               8KB            16           90K/s      <1 ms
perfrun_metaget_pipe    16              8KB            32           110K/s     3~4 ms
perfrun_metaget         1               8KB            32           105K/s     <1 ms

ScyllaDB

We populated ScyllaDB with the same number of items as used for memcached. Although ScyllaDB showed higher GET rates than memcached, it did so under slightly higher tail latencies compared to memcached’s non-pipelining workloads. This is summarized below:

Test Type   Items per GET   Payload Size   GET Rate   Server-side P99   Client-side P99
1KB Read    1               1KB            268.8K/s   2ms               2.4ms
8KB Read    1               8KB            156.8K/s   1.54ms            1.9ms

Takeaways

  • Extstore required considerable tuning to its settings in order to fully saturate flash storage I/O.
  • Due to Memcached architecture, smaller payloads are unable to fully utilize the available disk space, providing smaller gains compared to ScyllaDB.
  • ScyllaDB rates were overall higher than Memcached in a key-value orientation, especially under higher payload sizes. Latencies were better than pipelined requests, but slightly higher than individual GETs in Memcached.

Overwrite Workload

Following our previous disk results, we then compared both solutions in a read-mostly workload targeting the same throughput (250K ops/sec). The workload in question is a slight modification of memcached’s ‘basic’ test for Extstore, with 10% random overwrites. It is considered a “semi-worst case scenario.”

Memcached

Memcached achieved a rate of slightly under 249K during the test.
Although the write rates remained steady for the duration of the test, we observed that reads fluctuated slightly throughout the run:

Memcached: Read-mostly workload metrics

We also observed slightly high extstore_io_queue metrics despite the lowered read ratios, but latencies still remained low. These results are summarized below:

Operation   IO Threads   Rate      P99 Latency
cmd_get     64           224K/s    1~2 ms
cmd_set     64           24.8K/s   <1 ms

ScyllaDB

The ScyllaDB test was run using 2 loaders, each with half of the target rate. Even though ScyllaDB achieved a slightly higher throughput (259.5K), the write latencies were kept low throughout the run and the read latencies were higher (similar to memcached):

ScyllaDB: Read-mostly workload metrics

The table below summarizes the client-side run results across the two loaders:

Loader    Rate       Write P99   Read P99
loader1   124.9K/s   1.4ms       2.6 ms
loader2   124.6K/s   1.3ms       2.6 ms

Takeaways

  • Both Memcached and ScyllaDB write rates were steady, with reads slightly fluctuating throughout the run
  • ScyllaDB writes still account for the commitlog overhead, which sits in the hot write path
  • ScyllaDB server-side latencies were similar to those observed in Memcached results, although client-side latencies were slightly higher
  • Read a more detailed analysis in the Gitbook for this project

Wrapping Up

Both memcached and ScyllaDB managed to maximize the underlying hardware utilization across all tests and keep latencies predictably low. So which one should you pick? The real answer: it depends.

If your existing workload can accommodate a simple key-value model and it benefits from pipelining, then memcached should be more suitable to your needs. On the other hand, if the workload requires support for complex data models, then ScyllaDB is likely a better fit.

Another reason for sticking with Memcached: it easily delivers traffic far beyond what a NIC can sustain. In fact, in this Hacker News thread, dormando mentioned that he could scale it up past 55 million read ops/sec for a considerably larger server. Given that, you could make use of smaller and/or cheaper instance types to sustain a similar workload, provided the available memory and disk footprint suffice for your workload needs.

A different angle to consider is the data set size. Even though Extstore provides great cost savings by allowing you to store items beyond RAM, there’s a limit to how many keys can fit per GB of memory. Workloads with very small items should observe smaller gains compared to those with larger items. That’s not the case with ScyllaDB, which allows you to store billions of items irrespective of their sizes.

It’s also important to consider whether data persistence is required. If it is, then running ScyllaDB as a replicated distributed cache provides you greater resilience and non-stop operations, with the tradeoff being (and as memcached correctly states) that replication halves your effective cache size. Unfortunately, Extstore doesn’t support warm restarts and thus the failure or maintenance of a single node is prone to elevating your cache miss ratios. Whether this is acceptable or not depends on your application semantics: if a cache miss corresponds to a round-trip to the database, then the end-to-end latency will be momentarily higher.

With regards to consistent hashing, memcached clients are responsible for distributing keys across your distributed servers.
This may introduce some hiccups, as different client configurations will cause keys to be assigned differently, and some implementations may not be compatible with each other. These details are outlined in Memcached’s ConfiguringClient wiki. ScyllaDB takes a different approach: consistent hashing is done at the server level and propagated to clients when the connection is first established. This ensures that all connected clients always observe the same topology as you scale.

So who won (or who lost)? Well… This does not have to be a competition, nor an exhaustive list outlining every single consideration for each solution. Both ScyllaDB and memcached use different approaches to efficiently utilize the underlying infrastructure. When configured correctly, both of them have the potential to provide great cost savings. We were pleased to see ScyllaDB matching the numbers of the industry-recognized Memcached. Of course, we had no expectations of our database being “faster.” In fact, as we approach microsecond latencies at scale, the definition of faster becomes quite subjective. 🙂

Continuing the Discussion at P99 CONF

Reminder: dormando (Alan Kasindorf) and I recently discussed this project at P99 CONF, a highly technical conference on performance and low latency engineering. Watch video

Apache Cassandra 5.0 and DataStax: The Benefits of Staying in Sync

As an Apache Cassandra® committer and long-time advocate, I'd like to walk you through the relationship between the open-source Cassandra project and DataStax. With the recent release of Cassandra 5.0, it's the perfect time to explore how this collaboration drives innovation and benefits the entire...

Low-Latency Database Strategies Featured at P99 CONF 24

Obsessed with high-performance low-latency data systems? See the 20+ data-related tech talks sessions we’re hosting at P99 CONF 2024. P99 CONF is a (free + online) highly-technical conference for engineers who obsess over P99 percentiles and long-tail latencies. The open source, community-focused event is hosted by ScyllaDB, the company behind the monstrously fast and scalable NoSQL database (and the adorable one-eyed sea monster). Since database performance is so near and dear to ScyllaDB, we quite eagerly reached out to our friends and colleagues across the community to ensure a wide spectrum of distributed data systems, approaches, and challenges would be represented at P99 CONF. As you can see on our agenda, the response was overwhelming. This year’s attendees get to hear from – and interact with – the creators of Postgres, ScyllaDB, Turso, SlateDB, SpiceDB, Arroyo, Responsive, FerretDB, and Percona. We’re also looking forward to sessions with engineers from Redis, Oracle, TigerBeetle, AWS, QuestDB, and more. There’s a series of keynotes focused on rethinking the database, deep dives into database internals, and case studies of database engineering feats at organizations like Uber, Shopify, ShareChat, and LinkedIn. If you share our obsession with high-performance low-latency data systems, here’s a rundown of sessions to consider joining at P99 CONF 24. Register Now – It’s Free   Just In Time LSM Compaction Aleksei Kladov (TigerBeetle) TigerBeetle is a reliable and fast accounting database. Its primary on-disk data structure is a log-structured merge-tree. This talk is a deep dive into TigerBeetle’s compaction algorithm — “garbage collection” for LSM, which achieves several unusual goals: Full storage determinism across replicas, enabling recovery from disk faults. Absence of dynamic memory allocation. Highly concurrent implementation, utilizing all available CPU and disk bandwidth. Perfect pacing: resources are carefully balanced between compaction and normal transaction processing, avoiding starvation and guaranteeing bounded P100. Time-Series and Analytical Databases Walk in a Bar… Andrei Pechkurov (QuestDB) A good time-series database also has to be a decent analytical database. This implies both SQL features and efficient query processing. That’s why we recently added many optimizations to QuestDB’s SQL engine, featuring better on-disk data layout, specialized data structures, SIMD and SWAR-based code, scalable aggregation algorithms, and parallel execution pipelines. Many of these additions can be met in popular databases, some are unique to QuestDB. In this talk, we will go through the most important optimizations, and discuss what’s yet to be added and where we are when compared with the fastest analytical databases. The Next Chapter in the Sordid Love/Hate Relationship Between DBs and OSes Andy Pavlo (Carnegie Mellon) Database management systems (DBMSs) are beautiful, free-spirited software that want nothing more than to help users store and access data as quickly as possible. To achieve this goal, DBMSs have spent decades trying to avoid operating systems (OSs) at all costs. Such avoidance is necessary because OSs always try to impose their will on DBMSs and stifle their ambitions through disingenuous syscall semantics, unscalable kernel-level data structures, and excessive data copying. The many attempts to avoid the OS through kernel-bypass methods or custom hardware have such high engineering/R&D costs that few DBMSs support them. 
In the end, DBMSs are stuck in an abusive relationship: they need the OS to run their software and provide them with basic functionalities (e.g., memory allocation), but they do not like how the OS treats them. However, new technologies like eBPF, which allow DBMSs to run custom code safely inside the OS kernel to override its functionality, are poised to upend this power struggle. In this talk, I will present a new design approach called “user-bypass” for building high-performance database systems and services with eBPF. I will discuss recent developments in eBPF relevant to the DBMS community and what parts of a DBMS are most amenable to using it. We will also present the design of BPF-DB, an embedded DBMS written in eBPF that provides ACID transactions over multi-versioned data and runs entirely in the Linux kernel. Designing a Query Queue for ScyllaDB Avi Kivity (ScyllaDB) Database queries can be highly variable. Some are served immediately from cache, return a single row, and are done in milliseconds. Others return gigabytes or terabytes of data, take minutes or hours, and require plenty of disk I/O and compute. Deciding what concurrency to use when running these queries is a delicate balance of CPU consumption, memory consumption, and the queue designer’s nerves. A bad design can mean high latency, under-utilization of available resources, or crashing when one runs out of memory. This talk will cover the decisions we made while designing ScyllaDB’s query queue. Reliable Data Replication Cameron Morgan (Shopify) Data replication is required to make data highly available to users. Highly available in this context means users can access data in a reliable, consistent and timely fashion. Because it is so important, if this problem has not come up in your work, you have certainly used such a system. This talk focuses on the hard problems of data replication, the ones that are usually skipped in talks. What is a backfill and why do I need them to be reliable, non-blocking and often? How do you handle schema changes? How do you validate the data is correct? How can you be resistant to failure? How can you write in parallel? This talk is about the hard problems Shopify solved replicating Shopify stores to the Edge and reaching ~5M rows replicated per second with < 1s replication lag p99. Rust: A Productive Language for Writing Database Applications Carl Lerche (AWS) When you think about Rust, you might think of performance, safety, and reliability, but what about productivity? Last year, I recommended considering Rust for developing high-level applications. Rust showed great promise, but its library ecosystem needed to mature. What has changed since then? Many higher-level applications sit on top of a database. In this talk, I will explore the current state of Rust libraries for database access, focusing on ergonomics and ease of use—two crucial factors in high-level database application development. Building a Cloud Native LSM on Object Storage Chris Riccomini (Materialized View) and Rohan Desai (Responsive) This talk discusses the design and implementation of SlateDB, an open source cloud native storage engine built as a log-structured merge-tree (LSM) on top of an object store like S3, Google Cloud Storage (GCS), or Azure Blob Store (ABS). LSMs are traditionally built assuming data will reside on local storage. Building an LSM on object storage allows SlateDB to benefit from object storage replication and durability guarantees while presenting unique latency and cost challenges. 
We’ll discuss the design decisions and tradeoffs we faced when building SlateDB. Taming Tail Latencies in Apache Pinot with Generational ZGC Christopher Peck (Uber) The introduction of Generational ZGC mostly eliminated concerns around pause time for Java applications. This session will cover a real world application of Generational ZGC and the effects. The session will also cover how application level configs/features can be used to offset some of the trade-offs we encountered when switching to Generational ZGC. Apache Pinot is an OLAP database with an emphasis on low latency. We’ll walk through how we solved large scatter gather induced tail latencies in our Pinot clusters by switching to Generational ZGC, uncovering the low latency query potential of Pinot. We’ll also a couple of Pinot’s features which made using Generational ZGC possible. Elevating PostgreSQL: Benchmarking Vector Search Performance Daniel Seybold (benchANT) The database market is constantly evolving with new database systems addressing specific use cases such as time series data or vector search. But there is one open source database system which has been around since nearly three decades and which has gained a strong momentum over the last years, PostgreSQL. Due its pure open source approach and strong community, PostgreSQL is continuously improving on its features, performance and extensions that enable PostgreSQL to handle also specific use cases such as vector search. Over the last years, multiple native vector database systems have been established and many NoSQL and relational database systems have released vector extensions for their database systems. The same goes for PostgreSQL with two available vector search extensions, pgvector and pgvecto.rs. And since vector search performance is a crucial differentiating factor for every vector search database, we report on the latest vector search benchmark results for PostgreSQL with pgvector and pgvecto.rs. The benchmarking study covers multiple data sets of varying vector dimensions, also considering different PostgreSQL configurations, from a baseline configuration to tuned configurations. Overcoming Distributed Databases Scaling Challenges with Tablets Dor Laor (ScyllaDB) Getting fantastic performance cannot stop at the server level. Even after rewriting your code in assembly, you would need multiple servers to run at scale and to provide availability. Clusters are often sharded to achieve good performance. In this session, I will cover tablets, a new dynamic sharding design we applied at ScyllaDB in order to maximize cpu and storage utilization dynamically and to maximize elasticity speed. Why Databases Cache, but Caches Go to Disk Felipe Mendes (ScyllaDB) and Alan Kasindorf (Cache Forge) Caches and Databases are different animals. Yet, databases have always cached data and caches are exploring disks. To quantify the strengths and tradeoffs of each, ScyllaDB joined forces with Memcached’s maintainer to compare both across different scenarios. Join us as we discuss how the results trace back to each underlying architectures’ design decisions. Specifically, we’ll compare ScyllaDB row-based cache with Memcached’s in-memory hash table, and look at how Memcached’s External Flash Storage compares to ScyllaDB’s userspace I/O scheduler and asynchronous AIO/DIO. 
Feature Store Evolution Under Cost Constraints: When Cost is Part of the Architecture Ivan Burmistrov and David Malinge (ShareChat) At P99 CONF 23, the ShareChat team presented the scaling challenges for the ML Feature Store so it could handle 1 billion features per second. Once the system was scaled to handle the load, the next challenge the team faced was extreme cost constraints: it was required to make the same quality system much cheaper to run. This year, we will talk about approaches the team implemented in order to optimize for cost in the Cloud environment while maintaining the same SLA for the service. The talk will touch on such topics as advanced optimizations on various levels to bring down the compute, minimizing the waste when running on Kubernetes, autoscaling challenges for stateful Apache Flink jobs, and others. The talk should be useful for those who are either interested in building or optimizing an ML Feature Store or in general looking into cost optimizations in the cloud environment. Running Low-Latency Workloads on Kubernetes Jimmy Zelinskie (authzed) Configuring Kubernetes to optimally run a particular workload is best described as a continuous journey. Depending on your requirements, best practices might not only no longer apply, but actively harm performance. In this session, we document what we’ve found to work best in our journey running SpiceDB, a low-latency authorization system. Cheating the Cloud: 50% Savings with Compression Dictionaries Łukasz Paszkowsk (ScyllaDB) Discover how to slash networking costs by up to 50% with advanced compression techniques. This session covers a real-world case where the default LZ4 compression in Cassandra, with its limited 25% efficiency, was causing high costs in inter-zone replication. We’ll introduce a custom RPC compressor with external dictionary support that samples RPC traffic, trains optimized dictionaries, and seamlessly switches connections to these new dictionaries. Learn how this approach dramatically improves compression ratios, reduces cloud expenses, and enhances data transfer efficiency across distributed systems. It’s perfect for those looking to optimize cloud infrastructure. Latency, Throughput & Fault Tolerance: Designing the Arroyo Streaming Engine Micah Wylde (Arroyo) Arroyo is a distributed, stateful stream processing engine written in Rust. It combines predictable millisecond-latency processing with the throughput of a high-performance batch query engine—on top of a distributed checkpointing implementation that provides fault tolerance and exactly-once processing. These design goals are often in tension: increasing throughput generally comes at the expense of latency, and consistent checkpointing can introduce periodic latency spikes while we wait for alignment and IO. In this talk, I will cover the distributed architecture and implementation of Arroyo including the core Arrow-based dataflow engine, algorithms for stateful windowing and aggregates, and the Chandy-Lamport inspired distributed checkpointing system. You’re Doing It All Wrong Michel Stonebraker (MIT, Postgres creator) In this talk, we consider business data processing applications, which have historically been written for a three-tier architecture. Two ideas totally upset this applecart. Idea #1: The Cloud All enterprises are moving everything possible to the cloud as quickly as possible. 
In this new environment, you are highly encouraged to use a cloud-native architecture, whereby your system is composed of distributed functions, working in parallel, and running on a serverless (and stateless) platform like AWS Lambda or Azure Functions. You program your application as a workflow of “steps.” To make systems resilient to failures you require a separate state machine and workflow manager (e.g., AWS Step Functions, Airflow, etc.). If you use this architecture, then you don’t pay for resources when your application is idle, often a major benefit. Depending on the platform, you may also get automatic resource elasticity and load balancing; additional major benefits. Idea #2: Leverage the DBMS Obviously, your data belongs in a DBMS. However, by extension, so does the state of your application. Keeping track of application state in the DBMS allows one to provide once-and-only-once execution semantics for your workflow. One can also use the database concept of “sagas” to allow multi-transaction applications to be done to completion or not at all. Furthermore, to go an order of magnitude faster that AWS Lambda, you need to collocate your application and the DBMS. The fastest alternative is to run your application inside the DBMS using stored procedures (SPs). However, it is imperative to overcome SP weaknesses, specifically the requirement of a different language (e.g.PL/SQL) and the absence of a debugging environment. The latter can be accomplished by persisting the database log and allowing “time travel debugging” for SPs. The former can be supported by coding SPs in a conventional language such as Typescript. Extending this idea to the operating environment, one can time travel the entire system, thereby allowing recovery to a previous point in time when disasters happen (errant programs, adversary intrusions, ransomware, etc.). I will discuss one such platform (DBOS) with all of the above features. In my opinion, this is an example of why “you are doing it all wrong.” Taming Discard Latency Spikes Patryk Wróbel (ScyllaDB) Discover an interesting lesson related to the impact of discarding files on read and write latency on modern NVMe SSDs, learned while fixing a real-world problem in ScyllaDB. Dive into the way how TRIM requests are issued when online discard is enabled for the XFS file system, the problems that may occur, and possible solutions. Redis Alternatives Compared Peter Zaitsev (Percona) In my talk, I will delve into a variety of Redis alternatives, providing an unbiased analysis that encompasses emerging forks like Valley and Redix, established contenders such as DragonflyDB and KeyDB, and unique options like Microsoft Garnet and Redka. My presentation will cover critical aspects of these alternatives, including licensing models and their implications, comparisons of feature sets and functionality, governance and community support structures, and performance considerations tailored to different use cases. You will leave with a clearer understanding of how each alternative could meet specific needs, insights into open source compliance and licensing, and an appreciation of the importance of performance and support options in choosing the right solution. Join me to clarify your options and strategize your approach in the ever-changing world of Redis alternatives. Database Drivers: Performance Perspectives Piot Sarna (poolside) This talk explains how to get the most out of database drivers by understanding their design and potential. 
It’s a technical deep dive into how database drivers work underneath, and how to adjust their performance to your expectations. Using eBPF Off-CPU Sampling to See What Your Databases Are Really Waiting For Tanel Poder At P99 CONF 23, I introduced the general concept of using eBPF-populated Task State Arrays to keep track of all Linux applications’ (including database engines) thread states and activity without relying on the built-in instrumentation of the application. For example, the “wait events” built into database engines are not perfect; some voluntary waits (system calls) are not properly instrumented in all database engines. There are also other involuntary waits caused by OS-level issues, like memory allocation stalls, CPU queuing, and task scheduler glitches. This year, I will show the latest eBPF-based “xcapture” tool in practical use, measuring where MySQL, Postgres, and DuckDB really spend their time, both when on CPU and sleeping. All this can be done without having to change any source code of the database engine or applications running on it. Java Heap Memory Optimization to Improve P99 Query Latency at LinkedIn Scale Vivek Iyer Vaidyanathan Iyer (LinkedIn) Apache Pinot is a real-time, distributed, OLAP database designed to serve low-latency SQL queries at high throughput. It was built and open-sourced by Linkedin and powers many site facing use cases for low latency realtime analytics. Pinot Servers, the work-horses of SQL query processing, store table shards on local SSDs and memory map the columnar data buffers (data, indexes etc). In some specialized use cases where we have P99 query SLA under 100ms, the column buffers are loaded on Java heap as opposed to off heap (memory map). The data in these on heap column buffers are characterized by high cardinality, featuring a high number of unique objects alongside a notable abundance of DUPLICATE objects. Duplicate Objects waste almost 20% of the JVM heap per host. The memory-intensive nature of our OnHeap dictionary indexes leads to high Java Heap usage resulting in spiky P99 latencies due to the unpredictable nature of Java Garbage Collection. This talk will challenge the conventional notion that discourages the use of Interning methodologies and showcase how the Pinot production deployments at LinkedIn saw 20% heap savings per host along while improving P99 query latencies by 35% using a home-grown, efficient strategy of FALF Interning – Fixed-Size Array Lock-Free Interning. Designed as a small, fixed-size, open-hashmap-based object cache that duplicates objects opportunistically, these Interners work on all object types and are 17% faster than the traditional Interners. In this talk, we will present on how we used the JXRAY memory analysis to discover the problem, design, implementation and the P99 query latency improvements we observed in production @ LinkedIn Scale. We will discuss the general challenges to solve the duplicate objects problem for Java-based systems where the traditional methods of tuning JVM parameters, employing native or Guava Interners don’t work. Join the Conference Online – It’s Free

Understanding Distributed System Performance… from the Grocery Store

Learn essential steps for boosting distributed system performance – explained with grocery store checkout analogies.

I visited a small local grocery store which happens to be in a touristy part of my neighborhood. If you’ve ever traveled abroad, then you’ve probably visited a store like that to stock up on bottled water without purchasing the overpriced hotel equivalent. This was one of these stores. To my misfortune, my visit happened to coincide with a group of tourists arriving all at once to buy beverages and warm up (it’s winter!). It just so happens that selecting beverages is often much faster than buying fruit – the reason for my visit. So after I had selected some delicious apples and grapes, I ended up waiting in line behind 10 people. And there was a single cashier to serve us all. The tourists didn’t seem to mind the wait (they were all chatting in line), but I sure wish that the store had more cashiers so I could get on with my day faster.

What does this have to do with system performance? You’ve probably experienced a similar situation yourself and have your own tale to tell. It happens so frequently that sometimes we forget how applicable these situations can be to other domain areas, including distributed systems. Sometimes when you evaluate a new solution, the results don’t meet your expectations. Why is latency high? Why is the throughput so low? Those are two of the top questions that pop up every now and then. Many times, the challenges can be resolved by optimizing your performance testing approach, as well as better maximizing your solution’s potential. As you’ll realize, improving the performance of a distributed system is a lot like ensuring speedy checkouts in a grocery store. This blog covers 7 performance-focused steps for you to follow as you evaluate distributed systems performance.

Step #1: Measure Time

With groceries, the first step towards doing any serious performance optimization is to precisely measure how long it takes for a single cashier to scan a barcode. Some goods, like bulk fruits that require weighing, may take longer to scan than products in industrial packaging. A common misconception is that processing happens in parallel. It does not (note: we’re not referring to capabilities like SIMD and pipelining here). Cashiers do not service more than a single person at a time, nor do they scan your products’ barcodes simultaneously. Likewise, a single CPU in a system will process one work unit at a time, no matter how many requests are sent to it.

In a distributed system, consider all the different work units you have and execute them in an isolated way against a single shard. Execute your different items with single-threaded execution and measure how many requests per second the system can process. Eventually, you may learn that different requests get processed at different rates. For example, if the system is able to process a thousand 1 KB requests/sec, the average latency is 1ms. Similarly, if throughput is 500 requests/sec for a larger payload size, then the average latency is 2ms.

Step #2: Find the Saturation Point

A cashier is never scanning barcodes all the time. Sometimes, they will be idle waiting for customers to place their items onto the checkout counter, or waiting for payment to complete. This introduces delays you’ll typically want to avoid. Likewise, every request your client submits against a system incurs, for example, network round trip time – and you will always pay a penalty under low concurrency.
To eliminate this idleness and further increase throughput, simply increase the concurrency. Do it in small increments until you observe that the throughput saturates and the latency starts to grow. Once you reach that point, congratulations! You effectively reached the system’s limits. In other words, unless you manage to get your work items processed faster (for example, by reducing the payload size) or tune the system to work more efficiently with your workload, you won’t achieve gains past that point. You definitely don’t want to find yourself in a situation where you are constantly pushing the system against its limits, though. Once you reach the saturation area, fall back to lower concurrency numbers to account for growth and unpredictability.

Step #3: Add More Workers

If you live in a busy area, grocery store demand might be beyond what a single cashier can sustain. Even if the store happened to hire the fastest cashier in the world, they would still be busy as demand/concurrency increases. Once the saturation point is reached, it is time to hire more workers. In the distributed systems case, this means adding more shards to the system to scale throughput under the latency you’ve previously measured. This leads us to the following formula:

Number of Workers = Target Throughput / Single worker limit

You already discovered the performance limits of a single worker in the previous exercise. To find the total number of workers you need, simply divide your target throughput by how much a single worker can sustain under your defined latency requirements. Distributed systems like ScyllaDB provide linear scale, which simplifies the math (and total cost of ownership [TCO]). In fact, as you add more workers, chances are that you’ll achieve even higher rates than under a single worker. The reason is related to network IRQs, and it is out of scope for this write-up (but see this perftune docs page for some details).

Step #4: Increase Parallelism

Think about it. The total time to check out an order is driven by the number of items in a cart divided by the speed of a single cashier. Instead of adding all the pressure on a single cashier, wouldn’t it be far more efficient to divide the items in your shopping cart (our work) and distribute them among friends who could then check out in parallel?

Sometimes the number of work items you need to process might not be evenly split across all available cashiers. For example, if you have 100 items to check out, but there are only 5 cashiers, then you would route 20 items per counter. You might wonder: “Why shouldn’t I instead route only 5 customers with 20 items each?” That’s a great question – and you probably should do that, rather than having the store’s security kick you out. When designing real-time low latency OLTP systems, however, you mostly care about the time it takes for a single work unit to get processed. Although it is possible to “batch” multiple requests against a single shard, it is far more difficult (though not impossible) to consistently accomplish that task in such a way that every item is owned by that specific worker. The solution is to always ensure you dispatch individual requests one at a time. Keep concurrency high enough to overcome external delays like client processing time and network RTT, and introduce more clients for higher parallelism.

Step #5: Avoid Hotspots

Even after multiple cashiers get hired, it sometimes happens that a long line of customers queues up behind a handful of them.
More often than not, you should be able to find less busy – or even totally free – cashiers simply by walking through the hallway. This is known as a hotspot, and it often gets triggered due to unbound concurrency.

It manifests in multiple ways. A common situation is when you have a traffic spike to a few popular items (load). That momentarily causes a single worker to queue a considerable amount of requests. Another example: low cardinality (uneven data distribution) prevents you from fully benefiting from the increased workforce. There’s also another commonly overlooked situation that frequently arises. It’s when you dispatch too much work against a single worker to coordinate, and that single worker depends on other workers to complete that task.

Let’s get back to the shopping analogy: Assume you’ve found yourself on a blessed day as you approach the checkout counters. All cashiers are idle and you can choose any of them. After most of your items get scanned, you say “Dear Mrs. Cashier, I want one of those whiskies sitting in your locked closet.” The cashier then calls for another employee to pick up your order. A few minutes later, you realize: “Oops, I forgot to pick up my toothpaste,” and another idling cashier nicely goes and picks it up for you.

This approach introduces a few problems. First, your payment needs to be aggregated by a single cashier – the one you ran into when you approached the checkout counter. Second, although we parallelized, the “main” cashier will be idle waiting for the others to complete their tasks, adding delays. Third, further delays may be introduced between each additional individual request’s completion: for example, when the keys of the locked closet are only held by a single employee, the total latency will be driven by the slowest response.

Consider pseudocode that routes a batch of requests through a single coordinating worker and then waits for all of the sub-requests to complete. See that? Don’t do that. The previous pattern works nicely when there is a single work unit (or shard) to route requests to. Key-value caches are a great example of how multiple requests can get pipelined all together for higher efficiency. As we introduce sharding into the picture, this becomes a great way to undermine your latencies given the previously outlined reasons.

Step #6: Limit Concurrency

When more clients are introduced, it’s like customers inadvertently ending up at the supermarket during rush hour. Suddenly, they can easily end up in a situation where many clients all decide to queue under a handful of cashiers. You previously discovered the maximum concurrency at which a single shard can service requests. These are hard numbers and – as you observed during small scale testing – you won’t see any benefits if you try to push requests further. The formula goes like this:

Concurrency = Throughput * Latency

If a single shard sustains up to 5K ops/second under an average latency of 1 ms, then you can execute up to 5 concurrent in-flight requests at all times. Later you added more shards to scale that throughput. Say you scaled to 20 shards for a total throughput goal of 100K ops/second. Intuitively, you would think that your maximum useful concurrency would become 100. But there’s a problem. Introducing more shards to a distributed system doesn’t increase the maximum concurrency that a single shard can handle. To continue the shopping analogy, a single cashier will continue to scan barcodes at a fixed rate – and if several customers line up waiting to get serviced, their wait time will increase.
To mitigate (though not necessarily prevent) that situation, divide the maximum useful concurrency among the number of clients. For example, if you’ve got 10 clients and a maximum useful concurrency of 100, then each client should be able to queue up to 10 requests across all available shards. This generally works when your requests are evenly distributed. However, it can still backfire when you have a certain degree of imbalance. Say all 10 clients decided to queue at least one request under the same shard. At a given point in time, that shard’s concurrency climbed to 10, double our initially discovered maximum concurrency. As a result, latency increases, and so does your P99.

There are different approaches to prevent that situation. The right one to follow depends on your application and use case semantics. One option is to limit your client concurrency even further to minimize its P99 impact. Another strategy is to throttle at the system level, allowing each shard to shed requests as soon as it queues past a certain threshold.

Step #7: Consider Background Operations

Cashiers do not work at their maximum speed at all times. Sometimes, they inevitably slow down. They drink water, eat lunch, go to the restroom, and eventually change shifts. That’s life! It is now time for real-life production testing. Apply what you’ve learned so far and observe how the system behaves over long periods of time. Distributed systems often need to run background maintenance activities (like compactions and repairs) to keep things running smoothly. In fact, that’s precisely the reason why I recommended that you stay away from the saturation area at the beginning of this article.

Background tasks inevitably consume system resources, and are often tricky to diagnose. I commonly receive reports like “We observed a latency increase due to compactions”, only to find out later that the actual cause was something else – for example, a spike in queued requests to a given shard. Irrespective of the cause, don’t try to “throttle” system tasks. They exist and need to run for a reason. Throttling their execution will likely backfire on you eventually. Yes, background tasks slow down a given shard momentarily (that’s normal!). Your application should simply prefer other less busy replicas (or cashiers) when it happens. For a great detailed discussion of these points, see Brian Taylor’s insights during his How to Maximize Database Concurrency talk.

Applying These Steps

Hopefully, you are now empowered to address questions like “Why is latency high?” or “Why is throughput so low?” As you start evaluating performance, start small. This minimizes costs and gives you fine-grained control during each step. If latencies are sub-optimal under small scale, it either means you are pushing a single shard too hard, or that your expectations are off. Do not engage in larger scale testing until you are happy with the performance a single shard gives you.

Once you feel comfortable with the performance of a single shard, scale capacity accordingly. Keep an eye on concurrency at all times and watch out for imbalances, mitigating or preventing them as needed. When you find yourself in a situation where throughput no longer increases but the system is idling, add more clients to increase parallelism. These concepts generally apply to every distributed system out there, including ScyllaDB. Our shard-per-core architecture scales linearly, making it easy for you to follow through the steps we discussed here.
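To recap the arithmetic behind Steps #3 and #6, here is a small, illustrative sketch (in Java, using the same example numbers as above) that derives the shard count and the per-client concurrency from a target throughput and the limits you measured on a single shard:

public class CapacityPlanningSketch {
    public static void main(String[] args) {
        // Measured on a single shard (Steps #1 and #2); illustrative numbers only.
        double singleShardOpsPerSec = 5_000;
        double avgLatencySec = 0.001; // 1 ms

        // Step #6: Concurrency = Throughput * Latency.
        double maxUsefulConcurrencyPerShard = singleShardOpsPerSec * avgLatencySec; // 5

        // Step #3: Number of Workers = Target Throughput / Single worker limit.
        double targetOpsPerSec = 100_000;
        double shardsNeeded = Math.ceil(targetOpsPerSec / singleShardOpsPerSec); // 20

        // Step #6 again: spread the total useful concurrency across the clients.
        int clients = 10;
        double totalUsefulConcurrency = maxUsefulConcurrencyPerShard * shardsNeeded; // 100
        double perClientConcurrency = totalUsefulConcurrency / clients;              // 10

        System.out.printf("shards=%.0f, total concurrency=%.0f, per-client concurrency=%.0f%n",
                shardsNeeded, totalUsefulConcurrency, perClientConcurrency);
    }
}
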
If you’d like to know more about how we can help, book a technical session with us.

Introducing Netflix’s Key-Value Data Abstraction Layer

Vidhya Arvind, Rajasekhar Ummadisetty, Joey Lynch, Vinay Chella

Introduction

At Netflix, our ability to deliver seamless, high-quality streaming experiences to millions of users hinges on robust, global backend infrastructure. Central to this infrastructure is our use of multiple online distributed databases such as Apache Cassandra, a NoSQL database known for its high availability and scalability. Cassandra serves as the backbone for a diverse array of use cases within Netflix, ranging from user sign-ups and storing viewing histories to supporting real-time analytics and live streaming.

Over time, as new key-value databases were introduced and service owners launched new use cases, we encountered numerous challenges with datastore misuse. First, developers struggled to reason about consistency, durability, and performance in this complex global deployment across multiple stores. Second, developers had to constantly re-learn new data modeling practices and common yet critical data access patterns. These include challenges with tail latency and idempotency, managing “wide” partitions with many rows, handling single large “fat” columns, and slow response pagination. Additionally, the tight coupling with multiple native database APIs — APIs that continually evolve and sometimes introduce backward-incompatible changes — resulted in org-wide engineering efforts to maintain and optimize our microservices’ data access.

To overcome these challenges, we developed a holistic approach that builds upon our Data Gateway Platform. This approach led to the creation of several foundational abstraction services, the most mature of which is our Key-Value (KV) Data Abstraction Layer (DAL). This abstraction simplifies data access, enhances the reliability of our infrastructure, and enables us to support the broad spectrum of use cases that Netflix demands with minimal developer effort.

In this post, we dive deep into how Netflix’s KV abstraction works, the architectural principles guiding its design, the challenges we faced in scaling diverse use cases, and the technical innovations that have allowed us to achieve the performance and reliability required by Netflix’s global operations.

The Key-Value Service

The KV data abstraction service was introduced to solve the persistent challenges we faced with data access patterns in our distributed databases. Our goal was to build a versatile and efficient data storage solution that could handle a wide variety of use cases, ranging from the simplest hashmaps to more complex data structures, all while ensuring high availability, tunable consistency, and low latency.

Data Model

At its core, the KV abstraction is built around a two-level map architecture. The first level is a hashed string ID (the primary key), and the second level is a sorted map of a key-value pair of bytes. This model supports both simple and complex data models, balancing flexibility and efficiency.

HashMap<String, SortedMap<Bytes, Bytes>>

For complex data models such as structured Records or time-ordered Events, this two-level approach handles hierarchical structures effectively, allowing related data to be retrieved together. For simpler use cases, it also represents flat key-value Maps (e.g. id → {"" → value}) or named Sets (e.g. id → {key → ""}). This adaptability allows the KV abstraction to be used in hundreds of diverse use cases, making it a versatile solution for managing both simple and complex data models in large-scale infrastructures like Netflix.
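To make that shape concrete, here is a minimal, illustrative sketch (plain Java collections with String stand-ins for Bytes, not the actual KV client types) showing how a structured record, a flat Map, and a named Set all collapse into the same two-level structure:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class TwoLevelMapSketch {
    // First level: record ID; second level: sorted map of item key -> item value.
    static Map<String, TreeMap<String, String>> store = new HashMap<>();

    public static void main(String[] args) {
        // A structured record: related items retrieved together under one ID.
        store.put("user:42:watch_history",
                new TreeMap<>(Map.of("2024-09-01", "movie-a", "2024-09-02", "show-b")));

        // A flat key-value Map degenerates to id -> {"" -> value}.
        store.put("user:42:avatar_url",
                new TreeMap<>(Map.of("", "https://example.com/avatar.png")));

        // A named Set degenerates to id -> {member -> ""}.
        store.put("user:42:favorite_genres",
                new TreeMap<>(Map.of("drama", "", "sci-fi", "")));

        System.out.println(store);
    }
}
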

The KV data can be visualized at a high level in the diagram below, which shows three records.

message Item (   
Bytes key,
Bytes value,
Metadata metadata,
Integer chunk
)

Database Agnostic Abstraction

The KV abstraction is designed to hide the implementation details of the underlying database, offering a consistent interface to application developers regardless of the optimal storage system for that use case. While Cassandra is one example, the abstraction works with multiple data stores like EVCache, DynamoDB, RocksDB, etc…

For example, when implemented with Cassandra, the abstraction leverages Cassandra’s partitioning and clustering capabilities. The record ID acts as the partition key, and the item key as the clustering column:

The corresponding Data Definition Language (DDL) for this structure in Cassandra is:

CREATE TABLE IF NOT EXISTS <ns>.<table> (
id text,
key blob,
value blob,
value_metadata blob,

PRIMARY KEY (id, key))
WITH CLUSTERING ORDER BY (key <ASC|DESC>)

Namespace: Logical and Physical Configuration

A namespace defines where and how data is stored, providing logical and physical separation while abstracting the underlying storage systems. It also serves as central configuration of access patterns such as consistency or latency targets. Each namespace may use different backends: Cassandra, EVCache, or combinations of multiple. This flexibility allows our Data Platform to route different use cases to the most suitable storage system based on performance, durability, and consistency needs. Developers just provide their data problem rather than a database solution!

In this example configuration, the ngsegment namespace is backed by both a Cassandra cluster and an EVCache caching layer, allowing for highly durable persistent storage and lower-latency point reads.

"persistence_configuration":[                                                   
{
"id":"PRIMARY_STORAGE",
"physical_storage": {
"type":"CASSANDRA",
"cluster":"cassandra_kv_ngsegment",
"dataset":"ngsegment",
"table":"ngsegment",
"regions": ["us-east-1"],
"config": {
"consistency_scope": "LOCAL",
"consistency_target": "READ_YOUR_WRITES"
}
}
},
{
"id":"CACHE",
"physical_storage": {
"type":"CACHE",
"cluster":"evcache_kv_ngsegment"
},
"config": {
"default_cache_ttl": 180s
}
}
]

Key APIs of the KV Abstraction

To support diverse use-cases, the KV abstraction provides four basic CRUD APIs:

PutItems — Write one or more Items to a Record

The PutItems API is an upsert operation: it can insert new data or update existing data in the two-level map structure.

message PutItemRequest (
IdempotencyToken idempotency_token,
string namespace,
string id,
List<Item> items
)

As you can see, the request includes the namespace, Record ID, one or more items, and an idempotency token to ensure retries of the same write are safe. Chunked data can be written by staging chunks and then committing them with appropriate metadata (e.g. number of chunks).

GetItems — Read one or more Items from a Record

The GetItems API provides a structured and adaptive way to fetch data using ID, predicates, and selection mechanisms. This approach balances the need to retrieve large volumes of data while meeting stringent Service Level Objectives (SLOs) for performance and reliability.

message GetItemsRequest (
String namespace,
String id,
Predicate predicate,
Selection selection,
Map<String, Struct> signals
)

The GetItemsRequest includes several key parameters:

  • Namespace: Specifies the logical dataset or table
  • Id: Identifies the entry in the top-level HashMap
  • Predicate: Filters the matching items and can retrieve all items (match_all), specific items (match_keys), or a range (match_range)
  • Selection: Narrows the returned response; for example, page_size_bytes for pagination, item_limit for limiting the total number of items across pages, and include/exclude to include or exclude large values from responses
  • Signals: Provides in-band signaling to indicate client capabilities, such as supporting client compression or chunking.

The GetItemResponse message contains the matching data:

message GetItemResponse (
List<Item> items,
Optional<String> next_page_token
)
  • Items: A list of retrieved items based on the Predicate and Selection defined in the request.
  • Next Page Token: An optional token indicating the position for subsequent reads if needed, essential for handling large data sets across multiple requests. Pagination is a critical component for efficiently managing data retrieval, especially when dealing with large datasets that could exceed typical response size limits.

DeleteItems — Delete one or more Items from a Record

The DeleteItems API provides flexible options for removing data, including record-level, item-level, and range deletes — all while supporting idempotency.

message DeleteItemsRequest (
IdempotencyToken idempotency_token,
String namespace,
String id,
Predicate predicate
)

Just like in the GetItems API, the Predicate allows one or more Items to be addressed at once:

  • Record-Level Deletes (match_all): Removes the entire record in constant latency regardless of the number of items in the record.
  • Item-Range Deletes (match_range): This deletes a range of items within a Record. Useful for keeping the “n-newest” items or for deleting a prefix path.
  • Item-Level Deletes (match_keys): Deletes one or more individual items.

Some storage engines (any store that defers true deletion), such as Cassandra, struggle with high volumes of deletes due to tombstone and compaction overhead. Key-Value optimizes both record and range deletes to generate a single tombstone for the operation — you can learn more about tombstones in About Deletes and Tombstones.

Item-level deletes create many tombstones, but KV hides that storage engine complexity via TTL-based deletes with jitter. Instead of deleting immediately, item metadata is updated as expired, with a randomly jittered TTL applied to stagger deletions. This technique maintains read pagination protections. While this doesn’t completely solve the problem, it reduces load spikes and helps maintain consistent performance while compaction catches up. These strategies help maintain system performance, reduce read overhead, and meet SLOs by minimizing the impact of deletes.
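As a rough illustration of the idea (not Netflix’s actual implementation, and with made-up numbers), a delete path might compute an expiry with random jitter so that expirations, and the tombstones they eventually produce, are spread over a window instead of landing all at once:

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;

public class JitteredTtlSketch {
    // Hypothetical parameters: a base TTL plus up to 25% random jitter.
    static final Duration BASE_TTL = Duration.ofHours(1);
    static final double MAX_JITTER_FRACTION = 0.25;

    // Returns the instant at which an item marked "expired" should actually be purged.
    static Instant jitteredExpiry(Instant now) {
        long baseMillis = BASE_TTL.toMillis();
        long jitterMillis = ThreadLocalRandom.current()
                .nextLong((long) (baseMillis * MAX_JITTER_FRACTION) + 1);
        return now.plusMillis(baseMillis + jitterMillis);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        // Ten logical deletes issued together get staggered purge times.
        for (int i = 0; i < 10; i++) {
            System.out.println("item-" + i + " purges at " + jitteredExpiry(now));
        }
    }
}
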

Complex Mutate and Scan APIs

Beyond simple CRUD on single Records, KV also supports complex multi-item and multi-record mutations and scans via MutateItems and ScanItems APIs. PutItems also supports atomic writes of large blob data within a single Item via a chunked protocol. These complex APIs require careful consideration to ensure predictable linear low-latency and we will share details on their implementation in a future post.

Design Philosophies for reliable and predictable performance

Idempotency to fight tail latencies

To ensure data integrity the PutItems and DeleteItems APIs use idempotency tokens, which uniquely identify each mutative operation and guarantee that operations are logically executed in order, even when hedged or retried for latency reasons. This is especially crucial in last-write-wins databases like Cassandra, where ensuring the correct order and de-duplication of requests is vital.

In the Key-Value abstraction, idempotency tokens contain a generation timestamp and random nonce token. Either or both may be required by backing storage engines to de-duplicate mutations.

message IdempotencyToken (
Timestamp generation_time,
String token
)

At Netflix, client-generated monotonic tokens are preferred due to their reliability, especially in environments where network delays could impact server-side token generation. This combines a client-provided monotonic generation_time timestamp with a 128-bit random UUID token. Although clock-based token generation can suffer from clock skew, our tests on EC2 Nitro instances show drift is minimal (under 1 millisecond). In some cases that require stronger ordering, regionally unique tokens can be generated using tools like Zookeeper, or globally unique tokens such as transaction IDs can be used.
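A minimal sketch of how such a client-side token could be generated (the IdempotencyToken message above defines the shape; the generator below is illustrative rather than Netflix’s actual code):

import java.time.Instant;
import java.util.UUID;

public class IdempotencyTokenSketch {
    // Mirrors the IdempotencyToken message: a generation timestamp plus a random nonce.
    record IdempotencyToken(Instant generationTime, String token) {}

    // Tracks the last timestamp handed out so generation_time stays monotonic on this client,
    // even if the wall clock briefly steps backwards.
    private static long lastTimestampMillis = Long.MIN_VALUE;

    static synchronized IdempotencyToken next() {
        long nowMillis = Math.max(System.currentTimeMillis(), lastTimestampMillis + 1);
        lastTimestampMillis = nowMillis;
        // 128-bit random nonce; a random UUID is a convenient stand-in.
        return new IdempotencyToken(Instant.ofEpochMilli(nowMillis), UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        System.out.println(next());
        System.out.println(next());
    }
}
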

The following graphs illustrate the observed clock skew on our Cassandra fleet, suggesting the safety of this technique on modern cloud VMs with direct access to high-quality clocks. To further maintain safety, KV servers reject writes bearing tokens with large drift, both preventing silent write discard (a write with a timestamp far in the past) and immutable doomstones (a write with a timestamp far in the future) in storage engines vulnerable to those.

Handling Large Data through Chunking

Key-Value is also designed to efficiently handle large blobs, a common challenge for traditional key-value stores. Databases often face limitations on the amount of data that can be stored per key or partition. To address these constraints, KV uses transparent chunking to manage large data efficiently.

For items smaller than 1 MiB, data is stored directly in the main backing storage (e.g. Cassandra), ensuring fast and efficient access. However, for larger items, only the id, key, and metadata are stored in the primary storage, while the actual data is split into smaller chunks and stored separately in chunk storage. This chunk storage can also be Cassandra but with a different partitioning scheme optimized for handling large values. The idempotency token ties all these writes together into one atomic operation.

By splitting large items into chunks, we ensure that latency scales linearly with the size of the data, making the system both predictable and efficient. A future blog post will describe the chunking architecture in more detail, including its intricacies and optimization strategies.
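For intuition only, here is a small sketch of the splitting step, assuming a 1 MiB chunk size (the real protocol, including how chunks are staged and committed atomically alongside the metadata, is more involved and will be covered in the follow-up post):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkingSketch {
    static final int CHUNK_SIZE = 1 << 20; // assumed 1 MiB chunk size

    // Splits a large value into fixed-size chunks; small values stay as a single piece.
    static List<byte[]> split(byte[] value) {
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < value.length; offset += CHUNK_SIZE) {
            int end = Math.min(offset + CHUNK_SIZE, value.length);
            chunks.add(Arrays.copyOfRange(value, offset, end));
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] largeValue = new byte[(5 << 20) + 123]; // ~5 MiB payload
        List<byte[]> chunks = split(largeValue);
        // The chunk count is the kind of metadata the primary store would record.
        System.out.println("chunks=" + chunks.size()
                + ", last chunk=" + chunks.get(chunks.size() - 1).length + " bytes");
    }
}
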

Client-Side Compression

The KV abstraction leverages client-side payload compression to optimize performance, especially for large data transfers. While many databases offer server-side compression, handling compression on the client side reduces expensive server CPU usage, network bandwidth, and disk I/O. In one of our deployments, which helps power Netflix’s search, enabling client-side compression reduced payload sizes by 75%, significantly improving cost efficiency.
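As a generic illustration (the actual codec and its negotiation are handled by the KV client and communicated via signals), client-side compression can be as simple as compressing the payload before a put and decompressing it after a get:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ClientCompressionSketch {
    static byte[] compress(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        }
        return out.toByteArray();
    }

    static byte[] decompress(byte[] compressed) throws IOException {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gzip.readAllBytes();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "some highly repetitive payload ".repeat(1000).getBytes(StandardCharsets.UTF_8);
        byte[] compressed = compress(payload);
        System.out.printf("raw=%d bytes, compressed=%d bytes%n", payload.length, compressed.length);
    }
}
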

Smarter Pagination

We chose payload size in bytes as the limit per response page rather than the number of items because it allows us to provide predictable operation SLOs. For instance, we can provide a single-digit millisecond SLO on a 2 MiB page read. Conversely, using the number of items per page as the limit would result in unpredictable latencies due to significant variations in item size. A request for 10 items per page could result in vastly different latencies if each item was 1 KiB versus 1 MiB.

Using bytes as a limit poses challenges because few backing stores support byte-based pagination; most data stores use the number of results (e.g., DynamoDB and Cassandra limit by the number of items or rows). To address this, we use a static limit for the initial queries to the backing store, query with this limit, and process the results. If more data is needed to meet the byte limit, additional queries are executed until the byte limit is met, the excess results are discarded, and a page token is generated.

This static limit can lead to inefficiencies: one large item in the result may cause us to discard many results, while small items may require multiple iterations to fill a page, resulting in read amplification. To mitigate these issues, we implemented adaptive pagination, which dynamically tunes the limits based on observed data.

Adaptive Pagination

When an initial request is made, a query is executed in the storage engine, and the results are retrieved. As the consumer processes these results, the system tracks the number of items consumed and the total size used. This data helps calculate an approximate item size, which is stored in the page token. For subsequent page requests, this stored information allows the server to apply the appropriate limits to the underlying storage, reducing unnecessary work and minimizing read amplification.
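As a back-of-the-envelope sketch (the names, fallback limit, and headroom factor are all made up; the production logic also folds in cached per-namespace estimates, as described below), the next-page limit can be derived from the average item size observed so far:

public class AdaptivePaginationSketch {
    // Estimates how many items to request from the backing store for the next page.
    static int nextItemLimit(long bytesConsumed, int itemsConsumed, long pageSizeBytes) {
        if (itemsConsumed == 0) {
            return 100; // assumed static fallback when nothing has been observed yet
        }
        long approxItemSize = Math.max(1, bytesConsumed / itemsConsumed);
        // Ask for roughly one page worth of items, with a little headroom to avoid another round trip.
        long estimate = (pageSizeBytes / approxItemSize) * 110 / 100;
        return (int) Math.max(1, Math.min(estimate, 10_000)); // clamp to something sane
    }

    public static void main(String[] args) {
        // Previous page: 500 items consumed totalling ~1 MiB; target page size 2 MiB.
        System.out.println(nextItemLimit(1 << 20, 500, 2 << 20));
    }
}
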

While this method is effective for follow-up page requests, what happens with the initial request? In addition to storing item size information in the page token, the server also estimates the average item size for a given namespace and caches it locally. This cached estimate helps the server set a more optimal limit on the backing store for the initial request, improving efficiency. The server continuously adjusts this limit based on recent query patterns or other factors to keep it accurate. For subsequent pages, the server uses both the cached data and the information in the page token to fine-tune the limits.

In addition to adaptive pagination, a mechanism is in place to send a response early if the server detects that processing the request is at risk of exceeding the request’s latency SLO.

For example, let us assume a client submits a GetItems request with a per-page limit of 2 MiB and a maximum end-to-end latency limit of 500ms. While processing this request, the server retrieves data from the backing store. This particular record has thousands of small items so it would normally take longer than the 500ms SLO to gather the full page of data. If this happens, the client would receive an SLO violation error, causing the request to fail even though there is nothing exceptional. To prevent this, the server tracks the elapsed time while fetching data. If it determines that continuing to retrieve more data might breach the SLO, the server will stop processing further results and return a response with a pagination token.
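In rough pseudocode terms, the server-side loop looks something like the following sketch (the safety margin, token format, and types are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SloAwareFetchSketch {
    record Page(List<String> items, String nextPageToken) {}

    // Fetch batches until the page is full, the data is exhausted, or the latency budget is at risk.
    static Page fetchPage(Supplier<List<String>> nextBatch, long pageSizeBytes, long latencyBudgetMillis) {
        long deadline = System.currentTimeMillis() + latencyBudgetMillis;
        long safetyMarginMillis = 50; // assumed headroom for serialization and the network hop
        List<String> items = new ArrayList<>();
        long bytes = 0;
        while (bytes < pageSizeBytes) {
            if (System.currentTimeMillis() + safetyMarginMillis >= deadline) {
                // At risk of breaching the SLO: return what we have plus a token to resume from.
                return new Page(items, "resume-after-" + items.size());
            }
            List<String> batch = nextBatch.get();
            if (batch.isEmpty()) {
                return new Page(items, null); // no more data
            }
            for (String item : batch) {
                items.add(item);
                bytes += item.length();
            }
        }
        return new Page(items, "resume-after-" + items.size());
    }

    public static void main(String[] args) {
        // Simulated backing store returning small batches of items.
        Supplier<List<String>> store = () -> List.of("item-aaaaaaaa", "item-bbbbbbbb");
        Page page = fetchPage(store, 1 << 10, 500);
        System.out.println(page.items().size() + " items, token=" + page.nextPageToken());
    }
}
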

This approach ensures that requests are processed within the SLO, even if the full page size isn’t met, giving clients predictable progress. Furthermore, if the client is a gRPC server with proper deadlines, the client is smart enough not to issue further requests, reducing useless work.

If you want to know more, the How Netflix Ensures Highly-Reliable Online Stateful Systems article talks in further detail about these and many other techniques.

Signaling

KV uses in-band messaging we call signaling that allows the dynamic configuration of the client and enables it to communicate its capabilities to the server. This ensures that configuration settings and tuning parameters can be exchanged seamlessly between the client and server. Without signaling, the client would need static configuration — requiring a redeployment for each change — or, with dynamic configuration, would require coordination with the client team.

For server-side signals, when the client is initialized, it sends a handshake to the server. The server responds back with signals, such as target or max latency SLOs, allowing the client to dynamically adjust timeouts and hedging policies. Handshakes are then made periodically in the background to keep the configuration current. For client-communicated signals, the client, along with each request, communicates its capabilities, such as whether it can handle compression, chunking, and other features.

KV Usage @ Netflix

The KV abstraction powers several key Netflix use cases, including:

  • Streaming Metadata: High-throughput, low-latency access to streaming metadata, ensuring personalized content delivery in real-time.
  • User Profiles: Efficient storage and retrieval of user preferences and history, enabling seamless, personalized experiences across devices.
  • Messaging: Storage and retrieval of the push registry for messaging needs, enabling millions of requests to flow through.
  • Real-Time Analytics: Persists large-scale impression data and provides insights into user behavior and system performance, moving data from offline to online and vice versa.

Future Enhancements

Looking forward, we plan to enhance the KV abstraction with:

  • Lifecycle Management: Fine-grained control over data retention and deletion.
  • Summarization: Techniques to improve retrieval efficiency by summarizing records with many items into fewer backing rows.
  • New Storage Engines: Integration with more storage systems to support new use cases.
  • Dictionary Compression: Further reducing data size while maintaining performance.

Conclusion

The Key-Value service at Netflix is a flexible, cost-effective solution that supports a wide range of data patterns and use cases, from low to high traffic scenarios, including critical Netflix streaming use-cases. The simple yet robust design allows it to handle diverse data models like HashMaps, Sets, Event storage, Lists, and Graphs. It abstracts the complexity of the underlying databases from our developers, which enables our application engineers to focus on solving business problems instead of becoming experts in every storage engine and their distributed consistency models. As Netflix continues to innovate in online datastores, the KV abstraction remains a central component in managing data efficiently and reliably at scale, ensuring a solid foundation for future growth.

Acknowledgments: Special thanks to our stunning colleagues who contributed to Key Value’s success: William Schor, Mengqing Wang, Chandrasekhar Thumuluru, Rajiv Shringi, John Lu, George Cambell, Ammar Khaku, Jordan West, Chris Lohfink, Matt Lehman, and the whole online datastores team (ODS, f.k.a CDE).



ScyllaDB’s Rust Developer Workshop: What We Learned

A recap of the recent Rust developer workshop, where we built and refactored a high-performance Rust app for real-time data streaming (with ScyllaDB and Redpanda).

Felipe Cardeneti Mendes (ScyllaDB Technical Director) and I recently got together with a couple thousand curious Rustaceans for a ScyllaDB Rust Developer Workshop. The agenda: walk through how we built and refactored a high-performance Rust app for real-time data streaming. We promised to show developers, engineers, and architects how to:

  • Create and compile a sample social media app with Rust
  • Connect the application to ScyllaDB (NoSQL data store) and Redpanda (streaming data)
  • Negotiate tradeoffs related to data modeling and querying
  • Manage and monitor the database for consistently low latencies

This blog post is a quick recap of what we covered. Hopefully, it’s a nice wrapup for those who joined us live. If you missed it, you can still watch the recording (uncut – complete with a little cat chasing!). And feel free to ping me or Felipe with any questions you have.

Access the workshop now

Attend P99 CONF (free + virtual) to watch Rust tech talks

First Things First

First, I wanted to cover how I approach an existing, legacy codebase that Felipe so kindly generated for the workshop. I think it’s really important to respect everyone who interacts with code – past, present and future. That mindset helps foster good collaboration and leads to more maintainable and high-quality code. Who knows, you might even have a laugh along the way. You probably spotted me using an Integrated Development Environment (IDE). Depending on your budget (from free to perhaps a couple hundred dollars), an IDE will really help streamline your coding process, especially when working with complex projects. The eagle-eyed among you may have spotted some AI in there as well from our friends at GitHub. Every bit helps!

Dealing with Dependencies

In the code walkthrough, I first tackled the structure of the code and showed how to organize workspace members. This helps me resolve dependencies efficiently and start to test the binaries in isolation:

[workspace]
members = ["backend", "consumer", "frontend"]
resolver = "1"

Then I could just run the consumer after stopping it in docker-compose with:

cargo run --package consumer --bin consumer

Updating the Driver

Another thing I did was update the driver. It’s important to keep things in check with releases from ScyllaDB, so we upgraded the Rust driver for the whole project. I did a quick walkthrough of application functionality and decided to write a quick smoke test that simulated traffic on the front end in terms of messaging between users. If you’re interested, I used a great load testing tool called k6 to simulate that load. Here’s the script:

import http from 'k6/http';

export default function () {
  http.post('http://localhost:3001/new_post', JSON.stringify({
    content: 'bar',
    subject: 'foo',
    id: '8d8712fc-786f-4d72-98ea-3669e56f7012'
  }), {
    headers: {
      'Content-Type': 'application/json',
    },
  });
}

Dealing with an Offset Bug

Once we had some messages flowing (perhaps way too many, as it turned out) I discovered a potential bug, where the offset was not being persisted between application restarts. This meant every time we restarted the application, all of the messages would be read from the topic and then re-written to the database. Without understanding functionality like the ability to parse consumer offsets in Redpanda, I went for a more naive approach by storing the offset in ScyllaDB instead.
I’m sure I’m not the first dev to go down the wrong path, and I fully blame Felipe for not intercepting earlier 😉

Refactoring Time

In any case, it was fun to see how we might approach the topic of refactoring code. It’s always easier to start with small, manageable tasks when making improvements or refactoring code. The first thing I did was decide what the table (and ultimately query) might look like. This “query first design” is an important design concept in ScyllaDB. Be sure to check out some ScyllaDB University courses on this. I decided the table would look something like this to store my offset value:

CREATE TABLE IF NOT EXISTS ks.offset (consumer text PRIMARY KEY, count BigInt)

We briefly touched on why I chose a BigInt primitive instead of a Counter value. The main reason is that we can’t arbitrarily set the latter to a value, only increment or decrement it. We then tackled how we might write to that table and came up with the following:

async fn update_offset(offset: i64, session: &Session, update_counter: &PreparedStatement, consumer: &str) -> Result<()> {
    session.execute(update_counter, (offset, consumer)).await?;
    Ok(())
}

You’ll notice here that I’m passing it a prepared statement, which is an important concept to grasp when making your code perform well with ScyllaDB. Be sure to read the docs on that if you’re unsure. I also recall writing a TODO to move some existing prepared query statements outside a for loop. The main reason: you only need to do this once for your app, not over and over. So watch out for that mistake. I also stored my query as a constant:

const UPDATE_OFFSET: &str = "UPDATE ks.offset SET count = ? WHERE consumer = ?";

There are different ways to skin this, like maybe some form of model approach, but this was a simple way to keep the queries in one place within the consumer code. We restarted the app and checked the database using cqlsh to see if the offsets were being written – and they weren’t!

But first, a quick tip from other webinars: If you’re running ScyllaDB in a docker container, you can simply exec to it and run the tool:

docker exec -it scylla cqlsh

Back to my mistake: why no writes to the table? If you recall, I write the offset after the consumer has finished processing records from the topic:

offset = consumer(&postclient, "posts", offset, &session).await;
update_offset(offset, &session, &update_counter, "posts").await.expect("Failed to update offset");
tokio::time::sleep(Duration::from_secs(5)).await;

Since I had written a load test with something like 10K records, that consumer takes some time to complete, so update_offset wasn’t getting called straight away. By the end of the webinar, it actually finished reading from the topic and wrote the offset to the table. Another little change I snuck in there was on:

tokio::time::sleep(Duration::from_secs(5)).await;

Felipe spoke to the benefits of using tokio, an asynchronous runtime for Rust. The previous thread sleep would in fact do nothing, hence the change. Hooray for quick fixes!
Once we had writes, we needed to read from the table, so I added another function that looked like this:

async fn fetch_offset(session: &Session, consumer: &str) -> Result<i64> {
    let query = "SELECT count FROM ks.offset WHERE consumer = ?";
    let values = (consumer,);
    let result = session.query(query, values).await.expect("Failed to execute query");
    if let Some(row) = result.maybe_first_row_typed::<(i64,)>().expect("Failed to get row") {
        Ok(row.0)
    } else {
        Ok(0)
    }
}

I spoke about some common gotchas here, like misunderstanding how query values work with different types, and whether to use a slice &[] or a tuple (). Query text is constant, but the values might change. You can pass changing values to a query by specifying a list of variables as bound values. Don’t forget the parentheses! I also highlighted some of the convenience methods in the query result, like maybe_first_row_typed. That returns Option<RowT> containing the first row from the result – which is handy when you just want the first row or None. Once again, you can play around with types, and even use custom structs if you prefer for the output. In my case, it was just a tuple with an i64. The complete consumer code for posts looked something like this:

tokio::spawn(async move {
    use std::time::Duration;
    info!("Posts Consumer Started");
    let session = db_session().await;
    let update_counter = session.prepare(UPDATE_OFFSET).await.expect("Failed to prepare query");
    loop {
        let mut offset = fetch_offset(&session, "posts").await.expect("Failed to fetch offset");
        offset = consumer(&postclient, "posts", offset, &session).await;
        update_offset(offset, &session, &update_counter, "posts").await.expect("Failed to update offset");
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
});

You can see I prepare the statement before the loop, then I fetch the offset from the database, consume the topic, write the offset to the database and sleep. Keep doing that forever!

What We Didn’t Have Time to Cover

There were a few things that I wanted to cover, but ran out of time. If you wanted to write results to a custom struct, the code might look something like:

#[derive(Default, FromRow)]
pub struct Offset {
    consumer: String,
    count: i64,
}

use scylla::IntoTypedRows;

async fn fetch_offset_type(session: &Session, consumer: &str) -> Offset {
    let query = "SELECT * FROM ks.offset WHERE consumer = ?";
    let values = (consumer,);
    let result = session.query(query, values).await.expect("Failed to execute query");
    if let Some(rows) = result.rows {
        if let Some(row) = rows.into_typed::<Offset>().next() {
            let offset: Offset = row.expect("Failed to parse row");
            return offset;
        }
    }
    Offset {
        consumer: consumer.to_string(),
        count: 0,
    }
}

There are some custom values you’ll come across, like CqlTimestamps and Counter… so you should be aware of the ways to handle these different data types. For example, rather than convert everything to and from millisecond timestamps, you can add the chrono feature flag on the crate to interact with time. You can also improve logging with the driver’s support of the tracing crate for your logs. If you add that, you can use a tracing subscriber as follows:

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    …

Wrapping Up

I personally find refactoring code enjoyable. I’d encourage you to have a patient, persistent approach to coding, testing and refactoring. When it comes to ScyllaDB, it’s a product where it really pays to read the documentation, as many of the foot guns are well documented.
If you still find yourself stuck, feel free to ask questions on the ScyllaDB forum and learn from your peers. And remember, small, continuous improvements lead to long-term benefits. Have fun!

See what you missed – watch the video

Training Updates, Forum, Upcoming ScyllaDB University LIVE Event

The ScyllaDB “sea monster” community is growing fast, so we’re excited to announce lots of new resources to get you on the fast track to ScyllaDB success. In this post, I’ll update you on our next ScyllaDB Labs and ScyllaDB University LIVE training events, introduce new lessons on ScyllaDB University, and summarize some interesting discussions from the community forum.

ScyllaDB University Updates

For those not (yet) familiar with ScyllaDB University, it’s an online learning and training center for ScyllaDB. The self-paced lessons include theory and hands-on labs that you can run yourself. To get started, just create a (free) account. As the product evolves, we enhance and update the training material.

One of the lessons we recently updated is the “How to Write Better Apps” lesson. Many of the issues new users commonly face can be avoided by using best practices and straightforward built-in debugging mechanisms. This lesson covers important metrics users should track, and how to keep track of them using the Monitoring stack. Other topics include prepared statements, token-aware drivers, denormalizing data, working with multiple data centers, and data modeling best practices (partition sizing, distribution, retries, and batching).

Another lesson we recently updated is the “Materialized Views, Secondary Indexes, and Filtering” lesson. ScyllaDB offers three indexing options: Materialized Views, Global Secondary Indexes, and Local Secondary Indexes. I often get questions from users about the differences between them and how to use them. They are all covered in this lesson, along with a comparison, examples of when to use each, quizzes, and hands-on labs. By the end of the lesson, users will have an understanding of the different index types in ScyllaDB, how to use them, and when to use each one. Additionally, they’ll gain some hands-on experience by creating and using these indexes in the labs.

Additionally, we are embedding interactive, hands-on labs within the lessons, as you can see in the Quick Wins lab. Having the lab embedded within the browser means that you can run it regardless of your operating system – and without any prerequisites. In addition to the on-demand ScyllaDB University portal, we periodically host live online training sessions. The next two we have scheduled are ScyllaDB Labs and ScyllaDB University LIVE.

Start Learning at ScyllaDB University

ScyllaDB Labs – 17 September, 2024

The interactive workshop will be held in an Asia-friendly time zone. Its focus is on providing hands-on experience building and interacting with high-performance applications using ScyllaDB. The labs introduce NoSQL strategies used by top teams, guiding participants through the creation of sample applications. We cover key topics such as determining if ScyllaDB is suitable for your use case, achieving low-latency NoSQL at scale, application performance optimization, data modeling, and making important decisions when getting started with ScyllaDB. I will be hosting it together with my colleague Tim Koopmans. He will start with an introduction to ScyllaDB, covering some core concepts and a getting started lab. After that, my talk will focus on data modeling basics, including some architecture and another hands-on lab. Finally, attendees will have some time to run labs independently while we will be there to answer questions.

Save Your Spot

ScyllaDB University Live – 24 September, 2024

ScyllaDB University LIVE is an instructor-led, online, half-day NoSQL database training event.
It includes live sessions conducted by ScyllaDB’s lead engineers and architects and has two parallel tracks:

  • Essentials: ScyllaDB architecture, key components, data modeling, and building your first ScyllaDB-powered app (in Rust!).
  • Advanced Topics: Deep dives into cluster topology, advanced data modeling, optimizing elasticity and efficiency, and power user tips and tricks.

Participants can switch between tracks or attend specific sessions of interest. After the training, there will be an expert panel to answer user questions. Attendees will also have the chance to complete quizzes, participate in hands-on labs, earn certificates, and receive exclusive ScyllaDB swag. The sessions are live, and there’s no on-demand option.

Save Your Spot

What’s Trending on the ScyllaDB Community Forum

The community forum is the place to discuss all things NoSQL related. You can ask questions, learn what your peers are doing, and share how you’re using ScyllaDB. Here is a summary of some of the top topics.

GUI Visualization for ScyllaDB

Many new users ask about using a GUI with ScyllaDB. Some relevant tools are the ScyllaDB Manager and the Monitoring Stack. One user suggested using JetBrains Rider, a paid tool that he found useful. Additionally, DBeaver, in some versions, now supports ScyllaDB. It’s a universal database manager that allows users to run CQL queries and view result tables directly within the interface. See the complete discussion

Kafka: How to extract nested field in JSON structure?

Here, a user migrating from MySQL to ScyllaDB (and using Elasticsearch) is using the scylladb-cdc-source-connector to publish CDC messages to a Kafka topic and is facing issues with accessing nested fields in the JSON message structure. Existing Single Message Transformations (SMTs) don’t support accessing nested fields; some workaround options are discussed. See the complete discussion

Best way to Fetch N rows in ScyllaDB: Count, Limit or Paging

This topic discusses different ways of fetching N rows while using the TTL feature, optimizing for performance and efficiency. Three options are mentioned: using count(), using LIMIT, and using Paging. Some suggestions were to change the clustering key to include the timestamp and allow for more efficient queries, as well as using a Counter Table. Another point that was brought up was the performance difference between COUNT and LIMIT. See the complete discussion

Read_concurrency_semaphore & p99 read latency

The user is experiencing high P99 read latency in an application that queries time-series data using ScyllaDB, despite low average latency. The application uses a ScyllaDB cluster with 3 nodes, each with 16 cores, 128 GB RAM, and 3 TB RAID0 SSDs. The schema is designed for time-series data with a composite primary key and TimeWindowCompactionStrategy for compaction. While the ScyllaDB dashboard shows P99 read latency as low (1-10ms), the gocql latency report shows occasional spikes in P99 latency (700ms to 1s). The user has tried profiling and tracing but cannot identify the root cause. See the complete discussion

ScyllaDB vs Aerospike

The user read a whitepaper comparing ScyllaDB’s and Aerospike’s performance. The paper shows that ScyllaDB outperforms Aerospike by 30-40%. The user has several questions about the methodology of the tests used, versions, configurations, and so on. See the complete discussion

Say Hello to the Community

Be Part of Something BIG – Speak at Monster Scale Summit

A little sneak peek at something massive: a new virtual conference on extreme-scale engineering! Whether you’re designing, implementing, or optimizing systems that are pushed to their limits, we’d love to hear about your most impressive achievements and lessons learned – at Monster Scale Summit 2025.

Become a Monster Scale Summit Speaker

Register for Monster Scale Summit [Free]

What’s Monster Scale Summit?

Monster Scale Summit is a technical conference that connects the community of professionals working on performance-sensitive data-intensive applications. Engineers, architects, and SREs from gamechangers around the globe will be gathering virtually to explore “monster scale” challenges with respect to extreme levels of throughput, data, and global distribution. It’s a lot like P99 CONF (also hosted by ScyllaDB) – a two-day event that’s free, fully virtual, and highly interactive. The core difference is that it’s focused on extreme-scale engineering vs. all things performance. We just opened the call for speakers, and the lineup already includes engineers from Slack, Salesforce, VISA, American Express, ShareChat, Cloudflare, and Disney. Keynotes include Gwen Shapira, Chris Riccomini, and Martin Kleppmann (Designing Data-Intensive Applications).

What About ScyllaDB Summit?

You might already be familiar with ScyllaDB Summit. Monster Scale Summit is the next evolution of that conference. We’re scaling it up and out to bring attendees more – and broader – insights on designing, implementing, and optimizing performance-sensitive data-intensive applications. But don’t worry – ScyllaDB and sea monsters will still be featured prominently throughout the event. And speakers will get sea monster plushies as part of the swag pack. 😉

Details please!

When: March 11 + 12

Where: Wherever you’d like! It’s intentionally virtual, so you can present and interact with attendees from anywhere around the world.

Topics: Core topics include:

  • Distributed databases
  • Streaming and real-time processing
  • Intriguing system designs
  • Approaches to a massive scaling challenge
  • Methods for balancing latency/concurrency/throughput
  • SRE techniques proven at scale
  • Infrastructure built for unprecedented demands

What we’re looking for: We welcome a broad spectrum of talks about tackling the challenges that arise in the most massive, demanding environments. The conference prioritizes technical talks sharing first-hand experiences. Sessions are just 15-20 minutes – so consider this your TED Talk debut!

Share Your Ideas

Clues in Long Queues: High IO Queue Delays Explained

How seemingly peculiar metrics might provide interesting insights into system performance

In large systems, you often encounter effects that seem weird at first glance, but – when studied carefully – give an invaluable clue to understanding system behavior. When supporting ScyllaDB deployments, we observe many workload patterns that reveal themselves in various amusing ways. Sometimes what seems to be a system misbehaving stems from a bad configuration or a bug in the code. However, pretty often what seems to be impossible at first sight turns into an interesting phenomenon. Previously, we described one such effect, called “phantom jams.” In this post, we’re going to show another example of the same species.

As we’ve learned from many of the ScyllaDB deployments we track, sometimes a system appears to be lightly loaded and only a single parameter stands out, indicating something that typically denotes a system bottleneck. The immediate response is typically to disregard the outlier and attribute it to spurious system slow-down. However, thorough and careful analysis of all the parameters, coupled with an understanding of the monitoring system architecture, shows that the system is indeed under-loaded but imbalanced – and that crazy parameter was how the problem actually surfaced.

Scraping metrics

Monitoring systems often follow a time-series approach. To avoid overwhelming their monitored targets and frequently populating a time-series database (TSDB) with redundant data, these solutions apply a concept known as a “scrape interval.” Although different monitoring solutions exist, we’ll mainly refer to Prometheus and Grafana throughout this article, given that these are what we use for ScyllaDB Monitoring.

Prometheus polls its monitored endpoints periodically and retrieves a set of metrics. This is called “scraping”. Metrics samples collected in a single scrape consist of name:value pairs, where value is a number. Prometheus supports four core types of metrics, but we are going to focus on two of those: counters and gauges. Counters are monotonically increasing metrics that reflect some value accumulated over time. When observed through Grafana, the rate() function is applied to counters, as it reflects the changes since the previous scrape instead of the total accumulated value. Gauges, on the other hand, are a type of metric that can arbitrarily rise and fall. Perhaps surprisingly, gauges reflect a metric’s state as observed at scrape time. This effectively means that any changes made between scrape intervals will be overlooked, and are lost forever.

Before going further with the metrics, let’s take a step back and look at what makes it possible for ScyllaDB to serve millions and billions of user requests per second at sub-millisecond latency.

IO in ScyllaDB

ScyllaDB uses the Seastar framework to run its CPU, IO, and network activity. A task represents a ScyllaDB operation run in lightweight threads (reactors) managed by Seastar. IO is performed in terms of requests and goes through a two-phase process that happens inside the subsystem we call the IO scheduler. The IO scheduler plays a critical role in ensuring that IO gets both prioritized and dispatched in a timely manner, which often means predictability – some workloads require that submitted requests complete no later than within a given, rather small, time.
Before going further with the metrics, let’s take a step back and look at what makes it possible for ScyllaDB to serve millions and billions of user requests per second at sub-millisecond latency.

IO in ScyllaDB

ScyllaDB uses the Seastar framework to run its CPU, IO, and network activity. A task represents a ScyllaDB operation run in lightweight threads (reactors) managed by Seastar. IO is performed in terms of requests and goes through a two-phase process inside the subsystem we call the IO scheduler. The IO scheduler plays a critical role in ensuring that IO gets both prioritized and dispatched in a timely manner, which often means predictability: some workloads require that submitted requests complete within a given, rather small, amount of time. To achieve that, the IO scheduler sits in the hot path, between the disks and the database operations, and is built with a good understanding of the underlying disk capabilities.

To perform an IO, a running task first submits a request to the scheduler. At that time, no IO happens; the request is put into a Seastar queue for further processing. Periodically, the Seastar reactor switches from running tasks to performing service operations, such as handling IO. This periodic switch is called polling, and it happens in two circumstances:

  • When there are no more tasks to run (such as when all tasks are waiting for IO to complete), or
  • When a timer known as task-quota elapses, by default every 0.5 milliseconds.

The second phase of IO handling involves two actions. First, the kernel is asked for any previously submitted IO requests that have completed. Second, outstanding requests in the ScyllaDB IO queues are dispatched to disk using the Linux kernel AIO API. Dispatching requests into the kernel happens at a rate derived from the pre-configured disk throughput and the previously mentioned task-quota parameter. The goal of this throttled dispatching is to make sure that dispatched requests complete within the duration of the task-quota. That way, urgent requests that may pop up in the queue during that time don’t need to wait long for the disk to be able to serve them. For the scope of this article, let’s just say that dispatching happens at the disk throughput. For example, if the disk throughput is 10k operations per second and a poll happens every millisecond, then the dispatch rate will be 10 requests per poll.

IO Scheduler Metrics

Since the IO scheduler sits in the hot path of all IO operations, it is important to understand how it is performing. In ScyllaDB, we accomplish that via metrics. Seastar exposes many metrics, and several IO-related ones are included among them. All IO metrics are exported per class with the help of metric labels, and each represents a given IO class’s activity at a given point in time.

IO Scheduler Metrics for the commitlog class

Bandwidth and IOPS are two metrics that are easy to reason about. They show the rates at which requests get dispatched to disk. Bandwidth is a counter that gets increased by the request length every time a request is sent to disk. IOPS is a counter that gets incremented every time a request is sent to disk. When observed through Grafana, the aforementioned rate() function is applied, and these counters are shown as BPS (bytes per second) and IO/s (IO operations per second), under their respective IO classes.

Queue length metrics are gauges that represent the size of a queue. There are two kinds: one represents the number of outstanding requests under the IO class; the other represents the number of requests dispatched to the kernel. These queues are also easy to reason about. Every time ScyllaDB makes a request, the class queue length is incremented. When the request gets dispatched to disk, the class queue length gauge is decremented and the disk queue length gauge is incremented. Eventually, as the IO completes, the disk queue length gauge goes down.

When observing those metrics, it’s important to remember that they reflect the queue sizes as they were at the exact moment they were scraped. They say nothing about how large (or small) the queue was over the scrape period. This common misconception may lead to wrong conclusions about how the IO scheduler or the disks are performing.
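As a minimal illustration of that request life cycle (a toy sketch for intuition, not ScyllaDB’s implementation), the two gauges move like this as a single request flows through:

# Minimal illustration (not ScyllaDB code) of how the two queue-length gauges
# described above move as one request flows through the IO path.

class QueueGauges:
    def __init__(self):
        self.class_queue = 0    # outstanding requests in the IO class queue
        self.disk_queue = 0     # requests dispatched to the kernel/disk

    def submit(self):           # a task submits a request; no IO happens yet
        self.class_queue += 1

    def dispatch(self):         # poll: the request leaves the class queue for the disk
        self.class_queue -= 1
        self.disk_queue += 1

    def complete(self):         # the kernel reports the IO as done
        self.disk_queue -= 1

g = QueueGauges()
g.submit();   print(vars(g))    # {'class_queue': 1, 'disk_queue': 0}
g.dispatch(); print(vars(g))    # {'class_queue': 0, 'disk_queue': 1}
g.complete(); print(vars(g))    # {'class_queue': 0, 'disk_queue': 0}
# A scrape that happens after complete() sees both gauges at zero,
# no matter how busy the queues were in between.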
Lastly, we have latency metrics known as IO delays. There are two of those: one for the software queue, and another for the disk. Each represents the average time requests spent waiting to get serviced.

In earlier ScyllaDB versions, latency metrics were represented as gauges. The value shown was the latency of the last dispatched request (from the IO class queue to disk), or of the last completed request (a disk IO completion). Because of that, the latencies shown weren’t accurate and didn’t reflect reality. A single ScyllaDB shard can perform thousands of requests per second, and showing the latency of a single request scraped after a long interval omits important insights about what really happened since the previous scrape. That’s why we eventually replaced these gauges with counters. Since then, latencies have been shown as a rate between scrape intervals: to calculate the average request delay, the new counter metrics are divided by the total number of requests dispatched within the scrape period.

Disk can do more

When observing IO for a given class, it is common to see corresponding events that took place during a specific interval. Consider the following picture:

IO Scheduler Metrics – sl:default class

The exact numbers are not critical here. What matters is how the different plots correspond to each other. What’s strange here? Observe the two rightmost panels: bandwidth and IOPS. On a given shard, bandwidth starts at 5MB/s and peaks at 20MB/s, whereas IOPS starts at 200 operations/sec and peaks at 800 ops/sec. These are really conservative numbers. The system from which those metrics were collected can sustain 1GB/s of bandwidth at several thousand IOPS. Therefore, given that the numbers above are per-shard, the disk is using about 10% of its total capacity.

Next, observe that the queue length metric (the second panel from the left) is empty most of the time. This is expected, partially because it’s a gauge: it represents the number of requests sitting in the queue as observed at scrape time, not the total number of requests that got queued. Since disk capacity is far from saturated, the IO scheduler dispatches all requests to disk shortly after they arrive in the scheduler queue. Given that IO polling happens at sub-millisecond intervals, in-queue requests get dispatched to disk within a millisecond.

So why do the latencies shown in the queue delay metric (the leftmost panel) grow close to 40 milliseconds? In such situations, ScyllaDB users commonly wonder: “The disk can do more – why isn’t ScyllaDB’s IO scheduler consuming the remaining disk capacity?!”

IO Queue delays explained

To get an idea of what’s going on, let’s simplify the dispatching model described above and then walk through several thought experiments on an imaginary system. Assume that a disk can do 100k IOPS, and ignore its bandwidth for this exercise. Next, assume that the metrics scrape interval is 1 second, and that ScyllaDB polls its queues once every millisecond. Under these assumptions, according to the dispatching model described above, ScyllaDB will dispatch at most 100 requests at every poll. Next, we’ll see what happens when servicing 10k requests within a second, corresponding to 10% of what our disk can handle.

  • IOPS capacity: 100K
  • Polling interval: 1ms
  • Dispatch rate: 100 per poll
  • Target request rate: 10K/second
  • Scrape interval: 1s

Even request arrival

In the first experiment, requests arrive evenly at the queue: one request every 1/10k = 0.1 millisecond.
By the end of each tick, there will be 10 requests in the queue, and the IO scheduler will dispatch them all to disk. When polling occurs, each request will have accumulated its own in-queue delay: the first request waited 0.9ms, the second 0.8ms, …, the last 0ms. The sum comes to approximately 5ms of total in-queue delay per tick. After 1 second (or 1K ticks/polls), we’ll observe a total in-queue delay of roughly 5 seconds. When scraped, the metrics will show:

  • A rate of 10K IOPS
  • An empty queue
  • An average in-queue delay/latency of 0.5ms (5 seconds total delay / 10K requests)

Single batch of requests

In the second experiment, all 10k requests arrive at the very beginning and queue up. As the dispatch rate corresponds to 100 requests per tick, the IO scheduler will need 100 polls to fully drain the queue. The requests dispatched at the first tick contribute 1 millisecond each to the total in-queue delay, for a sum of 100 milliseconds. Requests dispatched at the second tick contribute 2 milliseconds each, for a sum of 200 milliseconds. In general, requests dispatched during the Nth tick contribute N*100 milliseconds to the delay counter. After 100 ticks, the total in-queue delay will be 100 + 200 + … + 10000 ms, or roughly 500 seconds. Once the metrics endpoint gets scraped, we’ll observe:

  • The same rate of 10K IOPS (the ordering of arrivals doesn’t influence the result)
  • The same empty queue, given that all requests were dispatched within 100ms (well before scrape time)
  • A 50 millisecond in-queue delay (500 seconds total delay / 10K requests)

So the same amount of work, done differently, resulted in much higher IO delays.

Multiple batches

If the submission of requests happens more evenly, such as 1k-request batches arriving every 100ms, the situation is better, though still not perfect. Each tick dispatches 100 requests, fully draining the queue within 10 ticks. However, given our polling interval of 1ms, the following batch arrives only after another 90 ticks, so the system idles in between. As in the previous example, the Nth tick contributes N*100 milliseconds to the total in-queue delay. Once the queue is fully drained, each batch has contributed 100 + 200 + … + 1000 ms, or roughly 5 seconds. After 10 batches, this results in 50 seconds of total delay. When scraped, we’ll observe:

  • The same rate of 10K IOPS
  • The same empty queue
  • A 5 millisecond in-queue delay (50 seconds total delay / 10K requests)

To sum up: these experiments demonstrate that the same workload may render a drastically different observable “queue delay” when averaged over a long enough period of time. It can be an “expected” delay of half a millisecond. Or it can look very much like the puzzle shown previously: the disk seemingly can do more, the software queue is empty, and yet the in-queue latency is notably higher than the tick length.
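For readers who like to double-check the arithmetic, here is a small, self-contained Python sketch of the simplified model above (deliberately naive, not ScyllaDB code). It replays the three arrival patterns and derives the average in-queue delay the same way the counter-based metric does: accumulated delay divided by the number of requests dispatched during the scrape interval.

# Replays the three thought experiments under the simplified model:
# 1ms polls, at most 100 dispatches per poll, 10k requests per second in total.
# (A deliberately naive model, not the actual ScyllaDB IO scheduler.)
from collections import deque

TICK_MS = 1.0
DISPATCH_PER_POLL = 100
TOTAL_REQUESTS = 10_000
TICKS = 1000                        # one scrape interval = 1 second

def simulate(arrivals_at_tick):
    """arrivals_at_tick(tick) -> list of arrival times (ms) during that tick."""
    queue = deque()
    total_delay_ms = 0.0            # the counter-style "total in-queue delay" metric
    dispatched = 0
    for tick in range(TICKS):
        queue.extend(arrivals_at_tick(tick))
        poll_time = (tick + 1) * TICK_MS
        for _ in range(min(DISPATCH_PER_POLL, len(queue))):
            total_delay_ms += poll_time - queue.popleft()
            dispatched += 1
    return total_delay_ms / dispatched          # average in-queue delay per request

# Even arrival: one request every 0.1 ms.
even = lambda tick: [tick * TICK_MS + i * 0.1 for i in range(10)]
# Single batch: all 10k requests arrive at time 0.
single_batch = lambda tick: [0.0] * TOTAL_REQUESTS if tick == 0 else []
# Ten batches of 1k requests, one every 100 ms.
ten_batches = lambda tick: [tick * TICK_MS] * 1000 if tick % 100 == 0 else []

for name, pattern in [("even", even), ("single batch", single_batch), ("10 batches", ten_batches)]:
    print(f"{name}: ~{simulate(pattern):.1f} ms average in-queue delay")
# -> even ≈ 0.6 ms, single batch ≈ 50.5 ms, 10 batches ≈ 5.5 ms
#    (close to the 0.5/50/5 ms figures in the text, which round the sums).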
Average queue length over time

Queue length is naturally a gauge-type metric. It frequently increases and decreases as IO requests arrive and get dispatched. Without collecting an array of all the values, it’s impossible to get an idea of how it changed over a given period of time. Therefore, sampling the queue length at long intervals is only reliable for very uniform incoming workloads.

There are many parameters of the same nature in the computing world. The most famous example is the load average in Linux. It denotes the length of the CPU run-queue (including tasks waiting for IO) over the past 1, 5, and 15 minutes. It’s not a full history of run-queue changes, but it gives an idea of how the queue looked over time. Implementing a similar “queue length average” would improve the observability of IO queue length changes. Although possible, that would require sampling the queue length more regularly and exposing more gauges. But as we’ve demonstrated above, accumulated in-queue total time is another option: it requires only a single counter, yet still conveys some history.

Why is a scheduler needed?

Sometimes you may observe that doing no scheduling at all results in much better in-queue latency. Our second experiment shows why. Suppose that, as in that experiment, 10k requests arrive in one large batch and ScyllaDB simply forwards them all straight to disk at the nearest tick. This results in a 10000 ms total latency counter, and thus a 1ms average queue delay. The initial results look great. At this point, the system will not be overloaded: as we know, no new requests will arrive, and the disk will have enough time and resources to queue and service all dispatched requests. In fact, the disk will probably perform IO even better than it would if fed requests gradually, since a large batch lets it better exploit its internal parallelism and apply internal optimizations such as request merging or batching FTL updates.

So why don’t we simply flush the whole queue to disk, whatever its length? The answer lies in the details, particularly in the “as we know” part. First of all, Seastar assigns different IO classes to different kinds of workloads. To reflect the fact that different workloads have different importance to the system, IO classes have different priorities called “shares.” It is then the IO scheduler’s responsibility to dispatch queued IO requests to the underlying disk according to each class’s shares. For example, any IO activity triggered by user queries runs under its own class, named “statement” in ScyllaDB Open Source and “sl:default” in Enterprise. This class usually has the largest shares, denoting its high priority. Similarly, any IO performed during compactions occurs in the “compaction” class, whereas memtable flushes happen inside the “memtable” class – and both typically have low shares. We say “typically” because ScyllaDB dynamically adjusts the shares of those two classes when it detects that more work is needed for the respective workflow (for example, when compaction is falling behind).

Next, after sending 10k requests to disk, we may expect them all to complete in about 10k/100k = 100ms. So there isn’t much of a difference whether requests get queued by the IO scheduler or by the disk. The problem happens if and only if a new high-priority request pops up while we are waiting for that batch to be serviced. Even if we dispatch this new urgent request instantly, it will likely need to wait for the first batch to complete. The chances that the disk will reorder it and service it earlier are too low to rely upon, and that is exactly the delay the scheduler tries to avoid. Urgent requests need to be prioritized accordingly and served much faster. With the IO scheduler’s dispatching model, we guarantee that a newly arrived urgent request will get serviced almost immediately.
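To give a feel for shares-based dispatching, here is a toy model for intuition only; Seastar’s real scheduler also accounts for request cost, bandwidth, and task-quota timing, and the shares values below are made up. It splits a single poll’s dispatch budget across class queues in proportion to their shares.

# Toy model of shares-based dispatching (for intuition only; the real IO
# scheduler is far more sophisticated). Shares values below are made up.
from collections import deque

class IOClass:
    def __init__(self, name, shares):
        self.name = name
        self.shares = shares
        self.queue = deque()

def dispatch_one_poll(classes, budget):
    """Split one poll's dispatch budget across non-empty classes by shares."""
    total_shares = sum(c.shares for c in classes if c.queue)
    dispatched = {}
    for c in classes:
        if not c.queue or total_shares == 0:
            continue
        quota = max(1, round(budget * c.shares / total_shares))
        n = min(quota, len(c.queue))
        for _ in range(n):
            c.queue.popleft()
        dispatched[c.name] = n
    return dispatched

# User queries get far more shares than background compaction.
classes = [IOClass("sl:default", shares=1000), IOClass("compaction", shares=100)]
classes[0].queue.extend(range(50))      # a burst of user reads
classes[1].queue.extend(range(500))     # a big compaction backlog
print(dispatch_one_poll(classes, budget=100))
# -> {'sl:default': 50, 'compaction': 9}: user IO drains within one poll
#    even though compaction has a much longer queue.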
Conclusion

Understanding metrics is crucial for understanding the behavior of complex systems. Queues are an essential element of any data processing, and seeing how data traverses queues is crucial for engineers solving real-life performance problems. Since it’s impossible to track every single data unit, compound metrics like counters and gauges become great companions for that task.

Queue length is a very important parameter. Observing how it changes over time reveals bottlenecks in the system, thus shedding light on performance issues that can arise in complex, highly loaded systems. Unfortunately, unlike many other parameters, the full history of queue-length changes isn’t available, and that can lead to misunderstanding the system’s behavior. This article described an attempt to map queue length from a gauge-type metric to a counter-type one, making it possible to accumulate a history of queue-length changes over time. Even though the described “total delay” metric and its behavior are heavily tied to how ScyllaDB monitoring and the Seastar IO scheduler work, this way of accumulating and monitoring latencies is generic enough to be applied to other systems as well.

Instaclustr for Apache Cassandra® 5.0 Now Generally Available

NetApp is excited to announce the general availability (GA) of Apache Cassandra® 5.0 on the Instaclustr Platform. This follows the release of the public preview in March.

NetApp was the first managed service provider to release the beta version, and now the generally available version, allowing the deployment of Cassandra 5.0 across the major cloud providers (AWS, Azure, and GCP) and on-premises.

Apache Cassandra has been a leader in NoSQL databases since its inception and is known for its high availability, reliability, and scalability. The latest version brings many new features and enhancements, with a special focus on building data-driven applications through artificial intelligence and machine learning capabilities.

Cassandra 5.0 will help you optimize performance, lower costs, and get started on the next generation of distributed computing by: 

  • Helping you build AI/ML-based applications through Vector Search  
  • Bringing efficiencies to your applications through new and enhanced indexing and processing capabilities 
  • Improving flexibility and security 

With the GA release, you can use Cassandra 5.0 for your production workloads, which are covered by NetApp’s industry-leading SLAs. NetApp has conducted performance benchmarking and extensive testing while removing the limitations that were present in the preview release, offering a more reliable and stable version. Our GA offering is suitable for all workload types, as it contains the most up-to-date range of features, bug fixes, and security patches.  

Support for continuous backups and private network add-ons is available. Currently, Debezium is not yet compatible with Cassandra 5.0. NetApp will work with the Debezium community to add support for Debezium on Cassandra 5.0, and it will be made available on the Instaclustr Platform as soon as it is supported. 

Some of the key new features in Cassandra 5.0 include: 

  • Storage-Attached Indexes (SAI): A highly scalable, globally distributed index for Cassandra databases. With SAI, column-level indexes can be added, leading to unparalleled I/O throughput for searches across different data types, including vectors. SAI also enables lightning-fast data retrieval through zero-copy streaming of indices, resulting in unprecedented efficiency.  
  • Vector Search: A powerful technique for searching relevant content or discovering connections by comparing similarities in large document collections; it is particularly useful for AI applications. It uses storage-attached indexing and dense indexing techniques to enhance data exploration and analysis (see the sketch after this list).  
  • Unified Compaction Strategy: This strategy unifies compaction approaches, including leveled, tiered, and time-windowed strategies. It leads to a major reduction in SSTable sizes. Smaller SSTables mean better read and write performance, reduced storage requirements, and improved overall efficiency.  
  • Numerous stability and testing improvements: You can read all about these changes here. 
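To make the first two items more tangible, here is a minimal sketch of what using Storage-Attached Indexes and Vector Search can look like from the Python driver. It assumes a reachable Cassandra 5.0 node, an existing keyspace named demo, and the cassandra-driver package; the keyspace, table, and index names are hypothetical, and the official documentation should be consulted for production settings.

# Illustrative sketch only: SAI + Vector Search via the Python driver.
# Assumes a reachable Cassandra 5.0 node and an existing "demo" keyspace.
from cassandra.cluster import Cluster

session = Cluster(["127.0.0.1"]).connect("demo")

# A table with a 3-dimensional vector column (real embeddings are much wider).
session.execute("""
    CREATE TABLE IF NOT EXISTS products (
        id int PRIMARY KEY,
        name text,
        embedding vector<float, 3>
    )""")

# Storage-Attached Indexes on a regular column and on the vector column.
session.execute("CREATE CUSTOM INDEX IF NOT EXISTS products_name_sai "
                "ON products (name) USING 'StorageAttachedIndex'")
session.execute("CREATE CUSTOM INDEX IF NOT EXISTS products_embedding_sai "
                "ON products (embedding) USING 'StorageAttachedIndex'")

session.execute("INSERT INTO products (id, name, embedding) VALUES (1, 'widget', [0.1, 0.2, 0.3])")
session.execute("INSERT INTO products (id, name, embedding) VALUES (2, 'gadget', [0.9, 0.8, 0.7])")

# SAI query on a non-primary-key column.
print(list(session.execute("SELECT id FROM products WHERE name = 'widget'")))

# Approximate nearest-neighbor (vector) search, ordered by similarity.
print(list(session.execute(
    "SELECT id, name FROM products ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 2")))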

All these new features are available out-of-the-box in Cassandra 5.0 and do not incur additional costs.  

Our Development team has worked diligently to bring you a stable release of Cassandra 5.0. Substantial preparatory work was done to ensure you have a seamless experience with Cassandra 5.0 on the Instaclustr Platform. This includes updating the Cassandra YAML and Java environment and enhancing the monitoring capabilities of the platform to support new data types.  

We also conducted extensive performance testing and benchmarked version 5.0 against the existing stable Apache Cassandra 4.1.5 release. We will be publishing our benchmarking results shortly; the highlight so far is that Cassandra 5.0 improves responsiveness by reducing latencies by up to 30% during peak load times.  

Through our dedicated Apache Cassandra committer, NetApp has contributed to the development of Cassandra 5.0 by enhancing the documentation for new features like Vector Search (CASSANDRA-19030), enabling Materialized Views (MV) with only partition keys (CASSANDRA-13857), fixing numerous bugs, and contributing to improvements for the Unified Compaction Strategy feature, among many other things. 

Lifecycle Policy Updates 

As previously communicated, the project will no longer maintain Apache Cassandra 3.0 and 3.11 versions (full details of the announcement can be found on the Apache Cassandra website).

To help you transition smoothly, NetApp will provide extended support for these versions for an additional 12 months. During this period, we will backport any critical bug fixes, including security patches, to ensure the continued security and stability of your clusters. 

Cassandra 3.0 and 3.11 versions will reach end-of-life on the Instaclustr Managed Platform within the next 12 months. We will work with you to plan and upgrade your clusters during this period.  

Additionally, the Cassandra 5.0 beta version and the Cassandra 5.0 RC2 version, which were released as part of the public preview, are now end-of-life. You can check the lifecycle status of different Cassandra application versions here.  

You can read more about our lifecycle policies on our website. 

Getting Started 

Upgrading to Cassandra 5.0 will allow you to stay current and start taking advantage of its benefits. The Instaclustr by NetApp Support team is ready to help customers upgrade clusters to the latest version.  

  • Wondering if it’s possible to upgrade your workloads from Cassandra 3.x to Cassandra 5.0? Find the answer to this and other similar questions in this detailed blog.
  • Click here to read about Storage Attached Indexes in Apache Cassandra 5.0.
  • Learn about 4 new Apache Cassandra 5.0 features to be excited about. 
  • Click here to learn what you need to know about Apache Cassandra 5.0. 

Why Choose Apache Cassandra on the Instaclustr Managed Platform? 

NetApp strives to deliver the best of the applications it supports. Whether it’s the latest application versions available on the platform or additional platform enhancements, we ensure high quality through thorough testing before entering general availability.  

NetApp customers have the advantage of accessing the latest versions—not just the major version releases but also minor version releases—so that they can benefit from any new features and are protected from any vulnerabilities.  

Don’t have an Instaclustr account yet? Sign up for a trial or reach out to our Sales team and start exploring Cassandra 5.0.  

With more than 375 million node hours of management experience, Instaclustr offers unparalleled expertise. Visit our website to learn more about the Instaclustr Managed Platform for Apache Cassandra.  

If you would like to upgrade your Apache Cassandra version or have any issues or questions about provisioning your cluster, please contact Instaclustr Support at any time.  
