“Never write a database. Even if you want to,
even if you think you should. Resist. Never write a database.
Unless you have to write a database. But you don’t.” – Charity
Majors
But someone has to write the databases that others rely
on. Hearing about the engineering challenges they’re tackling is
both fascinating and Schadenfreude-invoking – so perfect tech
conference material. 😉 Since database performance is so near and
dear to ScyllaDB, we reached out to our friends and colleagues
across the community to ensure that a nice range of distributed
data systems, approaches, and challenges would be represented at
P99 CONF 2025. As you can see
from our agenda, the
response was overwhelming. A quick PSA for the uninitiated: P99
CONF is a free 2-day community event that’s intentionally virtual,
highly interactive, and purely technical. It’s an immersion into
all things performance. Distributed systems, database internals,
Rust, C++, Java, Go, Wasm, Zig, Linux kernel, tracing, AI/ML & more
– it’s all on the agenda. This year, you can look forward
to first-hand engineering experiences from the likes of Pinterest,
ClickHouse, Gemini, Arm, Rivian and VW Group Technology, Meta,
Wayfair, Disney, NVIDIA, Turso, Neon, TigerBeetle, ScyllaDB, and
too many others to list here. Here’s a sneak peek of the
database internals talks you can look forward to at P99 CONF 2025…
Join us at P99 CONF (free +
virtual) ClickHouse’s C++ and Rust Journey Alexey
Milovidov, Co-founder and CTO at ClickHouse Full rewrite
from C++ to Rust or gradual integration with Rust libraries? For a
large C++ codebase, only the latter works, but even then, there are
many complications and rough edges. In my presentation, I will
describe our experience integrating Rust and C++ code and some
weird and unusual problems we had to overcome. Rethinking Durable
Workflows and Queues: A Library-based Approach Qian Li,
Co-founder at DBOS, Inc Durable workflow engines
checkpoint program state to persistent storage (like a database) so
that execution can always recover from where it left off. Most
systems today rely on external orchestration: a centralized
orchestrator and distributed workers communicating via
message-passing. While this model is well-established, it’s often
heavyweight, introducing substantial overhead, write amplification,
and operational complexity. In this talk, we explore an
alternative: a lightweight library-based durable workflow engine
that embeds into application code and checkpoint state directly to
the database. It handles queues and flow control through the
database itself. This approach eliminates the need for a separate
orchestrator, reduces network traffic, and improves performance by
avoiding unnecessary writes. We’ll share our experience building
DBOS, a library-based engine designed for simplicity and
efficiency. We’ll discuss the architectural trade-offs, challenges
in failure recovery, and key optimizations for scalability and
maintainability. The Gory Details of a Full-Featured Userspace CPU
Scheduler Avi Kivity, Co-founder and CTO at
ScyllaDB Userspace CPU schedulers, which often accompany
asynchronous I/O engines like io_uring and Linux AIO, are usually
simplistic run-to-completion FIFO loops. This suffices for I/O
bound applications, but for use cases that can be both CPU bound
and I/O bound, this is not enough. Avi Kivity, CTO of ScyllaDB and
co-maintainer of Seastar, will cover the design and implementation
of the Seastar userspace CPU scheduler, which caters to more
complex applications that require preemption and prioritization.
The Tale of Taming TigerBeetle’s Tail Latency Tobias
Ziegler, Software Engineer at TigerBeetle In this talk, we
dive into how we reduced TigerBeetle’s tail latency through
algorithm engineering. Algorithm engineering goes beyond studying
theoretical complexity and considers how algorithms are executed
efficiently on modern super-scalar CPUs. Specifically, we will look
at Radix Sort and a k-way merge and explore how to implement them
efficiently. We then demonstrate how we apply these algorithms
incrementally to avoid latency spikes in practice. Why We’re
Rewriting SQLite in Rust Glauber Costa, Co-founder and CEO
at Turso Over two years ago, we forked SQLite. We were
huge fans of the embedded nature of SQLite, but wanted a more open
model of development…and libSQL was born as an Open Contribution
project. Last year, as we were adding Vector Search to SQLite, we
had a crazy idea. What could we achieve if we were to completely
rewrite SQLite in Rust? This talk explains what drove us down this
path, how we’re using deterministic simulation testing to ensure
the reliability of the Rust rewrite, and the lessons learned (so
far). I will show how a reimagining of this iconic database can
lead to performance improvements of over 500x in some cases by
looking at what powers it under the hood. Shared Nothing Databases
at Scale Nick Van Wiggeren, CTO at PlanetScale
This talk will discuss how PlanetScale scales databases in the
cloud, focusing on a shared-nothing architecture that is built
around expecting failure. Nick will go into how they built
low-latency high-throughput systems that span multiple nodes,
availability zones, and regions, while maintaining sub-millisecond
response times. This starts at the storage layer and builds all the
way up to micro-optimizing the load balancer, with a lot of
learning at every step of the way. Reworking the Neon IO stack:
Rust+tokio+io_uring+O_DIRECT Christian Schwarz,
Member of Technical Staff at Databricks Neon is a
serverless Postgres platform. Recently acquired by Databricks, the
same technology now also powers Databricks Lakebase. In this talk,
we will dive into Pageserver, the multi-tenant storage service at
the heart of the architecture. We share techniques and lessons
learned from reworking its IO stack to a fully asynchronous model,
with direct IO against local NVMe drives; all during a period of
rapid growth. Pageserver is implemented in Rust; we use the tokio async runtime for networking and integrate it with io_uring for
filesystem access. A Deep Dive into the Seastar Event Loop
Pavel Emelyanov, Principal Software Engineer at
ScyllaDB The core and the basis of ScyllaDB’s outstanding
performance is the Seastar framework, and the core and the basis of
Seastar is its event loop. In this presentation, we’ll see what the loop does in great detail, analyze the limitations it runs under and the consequences that follow from those limitations. We’ll also
learn how the loop is observed by the user and various means to
understand its behavior. Cost Effective, Low Latency Vector Search
In Databases: A Case Study with Azure Cosmos DB Magdalen
Manohar, Senior Researcher at Microsoft We’ve integrated
DiskANN, a state-of-the-art vector indexing algorithm, into Azure
Cosmos DB NoSQL, a state-of-the-art cloud-native operational
database. Learn how we overcame the systems and algorithmic
challenges of this integration to achieve <20ms query latency at
the 10 million scale, while supporting scale-out to billions of
vectors via automatic partitioning. Measuring Query Latency the
Hard Way: An Adventure in Impractical Postgres Monitoring
Simon Notley, Observability and Optimization at
EnterpriseDB Sampling the session state (as exposed by
pg_stat_activity) is a surprisingly powerful way to understand how
your Postgres instance spends its time. It is something I can
wholeheartedly recommend to any Postgres DBA that needs a
lightweight way to monitor query performance in production.
However, it’s a terrible way to measure query latency, fraught with
complexity and weird statistical biases that could be avoided by
simply using an extension built for the job, or even log analysis.
But pursuing terrible ideas can be fun, so in this talk, I dive
into my adventures in measuring query latency from session
sampling, generate some extremely funky charts, and end up
unexpectedly performing a vector similarity search. In this talk
I’ll show how instead of attempting to correct the biases that
plague estimates of query latency based on time-domain sampling, we
can instead pre-calculate the distribution of (biased) estimates
based on a range of true distributions and use vector search to
compare our observed distribution to these pre-calculated ones,
thereby inferring the true query latency. This ‘eccentric’ method
is actually surprisingly effective, and surprisingly fun. Fast and
Deterministic Full Table Scans at Scale Felipe Cardeneti
Mendes, Technical Director at ScyllaDB ScyllaDB’s new
tablet replication algorithm replaces static vNodes with dynamic,
elastic data distribution that adapts to shifting workloads. This
talk discusses how tablets enable fast, predictable full table
scans by keeping operations shard-local, balancing load
automatically, and scaling linearly through a simple layer of
indirection. Optimizing Tiered Storage for Low-Latency Real-Time
Analytics Neha Pawar, Founding Engineer and Head of Data at
StarTree Real-time OLAP databases usually trade
performance for cost when moving from local storage to cloud object
storage. This talk shows how we extended Apache Pinot to use cloud
storage while still achieving sub-second P99 latencies. We’ll cover
the abstraction that makes Pinot location-agnostic, strategies like
pipelining, prefetching, and selective block fetches, and how to
balance local and cloud storage for both cost efficiency and speed.
As Fast as Possible, But Not Faster: ScyllaDB Flow Control
Nadav Har’El, Distinguished Engineer at
ScyllaDB Pushing requests faster than a system can
handle results in rapidly growing queues. If unchecked, it risks depleting memory and destabilizing the system. This talk discusses how we
engineered ScyllaDB’s flow control for high volume ingestions,
allowing it to throttle over-eager clients to exactly the right
pace – not so fast that we run out of memory, but also not so slow
that we let available resources go to waste. Push the Database
Beyond the Edge Nikita Sivukhin, Software Engineer at
Turso Almost any application can benefit from having data
available locally – enabling blazing-fast access and optimized
write patterns. This talk will walk you through one approach to
designing a full-featured sync engine, applicable across a wide
range of domains, including front-end, back-end, and machine
learning training. Engineering a Low-Latency Vector Search Engine
for ScyllaDB Pawel Pery, Senior Software Engineer at
ScyllaDB Implementing Vector Search in ScyllaDB brings challenges ranging from low latency to predictable performance at scale.
Rather than embedding HNSW indexing directly into the core
database, we decoupled vector indexing and similarity search into a
dedicated Rust engine. Learn about the architectural design decisions that enabled us to combine ScyllaDB’s shard-per-core architecture for real-time operations with high-performance ANN
processing via USearch. We Told B+ Trees to Do Sorted Sets—They
Nailed It Joe Zhou, Developer
Advocate at DragonflyDB Sorted sets are a critical Redis
data type used for leaderboards, time-series data, and priority
queues. However, Redis’s skiplist-based implementation introduces
significant memory overhead—averaging 37 bytes per entry on top of
the essential 16 bytes for the (member, score) pair. For large
sorted sets, this inefficiency can become a major bottleneck. In
this talk, we’ll explore how Dragonfly reimplemented sorted sets
using a B+ tree, reducing memory overhead to just 2-3 bytes per
entry while improving performance. We’ll cover:
Why skiplists are inefficient for large sorted sets.
How B+ trees with bucketing drastically cut memory usage while maintaining O(log N) operations.
Benchmark results showing 40% lower memory and better throughput vs. Redis.
This optimization, now stable in Dragonfly, demonstrates how rethinking core data structures can unlock major efficiency gains. Attendees will leave with insights into:
Trade-offs between skiplists and B+ trees.
Real-world impact on memory and latency (P99 improvements).
Lessons from implementing a custom ranking API for B+ trees.
Keynote: Andy Pavlo
You can also look forward to a
keynote by Andy Pavlo. We’re not revealing the topic yet, but
if you know Andy, you know you won’t want to miss it. Join us at P99 CONF (free + virtual)
Netflix operates at a massive scale, serving hundreds of
millions of users with diverse content and features. Behind the
scenes, ensuring data consistency, reliability, and efficient
operations across various services presents a continuous challenge.
At the heart of many critical functions lies the concept of a
Write-Ahead Log (WAL) abstraction. At Netflix scale, every
challenge gets amplified. Some of the key challenges we encountered
include:
Accidental data loss and data corruption in databases
System entropy across different datastores (e.g., writing to
Cassandra and Elasticsearch)
Handling updates to multiple partitions (e.g., building
secondary indices on top of a NoSQL database)
Data replication (in-region and across regions)
Reliable retry mechanisms for real-time data pipelines at scale
Bulk deletes to the database causing OOMs on Key-Value nodes
All the above challenges either resulted in production incidents
or outages, consumed significant engineering resources, or led to
bespoke solutions and technical debt. During one particular
incident, a developer issued an ALTER TABLE command that led to
data corruption. Fortunately, the data was fronted by a cache, so
the ability to extend cache TTL quickly together with the app
writing the mutations to Kafka allowed us to recover. Absent the
resilience features on the application, there would have been
permanent data loss. As the data platform team, we needed to
provide resilience and guarantees to protect not just this
application, but all the critical applications we have
at Netflix.
Regarding the retry mechanisms for real time data pipelines,
Netflix operates at a massive scale where failures (network errors,
downstream service outages, etc.) are inevitable. We needed a
reliable and scalable way to retry failed messages, without
sacrificing throughput.
With these problems in mind, we decided to build a system that
would solve all the aforementioned issues and continue to serve the
future needs of Netflix in the online data platform space. Our
Write-Ahead Log (WAL) is a distributed system that captures data
changes, provides strong durability guarantees, and reliably
delivers these changes to downstream consumers. This blog post
dives into how Netflix is building a generic WAL solution to
address common data challenges, enhance developer efficiency, and
power high-leverage capabilities like secondary indices, enable
cross-region replication for non-replicated storage engines, and
support widely used patterns like delayed queues.
API
Our API is intentionally simple, exposing just the essential
parameters. WAL has one main API endpoint, WriteToLog,
abstracting away the internal implementation and ensuring that
users can onboard easily.
/**
 * WAL request message
 * namespace: Identifier for a particular WAL
 * lifecycle: How much delay to set and original write time
 * payload: Payload of the message
 * target: Details of where to send the payload
 */
message WriteToLogRequest {
  string namespace = 1;
  Lifecycle lifecycle = 2;
  bytes payload = 3;
  Target target = 4;
}
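To make the shape of the API concrete, here is a minimal, hypothetical Python sketch of a client assembling a WriteToLogRequest-style payload and handing it to a WAL endpoint. The WalClient class, the Lifecycle and Target field names, and the transport are illustrative assumptions, not Netflix’s actual client library.

import json
import time
from dataclasses import dataclass, asdict

# Hypothetical mirror of the WriteToLogRequest message; the top-level fields
# follow the proto above, but Lifecycle/Target contents are assumptions.
@dataclass
class Lifecycle:
    delay_seconds: int           # how long WAL should hold the message
    original_write_time_ms: int  # when the client originally produced it

@dataclass
class Target:
    kind: str                    # e.g. "KEYVALUE", "KAFKA", "EVCACHE"
    address: str                 # where the consumer should deliver the payload

class WalClient:
    """Toy stand-in for a WAL client stub (the real one would use gRPC + mTLS)."""
    def write_to_log(self, namespace: str, lifecycle: Lifecycle,
                     payload: bytes, target: Target) -> None:
        request = {
            "namespace": namespace,
            "lifecycle": asdict(lifecycle),
            "payload": payload.decode("utf-8"),
            "target": asdict(target),
        }
        # A real client would send this over the wire; here we just print it.
        print(json.dumps(request, indent=2))

if __name__ == "__main__":
    client = WalClient()
    client.write_to_log(
        namespace="pds-retries",  # hypothetical namespace name
        lifecycle=Lifecycle(delay_seconds=30,
                            original_write_time_ms=int(time.time() * 1000)),
        payload=b'{"event": "title-updated", "id": 42}',
        target=Target(kind="KAFKA", address="pds-events-topic"),
    )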
A namespace defines where and how data is stored,
providing logical separation while abstracting the underlying
storage systems. Each namespace can be configured to use
different queues: Kafka, SQS, or combinations of multiple.
Namespace also serves as a central configuration of
settings, such as backoff multiplier or maximum number of retry
attempts, and more. This flexibility allows our Data Platform to
route different use cases to the most suitable storage system based
on performance, durability, and consistency needs.
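As an illustration only (the real configurations live in WAL’s control plane and are not shown in this post), a namespace’s settings might centralize choices like the backing queue, retry policy, and default target along these lines. The keys, values, and namespace names below are assumptions, not Netflix’s actual schema.

# Hypothetical namespace configurations, sketched as plain Python dicts.
NAMESPACES = {
    "pds-retries": {                 # delayed-queue persona
        "queue": {"type": "SQS"},
        "retry": {"max_attempts": 5, "backoff_multiplier": 2.0},
        "default_target": {"kind": "KAFKA", "address": "pds-events-topic"},
    },
    "evcache-replication": {         # cross-region replication persona
        "queue": {"type": "KAFKA", "topic": "evcache-replication"},
        "destination_regions": ["us-east-1", "eu-west-1"],
    },
    "kv-mutate-items": {             # multi-partition mutation persona
        "queue": {"type": "KAFKA", "topic": "kv-mutations"},
        "durable_storage": {"type": "CASSANDRA", "table": "wal_mutations"},
    },
}

def config_for(namespace: str) -> dict:
    """Look up the settings WAL should apply to a request's namespace."""
    return NAMESPACES[namespace]

print(config_for("pds-retries")["retry"]["max_attempts"])  # -> 5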
WAL can assume different personas depending on the
namespace configuration.
Persona #1 (Delayed Queues)
In the example configuration below, the Product Data Systems
(PDS) namespace uses SQS as the underlying message queue,
enabling delayed messages. PDS uses Kafka extensively, and failures
(network errors, downstream service outages, etc.) are inevitable.
We needed a reliable and scalable way to retry failed messages,
without sacrificing throughput. That’s when PDS started leveraging
WAL for delayed messages.
Below is the namespace configuration for cross-region
replication of
EVCache using WAL, which replicates messages from a source
region to multiple destinations. It uses Kafka under
the hood.
Below is the namespace configuration for supporting
mutateItems API in
Key-Value, where multiple write requests can go to different
partitions and have to be eventually consistent. A key detail in
the below configuration is the presence of Kafka and
durable_storage. These data stores are required to facilitate two
phase commit semantics, which we will discuss in
detail below.
An important note is that requests to WAL support at-least-once semantics due to the underlying implementation.
Under the Hood
The core architecture consists of several key components working
together.
Message Producer and Message Consumer
separation: The message producer receives incoming
messages from client applications and adds them into the queue,
while the message consumer processes messages from the queue and
sends them to the targets. Because of this separation, other
systems can bring their own pluggable producers or consumers,
depending on their use cases. WAL’s control plane allows for a
pluggable model, which, depending on the use-case, allows us to
switch between different message queues.
SQS and Kafka with a dead letter queue by
default: Every WAL namespace has its own message
queue and gets a dead letter queue (DLQ) by default, because there
can be transient errors and hard errors. Application teams using
Key-Value abstraction simply need to toggle a flag to enable
WAL and get all this functionality without needing to understand
the underlying complexity.
Kafka-backed namespaces: handle standard
message processing
SQS-backed namespaces: support delayed queue
semantics (we added custom logic to go beyond the standard defaults
enforced in terms of delay, size limits, etc)
Complex multi-partition scenarios: use queues
and durable storage
Target Flexibility: The messages added to WAL
are pushed to the target datastores. Targets can be Cassandra
databases, Memcached caches, Kafka queues, or upstream
applications. Users can specify the target via namespace
configuration and in the API itself.
Architecture of WAL
Deployment Model
WAL is deployed using the
Data Gateway infrastructure. This means that WAL deployments
automatically come with mTLS, connection management,
authentication, runtime and deployment configurations out of
the box.
Each data gateway abstraction (including WAL) is deployed as a
shard. A shard is a physical concept describing a
group of hardware instances. Each use case of WAL is usually
deployed as a separate shard. For example, the Ads Events
service will send requests to WAL shard A, while the
Gaming Catalog service will send requests to WAL shard B,
allowing for separation of concerns and avoiding noisy neighbor
problems.
Each shard of WAL can have multiple
namespaces. A namespace is a logical concept
describing a configuration. Each request to WAL has to specify its
namespace so that WAL can apply the correct configuration
to the request. Each namespace has its own configuration
of queues to ensure isolation per use case. If the underlying queue
of a WAL namespace becomes the bottleneck of throughput,
the operators can choose to add more queues on the fly by modifying
the namespace configurations. The concept of
shards and namespaces is shared across all Data
Gateway Abstractions, including
Key-Value,
Counter,
Timeseries, etc. The namespace configurations are
stored in a globally replicated Relational SQL database to ensure
availability and consistency.
Deployment model of WAL
Based on certain CPU and network thresholds, the Producer group
and the Consumer group of each shard will (separately)
automatically scale up the number of instances to ensure the
service has low latency, high throughput and high availability.
WAL, along with other abstractions, also uses the
Netflix adaptive load shedding libraries and Envoy to
automatically shed requests beyond a certain limit. WAL can be
deployed to multiple regions, so each region will deploy its own
group of instances.
Solving different flavors of problems with no change to the
core architecture
The WAL addresses multiple data reliability challenges with no
changes to the core architecture:
Data Loss Prevention: In case of database
downtime, WAL can continue to hold the incoming mutations. When the
database becomes available again, WAL replays the mutations back to the database. The tradeoff is eventual rather than immediate consistency, but with no data loss.
Generic Data Replication: For systems like
EVCache (using Memcached) and RocksDB that do not support
replication by default, WAL provides systematic replication (both
in-region and across-region). The target can be another
application, another WAL, or another queue — it’s completely
pluggable through configuration.
System Entropy and Multi-Partition Solutions:
Whether dealing with writes across two databases (like Cassandra
and Elasticsearch) or mutations across multiple partitions in one
database, the solution is the same — write to WAL first, then let
the WAL consumer handle the mutations. No more asynchronous repairs
needed; WAL handles retries and backoff automatically.
Data Corruption Recovery: In case of DB
corruptions, restore to the last known good backup, then replay
mutations from WAL omitting the offending write/mutation.
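A rough sketch of that recovery flow, under the assumption that WAL entries can be iterated in order and that the offending mutation can be identified (for example, by a predicate on its key or timestamp). The function names and in-memory stand-ins below are illustrative, not part of the WAL service.

from typing import Callable, Iterable

def recover_from_corruption(
    restore_backup: Callable[[], None],
    wal_entries: Iterable[dict],
    is_offending: Callable[[dict], bool],
    apply_mutation: Callable[[dict], None],
) -> int:
    """Restore the last known good backup, then replay WAL mutations in order,
    skipping the write that caused the corruption. Returns the replay count."""
    restore_backup()
    replayed = 0
    for mutation in wal_entries:
        if is_offending(mutation):
            continue  # omit the offending write/mutation
        apply_mutation(mutation)
        replayed += 1
    return replayed

# Toy usage with in-memory stand-ins for the database and WAL.
db: dict = {}
entries = [
    {"key": "user:1", "value": "alice"},
    {"key": "user:2", "value": "CORRUPTING ALTER"},  # the bad mutation
    {"key": "user:3", "value": "carol"},
]
count = recover_from_corruption(
    restore_backup=lambda: db.clear(),
    wal_entries=entries,
    is_offending=lambda m: "CORRUPTING" in m["value"],
    apply_mutation=lambda m: db.update({m["key"]: m["value"]}),
)
print(count, db)  # 2 {'user:1': 'alice', 'user:3': 'carol'}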
There are some major differences between using WAL and directly
using Kafka/SQS. WAL is an abstraction on the underlying queues, so
the underlying technology can be swapped out depending on use cases
with no code changes. WAL emphasizes an easy yet effective API that
saves users from complicated setups and configurations. We leverage
the control plane to pivot technologies behind WAL when needed
without app or client intervention.
WAL usage at Netflix
Delay Queue
The most common use case for WAL is as a Delay Queue. If an
application is interested in sending a request at a certain time in
the future, it can offload its requests to WAL, which guarantees
that their requests will land after the specified delay.
Netflix’s Live Origin processes and delivers Netflix live stream
video chunks, storing its video data in a Key-Value
abstraction backed by Cassandra and EVCache. When Live
Origin decides to delete certain video data after an event is
completed, it issues delete requests to the Key-Value abstraction.
However, a large burst of delete requests interferes with the more important real-time read/write requests,
causing performance issues in Cassandra and timeouts for the
incoming live traffic. To get around this, Key-Value issues the
delete requests to WAL first, with a random delay and jitter set
for each delete request. WAL, after the delay, sends the delete
requests back to Key-Value. Since the deletes are now a flatter
curve of requests over time, Key-Value is then able to send the
requests to the datastore with no issues.
Requests being spread out over time through
delayed requests
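A minimal sketch of how a client might pick per-request delays so that a burst of deletes arrives as a flatter curve. The spread window and jitter values are illustrative assumptions, not the production settings.

import random

def delete_delay_seconds(spread_window_s: int = 3600,
                         jitter_s: int = 60) -> int:
    """Pick a random delay within the spread window, plus a little jitter,
    so a burst of deletes is smeared out instead of landing all at once."""
    return random.randint(0, spread_window_s) + random.randint(0, jitter_s)

# Schedule 10,000 deletes: each one is sent to WAL with its own delay and
# is delivered back to Key-Value only after that delay has elapsed.
delete_keys = [f"live-chunk-{i}" for i in range(10_000)]
scheduled = [(key, delete_delay_seconds()) for key in delete_keys]

# Roughly how many deletes land per minute once WAL replays them.
per_minute: dict = {}
for _, delay in scheduled:
    per_minute[delay // 60] = per_minute.get(delay // 60, 0) + 1
print(max(per_minute.values()))  # far below the original 10,000-wide burst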
Additionally, WAL is used by many services that utilize Kafka to
stream events, including Ads, Gaming, Product Data Systems, etc.
Whenever Kafka requests fail for any reason, the client apps will
send WAL a request to retry the Kafka request with a delay. This
abstracts away the backoff and retry layer of Kafka for many teams,
increasing developer efficiency.
Backoff and delayed retries for clients producing to Kafka
Backoff and delayed retries for clients consuming from Kafka
Cross-Region Replication
WAL is also used for global cross-region replication. The
architecture of WAL is generic and allows any
datastore/applications to onboard for cross-region replication.
Currently, the largest use case is
EVCache, and we are working to onboard other
storage engines.
EVCache is deployed by clusters of Memcached instances across
multiple regions, where each cluster in each region shares the same
data. Each region’s client apps will write, read, or delete data
from the EVCache cluster of the same region. To ensure global
consistency, the EVCache client of one region will replicate write
and delete requests to all other regions. To implement this, the
EVCache client that originated the request will send the request to
a WAL corresponding to the EVCache cluster and region.
Since the EVCache client acts as the message producer group in
this case, WAL only needs to deploy the message consumer groups.
From there, the multiple message consumers are set up to each
target region. They will read from the Kafka topic, and send the
replicated write or delete requests to a Writer group in their
target region. The Writer group will then go ahead and replicate
the request to the EVCache server in the same region.
EVCache Global Cross-Region Replication Implemented
through WAL
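A simplified sketch of that consumer-side fan-out, assuming each replication message carries the originating region plus the operation to apply; the message shape and the writer call are illustrative stand-ins for the real consumer and Writer groups.

from typing import Callable, Iterable

def replicate(messages: Iterable[dict],
              target_regions: list,
              send_to_writer: Callable[[str, dict], None]) -> None:
    """Fan each replication message out to the Writer group of every region
    other than the one that originated the write."""
    for msg in messages:
        for region in target_regions:
            if region == msg["origin_region"]:
                continue  # the origin already has the data
            send_to_writer(region, msg)

# Toy usage: one EVCache SET replicated from us-east-1 to two other regions.
replicate(
    messages=[{"origin_region": "us-east-1", "op": "SET",
               "key": "profile:42", "value": "..."}],
    target_regions=["us-east-1", "us-west-2", "eu-west-1"],
    send_to_writer=lambda region, msg: print(
        f"-> writer in {region}: {msg['op']} {msg['key']}"),
)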
The biggest benefits of this approach, compared to our legacy
architecture, is being able to migrate from multi-tenant
architecture to single tenant architecture for the most latency
sensitive applications. For example, Live Origin will have its own
dedicated Message Consumer and Writer groups, while a less latency
sensitive service can be multi-tenant. This helps us reduce the
blast radius of the issues and also prevents noisy
neighbor issues.
Multi-Table Mutations
WAL is used by
Key-Value service to build the MutateItems API. WAL enables the
API’s multi-table and multi-id mutations by implementing 2-phase
commit semantics under the hood. For this discussion, we can assume
that Key-Value service is backed by Cassandra, and each of its
namespaces represents a certain table in a Cassandra
DB.
When a Key-Value client issues a MutateItems request to
Key-Value server, the request can contain multiple PutItems or
DeleteItems requests. Each of those requests can go to different
ids and namespaces, or Cassandra tables.
The MutateItems request operates on an eventually consistent
model. When the Key-Value server returns a success response, it
guarantees that every operation within the MutateItemsRequest will
eventually complete successfully. Individual put or delete
operations may be partitioned into smaller chunks based on request
size, meaning a single operation could spawn multiple chunk
requests that must be processed in a specific sequence.
Two approaches exist to ensure Key-Value client requests achieve
success. The synchronous approach involves client-side retries
until all mutations complete. However, this method introduces
significant challenges; datastores might not natively support
transactions and provide no guarantees about the entire request
succeeding. Additionally, when more than one replica set is
involved in a request, latency can spike in unexpected ways, and the
entire request chain must be retried. Also, partial failures in
synchronous processing can leave the database in an inconsistent
state if some mutations succeed while others fail, requiring
complex rollback mechanisms or leaving data integrity compromised.
The asynchronous approach was ultimately adopted to address these
performance and consistency concerns.
Given Key-Value’s stateless architecture, the service cannot
maintain the mutation success state or guarantee order internally.
Instead, it leverages a Write-Ahead Log (WAL) to guarantee mutation
completion. For each MutateItems request, Key-Value forwards
individual put or delete operations to WAL as they arrive, with
each operation tagged with a sequence number to preserve ordering.
After transmitting all mutations, Key-Value sends a completion
marker indicating the full request has been submitted.
The WAL producer receives these messages and persists the
content, state, and ordering information to a durable storage. The
message producer then forwards only the completion marker to the
message queue. The message consumer retrieves these markers from
the queue and reconstructs the complete mutation set by reading the
stored state and content data, ordering operations according to
their designated sequence. Failed mutations trigger re-queuing of
the completion marker for subsequent retry attempts.
Architecture of Multi-Table Mutations through WAL
Sequence diagram for Multi-Table Mutations through WAL
Closing Thoughts
Building Netflix’s generic Write-Ahead Log system has taught us
several key lessons that guided our design decisions:
Pluggable Architecture is Core: The ability to
support different targets, whether databases, caches, queues, or
upstream applications, through configuration rather than code
changes has been fundamental to WAL’s success across diverse
use cases.
Leverage Existing Building Blocks: We had
control plane infrastructure, Key-Value abstractions, and other
components already in place. Building on top of these existing
abstractions allowed us to focus on the unique challenges WAL
needed to solve.
Separation of Concerns Enables Scale: By
separating message processing from consumption and allowing
independent scaling of each component, we can handle traffic surges
and failures more gracefully.
Systems Fail — Consider Tradeoffs Carefully:
WAL itself has failure modes, including traffic surges, slow
consumers, and non-transient errors. We use abstractions and
operational strategies like data partitioning and backpressure
signals to handle these, but the tradeoffs must be understood.
Future work
We are planning to add secondary indices in Key-Value service
leveraging WAL.
WAL can also be used by a service to guarantee sending requests
to multiple datastores. For example, writing to a database and a backup, or to a database and a queue, at the same time.
Acknowledgements
Launching WAL was a collaborative effort involving multiple
teams at Netflix, and we are grateful to everyone who contributed
to making this idea a reality. We would like to thank the following
teams for their roles in this launch.
Caching team — Additional thanks to Shih-Hao Yeh,
Akashdeep Goel for contributing to cross-region replication for KV, EVCache, etc., and for owning this service.
Product Data System team — Carlos Matias
Herrero, Brandon
Bremen for contributing to the delay queue design and being
early adopters of WAL giving valuable feedback.
KeyValue and Composite abstractions team — Raj Ummadisetty for
feedback on API design and mutateItems design discussions. Rajiv Shringi for
feedback on API design.
Kafka and Real Time Data Infrastructure teams — Nick Mahilani for
feedback and inputs on integrating the WAL client into Kafka
client. Sundaram
Ananthanarayan for design discussions around the possibility of
leveraging Flink for some of the WAL use cases.
Joseph Lynch for
providing strategic direction and organizational support for
this project.
ScyllaDB’s co-founder/CTO discusses decisions to increase
efficiency for storage-bound workloads and allow deployment on
mixed size clusters To get the engineering perspective on
the recent shifts to ScyllaDB’s architecture, Tim Koopmans recently
caught up with ScyllaDB Co-Founder and CTO Avi Kivity. In
part 1 of this 3-part series, they talked about the motivations
and architectural shifts behind ScyllaDB X Cloud, particularly with
respect to Raft and tablets-based data distribution. In
part 2, they went deeper into how tablets work, then looked at
the design of ScyllaDB X Cloud’s autoscaling. In this final part of
the series, they discuss changes that increase efficiency for
storage-bound workloads and allow deployment on mixed size
clusters. You can watch the
complete video here. Storage-bound workloads and compression
Tim: Let’s switch gears and talk about
compression. This was a way to double-down on storage-bound
workloads, right? Would you say storage-bound workloads are more
common than CPU bound ones? Is that what’s driving this?
Avi: Yes, and there’s two reasons for that.
One reason is that our CPU efficiency is quite high. If you’re
CPU-efficient, then storage is going to dominate. And the other
reason is that when your database grows – say it’s twice as large
as before – it’s rare that you actually have twice the amount of
work. It can happen. But for many workloads, the growth of data is
mostly historical, so the number of ops doesn’t scale linearly with
the size of the database. As the database grows, the ratio of ops
to storage decreases, and it becomes storage-bound. So, many of our
larger workloads are storage-bound. The small and medium ones can
be either storage-bound or CPU-bound…it really depends on the
workload. We have some workloads where most of the storage in the
cluster isn’t used because they’re so CPU-intensive. And we have
others where the CPU is mostly idle, but the cluster is holding a
lot of storage. We try to cater to all of these workloads.
Tim: So a storage-bound workload is likely to have
lower CPU utilization in general, and that gives you more CPU
bandwidth to do things like more advanced compression? What’s the
default compression, in terms of planning for storage? Is it like
50%? Or what’s the typical rate? Or is the real answer just “it
depends”? Avi: “It depends” is an easy escape, but
the truth is there’s a wide variety of storage options now. A
recent addition is dictionary-based compression. That’s where the
cluster periodically samples data on disk and constructs a
dictionary from those samples. That dictionary is then used to
boost compression. Everyone probably knows dictionary compression:
it finds repetitive byte sequences in the data and matches against
them. By having samples, you can match against the samples and gain
higher compression. We recently started rolling it out, and it does
give a nice improvement. Of course, it varies widely. Some people
store data that’s already compressed, so it won’t compress further.
Others store data like JSON, which compresses very well. In those
cases, we might see above 50% compression ratios. And for many
storage-bound workloads, you can set the compression parameters
higher and gain more compression at the expense of CPU…but it’s CPU
that you already have. Tim: Is there anything else
on the compression roadmap, like column aware compression?
Avi: It’s not on the roadmap yet, but we
will do columnar storage for time series data. But there’s no
timeline for that yet. Tim: Any hardware
accelerated stuff? Avi: We looked at hardware
acceleration, but it’s too rare to really matter. One problem is
that on the cloud, it’s only available with the very largest
instance sizes. And while we do have clusters with large instance
sizes, it’s not enough to justify the work. I’m talking about
machines with 96 vCPUs and 60TB of storage per node. It would only
make sense for the very largest clusters, the petabyte-class
clusters. They do exist, but they’re not yet common enough to make
it worth the effort. On smaller instances, the accelerators are
just hidden by virtualization. The other problem with
hardware-accelerated compression is that it doesn’t keep up with
the advances in software compression. That’s a general problem with
hardware. For example, dictionary compression isn’t supported by
those accelerators, but dictionary compression is very useful. We
wouldn’t want to give that up. Tim: Yeah, it
seems like unless there’s a very specific, almost niche need for
it, it’s safer to stick with software-based compression. Mixed instance sizes & CPU-to-storage ratios Tim: And in a
roundabout way, this brings me back to the last thing I wanted to
ask about. I think we’ve already touched on it: the idea of 90%
storage utilization. You’ve already mentioned reasons why,
including tablets. And we also spoke about having mixed instance
types in the cluster. That’s quite significant for this release,
right? Avi: Yes, it’s quite important. Assume you
have those large instances with 96 vCPUs and 60TB of storage per
node… and your data grows. It’s not doubling, just incremental
growth. If you have a large amount of data, the rate of growth
won’t be very large. So, you want to add a smaller amount of
storage each time, not 60TB. That gives you two options. One option
is to compose your cluster from a large number of very small
instances. But large clusters introduce management problems. The
odds of a node failing grow as the cluster grows, so you want to
keep clusters at a manageable size. The other option is to have
mixed-size clusters. For example, if you have clusters of 60TB
nodes, then you might add a 6TB node. As the data grows, you can
then replace those smaller nodes with larger ones, until you’re
back to having a cluster that’s full of the largest node size.
There’s another reason for mixed-size clusters: changing the
CPU-to-storage ratio. Typically, storage bound clusters use nodes
with a large disk-to-CPU ratio – a lot of disk and relatively
little CPU. But there might be times across a day or throughout the
year where the number of OPS increases without a corresponding
increase in storage. For example, think about Black Friday or
workloads spiking in certain geographies. In those cases, you might
switch from nodes with a low CPU-to-disk ratio to ones with a high
CPU-to-disk ratio, then switch back later. That way, you keep total
storage constant, but increase the amount of CPU serving that
storage. It lets you adapt to changing CPU requirements without
having to buy more storage. Tim: Got it. So it’s
really about averaging out the ratios to get the price–performance
balance you want between storage and CPU. Is that something the
user has to figure out, or does it fall under the autoscaler?
Avi: It will be automatic. It’s too much to
ask a user to track the right mix of instances and keep managing
that. Looking back and looking forward Tim:
Looking back, and a little forward…if you could go back to 2014,
when you first came up with ScyllaDB, would you tell your past self
to do anything different? Or do you think it’s evolved naturally?
Would you save yourself some pain? Avi:
Yeah. So, when you start a project, it always looks simple and you
think you know everything. Then you discover how much you didn’t
know. I don’t even know what my 2014 self would say about how much
I mispredicted the amount of work that would be necessary to do
this. I mean, I knew databases were hard – one of the most complex
areas in software engineering – but I didn’t know how hard.
Tim: And what about looking forward? What’s the
next big thing on the horizon that people aren’t really talking
about yet? Avi: I want to fully complete the
tablets project before we talk about the next step.
Tim: Just one last question from me before
we wrap. Aside from the correct pronunciation of ScyllaDB, what’s
the most misunderstood part of ScyllaDB’s new architecture? What
are people getting wrong? Avi: I don’t think
people are getting it wrong. It’s not that complicated. It’s
another layer of indirection, and people do understand that. We
have some nice visualizations of that as well. Maybe we should have
a session showing how tablets move around, because it’s a little
like Tetris – how we fit different tablets to fill the nodes. So
I think tablets are easily understood. It’s complex to implement,
but not complicated to understand.
How to choose between cache-aside, read-through,
write-through, client-side, and distributed caching
strategies As we mentioned in the recent
Why Cache Data? post, we’re delighted that Pekka Enberg
decided to write an
entire book on latency and we’re proud to sponsor 3 chapters
from it. Get
the Latency book excerpt PDF Also, Pekka will be sharing key
takeaways from that book in a (free + virtual) Latency
Masterclass on September 25. Be there as he speedruns through the
latency-related patterns you’ll want to know when working on
low-latency apps. And bring your toughest latency questions!
Join
us live on September 25 (free + virtual) Until then, let’s
continue our Latency book excerpts with more from Pekka’s
caching chapter. It’s reprinted here with permission of the
publisher. *** When adding caching to your application, you must
first consider your caching strategy, which determines how reads
and writes happen from the cache and the underlying backing store,
such as a database or a service. At a high level, you need to
decide if the cache is passive or active when there is a cache
miss. In other words, when your application looks up a value from
the cache, but the value is not there or has expired, the caching
strategy mandates whether it’s your application or the cache that
retrieves the value from the backing store. As usual, different
caching strategies have different trade-offs on latency and
complexity, so let’s get right into it.
Cache-Aside Caching
Cache-aside caching is perhaps the most typical caching strategy
you will encounter. When there is a cache hit, data access latency
is dominated by communication latency, which is typically small, as
you can get a cache close by on a cache server or even in your
application memory space. However, when there is a cache miss, with
cache-aside caching, the cache is a passive store updated by the
application. That is, the cache just reports a miss and the
application is responsible for fetching data from the backing store
and updating the cache. Figure 1 shows an example of cache-aside
caching in action. An application looks up a value from a cache by
a caching key, which determines the data the application is
interested in. If the key exists in the cache, the cache returns
the value associated with the key, which the application can use.
However, if the key does not exist or is expired in the cache, we
have a cache miss, which the application has to handle. The
application queries the value from the backing store and stores the
value in the cache. Suppose you are caching user information and
using the user ID as the lookup key. In that case, the application
performs a query by the user ID to read user information from the
database. The user information returned from the database is then
transformed into a format you can store in the cache. Then, the
cache is updated with the user ID as the cache key and the
information as the value. For example, a typical way to perform
this type of caching is to transform the user information returned
from the database into JSON and store that in the cache. Figure
1: With cache-aside caching, the client first looks up a key from
the cache. On cache miss, the client queries the database and
updates the cache. Cache-aside caching is popular because it
is easy to set up a cache server such as Redis and use it to cache
database queries and service responses. With cache-aside caching,
the cache server is passive and does not need to know which
database you use or how the results are mapped to the cache. It is
your application doing all the cache management and data
transformation. In many cases, cache-aside caching is a simple and
effective way to reduce application latency. You can hide database
access latency by having the most relevant information in a cache
server close to your application. However, cache-aside caching can
also be problematic if you have data consistency or freshness
requirements. For example, if you have multiple concurrent readers
that are looking up a key in the cache, you need to coordinate in
your application how you handle concurrent cache misses; otherwise,
you may end up with multiple database accesses and cache updates,
which may result in subsequent cache lookups returning different
values. However, with cache-aside caching, you lose transaction
support because the cache and the database do not know each other,
and it’s the application’s responsibility to coordinate updates to
the data. Finally, cache-aside caching can have significant tail
latency because some cache lookups experience the database read
latency on a cache miss. That is, although in the case of a cache
hit, access latency is fast because it’s coming from a nearby cache
server; cache lookups that experience a cache miss are only as fast
as database access. That’s why the geographic latency to your
database still can matter a great deal even if you are caching
because tail latency is experienced surprisingly often in many
scenarios.
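As a concrete illustration, here is a minimal cache-aside sketch in Python; the cache dict, TTL, and query_database function are stand-ins for a real cache server and backing store, not code from the book.

import time

cache: dict = {}               # stand-in for Redis or another cache server
TTL_SECONDS = 300

def query_database(user_id: str) -> dict:
    # Stand-in for the real database query.
    return {"id": user_id, "name": "Alice"}

def get_user(user_id: str) -> dict:
    entry = cache.get(user_id)
    if entry and entry["expires_at"] > time.time():
        return entry["value"]                     # cache hit
    value = query_database(user_id)               # cache miss: the app does the work
    cache[user_id] = {"value": value,
                      "expires_at": time.time() + TTL_SECONDS}
    return value

print(get_user("42"))  # miss: queries the database, then populates the cache
print(get_user("42"))  # hit: served straight from the cache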
Read-Through Caching
Read-through caching is a strategy where, unlike cache-aside caching, the cache is an active component
when there is a cache miss. When there is a cache miss, a
read-through cache attempts to read a value for the key from the
backing store automatically. Latency is similar to cache-aside
caching, although backing store retrieval latency is from the cache
to the backing store, not from application to backing store, which
may be smaller, depending on your deployment architecture. Figure 2
shows an example of a read-through cache in action. The application
performs a cache lookup on a key, and if there is a cache miss, the
cache performs a read to the database to obtain the value for the
key. The cache then updates itself and returns the value to the
application. From an application point of view, a cache miss is
transparent because the cache always returns a key if one exists,
regardless of whether there was a cache miss or not. Figure 2:
With read-through caching, the client looks up a key from the
cache. Unlike with cache-aside caching, the cache queries the
database and updates itself on cache miss. Read-through
caching is more complex to implement because a cache needs to be
able to read the backing store, but it also needs to transform the
database results into a format for the cache. For example, if the
backing store is an SQL database server, you need to convert the
query results into a JSON or similar format to store the results in
the cache. The cache is, therefore, more coupled with your
application logic because it needs to know more about your data
model and formats. However, because the cache coordinates the
updates and the database reads with read-through caching, it can
give transactional guarantees to the application and ensure
consistency on concurrent cache misses. Furthermore, although a
read-through cache is more complex from an application integration
point of view, it does remove cache management complexity from the
application. Of course, the same caveat of tail latency applies to
read-through caches as they do to cache-aside caching. An
exception: as active components, read-through caches can hide the
latency better with, for example, refresh-ahead caching. Here, the
cache asynchronously updates the cache before the values are
expired – therefore hiding the database access latency from
applications altogether when a value is in the cache. Write-Through
Caching Cache-aside and read-through caching are strategies around
caching reads, but sometimes, you also want the cache to support
writes. In such cases, the cache provides an interface for updating
the value of a key that the application can invoke. In the case of
cache-aside caching, the application is the only one communicating
with the backing store and, therefore, updates the cache. However,
with read-through caching, there are two options for dealing with
writes: write-through and write-behind caching. Write-through
caching is a strategy where an update to the cache propagates
immediately to the backing store. Whenever a cache is updated, the
cache synchronously updates the backing store with the cached
value. The write latency of write-through cache is dominated by the
write latency to the backing store, which can be significant. As
shown in Figure 3, an application updates a cache using an
interface provided by the cache with a key and a value pair. The
cache updates its state with the new value, updates the database
with the new value and waits for the database to commit the update
until acknowledging the cache update to the application. Figure
3: With write-through caching, the client writes a key-value pair
to the cache. The cache immediately updates the cache and the
database. Write-through caching aims to keep the cache and the
backing storage in sync. However, for non-transactional caches, the
cache and backing store can be out of sync in the presence of
errors. For example, if the write to the cache succeeds, but the write to
backing store fails, the two will be out of sync. Of course, a
write-through cache can provide transactional guarantees by trading
off some latency to ensure that the cache and the database are
either both updated or neither of them is. As with a read-through
cache, write-through caching assumes that the cache can connect to
the database and transform a cache value into a database query. For
example, if you are caching user data where the user ID serves as
the key and a JSON document represents the value, the cache must be
able to transform the JSON representation of user information into
a database update. With write-through caching, the simplest
solution is often to store the JSON in the database. The primary
drawback of write-through caching is the latency associated with
cache updates, which is essentially equivalent to database commit
latency. This can be significant.
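A compact write-through sketch: the cache update and the database write happen together, and the call does not return until the backing store has accepted the write. The write_database function is an in-memory stand-in for the real commit.

cache: dict = {}
database: dict = {}   # stand-in for the backing store

def write_database(key: str, value: dict) -> None:
    # Stand-in for a synchronous database commit; a failure here would
    # leave cache and database out of sync unless wrapped in a transaction.
    database[key] = value

def write_through(key: str, value: dict) -> None:
    cache[key] = value          # update the cache...
    write_database(key, value)  # ...and synchronously update the database

write_through("user:42", {"name": "Alice"})
print(cache["user:42"], database["user:42"])  # both updated before returning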
Write-Behind Caching
The write-behind caching strategy updates the cache immediately but, unlike write-through caching, defers the database updates. In other
words, with write-behind caching, the cache may accept multiple
updates before updating the backing store, as shown in Figure 4,
where the cache accepts three cache updates before updating the
database. Figure 4: With write-behind caching, the client
writes a key-value pair to the cache. However, unlike with
write-through caching, the cache updates the cache but defers the
database update. Instead, write-behind cache will batch multiple
cache updates to a single database update. The write latency
of a write-behind cache is lower than with write-through caching
because the backing store is updated asynchronously. That is, the
cache can acknowledge the write immediately to the application,
resulting in a low-latency write, and then perform the backing
store update in the background. However, the downside of
write-behind caching is that you lose transaction support because
the cache can no longer guarantee that the cache and the database
are in sync. Furthermore, write-behind caching can reduce
durability, which is the guarantee that you don’t lose data. If the
cache crashes before flushing updates to the backing store, you can
lose the updates.
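And a write-behind sketch: writes are acknowledged as soon as the cache is updated, while a buffer of pending updates is flushed to the backing store later in one batch. The flush trigger and batch size are illustrative assumptions.

cache: dict = {}
database: dict = {}
pending: dict = {}     # dirty keys not yet written to the backing store

def write_behind(key: str, value: dict) -> None:
    cache[key] = value      # acknowledged immediately: low write latency
    pending[key] = value    # the database update is deferred

def flush(batch_size: int = 100) -> None:
    """Batch pending updates into the backing store; if the process dies
    before this runs, those updates are lost (reduced durability)."""
    for key in list(pending)[:batch_size]:
        database[key] = pending.pop(key)

write_behind("user:42", {"name": "Alice"})
write_behind("user:43", {"name": "Bob"})
print(len(database))  # 0: the database has not been touched yet
flush()
print(len(database))  # 2: both updates applied in one batch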
Client-Side Caching
A client-side caching strategy means having the cache at the client layer within your
application. Although cache servers such as Redis use in-memory
caching, the application must communicate over the network to
access the cache via the Redis protocol. If the application is a
service running in a data center, a cache server is excellent for
caching because the network round trip within a data center is
fast, and the cache complexity is in the cache itself. However,
last-mile latency can still be a significant factor in user
experience on a device, which is why client-side caching is so
lucrative. Instead of using a cache server, you have the cache in
your application. With client-side caching, a combination of
read-through and write-behind caching is optimal from a latency
point of view because both reads and writes are fast. Of course,
your client usually won’t be able to connect with the database
directly, but instead accesses the database indirectly via a proxy
or an API server. Client-side caching also makes transactions hard
to guarantee because of the database access indirection layers and
latency. For many applications that need low-latency client-side
caching, the local-first approach to replication may be more
practical. But for simple read caching, client-side caching can be
a good solution to achieve low latency. Of course, client-side
caching also has a trade-off: It can increase the memory
consumption of the application because you need space for the
cache.
Distributed Caching
So far, we have only discussed caching
as if a single cache instance existed. For example, you use an
in-application cache or a single Redis server to cache queries from
a PostgreSQL database. However, you often need multiple copies of
the data to reduce geographic latency across various locations or
scale out to accommodate your workload. With such distributed
caching, you have numerous instances of the cache that either work
independently or in a cache cluster. With distributed caching, you
have many of the same complications and considerations as those
discussed in Chapter 4 on replication and Chapter 5 on
partitioning. With distributed caching, you don’t want to fit all
the cached data on every instance but instead have cached data
partitioned between the nodes. Similarly, you can replicate the
partitions on multiple instances for high availability and reduced
access latency. Overall, distributed caching is an intersection of
the benefits and problems of caching, partitioning and replication,
so watch out if you’re going with that. *** To keep reading,
download the
3-chapter Latency excerpt free from ScyllaDB or purchase the complete book
from Manning. Also, catch Pekka live during the upcoming
“Building
Low Latency Apps Masterclass” (free and virtual).
ScyllaDB’s co-founder/CTO goes deeper into how tablets
work, then looks at the design behind ScyllaDB X Cloud’s
autoscaling Following the recent ScyllaDB X
Cloud release, Tim Koopmans sat down (virtually) with ScyllaDB
Co-Founder and CTO Avi Kivity. The goal: get the engineering
perspective on all the multiyear projects leading up to this
release. This includes using Raft for topology and schema metadata,
moving from vNodes to tablets-based data distribution, allowing up
to 90% storage utilization, new compression approaches, etc. etc.
In part 1 of this 3-part series, we looked at
the motivations and architectural shifts behind ScyllaDB X
Cloud, particularly with respect to Raft and tablets-based data
distribution. This blog post goes deeper into how tablets work,
then looks at the design behind ScyllaDB X Cloud’s autoscaling.
Read part 1 You can watch the
complete video here. Tackling technical challenges
Tim: With such a complex project, I’m guessing
that you didn’t nail everything perfectly on the first try. Could
you walk us through some of the hard problems that took time to
crack? How did you work around those hurdles? Avi:
One of the difficult things was the distribution related to racks
or availability zones (we use those terms interchangeably). With
the vNodes method of data distribution, a particular replica can
hop around different racks. That does work, but it creates problems
when you have materialized views. With a materialized view, each
row in the base table is tied to a row in the materialized view. If
there’s a change in the relationship between which replica on the
base table owns the row on the materialized view, that can cause
problems with data consistency. We struggled with that a lot until
we came to a solution of just forbidding having a replication
factor that’s different from the number of racks or availability
zones. That simple change solved a lot of problems. It’s a very
small restriction because, practically speaking, the vast majority
of users have a replication factor of 3, and they use 3 racks or 3
availability zones. So the restriction affects very few people, but
solves a large number of problems for us…so we’re happy that we
made it. How tablets prevent hot partitions Tim:
What about things like hot partitions and data skew in tablets?
Does tablets help here since you’re working with smaller chunks?
Avi: Yes. With tablets, our granularity is 5GB, so
we can balance data in 5GB chunks. That might sound large, but it’s
actually very small compared to the node capacity. The 5GB size was
selected because it’s around 1% of the data that a single vCPU can
hold. For example, an i3 node has around 600GB of storage per vCPU,
and 1% of that is 5GB. That’s where the 5GB number came from. Since
we control individual tablets, we can isolate a tablet to a single
vCPU. Then, instead of a tablet being 1% of a vCPU, it can take
100% of it. That effectively increases the amount of compute power
that is dedicated to the tablet by a factor of 100. This will let
us isolate hot partitions into their own vCPUs. We don’t do this
yet, but detecting hot partitions and isolating them in this way
will improve the system’s resilience to hot partition problems.
Tim: That’s really interesting. So have we gone
from shard per core to almost tablet per core? Is that what the 1%
represents, on average? Avi: The change is
that we now have additional flexibility. With a static
distribution, you look at the partition key and you know in advance
where it will go. Here, you look at the partition key and you
consult an indirection table. And that indirection table is under
our control…which means we can play with it and adjust things.
Tim: Can you say more about the indirection
table? Avi: It’s called system.tablets. It
lays out the topology of the cluster. For every table and every
token range, it lists what node and what shard will handle those
keys. It’s important that it’s per table. With vNodes, we had the
same layout for all tables. Some tables can be very large, some
tables can be very small, some tables can be hot, some tables can
be cold…so the one-size-fits-all approach doesn’t always work. Now,
we have the flexibility to lay out different tables in different
ways.
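If you're curious, that indirection table is an ordinary system table you can inspect yourself. Here's a minimal sketch using the Python driver, assuming a locally reachable node; the exact columns returned vary by ScyllaDB version:

```python
# Peek at the tablets indirection table. Assumes a node reachable on localhost;
# the exact column set varies by ScyllaDB version.
from cassandra.cluster import Cluster  # shard-aware driver: pip install scylla-driver

cluster = Cluster(["127.0.0.1"])
session = cluster.connect()

for row in session.execute("SELECT * FROM system.tablets LIMIT 10"):
    # Each row ties a token range of one table to the replicas
    # (node/shard pairs) responsible for it.
    print(row)

cluster.shutdown()
```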
How driver changes simplify complexity Tim: Very cool. So tablets seem to solve a
lot of problems – they just have a lot of good things going for
them. I guess they can start servicing requests as soon as a new
node receives a tablet? That should help with long-tail latency for
cluster operations. We also get more fine-grained control over how
we pack data into the cluster (and we’ll talk about storage
utilization shortly). But you mentioned the additional table. Is
there any other overhead or any operational complexity?
Avi: Yes. It does introduce more complexity. But
since it’s under our control, we also introduced mitigations for
that. For example,
the drivers now have to know about this indirection layer, so we
modified them. We have this reactive approach where a driver
doesn’t read the tablets table upfront. Instead, when it doesn’t
know the layout of tablets on a cluster, it just fires off a
request randomly. If it hits, great. If it misses, then along with
the results, we’ll get back a notification about the topology of
that particular tablet. As it fires off more requests, it will
gradually learn the topology of the cluster. And when the topology
changes, it will react to how the cluster layout changes. That
saves it from doing a lot of upfront work – so it can send requests
as soon as it connects to the cluster.
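In rough sketch form (ours, not the actual driver implementation), that lazy learning loop looks something like this; TabletAwareRouter and its methods are purely illustrative:

```python
# Illustrative sketch of lazy, reactive tablet awareness -- not the real driver code.
import random

class TabletAwareRouter:
    def __init__(self, nodes):
        self.nodes = nodes
        self.tablet_map = {}  # (table, (first_token, last_token)) -> replica

    def route(self, table, token):
        # If we've already learned which replica owns this token, go straight there.
        for (t, (lo, hi)), replica in self.tablet_map.items():
            if t == table and lo <= token <= hi:
                return replica
        # Otherwise fire at a random node; the response will teach us the layout.
        return random.choice(self.nodes)

    def on_response(self, table, token_range, replica):
        # On a "miss", the server piggybacks the tablet's topology on the response;
        # remember it so future requests for this range hit the owner directly.
        self.tablet_map[(table, token_range)] = replica

router = TabletAwareRouter(nodes=["node1", "node2", "node3"])
router.route("users", token=42)                    # random guess at first
router.on_response("users", (0, 100), "node2")     # learn from the reply
assert router.route("users", token=42) == "node2"  # now routed to the owner
```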
ScyllaDB's approach to autoscaling Tim: Let's shift over to
autoscaling. Autoscaling in databases generally seems more like
marketing than reality to me. What’s different about ScyllaDB X
Cloud’s approach to autoscaling? Avi: One
difference is that we can autoscale much later, at least for
storage-bound workloads. Before, we would scale at around 70%
storage utilization. But now we will start scaling at 90%. This
decreases the cluster cost because more of the cluster storage is
used to store data, rather than being used as a free space cushion.
Tablets allow us to do that. Since tablets let us add nodes
concurrently, we can scale much faster. Also, since each tablet is
managed independently, we can remove its storage as soon as the
tablet is migrated off its previous node. Before, we had to wait
until the data was completely transitioned to a new node, and then
we would run a cleanup process that would erase it from the
original node. But now this is done incrementally (in 5GB
increments), so it happens very quickly. We can migrate a 5GB
tablet in around a minute, sometimes even less. As soon as a
cluster scale-out begins, storage usage on the original nodes starts dropping immediately.
That means we can defer the scale-out decision, waiting until it's
really needed. Scaling for CPU, by measuring the CPU usage, will be
another part of that. CPU is used for many different things in
ScyllaDB. It can be used for serving queries, but it’s also used
for internal background tasks like compaction. It can also be used
for queries that – from the user’s perspective – are background
queries like running analytics. You wouldn’t want to scale your
cluster just because you’re running analytics on it. These are jobs
that can take as long as they need to; you wouldn’t necessarily
want to add more hardware just to make them run faster. We can
distinguish between CPU usage for foreground tasks (for queries
that are latency sensitive) and CPU usage for maintenance tasks,
for background work, and for queries where latency is not so
important. We will only scale when the CPU for foreground tasks
runs low. Tim: Does the user have to do anything
special to prioritize the foreground vs background queries? Is that
just part of
workload prioritization? Or does it just understand the
difference? Avi: We’re trying not to be too
clever. It does use the existing service level mechanism. And in
the service level definition, you can say whether it’s a
transaction workload or a batch workload. All you need to do is run
an ALTER SERVICE LEVEL statement to designate a particular service
level as a batch workload. And once you do that, then the cluster
will not scale because that service level needs more CPU. It will
only scale if your real-time queries are running out of CPU. It’s
pretty normal to see ScyllaDB at 100% CPU. But that 100% is split:
part goes to your workload, and part goes to maintenance like
compaction. You don’t want to trigger scaling just because the
cluster is using idle CPU power for background work. So, we track
every cycle and categorize it as either foreground work or
background work, then we make decisions based on that. We don’t
want it to scale out too far when that’s just not valuable.
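For reference, the batch designation Avi mentions is a single CQL statement. Here's a hedged sketch using the Python driver; "analytics" is just a placeholder service level name, and the exact service level syntax and support depend on your ScyllaDB version:

```python
# Designate a service level as a batch workload so its CPU use doesn't drive scaling.
# "analytics" is a placeholder service level name; syntax may vary by version.
from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])
session = cluster.connect()
session.execute("ALTER SERVICE LEVEL analytics WITH workload_type = 'batch'")
cluster.shutdown()
```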
Latency is a monstrous concern here at ScyllaDB. So we’re
pleased to bring you excerpts from Pekka Enberg’s new book on
Latency…and a masterclass with Pekka as well! Latency is a
monstrous concern here at ScyllaDB. Our engineers, our users, and
our broader community are obsessed with it…to the point that we
developed an entire conference on low latency. [Side note: That’s
P99 CONF, a free + virtual
conference coming to you live, October 22-23.] Join P99 CONF – Free + Virtual We’re
delighted that Pekka Enberg decided to write an entire book on
latency to punish himself (kidding: to share his hard-fought
latency lessons learned). The book (quite efficiently titled
“Latency”) is now off to the printers. From his intro: Latency
is so important across a variety of use cases today. Still, it’s a
tricky topic because many low-latency techniques are effectively
developer folklore hidden in blog posts, mailing lists, and side
notes in books. When faced with a latency problem, understanding
what you’re even dealing with often takes a lot of time. I remember
multiple occasions where I saw peculiar results from benchmarks,
which resulted in an adventure down the software and hardware stack
where I learned something new. By reading this book, you
will understand latency better, how you can measure latency
accurately, and discover the different techniques for achieving low
latency. This is the book I always wished I had when grappling with
latency issues. Although this book focuses on applying the
techniques in practice, I will also bring up enough of the
background side of things to try to balance between theory and
practice. ScyllaDB is sponsoring chapters from the book, so
we’ll be trickling out some excerpts on our blogs. Get the Latency book
excerpt PDF More good news: Pekka is also joining us for a
Latency Masterclass on September 25. Be there as he speedruns
through the latency-related patterns you'll want to know when
working on low-latency apps. And bring your toughest latency
questions! Join
us live on September 25 Until then, let’s kick off our
Latency book excerpts with the start of Pekka’s caching
chapter. It’s reprinted here with permission of the publisher. ***
Why cache data? Typically, you should consider caching for
reducing latency over other techniques if your application or system:
- Doesn't need transactions or complex queries.
- Cannot be changed, which makes using techniques such as replication hard.
- Has compute or storage constraints that prevent other techniques.
Many
applications and systems are simple enough that a key-value
interface, typical for caching solutions, is more than sufficient.
For example, you can store user data such as profiles and settings
as a key-value pair where the key is the user ID, and the value is
the user data in JSON or a similar format. Similarly, session
management, where you keep track of logged in user session state is
often simple enough that it doesn’t require complex queries.
However, caching can eventually be too limiting as you move to more
complicated use cases, such as recommendations or ad delivery. You
have to look into other techniques. Overall, whether your
application is simple enough to use caching is highly use
case-specific. Often, you look into caching because you cannot or
don’t want to change the existing system. For example, you may have
a database system that you cannot change, which does not support
replication, but you have clients accessing the database from
multiple locations. You may then look into caching some query
results to reduce latency and scale the system, which is a typical
use of caching. However, this comes with various caveats on data
freshness and consistency, which we’ll discuss in this chapter.
Compute and storage constraints can also be a reason to use caching
instead of other techniques. Depending on their implementation,
colocation and replication can have high storage requirements,
which may prevent you from using them. For example, suppose you
want to reduce access latency to a large data set, such as a
product catalog in an e-commerce site. In that case, it may be
impractical to replicate the whole data set in a client with the
lowest access latency. However, caching parts of the product
catalog may still make sense to cache in the client to reduce
latency but simultaneously live with the client’s storage
constraints. Similarly, it may be impractical to replicate a whole
database to a client or a service because database access requires
compute capacity for query execution, which may not be there.
Caching overview With caching, you can keep a temporary copy of
data to reduce access time significantly by reusing the same result
many times. For example, if you have a REST API that takes a long
time to compute a result, you can cache the REST API results in the
client to reduce latency. Accessing the cached results can be as
fast as reading from the memory, which can significantly reduce
latency. You can also use caching for data items that don’t exist,
called negative caching. For example, maybe the REST API you use is
there to look up customer information based on some filtering
parameters. In some cases, no results will match the filter, but
you still need to perform the expensive computation to discover
that. In that scenario, you would use negative caching to cache the
fact that there are no results, speeding up the search. Of course,
caching has a downside, too: you trade off data freshness for
reduced access latency. You also need more storage space to keep
the cached data around. But in many use cases, it’s a trade-off you
are willing to take. Cache storage is where you keep the temporary
copies of data. Depending on the use case, cache storage can either
be in the main memory or on disk, and cache storage can be accessed
either in the same memory address space as the application or over
a network protocol. For example, you can use an in-memory cache
library to cache values in your application memory, or a key-value store such
as Redis or Memcached to cache values on a remote server. With
caching, an application looks up values based on a cache key from
the cache. When the cache has a copy of the value, we call that a
cache hit and serve the data access from the cache. However, if
there is no value in the cache, we call that scenario a cache miss
and must retrieve the value from the backing store. A key metric
for an effective caching solution is the cache hit-to-miss ratio,
which describes how often the application finds a relevant value in
the cache and how frequently the cache does not have a value. If a
cache has a high cache hit ratio, it is utilized well, meaning
there is less need to perform a slow lookup or compute the result.
With a high cache miss ratio, you are not taking advantage of the
cache. This can mean that your application runs slower than without
caching because caching itself has some overhead. One major
complication with caches is cache eviction policies or what values
to throw out from the cache. The main point of a cache is to
provide fast access but also fit the cache in a limited storage
space. For example, you may have a database with hundreds of
gigabytes of data. Still, you can only reasonably cache tens of
gigabytes in the memory address space of your application because
of machine resource limitations. You, therefore, need some policy
to determine which values stay in the cache and which ones you can
evict if you run out of cache space. Similarly, once you cache a
value, you can’t always retain the value in the cache indefinitely
if the source value changes. For example, you may have a time-based
eviction policy enforcing that a cached value can be at least a
minute old before updating to the latest source value. Despite the
challenges, caching is an effective technique to reduce latency in
your application, in particular when you can’t change some parts of
the system and when your use case doesn’t warrant investment in
things like colocation, replication, or partitioning. With that in
mind, let’s look at the different caching strategies. Caching
Caching strategies When adding caching to your application, you must first
consider your caching strategy, which determines how reads and
writes happen from the cache and the underlying backing store, such
as a database or a service. At a high level, you need to decide if
the cache is passive or active when there is a cache miss. In other
words, when your application looks up a value from the cache, but
the value is not there or has expired, the caching strategy
mandates whether it’s your application or the cache that retrieves
the value from the backing store. As usual, different caching
strategies have different trade-offs on latency and complexity, so
let’s get right into it. To be continued… Get the Latency book
excerpt PDF Join
the Latency Masterclass on September 25
ScyllaDB’s co-founder/CTO on the motivations and
architectural shifts behind ScyllaDB X Cloud — focusing on Raft and
tablets-based data distribution If you follow ScyllaDB,
you’ve probably heard us talking about Raft and tablets-based data
distribution for a few years now. The ultimate goal of these
projects (plus a few related ones) was to optimize elasticity and
price performance – especially for dynamic and storage-bound
workloads. And we finally hit a nice milestone along that journey:
the release of ScyllaDB X Cloud. You can read about the release in
our earlier blog post. Here, we wanted to share the engineering
perspective on these architectural shifts. Tim Koopmans recently
sat down with Avi Kivity – ScyllaDB Co-Founder and CTO – to chat
about the underlying motivation and design decisions. You can
watch
the complete video here. But if you prefer to read, we’re
writing up the highlights. This is the first blog post in a
three-part series. Why ScyllaDB X Cloud? For scaling large clusters
Tim: Let’s start with a big picture. What really
motivated the architectural evolution behind what we know as
ScyllaDB X Cloud? Was this change inevitable? How did it come into
place? Avi: It came from our experience managing
clusters for our customers. With the existing architecture, things
like scaling up the cluster in preparation for events like Black
Friday could take a long time. Since ScyllaDB can manage very large
nodes (e.g., nodes with 30TB of data), moving that data onto new
nodes could take a long time, sometimes a day. Also, nodes had to
be added one at a time. If you had a large cluster, scaling the
cluster would be a nail-biting experience. So we decided to improve
that experience and, along the way, we improved many parts of the
architecture. Tim: Do you have any numbers around
what it used to be like to scale a large cluster?
Avi: One of our large clusters has 75 nodes, each
of which has around 60TB. It’s a pretty hefty cluster. It’s nice
watching clusters like that on our dashboards and seeing
compactions at tens of gigabytes per second aggregate across the
cluster. Those clusters are churning through large amounts of data
per second and carrying a huge amount of data. Now, we can scale
this amount of data in minutes, maybe an hour for the most extreme
cases. So it’s quite a huge change. Why ScyllaDB addressed scaling
with Tablets & Raft Tim: When you think
about dynamic or storage-bound workloads today, what are other
databases getting wrong in this space? How did that lead you to
this new approach, with tablets? Avi: “Other
databases” is a huge area – there are hundreds of databases. Let’s
talk about our heritage. We came from the Cassandra model. And the
basic problem there was the static distribution of data. The node
layout determines how data is distributed, and as long as you don’t
add or remove nodes, it remains static. That means you have no
flexibility. Also, the focus on having availability over
consistency led to no central point for managing the topology.
Without a coordinating authority, you could make only one change at
a time. One of the first changes that we made was to add a
coordinating authority in the form of Raft. Before, we managed
topology with Gossip, which really puts cluster management
responsibility on the operator. We moved it to a Raft group to
centralize the management. You’ve probably heard the old proverb
that anything in computer science can be solved with another layer
of indirection. We did that with tablets, more or less. We inserted
a layer of indirection so that instead of having a static
distribution of data to nodes, it goes through a translation table.
Each range of rows is mapped to a different node in a tablets
table. By manipulating the tablets table, we can redirect small
packages of data (specifically, 5GB – that’s pretty small for us).
We can redirect those 5GB units to any node and to any CPU on
any node. We can move those packages around at will, and those
packages are moved at the line rate, so it’s no problem to fire
them away at gigabits per second across the cluster. And that gives
us the ability to rebalance data on a cluster or add and remove
nodes very quickly. Tim: So tablets are
really a new ScyllaDB abstraction? Is it an abstraction that breaks
those tables into independently managed units? And I think you said
the size is 5GB – is that configurable? Avi:
It’s configurable, but I don’t recommend playing with it. Normally,
you stay between 2.5GB and 10GB. When it reaches 10GB, it triggers
a split, which will bring it back to 5GB. So each tablet will be
split into two. If it goes down to 2.5GB, it will trigger a merge,
merging two tablets into one larger tablet – again, bringing it
back to 5GB.
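In other words, the split/merge decision is simple threshold logic. A purely illustrative sketch using the numbers Avi quotes:

```python
# Split/merge thresholds as described above (numbers from the interview; sketch only).
SPLIT_AT_GB = 10.0   # a tablet that grows to ~10GB splits into two ~5GB tablets
MERGE_AT_GB = 2.5    # a tablet that shrinks to ~2.5GB merges with a neighbor

def plan(tablet_size_gb):
    if tablet_size_gb >= SPLIT_AT_GB:
        return "split into two tablets"
    if tablet_size_gb <= MERGE_AT_GB:
        return "merge with a neighboring tablet"
    return "leave as is"

print(plan(11.0))  # split into two tablets
print(plan(2.0))   # merge with a neighboring tablet
print(plan(6.0))   # leave as is
```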
Tim: So ensuring that things can be dynamically split… We can move data around, rebalance across the
cluster…That gives us finer-grained load distribution as well as
better scalability and perhaps a bit of separation between compute
and storage, right? Because we’re not necessarily tied to the size
of the compute node anymore. We can have different instance types
in a cluster now, as an indirect result of this change. The tipping
point Tim: Avi, you said that re-architecting
around tablets has been a huge shift. So what was the tipping
point? Was it just that vNodes didn’t work anymore in terms of how
you organize data? What was your aha moment where you said, “Yeah,
I think we need to do something different here”?
Avi: It was a combination of things, and
since this was such a major change, we needed a lot of motivation
to do it. One part of it was the inability to perform topology
changes that involve more than one node at a time. Another part was
that the previous streaming mechanism was very slow. Yet another
part is that, because the streaming mechanism was so slow, we had
to scale well in advance of exhausting the storage on the node.
That required us to leave a lot of free space on the node, and
that’s wasteful. We took all of this into consideration, and that
was enough motivation for us to take on a multi-year change. I
think it was well worth it. Tim:
Multiyear…So how long ago did you start workshopping different
ideas to solve this? Avi: The first phase was
changing topology to be strongly consistent and having a central
authority to coordinate it. I think it took around a couple of
years to switch to Raft topology. Before that, we switched schema
management to use Raft as well. That was a separate problem, but
since those two problems had the same solution, we jumped on it.
We’re still not completely done. There are still a few features
that are not yet fully compatible with tablets – but we see the
light at the end of the tunnel now. [Stay tuned for parts 2 and 3]
Share your “extreme scale engineering” expertise with ~20K
like-minded engineers Whether you’re designing,
implementing, or optimizing systems that are pushed to their
limits, we’d love to hear about your most impressive achievements
and lessons learned – at Monster Scale Summit 2026. Become a
Monster Scale Summit Speaker What’s Monster Scale Summit?
Monster Scale Summit is a technical conference that connects the
community of people working on performance-sensitive data-intensive
applications. Engineers, architects, and SREs from gamechangers
around the globe will be gathering virtually to explore “monster
scale” challenges with respect to extreme levels of throughput,
data, and global distribution. It’s a lot like P99 CONF (also hosted by ScyllaDB) –
a two-day event that’s free, fully virtual, and highly interactive.
The core difference is that it’s focused on extreme scale
engineering vs. all things performance. Last time, we hosted
industry giants like Kelsey Hightower, Martin Kleppmann,
Discord, Slack, Canva… Browse
past sessions Details please! When: March 11 +
12 Where: Wherever you’d like! It’s intentionally
virtual, so you can present and interact with attendees from
anywhere around the world. Topics: Core topics
include distributed databases, streaming and real-time processing,
intriguing system designs, methods for balancing
latency/concurrency/throughput, SRE techniques proven at scale, and
infrastructure built for unprecedented demands. What we’re
looking for: We welcome a broad range of talks about
tackling the challenges that arise in the most massive, demanding
environments. The conference prioritizes technical talks sharing
first-hand experiences. Sessions are just 18-20 minutes – so
consider this your TED Talk debut! Share your
ideas