Preventing Data Resurrection with Repair Based Tombstone Garbage Collection

Delayed repairs can lead to a data resurrection problem, where deleted data comes back to life. In this post, I explain a novel way that ScyllaDB eliminates this problem: deleting tombstones based on repair execution rather than a fixed duration (known as gc_grace_seconds). This will be available in ScyllaDB Open Source 5.0.

Background: Tombstones in ScyllaDB and Other CQL-Compatible Databases

You might wonder what a tombstone is. In ScyllaDB, as well as in Apache Cassandra and other CQL-compatible databases, a tombstone is a special data marker written into the database to indicate that a user has deleted some data.

This marker is needed because ScyllaDB and these other databases use immutable SSTable files. So when you need to delete data, rather than alter a database record itself, you leave a marker to let the database know that data has been marked as deleted. In future reads, the database will spot the tombstone and ensure that the deleted data is not returned as part of a query result.
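
For example, with a table like the ks.cf table used in the examples later in this post, a delete is just another write: it does not rewrite any SSTable; instead it records a tombstone that future reads and compactions take into account:

DELETE FROM ks.cf WHERE key = 0x01;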

Eventually, during compaction, the deleted rows and their tombstones are discarded. Why not just keep all the tombstone markers? Because that leads to unbounded growth: if there are a lot of deletes, the database accumulates an ever-increasing amount of deleted data and tombstones. So you generally want tombstones purged to reclaim disk space.

The solution to this problem is to drop tombstones when they are no longer necessary. Currently, we drop a tombstone when both of the following hold:

  • First, the tombstone and the data it covers can be compacted away together.
  • Second, the tombstone is old enough, meaning older than the gc_grace_seconds option, which defaults to 10 days (see the example just below).
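
For reference, gc_grace_seconds is a per-table option; here is a minimal sketch of adjusting it on the hypothetical ks.cf table used later in this post (864000 seconds is the 10-day default):

ALTER TABLE ks.cf WITH gc_grace_seconds = 864000;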

All of this becomes a non-trivial issue when data and tombstones are replicated to multiple nodes. There are cases where the tombstones might be missing on some of the replica nodes – for instance, if the node was down during a deletion and no repair was performed within gc_grace_seconds. As a result, data resurrection could happen.

Here is an example:

  1. In a 3-node cluster, a deletion with consistency level QUORUM is performed.
  2. One node is down but the other two are up, so the deletion succeeds and tombstones are written on the two live nodes.
  3. Eventually the downed node rejoins the cluster. However, if it rejoins beyond the Hinted Handoff window, it does not get the message that one of its records was marked for deletion.
  4. When gc_grace_seconds is exceeded, the two nodes with tombstones do GC, so tombstones and the covered data are gone.
  5. The node that did not receive the tombstone still has the data that was supposed to be deleted.

When the user performs a read query, the database could return the deleted data, since one of the nodes still has it.

Timeout-based Tombstone GC

Let’s call the current tombstone GC method “timeout based tombstone GC.”

With this method, users have to run full cluster-wide repair within gc_grace_seconds to make sure tombstones are synced to all the nodes in the cluster.

However, this GC method is not robust, since correctness depends on user operations. And there is no guarantee that repair can finish in time: repair is a maintenance operation and has the lowest priority. If there are more important tasks, like user workload and compaction, repair will be slowed down and might miss the deadline.

This adds pressure on ScyllaDB admins to finish repair within gc_grace_seconds. Note that we encourage admins to use ScyllaDB Manager, which can help automate backups, repairs and compaction.

In practice, users may want to avoid repair to reduce performance impacts and serve more user workload during critical periods, e.g., holiday shopping.

So, we need a more robust solution.

Repair-based Tombstone GC

We implemented the repair-based tombstone GC method to solve this problem.

The idea is that we automatically remove tombstones (perform GC) only after repair is performed. This guarantees that all replica nodes have the tombstones before they are purged, whether or not repair finishes within gc_grace_seconds.

This provides multiple benefits. There is no need to figure out a proper gc_grace_seconds number with this feature. In fact, it is very hard to find a proper one that works for different workloads. Further, there is no more data resurrection if repair is not performed in time. This means less pressure to run repairs in a timely manner. Since there is no more hard requirement to finish repairs in time, we can throttle repair intensity even more to reduce the latency impact on user workload. Conversely, if repair is performed more frequently than the gc_grace_seconds setting, tombstones can be GC’ed faster, which results in better read performance.

Usage

You can use ALTER TABLE and CREATE TABLE to turn on this feature using a new option: tombstone_gc.

For example,

ALTER TABLE ks.cf WITH tombstone_gc = {'mode':'repair'};

CREATE TABLE ks.cf (key blob PRIMARY KEY, val blob) WITH tombstone_gc = {'mode':'repair'};

You can specify the following modes: timeout, repair, disabled and immediate.

  • The timeout mode is the default. It runs GC after gc_grace_seconds, exactly as before this feature existed.
  • The repair mode is the one users should use. It conducts GC only after a repair is performed.
  • The disabled mode is useful when loading data into ScyllaDB. We do not want to run GC when only part of the data is available, since tools can generate out-of-order writes or writes in the past. In such cases, we can set the mode to disabled while the tools are loading data (see the example after this list).
  • The immediate mode is mostly useful for Time Window Compaction Strategy (TWCS) with no user deletes. It is much safer than setting gc_grace_seconds to zero. We are even considering rejecting user deletes if the mode is immediate, to be on the safe side.
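
For example, a bulk-loading flow might toggle the mode around the load (same hypothetical ks.cf table as above):

-- stop tombstone GC while the loader is writing data, possibly out of order or in the past:
ALTER TABLE ks.cf WITH tombstone_gc = {'mode':'disabled'};

-- once the load has completed, switch to repair-based GC:
ALTER TABLE ks.cf WITH tombstone_gc = {'mode':'repair'};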

A new gossip feature bit TOMBSTONE_GC_OPTIONS is introduced. A cluster will not use this new feature until all the nodes have been upgraded. To keep maximum compatibility and avoid surprising users, the default mode is still timeout. Users have to enable experimental features and then set the mode to repair explicitly to turn on this feature.

Summary

Repair-based tombstone GC is a new ScyllaDB feature which provides a more robust way of ensuring that your dead data stays dead. It guarantees data consistency even if repair is not performed within gc_grace_seconds. It makes ScyllaDB easier to operate and safer in terms of data consistency. It will be available as an experimental feature in ScyllaDB Open Source 5.0.

LEARN MORE ABOUT SCYLLADB OPEN SOURCE

Why Disney+ Hotstar Replaced Redis and Elasticsearch with ScyllaDB Cloud

Disney+ Hotstar, India’s most popular streaming service, accounts for 40% of the global Disney+ subscriber base. Disney+ Hotstar offers over 100,000 hours of content on demand, as well as livestreams of the world’s most-watched sporting events (Indian Premier League, with over 25 million concurrent viewers). IPL viewership has grown an order of magnitude over the past six years. And with Ms. Marvel, the first superhero of South Asian heritage, making her debut this month, Disney+ Hotstar’s rapid growth will certainly continue by reaching new audiences and demographics.

The “Continue Watching” feature is critical to the on-demand streaming experience for the 300 million-plus monthly active users. That’s what lets you pause a video on one device and instantly pick up where you left off on any device, anywhere in the world. It’s also what entices you to binge-watch your favorite series: complete one episode of a show and the next one just starts playing automatically.

However, it’s not easy to make things so simple. In fact, the underlying data infrastructure powering this feature had grown overly complicated. It was originally built on a combination of Redis and Elasticsearch, connected to an event processor for Kafka streaming data. Having multiple data stores meant maintaining multiple data models, making each change a huge burden. Moreover, data doubling every six months required constantly increasing the cluster size, resulting in yet more admin and soaring costs.

This blog shares an inside look into how the Disney+ Hotstar team led by Vamsi Subash Achanta (architect) and Balakrishnan Kaliyamoorthy (senior data engineer) simplified this data architecture for agility at scale.

TL;DR First, the team adopted a new data model, then they moved to a high-performance low-latency database-as-a-service (ScyllaDB Cloud). This enabled them to free up resources for the many other priorities and projects on the team’s plate. It also lowered latencies for both reads and writes to ensure the snappy user experience that today’s streaming users expect – even with a rapidly expanding content library and skyrocketing subscriber base.

Inside Disney+ Hotstar’s ‘Continue Watching’ Functionality

At Disney+ Hotstar, “Continue Watching” promotes an engaging, seamless viewing experience in a number of ways:

  • If the user plays a video and then later pauses or stops it, the video is added to their “Continue Watching” tray.
  • Whenever the user is ready to resume watching the video on any device, they can easily find it on the home page and pick up exactly where they left off.
  • When the user completes one episode in a series, the next episode is added to their “Continue Watching” tray.
  • If new episodes are added to a series that the user previously completed, the next new episode is added to their “Continue Watching” tray.

Figure 1: One example of Disney+ Hotstar’s “Continue Watching” functionality in action

Disney+ Hotstar users watch an average of 1 billion minutes of video every day. The company also processes 100 to 200 gigabytes of data daily to ensure that the “Continue Watching” functionality is accurate for hundreds of millions of monthly users. Due to the volatile nature of user watching behavior, Disney+ Hotstar needed a database that could handle write-heavy workloads. They also needed a database that could scale appropriately during high traffic times, when the request volume increases by 10 to 20 times within a minute.

Figure 2 shows how the “Continue Watching” functionality was originally architected.

Figure 2: A look at the legacy architecture that Disney+ Hotstar decided to replace

First, the user’s client would send a “watch video” event to Kafka. From Kafka, the event would be processed and saved to both Redis and Elasticsearch. If a user opened the home page, the backend was called, and data was retrieved from Redis and Elasticsearch. Their Redis cluster held 500 GB of data, and the Elasticsearch cluster held 20 terabytes. Their key-value data ranged from 5 to 10 kilobytes per event. Once the data was saved, an API server read from the two databases and sent values back to the client whenever the user next logged in or resumed watching.

Redis provided acceptable latencies, but the growth in data size meant that they needed to horizontally scale their cluster, which increased their cost every three to four months. Elasticsearch latencies were considerably higher, around 200 milliseconds. Moreover, the average cost of Elasticsearch was quite high considering the returns. They often experienced issues with node maintenance, and manual effort was required to resolve them.

Here’s the data model behind that legacy data architecture:

Not surprisingly, having two data stores led to some significant scaling challenges. They had multiple data stores with different data models for the same use case: one key-value and one document. With an influx of users joining Disney+ Hotstar daily, it was becoming increasingly difficult to manage all this data. Moreover, it became quite costly to maintain two data stores with different code bases and different query patterns at high scales. Every six months, they were almost doubling their data. This required an increase in clusters, which resulted in burdensome administration and spiraling costs.

Redesigning the Data Model

The first step in addressing these challenges was designing a new data model: a NoSQL key-value data store. To simplify, they aimed for a data model with only two tables.

The User table is used to retrieve the entire “Continue Watching” tray for the given user, all at once. If a new video needs to be added to the user’s “Continue Watching” tray, it is appended to the list for the same User-Id key.

The User-Content table is used for modifying specific Content-Id data. For example, when the user resumes the video and then pauses it, the updated Timestamp is stored. When the video is fully watched, the entry can be directly queried and deleted. In this table, User-Id is the primary key and Content-Id is the secondary (clustering) key.
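
A minimal CQL sketch of such a two-table model might look like the following; the table and column names are illustrative assumptions, not Disney+ Hotstar's actual schema:

CREATE TABLE user_tray (
    user_id text PRIMARY KEY,
    content_ids list<text>   -- the user's "Continue Watching" tray, appended to as videos are added
);

CREATE TABLE user_content (
    user_id text,
    content_id text,
    updated_at timestamp,    -- resume position / last-watched timestamp
    PRIMARY KEY (user_id, content_id)
);

With this layout, the whole tray is a single partition lookup on user_tray, while an individual entry can be updated or deleted in user_content by (user_id, content_id).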

Selecting a New Database

The team considered a number of alternatives, from Apache Cassandra and Apache HBase to Amazon DynamoDB to ScyllaDB. Why did they ultimately choose ScyllaDB? A few important reasons:

  • Performance: ScyllaDB’s deep architectural advancements deliver consistently low latencies for both reads and writes, ensuring a snappy user experience even when live events exceed 25 million concurrent viewers.
  • Operational simplicity: ScyllaDB was built from the ground up to deliver self-optimizing capabilities that deliver a range of benefits, including the ability to run operational and analytics workloads against unified infrastructure, higher levels of utilization that prevent wasteful overprovisioning and significantly lower administrative overhead.
  • Cost efficiency: ScyllaDB Cloud, a fully-managed database-as-a-service (NoSQL DBaaS), offers a much lower cost than the other options they considered.

Figure 3: Performance monitoring results of ScyllaDB showing sub-millisecond p99 latencies and average read and write latencies in the range of 150 – 200 microseconds

Migrating with Zero Downtime

From Redis and Elasticsearch to ScyllaDB Cloud

Disney+ Hotstar’s migration process began with Redis. The Redis to ScyllaDB migration was fairly straightforward because the data model was so similar. They captured a Redis snapshot in an RDB format file, which was then converted into comma-separated value (CSV) for uploading into ScyllaDB Cloud using cqlsh. A lesson learned from their experience: Watch for maximum useful concurrency of writes to avoid write timeouts.
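
A cqlsh COPY command of roughly this shape is one way to load such CSV files; the keyspace, table, column, and file names here are illustrative, and options like NUMPROCESSES and CHUNKSIZE can be tuned down if write timeouts appear:

COPY ks.user_content (user_id, content_id, updated_at) FROM 'continue_watching.csv' WITH HEADER = true AND NUMPROCESSES = 7 AND CHUNKSIZE = 50;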

Running with seven threads, they migrated 1 million records in 15 minutes. To speed up the process, they scaled up the number of threads and added more machines.

Figure 4: The Redis to ScyllaDB Cloud migration

A similar process was applied to the Elasticsearch migration. JSON documents were converted to CSV files, then CSV files were copied to ScyllaDB Cloud.

Once ScyllaDB Cloud had been loaded with the historical data from both Redis and Elasticsearch, it was kept in sync by:

  • Modifying their processor application to ensure that all new writes were also made to ScyllaDB.
  • Upgrading the API server so that all reads could be made from ScyllaDB as well.

Figure 5: Moving from Redis and Elasticsearch to ScyllaDB Cloud

At that point, writes and reads could be completely cut out from the legacy Redis and Elasticsearch systems, leaving ScyllaDB to handle all ongoing traffic. This migration strategy completely avoided any downtime.

Figure 6: ScyllaDB Cloud now handles all ongoing traffic

ScyllaDB Open Source to ScyllaDB Cloud

The Disney+ Hotstar team had also done some work with ScyllaDB Open Source and needed to move that data into their managed ScyllaDB Cloud environment as well. There were two different processes they could use: SSTableloader or the ScyllaDB Spark Migrator.

Figure 7: SSTableloader based migration from ScyllaDB open source to ScyllaDB Cloud

SSTableloader takes a nodetool snapshot of each server in a cluster and then uploads the snapshots to the new database in ScyllaDB Cloud. This can be run in batches or all at once. The team noted that this migration process slowed down considerably when they had a secondary (composite) key. To avoid this slowdown, the team implemented the ScyllaDB Spark Migrator instead.

Figure 8: Migrating from ScyllaDB Open Source to ScyllaDB Cloud with the ScyllaDB Spark migrator

In this process, the data was first backed up to S3 storage, then put onto a single node ScyllaDB Open Source instance (a process known as unirestore). From there, it was pumped into ScyllaDB Cloud using the ScyllaDB Spark Migrator.

Serving the Fastest-Growing Segment of Disney+

The team is now achieving sub-millisecond p99 latencies, with average read and write latencies in the range of 150 to 200 microseconds. Moreover, with a database-as-a-service relieving them of administrative burdens like database backups, upgrades and repairs, they can focus on delivering exceptional experiences across the fastest-growing segment of Disney+ global subscribers. For example, they recently rearchitected the platform’s recommendation features to use ScyllaDB Cloud. Additional projects on the short-term horizon include migrating their watchlist functionality to ScyllaDB Cloud.

DISCOVER SCYLLADB CLOUD

 

 

Numberly: Learning Rust the Hard Way for Kafka + ScyllaDB in Production

Alexys Jacob is CTO of Numberly, a French digital data marketing powerhouse whose experts and systems help brands connect with their customers using all available digital channels. The developer community knows Alexys well as “Ultrabug,” a contributor to a variety of open source projects, such as ScyllaDB, Python and Gentoo Linux.

Alexys has a penchant for taking on deep technical challenges to gain performance with ScyllaDB, such as when he delved into what it takes to make a shard-aware Python driver (which you can read more about here and here), gaining between 15% and 25% better throughput, or when he got Spark job processing with ScyllaDB down from 12 minutes to just over 1 minute.

Alexys Jacob, basically.

At a recent webinar Alexys described a new performance challenge he set for himself and for Numberly: to move a key element of their code from Python to Rust, in order to accelerate a data pipeline powered by ScyllaDB and Apache Kafka for event streaming.

He began by describing the reasoning behind such a decision. It comes from Numberly’s use of event streaming and specialized data pipelining applications, which they call data processor applications. (You can read more about how Numberly uses Kafka with ScyllaDB.)

Each such data processor prepares and enriches the incoming data so that it is useful to the downstream business partner or client applications in a timely manner. Alexys emphasized, “availability and latency are business critical to us. Latency and resilience are the pillars upon which we have to build our platforms to make our business reliable in the face of our clients and partners.” Simply put, for Numberly to succeed Kafka and ScyllaDB can’t fail.

Over the past five years, Numberly has relied heavily upon three of its most demanding data processors, which had been written in Python. Alexys noted the risks and the natural reluctance to change them out: “They were battle tested and trustworthy. We knew them by heart.”

During that same time, Alexys kept tabs on the maturation of Rust as a development language. His natural curiosity and desire to improve his skills and Numberly’s capabilities drove him to consider switching out these data processors to Rust. It wasn’t a decision to be taken lightly.

Regarding Rust, Alexys commented, “it felt less intimidating to me than C or C++ — sorry Avi.” So when this opportunity came he went to his colleagues and suggested rewriting them in Rust. The internal response was, at first, less than enthusiastic. There was no Rust expertise at Numberly. This would be Alexys’ first project with the language. There were major risks with this particular bit of code. “Okay, I must admit that I lost my CTO badge for a few seconds when I saw their faces.”

Alexys needed to justify the decision with clear rationale, and delineated the promises Rust makes. “It’s supposed to be secure, easy to deploy, makes few or no [performance] compromises and it also plays well with Python. But furthermore their marketing motto speaks to the marketer inside me: ‘a language empowering everyone to build reliable and efficient software.’”

This resonated strongly with Alexys. “That’s me. Reliable and efficient software.” Alexys noted that ‘efficient’ software is not precisely synonymous with ‘fastest’ software. “Brett Cannon, a Python core developer, advocates that selecting a programming language for being faster on paper is a form of premature optimization.”

Alexys enumerated just a few possible meanings of “fast:”

  • Fast to develop?
  • Fast to maintain?
  • Fast to prototype?
  • Fast to process data?
  • Fast to cover all failure cases?

“I agree with him in the sense that the word ‘fast’ has different meanings depending on your objectives. To me Rust can be said to be faster as a consequence of being efficient, which does not cover all the items on the list here.”

Applying these questions to the Numberly context: no, Rust would not be faster to develop than Python, since there was a learning curve involved, whereas Alexys had over 15 years of Python experience.

Nor would it be faster to maintain, since they had not yet made Rust an operational language in their production environment.

Would it be faster to prototype? Again, no, since unlike the immediacy of interpreted Python there would need to be compile times involved.

Would it be faster to process data? On paper, yes. That was the key reason to adopt Rust, and it was an educated guess that it would perform faster than Python. They still needed to prove it and measure the gains. Because right now, Python had proven to be “fast enough.”

Alexys asked the not-so-rhetorical questions he was facing. “So why would I want to lose time? The short answer is innovation. Innovation cannot exist if you don’t accept to lose time. The question is to know when and on what project.” Alexys had an inner surety this project was the right one at the right time. While Rust would make him slow at first, its unique design would provide more reliable software. Stronger software. Type safe. More predictable, readable and maintainable.

Alexys quipped, “It’s still more helpful to have a compiler error that is explained very well than a random Python exception.”

Plus, not to be overlooked, Rust would provide for better dependency management. “It looks sane compared to what I’m used to in Python. Exhaustive pattern matching that brings confidence that you’re not forgetting something while you code. And when you compile, error management primitives — failure handling right in the language syntax.”

Rust’s bottom line for Alexys was clear: “I chose Rust because it provided me with a programming language with the right level of abstractions and the right paradigms. This is what I needed to finally understand and better explain the reliability and performance of an application.”

Production is not a “Hello World”

Learning Rust the hard way meant more than just tackling semicolons and brackets. This wasn’t going to be a simple “hello world” science project. It meant dealing with high stakes and going straight into production. The data processing app written in Rust needed to be integrated into their Kubernetes orchestration mechanism and observable via Prometheus, Grafana, and Sentry. It needed error handling, latency optimization, integration with their Avro schema registry, a reliable bridge between Confluent Kafka and their multi-datacenter ScyllaDB deployment, and more.

Watch the Session in Full

This is just the beginning of the challenge. To see how Alexys and the team at Numberly solved it and successfully moved from Python to Rust, you can watch the full webinar on-demand below. You can also view the slides here.



Lastly, you can read the blog Alexys wrote regarding the Rust implementation on Numberly’s own blog here.

Get Started with ScyllaDB

If you’d like to learn more about using ScyllaDB in your own event streaming platform, feel free to contact us directly, or join our vibrant Slack community.

If you want to jump right on in you can take free courses in using Kafka and ScyllaDB on ScyllaDB University, and get started by downloading ScyllaDB Open Source, or creating an account on ScyllaDB Cloud.

GET STARTED WITH SCYLLADB

Upgrades to Our Internal Monitoring Pipeline: Upgrading our Instametrics Cassandra cluster from 3.11.6 to 4.0

In this series of blogs, we have been exploring the various ways we pushed our metrics pipeline (mainly our Apache Cassandra® cluster named Instametrics) to the limit, and how we went about reducing the load it was experiencing on a daily basis. In this blog, we go through how our team tackled a zero-downtime upgrade of our internal Cassandra cluster from 3.11.6 to 4.0, gaining real production experience and expertise in the upgrade process. We were even able to utilize our Kafka monitoring system to perform some blue/green testing, giving us greater confidence in the stability of the Cassandra 4.0 release.

One of the major projects for our Cassandra team over the last 12 months has been preparing for the release of Cassandra 4.0. This release brought built-in audit logging out of the box, significant improvements to streaming speed, enhanced Java 11 compatibility, and more. However, before the version was deployed onto our customers’ production mission-critical clusters, we needed to be confident that it would have the same level of performance and reliability as the 3.x releases.

The first thing that we did was make sure that we had beta releases on our managed platform to trial, before the project had officially released 4.0. This meant that customers could provision a 4.0 cluster, in order to perform initial integration testing on their applications.

It is worth noting that as part of introducing the Cassandra 4.0 beta to the Instaclustr managed platform we did have to do some work on upgrading our tooling. This included ensuring that metrics collection continued to operate correctly, accounting for any metrics which had been renamed, as well as updates to our subrange repair mechanism. We also submitted some patches to the project in order to make the Cassandra Lucene plugin 4.0 compatible. 

We knew that as we got closer to the official project release we would want to have real-world experience upgrading, running, and supporting a Cassandra 4.0 cluster under load. For this, we turned to our own internal Instametrics Cassandra cluster, which stores all of our metrics for all our nodes under management. 

This was a great opportunity to put our money where our mouth was when it came to Cassandra 4.0 being ready for customers. However, our Cassandra cluster is a critical part of our infrastructure, so we needed to be sure that our upgrade method would cause no application downtime.

How We Configured Our Test Environment

Like most other organizations, Instaclustr maintains a separate but identical environment for our developers to test their changes in before they are released to our production environment. Our production environment supports around 7,000 instances, while our test environment usually runs somewhere around 70 developer test instances. So, whilst it is functionally identical, the load is not.

Part of this is a duplication of the Instametrics Cassandra Cluster, albeit at a much smaller scale of 3 nodes.

Our plan for testing in this environment was reasonably straightforward:

  1. Create an identical copy of our cluster by restoring from backup to a new cluster using the Instaclustr Managed Platform
  2. Upgrade the restored cluster from 3.X to 4.0, additionally upgrading the Spark add-on
  3. Test our aggregation Spark jobs on the 4.0 cluster, as well as reading and writing from other applications which integrate with the Cassandra cluster
  4. Switch over the test environment from using the 3.x cluster to the 4.0 cluster

Let’s break down these steps slightly, and outline why we chose to do each one.

The first step is all about having somewhere to test, and break, without affecting the broader test environment. This allowed our Cassandra team to take their time diagnosing any issues without affecting the broader development team, who may require a working metrics pipeline in order to progress their other tickets. The managed platform’s automated restore makes this a trivial task, and it means we can test on the exact schema and data inside our original cluster.

When it came to upgrading the cluster from 3.X to 4.0, we discussed with our experienced Technical Operations team the best methodology to upgrade Cassandra Clusters. Our team is experienced in both major and minor version bumps with Cassandra, and outlined the methodology used on our managed platform. This meant that we could test that our process would still be applicable to the upgrade to 4.0. We were aware that the schema setting “read_repair_chance” had been removed as part of the 4.0 release, and so we updated our schema accordingly.
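
A sketch of one way to prepare for that on a 3.x cluster, assuming the common approach of zeroing out the soon-to-be-removed options so nothing depends on them before the upgrade (the keyspace and table names here are hypothetical):

ALTER TABLE instametrics.raw_metrics WITH read_repair_chance = 0.0 AND dclocal_read_repair_chance = 0.0;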

Finally, it was time to check that our applications, and their various Cassandra drivers, would continue to operate when connecting to Cassandra 4.0. Cassandra 4.0 has upgraded to using native protocol version 5, which some older versions of Cassandra drivers could not communicate with. 

There was a small amount of work required to upgrade our metric aggregation Spark jobs to work with the newer version of Spark, which was itself required for upgrading our Spark Cassandra driver. Otherwise, all of our other applications continued to work without any additional changes. These included applications using the Java, Python, and Clojure drivers.

Once we had completed our integration testing in our test environment, we switched all traffic in the test environment from our 3.x cluster to the new 4.0 cluster. In this situation we did not copy over any changes that were applied to the 3.x cluster between restoring the backup and switching over our applications. This was acceptable strictly because it was a test environment and those changes were not of high importance.

We left this as the cluster used in our test environment, in order to see if any issues would slowly be uncovered over an extended period. Meanwhile, we began working on our plan for upgrading our production cluster to the 4.0 release.

Although we had initially intended to roll out the Cassandra 4.0.0 version to our Cassandra cluster as soon as it was available, a nasty bug in the 4.0.0 release led us to delay this until a patch could be raised against it.

The silver lining here was that, due to additional work on our metrics pipeline being deployed, we had additional options for testing a production load on a Cassandra 4.0 cluster. As we covered in an earlier blog, we had deployed a Kafka architecture in our monitoring pipeline, including a microservice that writes all metrics to our Cassandra cluster.

What this architecture allows us to do is effectively have many application groups consume the same stream of metrics from Kafka, with a minimal performance impact. We have already seen how we had one consumer writing these metrics to Cassandra, and another that writes them to a Redis cache.

So, what’s the big benefit here? Well, we can duplicate the writes to a test Cassandra cluster while we perform the upgrade! That effectively gives us the ability to perform a test upgrade on our actual live production data, with no risk of console downtime or customer issues. All we have to do is create an additional Cassandra cluster and an additional group of writer applications.

So, we deployed an additional 3.11.6 Cassandra cluster configured identically to our existing Instametrics Cassandra cluster, and applied the same schema. We then configured a new group of writer applications, which would perform the exact same operations, on the same data, as our “live” production Cassandra cluster. To exercise reads, we also set up a Cassandra Stress instance to put a small read load on the cluster. We then left this running for a number of days in order to place an appropriate amount of data into the cluster for the upgrade.

The Test Upgrade

Now came the fun part! Upgrading our test Cassandra cluster from 3.x to 4.0, while under load. We applied our upgrade procedure, paying careful attention to any application-side errors from either of our writers or Cassandra Stress. Cassandra by design should be able to perform any upgrade, major or minor, without any application-side errors if your data model is designed correctly.

We did not experience any issues or errors during the test upgrade procedure, and the operation was a success. We did see slightly elevated CPU usage and OS load during the upgrade, but that is to be expected due to upgrading SSTables, running repairs, and a reduction in available nodes during the process.

In order to gain further confidence in the system, we also left this configuration running for a number of days, to ascertain whether there were any performance or other issues with longer-running operations such as compactions or repairs. Again, we did not see any noticeable impacts or performance drops across the two clusters when running side by side.

The Real Upgrade

Filled with confidence in our approach, we set out to apply the exact same process to our live production cluster, with much the same effect. There was no application downtime, and no issues with any operation during the upgrade process. Keeping a watchful eye on the cluster longer term, we did not see any application-side latency increases, or any other issues other than the elevated CPU usage and OS load we saw on the test cluster.

Once completed, we removed our additional writer infrastructure that had been created.

Wrapping Up

It has now been a number of months since we upgraded our Cassandra cluster to 4.0.1, and we have not experienced any performance or stability issues. Cassandra 4.0.1 continues to be our recommended version that customers should be using for their Cassandra workloads.

The post Upgrades to Our Internal Monitoring Pipeline: Upgrading our Instametrics Cassandra cluster from 3.11.6 to 4.0 appeared first on Instaclustr.

Announcing ScyllaDB University LIVE, Summer 2022

I’m happy to announce our upcoming ScyllaDB University LIVE Summer Session, taking place on Thursday, July 28th, 8AM-12PM PDT!

REGISTER NOW

ScyllaDB University LIVE is a free, online, half-day, instructor-led training event with exclusive Scylla and NoSQL database content. I invite you to register now and save your spot.

We will have two parallel tracks: a beginners track focused on the basics of Scylla, and a track covering advanced topics such as best practices for CDC, Kafka, Kubernetes, Prepared Statements, and more.

Sessions will be run in parallel, so you can bounce back and forth between tracks or drop in for the sessions that interest you.  The sessions are conducted by some of our leading experts and engineers. They are live and interactive, and we welcome you to ask questions throughout the sessions. Following the training sessions, we will host an expert panel with special guests ready to answer your most pressing questions.

When choosing the topics for the different sessions, we chose a mix of popular basic material for people new to Scylla and advanced material for experienced users who want to deepen their knowledge and see what’s new at the cutting edge of the technology.

After the training sessions, you will have the opportunity to take accompanying ScyllaDB University courses to get some hands-on experience, complete quizzes, receive certificates of completion, and earn some exclusive swag.

Agenda

I’ll start with a quick welcome talk, briefly describing the different sessions and welcoming everyone. Afterward, we’ll host two parallel tracks of sessions.

Essentials Track

  • ScyllaDB Essentials: Intro to Scylla, Basic concepts, Scylla Architecture, Hands-on Demo
  • ScyllaDB Basics: Basic Data Modeling, Definitions, Data Types, Primary Key Selection, Clustering key, Compaction, ScyllaDB Drivers, Overview
  • Build Your First ScyllaDB-Powered App: A hands-on example of how to create a full-stack app powered by Scylla Cloud and implement CRUD operations using NodeJS and Express

Advanced Track

  • Kafka and Change Data Capture (CDC): What is CDC? Consuming data, Under the hood, Hands-on Example
  • Running ScyllaDB on Kubernetes: ScyllaDB Operator, ScyllaDB Deployment, Alternator Deployment, Maintenance, Hands-on Demo, Recent Updates
  • Advanced Topics in ScyllaDB: Collections, UDT, MV, Secondary Index, Prepared Statements, Paging, Retries, Sizing, TTL, Troubleshooting

 

After the last two sessions, we’ll host a roundtable with our session presenters and some other leading experts who will be available to answer your questions.

Before the event, you can check out the courses on ScyllaDB University. They are completely free and will help you better understand ScyllaDB and how the technology works.

REGISTER NOW FOR SCYLLADB UNIVERSITY LIVE


Multidimensional Bloom Filter Secondary Index: The What, Why, and How

What

Bloom filters are space-efficient probabilistic data structures that can yield false positives but not false negatives. They were initially described by Burton Bloom in his 1970 paper “Space/Time Trade-offs in Hash Coding with Allowable Errors“. They are used in many modern systems including the internals of the Apache® projects Cassandra®, Spark™, Hadoop®, Accumulo®, ORC™, and Kudu™.

Multidimensional Bloom filters are data structures for searching collections of Bloom filters for matches. The simplest implementation is a list that is iterated over when searching for matches. For small collections (n < 1000) this is the most efficient solution. However, when working with collections at scale, other solutions can be more efficient.

We implemented a multidimensional Bloom filter as a Cassandra secondary index in a project called Blooming Cassandra. The code is released under the Apache License 2.0. The basic assumption is that the client will construct the Bloom filters and write them to a table. The table will have a BloomingIndex associated with it, so that when the Bloom filter column is written or updated the index will be updated. To search, the client constructs a Bloom filter and executes a query with a WHERE clause like WHERE column = bloomFilter. All potential matches are returned, and the client is responsible for filtering out the false positives.

The CQL code would look something like this:

CREATE TABLE blah (a text, b int, c text, ..., bloomFilter blob, PRIMARY KEY (a));   -- primary key column chosen for illustration
CREATE CUSTOM INDEX ON blah (bloomFilter) USING 'BloomingIndex';
INSERT INTO blah (a, b, c, ..., bloomFilter) VALUES ('wow', 1, 'fun', ..., 0x1234e5ac);
SELECT a, b, c FROM blah WHERE bloomFilter = 0x1212a4ac;

Why

Our initial use case was that we wanted to be able to encrypt data on the client, write the data to Cassandra, and be able to search for the data without decrypting it. Our solution is to create a Bloom filter comprising the original plain text for each searchable column, encrypt the columns, add a column for the unencrypted Bloom filter, and write the resulting row to Cassandra. To retrieve the data, the desired unencrypted column values are used to create a Bloom filter. The database is then searched for matching filters and the encrypted data returned. The encrypted data are decrypted and false positives removed from the solution. This strategy was presented in my 2020 FOSDEM talk “Indexing Encrypted Data Using Bloom Filters”.

The Bloom filter could also be used to produce a weak reference to another Cassandra table to simplify joins. Assume two tables, A and B, with a one-to-many correspondence between them, such that a row in A is associated with multiple rows in B. A Bloom filter can be created from the key value of A and inserted into B. Now we can query B for all rows that match a given A row. We will have to filter out false positives, but the search will be reasonably fast.
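
A sketch of that pattern, reusing the custom-index approach shown earlier (the table, columns, and filter literal are illustrative):

-- B stores a Bloom filter built from the key of its parent row in A
CREATE TABLE b (b_key text PRIMARY KEY, a_key_bloom blob, c1 text);
CREATE CUSTOM INDEX ON b (a_key_bloom) USING 'BloomingIndex';

-- build a filter from A's key value, then fetch candidate B rows; false positives are filtered client-side
SELECT b_key, c1 FROM b WHERE a_key_bloom = 0x1212a4ac;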

With the multidimensional Bloom filter index, it becomes feasible to query multiple columns in large-scale data sets. If we have a table with a large number of columns (for example, DrugBank, a dataset that contains 107 data fields), a column could be added to each row comprising the values of each data field. Queries could then be constructed that look for specific value combinations across those columns.

How

Several multidimensional Bloom filter strategies were tried before the Flat-Bloofi solution was selected. Flat-Bloofi, as described by Adina Crainiceanu and Daniel Lemire in their 2015 paper “Bloofi: Multidimensional Bloom filters”, uses a matrix approach where each bit position in the Bloom filter is a column and each Bloom filter is a row in the matrix. To find the matches for a target filter, each column of the matrix that corresponds to an enabled bit in the target is scanned, and the row indexes of all the rows that have that bit enabled are collected into a set. The intersection of those sets is the set of Bloom filters that match.

We also tested a “Chunked Flat-Bloofi”, which breaks the filter down by bytes rather than bits. This solution worked well for small data sets (n < 2 million rows); however, the query process suffers from query expansion during the search, as described below. On storage, the Bloom filter is decomposed into a byte array, and each non-zero byte, its array position, and the identifier for the Bloom filter are then written to an index table. When searching for a target, the target Bloom filter is decomposed into a byte array and each non-zero byte is expanded to the set of all “matching” bytes, where “matching” is defined by the Bloom filter matching algorithm. The index table is then queried for all identifiers associated with the matching bytes at the index position. The result of this query is the set of all Bloom filters that have the bits enabled in the target byte and position. Each set represents a partial solution; the complete solution is the intersection of the sets, which is the full set of matching identifiers.

The query expansion problem occurs whenever a multidimensional Bloom filter strategy uses chunks rather than bits as the fundamental query block. Each target byte, with the exception of 0xFF, has more than one matching solution. For example, the byte 0xF1 will match 0xF1, 0xF3, 0xF5, 0xF7, 0xF9, 0xFB, 0xFD and 0xFF. This has to be accounted for in the query processing or in the original indexing.

The Flat-Bloofi implementation was built using memory-mapped files to allow for fast access to the data. The implementation comprises four files: the Flat-Bloofi data structure, a list of active/inactive elements in the Flat-Bloofi structure, a mapping of the Flat-Bloofi numerical ID to the data table key, and a list of active/inactive mapping entries. The standard SSTable index was used to map the data table key to the Flat-Bloofi ID. All code is available in the Blooming Cassandra code base.

The post Multidimensional Bloom Filter Secondary Index: The What, Why, and How appeared first on Instaclustr.

P99 CONF 2022 Registration Now Open, Speakers Announced

P99 CONF is the event for engineers who care about P99 percentiles and high-performance, low-latency applications. This free, virtual conference was created by engineers for engineers. P99 CONF brings together speakers from across the tech landscape spanning perspectives from architecture and design of distributed systems, to the latest techniques in operating systems and programming languages, to databases and streaming architectures, to real-time operations and observability.

Register Now!

P99 CONF is a free virtual event scheduled for Wednesday and Thursday, October 19th and 20th, 2022, from 8:00 AM to 1:00 PM Pacific Daylight Time (PDT), which is 15:00 to 20:00 UTC.

REGISTER FOR P99 CONF NOW!

Featured Speakers


  • Bryan Cantrill, CTO, Oxide Computer Company
  • Liz Rice, Chief Open Source Officer, Isovalent
  • Gil Tene, CEO and Co-Founder, Azul Systems
  • Steven Rostedt, Software Engineer, Google

Bryan Cantrill is one of Silicon Valley’s strongest opinionators and best raconteurs. His career spans from stints at Sun Microsystems and later Oracle to being deeply embedded in the embedded systems that are making Oxide Computer one of the strongest hardware manufacturers you’ll find in this next tech cycle.

Liz Rice loves making, understanding and explaining code. She is currently the Chief Open Source Officer at eBPF pioneer Isovalent, makers of the Cilium project. Throughout her career she has been a strong proponent of open source, either writing her own code, or co-chairing CNCF conferences like KubeCon and CloudNativeCon.

Gil Tene, CEO and Co-Founder of Azul Systems, was one of the people who got us to host P99 CONF in the first place. His seminal talk on How NOT to Measure Latency remains a lasting reminder to the industry to dig a little deeper into any performance numbers they get before they publish them.

Steven Rostedt is a veteran Linux kernel developer, focusing on the Real Time Stable releases and PREEMPT_RT patches, with career experience spanning such industry leaders as Red Hat, VMware and now Google.

Additional Speakers

  • Charity Majors, CTO, Honeycomb
  • Avi Kivity, CTO & Co-Founder, ScyllaDB
  • Alexey Ivanov, Site Reliability Engineer, Dropbox
  • Omar Elgabry, Software Engineer, Block
  • Alex Hidalgo, Principal Reliability Advocate at Nobl9
  • Vlad Ilyushchenko, Co-Founder and CTO of QuestDB
  • Piotr Sarna, Principal Software Engineer, ScyllaDB
  • Peter Zaitsev, CEO and Co-Founder of Percona
  • Pavlo Stavytski, Senior Software Engineer, Lyft
  • Oren Eini, CEO of RavenDB
  • Max De Marzi, Jr., Developer, RageDB
  • Song Yuying, Database Performance Engineer, PingCAP
  • Matthew Lenhard, CTO of ContainIQ
  • Mark Gritter, Founding Engineer, Akita Software
  • Marek Galovic, Staff Software Engineer, Pinecone
  • Marc Richards, Performance Engineer, Talawah Solutions
  • Konstantin Osipov, Director of Software Engineering at ScyllaDB
  • Malte Ubl, Chief Architect, Vercel
  • Leandro Melendez, DevRel, Grafana k6
  • Sabina Smajlaj, Operations Developer, Hudson River Trading
  • Jeffery Utter, Staff Software Developer, theScore
  • Henrik Rexed, Senior Staff Engineer, Dynatrace
  • Garrett Hamelin, Developer Advocate, LogicMonitor
  • Felipe Oliveira, Senior Performance Engineer, Redis
  • Felipe Huici, CEO and Co-Founder of Unikraft UG
  • Dmitrii Dolgov, Senior Software Engineer, Red Hat
  • Daniel Salvatore Albano, Senior Software Engineer II, Microsoft
  • Cristian Velazquez, Staff Site Reliability Engineer, Uber
  • Chen Huansheng, Database Performance Engineer, PingCAP
  • Pavel Emelyanov, Developer, ScyllaDB
  • Brian Likosar, Senior Solution Architect, StormForge
  • Blain Smith, Senior Software Engineer, StackPath
  • Armin Ronacher, Creator of Flask and Principal Architect, Sentry
  • Antón Rodríguez, Principal Software Engineer, New Relic

Follow us on Twitter to hear announcements such as details of their talks and to learn of any more speakers we add to our lineup.

Watch the Sessions from P99 CONF 2021

To get a taste of what our 2022 event will be like, you can catch up on any or all of last year’s sessions now available on demand.

WATCH THE 2021 SESSIONS ON-DEMAND

REGISTER NOW FOR P99 CONF 2022

How Palo Alto Networks Replaced Kafka with ScyllaDB for Stream Processing

Global cybersecurity leader Palo Alto Networks processes terabytes of network security events each day. They analyze, correlate, and respond to millions of events per second – many different types of events, using many different schemas, reported by many different sensors and data sources. One of their many challenges is understanding which of those events actually describe the same network “story” from different viewpoints.

Accomplishing this would traditionally require both a database to store the events and a message queue to notify consumers about new events that arrived into the system. But, to mitigate the cost and operational overhead of deploying yet another stateful component to their system, Palo Alto Networks’ engineering team decided to take a different approach.

This blog explores why and how Palo Alto Networks completely eliminated the MQ layer for a project that correlates events in near real time. Instead of using Kafka, Palo Alto Networks decided to use their existing low-latency distributed database (ScyllaDB) as both an event data store and a message queue. It’s based on the information that Daniel Belenky, Principal Software Engineer at Palo Alto Networks, recently shared at ScyllaDB Summit.

Watch On Demand

The Palo Alto Networks session from ScyllaDB Summit 2022 is available for you to watch right now on demand. You can also watch all the rest of the ScyllaDB Summit 2022 videos and check out the slides here.



Background: Events, Events Everywhere

Belenky’s team develops the initial data pipelines that receive the data from endpoints, clean the data, process it, and prepare it for further analysis in other parts of the system. One of their top priorities is building accurate stories. As Belenky explains, “We receive multiple event types from multiple different data sources. Each of these data sources might be describing the same network session, but from different points on the network. We need to know if multiple events – say, one event from the firewall, one event from the endpoint, and one event from the cloud provider– are all telling the same story from different perspectives.” Their ultimate goal is to produce one core enriched event that comprises all the related events and their critical details.

For example, assume a router’s sensor generates a message (here, two DNS queries). Then, one second later, a custom system sends a message indicating that someone performed a log-in and someone else performed a sign-up. After 8 minutes, a third sensor sends another event: some HTTP logs. All of these events, which arrived at different times, might actually describe the same session and the same network activity.

Different events might describe the same network activity in different ways

The system ingests the data reported by the different devices at different times and normalizes it to a canonical form that the rest of the system can process. But there’s a problem: this results in millions of normalized but unassociated entries. There’s a ton of data across the discrete events, but not (yet) any clear insight into what’s really happening on the network and which of those events are cause for concern.

Palo Alto Networks needed a way to group unassociated events into meaningful stories about network activity

Evolving from Events to Stories

Why is it so hard to associate discrete entries that describe the same network session?

  • Clock skew across different sensors:  Sensors might be located across different datacenters, computers, and networks, so their clocks might not be synchronized to the millisecond.
  • Thousands of deployments to manage:  Given the nature of their business, Palo Alto Networks provides each customer a unique deployment. This means that their solution must be optimized for everything from small deployments that process bytes per second to larger ones that process gigabytes per second.
  • Sensor’s viewpoint on the session:  Different sensors have different perspectives on the same session. One sensor’s message might report the transaction from point A to point B, and another might report the same transaction in the reverse direction.
  • Zero tolerance for data loss:  For a cybersecurity solution, data loss could mean undetected threats. That’s simply not an option for Palo Alto Networks.
  • Continuous out-of-order stream:  Sensors send data at different times, and the event time (when the event occurred) is not necessarily the same as the ingestion time (when the event was sent to the system) or the processing time (when they were able to start working on this event).

The gray events are related to one story, and the blue events are related to another story. Note that while the gray ones are received in order, the blue ones are not. 

From an application perspective, what’s required to convert the millions of discrete events into clear stories that help Palo Alto Networks protect their clients? From a technical perspective, the system needs to:

  1. Receive a stream of events
  2. Wait some amount of time to allow related events to arrive
  3. Decide which events are related to each other
  4. Publish the results

Additionally, there are two key business requirements to address. Belenky explained, “We need to provide each client a single-tenant deployment to provide complete isolation. And we need to support deployments with everything from several KB per hour up to several GBs per second at a reasonable cost.”

Belenky and team implemented and evaluated four different architectural approaches for meeting this challenge:

  • Relational Database
  • NoSQL + Message Queue
  • NoSQL + Cloud-Managed Message Queue
  • NoSQL, No Message Queue

Let’s look at each implementation in turn.

Implementation 1: Relational Database

Using a relational database was the most straightforward solution – and also the easiest to implement. Here, normalized data is stored in a relational database, and some periodic tasks run complex queries to determine which events are part of the same story. It then publishes the resulting stories so other parts of the system can respond as needed.

Implementation 1: Relational Database

Pros

  • The implementation was relatively simple. Palo Alto Networks deployed a database and wrote some queries, but didn’t need to implement complex logic for correlating stories.

Cons

  • Since this approach required them to deploy, maintain and operate a new relational database in their ecosystem, it would cause considerable operational overhead. Over time, this would add up.
  • Performance was limited since relational database queries are slower than queries on a low-latency NoSQL database like ScyllaDB.
  • They would incur higher operational cost since complex queries require more CPU and are thus more expensive.

Implementation 2: NoSQL + Message Queue

Next, they implemented a solution with ScyllaDB as a NoSQL data store and Kafka as a message queue. Like the first solution, normalized data is stored in a database – but in this implementation, it’s a NoSQL database instead of a relational database. In parallel, they publish the keys that will later allow them to fetch those event records from the database. Each row represents one event from different sources.

Implementation 2: NoSQL + Message Queue

Multiple consumers read the data from a Kafka topic. Again, this data contains only the key – just enough data to allow those consumers to fetch those records from the database. These consumers then get the actual records from the database, build stories by determining the relations between those events, and publish the stories so that other system components can consume them.

Why not store the records and publish the records directly on Kafka? Belenky explained, “The problem is that those records can be big, several megabytes in size. We can’t afford to run this through Kafka due to the performance impact. To meet our performance expectations, Kafka must work from memory, and we don’t have much memory to give it.”

Pros

  • Very high throughput compared to the relational database with batch queries
  • One less database to maintain (ScyllaDB was already used across Palo Alto Networks)

Cons

  • Required implementation of complex logic to identify correlations and build stories
  • Complex architecture and deployment with data being sent to Kafka and the database in parallel
  • Providing an isolated deployment for each client meant maintaining thousands of Kafka deployments. Even the smallest customer required two or three Kafka instances

Implementation 3: NoSQL + Cloud-Managed Message Queue

This implementation is largely the same as the previous one. The only exception is that they replaced Kafka with a cloud-managed queue.

Implementation 3: NoSQL + Cloud-Managed Message Queue

Pros

  • Very high throughput compared to the relational database with batch queries
  • One less database to maintain (ScyllaDB was already used across Palo Alto Networks)
  • No need to maintain Kafka deployments

Cons

  • Required implementation of complex logic to identify correlations and build stories
  • Much slower performance when compared to Kafka

They quickly dismissed this approach because it was essentially the worst of both worlds: slow performance as well as high complexity.

Implementation 4: NoSQL (ScyllaDB), No Message Queue

Ultimately, the solution that worked best for them was ScyllaDB NoSQL without a message queue.

Implementation 4: NoSQL, No Message Queue

Like all the previous solutions, it starts with normalized data in canonical form ready for processing, then that data is split into hundreds of shards. However, now the records are sent to just one place: ScyllaDB. The partition key is shard-number, allowing different workers to work on different shards in parallel. insert_time is a timestamp with a certain resolution – say up to 1 second. The clustering key is event_id, and that’s used later to fetch dedicated events.
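
In CQL terms, a sketch of that table might look like the following; the column names and types are illustrative, and placing insert_time as the first clustering column (so that a shard can be read at a given timestamp) is an assumption based on the description above:

CREATE TABLE events_by_shard (
    shard_number int,
    insert_time timestamp,   -- bucketed to a coarse resolution, e.g. 1 second
    event_id uuid,
    payload blob,
    PRIMARY KEY (shard_number, insert_time, event_id)
);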

Belenky expanded, “We have our multiple consumers fetching records from ScyllaDB. They run a query that tells ScyllaDB, ‘Give me all the data that you have for this partition, for this shard, and with the given timestamp.’ ScyllaDB returns all the records to them, they compute the stories, and then they publish the stories for other parts or other components in the system to consume.”
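
Using the illustrative schema above, that consumer query is roughly of this shape:

SELECT event_id, payload FROM events_by_shard WHERE shard_number = 1 AND insert_time = '2022-06-01 12:00:00+0000';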

Pros

  • Since ScyllaDB was already deployed across their organization, they didn’t need to add any new technologies to their ecosystem
  • High throughput when compared to the relational database approach
  • Comparable performance to the Kafka solution
  • No need to add or maintain Kafka deployments

Cons

  • Their code became more complex
  • Producers and consumers must have synchronized clocks (up to a certain resolution)

Finally, let’s take an even deeper dive into how this solution works. The right side of this diagram shows Palo Alto Networks’ internal “worker” components that build the stories. When the worker components start, they query ScyllaDB. There’s a special table, called read_offsets, which is where each worker component stores its last offset (the last time stamp that it reached with its reading). ScyllaDB then returns the last state that it had for each shard. For example, for shard 1, the read_offset is 1000. Shards 2 and 3 have different offsets.
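
A minimal sketch of that bookkeeping table and the startup query, continuing the illustrative names used above:

CREATE TABLE read_offsets (
    shard_number int PRIMARY KEY,
    read_offset timestamp   -- the last insert_time bucket this worker has fully processed
);

SELECT read_offset FROM read_offsets WHERE shard_number = 1;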

Then the event producers run a query that inserts data, including the event_id as well as the actual payload, into the appropriate shard on ScyllaDB.
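
Continuing the same illustrative schema, the producer-side insert is of this shape:

INSERT INTO events_by_shard (shard_number, insert_time, event_id, payload) VALUES (1, '2022-06-01 12:00:00+0000', uuid(), 0xCAFE);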

Next, the workers (which are continuously running in an endless loop) take the data from ScyllaDB, compute stories, and make the stories available to consumers.

When each of the workers is done computing a story, it commits the last read_offset to ScyllaDB.
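
And committing that position is a simple upsert, again with the illustrative names:

UPDATE read_offsets SET read_offset = '2022-06-01 12:00:01+0000' WHERE shard_number = 1;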

When the next event arrives, it’s added to a ScyllaDB shard and processed by the workers…then the cycle continues.

Final Results

What were their final results? Belenky summed up, “We’ve been able to reduce the operational cost by a lot, actually. We reduced the operational complexity because we didn’t add another system – we actually removed a system [Kafka] from our deployment. And we’ve been able to increase our performance, which translates to reduced operational costs.”

Want to Learn More?

ScyllaDB is the monstrously fast and scalable database for industry gamechangers. We’d love to chat with you if you want to learn more about how you can use ScyllaDB in your own organization. Contact us directly, or join the conversation with our Slack community.