When ScyllaDB is Overkill vs. DynamoDB

ScyllaDB isn’t for everyone. In some cases, migrating from DynamoDB won’t reduce your costs at all.

ScyllaDB is designed specifically for teams that need predictable ultra-low latency with high throughput. And since one size never fits all, there are inevitably situations where you’d be much better served with a different database. For teams that need relatively high performance and want something simple to use, that “different database” is often DynamoDB.

We’re the first to admit that there are situations where you’d be quite fine with DynamoDB. Sometimes moving to ScyllaDB just doesn’t make sense from a pure cost perspective. Other times, it could lower costs 5-40X. And there are other situations when one database is just a better fit from a technical perspective – regardless of costs.

So where is the tipping point if you’re looking at the decision from a cost perspective? Often, it’s somewhere around 10,000 OPS. But the precise answer varies depending on your workload characteristics (read/write ratios) and the amount of data under management. That’s what we’ll focus on in this article.

ScyllaDB and DynamoDB are a lot alike

But first, let’s step back. Why are we comparing ScyllaDB and DynamoDB in the first place? For the past couple of years, we’ve worked with a lot of users moving from DynamoDB to ScyllaDB. That makes sense, given that ScyllaDB and DynamoDB share common goals of high performance and low hassle. But they’re fundamentally built differently. ScyllaDB’s close-to-the-metal architecture handles millions of ops/sec with predictable single-digit millisecond latencies and lower, predictable costs.

Still, you can use the same data model in ScyllaDB as in DynamoDB. And an open source DynamoDB API (“Alternator”) simplifies migration. You redirect your application to ScyllaDB instead of DynamoDB and it just works like magic (actually, we listen on a specific port that understands the DynamoDB API).
In most cases, minimal code changes are required. Moreover, both databases provide high performance, high availability, and multi-region support – with a fully managed “database-as-a-service” option.

Can you make use of (at least) a minimally viable ScyllaDB cluster?

One key difference between ScyllaDB and DynamoDB is provisioning clusters versus provisioning tables. When you work with DynamoDB, you provision tables and assign a different billing mode or capacity to each. In ScyllaDB, you provision a group of nodes (VMs, containers, pods, etc.) that collectively manage and distribute data as a cluster. You can route traffic to that cluster with great performance, as long as there is enough capacity to sustain your workload’s needs.

Even for applications with minimal traffic, you still want to provision a full ScyllaDB cluster. The smallest ScyllaDB Cloud cluster runs with 3 nodes. That’s 1) to ensure high availability and 2) to serve strongly consistent (or quorum) reads even if one node fails or goes out for maintenance. But in some cases, even that minimal ScyllaDB cluster might provide more power than you really need. ScyllaDB’s sweet spot is throughput-heavy workloads, given its ability to sustain massive parallel processing at scale. If your throughput wouldn’t really benefit from that, ScyllaDB is quite possibly overkill.

The amount of data under management is another factor to consider. ScyllaDB is optimized for high throughput and low latencies and assumes that most of your data is frequently queried and requires low latencies. We achieve this by relying on local SSDs, which provide high concurrency and low latencies compared to other storage mediums. If you have a lot of data under management, your ScyllaDB cluster might require more, or larger, VMs. However, if your throughput isn’t high enough to make good use of that infrastructure because most of your data set is read infrequently, then ScyllaDB is probably overkill from a cost perspective.
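To make the three-node minimum concrete, here is a minimal sketch of the quorum arithmetic behind it. It assumes the usual majority rule for quorum reads (more than half of the replicas must answer) and a replication factor equal to the node count. The function names are illustrative and not part of any ScyllaDB API.

```python
def quorum(replication_factor: int) -> int:
    """Majority of replicas: floor(RF / 2) + 1."""
    return replication_factor // 2 + 1


def tolerated_failures(replication_factor: int) -> int:
    """Replicas that can be down while a quorum is still reachable."""
    return replication_factor - quorum(replication_factor)


for rf in (1, 2, 3):
    print(f"RF={rf}: quorum={quorum(rf)}, tolerated failures={tolerated_failures(rf)}")
```

Under these assumptions, three replicas is the smallest setup in which one node can be lost or taken out for maintenance while quorum reads keep working.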
Discovering your specific tipping point

With that reasoning in mind, let’s get more specific. You’re probably curious about what makes most sense for your own workload and storage requirements. There’s a calculator for that! We built a cost estimation calculator that compares ScyllaDB’s on-demand pricing vs DynamoDB’s on-demand pricing, using the same math as AWS. If you’re debating between selecting DynamoDB and ScyllaDB – or thinking of migrating from DynamoDB – these on-demand cost estimates are a fast way to see if ScyllaDB could be worth exploring.

First off, note the minimums. The lowest number of operations per second (OPS) you can specify is 10K and the minimum storage size is 1TB. That’s a pretty big hint – ScyllaDB is probably overkill for anything under that.

Second, be careful when you’re entering your storage utilization. If you’re already using DynamoDB, don’t just copy over your DynamoDB storage utilization. Keep in mind that the reported utilization there refers to uncompressed raw data. As you move to another database, your storage utilization will be lower because you will typically be able to achieve at least 50% compression, if not more.

Before you go and plug in your own numbers, let’s walk through how it plays out across two very different scenarios.

Scenario 1: Storage Bound

First, let’s consider a “storage-bound” case. For example, say you’re consuming 250TB of storage with DynamoDB. You would need 18 nodes of i3en.24xlarge – which are VERY large instances – to support 250TB in ScyllaDB. But if you consider that ScyllaDB’s compression typically provides a 50% gain, that would take us down to 125TB and require 9 nodes of i3en.24xlarge. If you look at the cluster capacity area, you see that this cluster can easily sustain close to 3M operations/second (as a rather conservative estimate).

Now, if we click the DynamoDB button, you’ll see that our calculator tries to make a ScyllaDB and DynamoDB cost comparison analysis for you.
But, unfortunately, it is telling us to talk to Sales. This indicates ScyllaDB is likely more expensive than DynamoDB here because you have so much storage for so few operations. Still, if you have other non-cost-related reasons to move, it might be worth a discussion.

However, now assume that you decided to store only your most frequently accessed data on ScyllaDB. Also assume that’s 10% of the total 125TB (roughly 13TB). In this case, the ScyllaDB pricing also drops by a factor of ~10.

Scenario 2: Write-Heavy

Now, let’s increase the rate of writes, which are more expensive than reads with DynamoDB. If you bump up the throughput, note that the ScyllaDB estimates are the same for anywhere from 200K to 2M writes per second. That’s because as you move from 200K to 2M with ScyllaDB, you already have enough hardware to sustain this level of operations. In contrast, DynamoDB pricing keeps increasing when you increase the write throughput.

Other factors to consider

Of course, cost isn’t everything. Even when ScyllaDB seems like a more cost-effective option, moving from DynamoDB doesn’t always make sense.

If your use case is heavily dependent on the AWS ecosystem, moving from DynamoDB to ScyllaDB might require considerable refactoring work. I once met with a DynamoDB user who started looking at ScyllaDB to replace DynamoDB, and they had over a thousand Lambdas connected to DynamoDB. That’s a thousand Lambdas you would need to refactor to connect to ScyllaDB.

Also consider the feature set you currently require from DynamoDB. There is not a one-to-one mapping for every DynamoDB feature: multi-item transactions (TransactWriteItems, TransactGetItems) and accounting/capping are not (yet) available in ScyllaDB. Accounting and capping is an interesting one. I once met with a user who used DynamoDB’s provisioned throughput to their benefit, and throttled and billed their own customers according to it.
Currently, there’s no such thing in ScyllaDB since we don’t impose any throughput limits by default. In this case, ScyllaDB didn’t make sense.

On the flip side, there are two key situations where it makes sense to consider ScyllaDB even if there’s not an impressive cost advantage vs. DynamoDB: when you need better latency and when you need more deployment flexibility. If DynamoDB can’t achieve the latency you need for whatever reason – or you want to keep latencies low without the hassles/cost of DAX – ScyllaDB could likely help you address that. And if you need to move your DynamoDB workloads to another cloud or on-prem, ScyllaDB can help you move fast with just minimal application refactoring – often just a one-line change. I talk a lot more about both of these situations in the blog post, DynamoDB: When to Move Out.

Rule of thumb

So the bottom line truly is “it depends.” But ScyllaDB could very well be overkill if your workload is under 10K OPS and:

  • You don’t expect much throughput growth,
  • You’re satisfied with how DynamoDB meets your latency SLAs, and
  • You have no foreseeable need to move beyond AWS

Bonus: DynamoDB Cost Optimization Masterclass

If you want more opinions, more strategies, and more options about DynamoDB cost optimization, I encourage you to take a look at this masterclass I recently participated in with Alex DeBrie and Miles Ward.

Netflix’s Distributed Counter Abstraction

By: Rajiv Shringi, Oleksii Tkachuk, Kartik Sathyanarayanan

Introduction

In our previous blog post, we introduced Netflix’s TimeSeries Abstraction, a distributed service designed to store and query large volumes of temporal event data with low millisecond latencies. Today, we’re excited to present the Distributed Counter Abstraction. This counting service, built on top of the TimeSeries Abstraction, enables distributed counting at scale while maintaining similar low latency performance. As with all our abstractions, we use our Data Gateway Control Plane to shard, configure, and deploy this service globally.

Distributed counting is a challenging problem in computer science. In this blog post, we’ll explore the diverse counting requirements at Netflix, the challenges of achieving accurate counts in near real-time, and the rationale behind our chosen approach, including the necessary trade-offs.

Note: When it comes to distributed counters, terms such as ‘accurate’ or ‘precise’ should be taken with a grain of salt. In this context, they refer to a count very close to accurate, presented with minimal delays.

Use Cases and Requirements

At Netflix, our counting use cases include tracking millions of user interactions, monitoring how often specific features or experiences are shown to users, and counting multiple facets of data during A/B test experiments, among others.

These use cases can be classified into two broad categories:

  1. Best-Effort: For this category, the count doesn’t have to be very accurate or durable. However, this category requires near-immediate access to the current count at low latencies, all while keeping infrastructure costs to a minimum.
  2. Eventually Consistent: This category needs accurate and durable counts, and is willing to tolerate a slight delay in accuracy and a slightly higher infrastructure cost as a trade-off.

Both categories share common requirements, such as high throughput and high availability. The table below provides a detailed overview of the diverse requirements across these two categories.

Distributed Counter Abstraction

To meet the outlined requirements, the Counter Abstraction was designed to be highly configurable. It allows users to choose between different counting modes, such as Best-Effort or Eventually Consistent, while considering the documented trade-offs of each option. After selecting a mode, users can interact with APIs without needing to worry about the underlying storage mechanisms and counting methods.

Let’s take a closer look at the structure and functionality of the API.

API

Counters are organized into separate namespaces that users set up for each of their specific use cases. Each namespace can be configured with different parameters, such as Type of Counter, Time-To-Live (TTL), and Counter Cardinality, using the service’s Control Plane.

The Counter Abstraction API resembles Java’s AtomicInteger interface:

AddCount/AddAndGetCount: Adjusts the count for the specified counter by the given delta value within a dataset. The delta value can be positive or negative. The AddAndGetCount counterpart also returns the count after performing the add operation.

{
"namespace": "my_dataset",
"counter_name": "counter123",
"delta": 2,
"idempotency_token": {
"token": "some_event_id",
"generation_time": "2024-10-05T14:48:00Z"
}
}

The idempotency token can be used for counter types that support it. Clients can use this token to safely retry or hedge their requests. Failures in a distributed system are a given, and having the ability to safely retry requests enhances the reliability of the service.
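As a rough illustration of why the token makes retries safe, the sketch below models the AddCount request shown above and a toy in-memory service that applies each token at most once. The class names and the deduplication strategy are assumptions made for this example; they are not the service’s actual client or server code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IdempotencyToken:
    token: str
    generation_time: str  # ISO-8601 timestamp, as in the request above


@dataclass(frozen=True)
class AddCountRequest:
    namespace: str
    counter_name: str
    delta: int
    idempotency_token: IdempotencyToken


class InMemoryCounterService:
    """Toy stand-in for a counter type that supports idempotency tokens."""

    def __init__(self) -> None:
        self._counts = {}   # (namespace, counter_name) -> count
        self._seen = set()  # (namespace, counter_name, token) already applied

    def add_and_get_count(self, req: AddCountRequest) -> int:
        key = (req.namespace, req.counter_name)
        dedupe_key = (req.namespace, req.counter_name, req.idempotency_token)
        if dedupe_key not in self._seen:  # apply each token at most once
            self._seen.add(dedupe_key)
            self._counts[key] = self._counts.get(key, 0) + req.delta
        return self._counts.get(key, 0)


service = InMemoryCounterService()
request = AddCountRequest(
    "my_dataset", "counter123", 2,
    IdempotencyToken("some_event_id", "2024-10-05T14:48:00Z"),
)
first = service.add_and_get_count(request)
retried = service.add_and_get_count(request)  # retry with the same token
print(first, retried)
```

Both calls return the same count because the second request carries a token that has already been applied.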

GetCount: Retrieves the count value of the specified counter within a dataset.

{
"namespace": "my_dataset",
"counter_name": "counter123"
}

ClearCount: Effectively resets the count to 0 for the specified counter within a dataset.

{
"namespace": "my_dataset",
"counter_name": "counter456",
"idempotency_token": {...}
}

Now, let’s look at the different types of counters supported within the Abstraction.

Types of Counters

The service primarily supports two types of counters: Best-Effort and Eventually Consistent, along with a third experimental type: Accurate. In the following sections, we’ll describe the different approaches for these types of counters and the trade-offs associated with each.

Best Effort Regional Counter

This type of counter is powered by EVCache, Netflix’s distributed caching solution built on the widely popular Memcached. It is suitable for use cases like A/B experiments, where many concurrent experiments are run for relatively short durations and an approximate count is sufficient. Setting aside the complexities of provisioning, resource allocation, and control plane management, the core of this solution is remarkably straightforward:

// counter cache key
counterCacheKey = <namespace>:<counter_name>

// add operation
return delta > 0
? cache.incr(counterCacheKey, delta, TTL)
: cache.decr(counterCacheKey, Math.abs(delta), TTL);

// get operation
cache.get(counterCacheKey);

// clear counts from all replicas
cache.delete(counterCacheKey, ReplicaPolicy.ALL);

EVCache delivers extremely high throughput at low millisecond latency or better within a single region, enabling a multi-tenant setup within a shared cluster, saving infrastructure costs. However, there are some trade-offs: it lacks cross-region replication for the increment operation and does not provide consistency guarantees, which may be necessary for an accurate count. Additionally, idempotency is not natively supported, making it unsafe to retry or hedge requests.
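To see the retry problem in isolation, here is a minimal single-process sketch of a cache-backed counter with a TTL. It is not EVCache and does not use its client API: the class, the injected clock, and the choice to refresh the TTL on every add are all assumptions made for illustration.

```python
import time


class BestEffortCounter:
    """Toy single-process stand-in for a cache-backed counter with a TTL."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # cache key -> (count, expiry time)

    @staticmethod
    def cache_key(namespace: str, counter_name: str) -> str:
        return f"{namespace}:{counter_name}"

    def add(self, key: str, delta: int) -> int:
        count = self.get(key) + delta
        # Assumption: every add refreshes the TTL.
        self._entries[key] = (count, self._clock() + self._ttl)
        return count

    def get(self, key: str) -> int:
        entry = self._entries.get(key)
        if entry is None or entry[1] <= self._clock():
            self._entries.pop(key, None)  # expired or never written
            return 0
        return entry[0]

    def clear(self, key: str) -> None:
        self._entries.pop(key, None)


now = [0.0]  # fake clock so the example does not need to sleep
counter = BestEffortCounter(ttl_seconds=60, clock=lambda: now[0])
key = BestEffortCounter.cache_key("my_dataset", "counter123")
counter.add(key, 2)
counter.add(key, 2)  # a blind retry of the same request is counted twice
after_retry = counter.get(key)
now[0] = 61.0        # advance the fake clock past the TTL
after_expiry = counter.get(key)
print(after_retry, after_expiry)
```

Because nothing identifies the second add as a retry, the count doubles, which is the over-counting risk described above.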

Edit: A note on probabilistic data structures:

Probabilistic data structures like HyperLogLog (HLL) can be useful for tracking an approximate number of distinct elements, like distinct views or visits to a website, but are not ideally suited for implementing distinct increments and decrements for a given key. Count-Min Sketch (CMS) is an alternative that can be used to adjust the values of keys by a given amount. Data stores like Redis support both HLL and CMS. However, we chose not to pursue this direction for several reasons:

  • We chose to build on top of data stores that we already operate at scale.
  • Probabilistic data structures do not natively support several of our requirements, such as resetting the count for a given key or having TTLs for counts. Additional data structures, including more sketches, would be needed to support these requirements.
  • On the other hand, the EVCache solution is quite simple, requiring minimal lines of code and using natively supported elements. However, it comes at the trade-off of using a small amount of memory per counter key.
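For readers unfamiliar with Count-Min Sketch, the following is a small, generic sketch of the data structure, written only to illustrate the reset and TTL limitation mentioned above. The width, depth, and use of salted BLAKE2 hashes are arbitrary choices for this example and are unrelated to Redis or any Netflix implementation.

```python
import hashlib


class CountMinSketch:
    """Fixed-size table of shared cells. With non-negative deltas, an
    estimate can overshoot the true count but never undershoot it."""

    def __init__(self, width: int = 64, depth: int = 4) -> None:
        self.width = width
        self.depth = depth
        self.rows = [[0] * width for _ in range(depth)]

    def _cells(self, key: str):
        # One independent hash per row, derived by salting with the row index.
        for row in range(self.depth):
            digest = hashlib.blake2b(
                key.encode("utf-8"), digest_size=8, salt=bytes([row])
            ).digest()
            yield row, int.from_bytes(digest, "big") % self.width

    def add(self, key: str, delta: int = 1) -> None:
        for row, col in self._cells(key):
            self.rows[row][col] += delta

    def estimate(self, key: str) -> int:
        return min(self.rows[row][col] for row, col in self._cells(key))


sketch = CountMinSketch()
sketch.add("counter123", 3)
sketch.add("counter456", 5)
estimate = sketch.estimate("counter123")
```

Cells are shared between keys, so zeroing the cells of one key to reset it, or expiring them on a TTL, would also disturb every other key that hashes to the same cells.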

Eventually Consistent Global Counter

While some users may accept the limitations of a Best-Effort counter, others opt for precise counts, durability and global availability. In the following sections, we’ll explore various strategies for achieving durable and accurate counts. Our objective is to highlight the challenges inherent in global distributed counting and explain the reasoning behind our chosen approach.

Approach 1: Storing a Single Row per Counter

Let’s start simple by using a single row per counter key within a table in a globally replicated datastore.

Let’s examine some of the drawbacks of this approach:

  • Lack of Idempotency: There is no idempotency key baked into the storage data model, which prevents users from safely retrying requests. Implementing idempotency would likely require using an external system for such keys, which can further degrade performance or cause race conditions.
  • Heavy Contention: To update counts reliably, every writer must perform a Compare-And-Swap operation for a given counter using locks or transactions. Depending on the throughput and concurrency of operations, this can lead to significant contention, heavily impacting performance.

Secondary Keys: One way to reduce contention in this approach would be to use a secondary key, such as a bucket_id, which allows for distributing writes by splitting a given counter into buckets, while enabling reads to aggregate across buckets. The challenge lies in determining the appropriate number of buckets. A static number may still lead to contention with hot keys, while dynamically assigning the number of buckets per counter across millions of counters presents a more complex problem.
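The bucket idea can be sketched in a few lines. This is a single-process illustration with a static bucket count and random bucket selection, both of which are assumptions; in a real datastore each bucket would be a separate row keyed by the counter and its bucket_id.

```python
import random
from typing import Optional


class BucketedCounter:
    """One logical counter split across a fixed number of buckets."""

    def __init__(self, num_buckets: int, seed: Optional[int] = None) -> None:
        self._buckets = [0] * num_buckets
        self._rng = random.Random(seed)

    def add(self, delta: int) -> None:
        # Writers pick a bucket at random, so concurrent writers are less
        # likely to contend on the same row.
        bucket_id = self._rng.randrange(len(self._buckets))
        self._buckets[bucket_id] += delta

    def get(self) -> int:
        # Reads pay for the spread: they must aggregate every bucket.
        return sum(self._buckets)


counter = BucketedCounter(num_buckets=8, seed=42)
for _ in range(1000):
    counter.add(1)
counter.add(-10)
total = counter.get()
print(total)
```

The total is independent of how writes were spread, but a fixed num_buckets is exactly the tuning problem the paragraph above points out.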

Let’s see if we can iterate on our solution to overcome these drawbacks.

Approach 2: Per Instance Aggregation

To address issues of hot keys and contention from writing to the same row in real-time, we could implement a strategy where each instance aggregates the counts in memory and then flushes them to disk at regular intervals. Introducing sufficient jitter to the flush process can further reduce contention.
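A minimal sketch of this idea, assuming a shared store that accepts additive updates (modeled here with a Counter) and a uniformly random jitter added to a fixed flush interval. The class and parameter names are hypothetical.

```python
import random
from collections import Counter


class InstanceAggregator:
    """Buffers increments in memory and flushes them to a shared store."""

    def __init__(self, store: Counter, flush_interval_s: float = 5.0,
                 jitter_s: float = 1.0, seed=None) -> None:
        self._store = store
        self._pending = Counter()
        self._interval = flush_interval_s
        self._jitter = jitter_s
        self._rng = random.Random(seed)

    def add(self, counter_name: str, delta: int) -> None:
        # Held in memory only: lost if the instance dies before flushing.
        self._pending[counter_name] += delta

    def next_flush_delay(self) -> float:
        # Jitter spreads flushes from many instances over time.
        return self._interval + self._rng.uniform(0, self._jitter)

    def flush(self) -> None:
        for name, delta in self._pending.items():
            self._store[name] += delta  # one write per counter per flush
        self._pending.clear()


shared_store = Counter()
instance_a = InstanceAggregator(shared_store, seed=1)
instance_b = InstanceAggregator(shared_store, seed=2)
instance_a.add("counter123", 3)
instance_b.add("counter123", 4)
before_flush = shared_store["counter123"]
instance_a.flush()
instance_b.flush()
after_flush = shared_store["counter123"]
delay = instance_a.next_flush_delay()
```

Until both instances flush, the shared store does not reflect either increment, which is the window in which a crash would lose data.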

However, this solution presents a new set of issues:

  • Vulnerability to Data Loss: The solution is vulnerable to data loss for all in-memory data during instance failures, restarts, or deployments.
  • Inability to Reliably Reset Counts: Due to counting requests being distributed across multiple machines, it is challenging to establish consensus on the exact point in time when a counter reset occurred.
  • Lack of Idempotency: Similar to the previous approach, this method does not natively guarantee idempotency. One way to achieve idempotency is by consistently routing the same set of counters to the same instance. However, this approach may introduce additional complexities, such as leader election, and potential challenges with availability and latency in the write path.

That said, this approach may still be suitable in scenarios where these trade-offs are acceptable. However, let’s see if we can address some of these issues with a different event-based approach.

Approach 3: Using Durable Queues

In this approach, we log counter events into a durable queuing system like Apache Kafka to prevent any potential data loss. By creating multiple topic partitions and hashing the counter key to a specific partition, we ensure that the same set of counters is processed by the same set of consumers. This setup simplifies idempotency checks and resetting counts. Furthermore, by leveraging additional stream processing frameworks such as Kafka Streams or Apache Flink, we can implement windowed aggregations.
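The key-to-partition routing can be sketched as follows. CRC32 is used here only as a convenient, stable stand-in hash from the standard library; Kafka’s own default partitioner uses a different hash function, and the function name is hypothetical.

```python
import zlib


def partition_for(counter_key: str, num_partitions: int) -> int:
    """Stable hash of the counter key, reduced to a partition index."""
    return zlib.crc32(counter_key.encode("utf-8")) % num_partitions


key = "my_dataset:counter123"
first = partition_for(key, 12)
second = partition_for(key, 12)   # same key, same partition, every time
resized = partition_for(key, 16)  # changing the partition count may move the key
print(first, second, resized)
```

The last call hints at why rebalancing is a challenge: once the partition count changes, a counter may be routed to a different consumer than the one that processed its earlier events.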

However, this approach comes with some challenges:

  • Potential Delays: Having the same consumer process all the counts from a given partition can lead to backups and delays, resulting in stale counts.
  • Rebalancing Partitions: This approach requires auto-scaling and rebalancing of topic partitions as the cardinality of counters and throughput increase.

Furthermore, all approaches that pre-aggregate counts make it challenging to support two of our requirements for accurate counters:

  • Auditing of Counts: Auditing involves extracting data to an offline system for analysis to ensure that increments were applied correctly to reach the final value. This process can also be used to track the provenance of increments. However, auditing becomes infeasible when counts are aggregated without storing the individual increments.
  • Potential Recounting: Similar to auditing, if adjustments to increments are necessary and recounting of events within a time window is required, pre-aggregating counts makes this infeasible.

Barring those few requirements, this approach can still be effective if we determine the right way to scale our queue partitions and consumers while maintaining idempotency. However, let’s explore how we can adjust this approach to meet the auditing and recounting requirements.

Approach 4: Event Log of Individual Increments

In this approach, we log each individual counter increment along with its event_time and event_id. The event_id can include the source information of where the increment originated. The combination of event_time and event_id can also serve as the idempotency key for the write.
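In its simplest form, this event log can be sketched with an in-memory dictionary keyed by (event_time, event_id). The class names below are hypothetical, and a real implementation would write to a durable datastore rather than process memory.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IncrementEvent:
    event_time: str  # ISO-8601 UTC timestamp
    event_id: str    # may encode where the increment originated
    delta: int


class EventLogCounter:
    """Stores every increment; the count is derived on read."""

    def __init__(self) -> None:
        # (event_time, event_id) serves as the idempotency key for a write.
        self._events = {}

    def record(self, event: IncrementEvent) -> None:
        # A retried event with the same key does not overwrite or double count.
        self._events.setdefault((event.event_time, event.event_id), event.delta)

    def get(self) -> int:
        return sum(self._events.values())  # full scan on every read


log = EventLogCounter()
event = IncrementEvent("2024-10-05T14:48:00Z", "host-1:evt-42", 2)
log.record(event)
log.record(event)  # retry of the same event
log.record(IncrementEvent("2024-10-05T14:48:01Z", "host-2:evt-7", -1))
count = log.get()
print(count)
```

The retry is absorbed by the idempotency key, but every read scans all stored increments, which leads directly to the drawbacks listed next.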

However, in its simplest form, this approach has several drawbacks:

  • Read Latency: Each read request requires scanning all increments for a given counter, potentially degrading performance.
  • Duplicate Work: Multiple threads might duplicate the effort of aggregating the same set of counters during read operations, leading to wasted effort and subpar resource utilization.
  • Wide Partitions: If using a datastore like Apache Cassandra, storing many increments for the same counter could lead to a wide partition, affecting read performance.
  • Large Data Footprint: Storing each increment individually could also result in a substantial data footprint over time. Without an efficient data retention strategy, this approach may struggle to scale effectively.

The combined impact of these issues can lead to increased infrastructure costs that may be difficult to justify. However, adopting an event-driven approach seems to be a significant step forward in addressing some of the challenges we’ve encountered and meeting our requirements.

How can we improve this solution further?

Netflix’s Approach

We use a combination of the previous approaches, where we log each counting activity as an event, and continuously aggregate these events in the background using queues and a sliding time window. Additionally, we employ a bucketing strategy to prevent wide partitions. In the following sections, we’ll explore how this approach addresses the previously mentioned drawbacks and meets all our requirements.

Note: From here on, we will use the words “rollup” and “aggregate” interchangeably. They essentially mean the same thing, i.e., collecting individual counter increments/decrements and arriving at the final value.

TimeSeries Event Store:

We chose the TimeSeries Data Abstraction as our event store, where counter mutations are ingested as event records. Some of the benefits of storing events in TimeSeries include:

High-Performance: The TimeSeries abstraction already addresses many of our requirements, including high availability and throughput, reliable and fast performance, and more.

Reducing Code Complexity: We reduce a lot of code complexity in Counter Abstraction by delegating a major portion of the functionality to an existing service.

TimeSeries Abstraction uses Cassandra as the underlying event store, but it can be configured to work with any persistent store. Here is what it looks like:

Handling Wide Partitions: The time_bucket and event_bucket columns play a crucial role in breaking up a wide partition, preventing high-throughput counter events from overwhelming a given partition. For more information regarding this, refer to our previous blog.

No Over-Counting: The event_time, event_id and event_item_key columns form the idempotency key for the events for a given counter, enabling clients to retry safely without the risk of over-counting.

Event Ordering: TimeSeries orders all events in descending order of time, allowing us to leverage this property for events like count resets.

Event Retention: The TimeSeries Abstraction includes retention policies to ensure that events are not stored indefinitely, saving disk space and reducing infrastructure costs. Once events have been aggregated and moved to a more cost-effective store for audits, there’s no need to retain them in the primary storage.
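The effect of the time_bucket and event_bucket columns described under Handling Wide Partitions can be approximated with a small sketch. The actual TimeSeries schema and bucketing rules are not reproduced here: the 60-second bucket width, the four event buckets, the CRC32 hash, and the function name are all assumptions for illustration.

```python
import zlib
from datetime import datetime, timezone


def partition_key(counter_key: str, event_time: datetime, event_id: str,
                  bucket_seconds: int = 60, event_buckets: int = 4):
    """Spread one counter's events across (time_bucket, event_bucket) pairs."""
    time_bucket = int(event_time.timestamp()) // bucket_seconds
    event_bucket = zlib.crc32(event_id.encode("utf-8")) % event_buckets
    return (counter_key, time_bucket, event_bucket)


t0 = datetime(2024, 10, 5, 14, 48, 0, tzinfo=timezone.utc)
t1 = datetime(2024, 10, 5, 14, 48, 59, tzinfo=timezone.utc)
t2 = datetime(2024, 10, 5, 14, 50, 0, tzinfo=timezone.utc)

same_minute_a = partition_key("counter123", t0, "evt-1")
same_minute_b = partition_key("counter123", t1, "evt-1")
later = partition_key("counter123", t2, "evt-1")
print(same_minute_a, same_minute_b, later)
```

Events for a hot counter land in a bounded number of partitions per time bucket instead of one ever-growing partition, and a range scan only needs to touch the time buckets that overlap the requested window.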

Now, let’s see how these events are aggregated for a given counter.

Aggregating Count Events:

As mentioned earlier, collecting all individual increments for every read request would be cost-prohibitive in terms of read performance. Therefore, a background aggregation process is necessary to continually converge counts and ensure optimal read performance.

But how can we safely aggregate count events amidst ongoing write operations?

This is where the concept of Eventually Consistent counts becomes crucial. By intentionally lagging behind the current time by a safe margin, we ensure that aggregation always occurs within an immutable window.

Let’s see what that looks like:

Let’s break this down:

  • lastRollupTs: This represents the most recent time when the counter value was last aggregated. For a counter being operated for the first time, this timestamp defaults to a reasonable time in the past.
  • Immutable Window and Lag: Aggregation can only occur safely within an immutable window that is no longer receiving counter events. The “acceptLimit” parameter of the TimeSeries Abstraction plays a crucial role here, as it rejects incoming events with timestamps beyond this limit. During aggregations, this window is pushed slightly further back to account for clock skews.

This does mean that the counter value will lag behind its most recent update by some margin (typically on the order of seconds). This approach does leave the door open for missed events due to cross-region replication issues. See the “Future Work” section at the end.

  • Aggregation Process: The rollup process aggregates all events in the aggregation window since the last rollup to arrive at the new value.
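The pieces above can be put together in a short sketch of a single rollup step. It assumes that events older than now minus acceptLimit can no longer arrive, and it subtracts an extra margin for clock skew. The parameter values, the RollupState type, and the handling of events as (event_time, delta) pairs are assumptions for this example; count resets are left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RollupState:
    last_rollup_count: int
    last_rollup_ts: float  # seconds since epoch


def rollup(state: RollupState, events, now: float,
           accept_limit_s: float = 5.0, skew_margin_s: float = 1.0) -> RollupState:
    """Aggregate events in (last_rollup_ts, window_end], where window_end
    lags behind `now` so that the window is no longer receiving writes."""
    window_end = now - accept_limit_s - skew_margin_s
    if window_end <= state.last_rollup_ts:
        return state  # nothing immutable to aggregate yet
    delta = sum(d for t, d in events if state.last_rollup_ts < t <= window_end)
    return RollupState(state.last_rollup_count + delta, window_end)


events = [(100.0, 2), (103.0, 1), (109.0, 5)]  # (event_time, delta)
state0 = RollupState(last_rollup_count=0, last_rollup_ts=90.0)
state1 = rollup(state0, events, now=110.0)  # window ends at 104.0
state2 = rollup(state1, events, now=120.0)  # window ends at 114.0
print(state1, state2)
```

The event at 109.0 is excluded from the first rollup because it still falls inside the mutable tail of the timeline, and it is picked up by the second rollup once the window has moved past it.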

Rollup Store:

We save the results of this aggregation in a persistent store. The next aggregation will simply continue from this checkpoint.

We create one such Rollup table per dataset and use Cassandra as our persistent store. However, as you will soon see in the Control Plane section, the Counter service can be configured to work with any persistent store.

LastWriteTs: Every time a given counter receives a write, we also log a last-write-timestamp as a columnar update in this table. This is done using Cassandra’s USING TIMESTAMP feature to predictably apply the Last-Write-Win (LWW) semantics. This timestamp is the same as the event_time for the event. In the subsequent sections, we’ll see how this timestamp is used to keep some counters in active rollup circulation until they have caught up to their latest value.

Rollup Cache

To optimize read performance, these values are cached in EVCache for each counter. We combine the lastRollupCount and lastRollupTs into a single cached value per counter to prevent potential mismatches between the count and its corresponding checkpoint timestamp.

But, how do we know which counters to trigger rollups for? Let’s explore our Write and Read path to understand this better.

Add/Clear Count:

An add or clear count request writes durably to the TimeSeries Abstraction and updates the last-write-timestamp in the Rollup store. If the durability acknowledgement fails, clients can retry their requests with the same idempotency token without the risk of overcounting. Upon durability, we send a fire-and-forget request to trigger the rollup for the requested counter.

GetCount:

We return the last rolled-up count as a quick point-read operation, accepting the trade-off of potentially delivering a slightly stale count. We also trigger a rollup during the read operation to advance the last-rollup-timestamp, enhancing the performance of subsequent aggregations. This process also self-remediates a stale count if any previous rollups had failed.
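The write and read paths described above can be modeled in a few lines. This is a toy, single-process sketch: plain dictionaries stand in for the TimeSeries store, the Rollup store, and the cache, a deque stands in for the fire-and-forget trigger, and all names are hypothetical.

```python
from collections import deque


class EventualCounterPaths:
    """Toy model of the write and read paths for one namespace."""

    def __init__(self) -> None:
        self.events = {}         # idempotency token -> (counter, event_time, delta)
        self.last_write_ts = {}  # counter -> latest event_time seen
        self.rollup_cache = {}   # counter -> (lastRollupCount, lastRollupTs)
        self.rollup_triggers = deque()  # light-weight rollup events

    def add_count(self, counter: str, delta: int, token: str, event_time: float) -> None:
        # Idempotent write: a retry with the same token is a no-op.
        self.events.setdefault(token, (counter, event_time, delta))
        # Last-write-wins on the timestamp.
        previous = self.last_write_ts.get(counter, float("-inf"))
        self.last_write_ts[counter] = max(previous, event_time)
        self.rollup_triggers.append(counter)  # fire-and-forget trigger

    def get_count(self, counter: str) -> int:
        self.rollup_triggers.append(counter)  # reads also nudge the rollup
        count, _last_rollup_ts = self.rollup_cache.get(counter, (0, 0.0))
        return count  # possibly stale: only what has been rolled up so far


paths = EventualCounterPaths()
paths.add_count("counter123", 2, token="evt-1", event_time=100.0)
paths.add_count("counter123", 2, token="evt-1", event_time=100.0)  # retry
stale = paths.get_count("counter123")
print(stale, list(paths.rollup_triggers))
```

The read returns 0 because no rollup has run yet, while the three queued triggers tell the rollup side which counter needs attention.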

With this approach, the counts continually converge to their latest value. Now, let’s see how we scale this approach to millions of counters and thousands of concurrent operations using our Rollup Pipeline.

Rollup Pipeline:

Each Counter-Rollup server operates a rollup pipeline to efficiently aggregate counts across millions of counters. This is where most of the complexity in Counter Abstraction comes in. In the following sections, we will share key details on how efficient aggregations are achieved.

Light-Weight Roll-Up Event: As seen in our Write and Read paths above, every operation on a counter sends a light-weight event to the Rollup server:

rollupEvent: {
"namespace": "my_dataset",
"counter": "counter123"
}

Note that this event does not include the increment. This is only an indication to the Rollup server that this counter has been accessed and now needs to be aggregated. Knowing exactly which specific counters need to be aggregated prevents scanning the entire event dataset for the purpose of aggregations.

In-Memory Rollup Queues: A given Rollup server instance runs a set of in-memory queues to receive rollup events and parallelize aggregations. In the first version of this service, we settled on using in-memory queues to reduce provisioning complexity, save on infrastructure costs, and make rebalancing the number of queues fairly straightforward. However, this comes with the trade-off of potentially missing rollup events in case of an instance crash. For more details, see the “Stale Counts” section in “Future Work.”

Minimize Duplicate Effort: We use a fast non-cryptographic hash like XXHash to ensure that the same set of counters end up on the same queue. Further, we try to minimize the amount of duplicate aggregation work by having a separate rollup stack that chooses to run fewer, beefier instances.

Availability and Race Conditions: Having a single Rollup server instance can minimize duplicate aggregation work but may create availability challenges for triggering rollups. If we choose to horizontally scale the Rollup servers, we allow threads to overwrite rollup values while avoiding any form of distributed locking mechanisms to maintain high availability and performance. This approach remains safe because aggregation occurs within an immutable window. Although the concept of now() may differ between threads, causing rollup values to sometimes fluctuate, the counts will eventually converge to an accurate value within each immutable aggregation window.

Rebalancing Queues: If we need to scale the number of queues, a simple Control Plane configuration update followed by a re-deploy is enough to rebalance the number of queues.

"eventual_counter_config": {
"queue_config": {
"num_queues": 8, // change to 16 and re-deploy
...

Handling Deployments: During deployments, these queues shut down gracefully, draining all existing events first, while the new Rollup server instance starts up with potentially new queue configurations. There may be a brief period when both the old and new Rollup servers are active, but as mentioned before, this race condition is managed since aggregations occur within immutable windows.

Minimize Rollup Effort: Receiving multiple events for the same counter doesn’t mean rolling it up multiple times. We drain these rollup events into a Set, ensuring a given counter is rolled up only once during a rollup window.
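Routing rollup events to queues and draining them into a set can be sketched as below. CRC32 from the standard library is used as a stand-in for a fast non-cryptographic hash such as XXHash, and the class and method names are assumptions for this example.

```python
import zlib
from collections import deque


class RollupQueues:
    """In-memory queues; a counter always hashes to the same queue."""

    def __init__(self, num_queues: int) -> None:
        self._queues = [deque() for _ in range(num_queues)]

    def offer(self, namespace: str, counter: str) -> int:
        key = f"{namespace}:{counter}"
        index = zlib.crc32(key.encode("utf-8")) % len(self._queues)
        self._queues[index].append((namespace, counter))
        return index

    def drain(self, index: int) -> set:
        """Drain one queue into a set so each counter appears at most once."""
        drained = set()
        queue = self._queues[index]
        while queue:
            drained.add(queue.popleft())
        return drained


queues = RollupQueues(num_queues=8)
indexes = {queues.offer("my_dataset", "counter123") for _ in range(3)}
queue_index = indexes.pop()
batch = queues.drain(queue_index)
print(batch)
```

Three rollup events for the same counter collapse into a single entry in the drained batch, so the counter is aggregated once for that rollup window.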

Efficient Aggregation: Each rollup consumer processes a batch of counters simultaneously. Within each batch, it queries the underlying TimeSeries abstraction in parallel to aggregate events within specified time boundaries. The TimeSeries abstraction optimizes these range scans to achieve low millisecond latencies.

Dynamic Batching: The Rollup server dynamically adjusts the number of time partitions that need to be scanned based on cardinality of counters in order to prevent overwhelming the underlying store with many parallel read requests.

Adaptive Back-Pressure: Each consumer waits for one batch to complete before issuing the rollups for the next batch. It adjusts the wait time between batches based on the performance of the previous batch. This approach provides back-pressure during rollups to prevent overwhelming the underlying TimeSeries store.
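One way such an adaptive wait could work is sketched below. The doubling and halving policy, the 50 ms target, and the bounds are all assumptions chosen for illustration; the post does not describe the actual adjustment rule.

```python
def next_wait_ms(prev_wait_ms: float, batch_latency_ms: float,
                 target_ms: float = 50.0,
                 min_wait_ms: float = 0.0,
                 max_wait_ms: float = 1000.0) -> float:
    """Lengthen the pause after a slow batch, shorten it after a fast one."""
    if batch_latency_ms > target_ms:
        # The store looks busy: back off (hypothetical doubling with a 10 ms floor).
        proposed = max(prev_wait_ms * 2, 10.0)
    else:
        # The store kept up: recover by halving the pause.
        proposed = prev_wait_ms / 2
    return min(max(proposed, min_wait_ms), max_wait_ms)
```

A consumer would call this after each batch completes and sleep for the returned duration before issuing the next batch.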

Handling Convergence:

In order to prevent low-cardinality counters from lagging behind too much and subsequently scanning too many time partitions, they are kept in constant rollup circulation. For high-cardinality counters, continuously circulating them would consume excessive memory in our Rollup queues. This is where the last-write-timestamp mentioned previously plays a crucial role. The Rollup server inspects this timestamp to determine if a given counter needs to be re-queued, ensuring that we continue aggregating until it has fully caught up with the writes.
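The re-queue decision described above might look roughly like this. The function name, the string cardinality labels, and the millisecond timestamps are assumptions made for the sketch.

```python
def should_requeue(cardinality: str, last_write_ts_ms: int,
                   last_rollup_ts_ms: int) -> bool:
    """Decide whether a counter goes back into the rollup queue."""
    if cardinality == "LOW":
        # Low-cardinality counters stay in constant rollup circulation.
        return True
    # High-cardinality counters are re-queued only while writes are newer
    # than the last rollup, i.e. until aggregation has caught up.
    return last_write_ts_ms > last_rollup_ts_ms
```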

Now, let’s see how we leverage this counter type to provide an up-to-date current count in near-realtime.

Experimental: Accurate Global Counter

We are experimenting with a slightly modified version of the Eventually Consistent counter. Again, take the term ‘Accurate’ with a grain of salt. The key difference between this type of counter and its counterpart is that the delta, representing the counts since the last-rolled-up timestamp, is computed in real-time.

And then, currentAccurateCount = lastRollupCount + delta

Aggregating this delta in real-time can impact the performance of this operation, depending on the number of events and partitions that need to be scanned to retrieve this delta. The same principle of rolling up in batches applies here to prevent scanning too many partitions in parallel. Conversely, if the counters in this dataset are accessed frequently, the time gap for the delta remains narrow, making this approach of fetching current counts quite effective.
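The formula above can be illustrated with a small sketch. It assumes events are available as (timestamp, delta) pairs, which is a simplification of scanning the TimeSeries store; the function name is hypothetical.

```python
def current_accurate_count(last_rollup_count: int, last_rollup_ts_ms: int,
                           events: list) -> int:
    """Return lastRollupCount plus the delta of events newer than the last rollup."""
    delta = sum(amount for ts_ms, amount in events if ts_ms > last_rollup_ts_ms)
    return last_rollup_count + delta


# The event at t=900 is already included in the rollup taken at t=1000.
count = current_accurate_count(100, 1000, [(900, 5), (1100, 2), (1200, -1)])
```

The narrower the gap since the last rollup, the fewer events fall into the delta, which is why frequently accessed counters make this read cheap.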

Now, let’s see how all this complexity is managed by having a unified Control Plane configuration.

Control Plane

The Data Gateway Platform Control Plane manages control settings for all abstractions and namespaces, including the Counter Abstraction. Below is an example of a control plane configuration for a namespace that supports eventually consistent counters with low cardinality:

"persistence_configuration": [
  {
    "id": "CACHE", // Counter cache config
    "scope": "dal=counter",
    "physical_storage": {
      "type": "EVCACHE", // type of cache storage
      "cluster": "evcache_dgw_counter_tier1" // Shared EVCache cluster
    }
  },
  {
    "id": "COUNTER_ROLLUP",
    "scope": "dal=counter", // Counter abstraction config
    "physical_storage": {
      "type": "CASSANDRA", // type of Rollup store
      "cluster": "cass_dgw_counter_uc1", // physical cluster name
      "dataset": "my_dataset_1" // namespace/dataset
    },
    "counter_cardinality": "LOW", // supported counter cardinality
    "config": {
      "counter_type": "EVENTUAL", // Type of counter
      "eventual_counter_config": { // eventual counter type
        "internal_config": {
          "queue_config": { // adjust w.r.t cardinality
            "num_queues" : 8, // Rollup queues per instance
            "coalesce_ms": 10000, // coalesce duration for rollups
            "capacity_bytes": 16777216 // allocated memory per queue
          },
          "rollup_batch_count": 32 // parallelization factor
        }
      }
    }
  },
  {
    "id": "EVENT_STORAGE",
    "scope": "dal=ts", // TimeSeries Event store
    "physical_storage": {
      "type": "CASSANDRA", // persistent store type
      "cluster": "cass_dgw_counter_uc1", // physical cluster name
      "dataset": "my_dataset_1", // keyspace name
    },
    "config": {
      "time_partition": { // time-partitioning for events
        "buckets_per_id": 4, // event buckets within
        "seconds_per_bucket": "600", // smaller width for LOW card
        "seconds_per_slice": "86400", // width of a time slice table
      },
      "accept_limit": "5s", // boundary for immutability
    },
    "lifecycleConfigs": {
      "lifecycleConfig": [
        {
          "type": "retention", // Event retention
          "config": {
            "close_after": "518400s",
            "delete_after": "604800s" // 7 day count event retention
          }
        }
      ]
    }
  }
]

Using such a control plane configuration, we compose multiple abstraction layers using containers deployed on the same host, with each container fetching configuration specific to its scope.
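For readers who want to sanity-check the raw numbers in the sample configuration, the following arithmetic converts them into friendlier units. The variable names are made up for this sketch, and reading `seconds_per_slice / seconds_per_bucket` as "time buckets per slice" is an interpretation, not something the configuration states.

```python
SECONDS_PER_DAY = 86_400

close_after_days = 518_400 / SECONDS_PER_DAY    # "close_after": "518400s"
delete_after_days = 604_800 / SECONDS_PER_DAY   # "delete_after": "604800s"
queue_capacity_mib = 16_777_216 / (1024 * 1024)  # "capacity_bytes": 16777216
coalesce_seconds = 10_000 / 1000                 # "coalesce_ms": 10000

# Interpretation (assumption): how many 600-second buckets fit in one 86400-second slice.
buckets_per_slice = 86_400 // 600

print(close_after_days, delete_after_days, queue_capacity_mib,
      coalesce_seconds, buckets_per_slice)
```

So events close after 6 days and are deleted after 7, each queue is allotted 16 MiB, and rollups are coalesced over 10-second windows.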

Provisioning

As with the TimeSeries abstraction, our automation uses a set of user inputs regarding their workload and cardinalities to arrive at the right set of infrastructure and related control plane configuration. You can learn more about this process in a talk given by one of our stunning colleagues, Joey Lynch: How Netflix optimally provisions infrastructure in the cloud.

Performance

At the time of writing this blog, this service was processing close to 75K count requests/second globally across the different API endpoints and datasets, while providing single-digit millisecond latencies for all its endpoints.

Future Work

While our system is robust, we still have work to do in making it more reliable and enhancing its features. Some of that work includes:

  • Regional Rollups: Cross-region replication issues can result in missed events from other regions. An alternate strategy involves establishing a rollup table for each region, and then tallying them in a global rollup table. A key challenge in this design would be effectively communicating the clearing of the counter across regions.
  • Error Detection and Stale Counts: Excessively stale counts can occur if rollup events are lost or if a rollup fails and isn’t retried. This isn’t an issue for frequently accessed counters, as they remain in rollup circulation. This issue is more pronounced for counters that aren’t accessed frequently. Typically, the initial read for such a counter will trigger a rollup, self-remediating the issue. However, for use cases that cannot accept potentially stale initial reads, we plan to implement improved error detection, rollup handoffs, and durable queues for resilient retries.

Conclusion

Distributed counting remains a challenging problem in computer science. In this blog, we explored multiple approaches to implement and deploy a Counting service at scale. While there may be other methods for distributed counting, our goal has been to deliver blazing fast performance at low infrastructure costs while maintaining high availability and providing idempotency guarantees. Along the way, we make various trade-offs to meet the diverse counting requirements at Netflix. We hope you found this blog post insightful.

Stay tuned for Part 3 of Composite Abstractions at Netflix, where we’ll introduce our Graph Abstraction, a new service being built on top of the Key-Value Abstraction and the TimeSeries Abstraction to handle high-throughput, low-latency graphs.

Acknowledgments

Special thanks to our stunning colleagues who contributed to the Counter Abstraction’s success: Joey Lynch, Vinay Chella, Kaidan Fullerton, Tom DeVoe, Mengqing Wang


Netflix’s Distributed Counter Abstraction was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How We Updated ScyllaDB Drivers for Tablets Elasticity

Rethinking ScyllaDB’s shard-aware drivers for our new Raft-based tablets architecture

ScyllaDB recently released new versions of our drivers (Rust, Go, Python…) that will provide a nice throughput and latency boost when used in concert with our new tablets architecture. In this blog post, I’d like to share details about how the drivers’ query routing scheme has changed, and why it’s now more beneficial than ever to use ScyllaDB drivers instead of Cassandra drivers.

Before we dive into that, let’s take a few steps back. How do ScyllaDB drivers work? And what’s meant by “tablets”?

When we say “drivers,” we’re talking about the libraries that your application uses to communicate with ScyllaDB. ScyllaDB drivers use the CQL protocol, which is inherited from Cassandra (ScyllaDB is compatible with Cassandra as well as DynamoDB). It’s possible to use Cassandra drivers with ScyllaDB, but we recommend using ScyllaDB’s drivers for optimal performance. Some of these drivers were forked from Cassandra drivers. Others, such as our Rust driver, were built from the ground up.

The interesting thing about ScyllaDB drivers is that they are “shard-aware.” What does that mean? First, it’s important to understand that ScyllaDB is built with a shard-per-core architecture: every node is a multithreaded process whose every thread performs some relatively independent work. Each piece of data stored in a ScyllaDB database is bound to a specific CPU core. ScyllaDB shard-awareness allows client applications to perform load balancing following our shard-per-core architecture. Shard-aware drivers establish one connection per shard, allowing them to load balance and route each query directly to the single CPU core that owns the data. This further reduces latency and is also very friendly towards CPU caches.

Before we get into how these shard-aware drivers support tablets, let’s take a brief look at ScyllaDB’s new “tablets” replication architecture in case you’re not yet familiar with it.
We replaced vNode-based replication with tablets-based replication to enable more flexible load balancing. Each table is split into smaller fragments (“tablets”) to evenly distribute data and load across the system. Tablets are replicated to multiple ScyllaDB nodes for high availability and fault tolerance. This new approach separates token ownership from servers – ultimately allowing ScyllaDB to scale faster and in parallel. For more details on tablets, see these blog posts.

How We’ve Modified Shard-Aware Drivers for Tablets

With all that defined, let’s move to how we’ve modified our shard-aware drivers for tablets. Specifically, let’s look at how our query routing has changed.

Before tablets

Without tablets, when you made a select, insert, update, or delete statement, this query was routed to a specific node. We determined this routing by calculating the hashes of the partition key. The ring represented the data, the hashes were the partition keys, and this ring was split into many vNodes. When data was replicated, one piece of the data was stored on many nodes.

The problem with this approach is that this mapping was fairly static. We couldn’t move the vNodes around and the driver didn’t expect this mapping to change. Users wanted to add and remove capacity faster than this vNodes paradigm allowed.

With tablets

With tablets, ScyllaDB now maintains metadata about the tablets. This metadata records which table each tablet represents and the range of data it stores. It also records which replicas hold the data within each tablet’s start and end range.

Interestingly, we decided to have the driver start off without any knowledge of where data is located – even though that information is stored in the tablet mapping table. We saw that scanning that mapping would cause a performance hit for large deployments.
Instead, we took a different approach: we learn the routing information “lazily” and update it dynamically. For example, assume that the driver wants to execute a query. The driver will make a request to some random node and shard – this initial request is a guess, so it might reach the incorrect node and shard. All the nodes know which node owns which data. Even if the driver contacts the wrong node, that node will forward the request to the correct node. It will also tell the driver which node owns that data, and the driver will update its tablet metadata to include that information.

The next time the driver wants to access data on that tablet, it will reference this tablet metadata and send the query directly to the correct node. If the tablet containing that data moves (e.g., because a new node is added or removed), the driver will continue sending statements to the old replicas – but since at least one of them is not currently a replica, it will return the correct information about this tablet. The alternative would be to refresh all of the tablet metadata periodically (maybe every five seconds), which could place significant strain on the system.

Another benefit of this approach: the driver doesn’t have to store metadata about all tablets. For example, if the driver only queries one table, it will only persist information about the tablets for that specific table. Ultimately, this approach enables very fast startup, even with 100K entries in the tablets table.

Why use tablet-aware drivers

When using ScyllaDB tablets, it’s more important than ever to use ScyllaDB shard-aware – and now also tablet-aware – drivers instead of Cassandra drivers. The existing drivers will still work, but they won’t work as efficiently because they won’t know where each tablet is located. Using the latest ScyllaDB drivers should provide a nice throughput and latency boost.
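The lazy routing scheme described in this post can be modeled with a toy example. Everything here is hypothetical (`Cluster`, `LazyDriver`, the node names) and heavily simplified: real tablets have several replicas and per-shard routing, and this is not the drivers' actual API.

```python
import random


class Cluster:
    """Toy cluster in which every node knows the owner of each (table, tablet)."""

    def __init__(self, nodes, owners):
        self.nodes = list(nodes)
        self.owners = dict(owners)  # {(table, tablet_id): owning node}

    def execute(self, contacted_node, table, tablet_id):
        owner = self.owners[(table, tablet_id)]
        # A non-owner forwards the request and returns a routing hint.
        hint = owner if contacted_node != owner else None
        return {"served_by": owner, "routing_hint": hint}


class LazyDriver:
    """Starts with no routing knowledge and learns it from responses."""

    def __init__(self, cluster, seed=0):
        self.cluster = cluster
        self.routing = {}  # learned {(table, tablet_id): node}
        self.rng = random.Random(seed)

    def query(self, table, tablet_id):
        key = (table, tablet_id)
        # Use learned routing if present, otherwise guess a node.
        node = self.routing.get(key) or self.rng.choice(self.cluster.nodes)
        reply = self.cluster.execute(node, table, tablet_id)
        if reply["routing_hint"] is not None:
            self.routing[key] = reply["routing_hint"]
        return node, reply


cluster = Cluster(["n1", "n2", "n3"], {("articles", 0): "n1"})
driver = LazyDriver(cluster)
driver.routing[("articles", 0)] = "n3"  # simulate stale knowledge after a tablet move
first_node, _ = driver.query("articles", 0)   # goes to the stale node, learns the owner
second_node, _ = driver.query("articles", 0)  # goes straight to the owner
```

The driver only stores entries for tablets it has actually touched, which mirrors the startup and memory benefit the post describes.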

The Reality of AI and Cloud Adoption

When you want to understand where technology is really heading, look past the headlines and listen to the practitioners. The recent Apache Cassandra® user survey offers exactly that kind of ground truth, with 144 power users sharing insights that cut through the market noise about AI adoption and...

Making Effective Partitions for ScyllaDB Data Modeling

Learn how to ensure your tables are perfectly partitioned to satisfy your queries – in this excerpt from the book “ScyllaDB in Action.”

Editor’s note

We’re thrilled to share the following excerpt from Bo Ingram’s informative – and fun! – new book on ScyllaDB: ScyllaDB in Action. It’s available now via Manning and Amazon. You can also access a 3-chapter excerpt for free, compliments of ScyllaDB.

You might have already experienced Bo’s expertise and engaging communication style in his blog How Discord Stores Trillions of Messages or ScyllaDB Summit talks How Discord Migrated Trillions of Messages from Cassandra to ScyllaDB and So You’ve Lost Quorum: Lessons From Accidental Downtime. If not, you should 😉

And if you want to learn more from Bo, join him at our upcoming Masterclass: Data Modeling for Performance Masterclass. We’ve ordered boxes of print books and will be giving them out!

The following is an excerpt from Chapter 3; it’s reprinted here with permission of the publisher.

***

When analyzing the queries your application needs, you identified several tables for your schema. The next step in schema design is to ask: how can these tables be uniquely identified and partitioned to satisfy the queries? The primary key contains the row’s partition key – you learned in chapter 2 that the primary key determines a row’s uniqueness. Therefore, before you can determine the partition key, you need to know the primary key.

PRIMARY KEYS

First, you should check to see if what you’re trying to store contains a property that is popularly used for its uniqueness. Cars, for example, have a vehicle identification number that’s unique per car. Books (including this one!) have an ISBN (international standard book number) to uniquely identify them.
There’s no international standard for identifying an article or an author, so you’ll need to think a little harder to find the primary and partition keys. ScyllaDB does provide support for generating unique identifiers, but they come with some drawbacks that you’ll learn about in the next chapter.

Next, you can ask if any fields can be combined to make a good primary key. What could you use to determine a unique article? The title might be reused, especially if you have unimaginative writers. The content would ideally be unique per article, but that’s a very large value for a key. Perhaps you could use a combination of fields – maybe date, author, and title? That probably works, but I find it’s helpful to look back at what you’re trying to query.

When your application runs the Read Article query, it’s trying to read only a single article. Whatever is executing that query, probably a web server responding to the contents of a URL, is trying to load that article, so it needs information that can be stored in a URL. Isn’t it obnoxious when you go to paste a link somewhere and the link feels like it’s a billion characters long? To load an article, you don’t want to have to keep track of the author ID, title, and date. That’s why using an ID as a primary key is often a strong choice. Providing a unique identifier satisfies the uniqueness requirement of a primary key, and it can potentially encode other information, such as time, which can be used for relative ordering. You’ll learn more about potential types of IDs in the next chapter when you explore data types.

What uniquely identifies an author? You might think an email, but people sometimes change their email addresses. Supplying your own unique identifier works well again here. Perhaps it’s a numeric ID, or maybe it’s a username, but authors, as your design stands today, need extra information to differentiate them from other authors within your database.
Article summaries, like the other steps you’ve been through, are a little more interesting. For the various article summary tables, if you try to use the same primary key as articles, an ID, you’re going to run into trouble. If an ID alone makes an article unique in the articles table, then presumably it suffices for the index tables. That turns out to not be the case. An ID can still differentiate uniqueness, but you also want to query by at least the partition key to have a performant query, and if an ID is the partition key, that doesn’t satisfy the use cases for querying by author, date, or score. Because the partition key is contained within the primary key, you’ll need to include those fields in your primary key (figure 3.12). Taking article_summaries_by_author, your primary key for that table would become the author and your article ID. Similarly, the other two tables would have the date and the article ID for article_summaries_by_date, and article_summaries_by_score would use the score and the article ID for its primary key.

Figure 3.12 You sometimes need to add fields to an already-unique primary key to use them in a partition key, especially when denormalizing data.

With your primary keys figured out, you can move forward to determining your partition keys and potentially adjusting your primary keys.

PARTITION KEYS

You learned in the last chapter that the first entry in the primary key serves as the partition key for the row (you’ll see later how to build a composite primary key). A good partition key distributes data evenly across the cluster and is used by the queries against that table. It stores a relatively small amount of data (a good rule of thumb is that 100 MB is a large partition, but it depends on the use case). I’ve listed the tables and their primary key; knowing the primary keys, you can now rearrange them to specify your partition keys. What would be a good partition key for each of these tables?
Table 3.3 Each table has a primary key from which the partition key is extracted

For the articles and authors tables, it’s very straightforward. There is one column in the primary key; therefore, the partition key is the only column. If only everything was that easy!

TIP: By only having the ID as the primary key and the partition key, you can look up rows only by their ID. A query where you want to get multiple rows wouldn’t work with this approach, because you’d be querying across partitions, bringing in extra nodes and hampering performance. Instead, you want to query by a partition key that would contain multiple rows, as you’ll do with article summaries.

The article summaries tables, however, present you with a choice. Given a primary key of an ID and an author, for article_summaries_by_author, which should be the partition key? If you choose ID, Scylla will distribute the rows for this table around the cluster based on the ID of the article. This distribution would mean that if you wanted to load all articles by their author, the query would have to hit nodes all across the cluster. That behavior is not efficient. If you partitioned your data by the author, a query to find all articles written by a given author would hit only the nodes that store that partition, because you’re querying within that partition (figure 3.13). You almost always want to query by at least the partition key – it is critical for good performance. Because the use case for this table is to find articles by who wrote them, the author is the choice for the partition key. This distribution makes your primary key author_id, id because the partition key is the first entry in the primary key.

Figure 3.13 The partition key for each table should match the queries you previously determined to achieve the best performance.
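To make the difference concrete, here is a small simulation of how many nodes a "find all articles by this author" query might touch under each choice. It is a rough sketch under stated assumptions: `node_for` uses `zlib.crc32` as a stand-in for the database's real partitioner, the six-node cluster and sample rows are invented, and replication is ignored.

```python
import zlib

NODES = 6  # hypothetical cluster size


def node_for(partition_key: str) -> int:
    """Pick the node that would own a partition (crc32 stands in for the real partitioner)."""
    return zlib.crc32(partition_key.encode("utf-8")) % NODES


# Fifty articles, all written by the same hypothetical author.
articles = [{"id": f"article-{i}", "author_id": "author-7"} for i in range(50)]

# Partitioned by article ID: the author's rows are scattered by each ID's hash.
nodes_if_partitioned_by_id = {node_for(a["id"]) for a in articles}

# Partitioned by author: every row for the author lands in one partition.
nodes_if_partitioned_by_author = {node_for(a["author_id"]) for a in articles}

print(len(nodes_if_partitioned_by_id), len(nodes_if_partitioned_by_author))
```

With the author as the partition key, the set always contains exactly one node, whereas partitioning by ID typically spreads the same rows over several nodes.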
article_summaries_by_date and article_summaries_by_score, in what might be the least surprising statement of the book, present a similar choice as article_summaries_by_author with a similar solution. Because you’re querying article_summaries_by_date by the date of the article, you want the data partitioned by it as well, making the primary key date, id, or score, id for article_summaries_by_score.

Tip: Right-sizing partition keys: If a partition key is too large, data will be unevenly distributed across the cluster. If a partition key is too small, range scans that load multiple rows might need several queries to load the desired amount of data. Consider querying for articles by date – if you publish one article a week but your partition key is per day, then querying for the most recent articles will require queries with no results six times out of seven.

Partitioning your data is a balancing exercise. Sometimes, it’s easy. Author is a natural partition key for articles by author, whereas something like date might require some finessing to get the best performance in your application. Another problem to consider is one partition taking an outsized amount of traffic – more than the node can handle. In the next chapters, as you refine your design, you’ll learn how to adjust your partition key to fit your database just right.

After looking closely at the tables, your primary keys are now ordered correctly to partition the data. For the article summaries tables, your primary keys are divided into two categories – partition key and not-partition key. The leftover bit has an official name and performs a specific function in your table. Let’s take a look at the not-partition keys – clustering keys.

CLUSTERING KEYS

A clustering key is the non-partition key part of a table’s primary key that defines the ordering of the rows within the table.
In the previous chapter, when you looked at your example table, you noticed that Scylla had filled in the table’s implicit configuration, ordering the non-partition key columns – the clustering keys – by ascending order.

CREATE TABLE initial.food_reviews (
    restaurant text,
    ordered_at timestamp,
    food text,
    review text,
    PRIMARY KEY (restaurant, ordered_at, food)
) WITH CLUSTERING ORDER BY (ordered_at ASC, food ASC);

Within the partition, each row is sorted by its time ordered and then by the name of the food, each in an ascending sort (so earlier order times and alphabetically by food). Left to its own devices, ScyllaDB always defaults to the ascending natural order of the clustering keys.

When creating tables, if you have clustering keys, you need to be intentional about specifying their order to make sure that you’re getting results that match your requirements. Consider article_summaries_by_author. The purpose of that table is to retrieve the articles for a given author, but do you want to see their oldest articles first or their newest ones? By default, Scylla is going to sort the table by id ASC, giving you the old articles first. When creating your summary tables, you’ll want to specify their sort orders so that you get the newest articles first – specifying id DESC.

You now have tables defined, and with those tables, the primary keys and partition keys are set to specify uniqueness, distribute the data, and provide an ordering based on your queries. Your queries, however, are looking for more than just primary keys – authors have names, and articles have titles and content. Not only do you need to store these fields, you need to specify the structure of that data. To accomplish this definition, you’ll use data types. In the next chapter, you’ll learn all about them and continue practicing query-first design.
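As a mental model of what a clustering order does, the sketch below groups rows by partition key and keeps each partition sorted by the clustering key. It is an in-memory illustration only: `build_partitions` is a hypothetical helper, and it assumes article IDs increase over time so that a descending sort means newest first.

```python
from collections import defaultdict


def build_partitions(rows, descending=True):
    """Group rows by author_id (partition key) and sort each group by id (clustering key)."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row["author_id"]].append(row)
    for partition in partitions.values():
        partition.sort(key=lambda r: r["id"], reverse=descending)
    return partitions


rows = [
    {"author_id": "a1", "id": 3, "title": "Third"},
    {"author_id": "a1", "id": 1, "title": "First"},
    {"author_id": "a2", "id": 2, "title": "Second"},
]

# Analogous to id DESC: newest article first within the author's partition.
newest_first = [r["id"] for r in build_partitions(rows)["a1"]]
# Analogous to the default id ASC ordering.
oldest_first = [r["id"] for r in build_partitions(rows, descending=False)["a1"]]
```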

Maximizing Self-Managed Apache Cassandra with Kubernetes

Today’s infrastructure architects need to manage large-scale, enterprise Apache Cassandra® clusters to meet emerging AI data needs. To simplify that layer, we introduced DataStax Mission Control, an all-in-one platform for distributed database operations, observability, and deployment. As Mission...

Database Internals: Optimizing Memory Management

How databases can get better performance by optimizing memory management

The following blog post is an excerpt from Chapter 3 of the Database Performance at Scale book, which is available for free. This book sheds light on often overlooked factors that impact database performance at scale.

Memory management is the central design point in all aspects of programming. Even comparing programming languages to one another always involves discussions about the way programmers are supposed to handle memory allocation and freeing. No wonder memory management design affects the performance of a database so much.

Applied to database engineering, memory management typically falls into two related but independent subsystems: memory allocation and cache control. The former is in fact a very generic software engineering issue, so considerations about it are not extremely specific to databases (though they are crucial and are worth studying). In contrast, the latter topic is itself very broad, affected by the usage details and corner cases. Accordingly, in the database world, cache control has its own flavor.

Allocation

The manner in which programs or subsystems allocate and free memory lies at the core of memory management. There are several approaches worth considering.

As illustrated by Figure 3-2, a so-called “log-structured allocation” is known from filesystems where it puts sequential writes to a circular log on the persisting storage and handles updates the very same way. At some point, this filesystem must reclaim blocks that became obsolete entries in the log area to make some more space available for future writes. In a naive implementation, unused entries are reclaimed by rereading and rewriting the log from scratch; obsolete blocks are then skipped in the process.
Figure 3-2: A log-structured allocation puts sequential writes to a circular log on the persisting storage and handles updates the very same way

A memory allocator for naive code can do something similar. In its simplest form, it would allocate the next block of memory by simply advancing a next-free pointer. Deallocation would just need to mark the allocated area as freed. One advantage of this approach is the speed of allocation. Another is the simplicity and efficiency of deallocation if it happens in FIFO order or affects the whole allocation space. Stack memory allocations are later released in the order that’s reverse to allocation, so this is the most prominent and the most efficient example of such an approach.

Using linear allocators as general-purpose allocators can be more problematic because of the difficulty of space reclamation. To reclaim space, it’s not enough to just mark entries as free. This leads to memory fragmentation, which in turn outweighs the advantages of linear allocation. So, as with the filesystem, the memory must be reclaimed so that it only contains allocated entries and the free space can be used again. Reclamation requires moving allocated entries around – a process that changes and invalidates their previously known addresses. In naive code, the locations of references to allocated entries (addresses stored as pointers) are unknown to the allocator. Existing references would have to be patched to make the allocator action transparent to the caller; that’s not feasible for a general-purpose allocator like malloc.

Logging allocator use is tied to the programming language selection. Some languages, like C++, can greatly facilitate this by providing move constructors. However, passing pointers to libraries that are outside of your control (e.g., glibc) would still be an issue.

Another alternative is adopting a strategy of pool allocators, which provide allocation spaces for allocation entries of a fixed size (see Figure 3-3).
By limiting the allocation space that way, fragmentation can be reduced. A number of general-purpose allocators use pool allocators for small allocations. In some cases, those allocation spaces exist on a per-thread basis to eliminate the need for locking and improve CPU cache utilization.

Figure 3-3: Pool allocators provide allocation spaces for allocation entries of a fixed size. Fragmentation is reduced by limiting the allocation space

This pool allocation strategy provides two core benefits. First, it saves you from having to search for available memory space. Second, it alleviates memory fragmentation because it pre-allocates in memory a cache for use with a collection of object sizes. Here’s how it works to achieve that:

  • The region for each of the sizes has fixed-size memory chunks that are suitable for the contained objects, and those chunks are all tracked by the allocator.
  • When it’s time for the allocator to actually allocate memory for a certain type of data object, it’s typically possible to use a free slot (chunk) within one of the existing memory slabs. (Note: We are using the term “slab” to mean one or more contiguous memory pages that contain pre-allocated chunks of memory.)
  • When it’s time for the allocator to free the object’s memory, it can simply move that slot over to the containing slab’s list of unused/free memory slots. That memory slot (or some other free slot) will be removed from the list of free slots whenever there’s a call to create an object of the same type (or a call to allocate memory of the same size).

The best allocation approach to pick heavily depends upon the usage scenario. One great benefit of a log-structured approach is that it handles fragmentation of small sub-pools in a more efficient way. Pool allocators, on the other hand, generate less background load on the CPU because of the lack of compacting activity.
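The free-slot bookkeeping described above can be sketched as a toy pool allocator. This is an illustration rather than how any particular allocator is implemented: `PoolAllocator` is a hypothetical class, a single `bytearray` plays the role of the slab, and allocations are represented by byte offsets into it.

```python
class PoolAllocator:
    """Fixed-size chunk pool backed by a single pre-allocated slab."""

    def __init__(self, chunk_size: int, chunks: int):
        self.chunk_size = chunk_size
        self.slab = bytearray(chunk_size * chunks)  # pre-allocated memory
        # Stack of free slot indexes; the lowest index is handed out first.
        self.free_slots = list(range(chunks - 1, -1, -1))

    def allocate(self) -> int:
        """Return the byte offset of a free chunk; no searching is required."""
        if not self.free_slots:
            raise MemoryError("pool exhausted")
        return self.free_slots.pop() * self.chunk_size

    def free(self, offset: int) -> None:
        """Return a chunk to the free list so a later allocation can reuse it."""
        self.free_slots.append(offset // self.chunk_size)


pool = PoolAllocator(chunk_size=64, chunks=4)
a = pool.allocate()
b = pool.allocate()
pool.free(a)
c = pool.allocate()  # reuses the chunk that `a` occupied
```

Both operations are constant-time list pushes and pops, and because every chunk has the same size, a freed chunk always fits the next request, so no compaction is needed.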
Cache control

When it comes to memory management in a software application that stores lots of data on disk, you cannot overlook the topic of cache control. Caching is always a must in data processing, and it’s crucial to decide what and where to cache.

If caching is done at the I/O level, for both read/write and mmap, caching can become the responsibility of the kernel. The majority of the system’s memory is given over to the page cache. The kernel decides which pages should be evicted when memory runs low, when pages need to be written back to disk, and controls read-ahead. The application can provide some guidance to the kernel using the madvise(2) and fadvise(2) system calls.

The main advantage of letting the kernel control caching is that great effort has been invested by the kernel developers over many decades into tuning the algorithms used by the cache. Those algorithms are used by thousands of different applications and are generally effective. The disadvantage, however, is that these algorithms are general-purpose and not tuned to the application. The kernel must guess how the application will behave next. Even if the application knows differently, it usually has no way to help the kernel guess correctly. This results in the wrong pages being evicted, I/O scheduled in the wrong order, or read-ahead scheduled for data that will not be consumed in the near future.

Next, doing the caching at the I/O level interacts with the topic often referred to as IMR – in memory representation. No wonder that the format in which data is stored on disk differs from the form the same data is allocated in memory as objects. The simplest reason why it’s not the same is byte-ordering. With that in mind, if the data is cached once it’s read from the disk, it needs to be further converted or parsed into the object used in memory. This can be a waste of CPU cycles, so applications may choose to cache at the object level.
Choosing to cache at the object level affects a lot of other design points. First, cache management falls entirely on the application side, including cross-core synchronization, data coherence, invalidation, etc. Next, since objects can be (and typically are) much smaller than the average I/O size, caching millions or billions of those objects requires choosing a collection that can handle it (we’ll get to this in a follow-up blog). Finally, caching at the object level greatly affects the way I/O is done.

Read about ScyllaDB’s Caching

NoSQL Data Modeling: Application Design Before Schema Design

Learn how to implement query-first design to build a ScyllaDB schema for a sample application, in this excerpt from the book “ScyllaDB in Action.”

You might have already experienced author Bo Ingram’s expertise and engaging communication style in his blog How Discord Stores Trillions of Messages or his ScyllaDB Summit talks How Discord Migrated Trillions of Messages from Cassandra to ScyllaDB and So You’ve Lost Quorum: Lessons From Accidental Downtime. If not, you should 😉 And if you want to learn more from Bo, join him at our upcoming “Data Modeling for Performance” Masterclass. We’ve ordered boxes of print books and will be giving them out!

The following is an excerpt from Chapter 3; it’s reprinted here with permission of the publisher.

***

When designing a database schema, you need to create a schema that is synergistic with your database. If you consider Scylla’s goals as a database, it wants to distribute data across the cluster to provide better scalability and fault tolerance. Spreading the data means queries involve multiple nodes, so you want to design your application to make queries that use the partition key, minimizing the nodes involved in a request. Your schema needs to fit these constraints:

- It needs to distribute data across the cluster
- It should also query the minimal number of nodes for your desired consistency level

There’s some tension in these constraints: your schema wants to spread data across the cluster to balance load between the nodes, but you want to minimize the number of nodes required to serve a query. Satisfying these constraints can be a balancing act. Do you have smaller partitions that might require more queries to aggregate together, or do you have larger partitions that require fewer queries but potentially spread data unevenly across the cluster?
In figure 3.1, you can see the cost of a query that uses the partition key and touches the minimal number of nodes versus one that doesn’t use the partition key, necessitating scanning each node for matching data. Using the partition key in your query allows the coordinator (the node servicing the request) to direct queries to the nodes that own that partition, lessening the load on the cluster and returning results faster.

Figure 3.1 Using the partition key minimizes the number of nodes required to serve the request.

The aforementioned design constraints are each related to queries. You want your data to be spread across the cluster so that your queries distribute the load amongst multiple nodes. Imagine if all of your data was clustered on a small subset of nodes in your cluster. Some nodes would be working quite hard, whereas others might not be taking much traffic. If some of those heavily utilized nodes became overwhelmed, you could suffer quite degraded performance because, given the imbalance, many queries could be unable to complete. However, you also want to minimize the number of nodes hit per query to minimize the work your query needs to do; a query that uses all nodes in a very large cluster would be very inefficient.

These query-centric constraints necessitate a query-centric approach to design. How you query Scylla is a key component of its performance, and since you need to consider the impacts of your queries across multiple dimensions, it’s critical to think carefully about how you query Scylla. When designing schemas in Scylla, it’s best to practice an approach called query-first design, where you focus on the queries your application needs to make and then build your database schema around that. In Scylla, you structure your data based on how you want to query it, and query-first design helps you do that.
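The difference between a query that uses the partition key and one that doesn’t can be illustrated with a deliberately simplified routing model. Everything here is an assumption made for illustration: the node names, the replication factor of 3, the MD5-based `token` function, and the modulo placement are stand-ins, not ScyllaDB’s actual partitioner or token ring.

```python
import hashlib
from typing import List, Optional

# Hypothetical six-node cluster with three replicas per partition.
NODES = ["node-a", "node-b", "node-c", "node-d", "node-e", "node-f"]
REPLICATION_FACTOR = 3


def token(partition_key: str) -> int:
    """Hash the partition key to a number (stand-in for a real partitioner)."""
    digest = hashlib.md5(partition_key.encode()).digest()
    return int.from_bytes(digest[:8], "big")


def replicas_for(partition_key: str) -> List[str]:
    """Pick the nodes that own this partition: a start node plus its successors."""
    first = token(partition_key) % len(NODES)
    return [NODES[(first + i) % len(NODES)] for i in range(REPLICATION_FACTOR)]


def nodes_contacted(partition_key: Optional[str]) -> List[str]:
    """Nodes a coordinator might have to involve in serving a query."""
    if partition_key is None:
        # No partition key: matching rows could live anywhere, so scan every node.
        return list(NODES)
    # Partition key known: only the owning replicas are candidates.
    return replicas_for(partition_key)


with_key = nodes_contacted("restaurant-42")
without_key = nodes_contacted(None)
```

In this model the keyed query involves at most three of the six nodes, and the consistency level would determine how many of those replicas must actually respond. The unkeyed query involves all six, and that cost grows with the size of the cluster.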
Your query-first design toolbox

In query-first design, you take a set of application requirements and ask yourself a series of questions that guide you through translating the requirements into a schema in ScyllaDB. Each of these questions builds upon the previous one, iteratively guiding you through building an effective ScyllaDB schema. These questions include the following:

- What are the requirements for my application?
- What are the queries my application needs to run to meet these requirements?
- What tables are these queries querying?
- How can these tables be uniquely identified and partitioned to satisfy the queries?
- What data goes inside these tables?
- Does this design match the requirements?
- Can this schema be improved?

This process is visualized in figure 3.2, showing how you start with your application requirements and ask yourself a series of questions, guiding you from your requirements to the queries you need to run and, ultimately, to a fully designed ScyllaDB schema ready to store data effectively.

Figure 3.2 Query-first design guides you through taking application requirements and converting them to a ScyllaDB schema.

You begin with requirements and then use the requirements to identify the queries you need to run. These queries are seeking something, and those “somethings” need to be stored in tables. Your tables need to be partitioned to spread data across the cluster, so you determine that partitioning to stay within your requirements and database design constraints. You then specify the fields inside each table, filling it out. At this point, you can check two things: Does the design match the requirements? Can it be improved?

This up-front design is important because in ScyllaDB changing your schema to match new use cases can be a high-friction operation.
While Scylla supports new query patterns via some of its features (which you’ll learn about in chapter 7), these come at an additional performance cost, and if they don’t fit your needs, might necessitate a full manual copy of data into a new table. It’s important to think carefully about your design: not only what it needs to be, but also what it could be in the future. You start by extracting the queries from your requirements and expanding your design until you have a schema that fits both your application and ScyllaDB. To practice query-first design in Scylla, let’s take the restaurant review application introduced at the beginning of the chapter and turn it into a ScyllaDB schema.

The sample application requirements

In the last chapter, you took your restaurant reviews and stored them inside ScyllaDB. You enjoyed working with the database, and as you went to more places, you realized you could combine your two great loves: restaurant reviews and databases (if these aren’t your two great loves, play along with me). You decide to build a website to share your restaurant reviews. Because you already have a ScyllaDB cluster, you choose to use that (this is a very different book if you pick otherwise) as the storage for your website. The first step to query-first design is identifying the requirements for your application, as seen in figure 3.4.

Figure 3.4 You begin query-first design by determining your application’s requirements.

After ruminating on your potential website, you identify the features it needs, and most importantly, you give it a name: Restaurant Reviews. It does what it says!
Restaurant Reviews has the following initial requirements:

- Authors post articles to the website
- Users view articles to read restaurant reviews
- Articles contain a title, an author, a score, a date, a gallery of images, the review text, and the restaurant
- A review’s score is between 1 and 10
- The home page contains a summary of articles sorted by most recent, showing the title, author name, score, and one image
- The home page links to articles
- Authors have a dedicated page containing summaries of their articles
- Authors have a name, bio, and picture
- Users can view a list of article summaries sorted by the highest review score

You have a hunch that as time goes by, you’ll want to add more features to this application and use those features to learn more about ScyllaDB. For now, these features give you a base to practice decomposing requirements and building your schema. Let’s begin!

Determining the queries

Next, you ask, “What are the queries my application needs to run to meet these requirements?”, as seen in figure 3.5. Your queries drive how you design your database schema; therefore, it is critical to understand your queries at the beginning of your design.

Figure 3.5 Next, you take your requirements and use them to determine your application’s queries.

For identifying queries, you can use the familiar CRUD operations (create, read, update, and delete) as verbs. These queries will act on nouns in your requirements, such as authors or articles. Occasionally, you’ll want to filter your queries; you can notate this filtering with “by” followed by the filter condition. For example, if your app needed to load events on a given date, you might use a Read Events by Date query. If you take a look at your requirements, you’ll see several queries you’ll need.

TIP: These aren’t the actual queries you’ll run; those are written in CQL, as seen in chapter 2, and look more like SELECT * FROM your_cool_table WHERE your_awesome_primary_key = 12;.
These are descriptions of what you’ll need to query; in a later chapter, when you finish your design, you’ll turn these into actual CQL queries.

The first requirement is “Authors post articles to the website,” which sounds an awful lot like a process that would involve inserting an article into a database. Because you insert articles in the database via a query, you will need a Create Article statement. You might be asking at this point: what is an article? Although other requirements discuss these fields, you should skip that concern for the moment. Focus first on what queries you need to run, and then you’ll later figure out the needed fields.

The second requirement is “Users view articles to read restaurant reviews.” Giving users unfettered access to the database is a security no-go, so the app needs to load an article to display to a user. This functionality suggests a Read Article query (which is different than the user perusing the article), which you can use to retrieve an article for a user.

The following two requirements refer to the data you need to store and not a novel way to access them:

- Articles contain a title, an author, a score, a date, a gallery of images, the review text, and the restaurant
- A review’s score is between 1 and 10

Articles need certain fields, and each article is associated with a review score that fits within specified parameters. You can save these requirements for later when you fill out what fields are needed in each table.

The next relevant requirement says “The home page contains a summary of articles sorted by most recent, showing the title, author name, score, and one image.” The home page shows article summaries, which you’ll need to load by their date, sorted by most recent: Read Article Summaries by Date. Article summaries, at first glance, look a lot like articles. Because you’re querying different data, and you also need to retrieve summaries by their time, you should consider them as different queries.
Querying for an article:

- loads a title, author, score, date, a gallery of images, the review text, and the restaurant
- retrieves one specific article

On the other hand, loading the most recent article summaries:

- loads only the title, author name, score, and one image
- loads several summaries, sorted by their publishing date

Perhaps they can run against the same table, but you can see if that’s the case further on in the exercise. When in doubt, it’s best to not over-consolidate. Be expansive in your designs, and if there’s duplication, you can reduce it when refining your designs. As you work through implementing your design, you might discover that what seems unnecessarily duplicated in one step is a necessary separation later.

Following these requirements is “The home page links to articles,” which makes explicit that the article summaries link to the article; you’ll look closer at this one when you determine what fields you need.

The next two requirements are about authors. The website will contain a page for each author (presumably only you at the moment, but you have visions of a media empire). This author page will contain article summaries for each author, meaning you’ll need to Read Article Summaries by Author. In the last requirement, there’s data that each author has. You can study the specifics of these in a moment, but it means that you’ll need to read information about the author, so you will need a Read Author query.

For the last requirement, “Users can view a list of article summaries sorted by the highest review score,” you’ll need a way to surface article summaries sorted by their scores. This approach requires a Read Article Summaries by Score query.

TIP: What would make a good partition key for reading articles sorted by score? It’s a tricky problem; you’ll learn how to attack it shortly.
Having analyzed the requirements, you’ve determined six queries your schema needs to support:

- Create Article
- Read Article
- Read Article Summaries by Date
- Read Article Summaries by Author
- Read Article Summaries by Score
- Read Author

You might notice a problem here: where do article summaries and authors get created? How can you read them if nothing makes them exist? Requirement lists often have implicit requirements; because you need to read article summaries, they have to be created somehow. Go ahead and add Create Article Summary and Create Author queries to your list. You now have eight queries from the requirements you created, listed in table 3.1.

There’s a joke that asks “How do you draw an owl?” The answer: you draw a couple of circles, and then you draw the rest of the owl. Query-first design sometimes feels similar. You need to map your queries to a database schema that is not only effective in meeting the application’s requirements but is performant and uses ScyllaDB’s benefits and features. Drawing queries out of requirements is often straightforward, whereas designing the schema from those queries requires balancing both application and database concerns. Let’s take a look at some techniques you can apply as you design a database schema…
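The check for implicit requirements described above (anything you read must first be created) is mechanical enough to sketch in code. This is only an illustration of the reasoning, not part of the book’s method: the `Query` tuple and the `missing_creates` helper are hypothetical names, and nouns are written in the singular so that reads and creates can be matched.

```python
from typing import List, NamedTuple, Optional


class Query(NamedTuple):
    """A query description: a CRUD verb, a noun, and an optional "by" filter."""
    verb: str
    noun: str
    by: Optional[str] = None


# The six queries drawn directly from the stated requirements.
explicit = [
    Query("Create", "Article"),
    Query("Read", "Article"),
    Query("Read", "Article Summary", "Date"),
    Query("Read", "Article Summary", "Author"),
    Query("Read", "Article Summary", "Score"),
    Query("Read", "Author"),
]


def missing_creates(queries: List[Query]) -> List[Query]:
    """Find nouns that are read but never created, and propose Create queries."""
    created = {q.noun for q in queries if q.verb == "Create"}
    missing: List[str] = []
    for q in queries:
        if q.verb == "Read" and q.noun not in created and q.noun not in missing:
            missing.append(q.noun)
    return [Query("Create", noun) for noun in missing]


implicit = missing_creates(explicit)
all_queries = explicit + implicit
```

Run against the six explicit queries, the helper proposes Create Article Summary and Create Author, which brings the total to the eight queries the text arrives at.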