Scylla Summit 2021 Speakers Announced


Scylla Summit is the annual opportunity for our community, our partners, our engineers and staff to all meet, mingle and learn from each other. Of course, this year Scylla Summit will be a virtual event, January 12-14 2021. That means no time lost for travel, no need to choose from multiple tracks, and no cost for the event — not even for our Training Day.

REGISTER NOW

For those who have not attended in the past, consider these five reasons to join us:

  1. Learn the latest innovations to take back to your team
  2. Network with the best brains in the industry
  3. Get trained at our online Training Day (no charge for training this year!)
  4. Influence our roadmap
  5. Hear use cases, tips and tricks, optimizations and best practices from your colleagues

A Few Sessions of Note

  • Keynotes from ScyllaDB Co-Founders Dor Laor and Avi Kivity
  • Confluent on Streaming Data from Scylla to Kafka — Tim Berglund will share how you can combine the power of Scylla and Confluent to stream data straight from Scylla tables into Kafka topics.
  • AWS Outposts — Rob Czarnecki will highlight the capabilities of deploying Scylla Cloud and other applications to an on-premises location via Amazon’s fully managed service extension, AWS Outposts.
  • Jepsen Test Results for Scylla — Author and consultant Kyle Kingsbury (AKA, “Aphyr”) will describe how Scylla’s LWT implementation fared in partitioned state testing.

Your Big Data Peers

  • Expedia Group: Migrating from Cassandra to Scylla — Expedia Group will share their experience on what it took to move from Apache Cassandra to Scylla, and the benefits it brought them.
  • GE Healthcare: A Revolution in Healthcare — GE Healthcare needed to provide hospitals with on-site databases to support medical imaging. Learn how they used our Alternator interface to seamlessly make the shift from DynamoDB to Scylla.
  • Grab: Grab at Scale with Scylla — What does it take to make Southeast Asia’s most widely used “superapp” so successful at scale? Find out how Grab continues to evolve using Scylla to support the daily meals, rides and payments of over 160 million users.
  • Hotstar: Scaling Video On-Demand with Scylla — Hotstar is India’s premier streaming service, now a subsidiary of The Walt Disney Company. Learn how they scaled to meet the most demanding needs of their 300 million active users.
  • IOTA Foundation: Implementing a NoSQL Database in a Distributed Ledger Service — Learn how the IOTA Foundation uses Scylla as the persistent datastore at the heart of The Tangle, their feeless, minerless distributed ledger technology.
  • Mail.Ru: High-load Storage of Users’ Actions with ScyllaDB and HDDs — Mail.ru is Russia’s largest email provider. Find out how they were able to use good old fashioned Hard Disk Drives (HDDs) as a performant, affordable datastore for their 47 million users’ email.
  • Numberly: Getting a Shard-Aware Python Driver Faster — Alexys Jacob, CTO of Numberly, will share how shard-aware drivers achieve greater performance, discuss their current limitations, and show how Scylla 4.3 is going to lift those limitations for even greater performance and reduced latency.
  • Ola Cabs: Real-Time Intelligent Decision Making Using Scylla — Ola Cabs is one of Scylla’s earliest users. They’ve grown their business around our technology. Learn how they continue to push the envelope with Scylla.
  • Expero & QOMPLX: Using ScyllaDB with JanusGraph for Cybersecurity — QOMPLX and Expero will explain how they use ScyllaDB and JanusGraph to detect, manage and assess risks for large corporate and government clients.
  • Ticketmaster: Handling High Loads with Ticketmaster — When massive public events go on sale, it is like Black Friday and Cyber Monday combined. Learn how Ticketmaster leverages Scylla to process 150 million transactions in minutes.

In addition, our own engineers will present the latest features, optimizations and capabilities of Scylla, including our Kubernetes operator, production-ready Change Data Capture (CDC), updated Kafka Scylla Connector, and more!

You can keep abreast of more speakers and their sessions on the Scylla Summit site, which we update frequently. Full agenda coming soon!

Database Monsters of the World, Connect!

Make sure you sign up today and add a reminder to your calendar for January 12-14 2021.

REGISTER NOW FOR SCYLLA SUMMIT 2021


Scylla University: Coding with Scala, Part 2

This post is based on the Scylla Specific Drivers, Overview, Paging and Shard Awareness lesson in Scylla University, ScyllaDB’s free resource to learn NoSQL database development and Scylla administration. You can find the lesson here in Scylla University.

In a previous lesson, we explained how to create a sample Scala application that executes a few basic CQL operations with a Scylla cluster using the Phantom Scala driver. We glossed over the details of how to define our tables and databases. This lesson will delve further into those definitions and explore how our applications should be structured using Phantom’s abstractions.

Modeling Tables with Phantom

The Phantom driver provides facilities for mapping Scylla tables to actual data types we can work with. This approach’s immediate benefit is avoiding “stringly-typed” programming: that is, instead of using CQL strings embedded in our code to describe the operations we perform, we use rich types and functions to construct the operations.

This guarantees, for example, that if we’ve declared the firstName column on a table as a text column, we would only be able to compare it to String values in WHERE predicates.

Before we declare our table, it's necessary to declare a data type that will hold a row from the table. In our case, we're using the Mutant case class we've seen in the previous lesson. (If you haven't downloaded the code yet, do so by following the instructions in the section “Set-up a Scylla Cluster” there):
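
The snippet itself is not reproduced here; a minimal sketch of such a case class (field names other than firstName are illustrative, not necessarily those used in the sample project) would be:

case class Mutant(
  firstName: String,
  lastName: String,
  address: String,
  pictureLocation: String
)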

A table can be declared by extending the Table class, as seen in the Mutants.scala file:
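
As a sketch rather than a verbatim copy of Mutants.scala (the table name and the extra columns are assumptions based on the description below), the declaration looks roughly like this:

import com.outworkers.phantom.dsl._

abstract class Mutants extends Table[Mutants, Mutant] {

  // Optional override; without it Phantom would default to "mutants".
  override def tableName: String = "mutant_data"

  // Each object maps to a column; the extended class fixes the column's CQL type.
  object firstName extends StringColumn with PartitionKey
  object lastName extends StringColumn with PartitionKey
  object address extends StringColumn
  object pictureLocation extends StringColumn
}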

We extend the Table class and fix its type parameters: the first type parameter is actually a reference to the table subclass itself (Mutants), and the second is the type of the row. The tableName field can be optionally overridden, like we’ve done, to specify the table’s name in Scylla. By default, Phantom will use the subclass’s name in lowercase letters (mutants, in this case).

Within the class definition, we list several object definitions – each one corresponds to a different column of the table. The objects extend classes that specify the type of the column: StringColumn, DoubleColumn, UUIDColumn, etc. The TableAliases type in Phantom contains all the possible column types. The objects can also, optionally, extend additional traits that specify properties of the columns. For example, the firstName column is part of the table's partition key, and as such, it extends the PartitionKey trait.

The Mutants class is purposely defined as abstract, as we will not be instantiating it directly, but rather through a Database class.

Modeling Databases with Phantom

Databases in Phantom are several tables from the same keyspace bundled into one containing class. In our application, we’ve defined the MutantsDatabase class (found in the MutantsDatabase.scala file) as such:
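
A sketch of that definition, matching the description that follows:

import com.outworkers.phantom.dsl._

class MutantsDatabase(override val connector: CassandraConnection)
    extends Database[MutantsDatabase](connector) {

  // One object per table we want to expose; Connector injects the database's connection.
  object mutants extends Mutants with Connector
}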

First, note the connector constructor parameter: to construct the database, it must be provided with a connection to the cluster. The class extends the Database class and provides it with the connection.

In the definition of our database, we specify an object for every table we want to make available. The mutants object we define extends the Mutants class we defined earlier and mixes in the Connector trait. This trait injects the database’s connection into the table instance.

Exposing a Higher-level Interface

At this point, we can instantiate the database with a connection as follows:
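
Roughly like this (the contact point and keyspace name are placeholders):

import com.outworkers.phantom.dsl._

val connector: CassandraConnection =
  ContactPoints(Seq("127.0.0.1"))   // your Scylla cluster's contact points
    .keySpace("mutant_monitoring")  // placeholder keyspace name

val db = new MutantsDatabase(connector)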

We could use the db object directly throughout our application, but that’d be sub-optimal, as every module of our application would be able to access any of the tables defined on the database. Not a great way to architect our service! Instead, we want to define higher-level interfaces with focused responsibilities: the MutantsService class is an example of that.

Here’s a part of its definition:
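
For illustration, a fragment in this spirit (the actual lesson code may expose different methods and helpers):

import com.outworkers.phantom.dsl._
import scala.concurrent.Future

class MutantsService(db: MutantsDatabase) {

  // Bring the database's session and keyspace into scope so queries can be executed.
  import db.{session, space}

  def getMutant(firstName: String, lastName: String): Future[Option[Mutant]] =
    db.mutants.select
      .where(_.firstName eqs firstName)
      .and(_.lastName eqs lastName)
      .one()

  def addMutant(mutant: Mutant): Future[ResultSet] =
    db.mutants.store(mutant).future()
}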

The service uses the database as a constructor parameter, but only exposes specific methods for accessing (and mutating) the data. Funneling all Mutant-related data access through this service ensures we won’t have to refactor too much of our service should we want to change the data representation in Scylla in the future.

Conclusion

In this lesson, we delved into the table and database abstractions provided by Phantom and described their role in structuring our application. We saw how using these abstractions, along with focused service interfaces and classes, leads to well-architected services. You can keep experimenting with these abstractions and continue to make your applications more modular.

You can find more lessons on combining the power of our NoSQL database with your favorite programming languages in Scylla University. Sign up now! It’s totally free.

SIGN UP FOR SCYLLA UNIVERSITY

This lesson was written with the help of Itamar Ravid. Itamar is a distributed systems engineer with a decade of experience in a wide range of technologies. He focuses on JVM-based microservices in Scala using functional programming. Thank you, Itamar!


Announcing the Astra Service Broker: Tradeoff-Free Cassandra in Kubernetes

Scylla Student Projects, Part III: Kafka Client for Seastar and Scylla

 

Apache Kafka is a well-known distributed message queue, used by thousands of companies. Developers love its scalability, performance, fault tolerance, and an entire ecosystem of clients, connectors, and tools built around it. One relevant use case is the ingestion of data from databases to a Kafka cluster, from where it is easily available to other systems.

In this post, I will describe the details of our Kafka client for Seastar and Scylla, which we created as our final year student project, sponsored by ScyllaDB. As a member of the student team, I’ll take you on a journey throughout our graduation year, and show how we developed this client. Finally, you will see how we used it to write a Scylla service to stream Change Data Capture (CDC) events to Kafka.

Intro: Student Projects

In 2019, ScyllaDB sponsored a program for Computer Science students organized by the University of Warsaw. Throughout the whole academic year, 3 teams of undergraduate students collaborated with and learned from ScyllaDB engineers to bring new features to Scylla and its underlying Seastar engine. The projects picked for the 2019 edition were:

  • Parquet support for Seastar and Scylla (see Part I)
  • SeastarFS: an asynchronous userspace file system for Seastar (see Part II)
  • Kafka client for Seastar and Scylla (this post)

This blog post describes the Kafka client project. The students responsible for the project were Wojciech Bączkowski, Piotr Grabowski (the author of this article), Michał Szostek, and Piotr Wojtczak. Our supervisor was Michał Możdżonek, M. Sc. The project was the foundation for the Bachelor’s thesis of the students who took part in it.

Animation 1: Our graduation year journey

Project timeline. In the animation above you can see the timeline of our project. Throughout this blog post, we describe many stages of our development process, share how we planned our work, and implemented many great features. Soon you will learn in great detail about all of the topics that are presented (chronologically) on this timeline.

Apache Kafka: Core Concepts

Apache Kafka is a distributed message queue. You can send and receive messages from Kafka, which consist of a key and value.

Topics. To organize the messages in a Kafka cluster, each message is bound to a particular topic – a distinct and separate message queue. For example, you can have a topic called “notifications” storing push notifications to be sent to your mobile users and a separate “sign-in” topic to persist information about your users logging in to the website.

Brokers and partitions. As Kafka is a highly-scalable system, there has to be a way to scale a topic to many Kafka brokers (nodes). This aspect is achieved through partitions. A single Kafka topic consists of one or more partitions, which can be spread across multiple brokers. Depending on the chosen partitioning strategy, messages will be split across different partitions based on the message key, with each partition belonging to one specific broker.
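
As a quick illustration of the idea (not the project's actual code; Kafka's default producer partitioner uses a murmur2 hash, and a generic hash is used here as a stand-in):

// Map a message key onto one of the topic's partitions. All messages with the same
// key land on the same partition, and therefore on the same broker (its leader).
def partitionFor(key: Option[Array[Byte]], numPartitions: Int): Int = key match {
  case Some(k) => (java.util.Arrays.hashCode(k) & Int.MaxValue) % numPartitions
  case None    => scala.util.Random.nextInt(numPartitions) // keyless messages can be spread out
}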

Clients. There are two main types of Kafka clients – producers and consumers. Producers allow you to send messages, while consumers receive messages from the Kafka cluster. During our student project, we created a producer for Scylla.

Our Goals and Feature Set

At the start of our graduation year of bachelor studies, we sat down with ScyllaDB to discuss the project and outline the common goals. Together, we talked about the necessary features of the Kafka client, the scope of the project, and how it could be used in Scylla. The three core goals for the Kafka client we identified were: performance, correctness, and support for a wide range of features.

Feature set. After an academic year of working on the project, we successfully implemented a Kafka producer, offering:

  • Future-based API built on top of Seastar
  • High performance
  • Proper error handling and resiliency
  • Retries, with configurable backoff strategies (exponential backoff with jitter as default)
  • Partitioners (multiple implemented out-of-the-box)
  • Batching
  • ACKs (a configurable number of acknowledgments to wait for after sending a message)
  • Compatibility with older Kafka clusters (by supporting many versions of the protocol)
  • Support for multi-broker clusters

ScyllaDB also suggested writing an example Scylla feature that leveraged our Kafka client. We settled on implementing a proof-of-concept Scylla service that replicates Change Data Capture (CDC) log to a Kafka topic.

Why Seastar?

There are multiple Kafka clients already available, librdkafka being the most popular library for C++. However, using an existing library would cause a similar problem as in the Parquet project: latency.

Seastar design. Scylla is built on top of Seastar, an asynchronous high-performance C++ framework. Seastar was created following the shared-nothing principle and it has its own non-blocking I/O primitives, schedulers, priority groups, and many other mechanisms designed specifically for ensuring low latency and most optimized hardware utilization.

In the Seastar world, issuing a blocking system call (like read()) is an unforgivable mistake and a performance killer. It also means that many libraries relying on traditional, blocking system calls would introduce such performance regressions when used in a Seastar-based project, and librdkafka's C++ implementation was no exception. Therefore we decided to write our client from scratch and use the networking capabilities available in Seastar.

Alternative clients. At first, we also considered modifying some existing libraries to take advantage of Seastar, however, we quickly realized that the design of those libraries was not compatible with Seastar and Scylla philosophy. For example, librdkafka internally creates one thread per broker in the Kafka cluster. This can cause expensive cross-core data transfers (especially on CPUs with multiple NUMA nodes). That’s a stark difference compared to Scylla, which utilizes a single thread per CPU core and runs small sub-millisecond-sized tasks using a custom user-space scheduler.

Animation 2: Difference between librdkafka and our client.
Notice how librdkafka does expensive cross-core data transfers.

Seastar goodies. Moreover, Seastar offered us many useful features, for example, DPDK support – a network backend running in userspace, allowing for lower overhead networking. By designing our API to be based around futures, we could offer the users of our library an intuitive interface, immediately familiar to other Scylla and Seastar developers. By adopting the shared-nothing principle (not sharing the data between cores) there was no need to use locks or other synchronization primitives, which dramatically simplified the development process.

The Kafka protocol

To write a good Kafka client, we knew that we had to learn both the high-level concepts of how the client should work, as well as lower-level intricacies of its communication: the protocol.

Kafka uses a TCP binary protocol. It is based on the request/response paradigm (client sending the request, Kafka broker responding with one response).

Message types. The protocol defines many message types, such as ApiVersions with information about supported message versions by the broker (more on that later); Metadata with the current list of brokers, topics, and partitions in the cluster; Produce which allows for sending a message to some particular topic.

Versioning. As Kafka is actively developed and there are new features added continuously, the protocol must provide a way to be extended with new functionality. Moreover, it has to be done in a way that maintains compatibility with older clients.

The protocol deals with that challenge by using protocol versions. Each message type is individually versioned. The client, after connecting with the broker, sends an ApiVersions request. The broker sends back an ApiVersions response, which contains the information about supported versions for each message type. The client can now use this information about versions, to send requests in versions that both client and broker can support.

Let’s look at a concrete example: the Produce message has 9 versions defined. When transaction support was added to Kafka, there was a need to pass transaction ID alongside Produce request. This additional field was added in version 3 of this message type.
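
The negotiation itself boils down to picking, per message type, the highest version both sides understand. A small sketch of that logic (illustrative only, not the client's actual C++ code):

case class VersionRange(min: Short, max: Short)

// For every API key advertised by both sides, use the highest mutually supported version.
def negotiate(client: Map[Short, VersionRange],
              broker: Map[Short, VersionRange]): Map[Short, Short] =
  client.flatMap { case (apiKey, c) =>
    broker.get(apiKey).collect {
      case b if math.min(c.max, b.max) >= math.max(c.min, b.min) =>
        apiKey -> math.min(c.max, b.max).toShort
    }
  }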

Handling Errors

One of the key features of Kafka is its fault-tolerance and resiliency. While exploring the design of existing Kafka clients, we quickly discovered that achieving this goal is the responsibility of both Kafka brokers and clients.

Error codes. In distributed systems such as Kafka, which can operate on hundreds of nodes, there are a lot of failure scenarios that must be handled correctly. From the client perspective, the protocol informs about those errors by sending the right error code.

It was our job, as Kafka client developers, to implement the correct behavior upon receiving an error code. Those errors can be split into three separate groups.

Non-recoverable errors. The first group of error codes we identified was non-recoverable errors: for example after receiving SASL_AUTHENTICATION_FAILED (authentication error), the correct behavior is to stop the client and throw an exception informing about authentication problems.

Retriable errors. Another group of errors is retriable errors. Most common in this group is NETWORK_EXCEPTION, which can occur in case of a broken connection or a timeout. Using our exponential backoff strategy, we retry those requests, as those network problems might be temporary.

Metadata refresh. The last group of error codes is those that require a metadata refresh. The client maintains a local copy of the cluster metadata – the addresses of nodes, topics, and their partitions. When producing a new message, Kafka requires sending it to the leader of a partition. To determine the node to dispatch the request to, we use this local metadata.

However, this metadata can get stale and we will send the request to the incorrect node. In such a case, we will get a NOT_LEADER_FOR_PARTITION error code and our client recognizes that metadata needs to be refreshed by sending the Metadata request to the Kafka cluster. Upon receiving updated metadata, we can try sending the message again.
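
The behavior of these three groups can be summed up in a small sketch (conceptual Scala, not the client's actual C++ code):

import scala.annotation.tailrec

sealed trait ProduceResult
case object Ok                    extends ProduceResult
case object NetworkException      extends ProduceResult // retriable
case object NotLeaderForPartition extends ProduceResult // stale metadata
case object AuthenticationFailed  extends ProduceResult // non-recoverable

@tailrec
def produceWithRetries(send: () => ProduceResult,
                       refreshMetadata: () => Unit,
                       retriesLeft: Int): ProduceResult =
  send() match {
    case NetworkException if retriesLeft > 0 =>
      // temporary network problem: back off (omitted here) and retry
      produceWithRetries(send, refreshMetadata, retriesLeft - 1)
    case NotLeaderForPartition if retriesLeft > 0 =>
      // our cached cluster metadata is stale: refresh it, then retry
      refreshMetadata()
      produceWithRetries(send, refreshMetadata, retriesLeft - 1)
    case other =>
      other // success, a non-recoverable error, or retries exhausted
  }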

Batching

Batching is a simple technique that improves bandwidth and decreases overhead. By sending many messages in one request (one batch) we don't have to send the metadata about messages multiple times, and the protocol overhead (Kafka protocol, TCP/IP) is smaller relative to a single message. The Kafka protocol supports this feature by allowing multiple messages to be sent in a single Produce request.

Our implementation. We implemented this feature by adding a separate queue for the messages waiting to be sent. At some point in time, this queue is flushed, which results in Produce requests being sent to the Kafka cluster. There are a few ways to control the message flushing. First, you can manually call the flush() method on the producer. Another way to control this behavior is by setting the _buffer_memory option, which controls the maximum cumulative length of messages in the waiting queue. Finally, the client automatically flushes the queue periodically (the period is controlled by the _linger option).
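
A conceptual sketch of those flushing rules (the names are illustrative, and the real client flushes on a timer, while this sketch checks on every append for simplicity):

import scala.collection.mutable.ArrayBuffer

class BatchingQueue(bufferMemory: Long, lingerMs: Long, send: Seq[Array[Byte]] => Unit) {
  private val pending      = ArrayBuffer.empty[Array[Byte]]
  private var pendingBytes = 0L
  private var lastFlush    = System.currentTimeMillis()

  def append(message: Array[Byte]): Unit = {
    pending += message
    pendingBytes += message.length
    // Flush when the buffered payload grows too large or the linger period has elapsed.
    if (pendingBytes >= bufferMemory || System.currentTimeMillis() - lastFlush >= lingerMs)
      flush()
  }

  def flush(): Unit = if (pending.nonEmpty) {
    send(pending.toSeq) // one Produce request carrying the whole batch
    pending.clear()
    pendingBytes = 0L
    lastFlush = System.currentTimeMillis()
  }
}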

Optimization

One of the initial goals of our project was to write a client that was feature-rich, correct, and performant. By making sure that we designed our code with performance in mind from day one, utilizing the Seastar framework that is the foundation of Scylla’s high performance, we were confident that we were on the right track.

The approach. At first, our client was much slower than a first-party Java Kafka client. To fix the performance of our solution, we have performed multiple profiling cycles, every time identifying issues and fixing them. Some fixes we applied initially were trivial, but to squeeze every last drop of performance we later focused on algorithmic and low-level improvements. As you will see in the benchmark section we more than succeeded in our optimization endeavor.

Profilers. For profiling, we used the perf tool and a great open-source Flamegraph tool. In each profiling cycle, we identified the biggest performance problem based on flame graphs and fixed it.

Data copying. When developing the client, we tried to avoid any unnecessary copying of data. However, we missed a few spots here and there. They were immediately visible on the flame graph and fixing them proved to be very easy – sometimes as easy as passing a variable by constant reference instead of by value.

Screenshot 1: Annotated flame graph of our client at some initial optimization stage

In one such case, forgetting to use a reference resulted in cluster metadata being copied on every message sent. It might not seem like a big issue, but when sending millions of messages per second, this single problem was a big part of the overhead.

Memory allocations. Another memory-related performance issue was memory allocation. While serializing the protocol messages there were a few temporary buffers, some of them allocated for every message sent. By declaring them as (static) thread_local, we saved millions of allocations that were executed in the course of our benchmarks.

Algorithmic improvements. Some of the fixes required looking at the problems from an algorithmic perspective. One of the operations we execute for each message is looking up metadata to determine the address of the broker to dispatch the Produce request to. Initially, this lookup was done linearly (finding the topic in topic metadata, then a partition in partitions metadata). Afterward, we added a pre-processing step to sort the topics and partitions, which allowed us to perform a binary search in the lookup function. Our final approach was to use unordered_map, which proved to be most efficient in our benchmarks.

Profiler – be careful! When using a profiling tool such as flame graphs, you need to be aware of its limitations or quirks. One issue that was not instantly obvious to us, based on just looking at the flame graph (as they show on-CPU activity), was network socket configuration. Because we do manual batching of messages and we wait for the batch to flush, every millisecond in that process counts. Using the NO_DELAY option on the socket, which disables Nagle’s algorithm, made a big difference in our benchmarks as it reduced the latency between issuing the write on the socket and receiving the response from the Kafka cluster.

SIMD. Another issue that we found was inefficient CRC32 calculation. This checksum is a mandatory value to be sent alongside messages. Initially, to get the client running, we used the Boost library implementation, which uses a lookup table that is applied to each byte. As this code was probably inlined, it was not presented on the flame graph as a separate function, hiding its impact on the performance.

We switched to a more efficient implementation, which takes advantage of the SSE4.2 crc32 instruction. By using this instruction, we were able to process 8 bytes in 3 cycles (latency, excluding memory access) compared to the byte-by-byte lookups of the Boost implementation. In Scylla, this approach is even more advanced, as it takes advantage of being able to start executing a crc32 instruction every cycle (a throughput of 1).

Benchmarks

As the performance of our client was one of our top priorities, we needed to design a benchmark comparing our solution with other Kafka clients. During our optimization process having a reliable benchmark proved to be indispensable.

In the course of the development, we tested the client on our local machines. However, when we started measuring the performance, we realized that we had to run it on much beefier machines and in a multi-broker environment to get more realistic results, closer to real-life deployments.

Benchmark environment. We decided to run our benchmarks on AWS. We used m5n.2xlarge instances, configured in a “cluster” placement group (to reduce the latency). Those instances have 8 logical processors (Xeon Cascade Lake, 3.1GHz sustained boost) and 32GB of RAM. We equipped them with gp2 SSDs and ran them in 1-broker and 3-broker Kafka cluster configuration. The Zookeeper server and instance with tested clients were started on separate nodes of the same type.

Benchmarked clients. We compared our client with two leading libraries: the first-party Java client, maintained by Apache Kafka project, and librdkafka – a popular C++ client.

Test methodology. To test them all uniformly, we implemented a simple benchmark routine. In a given benchmark case, we send some number of messages of a certain size and manually flush the batch when it is full and wait for it to finish processing. We are aware that this methodology might not play to the strengths of some clients, but we tried hard to tune the options of every tested library to achieve the best performance in each one of them.

Let’s get to the results!

1-node cluster, sending 80 million messages (each 10 bytes), batched in groups of 1,000

Client Time(s) Messages per second Throughput (MB/s)
Our client 53.40 s 1,498,127 msg/s 14.20 MB/s
Java 70.83 s 1,129,464 msg/s 10.77 MB/s
librdkafka (C++) 79.72 s 1,003,512 msg/s 9.57 MB/s

1-node cluster, sending 80 million messages (each 100 bytes), batched in groups of 1,000

Client Time(s) Messages per second Throughput (MB/s)
Our Client 81.68 s 979,431 msg/s 93.40 MB/s
Java 95.46 s 838,047 msg/s 79.92 MB/s
librdkafka (C++) 92.84 s 861,697 msg/s 82.17 MB/s

1-node cluster, sending 8 million messages (each 1,000 bytes), batched in groups of size 100

Client Time(s) Messages per second Throughput (MB/s)
Our client 38.51 s 207,738 msg/s 198.11 MB/s
Java 43.52 s 183,823 msg/s 175.30 MB/s
librdkafka (C++) 41.24 s 193,986 msg/s 184.99 MB/s

1-node cluster, sending 80 million messages (each 50 bytes), batched in groups of size 10,000

Client Time(s) Messages per second Throughput (MB/s)
Our client 60.52 s 1,321,877 msg/s 63.03 MB/s
Java 77.01 s 1,038,826 msg/s 49.53 MB/s
librdkafka (C++) 61.07 s 1,309,972 msg/s 62.46 MB/s

3-node cluster, sending 78 million messages (each 10 bytes), batched in groups of size 3,000

Client Time(s) Messages per second Throughput (MB/s)
Our client 38.96 s 2,002,053 msg/s 19.09 MB/s
Java 57.68 s 1,352,288 msg/s 12.89 MB/s
librdkafka (C++) 81.09 s 961,894 msg/s 9.17 MB/s

3-node cluster, sending 100 million messages (each 100 bytes), batched in groups of size 5,000

Client Time(s) Messages per second Throughput (MB/s)
Our client 69.59 s 1,436,988 msg/s 137.04 MB/s
Java 90.49 s 1,105,094 msg/s 105.39 MB/s
librdkafka (C++) 98.67 s 1,013,479 msg/s 96.65 MB/s

3-node cluster, sending 10 million messages (each 1,000 bytes), batched in groups of size 2,000

Client Time(s) Messages per second Throughput (MB/s)
Our client 29.43 s 339,789 msg/s 324.04 MB/s
Java 30.86 s 324,044 msg/s 309.03 MB/s
librdkafka (C++) 31.17 s 320,821 msg/s 305.95 MB/s

Results analysis. As you can see from our benchmarks, our client is faster than both the Java client and librdkafka in all cases. When sending 78 million 10-byte messages to a 3-node cluster, our client smashed the two million messages per second barrier, being significantly faster than the competition. On a test with larger messages (1000 bytes large), we were able to get 300+ MB/s throughput, all on a single core!

However, looking at the raw data, it might be a little intimidating to fully grasp it. To make it easier, let’s group it by some aspect and chart the groups. Let’s start by organizing the results by the number of brokers in the cluster:

Chart 1: Execution time geometric mean (in seconds; smaller is better), tests grouped by cluster configuration

In our tests, librdkafka was faster than the Java client in a single-broker scenario, while it lagged in the three-broker configuration. Our solution was faster than the others in both one-broker and three-broker configurations.

Our benchmarks measured workloads with very small messages, as well as larger messages (10 bytes, 100 bytes, 1,000 bytes). The results, when grouped by this aspect:

Chart 2: Normalized geometric mean of execution time (smaller is better), tests grouped by message size in bytes

As you can see our client is faster, especially when sending small messages. However, this performance difference quickly shrinks when switching to larger messages (100 B or 1000 B). Even though in the 1000 B case this difference is much smaller, we are still able to achieve 6-8% speedup.

Example Use Case: Scylla CDC Replication Service

We designed our Kafka client to work well with Scylla, so our next step was obvious – implement a new Scylla functionality that used our client. We decided to marry our Kafka client with Scylla’s Change Data Capture (CDC) feature.

Change Data Capture (CDC) is a feature that allows users to track data changes in their Scylla database. When CDC is enabled on a table (base table), a new CDC log table is created. It allows you to query the history of all changes made to the table, not only the current state of the table that is stored in the base table. You can learn more about this feature in our “Using Change Data Capture (CDC) in Scylla” blogpost.

New Scylla service. To showcase how our client could be used with the CDC feature, we created a new Scylla service that streams data changes to Apache Kafka. We use the CDC log table to query the latest changes that happened in the table, which can be done efficiently thanks to the CDC design. The service then serializes those changes to the Apache Avro format and sends them to Kafka using our client.

Having the CDC events streamed to Kafka opens up a breadth of possibilities to use this data to easily build more complex systems. One possibility that seemed like a perfect match was using ScyllaDB Sink Connector. It is a Kafka Connect connector, which replicates messages from Kafka topic to some Scylla table. It supports the Avro format that we implemented in our service, which allows us to maintain proper schema information.

New opportunities. By using those systems in conjunction, we have effectively set up an asynchronous Scylla-to-Scylla table replication. This shows how our service and Kafka, which are relatively general-purpose, could enable new and interesting ways to use the data from your Scylla cluster. Moreover, it allowed us to easily test our service by checking if the tables on the source and destination Scylla clusters contain the same data after the replication has finished.

Future plans. Even though our Scylla service was created as a proof-of-concept it showed great potential. ScyllaDB is now hard at work to bring this functionality as a Kafka connector. Stay tuned!

Wrapping it up

All of the source code for the Kafka client is available at https://github.com/haaawk/seastar-kafka-client and for modified Scylla with our service at https://github.com/haaawk/scylla/tree/cdc_kafka_integration.

The thesis. The project was the foundation for the Bachelor’s thesis of the students who took part in it. The thesis was already reviewed and accepted by the University of Warsaw and is public to read. You can find a detailed description of the design, goals, performed tests, and results in this document, in Polish: bachelor_thesis.

The end of the beginning. So that wraps up our “Scylla Student Projects” series for 2020. ScyllaDB is very proud to have helped the students in their graduation year of Bachelor studies. We're very pleased to announce that four students who collaborated with us have joined ScyllaDB.

Personally speaking, as a former member of the Kafka student team, working with ScyllaDB was a great experience for me. As avid C++ fans, we were initially drawn by the fact that ScyllaDB stays on the bleeding edge of the C++ standard – using the features from the latest standard that were just implemented in compilers. We also saw this as an opportunity to explore the areas of computer science that very few companies work on. From the first day, we saw how people working at ScyllaDB were passionate and willing to help us with any problems we encountered.

Stay tuned! As the new academic year has just recently started, ScyllaDB is eager to work with new students this year. Stay tuned for a post coming in one year to read how the next projects turned out!

A note to educators: ScyllaDB is committed to helping challenge and guide the next generation of computer scientists. Besides our projects at the University of Warsaw, we recently held a virtual talk at Carnegie Mellon University. If you would like to see something similar conducted at your own institution please contact us directly.


Scylla Manager 2.2: Repair Revisited

We released Scylla Manager 1.0 back in February 2018 to help our users manage repairs. Since then a lot of things have changed. We added row-level repair to Scylla Open Source, and to the Enterprise version this year. We just released Scylla Manager 2.2 with a new repair method optimized for row-level repair.

New repair features in Scylla Manager 2.2

Parallel repairs

Scylla Manager chooses the optimal number of parallel repairs for a keyspace. This is beneficial for big clusters. For example, given a 12 node cluster and a keyspace with replication factor 3, it can be repaired up to 4 times faster than with Scylla Manager 2.1. This is done by repairing distinct replica sets in a token ring in parallel. Scylla Manager ensures that each node takes part in at most one Scylla repair job at all times.

The following diagram shows a model of how token ranges are replicated in a token ring.

Token ranges 11-20 are replicated by nodes N1, N2, N3. While they are being repaired, we can still repair token ranges 41-50, replicated by nodes N4, N5, N6, and token ranges 71-80, replicated by nodes N7, N8, N9. We could also repair ranges 101-110 on nodes N10, N11, and N0; otherwise node N0 is idle. Token ranges 1-10 can only be repaired once nodes N1, N2 (and N3) are done repairing. The process continues until the whole ring is repaired.

The parallel limit can be described by the following formula:

max_parallel = floor(# of nodes / keyspace RF)

In a multi-DC setting, the keyspace RF in the above formula is the sum of the replication factors in the different DCs.
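
For example, with two DCs of 6 nodes each and a keyspace configured with replication factor 3 in each DC, the effective RF is 3 + 3 = 6, so:

max_parallel = floor(12 / 6) = 2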

Adjustment of repair intensity to a node's capabilities

The repair intensity feature was added in a previous version of Scylla Manager. It lets you specify how many token ranges (per shard) to repair in a single Scylla repair job. For Scylla clusters that do not support row-level repair, intensity can also be a decimal between 0 and 1. In that case it specifies the percentage of shards that can be repaired in parallel on a repair master node.

Scylla Manager 2.2 adds support for intensity=0. In that case the number of token ranges is calculated based on node memory and adjusted to the Scylla maximal number of ranges that can be repaired in parallel (see max_repair_ranges_in_parallel in Scylla logs). If you want to repair faster, try setting --intensity 0.

Changing the repair speed in flight

In Scylla Manager 2.2 we added two new sctool repair subcommands: sctool repair update and sctool repair control. The former replaces the task update command and extends it with the capability to update any repair parameter in an existing repair task. The latter allows you to change the intensity and parallelism as you repair, without ever restarting the task. In contrast to the update command, the control command does not persist the changes for future runs. The current values for intensity and parallel can be checked in sctool task progress.

Example:

Run the following command to adjust the number of token ranges per Scylla repair job to the maximum supported (in parallel) by a repair master node.

sctool repair control -c prod-cluster --intensity 0
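
To make such a change permanent for future runs, use the update command instead. For example (the task ID comes from sctool task list; consult sctool repair update --help for the exact flags available in your Scylla Manager version):

sctool repair update -c prod-cluster repair/<task-id> --intensity 2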

Support for schema changes during repair

In Scylla Manager 2.2 repair goes table by table. This has many benefits. On a node Scylla repair jobs need to read fewer files, and we keep coming back to the same files before we move on to the next table. When there is a new table added, it is not a concern of Scylla Manager since the list of tables to repair is created in the repair init phase. When a table is deleted during repair, Scylla Manager gracefully handles that and moves on to the next table.

Small table optimization

Tables that contain less than 1 GB of data are repaired in a few Scylla repair jobs, one per replica set. This saves time on Scylla repair job creation and status checking. The small table threshold can be changed as a repair command parameter.

Graceful stop

Whenever a user decides to stop a repair, there is a grace period to wait for the ongoing repair jobs before killing them. The grace period defaults to 30s; the default can be changed in the configuration file.

New sctool repair progress

Repair progress and duration is displayed per table. In detailed view users may now see the actual repair progress of a keyspace or a table on each node.

Benchmark

In the benchmark we ran 9 Scylla 2020.1 nodes on AWS i3.2xlarge machines. Each node has 8 cores, 61GB of memory, and holds approximately 1.8TB of data. During the tests the nodes were 50% loaded with a read workload, and 10% of the data needed to be repaired; all the nodes had missing data. The following dashboard shows the system in the stabilization phase before repair starts.

The following chart presents the effects of changing the parallel value between full parallelism (default), two parallel threads, and one parallel thread.

The following chart presents the effects of changing intensity while running at full parallelism. One can observe that when the system is loaded, pushing more repair work onto the nodes does not speed things up much; the gain is still in the range of 10%.

If we take parallelism out of the equation, changing intensity from 1 (default) to 2 gives a 20% improvement. On an idle cluster, intensity is more relevant.

Summary

Scylla Manager 2.2’s parallel repair mechanism is a great supplement to Scylla row-level repair. Give it a try today! You can download it from the Scylla Download Center. Also, if you are interested in learning more about Scylla Manager and repairs, check out this free lesson in Scylla University!


Using Grafana Loki 2.0 as an Alert Source

Grafana Loki is Grafana’s tool for log aggregation and reporting. Loki 2.0 was just released with many enhancements so I took it for a spin. One of the nice things with the new release is the ability to create custom alerts.

In this blog post I will cover the following topics:

  • using Loki with rsyslog
  • configuration from a file
  • alert generation
  • logs transformation while querying (it’s great, so wait for it!)

In my testing, I used Scylla on AWS, but it’s applicable to any other system.

Introduction

I assume that you already know about Grafana (if not, get started here). If you are using Grafana on top of Prometheus where the latter serves as the metrics server, you should know that Prometheus can generate alerts. Although Grafana can read those alerts directly from Prometheus, it’s best to use the Alertmanager between them. In those setups, Prometheus sends alerts to the Alertmanager that acts as a data source for Grafana.

The nice thing about this setup is that Alertmanager can do more things with alerts like sending emails or Slack messages.

Loki is a log aggregation tool that is part of Grafana’s ecosystem with a query language similar to Prometheus. Loki can serve as a data source for Grafana, so you can look at logs inside a dashboard. With the 2.0 release Loki can also act as an alert source. This means that you can generate alerts like you would do in Prometheus: point those alerts to your Alertmanager and you would get, for free, the same notifications and representation your Prometheus alerts have.

The benefit of sending the alerts directly from Loki is that it is possible to get some extra information while doing so, and I will show that below.

Setup

We are going to have two servers: one for monitoring and one for reporting. However, typically there will be many reporting servers and a single monitoring server.

In my tests, I used an image of Scylla, a high performance NoSQL database, running on an AWS instance. This allows me to gather a lot of useful monitoring information.

For clarity, I will refer to the monitoring stack server as Monitoring, and to the Scylla instance as the reporting server.

Loki, Promtail and rsyslog

rsyslog is a popular log collection and distribution system; it can send your system logs both locally and to multiple log server destinations. In this case, I have it running on the Scylla reporting server.

Loki uses promtail as its agent to collect logs. While promtail can run on each of the reporting machines and collect log information, it can also be configured to receive log data using the Loki Push API, or it can be set up as a syslog target. In those cases, you have one promtail process running on the same server as Loki.

In our case, we will use the setup outlined above, so we don’t have to install additional agents on the reporting servers.

rsyslog configuration

We will configure rsyslog to push the logs to the monitoring target; in the examples below, assume the IP address of the monitoring system is 10.99.0.1.

Let's log in to the reporting server to make sure that rsyslog is installed and running:

$ systemctl status rsyslog

If it is not, go ahead and install it and make sure that it's running correctly.

First, edit /etc/rsyslog.conf and add the following line at the end of the file:

if $programname == 'scylla' then @@10.99.0.1:1514;RSYSLOG_SyslogProtocol23Format

Note that the line above is an advanced example that only sends logs generated by Scylla. If you want to send everything to your log server, you can use the following:

@@10.99.0.1:1514;RSYSLOG_SyslogProtocol23Format

Now, let’s restart the rsyslog daemon:

$ sudo systemctl restart rsyslog

We’re done here.

Configuring the Monitoring stack

On the monitoring server, we are going to run the Grafana, Alertmanager, Loki, and promtail services, and we are going to use Docker containers for the sake of simplicity.

I assume the docker service is already running on your server and that you already have Grafana and Alertmanager up and running with Grafana configured to read data directly from the Alertmanager.

As an example, you can look at the Scylla Monitoring Stack GitHub repository that’s already configured to run such a monitoring stack. (New to Scylla Monitoring Stack? Learn more in our free Scylla University course here.)

Running loki and promtail

As the next step, we are going to configure our services to read their configuration and the alert rules from local files. You can read more about installing those using Docker here.

On top of that configuration, you should do the following:

Loki rule files

Before you start, execute the following command:

$ mkdir -p loki-rules/fake/

This command will create a directory where the rules files will be stored.

Here’s an example of a rule file:

$ cat loki-rules/fake/loki-rule.yaml
groups:
  - name: example
    rules:
    - alert: HighThroughputLogStreams
      expr: sum by (instance, msg) (count_over_time({module="large_data"} |regexp`(?P<msg>.*)` [1h])>0)
      for: 20s
      labels:
        severity: "2"
      annotations:
        description: '{{ $labels.instance }} {{ $labels.msg }}'

Now you should create a file in that directory by following the example above.

We’ll go back to this file later.
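
The Loki container we are about to start reads its configuration from loki-conf/loki-config.yaml. That file is a standard single-process Loki 2.0 configuration with a ruler section added on top; a minimal sketch of the ruler part (the Alertmanager address here is an assumption, point it at your own Alertmanager):

# ruler section of loki-conf/loki-config.yaml
ruler:
  storage:
    type: local
    local:
      directory: /etc/loki/rules  # rules live in per-tenant subdirectories; with auth disabled the tenant is "fake"
  rule_path: /tmp/loki/rules-temp
  alertmanager_url: http://172.17.0.2:9093  # assumption: the address of your Alertmanager container
  ring:
    kvstore:
      store: inmemory
  enable_api: true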

Next, run the Loki container by executing the following commands:

$ docker run -d -v $(pwd)/loki-conf:/mnt/config -v $(pwd)/loki-rules:/etc/loki/rules -p 3100:3100 --name loki grafana/loki -config.file=/mnt/config/loki-config.yaml

That would start a container named loki.

You can check the status of the container with the following command:

$ docker logs loki

Check and see if there are no errors.

Find out what IP address this container is using:

$ docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' loki
172.17.0.5

From the command above, we determined that the IP address is 172.17.0.5.

Now, let’s edit the promtail config file to reflect that information. Here is an example file:

$ cat promtail_config.yml
server:
  http_listen_port: 9080
  grpc_listen_port: 0
 
positions:
  filename: /tmp/positions.yaml
 
clients:
  - url: http://172.17.0.5:3100/loki/api/v1/push
 
scrape_configs:
  - job_name: syslog
    syslog:
      listen_address: 0.0.0.0:1514
      labels:
        job: "syslog"
    relabel_configs:
      - source_labels: ['__syslog_connection_ip_address']
        target_label: 'instance'
      - source_labels: ['__syslog_message_app_name']
        target_label: 'app'
      - source_labels: ['__syslog_message_severity']
        target_label: 'severity'
 
    pipeline_stages:
      - match:
          selector: '{app="scylla"}'
          stages:
            - regex:
                expression: "\\[shard (?P<shard>\\d+)\\] (?P<module>\\S+).*"
            - labels:
                shard:
                module:
      - match:
          selector: '{severity="warning"}'
          stages:
            - metrics:
                warning_total:
                  type: Counter
                  description: "total count of warnings"
                  prefix: scylla_logs_
                  config:
                    match_all: true
                    action: inc

What you’ve learned so far from the sample file above is:

  1. We are configuring the syslog job; here we open the port for incoming rsyslog traffic. Note also how we extract labels from the message; we'll come back to that at the end.
  2. The regular expression we use to match log lines and create the shard and module labels. Every Scylla log line carries shard and module information, and these labels will be useful for indexing purposes.
  3. The last match creates a metric. This used to be the way to get alerts from Loki; I'll leave it here as a reference, but we are not going to use it.

Now, let’s start promtail:

$ docker run -d -p 9080:9080 -v $PWD/promtail_config.yml:/etc/promtail/config.yml --name=promtail grafana/promtail:2.0.0 --config.file=/etc/promtail/config.yml

You can now log in to http://{IP}:9080/targets and you should see logs coming from your server. Note that this example is Scylla-specific and you will need to make appropriate changes for any other service. I suggest that you start with minimal filtering, so you begin with the actual logs and can add more filtering later.

If all works as planned, switch to your Grafana dashboard. There you can look at the logs by adding a Loki data source (remember to use the Loki container IP) and using the Explore option from the menu.

But let's talk about alerts. In the following example, I triggered an alert in Scylla (without going into the details, I inserted a large cell into a table, which is something that should be avoided).

In my Grafana dashboard, I already have an alert table, and we can see that the alert is there. Note that in this example I didn't change anything in the Grafana configuration; in fact, Loki is not a data source here.

Explaining the alert rule

Minimal cardinality and labels

In Loki 2.0 there are two kinds of labels. The first are labels created when the logs are collected; those labels are used for indexing and are expected to have low cardinality.

If you are not familiar with the notion of low-cardinality, it’s a fancy way of saying that a parameter can have relatively few possible values, which implies that if you cut your data by that specific value you will get a big chunk of the data.

Loki suggests that you only use labels for low-cardinality values.

If you have worked with Prometheus/Alertmanager alerts in the past, you know that the information you receive is passed via labels. Our trace line contains very specific information: it states the keyspace, table, and column of the big cell that causes the problem. For cases like this, there are query-time labels. Those are labels that are extracted when the log lines are queried, and they can be used in the expression and in the alert.

Creating labels at query time

Look back at loki-rules/fake/loki-rule.yaml.

You can see that we use |regexp to create named labels from the log line. In our example, we simply copy the entire line into a msg label. You can see the result in our alert table: note both the msg label and the description.

Take away

  1. When creating labels, postpone high-cardinality labels to query time; in general, fewer labels are better.
  2. File locations: if you read your rules from files, place them inside a named directory (in my example it's called fake/) and map the directory that holds it (in my case loki-rules/) to the rules directory set in your Loki configuration.
  3. You can use promtail as an rsyslog target by pointing rsyslog on each of the monitored servers to it.
  4. You can create alerts directly on Loki and point them to your Alertmanager.

Already using Loki? Inspired to try it now? Let us know!

We believe Loki will be of great use across the cloud computing community far beyond Scylla. If you are using Loki with Scylla, or are now inspired to try it out, we'd love to hear about it. Feel free to contact us, or drop in and let the community know in our Slack channel.


[Webcast] The 411 on Storage Attached Indexing in Apache Cassandra

Scylla Manager Supports Google Cloud Storage

Scylla Manager automates the backup process and allows you to configure how and when backups occur. Until now, Scylla Manager supported only AWS S3-compatible APIs as backup storage. Starting with Scylla Manager 2.2 we've added Google Cloud Storage (GCS) to the list.

Support for backup to Google Cloud Storage allows Scylla clusters deployed on Google Compute Engine to be protected from disasters, and also minimizes the time needed to clone or quickly migrate your clusters.

This article shows how to back up your data to a GCS bucket.

Backup

Let's assume we already have a Scylla cluster up and running. We will use three n1-standard-4 nodes with local NVMe drives.

First let’s ensure that all Scylla Manager Agents are upgraded to at least version 2.2. We can use sctool status to check Agent version:

Let’s schedule a daily backup of the entire cluster using a “manager-prod-backup” bucket. You have to use the “gcs” prefix in the location parameter in order to tell Scylla Manager that you want to upload to Google Cloud Storage.

$ sctool backup --cluster prod-cluster --location gcs:manager-prod-backup --retention 7 --interval 1d

After a while, we can check if backup is running:

And after around 20 minutes the backup is finished:

Let’s check bucket content. You can see there are three directories under the “backup” directory:

  1. meta – contains metadata about backup like cluster node IDs, names of keyspaces and tables etc.
  2. schema – contains schema definitions for each backed up keyspace.
  3. sst – contains SSTable files.

If you’re interested in more details about backup management, make sure to check our recent blog about backups.

Efficiency

Let's check how Scylla Manager performs during the backup procedure and how it affects latency.

Our cluster was under ~30% mixed load; as you can see on the graphs, the effect of running the backup is negligible (the backup started at 16:43 and ended at 17:03):

The Scylla Manager Agent tries to be efficient in terms of CPU and memory usage so as not to affect workload latency. You can see that each Agent used around 300 MiB of memory during the backup and released it once the backup was done.

Summary

Ease of configuration and efficiency of the Scylla Manager Agent are very important in order to achieve the best results, which is why we constantly work on improving our support for storage providers and expanding the list of supported ones. If you have a suggestion on what we should support next, we would like to hear your feedback. Enterprise customers can open a ticket with our support team. Open Source users can always reach us on Slack, or contact us privately.
