Introducing Netflix’s Key-Value Data Abstraction Layer

Vidhya Arvind, Rajasekhar Ummadisetty, Joey Lynch, Vinay Chella

Introduction

At Netflix, our ability to deliver seamless, high-quality streaming experiences to millions of users hinges on robust, global backend infrastructure. Central to this infrastructure is our use of multiple online distributed databases such as Apache Cassandra, a NoSQL database known for its high availability and scalability. Cassandra serves as the backbone for a diverse array of use cases within Netflix, ranging from user sign-ups and storing viewing histories to supporting real-time analytics and live streaming.

Over time, as new key-value databases were introduced and service owners launched new use cases, we encountered numerous challenges with datastore misuse. First, developers struggled to reason about consistency, durability and performance in this complex global deployment across multiple stores. Second, developers had to constantly re-learn new data modeling practices and common yet critical data access patterns. These include challenges with tail latency and idempotency, managing “wide” partitions with many rows, handling single large “fat” columns, and slow response pagination. Additionally, the tight coupling with multiple native database APIs — APIs that continually evolve and sometimes introduce backward-incompatible changes — resulted in org-wide engineering efforts to maintain and optimize our microservices’ data access.

To overcome these challenges, we developed a holistic approach that builds upon our Data Gateway Platform. This approach led to the creation of several foundational abstraction services, the most mature of which is our Key-Value (KV) Data Abstraction Layer (DAL). This abstraction simplifies data access, enhances the reliability of our infrastructure, and enables us to support the broad spectrum of use cases that Netflix demands with minimal developer effort.

In this post, we dive deep into how Netflix’s KV abstraction works, the architectural principles guiding its design, the challenges we faced in scaling diverse use cases, and the technical innovations that have allowed us to achieve the performance and reliability required by Netflix’s global operations.

The Key-Value Service

The KV data abstraction service was introduced to solve the persistent challenges we faced with data access patterns in our distributed databases. Our goal was to build a versatile and efficient data storage solution that could handle a wide variety of use cases, ranging from the simplest hashmaps to more complex data structures, all while ensuring high availability, tunable consistency, and low latency.

Data Model

At its core, the KV abstraction is built around a two-level map architecture. The first level is a hashed string ID (the primary key), and the second level is a sorted map of byte keys to byte values. This model supports both simple and complex data models, balancing flexibility and efficiency.

HashMap<String, SortedMap<Bytes, Bytes>>

For complex data models such as structured Records or time-ordered Events, this two-level approach handles hierarchical structures effectively, allowing related data to be retrieved together. For simpler use cases, it also represents flat key-value Maps (e.g. id → {"" → value}) or named Sets (e.g. id → {key → ""}). This adaptability allows the KV abstraction to be used in hundreds of diverse use cases, making it a versatile solution for managing both simple and complex data models in large-scale infrastructures like Netflix.
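
As a rough illustration of this shape (using plain Java collections and String in place of Bytes, purely for readability — these are not the service’s actual types):

import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative sketch of the two-level model: record id -> sorted map of items.
public class TwoLevelMapSketch {
    static final Map<String, SortedMap<String, String>> store = new HashMap<>();

    public static void main(String[] args) {
        // Flat key-value Map: the inner map holds a single item under the empty key.
        store.put("user:42:avatar", new TreeMap<>(Map.of("", "https://example.com/a.png")));

        // Named Set: membership is carried by the inner keys; values stay empty.
        store.put("user:42:favorites", new TreeMap<>(Map.of("show:1", "", "show:2", "")));

        // Structured Record / time-ordered Events: related items retrieved together, sorted by key.
        store.put("user:42:watch-history", new TreeMap<>(Map.of(
                "2024-09-01T10:00Z", "title=Stranger Things",
                "2024-09-02T21:15Z", "title=The Crown")));

        System.out.println(store.get("user:42:watch-history").firstKey());
    }
}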

At a high level, KV data can be visualized as a collection of records, where each record holds one or more items of the following shape:

message Item (
  Bytes     key,
  Bytes     value,
  Metadata  metadata,
  Integer   chunk
)

Database Agnostic Abstraction

The KV abstraction is designed to hide the implementation details of the underlying database, offering a consistent interface to application developers regardless of which storage system best fits their use case. While Cassandra is one example, the abstraction also works with data stores such as EVCache, DynamoDB, and RocksDB.

For example, when implemented with Cassandra, the abstraction leverages Cassandra’s partitioning and clustering capabilities. The record ID acts as the partition key, and the item key as the clustering column:

The corresponding Data Definition Language (DDL) for this structure in Cassandra is:

CREATE TABLE IF NOT EXISTS <ns>.<table> (
  id              text,
  key             blob,
  value           blob,
  value_metadata  blob,

  PRIMARY KEY (id, key)
) WITH CLUSTERING ORDER BY (key <ASC|DESC>)

Namespace: Logical and Physical Configuration

A namespace defines where and how data is stored, providing logical and physical separation while abstracting the underlying storage systems. It also serves as central configuration of access patterns such as consistency or latency targets. Each namespace may use different backends: Cassandra, EVCache, or combinations of multiple. This flexibility allows our Data Platform to route different use cases to the most suitable storage system based on performance, durability, and consistency needs. Developers just provide their data problem rather than a database solution!

In this example configuration, the ngsegment namespace is backed by both a Cassandra cluster and an EVCache caching layer, allowing for highly durable persistent storage and lower-latency point reads.

"persistence_configuration":[                                                   
{
"id":"PRIMARY_STORAGE",
"physical_storage": {
"type":"CASSANDRA",
"cluster":"cassandra_kv_ngsegment",
"dataset":"ngsegment",
"table":"ngsegment",
"regions": ["us-east-1"],
"config": {
"consistency_scope": "LOCAL",
"consistency_target": "READ_YOUR_WRITES"
}
}
},
{
"id":"CACHE",
"physical_storage": {
"type":"CACHE",
"cluster":"evcache_kv_ngsegment"
},
"config": {
"default_cache_ttl": 180s
}
}
]

Key APIs of the KV Abstraction

To support diverse use-cases, the KV abstraction provides four basic CRUD APIs:

PutItems — Write one or more Items to a Record

The PutItems API is an upsert operation: it can insert new data or update existing data in the two-level map structure.

message PutItemRequest (
  IdempotencyToken idempotency_token,
  string           namespace,
  string           id,
  List<Item>       items
)

As you can see, the request includes the namespace, Record ID, one or more items, and an idempotency token to ensure retries of the same write are safe. Chunked data can be written by staging chunks and then committing them with appropriate metadata (e.g. number of chunks).
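
Purely as an illustration of the request shape, here is a Java sketch that mirrors the message above; the types and names are stand-ins, not Netflix’s actual client classes:

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.UUID;

// Hypothetical value types mirroring the PutItemRequest message.
record Item(byte[] key, byte[] value) {}
record IdempotencyToken(Instant generationTime, String token) {}
record PutItemRequest(IdempotencyToken idempotencyToken, String namespace, String id, List<Item> items) {}

class PutItemsExample {
    public static void main(String[] args) {
        var token = new IdempotencyToken(Instant.now(), UUID.randomUUID().toString());
        var request = new PutItemRequest(
                token,
                "ngsegment",                 // namespace from the earlier example configuration
                "user:42:watch-history",     // record id (top-level key)
                List.of(new Item(
                        "2024-09-02T21:15Z".getBytes(StandardCharsets.UTF_8),
                        "title=The Crown".getBytes(StandardCharsets.UTF_8))));
        // A real client would send this over the wire; retries reuse the same token,
        // so the write is applied at most once logically.
        System.out.println(request.namespace() + " -> " + request.id());
    }
}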

GetItems — Read one or more Items from a Record

The GetItems API provides a structured and adaptive way to fetch data using ID, predicates, and selection mechanisms. This approach balances the need to retrieve large volumes of data while meeting stringent Service Level Objectives (SLOs) for performance and reliability.

message GetItemsRequest (
  String              namespace,
  String              id,
  Predicate           predicate,
  Selection           selection,
  Map<String, Struct> signals
)

The GetItemsRequest includes several key parameters:

  • Namespace: Specifies the logical dataset or table
  • Id: Identifies the entry in the top-level HashMap
  • Predicate: Filters the matching items and can retrieve all items (match_all), specific items (match_keys), or a range (match_range)
  • Selection: Narrows the returned response, for example page_size_bytes for pagination, item_limit for capping the total number of items across pages, and include/exclude to include or exclude large values from responses
  • Signals: Provides in-band signaling to indicate client capabilities, such as supporting client compression or chunking.

The GetItemResponse message contains the matching data:

message GetItemResponse (
  List<Item>       items,
  Optional<String> next_page_token
)

  • Items: A list of retrieved items based on the Predicate and Selection defined in the request.
  • Next Page Token: An optional token indicating where subsequent reads should resume, which is essential for handling large data sets that could exceed typical response size limits (a client-side sketch follows below).
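
For example, a caller can drain a record page by page by following next_page_token until it is absent. This minimal Java sketch uses a hypothetical KvClient interface rather than the real client API:

import java.util.List;
import java.util.Optional;

// Hypothetical shapes mirroring GetItemsRequest/GetItemResponse.
record GetItemsPage(List<byte[]> items, Optional<String> nextPageToken) {}

interface KvClient {
    GetItemsPage getItems(String namespace, String id, Optional<String> pageToken);
}

class PaginationExample {
    // Read all items for one record by following next_page_token until it is absent.
    static int readAllItems(KvClient client, String namespace, String id) {
        int total = 0;
        Optional<String> pageToken = Optional.empty();
        do {
            GetItemsPage page = client.getItems(namespace, id, pageToken);
            total += page.items().size();
            pageToken = page.nextPageToken();
        } while (pageToken.isPresent());
        return total;
    }
}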

DeleteItems — Delete one or more Items from a Record

The DeleteItems API provides flexible options for removing data, including record-level, item-level, and range deletes — all while supporting idempotency.

message DeleteItemsRequest (
  IdempotencyToken idempotency_token,
  String           namespace,
  String           id,
  Predicate        predicate
)

Just like in the GetItems API, the Predicate allows one or more Items to be addressed at once:

  • Record-Level Deletes (match_all): Removes the entire record in constant latency regardless of the number of items in the record.
  • Item-Range Deletes (match_range): Deletes a range of items within a Record. Useful for keeping only the “n-newest” items or for deleting a prefix path.
  • Item-Level Deletes (match_keys): Deletes one or more individual items.

Some storage engines (any store that defers true deletion), such as Cassandra, struggle with high volumes of deletes due to tombstone and compaction overhead. Key-Value optimizes both record and range deletes to generate a single tombstone for the operation — you can learn more about tombstones in About Deletes and Tombstones.

Item-level deletes create many tombstones, but KV hides that storage engine complexity via TTL-based deletes with jitter. Instead of deleting immediately, item metadata is updated to mark the item as expired, with a randomly jittered TTL applied to stagger the actual deletions. This technique maintains read pagination protections. While it doesn’t completely solve the problem, it reduces load spikes and helps maintain consistent performance while compaction catches up. These strategies help maintain system performance, reduce read overhead, and meet SLOs by minimizing the impact of deletes.
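
A minimal Java sketch of the jittered-TTL idea; the base TTL and jitter bounds below are arbitrary illustrations, not the values KV actually uses:

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;

class JitteredTtlDeleteSketch {
    // Mark an item expired now, but schedule the physical delete at a randomly
    // staggered time so tombstone/compaction work is spread out.
    static Instant expiryWithJitter(Duration baseTtl, Duration maxJitter) {
        long jitterMillis = ThreadLocalRandom.current().nextLong(maxJitter.toMillis() + 1);
        return Instant.now().plus(baseTtl).plusMillis(jitterMillis);
    }

    public static void main(String[] args) {
        // Hypothetical numbers: physically delete within roughly 1-2 hours of the request.
        Instant deleteAt = expiryWithJitter(Duration.ofHours(1), Duration.ofHours(1));
        System.out.println("item marked expired; physical delete scheduled at " + deleteAt);
    }
}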

Complex Mutate and Scan APIs

Beyond simple CRUD on single Records, KV also supports complex multi-item and multi-record mutations and scans via MutateItems and ScanItems APIs. PutItems also supports atomic writes of large blob data within a single Item via a chunked protocol. These complex APIs require careful consideration to ensure predictable linear low-latency and we will share details on their implementation in a future post.

Design Philosophies for reliable and predictable performance

Idempotency to fight tail latencies

To ensure data integrity, the PutItems and DeleteItems APIs use idempotency tokens, which uniquely identify each mutative operation and guarantee that operations are logically executed in order, even when hedged or retried for latency reasons. This is especially crucial in last-write-wins databases like Cassandra, where ensuring the correct order and de-duplication of requests is vital.

In the Key-Value abstraction, idempotency tokens contain a generation timestamp and random nonce token. Either or both may be required by backing storage engines to de-duplicate mutations.

message IdempotencyToken (
  Timestamp generation_time,
  String    token
)

At Netflix, client-generated monotonic tokens are preferred due to their reliability, especially in environments where network delays could impact server-side token generation. This combines a client-provided monotonic generation_time timestamp with a 128-bit random UUID token. Either or both may be required by the backing storage engine to de-duplicate mutations. Although clock-based token generation can suffer from clock skew, our tests on EC2 Nitro instances show drift is minimal (under 1 millisecond). In cases that require stronger ordering, regionally unique tokens can be generated using tools like Zookeeper, or globally unique tokens such as transaction IDs can be used.

The clock skew we observe on our Cassandra fleet suggests this technique is safe on modern cloud VMs with direct access to high-quality clocks. To further maintain safety, KV servers reject writes bearing tokens with large drift, preventing both silent write discards (writes with timestamps far in the past) and immutable doomstones (writes with timestamps far in the future) in storage engines vulnerable to them.
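
A minimal Java sketch of both halves of this scheme: client-side monotonic token generation and server-side drift rejection. The per-process monotonic counter and the drift bound are illustrative assumptions, not the production implementation:

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

class IdempotencyTokenSketch {
    record IdempotencyToken(Instant generationTime, String token) {}

    // Client side: a wall-clock timestamp forced to be monotonic per process,
    // paired with a random 128-bit nonce (here a UUID string).
    private static final AtomicLong lastMillis = new AtomicLong();

    static IdempotencyToken newToken() {
        long now = System.currentTimeMillis();
        long monotonic = lastMillis.updateAndGet(prev -> Math.max(prev + 1, now));
        return new IdempotencyToken(Instant.ofEpochMilli(monotonic), UUID.randomUUID().toString());
    }

    // Server side: reject tokens whose generation time drifts too far from the
    // server clock, guarding against silent write discards and doomstones.
    static void validate(IdempotencyToken token, Duration maxDrift) {
        Duration drift = Duration.between(token.generationTime(), Instant.now()).abs();
        if (drift.compareTo(maxDrift) > 0) {
            throw new IllegalArgumentException("token drift " + drift + " exceeds " + maxDrift);
        }
    }
}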

Handling Large Data through Chunking

Key-Value is also designed to efficiently handle large blobs, a common challenge for traditional key-value stores. Databases often face limitations on the amount of data that can be stored per key or partition. To address these constraints, KV uses transparent chunking to manage large data efficiently.

For items smaller than 1 MiB, data is stored directly in the main backing storage (e.g. Cassandra), ensuring fast and efficient access. However, for larger items, only the id, key, and metadata are stored in the primary storage, while the actual data is split into smaller chunks and stored separately in chunk storage. This chunk storage can also be Cassandra but with a different partitioning scheme optimized for handling large values. The idempotency token ties all these writes together into one atomic operation.

By splitting large items into chunks, we ensure that latency scales linearly with the size of the data, making the system both predictable and efficient. A future blog post will describe the chunking architecture in more detail, including its intricacies and optimization strategies.
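
A simplified Java sketch of the split step, assuming a hypothetical fixed chunk size; the real chunk layout, metadata, and the two-phase commit under the idempotency token are more involved:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChunkingSketch {
    static final int INLINE_LIMIT = 1 << 20;   // 1 MiB: small enough to store in primary storage
    static final int CHUNK_SIZE = 1 << 20;     // hypothetical chunk size for chunk storage

    // Split a large value into fixed-size chunks; the caller would write the chunks
    // first, then commit item metadata (e.g. the chunk count) in primary storage.
    static List<byte[]> split(byte[] value) {
        if (value.length <= INLINE_LIMIT) {
            return List.of(value);             // stored inline, no chunking needed
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < value.length; offset += CHUNK_SIZE) {
            int end = Math.min(offset + CHUNK_SIZE, value.length);
            chunks.add(Arrays.copyOfRange(value, offset, end));
        }
        return chunks;
    }
}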

Client-Side Compression

The KV abstraction leverages client-side payload compression to optimize performance, especially for large data transfers. While many databases offer server-side compression, handling compression on the client side reduces expensive server CPU usage, network bandwidth, and disk I/O. In one of our deployments, which helps power Netflix’s search, enabling client-side compression reduced payload sizes by 75%, significantly improving cost efficiency.
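
For instance, a client could compress payloads above some size threshold before issuing PutItems and advertise that capability through signals. A plain-JDK sketch, with the algorithm (gzip) and the threshold chosen purely for illustration:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

class ClientCompressionSketch {
    static final int COMPRESSION_THRESHOLD = 4 * 1024;  // hypothetical cutoff in bytes

    // Compress on the client so the server spends no CPU on it and less data
    // crosses the network and hits disk.
    static byte[] maybeCompress(byte[] payload) throws IOException {
        if (payload.length < COMPRESSION_THRESHOLD) {
            return payload;                              // too small to be worth compressing
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(payload);
        }
        return out.toByteArray();
    }
}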

Smarter Pagination

We chose payload size in bytes as the limit per response page rather than the number of items because it allows us to provide predictable operation SLOs. For instance, we can provide a single-digit millisecond SLO on a 2 MiB page read. Conversely, using the number of items per page as the limit would result in unpredictable latencies due to significant variations in item size. A request for 10 items per page could result in vastly different latencies if each item was 1 KiB versus 1 MiB.

Using bytes as a limit poses challenges because few backing stores support byte-based pagination; most limit by the number of results (e.g. DynamoDB and Cassandra limit by number of items or rows). To address this, we use a static item limit for the initial queries to the backing store and process the results. If more data is needed to meet the byte limit, additional queries are executed until it is met, any excess results are discarded, and a page token is generated.

This static limit can lead to inefficiencies: one large item in the result may cause us to discard many results, while small items may require multiple iterations to fill a page, resulting in read amplification. To mitigate these issues, we implemented adaptive pagination, which dynamically tunes the limits based on observed data.

Adaptive Pagination

When an initial request is made, a query is executed in the storage engine, and the results are retrieved. As the consumer processes these results, the system tracks the number of items consumed and the total size used. This data helps calculate an approximate item size, which is stored in the page token. For subsequent page requests, this stored information allows the server to apply the appropriate limits to the underlying storage, reducing unnecessary work and minimizing read amplification.

While this method is effective for follow-up page requests, what happens with the initial request? In addition to storing item size information in the page token, the server also estimates the average item size for a given namespace and caches it locally. This cached estimate helps the server set a more optimal limit on the backing store for the initial request, improving efficiency. The server continuously adjusts this limit based on recent query patterns or other factors to keep it accurate. For subsequent pages, the server uses both the cached data and the information in the page token to fine-tune the limits.
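
A sketch of how the per-query item limit might be derived from the remaining byte budget and the observed average item size; the fallback and clamping constants are invented for illustration:

class AdaptivePaginationSketch {
    // Derive an item limit for the next backing-store query from the byte budget
    // and the average item size observed so far (carried in the page token or a
    // cached per-namespace estimate).
    static int nextItemLimit(long pageSizeBytesRemaining, long avgItemSizeBytes) {
        if (avgItemSizeBytes <= 0) {
            return 100;                                   // fall back to a static default
        }
        long estimate = pageSizeBytesRemaining / avgItemSizeBytes;
        // Overshoot slightly to avoid an extra round trip, but stay within bounds.
        long padded = estimate + estimate / 10 + 1;
        return (int) Math.max(10, Math.min(padded, 5_000));
    }
}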

In addition to adaptive pagination, a mechanism is in place to send a response early if the server detects that processing the request is at risk of exceeding the request’s latency SLO.

For example, let us assume a client submits a GetItems request with a per-page limit of 2 MiB and a maximum end-to-end latency limit of 500 ms. While processing this request, the server retrieves data from the backing store. This particular record has thousands of small items, so gathering the full page of data would normally take longer than the 500 ms SLO. If that happened, the client would receive an SLO violation error, causing the request to fail even though nothing exceptional occurred. To prevent this, the server tracks the elapsed time while fetching data. If it determines that continuing to retrieve more data might breach the SLO, it stops processing further results and returns a response with a pagination token.
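
A rough sketch of that control flow, with the storage interface, batch size, token format, and safety margin all hypothetical:

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class SloAwarePagingSketch {
    record Page(List<byte[]> items, Optional<String> nextPageToken) {}

    interface BackingStore {
        List<byte[]> fetchBatch(String id, int limit, Optional<String> resumeFrom);
    }

    // Accumulate results until the byte budget is met or continuing would risk
    // missing the latency SLO; in the latter case return early with a page token.
    static Page readPage(BackingStore store, String id, long pageSizeBytes, Duration slo) {
        Instant deadline = Instant.now().plus(slo.dividedBy(2));   // hypothetical safety margin
        List<byte[]> items = new ArrayList<>();
        long bytes = 0;
        Optional<String> resumeFrom = Optional.empty();
        while (bytes < pageSizeBytes) {
            List<byte[]> batch = store.fetchBatch(id, 100, resumeFrom);
            if (batch.isEmpty()) {
                return new Page(items, Optional.empty());          // record fully read
            }
            for (byte[] item : batch) {
                items.add(item);
                bytes += item.length;
            }
            resumeFrom = Optional.of("token-after-" + items.size()); // placeholder token
            if (Instant.now().isAfter(deadline)) {
                return new Page(items, resumeFrom);                // stop early, stay within SLO
            }
        }
        return new Page(items, resumeFrom);
    }
}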

This approach ensures that requests are processed within the SLO, even if the full page size isn’t met, giving clients predictable progress. Furthermore, if the client is a gRPC server with proper deadlines, the client is smart enough not to issue further requests, reducing useless work.

If you want to know more, the How Netflix Ensures Highly-Reliable Online Stateful Systems article talks in further detail about these and many other techniques.

Signaling

KV uses in-band messaging we call signaling that allows the dynamic configuration of the client and enables it to communicate its capabilities to the server. This ensures that configuration settings and tuning parameters can be exchanged seamlessly between the client and server. Without signaling, the client would need static configuration — requiring a redeployment for each change — or, with dynamic configuration, would require coordination with the client team.

For server-side signals, when the client is initialized, it sends a handshake to the server. The server responds with signals such as target or max latency SLOs, allowing the client to dynamically adjust timeouts and hedging policies. Handshakes are then repeated periodically in the background to keep the configuration current. For client-communicated signals, the client attaches its capabilities to each request, such as whether it can handle compression, chunking, and other features.
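
Conceptually, the exchanged signals can be thought of as two small structures, sketched below with illustrative names and fields (not the actual wire format):

import java.util.Map;
import java.util.Set;

class SignalingSketch {
    // Server-side signals returned from the periodic handshake.
    record ServerSignals(long targetLatencyMillis, long maxLatencyMillis) {}

    // Client capabilities attached to each request.
    record ClientSignals(Set<String> capabilities) {}

    public static void main(String[] args) {
        ServerSignals fromHandshake = new ServerSignals(20, 500);
        ClientSignals perRequest = new ClientSignals(Set.of("compression", "chunking"));
        // The client tunes its timeouts and hedging from the handshake response...
        Map<String, Object> effectiveClientConfig = Map.of(
                "requestTimeoutMillis", fromHandshake.maxLatencyMillis(),
                "hedgeAfterMillis", fromHandshake.targetLatencyMillis());
        // ...and the server adapts responses (e.g. compressed or chunked payloads)
        // to the advertised capabilities.
        System.out.println(effectiveClientConfig + " " + perRequest.capabilities());
    }
}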

KV Usage @ Netflix

The KV abstraction powers several key Netflix use cases, including:

  • Streaming Metadata: High-throughput, low-latency access to streaming metadata, ensuring personalized content delivery in real-time.
  • User Profiles: Efficient storage and retrieval of user preferences and history, enabling seamless, personalized experiences across devices.
  • Messaging: Storage and retrieval of the push registry for messaging needs, enabling millions of requests to flow through.
  • Real-Time Analytics: Persists large-scale impression data and provides insights into user behavior and system performance, moving data between offline and online systems.

Future Enhancements

Looking forward, we plan to enhance the KV abstraction with:

  • Lifecycle Management: Fine-grained control over data retention and deletion.
  • Summarization: Techniques to improve retrieval efficiency by summarizing records with many items into fewer backing rows.
  • New Storage Engines: Integration with more storage systems to support new use cases.
  • Dictionary Compression: Further reducing data size while maintaining performance.

Conclusion

The Key-Value service at Netflix is a flexible, cost-effective solution that supports a wide range of data patterns and use cases, from low to high traffic scenarios, including critical Netflix streaming use-cases. The simple yet robust design allows it to handle diverse data models like HashMaps, Sets, Event storage, Lists, and Graphs. It abstracts the complexity of the underlying databases from our developers, which enables our application engineers to focus on solving business problems instead of becoming experts in every storage engine and their distributed consistency models. As Netflix continues to innovate in online datastores, the KV abstraction remains a central component in managing data efficiently and reliably at scale, ensuring a solid foundation for future growth.

Acknowledgments: Special thanks to our stunning colleagues who contributed to Key Value’s success: William Schor, Mengqing Wang, Chandrasekhar Thumuluru, Rajiv Shringi, John Lu, George Cambell, Ammar Khaku, Jordan West, Chris Lohfink, Matt Lehman, and the whole online datastores team (ODS, f.k.a CDE).



ScyllaDB’s Rust Developer Workshop: What We Learned

A recap of the recent Rust developer workshop, where we built and refactored a high-performance Rust app for real-time data streaming (with ScyllaDB and Redpanda).

Felipe Cardeneti Mendes (ScyllaDB Technical Director) and I recently got together with a couple thousand curious Rustaceans for a ScyllaDB Rust Developer Workshop. The agenda: walk through how we built and refactored a high-performance Rust app for real-time data streaming. We promised to show developers, engineers, and architects how to:

  • Create and compile a sample social media app with Rust
  • Connect the application to ScyllaDB (NoSQL data store) and Redpanda (streaming data)
  • Negotiate tradeoffs related to data modeling and querying
  • Manage and monitor the database for consistently low latencies

This blog post is a quick recap of what we covered. Hopefully, it’s a nice wrapup for those who joined us live. If you missed it, you can still watch the recording (uncut – complete with a little cat chasing!). And feel free to ping me or Felipe with any questions you have.

First Things First

First, I wanted to cover how I approach an existing, legacy codebase that Felipe so kindly generated for the workshop. I think it’s really important to respect everyone who interacts with code – past, present and future. That mindset helps foster good collaboration and leads to more maintainable and high quality code. Who knows, you might even have a laugh along the way.

You probably spotted me using an Integrated Development Environment (IDE). Depending on your budget (from free to perhaps a couple hundred dollars), an IDE will really help streamline your coding process, especially when working with complex projects. The eagle eyed among you may have spotted some AI in there as well from our friends at GitHub. Every bit helps!

Dealing with Dependencies

In the code walkthrough, I first tackled the structure of the code and showed how to organize workspace members. This helps me resolve dependencies efficiently and start to test the binaries in isolation:

[workspace]
members = ["backend", "consumer", "frontend"]
resolver = "1"

Then I could just run the consumer after stopping it in docker-compose with:

cargo run --package consumer --bin consumer

Updating the Driver

Another thing I did was update the driver. It’s important to keep things in check with releases from ScyllaDB, so we upgraded the Rust driver for the whole project. I did a quick walkthrough of application functionality and decided to write a quick smoke test that simulated traffic on the front end in terms of messaging between users. If you’re interested, I used a great load testing tool called k6 to simulate that load. Here’s the script:

import http from 'k6/http';

// Repeatedly POST a new post to the backend to simulate user messaging traffic.
export default function () {
  http.post('http://localhost:3001/new_post', JSON.stringify({
    content: 'bar',
    subject: 'foo',
    id: '8d8712fc-786f-4d72-98ea-3669e56f7012'
  }), {
    headers: {
      'Content-Type': 'application/json',
    },
  });
}

Dealing with an Offset Bug

Once we had some messages flowing (perhaps way too many, as it turned out), I discovered a potential bug where the offset was not being persisted between application restarts. This meant every time we restarted the application, all of the messages would be read from the topic and then re-written to the database. Without understanding functionality like the ability to parse consumer offsets in Redpanda, I went for a more naive approach by storing the offset in ScyllaDB instead.

I’m sure I’m not the first dev to go down the wrong path, and I fully blame Felipe for not intercepting earlier 😉

Refactoring Time

In any case, it was fun to see how we might approach the topic of refactoring code. It’s always easier to start with small, manageable tasks when making improvements or refactoring code. The first thing I did was decide what the table (and ultimately query) might look like. This “query first design” is an important design concept in ScyllaDB. Be sure to check out some ScyllaDB University courses on this. I decided the table would look something like this to store my offset value:

CREATE TABLE IF NOT EXISTS ks.offset (consumer text PRIMARY KEY, count BigInt)

We briefly touched on why I chose a BigInt primitive instead of a Counter value. The main reason is that we can’t arbitrarily set the latter to a value, only increment or decrement it. We then tackled how we might write to that table and came up with the following:

async fn update_offset(offset: i64, session: &Session, update_counter: &PreparedStatement, consumer: &str) -> Result<()> {
    session.execute(update_counter, (offset, consumer)).await?;
    Ok(())
}

You’ll notice here that I’m passing it a prepared statement, which is an important concept to grasp when making your code perform well with ScyllaDB. Be sure to read the docs on that if you’re unsure. I also recall writing a TODO to move some existing prepared query statements outside a for loop. The main reason: you only need to do this once for your app, not over and over. So watch out for that mistake. I also stored my query as a constant:

const UPDATE_OFFSET: &str = "UPDATE ks.offset SET count = ? WHERE consumer = ?";

There are different ways to skin this, like maybe some form of model approach, but this was a simple way to keep the queries in one place within the consumer code.

We restarted the app and checked the database using cqlsh to see if the offsets were being written – and they weren’t! But first, a quick tip from other webinars: if you’re running ScyllaDB in a docker container, you can simply exec to it and run the tool:

docker exec -it scylla cqlsh

Back to my mistake, why no writes to the table? If you recall, I write the offset after the consumer has finished processing records from the topic:

offset = consumer(&postclient, "posts", offset, &session).await;
update_offset(offset, &session, &update_counter, "posts").await.expect("Failed to update offset");
tokio::time::sleep(Duration::from_secs(5)).await;

Since I had written a load test with something like 10K records, that consumer takes some time to complete, so update_offset wasn’t getting called straight away. By the end of the webinar, it actually finished reading from the topic and wrote the offset to the table. Another little change I snuck in there was on:

tokio::time::sleep(Duration::from_secs(5)).await;

Felipe spoke to the benefits of using tokio, an asynchronous runtime for Rust. The previous thread sleep would in fact do nothing, hence the change. Hooray for quick fixes!

Once we had writes, we needed to read from the table, so I added another function that looked like this:

async fn fetch_offset(session: &Session, consumer: &str) -> Result<i64> {
    let query = "SELECT count FROM ks.offset WHERE consumer = ?";
    let values = (consumer,);
    let result = session.query(query, values).await.expect("Failed to execute query");
    if let Some(row) = result.maybe_first_row_typed::<(i64,)>().expect("Failed to get row") {
        Ok(row.0)
    } else {
        Ok(0)
    }
}

I spoke about some common gotchas here, like misunderstanding how query values work with different types, and whether to use a slice &[] or a tuple (). Query text is constant, but the values might change. You can pass changing values to a query by specifying a list of variables as bound values. Don’t forget the parenthesis! I also highlighted some of the convenience methods in query result, like maybe_first_row_typed. That returns Option<RowT> containing the first row from the result – which is handy when you just want the first row or None. Once again, you can play around with types, and even use custom structs if you prefer for the output. In my case, it was just a tuple with an i64. The complete consumer code for posts looked something like this:

tokio::spawn(async move {
    use std::time::Duration;
    info!("Posts Consumer Started");
    let session = db_session().await;
    let update_counter = session.prepare(UPDATE_OFFSET).await.expect("Failed to prepare query");
    loop {
        let mut offset = fetch_offset(&session, "posts").await.expect("Failed to fetch offset");
        offset = consumer(&postclient, "posts", offset, &session).await;
        update_offset(offset, &session, &update_counter, "posts").await.expect("Failed to update offset");
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
});

You can see I prepare the statement before the loop, then I fetch the offset from the database, consume the topic, write the offset to the database and sleep. Keep doing that forever!

What We Didn’t Have Time to Cover

There were a few things that I wanted to cover, but ran out of time. If you wanted to write results to a custom struct, the code might look something like:

#[derive(Default, FromRow)]
pub struct Offset {
    consumer: String,
    count: i64,
}

use scylla::IntoTypedRows;

async fn fetch_offset_type(session: &Session, consumer: &str) -> Offset {
    let query = "SELECT * FROM ks.offset WHERE consumer = ?";
    let values = (consumer,);
    let result = session.query(query, values).await.expect("Failed to execute query");
    if let Some(rows) = result.rows {
        if let Some(row) = rows.into_typed::<Offset>().next() {
            let offset: Offset = row.expect("Failed to parse row");
            return offset;
        }
    }
    Offset {
        consumer: consumer.to_string(),
        count: 0,
    }
}

There are some custom values you’ll come across, like CqlTimestamps and Counter, so you should be aware of the ways to handle these different data types. For example, rather than convert everything to and from millisecond timestamps, you can add the chrono feature flag on the crate to interact with time. You can also improve logging with the driver’s support of the tracing crate for your logs. If you add that, you can use a tracing subscriber as follows:

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    …

Wrapping Up

I personally find refactoring code enjoyable. I’d encourage you to have a patient, persistent approach to coding, testing and refactoring. When it comes to ScyllaDB, it’s a product where it really pays to read the documentation, as many of the foot guns are well documented.
If you still find yourself stuck, feel free to ask questions on the ScyllaDB forum and learn from your peers. And remember, small, continuous improvements lead to long-term benefits. Have fun!

Training Updates, Forum, Upcoming ScyllaDB University LIVE Event

The ScyllaDB “sea monster” community is growing fast, so we’re excited to announce lots of new resources to get you on the fast track to ScyllaDB success. In this post, I’ll update you on our next ScyllaDB Labs and ScyllaDB University LIVE training events, introduce new lessons on ScyllaDB University, and summarize some interesting discussions from the community forum. ScyllaDB University Updates For those not (yet) familiar with ScyllaDB University, it’s an online learning and training center for ScyllaDB. The self-paced lessons include theory and hands-on labs that you can run yourself. To get started, just create a (free) account. As the product evolves, we enhance and update the training material. One of the lessons we recently updated is the “How to Write Better Apps” lesson. Many of the issues new users commonly face can be avoided by using best practices and straightforward built-in debugging mechanisms. This lesson covers important metrics users should track, and how to keep track of them using the Monitoring stack. Other topics include prepared statements, token-aware drivers, denormalizing data, working with multiple data centers, and data modeling best practices (partition sizing, distribution, retries, and batching). Another lesson we recently updated is the “Materialized Views, Secondary Indexes, and Filtering” lesson. ScyllaDB offers three indexing options: Materialized Views, Global Secondary Indexes, and Local Secondary Indexes. I often get questions from users about the differences between them and how to use them. They are all covered in this lesson, along with a comparison, examples of when to use each, quizzes, and hands-on labs. By the end of the lesson, users will have an understanding of the different index types in ScyllaDB, how to use them, and when to use each one. Additionally, they’ll gain some hands-on experience by creating and using these indexes in the labs. Additionally, we are embedding interactive, hands-on labs within the lessons, as you can see in the Quick Wins lab. Having the lab embedded within the browser means that you can run it regardless of your operating system – and without any prerequisites. In addition to the on-demand ScyllaDB University portal, we periodically host live online training sessions. The next two we have scheduled are ScyllaDB Labs and ScyllaDB University LIVE. Start Learning at ScyllaDB University   ScyllaDB Labs – 17 September, 2024 The interactive workshop will be held in an Asia-friendly time zone. Its focus is on providing hands-on experience building and interacting with high-performance applications using ScyllaDB. The labs introduce NoSQL strategies used by top teams, guiding participants through the creation of sample applications. We cover key topics such as determining if ScyllaDB is suitable for your use case, achieving low-latency NoSQL at scale, application performance optimization, data modeling and making important decisions when getting started with ScyllaDB. I will be hosting it together with my colleague Tim Koopmans. He will start with an introduction to ScyllaDB, covering some core concepts and a getting started lab. After that, my talk will focus on Data Modeling basics, including some architecture and another hands-on lab. Finally, attendees will have some time to run labs independently while we will be there to answer questions. Save Your Spot   ScyllaDB University Live – 24 September, 2024 ScyllaDB University LIVE  is an instructor-led NoSQL database online, half-day training event. 
It includes live sessions conducted by ScyllaDB’s lead engineers and architects and has two parallel tracks. Essentials: ScyllaDB architecture, key components, data modeling, and building your first ScyllaDB-powered app (in Rust!). Advanced Topics: Deep dives into cluster topology, advanced data modeling, optimizing elasticity and efficiency, and power user tips and tricks. Participants can switch between tracks or attend specific sessions of interest. After the training, there will be an expert panel to answer user questions. Attendees will also have the chance to complete quizzes, participate in hands-on labs, earn certificates, and receive exclusive ScyllaDB swag. The sessions are live, and there’s no on-demand option. Save Your Spot   What’s Trending on the ScyllaDB Community Forum The community forum is the place to discuss all things NoSQL related. You can ask questions, learn what your peers are doing, and share how you’re using ScyllaDB. Here is a summary of some of the top topics. GUI Visualization for ScyllaDB Many new users ask about using a GUI with ScyllaDB. Some relevant tools are the ScyllaDB Manager and the Monitoring Stack. One user suggested using JetBrains Rider, a paid tool that he found useful. Additionally, DBeaver, in some versions, now supports ScyllaDB. It’s a universal database manager that allows users to run CQL queries and view result tables directly within the interface. See the complete discussion Kafka: How to extract nested field in JSON structure? Here a user that is migrating from MySQL to ScyllaDB, and using ElasticSearch, is using the scylladb-cdc-source-connector to publish CDC messages to a Kafka topic and is facing issues with accessing nested fields in the JSON message structure. Existing Single Message Transformations (SMTs) don’t support accessing nested fields, some workaround options are discussed. See the complete discussion Best way to Fetch N rows in ScyllaDB: Count, Limit or Paging This topic discusses different ways for fetching N rows while using the TTL feature, optimizing for performance and efficiency. Three options are mentioned: using count(), using LIMIT, and using Paging. Some suggestions were to change the clustering key to include the timestamp and allow for more efficient queries, as well as using a Counter Table. Another point that was brought up was the performance difference between COUNT and LIMIT. See the complete discussion Read_concurrency_semaphore & p99 read latency The user is experiencing high P99 read latency in an application that queries time-series data using ScyllaDB, despite low average latency. The application uses a ScyllaDB cluster with 3 nodes, each with 16 cores, 128 GB RAM, and 3 TB RAID0 SSDs. The schema is designed for time-series data with a composite primary key and TimeWindowCompactionStrategy for compaction. While the ScyllaDB dashboard shows P99 read latency as low (1-10ms), the gocql latency report shows occasional spikes in P99 latency (700ms to 1s). The user has tried profiling and tracing but cannot identify the root cause. See the complete discussion ScyllaDB vs Aerospike The user read a whitepaper comparing ScyllaDB’s and Aerospike’s performance. The paper shows that ScyllaDB outperforms Aerospike by 30-40%. The user has several questions about the methodology of the tests used, versions, configurations, and so on. See the complete discussion Say Hello to the Community  

Be Part of Something BIG – Speak at Monster Scale Summit

A little sneak peek at something massive: a new virtual conference on extreme-scale engineering! Whether you’re designing, implementing, or optimizing systems that are pushed to their limits, we’d love to hear about your most impressive achievements and lessons learned – at Monster Scale Summit 2025. Become a Monster Scale Summit Speaker Register for Monster Scale Summit [Free] What’s Monster Scale Summit? Monster Scale Summit is a technical conference that connects the community of professionals working on performance-sensitive data-intensive applications. Engineers, architects, and SREs from gamechangers around the globe will be gathering virtually to explore “monster scale” challenges with respect to extreme levels of throughput, data, and global distribution. It’s a lot like P99 CONF (also hosted by ScyllaDB) – a two-day event that’s free, fully virtual, and highly interactive. The core difference is that it’s focused on extreme-scale engineering vs. all things performance. We just opened the call for speakers, and the lineup already includes engineers from Slack, Salesforce, VISA, American Express, ShareChat, Cloudflare, and Disney. Keynotes include Gwen Sharpira, Chris Riccomini, and Martin Kleppman (Designing Data Intensive Applications). What About ScyllaDB Summit? You might already be familiar with ScyllaDB Summit. Monster Scale Summit is the next evolution of that conference. We’re scaling it up and out to bring attendees more – and broader – insights on designing, implementing, and optimizing performance-sensitive data-intensive applications. But don’t worry – ScyllaDB and sea monsters will still be featured prominently throughout the event. And speakers will get sea monster plushies as part of the swag pack. 😉   Details please! When: March 11 + 12 Where: Wherever you’d like! It’s intentionally virtual, so you can present and interact with attendees from anywhere around the world. Topics: Core topics include: Distributed databases Streaming and real-time processing Intriguing system designs Approaches to a massive scaling challenge Methods for balancing latency/concurrency/throughput SRE techniques proven at scale Infrastructure built for unprecedented demands. What we’re looking for: We welcome a broad spectrum of talks about tackling the challenges that arise in the most massive, demanding environments. The conference prioritizes technical talks sharing first-hand experiences. Sessions are just 15-20 minutes – so consider this your TED Talk debut! Share Your Ideas

Clues in Long Queues: High IO Queue Delays Explained

How seemingly peculiar metrics might provide interesting insights into system performance In large systems, you often encounter effects that seem weird at first glance, but – when studied carefully – give an invaluable clue to understanding system behavior. When supporting ScyllaDB deployments, we observe many workload patterns that reveal themselves in various amusing ways. Sometimes what seems to be a system misbehaving stems from a bad configuration or sometimes a bug in the code. However, pretty often what seems to be impossible at first sight turns into an interesting phenomenon. Previously we described one of such effects called “phantom jams.” In this post, we’re going to show another example of the same species. As we’ve learned from many of the ScyllaDB deployments we track, sometimes a system appears to be lightly loaded and only a single parameter stands out, indicating something that typically denotes a system bottleneck. The immediate response is typically to disregard the outlier and attribute it to spurious system slow-down. However, thorough and careful analysis of all the parameters, coupled with an understanding of the monitoring system architecture, shows that the system is indeed under-loaded but imbalanced – and that crazy parameter was how the problem actually surfaced. Scraping metrics Monitoring systems often follow a time-series approach. To avoid overwhelming their monitored targets and frequently populating a time-series database (TSDB) with redundant data, these solutions apply a concept known as a “scrape interval.” Although different monitoring solutions exist, we’ll mainly refer to Prometheus and Grafana throughout this article, given that these are what we use for ScyllaDB Monitoring. Prometheus polls its monitored endpoints periodically and retrieves a set of metrics. This is called “scraping”. Metrics samples collected in a single scrape consist of name:value pairs, where value is a number. Prometheus supports four core types of metrics, but we are going to focus on two of those: counters and gauges. Counters are monotonically increasing metrics that reflect some value accumulated over time. When observed through Grafana, the rate() function is applied to counters, as it reflects the changes since the previous scrape instead of its total accumulated value. Gauges, on the other hand, are a type of metric that can arbitrarily rise and fall. Apparently (and surprisingly at the same time) gauges reflect a metric state as observed during scrape-time. This effectively means that any changes made between scrape intervals will be overlooked, and are lost forever. Before going further with the metrics, let’s take a step back and look at what makes it possible for ScyllaDB to serve millions and billions of user requests per second at sub-millisecond latency. IO in ScyllaDB ScyllaDB uses the Seastar framework to run its CPU, IO, and Network activity. A task represents a ScyllaDB operation run in lightweight threads (reactors) managed by Seastar. IO is performed in terms of requests and goes through a two-phase process that happens inside the subsystem we call the IO scheduler. The IO Scheduler plays a critical role in ensuring that IO gets both prioritized and dispatched in a timely manner, which often means predictability – some workloads require that submitted requests complete no later than within a given, rather small, time. 
To achieve that, the IO Scheduler sits in the hot path – between the disks and the database operations – and is built with a good understanding of the underlying disk capabilities. To perform an IO, first a running task submits a request to the scheduler. At that time, no IO happens. The request is put into the Seastar queue for further processing. Periodically, the Seastar reactor switches from running tasks to performing service operations, such as handling IO. This periodic switch is called polling and it happens in two circumstances: When there are no more tasks to run (such as when all tasks are waiting for IO to complete), or When a timer known as a task-quota elapses, by default at every 0.5 millisecond intervals. The second phase of IO handling involves two actions. First, the kernel is asked for any completed IO requests that were made previously. Second, outstanding requests in the ScyllaDB IO queues are dispatched to disk using the Linux kernel AIO API. Dispatching requests into the kernel is performed at some rate that’s evaluated out of pre-configured disk throughput and the previously mentioned task-quota parameter. The goal of this throttled dispatching is to make sure that dispatched requests are completed within the duration of task-quota. Urgent requests that may pop up in the queue during that time don’t need to wait for the disk to be able to serve them. For the scope of this article, let’s just say that dispatching happens at the disk throughput. For example, if disk throughput is 10k operations per second and poll happens each millisecond, then the dispatch rate will be 10 requests per poll. IO Scheduler Metrics Since the IO Scheduler sits in the hot path of all IO operations, it is important to understand how the IO Scheduler is performing. In ScyllaDB, we accomplish that via metrics. Seastar exposes many metrics, and several IO-related ones are included among them. All IO metrics are exported per class with the help of metrics labeling, and each represents a given IO class activity at a given point in time. IO Scheduler Metrics for the commitlog class Bandwidth and IOPS are two metrics that are easy to reason about. They show the rates at which requests get dispatched to disk. Bandwidth is a counter that gets increased by the request length every time it’s sent to disk. IOPS is a counter that gets incremented every time a request is sent to disk. When observed through Grafana, the aforementioned rate() function is applied and these counters are shown as BPS (bytes per second) and IO/s (IO per second), under their respective IO classes. Queue length metrics are gauges that represent the size of a queue. There are two kinds of queue length metrics. One represents the number of outstanding requests under the IO class. The other represents the number of requests dispatched to the kernel. These queues are also easy to reason about. Every time ScyllaDB makes a request, the class queue length is incremented. When the request gets dispatched to disk, the class queue length gauge is decremented and the disk queue length gauge is incremented. Eventually, as the IO completes, the disk queue length gauge goes down. When observing those metrics, it’s important to remember that they reflect the queue sizes as they were at the exact moment when they got scraped. It’s not at all connected to how large (or small) the queue was over the scrape period. This common misconception may cause one to end up with the wrong conclusions about how the IO scheduler or the disks are performing. 
Lastly, we have latency metrics known as IO delays. There are two of those – one for the software queue, and another for the disk. Each represents the average time requests spent waiting to get serviced. In earlier ScyllaDB versions, latency metrics were represented as gauges. The value shown was the latency of the last dispatched request (from the IO class queue to disk), or completed request (a disk IO completion). Because of that, the latencies shown weren’t accurate and didn’t reflect reality. A single ScyllaDB shard can perform thousands of requests per second and show the latency of a single request scraped after a long interval omits important insights about what really happened since the previous scrape. That’s why we eventually replaced these gauges with counters. Since then, latencies have been shown as a rate between the scrape intervals. Therefore, to calculate the average request delay, the new counter metrics are divided by the total number of IOPS dispatched within the scrape period. Disk can do more When observing IO for a given class, it is common to see corresponding events that took place during a specific interval. Consider the following picture: IO Scheduler Metrics – sl:default class The exact numbers are not critical here. What matters is how different plots correspond to each other. What’s strange here? Observe the two rightmost panels – bandwidth and IOPS. On a given shard, bandwidth starts at 5MB/s and peaks at 20MB/s, whereas IOPS starts at 200 operations/sec and peaks at 800 ops. These are really conservative numbers. The system from which those metrics were collected can sustain 1GB/s bandwidth under several thousands IOPS. Therefore, given that the numbers above are per-shard, the disk is using about 10% of its total capacity. Next, observe that the queue length metric (the second from the left) is empty most of the time. This is expected, partially because it’s a gauge and it represents the number of requests sitting under the queue as observed during scrape time – but not the total number of requests which got queued. Since disk capacity is far from being saturated, the IO scheduler dispatches all requests to disk shortly after they arrive into the scheduler queue. Given that IO polling happens at sub-millisecond intervals, in-queue requests get dispatched to disk within a millisecond. So, why do the latencies shown in the queue delay metric (the leftmost one) grow close to 40 milliseconds? In such situations, ScyllaDB users commonly wonder, “The disk can do more – why isn’t ScyllaDB’s IO scheduler consuming the remaining disk capacity?!” IO Queue delays explained To get an idea of what’s going on, let’s simplify the dispatching model described above and then walk through several thought experiments on an imaginary system. Assume that a disk can do 100k IOPS, and ignore its bandwidth as part of this exercise. Next, assume that the metrics scraping interval is 1 second, and that ScyllaDB polls its queues once every millisecond. Under these assumptions, according to the dispatching model described above, ScyllaDB will dispatch at most 100 requests at every poll. Next, we’ll see what happens if servicing 10k requests within a second, corresponding to 10% of what our disk can handle. IOPS Capacity Polling interval Dispatch Rate Target Request Rate Scrape Interval 100K 1ms 100 per poll 10K/second 1s   Even request arrival In the first experiment, requests arrive evenly at the queue – one request at every 1/10k = 0.1 millisecond. 
By the end of each tick, there will be 10 requests in the queue, and the IO scheduler will dispatch them all to disk. When polling occurs, each request will have accumulated its own in-queue delays. The first request waited 0.9ms, the second 0.8ms, …, 0 ms. The sum results in approximately 5ms of total in-queue delay. After 1 second or 1K ticks/polls), we’ll observe a total in-queue delay of 5 seconds. When scraped, the metrics will be: A rate of 10K IOPS An empty queue An average in-queue delay/latency of 0.5ms (5 seconds total delay / 10K IOPS) Single batch of requests In the second experiment, all 10k requests arrive at the queue in the very beginning and queue up. As the dispatch rate corresponds to 100 requests per tick, the IO scheduler will need 100 polls to fully drain the queue. The requests dispatched at the first tick will contribute 1 millisecond each to the total in queue delay, with a total sum of 100 milliseconds. Requests dispatched at the second tick will contribute 2 milliseconds each, with a total sum of 200 milliseconds. Therefore, requests dispatched during the Nth tick will contribute N*100 milliseconds to the delay counter. After 100 ticks the total in-queue delay will be 100 + 200 + … + 10000 ms = 500000 ms = 500 seconds. Once the metrics endpoint gets scraped, we’ll observe: The same rate of 10k IOPS, the ordering of arrival won’t influence the result The same empty queue, given that all requests were dispatched in 100ms (prior to scrape time) 50 milliseconds in-queue delay (500 seconds total delay / 10K IOPS) Therefore, the same work done differently resulted in higher IO delays. Multiple batches If the submission of requests happens more evenly, such as 1k batches arriving at every 100ms, the situation would be better, though still not perfect. Each tick would dispatch 100 requests, fully draining the queue within 10 ticks. However, given our polling interval of 1ms, the following batch will arrive only after 90 ticks and the system will be idling. As we observed in the previous examples, each tick contributes N*100 milliseconds to the total in-queue delay. After the queue gets fully drained, the batch contribution is 100 + 200 + … + 1000 ms = 5000 ms = 5 seconds. After 10 batches, this results in 50 seconds of total delay. When scraped, we’ll observe: The same rate of 10k IOPS The same empty queue 5 milliseconds in-queue delay (50 seconds / 10K IOPS) To sum up: The above experiments aimed to demonstrate that the same workload may render a drastically different observable “queue delay” when averaged over a long enough period of time. It can be an “expected” delay of half-a-millisecond. Or, it can be very similar to the puzzle that was shown previously – the disk seemingly can do more, the software queue is empty, and the in-queue latency gets notably higher than the tick length. Average queue length over time Queue length is naturally a gauge-type metric. It frequently increases and decreases as IO requests arrive and get dispatched. Without collecting an array of all the values, it’s impossible to get an idea of how it changed over a given period of time. Therefore, sampling the queue length between long intervals is only reliable in cases of very uniform incoming workloads. There are many parameters of the same nature in the computer world. The most famous example is the load average in Linux. It denotes the length of the CPU run-queue (including tasks waiting for IO) over the past 1, 5 and 15 minutes. 
It’s not a full history of run-queue changes, but it gives an idea of how it looked over time. Implementing a similar “queue length average” would improve the observability of IO queue length changes. Although possible, that would require sampling the queue length more regularly and exposing more gauges. But as we’ve demonstrated above, accumulated in-queue total time is yet another option – one that requires a single counter, but still shows some history. Why is a scheduler needed? Sometimes you may observe that doing no scheduling at all may result in much better in-queue latency. Our second experiment clearly shows why. Consider that – as in that experiment, 10k requests arrive in one large batch and ScyllaDB just forwards them straight to disk in the nearest tick. This will result in a 10000 ms total latency counter, respectively 1ms average queue delay. The initial results look great. At this point, the system will not be overloaded. As we know, no new requests will arrive and the disk will have enough time and resources to queue and service all dispatched requests. In fact, the disk will probably perform IO even better than it would while being fed eventually with requests. Doing so would likely maximize the disk’s internal parallelism in a better way, and give it more opportunities to apply internal optimizations, such as request merging or batching FTL updates. So why don’t we simply flush the whole queue into disk whatever length it is? The answer lies in the details, particularly in the “as we know” piece. First of all, Seastar assigns different IO classes for different kinds of workloads. To reflect the fact that different workloads have different importance to the system, IO classes have different priorities called “shares.” It is then the IO scheduler’s responsibility to dispatch queued IO requests to the underlying disk according to class shares value. For example, any IO activity that’s triggered by user queries runs under its own class named “statement” in ScyllaDB Open Source, and “sl:default” in Enterprise. This class usually has the largest shares denoting its high priority. Similarly, any IO performed during compactions occurs in the “compaction” class, whereas memtable flushes happen inside the “memtable” class – and both typically have low shares. We say “typically” because ScyllaDB dynamically adjusts shares of those two classes when it detects more work is needed for a respective workflow (for example, when it detects that compaction is falling behind). Next, after sending 10k requests to disk, we may expect that they will all complete in about 10k/100k = 100ms. Therefore, there isn’t much of a difference whether requests get queued by the IO scheduler or by the disk. The problem happens if and only if a new high-priority request pops up when we are waiting for the batch to get serviced. Even if we dispatch this new urgent request instantly, it will likely need to wait for the first batch to complete. Chances that disk will reorder it and service earlier are too low to rely upon, and that’s the delay the scheduler tries to avoid. Urgent requests need to be prioritized accordingly, and get served much faster. With the IO Scheduler dispatching model, we guarantee that a newly arrived urgent request will get serviced almost immediately. Conclusion Understanding metrics is crucial for understanding the behavior of complex systems. 
Conclusion

Understanding metrics is crucial for understanding the behavior of complex systems. Queues are an essential element of any data processing, and seeing how data traverses queues is crucial for engineers solving real-life performance problems. Since it’s impossible to track every single data unit, compound metrics like counters and gauges become great companions for this task. Queue length is a very important parameter: observing its change over time reveals bottlenecks in the system, shedding light on performance issues that can arise in complex, highly loaded systems. Unfortunately, one cannot see the full history of queue-length changes (unlike with many other parameters), and this can lead to a misunderstanding of the system’s behavior. This article described an attempt to map queue length from a gauge-type metric to a counter-type one – thus making it possible to accumulate a history of queue-length changes over time. Even though the described “total delay” metric and its behavior are heavily tied to how ScyllaDB monitoring and the Seastar IO scheduler work, this way of accumulating and monitoring latencies is generic enough to be applied to other systems as well.

More ScyllaDB Engineering Blogs

How to Model Leaderboards for 1M Player Game with ScyllaDB

Ever wondered how a game like League of Legends, Fortnite, or even Rockband models its leaderboards? In this article, we’ll explore how to properly model a schema for leaderboards…using a monstrously fast database (ScyllaDB)!

1. Prologue

Ever since I was a kid, I’ve been fascinated by games and how they’re made. My favorite childhood game was Guitar Hero 3: Legends of Rock. Well, more than a decade later, I decided to try to contribute to some games in the open source environment, like rust-ro (Rust Ragnarok Emulator) and YARG (Yet Another Rhythm Game). YARG is another rhythm game, but this project is completely open source. It unites legendary contributors in game development and design. The game was being picked up and played mostly by Guitar Hero/Rockband streamers on Twitch. I thought: well, it’s an open-source project, so maybe I can use my database skills to create a monstrously fast leaderboard for storing past games. It started as a simple chat on their Discord, then turned into a long discussion about how to make this project grow faster. Ultimately, I decided to contribute to it by building a leaderboard with ScyllaDB. In this blog, I’ll show you some code and concepts!

2. Query-Driven Data Modeling

With NoSQL, you should first understand which query you want to run, depending on the paradigm (document, graph, wide-column, etc.). Focus on the query and create your schema based on that query. In this project, we will handle two types of paradigms: key-value and wide column (clusterization). Now let’s talk about the queries/features of our modeling.

2.1 Feature: Storing the matches

Every time you finish a YARG gameplay, you want to submit your scores plus other in-game metrics. Basically, it will be a single query based on a main index:

SELECT score, stars, missed_notes, instrument, ...
FROM leaderboard.submissions
WHERE submission_id = 'some-uuid-here-omg'

2.2 Feature: Leaderboard

And now our main goal: a super cool leaderboard that you don’t need to worry about after you perform good data modeling. The leaderboard is per song: every time you play a specific song, your best score will be saved and ranked. The interface has filters that dictate exactly which leaderboard to bring:

- song_id: required
- instrument: required
- modifiers: required
- difficulty: required
- player_id: optional
- score: optional

Imagine our query looks like this, and it returns the results sorted by score in descending order:

SELECT player_id, score, ...
FROM leaderboard.song_leaderboard
WHERE instrument = 'guitar'
  AND difficulty = 'expert'
  AND modifiers = {'none'}
  AND track_id = 'dani-california'
LIMIT 100;

-- player_id    | score
-- -------------+-------
-- tzach        | 12000
-- danielhe4rt  | 10000
-- kadoodle     |  9999

Can you already imagine what the final schema will look like? No? Ok, let me help you with that!

3. Data Modeling time!

It’s time to take a deep dive into data modeling with ScyllaDB and better understand how to scale it.

3.1 Matches Modeling

First, let’s understand a little more about the game itself:

- It’s a rhythm game;
- You play a certain song at a time;
- You can activate “modifiers” to make your life easier or harder before the game;
- You must choose an instrument (e.g. guitar, drums, bass, or microphone).

Every aspect of the gameplay is tracked, such as:

- Score;
- Missed notes;
- Overdrive count;
- Play speed (1.5x ~ 1.0x);
- Date/time of gameplay;
- And other cool stuff.

Thinking about that, let’s start our data modeling.
It will turn into something like this:

CREATE TABLE IF NOT EXISTS leaderboard.submissions (
    submission_id uuid,
    track_id text,
    player_id text,
    modifiers frozen<set<text>>,
    score int,
    difficulty text,
    instrument text,
    stars int,
    accuracy_percentage float,
    missed_count int,
    ghost_notes_count int,
    max_combo_count int,
    overdrive_count int,
    speed int,
    played_at timestamp,
    PRIMARY KEY (submission_id, played_at)
);

Let’s skip all the int/text values and jump to the set<text>. The set type allows you to store a list of items of a particular type. I decided to use this list to store the modifiers because it’s a perfect fit. Look at how the queries are executed:

INSERT INTO leaderboard.submissions (
    submission_id,
    track_id,
    modifiers,
    played_at
) VALUES (
    some-cool-uuid-here,
    'starlight-muse',
    {'all-taps', 'hell-mode', 'no-hopos'},
    '2024-01-01 00:00:00'
);

With this type, you can easily store a list of items to retrieve later. Another cool piece of information is that this query is key-value like! What does that mean? Since you will always query it by the submission_id only, it can be categorized as a key-value.

3.2 Leaderboard Modeling

Now we’ll cover some cool wide-column database concepts. In our leaderboard query, we will always need some dynamic values in the WHERE clauses. That means these values will belong to the Partition Key, while the Clustering Keys will have values that can be “optional”. A partition key is a hash based on the combination of fields that you add to identify a value. Let’s imagine that you played Starlight - Muse 100 times. If you were to query this information, it would return 100 different results, differentiated by Clustering Keys like score or player_id.

SELECT player_id, score, ...
FROM leaderboard.song_leaderboard
WHERE track_id = 'starlight-muse'
LIMIT 100;

If 1,000,000 players play this song, your query will become slow and it will become a problem in the future, because your partition key consists of only one field, which is track_id. However, if you add more fields to your Partition Key, like mandatory settings chosen before playing the game, you can shrink these possibilities and get a faster query. Now do you see the big picture? Adding fields like instrument, difficulty, and modifiers gives you a way to split the information about that specific track evenly. Let’s imagine it with some simple numbers:

-- Query Partition ID: '1'
SELECT player_id, score, ...
FROM leaderboard.song_leaderboard
WHERE instrument = 'guitar'
  AND difficulty = 'expert'
  AND modifiers = {'none'} AND -- Modifiers changed
  track_id = 'starlight-muse'
LIMIT 100;

-- Query Partition ID: '2'
SELECT player_id, score, ...
FROM leaderboard.song_leaderboard
WHERE instrument = 'guitar'
  AND difficulty = 'expert'
  AND modifiers = {'all-hopos'} AND -- Modifiers changed
  track_id = 'starlight-muse'
LIMIT 100;

So, if you build the query in a specific shape, it will always look for a specific token and retrieve the data based on these specific Partition Keys.
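To make the “specific token” idea concrete, you can ask ScyllaDB to show the token that a fully specified partition key maps to by wrapping the partition key columns in the token() function. This is a hedged illustration; the schema with this composite partition key is shown next, and the literal values are the ones used above.

-- Every row matching one combination of the required filters carries the same token,
-- i.e. it lives in a single partition.
SELECT token(track_id, modifiers, difficulty, instrument) AS partition_token,
       player_id,
       score
FROM leaderboard.song_leaderboard
WHERE instrument = 'guitar'
  AND difficulty = 'expert'
  AND modifiers = {'none'}
  AND track_id = 'starlight-muse'
LIMIT 3;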
Let’s take a look at the final modeling and talk about the clustering keys and the application layer:

CREATE TABLE IF NOT EXISTS leaderboard.song_leaderboard (
    submission_id uuid,
    track_id text,
    player_id text,
    modifiers frozen<set<text>>,
    score int,
    difficulty text,
    instrument text,
    stars int,
    accuracy_percentage float,
    missed_count int,
    ghost_notes_count int,
    max_combo_count int,
    overdrive_count int,
    speed int,
    played_at timestamp,
    PRIMARY KEY ((track_id, modifiers, difficulty, instrument), score, player_id)
) WITH CLUSTERING ORDER BY (score DESC, player_id ASC);

The partition key was defined as mentioned above, consisting of our REQUIRED PARAMETERS: track_id, modifiers, difficulty and instrument. For the Clustering Keys, we added score and player_id. Note that the clustering fields are ordered by score DESC, and in case two players have the same score, the criterion to choose the winner is alphabetical ¯\_(ツ)_/¯.

First, it’s good to understand that we want only ONE SCORE PER PLAYER. But with this modeling, if the player goes through the same track twice with different scores, it will generate two different entries:

INSERT INTO leaderboard.song_leaderboard (
    track_id, player_id, modifiers, score, difficulty, instrument, played_at
) VALUES (
    'starlight-muse', 'daniel-reis', {'none'}, 133700, 'expert', 'guitar', '2023-11-23 00:00:00'
);

INSERT INTO leaderboard.song_leaderboard (
    track_id, player_id, modifiers, score, difficulty, instrument, played_at
) VALUES (
    'starlight-muse', 'daniel-reis', {'none'}, 123700, 'expert', 'guitar', '2023-11-23 00:00:00'
);

SELECT player_id, score
FROM leaderboard.song_leaderboard
WHERE instrument = 'guitar'
  AND difficulty = 'expert'
  AND modifiers = {'none'}
  AND track_id = 'starlight-muse'
LIMIT 2;

-- player_id    | score
-- -------------+--------
-- daniel-reis  | 133700
-- daniel-reis  | 123700

So how do we fix this problem? Well, it’s not a problem per se. It’s a feature! As a developer, you have to create your own business rules based on the project’s needs, and this is no different. What do I mean by that? You can run a simple DELETE query before inserting the new entry. That will guarantee that you do not keep rows for that player_id with less than the new score inside that specific group of partition keys.

-- Before inserting the new gameplay
DELETE FROM leaderboard.song_leaderboard
WHERE instrument = 'guitar'
  AND difficulty = 'expert'
  AND modifiers = {'none'}
  AND track_id = 'starlight-muse'
  AND player_id = 'daniel-reis'
  AND score <= 'your-new-score-here';

-- Now you can insert the new payload...

And with that, we finished our simple leaderboard system, the same one that runs in YARG and can also be used in games with MILLIONS of entries per second 😀

4. How to Contribute to YARG

Want to contribute to this wonderful open-source project? We’re building a brand new platform for all the players using:

- Game: Unity3d (Repository)
- Front-end: NextJS (Repository)
- Back-end: Laravel 10.x (Repository)

We will need as many developers and testers as possible to discuss future implementations of the game together with the main contributors! First, make sure to join this Discord Community. This is where all the technical discussions happen, with the backing of the community, before going to the development board. Also, outside of Discord, the YARG community is mostly focused on the EliteAsian (core contributor and project owner) X account for development showcases. Be sure to follow him there as well.
And FYI, the Lead Artist of the game (aka Kadu) is also a Broadcast Specialist and Product Innovation Developer at Elgato who has worked with streamers like Ninja, Nadeshot, StoneMountain64, and the legendary DJ Marshmello. Kadu also uses his X account to share insights and early previews of new features and experiments for YARG. So, don’t forget to follow him as well!
Here are some useful links to learn more about the project: the Official Website, the GitHub Repository, and the Task Board.
Fun fact: YARG got noticed by Brian Bright, project lead on Guitar Hero, who liked the fact that the project was open source. Awesome, right?
5. Conclusion Data modeling is sometimes challenging. This project involved learning many new concepts and a lot of testing together with my community on Twitch. I have also published a Gaming Leaderboard Demo, where you can get some insights on how to implement the same project using NextJS and ScyllaDB! Also, if you like ScyllaDB and want to learn more about it, I strongly suggest you watch our free Masterclass Courses or visit ScyllaDB University!  

How ShareChat Scaled their ML Feature Store 1000X without Scaling the Database

How ShareChat successfully scaled 1000X without scaling the underlying database (ScyllaDB) The demand for low-latency machine learning feature stores is higher than ever, but actually implementing one at scale remains a challenge. That became clear when ShareChat engineers Ivan Burmistrov and Andrei Manakov took the P99 CONF 23 stage to share how they built a low-latency ML feature store based on ScyllaDB. This isn’t a tidy case study where adopting a new product saves the day. It’s a “lessons learned” story, a look at the value of relentless performance optimization – with some important engineering takeaways. The original system implementation fell far short of the company’s scalability requirements. The ultimate goal was to support 1 billion features per second, but the system failed under a load of just 1 million. With some smart problem solving, the team pulled it off though. Let’s look at how their engineers managed to pivot from the initial failure to meet their lofty performance goal without scaling the underlying database. Obsessed with performance optimizations and low-latency engineering? Join your peers at P99 24 CONF, a free highly technical virtual conference on “all things performance.” Speakers include: Michael Stonebraker, Postgres creator and MIT professor Bryan Cantrill, Co-founder and CTO of Oxide Computer Avi Kivity, KVM creator, ScyllaDB co-founder and CTO Liz Rice, Chief open source officer with eBPF specialists Isovalent Andy Pavlo, CMU professor Ashley Williams, Axo founder/CEO, former Rust core team, Rust Foundation founder Carl Lerche, Tokio creator, Rust contributor and engineer at AWS Register Now – It’s Free In addition to another great talk by Ivan from ShareChat’, expect more than 60 engineering talks on performance optimizations at Disney/Hulu, Shopify, Lyft, Uber, Netflix,  American Express, Datadog, Grafana, LinkedIn, Google, Oracle, Redis, AWS, ScyllaDB and more. Register for free. ShareChat: India’s Leading Social Media Platform To understand the scope of the challenge, it’s important to know a little about ShareChat, the leading social media platform in India. On the ShareChat app, users discover and consume content in more than 15 different languages, including videos, images, songs and more. ShareChat also hosts a TikTok-like short video platform (Moj) that encourages users to be creative with trending tags and contests. Between the two applications, they serve a rapidly growing user base that already has over 325 million monthly active users. And their AI-based content recommendation engine is essential for driving user retention and engagement. Machine learning feature stores at ShareChat This story focuses on the system behind ML feature stores for the short-form video app Moj. It offers fully personalized feeds to around 20 million daily active users, 100 million monthly active users. Feeds serve 8,000 requests per second, and there’s an average of 2,000 content candidates being ranked on each request (for example, to find the 10 best items to recommend). “Features” are pretty much anything that can be extracted from the data: Ivan Burmistrov, principal staff software engineer at ShareChat, explained: “We compute features for different ‘entities.’ Post is one entity, User is another and so on. From the computation perspective, they’re quite similar. However, the important difference is in the number of features we need to fetch for each type of entity. When a user requests a feed, we fetch user features for that single user. 
However, to rank all the posts, we need to fetch features for each candidate (post) being ranked, so the total load on the system generated by post features is much larger than the one generated by user features. This difference plays an important role in our story.” What went wrong At first, the primary focus was on building a real-time user feature store because, at that point, user features were most important. The team started to build the feature store with that goal in mind. But then priorities changed and post features became the focus too. This shift happened because the team started building an entirely new ranking system with two major differences versus its predecessor: Near real-time post features were more important The number of posts to rank increased from hundreds to thousands Ivan explained: “When we went to test this new system, it failed miserably. At around 1 million features per second, the system became unresponsive, latencies went through the roof and so on.” Ultimately, the problem stemmed from how the system architecture used pre-aggregated data buckets called tiles. For example, they can aggregate the number of likes for a post in a given minute or other time range. This allows them to compute metrics like the number of likes for multiple posts in the last two hours. Here’s a high-level look at the system architecture. There are a few real-time topics with raw data (likes, clicks, etc.). A Flink job aggregates them into tiles and writes them to ScyllaDB. Then there’s a feature service that requests tiles from ScyllaDB, aggregates them and returns results to the feed service. The initial database schema and tiling configuration led to scalability problems. Originally, each entity had its own partition, with rows timestamp and feature name being ordered clustering columns. [Learn more in this NoSQL data modeling masterclass]. Tiles were computed for segments of one minute, 30 minutes and one day. Querying one hour, one day, seven days or 30 days required fetching around 70 tiles per feature on average. If you do the math, it becomes clear why it failed. The system needed to handle around 22 billion rows per second. However, the database capacity was only 10 million rows/sec.   Initial optimizations At that point, the team went on an optimization mission. The initial database schema was updated to store all feature rows together, serialized as protocol buffers for a given timestamp. Because the architecture was already using Apache Flink, the transition to the new tiling schema was fairly easy, thanks to Flink’s advanced capabilities in building data pipelines. With this optimization, the “Features” multiplier has been removed from the equation above, and the number of required rows to fetch has been reduced by 100X: from around 2 billion to 200 million rows/sec. The team also optimized the tiling configuration, adding additional tiles for five minutes, three hours and five days to one minute, 30 minutes and one day tiles. This reduced the average required tiles from 70 to 23, further reducing the rows/sec to around 73 million. To handle more rows/sec on the database side, they changed the ScyllaDB compaction strategy from incremental to leveled. [Learn more about compaction strategies]. That option better suited their query patterns, keeping relevant rows together and reducing read I/O. The result: ScyllaDB’s capacity was effectively doubled. The easiest way to accommodate the remaining load would have been to scale ScyllaDB 4x. 
However, more/larger clusters would increase costs and that simply wasn’t in their budget. So the team continued focusing on improving the scalability without scaling up the ScyllaDB cluster. Improved cache locality One potential way to reduce the load on ScyllaDB was to improve the local cache hit rate, so the team decided to research how this could be achieved. The obvious choice was to use a consistent hashing approach, a well-known technique to direct a request to a certain replica from the client based on some information about the request. Since the team was using NGINX Ingress in their Kubernetes setup, using NGINX’s capabilities for consistent hashing seemed like a natural choice. Per NGINX Ingress documentation, setting up consistent hashing would be as simple as adding three lines of code. What could go wrong? A bit. This simple configuration didn’t work. Specifically: The client subset led to a huge key remapping – up 100% in the worst case. Since the node keys can be changed in a hash ring, it was impossible to use real-life scenarios with autoscaling. [See the ingress implementation] It was tricky to provide a hash value for a request because Ingress doesn’t support the most obvious solution: a gRPC header. The latency suffered severe degradation, and it was unclear what was causing the tail latency. To support a subset of the pods, the team modified their approach. They created a two-step hash function: first hashing an entity, then adding a random prefix. That distributed the entity across the desired number of pods. In theory, this approach could cause a collision when an entity is mapped to the same pod several times. However, the risk is low given the large number of replicas. Ingress doesn’t support using gRPC header as a variable, but the team found a workaround: using path rewriting and providing the required hash key in the path itself. The solution was admittedly a bit “hacky” … but it worked. Unfortunately, pinpointing the cause of latency degradation would have required considerable time, as well as observability improvements. A different approach was needed to scale the feature store in time. To meet the deadline, the team split the Feature service into 27 different services and manually split all entities between them on the client. It wasn’t the most elegant approach, but, it was simple and practical – and it achieved great results. The cache hit rate improved to 95% and the ScyllaDB load was reduced to 18.4 million rows per second. With this design, ShareChat scaled its feature store to 1B features per second by March. However, this “old school” deployment-splitting approach still wasn’t the ideal design. Maintaining 27 deployments was tedious and inefficient. Plus, the cache hit rate wasn’t stable, and scaling was limited by having to keep a high minimum pod count in every deployment. So even though this approach technically met their needs, the team continued their search for a better long-term solution. The next phase of optimizations: consistent hashing, Feature service Ready for yet another round of optimization, the team revisited the consistent hashing approach using a sidecar, called Envoy Proxy, deployed with the feature service. Envoy Proxy provided better observability which helped identify the latency tail issue. The problem: different request patterns to the Feature service caused a huge load on the gRPC layer and cache. That led to extensive mutex contention. The team then optimized the Feature service. 
They:

- Forked the caching library (FastCache from VictoriaMetrics) and implemented batch writes and better eviction to reduce mutex contention by 100x.
- Forked grpc-go and implemented a buffer pool across different connections to avoid contention during high parallelism.
- Used object pooling and tuned garbage collector (GC) parameters to reduce allocation rates and GC cycles.

With Envoy Proxy handling 15% of traffic in their proof-of-concept, the results were promising: a 98% cache hit rate, which reduced the load on ScyllaDB to 7.4M rows/sec. They could even scale the feature store further: from 1 billion features/second to 3 billion features/second.

Lessons learned

Here’s what this journey looked like from a timeline perspective:

To close, Andrei summed up the team’s top lessons learned from this project (so far):

- Use proven technologies. Even as the ShareChat team drastically changed their system design, ScyllaDB, Apache Flink and VictoriaMetrics continued working well.
- Each optimization is harder than the previous one – and has less impact.
- Simple and practical solutions (such as splitting the feature store into 27 deployments) do indeed work.
- The solution that delivers the best performance isn’t always user-friendly. For instance, their revised database schema yields good performance, but is difficult to maintain and understand. Ultimately, they wrote some tooling around it to make it simpler to work with.
- Every system is unique. Sometimes you might need to fork a default library and adjust it for your specific system to get the best performance.

Watch their complete P99 CONF talk
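Returning to the consistent-hashing workaround described earlier (hash the entity, then add a bounded random prefix so each entity is spread over a small subset of pods), a rough Python sketch of the idea might look like this. It is an illustrative reconstruction based on the description above, not ShareChat’s code; the function names and subset size are assumptions.

import hashlib
import random

NUM_PODS = 100          # total feature-service pods (assumption)
SUBSET_PER_ENTITY = 3   # how many pods may serve a given entity (assumption)

def pod_for(entity_id: str) -> int:
    # Step 1: pick a random prefix so the same entity maps to a few different keys.
    prefix = random.randrange(SUBSET_PER_ENTITY)
    # Step 2: hash prefix + entity and map the result onto the pod space.
    digest = hashlib.sha256(f"{prefix}:{entity_id}".encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PODS

# Requests for the same post land on at most SUBSET_PER_ENTITY pods,
# keeping their caches hot without pinning the post to a single pod.
print({pod_for("post:42") for _ in range(1000)})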

Simplifying Cassandra and DynamoDB Migrations with the ScyllaDB Migrator

Learn about the architecture of ScyllaDB Migrator, how to use it, recent developments, and upcoming features. ScyllaDB offers both a CQL-compatible API and a DynamoDB-compatible API, allowing applications that use Apache Cassandra or DynamoDB to take advantage of reduced costs and lower latencies with minimal code changes. We previously described the two main migration strategies: cold and hot migrations. In both cases, you need to backfill ScyllaDB with historical data. Either can be efficiently achieved with the ScyllaDB Migrator. In this blog post, we will provide an update on its status. You will learn about its architecture, how to use it, recent developments, and upcoming features. The Architecture of the ScyllaDB Migrator The ScyllaDB Migrator leverages Apache Spark to migrate terabytes of data in parallel. It can migrate data from various types of sources, as illustrated in the following diagram: We initially developed it to migrate from Apache Cassandra, but we have since added support for more types of data sources. At the time of writing, the Migrator can migrate data from either: A CQL-compatible source: An Apache Cassandra table. Or a Parquet file stored locally or on Amazon S3. Or a DynamoDB-compatible source: A DynamoDB table. Or a DynamoDB table export on Amazon S3. What’s so interesting about ScyllaDB Migrator? Since it runs as an Apache Spark application, you can adjust its throughput by scaling the underlying Spark cluster. It is designed to be resilient to read or write failures. If it stops prior to completion, the migration can be restarted from where it left off. It can rename item columns along the way. When migrating from DynamoDB, the Migrator can endlessly replicate new changes to ScyllaDB. This is useful for hot migration strategies. How to Use the ScyllaDB Migrator More details are available in the official Migrator documentation. The main steps are: Set Up Apache Spark: There are several ways to set up an Apache Spark cluster, from using a pre-built image on AWS EMR to manually following the official Apache Spark documentation to using our automated Ansible playbook on your own infrastructure. You may also use Docker to run a cluster on a single machine. Prepare the Configuration File: Create a YAML configuration file that specifies the source database, target ScyllaDB cluster, and any migration option. Run the Migrator: Execute the ScyllaDB Migrator using the spark-submit command. Pass the configuration file as an argument to the migrator. Monitor the Migration: The Spark UI provides logs and metrics to help you monitor the migration process. You can track the progress and troubleshoot any issues that arise. You should also monitor the source and target databases to check whether they are saturated or not. Recent Developments The ScyllaDB Migrator has seen several significant improvements, making it more versatile and easier to use: Support for Reading DynamoDB S3 Exports: You can now migrate data from DynamoDB S3 exports directly to ScyllaDB, broadening the range of sources you can migrate from. PR #140. AWS AssumeRole Authentication: The Migrator now supports AWS AssumeRole authentication, allowing for secure access to AWS resources during the migration process. PR #150. Schema-less DynamoDB Migrations: By adopting a schema-less approach, the Migrator enhances reliability when migrating to ScyllaDB Alternator, ScyllaDB’s DynamoDB-compatible API. PR #105. 
Dedicated Documentation Website: The Migrator’s documentation is now available on a proper website, providing comprehensive guides, examples, and throughput tuning tips. PR #166. Update to Spark 3.5 and Scala 2.13: The Migrator has been updated to support the latest versions of Spark and Scala, ensuring compatibility and leveraging the latest features and performance improvements. PR #155. Ansible Playbook for Spark Cluster Setup: An Ansible playbook is now available to automate the setup of a Spark cluster, simplifying the initial setup process. PR #148. Publish Pre-built Assemblies: You don’t need to manually build the Migrator from the source anymore. Download the latest release and pass it to the spark-submit command. PR #158. Strengthened Continuous Integration: We have set up a testing infrastructure that reduces the risk of introducing regressions and prevents us from breaking backward compatibility. PRs #107, #121, #127. Hands-on Migration Example The content of this section has been extracted from the documentation website. The original content is kept up to date. Let’s go through a migration example to illustrate some of the points listed above. We will perform a cold migration to replicate 1,000,000 items from a DynamoDB table to ScyllaDB Alternator. The whole system is composed of the DynamoDB service, a Spark cluster with a single worker node, and a ScyllaDB cluster with a single node, as illustrated below: To make it easier for interested readers to follow along, we will create all those services using Docker. All you need is the AWS CLI and Docker. The example files can be found at  https://github.com/scylladb/scylla-migrator/tree/b9be9fb684fb0e51bf7c8cbad79a1f42c6689103/docs/source/tutorials/dynamodb-to-scylladb-alternator Set Up the Services and Populate the Source Database We use Docker Compose to define each service. Our docker-compose.yml file looks as follows: Let’s break down this Docker Compose file. We define the DynamoDB service by reusing the official image amazon/dynamodb-local. We use the TCP port 8000 for communicating with DynamoDB. We define the Spark master and Spark worker services by using a custom image (see below). Indeed, the official Docker images for Spark 3.5.1 only support Scala 2.12 for now, but we need Scala 2.13. We mount the local directory ./spark-data to the Spark master container path /app so that we can supply the Migrator jar and configuration to the Spark master node. We expose the ports 8080 and 4040 of the master node to access the Spark UIs from our host environment. We allocate 2 cores and 4 GB of memory to the Spark worker node. As a general rule, we recommend allocating 2 GB of memory per core on each worker. We define the ScyllaDB service by reusing the official image scylladb/scylla. We use the TCP port 8001 for communicating with ScyllaDB Alternator. The Spark services rely on a local Dockerfile located at path ./dockerfiles/spark/Dockerfile. For the sake of completeness, here is the content of this file, which you can copy-paste: And here is the entry point used by the image, which needs to be executable: This Docker image installs Java and downloads the official Spark release. The entry point of the image takes an argument that can be either master or worker to control whether to start a master node or a worker node. 
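The docker-compose.yml listing itself is not reproduced above, so here is a minimal sketch of a file matching that description. Service names, image tags, and the ScyllaDB command-line flags are assumptions; the exact file lives in the scylla-migrator repository linked above.

services:
  dynamodb:
    image: amazon/dynamodb-local
    ports:
      - "8000:8000"            # DynamoDB endpoint used by the AWS CLI examples
  spark-master:
    build: ./dockerfiles/spark
    command: master            # argument consumed by the custom entrypoint
    ports:
      - "8080:8080"            # Spark master UI
      - "4040:4040"            # Spark application UI
    volumes:
      - ./spark-data:/app      # Migrator jar and config.yaml are supplied here
  spark-worker:
    build: ./dockerfiles/spark
    command: worker            # entrypoint is assumed to register with spark://spark-master:7077
    environment:
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 4G
  scylla:
    image: scylladb/scylla
    ports:
      - "8001:8000"            # ScyllaDB Alternator (DynamoDB-compatible API)
    command: --alternator-port=8000 --alternator-write-isolation=always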
Prepare your system for building the Spark Docker image with the following commands:

mkdir spark-data
chmod +x entrypoint.sh

Finally, start all the services with the following command:

docker compose up

Your system’s Docker daemon will download the DynamoDB and ScyllaDB images and build our Spark Docker image. Check that you can access the Spark cluster UI by opening http://localhost:8080 in your browser. You should see your worker node in the workers list. Once all the services are up, you can access your local DynamoDB instance and your local ScyllaDB instance by using the standard AWS CLI. Make sure to configure the AWS CLI as follows before running the dynamodb commands:

# Set dummy region and credentials
aws configure set region us-west-1
aws configure set aws_access_key_id dummy
aws configure set aws_secret_access_key dummy

# Access DynamoDB
aws --endpoint-url http://localhost:8000 dynamodb list-tables

# Access ScyllaDB Alternator
aws --endpoint-url http://localhost:8001 dynamodb list-tables

The last preparatory step consists of creating a table in DynamoDB and filling it with random data. Create a file named create-data.sh, make it executable, and fill it with the content shown in the linked example files. This script creates a table named Example and adds 1 million items to it. It does so by invoking another script, create-25-items.sh, that uses the batch-write-item command to insert 25 items in a single call. Every added item contains an id and five columns, all filled with random data. Run the script:

./create-data.sh

and wait for a couple of hours until all the data is inserted (or change the last line of create-data.sh to insert fewer items and make the demo faster).

Perform the Migration

Once you have set up the services and populated the source database, you are ready to perform the migration. Download the latest stable release of the Migrator into the spark-data directory:

wget https://github.com/scylladb/scylla-migrator/releases/latest/download/scylla-migrator-assembly.jar \
  --directory-prefix=./spark-data

Create a configuration file in spark-data/config.yaml with the content shown in the linked example files. This configuration tells the Migrator to read the items from the table Example in the dynamodb service, and to write them to the table of the same name in the scylla service. Finally, start the migration with the following command:

docker compose exec spark-master \
  /spark/bin/spark-submit \
  --executor-memory 4G \
  --executor-cores 2 \
  --class com.scylladb.migrator.Migrator \
  --master spark://spark-master:7077 \
  --conf spark.driver.host=spark-master \
  --conf spark.scylla.config=/app/config.yaml \
  /app/scylla-migrator-assembly.jar

This command calls spark-submit in the spark-master service with the file scylla-migrator-assembly.jar, which bundles the Migrator and all its dependencies. In the spark-submit invocation, we explicitly tell Spark to use 4 GB of memory; otherwise, it would default to 1 GB only. We also explicitly tell Spark to use 2 cores. This is not really necessary, as the default behavior is to use all the available cores, but we set it for the sake of illustration. If the Spark worker node had 20 cores, it would be better to use only 10 cores per executor to optimize the throughput (big executors require more memory management operations, which decrease the overall application performance). We would achieve this by passing --executor-cores 10, and the Spark engine would allocate two executors for our application to fully utilize the resources of the worker node.
The migration process inspects the source table, replicates its schema to the target database if it does not exist, and then migrates the data. The data migration uses the Hadoop framework under the hood to leverage the Spark cluster resources. The migration process breaks down the data to transfer chunks of about 128 MB each, and processes all the partitions in parallel. Since the source is a DynamoDB table in our example, each partition translates into a scan segment to maximize the parallelism level when reading the data. Here is a diagram that illustrates the migration process: During the execution of the command, a lot of logs are printed, mostly related to Spark scheduling. Still, you should be able to spot the following relevant lines: 24/07/22 15:46:13 INFO migrator: ScyllaDB Migrator 0.9.2 24/07/22 15:46:20 INFO alternator: We need to transfer: 2 partitions in total 24/07/22 15:46:20 INFO alternator: Starting write… 24/07/22 15:46:20 INFO DynamoUtils: Checking for table existence at destination And when the migration ends, you will see the following line printed: 24/07/22 15:46:24 INFO alternator: Done transferring table snapshot During the migration, it is possible to monitor the underlying Spark job by opening the Spark UI available at http://localhost:4040 Example of a migration broken down in 6 tasks. The Spark UI allows us to follow the overall progress, and it can also show specific metrics such as the memory consumption of an executor. In our example the size of the source table is ~200 MB. In practice, it is common to migrate tables containing several terabytes of data. If necessary, and as long as your DynamoDB source supports a higher read throughput level, you can increase the migration throughput by adding more Spark worker nodes. The Spark engine will automatically spread the workload between all the worker nodes. Future Enhancements The ScyllaDB team is continuously improving the Migrator. Some of the upcoming features include: Support for Savepoints with DynamoDB Sources: This will allow users to resume the migration from a specific point in case of interruptions. This is currently supported with Cassandra sources only. Shard-Aware ScyllaDB Driver: The Migrator will fully take advantage of ScyllaDB’s specific optimizations for even faster migrations. Support for SQL-based Sources: For instance, migrate from MySQL to ScyllaDB. Conclusion Thanks to the ScyllaDB Migrator, migrating data to ScyllaDB has never been easier. With its robust architecture, recent enhancements, and active development, the migrator is an indispensable tool for ensuring a smooth and efficient migration process. For more information, check out the ScyllaDB Migrator lesson on ScyllaDB University. Another useful resource is the official ScyllaDB Migrator documentation. Are you using the Migrator? Any specific feature you’d like to see? For any questions about your specific use case or about the Migrator in general, tap into the community knowledge on the ScyllaDB Community Forum.

Inside ScyllaDB’s Continuous Optimizations for Reducing P99 Latency

How the ScyllaDB Engineering team reduced latency spikes during administrative operations through continuous monitoring and rigorous testing In the world of databases, smooth and efficient operation is crucial. However, both ScyllaDB and its predecessor Cassandra have historically encountered challenges with latency spikes during administrative operations such as repair, backup, node addition, decommission, replacement, upgrades, compactions etc.. This blog post shares how the ScyllaDB Engineering team embraced continuous improvement to tackle these challenges head-on. Protecting Performance by Measuring Operational Latency Understanding and improving the performance of a database system like ScyllaDB involves continuous monitoring and rigorous testing. Each week, our team tackles this challenge by measuring performance under three types of workload scenarios: write, read, and mixed (50% read/write). We focus specifically on operational latency: how the system performs during typical and intensive operations like repair, node addition, node termination, decommission or upgrade. Our Measurement Methodology To ensure accurate results, we preload each cluster with data at a 10:1 data-to-memory ratio—equivalent to inserting 650GB on 64GB memory instances. Our benchmarks begin by recording the latency during a steady state to establish a baseline before initiating various cluster operations. We follow a strict sequence during testing: Preload data to simulate real user environments. Baseline latency measurement for a stable reference point. Sequential operational tests involving: Repair operations via Scylla Manager. Addition of three new nodes. Termination and replacement of a node. Decommissioning of three nodes. Latency is our primary metric; if it exceeds 15ms, we immediately start investigating it. We also monitor CPU instructions per operation and track reactor stalls, which are critical for understanding performance bottlenecks. How We Measure Latency Measuring latency effectively requires looking beyond the time it takes for ScyllaDB to process a command. We consider the entire lifecycle of a request: Response time: The time from the moment the query is initiated to when the response is delivered back to the client. Advanced metrics: We utilize High Dynamic Range (HDR) Histograms to capture and analyze latency from each cassandra-stress worker. This ensures we can compute a true representation of latency percentiles rather than relying on simple averages. Results from these tests are meticulously compiled and compared with previous runs. This not only helps us detect any performance degradation but also highlights improvements. It keeps the entire team informed through detailed reports that include operation durations and latency breakdowns for both reads and writes. Better Metrics, Better Performance When we started to verify performance regularly, we mostly focused on the latencies. At that time, reports lacked many details (like HDR results), but were sufficient to identify performance issues. These included high latency when decommissioning a node, or issues with latencies during the steady state. Since then, we have optimized our testing approach to include more – and more detailed – metrics. This enables us to spot emerging performance issues sooner and root out the culprit faster. The improved testing approach has been a valuable tool, providing fast and precise feedback on how well (or not) our product optimization strategies are actually working in practice. 
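As a small illustration of why we aggregate full latency distributions (e.g., HDR Histograms from each cassandra-stress worker) instead of averaging per-worker results, consider the following Python sketch. It is illustrative only, not our tooling: averaging each worker’s P99 can badly understate the true P99 of the combined traffic.

# Two hypothetical cassandra-stress workers: one mostly fast, one with a slow tail.
worker_a = [2.0] * 990 + [4.0] * 10       # latencies in ms
worker_b = [3.0] * 900 + [25.0] * 100

def p99(samples):
    ordered = sorted(samples)
    return ordered[int(len(ordered) * 0.99) - 1]

print("average of per-worker P99s:", (p99(worker_a) + p99(worker_b)) / 2)   # 13.5 ms
print("P99 of the merged distribution:", p99(worker_a + worker_b))          # 25.0 ms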
Total metrics Our current reports include HDR Histogram details providing a comprehensive overview of system latency throughout the entire test. Number of reactor stalls (which are pauses in processing due to overloaded conditions)  prompts immediate attention and action when they increase significantly. We take a similar approach to kernel callstacks which are logged when kernel space holds locks for too long.   Management repair After populating the cluster with data, we start our test from a full cluster repair using Scylla Manager and measure the latencies:   During this period, the P99 latency was 3.87 ms for writes and 9.41 ms for reads. In comparison, during the “steady state” (when no operations were performed), the latencies were 2.23 ms and 3.87 ms, respectively. Cluster growth After the repair, we add three nodes to the cluster and conduct a similar latency analysis:   Each cycle involves adding one node sequentially. These results provide a clear view of how latency changes and the duration required to expand the cluster. Node Termination and Replacement Following the cluster growth, one node is terminated and replaced with another. Cluster Shrinkage The test concludes with shrinking the cluster back to its initial size by decommissioning random nodes one by one.   These tests and reports are invaluable, uncovering numerous performance issues like increased latencies during decommission, detecting long reactor stalls in row cache update or short but frequent ones in sstable reader paths that lead to crucial fixes, improvements, and insights. This progress is evident in the numbers, where current latencies remain in the single-digit range under various conditions. Looking Ahead Our optimization journey is ongoing. ScyllaDB 6.0 introduced tablets, significantly accelerating cluster resizing to market-leading levels. The introduction of immediate node joining, which can start in parallel with accelerated data streaming, shows significant improvements across all metrics. With these improvements, we start measuring and optimizing not only the latencies during these operations but also the operations durations. Stay tuned for more details about these advancements soon. Our proactive approach to tackling latency issues not only improves our database performance but also exemplifies our commitment to excellence. As we continue to innovate and refine our processes, ScyllaDB remains dedicated to delivering superior database solutions that meet the evolving needs of our users.

ScyllaDB Elasticity: Demo, Theory, and Future Plans

Watch a demo of how ScyllaDB’s Raft and tablets initiatives play out with real operations on a real ScyllaDB cluster — and get a glimpse at what’s next on our roadmap. If you follow ScyllaDB, you’ve likely heard us talking about Raft and tablets initiatives for years now. (If not, read more on tablets from Avi Kivity and Raft from Kostja Osipov) You might have even seen some really cool animations. But how does it play out with real operations on a real ScyllaDB cluster? And what’s next on our roadmap – particularly in terms of user impacts? ScyllaDB Co-Founder Dor Laor and Technical Director Felipe Mendes recently got together to answer those questions. In case you missed it or want a recap of the action-packed and information-rich session, here’s the complete recording:   In case you want to skip to a specific section, here’s a breakdown of what they covered when: 4:45 ScyllaDB already scaled linearly 8:11 Tablets + Raft = elasticity, speed, simplicity, TCO 11:45 Demo time! 30:23 Looking under the hood 46:19: Looking ahead And in case you prefer to read vs watch, here are some key points… Double Double Demo After Dor shared why ScyllaDB adopted a new dynamic “tablets-based” data replication architecture for faster scaling, he passed the mic over to Felipe to show it in action. Felipe covers: Parallel scaling operations (adding and removing nodes) – speed and impact on latency How new nodes can start servicing increased demand almost instantly Dynamic load balancing based on node capacity, including automated resharding for new/different instance types The demo starts with the following initial setup: 3-node cluster running on AWS i4i.xlarge Each node processing ~17,000 operations/second System load at ~50% Here’s a quick play-by-play… Scale out: Bootstrapped 3 additional i4i.large nodes in parallel New nodes start serving traffic once the first tablets arrive, before the entire data set is received. Tablets migration complete in ~3 minutes Writes are at sub-millisecond latencies; so are read latencies once the cache warms up (in the meantime, reads go to warmed up nodes, thanks to heat-weighted load balancing) Scale up: Added 3 nodes of a larger instance size (i4i.2xlarge, with double the capacity of the original nodes) and increased the client load The larger nodes receive more tablets and service almost twice the traffic than the smaller replicas (as appropriate for their higher capacity) The expanded cluster handles over 100,000 operations/second with the potential to handle 200,000-300,000 operations/second Downscale: A total of 6 nodes were decommissioned in parallel As part of the decommission process, tablets were migrated to other replicas Only 8 minutes were required to fully decommission 6 replicas while serving traffic A Special Raft for the ScyllaDB Sea Monster Starting with the ScyllaDB 6.0 release, topology metadata is managed by the Raft protocol. The process of adding, removing, and replacing nodes is fully linearized. This contributes to parallel operations, simplicity, and correctness. Read barriers and fencing are two interesting aspects of our Raft implementation. Basically, if a node doesn’t know the most recent topology, it’s barred from responding to related queries. This prevents, for example, a node from observing an incorrect topology state in the cluster – which could result in data loss. 
It also prevents a situation where a removed node or an external node using the same cluster name could silently come back or join the cluster simply by gossiping with another replica. Another difference: Schema versions are now linearized, and use a TimeUUID to indicate the most up-to-date schema. Linearizing schema updates not only makes the operation safer; it also considerably improves performance. Previously, a schema change could take a while to propagate via gossip – especially in large cluster deployments. Now, this is gone. TimeUUIDs provide an additional safety net. Since schema versions now contain a time-based component, ScyllaDB can ensure schema versioning, which helps with: Improved visibility on conditions triggering a schema change on logs Accurately restoring a cluster backup Rejecting out-of-order schema updates Tablets relieve operational pains The latest changes simplify ScyllaDB operations in several ways: You don’t need to perform operations one by one and wait in between them; you can just initiate the operation to add or remove all the nodes you need, all at once You no longer need to cleanup after you scale the cluster Resharding (the process of changing the shard count of an existing node) is simple. Since tablets are already split on a per-shard boundary, resharding simply updates the shard ownership Managing the system_auth keyspace (for authentication) is no longer needed. All auth-related data is now automatically replicated to every node in the cluster Soon, repairs will also be automated   Expect less: typeless, sizeless, limitless ScyllaDB’s path forward from here certainly involves less: typeless, sizeless, limitless. You could be typeless. You won’t have to think about instance types ahead of time. Do you need a storage-intensive instance like the i3ens, or a throughput-intensive instance like the i4is? It no longer matters, and you can easily transition or even mix among these. You could be sizeless. That means you won’t have to worry about capacity planning when you start off. Start small and evolve from there. You could also be limitless. You could start off anticipating a high throughput and then reduce it, or you could commit to a base and add on-demand usage if you exceed it.

How to Visualize ScyllaDB Tables and Run Queries with DBSchema

Learn how to connect DBSchema to ScyllaDB, visualize the keyspace, and run queries While ScyllaDB power users are quite accustomed to CQL, users getting started with ScyllaDB often ask if we offer a visual interface for designing database schema and running queries. That’s why we’re excited to share that DBSchema, a visual database design and management tool, just introduced support for ScyllaDB in its most recent release. With DBSchema, teams can centralize efforts to design database schemas and run queries across all major NoSQL and SQL databases (e.g. PostgreSQL, MongoDB, and Snowflake as well as ScyllaDB). DBSchema can be a great alternative to cqlsh, ScyllaDB’s standard command line tool. For example, in cqlsh, you can use the DESCRIBE KEYSPACES command to list all keyspaces in the database: Then, you can use the DESCRIBE TABLES command to list all tables per keyspace: With DBSchema this functionality is easier. Right after connecting to a database, you can see more than the table within a keyspace; you can also see the columns and column types. Both self-hosted and Cloud versions of ScyllaDB work with DBSchema. In this post, I will show you how to connect DBSchema to ScyllaDB, visualize the keyspace, and run queries. Before you get started, download DBSchema on Windows, Mac, or Linux. The free version also has ScyllaDB support. Connect to ScyllaDB in DBSchema To create a new ScyllaDB connection in DBSchema: Download DBSchema: https://dbschema.com/download.html Click the “Connect to Database” button, search for “ScyllaDB,” then click “Next.” Enter the database connection details as follows, then click “Connect.” Select the Keyspaces you want to use (you can select multiple), then click “OK.” DBSchema then reverse engineers your tables in the selected keyspaces. To write new queries, select “Editors -> “SQL Editor.” Don’t worry about the “SQL Editor” label; we’re writing CQL commands here, not SQL: Query examples Once the connection is set up, you can run all your CQL queries and see the result table immediately below your query. For example: Aside from simple SELECT queries, you can also run queries to create new objects in the database – for example, a materialized view: Wrapping up DBSchema is a unique tool in the sense that: It’s available in a free version You can run CQL The output is exactly the same as if you used cqlsh, but with a GUI It provides a seamless way to visualize your database keyspaces or test your CQL queries before using them in production Get started by downloading DBSchema and creating a new ScyllaDB Cloud cluster. Any questions? You can discuss this post and share your thoughts in our community forum.
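For reference, a materialized view of the kind mentioned above can be created with CQL roughly as follows. This is a hedged, generic example; the keyspace, table, and column names are hypothetical, not the ones from the screenshots.

-- A hypothetical base table plus a materialized view that lets you
-- look up users by email instead of by id.
CREATE TABLE demo.users (
    user_id uuid PRIMARY KEY,
    email   text,
    name    text
);

CREATE MATERIALIZED VIEW demo.users_by_email AS
    SELECT user_id, email, name
    FROM demo.users
    WHERE email IS NOT NULL AND user_id IS NOT NULL
    PRIMARY KEY (email, user_id);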

Benchmarking MongoDB vs ScyllaDB: Social Media Workload Deep Dive

benchANT’s comparison of ScyllaDB vs MongoDB in terms of throughput, latency, scalability, and cost for a social media workload BenchANT recently benchmarked the performance and scalability of the market-leading general-purpose NoSQL database MongoDB and its performance-oriented challenger ScyllaDB. You can read a summary of the results in the blog Benchmarking MongoDB vs ScyllaDB: Performance, Scalability & Cost, see the key takeaways for various workloads in this technical summary, and access all results (including the raw data) from the complete benchANT report. This blog offers a deep dive into the tests performed for the social workload. This workload is based on the YCSB Workload B. It creates a read-heavy workload, with 95% read operations and 5% update operations. We use two shapes of this workload, which differ in terms of the request distribution patterns, namely uniform and hotspot distribution. These workloads are executed against the small database scaling size with a data set of 500GB and against the medium scaling size with a data set of 1TB. Before we get into the benchmark details, here is a summary of key insights for this workload. ScyllaDB outperforms MongoDB with higher throughput and lower latency for all measured configurations of the social workload. ScyllaDB provides up to 12 times higher throughput ScyllaDB provides significantly lower (down to 47 times) update latencies compared to MongoDB ScyllaDB provides lower read latencies, down to 5 times Throughput Results for MongoDB vs ScyllaDB The throughput results for the social workload with the uniform request distribution show that the small ScyllaDB cluster is able to serve 60 kOps/s with a cluster CPU utilization of ~85% while the small MongoDB cluster serves only 10 kOps/s under a comparable cluster utilization of 80-90%. For the medium cluster sizes, ScyllaDB achieves an average throughput of 232 kOps/s showing ~85% cluster utilization while MongoDB achieves 42 kOps/s at a CPU utilization of ~85%. The throughput results for the social workload with the hotspot request distribution show a similar trend, but with higher throughput numbers since the data is mostly read from the cache. The small ScyllaDB cluster serves 152 kOps/s while the small MongoDB serves 14 kOps/s. For the medium cluster sizes, ScyllaDB achieves an average throughput of 587 kOps/s and MongoDB achieves 48 kOps/s. Scalability Results for MongoDB vs ScyllaDB These results also enable us to compare the theoretical throughput scalability with the actually achieved throughput scalability. For this, we consider a simplified scalability model that focuses on compute resources. It assumes the scalability factor is reflected by the increased compute capacity from the small to medium cluster size. For ScyllaDB, this means we double the cluster size from 3 to 6 nodes and also double the instance size from 8 cores to 16 cores per instance, resulting in a theoretical scalability of 400%. For MongoDB, we move from one replica set of three data nodes to a cluster with three shards and nine data nodes and increase the instance size from 8 cores to 16 cores, resulting in a theoretical scalability factor of 600%. The ScyllaDB scalability results for the uniform and hotspot distributions both show that ScyllaDB is close to achieving linear scalability by achieving a throughput scalability of 386% (of the theoretically possible 400%). 
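To make those percentages concrete, here is the arithmetic implied by the numbers above, using the simplified compute-capacity model:

ScyllaDB theoretical scaling: (6 nodes x 16 cores) / (3 nodes x 8 cores) = 96 / 24 = 4.0, i.e. 400%.
ScyllaDB measured scaling (uniform): 232 kOps/s / 60 kOps/s ≈ 3.87, in line with the reported 386%.
MongoDB theoretical scaling: (9 data nodes x 16 cores) / (3 data nodes x 8 cores) = 144 / 24 = 6.0, i.e. 600%.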
With MongoDB, the gap between the theoretical throughput scalability and the actually achieved throughput scalability is significantly higher. For the uniform distribution, MongoDB achieves a scaling factor of 420% (of the theoretically possible 600%). For the hotspot distribution, we measure 342% (of the theoretically possible 600%).

Throughput per Cost Ratio

In order to compare the costs/month in relation to the provided throughput, we take the MongoDB Atlas throughput/$ as the baseline (i.e. 100%) and compare it with the provided ScyllaDB Cloud throughput/$. The results for the uniform distribution show that ScyllaDB provides five times more operations/$ compared to MongoDB Atlas for the small scaling size and 5.7 times more operations/$ for the medium scaling size. For the hotspot distribution, the results show an even better throughput/cost ratio for ScyllaDB, providing 9 times more operations/$ for the small scaling size and 12.7 times more for the medium scaling size.

Latency Results for MongoDB vs ScyllaDB

For the uniform distribution, ScyllaDB provides stable and low P99 latencies for the read and update operations at both the small and medium scaling sizes. MongoDB generally has higher P99 latencies. Here, the read latencies are 2.8 times higher for the small scaling size and 5.5 times higher for the medium scaling size. The update latencies show an even more distinct difference; MongoDB’s P99 update latency in the small scaling size is 47 times higher compared to ScyllaDB, and 12 times higher in the medium scaling size. For the hotspot distribution, the results show a similar trend of stable and low ScyllaDB latencies. For MongoDB, read and update latencies increase from the small to the medium scaling size. It is interesting that, in contrast to the uniform distribution, the read latency only increases by a factor of 2.8 while the update latency increases by 970%.

Technical Nugget – Performance Impact of the Data Model

The default YCSB data model is composed of a primary key and a data item with 10 string fields, which results in a document with 10 attributes for MongoDB and a table with 10 columns for ScyllaDB. We analyze how performance changes if a pure key-value data model is applied for both databases: a table with only one column for ScyllaDB and a document with only one field for MongoDB. The results show that for ScyllaDB the throughput improves by 24%, while for MongoDB the throughput increase is only 5%.

Technical Nugget – Performance Impact of the Consistency Level

All standard benchmarks are run with the MongoDB client consistency writeConcern=majority/readPreference=primary and for ScyllaDB with writeConsistency=QUORUM/readConsistency=QUORUM. Besides these client-consistent configurations, we also analyze the performance impact of weaker read consistency settings. For this, we enable MongoDB to read from the secondaries (readPreference=secondaryPreferred) and set readConsistency=ONE for ScyllaDB. The results show an expected increase in throughput: 56% for ScyllaDB and 49% for MongoDB.

Continue Comparing ScyllaDB vs MongoDB

Here are some additional resources for learning about the differences between MongoDB and ScyllaDB: Benchmarking MongoDB vs ScyllaDB: Results from benchANT’s complete benchmarking study that comprises 133 performance and scalability measurements that compare MongoDB against ScyllaDB.
Continue Comparing ScyllaDB vs MongoDB

Here are some additional resources for learning about the differences between MongoDB and ScyllaDB:

  • Benchmarking MongoDB vs ScyllaDB: Results from benchANT’s complete benchmarking study, which comprises 133 performance and scalability measurements comparing MongoDB against ScyllaDB.
  • Benchmarking MongoDB vs ScyllaDB: Caching Workload Deep Dive: benchANT’s comparison of ScyllaDB vs MongoDB in terms of throughput, latency, scalability, and cost for a caching workload (50% read operations and 50% update operations).
  • Benchmarking MongoDB vs ScyllaDB: IoT Sensor Workload Deep Dive: benchANT’s comparison of ScyllaDB vs MongoDB in terms of throughput, latency, scalability, and cost for a workload simulating an IoT sensor (90% insert operations and 10% read operations).
  • A Technical Comparison of MongoDB vs ScyllaDB: benchANT’s technical analysis of how MongoDB and ScyllaDB compare with respect to their features, architectures, performance, and scalability.
  • ScyllaDB’s MongoDB vs ScyllaDB page: Features perspectives from users – like Discord – who have moved from MongoDB to ScyllaDB.

ScyllaDB in Action Book Excerpt: ScyllaDB, a Distributed Database

How does ScyllaDB provide scalability and fault tolerance by distributing its data across multiple nodes? Read what Bo Ingram (Staff Engineer at Discord) has to say – in this excerpt from the book “ScyllaDB in Action.”

Editor’s note: We’re honored to share the following excerpt from Bo Ingram’s informative – and fun! – new book on ScyllaDB: ScyllaDB in Action. You might have already experienced Bo’s expertise and engaging communication style in his blog How Discord Stores Trillions of Messages or his ScyllaDB Summit talks How Discord Migrated Trillions of Messages from Cassandra to ScyllaDB and So You’ve Lost Quorum: Lessons From Accidental Downtime. If not, you should 😉

You can purchase the full 370-page book from Manning.com. You can also access a 122-page early-release digital copy for free, compliments of ScyllaDB. The book excerpt includes a discount code for 45% off the complete book.

Get the 122-page PDF for free

The following is an excerpt from Chapter 1; it’s reprinted here with permission of the publisher.

***

ScyllaDB runs multiple nodes, making it a distributed system. By spreading its data across its deployment, it uses that to achieve its desired availability and consistency, which, when combined, differentiates the database from other systems. All distributed systems have a bar to meet: they must deliver enough value to overcome the introduced complexity. ScyllaDB, designed to be a distributed system, achieves its scalability and fault tolerance through this design.

When users write data to ScyllaDB, they start by contacting any node. Many systems follow a leader-follower topology, where one node is designated as a leader, giving it special responsibilities within the system. If the leader dies, a new leader is elected, and the system continues operating. ScyllaDB does not follow this model; each node is as special as any other. Without a centralized coordinator deciding who stores what, each node must know where any given piece of data should be stored. Internally, Scylla can map a given row to the node that owns it, forwarding requests to the appropriate nodes by calculating its owner using the hash ring that you’ll learn about in chapter 3.

To provide fault tolerance, ScyllaDB not only distributes data but replicates it across multiple nodes. The database stores a row in multiple locations – the amount depends upon the configured replication factor. In a perfect world, each node acknowledges every request instantly every time, but what happens if it doesn’t? To help with unexpected trouble, the database provides tunable consistency.

How you query data is dependent on what degree of consistency you’re looking to get. ScyllaDB is an eventually consistent database, and you perhaps will see inconsistent data as the system converges toward consistency. Developers must keep this eventual consistency in mind when working with the database. To facilitate the various needs of consistency, ScyllaDB provides a variety of consistency levels for queries, including those listed in table 1.1. With a consistency level of ALL, you can require that all replicas for a key acknowledge a query, but this setting harms availability. You can no longer tolerate the loss of a node. With a consistency level of ONE, you require a single replica for a key to acknowledge a query, but this greatly increases our chances of inconsistent results.

Luckily, some options aren’t as extreme. ScyllaDB lets you tune consistency via the concept of quorums. A quorum is when a group has a majority of members.
Legislative bodies, such as the US Senate, do not operate when the number of members present is below the quorum threshold. When applied to ScyllaDB, you can achieve intermediate forms of consistency.

With a QUORUM consistency level, the database requires a majority of replicas for a key to acknowledge a query. If you have three replicas, two of them must accept every read and every write. If you lose one node, you can still rely on the other two to keep serving traffic. You additionally guarantee that a majority of your nodes get every update, preventing inconsistent data if you use the same consistency level when reading.

Once you have picked your consistency level, you know how many replicas you need to execute a successful query. A client sends a request to a node, which serves as the coordinator for that query. Your coordinator node reaches out to the replicas for the given key, including itself if it is a replica. Those replicas return results to the coordinator, and the coordinator evaluates them according to our consistency. If it finds the result satisfies the consistency requirements, it returns the result to the caller.

The CAP theorem (https://en.wikipedia.org/wiki/CAP_theorem) classifies distributed systems by saying that they cannot provide all three of these properties – consistency, availability, and network partition tolerance, as seen in figure 1.8. For the CAP theorem’s purposes, we define consistency as every request reading the most recent write; it’s a measure of correctness within the database. Availability is whether the system can serve requests, and network partition tolerance is the ability to handle a disconnected node.

Figure 1.8 The CAP theorem says a database can only provide two of three properties — consistency, availability, and partition tolerance.

ScyllaDB is classified as an AP system. According to the CAP theorem, a distributed system must have partition tolerance, so it ultimately chooses between consistency and availability. If a system is consistent, it must be impossible to read inconsistent data. To achieve consistency, it must ensure that all nodes receive all necessary copies of data. This requirement means it cannot tolerate the loss of a node, therefore losing availability.

TIP: In practice, systems aren’t as rigidly classified as the CAP theorem suggests. For a more nuanced discussion of these properties, you can research the PACELC theorem (https://en.wikipedia.org/wiki/PACELC_theorem), which illustrates how systems make partial tradeoffs between latency and consistency.

ScyllaDB is typically classified as an AP system. When encountered with a network partition, it chooses to sacrifice consistency and maintain availability. You can see this in its design – ScyllaDB repeatedly makes choices, via quorums and eventual consistency, to keep the system up and running in exchange for potentially weaker consistency. By emphasizing availability, you see one of ScyllaDB’s differentiators against its most popular competition — relational databases.
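To make the quorum arithmetic above concrete, here is a minimal sketch (not from the book; the replication factors are just examples): with a replication factor of 3, a quorum is 2 replicas, so one node can be lost while QUORUM reads and writes keep succeeding, and any QUORUM write and QUORUM read are guaranteed to overlap on at least one replica.

    # Quorum arithmetic for tunable consistency (illustration only).
    def quorum(replication_factor: int) -> int:
        """Smallest majority of replicas."""
        return replication_factor // 2 + 1

    for rf in (1, 3, 5):
        q = quorum(rf)
        tolerated = rf - q    # replicas that can be down while QUORUM still succeeds
        overlap = 2 * q - rf  # replicas guaranteed common to a QUORUM write and a QUORUM read
        print(f"RF={rf}: QUORUM={q}, tolerates {tolerated} down, "
              f"read/write overlap of at least {overlap} replica(s)")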

Getting Started with Database-Level Encryption at Rest in ScyllaDB Cloud

Learn about ScyllaDB database-level encryption with Customer-Managed Keys & see how to set up and manage encryption with a customer key — or delegate encryption to ScyllaDB

ScyllaDB Cloud takes a proactive approach to ensuring the security of sensitive data: we provide database-level encryption in addition to the default EC2 storage-level encryption. With this added layer of protection, customer data is always protected against attacks. Customers can focus on their core operations, knowing that their critical business and customer assets are well-protected. Customers can either use a customer-managed key or let ScyllaDB Cloud manage a key for them.

This article explains how ScyllaDB Cloud protects customer data. It focuses on the technical aspects of ScyllaDB database-level encryption with Customer-Managed Keys (CMK). Specifically, it includes a walkthrough of how to set up and manage encryption on ScyllaDB Cloud clusters with a customer key, or how to delegate encryption to ScyllaDB.

Storage-level encryption

Encryption at rest means data files are encrypted before being written to persistent storage. ScyllaDB Cloud always uses encrypted volumes to prevent data breaches caused by physical access to disks.

Database-level encryption

Database-level encryption is a technique for encrypting all data before it is stored in the database.
The ScyllaDB Cloud feature is based on the proven ScyllaDB Enterprise database-level encryption at rest, extended with the Customer Managed Keys (CMK) encryption control. This ensures that the data is securely stored – and the customer is the one holding the key. The keys are stored and protected separately from the database, substantially increasing security.

ScyllaDB Cloud provides full database-level encryption using the Customer Managed Keys (CMK) concept. It is based on envelope encryption: data is encrypted at rest and decrypted only when it is needed. This is essential for protecting customer data at rest.

Some industries, like healthcare or finance, have strict data security regulations. Encrypting all data helps businesses comply with these requirements, avoiding the need to prove that all tables holding sensitive personal data are covered by encryption. It also helps businesses protect their corporate data, which can be even more valuable.

A key feature of CMK is that the customer has complete control of the encryption keys. (Data encryption keys are covered later in this article.) The customer can:

  • Revoke data access at any time
  • Restore data access at any time
  • Manage the master keys needed for decryption
  • Log all access attempts to keys and data

Customers can delegate all key management operations to the ScyllaDB Cloud support team if they prefer. To do this, the customer can choose the ScyllaDB key when creating the cluster; either way, customer data remains secure and adheres to all privacy regulations.

By default, encryption uses the symmetrical algorithm AES-128, a solid corporate encryption standard covering all practical applications; breaking AES-128 would take an immense amount of time, on the order of trillions of years. The strength can be increased to AES-256.

Note: Database-level encryption in ScyllaDB Cloud is available for all clusters deployed in Amazon Web Services. Support for other cloud services, like Google Cloud KMS, will come later this year.

Encryption

To ensure all user data is protected, ScyllaDB will encrypt:

  • All user tables
  • Commit logs
  • Batch logs
  • Hinted handoff data

This ensures all customer data is properly encrypted. The first step of the encryption process is to encrypt every record with a data encryption key (DEK). Once the data is encrypted, the DEK is sent to AWS KMS, where the master key (MK) resides. The DEK is then encrypted with the master key (MK), producing an encrypted DEK (EDEK, or a wrapped key). The master key remains in the KMS, while the EDEK is returned and stored with the data. The plaintext DEK used to encrypt the data is destroyed to ensure data protection. A new DEK will be generated the next time new data needs to be encrypted.

Decryption

Because the original non-encrypted DEK was destroyed when the EDEK was produced, the stored data cannot be read directly. The EDEK cannot be used to decrypt the data on its own because the DEK it contains is itself encrypted; it has to be unwrapped first, and for that the master key is required again. The EDEK can only be decrypted with the master key (MK) in the KMS. Once the DEK is unwrapped, the data can be decrypted. As you can see, the data cannot be decrypted without the master key – which is protected at all times in the KMS and cannot be “copied” outside of KMS. By revoking the master key, the customer can disable access to the data independently from the database or application authorization.
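To make the envelope-encryption flow above concrete, here is a minimal sketch using AWS KMS (via boto3) and a local AES cipher. It illustrates the general DEK/EDEK pattern only – it is not ScyllaDB’s implementation, and the key ARN is a placeholder:

    # Envelope encryption sketch: illustrates the DEK/EDEK flow described above.
    # Not ScyllaDB's implementation; KEY_ID is a placeholder for a KMS master key ARN.
    import os
    import boto3
    from cryptography.hazmat.primitives.ciphers.aead import AESGCM

    KEY_ID = "arn:aws:kms:us-east-1:123456789012:key/EXAMPLE"  # placeholder master key (MK)
    kms = boto3.client("kms")

    # Encrypt: KMS returns a plaintext DEK plus the same DEK wrapped by the MK (the EDEK).
    dek = kms.generate_data_key(KeyId=KEY_ID, KeySpec="AES_128")
    nonce = os.urandom(12)
    ciphertext = AESGCM(dek["Plaintext"]).encrypt(nonce, b"customer data", None)
    edek = dek["CiphertextBlob"]  # stored alongside the encrypted data
    del dek                       # the plaintext DEK is discarded after use

    # Decrypt: only KMS (holding the MK) can unwrap the EDEK back into a usable DEK.
    plaintext_dek = kms.decrypt(CiphertextBlob=edek)["Plaintext"]
    data = AESGCM(plaintext_dek).decrypt(nonce, ciphertext, None)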
Multi-region deployment

Adding new data centers to the ScyllaDB cluster will create additional local keys in those regions. All master keys support multi-region use, and a copy of each key resides locally in each region – ensuring that multi-regional setups are protected against regional cloud provider outages and disasters. The keys are available in the same region as the data center and can be controlled independently.

If you use a Customer Key, AWS KMS will charge $1/month for each cluster, prorated per hour. Each additional DC creates a replica that is counted as an additional key. There is also a cost per key request; ScyllaDB Enterprise uses those requests efficiently, resulting in an estimated monthly cost of up to $1 for a 9-node cluster.

Managing encryption keys adds another layer of administrative work in addition to the extra cost. ScyllaDB Cloud therefore also offers database clusters that can be encrypted using keys managed by ScyllaDB support. They provide the same level of protection, but our support team helps you manage the master keys. The ScyllaDB keys are applied by default and are free to our customers.

Creating a Cluster with Database-Level Encryption

Creating a cluster with database-level encryption requires:

  • A ScyllaDB Cloud account – if you don’t have one, you can create a ScyllaDB Cloud account here.
  • 10 minutes with a ScyllaDB Key
  • 20 minutes creating your own key

To create a cluster with database-level encryption enabled, we need a master key. We can either create a customer-managed key using the ScyllaDB Cloud UI, or skip this step completely and use a ScyllaDB Managed Key, which lets us skip the next six steps. In both cases, all the data will be protected by strong encryption at the database level.

Create a Customer Managed Key

After logging into the ScyllaDB Cloud portal, select the “Security” option from the user menu. Then, use the “Add Key” option. The Add Key menu will be presented. It allows selection of cloud providers and regions; currently, only the region can be selected. Select the region where you plan to deploy your cluster. For multi-DC setups, choose the region where your first data center will be. For my cluster, it will be US East. Click “Set Key.”

The key is now configured, but it also has to be created in AWS on my behalf so I have full control over it. This is done using a CloudFormation Stack, which lets you execute a cloud stack with your AWS permissions that provisions the key directly in AWS KMS. Click Launch Stack in the summary pop-up to open the CloudFormation Stack in a new tab and provision your key in AWS.

The AWS page will request a login and a “StackPrincipal.” The role of the “StackPrincipal” has to be provided in ARN format. Assuming your user has enough permissions, the following CloudShell command will return the ARN as the last line of the response.

    [CloudShell-user@ip-10-136-55-116 ~]$ aws sts get-caller-identity
    {
        "UserId": "AR**********************************domain.com",
        "Account": "023***********",
        "Arn": "arn:aws:sts::023**************************domain.com"
    }

Once you successfully execute the stack, the key should be created, and ScyllaDB will get permission to use it with a cluster of your choice. In ScyllaDB Cloud, you will see the key name with a green “available” status, meaning the key is ready and can be assigned. You can still manage the key; however, if you disable it, it can no longer encrypt or decrypt the data. The next step is to create a cluster with the new key.
Choose the Dedicated VM or the Free Trial option from the New Cluster menu. Then, select Customer Key, and you can select the key we created. The key will show up only if the cluster and key regions match.

Use ScyllaDB Managed Key

If you prefer to use a ScyllaDB Managed Key, skip all the above steps and choose ScyllaDB Key instead. This is the easier option: it encrypts the data at the database level with the same encryption, but the key is managed by ScyllaDB Cloud agents.

Create an Encrypted Cluster

Once the master key is chosen, click Next and wait a few minutes until the cluster is created with the selected encryption option. You will see that the indicator for encryption at rest is enabled. That’s it! Your cluster is now using database-level encryption with the selected master key. Transparent database-level encryption in ScyllaDB Cloud significantly boosts the security of your ScyllaDB clusters and backups.

Next Steps

Start using this feature in ScyllaDB Cloud. Get your questions answered in our community forum and Slack channel. Or, use our contact form.

How Freshworks Cut Database P99 Latency by 95% – with Lower Costs

How Freshworks tackled high tail latencies, Cassandra admin burden, and any little surge causing an increase in timeouts

Freshworks creates AI-boosted business software that is purpose-built for IT, customer support, sales, and marketing teams to work more efficiently. Given their scale, managing petabytes of data across multiple RDBMS and NoSQL databases was a challenge. Preparing for 10x growth under such circumstances required a strategic approach that would allow them to scale without interrupting business continuity. Spoiler: this approach included ScyllaDB.

In the following video, Sunderjeet Singh (ScyllaDB India Manager) kicks off with an introduction to ScyllaDB and Freshworks. Then, Sreedhar Gade (VP of Engineering at Freshworks) shares how Freshworks architected a solution that enables the company to scale operations while keeping costs under control. Here are highlights from the talk, as shared by Sreedhar Gade…

About Freshworks

Freshworks was founded in 2010 with the goal of empowering millions of companies across the world in multiple domains. The company went public in 2021. Today Freshworks’ revenue is near $600 million. We are relied upon by customers in over 120 countries, and have earned many recognitions across industry verticals.

Technical Challenges

From an application perspective, serving Freshworks’ global customer base requires the team to serve products and data with ultra-low latency and high performance. When using Cassandra, the team faced challenges such as:

  • High tail latencies. Every SaaS product vendor is good at serving with high performance up to the 80th or 90th percentile. But the long tail is where performance actually starts getting impacted. Improving this can really improve the customer experience.
  • Administrative burden. We don’t want to keep adding SREs and database engineers in step with our company growth. We want to make sure that we stay lean and mean – but still be able to manage a large fleet of database instances.
  • Any slight surge in traffic led to an increase in timeouts. Any surge in traffic could introduce problems. And with a global customer base, traffic patterns are quite unpredictable. Surges can lead to timeouts – unless we’re able to rapidly scale up and down.

Why ScyllaDB

ScyllaDB proved that it could solve these challenges for our former Cassandra use cases. It helps us deliver engaging experiences to our customers across the world. It helps us reduce toil for our engineers. It’s easy to scale up. And more importantly, it’s very cost effective — easy on the eyes for our CFO. 😉

Migrating from Cassandra to ScyllaDB

To start the migration, we enabled zero-downtime dual writes on the Cassandra databases that we wanted to migrate to ScyllaDB. Then, we took a snapshot of the existing Cassandra cluster and created volumes in the ScyllaDB cluster. We started with around 10 TB as part of this project, then moved it forward in different phases. And once the Cassandra migration was done, we used the CDM migrator to validate the migration quality.
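The dual-write step can be pictured roughly as follows. This is a hedged sketch with placeholder hosts, keyspace, and table names – not Freshworks’ actual code – showing the general idea: every application write is issued to both the existing Cassandra cluster and the new ScyllaDB cluster so the two stay in sync while historical data is backfilled from the snapshot.

    # Rough dual-write sketch (placeholders throughout; not Freshworks' code).
    # Cassandra and ScyllaDB both speak CQL, so the same driver and statement work for both.
    from cassandra.cluster import Cluster

    cassandra = Cluster(["cassandra-host"]).connect("app_keyspace")
    scylla = Cluster(["scylla-host"]).connect("app_keyspace")

    insert_cql = "INSERT INTO events (id, payload) VALUES (%s, %s)"

    def dual_write(event_id, payload):
        # Primary write path stays on Cassandra until cutover...
        cassandra.execute(insert_cql, (event_id, payload))
        # ...while the same write is mirrored to ScyllaDB.
        scylla.execute(insert_cql, (event_id, payload))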
The Results So Far

We are currently live with ScyllaDB in one of the regions, and we’ve been able to migrate about 25% of the data (more than two terabytes) as part of this project. We have already achieved a 20X reduction in tail latency – we brought the P99 latency down from one second to 50 milliseconds.

What’s Next with ScyllaDB at Freshworks

There are many more opportunities with ScyllaDB at Freshworks, and we have great plans going forward. One of the major projects we’re considering involves taking the text/BLOB data that’s currently stored in MySQL and moving it into ScyllaDB. We expect that will give us cost benefits as well as a performance boost. We are also looking to use ScyllaDB to improve the scalability, performance, and maintenance-related activities across our existing Cassandra workloads, across all our business units and products. This will help ensure that our products can scale 10x and scale on demand.

Medusa 0.16 was released

The k8ssandra team is happy to announce the release of Medusa for Apache Cassandra™ v0.16. This is a special release, as we did a major overhaul of the storage classes implementation. We now have less code (and fewer dependencies) while providing much faster and more resilient storage communication.

Back to ~basics~ official SDKs

Medusa has been an open source project for about 4 years now, and a private one for a few more. Over such a long time, other software it depends upon (or doesn’t) evolves as well. More specifically, the SDKs of the major cloud providers evolved a lot. We decided to check if we could replace a lot of custom code doing asynchronous parallelisation and calls to the cloud storage CLI utilities with the official SDKs.

Our storage classes so far relied on two different ways of interacting with the object storage backends:

  • Apache Libcloud, which provided a Python API for abstracting ourselves from the different protocols. It was convenient and fast for uploading a lot of small files, but very inefficient for large transfers.
  • Specific cloud vendors’ CLIs, which were much more efficient with large file transfers, but were invoked through subprocesses. This created an overhead that made them inefficient for small file transfers. Relying on subprocesses also created a much more brittle implementation, which led the community to file a lot of issues that we’ve been struggling to fix.

To cut a long story short, we did it!

  • We started by looking at S3, where we went with the official boto3. As it turns out, boto3 does all the chunking, throttling, retries and parallelisation for us (see the sketch after this list). Yay!
  • Next we looked at GCP. Here we went with TalkIQ’s gcloud-aio-storage. It works very well for everything, including the large files. The only thing missing is the throughput throttling.
  • Finally, we used Azure’s official SDK to cover Azure compatibility. Sadly, this one also lacks throughput throttling.
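
To give a feel for how little code the SDK route requires, here is a minimal boto3 transfer sketch. Bucket, key, and file names are placeholders, and this is not Medusa’s actual storage class code:

    # Minimal boto3 upload/download sketch (placeholders; not Medusa's storage classes).
    import boto3
    from boto3.s3.transfer import TransferConfig

    s3 = boto3.client("s3")

    # The managed transfer handles multipart chunking, retries and parallel uploads;
    # max_bandwidth (bytes/s) is boto3's built-in throughput throttling knob.
    config = TransferConfig(multipart_chunksize=8 * 1024 * 1024,
                            max_concurrency=4,
                            max_bandwidth=50 * 1024 * 1024)

    s3.upload_file("backup/sstable-big-Data.db", "medusa-backups",
                   "node1/sstable-big-Data.db", Config=config)
    s3.download_file("medusa-backups", "node1/sstable-big-Data.db",
                     "restore/sstable-big-Data.db", Config=config)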

Right after finishing these replacements, we spotted the following improvements:

  • The duration of the integration tests against the storage backends dropped from ~45 min to ~15 min.
    • This means Medusa became far more efficient.
    • There is now much less time spent managing storage interaction thanks to it being asynchronous to the core.
  • The Medusa uncompressed image size we bundle into k8ssandra dropped from ~2GB to ~600MB and its build time went from 2 hours to about 15 minutes.
    • Aside from giving us much faster feedback loops when working on k8ssandra, this should help k8ssandra itself move a little bit faster.
  • The file transfers are now much faster.
    • We observed up to several hundreds of MB/s per node when moving data from a VM to blob storage within the same provider. The available network speed is the limit now.
    • We are also aware that consuming the whole network throughput is not great. That’s why we now have proper throttling for S3 and are working on a solution for this in other backends too.

The only compromise we had to make was to drop Python 3.6 support. This is because the Python asyncio features we rely on are only available from Python 3.7 onwards.

The other good stuff

Even though we are happiest about the storage backends, there are a number of changes that should not go without mention:

  • We fixed a bug with hierarchical storage containers in Azure. This flavor of blob storage works more like a regular file system, meaning it has a concept of directories. None of the other backends do this (including the vanilla Azure ones), and Medusa was not dealing gracefully with this.
  • We are now able to build Medusa images for multiple architectures, including the arm64 one.
  • Medusa can now purge backups of nodes that have been decommissioned, meaning they are no longer present in the most recent backups. Use the new medusa purge-decommissioned command to trigger such a purge.

Upgrade now

We encourage all Medusa users to upgrade to version 0.16 to benefit from all these storage improvements, which make it much faster and more reliable.

Medusa v0.16 is the default version in the newly released k8ssandra-operator v1.9.0, and it can be used with previous releases by setting the .spec.medusa.containerImage.tag field in your K8ssandraCluster manifests.