Hooking up Spark and Scylla: Part 2


Welcome to the second installment of the Spark and Scylla series. As you might recall, this series will revolve around the integration of Spark and Scylla, covering the architectures of the two products, strategies to transfer data between them, optimizations, and operational best practices.

Last time, we surveyed Spark’s RDD abstraction and the DataStax Spark connector. In this post, we will delve more deeply into the way data transformations are executed by Spark, and then move on to the higher-level SQL and DataFrame interfaces provided by Spark.

The code samples repository contains a folder with a docker-compose.yaml for this post. So go ahead and start all the services:

docker-compose up -d

The repository also contains a CQL file that’ll create a table in Scylla that we will use later in the post. You can execute it using:

docker-compose exec scylladb-node1 cqlsh -f /stocks.cql

Once that is done, launch the Spark shell as we’ve done in the previous post:

Spark’s Execution Model

Let’s find out more about how Spark executes our transformations. As you recall, we’ve introduced the RDD – the resilient, distributed dataset – in the previous post. An instance of RDD[T] is an immutable collection of elements of type T, distributed across several Spark workers. We can apply different transformations to RDDs to produce new RDDs.

For example, here’s an RDD of Person elements from which the ages are extracted and filtered:

If you paste that snippet into the REPL, you’ll note that the runtime type of ages is a MapPartitionsRDD. Here’s a slightly simplified definition of it:

Every transformation that we apply to an RDD – be it map, filter, join and so forth – results in a specialized subtype of RDD. Every one of those subtypes carries a reference to the previous RDD, and a function from an Iterator[T] to an Iterator[U].

The DAG that we described in the previous post is in fact reified in this definition; the previous reference is the edge in the graph, and the transform function is the vertex. This function, practically, is how we transform a partition from the previous RDD to one on the current RDD.
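To make the idea of a previous reference plus a per-partition transform concrete, here is a minimal sketch in plain Scala. It is a toy model, not Spark's actual classes: ToyRDD, ToyParallelCollection and ToyMapPartitionsRDD are hypothetical names, and the Person data is made up.

```scala
object ToyRddDemo {
  // Toy model of the idea above; these are NOT Spark's real classes.
  trait ToyRDD[T] {
    def numPartitions: Int
    // Lazily produce the elements of one partition.
    def compute(partition: Int): Iterator[T]

    // Transformations only build new nodes; nothing is evaluated here.
    def map[U](f: T => U): ToyRDD[U] =
      new ToyMapPartitionsRDD[T, U](this, _.map(f))
    def filter(p: T => Boolean): ToyRDD[T] =
      new ToyMapPartitionsRDD[T, T](this, _.filter(p))

    // An "action": evaluates every partition and leaves the RDD abstraction.
    def count: Long =
      (0 until numPartitions).map(i => compute(i).size.toLong).sum
  }

  // Leaf of the graph: data that is already split into partitions.
  final class ToyParallelCollection[T](data: Vector[Vector[T]]) extends ToyRDD[T] {
    def numPartitions: Int = data.size
    def compute(partition: Int): Iterator[T] = data(partition).iterator
  }

  // A node holding the edge (prev) and the per-partition transform.
  final class ToyMapPartitionsRDD[T, U](
      prev: ToyRDD[T],
      transform: Iterator[T] => Iterator[U]
  ) extends ToyRDD[U] {
    def numPartitions: Int = prev.numPartitions
    def compute(partition: Int): Iterator[U] = transform(prev.compute(partition))
  }

  final case class Person(name: String, age: Int)

  val people = new ToyParallelCollection(Vector(
    Vector(Person("Alice", 29), Person("Bob", 17)),
    Vector(Person("Carol", 41))
  ))
  val ages = people.map(_.age).filter(_ >= 18)
  val adultCount: Long = ages.count
}
```

Evaluating a partition of ages walks the chain of prev references back to the leaf and applies each transform on the way out, which is the same shape as the DAG described above.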


Now, let’s launch the Spark application UI; it is available at http://localhost:4040/. This interface is essential to inspecting what our Spark application is doing, and we’ll get to know some of its important features now.

This is how things are looking after pasting the previous snippet into the REPL:

Rather empty. This is because nothing’s currently running, and nothing, in fact, has run so far. I remind you that Spark’s RDD interface is lazy: nothing runs until we run an action such as reduce, count, etc. So let’s run an action!

ages.count

After the count executes, we can refresh the application UI and see an entry in the Completed Jobs table:

This would be a good time to say that in Spark, a job corresponds to an execution of a data transformation – or, a full evaluation of the DAG corresponding to an RDD. A job ends with values outside of the RDD abstraction; in our case, a Long representing the count of the RDD.

Clicking on the description of the job, we can see a view with more details on the execution:

The DAG visualization shows what stages the job consisted of, and the table contains more details on the stages. If you click the +details label, you can see a stack trace of the action that caused the job to execute. The weird names are due to the Scala REPL, but rest assured you’d get meaningful details in an actual application.

We’re not ready yet to define what exactly a stage is, but let’s drill down into the DAG visualization. Clicking on it, we can see a view with more details on the stage:

The DAG visualization in this view shows the runtime type of every intermediate RDD produced by the stage and the line at which it was defined. Apart from that, there’s yet another term on this page – a task. A task represents the execution of the transformations of all the nodes in the DAG on one of the partitions of the RDD.

Concretely, a task can be thought of as the composition of all the transform: Iterator[T] => Iterator[U] functions we saw before.

You’ll note that the Tasks table at the bottom lists two tasks. This is due to the RDD consisting of two partitions; generally, every transformation will result in a task for every partition in the RDD. These tasks are executed on the executors of the cluster. Every task runs in a single thread and can run in parallel with other tasks, given enough CPU cores on the executor.
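As a rough illustration of one task per partition, the sketch below composes two per-partition functions and applies the result to each partition. It uses plain Scala collections rather than Spark, and the names and data are made up for the example.

```scala
object TaskCompositionDemo {
  // Two per-partition transforms, in the Iterator[T] => Iterator[U] shape.
  val extractAges: Iterator[(String, Int)] => Iterator[Int] = _.map(_._2)
  val keepAdults: Iterator[Int] => Iterator[Int] = _.filter(_ >= 18)

  // The work of one task: every transform of the stage, composed.
  val task: Iterator[(String, Int)] => Iterator[Int] =
    extractAges.andThen(keepAdults)

  // Two partitions, hence two tasks.
  val partitions: Vector[Vector[(String, Int)]] = Vector(
    Vector("Alice" -> 29, "Bob" -> 17),
    Vector("Carol" -> 41)
  )

  // In Spark each application of `task` could run on a different executor thread;
  // here they simply run one after the other.
  val perTaskOutput: Vector[List[Int]] =
    partitions.map(partition => task(partition.iterator).toList)
}
```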

So, to recap, we have:

  • jobs, which represent a full data transformation execution triggered by an action;
  • stages, which we have not defined yet;
  • tasks, each of which represents the execution of a transform: Iterator[T] => Iterator[U] function on a partition of the RDD.

To demonstrate what stages are, we’ll need a more complex example:

Here, we create two RDDs representing the employee and department tables (in tribute to the venerable SCOTT schema); we group the employees by department ID, join them to the department RDD, sum the salaries in each department and collect the results into an array.

collect is an action, so by executing it, we have initiated a job. You should be able to locate it in the Jobs tab in the UI, and click its description; the DAG on the detail page should look similar to this:

This is much more interesting! We now have 3 stages instead of a single stage, with the two smaller ones funneling into the larger one. Stage 7 is the creation and grouping of the emp data, stage 8 is the creation of the dept data, and they both funnel into stage 9 which is the join and mapping of the two RDDs.

The horizontal order of the stages and their numbering might show up differently on your setup, but that does not affect the actual execution.

So, why is our job divided into three stages this time? The answer lies with the type of transformations that comprise the job.

If you recall, we discussed narrow and wide transformations in the previous post. Narrow transformations, such as map and filter, do not move rows between RDD partitions. Wide transformations, such as groupBy, require rows to be moved. Stages are in fact bounded by wide transformations (or actions): a stage groups transformations that can be executed on the same worker without data shuffling.

A shuffle is the process in which rows are exchanged between the RDD partitions and consequently between workers. Note what happens in this visualization of the groupBy transformation:

The elements for the resulting key were scattered across several partitions and now need to be shuffled into one partition. The stage boundary is often also called the shuffle boundary; shuffling is the process in Spark in which elements are transferred between partitions.
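The shuffle behind a groupBy can be sketched with plain Scala collections. This is a simplified model under stated assumptions: the (department ID, salary) rows are made up, and the modulo-based targetPartition function stands in for Spark's hash partitioning.

```scala
object ShuffleDemo {
  // (deptId, salary) rows spread over two input partitions.
  val input: Vector[Vector[(Int, Int)]] = Vector(
    Vector(10 -> 1000, 20 -> 1500),
    Vector(10 -> 2000, 20 -> 500, 30 -> 700)
  )

  val numOutputPartitions = 3

  // Stand-in for hash partitioning: rows with the same key always
  // land in the same output partition.
  def targetPartition(key: Int): Int = Math.floorMod(key, numOutputPartitions)

  // The shuffle: every output partition pulls its rows from ALL input partitions.
  val shuffled: Vector[Vector[(Int, Int)]] =
    Vector.tabulate(numOutputPartitions) { out =>
      input.flatten.filter { case (key, _) => targetPartition(key) == out }
    }

  // After the shuffle, grouping is a partition-local operation.
  val grouped: Vector[Map[Int, Vector[Int]]] =
    shuffled.map(_.groupBy(_._1).map { case (key, rows) => key -> rows.map(_._2) })
}
```

Before the shuffle, department 10 has rows in both input partitions; afterwards all of its rows sit in a single output partition, which is what allows the grouping to proceed without further data movement.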

To complete the flow, here’s what happens after the groupBy stages; the join operation is a wide transformation as well, as it has to move data from the dept partition into the other partitions (this is a simplification, of course, as joining is a complicated subject) while the mapValues transformation is a narrow transformation:

Now that we know a bit more about the actual execution of our Spark jobs, we can further examine the architecture of the DataStax connector to see how it interacts with Scylla.

The DataStax Connector: A Deeper Look

As we’ve discussed in the previous post, rows in Scylla tables are distributed in the cluster according to their partition keys. To describe this process briefly, every Scylla cluster contains a collection of number ranges that form a token ring. Every range is owned by one (or more, depending on the replication factor) of the cluster nodes.

When inserting a row to Scylla, the partition key is hashed to derive its token. Using this token, the row can be routed and stored in the right node. Furthermore, when processing table read requests that span several token ranges, Scylla can serve the data from multiple nodes.
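The routing step can be pictured with a toy token ring. The sketch below is only an illustration: the three nodes and their range boundaries are hypothetical, replication is ignored, and tokens are passed in directly instead of being derived by the partitioner.

```scala
import scala.collection.immutable.TreeMap

object TokenRingDemo {
  // Hypothetical three-node ring. Each key is the inclusive END of a token
  // range; the value is the node that owns that range.
  val ring: TreeMap[Long, String] =
    TreeMap(-3000L -> "node1", 0L -> "node2", 3000L -> "node3")

  // Find the first range end at or after the token; tokens past the last
  // range end wrap around to the first node.
  def owner(token: Long): String = {
    val it = ring.iteratorFrom(token)
    if (it.hasNext) it.next()._2 else ring.head._2
  }
}
```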

In a way, this is similar to Spark’s concept of tasks and partitions: just as Scylla tables are composed of token ranges, RDDs are composed of partitions. Spark tasks process RDD partitions in parallel, while Scylla can process token ranges in parallel (assuming the relevant ranges are stored on different nodes).

Now, if we’re reading data from Scylla into RDD partitions and processing it in Spark tasks, it would be beneficial to have some alignment between the Scylla token ranges and the RDD partitions. Luckily for us, this is exactly how the DataStax connector is designed.

The logic for creating the RDD partitions is part of the RDD interface:

The RDD created by sc.cassandraTable contains the logic for assigning multiple token ranges to each partition – this is performed by the CassandraPartitionGenerator class.

First, the connector will probe Scylla’s internal system.size_estimates table to estimate the size, in bytes, of the table. This size is then divided by the split_size_in_mb connector parameter (discussed further below); the result will be the number of partitions comprising the RDD.
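The arithmetic of that step looks roughly like the sketch below. The helper name estimatedPartitionCount is hypothetical, and rounding up with a minimum of one partition is an assumption made for the example; the connector's own logic may differ in the details.

```scala
object SplitCountDemo {
  // Hypothetical helper: estimated table size divided by the target split size.
  // Rounding up and the minimum of 1 are assumptions, not the connector's code.
  def estimatedPartitionCount(tableSizeBytes: Long, splitSizeMb: Int): Int = {
    val splitSizeBytes = splitSizeMb.toLong * 1024 * 1024
    math.max(1, math.ceil(tableSizeBytes.toDouble / splitSizeBytes).toInt)
  }

  // A table estimated at 1 GiB with a 64 MB split size.
  val oneGiB: Long = 1024L * 1024 * 1024
  val partitionsFor1GiB: Int = estimatedPartitionCount(oneGiB, 64)
}
```

A smaller split size yields more, smaller RDD partitions (and therefore more tasks); a larger one yields fewer, bigger partitions.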

Then, the connector will split the token ranges into groups of ranges. These groups will end up as the RDD partitions. The logic for converting the groups into partitions is in TokenRangeClusterer, if you’re interested; the gist is that every group will be an equal portion of the token ring and that every group can be fetched entirely from a single node.

After the token range groups are generated, they can be converted to collections of CQL WHERE fragments that will select the rows associated with each range; here’s a simplified version of the fragment generated:

WHERE token(key) > rangeStart AND token(key) <= rangeEnd

The CqlTokenRange class that is stored on the partition reference on the RDD handles the fragment generation. The token function is a built-in CQL function that computes the token for a given key value; essentially, it hashes the value using the configured Scylla partitioner. You can read more about this approach to full table scans in this article.
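Here is a small sketch of how such fragments could be generated for a group of token ranges. The TokenRange case class is a hypothetical stand-in for the connector's CqlTokenRange, and the table name stocks and partition key symbol are assumptions made for illustration.

```scala
object TokenRangeQueryDemo {
  // Hypothetical stand-in for the connector's CqlTokenRange.
  final case class TokenRange(start: Long, end: Long)

  // Start is exclusive and end is inclusive, so adjacent ranges never overlap.
  def whereFragment(partitionKey: String, range: TokenRange): String =
    s"WHERE token($partitionKey) > ${range.start} AND token($partitionKey) <= ${range.end}"

  // One scan query per token range in the group assigned to an RDD partition.
  def scanQueries(table: String, partitionKey: String, group: Seq[TokenRange]): Seq[String] =
    group.map(range => s"SELECT * FROM $table ${whereFragment(partitionKey, range)}")

  val queries: Seq[String] =
    scanQueries("stocks", "symbol", Seq(TokenRange(-100L, 0L), TokenRange(0L, 100L)))
}
```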

When the stage tasks are executed by the individual executors, the CQL queries are executed against Scylla with the fragments appended to them. Knowing how this works can be beneficial when tuning the performance of your Spark jobs. In a later post in this series, we will show how to go through the process of tuning a misbehaving job.

The split_size_in_mb parameter we mentioned earlier controls the target size of each RDD partition. It can be configured through Spark’s configuration mechanism, using the --conf command line parameter to spark-shell:

Data and Closures

We’ve covered a fair bit about how Spark executes our transformations but glossed over two fairly important points. The first point is the way data in RDD partitions is stored on executors.

RDD Storage Format

For the initial task that reads from Scylla, there’s not much mystery here: the executors run the code for fetching the data using the DataStax driver and use the case class deserialization mechanism we showed in the last post to convert the rows to case classes.

However, later tasks that run wide transformations will cause the data to be shuffled to other executors. Moving instances of case classes over the wire to other machines doesn’t happen magically; some sort of de/serialization mechanism must be involved here.

By default, Spark uses the standard Java serialization format for shuffling data. If this makes you shudder, it should! We all know how slow Java serialization is. As a workaround, Spark supports using Kryo as a serialization format.

Apart from using Java serialization, there are other problems with naively storing objects in the executor’s memory. Consider the following data type:

Stored as a Java object, every StockEntry instance would take up:

  • 12 bytes for the object header
  • 4 bytes for the String reference
  • 8 bytes for the integers

However, we also need to take into account the String itself:

  • Another 12 bytes for the header
  • 4 bytes for the char[] reference
  • 4 bytes for the computed hashcode
  • 4 bytes for word boundary alignment

So that’s 48 bytes, not including the symbol string itself, for something that could theoretically be packed into 12 bytes (assuming 4 bytes for the symbol characters, ASCII encoded).
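The tally above can be written out as plain arithmetic. The byte counts are the ones quoted in the text; the 4 bytes for the packed symbol is the same ASCII assumption made there.

```scala
object StockEntryFootprint {
  // The StockEntry object itself.
  val objectHeader = 12
  val stringReference = 4
  val integerFields = 8
  val entryBytes: Int = objectHeader + stringReference + integerFields

  // The String object it points to, excluding the characters.
  val stringHeader = 12
  val charArrayReference = 4
  val cachedHashCode = 4
  val alignmentPadding = 4
  val stringBytes: Int = stringHeader + charArrayReference + cachedHashCode + alignmentPadding

  val totalBytes: Int = entryBytes + stringBytes

  // Tightly packed alternative: the integers plus 4 ASCII bytes for the symbol.
  val packedBytes: Int = integerFields + 4
  val overheadFactor: Double = totalBytes.toDouble / packedBytes
}
```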

Apart from the object overhead, the data is also stored in row-major order; most analytical aggregations are performed in a columnar fashion, which means that we’re reading lots of irrelevant data just to skip it.

Closures

The second point we’ve glossed over is what Spark does with the bodies of the transformations. Or, as they are most commonly called, the closures.

How do those closures actually get from the driver to the executors? To avoid getting into too many details about how function bodies are encoded by Scala on the JVM, suffice it to say that the bodies are actually classes that can be serialized and deserialized. The executors have the class definitions of our application, so they can deserialize those closures.

There are some messy details here: we can reference outside variables from within our closure (which is why it is called a closure); do they travel with the closure body? What happens if we mutate them? It is best not to dwell on these issues and just avoid side-effects in the transformations altogether.

Lastly, working with closures forces us to miss out on important optimization opportunities. Consider this RDD transformation on the StockEntry case class, backed by a table in Scylla:

The result of this transformation is a map with the number of occurrences for each symbol. Note that we need, in fact, only the symbol column from the Scylla table. The query generated by the connector, however, tells a different story:

Despite our transformation only using symbol, all columns were fetched from Scylla. The reason is that Spark treats function closures as opaque chunks of code; no attempt is made to analyze them, as that would require a bytecode analysis mechanism that would be heuristic at best (see SPARK-14083 for an attempt at this).

We could work around this particular problem, were it to severely affect performance, by using the select method on the CassandraTableScanRDD, and hinting to the connector which columns should be fetched:

This might be feasible in a small and artificial snippet such as the one above, but harder to generalize to larger codebases. Note that we also cannot use our StockEntry case class anymore, as we are forcing the connector to only fetch the symbol column.

To summarize, the biggest issue here is that Spark cannot “see” through our closures; as mentioned, they are treated as opaque chunks of bytecode that are executed as-is on the executors. The smartest thing that Spark can do to optimize this execution is to schedule tasks for narrow transformations on the same host.

These issues are all (mostly) solved by Spark SQL and the Dataset API.

Spark SQL and the Dataset API

Spark SQL is a separate module that provides a higher-level interface over the RDD API. The core abstraction is a Dataset[T] – again, a partitioned, distributed collection of elements of type T. However, the Dataset also includes important schema information and a domain-specific language that uses this information to run transformations in a highly optimized fashion.

Spark SQL also includes two important components that are used under the hood:

  • Tungsten, an optimized storage engine that stores elements in an efficiently packed, cache-friendly binary format in memory on the Spark executors
  • Catalyst, a query optimization engine that works on the queries produced by the Dataset API

We’ll start off by constructing a Dataset for our StockEntry data type backed by a Scylla table:

First, note that we are using the spark object to create the Dataset. This is an instance of SparkSession; it serves the same purpose as sc: SparkContext, but provides facilities for the Dataset API.

The cassandraFormat call will return an instance of DataFrameReader; calling load on it will return an instance of DataFrame. The definition of DataFrame is as follows:

type DataFrame = Dataset[Row]

Where Row is an untyped sequence of data.

Now, when we called load, Spark also inferred the Dataset’s schema by probing the table through the connector; we can see the schema by calling printSchema:

Spark SQL contains a fully fledged schema representation that can be used to model primitive types and complex types, quite similarly to CQL.

Let’s take a brief tour through the Dataset API. It is designed to be quite similar to SQL, so projection on a Dataset can be done using select:

Note how Spark keeps track of the schema changes between projections.

Since we’re now using a domain-specific language for projecting on Datasets, rather than fully fledged closures using map, we need domain-specific constructs for modeling expressions. Spark SQL provides these under a package; I recommend importing them with a qualifier to avoid namespace pollution:

The col function creates a column reference to a column named “open”. This column reference can be passed to any Dataset API that deals with columns; it also has functions named +, * and so forth for easily writing numeric expressions that involve columns. Column references are plain values, so you could also use a function to create more complex expressions:

If you get tired of writing f.col, know that you can also write it as $"col". We’ll use this form from now on.

Note that column references are untyped; you could apply a column expression to a dataset with mismatched types and get back nonsensical results:

Another pitfall is that using strings for column references denies us the compile-time safety we’ve had before; we’re rewarded with fancy stack traces if we manage to mistype a column name:

The Dataset API contains pretty much everything you need to express complex transformations. The Spark SQL module, as its name hints, also includes SQL functionality; here’s a brief demonstration:

We can register Datasets as tables and then query them using SQL.

Check out the docs to learn more about the Dataset API; it provides aggregations, windowing functions, joins and more. You can also store an actual case class in the Dataset by using the as function, and still use function closures with map, flatMap and so on to process the data.

Let’s see now what benefits we reap from using the Dataset API.

Dataset API Benefits

Like every good database, Spark offers methods to inspect its query execution plans. We can see such a plan by using the explain function:

We get back a wealth of information that describes the execution plan as it goes through a series of transformations. The four plans describe the same query at four phases:

  • Parsed plan – the plan as it was described by the transformations we wrote, before any validations are performed (e.g. existence of columns, etc)
  • Analyzed plan – the plan after it was analyzed against the schemas of the relations involved
  • Optimized plan – the plan after optimizations were applied to it – we’ll see a few examples of these in a moment
  • Physical plan – the plan after being translated to actual operations that need to be executed.

So far, there’s not much difference between the phases; we’re just reading the data from Scylla. Note that the type of the relation is a CassandraSourceRelation – a specialized relation that can interact with Scylla and extract schema information from it.

In the physical plan’s Scan operator, note that the relation lists all columns that appear in the table in Scylla; this means that all of them will be fetched. Let’s see what happens when we project the Dataset:

Much more interesting. The parsed logical plan now denotes symbol as an unresolved column reference; its type is inferred only after the analysis phase. Spark is often referred to as a compiler, and these phases demonstrate how true this comparison is.

The most interesting part is on the physical plan: Spark has inferred that no columns are needed apart from the symbol column and adjusted the scan. This will cause the actual CQL query to only select the symbol column.

This doesn’t happen only with explicit select calls; if we run the same aggregated count from before, we see the same thing happening:

Spark figured out that we’re only using the symbol column. If we use max($"open") instead of count, we see that Spark also fetches the open column:

Being able to do these sorts of optimizations is a very cool property of Spark’s Catalyst engine. It can infer the exact projection by itself, whereas when working with RDDs, we had to specify the projection hints explicitly.
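Why this is possible with expressions but not with closures can be shown with a toy model. The sketch below is not Catalyst: the Expr types are hypothetical, and the column list (including close) is assumed for the example. The point is only that an expression tree can be inspected for the columns it references, while a function value cannot.

```scala
object ColumnPruningDemo {
  // A tiny expression language, standing in for column expressions.
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(value: Double) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Max(child: Expr) extends Expr

  // Because expressions are data, we can walk them and collect column names.
  def referenced(e: Expr): Set[String] = e match {
    case Col(name)        => Set(name)
    case Lit(_)           => Set.empty
    case Add(left, right) => referenced(left) ++ referenced(right)
    case Max(child)       => referenced(child)
  }

  // Assumed table columns, for illustration only.
  val tableColumns: Vector[String] = Vector("symbol", "day", "open", "close")

  // The pruned scan keeps only the columns that some expression needs.
  def prunedScan(exprs: Seq[Expr]): Vector[String] = {
    val needed = exprs.flatMap(referenced).toSet
    tableColumns.filter(needed)
  }

  val countBySymbol: Vector[String] = prunedScan(Seq(Col("symbol")))
  val maxOpenBySymbol: Vector[String] = prunedScan(Seq(Col("symbol"), Max(Col("open"))))
}
```

A closure such as (e: StockEntry) => e.symbol offers no equivalent of referenced: from the outside it is just a function value, so every column has to be fetched to be safe.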

As expected, this also extends to filters; we’ve shown in the last post how we can manually add a predicate to the WHERE clause using the where method. Let’s see what happens when we add a filter on the day column:

The PushedFilters section in the physical plan denotes which filters Spark tried to push down to the data source. Although it lists them all, only those that are denoted with a star (e.g. *LessThan(day,2010-02-01)) will actually be executed by Scylla.

This is highly dependent on the integration; the DataStax connector contains the logic for determining whether a filter would be pushed down or not. For example, if we add an additional filter on open, it would not be pushed down as it is not a part of the table’s primary key (and there is no secondary index on it):

The logic for determining which filters are pushed down to Scylla resides in the BasicCassandraPredicatePushDown class. It is well documented, and if you’re wondering why your predicate isn’t getting pushed down to Scylla, that would be a good place to start your investigation; in particular, the predicatesToPushdown member contains a set of all predicates determined to be legal to be executed by Scylla.

The column pruning optimization we discussed is part of a larger set of optimizations that are part of the Catalyst query optimization engine. For example, Spark will merge adjacent select operations into one operation; it will simplify boolean expressions (e.g. !(a > b) => a <= b), and so forth. You can see the entire list in org.apache.spark.sql.catalyst.optimizer.Optimizer, in the defaultBatches function.
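A rule such as the boolean simplification mentioned above amounts to a pattern match over the expression tree. The following is a toy version with hypothetical predicate types, not the actual Catalyst rule:

```scala
object BooleanSimplificationDemo {
  // Minimal predicate tree over column names.
  sealed trait Pred
  final case class GreaterThan(left: String, right: String) extends Pred
  final case class LessThanOrEqual(left: String, right: String) extends Pred
  final case class Not(child: Pred) extends Pred

  // Push negations into comparisons and drop double negations.
  def simplify(p: Pred): Pred = p match {
    case Not(GreaterThan(a, b))     => LessThanOrEqual(a, b)
    case Not(LessThanOrEqual(a, b)) => GreaterThan(a, b)
    case Not(Not(inner))            => simplify(inner)
    case other                      => other
  }

  val simplified: Pred = simplify(Not(GreaterThan("a", "b")))
}
```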

Summary

In this post, we’ve discussed, in depth, how Spark physically executes our transformations, using tasks, stages, and jobs. We’ve also seen what problems arise from using the (relatively) crude RDD API. Finally, we’ve demonstrated basic usage of the DataFrame API with Scylla, along with the benefits we reap from using this API.

In the next post, we’ll turn to something we’ve not discussed yet: saving data back to Scylla. We’ll use this opportunity to also discuss the Spark Streaming API. Stay tuned!

Next Steps

  • Scylla Summit 2018 is around the corner. Register now!
  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Hooking up Spark and Scylla: Part 2 appeared first on ScyllaDB.

Update released for Instaclustr sstable analysis tools for Apache Cassandra

Instaclustr is pleased to announce the latest update for its open-sourced sstable analysis tools for Apache Cassandra.

These tools, first released in February 2017, help operators to gain an accurate picture of the on-disk data stored by Cassandra, which can be invaluable in diagnosing and resolving operational issues.

A full run-through of the existing functionality of the tools can be found in this blog post.

This latest release, available as source from the Instaclustr GitHub or as a compiled download from our support page, includes:

  • Improved support of TWCS – sorting by maximum timestamp in the sstable listing
  • For Apache Cassandra 3.x, ic-cfstats now reports on rows, including:
    • Total number of rows
    • Total number of row deletions
    • A row histogram giving the number of rows per partition
    • Largest partitions/widest partitions reports include the number of rows and row deletions
    • Added a Most Deleted Rows section reporting the partitions with the most row deletions
    • Tombstone Leaders and SSTable Leaders reports include the number of rows
  • ic-cfstats and ic-pstats include a histogram of partition sizes and sstables/partition in the summary

Instaclustr uses these tools to help support customers on our Apache Cassandra Managed Service and provides support for the use of the tools for our Apache Cassandra Enterprise Support customers.

The post Update released for Instaclustr sstable analysis tools for Apache Cassandra appeared first on Instaclustr.

Upcoming Enhancements to Scylla’s Filtering Implementation


The upcoming Scylla 2.4 release will come with enhanced filtering support. One of the reasons we’re making this enhancement is due to the Spark-Cassandra connector’s reliance on ALLOW FILTERING support when generating CQL queries. The first part of this post provides a quick overview of what filtering is and why it can be useful. Then we will discuss why it can hurt performance and recommended alternatives. Finally, we’ll cover the caveats of Scylla’s filtering implementation.

ALLOW FILTERING Keyword

Queries that may potentially hurt a Scylla cluster’s performance are, by default, not allowed to be executed. These queries include those that restrict:

  • Non-key fields (e.g. WHERE v = 1)
  • Parts of primary keys that are not prefixes (e.g. WHERE pk = 1 and c2 = 3)
  • Partition keys with something other than an equality relation (e.g. WHERE pk >= 1)
  • Clustering keys with a range restriction and then by other conditions (e.g. WHERE pk = 1 and c1 > 2 and c2 = 3)

Scylla is expected to be compatible with Cassandra in qualifying queries for filtering.

ALLOW FILTERING is a CQL keyword that can override this rule, but for performance reasons, please use it with caution. Let’s take a look at an example scenario – a database designed for a restaurant that wants to keep all of their menu entries in one place.

You can use the following code snippets to build a sample restaurant menu. This example will serve as a reference in the following sections.

Now, with the test table initialized, let’s see which SELECT statements are potential filtering candidates. Queries based on primary key prefixes will work fine and filtering is not needed:

SELECT * FROM menu WHERE category = 'starters' and position = 3;

SELECT * FROM menu WHERE category = 'soups';

Now let’s take a look at queries below.

For an affordable meal:

SELECT * FROM menu WHERE price <= 10;

For one specific dish:

SELECT * FROM menu WHERE name = 'sour rye soup';

For all dishes that are listed first, but with a very specific price:

SELECT * FROM menu WHERE position = 1 and price = 10.5;

For cheap starters:

SELECT * FROM menu WHERE category = 'starters' and price <= 10;

Trying the queries above will result in an error message:

“Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING.”

This error occurs because either non-key fields were restricted or a non-prefix part of the primary key was used in the statement. Some readers may spot the capitalized ALLOW FILTERING part of the error message and deduce that appending it to the query is a great solution. But those who read the error message carefully will notice that the important words are not the uppercase ones at all: they are “performance unpredictability”.

Filtering is performed only after all potentially matching rows are fetched. So, if a table contains 10 million rows and all of them are potential candidates, they will all be fetched. If, however, filtering rules are applied and only 9 million rows fit the filter, 10% of the total table rows will be ignored. If the filtering restrictions are very selective and only a single row matches, 9999999 rows were read in vain. And if several partitions were queried, that makes it even worse – from the example queries above, only the last one (cheap starters) restricted the partition key, which makes it a little more efficient. All the other ones involve fetching all of the partitions, which is very likely to be slow and create an unnecessary load for the whole cluster. That said, low selectivity queries can benefit from filtering compared to secondary index-based search (more on this topic below). A sequential scan of all of the values is faster than a huge set of random index-based seeks.
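The difference between rows read and rows returned can be modeled in a few lines of plain Scala. This is a toy in-memory sketch, not how Scylla executes queries: the columns follow the menu table used in this post, while the rows and the query helper are made up.

```scala
object FilteringCostDemo {
  // Columns follow the menu table in this post; the rows are made up.
  final case class MenuEntry(category: String, position: Int, name: String, price: Double)

  val menu: Vector[MenuEntry] = Vector(
    MenuEntry("starters", 1, "spring rolls", 9.5),
    MenuEntry("starters", 2, "bruschetta", 12.0),
    MenuEntry("soups", 1, "sour rye soup", 10.5),
    MenuEntry("soups", 2, "tomato soup", 8.0),
    MenuEntry("mains", 1, "pierogi", 14.0)
  )

  // Returns (rows read, rows returned). Restricting the partition key
  // (category) narrows what is read; the filter only narrows what is returned.
  def query(category: Option[String], filter: MenuEntry => Boolean): (Int, Int) = {
    val read = menu.filter(entry => category.forall(_ == entry.category))
    (read.size, read.count(filter))
  }

  // Affordable meals: every partition has to be read.
  val cheapAnywhere: (Int, Int) = query(None, _.price <= 10)
  // Cheap starters: only the 'starters' partition is read.
  val cheapStarters: (Int, Int) = query(Some("starters"), _.price <= 10)
}
```

In both cases the filter discards rows only after they have been read; the partition key restriction is what reduces the amount of data touched in the first place.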

Filtering can be allowed by simply appending the ALLOW FILTERING keyword to queries:

SELECT * FROM menu WHERE price <= 10 ALLOW FILTERING;

SELECT * FROM menu WHERE name = 'sour rye soup' ALLOW FILTERING;

SELECT * FROM menu WHERE position = 1 and price = 10.5 ALLOW FILTERING;

SELECT * FROM menu WHERE category = 'starters' and price <= 10 ALLOW FILTERING;

Alternatives

Simply appending ALLOW FILTERING to queries should never be treated as a “rule of thumb” solution. It might be the best option for low selectivity queries, but it also might hurt performance cluster-wide if used incorrectly. The alternative ways described below should also be considered each time ALLOW FILTERING is discussed.

Schema Change

The first obvious thing to consider after seeing the “ALLOW FILTERING” error message is to change the data model. Let’s take a look at one of the example queries:

SELECT * FROM menu WHERE name = 'sour rye soup';

If the majority of queries involve looking up the name field, maybe it should belong to the key? With the table schema changed to:

CREATE TABLE menu (name text, category text, position int, price float, PRIMARY KEY(name));

queries that use the name field will not require filtering anymore.

Changing table schemas is usually not easy and sometimes it makes no sense at all because it would deteriorate performance for other important queries. That’s where secondary indexing may come to the rescue.

Secondary Indexes

Creating a secondary index on a field allows non-partition keys to be queried without filtering. Secondary indexing has its limitations, e.g. it only works with equality restrictions (WHERE price = 10.5).

More information about secondary indexing can be found here:

Creating an index on a name field makes it possible to execute our soup query without problems:

CREATE INDEX ON menu(name);
SELECT * FROM menu WHERE name = 'sour rye soup';

It’s worth noting that indexes come with their own performance costs – keeping them requires additional space, and querying them is not as efficient as querying by primary key. The secondary index needs to be queried first, and only then is a base table query constructed and executed, which means we end up having two queries instead of one. Also, writing to a table backed by indexes is slower because both the original table and all of the indexes need to be updated. Still, if changing the data model is out of the question, indexing can be much more efficient than filtering queries. That’s especially the case if queries are highly selective, i.e. only a few rows are read.

Finally, indexes and filtering do not exclude each other – it’s perfectly possible to combine both in order to optimize your queries. Let’s go back to another example:

SELECT * FROM menu WHERE position = 1 and price = 10.5;

If we suspect that not many dishes have the same cost, we could create an index on the price:

CREATE INDEX ON menu(price);

Now, in the first stage of query execution, this index will be used to fetch all of the rows with the specific price. Then, all of the rows with a position different than 1 will be filtered out. Note that ALLOW FILTERING needs to be appended to this query because filtering is still involved in its execution.
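The two-stage execution described above can be pictured with a small in-memory model. This is only a sketch in Python, not how Scylla implements it: the sample rows, the `price_index` dictionary and the `query` function are hypothetical names invented for illustration.

```python
from collections import defaultdict

# Hypothetical in-memory stand-in for the menu table (not real data).
menu = [
    {"name": "sour rye soup", "category": "soups", "position": 1, "price": 10.5},
    {"name": "tomato soup", "category": "soups", "position": 2, "price": 10.5},
    {"name": "pierogi", "category": "mains", "position": 1, "price": 12.0},
]

# A toy "index": maps each price to the rows that carry it.
price_index = defaultdict(list)
for row in menu:
    price_index[row["price"]].append(row)

def query(position, price):
    # Stage 1: the index narrows the candidates to rows with the given price.
    candidates = price_index.get(price, [])
    # Stage 2: the remaining restriction is applied by filtering.
    return [row for row in candidates if row["position"] == position]

print([row["name"] for row in query(position=1, price=10.5)])  # prints ['sour rye soup']
```

The fewer rows share the indexed price, the less work is left for the filtering stage.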

Materialized Views

Another notable alternative to filtering is to use materialized views to speed up certain SELECT queries at the cost of more complicated table updates. A comprehensive description of how materialized views work (with examples) can be found here.

Performance

A quick local test performed on the queries below shows the performance impact of filtering and secondary indexes when query selectivity is high. The test cluster consists of 3 nodes, replication factor RF=1, and caches are disabled to ensure that rows would need to be read from the SSD NVMe drive instead of RAM. The example table used by all of the queries was filled with 10,000 rows.


Queries:

  • A – Based on partition key p1: SELECT * FROM TMCR WHERE p1 = 15534
  • B – Based on regular column r1: SELECT * FROM TMCR WHERE r1 = 15538
  • C – Based on regular column r2: SELECT * FROM TMCR WHERE r2 = 9
  • D – Based on regular column r2, sliced : SELECT * FROM TMCR WHERE r2 > 10000

The table below shows the duration of running 100 queries of each type in seconds:

Configuration/Query | A (p1 = x) | B (r1 = x) | C (r2 = x) | D (r2 > x)
Filtering, no indexes | 0.14s | 2.96s | 3.63s (275K rows/s) | 2.96s
With index on r1 | N/A | 0.14s | 3.79s | 3.10s
With index on r2 | N/A | 3.11s | 13.75s (73K rows/s) | 3.10s
With materialized view for r1* | N/A | 0.14s | N/A | N/A
With materialized view for r2* | N/A | N/A | 1.15s (869K rows/s) | 2.55s
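The rows/s figures listed for query C can be reproduced to within rounding if we assume that each of the 100 queries returns all 10,000 rows of the table. That reading is an assumption on our part, sketched here in Python with made-up helper names:

```python
ROWS_PER_QUERY = 10_000  # assumption: query C matches every row in the table
QUERIES = 100            # each timing covers 100 queries

def rows_per_second(duration_s):
    # Total rows returned by all queries, divided by the total duration.
    return ROWS_PER_QUERY * QUERIES / duration_s

timings = [
    ("filtering, no indexes", 3.63),
    ("index on r2", 13.75),
    ("materialized view for r2", 1.15),
]
for label, duration in timings:
    print(f"{label}: about {rows_per_second(duration) / 1000:.1f}K rows/s")
```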

The first obvious conclusion is that the queries based on primary key are much faster than fetching and filtering all of the rows.

Another interesting observation is that the low selectivity query C (WHERE r2 = 9), which effectively fetches all rows, is much faster with filtering than indexes. At first glance, it may look like an anomaly, but it is actually expected – sequential reading and filtering of all of the rows are faster than a random index lookup.

Also, creating a specialized materialized view can be faster than indexing, since querying a materialized view doesn’t involve double lookups.

Finally, indexing a low cardinality column (query C, configuration with index on r2) is heavily discouraged because it will create a single huge partition (in our example all r2 values are equal to 9 and r2 becomes a primary key for created index table). This local test shows it’s already slower than other configurations, but the situation would get even worse on a real three-node cluster.

What’s Not Here Yet

Scylla’s filtering implementation does not yet cover the following functionalities:

  • Support for CONTAINS restrictions on collections
  • Support for multi-column restrictions (WHERE (category, name) = ('soups', 'sour rye soup'))

Summary

Newly implemented filtering support can allow certain queries to be executed by appending the ALLOW FILTERING keyword to them. Filtering comes with a performance burden and is usually a symptom of data model design flaws. The alternative solutions described in this blog post should be considered first.

Next Steps

  • Scylla Summit 2018 is around the corner. Register now!
  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Upcoming Enhancements to Scylla’s Filtering Implementation appeared first on ScyllaDB.

Instaclustr Announces $15 Million Growth Equity Investment Led by Level Equity

Investment will support the expansion of Instaclustr’s managed platform for open source big data technologies

 

Palo Alto, California – August 16, 2018 – Instaclustr, the leading provider of completely managed solutions for scalable open source technologies, today announced it has raised $15 million in an investment round led by Level Equity, the New York City-based private investment firm. The funding provides Instaclustr with capital to accelerate expansion of its managed platform of core open source technologies. Instaclustr – now serving more than 100 customers from various industries – will double its headcount over the next year. Instaclustr’s revenues have grown 300% over the last 24 months and this pace is expected to continue.

Instaclustr’s Open Source-as-a-Service platform delivers fully hosted and managed big data technology solutions in their 100% open source form, providing customers the data capabilities and reliability required to scale with absolute freedom from vendor or technical lock-in.  Core technologies currently provided by the managed platform include Apache Cassandra, Apache Spark and Apache Kafka.

 

Instaclustr will use this funding to expand its automated and proven management environment for database, analytics, search, and messaging services to include additional open source technologies such as Elasticsearch, Apache Ignite and Apache Flink. The company will also be adding other data-centric open source technologies that integrate well and that are ideally suited for supporting the scale, high availability, and performance demanded from next-gen applications and solutions. All technologies will continue to be made available through a single platform capable of handling customers’ entire data layer.

 

“We allow customers to get the most production value from an array of powerful open source data solutions – and to do so within a fully managed environment that frees up their IT resources and budget,” said Peter Nichol, CEO, Instaclustr. “The new investment from Level Equity will accelerate our platform’s expansion, grow our sales and support teams, and allow us to reach more organizations seeking to optimize their data-related performance, reliability, security, and scalability.”

 

“Instaclustr has earned its reputation as a trusted enterprise partner through its unwavering dedication to 100% open source technologies and fast, expert support,” said Ben Levin, Founder and Partner, Level Equity. “Demand for flexible solutions capable of addressing mission-critical data infrastructure is growing exponentially, and this is reflected in the outstanding financial performance Instaclustr has demonstrated in the last several years. The comprehensive managed platform Instaclustr offers is well positioned as a singular backend answer to meet the entire scope of customers’ requirements when it comes to managing the data layer.”

“Because of Instaclustr’s robust platform and readily-available support, we’re able to focus our attention where it belongs: on our product and growing our business,” said Jason Wu, CTO, AdStage. “Working with Instaclustr has given us certainty that our open source deployment is being expertly managed, scaled, and optimized on our behalf. We look forward to continuing to benefit from our relationship with Instaclustr as it expands its managed data technologies.”

 

Level Equity joins existing investors Bailador Technology Investments, ANU Connect Ventures, and Our Innovation Fund, LP.

 

About Instaclustr

 

Instaclustr is the Open Source-as-a-Service company, delivering reliability at scale. We operate an automated, proven, and trusted managed environment, providing database, analytics, search, and messaging. We enable companies to focus internal development and operational resources on building cutting edge customer-facing applications.

 

For more information, visit Instaclustr.com and follow us @Instaclustr.

The post Instaclustr Announces $15 Million Growth Equity Investment Led by Level Equity appeared first on Instaclustr.

$15 Million Raise Led by Level Equity – Here’s What That Enables Us to Accomplish

I’m excited to announce that Instaclustr has raised $15 million from New York City-based private investment firm Level Equity and a number of our existing investors.  We will immediately begin using the capital to accelerate the expansion of our managed platform offering core open source technologies.

 

Our business is growing tremendously – we now serve more than 100 customers across a breadth of industries and use cases, and we expect the revenue growth of 300% that we’ve achieved over the past two years to continue apace. To meet the rising demands that this places on our organization, Instaclustr will be growing internally as well – in particular, we will be expanding our sales and support teams and plan to double our headcount over the coming year.

 

This new funding will allow us to introduce additional open source technologies to our Open Source-as-a-Service platform. More specifically, managed solutions for Elasticsearch, Apache Ignite, and Apache Flink will soon join Apache Cassandra, Apache Spark and Apache Kafka among our fully hosted and managed big data technology solutions. We’ll also be adding other data-centric open source technologies that integrate well and that are ideally suited for supporting the scale, high availability, and performance demanded from our customers’ next-gen applications and solutions.

As we expand our automated and proven platform for database, analytics, search, and messaging services, we look forward to reaching more organizations seeking to optimize their data-related performance, reliability, security, and scalability. At the same time, we’ll continue to ensure that our customers get the most production value from our array of powerful open source data solutions – and do so within our fully managed environment that frees up their IT resources and budget.

 

Sincerely,

Peter Nichol, CEO

The post $15 Million Raise Led by Level Equity – Here’s What That Enables Us to Accomplish appeared first on Instaclustr.

Java 11 Support in Apache Cassandra 4.0

At the end of July, support for Java 11 was merged into the Apache Cassandra trunk, which will be shipped in the next major release, Cassandra 4.0. Prior to this, Cassandra 3.0 only ran using Java 8, since there were breaking changes in Java that prevented it from running on later versions. Cassandra now supports both Java 8 and 11.

To run Cassandra on Java 11, you’ll first need to download an early access build of JDK 11, since there’s still no official release. I downloaded a build for my Mac and untar’ed the archive.

Next, you’ll need to set the environment variables. On my Mac I’ve set the following variables:

$ export JAVA_HOME="/Users/jhaddad/Downloads/jdk-11.jdk/Contents/Home"
$ export JAVA8_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home"

You can get Cassandra by cloning the git repo and building using ant:

$ git clone https://github.com/apache/cassandra.git
$ cd cassandra
$ ant

You should see the build script finish with something like the following:

write-poms:
   [script] Warning: Nashorn engine is planned to be removed from a future JDK release

init:

maven-ant-tasks-localrepo:

maven-ant-tasks-download:

maven-ant-tasks-init:

maven-declare-dependencies:

_write-poms:

build-test:

jar:
      [jar] Building jar: /Users/jhaddad/dev/cassandra/build/tools/lib/stress.jar

BUILD SUCCESSFUL
Total time: 7 seconds

You can now start Cassandra with the following:

$ bin/cassandra -f

One feature that could be a big deal over time is the new garbage collection algorithm, ZGC. The goal of ZGC is to work on huge heaps while maintaining low latency, 10ms or less. If it delivers on the promise, we could avoid an entire optimization process that many teams struggle with. It can be enabled with these JVM flags:

-XX:+UnlockExperimentalVMOptions
-XX:+UseZGC

To use ZGC in Cassandra 4.0, you can add the JVM flags to the cassandra-env.sh file located in the conf directory of the repository as shown below. Note that the flags are added above the JVM_OPTS="$JVM_OPTS $JVM_EXTRA_OPTS" line at the end of the file.

JVM_OPTS="$JVM_OPTS -XX:+UnlockExperimentalVMOptions"
JVM_OPTS="$JVM_OPTS -XX:+UseZGC"
JVM_OPTS="$JVM_OPTS $JVM_EXTRA_OPTS"

The Cassandra team intends to freeze the trunk branch in September, committing to bug fixes and stability improvements before releasing 4.0. We’d love feedback on the release during this period - especially in regards to performance with Java 11. We appreciate any help testing real world workloads (in a staging environment!). Bugs can be reported to the Cassandra JIRA. We aim to make the 4.0 release stable on day one. We encourage everyone to get involved early to ensure the high quality of this important release!

Proposal for a New Cassandra Cluster Key Compaction Strategy

Cassandra storage is generally described as a log-structured merge tree (LSM). In general, LSM storage favors the speed of writes, updates and deletes over that of reads. As a general rule, a write in Cassandra is an order of magnitude faster than a read. Not that reads are necessarily slow, but rather that the entire design of the server is to do writes very quickly and efficiently.

To manage data written to LSM storage, the files created by the fast writes need to be re-organized to help read efficiency and manage storage space. The process to perform this reorganization is called “compaction.” There are currently three generally available compaction strategies, each designed to optimize certain workloads.

Unfortunately, there are many workloads which don’t necessarily fit well into any of the current compaction strategies. What I hope to do here is present a convincing argument for a fourth compaction strategy which I think will fit the needs of many use cases which today are left out in the cold.

I am calling my proposed compaction strategy: Cluster Key Compaction Strategy (CKCS)

Existing strategies

Size Tiered

Size Tiered Compaction Strategy (STCS) is the default compaction strategy and it has worked for many workloads through the years Cassandra has been in existence. It is recognized as having a relatively low write amplification level and it can generally keep the total number of SSTable files reasonably low, limiting the number of SSTable files that need to be referenced to find all the parts of a partition required by a read. One of its largest drawbacks is the amount of disk space required for a compaction.

Leveled Compaction

Leveled Compaction Strategy (LCS) attempts to address the large amount of disk space required for compaction, and at the same time it also works to drastically limit the number of SSTable files required to fulfill a read from a partition to just one or two SSTable files. Its main drawback is the dramatic increase in write amplification for all data stored in LCS. With LCS, SSTable files are allowed to grow only to a predefined fixed size with the requirement that all columns of a specific partition exist in only one SSTable file at each level. When compacting from one level to the next, many SSTable files are both merged and distributed to many SSTable files.

Time Window

Time Window Compaction Strategy (TWCS) uses STCS inside of a set of predefined windows or buckets based on time to store data. It deliberately keeps partitions spread across many SSTable files. By the use of windows, the space required to perform a compaction can be reduced by a factor of up to the number of windows. For example, if the number of windows is 20, then the space required for any TWCS compaction will be no more than 1/20 of the space consumed by the table. It also results in the lowest write amplification of any of the compaction strategies. While this is not enforced, it is strongly recommended that TWCS be used only with data that is known to have a limited lifetime, preferably through the Time To Live (TTL) feature of Cassandra. TWCS was designed to store time series data where the data coming in is dividable into well-defined time chunks.

TWCS does not play well with hinted handoffs, read repairs or regular repairs, all of which can end up putting data which might belong in one window into a different window. This is not usually a problem if the data is short-lived or not of a critical nature. But that is not always the case in the real world.

Limits to existing strategies

As discussed above, each strategy has its strengths and weaknesses. Each needs to be carefully evaluated to decide which is best for your application.

STCS requires a large amount of space to perform compactions and may need many SSTable files read to find all parts of a specific row for a given partition, but it has fairly low write amplification.

LCS dramatically reduces the amount of space required for a compaction and greatly improves the likelihood that all the rows of a partition will be in the same place, but it can produce a huge number of SSTable files and it results in a massive increase in write amplification. It’s best used with workloads where reads are 90% or better of the workload.

TWCS is designed to work with time series data only. It is based on server time, having nothing at all to do with anything stored in the data itself. Like LCS, it greatly reduces the space required for compaction and it also has even better write amplification than STCS. It does not work well with Cassandra’s current anti-entropy mechanisms, which makes it unsuitable for some kinds of data which might otherwise fit.

Why a new strategy

In the last four years I have spent time consulting for different organizations which are using or planning to use Cassandra, and I keep finding workloads which would benefit from a compaction strategy that has features of both LCS and TWCS, and yet is still distinct from either one.

In fact, there are really two distinct use cases one could argue belong to separate strategies, but I think a single strategy could be created to fit both.

I would like to propose Cluster Key Compaction Strategy (CKCS). In CKCS, SSTable files will be grouped together based on their cluster key values, either in a set of moving windows (much like TWCS, where a specific number of windows contain data expected to expire over time, which limits the total number of windows) or in a predefined set of partitions covering the entire key range. By basing the window selection on cluster key values, the windows become defined outside of current server time, allowing Cassandra anti-entropy tools to work, although this will increase write amplification and SSTable file counts over traditional TWCS. It will also allow data sets which are not time-based to benefit from the compaction space savings and partition spread that are in the nature of the current TWCS strategy.

Proposed CKCS Details

The proposed CKCS will use the first column of the cluster key to define buckets which will be used to designate groups of SSTable file sets used to store data. In order to make the definition simple, the data type of that first column will need to be fixed in width and the possible key values well understood. Small integer, integer, large integer and timestamps would be the simplest to use and not, in my opinion, an unnecessarily restricted list.

How the CKCS would work

When a table is created with CKCS one of two bucket definition parameter types will be used.

  1. Moving window variation. Two parameters are used: one defines the unit size much like TWCS and should be called unit. A unit can be a timeframe (seconds, minutes, hours, days) or it can be a number scale (ones, tens, hundreds, thousands, millions). The second parameter is the window size in units. With the moving window variation, it is assumed that all data written to the table will eventually expire and the number of windows will therefore be limited based on the lifetime of data stored in the table.
  2. Static window variation. One parameter is used: The static window variation assumes long-lived data which is to be spread into multiple windows, or buckets based on the value of the entire contents of the cluster key column. With this variation, the window size is not specified by the user. Instead, the number of windows or buckets is specified. Cassandra will compute the “size” by taking the maximum absolute value range of the column and dividing by the number of desired windows or buckets.
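To make the two variations more concrete, here is a rough Python sketch of how a bucket could be derived from the first cluster key column. Since CKCS is only a proposal, none of this is Cassandra code: the function names and parameters are hypothetical, and the column is assumed to hold plain integers (for example, timestamps in seconds).

```python
def moving_window_bucket(key, unit, window_size):
    # Each window spans `window_size` units of the first cluster key column.
    return key // (unit * window_size)

def static_window_bucket(key, min_key, max_key, num_buckets):
    # The whole key range is split into `num_buckets` equally sized buckets.
    span = max_key - min_key + 1
    return (key - min_key) * num_buckets // span

# Moving window: timestamps in seconds, unit = one hour, windows of 6 hours.
HOUR = 3600
print(moving_window_bucket(5 * HOUR, HOUR, 6))  # prints 0
print(moving_window_bucket(7 * HOUR, HOUR, 6))  # prints 1

# Static window: a signed 32-bit integer column split into 4 buckets.
INT_MIN, INT_MAX = -2**31, 2**31 - 1
print(static_window_bucket(INT_MIN, INT_MIN, INT_MAX, 4))  # prints 0
print(static_window_bucket(0, INT_MIN, INT_MAX, 4))        # prints 2
print(static_window_bucket(INT_MAX, INT_MIN, INT_MAX, 4))  # prints 3
```

In the moving window case the number of live buckets is bounded only by data expiry, while in the static case it is fixed up front.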

In both approaches, when an SSTable is flushed to disk, behavior is normal. When enough SSTable files have been flushed, as defined by a compaction threshold variable, then instead of compacting the SSTable files together, the data in the existing SSTable files will be distributed to a single SSTable file in each window or bucket. For normal operation of the moving window variation, this will look much like the first compaction in TWCS and probably result in an actual compaction. For the static window variation, this will cause the data in the tables to be distributed out, creating more rather than fewer SSTable files.

After data is distributed to a defined SSTable file window or bucket, compaction proceeds using Size Tiered compaction within that window or bucket. To allow efficient queries based on cluster key ranges, the range of cluster key values for a specific SSTable file will be stored as a part of the SSTable file’s metadata in its statistics file.
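As an illustration of why storing the key range in the metadata helps, the following Python sketch picks the SSTable files that could contain rows for a cluster key range query. The `SSTableMeta` class and its fields are hypothetical stand-ins, not the actual layout of Cassandra's statistics file.

```python
from dataclasses import dataclass

@dataclass
class SSTableMeta:
    name: str
    min_key: int  # smallest cluster key value stored in the file
    max_key: int  # largest cluster key value stored in the file

def sstables_for_range(sstables, low, high):
    # Keep only the files whose [min_key, max_key] range overlaps [low, high].
    return [s.name for s in sstables if s.min_key <= high and s.max_key >= low]

sstables = [
    SSTableMeta("bucket0-1", 0, 99),
    SSTableMeta("bucket1-1", 100, 199),
    SSTableMeta("bucket2-1", 200, 299),
]
print(sstables_for_range(sstables, 150, 220))  # prints ['bucket1-1', 'bucket2-1']
```

Files whose range does not overlap the query can be skipped without being opened.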

Benefits

This new compaction strategy will have benefits over TWCS and would likely succeed it as the primary time series compaction strategy, as it avoids many of the current issues with TWCS or its predecessor DTCS. In addition, this new strategy will bring some of the benefits of TWCS to database workloads which are not time series in nature.

Large partitions

Large partitions under both STCS and LCS cause significant extra work during compaction. By spreading the partition data out over a number of windows or buckets, partitions can become significantly larger before having the heap and CPU impact on Cassandra during compaction that large partitions do today.

Dealing with anti-entropy

Currently, Cassandra anti-entropy mechanisms tend to work counter-purpose to both TWCS and DTCS and often make it necessary to turn them off to avoid pushing data into the wrong windows. It is also impossible to reload existing data or add a new DC or even a new host without disrupting the windowing.

CKCS will ensure data gets put into the correct windows even with anti-entropy running. It will also allow maintenance activities, including data reloads and adding a new DC or a new host to an existing DC, to store data into the correct windows.

What CKCS won’t be able to do is ensure a final window compaction since there is never a certain final point in time for a given window. A “final” compaction is still likely to be a good idea; it just won’t ensure that all data will be in a single SSTable file for the window.

Compaction space savings

For both modes, moving window and static window, the compaction space savings will be comparable to what can be accomplished with TWCS or DTCS.

Write amplification

Write amplification benefits should be similar to TWCS for the moving window mode as long as writes take place during the actual time windows and anti-entropy is not generating significant out of window writes. In static window mode, write amplification should be similar to standard STCS, but the number of compactions increases while their sizes decrease, making the overall I/O workload somewhat less spiky.

Find out how Pythian can help you with Cassandra Services.

Upcoming Improvements to Scylla Streaming Performance

Scylla Streaming

Streaming in Scylla is an internal operation that moves data from node to node over a network. It is the foundation of various Scylla cluster operations. For example, it is used by an “add node” operation to copy data to a new node in a cluster. It is also used by a “decommission” operation that removes a node from a cluster and streams the data it holds to the other nodes. Another example is a “rebuild” operation that rebuilds the data that a node should hold from replicas on other nodes. It is also used by a “repair” operation that is used to synchronize data between nodes.

In this blog post, we will take a closer look at how Scylla streaming works in detail and how the upcoming Scylla 2.4’s new streaming improves streaming bandwidth by 240% and reduces the time it takes to perform a “rebuild” operation by 70%.

How Scylla Streaming Works

The idea behind Scylla streaming is very simple. It’s all about reading data on one node, sending it to another node, and applying the data to disk. The diagram below shows the current path of data streaming. The sender creates sstable readers to read the rows from sstables on disk and sends them over the network. The receiver receives the rows from the network and writes them to the memtable. The rows in memtable are flushed into sstables periodically or when the memtable is full.

How We’re Improving Scylla Streaming

You can see that on the receiver side, the data is applied to a memtable. In normal CQL writes, memtables help by sorting the CQL writes in the form of mutations. When the CQL writes in memtables are flushed to an SSTable, the mutations are sorted. However, when streaming, the mutations are already sorted. This is because they are in the same order when the sender reads the mutations from the SSTable. That’s great! We can remove the memtable from the process. The advantages are:

  • Less memory consumption. The saved memory can be used to handle your CQL workload instead.
  • Less CPU consumption. No CPU cycles are used to insert and sort memtables.
  • Bigger SSTables and fewer compactions. Once the memtable is full, it is flushed to an SSTable on disk. This happens during the whole streaming process repeatedly, thus generating many smaller SSTable files. This volume of SSTables adds pressure to compaction. By removing the memtable from the streaming process, we can write the mutations to a single SSTable.
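The difference between the two receiver paths can be shown with a toy model. The Python sketch below is not Scylla code: the function names, the `memtable_capacity` parameter and the use of plain integers as rows are assumptions made for illustration.

```python
def receive_via_memtable(rows, memtable_capacity):
    # Old path: buffer rows in a memtable and flush it whenever it fills up.
    sstables, memtable = [], []
    for row in rows:
        memtable.append(row)
        if len(memtable) == memtable_capacity:
            sstables.append(sorted(memtable))  # flush: sort, then write out
            memtable = []
    if memtable:
        sstables.append(sorted(memtable))
    return sstables

def receive_direct(rows):
    # New path: rows already arrive sorted, so they are written as they come.
    return [list(rows)]

rows = list(range(10))  # already sorted, as read from the sender's SSTable
print(len(receive_via_memtable(rows, memtable_capacity=4)))  # prints 3
print(len(receive_direct(rows)))                             # prints 1
```

The direct path never calls `sorted` and ends up with one file instead of several small ones.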

If we look at the sender and receiver as a sandwich, the secret sauce is the Seastar RPC framework. To send a row, a Seastar RPC call is invoked. The sender invokes the RPC call repetitively until all the rows are sent. The RPC call is used in request-response models. For streaming, the goal is to send a stream of data with higher throughput and less time and not to request the remote node to process the data and give a response with lower latency for each individual request. Thus, it makes more sense to use the newly introduced Seastar RPC Streaming interface. With the new Seastar RPC Streaming interface, we need to use the RPC call only once to get handlers called Sink and Source, dramatically reducing the number of RPC calls to send streaming data. On the sender side, the rows are pushed to the Sink handler, which sends them over the network. On the receiver side, the rows are pulled from the Source handler. It’s also worth noting that we can remove the batching at the streaming layer since the Seastar RPC streaming interface will do the batching automatically.
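The effect on the number of RPC calls can be pictured with a toy model in Python. This does not use or imitate the real Seastar API: `ToyRpc`, `send_row` and `open_stream` are hypothetical names, and the "sink" here is simply a callable that accepts rows.

```python
class ToyRpc:
    def __init__(self):
        self.calls = 0      # number of RPC calls made
        self.received = []  # rows that reached the receiver

    def send_row(self, row):
        # Request-response model: one call (and one reply) per row.
        self.calls += 1
        self.received.append(row)
        return "ack"

    def open_stream(self):
        # Streaming model: a single call hands back a sink for pushing rows.
        self.calls += 1
        return self.received.append

rows = range(1000)

per_row = ToyRpc()
for row in rows:
    per_row.send_row(row)

streaming = ToyRpc()
sink = streaming.open_stream()
for row in rows:
    sink(row)

print(per_row.calls, streaming.calls)  # prints 1000 1
```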

With the new workflow in place, the new streaming data path looks like this:

Performance Improvements

In this part, we will run tests to evaluate the streaming performance improvements. For this, we chose to use the rebuild operation that streams data from existing nodes to rebuild the database. Of course, the rebuild operation uses Scylla streaming.

We created a cluster of 3 Scylla nodes on AWS using i3.xlarge instances. (You can find the setup details at the end of this blog post.) Afterward, we created a keyspace with a replication factor of 3 and inserted 1 billion partitions to each of the 3 nodes. After the insertion, each node held 240 GiB of data. Lastly, we removed all the SSTables on one of the nodes and ran the “nodetool rebuild” operation. The sender nodes with the replicas send the data to the receiver node in parallel. Thus, in our test, there were two nodes streaming in parallel to the node that ran the rebuild operation.

We compared the time it took to complete the rebuild operation and the streaming bandwidth on the receiver node before and after the new streaming changes.

Tests | Old Scylla Streaming | New Scylla Streaming | Improvement
Time To Rebuild | 170 minutes | 50 minutes | 70%
Streaming Bandwidth | 36 MiB/s | 123 MiB/s | 240%

To look at the streaming performance on bigger machines, we did the above test again with 3 i3.8xlarge nodes. Since the instance is 8 times larger, we inserted 8 billion partitions to each of the 3 nodes. Each node held 1.89 TiB of data. The test results are in the table below.

Tests | Old Scylla Streaming | New Scylla Streaming | Improvement
Time To Rebuild | 218 minutes | 66 minutes | 70%
Streaming Bandwidth | 228 MiB/s | 765 MiB/s | 235%
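The improvement percentages appear to be computed relative to the old values: the rebuild time as a reduction, the bandwidth as an increase. A quick check in Python, with helper names of our own choosing, gives figures close to the reported ones:

```python
def time_reduction(old, new):
    # Percentage of the old duration that was saved.
    return (old - new) / old * 100

def bandwidth_gain(old, new):
    # Percentage increase over the old bandwidth.
    return (new - old) / old * 100

# i3.xlarge results
print(f"{time_reduction(170, 50):.1f}% less time, {bandwidth_gain(36, 123):.1f}% more bandwidth")
# i3.8xlarge results
print(f"{time_reduction(218, 66):.1f}% less time, {bandwidth_gain(228, 765):.1f}% more bandwidth")
```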

Conclusion

With our new Scylla streaming, streaming data is written to the SSTable on disk directly and skips the memtable completely, resulting in less memory and CPU usage and less compaction. The data is sent over the network utilizing the new Seastar RPC Streaming interface.

The changes described here will be released in the upcoming Scylla 2.4 release. It will make the Scylla cluster operations like add new node, decommission node and repair node even faster.
You can follow our progress at implementing the streaming improvement on GitHub: #3591.

Setup Details

DB Nodes: 3
Instance Type: i3.xlarge / i3.8xlarge
Replication Factor (RF): 3
Consistency Level (CL): QUORUM
Compaction Strategy: Size-Tiered
Scylla Version: Scylla master commit 31d4d37161bdc26ff6089ca4052408576a4e6ae7 with the new streaming disabled / enabled.

The post Upcoming Improvements to Scylla Streaming Performance appeared first on ScyllaDB.

The Cost of Containerization for Your Scylla

The ubiquity of Docker as a packaging and deployment platform is ever growing. Using Docker containers relieves the database operator from installing, configuring, and maintaining different orchestration tools. In addition, it standardizes the database on a single orchestration scheme that is portable across different compute platforms.

There is, however, a performance price to pay for the operational convenience of using containers. This is to be expected because of the extra layer of abstraction (the container itself), relaxation on resource isolation, and increased context switches. This solution is known to be computationally costly, which is exacerbated on a shard-per-core architecture such as Scylla. This article will shed light on the performance penalties involved in running Scylla on Docker, where the penalties are coming from, and the tuning steps Docker users can take to mitigate them. In the end, we demonstrate that it is possible to run Scylla on Docker containers by paying no more than a 3% performance penalty in comparison with the underlying platform.

Testing Methodology

The initial testing used an Amazon Web Services i3.8xlarge instance as a baseline for performance. Once the baseline performance was established, we ran the same workload on a Docker-based deployment and compared the results.

Testing included:

  • Max throughput for write workloads
  • Latency comparisons at targeted read workload

The hardware and software setup used for testing is described in Appendix B. The different workloads are described in Appendix C.

We tested the same workload in four different configurations:

  • AMI: Scylla 2.2 AMI, running natively on AWS without any further tuning
  • Docker default: Scylla 2.2 official Dockerhub image without any further tuning
  • --cpuset: Scylla 2.2 Dockerhub image, but with CPU pinning and isolation of network interrupts to particular cores, mimicking what is done in the Scylla AMI.
  • --network host: Aside from the steps described in --cpuset, also bypassing the Docker virtualized networking by using the host network.

Max Throughput Tests

We chose a heavy disk I/O workload in order to emulate a cost-effective, common scenario. Max write throughput was measured using a normal distribution of partition keys, in which the median is half of the populated range and the standard deviation is one-third of the median. The results are shown in Figure 1.

Figure 1: Maximum throughput comparison between a Scylla 2.2 AMI running on an AWS i3.8xlarge (blue), and various Docker configurations. With the default parameters meant to run in shared environments, there is a 69% reduction in peak throughput. However, as we optimize, the difference can be reduced to only 3%.

Scylla in a Docker container showed a 69% reduction in write throughput using our default Docker image. While some performance reduction is expected, this gap is significant and much bigger than one would expect. We attribute it to the fact that none of the close-to-the-hardware optimizations usually employed by Scylla are present. The results get closer to the underlying platform's performance when Scylla controls the resources and allocates the needed tasks to the designated CPU, I/O channel, and network port. A significant portion of the performance was recovered (leaving an 11% reduction) with CPU pinning and execution of the perftune.py script, which tunes the NIC and disks in the host. Going even further, using the host network and removing the Docker network virtualization with the --network host parameter brought us to a 3% reduction in overall throughput. One of Docker's strong features is the ability to separate networking traffic coming to each Docker instance on the same machine (as described here). Users who pass the --network host parameter will no longer be able to do that, since Docker is going to hook up to the host networking stack directly.
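To put the three reductions quoted above on a common footing, the following minimal sketch converts each one into the fraction of baseline throughput that remains, and into the relative gain from one configuration to the next. The function names are illustrative and not part of any Scylla tooling; the only inputs are the 69%, 11%, and 3% figures from the text.

```python
# Throughput reductions relative to the AMI baseline, as reported in the text.
REDUCTIONS = {
    "Docker default": 0.69,
    "--cpuset": 0.11,
    "--network host": 0.03,
}


def retained_fraction(reduction: float) -> float:
    """Fraction of the baseline throughput left after the given reduction."""
    return 1.0 - reduction


def speedup_over(reduction_from: float, reduction_to: float) -> float:
    """How many times faster the second configuration is than the first."""
    return retained_fraction(reduction_to) / retained_fraction(reduction_from)


if __name__ == "__main__":
    for name, reduction in REDUCTIONS.items():
        print(f"{name}: {retained_fraction(reduction):.0%} of baseline throughput")
    print(f"--cpuset vs. default: {speedup_over(0.69, 0.11):.2f}x")
    print(f"--network host vs. --cpuset: {speedup_over(0.11, 0.03):.2f}x")
```

Read this way, CPU pinning and interrupt isolation account for most of the recovered throughput, while bypassing the virtualized network adds a smaller gain on top.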

Latency Test Results

To achieve the best latency results, we issued a read workload to the cluster at a fixed throughput. The throughput is the same for all cases to make the latency comparison clear. We executed the tests on the wide range of the populated data set for 1 hour, making sure that the results are coming from both cache and disk. The results are shown in Figure 2.

Figure 2: 99.9th, 99th, 95th and average latency for a read workload with fixed throughput with the Scylla 2.2 AMI, in blue, and with the various Docker configurations. There is a stark increase in the higher percentiles with the default configuration, where all optimizations are disabled. But as we enable them, the difference essentially disappears.

While the percentage difference between the AMI and the Docker image (up to 40%) might look significant, the absolute difference is in the single-digit milliseconds. After pinning CPUs and network interrupts and removing the Docker network virtualization layer, the differences become negligible. For users who are extremely latency sensitive, we still recommend using a direct installation of Scylla. For users looking to benefit from the ease of Docker usage, the latency penalty is minimal.

Analysis

We saw in the results of the previous runs that users who enable specific optimizations can achieve performance levels with Docker that are very close to those of the underlying platform. But where is the difference coming from?

The first step is obvious: Scylla employs a polling thread-per-core architecture, and by pinning shards to the physical CPUs and isolating network interrupts the number of context switches and interrupts is reduced.

As we saw, once all CPUs are pinned we can achieve throughput that is just 11% worse than our underlying platform. It is enlightening at this point to look at Flamegraphs for both executions. They are presented in Figures 3 and 4 below:

Figure 3: Flamegraphs obtained during a max-throughput write workload with the Scylla 2.2 AMI.

Figure 4: Flamegraphs obtained during the same workload against Scylla running in Docker containers with its CPUs pinned and interrupts isolated.

As expected, the Scylla part of the execution doesn’t change much. But to the left of the Flamegraph we can see a fairly deep callchain that is mostly comprised of operating system functions. As we zoom into it, as shown in Figures 5 and 6, we can see that those are mostly functions involved in networking. Docker virtualizes the network as seen by the container. Therefore, removing this layer can bring back some of the performance as we saw in Figures 1 and 2.

 

Figure 5: Zooming in to the Flamegraphs. The Scylla 2.2 AMI.

Figure 6: Scylla running on Docker with all CPUs pinned and network interrupts isolated.

Where Does the Remaining Difference Come From?

After all of the optimizations were applied, we still see that Docker is 3% slower than the underlying platform. Although this is acceptable for most deployments, we would still like to understand why. Hints as to why can be seen in the very Flamegraphs in Figures 3-6. We see calls to seccomp that are present in the Docker setup but not in the underlying platform. We also know for a fact that Docker containers are executed within Linux cgroups, which are expected to add overhead.

We disabled security profiles by using the --security-opt seccomp=unconfined Docker parameter. Also, it is possible to manually move tasks out of cgroups by using the cgdelete utility. Executing the peak throughput tests again, we now see no difference in throughput between Docker and the underlying platform. Understanding where the difference comes from adds educational value. However, as we consider those to be essential building blocks of a sane Docker deployment, we don't expect users to run with those disabled.

Conclusion

Containerizing applications is not free. In particular, processes comprising the containers have to be run in Linux cgroups and the container receives a virtualized view of the network. Still, the biggest cost of running a close-to-hardware, thread-per-core application like Scylla inside a Docker container comes from the opportunity cost of having to disable most of the performance optimizations that the database employs in VM and bare-metal environments to enable it to run in potentially shared and overcommitted platforms.

The best results with Docker are obtained when resources are statically partitioned and we can bring back bare-metal optimizations like CPU pinning and interrupt isolation. There is only an 11% performance penalty in this case compared to the underlying platform, a penalty that is mostly attributed to the network virtualization. Docker allows users to expose the host network directly for specialized deployments. In cases in which this is possible, we saw that the performance difference compared to the underlying platform drops to 3%.


Appendix A: Ways to Improve Performance in a Containerized Environment

As we demonstrated in previous articles like An Overview of Scylla Architecture and its underlying framework Seastar, Scylla uses a shared-nothing approach and pins each Scylla shard to a single available CPU.

Scylla already provides some guidelines on how to improve Scylla performance on Docker. Here we present a practical example on an i3.8xlarge AWS instance and showcase how to use network IRQ and CPU pinning.

Network Interrupts

Scylla checks the available network queues and available CPUs during the setup. If there are not enough queues to distribute network interrupts across all of the cores, Scylla will isolate some CPUs for this purpose. Also, if irqbalance is installed it will add the CPUs dedicated to networking to the list of irqbalance banned CPUs. For that Scylla uses the perftune script, distributed with Scylla packages. It is still possible to run the same script in the host in preparation for running Docker containers. One caveat is that those changes are not persisted and have to be applied every time the machine is restarted.

In the particular case of i3.8xlarge perftune will isolate CPUs 0 and 16 for the sole purpose of handling network interrupts:

For proper isolation, the CPUs handling network interrupts shouldn’t handle any database load. We can use a combination of perftune.py and hex2list.py to discover exactly what are the CPUs that are free of network interrupts:

Shard-per-core Architecture and CPU Pinning

When we use Scylla inside a container, Scylla is unaware of the underlying CPUs in the host. As a result, we can see a drastic performance impact (50%-70%) due to context switches, hardware interrupts, and the fact that Scylla needs to stop employing polling mode. To overcome this limitation, we recommend that users statically partition the CPU resources to be assigned to the container and let the container take full control of its share. This can be done using the --cpuset option. In this example, we are using an i3.8xlarge (32 vCPUs) and want to run a single container in the entire VM. We will pass --cpuset 1-15,17-31, ensuring that we pin 30 shards to 30 vCPUs. The two remaining vCPUs will be used for network interrupts, as we saw previously. It is still possible to do this when more than one container is present in the box, by partitioning accordingly.
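As a minimal sketch of the partitioning described above, the helper below builds a cpuset string from the total number of vCPUs and the set of CPUs reserved for network interrupts. The function name is hypothetical and is not part of perftune.py or Docker; the inputs (32 vCPUs, CPUs 0 and 16 reserved) come from the i3.8xlarge example in the text.

```python
from typing import Iterable


def cpuset_for_shards(total_cpus: int, irq_cpus: Iterable[int]) -> str:
    """Return a cpuset string covering every CPU not reserved for interrupts."""
    reserved = set(irq_cpus)
    free = [cpu for cpu in range(total_cpus) if cpu not in reserved]
    if not free:
        return ""

    # Collapse consecutive CPU ids into "start-end" ranges.
    ranges = []
    start = prev = free[0]
    for cpu in free[1:]:
        if cpu == prev + 1:
            prev = cpu
            continue
        ranges.append((start, prev))
        start = prev = cpu
    ranges.append((start, prev))

    return ",".join(str(a) if a == b else f"{a}-{b}" for a, b in ranges)


if __name__ == "__main__":
    # i3.8xlarge: 32 vCPUs, with CPUs 0 and 16 handling network interrupts.
    print(cpuset_for_shards(32, {0, 16}))
```

For the i3.8xlarge case this leaves 30 vCPUs for Scylla shards. The same helper can be applied per container when several containers share a host, as long as each one is given a disjoint set of CPUs.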

Appendix B: Setup and Systems Used for Testing (AWS)

Hardware

Throughput tests
1 x i3.8xlarge (32 CPUs 244GB RAM 4 x 1900GB NVMe 10 Gigabit network card) Scylla node
2 x c4.8xlarge (36 CPUs 60GB RAM 10 Gigabit network card) writers running 4 cassandra-stress instances each.

Latency tests
1 x i3.8xlarge (32 CPUs 244GB RAM 4 x 1900GB NVMe 10 Gigabit network card) Scylla node
2 x c4.8xlarge (36 CPUs 60GB RAM 10 Gigabit network card) readers running 4 cassandra-stress instances each.

Software

Scylla 2.2 AMI (ami-92dc8aea region us-west-2 [Oregon])
Docker version 1.13.1, build 94f4240/1.13.1
perftune.py from scylla-tools

Provisioning Procedure

AMI
AMI deployed using AWS Automation

Docker

Docker --cpuset

Docker --network host

Appendix C: Workloads

Dataset

5 Columns, 64 bytes per column, 1500000000 partitions. Total Data: ~480GB
2 Loaders running 1 cassandra-stress, 750000000 interactions each
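The dataset figures above can be cross-checked with a few lines of arithmetic. This is only a sketch: it counts raw column payload and ignores keys, per-row overhead, and compression, so it approximates the on-disk size rather than measuring it.

```python
COLUMNS = 5
BYTES_PER_COLUMN = 64
PARTITIONS = 1_500_000_000
LOADERS = 2


def dataset_bytes(partitions: int, columns: int, bytes_per_column: int) -> int:
    """Raw payload size: one value of bytes_per_column per column per partition."""
    return partitions * columns * bytes_per_column


def partitions_per_loader(partitions: int, loaders: int) -> int:
    """Even split of the partition range across the loader machines."""
    return partitions // loaders


if __name__ == "__main__":
    total = dataset_bytes(PARTITIONS, COLUMNS, BYTES_PER_COLUMN)
    print(f"total payload: {total / 1e9:.0f} GB")
    print(f"per loader: {partitions_per_loader(PARTITIONS, LOADERS):,} partitions")
```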
cassandra-stress commands:

Loader 1:

Loader 2:

Write Max Throughput Test

2 Loaders running 4 cassandra-stress each for 1 hour.
cassandra-stress commands:

Loader 1:

Loader 2:

30K IOPs Read Latency Test

2 Loaders running 4 cassandra-stress each for 1 hour.
cassandra-stress commands:

Loader 1:

Loader 2:


Apache Cassandra Performance Tuning - Compression with Mixed Workloads

This is our third post in our series on performance tuning with Apache Cassandra. In our first post, we discussed how we can use Flame Graphs to visually diagnose performance problems. In our second post, we discussed JVM tuning, and how the different JVM settings can have an effect on different workloads.

In this post, we’ll dig into a table level setting which is usually overlooked: compression. Compression options can be specified when creating or altering a table, and it defaults to enabled if not specified. The default is great when working with write heavy workloads, but can become a problem on read heavy and mixed workloads.

Before we get into optimizations, let’s take a step back to understand the basics of compression in Cassandra. Once we’ve built a foundation of knowledge, we’ll see how to apply it to real world workloads.

How it works

When we create a table in Cassandra, we can specify a variety of table options in addition to our fields. Alongside options such as using TWCS for our compaction strategy, specifying gc grace seconds, and caching options, we can also tell Cassandra how we want it to compress our data. If the compression option is not specified, LZ4Compressor will be used, which is known for its excellent performance and compression rate. In addition to the algorithm, we can specify our chunk_length_in_kb, which is the size of the uncompressed buffer we write our data to as an intermediate step before writing to disk. Here's an example of a table using LZ4Compressor with 64KB chunk length:

create table sensor_data ( 
    id text primary key, 
    data text) 
WITH compression = {'sstable_compression': 'LZ4Compressor', 
                    'chunk_length_kb': 64};

We can examine how well compression is working at the table level by checking tablestats:

$ bin/nodetool tablestats tlp_stress

Keyspace : tlp_stress
        Read Count: 89766
        Read Latency: 0.18743983245326737 ms
        Write Count: 8880859
        Write Latency: 0.009023213069816781 ms
        Pending Flushes: 0
                Table: sensor_data
                SSTable count: 5
                Old SSTable count: 0
                Space used (live): 864131294
                Space used (total): 864131294
                Off heap memory used (total): 2472433
                SSTable Compression Ratio: 0.8964684393508305
                Compression metadata off heap memory used: 140544

The SSTable Compression Ratio line above tells us how effective compression is. Compression ratio is calculated by the following:

compressionRatio = (double) compressed/uncompressed;

meaning the smaller the number, the better the compression. In the above example our compressed data is taking up almost 90% of the size of the original data, which isn't particularly great.
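As a rough illustration of how to read that ratio, the sketch below applies it to the tablestats output shown earlier. It assumes that "Space used (live)" is a fair stand-in for the compressed data size, which is only approximately true because that figure also includes index and other SSTable component files. The helper names are illustrative.

```python
def compression_ratio(compressed_bytes: float, uncompressed_bytes: float) -> float:
    """Same definition as in the text: compressed size over uncompressed size."""
    return compressed_bytes / uncompressed_bytes


def space_saved(ratio: float) -> float:
    """Fraction of disk space saved by compression."""
    return 1.0 - ratio


def estimated_uncompressed(compressed_bytes: float, ratio: float) -> float:
    """Invert the ratio to estimate the uncompressed size."""
    return compressed_bytes / ratio


if __name__ == "__main__":
    ratio = 0.8964684393508305   # SSTable Compression Ratio from tablestats
    live_bytes = 864131294       # Space used (live) from tablestats
    print(f"space saved: {space_saved(ratio):.1%}")
    print(f"estimated uncompressed size: {estimated_uncompressed(live_bytes, ratio) / 1e6:.0f} MB")
```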

How data is written

I’ve found digging into the codebase, profiling and working with a debugger to be the most effective way to learn how software works.

When data is written to or read from SSTables, we're not dealing with convenient typed objects; we're dealing with streams of bytes. Our compressed data is written in the CompressedSequentialWriter class, which extends BufferedDataOutputStreamPlus. This writer uses a temporary buffer. When the data is written out to disk, the buffer is compressed and some metadata about it is recorded to a CompressionInfo file. If there is more data than available space in the buffer, the buffer is filled and flushed, and then starts fresh to be written to again (and perhaps flushed again). You can see this in org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java:

@Override
public void write(byte[] b, int off, int len) throws IOException
{
    if (b == null)
        throw new NullPointerException();

    // avoid int overflow
    if (off < 0 || off > b.length || len < 0
        || len > b.length - off)
        throw new IndexOutOfBoundsException();

    if (len == 0)
        return;

    int copied = 0;
    while (copied < len)
    {
        if (buffer.hasRemaining())
        {
            int toCopy = Math.min(len - copied, buffer.remaining());
            buffer.put(b, off + copied, toCopy);
            copied += toCopy;
        }
        else
        {
            doFlush(len - copied);
        }
    }
}

The size of this buffer is determined by chunk_length_in_kb.
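To make the fill-and-flush behavior easier to experiment with, here is a simplified Python model of the same loop. It is not Cassandra code: the class name is hypothetical, compression and the CompressionInfo metadata are left out, and each flush simply records the chunk that would have been compressed and written.

```python
class ChunkedWriter:
    """Toy model of a fixed-size write buffer that is flushed when full."""

    def __init__(self, chunk_length_kb: int) -> None:
        self.capacity = chunk_length_kb * 1024
        self.buffer = bytearray()
        self.flushed_chunks = []  # stands in for chunks written to disk

    def write(self, data: bytes) -> None:
        copied = 0
        while copied < len(data):
            remaining = self.capacity - len(self.buffer)
            if remaining > 0:
                # Copy as much as fits into the buffer.
                to_copy = min(len(data) - copied, remaining)
                self.buffer += data[copied:copied + to_copy]
                copied += to_copy
            else:
                # Buffer is full: flush it and start fresh.
                self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.flushed_chunks.append(bytes(self.buffer))
            self.buffer.clear()


if __name__ == "__main__":
    writer = ChunkedWriter(chunk_length_kb=4)
    writer.write(b"x" * 10_000)
    print(len(writer.flushed_chunks), "full chunks flushed,",
          len(writer.buffer), "bytes still buffered")
```

As in the Java method, a full buffer is only flushed when more data arrives, so the final partial chunk stays in memory until an explicit flush.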

How data is read

The read path in Cassandra is (more or less) the opposite of the write path. We pull chunks out of SSTables, decompress them, and return them to the client. The full path is a little more complex: there's a ChunkCache (managed by Caffeine) that we go through, but that's beyond the scope of this post.

During the read path, the entire chunk must be read and decompressed. We’re not able to selectively read only the bytes we need. The impact of this is that if we are using 4K chunks, we can get away with only reading 4K off disk. If we use 256KB chunks, we have to read the entire 256K. This might be fine for a handful of requests but when trying to maximize throughput we need to consider what happens when we have requests in the thousands per second. If we have to read 256KB off disk for ten thousand requests a second, we’re going to need to read 2.5GB per second off disk, and that can be an issue no matter what hardware we are using.
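The arithmetic behind that estimate can be sketched as follows. The model is deliberately simple and rests on stated assumptions: every request reads exactly one chunk (the chunks_per_request parameter is a hypothetical knob for larger reads), and nothing is served from the chunk cache or page cache.

```python
def disk_read_bytes_per_second(chunk_kb: int,
                               requests_per_second: int,
                               chunks_per_request: int = 1) -> int:
    """Bytes read from disk per second when every request reads whole chunks."""
    return chunk_kb * 1024 * requests_per_second * chunks_per_request


def to_gib(num_bytes: int) -> float:
    """Convert bytes to GiB (2**30 bytes)."""
    return num_bytes / 2**30


if __name__ == "__main__":
    for chunk_kb in (4, 64, 256):
        rate = disk_read_bytes_per_second(chunk_kb, 10_000)
        print(f"{chunk_kb:>3}KB chunks: {to_gib(rate):.2f} GiB/s at 10,000 requests/s")
```

At the same request rate, the disk read rate scales linearly with chunk size, which is why dropping from 256KB to 4KB chunks cuts the required read bandwidth by a factor of 64 under these assumptions.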

What about page cache?

Linux will automatically leverage any RAM that’s not being used by applications to keep recently accessed filesystem blocks in memory. We can see how much page cache we’re using by using the free tool:

$ free -mhw
              total        used        free      shared     buffers       cache   available
Mem:            62G        823M         48G        1.7M        261M         13G         61G
Swap:          8.0G          0B        8.0G

Page cache can be a massive benefit if you have a working data set that fits in memory. With smaller data sets this is incredibly useful, but Cassandra was built to solve big data problems. Typically that means having a lot more data than available RAM. If our working data set on each node is 2 TB, and we only have 20-30 GB of free RAM, it’s very possible we’ll serve almost none of our requests out of cache. Yikes.

Ultimately, we need to ensure we use a chunk length that allows us to minimize our I/O. Larger chunks can compress better, giving us a smaller disk footprint, but we end up needing more hardware, so the space savings become meaningless for certain workloads. There's no perfect setting that we can apply to every workload. As a rule of thumb, the more reads you do, the smaller the chunk size should be. Even this doesn't apply uniformly; larger requests will hit more chunks, and will benefit from a larger chunk size.

The Benchmarks

Alright - enough with the details! We're going to run a simple benchmark to test how Cassandra performs with a mix of read and write requests with a simple key value data model. We'll be doing this using our stress tool, tlp-stress (commit 40cb2d28fde). We will get into the details of this stress tool in a later post - for now all we need to cover is that it includes a key value workload out of the box that we can leverage here.

For this test I installed Apache Cassandra 3.11.3 on an AWS c5d.4xlarge instance running Ubuntu 16.04 following the instructions on cassandra.apache.org, and updated all the system packages using apt-get upgrade. I’m only using a single node here in order to isolate the compression settings and not introduce noise from the network overhead of running a full cluster.

The ephemeral NVMe disk is formatted with XFS and mounted at /var/lib/cassandra. I disabled readahead using blockdev --setra 0 /dev/nvme1n1 so we can see the impact that compression has on our disk requests and not hide it with page cache.

For each workload, I put the following command in a shell script, and ran tlp-stress from a separate c5d.4xlarge instance (passing the chunk size as the first parameter):

$ bin/tlp-stress run KeyValue -i 10B -p 10M --populate -t 4 \
  --replication "{'class':'SimpleStrategy', 'replication_factor':1}" \
  --field.keyvalue.value='book(100,200)' -r .5  \
  --compression "{'chunk_length_in_kb': '$1', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}" \
  --host 172.31.42.30

This runs a key value workload across 10 million partitions (-p 10M), pre-populating the data (--populate), with 50% reads (-r .5), picking 100-200 words from one of the books included in the stress tool (--field.keyvalue.value='book(100,200)'). We can specify a compression strategy using --compression.

For the test I used slightly modified Cassandra configuration files to reduce the effect of GC pauses by increasing the total heap (12GB) as well as the new gen (6GB). I spent only a small amount of time on this, as optimizing it perfectly isn't necessary. I also set compaction throughput to 160.

For the test, I monitored the JVM's allocation rate using the Swiss Java Knife (sjk-plus) and disk / network / cpu usage with dstat.

Default 64KB Chunk Size

The first test used the default of 64KB chunk length. I started the stress command and walked away to play with my dog for a bit. When I came back, I was through about 35 million requests:

stress 64kb

You can see in the above screenshot that our 5 minute rate is about 22K writes / second and 22K reads / second. Looking at the output of dstat at this time, we can see we're reading between 500 and 600MB / second from disk:

DStat 64KB

Memory allocation fluctuated a bit, but it hovered around 1GB/s:

sjk 4kb

Not the most amazing results in the world. Of the disk reads, some of that throughput can be attributed to compaction, which we'll always have to contend with in the real world. That's capped at 160MB/s, leaving around 400MB/s to handle reads. That's a lot considering we're only sending 25MB/s across the network. That means we're doing over 15x as much disk I/O as network I/O. We are very much disk bound in this workload.

4KB Chunk Size

Let’s see if the 4KB chunk size does any better. Before the test I shut down Cassandra, cleared the data directory, and started things back up. I ran the same stress test above using the above shell script, passing 4 as the chunk size. I once again played fetch with my dog for a bit and came back after around the same time as the previous test.

Looking at the stress output, it’s immediately obvious there’s a significant improvement:

stress

In almost every single metric reported by the metric library the test with 4KB outperforms the 64KB test. Our throughput is better (62K ops / second vs 44K ops / second in the 1 minute rate), and our p99 for reads is better (13ms vs 24ms).

If we’re doing less I/O on each request, how does that impact our total disk and network I/O?

dstat 4kb

As you can see above, there’s a massive improvement. Disk I/O is significantly reduced from making smaller (but more) requests to disk, and our network I/O is significantly higher from responding to more requests.

sjk 4kb

It was initially a small surprise to see an increased heap allocation rate (because we're reading WAY less data into memory), but this is simply the result of doing a lot more requests. There are a lot of objects created in order to satisfy a request; far more than the number created to read the data off disk. More requests result in higher allocation. We'd want to ensure those objects don't make it into the Old Gen as we go through JVM tuning.

Off Heap Memory Usage

The final thing to consider here is off heap memory usage. Alongside each compressed SSTable is compression metadata. The compression files have names like na-9-big-CompressionInfo.db. The compression metadata is stored in memory, off the Cassandra heap. The size of the off heap usage is directly proportional to the number of chunks used. More chunks = more space used. More chunks are used when a smaller chunk size is used, hence more off heap memory is used to store the metadata for each chunk. It's important to understand this trade off. A table using 4KB chunks will use 16 times as much memory for compression metadata as one using 64KB chunks.

In the example I used above the memory usage can be seen as follows:

Compression metadata off heap memory used: 140544 
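The 16x figure follows directly from the chunk count, which the sketch below computes for a given amount of uncompressed data. The 8 bytes per chunk used for the metadata estimate is an assumption made for illustration (one 64-bit offset per chunk) and is not taken from this post; the proportionality holds regardless of the exact per-chunk cost.

```python
import math


def chunk_count(uncompressed_bytes: int, chunk_kb: int) -> int:
    """Number of chunks needed to hold the data at a given chunk length."""
    return math.ceil(uncompressed_bytes / (chunk_kb * 1024))


def metadata_bytes(uncompressed_bytes: int, chunk_kb: int,
                   bytes_per_chunk: int = 8) -> int:
    """Estimated off heap metadata size; bytes_per_chunk is an assumption."""
    return chunk_count(uncompressed_bytes, chunk_kb) * bytes_per_chunk


if __name__ == "__main__":
    one_gib = 2**30
    for chunk_kb in (64, 4):
        print(f"{chunk_kb}KB chunks: {chunk_count(one_gib, chunk_kb):,} chunks, "
              f"~{metadata_bytes(one_gib, chunk_kb):,} bytes of metadata per GiB")
```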

Changing Existing Tables

Now that you can see how a smaller chunk size can benefit read heavy and mixed workloads, it’s time to try it out. If you have a table you’d like to change the compression setting on, you can do the following at the cqlsh shell:

cqlsh:tlp_stress> alter table keyvalue with compression = {'sstable_compression': 'LZ4Compressor', 'chunk_length_kb': 4};

New SSTables that are written after this change is applied will use this setting, but existing SSTables won’t be rewritten automatically. Because of this, you shouldn’t expect an immediate performance difference after applying this setting. If you want to rewrite every SSTable immediately, you’ll need to do the following:

nodetool upgradesstables -a tlp_stress keyvalue

Conclusion

The above is a single test demonstrating how tuning compression settings can affect Cassandra performance in a significant way. Using out of the box settings for compression on read heavy or mixed workloads will almost certainly put unnecessary strain on your disk while hurting your read performance. I highly recommend taking the time to understand your workload and analyze your system resources to understand where your bottleneck is, as there is no single correct setting to use for every workload.

Keep in mind the tradeoff between memory and chunk size as well. When working with a memory constrained environment it may seem tempting to use 4KB chunks everywhere, but it’s important to understand that it’ll use more memory. In these cases, it’s a good idea to start with smaller tables that are read from the most.