Scylla Open Source Release 3.0.4

Scylla Open Source Release Notes

The ScyllaDB team announces the release of Scylla Open Source 3.0.4, a bugfix release of the Scylla 3.0 stable branch. Scylla 3.0.4, like all past and future 3.x.y releases, is backward compatible and supports rolling upgrades.

The major fix in this release is for a rare race condition between a schema update (adding or removing a column) and an initiated compaction, which may cause the compaction to write values to the wrong columns. The short time window where this may occur is just after compaction was initiated but before it starts writing to storage. Should the race condition occur, there may be a crash or validation/run-time error on the driver side. #4304

Other issues solved in this release:

  • commit log: In rare cases, when two chunks of the commit log or hinted handoff are written simultaneously, they might overlap, causing one to overwrite the header of the other and resulting in a “commitlog files are corrupted: Checksum error in segment chunk at XXXX” message. #4231
  • Materialized Views: a mistake in the flow control algorithm resulted in Scylla failing to apply backpressure to the client #4143
  • Scylla Drivers: using Scylla shard-aware drivers may cause unbalanced connections, where one shard has tens of thousands of connections while the others do not, resulting in a temporary slowdown #4269
  • MC file format: large promoted indexes might require large allocations, causing performance problems or failures with bad_alloc. This release does not fix the problem completely but makes it less severe. #4217
  • MC format: in some cases, writing an uncompressed mc sstable in debug mode fails with EINVAL due to an unaligned DMA address #4262
  • CQL: Scylla does not support insertion of null data through JSON inserts #4256. For example:
    INSERT INTO mytable JSON '{
        "myid" : "id2",
        "mytext" : "text234",
        "mytext1" : "text235",
        "mytext2" : null
    }';
  • Housekeeping: scylla-housekeeping on servers with a Python version older than 3.6 reports “TypeError: the JSON object must be str, not 'bytes'” #4239
  • Streaming: rebuilding a node may cause long reactor stalls on large clusters #3639
  • Bootstrap: A bootstrapping node doesn’t wait for a schema before joining the ring, which may result in the node failing to bootstrap, with the error “storage_proxy - Failed to apply mutation”. In particular, this error manifests when a user-defined type is used. #4196
  • Authentication: possible access to uninitialized memory in password_authenticator #4168
  • fragmented_temporary_buffer::read_exactly() allocates huge amounts of memory on a premature end-of-stream, resulting in bad_alloc when reading from CQL if the connection is closed in the middle of a frame, among other places #4233
  • cqlsh now uses TLSv1.2 by default

The post Scylla Open Source Release 3.0.4 appeared first on ScyllaDB.

Scylla Enterprise Release 2018.1.11

Scylla Enterprise Release Notes

The ScyllaDB team announces the release of Scylla Enterprise 2018.1.11, a production-ready Scylla Enterprise minor release. Scylla Enterprise 2018.1.11 is a bug fix release for the 2018.1 branch, the latest stable branch of Scylla Enterprise. As always, Scylla Enterprise customers are encouraged to upgrade to Scylla Enterprise 2018.1.11 in coordination with the Scylla support team.

The major fix in this release is for a rare race condition between a schema update (adding or removing a column) and an initiated compaction, which may cause the compaction to write values to the wrong columns. The short time window where this may occur is just after compaction was initiated but before it starts writing to storage. Should the race condition occur, there may be a crash or validation/run-time error on the driver side. #4304

Other issues fixed in this release are listed below, with open source references where present:

  • Using Scylla shard-aware drivers may cause unbalanced connections, where one shard has tens of thousands of connections while the others do not, resulting in a temporary slowdown #4269
  • Reads from system tables do not time out as they should, which might create large queues of reads if connections were opened too often.
  • Rebuilding a node may cause long reactor stalls on large clusters #3639
  • A bootstrapping node doesn’t wait for a schema before joining the ring, which may result in the node failing to bootstrap with the error storage_proxy - Failed to apply mutation. In particular, this error manifests when a user-defined type is used. #4196
  • cqlsh: Make TLSv1.2 default

The post Scylla Enterprise Release 2018.1.11 appeared first on ScyllaDB.

From SAP to Scylla: Tracking the Fleet at GPS Insight

From SAP to Scylla: Tracking the Fleet at GPS Insight

“Scylla is the ideal database for IoT in the industry right now. Especially without the garbage collection that Cassandra has. Small footprint. It just does what you need it to do.”

— Doug Stuns, GPS Insight

Doug Stuns began his presentation at Scylla Summit 2018 by laying out his company’s goals. Founded in 2004, GPS Insight now tracks more than 140,000 vehicles and assets. The company collects a wide variety of data for every one of those vehicles: battery levels, odometer readings, hard stops, acceleration, vehicle performance, emissions, and GPS data to determine route efficiency.

By the time Doug was brought onboard, they knew they needed to move away from their exclusively SQL SAP Adaptive Server Enterprise (ASE) architecture to include NoSQL for real-time vehicle data management. “They had effectively too much machine data for their relational database environment,” which had been built 8 to 10 years prior. All the GPS data, all the machine data, and all the diagnostic data for monitoring their fleet was being ingested into their SQL system.

Over time, their SQL database simply could not scale to the task. “As the data got larger and larger, it became harder and harder to keep the performance up on all this machine data in a relational database. So they were looking at NoSQL options, and that’s where I came in on the scene for GPS.”

GPS Insight was, at that point, growing their data by a terabyte a year just for diagnostics. Trying to create JOINs against these large tables resulted in terrible performance. An additional problem was deleting old data. “With no TTL in a relational database, what are you stuck with? A bunch of custom purge scripts,” which also impacted performance.

“My task was to find a NoSQL-based solution that would handle this. When I originally came to GPS Insight we were 100% going down the Cassandra road… I thought Hadoop was a little too ‘Gen One’ and a little too cumbersome. So I started off heading down this Cassandra road and I stumbled upon Scylla. And from there we decided to do a POC, since we had the opportunity to go with the best technology.” For example, one major issue Doug did not like about Cassandra was having to maintain an oversized cluster mainly because of Java Virtual Machine (JVM) garbage collection.

Proof of Concept

In the POC, Doug took a year’s worth of data and stood up two 3-node clusters on AWS, consisting of i3.2xlarge machines. They fired off a loading utility and began to populate both clusters.

“We loaded the Scylla data in, like, four or five hours, while it took us close to two days to load the Cassandra data.”

For production, which would scale to a ten-node cluster, Scylla solution engineers advised creating a stand-alone Scylla loader: a “beefed-up” m4.10xlarge instance with 40 processors. On that node, Doug installed the client libraries and created sixteen child processes.

After loading the cluster, Doug created stress tests: one in PHP and one in golang using the respective language drivers. Both were quick to deploy. “No issues there.”

When they first ran the tests, there was no difference between Scylla and Cassandra. But as they kept increasing load, Scylla scaled while Cassandra quickly topped out. Scylla could execute reads at 75,000 rows per second, while Cassandra was limited to 2,000 rows per second.

Getting to Production

With the POC complete and knowing Scylla could handle the load, it was time to implement. The team’s concerns turned to what business logic (stored procedures, etc.) were already in SAP, and what they would put in Scylla. They decided to focus exclusively on GPS data, diagnostic data, and hardware-machine interface (HMI) data. This would come from the device GPS Insight installed in every vehicle, sending updates on fifteen to twenty data points every second.

For production, GPS Insight decided to deploy a hybrid topology, with five nodes on premises and five nodes in the cloud. The on-premises servers were Dell 720s. “Definitely not state-of-the-art.” While they had 192 GB of RAM, they had spinning HDDs for storage. For the cloud, Doug said they used i3.4xlarge nodes, each with 4TB NVMe storage.

GPS Insight Production Topology

Next, Doug set up a Spark 2.3 node for the developers. Looking at the set of twenty queries they needed to run, they worked together to define a diagnostic Data Definition Language (DDL) to be able to communicate between their Scylla cluster and the SQL system. It included both non-expiring data that needed to be “kept forever” for contractual reasons (such as fuel consumption) as well as data that had the exact same schema, but had a five-month TTL expiration. “Each table was exactly the same minus the partitioning clause, and the primary key and other ordering constraints so you could get the data back to support the queries.” It also solved various data purging problems.

They partitioned one table using a compound key of account ID and vehicle identification number (VIN). The next table was identical, but partitioned only by account ID. The next was by account ID and category. They had recently added some time series data as well.

“The beauty of that for the developers was we barely had to change our PHP code and golang code. We were just able to add the drivers in and the reporting mechanism worked almost without any changes.”

They audited and validated the reports from Scylla against SAP ASE. Audit reports would expire after five months.

GPS Insight DevOps Methodology

To support the developers, Doug created a three keyspace software development lifecycle environment:

  • A dev keyspace, where the developers can do what they wish.
  • A keyspace that matches production, to which developers commit their changes and from which Doug promotes to production, and
  • A production keyspace, from which any unplanned changes can be taken back to dev.

As a side benefit of offloading the raw data management to Scylla, Doug explained that this approach now allowed the SAP ASE team to upgrade the SQL system, since its load was nowhere near as large and its uptime no longer as critical as they had been.

“Scylla has allowed GPS Insight an inexpensive and efficient way of offloading machine data from SAP ASE while having even better visibility and performance.”

— Doug Stuns

Want to learn more? Watch the full video below:

If after watching you have any more questions or ideas regarding your own Big Data projects, make sure to join our Slack channel, or contact us directly.

The post From SAP to Scylla: Tracking the Fleet at GPS Insight appeared first on ScyllaDB.

Speculative Query Executions with GoCql

Speculative query executions in Cassandra are an interesting solution for some use cases, such as faulty/slow/unresponsive nodes or network issues. This kind of execution allows a client to launch a DB request for multiple endpoints at the same time and let these requests compete for the winning response. This would only work if the query itself is defined as idempotent, that is, it renders the same result no matter how many times it is run. We’ve written about idempotence and speculative query execution before, so head over to refresh your memory if needed.

In most cases, using speculative query execution will not improve performance under normal conditions (though in some edge cases it might); rather, it improves the likelihood that a query gets a response from the server. In other cases, it can improve overall execution time, since taking the response from the fastest node saves time. One should also remember that while this can improve the reliability of queries or their overall execution time, it comes at the cost of using more resources (mostly CPU/network) to do so.

A few use cases for using speculative queries:

  1. A node you are querying is down and your SLAs require a response from the server within a timeframe that is lower than Cassandra timeout.
  2. A node is flaky, leading to inconsistent response times or dropped queries.
  3. A node is returning timeout errors requiring a client application to retry the query on another node.

Speculative query execution has been a feature of the Java driver for quite some time now, and we have recently included similar functionality in the GoCql driver. As mentioned above, it allows a user to run the same query on multiple hosts at the same time, letting the executions compete for the winning spot. The first query to get a response wins and is returned to the client.

In order to use the speculative query execution with this change, one must define the query as Idempotent and provide an execution policy:

    cluster := gocql.NewCluster("", "", "")
    sp := &SimpleSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond}
    session, err := cluster.CreateSession()
    // Build the query
    qry := session.Query("speculative").SetSpeculativeExecutionPolicy(sp).Idempotent(true)

As can be seen from the example above, we’ve used a SimpleSpeculativeExecution policy, which is the one implemented in the driver. It is a very simple policy that defines the number of additional executions (that is, in addition to the original request) and the constant delay between them. One could easily implement their own policy. For example, to have a policy that pauses incrementally longer between additional executions, one could build the following:

type IncreasingSpeculativeExecution struct {
        NumAttempts  int
        TimeoutDelay time.Duration
}

func (sp *IncreasingSpeculativeExecution) Attempts() int        { return sp.NumAttempts }
func (sp *IncreasingSpeculativeExecution) Delay() time.Duration {
        sp.TimeoutDelay += 50 * time.Millisecond
        return sp.TimeoutDelay
}
And then use it in the query execution:

    cluster := gocql.NewCluster("", "", "")
    sp := &IncreasingSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond}
    session, err := cluster.CreateSession()
    // Build the query
    qry := session.Query("speculative").SetSpeculativeExecutionPolicy(sp).Idempotent(true)

To show an example of using speculative query executions, we’ll use a 3-node Cassandra cluster. The use case that we’re going to explore is a slow node, which we are going to simulate using the simple tc tool that comes as part of the iproute2 package. Our example is going to be a bit extreme, but hopefully, it conveys the idea of when speculative queries might be useful.

To simulate a slow node, run the following command on one of the nodes:

sudo tc qdisc add dev eth0 root netem delay 250ms

This adds 250ms delay to all outbound packets for the given physical device (eth0 in the above case). Then we use the following client code to run the tests:

/* Before you execute the program, launch `cqlsh` and execute:
create keyspace example with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
create table example.tweet(timeline text, id UUID, text int, PRIMARY KEY(id));
create index on example.tweet(timeline);
*/
package main

import (
        "context"
        "flag"
        "fmt"
        "math/rand"
        "time"

        "github.com/gocql/gocql"
)

type hostMetrics struct {
        attempts int
        latency  int
}

// The observer type to watch the queries data
type testQueryObserver struct {
        metrics map[string]*hostMetrics
        verbose bool
}

func (o *testQueryObserver) ObserveQuery(ctx context.Context, q gocql.ObservedQuery) {
        host := q.Host.ConnectAddress().String()
        curMetric := o.metrics[host]
        curAttempts := 0
        curLatency := 0
        if curMetric != nil {
                curAttempts = curMetric.attempts
                curLatency = curMetric.latency
        }
        if q.Err == nil {
                o.metrics[host] = &hostMetrics{attempts: q.Metrics.Attempts + curAttempts, latency: curLatency + int(q.Metrics.TotalLatency/1000000)}
        }
        if o.verbose {
                fmt.Printf("Observed query %q. Returned %v rows, took %v on host %q with %v attempts and total latency %v. Error: %q\n",
                        q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Metrics.Attempts, q.Metrics.TotalLatency, q.Err)
        }
}

func (o *testQueryObserver) GetMetrics() {
        for h, m := range o.metrics {
                fmt.Printf("Host: %s, Attempts: %v, Avg Latency: %vms\n", h, m.attempts, m.latency/m.attempts)
        }
}

// Simple retry policy for attempting the connection to 1 host only per query
type RT struct {
        num int
}

func (rt *RT) Attempt(q gocql.RetryableQuery) bool {
        return q.Attempts() <= rt.num
}

func (rt *RT) GetRetryType(err error) gocql.RetryType {
        return gocql.Rethrow
}

func main() {

        specExec := flag.Bool("specExec", false, "Speculative execution")
        flag.Parse()

        // the number of entries to insert
        cycles := 10000

        // connect to the cluster
        cluster := gocql.NewCluster("...")
        cluster.Keyspace = "example"

        // the timeout of one of the nodes is very high, so let’s make sure we wait long enough
        cluster.Timeout = 10 * time.Second
        cluster.RetryPolicy = &RT{num: 3}
        session, err := cluster.CreateSession()
        if err != nil {
                panic(err)
        }
        defer session.Close()

        observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), verbose: false}
        for i := 0; i < cycles; i = i + 1 {
                r := rand.Intn(10000)
                u, _ := gocql.RandomUUID()
                query := session.Query(`INSERT INTO example.tweet (id, timeline, text) VALUES (?, 'me', ?)`, u, r).Observer(observer)
                // Create speculative execution policy with the timeout delay between following executions set to 10ms
                sp := &gocql.SimpleSpeculativeExecution{NumAttempts: 2, TimeoutDelay: 10 * time.Millisecond}
                // Specifically set Idempotence to either true or false to control normal/speculative execution
                query.SetSpeculativeExecutionPolicy(sp).Idempotent(*specExec)
                query.Exec()
        }

        // wait a sec before everything finishes
        <-time.After(1 * time.Second)

        // Print results
        observer.GetMetrics()
}
This code is also available at

This client code will insert 10000 entries into the cluster. As we’re using random numbers for the key column (id), all the queries are expected to be distributed more or less evenly among the nodes. Now, when we start the cluster and execute the client, we notice the following:

admin@ip-10-0-17-222:~/go$ time go run spectest.go

Host1: <ip>, Attempts: 3334, Avg Latency: 502ms
Host2: <ip>, Attempts: 3333, Avg Latency: 2ms
Host3: <ip>, Attempts: 3333, Avg Latency: 2ms

real    28m21.859s
user    0m2.920s
sys     0m1.828s

So it takes about half an hour to run the queries because one of the nodes has a constant delay of about half a second. When running with speculative execution mode, we get:

admin@ip-10-0-17-222:~/go$ time go run spectest.go --specExec

Host2: <ip>, Attempts: 5000, Avg Latency: 1ms
Host3: <ip>, Attempts: 4999, Avg Latency: 2ms

real    1m24.493s
user    0m3.900s
sys     0m3.072s

Not only do we never see the “problematic” node responding to a query, the queries are also split between the two other “fast” nodes, taking only about 1.5 minutes to complete.

As for the overhead of this improvement, there’s a jump in open sockets while running in speculative mode vs non-speculative:

Open sockets while running in speculative mode vs non-speculative

As can be seen, the number of open sockets jumps from about 147 to 153 (roughly a 4% increase). There are no significant increases in CPU, memory, or I/O utilisation.

This might be considered a good trade-off, but remember that this is what happens on a single, very simple client on a performant host (m4-2xlarge) that only ran this test. In other use cases, such as a smaller instance, a busy client, or a client running many queries in parallel, this may have a more significant effect on system resource utilisation. As a result, it is highly recommended to test the benefits of using speculative queries in your pre-production environment before deploying in production.

After this feature was released, a few issues were identified and have already been fixed, thanks to the community of contributors. It is in good shape now and is ready for general use. Enhancements planned for this feature include running the query observer (useful for monitoring query executions) in the main goroutine, which will make it more convenient to debug failures in the client code.


The post Speculative Query Executions with GoCql appeared first on Instaclustr.

Anomalia Machina 9 – Anomaly Detection at Scale: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra

In the previous blog, we deployed the Anomalia Machina application in a production-like Kubernetes environment. In this blog, we test it out and see how many anomalies it can detect at scale on small Kafka and Cassandra Instaclustr production clusters.

Coming towards the end of our epic odyssey we now have a final challenge to overcome. Can we ride out the storm and scale the application?

Anomalia Machina - Poseidon stirs up a storm

Poseidon stirs up a storm

Kubernetes Resource Management (Replicas, Pods, Nodes)

I ended my initial Kubernetes experiments (Anomalia Machina 7) by accidentally exploding my Kubernetes cluster – I overloaded the Kubernetes worker nodes with too many Pods. This time we’ll try to scale correctly. As we discovered, Kubernetes by default assumes Pods require no resources, so it will (try to) create an infinite number of Pods on a finite number of Nodes. Here’s the Kubernetes resource management documentation. In the Kubernetes configuration file, you can specify the minimum (requests) and maximum (limits) resources a Pod requires to run, in terms of CPU and/or memory. This is how Pods are scheduled to Nodes:

When you create a Pod, the Kubernetes scheduler selects a node for the Pod to run on. Each node has a maximum capacity for each of the resource types: the amount of CPU and memory it can provide for Pods. The scheduler ensures that, for each resource type, the sum of the resource requests of the scheduled Containers is less than the capacity of the node.

If a Pod exceeds its resource limits then it might be evicted from the node. We won’t use limits.

It’s therefore easy enough to force Kubernetes to allocate only one Pod per Node, and at the same time prevent accidental overloading of the nodes, by setting the resources request to the number of cores each EC2 instance provides. If we pick a small instance size, with say 2 cores, and use a resource request of 2 (cpu: “2000m” below) this gives us the flexibility to scale up in small increments:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
      - name: consumer
        image: myimages/consumer:latest
        ports:
        - containerPort: 1235
        resources:
          requests:
            cpu: "2000m"

This also gives us the opportunity to tune the application so it’s scalable in units of 2 cores. Our anomaly detection application pipeline consists of a Kafka consumer and a Cassandra client (and detector algorithm) in two separate thread pools. After some experimentation, we settled on 1 thread for the Kafka consumer and 20 threads for the Cassandra client as optimal. Kafka consumers are fast and return many events per poll, and Kafka scales best with fewer consumers. However, the writes and reads to/from Cassandra and the detector algorithm are slower, so more threads are needed to keep up with the consumer.

This also means that each Pod has 1 Kafka consumer and at least one Cassandra connection (Cassandra uses connection pooling by default, so the 20 threads can use 1 or more connections depending on configuration). As we scale further, we need to ensure that the number of Kafka Partitions on the topic is >= the number of Pods (otherwise any excess Kafka consumers, and therefore Pods, will be idle). Cassandra doesn’t have a maximum connection limit, each Cassandra node will easily handle 100s of connections, and the Java client automatically manages the connection pool.

Anomaly Detection at Scale

Checking a few events for anomalies is easy, but what happens when there are many events to check?

Anomalia Machina - Sheeps

In order to scale the application, we need sufficient worker nodes/EC2 instances. As discovered in the last blog, Kubernetes by default doesn’t automatically manage the instances.

Unlike pods and services, a node is not inherently created by Kubernetes: it is created externally by cloud providers.

In AWS EKS, the simplest way of increasing worker nodes is to manually increase the desired and maximum values in the worker nodes’ auto scaling group. Is it possible to actually get autoscaling working? Yes, in theory. There are two aspects to Kubernetes autoscaling: Pods and Nodes.

The Horizontal Pod Autoscaler scales the number of Pods based on triggers such as CPU utilisation. As such it’s probably not a good fit for this benchmarking use case where we want each Pod to run flat out and use close to 100% of the resources available to it.  

The Cluster Autoscaler scales the worker nodes based on pending Pods.  Although the Cluster Autoscaler is the de facto standard for automatic scaling in Kubernetes, it is not part of the main release.  Here are some blogs on how to run the Cluster Autoscaler on AWS. And here’s a good blog explaining the differences between the scaling approaches.

One reason I ran into problems initially with Pod scaling was that I naively assumed Kubernetes was more intelligent than it actually is (by default). I thought it could somehow automatically work out resource usage for Pods. Well, it turns out that I wasn’t completely crazy. There’s also a second Pod autoscaler, the Vertical Pod Autoscaler, which sets resource requests on Pod containers automatically based on historical usage. This is a good blog which covers all three autoscalers.

Anomalia Machina - Odysseus nasty encounter with Polyphemus

Odysseus had a nasty encounter with the giant shepherd Polyphemus but used the flock of sheep to escape the blinded Cyclops.

Results: 18,000 Anomaly Checks per second

Finally, we can reveal some results. I spun up (small) production sized Cassandra (version 3.11.3) and Kafka (version 2.1.0) clusters in the same region as my AWS EKS cluster. Here are the cluster and the worker nodes and producer instance details:


Cassandra Cluster

3 nodes x 8 cores = 24 cores (i3.2xlarge, 1900GB SSD, 61GB RAM, 8 cores)

Kafka Cluster

3 nodes x 4 cores = 12 cores (r4.xlarge-750, 750GB Disk, 30.5GB RAM, 4 cores)

Kubernetes Worker Nodes

10 nodes x 2 cores = 20 cores (c5.large)

Kafka Producer

1 node x 8 cores = 8 cores (m4.2xlarge)


For simplicity, I ran the Kafka producer flat out (around 2 million TPS) for long enough to create sufficient data (billions of events) in Kafka to run the rest of the pipeline and produce consistent results. I then gradually increased the number of Pods from 1 to the maximum number of worker nodes available, 10, by increasing the number of replicas with this command:

kubectl scale deployment consumer-deployment --replicas=10

Using the Prometheus monitoring metrics, the maximum throughput obtained for the business level metric (Anomaly Checks per second) was 18,000 TPS, with all the worker nodes and Cassandra cluster at 100% CPU Utilisation. Not surprisingly, the throughput didn’t increase further with more worker nodes. The Kafka cluster had plenty of spare CPU capacity. Here’s a graph showing these results:

Anomalia Machina - Cassandra Cluster results

What’s also informative is to understand the throughput (events in and out) for each sub-system (Kafka load generator, Kafka cluster, Kubernetes cluster, and Cassandra cluster), and compute the total system throughput as the sum of the subsystem throughputs. This will give us extra insights such as, how much of the total workload each subsystem handles, and how many events in total are required to perform each anomaly check.

We assume a Kafka load generator rate of Z events per second, and a steady state (i.e. all subsystems keep up with this event rate). Specifically, this means that the business metric, Anomaly Checks per second (step 9 below) is also equal to Z.

The subsystem throughputs are: Kafka load generator (step 1: 1 x Z), Kafka cluster (steps 2, 3, 10: 3 x Z), Kubernetes cluster (steps 4, 5, 8, 9: 4 x Z), and Cassandra cluster (steps 6, 7: 2 x Z). The total system throughput is, therefore: (1 + 3 + 4 + 2) x Z = 10 x Z.  i.e. The system throughput is an order of magnitude higher than the business metric.

Anomalia Machina 9 - Kubernetes Cluster Throughput

For the initial small production cluster this gives a system throughput of: 18,000 x 10 = 180,000 events/s. This stacked bar graph shows the breakdown for the subsystems and the total. The load generator rate on the bottom is the same value as Anomaly Checks per second (18,000):

Anomalia Machina - System events

Next blog we’ll keep scaling the system and see how far we can go.


The post Anomalia Machina 9 – Anomaly Detection at Scale: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra appeared first on Instaclustr.

Deep Dive into the Scylla Spark Migrator

Scylla and Spark

Another week, another Spark and Scylla post! This time, we’re back again with the Scylla Spark Migrator; we’ll take a short tour through its innards to see how it is implemented.

  • Read why we implemented the Scylla Spark Migrator in this blog.


When developing the Migrator, we had several design goals in mind. First, the Migrator should be highly efficient in terms of resource usage. Resource efficiency in the land of Spark applications usually translates to avoiding data shuffles between nodes. Data shuffles are destructive to Spark’s performance, as they incur more I/O costs. Moreover, shuffles usually get slower as more nodes are added (which is the opposite of the scaling model we like!).

Beyond resource efficiency, the Migrator was designed to perform decently out of the box with relatively little tuning: the default configuration splits the source table into 256 ranges that are transferred in parallel; on each executor, 8 connections are opened to Cassandra and 16 to Scylla; rows are fetched in batches of 1000 and TTL/WRITETIME timestamps are preserved. Of course, these parameters can be tuned using the configuration file.

With these goals in mind, let’s recap how the Migrator works:

  • When launched, the Migrator reads the schema definition for the source table from Cassandra;
  • The schema is used to create the CQL projection for the table scan;
  • If timestamp preservation is enabled, the TTL and WRITETIME timestamps for non-key columns are also added to the CQL projection;
  • The rows are fetched in chunks from Cassandra, and each chunk is written to Scylla;
  • As rows are written, the token ranges that have been processed are tracked and periodically saved.

Sounds pretty straightforward! We’ll dive into how these steps are implemented in the following sections.

Using the table definition to create the DataFrame schema

As we’ve mentioned in the post about DataFrames, every DataFrame in Spark has an associated schema. The schema is used to validate the queries against the DataFrame, optimize them and so forth. When the data source for the DataFrame is structured, it is very sensible to infer the schema from the data source.

When creating a DataFrame from a Cassandra table, the Cassandra connector will happily infer the schema for us. However, that inference only covers the table as it is defined. The Migrator needs to add additional expressions to the table scan, so we’re going to build the schema manually using the connector’s infrastructure.

We start off by creating an instance of CassandraConnector. This class bundles the management of connections to Cassandra for the driver and the executors:
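A rough sketch of that initialization (the Migrator's actual code builds the configuration from its YAML file instead, as noted below):

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Builds a connector from the spark.cassandra.* settings in the Spark
// configuration; it manages sessions on the driver and the executors.
val connector = CassandraConnector(spark.sparkContext.getConf)
```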

The connector’s configuration can be derived from Spark’s configuration. The actual initialization in the Migrator is slightly different, as we extract the configuration parameters from a YAML file rather than through Spark’s configuration mechanism. With the connector defined, we can use it to build the schema.

The connector provides a data type called TableDef:
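Abbreviated, TableDef looks roughly like this (several fields, such as index definitions, are omitted):

```scala
// Abbreviated sketch of the connector's TableDef type.
case class TableDef(
    keyspaceName: String,
    tableName: String,
    partitionKey: Seq[ColumnDef],
    clusteringColumns: Seq[ColumnDef],
    regularColumns: Seq[ColumnDef]) {

  // Every column of the table, in one sequence.
  def allColumns: Seq[ColumnDef] =
    partitionKey ++ clusteringColumns ++ regularColumns
}
```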

The TableDef data type represents everything we need to know about the structure of a Cassandra/Scylla table. Most importantly, it includes several sequences of ColumnDef instances that describe the columns of the table.

Now, TableDef is a plain case class, so we could construct it manually (and that could be useful when creating new tables from DataFrames not originating in Cassandra), but in this case, we’d like to infer it from an existing table. There’s a handy method for this:
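That method is Schema.tableFromCassandra, used roughly like so:

```scala
import com.datastax.spark.connector.cql.{Schema, TableDef}

// Connects through the CassandraConnector, reads the table's metadata
// and materializes it as a TableDef.
val tableDef: TableDef =
  Schema.tableFromCassandra(connector, "my_keyspace", "my_table")
```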

tableFromCassandra will use the connector to fetch the schema data from Cassandra and create the TableDef instance. There’s actually another method, Schema.fromCassandra, that can fetch the definitions for all keyspaces and all tables on Cassandra, but we won’t use it.

Using the TableDef instance, we should be able to construct the DataFrame schema. These schemas are specified as a StructType; this is a data type representing a record:
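Simplified, the relevant definitions from org.apache.spark.sql.types are:

```scala
// Simplified: a StructType is an ordered sequence of named, typed,
// nullable fields.
case class StructType(fields: Array[StructField])

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true)
```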

So essentially, each ColumnDef should map to a StructField. There’s one last missing piece of the puzzle: how do we actually do that? How do we convert a ColumnDef, with its associated ColumnType, into a StructField?

Luckily, the DataTypeConverter object has what we need:
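Namely, a function that converts a column definition to a Spark SQL field (simplified signature):

```scala
// From org.apache.spark.sql.cassandra.DataTypeConverter (simplified):
def toStructField(column: ColumnDef): StructField
```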

This function will take any ColumnDef and spit back a StructField. So we can just do the following to create our StructType schema:
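Putting the two together:

```scala
import org.apache.spark.sql.cassandra.DataTypeConverter
import org.apache.spark.sql.types.StructType

val schema = StructType(tableDef.allColumns.map(DataTypeConverter.toStructField))
```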

Great! So now, we can move forward with actually modifying the schema for our needs.

Building a custom query for a table scan

The motivation for building the schema manually is to include in it the expressions for TTL and WRITETIME timestamps for the individual columns; we need to copy over those timestamps along with the column values. Let’s start by creating the custom query itself. We’re going to use the more low-level RDD interface, as only that interface supports specifying custom selections.

We can modify the columns included in the CQL projection by using the select method on the CassandraRDD. This is the definition of select:
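Simplified (Self is the concrete RDD type):

```scala
// From CassandraRDD: replaces the CQL projection of the table scan
// with the given column references.
def select(columns: ColumnRef*): Self
```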

We haven’t talked about ColumnRef yet – this is a data type describing column references in CQL projections. We can build the basic projection by using the ref field present on the ColumnDef:
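Mapping every column definition to its reference gives us the plain projection:

```scala
import com.datastax.spark.connector.ColumnRef

val basicProjection: Seq[ColumnRef] = tableDef.allColumns.map(_.ref)
```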

Now, to include the TTL and WRITETIME timestamps of each column in the projection, we could do something like the following:
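A first (and, as we'll see, flawed) sketch, requesting the timestamps for every column:

```scala
import com.datastax.spark.connector._

// Flawed: asks for TTL/WRITETIME of every column, keys included.
val naiveProjection: Seq[ColumnRef] = tableDef.allColumns.flatMap { column =>
  val name = column.columnName
  Seq(
    column.ref,
    TTL(ColumnName(name), Some(s"${name}_ttl")),
    WriteTime(ColumnName(name), Some(s"${name}_writetime")))
}
```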

There’s one problem: we can’t retrieve these timestamps for columns that are part of the table’s primary key, so we need to add the timestamp extraction only for regular columns. The TableDef data type keeps key and regular columns in separate sequences (allColumns is just their concatenation), so we can write the projection as follows:
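A sketch of the corrected projection: plain references for the key columns, references plus timestamps for the regular ones:

```scala
import com.datastax.spark.connector._

val keyColumns = tableDef.partitionKey ++ tableDef.clusteringColumns

val projection: Seq[ColumnRef] =
  keyColumns.map(_.ref) ++
    tableDef.regularColumns.flatMap { column =>
      val name = column.columnName
      Seq(
        column.ref,
        TTL(ColumnName(name), Some(s"${name}_ttl")),
        WriteTime(ColumnName(name), Some(s"${name}_writetime")))
    }
```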

And this projection can be fed to the select method on the CassandraRDD:
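Something along these lines (the connector supplies the RowReaderFactory needed to read CassandraSQLRow values):

```scala
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra.CassandraSQLRow

val rdd = spark.sparkContext
  .cassandraTable[CassandraSQLRow]("my_keyspace", "my_table")
  .select(projection: _*)
```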

What’s CassandraSQLRow, you ask? This is a data type, provided by the Cassandra connector, that implements the interface of Spark’s internal Row: a sequence of heterogeneously-typed named fields that backs DataFrames. The rows from the Cassandra driver for Java are converted into this data type.

Finally, we need to take care of the DataFrame schema as well: it too must be modified to include the fields for the TTL and WRITETIME timestamps. This is done similarly by transforming the sequence of StructFields:
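A sketch of that transformation; TTL values are ints (seconds) and WRITETIME values are longs (microseconds):

```scala
import org.apache.spark.sql.types._

val regularColumnNames = tableDef.regularColumns.map(_.columnName).toSet

val enrichedSchema = StructType(schema.fields.flatMap { field =>
  if (regularColumnNames(field.name))
    Seq(
      field,
      StructField(s"${field.name}_ttl", IntegerType, nullable = true),
      StructField(s"${field.name}_writetime", LongType, nullable = true))
  else
    Seq(field)
})
```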

Note that each field is checked against the names of regular columns; only if it is indeed a non-key column, the schema is modified to include its timestamps.

Excellent! We now have an RDD that uses a custom projection to query the columns and their timestamps, and a DataFrame schema. We need to put them together and create the DataFrame; we don’t actually want to work with the RDD. To do so, we’ll use the createDataset method available on the SparkSession:
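Roughly like so; since CassandraSQLRow implements Spark's Row, the RDD can be treated as an RDD[Row] and paired with an encoder derived from the timestamp-enriched schema:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val df = spark.createDataset(rdd.asInstanceOf[RDD[Row]])(RowEncoder(enrichedSchema))
```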

The RowEncoder is (yet another) data type that tells Spark’s Catalyst engine how to convert the RDD’s data to Spark’s internal binary representation.

Preserving TTLs and WRITETIMEs for individual cells

Our next challenge is to figure out how to actually write different WRITETIME/TTL timestamps to each column of the transferred rows. See, the problem is that when issuing a CQL insert, you can only specify one timestamp that is applied to all non-key columns being written:
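For example, on a hypothetical table with a key column key and regular columns reg1 and reg2:

```sql
INSERT INTO ks.tbl (key, reg1, reg2)
VALUES ('k', 'a', 'b')
USING TTL 86400;
```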

This statement would apply the 24 hour TTL value to both reg1 and reg2 values. The solution to this is to issue a separate CQL statement for each group of values within a row that share the same TTL and WRITETIME. For example, assume we need to copy a row in which reg1 and reg2 each carry a unique TTL/WRITETIME combination, while reg3 and reg4 share one.

To copy such a row, we would need to issue the following CQL statements:
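With hypothetical TTL and timestamp values, the statements would look like this:

```sql
-- reg1 and reg2 each have a unique TTL/WRITETIME combination:
INSERT INTO ks.tbl (key, reg1) VALUES ('k', 'a')
USING TTL 86400 AND TIMESTAMP 1550000000000001;

INSERT INTO ks.tbl (key, reg2) VALUES ('k', 'b')
USING TTL 43200 AND TIMESTAMP 1550000000000002;

-- reg3 and reg4 share the same TTL and WRITETIME, so they travel together:
INSERT INTO ks.tbl (key, reg3, reg4) VALUES ('k', 'c', 'd')
USING TTL 21600 AND TIMESTAMP 1550000000000003;
```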

Note how the insertions of reg3 and reg4 ended up in the same CQL statement; this is because they have the same TTL and WRITETIME values, while the other values create unique combinations of TTL and WRITETIME. When Scylla processes these CQL statements, it’ll merge the values into the same row determined by the key.

The DataFrame that contains the rows must use a fixed schema; the CQL statements we’ve shown contain the columns of the primary key, and a subset of (or possibly the entire set of) regular columns. We’ll need to represent the fact that some columns are unset on the rows. To do so, we’ll use the CassandraOption[T] data type provided by the Cassandra connector.

This data type is similar to Scala’s built-in Option[T] data type, but contains another term in the sum type. Here’s the definition:
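The definition, as in the connector (modulo details):

```scala
sealed trait CassandraOption[+A] extends Product with Serializable

object CassandraOption {
  case class Value[+A](value: A) extends CassandraOption[A]  // write this value
  case object Unset extends CassandraOption[Nothing]         // leave the cell alone
  case object Null extends CassandraOption[Nothing]          // clear the cell
}
```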

We can use this data type to represent the 3 states a column in a CQL DML statement can be in: set to a value, unset, or set to null (which means it’ll be cleared in the row).

To implement this row splitting operation, we’ll use the flatMap function present on DataFrames, that allows the use of plain Scala functions to process the rows in the DataFrame. This is the definition of flatMap:
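From Dataset[T], simplified:

```scala
def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]
```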

Recall that DataFrames are a type alias for Dataset[Row]. TraversableOnce is a trait from the Scala collections library representing collections that can be processed once (or more). We can use flatMap to transform the rows in the DataFrame to collections of elements for which a Spark encoder exists. The collections will be flattened into the resulting DataFrame.

So, we’ll write a function of the form Row => List[Row] that will expand each original row into rows that use the same TTL and WRITETIME values. Since access to rows is by index, we’ll first create maps that contain the indices for the primary keys and the regular columns.

This function will create the maps we need:
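A sketch of such a function (the names here are ours, not necessarily the Migrator's):

```scala
import org.apache.spark.sql.types.StructType

// Key columns: name -> index of the value.
// Regular columns: name -> (value index, TTL index, WRITETIME index).
def buildIndexMaps(
    schema: StructType,
    regularColumns: Set[String]): (Map[String, Int], Map[String, (Int, Int, Int)]) = {
  val index = schema.fieldNames.zipWithIndex.toMap
  val keyIndices = index.filter { case (name, _) =>
    !regularColumns(name) &&
      !name.endsWith("_ttl") && !name.endsWith("_writetime")
  }
  val regularIndices = regularColumns.map { name =>
    name -> ((index(name), index(s"${name}_ttl"), index(s"${name}_writetime")))
  }.toMap
  (keyIndices, regularIndices)
}
```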

Note that the index map for the regular columns contains the index for the original column, the TTL column and the WRITETIME column. To actually perform the row splitting operation, we’ll use this function:
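A sketch of the row-splitting function (names are ours); it expands one wide row into one row per distinct (TTL, WRITETIME) combination among its regular columns:

```scala
import org.apache.spark.sql.Row
import com.datastax.spark.connector.types.CassandraOption

def explodeRow(
    row: Row,
    keyIndices: Map[String, Int],
    regularIndices: Map[String, (Int, Int, Int)]): List[Row] =
  if (regularIndices.isEmpty) List(row)
  else {
    val keyValues = keyIndices.toList.sortBy(_._2).map { case (_, i) => row.get(i) }
    val regularNames = regularIndices.keys.toList

    regularIndices.toList
      .map { case (name, (vi, ti, wi)) =>
        (name, row.get(vi), row.get(ti), row.get(wi))
      }
      .groupBy { case (_, _, ttl, writetime) => (ttl, writetime) }
      .toList
      .map { case ((ttl, writetime), fields) =>
        // Columns outside this group are marked Unset so the write
        // leaves them untouched.
        val regularValues = regularNames.map { name =>
          fields.find(_._1 == name)
            .map { case (_, value, _, _) => CassandraOption.Value(value) }
            .getOrElse(CassandraOption.Unset)
        }
        Row.fromSeq(keyValues ++ regularValues ++ List(ttl, writetime))
      }
  }
```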

This one is slightly more involved, so let’s describe each of the transformations it performs. We start by checking if there are any regular columns; if there aren’t, and the row is composed entirely of primary key columns, we just return the row in a list.

When we do have regular columns, we first transform the index map to a list of column name, value, TTL value and WRITETIME value. Next, we group the fields by their TTL and WRITETIME values and discard the timestamps from the resulting map’s value type. Lastly, we construct a row from each field group by adding the primary key values, the regular column values wrapped in CassandraOption and finally adding the TTL and WRITETIME values.

To actually use these functions, we first need to use Spark’s broadcasting mechanism to send a copy of the field index maps to each executor. This is done as follows:
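With hypothetical variable names:

```scala
val keyIndicesB = spark.sparkContext.broadcast(keyIndices)
val regularIndicesB = spark.sparkContext.broadcast(regularIndices)
```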

The broadcasting mechanism is an optimization for when the bodies of DataFrame transformation functions need to reference read-only values. To transform the DataFrame, we call flatMap as follows:
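Roughly, where finalSchema is the target schema described above (key columns, the CassandraOption-wrapped regular columns, plus single ttl and writetime fields):

```scala
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val exploded = df.flatMap { row =>
  explodeRow(row, keyIndicesB.value, regularIndicesB.value)
}(RowEncoder(finalSchema))
```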

Finally, we need to tell the connector to save the DataFrame to Scylla. We’ll drop down again to the RDD API as that offers more control over how the CQL statements are constructed. We create a ColumnSelector value that describes which columns should be written; the columns are created from the original schema that we loaded, before adding the colName_ttl columns:
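A sketch:

```scala
import com.datastax.spark.connector.SomeColumns

// Only the table's real columns are written back; the *_ttl and
// *_writetime projection fields are excluded from the selector.
val columnSelector = SomeColumns(tableDef.allColumns.map(_.ref): _*)
```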

We also create a WriteConf value that describes how the connector should perform the writes. Critically, we tell the connector which column should be used for the TTL and WRITETIME values for each row:
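Here, "ttl" and "writetime" name the per-row fields appended by the row-splitting step:

```scala
import com.datastax.spark.connector.writer.{TTLOption, TimestampOption, WriteConf}

val writeConf = WriteConf(
  ttl = TTLOption.perRow("ttl"),
  timestamp = TimestampOption.perRow("writetime"))
```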

And we pass all of this to the saveToCassandra method:
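Which looks roughly like:

```scala
import com.datastax.spark.connector._

exploded.rdd.saveToCassandra("my_keyspace", "my_table", columnSelector, writeConf)
```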

Whew! That’s the beefiest part of the Migrator.

Tracking token ranges that have been transferred

The next part we’ll discuss is how the Migrator keeps track of which token ranges have already been transferred. This functionality required some modification of the Cassandra connector to propagate this information to the Migrator, which is why the Migrator relies on a fork of the connector.

There are two sides to this feature: first, we must keep track of token ranges that have been transferred, and periodically write them to a savepoint file; second, when resuming a migration, we must use the savepoint file to skip token ranges that have already been transferred. Let’s start with the first part.

We’ve seen Spark’s broadcast variables in the previous section; these are immutable, read-only values that are sent to all executors from the driver. Spark also contains accumulators: variables that are readable and writable from both executors and the driver.

We’ve modified the connector to store transferred token ranges in an accumulator. This accumulator is then periodically read on the driver and its contents are stored in a savepoint file. To implement an accumulator, we can inherit from Spark’s AccumulatorV2 abstract class:
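The abstract class, simplified:

```scala
// From org.apache.spark.util, simplified:
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
}
```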

The contract is that the accumulator can read values of type IN, update an internal state in a thread-safe way, and output values of type OUT, which would typically be an aggregation of IN values. Our accumulator will set IN = Set[CqlTokenRange[_, _]] and OUT = Set[CqlTokenRange[_, _]].

These are the important parts of the implementation:
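A sketch of the accumulator (isZero, copy, reset and merge are elided for brevity):

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.util.AccumulatorV2

class TokenRangeAccumulator
    extends AccumulatorV2[Set[CqlTokenRange[_, _]], Set[CqlTokenRange[_, _]]] {

  // A mutable reference to an immutable Set: safe to update concurrently.
  private val ranges = new AtomicReference(Set.empty[CqlTokenRange[_, _]])

  override def add(v: Set[CqlTokenRange[_, _]]): Unit =
    ranges.getAndUpdate(current => current ++ v)

  override def value: Set[CqlTokenRange[_, _]] = ranges.get

  // isZero, copy, reset and merge elided for brevity.
}
```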

The core idea is that we use an AtomicReference to store the current set of transferred token ranges. AtomicReference is a mutable reference for storing immutable values. Scala’s Set is immutable, so it can be safely stored there. Whenever we need to add another set of token ranges that have been transferred, we use getAndUpdate to atomically update the set. To extract the set, we can use the get method on the reference.

To use the accumulator, we’ve modified the connector’s TableWriter class; specifically, when writing one of the RDD’s partitions, the writer tests if the partition is a CassandraPartition (this is true for the Migrator, but not always, and thus is not reflected in the types), and extracts its ranges:
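A sketch of that extraction:

```scala
// Pattern-match because the partition type is not guaranteed statically.
private def partitionTokenRanges(partition: Partition): Seq[CqlTokenRange[_, _]] =
  partition match {
    case p: CassandraPartition[_, _] => p.tokenRanges.toSeq
    case _                           => Seq.empty
  }
```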

We call this function in the writeInternal method and add the transferred ranges into the accumulator after writing them:
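In outline (the accumulator is optional, since not every caller tracks ranges):

```scala
// Inside TableWriter.writeInternal, after the partition's rows
// have been written:
val transferred = partitionTokenRanges(partition).toSet
tokenRangeAccumulator.foreach(_.add(transferred))
```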

The full implementation can be seen here.

Before starting the DataFrame write, which blocks the calling thread until it completes, a scheduled thread is set up to read the accumulator periodically and write its contents to the savepoint file:
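A sketch using a scheduled executor; dumpAccumulatorState is a hypothetical helper that serializes the set to the savepoint file:

```scala
import java.util.concurrent.{Executors, TimeUnit}

val scheduler = Executors.newScheduledThreadPool(1)
scheduler.scheduleAtFixedRate(
  () => dumpAccumulatorState(accumulator.value, savepointPath),
  interval, interval, TimeUnit.SECONDS)

// ... start the blocking DataFrame write, then shut the scheduler down.
```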

Now, the second part of this feature is actually skipping token ranges that have already been transferred. The connector operates by dividing the token ranges between the executors and then fetching the rows corresponding to those token ranges. We can filter the token ranges by passing in a function of the form (Long, Long) => Boolean, which determines if a given range should be transferred.

This is the relevant implementation in CassandraTableScanRDD#compute:
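Sketched, with tokenRangeFilter being the (Long, Long) => Boolean predicate supplied by the Migrator (field names approximate):

```scala
val rangesToFetch = partition.tokenRanges.filter { range =>
  tokenRangeFilter(range.range.start.value, range.range.end.value)
}
// rangesToFetch is then fetched exactly as before the modification.
```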

These token ranges are then fetched as they were before our modifications. This is all that’s needed to skip the token ranges that were already transferred!


The source code for the Migrator can be found here, and the source code for the modified Cassandra connector here (see the latest commits to see the changes made). Some of the snippets shown here differ slightly from the code in Github, but the core principles remain the same.

The post Deep Dive into the Scylla Spark Migrator appeared first on ScyllaDB.

Virtual tables are coming in Cassandra 4.0

One of the exciting features coming in Cassandra 4.0 is the addition of Virtual Tables. They will expose elements like configuration settings, metrics, or running compactions through the CQL interface instead of JMX for more convenient access. This post explains what Virtual Tables are and walks through the various types that will be available in version 4.0.

Virtual Tables

The term “Virtual Tables” can be confusing, as a quick Google search may leave one under the impression that they are views that can be created through a DDL statement. In the context of Cassandra, however, Virtual Tables will be created and managed by Cassandra itself, with no possibility of creating custom ones through CQL.

They are not to be confused with Materialized Views either, which persist data from a base table into another table with a different primary key.

For Cassandra 4.0, virtual tables will be read only, trivially exposing data as CQL rows. Such data was (and will still be) accessible through JMX, which can be cumbersome to interact with and secure.

Two new keyspaces were added in Cassandra 4.0 to support Virtual Tables: system_views and system_virtual_schema.
The latter will contain schema information on the Virtual Tables, while the former will contain the actual tables.

cqlsh> select * from system_virtual_schema.tables;

 keyspace_name         | table_name    | comment
          system_views |        caches |                system caches
          system_views |       clients |  currently connected clients
          system_views |      settings |             current settings
          system_views | sstable_tasks |        current sstable tasks
          system_views |  thread_pools |                             
 system_virtual_schema |       columns |   virtual column definitions
 system_virtual_schema |     keyspaces | virtual keyspace definitions
 system_virtual_schema |        tables |    virtual table definitions

Neither of these keyspaces can be described through the DESCRIBE KEYSPACE command, so listing the rows in system_virtual_schema.tables is the only way to discover the Virtual Tables.

The tables themselves can be described as shown here:

cqlsh> describe table system_views.caches

CREATE TABLE system_views.caches (
    capacity_bytes bigint PRIMARY KEY,
    entry_count int,
    hit_count bigint,
    hit_ratio double,
    name text,
    recent_hit_rate_per_second bigint,
    recent_request_rate_per_second bigint,
    request_count bigint,
    size_bytes bigint
) WITH compaction = {'class': 'None'}
    AND compression = {};

Available Tables in 4.0

Since Apache Cassandra 4.0 was feature-frozen in September 2018, we already have the definitive list of Virtual Tables that will land in that release.


The caches virtual table displays the list of caches involved in Cassandra’s read path. It contains all the necessary information to get an overview of their settings, usage, and efficiency:

cqlsh> select * from system_views.caches;

 name     | capacity_bytes | entry_count | hit_count | hit_ratio | recent_hit_rate_per_second | recent_request_rate_per_second | request_count | size_bytes
   chunks |       95420416 |          16 |       134 |  0.864516 |                          0 |                              0 |           155 |    1048576
 counters |       12582912 |           0 |         0 |       NaN |                          0 |                              0 |             0 |          0
     keys |       25165824 |          18 |        84 |  0.792453 |                          0 |                              0 |           106 |       1632
     rows |              0 |           0 |         0 |       NaN |                          0 |                              0 |             0 |          0

This information is currently available through the nodetool info command.


The clients virtual table will list all connected clients, with information such as the number of issued requests or the username each client is using:

cqlsh> select * from system_views.clients;

 address   | port  | connection_stage | driver_name | driver_version | hostname  | protocol_version | request_count | ssl_cipher_suite | ssl_enabled | ssl_protocol | username
-----------+-------+------------------+-------------+----------------+-----------+------------------+---------------+------------------+-------------+--------------+-----------
           | 61164 |            ready |        null |           null | localhost |                4 |           146 |             null |       False |         null | anonymous
           | 61165 |            ready |        null |           null | localhost |                4 |           155 |             null |       False |         null | anonymous


The settings virtual table will list the configuration settings exposed through the cassandra.yaml config file:

cqlsh> select * from system_views.settings limit 100;

@ Row 1
 name  | allocate_tokens_for_keyspace
 value | null

@ Row 2
 name  | audit_logging_options_audit_logs_dir
 value | /Users/adejanovski/.ccm/trunk/node1/logs/audit/

@ Row 3
 name  | audit_logging_options_enabled
 value | false

@ Row 4
 name  | audit_logging_options_excluded_categories
 value | 

@ Row 5
 name  | audit_logging_options_excluded_keyspaces
 value | 
@ Row 17
 name  | back_pressure_strategy
 value | {high_ratio=0.9, factor=5, flow=FAST}

@ Row 18
 name  | batch_size_fail_threshold_in_kb
 value | 50

@ Row 19
 name  | batch_size_warn_threshold_in_kb
 value | 5

@ Row 20
 name  | batchlog_replay_throttle_in_kb
 value | 1024

Here, I’ve truncated the output, as there are currently 209 settings exposed. There are plans to make this table writeable so that some settings can be changed at runtime, as can currently be done through JMX. Such changes, of course, would need to be persisted in cassandra.yaml to survive a restart of the Cassandra process.


The sstable_tasks virtual table will expose currently running operations on SSTables like compactions, upgradesstables, or cleanup. For example:

cqlsh> select * from system_views.sstable_tasks ;

 keyspace_name | table_name  | task_id                              | kind       | progress | total     | unit
    tlp_stress | sensor_data | f6506ec0-3064-11e9-95e2-b3ac36f635bf | compaction | 17422218 | 127732310 | bytes

This information is currently available through the nodetool compactionstats command.


The thread_pools virtual table will display the metrics for each thread pool in Cassandra:

cqlsh> select * from system_views.thread_pools ;

 name                         | active_tasks | active_tasks_limit | blocked_tasks | blocked_tasks_all_time | completed_tasks | pending_tasks
             AntiEntropyStage |            0 |                  1 |             0 |                      0 |               0 |             0
         CacheCleanupExecutor |            0 |                  1 |             0 |                      0 |               0 |             0
           CompactionExecutor |            0 |                  2 |             0 |                      0 |            3121 |             0
         CounterMutationStage |            0 |                 32 |             0 |                      0 |               0 |             0
                  GossipStage |            0 |                  1 |             0 |                      0 |           17040 |             0
              HintsDispatcher |            0 |                  2 |             0 |                      0 |               0 |             0
        InternalResponseStage |            0 |                  8 |             0 |                      0 |               0 |             0
          MemtableFlushWriter |            0 |                  2 |             0 |                      0 |              20 |             0
            MemtablePostFlush |            0 |                  1 |             0 |                      0 |              21 |             0
        MemtableReclaimMemory |            0 |                  1 |             0 |                      0 |              20 |             0
               MigrationStage |            0 |                  1 |             0 |                      0 |               0 |             0
                    MiscStage |            0 |                  1 |             0 |                      0 |               0 |             0
                MutationStage |            0 |                 32 |             0 |                      0 |               8 |             0
    Native-Transport-Requests |            1 |                128 |             0 |                      0 |             717 |             0
       PendingRangeCalculator |            0 |                  1 |             0 |                      0 |               6 |             0
 PerDiskMemtableFlushWriter_0 |            0 |                  2 |             0 |                      0 |              20 |             0
              ReadRepairStage |            0 |                  8 |             0 |                      0 |               0 |             0
                    ReadStage |            0 |                 32 |             0 |                      0 |              22 |             0
                  Repair-Task |            0 |         2147483647 |             0 |                      0 |               0 |             0
         RequestResponseStage |            0 |                  8 |             0 |                      0 |              22 |             0
                      Sampler |            0 |                  1 |             0 |                      0 |               0 |             0
     SecondaryIndexManagement |            0 |                  1 |             0 |                      0 |               0 |             0
           ValidationExecutor |            0 |         2147483647 |             0 |                      0 |               0 |             0
            ViewBuildExecutor |            0 |                  1 |             0 |                      0 |               0 |             0
            ViewMutationStage |            0 |                 32 |             0 |                      0 |               0 |             0

This information is currently available through the nodetool tpstats command.


Virtual Tables, regardless of the type, contain data that is specific to each node. They are not replicated, have no associated SSTables, and querying them will return the values of the coordinator (the node that the driver chooses to coordinate the request). They will also ignore the consistency level of the queries they are sent.

When interacting with Virtual Tables through cqlsh, results will come from the node that cqlsh connected to:

cqlsh> consistency ALL
Consistency level set to ALL.
cqlsh> select * from system_views.caches;

 name     | capacity_bytes | entry_count | hit_count | hit_ratio | recent_hit_rate_per_second | recent_request_rate_per_second | request_count | size_bytes
   chunks |       95420416 |          16 |       134 |  0.864516 |                          0 |                              0 |           155 |    1048576
 counters |       12582912 |           0 |         0 |       NaN |                          0 |                              0 |             0 |          0
     keys |       25165824 |          18 |        84 |  0.792453 |                          0 |                              0 |           106 |       1632
     rows |              0 |           0 |         0 |       NaN |                          0 |                              0 |             0 |          0

(4 rows)

Tracing session: 06cb2100-3060-11e9-95e2-b3ac36f635bf

 activity                                                                 | timestamp                  | source    | source_elapsed | client
                                                       Execute CQL3 query | 2019-02-14 14:54:20.048000 | |              0 |
 Parsing select * from system_views.caches; [Native-Transport-Requests-1] | 2019-02-14 14:54:20.049000 | |            390 |
                        Preparing statement [Native-Transport-Requests-1] | 2019-02-14 14:54:20.049000 | |            663 |
                                                         Request complete | 2019-02-14 14:54:20.049424 | |           1424 |

When interacting through the driver, there is no simple way of selecting a single node as coordinator. The load balancing policy is responsible for this, and it is set on the Cluster object, not on a per-query basis.
For the Datastax Java Driver, JAVA-1917 introduced a new feature to ease access to Virtual Tables by supporting the selection of a specific node: it adds a setNode(Node node) method to the Statement class to forcefully designate the node responsible for the query, and “voilà”.
For the record, the same feature was added to the Python driver.

Beyond Apache Cassandra 4.0

The data currently missing from Virtual Tables is global and table-level metrics such as latencies and throughputs (Cassandra exposes A LOT of table-specific metrics beyond those two).
Rest assured that these are being worked on from two different approaches in CASSANDRA-14670 and CASSANDRA-14572, which were not ready in time for the feature freeze.

It will probably take some time for Virtual Tables to match the amount of data available through JMX, but we are confident they will catch up eventually.
Convenient and secure CQL access to runtime metrics in Apache Cassandra will tremendously ease building tools like Reaper, which currently rely on JMX.

Scylla and Elasticsearch, Part Two: Practical Examples to Support Full-Text Search Workloads

Scylla and Elasticsearch

We covered the basics of Elasticsearch and how Scylla is a perfect complement for it in part one of this blog. Today we want to give you specific how-tos on connecting Scylla and Elasticsearch, including use cases and sample code.

Use Case #1

Combining a persistent, highly available datastore with a full-text search engine is a market requirement, but implementing a single, integrated solution is a long-term goal that requires time and resources. To answer this challenge, we describe below a way for users to combine best-of-breed solutions that support full-text search workloads. We chose open source Elasticsearch together with open source Scylla to showcase the solution.

In this use case we start with a fresh clean setup, meaning that you need to ingest your data into both Scylla and Elasticsearch using dual writes and then perform the textual search.

The following example creates an apparel catalog consisting of 160 items, stored in Scylla and searchable using Elasticsearch (ES). The catalog.csv file comprises 7 columns: Brand, Group, Sub_Group, Color, Size, Gender and SKU. Let’s describe them:

  • Brand: companies who manufacture the clothing items (5 options)
  • Group: clothing type (4 options)
  • Sub_Group: an attribute that correlates to the clothing group type (9 options)
  • Color: pretty much self explanatory (7 options) – very common query filter
  • Size: ranging from Small to XLarge (4 options) – very common query filter
  • Gender: People like to see the results for the relevant gender group (4 options) – very common query filter
  • SKU: a unique product ID, usually made of the other product attributes initials
Brand      | Group   | Sub_Group    | Color  | Size   | Gender
North Face | Shirts  | short sleeve | black  | Small  | men
Patagonia  | Pants   | long sleeve  | red    | Medium | women
Mammut     | Shoes   | gore-tex     | grey   | Large  | boys
Garmont    | Jackets | leather      | green  | XLarge | girls
Columbia   |         | softshell    | yellow |        |
           |         | hiking       | blue   |        |
           |         | jeans        | white  |        |
We will be using two python scripts (see Appendix-B) to demonstrate the integration of Scylla with Elasticsearch.
  • Dual writes using the insert_data script for data ingestion, in our case an apparel catalog csv file.
  • Textual search using the query_data script, which is basically a 2-hop query that will retrieve the unique product_id (SKU) from Elasticsearch and then use the retrieved SKU to query other product attributes from Scylla.
Scylla and Elasticsearch Block Diagram


  • python installed
  • pip installed
  • Java 8 installed
  • Scylla cluster installed (see here)
  • Node for Elasticsearch and python scripts (can be separate nodes)


  1. Install the python drivers on the node to be used for the scripts

$ sudo pip install cassandra-driver
$ sudo pip install elasticsearch

  2. Install Elasticsearch (see here)

$ sudo apt-get update
$ curl -L -O
$ sudo dpkg -i elasticsearch-6.2.3.deb

  3. Start Elasticsearch, then verify its status and health state

$ sudo /etc/init.d/elasticsearch start
[ ok ] Starting elasticsearch (via systemctl): elasticsearch.service.
$ curl

{
  "cluster_name" : "elasticsearch",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

  4. Copy the following files to the location from which you will run them, and make them executable
    • catalog.csv (the SKU is the unique product_id, made of other product attributes initials)
  5. Run the insert_data script. The script will perform the following:
    • Create the Schema on Scylla (see Appendix-A)
    • Create the Elasticsearch index (see Appendix-A)
    • Dual write: insert the catalog items (csv file) to both DBs (using prepared statement for Scylla)
    Use the -s / -e flags to provide comma-separated lists of IPs for the Scylla and/or Elasticsearch (ES) nodes. If you are running Elasticsearch (ES) on the same node as the python scripts, there is no need to enter an IP; the default will be used.

    $ python -h
    usage: [-h] [-s SCYLLA_IP] [-e ES_IP]

    optional arguments:
      -h, --help  show this help message and exit
      -s SCYLLA_IP
      -e ES_IP
  6. Once the “insert_data” script completes, you will find 160 entries in both Scylla and Elasticsearch
Elasticsearch:

$ curl
{
  "count" : 160,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  }
}

Scylla:

cqlsh> SELECT COUNT (*) FROM catalog.apparel ;

 count
-------
   160

(1 rows)
  7. Run the query_data script. The script will perform the following:
      • It will execute a textual search in Elasticsearch per the flag you provide, either searching for a single word (single filter) OR searching for multiple words (multiple filters), OR without any filter, which is basically a “match_all” query.

      • It will then use the SKU value retrieved from the textual search to query Scylla, while using prepared statements. As mentioned, there are 3 query types; use the -n flag to select the query type. Optional values are:

        • “single”: using a single filter (by group) to query for “pants”
        • “multiple” (default): using multiple filters (by color AND sub_group) to query for “white softshell”
        • “none”: query without any filter = “match_all”

        Use the -s / -e flags to insert a comma-separated list of IPs for the Scylla and/or Elasticsearch (ES) nodes. If you are running Elasticsearch (ES) on the same node as the python scripts, there is no need to enter an IP; the local default will be used.

        Note: Elasticsearch returns only the first 10 results by default. To overcome this we set the size limit to 1000 results. When retrieving a large set of results, we recommend using pagination (read more here: elasticsearch-py helpers).

        $ python -h
        usage: [-h] [-s SCYLLA_IP] [-e ES_IP] [-n NUM_FILTERS]

        optional arguments:
          -h, --help show this help message and exit
          -s SCYLLA_IP
          -e ES_IP
          -n NUM_FILTERS

    2. To delete the Elasticsearch index and the keyspace in Scylla, run the following commands:

      • Elasticsearch: $ curl -X DELETE ""
      • Scylla: cqlsh> DROP KEYSPACE catalog ;
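
The query_data flow described above (a full-text search in Elasticsearch followed by a prepared-statement lookup in Scylla) can be sketched as follows. This is a minimal sketch, not the Appendix-B code: the index name (`catalog`), the `sku` column, and the helper names are our assumptions based on the examples above.

```python
# Minimal sketch of the query_data flow: run the textual search in
# Elasticsearch, then fetch the matching rows from Scylla by SKU.
# Helper names are ours; the real script is in Appendix-B.

def build_es_query(filters):
    """Build the search body: no filters -> match_all, one filter ->
    match, several filters -> a bool/must of match clauses."""
    if not filters:
        query = {"match_all": {}}
    elif len(filters) == 1:
        field, value = list(filters.items())[0]
        query = {"match": {field: value}}
    else:
        query = {"bool": {"must": [{"match": {f: v}}
                                   for f, v in sorted(filters.items())]}}
    # ES returns only the first 10 hits by default; raise the cap to 1000.
    return {"size": 1000, "query": query}

def extract_skus(es_response):
    """Pull the SKU out of each hit (assuming the SKU was used as the _id)."""
    return [hit["_id"] for hit in es_response["hits"]["hits"]]

# Usage against live nodes (requires cassandra-driver and elasticsearch-py;
# IPs, index and column names here are illustrative):
#   from elasticsearch import Elasticsearch
#   from cassandra.cluster import Cluster
#   es = Elasticsearch(["127.0.0.1"])
#   session = Cluster(["127.0.0.1"]).connect()
#   lookup = session.prepare("SELECT * FROM catalog.apparel WHERE sku = ?")
#   res = es.search(index="catalog", body=build_es_query(
#       {"color": "white", "sub_group": "softshell"}))
#   for sku in extract_skus(res):
#       print(session.execute(lookup, [sku]).one())
```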

Use Case #2

In this use case we assume you already have your data in Scylla and want to import it into Elasticsearch, to be indexed for textual search purposes. To accomplish this, the first thing you will need to do is export the relevant table and its content from Scylla into a .csv data file; this can be accomplished by using the cqlsh COPY TO command.

The second thing to do is export the table schema into a .cql schema file, a file for each table separately. This can be accomplished by running the following command: cqlsh [IP] -e "DESCRIBE TABLE [table name]" > [name].cql
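
Both export steps can be scripted. The sketch below only composes the two cqlsh command strings; the file-naming convention and the WITH HEADER option are our own assumptions, not requirements stated by the import script.

```python
# Sketch: compose the cqlsh commands for exporting a table's data (.csv)
# and schema (.cql) from Scylla. File names are an illustrative convention.

def export_commands(ip, keyspace, table):
    """Return (data_cmd, schema_cmd) shell command strings."""
    csv_file = "{0}.{1}.csv".format(keyspace, table)
    cql_file = "{0}_{1}.cql".format(keyspace, table)
    # HEADER = true puts the column names in the first csv row (assumption
    # about the format the import script expects).
    data_cmd = "cqlsh {0} -e \"COPY {1}.{2} TO '{3}' WITH HEADER = true\"".format(
        ip, keyspace, table, csv_file)
    schema_cmd = 'cqlsh {0} -e "DESCRIBE TABLE {1}.{2}" > {3}'.format(
        ip, keyspace, table, cql_file)
    return data_cmd, schema_cmd
```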

Once you have your .csv and .cql files ready, you just need to have an Elasticsearch node installed and you’re good to go.

The following script (see Appendix-C) will use the .cql schema file and .csv data file as inputs to create an index in Elasticsearch (ES) and insert the data.

The ES index name will be created based on the .csv file name.

The index _id field (index partition key) is based on the PRIMARY KEY taken from the .cql schema (simple/composite/compound).

The index _type field will represent the partition key (PK), in case of a compound key it will use `-` to concatenate the column names.

The script will print progress for every 1000 rows processed and total rows processed in its output.
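
The naming and key conventions just described can be sketched with a simplified parser. This is illustrative only; the real logic lives in the Appendix-C script and may handle more cases.

```python
import os
import re

# Sketch of the conventions described above: index name from the .csv file
# name, and the _id/_type columns from the PRIMARY KEY in the .cql schema.

def index_name_from_csv(csv_path):
    """The ES index name is the .csv file name without its extension."""
    return os.path.splitext(os.path.basename(csv_path))[0]

def primary_key_columns(cql_text):
    """Extract the PRIMARY KEY columns from a CREATE TABLE statement,
    handling both the explicit form (simple/composite/compound) and the
    inline 'col type PRIMARY KEY' form."""
    m = re.search(r"PRIMARY KEY\s*\(", cql_text, re.IGNORECASE)
    if m:
        # Scan to the matching close paren (the key list may nest parens).
        depth, i = 1, m.end()
        while i < len(cql_text) and depth:
            depth += {"(": 1, ")": -1}.get(cql_text[i], 0)
            i += 1
        return re.findall(r"\w+", cql_text[m.end():i - 1])
    m = re.search(r"(\w+)\s+\w+\s+PRIMARY KEY", cql_text, re.IGNORECASE)
    return [m.group(1)] if m else []

def doc_type(pk_columns):
    """For a compound key, concatenate the column names with '-'."""
    return "-".join(pk_columns)
```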

Prerequisites
  • python installed
  • pip installed
  • Java 8 installed
  • Scylla cluster installed (see here)
  • Node for Elasticsearch and python scripts (can be separate nodes)


  1. Install the python drivers on the node to be used for the scripts
    $ sudo pip install cassandra-driver
    $ sudo pip install elasticsearch
  2. Install Elasticsearch (see here)
    $ sudo apt-get update
    $ curl -L -O
    $ sudo dpkg -i elasticsearch-6.2.3.deb

  3. Start Elasticsearch, then verify its status and health state
    $ sudo /etc/init.d/elasticsearch start
    [ ok ] Starting elasticsearch (via systemctl): elasticsearch.service.

    $ curl


    {
      "cluster_name" : "elasticsearch",
      "status" : "green",
      "timed_out" : false,
      "number_of_nodes" : 1,
      "number_of_data_nodes" : 1,
      "active_primary_shards" : 0,
      "active_shards" : 0,
      "relocating_shards" : 0,
      "initializing_shards" : 0,
      "unassigned_shards" : 0,
      "delayed_unassigned_shards" : 0,
      "number_of_pending_tasks" : 0,
      "number_of_in_flight_fetch" : 0,
      "task_max_waiting_in_queue_millis" : 0,
      "active_shards_percent_as_number" : 100.0
    }

  4. Copy the python file to the location from which you will run it, and make it executable. Place your .csv and .cql files in an accessible location (this can be the same directory as the python script)

  5. Run the script (see below usage, important details and examples)
    • Usage
      $ python -h
      usage: [-h] [-e ES_IP] [-c CSV_FILE_NAME]
                                          [-s CQL_SCHEMA_FILE_NAME]
                                          [-i IGNORE_CQL_SCHEMA]
      optional arguments:
        -h, --help show this help message and exit
        -e ES_IP
        -c CSV_FILE_NAME
        -s CQL_SCHEMA_FILE_NAME
        -i IGNORE_CQL_SCHEMA
    • Important Details
      • Use the -e flag to insert a comma-separated list of IPs for the Elasticsearch (ES) nodes. If ES is running locally, there is no need for this flag; the default will be used
      • -i ignore_cql_schema -> default: True, meaning the script will use the 1st column from the .csv file for the ES index _id field. If you have a compound PK, use -i no so as not to ignore the .cql schema
      • -c csv_file_name -> requires the full path to the file, which needs to be in the format described in the prerequisites
      • -s cql_schema_file_name -> requires the full path to the file. The script checks the schema for a compound PK; if it does not find one, it checks for a simple PK
      • If a .cql file is not provided (but a .csv file was), the script will fall back to ignoring the .cql schema and use the 1st column from the .csv file for the ES index _id field
      • If neither a .cql nor a .csv file is provided, an error is printed and the script exits.
    • Output Example Using Compound PK
      ubuntu@ip-172-16-0-124:~/scylla_elastic$ python -c ./cp_prod.product_all.csv -s ./cp_prod_product_all.cql -i no

      ## Check schema (./cp_prod_product_all.cql) for compound primary key to be used as index id
      ## Did not find a compound primary key, checking for regular primary key to be used as index id
      ## Connecting to ES -> Creating 'cp_prod.product_all' index, if not exist
      ## Write csv file (./cp_prod.product_all.csv) content into Elasticsearch
      ## Update every 1000 rows processed ##
      Rows processed: 1000
      Rows processed: 2000
      Rows processed: 3000
      Rows processed: 4000
      Rows processed: 5000
      Rows processed: 6000
      Rows processed: 7000
      Rows processed: 8000
      Rows processed: 9000
      ## After all inserts, refresh index (just in case)
      ### Total Rows Processed: 9715 ###
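
The insert loop whose progress is printed above can be approximated with the elasticsearch-py bulk helper. `rows_to_actions` below is our own illustrative helper (the `-`-joined `_id` convention is an assumption), not the actual Appendix-C code.

```python
import csv
import io

# Sketch: turn csv rows into Elasticsearch bulk actions. The index/_id
# conventions follow the description above; names are illustrative.

def rows_to_actions(csv_text, index_name, id_columns):
    """Yield one bulk action per csv row, building _id from id_columns."""
    for row in csv.DictReader(io.StringIO(csv_text)):
        yield {
            "_index": index_name,
            "_id": "-".join(row[c] for c in id_columns),
            "_source": row,
        }

# Usage against a live node (requires elasticsearch-py; file and column
# names are illustrative):
#   from elasticsearch import Elasticsearch, helpers
#   es = Elasticsearch(["127.0.0.1"])
#   with open("cp_prod.product_all.csv") as f:
#       helpers.bulk(es, rows_to_actions(f.read(), "cp_prod.product_all",
#                                        ["a", "b"]))
#   # After all inserts, refresh the index (as the script above does).
#   es.indices.refresh(index="cp_prod.product_all")
```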

Next Steps

We’ve made the case for using Scylla and Elasticsearch together, and above we’ve shown you a step-by-step model for how to implement it. The next step is up to you! Download Scylla and Elasticsearch and try it out yourself.

If you do, we’d love to hear your feedback and experience, either by joining our Slack channel or dropping us a line.

Appendix A (Schema and Index for Use Case #1)

  • Scylla schema
  • Elasticsearch Index

Appendix B (Python Code for Use Case #1)

  • Insert_Data
  • Query_Data

Appendix C (Python Code for Use Case #2)

The post Scylla and Elasticsearch, Part Two: Practical Examples to Support Full-Text Search Workloads appeared first on ScyllaDB.