Inside the Database Internals Talks at P99 CONF 2025

“Never write a database. Even if you want to, even if you think you should. Resist. Never write a database. Unless you have to write a database. But you don’t.” – Charity Majors
But someone has to write the databases that others rely on. Hearing about the engineering challenges they’re tackling is both fascinating and Schadenfreude-invoking – so perfect tech conference material. 😉 Since database performance is so near and dear to ScyllaDB, we reached out to our friends and colleagues across the community to ensure that a nice range of distributed data systems, approaches, and challenges would be represented at P99 CONF 2025. As you can see from our agenda, the response was overwhelming.

A quick PSA for the uninitiated: P99 CONF is a free 2-day community event that’s intentionally virtual, highly interactive, and purely technical. It’s an immersion into all things performance. Distributed systems, database internals, Rust, C++, Java, Go, Wasm, Zig, Linux kernel, tracing, AI/ML & more – it’s all on the agenda. This year, you can look forward to first-hand engineering experiences from the likes of Pinterest, ClickHouse, Gemini, Arm, Rivian and VW Group Technology, Meta, Wayfair, Disney, NVIDIA, Turso, Neon, TigerBeetle, ScyllaDB, and too many others to list here.

Here’s a sneak peek of the database internals talks you can look forward to at P99 CONF 2025…

Join us at P99 CONF (free + virtual)

ClickHouse’s C++ and Rust Journey
Alexey Milovidov, Co-founder and CTO at ClickHouse

Full rewrite from C++ to Rust or gradual integration with Rust libraries? For a large C++ codebase, only the latter works, but even then, there are many complications and rough edges. In my presentation, I will describe our experience integrating Rust and C++ code and some weird and unusual problems we had to overcome.

Rethinking Durable Workflows and Queues: A Library-based Approach
Qian Li, Co-founder at DBOS, Inc

Durable workflow engines checkpoint program state to persistent storage (like a database) so that execution can always recover from where it left off. Most systems today rely on external orchestration: a centralized orchestrator and distributed workers communicating via message-passing. While this model is well-established, it’s often heavyweight, introducing substantial overhead, write amplification, and operational complexity. In this talk, we explore an alternative: a lightweight library-based durable workflow engine that embeds into application code and checkpoints state directly to the database. It handles queues and flow control through the database itself. This approach eliminates the need for a separate orchestrator, reduces network traffic, and improves performance by avoiding unnecessary writes. We’ll share our experience building DBOS, a library-based engine designed for simplicity and efficiency. We’ll discuss the architectural trade-offs, challenges in failure recovery, and key optimizations for scalability and maintainability.

The Gory Details of a Full-Featured Userspace CPU Scheduler
Avi Kivity, Co-founder and CTO at ScyllaDB

Userspace CPU schedulers, which often accompany asynchronous I/O engines like io_uring and Linux AIO, are usually simplistic run-to-completion FIFO loops. This suffices for I/O bound applications, but for use cases that can be both CPU bound and I/O bound, it is not enough. Avi Kivity, CTO of ScyllaDB and co-maintainer of Seastar, will cover the design and implementation of the Seastar userspace CPU scheduler, which caters to more complex applications that require preemption and prioritization.
The Tale of Taming TigerBeetle’s Tail Latency
Tobias Ziegler, Software Engineer at TigerBeetle

In this talk, we dive into how we reduced TigerBeetle’s tail latency through algorithm engineering. Algorithm engineering goes beyond studying theoretical complexity and considers how algorithms are executed efficiently on modern super-scalar CPUs. Specifically, we will look at Radix Sort and a k-way merge and explore how to implement them efficiently. We then demonstrate how we apply these algorithms incrementally to avoid latency spikes in practice.

Why We’re Rewriting SQLite in Rust
Glauber Costa, Co-founder and CEO at Turso

Over two years ago, we forked SQLite. We were huge fans of the embedded nature of SQLite, but wanted a more open model of development…and libSQL was born as an Open Contribution project. Last year, as we were adding Vector Search to SQLite, we had a crazy idea. What could we achieve if we were to completely rewrite SQLite in Rust? This talk explains what drove us down this path, how we’re using deterministic simulation testing to ensure the reliability of the Rust rewrite, and the lessons learned (so far). I will show how a reimagining of this iconic database can lead to performance improvements of over 500x in some cases by looking at what powers it under the hood.

Shared Nothing Databases at Scale
Nick Van Wiggeren, CTO at PlanetScale

This talk will discuss how PlanetScale scales databases in the cloud, focusing on a shared-nothing architecture that is built around expecting failure. Nick will go into how they built low-latency, high-throughput systems that span multiple nodes, availability zones, and regions, while maintaining sub-millisecond response times. This starts at the storage layer and builds all the way up to micro-optimizing the load balancer, with a lot of learning at every step of the way.

Reworking the Neon IO stack: Rust+tokio+io_uring+O_DIRECT
Christian Schwarz, Member of Technical Staff at Databricks

Neon is a serverless Postgres platform. Recently acquired by Databricks, the same technology now also powers Databricks Lakebase. In this talk, we will dive into Pageserver, the multi-tenant storage service at the heart of the architecture. We share techniques and lessons learned from reworking its IO stack to a fully asynchronous model, with direct IO against local NVMe drives, all during a period of rapid growth. Pageserver is implemented in Rust; we use the tokio async runtime for networking and integrate it with io_uring for filesystem access.

A Deep Dive into the Seastar Event Loop
Pavel Emelyanov, Principal Software Engineer at ScyllaDB

The core and the basis of ScyllaDB’s outstanding performance is the Seastar framework, and the core and the basis of Seastar is its event loop. In this presentation, we’ll see what the loop does in great detail, analyze the limitations it runs within and the consequences that follow from those limitations. We’ll also learn how the loop is observed by the user and the various means available to understand its behavior.

Cost Effective, Low Latency Vector Search In Databases: A Case Study with Azure Cosmos DB
Magdalen Manohar, Senior Researcher at Microsoft

We’ve integrated DiskANN, a state-of-the-art vector indexing algorithm, into Azure Cosmos DB NoSQL, a state-of-the-art cloud-native operational database. Learn how we overcame the systems and algorithmic challenges of this integration to achieve <20ms query latency at the 10-million scale, while supporting scale-out to billions of vectors via automatic partitioning.
Measuring Query Latency the Hard Way: An Adventure in Impractical Postgres Monitoring
Simon Notley, Observability and Optimization at EnterpriseDB

Sampling the session state (as exposed by pg_stat_activity) is a surprisingly powerful way to understand how your Postgres instance spends its time. It is something I can wholeheartedly recommend to any Postgres DBA that needs a lightweight way to monitor query performance in production. However, it’s a terrible way to measure query latency, fraught with complexity and weird statistical biases that could be avoided by simply using an extension built for the job, or even log analysis. But pursuing terrible ideas can be fun, so in this talk, I dive into my adventures in measuring query latency from session sampling, generate some extremely funky charts, and end up unexpectedly performing a vector similarity search. I’ll show how, instead of attempting to correct the biases that plague estimates of query latency based on time-domain sampling, we can pre-calculate the distribution of (biased) estimates based on a range of true distributions and use vector search to compare our observed distribution to these pre-calculated ones, thereby inferring the true query latency. This ‘eccentric’ method is actually surprisingly effective, and surprisingly fun.

Fast and Deterministic Full Table Scans at Scale
Felipe Cardeneti Mendes, Technical Director at ScyllaDB

ScyllaDB’s new tablet replication algorithm replaces static vNodes with dynamic, elastic data distribution that adapts to shifting workloads. This talk discusses how tablets enable fast, predictable full table scans by keeping operations shard-local, balancing load automatically, and scaling linearly through a simple layer of indirection.

Optimizing Tiered Storage for Low-Latency Real-Time Analytics
Neha Pawar, Founding Engineer and Head of Data at StarTree

Real-time OLAP databases usually trade performance for cost when moving from local storage to cloud object storage. This talk shows how we extended Apache Pinot to use cloud storage while still achieving sub-second P99 latencies. We’ll cover the abstraction that makes Pinot location-agnostic, strategies like pipelining, prefetching, and selective block fetches, and how to balance local and cloud storage for both cost efficiency and speed.

As Fast as Possible, But Not Faster: ScyllaDB Flow Control
Nadav Har’El, Distinguished Engineer at ScyllaDB

Pushing requests faster than a system can handle results in rapidly growing queues. If unchecked, this risks depleting memory and destabilizing the system. This talk discusses how we engineered ScyllaDB’s flow control for high-volume ingestions, allowing it to throttle over-eager clients to exactly the right pace – not so fast that we run out of memory, but also not so slow that we let available resources go to waste.

Push the Database Beyond the Edge
Nikita Sivukhin, Software Engineer at Turso

Almost any application can benefit from having data available locally – enabling blazing-fast access and optimized write patterns. This talk will walk you through one approach to designing a full-featured sync engine, applicable across a wide range of domains, including front-end, back-end, and machine learning training.

Engineering a Low-Latency Vector Search Engine for ScyllaDB
Pawel Pery, Senior Software Engineer at ScyllaDB

Implementing Vector Search in ScyllaDB brings challenges ranging from low latency to predictable performance at scale.
Rather than embedding HNSW indexing directly into the core database, we decoupled vector indexing and similarity search into a dedicated Rust engine. Learn about the architectural design decisions that enabled us to combine ScyllaDB’s shard-per-core architecture for real-time operations with high-performance ANN processing via USearch.

We Told B+ Trees to Do Sorted Sets—They Nailed It
Joe Zhou, Developer Advocate at DragonflyDB

Sorted sets are a critical Redis data type used for leaderboards, time-series data, and priority queues. However, Redis’s skiplist-based implementation introduces significant memory overhead—averaging 37 bytes per entry on top of the essential 16 bytes for the (member, score) pair. For large sorted sets, this inefficiency can become a major bottleneck. In this talk, we’ll explore how Dragonfly reimplemented sorted sets using a B+ tree, reducing memory overhead to just 2-3 bytes per entry while improving performance. We’ll cover:

  • Why skiplists are inefficient for large sorted sets.
  • How B+ trees with bucketing drastically cut memory usage while maintaining O(log N) operations.
  • Benchmark results showing 40% lower memory and better throughput vs. Redis.

This optimization, now stable in Dragonfly, demonstrates how rethinking core data structures can unlock major efficiency gains. Attendees will leave with insights into:

  • Trade-offs between skiplists and B+ trees.
  • Real-world impact on memory and latency (P99 improvements).
  • Lessons from implementing a custom ranking API for B+ trees.

Keynote: Andy Pavlo

You can also look forward to a keynote by Andy Pavlo. We’re not revealing the topic yet, but if you know Andy, you know you won’t want to miss it.

Join us at P99 CONF (free + virtual)

Building a Resilient Data Platform with Write-Ahead Log at Netflix

By Prudhviraj Karumanchi, Samuel Fu, Sriram Rangarajan, Vidhya Arvind, Yun Wang, John Lu

Introduction

Netflix operates at a massive scale, serving hundreds of millions of users with diverse content and features. Behind the scenes, ensuring data consistency, reliability, and efficient operations across various services presents a continuous challenge. At the heart of many critical functions lies the concept of a Write-Ahead Log (WAL) abstraction. At Netflix scale, every challenge gets amplified. Some of the key challenges we encountered include:

  • Accidental data loss and data corruption in databases
  • System entropy across different datastores (e.g., writing to Cassandra and Elasticsearch)
  • Handling updates to multiple partitions (e.g., building secondary indices on top of a NoSQL database)
  • Data replication (in-region and across regions)
  • Reliable retry mechanisms for real-time data pipelines at scale
  • Bulk deletes to databases causing OOMs on the Key-Value nodes

All of the above challenges either resulted in production incidents or outages, consumed significant engineering resources, or led to bespoke solutions and technical debt. During one particular incident, a developer issued an ALTER TABLE command that led to data corruption. Fortunately, the data was fronted by a cache, so the ability to quickly extend the cache TTL, together with the app writing its mutations to Kafka, allowed us to recover. Without those resilience features in the application, there would have been permanent data loss. As the data platform team, we needed to provide resilience and guarantees to protect not just this application, but all the critical applications we have at Netflix.

Regarding the retry mechanisms for real-time data pipelines, Netflix operates at a massive scale where failures (network errors, downstream service outages, etc.) are inevitable. We needed a reliable and scalable way to retry failed messages without sacrificing throughput.

With these problems in mind, we decided to build a system that would solve all the aforementioned issues and continue to serve the future needs of Netflix in the online data platform space. Our Write-Ahead Log (WAL) is a distributed system that captures data changes, provides strong durability guarantees, and reliably delivers these changes to downstream consumers. This blog post dives into how Netflix is building a generic WAL solution to address common data challenges, enhance developer efficiency, and power high-leverage capabilities such as secondary indices, cross-region replication for non-replicated storage engines, and widely used patterns like delayed queues.

API

Our API is intentionally simple, exposing just the essential parameters. WAL has one main API endpoint, WriteToLog, abstracting away the internal implementation and ensuring that users can onboard easily.

rpc WriteToLog (WriteToLogRequest) returns (WriteToLogResponse) {...}

/**
 * WAL request message
 * namespace: Identifier for a particular WAL
 * lifecycle: How much delay to set and original write time
 * payload: Payload of the message
 * target: Details of where to send the payload
 */
message WriteToLogRequest {
  string namespace = 1;
  Lifecycle lifecycle = 2;
  bytes payload = 3;
  Target target = 4;
}

/**
 * WAL response message
 * durable: Whether the request succeeded, failed, or unknown
 * message: Reason for failure
 */
message WriteToLogResponse {
  Trilean durable = 1;
  string message = 2;
}
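For illustration, here is a minimal sketch of what calling WriteToLog could look like from a Python gRPC client. The generated module names (wal_pb2, wal_pb2_grpc), the service stub name, the endpoint, and the Lifecycle/Target field names are hypothetical, since the post only shows the outer request and response messages.

# Hypothetical client-side sketch of calling WriteToLog over gRPC.
import grpc

import wal_pb2        # assumed generated protobuf module
import wal_pb2_grpc   # assumed generated gRPC stubs

def write_to_log(channel, namespace, payload, delay_seconds, target_name):
    stub = wal_pb2_grpc.WriteAheadLogStub(channel)  # hypothetical service name
    request = wal_pb2.WriteToLogRequest(
        namespace=namespace,                                      # selects the WAL configuration
        lifecycle=wal_pb2.Lifecycle(delay_seconds=delay_seconds), # illustrative field name
        payload=payload,
        target=wal_pb2.Target(name=target_name),                  # illustrative field name
    )
    response = stub.WriteToLog(request)
    # durable is a trilean (succeeded / failed / unknown); treat anything
    # other than an explicit success as "retry or investigate".
    return response.durable, response.message

with grpc.insecure_channel("wal.example.internal:8980") as channel:  # hypothetical endpoint
    durable, message = write_to_log(channel, "pds", b'{"event": "title_updated"}', 30, "pds-retry")
    print(durable, message)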

A namespace defines where and how data is stored, providing logical separation while abstracting the underlying storage systems. Each namespace can be configured to use different queues: Kafka, SQS, or combinations of multiple. A namespace also serves as a central configuration point for settings such as the backoff multiplier, the maximum number of retry attempts, and more. This flexibility allows our Data Platform to route different use cases to the most suitable storage system based on performance, durability, and consistency needs.

WAL can assume different personas depending on the namespace configuration.

Persona #1 (Delayed Queues)

In the example configuration below, the Product Data Systems (PDS) namespace uses SQS as the underlying message queue, enabling delayed messages. PDS uses Kafka extensively, and failures (network errors, downstream service outages, etc.) are inevitable. We needed a reliable and scalable way to retry failed messages, without sacrificing throughput. That’s when PDS started leveraging WAL for delayed messages.

"persistenceConfigurations": {
  "persistenceConfiguration": [
    {
      "physicalStorage": {
        "type": "SQS"
      },
      "config": {
        "wal-queue": [
          "dgwwal-dq-pds"
        ],
        "wal-dlq-queue": [
          "dgwwal-dlq-pds"
        ],
        "queue.poll-interval.secs": 10,
        "queue.max-messages-per-poll": 100
      }
    }
  ]
}

Persona #2 (Generic Cross-Region Replication)

Below is the namespace configuration for cross-region replication of EVCache using WAL, which replicates messages from a source region to multiple destinations. It uses Kafka under the hood.

"persistence_configurations": {
  "persistence_configuration": [
    {
      "physical_storage": {
        "type": "KAFKA"
      },
      "config": {
        "consumer_stack": "consumer",
        "context": "This is for cross region replication for evcache_foobar",
        "target": {
          "euwest1": "dgwwal.foobar.cluster.eu-west-1.netflix.net",
          "type": "evc-replication",
          "useast1": "dgwwal.foobar.cluster.us-east-1.netflix.net",
          "useast2": "dgwwal.foobar.cluster.us-east-2.netflix.net",
          "uswest2": "dgwwal.foobar.cluster.us-west-2.netflix.net"
        },
        "wal-kafka-dlq-topics": [],
        "wal-kafka-topics": [
          "evcache_foobar"
        ],
        "wal.kafka.bootstrap.servers.prefix": "kafka-foobar"
      }
    }
  ]
}

Persona #3 (Handling multi-partition mutations)

Below is the namespace configuration for supporting the MutateItems API in Key-Value, where multiple write requests can go to different partitions and have to be eventually consistent. A key detail in the configuration below is the presence of both Kafka and durable_storage. These data stores are required to facilitate two-phase commit semantics, which we will discuss in detail below.

"persistence_configurations": {
  "persistence_configuration": [
    {
      "physical_storage": {
        "type": "KAFKA"
      },
      "config": {
        "consumer_stack": "consumer",
        "contacts": "unknown",
        "context": "WAL to support multi-id/namespace mutations for dgwkv.foobar",
        "durable_storage": {
          "namespace": "foobar_wal_type",
          "shard": "walfoobar",
          "type": "kv"
        },
        "target": {},
        "wal-kafka-dlq-topics": [
          "foobar_kv_multi_id-dlq"
        ],
        "wal-kafka-topics": [
          "foobar_kv_multi_id"
        ],
        "wal.kafka.bootstrap.servers.prefix": "kaas_kafka-dgwwal_foobar7102"
      }
    }
  ]
}

An important note is that requests to WAL have at-least-once semantics due to the underlying implementation.
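Because delivery is at-least-once, targets should tolerate duplicates. The sketch below is one illustrative way to make a target idempotent; the message-ID envelope is an assumption on our part, not something the WAL API above specifies.

# Illustrative idempotent target for at-least-once delivery. Assumes each
# delivered message carries a unique message_id; duplicates are skipped.
class IdempotentTarget:
    def __init__(self, store):
        self.store = store        # the real datastore being written to
        self.applied = set()      # in practice, persisted alongside the data

    def apply(self, message_id, mutation):
        if message_id in self.applied:
            return                # duplicate redelivery: safe to ignore
        self.store.write(mutation)
        self.applied.add(message_id)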

Under the Hood

The core architecture consists of several key components working together.

Message Producer and Message Consumer separation: The message producer receives incoming messages from client applications and adds them into the queue, while the message consumer processes messages from the queue and sends them to the targets. Because of this separation, other systems can bring their own pluggable producers or consumers, depending on their use cases. WAL’s control plane allows for a pluggable model, which, depending on the use-case, allows us to switch between different message queues.
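As a rough sketch of that pluggable split (the interfaces and names below are ours, not Netflix’s actual classes), the producer and consumer can be modeled as small interfaces that a namespace binds to a concrete queue implementation:

# Illustrative producer/consumer interfaces; a namespace configuration would
# bind concrete implementations (Kafka, SQS, ...) to each side independently.
from abc import ABC, abstractmethod

class MessageProducer(ABC):
    @abstractmethod
    def enqueue(self, namespace, message):
        """Accept a client write and add it to the namespace's queue."""

class MessageConsumer(ABC):
    @abstractmethod
    def poll_and_dispatch(self, namespace):
        """Read queued messages and deliver them to the configured target."""

class KafkaMessageProducer(MessageProducer):
    def __init__(self, kafka_client):
        self.kafka = kafka_client

    def enqueue(self, namespace, message):
        self.kafka.send(topic=f"dgwwal-{namespace}", value=message)  # illustrative topic naming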

SQS and Kafka with a dead letter queue by default: Every WAL namespace has its own message queue and gets a dead letter queue (DLQ) by default, because there can be transient errors and hard errors. Application teams using Key-Value abstraction simply need to toggle a flag to enable WAL and get all this functionality without needing to understand the underlying complexity.

  • Kafka-backed namespaces: handle standard message processing
  • SQS-backed namespaces: support delayed queue semantics (we added custom logic to go beyond the standard defaults enforced in terms of delay, size limits, etc)
  • Complex multi-partition scenarios: use queues and durable storage

Target Flexibility: The messages added to WAL are pushed to the target datastores. Targets can be Cassandra databases, Memcached caches, Kafka queues, or upstream applications. Users can specify the target via namespace configuration and in the API itself.

Architecture of WAL

Deployment Model

WAL is deployed using the Data Gateway infrastructure. This means that WAL deployments automatically come with mTLS, connection management, authentication, runtime and deployment configurations out of the box.

Each data gateway abstraction (including WAL) is deployed as a shard. A shard is a physical concept describing a group of hardware instances. Each use case of WAL is usually deployed as a separate shard. For example, the Ads Events service will send requests to WAL shard A, while the Gaming Catalog service will send requests to WAL shard B, allowing for separation of concerns and avoiding noisy neighbor problems.

Each shard of WAL can have multiple namespaces. A namespace is a logical concept describing a configuration. Each request to WAL has to specify its namespace so that WAL can apply the correct configuration to the request. Each namespace has its own configuration of queues to ensure isolation per use case. If the underlying queue of a WAL namespace becomes the bottleneck of throughput, the operators can choose to add more queues on the fly by modifying the namespace configurations. The concept of shards and namespaces is shared across all Data Gateway Abstractions, including Key-Value, Counter, Timeseries, etc. The namespace configurations are stored in a globally replicated Relational SQL database to ensure availability and consistency.

Deployment model of WAL

Based on certain CPU and network thresholds, the Producer group and the Consumer group of each shard will (separately) automatically scale up the number of instances to ensure the service has low latency, high throughput and high availability. WAL, along with other abstractions, also uses the Netflix adaptive load shedding libraries and Envoy to automatically shed requests beyond a certain limit. WAL can be deployed to multiple regions, so each region will deploy its own group of instances.

Solving different flavors of problems with no change to the core architecture

The WAL addresses multiple data reliability challenges with no changes to the core architecture:

Data Loss Prevention: In case of database downtime, WAL can continue to hold the incoming mutations. When the database becomes available again, WAL replays the mutations back to the database. The tradeoff is eventual consistency rather than immediate consistency, but with no data loss.

Generic Data Replication: For systems like EVCache (using Memcached) and RocksDB that do not support replication by default, WAL provides systematic replication (both in-region and across-region). The target can be another application, another WAL, or another queue — it’s completely pluggable through configuration.

System Entropy and Multi-Partition Solutions: Whether dealing with writes across two databases (like Cassandra and Elasticsearch) or mutations across multiple partitions in one database, the solution is the same — write to WAL first, then let the WAL consumer handle the mutations. No more asynchronous repairs needed; WAL handles retries and backoff automatically.

Data Corruption Recovery: In case of DB corruption, restore to the last known good backup, then replay mutations from WAL, omitting the offending write/mutation.
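A rough sketch of that recovery flow, with all names illustrative rather than Netflix’s actual tooling:

# Illustrative corruption-recovery flow: restore the last known good backup,
# then replay WAL mutations while omitting the offending ones.
def recover_from_corruption(db, restore_backup, wal_entries, offending_keys):
    restore_backup(db)                      # roll back to the last known good backup
    for entry in wal_entries:               # ordered mutations captured by WAL
        if entry["key"] in offending_keys:
            continue                        # skip the write that caused the corruption
        db.apply(entry["mutation"])         # re-apply everything else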

There are some major differences between using WAL and directly using Kafka/SQS. WAL is an abstraction on the underlying queues, so the underlying technology can be swapped out depending on use cases with no code changes. WAL emphasizes an easy yet effective API that saves users from complicated setups and configurations. We leverage the control plane to pivot technologies behind WAL when needed without app or client intervention.

WAL usage at Netflix

Delay Queue

The most common use case for WAL is as a Delay Queue. If an application is interested in sending a request at a certain time in the future, it can offload its requests to WAL, which guarantees that those requests will land after the specified delay.

Netflix’s Live Origin processes and delivers Netflix live stream video chunks, storing its video data in a Key-Value abstraction backed by Cassandra and EVCache. When Live Origin decides to delete certain video data after an event is completed, it issues delete requests to the Key-Value abstraction. However, a large burst of delete requests interferes with the more important real-time read/write requests, causing performance issues in Cassandra and timeouts for the incoming live traffic. To get around this, Key-Value issues the delete requests to WAL first, with a random delay and jitter set for each delete request. WAL, after the delay, sends the delete requests back to Key-Value. Since the deletes are now spread into a flatter curve of requests over time, Key-Value is able to send them to the datastore with no issues.
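A minimal sketch of that smoothing pattern (wal_client.write_to_log is a stand-in for the WriteToLog API shown earlier; the delay and jitter values are illustrative):

# Spread a burst of deletes into a flat curve by giving each one a random
# delay plus jitter before handing it to WAL.
import random

def schedule_deletes(wal_client, namespace, keys, max_delay_s=3600, jitter_s=60):
    for key in keys:
        delay = random.uniform(0, max_delay_s) + random.uniform(0, jitter_s)
        wal_client.write_to_log(
            namespace=namespace,
            payload=key.encode(),       # the delete to perform later
            delay_seconds=int(delay),   # WAL holds the message for this long
        )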

Requests being spread out over time through delayed requests

Additionally, WAL is used by many services that utilize Kafka to stream events, including Ads, Gaming, Product Data Systems, etc. Whenever Kafka requests fail for any reason, the client apps will send WAL a request to retry the Kafka request with a delay. This abstracts away the backoff and retry layer of Kafka for many teams, increasing developer efficiency.

Backoff and delayed retries for clients producing to Kafka
Backoff and delayed retries for clients consuming from Kafka

Cross-Region Replication

WAL is also used for global cross-region replication. The architecture of WAL is generic and allows any datastore/applications to onboard for cross-region replication. Currently, the largest use case is EVCache, and we are working to onboard other storage engines.

EVCache is deployed as clusters of Memcached instances across multiple regions, where each cluster in each region shares the same data. Each region’s client apps will write, read, or delete data from the EVCache cluster of the same region. To ensure global consistency, the EVCache client of one region will replicate write and delete requests to all other regions. To implement this, the EVCache client that originated the request will send the request to a WAL corresponding to the EVCache cluster and region.

Since the EVCache client acts as the message producer group in this case, WAL only needs to deploy the message consumer groups. From there, message consumers are set up for each target region. They read from the Kafka topic and send the replicated write or delete requests to a Writer group in their target region. The Writer group then replicates the request to the EVCache server in the same region.

EVCache Global Cross-Region Replication Implemented through WAL

The biggest benefit of this approach, compared to our legacy architecture, is being able to migrate from a multi-tenant architecture to a single-tenant architecture for the most latency-sensitive applications. For example, Live Origin will have its own dedicated Message Consumer and Writer groups, while a less latency-sensitive service can be multi-tenant. This helps us reduce the blast radius of issues and also prevents noisy neighbor problems.

Multi-Table Mutations

WAL is used by Key-Value service to build the MutateItems API. WAL enables the API’s multi-table and multi-id mutations by implementing 2-phase commit semantics under the hood. For this discussion, we can assume that Key-Value service is backed by Cassandra, and each of its namespaces represents a certain table in a Cassandra DB.

When a Key-Value client issues a MutateItems request to Key-Value server, the request can contain multiple PutItems or DeleteItems requests. Each of those requests can go to different ids and namespaces, or Cassandra tables.

message MutateItemsRequest {
  repeated MutationRequest mutations = 1;

  message MutationRequest {
    oneof mutation {
      PutItemsRequest put = 1;
      DeleteItemsRequest delete = 2;
    }
  }
}

The MutateItems request operates on an eventually consistent model. When the Key-Value server returns a success response, it guarantees that every operation within the MutateItemsRequest will eventually complete successfully. Individual put or delete operations may be partitioned into smaller chunks based on request size, meaning a single operation could spawn multiple chunk requests that must be processed in a specific sequence.

Two approaches exist to ensure Key-Value client requests achieve success. The synchronous approach involves client-side retries until all mutations complete. However, this method introduces significant challenges; datastores might not natively support transactions and provide no guarantees about the entire request succeeding. Additionally, when more than one replica set is involved in a request, latency occurs in unexpected ways, and the entire request chain must be retried. Also, partial failures in synchronous processing can leave the database in an inconsistent state if some mutations succeed while others fail, requiring complex rollback mechanisms or leaving data integrity compromised. The asynchronous approach was ultimately adopted to address these performance and consistency concerns.

Given Key-Value’s stateless architecture, the service cannot maintain the mutation success state or guarantee order internally. Instead, it leverages a Write-Ahead Log (WAL) to guarantee mutation completion. For each MutateItems request, Key-Value forwards individual put or delete operations to WAL as they arrive, with each operation tagged with a sequence number to preserve ordering. After transmitting all mutations, Key-Value sends a completion marker indicating the full request has been submitted.
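A simplified sketch of that producer-side flow (the payload encoding and the wal_client interface are illustrative, not the actual Key-Value implementation):

# Forward each mutation chunk to WAL with a sequence number, then send a
# completion marker once the full request has been submitted.
import json
import uuid

def forward_mutations(wal_client, namespace, mutations):
    request_id = str(uuid.uuid4())
    for seq, mutation in enumerate(mutations):
        wal_client.write_to_log(
            namespace=namespace,
            payload=json.dumps({"request_id": request_id,
                                "sequence": seq,
                                "mutation": mutation}).encode(),
        )
    # Completion marker: signals that every chunk of the request is in WAL.
    wal_client.write_to_log(
        namespace=namespace,
        payload=json.dumps({"request_id": request_id,
                            "complete": True,
                            "count": len(mutations)}).encode(),
    )
    return request_id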

The WAL producer receives these messages and persists the content, state, and ordering information to durable storage. The message producer then forwards only the completion marker to the message queue. The message consumer retrieves these markers from the queue and reconstructs the complete mutation set by reading the stored state and content data, ordering operations according to their designated sequence. Failed mutations trigger re-queuing of the completion marker for subsequent retry attempts.
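And the consumer-side counterpart, again as an illustrative sketch (durable_store, targets, and queue are assumed interfaces):

# On receiving a completion marker, read back the persisted chunks, apply them
# in sequence order, and re-queue the marker if anything fails.
def process_completion_marker(marker, durable_store, targets, queue):
    chunks = sorted(durable_store.read_chunks(marker["request_id"]),
                    key=lambda chunk: chunk["sequence"])   # restore the designated order
    try:
        for chunk in chunks:
            targets.apply(chunk["mutation"])               # put/delete against the right table
    except Exception:
        queue.requeue(marker)                              # retried later: delivery is at-least-once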

Architecture of Multi-Table Mutations through WAL
Sequence diagram for Multi-Table Mutations through WAL

Closing Thoughts

Building Netflix’s generic Write-Ahead Log system has taught us several key lessons that guided our design decisions:

Pluggable Architecture is Core: The ability to support different targets, whether databases, caches, queues, or upstream applications, through configuration rather than code changes has been fundamental to WAL’s success across diverse use cases.

Leverage Existing Building Blocks: We had control plane infrastructure, Key-Value abstractions, and other components already in place. Building on top of these existing abstractions allowed us to focus on the unique challenges WAL needed to solve.

Separation of Concerns Enables Scale: By separating message processing from consumption and allowing independent scaling of each component, we can handle traffic surges and failures more gracefully.

Systems Fail — Consider Tradeoffs Carefully: WAL itself has failure modes, including traffic surges, slow consumers, and non-transient errors. We use abstractions and operational strategies like data partitioning and backpressure signals to handle these, but the tradeoffs must be understood.

Future work

  • We are planning to add secondary indices in Key-Value service leveraging WAL.
  • WAL can also be used by a service to guarantee sending requests to multiple datastores: for example, a database and a backup, or a database and a queue, at the same time.

Acknowledgements

Launching WAL was a collaborative effort involving multiple teams at Netflix, and we are grateful to everyone who contributed to making this idea a reality. We would like to thank the following teams for their roles in this launch.

  • Caching team — Additional thanks to Shih-Hao Yeh and Akashdeep Goel for contributing to cross-region replication for KV, EVCache, etc., and for owning this service.
  • Product Data System team — Carlos Matias Herrero, Brandon Bremen for contributing to the delay queue design and being early adopters of WAL giving valuable feedback.
  • KeyValue and Composite abstractions team — Raj Ummadisetty for feedback on API design and mutateItems design discussions. Rajiv Shringi for feedback on API design.
  • Kafka and Real Time Data Infrastructure teams — Nick Mahilani for feedback and inputs on integrating the WAL client into Kafka client. Sundaram Ananthanarayan for design discussions around the possibility of leveraging Flink for some of the WAL use cases.
  • Joseph Lynch for providing strategic direction and organizational support for this project.

Building a Resilient Data Platform with Write-Ahead Log at Netflix was originally published in the Netflix TechBlog on Medium.

ScyllaDB X Cloud: An Inside Look with Avi Kivity (Part 3)

ScyllaDB’s co-founder/CTO discusses decisions to increase efficiency for storage-bound workloads and allow deployment on mixed-size clusters

To get the engineering perspective on the recent shifts to ScyllaDB’s architecture, Tim Koopmans recently caught up with ScyllaDB Co-Founder and CTO Avi Kivity. In part 1 of this 3-part series, they talked about the motivations and architectural shifts behind ScyllaDB X Cloud, particularly with respect to Raft and tablets-based data distribution. In part 2, they went deeper into how tablets work, then looked at the design of ScyllaDB X Cloud’s autoscaling. In this final part of the series, they discuss changes that increase efficiency for storage-bound workloads and allow deployment on mixed-size clusters. You can watch the complete video here.

Storage-bound workloads and compression

Tim: Let’s switch gears and talk about compression. This was a way to double down on storage-bound workloads, right? Would you say storage-bound workloads are more common than CPU-bound ones? Is that what’s driving this?

Avi: Yes, and there are two reasons for that. One reason is that our CPU efficiency is quite high. If you’re CPU-efficient, then storage is going to dominate. And the other reason is that when your database grows – say it’s twice as large as before – it’s rare that you actually have twice the amount of work. It can happen. But for many workloads, the growth of data is mostly historical, so the number of ops doesn’t scale linearly with the size of the database. As the database grows, the ratio of ops to storage decreases, and it becomes storage-bound. So, many of our larger workloads are storage-bound. The small and medium ones can be either storage-bound or CPU-bound…it really depends on the workload. We have some workloads where most of the storage in the cluster isn’t used because they’re so CPU-intensive. And we have others where the CPU is mostly idle, but the cluster is holding a lot of storage. We try to cater to all of these workloads.

Tim: So a storage-bound workload is likely to have lower CPU utilization in general, and that gives you more CPU bandwidth to do things like more advanced compression? What’s the default compression, in terms of planning for storage? Is it like 50%? Or what’s the typical rate? Or is the real answer just “it depends”?

Avi: “It depends” is an easy escape, but the truth is there’s a wide variety of storage options now. A recent addition is dictionary-based compression. That’s where the cluster periodically samples data on disk and constructs a dictionary from those samples. That dictionary is then used to boost compression. Everyone probably knows dictionary compression: it finds repetitive byte sequences in the data and matches against them. By having samples, you can match against the samples and gain higher compression. We recently started rolling it out, and it does give a nice improvement. Of course, it varies widely. Some people store data that’s already compressed, so it won’t compress further. Others store data like JSON, which compresses very well. In those cases, we might see above 50% compression ratios. And for many storage-bound workloads, you can set the compression parameters higher and gain more compression at the expense of CPU…but it’s CPU that you already have.

Tim: Is there anything else on the compression roadmap, like column-aware compression?

Avi: It’s not on the roadmap yet, but we will do columnar storage for time series data. But there’s no timeline for that yet.
Tim: Any hardware-accelerated stuff?

Avi: We looked at hardware acceleration, but it’s too rare to really matter. One problem is that on the cloud, it’s only available with the very largest instance sizes. And while we do have clusters with large instance sizes, it’s not enough to justify the work. I’m talking about machines with 96 vCPUs and 60TB of storage per node. It would only make sense for the very largest clusters, the petabyte-class clusters. They do exist, but they’re not yet common enough to make it worth the effort. On smaller instances, the accelerators are just hidden by virtualization. The other problem with hardware-accelerated compression is that it doesn’t keep up with the advances in software compression. That’s a general problem with hardware. For example, dictionary compression isn’t supported by those accelerators, but dictionary compression is very useful. We wouldn’t want to give that up.

Tim: Yeah, it seems like unless there’s a very specific, almost niche need for it, it’s safer to stick with software-based compression.

Mixed instance types & CPU:storage ratios

Tim: And in a roundabout way, this brings me back to the last thing I wanted to ask about. I think we’ve already touched on it: the idea of 90% storage utilization. You’ve already mentioned reasons why, including tablets. And we also spoke about having mixed instance types in the cluster. That’s quite significant for this release, right?

Avi: Yes, it’s quite important. Assume you have those large instances with 96 vCPUs and 60TB of storage per node… and your data grows. It’s not doubling, just incremental growth. If you have a large amount of data, the rate of growth won’t be very large. So, you want to add a smaller amount of storage each time, not 60TB. That gives you two options. One option is to compose your cluster from a large number of very small instances. But large clusters introduce management problems. The odds of a node failing grow as the cluster grows, so you want to keep clusters at a manageable size. The other option is to have mixed-size clusters. For example, if you have clusters of 60TB nodes, then you might add a 6TB node. As the data grows, you can then replace those smaller nodes with larger ones, until you’re back to having a cluster that’s full of the largest node size. There’s another reason for mixed-size clusters: changing the CPU-to-storage ratio. Typically, storage-bound clusters use nodes with a large disk-to-CPU ratio – a lot of disk and relatively little CPU. But there might be times across a day or throughout the year where the number of OPS increases without a corresponding increase in storage. For example, think about Black Friday or workloads spiking in certain geographies. In those cases, you might switch from nodes with a low CPU-to-disk ratio to ones with a high CPU-to-disk ratio, then switch back later. That way, you keep total storage constant, but increase the amount of CPU serving that storage. It lets you adapt to changing CPU requirements without having to buy more storage.

Tim: Got it. So it’s really about averaging out the ratios to get the price–performance balance you want between storage and CPU. Is that something the user has to figure out, or does it fall under the autoscaler?

Avi: It will be automatic. It’s too much to ask a user to track the right mix of instances and keep managing that.
Looking back and looking forward

Tim: Looking back, and a little forward…if you could go back to 2014, when you first came up with ScyllaDB, would you tell your past self to do anything different? Or do you think it’s evolved naturally? Would you save yourself some pain?

Avi: Yeah. So, when you start a project, it always looks simple and you think you know everything. Then you discover how much you didn’t know. I don’t even know what my 2014 self would say about how much I mispredicted the amount of work that would be necessary to do this. I mean, I knew databases were hard – one of the most complex areas in software engineering – but I didn’t know how hard.

Tim: And what about looking forward? What’s the next big thing on the horizon that people aren’t really talking about yet?

Avi: I want to fully complete the tablets project before we talk about the next step.

Tim: Just one last question from me before we wrap. Aside from the correct pronunciation of ScyllaDB, what’s the most misunderstood part of ScyllaDB’s new architecture? What are people getting wrong?

Avi: I don’t think people are getting it wrong. It’s not that complicated. It’s another layer of indirection, and people do understand that. We have some nice visualizations of that as well. Maybe we should have a session showing how tablets move around, because it’s a little like Tetris – how we fit different tablets to fill the nodes. So I think tablets are easily understood. It’s complex to implement, but not complicated to understand.

The Latency vs. Complexity Tradeoffs with 6 Caching Strategies

How to choose between cache-aside, read-through, write-through, write-behind, client-side, and distributed caching strategies

As we mentioned in the recent Why Cache Data? post, we’re delighted that Pekka Enberg decided to write an entire book on latency and we’re proud to sponsor 3 chapters from it.

Get the Latency book excerpt PDF

Also, Pekka will be sharing key takeaways from that book in a (free + virtual) Latency Masterclass on September 25. Be there as he speedruns through the latency-related patterns you’ll want to know when working on low-latency apps. And bring your toughest latency questions!

Join us live on September 25 (free + virtual)

Until then, let’s continue our Latency book excerpts with more from Pekka’s caching chapter. It’s reprinted here with permission of the publisher.

***

When adding caching to your application, you must first consider your caching strategy, which determines how reads and writes happen from the cache and the underlying backing store, such as a database or a service. At a high level, you need to decide if the cache is passive or active when there is a cache miss. In other words, when your application looks up a value from the cache, but the value is not there or has expired, the caching strategy mandates whether it’s your application or the cache that retrieves the value from the backing store. As usual, different caching strategies have different trade-offs on latency and complexity, so let’s get right into it.

Cache-Aside Caching

Cache-aside caching is perhaps the most typical caching strategy you will encounter. When there is a cache hit, data access latency is dominated by communication latency, which is typically small, as you can get a cache close by on a cache server or even in your application memory space. However, when there is a cache miss, with cache-aside caching, the cache is a passive store updated by the application. That is, the cache just reports a miss and the application is responsible for fetching data from the backing store and updating the cache.

Figure 1 shows an example of cache-aside caching in action. An application looks up a value from a cache by a caching key, which determines the data the application is interested in. If the key exists in the cache, the cache returns the value associated with the key, which the application can use. However, if the key does not exist or is expired in the cache, we have a cache miss, which the application has to handle. The application queries the value from the backing store and stores the value in the cache. Suppose you are caching user information and using the user ID as the lookup key. In that case, the application performs a query by the user ID to read user information from the database. The user information returned from the database is then transformed into a format you can store in the cache. Then, the cache is updated with the user ID as the cache key and the information as the value. For example, a typical way to perform this type of caching is to transform the user information returned from the database into JSON and store that in the cache.

Figure 1: With cache-aside caching, the client first looks up a key from the cache. On cache miss, the client queries the database and updates the cache.

Cache-aside caching is popular because it is easy to set up a cache server such as Redis and use it to cache database queries and service responses. With cache-aside caching, the cache server is passive and does not need to know which database you use or how the results are mapped to the cache.
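To make the division of labor concrete, here is a minimal cache-aside sketch along the lines of the user-info example above; the cache and db objects are assumed to expose simple get/set and query methods.

# Minimal cache-aside lookup: the application owns both the cache lookup and
# the database fallback, and stores the value as JSON.
import json

def get_user(user_id, cache, db, ttl_seconds=300):
    key = f"user:{user_id}"
    cached = cache.get(key)
    if cached is not None:                  # cache hit: just the cache round trip
        return json.loads(cached)
    user = db.query_user(user_id)           # cache miss: application reads the backing store
    cache.set(key, json.dumps(user), ttl=ttl_seconds)  # and updates the cache itself
    return user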
It is your application doing all the cache management and data transformation. In many cases, cache-aside caching is a simple and effective way to reduce application latency. You can hide database access latency by having the most relevant information in a cache server close to your application. However, cache-aside caching can also be problematic if you have data consistency or freshness requirements. For example, if you have multiple concurrent readers that are looking up a key in the cache, you need to coordinate in your application how you handle concurrent cache misses; otherwise, you may end up with multiple database accesses and cache updates, which may result in subsequent cache lookups returning different values. Furthermore, with cache-aside caching, you lose transaction support because the cache and the database do not know about each other, and it’s the application’s responsibility to coordinate updates to the data. Finally, cache-aside caching can have significant tail latency because some cache lookups experience the database read latency on a cache miss. That is, although access latency on a cache hit is fast because the value comes from a nearby cache server, cache lookups that experience a cache miss are only as fast as database access. That’s why the geographic latency to your database can still matter a great deal even if you are caching, because tail latency is experienced surprisingly often in many scenarios.

Read-Through Caching

Read-through caching is a strategy where, unlike cache-aside caching, the cache is an active component when there is a cache miss. When there is a cache miss, a read-through cache attempts to read a value for the key from the backing store automatically. Latency is similar to cache-aside caching, although backing store retrieval latency is from the cache to the backing store, not from the application to the backing store, which may be smaller, depending on your deployment architecture.

Figure 2 shows an example of a read-through cache in action. The application performs a cache lookup on a key, and if there is a cache miss, the cache performs a read to the database to obtain the value for the key. The cache then updates itself and returns the value to the application. From an application point of view, a cache miss is transparent because the cache always returns a key if one exists, regardless of whether there was a cache miss or not.

Figure 2: With read-through caching, the client looks up a key from the cache. Unlike with cache-aside caching, the cache queries the database and updates itself on cache miss.

Read-through caching is more complex to implement because a cache needs to be able to read the backing store, but it also needs to transform the database results into a format for the cache. For example, if the backing store is an SQL database server, you need to convert the query results into a JSON or similar format to store the results in the cache. The cache is, therefore, more coupled with your application logic because it needs to know more about your data model and formats. However, because the cache coordinates the updates and the database reads with read-through caching, it can give transactional guarantees to the application and ensure consistency on concurrent cache misses. Furthermore, although a read-through cache is more complex from an application integration point of view, it does remove cache management complexity from the application.
Of course, the same caveat of tail latency applies to read-through caches as it does to cache-aside caching. An exception: as active components, read-through caches can hide the latency better with, for example, refresh-ahead caching. Here, the cache asynchronously refreshes values before they expire – therefore hiding the database access latency from applications altogether when a value is in the cache.

Write-Through Caching

Cache-aside and read-through caching are strategies around caching reads, but sometimes, you also want the cache to support writes. In such cases, the cache provides an interface for updating the value of a key that the application can invoke. In the case of cache-aside caching, the application is the only one communicating with the backing store and, therefore, updates the cache. However, with read-through caching, there are two options for dealing with writes: write-through and write-behind caching.

Write-through caching is a strategy where an update to the cache propagates immediately to the backing store. Whenever a cache is updated, the cache synchronously updates the backing store with the cached value. The write latency of a write-through cache is dominated by the write latency to the backing store, which can be significant.

As shown in Figure 3, an application updates a cache using an interface provided by the cache with a key and a value pair. The cache updates its state with the new value, updates the database with the new value, and waits for the database to commit the update before acknowledging the cache update to the application.

Figure 3: With write-through caching, the client writes a key-value pair to the cache. The cache immediately updates the cache and the database.

Write-through caching aims to keep the cache and the backing storage in sync. However, for non-transactional caches, the cache and backing store can be out of sync in the presence of errors. For example, if the write to the cache succeeds, but the write to the backing store fails, the two will be out of sync. Of course, a write-through cache can provide transactional guarantees by trading off some latency to ensure that the cache and the database are either both updated or neither of them is. As with a read-through cache, write-through caching assumes that the cache can connect to the database and transform a cache value into a database query. For example, if you are caching user data where the user ID serves as the key and a JSON document represents the value, the cache must be able to transform the JSON representation of user information into a database update. With write-through caching, the simplest solution is often to store the JSON in the database. The primary drawback of write-through caching is the latency associated with cache updates, which is essentially equivalent to database commit latency. This can be significant.

Write-Behind Caching

The write-behind caching strategy also updates the cache immediately but, unlike write-through caching, defers the database update. In other words, with write-behind caching, the cache may accept multiple updates before updating the backing store, as shown in Figure 4, where the cache accepts three cache updates before updating the database.

Figure 4: With write-behind caching, the client writes a key-value pair to the cache. However, unlike with write-through caching, the cache updates the cache but defers the database update. Instead, a write-behind cache will batch multiple cache updates into a single database update.
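A toy write-behind sketch, assuming a single-threaded caller; a real implementation would also need locking, durability for the pending buffer, and failure handling.

# Write-behind: acknowledge writes as soon as the cache is updated, and flush
# the backing store later in batches.
class WriteBehindCache:
    def __init__(self, cache, db, batch_size=100):
        self.cache = cache
        self.db = db
        self.batch_size = batch_size
        self.pending = {}                    # key -> latest value not yet flushed

    def put(self, key, value):
        self.cache.set(key, value)           # fast path: cache updated immediately
        self.pending[key] = value            # database update is deferred
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.pending:
            self.db.bulk_update(self.pending)  # many cache writes, one database write
            self.pending.clear()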
The write latency of a write-behind cache is lower than that of write-through caching because the backing store is updated asynchronously. That is, the cache can acknowledge the write immediately to the application, resulting in a low-latency write, and then perform the backing store update in the background. However, the downside of write-behind caching is that you lose transaction support because the cache can no longer guarantee that the cache and the database are in sync. Furthermore, write-behind caching can reduce durability, which is the guarantee that you don’t lose data. If the cache crashes before flushing updates to the backing store, you can lose the updates.

Client-Side Caching

A client-side caching strategy means having the cache at the client layer within your application. Although cache servers such as Redis use in-memory caching, the application must communicate over the network to access the cache via the Redis protocol. If the application is a service running in a data center, a cache server is excellent for caching because the network round trip within a data center is fast, and the cache complexity is in the cache itself. However, last-mile latency can still be a significant factor in user experience on a device, which is why client-side caching is so lucrative. Instead of using a cache server, you have the cache in your application. With client-side caching, a combination of read-through and write-behind caching is optimal from a latency point of view because both reads and writes are fast. Of course, your client usually won’t be able to connect to the database directly, but instead accesses the database indirectly via a proxy or an API server. Client-side caching also makes transactions hard to guarantee because of the database access indirection layers and latency. For many applications that need low-latency client-side caching, the local-first approach to replication may be more practical. But for simple read caching, client-side caching can be a good solution to achieve low latency. Of course, client-side caching also has a trade-off: it can increase the memory consumption of the application because you need space for the cache.

Distributed Caching

So far, we have only discussed caching as if a single cache instance existed. For example, you use an in-application cache or a single Redis server to cache queries from a PostgreSQL database. However, you often need multiple copies of the data to reduce geographic latency across various locations or to scale out to accommodate your workload. With such distributed caching, you have numerous instances of the cache that either work independently or in a cache cluster. With distributed caching, you have many of the same complications and considerations as discussed in Chapter 4 on replication and Chapter 5 on partitioning. With distributed caching, you don’t want to fit all the cached data on every instance but instead have the cached data partitioned between the nodes. Similarly, you can replicate the partitions on multiple instances for high availability and reduced access latency. Overall, distributed caching is an intersection of the benefits and problems of caching, partitioning and replication, so watch out if you’re going with that.

***

To keep reading, download the 3-chapter Latency excerpt free from ScyllaDB or purchase the complete book from Manning. Also, catch Pekka live during the upcoming “Building Low Latency Apps Masterclass” (free and virtual).

ScyllaDB X Cloud: An Inside Look with Avi Kivity (Part 2)

ScyllaDB’s co-founder/CTO goes deeper into how tablets work, then looks at the design behind ScyllaDB X Cloud’s autoscaling

Following the recent ScyllaDB X Cloud release, Tim Koopmans sat down (virtually) with ScyllaDB Co-Founder and CTO Avi Kivity. The goal: get the engineering perspective on all the multiyear projects leading up to this release. This includes using Raft for topology and schema metadata, moving from vNodes to tablets-based data distribution, allowing up to 90% storage utilization, new compression approaches, and more. In part 1 of this 3-part series, we looked at the motivations and architectural shifts behind ScyllaDB X Cloud, particularly with respect to Raft and tablets-based data distribution. This blog post goes deeper into how tablets work, then looks at the design behind ScyllaDB X Cloud’s autoscaling.

Read part 1

You can watch the complete video here.

Tackling technical challenges

Tim: With such a complex project, I’m guessing that you didn’t nail everything perfectly on the first try. Could you walk us through some of the hard problems that took time to crack? How did you work around those hurdles?

Avi: One of the difficult things was the distribution related to racks or availability zones (we use those terms interchangeably). With the vNodes method of data distribution, a particular replica can hop around different racks. That does work, but it creates problems when you have materialized views. With a materialized view, each row in the base table is tied to a row in the materialized view. If there’s a change in the relationship between which replica on the base table owns the row on the materialized view, that can cause problems with data consistency. We struggled with that a lot until we came to a solution of just forbidding having a replication factor that’s different from the number of racks or availability zones. That simple change solved a lot of problems. It’s a very small restriction because, practically speaking, the vast majority of users have a replication factor of 3, and they use 3 racks or 3 availability zones. So the restriction affects very few people, but solves a large number of problems for us…so we’re happy that we made it.

How tablets prevent hot partitions

Tim: What about things like hot partitions and data skew in tablets? Do tablets help here since you’re working with smaller chunks?

Avi: Yes. With tablets, our granularity is 5GB, so we can balance data in 5GB chunks. That might sound large, but it’s actually very small compared to the node capacity. The 5GB size was selected because it’s around 1% of the data that a single vCPU can hold. For example, an i3 node has around 600GB of storage per vCPU, and 1% of that is 5GB. That’s where the 5GB number came from. Since we control individual tablets, we can isolate a tablet to a single vCPU. Then, instead of a tablet being 1% of a vCPU, it can take 100% of it. That effectively increases the amount of compute power that is dedicated to the tablet by a factor of 100. This will let us isolate hot partitions into their own vCPUs. We don’t do this yet, but detecting hot partitions and isolating them in this way will improve the system’s resilience to hot partition problems.

Tim: That’s really interesting. So have we gone from shard per core to almost tablet per core? Is that what the 1% represents, on average?

Avi: The change is that we now have additional flexibility. With a static distribution, you look at the partition key and you know in advance where it will go.
Here, you look at the partition key and you consult an indirection table. And that indirection table is under our control…which means we can play with it and adjust things.

Tim: Can you say more about the indirection table?

Avi: It’s called system.tablets. It lays out the topology of the cluster. For every table and every token range, it lists which node and which shard will handle those keys. It’s important that it’s per table. With vNodes, we had the same layout for all tables. Some tables can be very large, some very small, some hot, some cold…so the one-size-fits-all approach doesn’t always work. Now, we have the flexibility to lay out different tables in different ways.

How driver changes simplify complexity

Tim: Very cool. So tablets seem to solve a lot of problems – they just have a lot of good things going for them. I guess they can start servicing requests as soon as a new node receives a tablet? That should help with long-tail latency for cluster operations. We also get more fine-grained control over how we pack data into the cluster (and we’ll talk about storage utilization shortly). But you mentioned the additional table. Is there any other overhead or any operational complexity?

Avi: Yes. It does introduce more complexity. But since it’s under our control, we also introduced mitigations for that. For example, the drivers now have to know about this indirection layer, so we modified them. We have a reactive approach where a driver doesn’t read the tablets table upfront. Instead, when it doesn’t know the layout of tablets on a cluster, it just fires off a request randomly. If it hits, great. If it misses, then along with the results, it gets back a notification about the topology of that particular tablet. As it fires off more requests, it gradually learns the topology of the cluster. And when the topology changes, it reacts to how the cluster layout changes. That saves it from doing a lot of upfront work – so it can send requests as soon as it connects to the cluster.

ScyllaDB’s approach to autoscaling

Tim: Let’s shift over to autoscaling. Autoscaling in databases generally seems more like marketing than reality to me. What’s different about ScyllaDB X Cloud’s approach to autoscaling?

Avi: One difference is that we can autoscale much later, at least for storage-bound workloads. Before, we would scale at around 70% storage utilization. Now we start scaling at 90%. This decreases the cluster cost because more of the cluster storage is used to store data, rather than being used as a free space cushion. Tablets allow us to do that. Since tablets let us add nodes concurrently, we can scale much faster. Also, since each tablet is managed independently, we can remove its storage as soon as the tablet is migrated off its previous node. Before, we had to wait until the data was completely transitioned to a new node, and then we would run a cleanup process to erase it from the original node. Now this is done incrementally (in 5GB increments), so it happens very quickly. We can migrate a 5GB tablet in around a minute, sometimes even less. As soon as a cluster scale-out begins, the node storage decreases immediately. That means we can defer the scale-out decision, waiting until it’s really needed. Scaling for CPU, by measuring CPU usage, will be another part of that. CPU is used for many different things in ScyllaDB.
It can be used for serving queries, but it’s also used for internal background tasks like compaction. It can also be used for queries that – from the user’s perspective – are background queries, like running analytics. You wouldn’t want to scale your cluster just because you’re running analytics on it. These are jobs that can take as long as they need to; you wouldn’t necessarily want to add more hardware just to make them run faster. We can distinguish between CPU usage for foreground tasks (queries that are latency sensitive) and CPU usage for maintenance tasks, background work, and queries where latency is not so important. We will only scale when the CPU for foreground tasks runs low.

Tim: Does the user have to do anything special to prioritize the foreground vs. background queries? Is that just part of workload prioritization? Or does it just understand the difference?

Avi: We’re trying not to be too clever. It does use the existing service level mechanism. In the service level definition, you can say whether it’s a transactional workload or a batch workload. All you need to do is run an ALTER SERVICE LEVEL statement to designate a particular service level as a batch workload. Once you do that, the cluster will not scale because that service level needs more CPU. It will only scale if your real-time queries are running out of CPU. It’s pretty normal to see ScyllaDB at 100% CPU. But that 100% is split: part goes to your workload, and part goes to maintenance like compaction. You don’t want to trigger scaling just because the cluster is using idle CPU power for background work. So, we track every cycle and categorize it as either foreground work or background work, then we make decisions based on that. We don’t want it to scale out when that’s just not valuable.
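For readers who want to try what Avi describes, here is a minimal sketch of flagging a service level as a batch workload from the Python driver. The service level name (analytics) and the role (reporting_user) are illustrative assumptions, and the exact CQL options may vary by ScyllaDB version, so check the service level documentation for your release.

```python
# Minimal sketch (assumptions noted in the lead-in): mark a service level as a
# batch workload so its queries don't count as latency-sensitive foreground CPU.
from cassandra.cluster import Cluster  # provided by the scylla-driver package

cluster = Cluster(["scylla-node-1.example.com"])
session = cluster.connect()

# Designate the (hypothetical) "analytics" service level as a batch workload.
session.execute("ALTER SERVICE LEVEL analytics WITH workload_type = 'batch'")

# Attach it to the role that analytics jobs authenticate as (illustrative name).
session.execute("ATTACH SERVICE LEVEL analytics TO reporting_user")

cluster.shutdown()
```

Queries issued under that service level would then be treated as background work for scaling decisions, while interactive service levels continue to drive scale-out.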

Why Cache Data? [Latency Book Excerpt]

Latency is a monstrous concern here at ScyllaDB. So we’re pleased to bring you excerpts from Pekka Enberg’s new book on Latency…and a masterclass with Pekka as well!

Latency is a monstrous concern here at ScyllaDB. Our engineers, our users, and our broader community are obsessed with it…to the point that we developed an entire conference on low latency. [Side note: That’s P99 CONF, a free + virtual conference coming to you live, October 22-23.]

Join P99 CONF – Free + Virtual

We’re delighted that Pekka Enberg decided to punish himself – er, share his hard-fought latency lessons learned – by writing an entire book on latency. The book (quite efficiently titled “Latency”) is now off to the printers. From his intro:

Latency is so important across a variety of use cases today. Still, it’s a tricky topic because many low-latency techniques are effectively developer folklore hidden in blog posts, mailing lists, and side notes in books. When faced with a latency problem, understanding what you’re even dealing with often takes a lot of time. I remember multiple occasions where I saw peculiar results from benchmarks, which resulted in an adventure down the software and hardware stack where I learned something new. By reading this book, you will understand latency better, learn how to measure latency accurately, and discover the different techniques for achieving low latency. This is the book I always wished I had when grappling with latency issues. Although this book focuses on applying the techniques in practice, I will also bring up enough of the background to try to balance theory and practice.

ScyllaDB is sponsoring chapters from the book, so we’ll be trickling out some excerpts on our blogs.

Get the Latency book excerpt PDF

More good news: Pekka is also joining us for a Latency Masterclass on September 25. Be there as he speedruns through the latency-related patterns you’ll want to know when working on low-latency apps. And bring your toughest latency questions!

Join us live on September 25

Until then, let’s kick off our Latency book excerpts with the start of Pekka’s caching chapter. It’s reprinted here with permission of the publisher.

***

Why cache data?

Typically, you should consider caching for reducing latency over other techniques if your application or system:

- Doesn’t need transactions or complex queries.
- Cannot be changed, which makes techniques such as replication hard to apply.
- Has compute or storage constraints that prevent other techniques.

Many applications and systems are simple enough that a key-value interface, typical for caching solutions, is more than sufficient. For example, you can store user data such as profiles and settings as a key-value pair where the key is the user ID and the value is the user data in JSON or a similar format. Similarly, session management, where you keep track of logged-in user session state, is often simple enough that it doesn’t require complex queries. However, caching can eventually be too limiting as you move to more complicated use cases, such as recommendations or ad delivery. Then you have to look into other techniques. Overall, whether your application is simple enough to use caching is highly use case-specific.

Often, you look into caching because you cannot or don’t want to change the existing system. For example, you may have a database system that you cannot change, which does not support replication, but you have clients accessing the database from multiple locations.
You may then look into caching some query results to reduce latency and scale the system, which is a typical use of caching. However, this comes with various caveats around data freshness and consistency, which we’ll discuss in this chapter. Compute and storage constraints can also be a reason to use caching instead of other techniques. Depending on their implementation, colocation and replication can have high storage requirements, which may prevent you from using them. For example, suppose you want to reduce access latency to a large data set, such as a product catalog in an e-commerce site. In that case, it may be impractical to replicate the whole data set to the client for the lowest access latency. However, it may still make sense to cache parts of the product catalog in the client to reduce latency while living within the client’s storage constraints. Similarly, it may be impractical to replicate a whole database to a client or a service because database access requires compute capacity for query execution, which may not be available there.

Caching overview

With caching, you keep a temporary copy of data to reduce access time significantly by reusing the same result many times. For example, if you have a REST API that takes a long time to compute a result, you can cache the REST API results in the client to reduce latency. Accessing the cached results can be as fast as reading from memory, which can significantly reduce latency. You can also use caching for data items that don’t exist, called negative caching. For example, maybe the REST API you use is there to look up customer information based on some filtering parameters. In some cases, no results will match the filter, but you still need to perform the expensive computation to discover that. In that scenario, you would use negative caching to cache the fact that there are no results, speeding up the search.

Of course, caching has a downside, too: you trade off data freshness for reduced access latency. You also need more storage space to keep the cached data around. But in many use cases, it’s a trade-off you are willing to make.

Cache storage is where you keep the temporary copies of data. Depending on the use case, cache storage can be in main memory or on disk, and it can be accessed either in the same memory address space as the application or over a network protocol. For example, you can use an in-memory cache library to store values in your application’s memory, or a key-value store such as Redis or Memcached to cache values on a remote server.

With caching, an application looks up values from the cache based on a cache key. When the cache has a copy of the value, we call that a cache hit and serve the data access from the cache. However, if there is no value in the cache, we call that scenario a cache miss and must retrieve the value from the backing store. A key metric for an effective caching solution is the cache hit-to-miss ratio, which describes how often the application finds a relevant value in the cache and how frequently it does not. If a cache has a high hit ratio, it is utilized well, meaning there is less need to perform a slow lookup or compute the result. With a high miss ratio, you are not taking advantage of the cache. This can mean that your application runs slower than it would without caching, because caching itself has some overhead.

One major complication with caches is the cache eviction policy – that is, which values to throw out from the cache.
The main point of a cache is to provide fast access while fitting the cache into a limited storage space. For example, you may have a database with hundreds of gigabytes of data, but you can only reasonably cache tens of gigabytes in the memory address space of your application because of machine resource limitations. You therefore need some policy to determine which values stay in the cache and which ones you can evict if you run out of cache space. Similarly, once you cache a value, you can’t always retain it in the cache indefinitely if the source value changes. For example, you may have a time-based eviction policy enforcing that a cached value must be at least a minute old before it is updated to the latest source value.

Despite the challenges, caching is an effective technique to reduce latency in your application, in particular when you can’t change some parts of the system and when your use case doesn’t warrant investment in things like colocation, replication, or partitioning. With that in mind, let’s look at the different caching strategies.

Caching strategies

When adding caching to your application, you must first consider your caching strategy, which determines how reads and writes happen against the cache and the underlying backing store, such as a database or a service. At a high level, you need to decide if the cache is passive or active when there is a cache miss. In other words, when your application looks up a value from the cache but the value is not there or has expired, the caching strategy mandates whether it’s your application or the cache that retrieves the value from the backing store. As usual, different caching strategies have different trade-offs on latency and complexity, so let’s get right into it.

To be continued…

Get the Latency book excerpt PDF

Join the Latency Masterclass on September 25
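To make the hit/miss, negative caching, and time-based eviction ideas from the excerpt concrete, here is a minimal cache-aside sketch in Python. It is our illustration, not from the book; the function names and the one-minute TTL are assumptions.

```python
import time

# Minimal cache-aside sketch: on a miss, the application (not the cache) fetches
# from the backing store. A TTL gives simple time-based eviction; a sentinel
# value supports negative caching for keys that have no result.
TTL_SECONDS = 60          # cached values are refreshed once they are a minute old
NEGATIVE = object()       # marks "we already know there is no result for this key"

_cache: dict[str, tuple[float, object]] = {}

def fetch_from_backing_store(key):
    # Stand-in for the slow lookup (database query, REST call, etc.).
    return None if key.startswith("missing:") else f"value-for-{key}"

def get(key):
    now = time.monotonic()
    entry = _cache.get(key)
    if entry is not None and now - entry[0] < TTL_SECONDS:
        value = entry[1]                      # cache hit (possibly a negative entry)
        return None if value is NEGATIVE else value
    value = fetch_from_backing_store(key)     # cache miss: go to the backing store
    _cache[key] = (now, NEGATIVE if value is None else value)
    return value
```

A real deployment would also bound the cache size with an eviction policy such as LRU, and, as the chapter goes on to discuss, has to decide whether this logic lives in the application (as in this cache-aside sketch) or in the cache itself.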

ScyllaDB X Cloud: An Inside Look with Avi Kivity (Part 1)

ScyllaDB’s co-founder/CTO on the motivations and architectural shifts behind ScyllaDB X Cloud — focusing on Raft and tablets-based data distribution

If you follow ScyllaDB, you’ve probably heard us talking about Raft and tablets-based data distribution for a few years now. The ultimate goal of these projects (plus a few related ones) was to optimize elasticity and price performance – especially for dynamic and storage-bound workloads. And we finally hit a nice milestone along that journey: the release of ScyllaDB X Cloud. You can read about the release in our earlier blog post.

Here, we wanted to share the engineering perspective on these architectural shifts. Tim Koopmans recently sat down with Avi Kivity – ScyllaDB Co-Founder and CTO – to chat about the underlying motivation and design decisions. You can watch the complete video here. But if you prefer to read, we’re writing up the highlights. This is the first blog post in a three-part series.

Why ScyllaDB X Cloud? For scaling large clusters

Tim: Let’s start with the big picture. What really motivated the architectural evolution behind what we know as ScyllaDB X Cloud? Was this change inevitable? How did it come about?

Avi: It came from our experience managing clusters for our customers. With the existing architecture, things like scaling up the cluster in preparation for events like Black Friday could take a long time. Since ScyllaDB can manage very large nodes (e.g., nodes with 30TB of data), moving that data onto new nodes could take a long time, sometimes a day. Also, nodes had to be added one at a time. If you had a large cluster, scaling the cluster would be a nail-biting experience. So we decided to improve that experience and, along the way, we improved many parts of the architecture.

Tim: Do you have any numbers around what it used to be like to scale a large cluster?

Avi: One of our large clusters has 75 nodes, each of which holds around 60TB. It’s a pretty hefty cluster. It’s nice watching clusters like that on our dashboards and seeing compactions at tens of gigabytes per second in aggregate across the cluster. Clusters like that churn through and carry huge amounts of data. Now, we can scale that amount of data in minutes, maybe an hour for the most extreme cases. So it’s quite a huge change.

Why ScyllaDB addressed scaling with Tablets & Raft

Tim: When you think about dynamic or storage-bound workloads today, what are other databases getting wrong in this space? How did that lead you to this new approach, with tablets?

Avi: “Other databases” is a huge area – there are hundreds of databases. Let’s talk about our heritage. We came from the Cassandra model. And the basic problem there was the static distribution of data. The node layout determines how data is distributed, and as long as you don’t add or remove nodes, it remains static. That means you have no flexibility. Also, favoring availability over consistency meant there was no central point for managing the topology. Without a coordinating authority, you could make only one change at a time. One of the first changes we made was to add a coordinating authority in the form of Raft. Before, we managed topology with Gossip, which really put cluster management responsibility on the operator. We moved it to a Raft group to centralize the management. You’ve probably heard the old proverb that anything in computer science can be solved with another layer of indirection. We did that with tablets, more or less.
We inserted a layer of indirection so that instead of having a static distribution of data to nodes, it goes through a translation table. Each range of rows is mapped to a node in a tablets table. By manipulating the tablets table, we can redirect small packages of data (specifically, 5GB – that’s pretty small for us) to any node and any CPU on any node. We can move those packages around at will, and they are moved at line rate, so it’s no problem to fire them away at gigabits per second across the cluster. And that gives us the ability to rebalance data on a cluster or add and remove nodes very quickly.

Tim: So tablets are really a new ScyllaDB abstraction? Is it an abstraction that breaks those tables into independently managed units? And I think you said the size is 5GB – is that configurable?

Avi: It’s configurable, but I don’t recommend playing with it. Normally, a tablet stays between 2.5GB and 10GB. When it reaches 10GB, that triggers a split, which brings it back to 5GB: the tablet is split into two. If it goes down to 2.5GB, that triggers a merge, merging two tablets into one larger tablet – again, bringing it back to around 5GB.

Tim: So ensuring that things can be dynamically split…We can move data around, rebalance across the cluster…That gives us finer-grained load distribution as well as better scalability and perhaps a bit of separation between compute and storage, right? Because we’re not necessarily tied to the size of the compute node anymore. We can have different instance types in a cluster now, as an indirect result of this change.

The tipping point

Tim: Avi, you said that re-architecting around tablets has been a huge shift. So what was the tipping point? Was it just that vNodes didn’t work anymore in terms of how you organize data? What was your aha moment where you said, “Yeah, I think we need to do something different here”?

Avi: It was a combination of things, and since this was such a major change, we needed a lot of motivation to do it. One part was the inability to perform topology changes involving more than one node at a time. Another part was that the previous streaming mechanism was very slow. Yet another part was that, because the streaming mechanism was so slow, we had to scale well in advance of exhausting the storage on the node. That required us to leave a lot of free space on each node, and that’s wasteful. We took all of this into consideration, and it was enough motivation for us to take on a multi-year change. I think it was well worth it.

Tim: Multiyear…So how long ago did you start workshopping different ideas to solve this?

Avi: The first phase was changing topology to be strongly consistent and having a central authority to coordinate it. I think it took around a couple of years to switch to Raft topology. Before that, we switched schema management to use Raft as well. That was a separate problem, but since those two problems had the same solution, we jumped on it. We’re still not completely done. There are still a few features that are not yet fully compatible with tablets – but we see the light at the end of the tunnel now.

[Stay tuned for parts 2 and 3]
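As a small, concrete illustration of the indirection layer Avi describes, here is a rough sketch of peeking at the tablets table from the Python driver. It assumes a tablets-enabled ScyllaDB cluster, and since the columns of system.tablets can vary between versions, it selects everything rather than naming columns; treat the details as assumptions, not a stable API.

```python
# Rough sketch: inspect the indirection table that maps each table's token
# ranges to tablet replicas. Assumes a tablets-enabled ScyllaDB cluster
# reachable at the address below; the schema of system.tablets may differ
# between versions, so we just print whole rows.
from cassandra.cluster import Cluster  # provided by the scylla-driver package

cluster = Cluster(["scylla-node-1.example.com"])
session = cluster.connect("system")

for row in session.execute("SELECT * FROM tablets LIMIT 10"):
    # Each row describes one tablet: the table it belongs to, the token range
    # it covers, and the replicas (node/shard) currently responsible for it.
    print(row)

cluster.shutdown()
```

This is also the table that the drivers learn lazily, as described in part 2: rather than reading it upfront, they pick up tablet ownership from the responses to the requests they send.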

Be Part of Something Big – Speak at Monster Scale Summit

Share your “extreme scale engineering” expertise with ~20K like-minded engineers

Whether you’re designing, implementing, or optimizing systems that are pushed to their limits, we’d love to hear about your most impressive achievements and lessons learned – at Monster Scale Summit 2026.

Become a Monster Scale Summit Speaker

What’s Monster Scale Summit?

Monster Scale Summit is a technical conference that connects the community of people working on performance-sensitive, data-intensive applications. Engineers, architects, and SREs from game-changers around the globe will gather virtually to explore “monster scale” challenges with respect to extreme levels of throughput, data, and global distribution. It’s a lot like P99 CONF (also hosted by ScyllaDB) – a two-day event that’s free, fully virtual, and highly interactive. The core difference is that it’s focused on extreme-scale engineering vs. all things performance. Last time, we hosted industry giants like Kelsey Hightower, Martin Kleppmann, Discord, Slack, and Canva.

Browse past sessions

Details please!

When: March 11 + 12

Where: Wherever you’d like! It’s intentionally virtual, so you can present and interact with attendees from anywhere in the world.

Topics: Core topics include distributed databases, streaming and real-time processing, intriguing system designs, methods for balancing latency/concurrency/throughput, SRE techniques proven at scale, and infrastructure built for unprecedented demands.

What we’re looking for: We welcome a broad range of talks about tackling the challenges that arise in the most massive, demanding environments. The conference prioritizes technical talks sharing first-hand experiences. Sessions are just 18-20 minutes – so consider this your TED Talk debut!

Share your ideas