WebAssembly: Putting Code and Data Where They Belong

Brian Sletten and Piotr Sarna chat about Wasm + data trends at ScyllaDB Summit

ScyllaDB Summit isn’t just about ScyllaDB. It’s also a space for learning about the latest trends impacting the broader data and database world. Case in point: the WebAssembly discussions at ScyllaDB Summit 23.

Free Registration for ScyllaDB Summit 24

Last year, the conference featured two talks that addressed Wasm from two distinctly different perspectives: Brian Sletten (WebAssembly: The Definitive Guide author and President at Bosatsu Consulting) and Piotr Sarna (maintainer of libSQL, which supports Wasm user-defined functions and compiles to WebAssembly).

Brian’s Everything in its Place: Putting Code and Data Where They Belong talk shared how Wasm challenges the fundamental assumption that code runs on computers and data is stored in databases. And Piotr’s libSQL talk introduced, well, libSQL (Turso’s fork of SQLite that’s modernized for edge computing and distributed systems) and what’s behind its Wasm-powered support for dynamic function creation and execution.

See Brian’s Talk & Deck

See Piotr’s Talk & Deck

As the event unfolded, Brian and Piotr met up for a casual Wasm chat. Here are some highlights from their discussion…

How people are using WebAssembly to create a much closer binding between the data and the application

Brian: From mainframe, to client-server, to extending servers with things like servlets, Perl scripts, and cgi-bin, I think we’ve just really been going back and forth, and back and forth, back and forth. The reality is that different problems require different topologies. We’re gaining more topologies to support, but also tools that give us support for those additional topologies.

The idea that we can co-locate large amounts of data for long-term training sessions in the cloud avoids having to push a lot of that back and forth. But the more we want to support things like offline applications and interactions, the more we’re going to need to be able to capture data on-device and in-browser. So, the idea of being able to push app databases into the browser, like DuckDB, SQLite, and Postgres, and – I fully expect someday – ScyllaDB as well, allows us to say, “Let’s run the application, capture whatever interactions with the user we want locally, then sync it incrementally or in an offline capacity.”

We’re getting more and more options for where we co-locate these things, and it’s going to be driven by the business requirements as much as technical requirements for certain kinds of interactions. And things like regulatory compliance are a big part of that as well. For example, people might want to schedule work in Europe – or outside of Europe – for various regulatory reasons. So, there are lots of things happening that are driving the need, and technologies like WebAssembly and LLVM are helping us address that.

How Turso’s libSQL lets users get their compute closer to their data

Piotr: libSQL is actually just an embedded database library. By itself, it’s not a distributed system – but, it’s a very nice building block for one. You can use it at the edge. That’s a very broad term; to be clear, we don’t mean running software on your fridges or light bulbs. It’s just local data centers that are closer to the users. And we can build a distributed database by having lots of small instances running libSQL and then replicating to it. CRDTs – basically, this offline-first approach where users can interact with something and then sync later – is also a very interesting area. Turso is exploring that direction. And there’s actually another project called cr-sqlite that applies CRDTs to SQLite. It’s close to the approach we’d like to take with libSQL. We want to have such capabilities natively so users can write these kinds of offline-first applications and the system knows how to resolve conflicts and apply changes later.

Moving from “Web Scale” big data to a lot of small data

Brian: I think this shift represents a more realistic view. Our definition of scale is “web scale.” Nobody would think about trying to put the web into a container. Instead, you put things into the web. The idea that all of our data has to go into one container is something that clearly has an upper limit. That limit is getting bigger over time, but it will never be “all of our data.”
Protocols and data models that interlink loosely federated collections of data (on-demand, as needed, using standard query mechanisms) will allow us to keep the smaller amounts of data on-device, and then connect it back up. You’ve learned additional things locally that may be of interest when aggregated and connected. That could range from offline syncing to real-time linked-data interactions.

Really, this idea of trying to “own” entire datasets is essentially an outdated mentality. We have to get data to where it needs to go, and obviously have the optimizations and engineering constraints around the analytics questions we need to ask. But the reality is that data is produced in lots of different places. And having a way to work with different scenarios of where that data lives and how we connect it is all part of that story.

I’ve been a big advocate for linked data and knowledge graphs, and things like that, for quite some time. That’s where I think things like WebAssembly and linked data and architectural distribution, and serverless functions, and cloud computing and edge computing are all sort of coalescing into this fluid fabric view of data and computation.

Trends: From JavaScript to Rust, WebAssembly, and LLVM

Brian: JavaScript is a technology that began in the browser, but was increasingly pushed to the back end. For that to work, you either need a kind of flat computational environment where everything is the same, or something where we can virtualize the computation in the form of Java bytecode, .NET CIL, or whatever. JavaScript is part of that because we try to have a JavaScript engine that runs in all these different environments. You get better code reuse, and you get more portability in terms of where the code runs. But JavaScript itself also has some limitations in terms of the kinds of low-level stuff it can handle (for example, there’s no real opportunity for ahead-of-time optimizations).

Rust represents a material advancement in languages for system engineering and application development, which helps improve the performance and safety of our code. And because Rust is built on the LLVM infrastructure, LLVM’s pluggable back end allows it to emit native code, WebAssembly, and even WASI-flavored WebAssembly. This ensures that it runs within an environment providing the capabilities required to do what it needs…and nothing else.

That’s why I consider the intersection of architecture, data models, and computational substrates – and WebAssembly and LLVM are important parts of those – to be a big part of solving this problem. I mean, the reason Apple was able to migrate from x86 to ARM relatively quickly is because they changed their tooling to be LLVM-based. At that point, it becomes a question of recompiling to some extent.

What’s needed to make small localized databases on the edge + big backend databases a successful design pattern

Piotr: These smaller local databases that are on this magical edge need some kind of source of truth that keeps everything consistent. Especially if ScyllaDB goes big on stronger consistency levels (with Raft), I do imagine a design where you can have these small local databases (say, libSQL instances) that are able to store user data, and users interact with them because they’re close by. That brings really low latency. Then, these small local databases could be synced to a larger database that becomes the single source of truth for all the data, allowing users to access this global state as well.

Watch the Complete WebAssembly Chat

You can watch the complete Brian <> Piotr chat here:



How ShareChat Performs Aggregations at Scale with Kafka + ScyllaDB

Sub-millisecond P99 latencies – even over 1M operations per second

ShareChat is India’s largest homegrown social media platform, with ~180 million monthly average users and 50 million daily active users. They capture and aggregate various engagement metrics such as likes, views, shares, comments, etc., at the post level to curate better content for their users.

Since engagement performance directly impacts users, ShareChat needs a datastore that offers ultra-low latencies while remaining highly available, resilient, and scalable – ideally, all at a reasonable cost. This blog shares how they accomplished that using in-house Kafka streams and ScyllaDB. It’s based on the information that Charan Movva, Technical Lead at ShareChat, shared at ScyllaDB Summit 23.

Join us at ScyllaDB Summit 24 to hear more firsthand accounts of how teams are tackling their toughest database challenges. Disney, Discord, Paramount, Expedia, and more are all on the agenda. Plus we’ll be featuring more insights from ShareChat. Ivan Burmistrov and Andrei Manakov will be presenting two tech talks: “Getting the Most Out of ScyllaDB Monitoring: ShareChat’s Tips” and “From 1M to 1B Features Per Second: Scaling ShareChat’s ML Feature Store.”

Register Now – It’s Free

ShareChat: India’s Largest Social Media Platform

ShareChat aims to foster an inclusive community by supporting content creation and consumption in 15 languages. Allowing users to share experiences in the language they’re most comfortable with increases engagement – as demonstrated by their over 1.3 billion monthly shares, with users spending an average of 31 minutes per day on the app.

As all these users interact with the app, ShareChat collects events, including post views and engagement actions such as likes, shares, and comments. These events, which occur at a rate of 370k to 440k ops/second, are critical for populating the user feed and curating content via their data science and machine learning models. All of this is critical for enhancing the user experience and providing valuable content to ShareChat’s diverse community.

Why Stream Processing?

The team considered three options for processing all of these engagement events:

  • Request-response promised the lowest latency. But given the scale of events they handle, it would burden the database with too many connections and (unnecessary) writes and updates. For example, since every number of likes between 12,500 and 12,599 is displayed as “12.5K likes,” those updates don’t require the level of precision that this approach offers (at a price).
  • Batch processing offered high throughput, but also brought challenges such as stale data and delayed updates, especially for post engagements. This is particularly problematic for early engagement on a post (imagine a user sharing something, then incorrectly thinking that nobody likes it because the updates are delayed).
  • Stream processing emerged as a well-balanced solution, offering continuous and non-blocking data processing. This was particularly important given their dynamic event stream with an unbounded and ever-growing dataset. The continuous nature of stream processing bridged the gap between request-response and batching.

Charan explained, “Our specific technical requirements revolve around windowed aggregation, where we aggregate events over predefined time frames, such as the last 5 or 10 minutes. Moreover, we need support for multiple aggregation windows based on engagement levels, requiring instant aggregation for smaller counters and a more flexible approach for larger counters.”

Additional technical considerations include:

  • Support for triggers, which are vital for user engagement
  • The ease of configuring new counters, triggers, and aggregation windows, which enables them to quickly evolve the product

Inside The ShareChat Architecture

Here’s a look at the architecture they designed to satisfy these requirements.

Various types of engagement events are captured by backend services, then sent to the Kafka cluster. Business logic at different services captures and derives internal counters crucial for understanding and making decisions about how the post should be displayed on the feed. Multiple Kafka topics cater to various use cases.

Multiple instances of the aggregation service are all running on Kubernetes. The Kafka Streams API handles the core logic for windowed aggregations and triggers. Once aggregations are complete, they update the counter or aggregated value in ScyllaDB and publish a change log to a Kafka topic.

Under the hood, ShareChat taps Kafka’s consistent hashing to prevent different messages or events for a given entity ID (post ID) and counter from being processed by different Kubernetes pods or instances. To support this, the combination of entity ID and counter is used as the partition key, so all of its relevant events are processed by the same consumer.

All the complex logic related to windowing and aggregations is managed using Kafka Streams. Each stream processing application executes a defined topology, essentially a directed acyclic graph (DAG). Events are pushed through a series of transformations, and the aggregated values are then updated in the data store, which is ScyllaDB.

Exploring the ShareChat Topology

Here’s how Charan mapped out their topology.

Starting from the top left:

  • They consume events from an engagement topic, apply basic filtering to check if the counter is registered, and divert unregistered events for monitoring and logging.
  • The aggregation window is defined based on the counter’s previous value, branching the main event stream into different streams with distinct windows. To handle stateful operations, especially during node shutdowns or rebalancing, Kafka Streams uses an embedded RocksDB instance for local state storage, persisting data to disk for rapid recovery.
  • The output stream from aggregations is created by merging the individual streams, and the aggregated values are updated in the data store for the counters. They log changes in a change log topic before updating counters on the data store.

Next, Charan walked through some critical parts of their code, highlighting design decisions such as their grace period setting and sharing how aggregations and views were implemented. He concluded, “Ultimately, this approach achieved a 7:1 ratio of events received as inputs compared to the database writes. That resulted in an approximately 80% reduction in messages.”

Where ScyllaDB Comes In

As ShareChat’s business took off, it became clear that their existing DBaaS solution (from their cloud provider) couldn’t provide acceptable latencies. That’s when they decided to move to ScyllaDB.

As Charan explained, “ScyllaDB is continuously delivering sub-millisecond latencies for the counters cluster. In addition to meeting our speed requirements, it also provides critical transparency into the database through flexible monitoring. Having multiple levels of visibility—data center, cluster, instance, and shards—helps us identify issues, abnormalities, or unexpected occurrences so we can react promptly.”

The ScyllaDB migration has also paid off in terms of cost savings: they’ve cut database costs by over 50% in the face of rapid business growth.

For the engagement counters use case, ShareChat runs a three-node ScyllaDB cluster. Each node has 48 vCPUs and over 350 GB of memory. Below, you can see the P99 read and write latencies: all microseconds, even under heavy load.

With a similar setup, they tested the cluster’s response to an extreme (but feasible) 1.2 million ops/sec by replicating events in a parallel cluster. Even at 90% load, the cluster remained stable with minimal impact on their ultra-low latencies.

Charan summed it up as follows: “ScyllaDB has helped us a great deal in optimizing our application and better serving our end users. We are really in awe of what ScyllaDB has to offer and are expanding ScyllaDB adoption across our organization.”

Watch the Complete Tech Talk

You can watch the complete tech talk and skim through the deck in our tech talk library.

Watch the Full Tech Talk

Learn more in this ShareChat blog

The ScyllaDB NoSQL Community: Shaping the Future

Get involved with the ScyllaDB community – via forums, open source contributions, events & more – and help shape the future of ScyllaDB 

ScyllaDB was created from the get-go as an open-source, community-driven solution. Across the wide and dynamic landscape of modern databases, ScyllaDB stands out as a high-performance, distributed NoSQL database – especially when single-digit millisecond latency and high scalability are required. A big part of this success can be attributed to a thriving open-source community that plays a crucial role in shaping and enhancing this cutting-edge technology.

Being open-source also has a strong impact on our company culture, but that’s a topic for a different post.

The ScyllaDB community is vibrant and growing. Our Slack Channel has thousands of users. Our Community Forum, launched last year, has become the go-to place for meaningful conversations.

No matter how you prefer to engage, we welcome discussions, bug reports, fixes, feature contributions, feature requests, documentation improvements, and other ways to make ScyllaDB even faster, more flexible, and more robust.

Say hello & share how you’re using ScyllaDB

How to Get Involved

The community, which comprises developers, engineers, users, and enthusiasts from around the globe, actively contributes to the growth and evolution of ScyllaDB. The collaborative nature of open source allows diverse perspectives and expertise to come together for continuous improvements and valuable innovations.

Here are a few ways to get involved:

There are also online communities in languages beyond English, as well as related open-source projects: the Seastar framework, the ScyllaDB Monitoring Stack, the Kubernetes ScyllaDB Operator, the Go CQL Extension (GoCQLX) driver, and more.

ScyllaDB Community Forum

It’s been a bit over a year since we went live with the ScyllaDB Community Forum. We chose Discourse as the underlying platform.

ScyllaDB developers actively participate in forum discussions, offering insights, addressing technical queries, and providing updates on the latest features and developments. This direct interaction between the community and the core development team fosters a sense of transparency and collaboration, enhancing the product and the overall user experience.

Here is a taste of some popular topics. You can see many more discussions on the forum itself.

Popular ScyllaDB Forum Topics

The differences between column families in Cassandra’s data model compared to Bigtable

In this discussion, a user is seeking clarification on the relationship between Cassandra’s column-family-based data model and Google’s Bigtable. The user notes the multi-dimensional sparse map structure in Bigtable, where data is organized by rows, columns, and time.

The explanation is that, initially, Cassandra’s data model was indeed based on Bigtable’s, where a row could include any number of columns, following a schema-less approach. However, as Cassandra evolved, the developers recognized the value of schemas for application correctness. The introduction of Cassandra Query Language (CQL) around Cassandra version 0.8 marked a shift towards a more structured approach with clustering keys and schemas.

The response emphasizes the importance of clustering keys, which define the structure within wide rows (now called partitions). Cassandra internally maintains a dual representation, converting user-facing CQL rows into old-style wide rows on disk.
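
To illustrate, here’s a small CQL sketch (the table and column names are invented for illustration): every row sharing the same partition key lands in the same partition – the modern equivalent of the old wide row – and the clustering key orders the rows within it.

CREATE TABLE sensor_readings (
    sensor_id    uuid,       -- partition key: one partition ("wide row") per sensor
    reading_time timestamp,  -- clustering key: orders rows within the partition
    value        double,
    PRIMARY KEY ((sensor_id), reading_time)
);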

Listing all keyspaces in a ScyllaDB (or Cassandra) cluster

The topic deals with retrieving a list of all existing keyspaces in a cluster. This is useful, for example, when a user forgets the keyspace name they previously created. The provided answer suggests using the cqlsh command DESC KEYSPACES (or its long form, DESCRIBE KEYSPACES) to achieve this. The command works identically for both ScyllaDB and Cassandra, and additional information can be found in the documentation.
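
For reference, this is what that looks like in a cqlsh session (the user-created keyspace name is hypothetical, and the exact list varies by version and configuration):

cqlsh> DESC KEYSPACES

my_keyspace  system  system_schema  system_auth  system_distributed  system_traces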

The best way to fetch rows in ScyllaDB (Count, Limit, or paging)

There are different ways to query data to fetch rows. The discussion lists some examples of how and when to use each method.

The user is utilizing ScyllaDB to limit users’ actions within the past 24 hours, allowing only a specific number of orders. They use ScyllaDB’s TTL, count records, and employ the Murmur3 hash for the partition key. The user seeks advice on the most efficient query method, considering options like COUNT(), LIMIT, and paging.

The response emphasizes the importance of always using paging to avoid latency and memory fragmentation. It suggests using LIMIT with paging, highlighting potential issues with the user’s initial approach.
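
As a rough sketch of that recommendation (the table and column names are hypothetical, and driver-side paging is assumed to be enabled): because the rows expire via TTL after 24 hours, a simple LIMIT query is enough to check whether a user has hit the cap.

-- returns at most 11 rows, so the application can tell whether the user
-- has already placed the maximum of, say, 10 orders in the past 24 hours
SELECT order_id FROM orders_by_user
WHERE user_id = ?
LIMIT 11;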

Community Forum vs. Slack

Both the Community Forum and Slack serve as valuable communication channels within the ScyllaDB community, but they cater to different needs and scenarios.

When should you use each one? Here are some tips.

ScyllaDB Community Forum

Technical Discussions

  • When to Use: For in-depth technical questions, discussing specific features, or getting help with troubleshooting. If you think your question would be helpful to others, see if someone already asked it. If not, ask it in the forum so people can find it in the future.
  • Why: The forum is easy to search. It provides a structured environment for technical discussions, allowing for detailed explanations, code snippets, and collaborative problem-solving.

Showcasing Use Cases, Tips and Guides

  • When to Use: For seeking or providing use cases, tips, and best practices, and practical applications related to ScyllaDB.
  • Why: The forum is easy to search. It’s an excellent repository for community-generated content, making it a valuable resource for users looking to learn or share knowledge.

Announcements and Updates

  • When to Use: To stay informed about the latest releases, updates, events, and announcements from the ScyllaDB team.
  • Why: Important news and updates are posted on the forum, providing a centralized location for community members to stay up-to-date.

ScyllaDB Slack

Real-Time, Specific Communication

  • When to Use: For quick, very specific questions, immediate assistance, or engaging in real-time discussions with community members.
  • Why: Slack offers a more instantaneous communication channel, making it suitable for situations where quick responses are crucial.

Informal Conversations

  • When to Use: For informal conversations, networking, and community bonding.
  • Why: Slack channels often have a more relaxed and conversational atmosphere, providing a space for community members to connect on a personal level.

Be aware that the support provided by the community on Slack and on the Forum is “best effort.” If you are running ScyllaDB in production and want premium support with guaranteed SLAs, I recommend you look into ScyllaDB Enterprise or ScyllaDB Cloud, which offer 24×7 priority support from our dedicated support engineers.

Community Events and Appreciation

We regularly organize events that serve as platforms for learning, collaboration, and the celebration of achievements within the ScyllaDB ecosystem.

In just a few weeks,  we’ll host ScyllaDB Summit. This free online event brings together developers, engineers, and database enthusiasts from around the globe. It’s an immersive experience, with keynote speakers, technical sessions, and hands-on workshops covering a myriad of topics, from performance optimization to real-world use cases. Participants gain insights into the latest developments, upcoming features, and best practices directly from the core contributors and experts behind ScyllaDB.

Other notable events include P99 CONF (the technical conference for anyone who obsesses over high-performance, low-latency applications), ScyllaDB University Live (instructor-led NoSQL database training sessions), Meetups, NoSQL Masterclasses, ScyllaDB Labs (a hands-on, online training event), our webinars, and more. All are free and virtual.

Contributors are recognized for their efforts through various means, including contributor spotlights, awards, and acknowledgments within the ScyllaDB ecosystem. This recognition not only celebrates individual achievements but also motivates others to actively participate and contribute to the community.

Say hello & share how you’re using ScyllaDB

Closing Thoughts

ScyllaDB embraces an open-source, community-driven approach, fostering collaboration and transparency.
The collaborative nature of open source allows for meaningful discussions, bug reports, feature contributions, and more. There are multiple avenues for involvement, including the Community Forum, Slack channel, and GitHub repository.

As the ScyllaDB community continues to grow, events like the ScyllaDB Summit, P99 CONF, ScyllaDB University Live, and more provide platforms for learning, collaboration, and celebrating achievements. I hope to hear from you soon on one of these platforms!

Connect to ScyllaDB Clusters using the DBeaver Universal Database Manager

Learn how to use DBeaver as a simple alternative to cqlsh – so you can access your ScyllaDB clusters through a GUI with syntax highlighting

DBeaver is a universal database manager for working with SQL and NoSQL databases. It provides a central GUI for connecting to and administering a wide variety of popular databases (Postgres, MySQL, Redis…). You can use it to toggle between the various databases in your tech stack, view and evaluate multiple database schemas, and visualize your data.

And now you can use it with ScyllaDB, the monstrously fast NoSQL database. This provides a simple alternative to cqlsh – enabling you to access your ScyllaDB clusters through a GUI with syntax highlighting.

DBeaver Enterprise has released version 23.3, introducing support for ScyllaDB. This update lets you seamlessly connect to ScyllaDB clusters using DBeaver. Both self-hosted and ScyllaDB Cloud instances work well with DBeaver. You can install DBeaver on Windows, Linux, and macOS.

In this post, I will show you how easy it is to connect DBeaver and ScyllaDB and run some basic queries.

Connect to ScyllaDB in DBeaver

Since ScyllaDB is compatible with Apache Cassandra, you can leverage DBeaver’s Cassandra driver to interact with ScyllaDB.
To create a new ScyllaDB connection in DBeaver:

  1. Download DBeaver Enterprise at https://dbeaver.com/download/enterprise/
  2. Click the “New database connection” button and search for “ScyllaDB.”
  3. Click “Next”.
  4. Enter the database connection details as follows:
  5. Click “Test Connection” and “Finish.”
  6. Inspect your tables.

  7. Run your CQL queries.

Query examples

Once the connection is set up, you can run all your CQL queries and see the result table right below your query. For example:
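
Here’s a simple illustrative query (the keyspace and table names are placeholders for whatever schema you’ve created):

SELECT * FROM my_keyspace.users LIMIT 10;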

Aside from simple SELECT queries, you can also run queries to create new objects in the database, like a materialized view:
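
A generic example of such a statement might look like the following (again, the keyspace, table, and column names are placeholders):

CREATE MATERIALIZED VIEW my_keyspace.users_by_email AS
    SELECT * FROM my_keyspace.users
    WHERE email IS NOT NULL AND user_id IS NOT NULL
    PRIMARY KEY (email, user_id);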

Wrapping up

DBeaver is a great database management tool that allows easy access to your ScyllaDB clusters. It provides a simple alternative to cqlsh with a nice user interface and syntax highlighting. Get started by downloading DBeaver and creating a new ScyllaDB Cloud cluster.

Any questions? You can discuss this post and share your thoughts in our community forum.

Worldwide Local Latency With ScyllaDB: The ZeroFlucs Strategy

How data is replicated to support low latency for ZeroFlucs’ global usage patterns – without racking up unnecessary costs

ZeroFlucs’ business – processing sports betting data – is rather latency sensitive. Content must be processed in near real-time, constantly, and in a region local to both the customer and the data. And there are incredibly high throughput and concurrency requirements – events can update dozens of times per minute, and each one of those updates triggers tens of thousands of new simulations (they process ~250,000 in-game events per second).

At ScyllaDB Summit 23, ZeroFlucs’ Director of Software Engineering Carly Christensen walked attendees through how ZeroFlucs uses ScyllaDB to provide optimized data storage local to the customer – including how their recently open-sourced package (cleverly named Charybdis) facilitates this. This blog post, based on that talk, shares their brilliant approach to figuring out exactly how data should be replicated to support low latency for their global usage patterns without racking up unnecessary storage costs.

Join us at ScyllaDB Summit 24 to hear more firsthand accounts of how teams are tackling their toughest database challenges. Disney, Discord, Expedia, Fanatics, Paramount, and more are all on the agenda.

Register Now – It’s Free

What’s ZeroFlucs?

First, a little background on the business challenges that the ZeroFlucs technology is supporting. ZeroFlucs’ same-game pricing lets sports enthusiasts bet on multiple correlated outcomes within a single game. This is leagues beyond traditional bets on what team will win a game and by what spread. Here, customers are encouraged to design and test sophisticated game theories involving interconnected outcomes within the game. As a result, placing a bet is complex, and there’s a lot more at stake as the live event unfolds.

For example, assume there are three “markets” for bets:

  • Will Team A or Team B win?
  • Which player will score the first touchdown?
  • Will the combined scores of Team A and B be over or under 45.5 points?

Someone could place a bet on Team A to win, B. Bhooma to score the first touchdown, and for the total score to be under 45.5 points. If you look at those 3 outcomes and multiply the prices together, you get a price of around $28. But in this case, the correct price is approximately $14.50.

Carly explains why. “It’s because these are correlated outcomes. So, we need to use a simulation-based approach to more effectively model the relationships between those outcomes. If a team wins, it’s much more likely that they will score the first touchdown or any other touchdown in that match. So, we run simulations, and each simulation models a game end-to-end, play-by-play. We run tens of thousands of these simulations to ensure that we cover as much of the probability space as possible.”

The ZeroFlucs Architecture

The ZeroFlucs platform was designed from the ground up to be cloud native. Their software stack runs on Kubernetes, using Oracle Container Engine for Kubernetes. There are 130+ microservices, growing every week. And a lot of their environment can be managed through custom resource definitions (CRDs) and operators. As Carly explains, “For example, if we want to add a new sport, we just define a new instance of that resource type and deploy that YAML file out to all of our clusters.” A few more tech details:

  • Services are primarily Golang
  • Python is used for modeling and simulation services
  • gRPC is used for internal communications
  • Kafka is used for “at least once” delivery of all incoming and outgoing updates
  • GraphQL is used for external-facing APIs

As the diagram above shows:

  • Multiple third-party sources send content feeds.
  • Those content items are combined into booking events, which are then used for model simulations.
  • The simulation results are used to generate hundreds to thousands of new markets (specific outcomes that can be bet on), which are then stored back on the original booking event.
  • Customers can interact directly with that booking event. Or, they can use the ZeroFlucs API to request prices for custom combinations of outcomes via the ZeroFlucs query engine. Those queries are answered with stored results from their simulations.

Any content update starts the entire process over again.

Keeping Pace with Live In-Play Events

ZeroFlucs’ ultimate goal is to process and simulate events fast enough to offer same-game prices for live in-play events. For example, they need to predict whether this play results in a touchdown and which player will score the next touchdown – and they must do it fast enough to provide the prices before the play is completed. There are two main challenges to accomplishing this:

  • High throughput and concurrency. Events can update dozens of times per minute, and each update triggers tens of thousands of new simulations (hundreds of megabytes of data). They’re currently processing about 250,000 in-game events per second.
  • Customers can be located anywhere in the world. That means ZeroFlucs must be able to place their services – and the associated data – near these customers. With each request passing through many microservices, even a small increase in latency between those services and the database can result in a major impact on the total end-to-end processing time.

Selecting a Database That’s Up to the Task

Carly and team initially explored whether three popular databases might meet their needs here.

  • MongoDB was familiar to many team members. However, they discovered that with a high number of concurrent queries, some queries took several seconds to complete.
  • Cassandra supported network-aware replication strategies, but its performance and resource usage fell short of their requirements.
  • CosmosDB addressed all their performance and regional distribution needs. But they couldn’t justify its high cost, and its Azure-only availability meant vendor lock-in and limited portability.

Then they thought about ScyllaDB, a database they had discovered while working on a different project. It didn’t make sense for the earlier use case, but it met this project’s requirements quite nicely. As Carly put it: “ScyllaDB supported the distributed architecture that we needed, so we could locate our data replicas near our services and our customers to ensure that they always had low latency. It also supported the high throughput and concurrency that we required. We haven’t yet found a situation that we couldn’t just scale through. ScyllaDB was also easy to adopt. Using ScyllaDB Operator, we didn’t need a lot of domain knowledge to get started.”

Inside their ScyllaDB Architecture

ZeroFlucs is currently using ScyllaDB hosted on Oracle Cloud Flex 4 VMs. These VMs allow them to change the CPU and memory allocation to those nodes if needed. It’s currently performing well, but the company’s throughput increases with every new customer. That’s why they appreciate being able to scale up and run on bare metal if needed in the future.

They’re already using ScyllaDB Operator to manage ScyllaDB, and they were reviewing their strategy around ScyllaDB Manager and ScyllaDB Monitoring at the time of the talk.

Ensuring Data is Local to Customers

To make the most of ScyllaDB, ZeroFlucs divided their data into three main categories:

  • Global data. This is slow-changing data used by all their customers. It’s replicated to each and every one of their regions.
  • Regional data. This is data that’s used by multiple customers in a single region (for example, a sports feed). If a customer in another region requires their data, they separately replicate it into that region.
  • Customer data. This is data that is specific to that customer, such as their booked events or their simulation results. Each customer has a home region where multiple replicas of their data are stored. ZeroFlucs also keeps additional copies of their data in other regions that they can use for disaster recovery purposes.

Carly shared an example: “Just to illustrate that idea, let’s say we have a customer in London. We will place a copy of our services (“a cell”) into that region. And all of that customer’s interactions will be contained in that region, ensuring that they always have low latency. We’ll place multiple replicas of their data in that region. And will also place additional replicas of their data in other regions. This becomes important later.”

Now assume there’s a customer in the Newport region. They would place a cell of their services there, and all of that customer’s interactions would be contained within the Newport region so they also have low latency.

Carly continues, “If the London data center becomes unavailable, we can redirect that customer’s requests to the Newport region. And although they would have increased latency on the first hop of those requests, the rest of the processing is still contained within one data center – so it would still be low latency.” With a complete outage for that customer averted, ZeroFlucs would then increase the number of replicas of their data in that region to restore data resiliency for them.

Between Scylla(DB) and Charybdis

ZeroFlucs separates data into services and keyspaces, with each service using at least one keyspace. Global data has just one keyspace, regional data has a keyspace per region, and customer data has a keyspace per customer. Some services can have more than one data type, and thus might have both a global keyspace as well as customer keyspaces.

They needed a simple way to manage the orchestration and updating of keyspaces across all their services. Enter Charybdis, the Golang ScyllaDB helper library that the ZeroFlucs team created and open sourced. Charybdis features a table manager that will automatically create keyspaces as well as add tables, columns, and indexes. It offers simplified functions for CRUD-style operations, and it supports LWT and TTL.

Note: For an in-depth look at the design decisions behind Charybdis, see this blog by ZeroFlucs Founder and CEO Steve Gray.

There’s also a Topology Controller Service that’s responsible for managing the replication settings and keyspace information related to every service.

Upon startup, the service calls the topology controller and retrieves its replication settings. It then combines that data with its table definitions and uses it to maintain its keyspaces in ScyllaDB. The above image shows sample Charybdis-generated DDL statements that include a network topology strategy.
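
For a general idea of what DDL with a network topology strategy looks like, here is a generic example (the keyspace name and datacenter names are placeholders, not Charybdis’ actual output):

CREATE KEYSPACE customer_acme
    WITH replication = {'class': 'NetworkTopologyStrategy', 'uk-london-1': 3, 'us-ashburn-1': 2};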

Next on their Odyssey

Carly concluded: “We still have a lot to learn, and we’re really early in our journey. For example, our initial attempt at dynamic keyspace creation caused some timeouts between our services, especially if it was the first request for that instance of the service. And there are still many ScyllaDB settings that we have yet to explore. I’m sure that we’ll be able to increase our performance and get even more out of ScyllaDB in the future.”

Watch the Complete Tech Talk

You can watch Carly’s complete tech talk and skim through her deck in our tech talk library.

Watch the Full Tech Talk

Running Apache Cassandra® Single and Multi-Node Clusters on Docker with Docker Compose

Sometimes you might need to spin up a local test database quickly – a database that doesn’t need to last beyond a set time or number of uses. Or maybe you want to integrate Apache Cassandra® into an existing Docker setup.

Either way, you’re going to want to run Cassandra on Docker, which means running it in a container with Docker as the container manager. This tutorial is here to guide you through running a single and multi-node setup of Apache Cassandra on Docker. 

Prerequisites 

Before getting started, you’ll need to have a few things already installed, and a few basic skills. These will make deploying and running your Cassandra database in Docker a seamless experience: 

  • Docker installed  
  • Basic knowledge of containers and Docker (see the Docker documentation for more insight) 
  • Basic command line knowledge 
  • A code editor (I use VSCode) 
  • CQL shell, aka cqlsh, installed (instructions for installing a standalone cqlsh without installing Cassandra can be found here) 

Method 1: Running a single Cassandra node using Docker CLI 

This method uses the Docker CLI to create a container based on the latest official Cassandra image. In this example we will: 

  • Set up the Docker container 
  • Test that it’s set up by connecting to it and running cqlsh 
  • Clean up the container once you’re done with using it. 

Setting up the container 

You can run Cassandra on your machine by opening up a terminal and using the following command in the Docker CLI: 

docker run --name my-cassandra-db -d cassandra:latest

Let’s look at what this command does: 

  • Docker uses the ‘run’ subcommand to run new containers.  
  • The ‘--name’ flag allows us to name the container, which helps for later use and cleanup; we’ll use the name ‘my-cassandra-db’
  • The ‘-d’ flag tells Docker to run the container in the background, so we can run other commands or close the terminal without turning off the container.  
  • The final argument ‘cassandra:latest’ is the image to build the container from; we’re using the latest official Cassandra image 

When you run this, you should see an ID, like the screenshot below: 

To check and make sure everything is running smoothly, run the following command: 

docker ps -a 

You should see something like this: 

Connecting to the container 

Now that the container has been created, you can connect to it using the following command:

docker exec -it my-cassandra-db cqlsh 

This will run cqlsh, or CQL Shell, inside your container, allowing you to make queries to your new Cassandra database. You should see a prompt like the following: 
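
From that prompt, you can issue CQL directly. For example, this standard query against the built-in system.local table returns the node’s Cassandra version:

cqlsh> SELECT release_version FROM system.local;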

Cleaning up the container 

Once you’re done, you can clean up the container with the ‘docker rm’ command. First, you’ll need to stop the container though, so you need to run the following 2 commands:

docker stop my-cassandra-db 

docker rm my-cassandra-db 

This will delete the database container, including all data that was written to the database. You’ll see a prompt like the following, which, if it worked correctly, will show the ID of the container being stopped/removed: 

Method 2: Deploying a three-node Apache Cassandra cluster using Docker compose 

This method allows you to have multiple nodes running on a single machine. But in which situations would you want to use this method? Some examples include testing the consistency level of your queries, your replication setup, and more.

Writing a docker-compose.yml 

The first step is creating a docker-compose.yml file that describes our Cassandra cluster. In your code editor, create a docker-compose.yml file and enter the following into it: 

version: '3.8'

networks:
  cassandra:

services:
  cassandra1:
    image: cassandra:latest
    container_name: cassandra1
    hostname: cassandra1
    networks:
      - cassandra
    ports:
      - "9042:9042"
    environment: &environment
        CASSANDRA_SEEDS: "cassandra1,cassandra2"
        CASSANDRA_CLUSTER_NAME: MyTestCluster
        CASSANDRA_DC: DC1
        CASSANDRA_RACK: RACK1
        CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch
        CASSANDRA_NUM_TOKENS: 128
  cassandra2:
    image: cassandra:latest
    container_name: cassandra2
    hostname: cassandra2
    networks:
      - cassandra
    ports:
      - "9043:9042"
    environment: *environment
    depends_on:
      cassandra1:
        condition: service_started
  cassandra3:
    image: cassandra:latest
    container_name: cassandra3
    hostname: cassandra3
    networks:
      - cassandra
    ports:
      - "9044:9042"
    environment: *environment
    depends_on:
      cassandra2:
        condition: service_started

So what does this all mean? Let’s examine it part-by-part:  

First, we declare our docker compose version.  

version: '3.8'

Then, we declare a network called cassandra to host our cluster. 

networks:
  cassandra:

Under services, cassandra1 is started. (NOTE: the service_started conditions in cassandra2’s and cassandra3’s depends_on attributes prevent them from starting until the services on cassandra1 and cassandra2, respectively, have started.) We also set the port forwarding here so that our local 9042 port will map to the container’s 9042. We also add it to the cassandra network we established: 

services:
  cassandra1:
    image: cassandra:latest
    container_name: cassandra1
    hostname: cassandra1
    networks:
      - cassandra
    ports:
      - "9042:9042"
    environment: &environment
        CASSANDRA_SEEDS: "cassandra1,cassandra2"
        CASSANDRA_CLUSTER_NAME: MyTestCluster
        CASSANDRA_DC: DC1
        CASSANDRA_RACK: RACK1
        CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch
        CASSANDRA_NUM_TOKENS: 128

Finally, we set some environment variables needed for startup, such as declaring CASSANDRA_SEEDS to be cassandra1 and cassandra2. 

The configurations for containers ‘cassandra2’ and ‘cassandra3’ are very similar; the only real differences are the names (and the host ports they map to). 

  • Both use the same cassandra:latest image, set container names, add themselves to the Cassandra network, and expose their 9042 port.  
  • They also point to the same environment variables as cassandra1 with the *environment syntax. 
    • Their only difference? cassandra2 waits on cassandra1, and cassandra3 waits on cassandra2. 

Here is the code section that this maps to: 

  cassandra2:
    image: cassandra:latest
    container_name: cassandra2
    hostname: cassandra2
    networks:
      - cassandra
    ports:
      - "9043:9042"
    environment: *environment
    depends_on:
      cassandra1:
        condition: service_started
  cassandra3:
    image: cassandra:latest
    container_name: cassandra3
    hostname: cassandra3
    networks:
      - cassandra
    ports:
      - "9044:9042"
    environment: *environment
    depends_on:
      cassandra2:
        condition: service_started

Deploying your Cassandra cluster and running commands 

To deploy your Cassandra cluster, use the Docker CLI in the same folder as your docker-compose.yml to run the following command (the -d causes the containers to run in the background): 

docker compose up -d

Quite a few things should happen in your terminal when you run the command, but when the dust has settled you should see something like this: 

If you run the ‘docker ps -a’ command, you should see three running containers: 

To access your Cassandra cluster, you can use cqlsh to connect to the container database using the following command: 

sudo docker exec -it cassandra1 cqlsh

You can also check the cluster configuration using: 

docker exec -it cassandra1 nodetool status

Which will get you something like this: 

And the node info with: 

docker exec -it cassandra1 nodetool info

From which you’ll see something similar to the following: 

You can also run these commands on the cassandra2 and cassandra3 containers. 

Cleaning up 

Once you’re done with the database cluster, you can take it down and remove it with the following command: 

docker compose down

This will stop and destroy all three containers, outputting something like this:

Now that we’ve covered two ways to run Cassandra in Docker, let’s look at a few things to keep in mind when you’re using it. 

Important things to know about running Cassandra in Docker 

Data Permanence 

Unless you declare volumes on the host machine that map to container volumes, the data you write to your Cassandra database will be erased when the container is destroyed. (You can read more about using Docker volumes here).
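
As a minimal sketch, you can mount a named Docker volume over Cassandra’s data directory (/var/lib/cassandra) so the data survives container removal. For the single-node example from Method 1, that looks like this (the volume name ‘cassandra_data’ is arbitrary):

docker run --name my-cassandra-db -d -v cassandra_data:/var/lib/cassandra cassandra:latest

In the Docker Compose setup, the equivalent is a per-service volumes: entry (for example, - cassandra1_data:/var/lib/cassandra under cassandra1) plus a top-level volumes: section declaring each named volume.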

Performance and Resources 

Apache Cassandra can take a lot of resources, especially when a cluster is deployed on a single machine. This can affect the performance of queries, and you’ll need a decent amount of CPU and RAM to run a cluster locally. 

Conclusion 

There are several ways to run Apache Cassandra on Docker, and we hope this post has illuminated a few ways to do so. If you’re interested in learning more about Cassandra, you can find out more about how data modelling works with Cassandra, or how PostgreSQL and Cassandra differ. 

Ready to spin up some Cassandra clusters yourself? Give it a go with a free trial on the Instaclustr Managed Platform for Apache Cassandra® today!


Database Insights from Disney, Discord, Expedia & More at ScyllaDB Summit

Discover how your peers at Disney, Discord, Expedia, Supercell, Digital Turbine, ShareChat, Zee, Paramount, and more are tackling their toughest database challenges.

Join us at ScyllaDB Summit, a free 2-day community event that’s intentionally virtual, highly interactive, and purely technical.  You will hear about your peers’ experiences and discover new ways to alleviate your own latency, throughput, and cost pains.

This year’s agenda is exceptional. Teams from Disney, Discord, Expedia, Supercell, Digital Turbine, ShareChat, Zee, Paramount, and more will be sharing their expertise and lessons learned. And the talks span a broad range of topics: AI/ML, Kafka, change data capture (CDC), database internals, Cassandra, MongoDB, DynamoDB, and of course ScyllaDB.

See the Agenda

Keynotes include:

Redefining the Boundaries of Performance at Scale
Dor Laor, ScyllaDB

So You’ve Lost Quorum: Lessons From Accidental Downtime
Bo Ingram, Discord

 

Tablets: Rethinking Replication
Avi Kivity, ScyllaDB

 

Radically Outperforming DynamoDB 
Miles Ward (SADA) and Joseph Shorter (Digital Turbine)

CTO Insights: Steering a High-Stakes Database Migration
Kishore Krishnamurthy, Zee5

ScyllaDB Cloud: Faster and More Flexible
Michael Hollander, ScyllaDB

Plus, stay tuned for one more keynote announcement coming soon – someone that we know will set the live event chat on fire.

Register Now – It’s Free

Here are a few things to note in the agenda…

Hands-On Training with ScyllaDB Labs

New this year: We’ll kick off the event by offering two tracks of interactive hands-on training. The training is part of the free “all-inclusive” package. 🙂 Just choose your path and start leveling up your database performance game before the first keynote begins.

Building High-Performance Apps: Designed to help anyone new to ScyllaDB get up to speed fast. Topics include:

  • Achieving and maintaining low-latency NoSQL at scale
  • Data modeling, the read/write path, high availability, consistency, and replication
  • Building high-performance applications with ScyllaDB with a sample app
  • Navigating key decisions & logistics when getting started with ScyllaDB

Power User Best Practices: Advanced database performance strategies for users who have already mastered the ScyllaDB fundamentals. Topics include:

  • Taking advantage of advanced data modeling features
  • Writing better apps using optimizations and best practices
  • Gaining insight by customizing your monitoring for different priorities & pains
  • Making your applications faster and more available

Bonus: Get a head start at ScyllaDB University

Real-World NoSQL Challenges

ScyllaDB user sessions are always the most talked-about element of ScyllaDB Summit. This year’s talks cover all phases of the ScyllaDB “sea monster” odyssey: from comparing ScyllaDB to other candidates you might be considering, to the nuts-and-bolts logistics of a migration, to best practices for keeping your cluster running smoothly. There are even critical lessons learned from a real-world perfect storm stressing the ScyllaDB cluster, including tips on how you can avoid making a fault too big to tolerate.

We’re also preparing something special for the teams considering a move from DynamoDB, MongoDB, or Cassandra to ScyllaDB. For each of those situations, we’ve curated a special “cluster” of sessions to help you understand what to expect. Each provides a jumpstart based on what we’ve learned from working with your peers across hundreds of use cases, a customer perspective on that migration, and strategies for success.

The Latest from ScyllaDB Engineering

ScyllaDB Engineering has earned a reputation for pushing the limits of what’s possible for database performance, as well as the broader discipline of distributed database engineering. Our shard-per-core architecture, Seastar framework, specialized internal cache, IO scheduler, shard-aware Rust drivers, WebAssembly UDFs and UDAs… if you haven’t been following it all, you can catch up on our engineering blog.

The team has reached some major milestones this year. At ScyllaDB Summit, you’ll hear a lot about our new tablets replication algorithm, our journey to extreme elasticity, our move to strong consistency, and a host of ways that we’re optimizing performance.

Books! (And Swag!)

If you look carefully through the speaker list, you’ll find two book authors: Discord’s Bo Ingram and ScyllaDB’s Felipe Cardeneti Mendes.

Bo is currently completing ScyllaDB in Action, a practical guide to everything you need to know about ScyllaDB, from your very first queries to running it in a production environment. This book teaches you ScyllaDB the best way—through hands-on examples.  Bo introduces it this way: “ScyllaDB in Action is the book I desperately wanted years ago…Building a database that easily scales and performs queries quickly and efficiently is a throughline of the book; I want to show you how to build, maintain, and run in production an effective and efficient database.”

And Felipe is (hopefully!) now recovered from the 2023 Database Performance at Scale book project that a few of us (Felipe, Piotr Sarna, Pavel Emelyanov, and I) collaborated on. As noted in this blog, we wrote the book to share our collective experience with performance-focused database engineering as well as performance-focused database users. It represents what we think teams striving for extreme database performance – low latency, high throughput, or both – should be thinking about, but often overlook. This includes the nuances of DB internals, drivers, infrastructure, topology, monitoring, and quite a lot more. It’s not about ScyllaDB per se, but it should definitely be of interest to anyone using or considering ScyllaDB.

Register for ScyllaDB Summit – Books & Swag

We’ll be offering access to both of these books throughout ScyllaDB Summit. And if you’ve attended any of our virtual events in the past, you know that our swag packs are second to none. Here’s an idea of what to expect:

We hope to see you there!

 

Migrating from Postgres to ScyllaDB, with 349X Faster Query Processing

How Coralogix cut processing times from 30 seconds to 86 milliseconds with a PostgreSQL to ScyllaDB migration

Speed matters for Coralogix, an observability platform that dev teams trust to spot incidents before they escalate into problems. Coralogix uses a real-time streaming analytics pipeline, providing monitoring, visualization, and alerting capabilities without requiring indexing.

One of Coralogix’s key differentiators is a distributed query engine for fast queries on mapped data from a customer’s archives in remote storage. That engine queries semi-structured data stored in object storage (e.g., GCS, S3) using a specialized Parquet format. It was originally designed as a stateless query engine on top of the underlying object storage, but reading Parquet metadata during query execution introduced an unacceptable latency hit. To overcome this, they developed a metadata store (simply called “metastore”) to enable faster retrieval and processing of the Parquet metadata needed to execute large queries.

The original metastore implementation, built on top of PostgreSQL, wasn’t fast enough for their needs. So, the team tried a new implementation – this time, with ScyllaDB instead of PostgreSQL. Spoiler: It worked. They achieved impressive performance gains – cutting query processing time from 30 seconds to 86 milliseconds. And their engineers Dan Harris (Principal Software Engineer) and Sebastian Vercruysse (Senior Software Engineer) took the stage at ScyllaDB Summit to explain how they did it. This article is based on their presentation.

Join us at ScyllaDB Summit 24 to hear more firsthand accounts of how teams are tackling their toughest database challenges. Disney, Discord, Expedia, Supercell, Paramount, and more are all on the agenda.

Register Now – It’s Free

Metastore Motivation and Requirements

Before getting into the metastore implementation details, let’s take a step back and look at the rationale for building a metastore in the first place.

“We initially designed this platform as a stateless query engine on top of the underlying object storage – but we quickly realized that the cost of reading Parquet metadata during query execution is a large percentage of the query time,” explained Dan Harris. They realized that they could speed this up by placing it in a fast storage system that they could query quickly (instead of reading and processing the Parquet metadata directly from the underlying object storage).

They envisioned a solution that would:

  • Store the Parquet metadata in a decomposed format for high scalability and throughput
  • Use bloom filters to efficiently identify files to scan for each query
  • Use transactional commit logs to add, update, and replace existing data in the underlying object storage

Key requirements included low latency, scalability of both read and write capacity, and scalability of the underlying storage. To understand the extreme scale required, consider this: a single customer generates 2,000 Parquet files per hour (50,000 per day), totaling 15 TB per day – and 20 GB per day in Parquet metadata alone.

The Initial PostgreSQL Implementation

“We started the initial implementation on Postgres, understanding at the time that a non-distributed engine wouldn’t be sufficient for the long run,” Dan acknowledged. That original implementation stored key information such as “blocks,” each representing one row group within one Parquet file. This included metadata like the file’s URL, row group index, and minimal details about the file. For example:

Block url: 
s3://cgx-production-c4c-archive-data/cx/parquet/v1/team_id=555585/…
…dt=2022-12-02/hr=10/0246f9e9-f0da-4723-9b64-a12346095d25.parquet
Row group: 0, 1, 2 …
Min timestamp 
Max timestamp
Number of rows
Total size
…

To optimize reading, they used bloom filters for efficient data pruning. Dan detailed, “Eventually, we want to support something like full-text search. Basically, when we’re ingesting these files into our system, we can build a bloom filter for all the distinct tokens that we find in the file. Then, based off a particular query, we can use those bloom filters to prune the data that we need to scan.” They stored bloom filters in a block-split setup, breaking them into 32-byte blocks for efficient retrieval. They’re stored independently so the system doesn’t have to read the entire bloom filter at query time.

Additionally, they stored column metadata for each Parquet file. For example:

Block URL
Row Group
Column Name
Column metadata (blob)

Dan explained: “The files that we’re writing are quite wide, sometimes as many as 20,000 columns. So, by reading only the metadata that we need, we can really reduce the amount of IO required on any given query.”
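
To make the shape of that table concrete, here’s a hedged sketch. The exact partition key isn’t spelled out in the talk, so treat the key choice (and the names and types) as an assumption rather than Coralogix’s actual schema:

```cql
-- Illustrative only: one plausible layout for the column metadata table.
CREATE TABLE column_metadata (
    block_url       text,
    row_group       int,
    column_name     text,
    column_metadata blob,
    PRIMARY KEY ((block_url, row_group), column_name)
);

-- Read only the columns a given query actually touches:
SELECT column_metadata FROM column_metadata
WHERE block_url = ? AND row_group = ? AND column_name IN ('col_a', 'col_b');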

ScyllaDB Implementation

Next, let’s look at the ScyllaDB implementation as outlined by Dan’s teammate, Sebastian Vercruysse.

Blocks Data Modeling

The block modeling had to be revisited for the new implementation. Here’s an example of a block URL:

s3://cgx-production-c4c-archive-data/cx/parquet/v1/team_id=555585/…
…dt=2022-12-02/hr=10/0246f9e9-f0da-4723-9b64-a12346095d25.parquet

The first part of the path is the customer’s top-level bucket; inside the bucket, items are partitioned by hour. In this case, what should be used as the primary key?

  • (Table url)? But some customers have many more Parquet files than other customers, and they wanted to keep things balanced
  • ((Block url, row group))? This uniquely identifies a given block, but it would be difficult to list all the blocks for a given day because the timestamp is not in the key
  • ((Table url, hour))? That works well because if you need to query 24 hours of data, you can simply query 24 partitions
  • ((Table url, hour), block url, row group)? That’s what they selected. By adding the block url and row group as clustering keys, they can easily retrieve a specific block within an hour, which also simplifies the process of updating or deleting blocks and row groups.
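
Translated into a table definition, the selected key might look like the sketch below (column names and types are illustrative, not Coralogix’s actual schema):

```cql
-- Sketch of the blocks table implied by the selected primary key.
CREATE TABLE blocks (
    table_url     text,
    hour          timestamp,
    block_url     text,
    row_group     int,
    min_timestamp timestamp,
    max_timestamp timestamp,
    num_rows      bigint,
    total_size    bigint,
    PRIMARY KEY ((table_url, hour), block_url, row_group)
);
```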

Bloom Filter Chunking and Data Modeling

The next challenge: how to verify that certain bits are set, given that ScyllaDB doesn’t offer out-of-the-box functions for that. The team decided to read bloom filters and process them in the application. However, remember that they are dealing with up to 50,000 blocks per day per customer, each block containing 262 KB for the bloom filter part. That’s a total of 12 GB – too much to pull back into the application for one query. But they didn’t need to read the whole bloom filter each time; they needed only parts of it, depending on the tokens involved during query execution. So, they ended up chunking and splitting bloom filters into rows, which reduced the data read to a manageable 1.6 megabytes.

For data modeling, one option was to use ((block_url, row_group), chunk index) as the primary key. That would generate 8192 chunks of 32 bytes per bloom filter, resulting in an even distribution with about 262 KB per partition. With each bloom filter contained in a single partition, it would be easy to insert and delete data with a single batch query. But there’s a catch that impacts read efficiency: you would need to know the ID of the block before you could read the bloom filter. Additionally, the approach would involve accessing a substantial number of partitions; 50K blocks means 50K partitions. And as Sebastian noted, “Even with something as fast as ScyllaDB, it’s still hard to achieve sub-second processing for 50K partitions.”

Another option (the one they ultimately decided on): ((table url, hour, chunk index), block url, row group). Note that this is the same partition key as the Blocks one, with a chunk index added to the partition key that represents the nth token required by the query engine. With this approach, scanning 5 tokens spanning a 24-hour window results in 120 partitions – an impressive improvement compared to the previous data modeling option.

Furthermore, this approach no longer requires the block ID before reading the bloom filter – allowing for faster reads. Of course, there are always trade-offs. Here, due to the blocked bloom filter approach, they have to split a single bloom filter into 8192 unique partitions. This ends up impacting the ingestion speed compared to the previous partitioning approach that allowed ingesting all bloom filter chunks at once. However, the ability to quickly read a given block within an hour is more important to them than fast writes – so they decided that this tradeoff was worth it.
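
Put concretely, a hedged sketch of the bloom filter chunk table described above (names are hypothetical, not the actual schema) could look like this:

```cql
-- Illustrative layout for the chunked bloom filter table.
CREATE TABLE bloom_filter_chunks (
    table_url   text,
    hour        timestamp,
    chunk_index int,
    block_url   text,
    row_group   int,
    chunk       blob,      -- one 32-byte slice of the block's bloom filter
    PRIMARY KEY ((table_url, hour, chunk_index), block_url, row_group)
);
```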

Data Modeling Woes

Not surprisingly, moving from SQL to NoSQL involved a fair amount of data modeling rework, including some trial and error. For example, Sebastian shared, “One day, I figured out that we had messed up min and max timestamps – and I wondered how I was going to fix it. I thought maybe I could rename the columns and then somehow make it work again. But, here you cannot rename a column if it’s part of a clustering key. I thought I could perhaps add new columns and run an UPDATE query to update all rows. Unfortunately, this doesn’t work in NoSQL either.”

Ultimately, they decided to truncate the table and start over again rather than write migration code. Their best advice on this front is to get it right the first time. 🙂

Performance Gains

Despite the data modeling work required, the migration paid off nicely. For the metastore block listing:

  • Each node currently handles 4-5 TB.
  • They’re currently processing around 10K writes per second with P99 latency consistently below one millisecond.
  • Listing the blocks for a single hour – about 2,000 Parquet files – together with their bloom filters is processed in less than 20 milliseconds.
  • For 50K files, it’s less than 500 milliseconds.

That work also includes checking the bloom filter bits. But for 50K Parquet files, 500 milliseconds is fine for their needs.

In the column metadata processing, the P50 is quite good, but there’s a high tail latency. Sebastian explained: “The problem is that if we have 50K Parquet files, our executors are fetching all of these in parallel. That means we have a lot of concurrent queries and we’re not using the best disks. We assume that’s at the root of the problem.”

ScyllaDB Setup

Notably, Coralogix moved from first discovering ScyllaDB to getting into production with terabytes of data in just 2 months (and this was a SQL to NoSQL migration requiring data modeling work, not a much simpler Cassandra or DynamoDB migration).

The implementation was written in Rust on top of the ScyllaDB Rust driver, and they found ScyllaDB Operator for Kubernetes, ScyllaDB Monitoring, and ScyllaDB Manager all rather helpful for the speedy transition. Since offering their own customers a low-cost observability alternative is important to Coralogix, the Coralogix team was pleased by the favorable price-performance of their ScyllaDB infrastructure: a 3-node cluster with:

  • 8 vCPU
  • 32 GiB memory
  • ARM/Graviton
  • EBS volumes (gp3) with 500 MBps bandwidth and 12k IOPS

Using ARM reduces costs, and the decision to use EBS (gp3) volumes ultimately came down to availability, flexibility, and price-performance. They admitted, “This is a controversial decision – but we’re trying to make it work and we’ll see how long we can manage.”

Lessons Learned

Their key lessons learned here…

Keep an eye on partition sizes: The biggest difference in working with ScyllaDB vs working with Postgres is that you have to think rather carefully about your partitioning and partition sizes. Effective partitioning and clustering key selection makes a huge difference for performance.

Think about read/write patterns: You also have to think carefully about read/write patterns. Is your workload read-heavy? Does it involve a good mix of reads and writes? Or is it predominantly write-heavy? Coralogix’s workloads are quite write-heavy because they’re constantly ingesting data, but they need to prioritize reads because read latency is most critical to their business.

Avoid EBS: The team admits they were warned not to use EBS: “We didn’t listen, but we probably should have. If you’re considering using ScyllaDB, it would probably be a good idea to look at instances that have local SSDs instead of trying to use EBS volumes.”

Future Plans: WebAssembly UDFs with Rust

In the future, they want to find the middle ground between writing large enough chunks and reading unnecessary data. Today they split each bloom filter into ~8,000 rows; they believe they could consolidate that down to around 1,000 rows, which would speed up their inserts.

Their ultimate goal is to offload even more work to ScyllaDB by taking advantage of User Defined Functions (UDFs) with WebAssembly. With their existing Rust code, integrating UDFs would eliminate the need to send data back to the application, providing flexibility for chunking adjustments and potential enhancements.

Sebastian shared, “We already have everything written in Rust. It would be really nice if we can start using the UDFs so we don’t have to send anything back to the application. That gives us a bit more leeway to play with the chunking.”

Watch the Complete Tech Talk

You can watch the complete tech talk and skim through the deck in our tech talk library.

Watch the Full Tech Talk

The World’s Largest Apache Kafka® and Apache Cassandra® Migration?

Here at Instaclustr by NetApp, we pride ourselves on our ability to migrate customers from self-managed or other managed providers with minimal risk and zero downtime, no matter how complex the scenario. At any point in time, we typically have 5-10 cluster migrations in progress. Planning and executing these migrations with our customers is a core part of the expertise of our Technical Operations team. 

Recently, we completed the largest new customer onboarding migration exercise in our history, and it’s quite possibly the largest Apache Cassandra and Apache Kafka migration exercise ever completed by anyone. While we can’t tell you who the customer is, in this blog we will walk through the overall process and provide details of our approach. This will give you an idea of the lengths we go to when onboarding customers, and perhaps help you pick up some hints for your own migration exercises.

Firstly, some stats to give you a sense of the scale of the exercise: 

  • Apache Cassandra: 
    • 58 clusters
    • 1,079 nodes
    • 17 node sizes (ranging from r6g.medium to im4gn.4xlarge)
    • 2 cloud providers (AWS and GCP)
    • 6 cloud provider regions
  • Apache Kafka 
    • 154 clusters
    • 1,050 nodes
    • 21 node sizes (ranging from r6g.large to im4gn.4xlarge and r6gd.4xlarge)
    • 2 cloud providers (AWS and GCP)
    • 6 cloud provider regions

From the size of the environment, you can get a sense that the customer involved is a pretty large and mature organisation. Interestingly, this customer had been an Instaclustr support customer for a number of years. Based on that support experience, they decided to trust us with taking on full management of their clusters to help reduce costs and improve reliability.

Clearly, completing this number of migrations required a big effort both from Instaclustr and our customer. The timeline for the project looked something like: 

  • July 2022: contract signed and project kicked off 
  • July 2022 – March 2023: customer compliance review, POCs and feature enhancement development 
  • February 2023 – October 2023: production migrations 

Project Management and Governance 

A key to the success of any large project like this is strong project management and governance. Instaclustr has a well-established customer project management methodology that we apply to projects.


In line with this methodology, we staffed several key roles to support this project: 

  • Overall program manager 
  • Cassandra migration project manager 
  • Cassandra technical lead 
  • Kafka migration project manager 
  • Kafka technical lead 
  • Key Customer Product Manager 

The team worked directly with our customer counterparts and established several communication mechanisms that were vital to the success of the project. 

Architectural and Security Compliance 

While high-level compliance with the customer’s security and architecture requirements had been established during the pre-contract phase, the first phase of post-contract work was a more detailed solution review with the customer’s compliance and architectural teams.  

To facilitate this requirement, Instaclustr staff met regularly with the customer’s security team to understand their requirements and explain Instaclustr’s existing controls that met these needs. 

As expected, Instaclustr’s existing SOC2 and PCI certified controls meant that a very high percentage of the customer’s requirements were met right out of the box. This included controls such as intrusion detection, access logging and operating system hardening.  

However, as is common in mature environments with well-established requirements, a few gaps were identified and Instaclustr agreed to take these on as system enhancements. Some examples of the enhancements we delivered prior to commencing production migrations include: 

  • Extending the existing system to export logs to a customer-owned location to include audit logs 
  • The ability to opt-in at an account level for all newly created clusters to be automatically configured with log shipping 
  • Allowing the process that loads custom Kafka Connect connectors to use instance roles rather than access keys for s3 access 
  • Enhancements to our SCIM API for provisioning SSO access 

In addition to establishing security compliance, we used this period to further validate architectural fit and identified some enhancements that would help to ensure an optimal fit for the migrated clusters. Two key enhancements were delivered to meet this goal: 

  • Support for Kafka clusters running in two Availability Zones with RF2 
  • This is necessary as the customer has a fairly unique architecture that delivers HA above the Kafka cluster level 
  • Enabling multiple new AWS and GCP node types to optimize infrastructure spend 

Apache Kafka Migration 

Often when migrating Apache Kafka, the simplest approach is what we call Drain Out.   

In this approach, Kafka consumers are pointed at both the source and destination clusters; the producers are then switched to send messages to just the destination cluster. Once all messages are read from the source cluster, the consumers there can be switched off and the migration is complete. 

However, while this is the simplest approach from a Kafka point of view, it does not allow you to preserve message ordering through the cutover. This can be important in many use cases, and was certainly important for this customer. 

When the Drain Out approach is not suitable, using MirrorMaker2 can also be an option; we have deployed it on many occasions for other migrations. In this particular case, however, the level of consumer/producer application dependency for this approach ruled out using MirrorMaker2.

This left us with the Shared Cluster approach, where we operate the source and destination clusters as a single cluster for a period before decommissioning the source. 

The high-level steps we followed for this shared cluster migration approach are: 

1. Provision destination Instaclustr managed cluster, shut down and wipe all data 

2. Update configurations on the destination cluster to match source cluster as required 

3. Join network environments with the source cluster (VPC peering, etc) 

4. Start up destination Apache ZooKeeper™ in observer mode, and start up destination Kafka brokers 

5. Use Kafka partition reassignment to move data:

a. Increase replication factor and replicate across destination as well as source brokers 

b. Swap preferred leaders to destination brokers 

c. Decrease replication factor to remove replicas from source brokers 

6. Reconfigure clients to use destination brokers as initial contact points 

7. Remove old brokers 

For each cluster, Instaclustr created a detailed change plan covering all of the high-level steps listed above, as well as rollback procedures in case any issues arose.

A couple of other specific requirements from this environment added extra complexity and are worth mentioning:

  • The source environment shared a single ZooKeeper instance across multiple clusters. This is not a configuration that we support and the customer agreed that it was a legacy configuration that they would rather leave behind. To accommodate the migration from this shared ZooKeeper, we had to develop functionality for custom configuration of ZooKeeper node names in our managed clusters as well as build a tool to “clean” the destination ZooKeeper of data related to other clusters after migration (for security and ongoing supportability). 
  • The existing clusters had port listener mappings that did not align with the mappings supported by our management system, and reconfiguring these prior to migration would have added extensive work on the customer side. We therefore extended our custom configuration to allow more extensive custom configuration of listeners. Like other custom configuration we support, this is stored in our central configuration database so it survives node replacements and is automatically added to new nodes in a cluster. 

Apache Cassandra Migration 

We have been doing zero downtime migrations of Apache Cassandra since 2014. All of them basically follow the “add a datacenter to an existing cluster” process that we outlined in a 2016 blog. 
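
At a high level, that process extends the keyspace’s replication to the new data center and then rebuilds the new nodes from the source. A condensed sketch (keyspace and data center names are placeholders, and every environment needs its own plan):

```cql
-- 1. Extend replication to the destination data center.
ALTER KEYSPACE my_keyspace WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'source_dc': 3,
    'destination_dc': 3
};

-- 2. On each new node, stream data from the source data center
--    (run from a shell, not CQL):
--    nodetool rebuild -- source_dc

-- 3. Once clients have switched over, remove 'source_dc' from the
--    replication map and decommission its nodes.
```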

One key enhancement that we’ve made since that blog – and put to use during this most recent migration – is the introduction of the Instaclustr Minotaur consistent rebuild tool (available on GitHub).

If the source cluster is missing replicas of some data prior to starting the rebuild, the standard Cassandra data center rebuild process can try to copy more than one replica from the same source node. This results in even fewer replicas of data on the destination cluster.  

This can mean that in the standard case of replication factor 3 and consistency level quorum queries, you can go from having 2 replicas and data being consistently returned on the source cluster to only 1 replica (or even 0 replicas) and data being intermittently missed on the destination cluster.

The “textbook” Cassandra approach to address this is to run Cassandra repairs after the rebuild, which will ensure all expected replicas are in sync. However, we are frequently asked to migrate clusters that have not been repaired for a long time, and that can make running repairs a very tricky operation.

Instaclustr Minotaur addresses these issues: using the tool, we can guarantee that the destination cluster has at least as many replicas as the source cluster. Running repairs to get the cluster back into a fully healthy state can then be left until the cluster is fully migrated, and our Tech Ops team can hand-hold the process.

This approach was employed across all Cassandra migrations for this customer and proved particularly important for certain clusters with high levels of inconsistency pre-migration; one particularly tricky cluster even took two and a half months to fully repair post-migration!

Another noteworthy challenge from this migration was a set of clusters where tables were dropped every 2 to 3 hours.  

This is a common design pattern for temporary data in Cassandra as it allows the data to be quickly and completely removed when it is no longer required (rather than a standard delete creating “tombstones” or virtual delete records). The downside is that the streaming of data to new nodes fails if a schema change occurs during a streaming operation and can’t be restarted.  

Through the migration process, we managed to work around this with manual coordination of pausing the table drop operation on the customer side while each node rebuild was occurring. However, it quickly became apparent that this would be too cumbersome to sustain through ongoing operations.  

To remedy this, we held a joint brainstorming meeting with the customer to work through the issue and potential solutions. The end result was a design for customer-side automation that pauses the dropping of tables whenever a node in the cluster is detected as not fully available. Instaclustr’s provisioning API provided node status information that could be used to facilitate this automation.

Conclusion 

This was a mammoth effort that not only relied on Instaclustr’s accumulated expertise from many years of running Cassandra and Kafka, but also our strong focus on working as part of one team with our customers.  

The following feedback we received from the customer project manager is exactly the type of reaction we aim for with every customer interaction: 

“We’ve hit our goal ahead of schedule and could not have done it without the work from everyone on the Instaclustr side and [customer team]. It was a pleasure working with all parties involved!  

“The migration went smoothly with minimal disruption and some lessons learned. I’m looking forward to working with the Instaclustr team as we start to normalize with the new environment and build new processes with your teams to leverage your expertise.

“Considering the size, scope, timeline and amount of data transferred, this was the largest migration I’ve ever worked on and I couldn’t have asked for better partners on both sides.” 

Interested in doing a migration yourself? Reach out to our team of engineers and we’ll get started on the best plan of action for your use case! 


How to Build a Low-Latency Video Streaming App with ScyllaDB & NextJS

We created a new video streaming app to help you learn about ScyllaDB. This blog covers its features, tech stack, and data modeling.

We just published a new ScyllaDB sample application: a video streaming app. The project is available on GitHub. This blog covers the video streaming application’s features and tech stack, and breaks down the data modeling process.

Video streaming app features

The app has a minimal design with the most essential video streaming application features:

  • List all videos, sorted by creation date (home page)
  • List videos that you started watching
  • Watch video
  • Continue watching a video where you left off
  • Display a progress bar under each video thumbnail

Technology stack

Using ScyllaDB for low-latency video streaming applications

ScyllaDB is a low-latency and high-performance NoSQL database compatible with Apache Cassandra and DynamoDB. It is well-suited to handle the large-scale data storage and retrieval requirements of video streaming applications. ScyllaDB has drivers in all the popular programming languages and, as this sample application demonstrates, it integrates well with modern web development frameworks like NextJS.

Low latency in the context of video streaming services is crucial for delivering a seamless user experience. To lay the groundwork for high performance, you need to design a data model that fits your needs. Let’s continue with an example data modeling process to see what that looks like.

Video streaming app data modeling

In the ScyllaDB University Data Modeling course, we teach that NoSQL data modeling should always start with your application and queries first. Then you work backwards and create the schema based on the queries you want to run in your app. This process ensures that you create a data model that fits your queries and meets your requirements.

With that in mind, let’s go over the queries that our video streaming app needs to run on each page load!

Page: Continue Watching

On this page, users can see all the videos that they’ve started to watch. This view includes the video thumbnails and the progress bar under each thumbnail.

Query – get watch progress:

Schema – watch history table:

For this query, it makes sense to define `user_id` as the partition key because that is the filter we use to query the watch history table. Keep in mind that this schema might need to be updated later if there is a query that requires filtering on other columns beyond the `user_id`. For now, though, this schema is correct for the defined query.
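
The sample app’s exact CQL isn’t reproduced in this post, but a minimal sketch consistent with the description above (column names are illustrative) might look like this:

```cql
-- Schema sketch – watch history partitioned by user_id.
-- (The clustering key is revisited later in the post.)
CREATE TABLE watch_history (
    user_id    text,
    video_id   text,
    progress   int,        -- e.g., seconds watched
    watched_at timestamp,
    PRIMARY KEY (user_id)
);

-- Query – everything this user has started watching:
SELECT video_id, progress FROM watch_history WHERE user_id = ?;
```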

Besides the progress value, the app also needs to fetch the actual metadata of each video (for example, the title and the thumbnail image). For this, the `video` table has to be queried.

Query – get video metadata:

Notice how we use the “IN” operator rather than “=” because we need to fetch a list of videos, not just a single video.

Schema – video table:

For the video table, let’s define the `id` as the partition key because that’s the only filter we use in the query.
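
Again as a hedged sketch rather than the app’s literal schema, the video table and the metadata query could look like this:

```cql
-- Schema sketch – video metadata keyed by id (columns are illustrative;
-- the real table also holds whatever the Watch Video page needs).
CREATE TABLE video (
    id         text,
    title      text,
    thumbnail  text,
    created_at timestamp,
    PRIMARY KEY (id)
);

-- Query – fetch metadata for several videos at once using IN:
SELECT id, title, thumbnail FROM video WHERE id IN (?, ?, ?);
```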

Page: Watch Video

If you click on any of the “Watch” buttons, you will be redirected to a page with a video player where you can start and pause the video.

Query – get video content:

This is a very similar query to the one that runs on the Continue Watching page. Thus, the same schema will work just fine for this query as well.

Schema – video table:

Page: Most Recent Videos

Finally, let’s break down the Most Recent Videos page, which is the home page of the application. We analyze this page last because it is the most complex one from a data modeling perspective. This page lists ten of the most recently uploaded videos that are available in the database ordered by the video creation date.

We will have to fetch these videos in two steps: first get the timestamps, then get the actual video content.

Query – get the most recent ten videos’ timestamp:

You might notice that we use a custom function called `top10()`. This is not a standard function in ScyllaDB. It’s a UDF (user-defined function) that we created to solve this data modeling problem. This function returns an array of the most recent `created_at` timestamps in the table. Creating a new UDF in ScyllaDB can be a great way to solve your unique data modeling challenges.

These timestamp values can then be used to query the actual video content that we want to show on the page.

Query – get metadata for those videos:

Schema – recent videos:

In the recent videos materialized view, the `created_at` column is the primary key because we filter by that column in our first query to get the most recent timestamp values. Be aware that in some cases, this can cause a hot partition.
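
As a sketch of what such a view might look like (the sample app’s actual definition may differ), a materialized view over the `video` table keyed by `created_at` could be defined like this:

```cql
-- Materialized view sketch – recent videos keyed by creation time.
CREATE MATERIALIZED VIEW recent_videos AS
    SELECT * FROM video
    WHERE created_at IS NOT NULL AND id IS NOT NULL
    PRIMARY KEY (created_at, id);
```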

Furthermore, the UI also shows a small progress bar under each video’s thumbnail which indicates the progress you made watching that video. To fetch this value for each video, the app has to query the `watch history` table.

Query – get watch progress for each video:

Schema – watch history:

You might have noticed that the watch history table was already used in a previous query. This time, though, the schema has to be modified slightly to fit the new query: let’s add `video_id` as a clustering key so that the per-video watch progress query works correctly.
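
With that change, a hedged sketch of the updated table and the per-video progress query looks like this:

```cql
-- Updated watch_history with video_id as a clustering key (illustrative).
CREATE TABLE watch_history (
    user_id    text,
    video_id   text,
    progress   int,
    watched_at timestamp,
    PRIMARY KEY (user_id, video_id)
);

-- Query – watch progress for one specific video:
SELECT progress FROM watch_history WHERE user_id = ? AND video_id = ?;
```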

That’s it. Now let’s see the final database schema!

Final database schema

User-defined function for the Most Recent Videos page

This UDF uses Lua, but you could also use Wasm to create UDFs in ScyllaDB. Before creating the function, make sure to enable UDFs in the scylla.yaml configuration file (location: /etc/scylla/scylla.yaml):
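
As a rough illustration only (this is not the sample app’s actual `top10()` implementation), enabling UDFs typically means setting `enable_user_defined_functions: true` in scylla.yaml – and, on some ScyllaDB versions, also listing `udf` under `experimental_features` – after which a Lua function can be created with standard CQL syntax:

```cql
-- Skeleton only: shows the CREATE FUNCTION ... LANGUAGE lua shape,
-- not the real top10() aggregation logic.
CREATE FUNCTION IF NOT EXISTS example_identity(ts timestamp)
    RETURNS NULL ON NULL INPUT
    RETURNS timestamp
    LANGUAGE lua
    AS 'return ts';
```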

Clone the repo and get started!

To get started…

Clone the repository:
git clone https://github.com/scylladb/video-streaming

Install the dependencies:
npm install

Modify the configuration file:

Migrate the database and insert sample data:
npm run migrate

Run the server:
npm run dev

Wrapping up

We hope you enjoy our video streaming app and it helps you build low-latency and high-performance applications with ScyllaDB. If you want to keep on learning, check out ScyllaDB University where we have free courses on data modeling, ScyllaDB drivers, and much more! If you have questions about the video streaming sample app or ScyllaDB, go to our forum and let’s discuss!

More ScyllaDB sample applications:

Relevant resources:

Inside ScyllaDB’s Internal Cache

Why ScyllaDB completely bypasses the Linux cache during reads, using its own highly efficient row-based cache instead

ScyllaDB is built on the Seastar framework – and Seastar is the main component driving the database’s shard-per-core and shared-nothing asynchronous architecture. Seastar provides additional benefits for real-time and latency-sensitive applications, such as a dedicated userspace I/O scheduler, scheduling groups for workload isolation and prioritization, and more. You can learn more about this framework and ScyllaDB’s shard-per-core architecture in Dor Laor’s recent P99 CONF keynote.

However, one critical component of ScyllaDB’s architecture is not part of Seastar: our specialized cache. ScyllaDB completely bypasses the Linux cache during reads, using its own highly efficient row-based cache instead. This approach allows for low-latency reads without the added complexity of external caches, as discussed in this blog and recent talk. Taking this approach provides us the control needed to achieve predictable low latencies (e.g., single-digit millisecond P99 latencies for millions of OPS). It also allows us to offer users full visibility into details like cache hits and misses, evictions, and cache size so they can better understand and optimize performance from their side. Our specialized cache often enhances performance to the point where users can replace their external cache.

Let’s take a high-level look at why we took this path for ScyllaDB’s cache, then go a bit deeper into the technical details for those who are curious.

Hear more from ScyllaDB Engineering at ScyllaDB Summit (free + virtual). Talks cover rethinking replication with tablets, why Raft matters for ScyllaDB, balancing compaction principles and practices, and more. 

Why Not Use the Linux Page Cache?

Since ScyllaDB is designed to be fully compatible with Apache Cassandra (as well as DynamoDB), it takes advantage of the best elements of Cassandra’s design. Cassandra’s reliance on the default Linux page cache is not one of those “best elements.” The Linux page cache, also called disk cache, is a general-purpose type of cache. Although it can be tuned to better serve database workloads, it still lacks context over key database-specific needs.

Linux caching is inefficient for database implementations for a few reasons. First, the Linux page cache improves operating system performance by storing page-size chunks of files in memory to save on expensive disk reads. The Linux kernel treats files as 4KB chunks by default. This speeds up performance, but only when data is 4KB or larger. The problem is that many common database operations involve data smaller than 4KB. In those cases, Linux’s 4KB minimum leads to high read amplification.

Adding to the problem, the extra data is rarely useful for subsequent queries (since it usually has poor ‘spatial locality’). In most cases, it’s just wasted bandwidth. Cassandra attempts to alleviate read amplification by adding a key cache and a row cache, which directly store frequently used objects. However, Cassandra’s extra caches increase overall complexity and are very difficult to configure properly. The operator allocates memory to each cache. Different ratios produce varying performance characteristics, and different workloads benefit from different settings. The operator also must decide how much memory to allocate to the JVM’s heap as well as the off-heap memory structures. Since the allocations are performed at boot time, it’s practically impossible to get it right, especially for dynamic workloads that change dramatically over time.

Another problem: under the hood, the Linux page cache also performs synchronous blocking operations that impair system performance and predictability. Since Cassandra is unaware that a requested object does not reside in the Linux page cache, accesses to non-resident pages will cause a page fault and context switch to read from disk. Then it will context switch again to run another thread, and the original thread is paused. Eventually, when the disk data is ready (yet another interrupt context switch), the kernel will schedule in the original thread.

The diagram below shows the architecture of Cassandra’s caches, with layered key, row, and underlying Linux page caches.

A look at the Linux page cache used by Apache Cassandra

Achieving Better Performance (and Control)

We recognized that a special-purpose cache would deliver better performance than Linux’s default cache – so, we implemented our own. Our unified cache can dynamically tune itself to any workload and removes the need to manually tune multiple different caches (as users are forced to do with Apache Cassandra). With an understanding of which objects are being cached, we can carefully control how items are populated and evicted. Additionally, the cache can be dynamically expanded or contracted in response to different workloads and under memory pressures.

Upon a read, if the data is no longer cached, then ScyllaDB will initiate a continuation task to asynchronously read from disk. The Seastar framework that ScyllaDB is built on will execute the continuation task in a μsec (1 million tasks/core/sec) and will rush to run the next task. There’s no blocking, heavyweight context switch, or waste.

This cache design enables each ScyllaDB node to serve more data, which in turn lets users run smaller clusters of more powerful nodes with larger disks. It also simplifies operations since it eliminates multiple competing caches and dynamically tunes itself at runtime to accommodate varying workloads. Moreover, having an efficient internal cache eliminates the need for a separate external cache, making for a more efficient, reliable, secure, and cost-effective unified solution.

Deeper Into Our Cache Design

With that overview, let’s go deeper into the details of ScyllaDB’s cache implementation. To start, let’s examine the data flow from the replica side.

When a write arrives at a replica, it first goes to an in-memory data structure – the memtable, which lives in RAM. For the write to be considered successful, it must also go to the commitlog for recovery – but the commitlog itself is not relevant here.


When the memtable grows large enough, we flush it to an SSTable (an immutable data structure that lives on disk). At that point, a new memtable gets created (to receive incoming writes), the flushed memtable contents are merged with the cache, and then the memtable is removed from memory. This process continues, and SSTables accumulate.

When a read comes in, we need to combine data from the memtable and all the accumulated SSTables to get a consistent view of all the existing writes.


Read consistency is relatively simple to achieve here. For example, it can be achieved by taking a snapshot of memtables and pinning them in memory, taking a snapshot of SSTables, and then combining the data from all the sources. However, there’s a problem: it’s slow. It has to go to disk every time, and it reads from multiple parts. A cache can speed this along.

To avoid reading from disk every time, we use a read-through cache that semantically represents everything on disk (in SSTables) and caches a subset of that data in memory. The traditional way to implement that would be to use a buffer cache, caching the buffers we read from the SSTable files. Those buffers are typically 4 KB, which is what you would get if you used the Linux page cache.

Why Not Use a Buffer Cache?

As alluded to earlier, there are some problems with this approach (which is why ScyllaDB doesn’t use it).

Inefficient Use of Memory

First, it leads to inefficient memory use. If you want to cache just a single row, you need to cache a full buffer. A buffer is 4 KB, but the row can be much smaller (e.g., 300 bytes). If your data set is larger than the available RAM, access locality is not very likely, and this leads to inefficient use of memory.

Poor Negative Caching

This approach also handles negative caching poorly. Since you don’t have a key, you need to cache an entire data buffer just to indicate that data is absent. This further undermines memory efficiency.

Redundant Buffers Due to LSM

More problems with this approach: since the read might need to touch multiple SSTables, caching the row might require caching multiple buffers. This leads to inefficient memory usage again, and also to increased CPU overhead.

High CPU Overhead for Reads

When a row is cached across multiple buffers, data from those buffers must be merged on each read – and this consumes CPU cycles. Adding to the CPU overhead, storing buffers requires us to parse those buffers. The SSTable format isn’t optimized for read speed; it’s optimized for compact storage. You have to parse index buffers sequentially to interpret the index, and then you have to parse the data files. This can eat up even more CPU resources.

Premature Cache Eviction due to SSTable Compaction

SSTable compaction (which rewrites SSTables because they may contain redundant or expired data) can lead to premature cache eviction. The compaction process writes a new SSTable and deletes the old files. Deleting the old files implies that the buffers must be invalidated, which essentially invalidates the cache. Read performance suffers as a result because the reads result in cache misses.

Using an Object Cache Instead

Clearly, there are many problems with the buffer cache approach in this context. That’s why we opted to take a different path: implement an object cache. This specialized cache stores actual row objects – much like a memtable does – rather than buffers associated with files on disk. Think of it as another tree that holds the rows.

This data structure doesn’t suffer from the host of problems detailed above. More specifically, it’s optimized for fast reads and low CPU overhead. There’s a single version of a row, combining data from all relevant SSTables. And the caching is done with row granularity: if you want, you can keep only a single row in cache.

Memory Management

We don’t use the Linux page cache at all. We reserve the vast majority of each node’s available memory for ScyllaDB; a very small reserve is left for the OS (e.g., for socket buffers). ScyllaDB’s portion of the memory is mostly dedicated to the cache. The cache is assigned to use all available memory and then shrink on demand when there’s pressure from other parts of the system (like memtables or operational tasks). We have implemented controllers to ensure that other pressures never steal too much memory from the cache.

CPU Sharding

Another important design element has to do with sharding. With our shard-per-core architecture, every CPU in ScyllaDB is responsible for a subset of data and every CPU has separate data structures to manage the data. So, every CPU has a separate cache and separate memtables – and those data structures can be accessed only from the CPU that owns them.


Thread-Per-Core Architecture

Thread-per-core is another important component of this design. All processing is done in a single thread per CPU. Execution is divided into short tasks that are executed sequentially with cooperative preemption, which means that our code can precisely control the exact boundaries of the tasks. If there’s a preemption signal (which comes from a timer), then the task has to yield cooperatively. It’s not preempted at arbitrary points; the code determines the possible preemption points.

Cache Coherency

All of this allows us to have complex operations on data within the task – without having to deal with real concurrency (which we would face if we used multi-threading and accessed data from multiple CPUs). We don’t have to use locks, we avoid lock contention, and we don’t have to implement complex lock-free algorithms. Our data structures and algorithms can be simple. For example, when we have a read, it can access cache and memtable lookup in a single task, and have a consistent view on both. That’s achieved without involving any synchronization mechanisms. That means that everything works fast and performance is quite predictable.

Range Queries and Range Deletion

Supporting our query language and the data model is a potential problem with an object cache. ScyllaDB isn’t a simple key-value store. We support more complex operations such as range queries and range deletions, and that impacts how caching is implemented. This can lead to complications with an object cache that aren’t an issue with a buffer cache.

For example, consider range queries. You have a query that wants to scan a set of rows. If your cache was just key-value, you wouldn’t be able to use it to speed up your reads because you would never know if there was other data stored on disk in between the entries in the cache. As a result, such a read would have to always go to disk for the gaps.

Our cache is designed to handle this case. We store information about range continuity: indicating that a given range in cache is complete (and there’s no need for the read to check if the disk contains additional entries). If you repeat the scan, it won’t go to disk.

Also, range deletions require special handling. Due to ScyllaDB’s eventual consistency model, deletion is not just about removing data. It also leaves a marker for future reconciliation. That marker is called a tombstone, and the cache needs to be able to store this marker. Our cache is prepared to handle that; it piggybacks on the range continuity mentioned above – basically annotating the range continuity information with the tombstone.

Two Other Distinctive Caching Elements

Before we close, let’s look briefly at two other interesting things we’ve implemented with respect to ScyllaDB caching.

Cache Bypass

The fact that ScyllaDB uses a read-through cache means that – by default – every read you perform will populate the cache and then be served from it. However, this may not always be what users want. For example, if you are scanning tons of data, or occasionally need to query data that probably won’t be read again in the future, this could evict important items from your existing cache.

To prevent this, we extended the CQL protocol with the BYPASS CACHE extension. This tells the database that it should NOT populate the cache with the items it reads as a result of your query – thus stopping it from evicting important records. BYPASS CACHE is also often used in conjunction with ScyllaDB’s Workload Prioritization on analytics use cases that frequently scan data in bulk.
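
In practice, BYPASS CACHE is just a suffix on the SELECT statement. For example, a bulk analytics scan over a token range might look like this (keyspace, table, and column names are placeholders):

```cql
-- Full scan for an analytics job; don't let it populate the row cache.
SELECT * FROM analytics.events
WHERE token(id) > ? AND token(id) <= ?
BYPASS CACHE;
```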

SSTable Index Caching

SSTable Index Caching is a relatively new addition to ScyllaDB. Since the 5.0 release, we also cache the SSTable index component to further speed up reads that have to go to disk. The SSTable Index is automatically populated on access; upon memory pressure, it’s evicted in a way that doesn’t affect cache performance.

One advantage of SSTable Index caching is its impact on large partition performance. Below, you can see our before and after measurements with large partitions directly reading cold data from disk. Note that the throughput more than tripled after we introduced SSTable index caching, making on-disk lookups even faster.

Summary

In summary, ScyllaDB has a fast cache that’s optimized for speeding up reads. It is highly efficient because the data and cache are colocated on the same CPU. And it upholds all the semantics of the query language and the data model, as well as ScyllaDB’s consistency guarantees.

3 Technical Shifts that Reduce Distributed Database Costs

How teams can reduce the total cost of owning and operating a highly-available database 

For teams working on data-intensive applications, the database can be low-hanging fruit for significant cost reduction. The total cost of operating a highly-available database can be formidable – whether you’re running open source on-premises, using a fully-managed database-as-a-service, or anything in between. That “total cost” goes beyond the price of the database and the infrastructure it runs on; there are also all the operational expenses related to database tuning, monitoring, admin, and so on to consider.

This post looks at how teams can reduce that total cost of ownership by making 3 technical shifts:

  • Reduce node sprawl by moving to fewer, larger nodes
  • Reduce manual tuning and configuration by letting the database manage itself
  • Reduce infrastructure by consolidating workloads under a single cluster

Reduce cluster sprawl by moving to fewer, larger nodes

For over a decade, NoSQL’s promise has been enabling massive horizontal scalability with relatively inexpensive commodity hardware. This has allowed organizations to deploy architectures that would have been prohibitively expensive and impossible to scale using traditional relational database systems.

But a focus on horizontal scaling results in system sprawl, which equates to operational overhead, with a far larger footprint to keep managed and secure. Big clusters of small instances demand more attention, are more likely to experience failures, and generate more alerts, than small clusters of large instances. All of those small nodes multiply the effort of real-time monitoring and periodic maintenance, such as rolling upgrades.

Let’s look at a real-world example.  The diagram below illustrates the server costs and administration benefits experienced by a ScyllaDB customer. This customer migrated a Cassandra installation, distributed across 120 i4i.2xlarge AWS instances with 8 virtual CPUs each, to ScyllaDB. Using the same node sizes, ScyllaDB achieved the customer’s performance targets with a much smaller cluster of only 12 nodes. The initial reduction in node sprawl produced a 10X reduction in server costs, from $721,123 to $72,112 annually. It also achieved a 10X reduction in administrative overhead, encompassing failures, upgrades, monitoring, etc.

ScyllaDB reduced server costs by 10X and improved MTBF by 40X (from 120 nodes to 3)

Given ScyllaDB’s scale-up capabilities, the customer then moved to the larger nodes, the i4i.8xlarge instance with 32 virtual CPUs each. While the cost remained the same, those 3 large nodes were capable of maintaining the customer’s SLAs. Scaling up resulted in reducing complexity (and administration, failures, etc.) by a factor of 40 compared with where the customer began (by moving from 120 nodes to 3).

Reducing the size of your cluster also offers additional advantages beyond reducing the administrative burden:

  • Less Noisy Neighbors: On cloud platforms, multi-tenancy is the norm. A cloud platform is, by definition, based on shared network bandwidth, I/O, memory, storage, and so on. As a result, a deployment of many small nodes is susceptible to the ‘noisy neighbor’ effect. This effect is experienced when one application or virtual machine consumes more than its fair share of available resources. As nodes increase in size, fewer and fewer resources are shared among tenants. In fact, beyond a certain size your applications are likely to be the only tenant on the physical machines on which your system is deployed. This isolates your system from potential degradation and outages. Large nodes shield your systems from noisy neighbors.
  • Fewer Failures: Since nodes large and small fail at roughly the same rate, large nodes deliver a higher mean time between failures, or “MTBF” than small nodes. Failures in the data layer require operator intervention, and restoring a large node requires the same amount of human effort as a small one. In a cluster of a thousand nodes, you’ll likely see failures every day. As a result, big clusters of small nodes magnify administrative costs.
  • Datacenter Density: Many organizations with on-premises datacenters are seeking to increase density by consolidating servers into fewer, larger boxes with more computing resources per server. Small clusters of large nodes help this process by efficiently consuming denser resources, in turn decreasing energy and operating costs.

Learn more:

 

Reduce manual tuning and configuration by letting the database manage itself

Distributed database tuning often requires specialized resources and consumes much more time than any team really would like to allocate to it.

In any distributed database, many actors compete for available disk I/O bandwidth. Databases like Cassandra and Redis control I/O submission by statically capping background operations such as compaction, streaming, and repair. Getting that cap right requires painstaking, trial-and-error tuning combined with expert knowledge of specific database internals. Spiky workloads, with wide variance between reads and writes, or unpredictable end-user demand, are risky. Set the cap too high and foreground operations will be starved of bandwidth, creating erratic latency and bad customer experiences. Set it too low and it might take days to stream data between nodes, making auto-scaling a nightmare. What works today may be disastrous tomorrow.

Users are largely relieved of tuning when the database can automatically prioritize its own activities based on real-time, real-world conditions. For instance, ScyllaDB achieves this with several embedded schedulers/actuators, each of which is responsible for resources such as network bandwidth, disk I/O, and CPU. For example, ScyllaDB can enable application-level throttling, prioritize requests and process them selectively, and cancel requests before they impact a lower layer’s software stack. It maintains enough concurrency inside the disk to extract maximum bandwidth from it, while keeping that concurrency low enough to prevent the disk from internally queueing requests for longer than needed.

This means that even under the most intense workloads, ScyllaDB runs smoothly without requiring frequent administrator supervision and intervention. Users don’t have to worry about tuning the underlying Linux kernel for better performance, or about playing with any Garbage Collector setting whatsoever (ScyllaDB is implemented in C++, so there is no Garbage Collection).

The diagram below shows this architecture at a high level. In ScyllaDB, requests bypass the kernel for processing and are sent directly to ScyllaDB’s user space disk I/O scheduler. The I/O scheduler applies rich processing to simultaneously maintain system stability and meet SLAs.

On the left, requests generated by the userspace process are thrown directly into the kernel and lower layers. On the right, ScyllaDB’s disk I/O scheduler intermediates requests. ScyllaDB classifies requests into semantically meaningful classes (A and B), then tracks and prioritizes them – guaranteeing balance while ensuring that lower layers are never overloaded.

The tedium of managing compaction can also be reduced. Configuring compaction manually requires intimate knowledge of both expected end user consumption patterns along with low-level database-specific configuration parameters. That can be avoided if the database uses algorithms to let the system self-correct and find the optimal compaction rate under varying loads. This radically lowers the risk of catastrophic failures, and also improves the interplay between maintenance and customer facing latency.

The impact can be seen in the following performance graphs.

Throughput of a CPU in the system (green), versus percentage of CPU time used by compactions (yellow). In the beginning, there are no compactions. As time progresses the system reaches steady state as the throughput steadily drops.

Disk space assigned to a particular CPU in the system (yellow) versus shares assigned to compaction (green). Shares are proportional to the backlog, which at some point will reach steady state

95th, 99th, and 99.9th percentile latencies. Even under 100% resource utilization, latencies are still low and bounded.

The ingestion rate (yellow line) suddenly increases from 55MB/s to 110MB/s, as the payload of each request increases in size. The system is disturbed from its steady state position but will find a new equilibrium for the backlog (green line).

A sudden increase in requests requires the system to ingest data faster. As the ingestion bandwidth increases and data gets persisted into disk more aggressively, the compaction backlog also increases. The effects are shown below. With the new ingestion rate, the system is disturbed and the backlog grows faster than before. However, the compaction controller will automatically increase shares of the internal compaction process and the system in turn achieves a new state of equilibrium

Reduce infrastructure by running multiple workloads on a single cluster

When looking to save costs, teams might consider running multiple different use cases against the database. It is often compelling to aggregate different use cases under a single cluster, especially when those use cases need to work on the exact same data set. Keeping several use cases together under a single cluster can also reduce costs. But, it’s essential to avoid resource contention when implementing latency-critical use cases. Failure to do so may introduce hard-to-diagnose performance situations, where one misbehaving use case ends up dragging down the entire cluster’s performance.

There are many ways to accomplish workload isolation to minimize the resource contention that could occur when running multiple workloads on a single cluster. Here are a few that we have seen work well. Keep in mind that the approach to choose will depend on your existing database available options, as well as your use case’s requirements:

  • Physical Isolation: This setup is often used to entirely isolate one workload from another. It involves essentially extending your deployment to an additional region (which may be physically the same as your existing one, but logically different on the database side). As a result, the use cases are split to replicate data to another region, but queries are executed only within a particular region – in such a way that a performance bottleneck in one use case won’t degrade or bottleneck the other. Note that a downside of this solution is that your infrastructure costs double.
  • Logical Isolation: Some databases or deployment options allow you to logically isolate workloads without needing to increase your infrastructure resources. For example, ScyllaDB has a Workload Prioritization feature where you can assign different weights for specific workloads to help the database understand which workload you want it to prioritize in the event of system contention. If your database does not offer such a feature, you may still be able to run two or more workloads in parallel, but watch out against potential contentions on your database.
  • Scheduled Isolation: Many times, you might need to simply run batched scheduled jobs at specified intervals in order to support other business-related activities, such as extracting analytics reports. In those cases, consider running the workload in question at low-peak periods (if any exist), and experiment with different concurrency settings in order to avoid impairing the latency of the primary workload that’s running alongside it.

Let’s look at ScyllaDB’s Workload Prioritization in a bit more detail. This capability is often used to balance OLAP and OLTP workloads. The goal is to ensure that each defined task gets a fair share of system resources, so that no single job monopolizes them and starves other jobs of the minimum they need to keep operating.
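
Under the hood, Workload Prioritization is configured through CQL service levels attached to the roles that each application authenticates with. Below is a minimal sketch, assuming ScyllaDB Enterprise’s service-level syntax; the service level names, share values, and role names are hypothetical, and the exact keywords and options can vary by version, so check the documentation for your release:

-- Give the latency-sensitive OLTP workload a larger share of resources
CREATE SERVICE_LEVEL IF NOT EXISTS oltp WITH SHARES = 800;
CREATE SERVICE_LEVEL IF NOT EXISTS olap WITH SHARES = 200;

-- Attach each service level to the role its application connects with
ATTACH SERVICE_LEVEL oltp TO oltp_app;
ATTACH SERVICE_LEVEL olap TO analytics_app;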

Figure 6: OLTP and OLAP p99 latencies without workload prioritization

Latency between OLAP and OLTP on the same cluster before enabling workload prioritization.

In the above image, note that the latencies for both loads nearly converge. OLTP processing began at or below 2 ms p99, up until the OLAP job began at 12:15. When OLAP was enabled, OLTP p99 latencies shot up to 8 ms, then further degraded, plateauing around 11-12 ms until the OLAP job terminated after 12:26. These latencies are approximately 6x greater than when OLTP ran by itself. (OLAP latencies hover between 12 and 14 ms, but, again, OLAP is not latency-sensitive.)

Figure 7: OLTP and OLAP throughput without workload prioritization

Comparative throughput results for OLAP and OLTP on the same cluster without workload prioritization enabled.

In the above image, note that OLTP throughput sinks from around 60,000 ops to half that: 30,000 ops. The reason is visible in the same graph: OLAP, being throughput-hungry, is maintaining roughly 260,000 ops.

The bottom line is that OLTP suffers with respect to both latency and throughput, and users experience slow response times. In many real-world conditions, such OLTP responses would violate a customer’s SLA.

Figure 8: OLTP and OLAP latencies with workload prioritization enabled

OLAP and OLTP latencies with workload prioritization enabled.

After workload prioritization is enabled, the OLTP workload again starts out at sub-millisecond to 2 ms p99 latencies. Once the OLAP workload is added, OLTP performance degrades, but its p99 latencies hover between 4 and 7 ms (about half of the 11-12 ms p99 latencies seen without workload prioritization). It is important to note that once system contention kicks in, the OLTP latencies are still somewhat impacted, just not to the same extent they were prior to workload prioritization. If your real-time workload requires consistently low single-digit millisecond p99 latencies, we strongly recommend that you avoid introducing any form of contention.

The OLAP workload, not being as latency-sensitive, has p99 latencies that hover between 25 and 65 ms. These are much higher latencies than before; that is the tradeoff for keeping the OLTP latencies lower.

Here, OLTP traffic runs at a smooth 60,000 ops until the OLAP load is also enabled. Thereafter it dips in performance, but only slightly, hovering between 54,000 and 58,000 ops. That is only a 3% to 10% drop in throughput. The OLAP workload, for its part, hovers between 215,000 and 250,000 ops, a drop of 4% to 18%, which means the OLAP workload takes longer to complete. Both workloads suffer some degradation, as would be expected for an overloaded cluster, but neither to a crippling degree.

Using this capability, many teams can avoid having one datacenter for OLTP real-time workloads and another for analytics workloads. That means less admin burden as well as lower infrastructure costs.

Wrap Up

These three technical shifts are just the start; there are a number of additional ways to reduce your database spend.

And if you’d like advice on which, if any, of these options might be a good fit for your team’s particular workload, use case and ecosystem, the architects at ScyllaDB would be happy to provide a technical consultation.

 

Our Top NoSQL Blogs of the Year: Rust, Raft, MongoDB, Books & Tablets

As 2023 draws to a close, let’s take a moment to look back at our top 10 NoSQL blogs written this year – plus 10 “timeless classics” that continue to attract attention.

Before we start, a heartfelt thanks to the many community members who contributed to our blogs in various ways – from customers sharing best practices at ScyllaDB Summit, to open source contributors and ScyllaDB engineers explaining how they raised the bar on what’s possible for NoSQL performance, to anyone who has initiated or contributed to the discussion on Hacker News, Reddit, and other platforms.

Drumroll, please! Here are the most-read NoSQL blogs that we published in 2023:

Introducing “Database Performance at Scale”: A Free, Open Source Book

By Dor Laor

Introducing a new book that provides practical guidance for understanding the opportunities, trade-offs, and traps you might encounter while trying to optimize data-intensive applications for high throughput and low latency.

 

Why ScyllaDB is Moving to a New Replication Algorithm: Tablets

By Tomasz Grabiec

How moving from Vnode-based replication to tablets helps dynamically distribute data across the cluster, ultimately increasing ScyllaDB’s elasticity.

 

Top Mistakes with ScyllaDB: Storage

By Felipe Cardeneti Mendes

All about disk types, best practices, common misconceptions, filesystem types, RAID setups, tunable parameters, and what makes disks so special in the context of databases.

 

When to Use ScyllaDB vs MongoDB: Lessons Learned From 5+ Years in Production

By Cynthia Dunlop

Numberly has been using both ScyllaDB and MongoDB in production for 5+ years. Learn which NoSQL database they rely on for different use cases and why.

 

Benchmarking MongoDB vs ScyllaDB: Performance, Scalability & Cost

By Dr. Daniel Seybold

Dr. Daniel Seybold shares how MongoDB and ScyllaDB compare on throughput, latency, scalability, and price-performance in this third-party benchmark by benchANT.

 

How Numberly Replaced Kafka with a Rust-Based ScyllaDB Shard-Aware Application

By Alexys Jacob

How Numberly used Rust & ScyllaDB to replace Kafka, streamlining the way all its AdTech components send and track messages (whatever their form).

 

ScyllaDB Open Source 5.2: With Raft-Based Schema Management

By Tzach Livyatan

ScyllaDB 5.2 introduces Raft-based strongly consistent schema management, DynamoDB Alternator TTL, and many more improvements and bug fixes.

 

5 Intriguing ScyllaDB Capabilities You Might Have Overlooked

By Felipe Cardeneti Mendes

A look at 5 “hidden gem” ScyllaDB capabilities that help ScyllaDB power users: workload prioritization, heat-weighted load balancing, per shard concurrency limit, per partition rate limit, and bypass cache.

 

MongoDB vs Postgres vs ScyllaDB: Tractian’s Benchmarking and Migration

By João Pedro Voltani and João Granzotti

TRACTIAN shares their comparison of ScyllaDB vs MongoDB and PostgreSQL, then provides an overview of their MongoDB to ScyllaDB migration process, challenges & results.

 

Rust in the Real World: Super Fast Data Ingestion Using ScyllaDB

By Javier Ramos

A walkthrough of a real-world use case where 1) You have hierarchical or graph data stored in an S3 data lake, 2) You need to ingest it with high throughput and speed into ScyllaDB, and 3) You need to traverse or search the graph very fast.

 

Bonus: Top NoSQL Database Blogs From Years Past

Many of the blogs published in previous years continued to resonate with the community. Among the enduring favorites:

  • The Taming of the B-Trees  (Pavel “Xemul” Emelyanov) – Discover how and why ScyllaDB engineers implemented B-tree and B+-tree data structures in their NoSQL distributed database. We share the practical details you won’t read in books. (2021)
  • NoSQL vs. NewSQL: CockroachDB vs. ScyllaDB  (Ivan Prisyazhynyy) – NewSQL and NoSQL are both designs for distributed databases that go beyond the capabilities of a traditional RDBMS. But how are they similar and how do they differ? See this apples-and-oranges comparison of CockroachDB to ScyllaDB. (2021)

ScyllaDB Summit Registration + A Sneak Peek

If database performance at scale matters to your team, join your like-minded peers at ScyllaDB Summit 2024 on February 14 and 15. It’s free, intentionally virtual, and highly interactive.

Whether you want to discover the latest ScyllaDB advancements, hear how top engineering teams are solving their toughest database challenges, or explore the latest trends across the broader data ecosystem, we’ve got you covered.

Register Now – It’s Free

You’ll get two days of keynotes, technical sessions, hands-on labs, and community building. An impressive lineup of engineers and technology leaders across AdTech, social media, and entertainment is gearing up to share their insights – and we hope you will be part of the discussion.

Here’s a tease of what you can expect at the 2024 conference:

  • Why Disney Moved from DynamoDB to ScyllaDB
  • Tracking Millions of Heartbeats on Zee’s OTT Platform
  • Inside Expedia’s CDC Migration from Cassandra to ScyllaDB
  • A Benchmark of the DBaaS Market
  • Radically Outperforming DynamoDB @ Digital Turbine with SADA and Google Cloud
  • Real-Time Persisted Events at Supercell
  • Getting the Most Out of ScyllaDB Monitoring: ShareChat’s Tips
  • The Strategy Behind ReversingLabs’ Massive Key-Value Migration
  • ScyllaDB Under the Hood: Why Our Architecture Matters

Of course, we’ll also share the latest from ScyllaDB Engineering, with a deep dive into our move to immediate consistency and extreme elasticity via Raft and tablets. You’ll also learn what’s next for ScyllaDB with respect to Kubernetes, compaction strategies, shard-aware drivers, and Change Data Capture (CDC).

This isn’t your typical virtual conference. It’s exceptionally interactive, with constant opportunities to connect and collaborate with speakers, ScyllaDB engineers, and your peers across the database community. There’s no better place to share, discuss, and socialize with your performance-minded database peers.

Binge Watch Past Years’ Sessions

To get a taste of what our 2024 event will be like, you can browse and binge-watch our previous years’ sessions – no registration required. Some of the most talked-about sessions include:

  • How Discord Migrated Trillions of Messages from Cassandra to ScyllaDB
  • Using ScyllaDB for Distribution of Game Assets in Unreal Engine
  • The Consistency vs Throughput Tradeoff in Distributed Databases
  • Worldwide Local Latency With ScyllaDB
  • From Postgres to ScyllaDB: Migration Strategies and Performance Gains
  • Building a 100% ScyllaDB Shard-Aware Application Using Rust
  • ShareChat’s Journey Migrating 100TB of Data to ScyllaDB with NO Downtime

 

Watch on demand

Stay Tuned for Updates

Follow us on social media to hear about additional sessions and some new aspects we’re adding to shake things up this year. More announcements are coming soon!

 

NoSQL Data Modeling Best Practices for Performance

Even if you adopt the fastest database on powerful infrastructure, you won’t be able to tap its full potential unless you get your data modeling right.

Getting your data modeling wrong is one of the easiest ways to ruin your performance. And it’s especially easy to screw this up when you’re working with NoSQL, which (ironically) tends to be used for the most performance-sensitive workloads. NoSQL data modeling might initially appear quite simple: just model your data to suit your application’s access patterns. But in practice, that’s much easier said than done.

Fixing data modeling is no fun, but it’s often a necessary evil. If your data modeling is fundamentally inefficient, your performance will suffer once you scale past some tipping point that varies based on your specific workload and deployment.

This article explores three of the most common data modeling mistakes that ruin NoSQL database performance, along with best practices for how to avoid or resolve them.

Collections

Teams don’t always use collections, but when they do, they often use them incorrectly. Collections are meant for storing/denormalizing a relatively small amount of data. They’re essentially stored in a single cell, which can make serialization/deserialization extremely expensive.

When you use collections, you can define whether the field in question is frozen or non-frozen. A frozen collection can only be written as a whole; you cannot append or remove elements from it. A non-frozen collection can be appended to, and that’s exactly the type of collection that people most misuse. To make matters worse, you can even have nested collections, such as a map that contains another map, which includes a list, and so on.

Misused collections will introduce performance problems much sooner than, say, large partitions. If you care about performance, collections can’t be very large at all. For example, if we create a simple key-value table where our key is a `sensor_id` and our value is a collection of samples recorded over time, performance will be suboptimal as soon as we start ingesting data.


        CREATE TABLE IF NOT EXISTS {table} (
                sensor_id uuid PRIMARY KEY,
                events map<timestamp, FROZEN<map<text, int>>>
        )
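
For context, ingesting a new sample against this schema means appending to the non-frozen map, roughly as sketched below; the timestamp, reading, and sensor ID values are purely illustrative:

        UPDATE {table}
        SET events = events + {'2023-01-01 00:00:00+0000': {'co_ppm': 402}}
        WHERE sensor_id = 99051fe9-6a9c-46c2-b949-38ef78858dd0;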

The following monitoring snapshots show what happens when you try to append several items to a collection at once.

You can see that while the throughput decreases, the p99 latency increases. Why does this occur?

  • Collection cells are stored in memory as sorted vectors.
  • Adding elements requires a merge of two collections (old and new).
  • Adding an element has a cost proportional to the size of the entire collection.
  • Trees (instead of vectors) would improve the performance, BUT…
  • Trees would make small collections less efficient!

Returning to that same example, the solution would be to move the timestamp to a clustering key and transform the map into a FROZEN collection (since you no longer need to append data to it). These very simple changes greatly improve the performance of the use case.


        CREATE TABLE IF NOT EXISTS {table} (
                sensor_id uuid,
                record_time timestamp,
                events FROZEN<map<text, int>>,
                PRIMARY KEY(sensor_id, record_time)
        )
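
With this revised schema, each new sample becomes a plain row insert rather than a collection merge. A minimal sketch, again with illustrative values:

        INSERT INTO {table} (sensor_id, record_time, events)
        VALUES (99051fe9-6a9c-46c2-b949-38ef78858dd0, '2023-01-01 00:00:00+0000', {'co_ppm': 402});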

Large Partitions

Large partitions commonly emerge as teams scale their distributed databases. A large partition is one that has grown so big that it starts introducing performance problems across the cluster’s replicas.

One of the questions that we hear often – at least once a month – is “What constitutes a large partition?”  Well, it depends. Some things to consider:

  • Latency expectations:  The larger your partition grows, the longer it will take to be retrieved. Consider your page size and the number of client-server round trips needed to fully scan a partition.
  • Average payload size:  Larger payloads generally lead to higher latency. They require more server-side processing time for serialization and deserialization, and also incur a higher network data transmission overhead.
  • Workload needs: Some workloads organically require larger payloads than others. For instance, I’ve worked with a Web3 blockchain company that would store several transactions as BLOBs under a single key, and every key could easily get past 1 megabyte in size.
  • How you read from these partitions:  For example, a time series use case will typically have a timestamp clustering component. In that case, reading from a specific time-window will retrieve much less data than if you were to scan the entire partition.

The following table illustrates the impact of large partitions under different payload sizes, such as 1, 2 and 4 kilobytes.

As you can see, the higher your payload gets under the same row count, the larger your partition is going to be. However, if your use case frequently requires scanning partitions as a whole, be aware that databases have limits to prevent unbounded memory consumption. For example, ScyllaDB cuts off pages at 1MB to prevent the system from potentially running out of memory. Other databases (even relational ones) have similar protection mechanisms to prevent an unbounded bad query from starving the database’s resources. To retrieve a partition with 4KB payloads and 10K rows from ScyllaDB, you would need at least 40 pages to scan it with a single query. This may not seem like a big deal at first, but as you scale over time, it could affect your overall client-side tail latency.

Another consideration: With databases like ScyllaDB and Cassandra, data written to the database is stored in the commitlog and under an in-memory data structure called a “memtable.”

The commitlog is a write-ahead log that is never really read from, except when there’s a server crash or a service interruption. Since the memtable lives in memory, it eventually gets full. To free up memory space, the database flushes memtables to disk. That process results in SSTables (sorted strings tables), which is how your data gets persisted.

What does all this have to do with large partitions? Well, SSTables have specific components that need to be held in memory when the database starts. This ensures that reads are always efficient and minimizes wasted disk I/O when looking up data. When you have extremely large partitions (for example, we recently had a user with a 2.5 terabyte partition in ScyllaDB), these SSTable components introduce heavy memory pressure, shrinking the database’s room for caching and further constraining your latencies.

How do you address large partitions via data modeling? Basically, it’s time to rethink your primary key. The primary key determines how your data will be distributed across the cluster, which in turn affects performance as well as resource utilization. A good partition key should have high cardinality and a roughly even distribution. For example, a high-cardinality attribute like User Name, User ID or Sensor ID might be a good partition key. Something like State would be a bad choice because states like California and Texas are likely to have far more data than less populated states such as Wyoming and Vermont.

Or consider this example. The following table could be used in a distributed air quality monitoring system with multiple sensors:

CREATE TABLE air_quality_data (
    sensor_id text,
    time timestamp,
    co_ppm int,
    PRIMARY KEY (sensor_id, time)
);

With time being our table’s clustering key, it’s easy to imagine that partitions for each sensor can grow very large, especially if data is gathered every couple of milliseconds. This innocent-looking table can eventually become unusable. In this example, it takes only ~50 days.

A standard solution is to amend the data model to reduce the number of clustering keys per partition key. In this case, let’s take a look at the updated `air_quality_data` table:

CREATE TABLE air_quality_data (
    sensor_id text,
    date text,
    time timestamp,
    co_ppm int,
    PRIMARY KEY ((sensor_id, date), time)
);

After the change, one partition holds the values gathered in a single day, which makes it less likely to overflow. This technique is called bucketing, as it allows us to control how much data is stored in partitions.
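
To read from a bucketed table, the application simply includes the bucket in its query. Here is a sketch of a typical time-window read against the schema above; the sensor and date values are hypothetical:

SELECT time, co_ppm
FROM air_quality_data
WHERE sensor_id = 'sensor-42'
  AND date = '2023-06-01'
  AND time >= '2023-06-01 08:00:00+0000'
  AND time < '2023-06-01 09:00:00+0000';

Queries that span multiple days simply issue one such query per date bucket, which is usually a fair tradeoff for keeping partitions bounded.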

Bonus: See how Discord applies the same bucketing technique to avoid large partitions.

Hot Spots

Hot spots can be a side effect of large partitions. If you have a large partition (storing a large portion of your data set), it’s quite likely that your application access patterns will hit that partition more frequently than others. In that case, it also becomes a hot spot.

Hot spots occur whenever a problematic data access pattern causes an imbalance in the way data is accessed across your cluster. One culprit: the application fails to impose any limits on the client side and allows tenants to potentially spam a given key. For example, think about bots in a messaging app frequently spamming messages in a channel. Hot spots can also be introduced by erratic client-side configurations in the form of retry storms: a client attempts to query specific data, times out before the database does, and retries the query while the database is still processing the previous one.

Monitoring dashboards should make it simple for you to find hot spots in your cluster. For example, this dashboard shows that shard 20 is overwhelmed with reads.

For another example, the following graph shows three shards with higher utilization, which correlates to the replication factor of three, configured for the keyspace in question.

Here, shard 7 introduces a much higher load due to the spamming.

How do you address hot spots? First, use a vendor utility on one of the affected nodes to sample which keys are most frequently hit during your sampling period. You can also use tracing, such as probabilistic tracing, to analyze which queries are hitting which shards and then act from there.

If you find hot spots, consider:

  • Reviewing your application access patterns. You might find that you need a data modeling change, such as the previously mentioned bucketing technique. If you need sorting, you could use a monotonically increasing component, such as a Snowflake ID. Or maybe it’s best to apply a concurrency limiter and throttle down potential bad actors.
  • Specifying per-partition rate limits, after which the database will reject any queries that hit that same partition (see the sketch after this list).
  • Ensuring that your client-side timeouts are higher than the server-side timeouts, so that clients don’t retry queries before the server has a chance to process them (the “retry storms” mentioned earlier).
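
As a rough sketch, here is what such a limit looks like with ScyllaDB’s per_partition_rate_limit table option. The thresholds below are hypothetical, and option names can differ across versions, so verify against the documentation for your release:

ALTER TABLE air_quality_data
WITH per_partition_rate_limit = {
    'max_reads_per_second': 100,
    'max_writes_per_second': 200
};

Requests exceeding the configured rate for a single partition are rejected, shielding the rest of the cluster from one hot key.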

Learn More: On-Demand NoSQL Data Modeling Masterclass

Want to learn more about NoSQL data modeling best practices for performance? Take a look at our NoSQL data modeling masterclass – three hours of expert instruction, now on demand and free.  You will learn how to:

  • Analyze your application’s data usage patterns and determine which data modeling approach will be most performant for your specific usage patterns
  • Select the appropriate data modeling options to address a broad range of technical challenges, including benefits and trade-offs of each option
  • Apply common NoSQL data modeling strategies in the context of a sample application
  • Identify signs that indicate your data modeling is at risk of causing hot spots, timeouts and performance degradation – and how to recover

Access the Data Modeling Masterclass On-Demand (It’s Free)