Scylla Student Projects, Part II: Implementing an Async Userspace File System

Implementing a file system is a complex task, but contrary to popular belief it is not equivalent to writing a kernel module and implementing the full API specified and standardized by POSIX, with hard links, three types of file locking, extended attributes, and so on. In fact, any program or library which allows its users to have a notion of a “file,” which can be looked up, written to and read from, is already a file system. Today, file systems come in many varieties — some are distributed, some reside only in RAM, and a fair share of them are actually written in userspace. In this post, I’ll present the details of a student project sponsored by ScyllaDB, which resulted in creating a fully asynchronous log-structured file system, written from scratch in Seastar.

Intro: student projects

In 2019, ScyllaDB sponsored a program for Computer Science students organized by the University of Warsaw. Throughout the whole academic year, three teams of undergraduate students collaborated with and learned from ScyllaDB engineers to bring new features to Scylla and its underlying Seastar engine. The projects picked for the 2019 edition were:

  • Parquet support for Seastar and Scylla (see Part I)
  • SeastarFS: an asynchronous userspace file system for Seastar (this post)
  • Kafka client for Seastar and Scylla (still to come!)

This blog post describes the progress on the SeastarFS project. This work is all to the credit of the students who wrote it, Krzysztof Małysa, Wojciech Mitros, Michał Niciejewski and Aleksandr Sorokin, and their supervisor, Dr. Jacek Sroka.

Motivation

Why write a file system from scratch? The first, most obvious reason is that it’s fun! In the case of SeastarFS, the following factors were also very important:

  1. Lack of alternatives. ScyllaDB has very specific latency requirements for file systems, and XFS is so far the only file system considered good enough. ScyllaDB’s write path could benefit from a log-structured file system (more on that below), but no existing system fulfills the requirements to a satisfactory degree.
  2. Overhead. Traditional file systems are implemented in the kernel, which means that a user is forced to use system calls to communicate with them. That creates additional overhead which could be avoided by moving more work to userspace.
  3. Safety. Relying on third-party software is easy, but it comes with risks — a bug sneaking into XFS in the future could ruin our latency guarantees, because the file system would no longer behave the way ScyllaDB expects it to. Providing our own implementation reduces the dependency on external projects.
  4. Control. Having a file system written 100% in Seastar, we’re able to:
    • integrate with the Seastar scheduler by setting proper priority for each operation
    • tightly control how much memory is used by the file system
    • provide strict latency guarantees, since everything is written with Seastar asynchronous primitives
    • gather valuable metrics in very specific places
    • add debug code wherever we please, without recompiling the kernel
  5. Shard-awareness. Seastar is written with a shared-nothing design in mind, and using a centralized file system in the kernel does not fit well into that model.
  6. Completeness. Seastar is already inches away from being a full-fledged operating system. It has its own schedulers, memory management, I/O primitives, threads… the only thing missing is its own file system!

Interface

SeastarFS does not strive to be a general-purpose, POSIX-compliant file system. It’s a highly specialized project which aims to provide the highest possible throughput and the lowest possible latency under several assumptions:

  • The files are large. While the system should work correctly on small files, it should be optimized for sizes counted in mega-, giga- and terabytes.
  • No hot files or directories. Given the shared-nothing design of Seastar, the system expects multiple users to work on their own files, without having to share resources between many of them. While it should be possible to access any file by any user, the optimized path assumes there are no conflicts and each client works on its own files.
  • Sequential reads/writes mostly. The system allows many convenient operations on files and directories (e.g. renaming files), but it expects the most common operations to be reading and writing long sequences — and those are optimized even at the cost of making other operations, like a rename or a random write, slower.

Fortunately, Scylla does just that – it mainly operates on large, append-only files (commitlogs and SSTables), and is already written in Seastar’s shared-nothing design.

The interface of SeastarFS is similar to what users can do on a POSIX-compliant system, but without all the esoteric features like extended attributes or in-system file locking; the draft interface boils down to a small set of asynchronous file operations.
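As a rough, hypothetical illustration only (the names below are ours, not the project’s), that kind of API might look like this, with Python coroutines standing in for Seastar futures:

import abc

class AsyncFile(abc.ABC):
    """Illustrative async file handle: every operation returns a future/coroutine."""

    @abc.abstractmethod
    async def read(self, offset: int, length: int) -> bytes:
        """Read length bytes starting at offset."""

    @abc.abstractmethod
    async def write(self, offset: int, data: bytes) -> int:
        """Write data at offset, returning the number of bytes written."""

    @abc.abstractmethod
    async def truncate(self, size: int) -> None:
        """Shrink or extend the file to size bytes."""

    @abc.abstractmethod
    async def flush(self) -> None:
        """Persist outstanding writes to the device."""

    @abc.abstractmethod
    async def close(self) -> None:
        """Release the handle."""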

Log-structured file system

SeastarFS is a log-structured file system, which means that it stores its data and metadata in logs (duh). Traditional file systems usually keep their metadata and data in blocks allocated on a device. Writing a file to such a file system results in updating some on-disk metadata structures, allocating blocks for data and finally flushing the data to disk. Here’s a crude example of what could happen underneath, when a simple operation of writing 1000 bytes in the middle of a 500-byte file reaches the device:

  1. write 8 bytes at offset 672 // mtime (modification time) of file F is updated
  2. write 8 bytes at offset 608 // size of file F is updated
  3. write 512 bytes at offset 1036288 // the first block of F’s data is written
  4. write 8 bytes at offset 48 // file system’s metadata is updated: a new block was allocated
  5. write 488 bytes at offset 4518912 // the new, second block of F’s data is written

Disk accesses:

The main point is that even writing a single, consecutive stream of bytes results in many random writes reaching the storage device. That’s a suboptimal pattern even for modern NVMe drives, not to mention good old HDDs — for which the total time of performing an operation is Ls + x/T, where Ls is the seek latency, T is the throughput and x is the number of bytes read/written. With Ls = 4 ms and T = 500 MiB/s, the average time of writing 1,000 bytes in one chunk would be 4 ms + 2 µs ≈ 4 ms, while writing 1,000 bytes in batches of 100 at 10 different offsets would take around 10 × (4 ms + 0.2 µs) ≈ 40 ms, which clearly makes the seek latency the deciding factor. Even modern SSD disks prefer sequential access in large batches — especially if the operations are smaller than internal page sizes. Small operations mean more work in the case of reading, and faster device wear in the case of writing, because internal SSD garbage collection needs to perform more work underneath — flash drives tend to erase with block granularity.
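To make the arithmetic above easy to reproduce, here is the same back-of-the-envelope model (Ls = 4 ms, T = 500 MiB/s) in a few lines of Python:

SEEK_LATENCY_S = 4e-3            # Ls = 4 ms
THROUGHPUT_BPS = 500 * 1024**2   # T = 500 MiB/s

def access_time(num_bytes):
    """Total time of one disk access: seek latency plus transfer time."""
    return SEEK_LATENCY_S + num_bytes / THROUGHPUT_BPS

sequential = access_time(1000)        # one contiguous 1,000-byte write
scattered = 10 * access_time(100)     # ten 100-byte writes at different offsets

print(f"sequential: {sequential * 1e3:.3f} ms")   # ~4 ms
print(f"scattered:  {scattered * 1e3:.3f} ms")    # ~40 ms, dominated by seeks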

The problem stated in the paragraph above can be addressed by changing the way the file system stores both its data and metadata. For instance, imagine that the whole disk is divided into a relatively small number of fixed-size blocks (e.g. 16 MiB), acting as logs. Now, each metadata operation sent to the file system is appended to one of such logs, always at the end. Returning to the example above, updating metadata in such a system could look like this, assuming that the first free position in the log is at byte 376:

  • write 16 bytes at offset 376 // mtime of file F is updated
  • write 16 bytes at offset 384 // size of file F is updated
  • write 16 bytes at offset 392 // a new block was allocated

Disk accesses so far:

Now, a similar idea can be applied for data:

  • very small writes, with size comparable to the size of metadata entries, can be stored directly in the metadata log
  • moderately small writes end up in a special log dedicated to medium writes
  • large writes, with sizes bigger than blocks themselves, are simply split into blocks

Continuing our simplified example, a 1,000-byte write ends up in one of the medium data logs:

What happens when a particular block used as a metadata log becomes full? Assuming that there are free blocks available in the system, a new block is allocated, and a “pointer” (which is just an on-disk offset) to this new log is appended to the old log as the last entry. That way, metadata logs create a linked list stored on disk. The list can be traversed later when booting the system.
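A minimal sketch of that append-and-link behavior is shown below; the block size, entry encoding and block allocator are simplified assumptions, not the actual SeastarFS code:

class MetadataLog:
    """Fixed-size log blocks linked by a trailing 'pointer' (an on-disk offset)."""

    POINTER_SIZE = 8  # room reserved at the end of a block for the next-block offset

    def __init__(self, device, allocate_block, block_size=16 * 1024 * 1024):
        self.device = device                  # object with a write(offset, data) method
        self.allocate_block = allocate_block  # returns the offset of a freshly allocated block
        self.block_size = block_size
        self.block_start = allocate_block()
        self.used = 0

    def append(self, entry: bytes) -> int:
        """Append an entry at the end of the current block, rolling over when full."""
        if self.used + len(entry) + self.POINTER_SIZE > self.block_size:
            next_block = self.allocate_block()
            # The last entry of a full block is a pointer to the next one,
            # so the metadata logs form an on-disk linked list.
            self.device.write(self.block_start + self.used,
                              next_block.to_bytes(self.POINTER_SIZE, "little"))
            self.block_start, self.used = next_block, 0
        offset = self.block_start + self.used
        self.device.write(offset, entry)
        self.used += len(entry)
        return offset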

Bootstrapping

SeastarFS, like any other file system, needs to be bootstrapped before it’s used — all the metadata needs to be verified and loaded into RAM. To make that possible, a bootstrap record is created at a specific offset on the device — at first by a helper program which formats the disk, just like mkfs does. The bootstrap record contains file system metadata, checksums, offsets of the metadata and data logs, and so on. When the system is started, it tries to load the structure from the specified offset (e.g. 0), and if it succeeds in verifying that the bootstrap record is genuine and consistent, all the metadata is loaded from the on-disk logs. The verification includes:

  • checking a magic number, which is customary for file systems (e.g. XFS uses a 32-bit number which represents “XFSB” in ASCII);
  • sanity-checking file system metadata: e.g. if a block size is larger than the device itself, something is probably off and we shouldn’t continue booting;
  • loading the metadata from metadata logs, which includes verifying metadata checksums and various sanity checks.

Here’s the general idea of what a bootstrap record contains in SeastarFS; the exact on-disk layout lives in the project sources.
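As a hedged illustration (the field names, sizes and magic value below are assumptions, not the real format), such a record could be packed and verified with Python’s struct module:

import struct
import zlib

MAGIC = 0x53465321  # hypothetical magic number, not SeastarFS's real one

# magic | block size | metadata log offset | medium data log offset | CRC32
LAYOUT = struct.Struct("<IQQQI")

def verify_bootstrap_record(raw: bytes, device_size: int):
    """Parse and sanity-check a bootstrap record read from a fixed device offset."""
    magic, block_size, meta_log_off, data_log_off, crc = LAYOUT.unpack(raw)
    if magic != MAGIC:
        raise ValueError("bad magic number: not one of our devices")
    if crc != zlib.crc32(raw[:-4]):
        raise ValueError("bootstrap record checksum mismatch")
    # Sanity checks: a block larger than the whole device means something is off.
    if block_size == 0 or block_size > device_size:
        raise ValueError("implausible block size")
    if meta_log_off >= device_size or data_log_off >= device_size:
        raise ValueError("log offset points past the end of the device")
    return block_size, meta_log_off, data_log_off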

Compactions

Log-structured file systems sound just great with their better device access patterns, but there’s a catch. Entries are always appended to the back of the log, which also means that some stored information eventually becomes completely redundant. That wouldn’t be a problem if our disks were infinitely large or if we never had to read the whole log when booting the system, but unfortunately neither is true. Let’s look at an example log (simplified):

  1. Create a file f in directory /, assign inode number F
  2. Update size of file F to 3
  3. Write “abc” to file F at offset 0
  4. Update size of file F to 7
  5. Write “defg” to file F at offset 3
  6. Truncate file F to size 0
  7. Rename file F from /f to /some/other/directory/g
  8. Remove file F
  9. Create a file h in directory /, assign inode number H

At the time of performing each of these operations, its metadata entry makes a lot of sense and is needed to keep the information up-to-date. However, right after performing operation 6, operations 2-5 become obsolete. If the system is turned off right now and metadata is later restored from the log, it doesn’t matter whether something was temporarily written to the file, since its size is ultimately 0. Similarly, after reaching operation 8, operations 1-7 are not needed at all. That’s good news – an opportunity to reclaim some space. How to do that? We can’t just rewrite the log in place, because that would ruin the whole idea of avoiding random writes, and could also be a source of inconsistencies if the power runs out in the middle of a rewrite. What we can do is pick a new, empty log, rewrite only the meaningful part of the old log, and only then remove the old log. The compacted form of the log above looks like this:

  1. Create a file h in directory /, assign inode number H

And that’s it!

When to compact logs? It’s not a trivial decision, since compacting involves rewriting whole blocks, which is costly in terms of I/O. It’s also best to only compact a block if a significant part of it will be discarded. For instance, if only 3 out of 10,000 entries in a log are obsolete, it makes little sense to rewrite 9,997 entries to a new log just to reclaim a couple of bytes. If, however, 60% of the log is already obsolete, it’s tempting to start a compaction process. Note that even though compaction’s ultimate goal is to reclaim space, it temporarily needs to allocate a new block for the rewrite. It’s best to have some blocks reserved for compactions only, since if 100% of the disk is already taken by user data, it’s already too late to compact anything.
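The decision itself can be expressed as a tiny policy function; the 60% threshold below is an illustrative assumption, not a tuned value:

def should_compact(total_entries: int, obsolete_entries: int,
                   free_blocks: int, threshold: float = 0.6) -> bool:
    """Compact only when enough of the log is garbage and a spare block exists."""
    if total_entries == 0 or free_blocks == 0:
        # A compaction temporarily needs a fresh block to rewrite into,
        # hence the idea of keeping a few blocks reserved just for it.
        return False
    return obsolete_entries / total_entries >= threshold

# 3 obsolete entries out of 10,000: not worth rewriting 9,997 live entries.
assert not should_compact(10_000, 3, free_blocks=4)
# 60% of the log already obsolete: a rewrite reclaims a real amount of space.
assert should_compact(10_000, 6_000, free_blocks=4)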

If possible, it would be best to perform the compaction when the load on the file system is low, e.g. all the users are currently asleep, but, given that Seastar is proficient at assigning proper priorities to background tasks, compactions can also progress slowly even if the system is heavily used at the moment.

Perceptive readers probably noticed a similarity between this process and compaction performed on SSTables — no coincidence here, ScyllaDB’s write path is heavily influenced by a log-structured design already.

Locking

Concurrent operations on a file system must be performed with caution, since the metadata needs to always be in a consistent state. Using no locks at all can easily lead to inconsistencies (e.g. creating two files with an identical path), and naïve locking can cause subtle deadlocks. In order to prevent such problems, our implementation includes a lock management system, which offers both exclusive and shared locking as well as lock ordering to prevent deadlocks. The core idea is that the file system needs two basic types of locks: an inode lock and a dentry lock. An inode lock is associated with a specific file, while a dentry lock guards a specific directory entry. A single file system operation may involve multiple locks. For instance, creating a file involves:

  1. Locking a directory (shared lock) – so that it’s not deleted in the middle of an operation,
    which would create an orphaned file without a parent directory.
  2. Locking a newly created directory entry (exclusive lock) – in order to prevent creating two files with identical paths.

Naturally, we could also get away with locking the directory in exclusive mode, but that would hurt performance, because a directory locked in such a way quickly becomes a bottleneck — think of adding 100,000 independent files to the / directory at once.

Taking multiple locks is the first step towards a deadlock. In order to prevent that, we take advantage of the fact that all locks present in the file system can be linearly ordered — and thus we can always take the locks in increasing order, which ensures that there won’t be any cycles. The order of locking in our file system is defined as follows. Let x be an inode; (x) is then the lock for that inode, and (x, d) is the lock for the directory entry d corresponding to that inode:

(x) < (y) ⇔ x < y
(x) < (y, d) ⇔ x ≤ y
(x, d) < (y) ⇔ x < y
(x, d) < (y, d′) ⇔ (x < y) ∨ (x = y ∧ d < d′)

Now, given that inodes are represented as 64-bit integers and directory entries are strings, we’re always able to compare two locks and decide which one is “smaller” than the other.
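The ordering above maps neatly onto a sort key of (inode, lock kind, directory entry), with plain inode locks ordered before dentry locks of the same inode. A small Python sketch follows (the real implementation is Seastar C++; this only illustrates the comparison):

from dataclasses import dataclass

@dataclass(frozen=True, order=True)
class LockId:
    """Sorting by (inode, kind, dentry) reproduces the four ordering rules above."""
    inode: int        # inodes are 64-bit integers
    kind: int         # 0 for an inode lock (x), 1 for a dentry lock (x, d)
    dentry: str = ""  # empty for plain inode locks

def inode_lock(x: int) -> LockId:
    return LockId(x, 0)

def dentry_lock(x: int, d: str) -> LockId:
    return LockId(x, 1, d)

def acquisition_order(locks):
    """Always take locks in increasing order, so no two operations can form a cycle."""
    return sorted(locks)

# Creating /some_dir/new_file: shared-lock the directory inode, then
# exclusive-lock the new directory entry; the order is deterministic.
print(acquisition_order([dentry_lock(42, "new_file"), inode_lock(42)]))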

Test results

SeastarFS is not production-ready as of the time of writing this blog, but a significant part of it is already coded and tested (see the last paragraph for details). Testing a file system is a big project by itself, and it consists of checking:

  • correctness of all file system operations under various loads
  • latency of the operations under various loads with different client concurrency levels
  • general throughput of the file system
  • correctness of the compaction process
  • performance of the compaction process — both in terms of how much data can be reclaimed and how many resources are needed for that
  • how the system compares to other systems, like XFS.

Comparing against XFS was not a trivial task, since SeastarFS is in fact a userspace library with a specific API, while XFS is an inherent part of the Linux kernel with a different interface. Still, there are benchmarking and verification tools for the underlying file system written specifically for Seastar, and they were quite easy to port to SeastarFS. As a result, it was possible to see how well SeastarFS performs compared to others. Here’s a sample chart, showing how consistent SeastarFS is with regard to latency of operations:

Many more results, charts and conclusions can be found in the next paragraph.

Source code and the paper

All of the code produced during this project is open source and available on GitHub and the seastar-dev mailing list.

Each project was also the foundation for the Bachelor’s thesis of the students who took part in it. The thesis has already been reviewed and accepted by the University of Warsaw and is public to read. You can find a detailed description of the design, goals, performed tests and results in this document (in Polish): zpp_fs.pdf. Volunteers for a translation are very welcome!

We’re very proud of contributing to the creation of this academic paper – congrats to all brand new BSc degree holders! We are definitely looking forward to continuing our cooperation with the students and the faculty of the University of Warsaw in the future. Happy reading!

The post Scylla Student Projects, Part II: Implementing an Async Userspace File System appeared first on ScyllaDB.

Apache Cassandra vs DynamoDB

In this post, we’ll look at some of the key differences between Apache Cassandra (hereafter just Cassandra) and DynamoDB.

Both are distributed databases and have similar architectures, and both offer incredible scalability, reliability, and resilience. However, there are also differences, and understanding those differences and the cost benefits can help you determine the right solution for your application.

Apache Cassandra is an open source database available at no cost from the Apache Foundation. Installing and configuring Cassandra can be challenging and there is more than one pitfall along the way. However, Cassandra can be installed on any cloud service or at a physical location you choose.

The typical Cassandra installation is a cluster, which is a collection of nodes (a node is a single instance of Cassandra installed on a computer or in a Docker container). Nodes can then be grouped in racks and data centers which can be in different locations (cloud zones and regions or physical colocations). You must scale Cassandra as your demand grows and are responsible for ongoing management tasks such as backups, replacing bad nodes, or adding new nodes to meet demand.

Amazon DynamoDB is a fully managed database as a service. All implementation details are hidden and, from the user’s viewpoint, DynamoDB is serverless. DynamoDB automatically scales throughput capacity to meet workload demands, partitions and repartitions your data as your table size grows, and distributes data across multiple availability zones. However, the service is available only through Amazon Web Services (AWS).

Replica Configuration and Placement

NoSQL data stores like Cassandra and DynamoDB use multiple replicas (copies of data) to ensure high availability and durability. The number of replicas and their placement determines the availability of your data.

With Cassandra, the number of replicas per cluster—the replication factor—and their placement are configurable. A cluster can be subdivided into two or more data centers which can be located in different cloud regions or physical colocations. The nodes in a data center can be assigned to different racks, which can be mapped to different availability zones or to different physical racks.

In contrast, with DynamoDB, Amazon makes these decisions for you. By default, data is located in a single region and is replicated to three (3) availability zones in that region. Replication to different AWS regions is available as an option. DynamoDB Streams must be enabled for multi-region replication.

Data Model

The top level data structure in Cassandra is the keyspace which is analogous to a relational database. The keyspace is the container for the tables and it is where you configure the replica count and placement. Keyspaces contain tables (formerly called column families) composed of rows and columns. A table schema must be defined at the time of table creation.

The top level structure for DynamoDB is the table which has the same functionality as the Cassandra table. Rows are items, and cells are attributes. In DynamoDB, it’s possible to define a schema for each item, rather than for the whole table.

Both databases store data in sparse rows—for a given row, they store only the columns present in that row. Every table must have a primary key that uniquely identifies its rows or items; this primary key has two components:

  • A partition key that determines the placement of the data by subsetting the table rows into partitions. This key is required.
  • A key that sorts the rows within a partition. In Cassandra, this is called the clustering key while DynamoDB calls it the sort key. This key is optional.

Taken together, the primary key ensures that each row in a table is unique. 
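As a hedged sketch of how those two components look in practice (using the Python drivers; the Music table mirrors the query example later in this post, and the connection details are made up):

import boto3
from cassandra.cluster import Cluster

# Cassandra: artist is the partition key, song_title the clustering (sort) key.
session = Cluster(["127.0.0.1"]).connect()
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS demo
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}
""")
session.execute("""
    CREATE TABLE IF NOT EXISTS demo.music (
        artist     text,
        song_title text,
        album      text,
        PRIMARY KEY ((artist), song_title)
    )
""")

# DynamoDB: the same shape expressed as a HASH (partition) key plus a
# RANGE (sort) key; non-key attributes need no schema at all.
boto3.client("dynamodb").create_table(
    TableName="Music",
    KeySchema=[
        {"AttributeName": "Artist", "KeyType": "HASH"},      # partition key
        {"AttributeName": "SongTitle", "KeyType": "RANGE"},  # sort key
    ],
    AttributeDefinitions=[
        {"AttributeName": "Artist", "AttributeType": "S"},
        {"AttributeName": "SongTitle", "AttributeType": "S"},
    ],
    BillingMode="PAY_PER_REQUEST",
)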

 Differences 

  • DynamoDB limits the number of tables in an AWS region to 256.  If you need more tables, you must contact AWS support. There are no hard limits in Cassandra. The practical limit is around 500 tables.
  • DynamoDB is schemaless. Only the primary key attributes need to be defined at table creation. 
  • DynamoDB charges for read and write throughput and requires you to manage capacity for each table. Read and write throughput and associated costs must be considered when designing tables and applications. 
  • The maximum size of an item in DynamoDB is 400KB. With Cassandra, the hard limit is 2GB; the practical limit is a few megabytes.
  • In DynamoDB, the primary key can have only one attribute as the partition key and one attribute as the sort key. Cassandra allows composite partition keys and multiple clustering columns.
  • Cassandra supports counter, time, timestamp, uuid, and timeuuid data types not found in DynamoDB.

Allocating Table Capacity

Both Cassandra and DynamoDB require capacity planning before setting up a cluster. However, the approaches are different. 

To create a performant Cassandra cluster, you must first make reasonably accurate estimates of your future workloads. Capacity is allocated by creating a good data model, choosing the right hardware, and properly sizing the cluster. Increasing workloads are met by adding nodes.

With DynamoDB, capacity planning is determined by the read/write capacity mode you choose. On-demand capacity requires no capacity planning other than setting an upper limit on each table. You pay only for the read and write requests on the table. Capacity is measured in read request units and write request units. On-demand mode is best when you have an unknown workload, unpredictable application traffic, or you prefer the ease of paying for only what you use.

With provisioned capacity, you must specify the read and write throughput limits for each table at the time of creation. If you exceed these limits for a table or tables, DynamoDB will throttle queries until usage is below the defined capacity. Auto scaling will adjust your table’s provisioned capacity automatically in response to traffic changes, although there is a lag between the time throttling starts and the time increased capacity is applied.

The throughput limits are provisioned in units called Read Capacity Units (RCU) and Write Capacity Units (WCU); queries are throttled whenever these limits are exceeded. One read capacity unit represents one strongly consistent read per second, or two eventually consistent reads per second, for an item up to 4 KB in size. Transactional read requests require two read capacity units to perform one read per second for items up to 4 KB. If you need to read an item that is larger than 4 KB, DynamoDB must consume additional read capacity units. One write capacity unit represents one write per second for an item up to 1 KB in size. If you need to write an item that is larger than 1 KB, DynamoDB must consume additional write capacity units. Transactional write requests require 2 write capacity units to perform one write per second for items up to 1 KB. For more information, see Managing Settings on DynamoDB Provisioned Capacity Tables.
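Because of the 4 KB and 1 KB rounding, it helps to compute the required units explicitly. A small sketch based on the numbers quoted above:

import math

def read_capacity_units(item_size_bytes, reads_per_second, consistency="strong"):
    """One 4 KB strongly consistent read = 1 RCU; eventually consistent = 0.5; transactional = 2."""
    units_per_read = math.ceil(item_size_bytes / 4096)
    factor = {"eventual": 0.5, "strong": 1.0, "transactional": 2.0}[consistency]
    return units_per_read * factor * reads_per_second

def write_capacity_units(item_size_bytes, writes_per_second, transactional=False):
    """One 1 KB write = 1 WCU; transactional writes cost double."""
    units_per_write = math.ceil(item_size_bytes / 1024)
    return units_per_write * (2 if transactional else 1) * writes_per_second

# A 6 KB item read 100 times/s with strong consistency needs 200 RCU;
# writing the same item 50 times/s needs 300 WCU.
print(read_capacity_units(6 * 1024, 100))   # 200.0
print(write_capacity_units(6 * 1024, 50))   # 300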

Provisioned mode is a good option if any of the following are true: 

  • You have predictable application traffic.
  • Application traffic is consistent or ramps gradually.
  • You can forecast capacity requirements to control costs.

Partitions

Both Cassandra and DynamoDB group and distribute data based on the hashed value of the partition key. Both call these groupings partitions, but they define them very differently.

In DynamoDB, a partition is a storage unit that has a maximum size of 10 GB. When a partition fills, DynamoDB creates a new partition and the user has no control over the process. A partition can contain items with different partition key values. When a sort key is used, all the items with the same partition key value are stored physically close together, ordered by sort key value.

DynamoDB partitions have capacity limits of 3,000 RCU or 1,000 WCU even for on-demand tables. Furthermore, these limits cannot be increased. If you exceed the partition limits, your queries will be throttled even if you have not exceeded the capacity of the table. See Throttling and Hot Keys (below) for more information.

A Cassandra partition is a set of rows that share the same hashed partition key value.  Rows with the same partition key are stored on the same node. Rows within the partition are sorted by the clustering columns. If no clustering column was specified, the partition holds a single row. While it would not be desirable, it would be possible for an application to drive tens of thousands of reads/writes to a single partition. 

See Cassandra Data Partitioning.

Query Language 

Cassandra provides a SQL-like language called the Cassandra Query Language (CQL) to access data. DynamoDB uses a JSON-based syntax. The following comparison shows the syntax for the query “return all information from the Music table for the song title ‘Lullaby of Broadway’ and the artist ‘Tommy Dorsey’”:

Request: all information for the song ‘Lullaby of Broadway’ played by Tommy Dorsey.

CQL:

SELECT *
FROM Music
WHERE Artist = 'Tommy Dorsey' AND SongTitle = 'Lullaby of Broadway';

DynamoDB:

get-item {
    TableName: "Music",
    Key: {
        "Artist": "Tommy Dorsey",
        "SongTitle": "Lullaby of Broadway"
    }
}


Secondary Indexes

By default, Cassandra and DynamoDB queries can use only the primary key columns in the search condition, which must include all partition key columns. Non-key columns can be used in a search by creating an index on that column.

Cassandra supports creating an index on most columns, including a clustering column of a compound primary key or the partition key itself. Creating an index on a collection or on the key of a collection map is also supported. However, when used incorrectly a secondary index can hurt performance. A general rule of thumb is to index columns with low cardinality (few distinct values) and to use the index only together with the partition key in the search clause. Because the index table is stored on each node in a cluster, a query using a secondary index can degrade performance if multiple nodes are accessed.

DynamoDB has local secondary indexes. This index uses the same partition key as the base table but a different sort key, and it is scoped to the base table partition that has the same partition key value. Local secondary indexes must be created at the same time the table is created. A maximum of 5 local secondary indexes may be created per table.

Materialized Views versus Global Secondary Indexes 

In Cassandra, a Materialized View (MV) is a table built from the results of a query from another table but with a new primary key and new properties. Queries are optimized by the primary key definition. The purpose of a materialized view is to provide multiple queries for a single table. It is an alternative to the standard practice of creating a new table with the same data if a different query is needed. Data in the materialized view is updated automatically by changes to the source table. However, the materialized view is an experimental feature and should not be used in production.

A similar object in DynamoDB is the Global Secondary Index (GSI), which creates an eventually consistent replica of a table. GSIs can be created at table creation time or added later, and each table has a default limit of 20. A GSI must be provisioned for reads and writes; there are storage costs as well.

Time To Live

Time To Live (TTL) is a feature that automatically removes items from your table after a period of time has elapsed.

  • Cassandra specifies TTL as the number of seconds from the time of creating or updating a row, after which the row expires.
  • In DynamoDB, TTL is a timestamp value representing the date and time at which the item expires.
  • DynamoDB applies TTL at the item level, while Cassandra applies it to the column (both are sketched below).
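A hedged sketch of both approaches with the Python clients (the table, column and attribute names are made up):

import boto3
from cassandra.cluster import Cluster

# Cassandra: the TTL is given in seconds at write time and applies to the
# columns being written; these cells expire one day after the insert.
session = Cluster(["127.0.0.1"]).connect("demo")
session.execute(
    "INSERT INTO sessions (id, token) VALUES (%s, %s) USING TTL 86400",
    ("user-1", "abc123"),
)

# DynamoDB: TTL is an item attribute holding an absolute expiry timestamp
# (epoch seconds), and the feature is enabled once per table.
boto3.client("dynamodb").update_time_to_live(
    TableName="Sessions",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expires_at"},
)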

Consistency

Both Cassandra and DynamoDB are distributed data stores. In a distributed system there is a tradeoff between consistency—every read receives the most recent write or an error—and availability—every request receives a (non-error) response, but without the guarantee that it contains the most recent write. In such a system there are two possible levels of consistency:

  • Eventual consistency. This implies that all updates reach all replicas eventually. A read with eventual consistency may return stale data until all replicas are reconciled to a consistent state.
  • Strong consistency returns up-to-date data for all prior successful writes but at the cost of slower response time and decreased availability.

DynamoDB supports eventually consistent and strongly consistent reads on a per query basis. The default is eventual consistency. How it is done is hidden from the user.
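In practice it is just a flag on the request; a minimal boto3 sketch (the table and key values reuse the Music example from earlier):

import boto3

table = boto3.resource("dynamodb").Table("Music")
key = {"Artist": "Tommy Dorsey", "SongTitle": "Lullaby of Broadway"}

# Default: eventually consistent, at half the read-capacity cost.
eventual = table.get_item(Key=key)

# Opt in to a strongly consistent read for this single request only.
strong = table.get_item(Key=key, ConsistentRead=True)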

 Strongly consistent reads in DynamoDB have the following issues: 

  • The data might not be available if there is a network delay or outage.
  • The operation may have higher latency.
  • Strongly consistent reads are not supported on global secondary indexes.
  • Strongly consistent reads use more throughput capacity than eventually consistent reads and are therefore more expensive. See Throttling and Hot Keys (below).

Cassandra offers tunable consistency for any given read or write operation that is configurable by adjusting consistency levels. The consistency level is defined as the minimum number of Cassandra nodes that must acknowledge a read or write operation before the operation can be considered successful. You are able to configure strong consistency for both reads and writes at a tradeoff of increasing latency.
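For comparison, a minimal cassandra-driver sketch that requests QUORUM consistency for a single statement (the keyspace and table are made up):

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

session = Cluster(["127.0.0.1"]).connect("demo")

# With a replication factor of 3, QUORUM requires 2 replicas to respond;
# QUORUM reads combined with QUORUM writes give strongly consistent results.
query = SimpleStatement(
    "SELECT * FROM music WHERE artist = %s",
    consistency_level=ConsistencyLevel.QUORUM,
)
rows = session.execute(query, ("Tommy Dorsey",))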

Conflicts Resolution

In a distributed system, there is the possibility that a query may return inconsistent data from the replicas. Both Cassandra and DynamoDB resolve any inconsistencies with a “last write wins” solution but with Cassandra, every time a piece of data is written to the cluster, a timestamp is attached. Then, when Cassandra has to deal with conflicting data, it simply chooses the data with the most recent timestamp.

For DynamoDB, “last write wins” applies only to global tables and strongly consistent reads.

Security Features

Cassandra and DynamoDB provide methods for user authentication and authorization and data access permissions. Both use encryption for client and inter-node communication. DynamoDB also offers encryption at rest. Instaclustr offers encryption at rest for its Cassandra Managed Services.

Performance Issues

Consistency and Read Speed

Choosing strong consistency requires more nodes to respond to a request which increases latency.

Scans

Scans are expensive for both systems. Scans in Cassandra are slow because Cassandra has to scan all nodes in the cluster. Scans are faster in DynamoDB but are expensive because resource use is based on the amount of data read, not the amount returned to the client. If a scan exceeds your provisioned read capacity, DynamoDB will generate errors.

DynamoDB’s Issues

Auto Scaling

Amazon DynamoDB auto scaling uses the AWS Application Auto Scaling Service to dynamically adjust provisioned throughput capacity on your behalf, in response to actual traffic patterns. This enables a table or a global secondary index to increase its provisioned read and write capacity to handle sudden increases in traffic, without throttling. When the workload decreases, application auto scaling decreases the throughput so that you don’t pay for unused provisioned capacity.

  • Auto scaling does not work well with variable and bursty workloads. The table scales up only in response to actual consumption, triggering scaling alarms repeatedly until capacity reaches the desired level.
  • There can be a lag (5-15 minutes) between the time capacity is exceeded and autoscaling takes effect.
  • Capacity decreases are limited. The maximum is 27 per day (a day is defined according to the GMT time zone).
  • Tables scale based on the number of requests made, not the number of successful requests.
  • Autoscaling cannot exceed hard I/O limits for tables.

Throttling and Hot Keys

In DynamoDB, the provisioned capacity of a table is shared evenly across all the partitions in the table, with the consequence that each partition has a capacity lower than the table capacity. For example, for a table provisioned with 400 WCU and 100 RCU and split across 4 partitions, each partition would have a capacity of 100 WCU and 25 RCU.

So if you had a query send most read/write requests to a single partition—a hot partition— and that throughput exceeded the shared capacity of that partition, your queries would be throttled even though you had unused capacity in the table. If your application creates a hot partition, your only recourse would be either to redesign the data model or to over allocate capacity to the table.

Adaptive capacity can provide up to 5 minutes of grace time by allocating unused capacity from other partitions to the “hot” one provided unused capacity is available and hard limits are not reached. The hard limits on a partition are 3,000 RCU or 1,000 WCU.

Cross-Region Replication

If you want a DynamoDB table to span regions, you’ll need to use global tables, which require careful planning and implementation. The tables in all regions must use on-demand allocation or have auto scaling enabled. The tables in each region must have the same settings: time to live, the number of global and local secondary indexes, and so on.

Global tables support only eventual consistency, which can lead to conflicting writes. In such a conflict, DynamoDB applies a last writer wins (lww) policy: the data with the most recent timestamp is used.

Migrating an existing local table to a global table requires additional steps. First, DynamoDB Streams is enabled on the original local table to create a changelog. Then an AWS Lambda function is configured to copy the corresponding change from the original table to the new global table.

Cost Explosion 

As highlighted in Managed Cassandra Versus DynamoDB, DynamoDB’s pricing model can easily make it expensive for a fast growing company:

  • The pricing model is based on throughput, therefore costs increase as IO increases. 
  • A hot partition may require you to overprovision a table.
  • Small reads and writes are penalized because measured throughput is rounded up to the nearest 1 KB boundary for writes and 4 KB boundary for reads.
  • Writes are four times more expensive than reads.
  • Strongly consistent reads are twice the cost of eventually consistent reads. 
  • Workloads performing scans or queries can be costly because the read capacity units are calculated on the number of bytes read rather than the amount of data returned.
  • Read heavy workloads may require DynamoDB Accelerator (DAX) which is priced per node-hour consumed. A partial node-hour consumed is billed as a full hour.
  • Cross region replication incurs additional charges for the amount of data replicated by the Global Secondary Index and the data read from DynamoDB Streams.
  • DynamoDB does not distinguish between a customer-facing, production table versus tables used for development, testing, and staging.

Summary 

DynamoDB’s advantages are: an easy start; absence of the database management burden; sufficient flexibility, availability, and auto scaling; built-in metrics for monitoring; and encryption of data at rest.

Cassandra’s main advantages are: fast speed of writes and reads; constant availability; a SQL-like Cassandra Query Language instead of DynamoDB’s complex API; reliable cross data center replication; linear scalability and high performance. 

However, the mere technical details of the two databases shouldn’t be the only aspect to analyze before making a choice.

Cassandra versus DynamoDB costs must be compared carefully. If you have an application that mostly writes or one that relies on strong read consistency, Cassandra may be a better choice.

You will also need to consider which technologies you need to supplement the database. If you need open source tools like Elasticsearch, Apache Spark, or Apache Kafka, Cassandra is your choice. If you plan to make extensive use of AWS products, then it’s DynamoDB.

If you use a cloud provider other than AWS or run your own data centers, then you would need to choose Cassandra.

The post Apache Cassandra vs DynamoDB appeared first on Instaclustr.

Python scylla-driver: how we unleashed the Scylla monster’s performance

At Scylla Summit 2019 I had the chance to meet Israel Fruchter, and we dreamed of working on adding shard-awareness support to the Python cassandra-driver, which would become known as the scylla-driver.

A few months later, when Israel reached out to me on the Scylla-Users #pythonistas Slack channel with the first draft PR, I was so excited that I jumped on board to help!

The efforts we put into the scylla-driver paid off: the production applications that I had the chance to switch over to it saw their performance improve by 15 to 25%!

Before we reached those numbers and even released the scylla-driver to PyPI, the EuroPython 2020 call for proposals was open, and I submitted a talk proposal which was luckily accepted by the community.

So I had the chance to deep-dive into the architecture differences between Cassandra and Scylla, explain the rationale behind creating the scylla-driver, and give Python code details on how we implemented it and the challenges we faced doing so. Check my talk page for the details.

I also explained that I wrote an RFC on the scylla-dev mailing list which led the developers of Scylla to implement a new connection-to-shard algorithm that will allow clients connecting to a new listening port to select the actual shard they want a connection to.

This is a major expected optimization over the current, mostly random way of connecting to all shards, and I’m happy to say that it’s been implemented and is ready to be put to use by all the Scylla drivers.

I’ve recently been contacted by PyCon India and other Python-related conference organizers about giving a talk, so I’ve submitted one to PyCon India, where I hope I’ll be able to showcase even better numbers thanks to the new algorithm!

After my Europython talk we also had very interesting discussions with Pau Freixes about his work on a fully asynchronous Python driver that wraps the C++ driver to get the best possible performance. First results are absolutely amazing so if you’re interested in this, make sure to give it a try and contribute to the driver!

Stay tuned for more amazing query latencies 😉

Benchmarking Cassandra with Local Storage on Azure

Continuing our efforts in adding support for the latest generation of Azure Virtual Machines, Instaclustr is pleased to announce support for the D15v2 and L8v2 types with local storage support for Apache Cassandra clusters on the Instaclustr Managed Platform. So let’s begin by introducing the two forms of storage.

Local storage is non-persistent (i.e. volatile) storage that is internal to the physical Hyper-V host that a VM is running on at any given time. As soon as the VM is moved to another physical host as a result of a deallocate (stop) and start command, a hardware or VM crash, or Azure maintenance, the local storage is wiped clean and any data stored on it is lost.

Remote storage is persistent and comes in the form of managed (or unmanaged) disks. These disks can have different performance and redundancy characteristics. Remote storage is generally used for any VM’s OS disk or data disks that are intended to permanently store important data. A remote disk can be detached from a VM on one physical host and reattached on another in order to preserve the data. It is stored in an Azure storage account and attached to VMs over the network. The physical Hyper-V host running the VM is independent of the remote storage account, which means that any Hyper-V host or VM can mount a remotely stored disk more than once.

Remote storage has the critical advantage of being persistent, while local storage has the advantage of being faster (because it’s local to the VM, on the same physical host) and included in the VM cost. When running an Azure VM you pay separately for its OS and data disks, whereas local storage is included in the price of the VM and often provides a much cheaper storage alternative to remote storage.

The Azure Node Types

D15_v2s

D15_v2s are the latest addition to Azure’s Dv2 series, featuring more powerful CPU and optimal CPU-to-memory configuration making them suitable for most production workloads. The Dv2-series is about 35% faster than the D-series. D15_v2s are a memory optimized VM size offering a high memory-to-CPU ratio that is great for distributed database servers like Cassandra, medium to large caches, and in-memory data stores such as Redis.

Instaclustr customers can now leverage these benefits with the release of the D15_v2 VMs, which provide 20 virtual CPU cores and 140 GB RAM backed with 1000 GB of local storage.

L8_v2s

L8_v2s are part of the Lsv2-series, featuring high throughput, low latency, and directly mapped local NVMe storage. L8s, like the rest of the series, were designed to cater to high-throughput and high-IOPS workloads including big data applications, SQL and NoSQL databases, data warehousing, and large transactional databases. Examples include Cassandra, Elasticsearch, and Redis. In general, applications that can benefit from large in-memory databases are a good fit for these VMs.

L8s offer 8 virtual CPU cores and 64 GB of RAM backed by 1388 GB of local storage. L8s provide roughly equivalent compute capacity (cores and memory) to a D13v2 at about ¾ of the price. This offers speed and efficiency at the cost of forgoing persistent storage.

Benchmarking

Prior to the public release of these new instances on Instaclustr, we conducted Cassandra benchmarking for:

VM Type        CPU Cores   RAM      Storage Type   Disk Size (Type)
DS13v2         8           56 GB    remote         2046 GiB (SSD)
D15v2          20          140 GB   local          1000 GiB (SSD)
L8s w/local    8           64 GB    local          1388 GiB (SSD)
L8s w/remote   8           64 GB    remote         2046 GiB (SSD)

All tests were conducted using Cassandra 3.11.6.

The results are designed to outline the performance of utilising local storage and the benefits it has over remote storage. Testing is split into two groups, fixed and variable testing. Each group runs two different payload profiles:

  1. Small – smaller, more frequent payloads which are quick to execute but are requested in large numbers.
  2. Medium – larger payloads which take longer to execute and are potentially capable of slowing Cassandra’s ability to process requests.

The fixed testing procedure is:

  1. Insert data to fill disks to ~30% full.
  2. Wait for compactions to complete.
  3. Run a fixed rate of operations
  4. Run a sequence of tests consisting of read, write and mixed read/write operations.
  5. Run the tests and measure the operational latency for a fixed rate of operations. 

The variable testing procedure is:

  1. Insert data to fill disks to ~30% full.
  2. Wait for compactions to complete.
  3. Target a median latency of 10ms.
  4. Run a sequence of tests comprising read, write and mixed read/write operations. These tests were broken down into multiple types.
  5. Run the tests and measure the results including operations per second and median operation latency. 
  6. Quorum consistency for all operations.
  7. We incrementally increase the thread count and re-run the tests until we have hit the target median latency and are under the allowable pending compactions.
  8. Wait for compactions to complete after each test.

As with any generic benchmarking results the performance may vary from the benchmark for different data models or application workloads. However, we have found this to be a reliable test for comparison of relative performance that will match many practical use cases.

Comparison

When comparing performance between instance types with local and remote storage it is important to take note of the latency of operations. The latency of a read operation indicates how long a request takes to retrieve information from the disk and return it to the client.

In the fixed small and medium read tests, local storage offered better latency results in comparison to instances with remote storage. This is more noticeable in the medium read tests, where the tests had a larger payload and required more interaction with the locally available disks. Both L8s (with local storage) and D15_v2s offered a lower latency to match the operation rate given to the cluster.

When running variable-small-read tests under a given latency target, the operation rate for D15v2s reached nearly 3 times the number of ops/s of D13v2s with remote storage. Likewise, L8s with local storage outperformed L8s with remote storage, achieving twice the throughput at half the latency.

variable-read-small            L8s w/local   L8s w/remote   DS13_v2   DS15_v2
Operation rate (ops/s)         19,974        7,198          23,985    61,489
Latency mean (ms)              22.1          48.3           21.9      19.4
Latency median (ms)            20.1          40.3           19.9      17.7
Latency 95th percentile (ms)   43.6          123.5          43.3      39.2
Latency 99th percentile (ms)   66.9          148.9          74.3      58.9

In the medium read tests, instances with local storage outperformed instances with remote storage. Both L8s (with local storage) and D15s had a far better ops/s and latency result, even with a significantly higher operation rate. This makes a very convincing argument for local storage over remote storage when seeking optimal performance.

variable-read-medium           L8s w/local   L8s w/remote   DS13_v2   DS15_v2
Operation rate (ops/s)         235           76             77        368
Latency mean (ms)              4.2           13             12.9      2.7
Latency median (ms)            3.2           8.4            9.5       2.5
Latency 95th percentile (ms)   4.9           39.7           39.3      3.4
Latency 99th percentile (ms)   23.5          63.4           58.8      4.9

Looking at writes, on the other hand, D15s outperformed due to their large pool of CPU cores. The differences were less pronounced in the small tests and more obvious in the medium tests. Further investigation will be conducted to determine why this is the case.

Based on the graph for variable medium testing, D15s outperformed in all categories. D15v2s have both a larger number of cores to outpace competition with heavy loads of writes and local storage offering faster disk intensive reads. This was additionally supported by strong performance in the mixed medium testing results.

L8s with local storage took second place, performing better than DS13v2s in the read and mixed tests, whilst DS13v2 nodes slightly edged out L8s in writes for larger payloads. The mixed results showed a substantial difference in performance between the two, with L8s taking the lead thanks to local storage providing faster disk-intensive reads.

Conclusion

Based on the results from this comparison, we find that local storage offers amazing performance results for disk intensive operations such as reads. D15v2 nodes, with their large number of cores to perform CPU intensive writes and local storage to help with disk intensive reads, offer top tier performance for any production environment.

Furthermore, L8s with local storage offer a cost-efficient solution at around ¾ of the price of D13v2s and provide a better price-performance ratio, notably in read operations. This is especially beneficial for a production environment which prioritizes reads over writes.

In order to assist customers in upgrading their Cassandra clusters from currently used VM types to D15v2 or L8 VM type nodes, Instaclustr’s technical operations team has built several tried and tested node replacement strategies to provide zero-downtime, non-disruptive migrations for our customers. Reach out to our support team if you are interested in these new instance types for your Cassandra cluster.

You can access pricing details through the Instaclustr Console when you log in, or contact our Support team.

The post Benchmarking Cassandra with Local Storage on Azure appeared first on Instaclustr.

Scylla Enterprise Release 2020.1.0

The Scylla team is pleased to announce the release of Scylla Enterprise 2020.1.0, a production-ready Scylla Enterprise major release. After 5,568 commits originating from five open source releases, we’re excited to now move forward with Scylla Enterprise 2020. This release marks a significant departure for us: while we’ve said for years that we are a drop-in replacement for Apache Cassandra, we are now also a drop-in replacement for Amazon DynamoDB.

The Scylla Enterprise 2020.1 release is based on Scylla Open Source 4.0, promoting open source features to Enterprise level support, including DynamoDB-compatible API (Alternator) and Lightweight Transactions (LWT).

Alternator, Our DynamoDB-Compatible API

DynamoDB users can now switch to Scylla without changing a single line of application code. Scylla significantly reduces the total cost of ownership, delivers lower and more consistent latencies, and relaxes the limits DynamoDB places on object size, partition size, etc. Developers don’t want to be locked into a single platform, especially one that’s so expensive.

Scylla can also run as a complement to DynamoDB, extending an existing DynamoDB implementation to additional deployment options. You can run clusters on-premises, on your preferred cloud platforms, or on Scylla’s fully managed database as a service, Scylla Cloud. You are free to access your data as you like, without pay-per-operation fees, and with more deployment options, including open source solutions like Docker and Kubernetes.

Improvements over Cassandra

Meanwhile we continue to extend our lead over Apache Cassandra and other Cassandra-compatible offerings. For example, Scylla’s implementation of the Paxos consensus algorithm for Lightweight Transactions (LWT) is more efficient and consistent by default, allowing you better performance for compare-and-set conditional update operations.

We’ve also introduced unique enhancements for the Cassandra Query Language (CQL), to provide more granular and efficient results, from adding new clauses such as BYPASS CACHE, to enhancements in ALLOW FILTERING (such as multi-column and CONTAINS restrictions), to a LIKE operator to enable searching for patterns in user data.

Also in the Scylla Enterprise 2020 release roadmap is Change Data Capture (CDC), a stream of database updates implemented as standard CQL tables. This means your change data will be distributed across your cluster normally, and you can use standard CQL queries to see changes, pre-images and post-images of affected records. You will not need specialized applications to deduplicate and consume updates.

Unique Capabilities

Since our last major release in May 2019, we have introduced Scylla-exclusive innovations, such as a new default compaction strategy, Incremental Compaction Strategy (ICS), which can save users over a third of their total storage requirements, and Workload Prioritization, which enables balancing different classes of transactional or analytical processing on the same cluster.

These unique capabilities allow Scylla Enterprise users to get far more efficient utility out of their existing infrastructure — advances no other NoSQL database vendor can match, and are even above and beyond our highly performant open source offering.

Staying at the Forefront

We’ve also seen many advances in cloud computing, such as the 60 terabyte-scale I3en series “meganodes” from AWS, which enable much higher storage-to-memory densities, and which Scylla Enterprise 2020.1 now supports. Scylla Enterprise also keeps us in unison with industry advances in Linux, supporting Red Hat Enterprise Linux 8, CentOS 8, and Debian 10. We’ve also put in a great deal of work to ensure our code is portable, including relocatable packages.

Scylla Enterprise 2020.1 includes all of 2019.1.x bug fixes and features plus additional features and fixes brought in from Scylla Open Source 4.x.

Additional functionality enhancements since 2019.1.0 include:

  • Local secondary indexes (next to our global indexes)
  • IPv6 support
  • Various query improvements including CQL per partition limit and GROUP BY

Security enhancements include:

  • Encryption at Rest

Usability enhancements include:

  • Large Cell / Collection Detector
  • Nodetool TopPartitions

Performance enhancements include:

  • MC SSTable format
  • Row-level repair
  • Improved CQL server admission control

Related Links

Scylla Enterprise customers are encouraged to upgrade to Scylla Enterprise 2020.1, and are welcome to contact our Support Team with questions.

Deployment

  • Scylla Enterprise packages are newly available for:
    • Debian 10
    • Red Hat Enterprise Linux 8
    • CentOS 8
  • Scylla EC2 AMI user data format has changed. The new format is JSON based, and while it supports all the options it is not backward compatible.
  • Complementing the new AMI format, we are releasing a CloudFormation template for easy launch of a Scylla cluster in one region.
  • Scylla Docker: node_exporter is now part of the Scylla Docker, making it easier to consume OS-level metrics when working with Docker and Scylla Monitoring
  • Scylla Enterprise 2020.1 is also optimized to work with the new AWS EC2 I3en server family.
  • Scylla Enterprise 2020.1 is *not* available on Ubuntu 14.04 or Debian 8. If you are using Scylla with either of these OS/versions please upgrade to Ubuntu 16.04 or 18.04 or Debian 9 accordingly.

New Features in 2020.1

Lightweight Transactions (LWT)

Lightweight Transactions, also known as Compare and Set (CAS), add support for conditional INSERT and UPDATE CQL commands. Scylla supports both equality and non-equality conditions for lightweight transactions (i.e., you can use the <, <=, >, >=, !=, = and IN operators in an IF clause).
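A quick, hedged illustration using the Python driver (the keyspace, table and values are made up):

from cassandra.cluster import Cluster

session = Cluster(["127.0.0.1"]).connect("demo")

# A conditional UPDATE goes through Paxos; the IF clause may use
# =, !=, <, <=, >, >= or IN.
result = session.execute(
    "UPDATE accounts SET balance = %s WHERE id = %s IF balance >= %s",
    (50, "acct-1", 100),
)
# The result set reports whether the conditional update actually took effect.
print(result.was_applied)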

You can learn more on LWT in Scylla and LWT optimizations from the latest LWT Webinar (registration required) and LWT documentation.

Scylla Alternator: The Open Source DynamoDB-compatible API

Project Alternator is an open-source implementation for an Amazon DynamoDB™-compatible API. The goal of this project is to deliver an open source alternative to Amazon’s DynamoDB that can be run anywhere, whether on-premises or on your own favorite cloud provider, or in Scylla Cloud.

Scylla Alternator was introduced as an experimental feature in Scylla Open Source 3.2, and now, in Scylla Enterprise 2020.1.0, it is promoted to a fully supported Enterprise feature.

Read more here: Open Source DynamoDB-compatible API Documentation.

  • Support IPv6 for client-to-node and node-to-node communication #2027
    Scylla now supports IPv6 Global Scope Addresses for all IPs: seeds, listen_address, broadcast_address etc. Note that Scylla Monitoring stack 3.0 and above supports IPv6 addresses as well, and IPv6 support for Scylla Manager is coming soon (Scylla Manager 2.0.1). Make sure to enable enable_ipv6_dns_lookup in scylla.yaml (see below)

Example from scylla.yaml

enable_ipv6_dns_lookup: true
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "fcff:69:46::7b"
listen_address: fcff:69:46::7b
broadcast_rpc_address: fcff:69:46::7b
rpc_address: fcff:69:46::7b

Documentation for IPv6

CQL Enhancements

  • CQL: support for GROUP BY to Select statement #2206

Example:
SELECT partitionKey, max(value) FROM myTable GROUP BY partitionKey;
SELECT partitionKey, clustering0, clustering1, max(value) FROM myTable GROUP BY partitionKey, clustering0, clustering1;

Documentation for GROUP BY

  • CQL: LIKE Operation #4477
    The new CQL LIKE keyword allows matching any column to a search pattern, using % as a wildcard. Note that LIKE only works with ALLOW FILTERING.

LIKE syntax support:
  • '_' matches any single character
  • '%' matches any substring (including an empty string)
  • '\' escapes the next pattern character, so it matches verbatim
  • any other pattern character matches itself
  • an empty pattern matches empty text fields

For example:
> INSERT INTO t (id, name) VALUES (17, 'Mircevski')
> SELECT * FROM t where name LIKE 'Mirc%' allow filtering

Documentation for the new LIKE operator

  • CQL: Support open range deletions in CQL #432
    Allow range deletion based on the clustering key, similar to range query with SELECT

Example:
CREATE TABLE events ( id text, created_at date, content text, PRIMARY KEY (id, created_at) );
DELETE FROM events WHERE id='concert' AND created_at <= '2019-10-31';

Documentation for open range deletions

  • CQL: Auto-expand replication_factor for NetworkTopologyStrategy #4210
    Allows users to set the Replication Factor (RF) for all existing Data Centers (DCs) at once.

Example:
CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}

Note that when adding new data centers, you need to set the RF again to include the added DCs. You can again use the auto-expansion option:
ALTER KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}

Or you can set RF for the new DC manually (if you want a different RF, for example), but remember to keep the old DCs’ replication factors:
ALTER KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3, 'new-dc' : 5}

Documentation for the new Auto-expand replication_factor

  • CQL: Support non-frozen UDTs #2201
    Non-frozen User Defined Types (UDTs), similarly to non-frozen collections, allow updating single fields of a UDT value without knowing the entire value. For example:

CREATE TYPE ut (
a int,
b int
);
CREATE TABLE tbl (
pk int PRIMARY KEY,
val ut
);
INSERT INTO tbl (pk, val) VALUES (0, {a:0, b:0});
UPDATE tbl SET val.b = 1 WHERE pk = 0;

Documentation for the new non-frozen UDTs

  • CQL: CQL Tracing now includes reports on SSTable access #4908
    Disk I/O has a strong effect on query latency, but until now it was not reported in CQL tracing.
    This change introduces relevant entries to the CQL trace whenever SSTables are read from disk.

Example:
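A minimal sketch of how to see these entries (the keyspace and table names are hypothetical): enable tracing in cqlsh and run a query that has to read from disk; the new SSTable access entries then appear in the printed trace.

TRACING ON;
SELECT * FROM ks.t WHERE pk = 1;
TRACING OFF;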

  • Add CQL PER PARTITION LIMIT
    For example: SELECT * FROM users PER PARTITION LIMIT 2;
    • See here for using LIMIT for SELECTs
  • BYPASS CACHE clause #3770
    The new BYPASS CACHE clause on SELECT statements informs the database that the data being read is unlikely to be read again in the near future, and also was unlikely to have been read in the near past; therefore no attempt should be made to read it from the cache or to populate the cache with the data. This is mostly useful for range scans which typically process large amounts of data with no temporal locality and do not benefit from the cache.

For example:
SELECT * from heartrate BYPASS CACHE;

If you are using Scylla Monitoring Stack, you can use the Cache section of the Scylla Per Server dashboard, to see the effect of the BYPASS CACHE command on the cache hit and miss ratio.

    • More on BYPASS CACHE here
  • ALLOW FILTERING enhancements
    • Support multi-column restrictions in ALLOW FILTERING #3574
      SELECT * FROM t WHERE (c, d) IN ((1, 2), (1,3), (1,4)) ALLOW FILTERING;
      SELECT * FROM t WHERE (c, d) < (1, 3) ALLOW FILTERING;
      SELECT * FROM t WHERE (c, d) < (1, 3) AND (c, d) > (1, 1) ALLOW FILTERING;
    • Support restricted column, not in select clause #3803
      CREATE TABLE t (id int primary key, id_dup int);
      SELECT id FROM t WHERE id_dup = 3 ALLOW FILTERING;
      — the restriction is applied to a column which is not in the select clause. In prior versions this command returned incorrect results; it now works as intended
    • Support CONTAINS restrictions #3573
      CREATE TABLE t (p frozen<map<text, text>>, c1 frozen<list>, c2 frozen<set>, v map<text, text>, id int, PRIMARY KEY(p, c1, c2));
      SELECT id FROM t WHERE p CONTAINS KEY 'a' ALLOW FILTERING;
      SELECT id FROM t WHERE c1 CONTAINS 3 ALLOW FILTERING;
      SELECT id FROM t WHERE c2 CONTAINS 0.1 ALLOW FILTERING;
      SELECT id FROM t WHERE v CONTAINS KEY 'y1' ALLOW FILTERING;
      SELECT id FROM t WHERE v CONTAINS KEY 'y1' AND c2 CONTAINS 3 ALLOW FILTERING;
  • CQL: Group functions count now works with bytes type #3824
    CREATE TABLE t (p INT, c INT, v BOOLEAN, PRIMARY KEY (p, c));
    SELECT COUNT(v) FROM t WHERE p = 1 AND c = 1;
  • Local Secondary Index #4185
    Local Secondary Indexes are an alternative to Global Secondary Indexes, allowing Scylla to optimize workloads where the partition key of the base table and the index are the same key.
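
To make the idea concrete, here is a sketch following the local index syntax in the Scylla documentation (the menus table and values below are hypothetical): the index's partition key is the same as the base table's, so index lookups stay on the same node as the base partition.

CREATE TABLE menus (location text, name text, price float, dish_type text, PRIMARY KEY (location, name));
-- a local index: the (location) partition key matches the base table's partition key
CREATE INDEX ON menus ((location), dish_type);
SELECT * FROM menus WHERE location = 'Warsaw' AND dish_type = 'Pizza';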

Reduce Storage Space

  • Incremental Compaction Strategy (ICS) – introduced in 2019.1.4, ICS is the default compaction strategy in 2020.1
    The new ICS compaction strategy is an improvement on the default Size Tiered Compaction Strategy (STCS). While it shares the same read and write amplification factors as STCS, it fixes STCS's doubling of temporary disk usage by breaking huge SSTables into smaller chunks, which together are called an SSTable run.

While STCS forces you to keep 50% of your storage reserved for temporary compaction allocation, ICS reduces most of this overhead, allowing you to use more of the disk for regular usage. This can translate to using fewer nodes, or storing more data on the existing cluster. For example, a typical node with 4 TB and 16 shards will have less than 20% temporary space amplification with ICS, allowing the user to run at up to 80% capacity. In a follow-up blog post we will provide a deep dive into ICS disk usage savings.

ICS is only available in Scylla Enterprise.
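
As a brief sketch (the keyspace and table names are hypothetical), an existing table can be switched to ICS with an ALTER TABLE statement:

ALTER TABLE ks.my_table WITH compaction = {'class': 'IncrementalCompactionStrategy'};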

  • SSTable “mc” format is enabled by default. The Apache Cassandra 3.x SSTable format (mc) has been available in Scylla Open Source since release 3.0 and in Scylla Enterprise since 2019.1, but was not enabled by default. In Scylla Enterprise 2020.1, mc-formatted tables are enabled by default. This format can save users significant amounts of disk storage; as much as 50%. Note that you can continue to use the old file format by setting enable_sstables_mc_format: false in scylla.yaml.
    • More on the mc format here

Performance

  • Performance: Segregate data by timestamp for Time Window Compaction Strategy (TWCS) #2687
    Isolate TWCS from read repair or compaction with an imported SSTable, which may ruin the TWCS file-per-time-range property.
  • Performance: add ZSTD compression #2613
    Adds support for the option to compress SSTables using the Zstandard algorithm from Facebook.
    To use, pass 'sstable_compression': 'org.apache.cassandra.io.compress.ZstdCompressor'
    to the compression argument when creating a table.
    You can also specify a compression_level (default is 3). See Zstd documentation for the available compression levels.

Also read our two part blog series on Compression in Scylla: Part One and Part Two.

Example:
create table a.a (a text primary key, b int) with compression = {'sstable_compression': 'org.apache.cassandra.io.compress.ZstdCompressor', 'compression_level': 1, 'chunk_length_in_kb': '64'};

  • Performance: Repair improvements in Scylla 3.2:
    • Improved data transfer efficiency between nodes. Repair now uses the Seastar RPC stream interface, which is more efficient for transferring large amounts of repair data. #4581
    • Increased the internal row buffer size to provide better performance on cross DC clusters with high latency links. #4581
    • Repair can now adjust ranges to repair in parallel according to memory size #4696
  • Row Level Repair #3033
    In partition-level repair (the algorithm used in Scylla Open Source 3.0 and earlier), the repair master node splits the ranges into sub-ranges containing 100 partitions each, computes the checksum of those 100 partitions, and asks the related peers to do the same.
    • If the checksum matches, the data in this subrange is synced, and no further action is required.
    • If the checksum mismatches, the repair master fetches the data from all the peers and sends back the merged data to peers.

This approach has two major problems:

    1. A mismatch of only a single row in any of the 100 partitions causes all 100 partitions to be transferred. A single partition can be very large, even hundreds of MBs, so 100 partitions can be well over one gigabyte of data.
    2. Checksumming (finding the mismatch) and streaming (fixing the mismatch) read the same data twice.

To fix the two issues above, we introduced the new row-level repair. Row-level repair works on a small range which contains only a few rows (a few megabytes of data at most), reads these rows into memory, finds the mismatches and sends them to the peers. This way it reads the data only once and significantly reduces the data volume streamed for each row mismatch.

In a benchmark done on a three-node Scylla cluster on AWS, using i3.xlarge instances, each holding 1 billion rows (241 GiB of data), we tested three use cases, comparing time to repair with Scylla Open Source 3.0 and Scylla Open Source 3.1:

Use case      Description                                                            Scylla 3.0   Scylla 3.1   Improvement
0% synced     One node has zero data; the other two have 1 billion identical rows.   49.0 min     37.07 min    1.32x faster
100% synced   All 3 nodes have 1 billion identical rows.                             47.6 min     9.8 min      4.85x faster
99.9% synced  Each node has 1 billion identical rows and 0.1% distinct rows.         96.4 min     14.2 min     6.78x faster

The new row-level repair shines where a small percentage of the data is out of sync – the most likely scenario after short network issues or a node restart.

For the last use case, the bytes sent over the wire:

      Scylla 3.0    Scylla 3.1   Transfer Data Ratio
TX    120.52 GiB    4.28 GiB     3.6%
RX    60.09 GiB     2.14 GiB     3.6%

As expected, where the actual difference between nodes is small, sending just relevant rows, not 100 partitions at a time, makes a huge difference.

More on row level repair implementation and results here

  • IOCB_CMD_POLL support
    On Linux 4.19 or higher, Scylla will use a new method of waiting for network events, IOCB_CMD_POLL. More about the new interface in this lwn.net article.

The new interface is detected and used automatically. To use the old interface, add the command line option --reactor-backend=epoll.

  • Materialized Views improvements
    • Minimize generated view updates for unselected column updates #3819
    • Indexing within a partition is inefficient #4185
  • Move truncation records to separate table (#4083)
  • BYPASS CACHE clause – introduced in 2019.1.1 (described in the CQL Enhancements section above)

Security

Encryption at rest – introduced in 2019.1.1, 2019.1.3

You can now encrypt on-disk Scylla data, including:

  • SSTables
  • Commit logs
  • Batch and hints logs

Encryption at rest supports three key providers:

  • Local provider, which allows you to keep keys in the file system of each node.
  • Table provider, which allows you to store table keys in Scylla tables and eliminates the need to copy the table key to each server.
  • KMIP provider. KMIP is a standard protocol for exchanging keys in a secure way. With this key provider, you can use any KMIP-compatible server to secure Scylla encryption keys.
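
As a rough sketch of what this looks like in CQL (the table name, key path and algorithm below are illustrative and follow the shape of the Scylla Enterprise documentation; check the documentation for the exact options supported by your version), encryption at rest with the local key provider can be enabled per table:

CREATE TABLE ks.sensitive_data (pk text PRIMARY KEY, payload blob)
  WITH scylla_encryption_options = {
    'cipher_algorithm': 'AES/CBC/PKCS5Padding',
    'secret_key_strength': 128,
    'key_provider': 'LocalFileSystemKeyProviderFactory',
    'secret_key_file': '/etc/scylla/data_encryption_keys'
  };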

Known Issues in Encryption at rest:

  • DESCRIBE TABLE with encryption at rest parameters does not have the exact same format as the CQL CREATE format. As a result, schema backup and restore of tables with encryption at rest will require additional manual steps.

Tooling

  • Snapshot enhancement: a table schema, schema.cql, is now part of each Scylla snapshot created with "nodetool snapshot". The schema is required as part of the Scylla backup and restore procedure. #4192
  • Connection virtual table #4820
    The new system.clients virtual table provides information about CQL clients currently connected to Scylla.
    Client information includes: address, port, type, shard, protocol_version and username
  • Export system uptime via REST endpoint /system/uptime_ms
  • Large cell / collection detector
    Scylla is not optimized for very large rows or large cells. They require allocation of large, contiguous memory areas and therefore may increase latency. Rows may also grow over time. For example, many insert operations may add elements to the same collection, or a large blob can be inserted in a single operation.

Similar to the large partitions table, the large rows and large cells tables are updated when SSTables are written or deleted, for example, on memtable flush or during compaction.

Examples for using the new Large Rows and Large Cells tables:
SELECT * FROM system.large_rows;
SELECT * FROM system.large_cells;

  • Stability: nodetool scrub --skip-corrupted. As the name suggests, the new option allows scrub to purge corrupt data from the output, by skipping over it.
  • nodetool toppartitions #2811. Samples cluster writes and reads and reports the most active partitions in a specified table and time frame. For example:

> nodetool toppartitions nba team_roster 5000
WRITES Sampler:
Cardinality: ~5 (256 capacity)
Top 10 partitions:
Partition Count +/-
Russell Westbrook 100 0
Jermi Grant 25 0
Victor Oladipo 17 0
Andre Roberson 1 0
Steven Adams 1 0

READS Sampler:
Cardinality: ~5 (256 capacity)
Top 10 partitions:
Partition Count +/-
Russell Westbrook 100 0
Victor Oladipo 17 0
Jermi Grant 12 0
Andre Roberson 5 0
Steven Adams 1 0

  • Nodetool upgradesstables #4245
    Rewrites SSTables for a given keyspace/table to the latest SSTable format. Note that this is *not* required when enabling the mc format or upgrading to a newer Scylla version; in these cases, Scylla will write new SSTables in the new format during memtable flush or compaction, while keeping the old tables in the old format.

nodetool upgradesstables ( -a | --include-all-sstables ) -- <keyspace> <table>...

By default, the command only rewrites SSTables which are *not* already in the latest format; the -a | --include-all-sstables option can be used to rewrite *all* the SSTables.

Monitoring

Scylla Enterprise 2020.1.0 dashboards are available with Scylla Monitoring Stack 3.4.1 or later

  • For new metrics compared to Scylla Enterprise 2019.1, see here

Stability and bug fixes (from 2019.1.0)

  • Date Tiered Compaction Strategy (DTCS) is deprecated and will eventually be removed. If you are still using it, please switch to Time Window Compaction Strategy (TWCS).
  • API: add error injection to REST API (used for testing only)
  • A new API allows enable/disable error injections to different places in the code #3295
  • Scylla will now ignore a keyspace that is removed while being repaired, or without tables, rather than failing the repair operation. #5942 #6022
  • Local secondary indexes, as well as materialized views that have the same partition key as their base table, are now updated synchronously. This improves consistency and simplifies concurrency control. #4365. Alternator was updated to take advantage of this.
  • A new REST API for deleting a table from a snapshot #5805
  • CQL: Fix missing aggregate functions for counters
  • The long-deprecated RandomPartitioner and ByteOrderedPartitioner partitioners were removed #5636
  • Stability: improve CQL server admission control #4768
    Current admission control takes a permit when CQL requests start and releases it when a reply is sent, but some requests may leave background work behind after that point. In Scylla 3.2, admission control takes into account these background tasks, and improves the way Scylla handles overload conditions.
  • Stability: Large collections are now more resistant to memory fragmentation
  • Stability: a new scylla-kernel-conf package tunes the kernel for Scylla’s needs. It now tunes vm.swappiness to reduce the probability of the kernel swapping out Scylla memory and introducing stalls.
  • “mc” SSTable file format: empty counters were not handled correctly, which could lead to Coredump #4363. Note that mc format is disabled by default in 2019.1. More on mc (Apache Cassandra 3.0 format) here
  • Range scan: in a rare condition, a specific combination of data and its alignment in the reader buffer can lead to missing rows from a range scan #4418. Two things are needed to trigger this defect:
    • A range tombstone that has the same start bound as the row just after it in the stream
    • This range tombstone is the last fragment in the reader’s buffer, and the reader is evicted after the buffer is consumed.
  • TimeWindowCompactionStrategy: in some very rare cases, a use-after-free may hurt performance #4465
  • Stability: Possible race condition between compaction deleting SSTable and reshuffling of SSTable, for example when migrating data from Apache Cassandra, or Scylla server with a different core number. #4622
  • Stability: Catch SSTable unclosed partition format errors in the write path, not in the read path. A new command line flag, --abort-on-internal-error, makes Scylla exit as soon as such an error occurs, making it easier to catch and debug. #4794, #4786
  • Stability: Receiving side of streaming may abort or produce incorrect SSTables when a failure happens on the sender side, generating the following error:
    sstables/sstables.cc:1748: void sstables::seal_summary(sstables::summary&, std::experimental::fundamentals_v1::optional&&, std::experimental::fundamentals_v1::optional&&, const sstables::index_sampling_state&): Assertion `first_key' failed. #4789
  • Stability: A node restart during repair may cause the node streaming data to it to abort with an error message:
    scylla: message/messaging_service.cc:549: seastar::shared_ptr netw::messaging_service::get_rpc_client(netw::messaging_verb, netw::messaging_service::msg_addr): Assertion `!_stopping' failed.
    Aborting on shard 1.
    #4767
  • Stability: In some cases, when --abort-on-lsa-bad-alloc is enabled, Scylla aborts even though it’s not really out of memory #2924
  • Stability: Potential undefined behavior in SSTable cleanup, which may cause a segmentation fault #4718
  • Stability: When using MC SSTable format, Scylla might fail to exit gracefully with an out of disk space error: #4614
  • Stability: A possible deadlock between connection authentication and the stall detector, in cases where the authentication logic generates a stall #4759
  • Stability: Exceptions in index reader are not handled gracefully #4761
  • Stability: Fix segmentation faults when replacing expired SSTables #4085
  • Stability: Scylla init process: a possible race between role_manager and password_authenticator can cause Scylla to exit #4226
  • Stability: Scylla exits ungracefully when shutting down while running repair #4589
  • CQL: Using tuple in the primary key can fail the large cell detector #4645
  • CQL: Using a type without a to_string() implementation, for example a tuple, as a clustering key will cause the large row detector to exit. #4633
  • CQL: Marshalling error when using Date with capital Z for timezone, for example, '2019-07-02T18:50:10Z' #4641
  • Performance: Repair of a single shard range opens RPC connections for streaming on all shards. This is redundant and can exhaust the number of connections on a large machine. Note that Scylla Manager runs repair on a shard by shard basis. Running repairs from nodetool (nodetool repair) will make the issue even worse. #4708
  • Performance: improve the efficiency of operations on large collections
  • Sec Index: NodeJS driver: For some drivers, for example, NodeJS, paging indexes can end up in an infinite loop of returning pages with 0 results but “has_more_pages” flag set to true #4569
  • Sec Index: Filtering may ignore clustering key restrictions if they form a prefix without a partition key #4541
  • MV: In rare cases, when a node dies and *another* node has MV updates to send to it, the sending node is notified of the other node’s death just after the send request has started. The sending node then needs to cancel an ongoing request, which might cause Scylla to exit #4386
  • Sec Index: A partition key index may cause a regular query to fail with “No such index” #4539
  • Stability: Fix of handling of schema alterations and evictions in the cache, which may result in a node crash #5127 #5128 #5134 #5135
  • Stability: Fix a bug in cache accounting #5123
  • Stability: Fix a bug that can cause streaming to a new node to fail with “Cannot assign requested address” error #4943
  • Stability: A race condition in node boot can fail the init process #4709
  • Stability: Can not replace a node which is failed in the middle of the boot-up process (same root cause as #4709 above) #4723
  • Stability: Perftune.py script fails to start with “name ‘logging’ is not defined” error #4958 #4922
  • Stability: Scylla may hang or even segfaults when querying system.size_estimates #4689
  • Performance: Range scans run in the wrong service level (workload prioritization) (internal #1052)
  • Performance: Wrong priority for View streaming slow down user requests #4615
  • Hinted handoff: Fix races that may lead to use-after-free events and file system level exceptions during shutdown and drain #4685 #4836
  • Commit log error “Checksum error in segment chunk at” #4231
  • Docker: An issue in command-line options parsing prevents Scylla Docker from starting, reporting “error: too many positional options have been specified on the command line” error #4141
  • In-Transit Encryption: Streaming in local DC fails if only inter-DC encryption is enabled #4953
  • Stability: non-graceful handling of end-of-disk space state may cause Scylla to exit with a coredump #4877
  • Stability: core dump on OOM during cache update after memtable flush, with !_snapshot->is_locked() failed error message #5327
  • Stability: Adding a DC with MV might fail with assertion _closing_state == state::closed #4948
  • Oversized allocation warning in reconcilable_result, for example, when paging is disabled #4780
  • Stability: running manual operations like nodetool compact will crash if the controller is disabled #5016
  • Stability: Under heavy read load, the read execution stage queue size can grow without bounds #4749
  • Stability: repair: assert failure when a local node fails to produce checksum #5238
  • CQL: One second before expiration, TTLed columns return as null values #4263, #5290
  • Stability: long-running cluster sees bad gossip generation when a node restarts #5164 (similar to CASSANDRA-10969)
  • CQL: wrong key type used when creating non-frozen map virtual column #5165
  • CQL: using queries with paging, ALLOW FILTERING and aggregation functions return intermediate aggregated results, not the full one #4540
  • Correctness: Hinted handoff (HH) sends counter hints as counter updates when a node targeted by the hint does not exist. This may cause wrong counter value when HH is enabled, Counters are used, and nodes are down. #5833 #4505
  • Correctness: wrong encoding of a negative value of type varint. More details in #5656
  • Correctness: Materialized Views: virtual columns in a schema may not be propagated correctly #4339
  • CQL: error formats field name as a hex string instead of text #4841
  • Stability: Running nodetool clearsnapshot can cause a failure, if a new snapshot is created at the exact same time #4554 #4557
  • Stability: using an invalid time UUID can cause Scylla to exit. For example
    select toDate(max(mintimeuuid(writetime(COLUMN)))) #5552
  • Stability: out of memory in cartesian product IN queries, where each column filter is multiplied by all other filters. For example:
    create table tab (
    pk1 int, pk2 int, pk3 int, pk4 int, pk5 int, pk6 int, pk7 int, pk8 int, pk9 int,
    primary key((pk1, pk2, pk3, pk4, pk5, pk6, pk7, pk8, pk9))
    );

    select * from tab where pk1 in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    and pk2 in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    and pk3 in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    and pk4 in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    and pk5 in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    and pk6 in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    and pk7 in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    and pk8 in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    and pk9 in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    ServerError: std::bad_alloc
    #4752
  • Stability: IPv6 – seed nodes do not connect to peers with the scope field #5225
  • Stability: a graceful shutdown fails, producing an error, when Tracing is on:
    [with Service = tracing::tracing]: Assertion 'local_is_initialized()' #5243
  • Stability: running a misformatted ALTER command on a UDT will cause a crash #4837
  • Stability: Replace dead node for a seed is allowed but does not work #3889
  • Stability: Immediate abort on OS errors EBADF (Bad file number) and ENOTSOCK (Socket operation on non-socket). These errors usually hint on a deeper root cause, and aborting as soon as possible is both safer and makes it easier to analyze.
  • Stability: the assert in row append() was downgraded to an error. The assertion, introduced in 2019.1.4, proved to be too strict, aborting on cases which are not fatal.
  • Stability: storage_proxy: limit resources consumed in cross-shard operations
  • Performance: New configuration option, enable_shard_aware_drivers, allows disabling shard aware drivers from the server (Scylla) side
  • Reduce memory footprint: Scylla keeps SSTable metadata in memory #4915
  • Reduce memory footprint: cell locking data structures consume 64KiB of RAM per table #4441
  • Install: Scylla installs wrong java version on Ubuntu 18.04 #4548
  • Stability: write-path validator adds more tests to Scylla write path, identifying potential file format issues as soon as possible.
  • Stability: Possible heap-buffer-overflow when stopping the gossip service #5701
  • Stability: A rare race condition in range scans might cause the scan to read some data twice, triggering data validation errors and causing Scylla to exit.
  • Stability: long-running cluster sees bad gossip generation when a node restarts #5164 (followup to a fix in 2019.1.4)
  • Stability: Active nodes wrongfully marked as DOWN #5800
  • Stability: In some rare cases, a Scylla node crashes after an upgrade to 2019.1.6 when scanning a table containing a partition which has range tombstones with a prefix start bound right at the end of the partition
  • Stability: in some rare cases, SSTable metadata in memory is not correctly evicted, causing memory bloat #4951
  • CQL: a restricted column that is not in a select clause returns a wrong value #5708
  • Stability: Node shutdown may exit when using encryption in transit #5759, Seastar #727
  • Performance: scylla doesn’t enforce the use of TSC clocksource #4474
  • Stability: wrong hinted handoff logic for detecting a destination node in DN state #4461
  • Stability: commit log exception in shutdown #4394
  • Stability: potential race condition when creating a table with the same name as a deleted one #4382
  • Install: scylla_setup does not present virtio-blk devices correctly on interactive RAID setup #4066
  • Stability: malformed replication factor is not blocked in time, causing an error when running DESCRIBE SCHEMA later #3801
  • Stability: In rare cases, and under heavy load, for example, during repair, Scylla Enterprise might OOM and exit with an error such as “compaction_manager - compaction failed: std::bad_alloc (std::bad_alloc)”. #3717
  • Stability: possible abort when using reverse queries that read too much data #5804
  • Stability: writes inserted into memtable may be interpreted using incorrect schema version on schema ALTER of a counter table #5095
  • Stability: possible query failure, crash if the number of columns in a clustering key or partition key is more than 16 #5856
  • Stability: When speculative read is configured a write may fail even though enough replicas are alive #6123
  • Performance: Allow tweaking of slow repairs due to redundant writes for tables with materialized views
  • Stability, hinted_handoff: all nodes in the cluster become overloaded (CPU 100% loaded on all shards) after a node finishes the “replace node” procedure
  • Tooling: nodetool status returns wrong IPv6 addresses #5808
  • AWS: Update enhanced networking supported instance list #6540
  • CQL: Filtering on a static column in an empty partition is incorrect #5248
  • Stability: When a node fails during an ongoing node replace procedure, and then restarted with no data, parts of the token range might end up not assigned to any node #5172
  • API: Scylla returns the wrong error code (0000 – server internal error) in response to trying to do authentication/authorization operations that involve a non-existing role. #6363
  • Stability: potential use after free in storage service #6465
  • Stability: When hinted handoff enabled, commitlog positions are not removed from rps_set for discarded hints #6433 #6422.
  • Stability: multishard_writer can deadlock when producer fails, for example when, during a repair, a node fail #6241
  • Performance: A few of the local system tables in the `system` namespace, like large_partitions, do not set their gc grace period to 0, which may result in millions of tombstones being needlessly kept for these tables and can cause read timeouts. Local system tables use LocalStrategy replication, so they do not need to be concerned about the gc grace period. #6325
  • CQL: ALTERing compaction settings for table also sets default_time_to_live to 0 #5048
  • Stability: Scylla freezes when casting a decimal with a negative scale to a float #6720
  • Stability: In a case when using partition or clustering keys which have a representation in memory which is larger than 12.8 KB (10% of LSA segment size), linearization of the large (fragmented) keys may cause undefined behavior #6637
  • Correctness: Materialized view updates with future partition tombstones are not correctly generated. When the base table does not have any rows, but it does have a partition tombstone with a given timestamp, inserting a base row with a smaller timestamp will generate an incorrect materialized view update #5793
  • Install / Upgrade (RPM): Scylla Enterprise metapackage does not install correct version of scylla-enterprise-conf package #5639
  • CQL: Impossible WHERE condition returns a non-empty slice #5799
  • Stability: Counter write read-before-write is issued with no timeout, which may lead to unbounded internal concurrency if the enclosing write operation timed out. #5069
  • CQL: Using CQL functions Max and Min on data type inet/blob/list/map/set/time/tuple/udt/column returns unexpected result #5139
  • CQL Role Based Access Control (RBAC): MODIFY permission is required on all materialized views in order to modify a table. #5205
  • CQL: Range deletions for specific columns are not rejected #5728
  • Compression: Internode, on the wire, compression is not enabled based on configuration #5963
  • Stability: Open RPC stream connection when the reader has no data generate errors: stream_session - stream_transfer_task: Fail to send to x.x.x.x:0: std::system_error (error system:99, connect: Cannot assign requested address)
  • Stability: Staging SSTables are incorrectly removed or added to backlog after ALTER TABLE / TRUNCATE #6798
  • Stability: Issuing a reverse query with multiple IN restrictions on the clustering key might result in incorrect results or a crash. For example:
    CREATE TABLE test (pk int, ck int, v int, PRIMARY KEY (pk, ck));
    SELECT * FROM test WHERE pk = ? AND ck IN (?, ?, ?) ORDER BY ck DESC;

    #6171
  • Stability: index reader fails to handle failure properly, which may lead to unexpected exit #6924

The post Scylla Enterprise Release 2020.1.0 appeared first on ScyllaDB.

Cassandra and Kubernetes: SIG Update and Survey

Five operators for Apache Cassandra have been created that have made it easier to run containerized Cassandra on Kubernetes. Recently, the major contributors to these operators came together to discuss the creation of a community-based operator, with the intent of making it easy to run C* on K8s. One of the project’s organizational goals is that the end result will eventually become part of the Apache Software Foundation or the Apache Cassandra project.

The community created a special interest group (SIG) to set goals for what the operator should do at different levels to find a path for creating a standard community-based operator. The Operator Framework suggests five maturity levels for operator capabilities starting from basic installation to auto-pilot.

Operator Capability Maturity Levels

(Source: OperatorFramework.io)

The five Cassandra Kubernetes operators all come from different backgrounds, so the first major goal is to develop a common understanding as to what an operator needs to do and at which level. This first step involves collaborating on a Custom Resource Definition (CRD) that will set the syntax / schema which will be used to create Cassandra clusters on Kubernetes. Once this is done, a software extension can be developed in a variety of languages including Go, Java, or using the Operator SDK in Helm or Ansible without making changes to Kubernetes.

We’re not starting from zero, as the creators of the five operators are actively participating in the SIG. Hopefully much of the decided upon CRD will have code fragments that can be leveraged from the other projects. The major operators out publicly today are those by Sky UK, Orange Telecom, Instaclustr, Elassandra, and DataStax (list sourced from the awesome-cassandra project):

  • CassKop - Cassandra Kubernetes Operator - This Kubernetes operator by Orange automates Cassandra operations such as deploying a new rack-aware cluster, adding/removing nodes, configuring the C* and JVM parameters, and upgrading JVM and C* versions. Written in Go. It was also one of the first operators released and is the only one that can support multiple Kubernetes clusters using Multi-CassKop.
  • Cassandra Operator - A Kubernetes operator by SkyUK that manages Cassandra clusters inside Kubernetes. Well designed and organized. This was among the first operators to be released.
  • Instaclustr - Kubernetes Operator for Cassandra - The Cassandra operator by Instaclustr manages Cassandra clusters deployed to Kubernetes and automates tasks related to operating a Cassandra cluster.
  • Cass Operator - DataStax’s Kubernetes Operator supports Apache Cassandra as well as DSE containers on Kubernetes. Cassandra configuration is managed directly in the CRD, and Cassandra nodes are managed via a RESTful management API.
  • Elassandra Operator - The Elassandra Kubernetes Operator automates the deployment and management of Elassandra clusters deployed in multiple Kubernetes clusters.

If you’re interested in catching up on what the SIG has been talking about, you can watch the YouTube videos of the sessions and read up on the working documents.

As with any Kubernetes operator, the goal is to create a robot which makes the manual work of setting up complex configurations of containers in Kubernetes easier. An operator can also be seen as a translator between the logical concepts of the software and the concrete Kubernetes resources such as nodes, pods, and services. Combined with controllers, operators can abstract out operations such that the human operators can focus on problems related to their industry or domain. As mentioned above, the different operator capability levels offer a roadmap to creating a robust operator for Cassandra users that is easy to use, set up, maintain, upgrade, and expand.

When a platform needs Cassandra, it’s probably exhausted the other potential datastores available because it needs high availability and fault tolerance, at high speeds, around the world. Kubernetes is a technology that can match well with Cassandra’s capabilities because it shares the features of being linearly scalable, vendor neutral, and cloud agnostic. There is a healthy debate about whether Cassandra belongs in Kubernetes — and whether databases belong in Kubernetes at all — because other orchestration tools are good enough, though the growing user base of Kubernetes in hobby and commercial realms suggests that we need to provide an operator that can keep up with the demand.

Most likely if someone is thinking about moving Cassandra workloads from public cloud, on-premises VMs, or even on-premises bare metal servers to either a public or private cloud hosted K8s, they’ll want to evaluate whether or not the existing architecture could run and be performant.

As part of the SIG, we’re also coming up with reference architectures on which to test the operator. Here are some of the common and most basic reference architectures that are likely candidates.

  • Single Workload in Single Region
    • 1 DC in 1 region, with 3 nodes (3 total)
    • DC expands to 6 nodes (6 total)
    • DC contracts to 3 nodes (3 total)

Single Workload / Datacenter in a Single Region

  • Multi-Workload in Single Region
    • 2 DCs, both in the same region, with 3 nodes in each DC (6 total)
    • Both DCs expand to 6 each (12 total)
    • Both DCs contract to 3 each (6 total)
    • Add a third DC in the same region with 3 nodes (9 nodes)
    • Remove third DC

Multiple Workloads / Datacenters in a Single Region

  • Single Workload in Multi-Regions
    • 2 DCs, 1 in each region, with 3 nodes in each DC (6 total)
    • Both DCs expand to 6 each (12 total)
    • Both DCs contract to 3 each (6 total)
    • Add a third DC in a 3rd region with 3 nodes (9 total)
    • Remove third DC

Although each organization is different, these scenarios, or combinations of them, account for 80% of pure Apache Cassandra use cases. The SIG would love to know more about Cassandra users’ use cases for Kubernetes. Please take this short survey, which will remain open through September 17, 2020.

Join the biweekly meetings to stay informed.

Dial C* for Operator - Cass Operator Meet Reaper

Reaper is a critical tool for managing Apache Cassandra. Kubernetes-based deployments of Cassandra are no exception to this. Automation is the name of the game with Kubernetes operators. It therefore makes sense that Cass Operator should have tight integration with Reaper. Fortunately, Cass Operator v1.3.0 introduced support for Reaper. This post will take a look at what that means in practice.

Note: If you want to try the examples in this post, install Cass Operator using the instructions in the project’s README.

Pods

Before we dive into the details, let’s take a moment to talk about Kubernetes pods. If you think a pod refers to a container, you are mostly right. A pod actually consists of one or more containers that are deployed together as a single unit. The containers are always scheduled together on the same Kubernetes worker node.

Containers within a pod share network resources and can communicate with each other over localhost. This lends itself very nicely to the proxy pattern. You will find plenty of great examples of the proxy pattern implemented in service meshes.

Containers within a pod also share storage resources. The same volume can be mounted within multiple containers in a pod. This facilitates the sidecar pattern, which is used extensively for logging, among other things.

The Cassandra Pod

Now we are going to look at the pods that are ultimately deployed by Cass Operator. I will refer to them as Cassandra pods since their primary purpose is running Cassandra.

Consider the following CassandraDatacenter:

# example-cassdc.yaml

apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
  name: example
spec:
  clusterName: example
  serverType: cassandra
  serverVersion: 3.11.6
  managementApiAuth:
    insecure: {}
  size: 3
  allowMultipleNodesPerWorker: true
  storageConfig:
    cassandraDataVolumeClaimSpec:
      storageClassName: server-storage
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi

Create the CassandraDatacenter as follows:

$ kubectl apply -f example-cassdc.yaml

Note: Both this example and the later one specify serverVersion: 3.11.6 for the Cassandra version. Cassandra 3.11.7 was recently released, but Cass Operator does not yet support it. See this ticket for details.

Note: Remember to create the server-storage StorageClass.

It might take a few minutes for the Cassandra cluster to fully initialize. The cluster is ready when the Ready condition in the CassandraDatacenter status reports True, e.g.,

$ kubectl -n cass-operator get cassdc example -o yaml
...
status:
  cassandraOperatorProgress: Ready
  conditions:
  - lastTransitionTime: "2020-08-10T15:17:59Z"
    status: "False"
    type: ScalingUp
  - lastTransitionTime: "2020-08-10T15:17:59Z"
    status: "True"
    type: Initialized
  - lastTransitionTime: "2020-08-10T15:17:59Z"
    status: "True"
    type: Ready
...   

Three (3) pods are created and deployed, one per Cassandra node.

$ kubectl -n cass-operator get pods -l cassandra.datastax.com/cluster=example
NAME                            READY   STATUS    RESTARTS   AGE
example-example-default-sts-0   2/2     Running   0          4h18m
example-example-default-sts-1   2/2     Running   0          4h18m
example-example-default-sts-2   2/2     Running   0          133m

Each row in the output has 2/2 in the Ready column. What exactly does that mean? It means that there are two application containers in the pod, and both are ready. Here is a diagram showing the containers deployed in a single Cassandra pod:

Cassandra Pod

This shows three containers, the first of which is labeled as an init container. Init containers must run to successful completion before any of the main application containers are started.

We can use a JSONPath query with kubectl to verify the names of the application containers:

$ kubectl -n cass-operator get pod example-example-default-sts-0 -o jsonpath='{.spec.containers[*].name}' | tr -s '[[:space:]]' '\n'
cassandra
server-system-logger

The cassandra container runs the Management API for Apache Cassandra, which manages the lifecycle of the Cassandra instance.

server-system-logger is a logging sidecar container that exposes Cassandra’s system.log. We can conveniently access Cassandra’s system.log using the kubectl logs command as follows:

$ kubectl -n cass-operator logs example-example-default-sts-0 -c server-system-logger

The Cassandra Pod with Reaper

Here is another CassandraDatacenter specifying that Reaper should be deployed:

# example-reaper-cassdc.yaml

apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
  name: example-reaper
spec:
  clusterName: example-reaper
  serverType: cassandra
  serverVersion: 3.11.6
  managementApiAuth:
    insecure: {}
  size: 3
  reaper:
    enabled: true
  storageConfig:
    cassandraDataVolumeClaimSpec:
      storageClassName: server-storage
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi

The only difference from the first CassandraDatacenter is these two lines:

  reaper:
    enabled: true

This informs Cass Operator to deploy Reaper in sidecar mode. One of the main benefits of deploying Reaper in sidecar mode is security. Reaper only needs local JMX access to perform repairs. There is no need for remote JMX access or JMX authentication to be enabled.

Once again three pods are created and deployed, one per Cassandra node.

$ kubectl -n cass-operator get pods -l cassandra.datastax.com/cluster=example-reaper
NAME                                          READY   STATUS    RESTARTS   AGE
example-reaper-example-reaper-default-sts-0   3/3     Running   1          6m5s
example-reaper-example-reaper-default-sts-1   3/3     Running   1          6m5s
example-reaper-example-reaper-default-sts-2   3/3     Running   1          6m4s

Now, each pod reports 3/3 in the Ready column. Here is another diagram to illustrate which containers are deployed in a single Cassandra pod:

Cassandra Pod with Reaper sidecar

Now we have the reaper application container in addition to the cassandra and server-system-logger containers.

Reaper Schema Initialization

In sidecar mode, Reaper automatically uses the Cassandra cluster as its storage backend. Running Reaper with a Cassandra backend requires first creating the reaper_db keyspace before deploying Reaper. Cass Operator takes care of this for us with a Kubernetes Job. The following kubectl get jobs command lists the Job that gets deployed:

$ kubectl get jobs -l cassandra.datastax.com/cluster=example-reaper
NAME                                COMPLETIONS   DURATION   AGE
example-reaper-reaper-init-schema   1/1           12s        45m

Cass Operator deploys a Job whose name is of the form <cassandradatacenter-name>-init-schema. The Job runs a small Python script named init_keyspace.py.

The output from kubectl -n cass-operator get pods -l cassandra.datastax.com/cluster=example-reaper showed one restart for each pod. Those restarts were for the reaper containers. This happened because the reaper_db keyspace had not yet been initialized.

We can see this in the log output:

$ kubectl -n cass-operator logs example-reaper-example-reaper-default-sts-1 -c reaper | grep ERROR -A 1
ERROR  [2020-08-10 20:28:19,965] [main] i.c.ReaperApplication - Storage is not ready yet, trying again to connect shortly...
com.datastax.driver.core.exceptions.InvalidQueryException: Keyspace 'reaper_db' does not exist

The restarts are perfectly fine as there are no ordering guarantees with the start of application containers in a pod.

Accessing the Reaper UI

Reaper provides a rich UI that allows you to do several things including:

  • Monitor Cassandra clusters
  • Schedule repairs
  • Manage and monitor repairs

Cass Operator deploys a Service to expose the UI. Here are the Services that Cass Operator deploys.

$ kubectl -n cass-operator get svc
NAME                                             TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE
cass-operator-metrics                            ClusterIP   10.0.37.211   <none>        8383/TCP,8686/TCP   8h
cassandradatacenter-webhook-service              ClusterIP   10.0.33.233   <none>        443/TCP             8h
example-reaper-example-reaper-all-pods-service   ClusterIP   None          <none>        <none>              14m
example-reaper-example-reaper-service            ClusterIP   None          <none>        9042/TCP,8080/TCP   14m
example-reaper-reaper-service                    ClusterIP   10.0.47.8     <none>        7080/TCP            10m
example-reaper-seed-service                      ClusterIP   None          <none>        <none>              14m

The Service we are interested in has a name of the form <clusterName>-reaper-service which is example-reaper-reaper-service. It exposes the port 7080.

One of the easiest ways to access the UI is with port forwarding.

$ kubectl -n cass-operator port-forward svc/example-reaper-reaper-service 7080:7080
Forwarding from 127.0.0.1:7080 -> 7080
Forwarding from [::1]:7080 -> 7080
Handling connection for 7080

Here is a screenshot of the UI:

Reaper UI

Our example-reaper cluster shows up in the cluster list because it gets automatically registered when Reaper runs in sidecar mode.

Accessing the Reaper REST API

Reaper also provides a REST API, in addition to the UI, for managing clusters and repair schedules. It listens for requests on the UI port, which means it is accessible through example-reaper-reaper-service as well. Here is an example of listing registered clusters via curl (with the port-forward from the previous section still running):

$ curl -H "Content-Type: application/json" http://localhost:7080/cluster
["example-reaper"]

Wrap Up

Reaper is an essential tool for managing Cassandra. Future releases of Cass Operator may make some settings such as resource requirements (i.e., CPU, memory) and authentication/authorization configurable. It might also support deploying Reaper with a different topology. For example, instead of using sidecar mode, Cass Operator might provide the option to deploy a single Reaper instance. This integration is a big improvement in making it easier to run and manage Cassandra in Kubernetes.

IOTA: Using Scylla for distributed storage of the Tangle

This post explores how the IOTA Foundation integrated Scylla into its permanode software, Chronicle, to support real-world applications on the Tangle.

Introduction to IOTA

IOTA is an open-source distributed ledger technology (DLT), designed to support frictionless data and value transfer on the Tangle.

The Tangle is the distributed ledger that’s shared across all nodes in an IOTA network. Any client, anywhere in the world, is able to send transactions to any node, and that transaction will be validated and replicated across the rest of the network to form one version of truth.

Differences between the Tangle and blockchains

Blockchains and the Tangle both fall under the same top-level category: DLT. The main difference between the Tangle and a blockchain lies in their data structures, which give the Tangle the following unique features:

  • No transaction fees
  • No miners

The blockchain data structure consists of a chain of sequential blocks, where each block contains a limited number of transactions. As a result, you can attach new transactions to only one place: A block at the end of the chain.

To secure the network against attacks, each block in a blockchain must be mined. Mining requires a lot of computational power, which is why miners are incentivized by transaction fees and block rewards.

Due to this block limitation known as the blockchain bottleneck, blockchain networks often experience slow confirmation times and high transaction fees.

The Tangle data structure is a directed acyclic graph (DAG), where each transaction approves two previous ones.

Rather than being limited to a single place for attaching new transactions, you can attach transactions anywhere in the Tangle, which drastically reduces the limit on confirmation times.

Instead of having miners, the Tangle is secured by a temporary finality device called the Coordinator that decides which parts of the Tangle are valid and should be confirmed.

This device is set to be removed in a project called Coordicide to make the Tangle completely decentralized.

The IOTA Foundation

The IOTA Foundation is a non-profit that collaborates with the IOTA community and partners to deliver sustainable, real-world impact.

The goals of the IOTA Foundation are to:

  • Research and implement the IOTA protocol
  • Standardize the protocol to ensure its widespread adoption
  • Develop production-ready open-source software
  • Educate others about IOTA technologies and promote their use cases

The IOTA community

The IOTA community is made up of 250,000 members who play an active role in the development and adoption of IOTA technology.

Some notable community projects include:

About permanodes

Permanodes are devices that are designed to store the entire history of the Tangle. They do not validate transactions; instead, their one goal is to take transactions from IOTA nodes and store them in a separate distributed database for others to query.

The case for a permanode

For IOTA nodes with limited storage, the Tangle can quickly fill up their database. This presents a problem for mobile devices, and even more so for the Internet of Things, where IOTA is working on integration with low-level, resource-constrained devices. To combat this problem, nodes come with a feature called local snapshots that allows them to delete old transactions and keep their local copy of the Tangle small.

However, for many business use cases, the data in transactions needs to be stored for long periods of time. For example, financial data must be stored for 10 years in some cases, and identity data needs to be kept for at least the lifetime of the identity (in some cases your identity may need to survive even beyond your lifetime).

In order to enable these business use cases without putting a burden on nodes, the IOTA Foundation developed a permanode called Chronicle.

Why Scylla

Chronicle uses Scylla as its default storage solution because it provides the following important features:

  • Fault tolerance: Users can set a replication strategy to determine how data is replicated, avoiding a single point of failure
  • Data consistency: Users can set a consistency level to determine whether a read or write operation is successful
  • Fast and efficient data queries: Scylla uses LSM-based storage with high write throughput
  • Time to live: Users can define the lifetime of their data
  • Low operating costs: Licenses (including free and enterprise licenses) and operating costs are very favorable compared to other solutions
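
As a small illustration of the time-to-live feature (the table and durations below are hypothetical, not Chronicle's actual settings), a default lifetime can be set at the table level and overridden per write:

CREATE TABLE readings (id text PRIMARY KEY, value int) WITH default_time_to_live = 315360000;  -- ~10 years, in seconds
INSERT INTO readings (id, value) VALUES ('r1', 42) USING TTL 2592000;  -- this row expires after 30 days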

How Scylla integrates with Chronicle

Chronicle is a framework for building permanode services that receive transactions from an IOTA network and store them in a Scylla cluster. Originally written in Elixir, it has now been ported fully to Rust to support interoperability with other IOTA projects, like Bee, and to provide a more secure programming environment.

To make Chronicle extensible, it is divided into the following components:

Dashboard

This component is an application for managing and monitoring the other Chronicle components, for example adding or removing Scylla nodes without downtime, or connecting to new IOTA nodes in the Chronicle Broker.

Chronicle Broker

This component receives and processes transactions from IOTA nodes through an event API such as MQTT and provides useful utilities to import historical data.

At the moment, Chronicle Broker uses MQTT to receive transactions and persist them, using the shard-aware strategy in Chronicle Storage.

Chronicle Storage

This component provides access to the datasets and the physical Scylla nodes in a cluster, called a ring.

Chronicle Storage starts by initializing the dashboard and query engine.

The query engine is made up of stages, which handle requests for transactions from each shard in a given Scylla node. The stages are controlled by stage supervisors, which in turn are controlled by node supervisors that maintain the cluster topology.

Each stage includes the following lightweight processors:

  • Workers represent requests, such as those coming from API calls or the MQTT worker. When a worker receives a request, it sends it to the corresponding reporter of the shard.
  • Reporters forward each request to their sender.
  • Senders handle sending the request to the shard socket.
  • Receivers take the responses from the shard and pass them to the reporter, which passes them back to the worker to hand on to the client.

To use Chronicle Storage, applications must implement the worker trait and access the local ring to send requests to the corresponding stage.

The ring provides a subset of useful send strategy methods:

  • send_local_random_replica(token, request): Selects a random stage within the same datacenter
  • send_global_random_replica(token, request): Selects a random stage in any datacenter

CQL schemas

The current Chronicle data model uses the following tables:

  • Transaction table: Stores transaction hashes and fields
      CREATE TABLE IF NOT EXISTS mainnet.transaction (
            hash varchar,
            payload varchar,
            address varchar,
            value varchar,
            obsolete_tag varchar,
            timestamp varchar,
            current_index varchar,
            last_index varchar,
            bundle varchar,
            trunk varchar,
            branch varchar,
            tag varchar,
            attachment_timestamp varchar,
            attachment_timestamp_lower varchar,
            attachment_timestamp_upper varchar,
            nonce varchar,
            milestone bigint,
            PRIMARY KEY(hash, payload, address, value, obsolete_tag, timestamp,
            current_index, last_index, bundle, trunk, branch, tag,  attachment_timestamp,
            attachment_timestamp_lower, attachment_timestamp_upper, nonce)
      ); 
  • Hint Table: Stores the sharding information for a given vertex
       CREATE TABLE IF NOT EXISTS mainnet.hint (
            vertex varchar,
            kind varchar,
            year smallint,
            month tinyint,
            milestone bigint,
            PRIMARY KEY(vertex, kind, year, month)
        ) WITH CLUSTERING ORDER BY (kind DESC, year DESC, month DESC); 
  • Data table: Stores the relations to a transaction; it can only be used if we have the sharding info for a given vertex, which must first be queried from the hint table (see the example queries after this list)
        CREATE TABLE IF NOT EXISTS mainnet.data (
            vertex varchar,
            year smallint,
            month tinyint,
            kind varchar,
            timestamp bigint,
            tx varchar,
            value bigint,
            milestone bigint,
            PRIMARY KEY((vertex,year,month), kind, timestamp, tx, value)
        ) WITH CLUSTERING ORDER BY (kind DESC, timestamp DESC); 
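
To show how the two tables work together, here is a hedged sketch of the lookup flow against the schemas above; the vertex value and the chosen year and month are invented for the example.

-- 1. Ask the hint table where the vertex's data lives (its year/month partitions).
SELECT kind, year, month
FROM mainnet.hint
WHERE vertex = 'EXAMPLEVERTEX9ADDRESS';

-- 2. Use one of the returned (year, month) pairs to read the vertex's relations.
SELECT kind, timestamp, tx, value
FROM mainnet.data
WHERE vertex = 'EXAMPLEVERTEX9ADDRESS' AND year = 2020 AND month = 6;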

What’s next

Chronicle’s roadmap includes some exciting improvements:

  • A UI dashboard for managing Chronicle components
  • Transaction solidification to allow Chronicle to know if it is missing any transactions in the Tangle
  • Selective permanode service for storing only transactions that you are interested in

In the meantime, see the documentation for steps on getting started with Chronicle.

If you want to contribute, the source code is all open source, so take a look at the project's repositories and documentation.

The post IOTA: Using Scylla for distributed storage of the Tangle appeared first on ScyllaDB.

Scylla Student Projects, Part I: Parquet

In 2019, ScyllaDB sponsored a program for Computer Science students organized by the University of Warsaw. Throughout the whole academic year, 3 teams of undergraduate students collaborated with and learned from ScyllaDB engineers to bring new features to Scylla and its underlying Seastar engine. The projects picked for 2019 edition were:

  • Parquet support for Seastar and Scylla
  • SeastarFS: an asynchronous userspace file system for Seastar
  • Kafka client for Seastar and Scylla.

We’re pleased to announce that the cooperation was very successful and we look forward to taking part in future editions of the program! Now, let’s see some details on the results of the first project on the list: Parquet support for Seastar and Scylla. This work is all to the credit of the students who wrote it, Samvel Abrahamyan, Michał Chojnowski, Adam Czajkowski and Jacek Karwowski, and their supervisor, Dr. Robert Dąbrowski.

Introduction

Apache Parquet is a well known columnar storage format, incorporated into Apache Arrow, Apache Spark SQL, Pandas and other projects. In its columns, it can store simple types as well as complex nested objects and data structures. Representing the data as columns brings interesting advantages over the classic row-based approach:

  • fetching specific columns from a table requires less I/O, since no redundant values from other columns are read from disk
  • values from a column are often the same or similar to each other, which increases the efficiency of compression algorithms
  • interesting data encoding schemes, like bit-packing integers, can be easily applied
  • more complex operations, like aggregating all values from a single column,
    can be implemented more efficiently (e.g. by leveraging vectorized CPU instructions)

An example of the Parquet file format, showing how it can optimize based on repeated values in columnar data.

Scylla uses SSTables as its native storage format, but we’re interested in allowing our users to pick another format — like Parquet — for certain workloads. That was the main motivation for pursuing this student project.

How to integrate ScyllaDB with Parquet?

Parquet is open-source, very popular and broadly used by many projects and companies, so why not use an existing C++ library and plug it right into Scylla? The short answer is “latency.”

Scylla is built on top of Seastar, an asynchronous high-performance C++ framework. Seastar was created in accordance with the shared-nothing principle and it has its own nonblocking I/O primitives, schedulers, priority groups and many other mechanisms designed specifically to ensure low latency and optimal hardware utilization. In the Seastar world, issuing a blocking system call (like read()) is an unforgivable mistake and a performance killer. That also means that many libraries which rely on traditional, blocking system calls (used without care) would create such performance regressions when used in a Seastar-based project, and Parquet's C++ implementation was no exception.

There are multiple ways of adapting libraries for Seastar, but in this case the simplest answer turned out to be the best — let’s write our own! Parquet is well documented and its specification is quite short, so it was a great fit for a team of brave students to try and implement it from scratch in Seastar.

Implementing parquet4seastar

Spoiler alert: the library is already implemented and it works!

https://github.com/michoecho/parquet4seastar

The first iteration of the project was an attempt to simply copy the whole code from Arrow’s repository and replace all I/O calls with ones compatible with Seastar. That also meant rewriting everything to Seastar’s future/promise model, which is a boring and mechanical task, but also easy to do. Unfortunately, it quickly turned out that the Parquet implementation from Apache Arrow has quite a lot of dependencies within Arrow itself. Thus, in order to avoid rewriting more and more lines, a decision was made: let’s start over, take the Parquet documentation and write a simple library for reading and writing Parquet files, built from scratch on top of Seastar.

Other advantages to this approach cited by the students: by writing it over from scratch, they would avoid carrying over any technical debt and minimize the amount of lines-of-code to be added to the existing code base, and, most of all, they thought it would be more fun!

A block diagram of how parquet4seastar and parquet2cql were designed to interact with the Scylla database.

The library was written using state-of-the-art Seastar practices, which means that measures have been taken to maximize the performance while keeping the latencies low. The performance tests indicated that the reactor stalls all came from external compression libraries – which, of course, can be rewritten in Seastar as well.

We were also pleased to discover that Parquet’s C++ implementation in Apache Arrow comes with a comprehensive set of unit tests – which were adjusted for parquet4seastar and used for ensuring that our reimplementation is at least as correct as the original.

Still, our main goal was to make the library easy to integrate with existing Seastar projects, like Scylla. As a first step and a proof-of-concept for the library, a small application which reads Parquet files and translates them into CQL queries was created.

parquet2cql

parquet2cql is a small demo application which shows the potential of parquet4seastar library. It reads Parquet files from disks, takes a CQL schema for a specific table and spits out CQL queries, ready to be injected into ScyllaDB via cqlsh or any CQL driver. Please find a cool graph which shows how parquet2cql works below. `p4s` stands for `parquet4seastar`.

parquet2cql can be used as a crude way of loading Parquet data straight into Scylla, but it’s still only a demo application – e.g. it does not support CQL prepared statements, which would make the process much more optimized. For those interested in migrating Parquet data to Scylla clusters, there’s a way to ingest Parquet files using our Scylla Spark Migrator.

Integration with ScyllaDB

Allowing ScyllaDB to store its data directly in Parquet instead of the classic SSTable format was way out of scope for this project, but, nonetheless, a proof-of-concept demo which stores SSTable data files not only in the native MC format but also in Parquet was completed successfully! The implementation assumed that no complex types (lists, sets) were present in the table. This experiment allowed us to compare the performance and storage overhead of using Parquet vs the SSTable MC format for various workloads. Here’s a diagram showing how the experiment was performed:

Results

The project was not only about coding – a vast part of it was running various correctness and performance tests and comparisons. One of the tests checked whether parquet4seastar library is faster than its older brother from Apache Arrow. Here are the sample results:

Reading time of parquet4seastar relative to Apache Arrow (less means that parquet4seastar was faster).

The results indicate that parquet4seastar is generally similar to Apache Arrow in terms of time of execution (with an exception of the short strings scenario, which is a result of a design decision, please find more details in the paper below). The results are promising, because they mean that providing much better latency guarantees by using nonblocking I/O and the future/promise model did not result in any execution time overhead. Aside from comparing the library against Apache Arrow, many more test scenarios were run – measuring reactor stalls, comparing the sizes of SSTables stored in native MC format vs in Parquet, etc.

Here we have a comparison of the disk usage between Parquet and SSTables. The chart above shows the results of the first test, in which the students inserted a million rows of random strings, with some values duplicated. The horizontal axis shows the number of duplicates for each value and the vertical axis shows the total size of the files. You can see that in this test Parquet is more efficient.

In this example the students tested a table with multiple NULL values, typical of a sparse data set. The horizontal axis shows the number of randomly selected columns that are not NULL and instead have a random value. In this case you can see that SSTables are a better format when most of the columns are NULL.

From these tests it can be concluded that with Parquet you can achieve significant disk space savings when the data is mostly non-null but the number of unique values is not very large.

The Paper

Now, the coolest part. Each project was also a foundation for the Bachelor’s thesis of the students who took part in it. The thesis was already reviewed and accepted by the University of Warsaw and is publicly available. You can find a detailed description of the design, goals, performed tests and results in this document: zpp_parquet.pdf. We’re very proud of contributing to the creation of this academic paper – congrats to all brand new BSc degree holders! We are definitely looking forward to continuing our cooperation with the students and the faculty of the University of Warsaw in the future. Happy reading!

READ THIS: APACHE PARQUET SUPPORT FOR SCYLLA

CHECK OUT PART II: IMPLEMENTING AN ASYNC USERSPACE FILESYSTEM

The post Scylla Student Projects, Part I: Parquet appeared first on ScyllaDB.

Apache Cassandra 4.0 Beta Released

The beta release of Apache Cassandra 4.0 is finally here; it has been two years in the making. We’ve had a preview release available to customers for testing since March, and a wide range of improvements have been made.

Stability

The explicit goal of this release has been to be “the most stable major release ever” to accelerate adoption within the release cycle, which I blogged about in January. For this release a series of new testing frameworks were implemented focusing on stability and performance, which have paid off handsomely. The feeling of the team at Instaclustr is that we have never been more confident about a release (and we’ve seen quite a few!)

Performance

This release integrates the async event-driven networking code from Netty for communication between nodes. I blogged about Netty in February but it’s worth reiterating what’s been achieved with this upgrade. It has enabled Cassandra 4.0 to have a single thread pool for all connections to other nodes instead of maintaining N threads per peer, which was cramping performance by causing lots of context switching. It has also facilitated zero copy streaming for SSTables, which is now up to 5x faster than before.

This complete overhaul of the networking infrastructure has delivered some serious gains.

  • Tail end latency has been reduced by 40%+ in P99s in initial testing
  • Node recovery time has been vastly reduced
  • Scaling large clusters is easier and faster

Auditing and Observability

Cassandra 4.0 introduces a powerful new set of enterprise class audit capabilities that I covered here in March. These help Cassandra operators meet their SOX and PCI requirements with a robust high level interface. Audit logging saves to the node, outside of the database, with configurable log rollover. Audit logs can be configured to attend to particular keyspaces, commands, or users and they can be inspected with the auditlogviewer utility.

Full Query Logging is also supported and the fqltool allows inspection of these logs.

Virtual Tables

Virtual tables, which I covered here in February, enable a series of metrics to be pulled from a node via CQL from read-only tables. This is a more elegant mechanism than JMX access as it avoids the additional configuration required. JMX access is not going anywhere soon, but this presents a really solid improvement to a number of metric monitoring tasks.

Community

Our community is our most powerful feature in all our releases and I can’t think of a better validation of open source under the Apache Foundation community model than this release. I just want to take this opportunity to congratulate and thank everyone in the community who have taken both Cassandra and its release processes to the next level with this beta release. 

As always, you can spin up a free trial of Cassandra on our platform. Even with the performance gains delivered in this release our popular Cassandra Data Modeling Guide to Best Practices is always worth a read to get the most out of Cassandra.

The post Apache Cassandra 4.0 Beta Released appeared first on Instaclustr.

Introducing Apache Cassandra 4.0 Beta: Battle Tested From Day One

This is the most stable Apache Cassandra in history; you should start using Apache Cassandra 4.0 Beta today in your test and QA environments, head to the downloads site to get your hands on it. The Cassandra community is on a mission to deliver a 4.0 GA release that is ready to be deployed to production. You can guarantee this holds true by running your application workloads against the Beta release and contributing to the community’s validation effort to get Cassandra 4.0 to GA.

With over 1,000 bug fixes, improvements and new features and the project’s wholehearted focus on quality with replay, fuzz, property-based, fault-injection, and performance tests on clusters as large as 1,000 nodes and with hundreds of real world use cases and schemas tested, Cassandra 4.0 redefines what users should expect from any open or closed source database. With software, hardware, and QA testing donations from the likes of Instaclustr, iland, Amazon, and Datastax, this release has seen an unprecedented cross-industry collaboration towards releasing a battle-tested database with enterprise security features and an understanding of what it takes to deliver scale in the cloud.

There will be no new features or breaking API changes in future Beta or GA builds. You can expect the time you put into the beta to translate into transitioning your production workloads to 4.0 in the near future.

Quality in distributed infrastructure software takes time and this release is no exception. Open source projects are only as strong as the community of people that build and use them, so your feedback is a critical part of making this the best release in project history; share your thoughts on the user or dev mailing lists or in the #cassandra ASF slack channel.

Redefining the elasticity you should expect from your distributed systems with Zero Copy Streaming

5x faster scaling operations

Cassandra streams data between nodes during scaling operations such as adding a new node or datacenter during peak traffic times. Thanks to the new Zero Copy Streaming functionality in 4.0, this critical operation is now up to 5x faster without vnodes compared to previous versions, which means a more elastic architecture particularly in cloud and Kubernetes environments.

Globally distributed systems have unique consistency caveats and Cassandra keeps the data replicas in sync through a process called repair. Many of the fundamentals of the algorithm for incremental repair were rewritten to harden and optimize incremental repair for a faster and less resource intensive operation to maintain consistency across data replicas.

Giving you visibility and control over what’s happening in your cluster with real time Audit Logging and Traffic Replay

Enterprise-grade security & observability

To ensure regulatory and security compliance with SOX, PCI or GDPR, it’s critical to understand who is accessing data and when they are accessing it. Cassandra 4.0 delivers a long awaited audit logging feature for operators to track the DML, DDL, and DCL activity with minimal impact to normal workload performance. Built on the same underlying implementation, there is also a new fqltool that allows the capture and replay of production workloads for analysis.

There are new controls to enable use cases that require data access on a per data center basis. For example, if you have a data center in the United States and a data center in Europe, you can now configure a Cassandra role to only have access to a single data center using the new CassandraNetworkAuthorizer.

For years, the primary way to observe Cassandra clusters has been through JMX and open source tools such as Instaclustr’s Cassandra Exporter and DataStax’s Metrics Collector. In this most recent version of Cassandra you can selectively expose system metrics or configuration settings via Virtual Tables that are consumed like any other Cassandra table. This delivers flexibility for operators to ensure that they have the signals in place to keep their deployments healthy.

Looking to the future with Java 11 support and ZGC

One of the most exciting features of Java 11 is the new Z Garbage Collector (ZGC), which aims to reduce GC pause times to a maximum of a few milliseconds with no latency degradation as heap sizes increase. These improvements significantly improve node availability profiles during garbage collection on a cluster, which is why ZGC is included in the Cassandra 4.0 release as an experimental feature; thorough testing should be performed before deploying it to production.

Part of a vibrant and healthy ecosystem

The third-party ecosystem has their eyes on this release and a number of utilities have already added support for Cassandra 4.0. These include the client driver libraries, Spring Boot and Spring Data, Quarkus, the DataStax Kafka Connector and Bulk Loader, The Last Pickle’s Cassandra Reaper tool for managing repairs, Medusa for handling backup and restore, the Spark Cassandra Connector, The Definitive Guide for Apache Cassandra, and the list goes on.

Get started today

There’s no doubt that open source drives innovation and the Cassandra 4.0 Beta exemplifies the value in a community of contributors that run Cassandra in some of the largest deployments in the world.

To put it in perspective, if you use a website or a smartphone today, you’re probably touching a Cassandra-backed system.

To download the Beta, head to the Apache Cassandra downloads site.

Resources:

Apache Cassandra Blog: Even Higher Availability with 5x Faster Streaming in Cassandra 4.0

The Last Pickle Blog: Incremental Repair Improvements in Cassandra 4

Apache Cassandra Blog: Audit Logging in Apache Cassandra 4.0

The Last Pickle Blog: Cassandra 4.0 Data Center Security Enhancements

The Last Pickle Blog: Virtual tables are coming in Cassandra 4.0

The Last Pickle Blog: Java 11 Support in Apache Cassandra 4.0

Apache Cassandra Infographic

Upgrading a Large Cassandra Cluster with cstar

I recently did an upgrade of 200+ nodes of Cassandra across multiple environments, sitting behind multiple applications, using the cstar tool. We chose cstar because, out of all the automation options, it has topology awareness specific to Cassandra. Here are some things I noticed.

1. The sister program cstarpar is sometimes required

The cstar tool is used to run commands on servers in a distributed way. The alternate cstarpar is used if you need to run the commands on the originating server instead. The Last Pickle detailed a fine example in their 3-part series on cstarpar [https://thelastpickle.com/blog/2018/12/11/cstar-reboots.html]. In our case, the individual nodes didn’t have the same access to a configuration management server as the jump host did. The cstarpar script was used to issue a command to the configuration management server, and then send ssh commands to the individual nodes (to change files, restart, etc.).

2. The cstar jobs folder can be used to view output

The jobs folder is on the originating server under ~/.cstar/jobs, with a UUID-labeled directory for each job, and server hostname directories underneath. The output is in a file named “out” under each hostname directory. Grepping through ~/.cstar/jobs/[UUID]/server*/out is a handy way to view desired info in the output.

3. Use verbose output

The cstar output can be a little too quiet, and we know that sometimes means trouble. Tack on a -v flag so you have lots of output to grep through, as above.

4. Ask for the output

Related, you also have to ask for some output. One of the pre-checks was to verify that specifically named files didn’t exist. Long story short, but the most efficient way to do this particular check was to grep through directories. In the test, the command worked, and in staging, the command worked. In production, cstar was marking each node as failed. Much troubleshooting later, we realized that the files existed in test and staging, but not production, so the script wasn’t finding anything and therefore “failing.” Piping the output into a ‘wc -l’ allowed each check to have some kind of response, and the script succeeded.

5. The Cassandra nodes have to be up

It’s documented that all of the nodes in a cluster have to be registering as up, or cstar will fail. The automated process we used was to shut down Cassandra, pull the new config and binary, and restart Cassandra, node by node. With a lot of Cassandra nodes, even with a brief sleep time in between nodes, I was hitting the permissions server too often and too quickly for its comfort, and about 75% of the way through, it started blocking me after Cassandra was shut down on every 10th node or so. The only way I detected this was that cstar paused for long enough that I noticed; there was no error message. I had to wait for the permissions server to stop limiting me, and then manually issue the commands on the node. On the plus side, cstar didn’t fail while waiting for me and continued on with the list of nodes automatically after I took care of the individual node.

6. It really is topology-aware

I saved the best for last. It’s a trick to make other automation tools aware of Cassandra topology. In this upgrade environment, we had multiple data centers with varying numbers of nodes within each, and cstar was smart about distributing the work so that generally the same percentage of nodes were completed for each data center at any point in time. That meant that in the end, the largest data center wasn’t being hit repeatedly with remaining upgrades.

Overall, the gotchas were minor, and I’m happy we used the cstar tool on this upgrade. It allowed flexibility to run custom scripts in a unique environment and certainly shortened the amount of time required to upgrade a large cluster.

Check out the cstar tool here https://github.com/spotify/cstar.

Spark + Cassandra Best Practices

Spark Overview

Spark was created in 2009 as a response to difficulties with MapReduce in Hadoop, particularly in supporting machine learning and other interactive data analysis. Spark simplifies the processing and analysis of data, reducing the number of steps and allowing ease of development. It also provides for reduced latency, since processing is done in-memory.

Spark can be used to process and analyze data to and from a variety of data storage sources and destinations. In this blog, we will discuss Spark in conjunction with data stored in Cassandra.

Querying and manipulating data in Spark has several advantages over doing so directly in Cassandra, not the least of which is being able to join data performantly. This feature is useful for analytics projects.

Spark Use Cases

Typical use cases for Spark when used with Cassandra are: aggregating data (for example, calculating averages by day or grouping counts by category) and archiving data (for example, sending external data to cold storage before deleting from Cassandra). Spark is also used for batched inserts to Cassandra. Other use cases not particular to Cassandra include a variety of machine learning topics.

Spark in the Data Lifecycle

A data analysis project starts with data ingestion into data storage. From there, data is cleansed and otherwise processed. The resulting data is analyzed, reviewing for patterns and other qualities. It may then be further analyzed using a variety of machine learning methods. End-users will be able to run ad hoc queries and use interfaces to visualize data patterns. Spark has a star role within this data flow architecture.

Ingestion

Spark can be used independently to load data in batches from a variety of data sources (including Cassandra tables) into distributed data structures (RDDs) used in Spark to parallelize analytic jobs. Since one of the key features of RDDs is the ability to do this processing in memory, loading large amounts of data without server-side filtering will slow down your project. The spark-cassandra-connector has this filtering and other capabilities. (See https://github.com/datastax/spark-cassandra-connector/.) The limitation on memory resources also implies that, once the data is analyzed, it should be persisted (e.g., to a file or database).
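
As a minimal sketch of that batch path, the Scala snippet below loads a Cassandra table into an RDD with the spark-cassandra-connector, pushing the column selection and the predicate down to the server; the connection host, keyspace, table and column names are placeholders.

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder connection details, keyspace, table and columns.
val conf = new SparkConf()
  .setAppName("cassandra-batch-ingest")
  .set("spark.cassandra.connection.host", "10.0.0.1")
val sc = new SparkContext(conf)

// cassandraTable returns an RDD; select() and where() are pushed down to Cassandra,
// so only the requested columns and matching rows are read from the cluster.
val recentEvents = sc.cassandraTable("analytics", "events")
  .select("event_id", "event_time", "payload")
  .where("event_time > ?", "2020-07-01 00:00:00")

println(recentEvents.count())

The same table can also be loaded through the dataframe API with format("org.apache.spark.sql.cassandra"), which is the approach shown in the Data Storage section below.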

To avoid some of the limitations of this batch processing, streaming functionality was added to Spark. In Spark 1, this functionality was offered through DStreams. (See https://spark.apache.org/docs/latest/streaming-programming-guide.html.)

Spark 2 — a more robust version of Spark in general — includes Structured Streaming. (See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.) With Structured Streaming, instead of creating a static table from a batch input, the source is treated as a table that is constantly growing, and the incoming data is stored in a dataframe that is continuously updated with new rows. (Another benefit of using dataframes over RDDs is that the data is intuitively abstracted into columns and rows.)

Available data sources on the source side for streaming include the commonly used Apache Kafka. Kafka buffers the ingest, which is key for high-volume streams. See https://kafka.apache.org/ for more details on Kafka.
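
As a sketch of that streaming path, the snippet below builds a continuously updating dataframe from a Kafka topic; it assumes the spark-sql-kafka package is on the classpath, and the broker address and topic name are invented.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("stream-ingest").getOrCreate()

// A continuously updating dataframe backed by a (hypothetical) Kafka topic.
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-1:9092")
  .option("subscribe", "sensor-readings")
  .load()

// Kafka delivers binary key/value pairs; expose the value as a string column.
val readings = raw.selectExpr("CAST(value AS STRING) AS reading")

// The console sink keeps the sketch self-contained; a real pipeline would write
// to Cassandra or another store instead.
val query = readings.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()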

Data Storage

Although we are focusing on Cassandra as the data storage in this presentation, other storage sources and destinations are possible. Another frequently used data storage option is Hadoop HDFS. The previously mentioned spark-cassandra-connector has capabilities to write results to Cassandra, and in the case of batch loading, to read data directly from Cassandra.

Native data output formats available include both JSON and Parquet. The Parquet format in particular is useful for writing to AWS S3. See https://aws.amazon.com/about-aws/whats-new/2018/09/amazon-s3-announces-new-features-for-s3-select/ for more information on querying S3 files stored in Parquet format. A good use case for this is archiving data from Cassandra.
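
A hedged sketch of both destinations mentioned above, assuming the spark-cassandra-connector and an S3 filesystem implementation (s3a) are on the classpath; the keyspace, table and bucket names are placeholders, and a tiny stand-in dataframe keeps the snippet self-contained.

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder.appName("archive-results").getOrCreate()
import spark.implicits._

// A tiny stand-in for the analyzed results.
val df = Seq(("2020-07-01", 42L)).toDF("day", "event_count")

// Append the results to a Cassandra table via the spark-cassandra-connector
// (the keyspace and table are placeholders and must already exist).
df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "analytics", "table" -> "daily_aggregates"))
  .mode(SaveMode.Append)
  .save()

// Archive the same results as Parquet on S3 (the s3a:// path is a placeholder).
df.write.mode(SaveMode.Overwrite).parquet("s3a://example-archive-bucket/daily_aggregates/")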

Data Cleansing

Data cleansing involves dealing with questionable data (such as null values) and other preprocessing tasks (such as converting categorical data to mapped integers). Once data is stored in a data frame, it can be transformed into new dataframes based on filters. Beyond the convenience of doing this cleansing in the same code (e.g., the Scala script running Spark), Spark does not provide magic to clean data; it takes knowledge about the data and the business to understand and code the particular transformation tasks.
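
For example, a few typical cleansing steps expressed as dataframe transformations; the column names and rules are invented for the sketch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("cleansing").getOrCreate()
import spark.implicits._

// Invented raw data with missing values and a categorical column.
val raw = Seq(("a", Some(10.0), "red"), ("b", None, "blue"), ("c", Some(7.5), null))
  .toDF("id", "score", "colour")

val cleaned = raw
  .na.fill(0.0, Seq("score"))               // replace null scores with a default
  .filter(col("colour").isNotNull)          // drop rows with a missing category
  .withColumn("colour_code",                // map the category to an integer
    when(col("colour") === "red", 1)
      .when(col("colour") === "blue", 2)
      .otherwise(0))

cleaned.show()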

Pattern Analysis

Spark dataframes can be easily explored for basic patterns using commands like describe, which will show the count, mean, standard deviation, minimum value, and maximum value of selected columns. Dataframes can be further transformed with functions like groupBy, pivot, and agg (aggregate). Spark SQL can be used for complex joins and similar tasks, using the SQL language that will be familiar to many data analysts.
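
A short sketch of those operations on an invented sales dataframe:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("patterns").getOrCreate()
import spark.implicits._

// Invented sales data.
val sales = Seq(("mon", "books", 12.0), ("mon", "games", 30.0), ("tue", "books", 8.0))
  .toDF("day", "category", "amount")

// Basic profile: count, mean, stddev, min and max of the selected column.
sales.describe("amount").show()

// Aggregate and pivot: average amount per category, spread across days.
sales.groupBy("category").pivot("day").agg(avg("amount")).show()

// The same data queried with Spark SQL.
sales.createOrReplaceTempView("sales")
spark.sql("SELECT day, SUM(amount) AS total FROM sales GROUP BY day").show()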

Machine Learning and Data Mining

Machine learning and data mining encompass a broad range of data modeling algorithms intended to make predictions or to discover unknown meaning within data.

From Spark 2 onward, the main library for Spark machine learning is based on data frames instead of RDDs. You may see this new data frame-based library referred to as Spark ML, but the library name hasn’t changed — it is still MLlib. (See https://spark.apache.org/docs/latest/ml-guide.html.) Some things that are possible with Spark in this area are recommendation engines, anomaly detection, semantic analysis, and risk analysis.
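
As one small, hedged example of the dataframe-based MLlib API, here is a sketch of training a recommendation model with ALS on an invented ratings set:

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("mllib-sketch").getOrCreate()
import spark.implicits._

// Invented ratings: (userId, itemId, rating).
val ratings = Seq((1, 10, 4.0f), (1, 11, 2.0f), (2, 10, 5.0f), (2, 12, 3.0f))
  .toDF("userId", "itemId", "rating")

val als = new ALS()
  .setUserCol("userId")
  .setItemCol("itemId")
  .setRatingCol("rating")

val model = als.fit(ratings)

// Recommend two items for every user seen in training.
model.recommendForAllUsers(2).show(false)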

Ad Hoc Queries

Spark SQL is available to use within any code used with Spark, or from the command line interface; however, the requirement to run ad hoc queries generally implies that business end-users want to access a GUI to both ask questions of the data and create visualizations. This activity could take place using the eventual destination datastore as the backend. Directly from Spark, there are enterprise options such as Tableau, which has a Spark connector. For query speed, a memory-based cache such as Apache Ignite could be used as the analytics backend; to maintain that speed by avoiding disk i/o, the data being used for queries should fit into memory.

Visualization

Depending on the programming language and platform used, there may be libraries available to directly visualize results. For example, if Python is used in a Jupyter notebook, then Matplotlib will be available. (See https://matplotlib.org/.) In general, except for investigating data in order to clean it, etc., visualization will be done on the data after it is written to the destination. For business end-users, the above discussion in Ad Hoc Queries applies.

Architecture

The general architecture for a Spark + Cassandra project is apparent from the discussion of the Data Lifecycle above. The core elements are source data storage, a queueing technology, the Spark cluster, and destination data storage.

In the case of Cassandra, the source data storage is of course a cluster. Spark will only query a single data center, and to avoid load on a production cluster, this is what you want. Most installations will set up a second data center replicating data from the production data center cluster and attach Spark to this second data center. If you are unable to set up a separate data center to connect to Spark (and we strongly recommend setting it up), be sure to carefully tune the write variables in the spark-cassandra-connector. In addition, if Datastax Enterprise is being used, then DSE Search should be isolated on a third data center.

Another consideration is whether to set up Spark on dedicated machines. It is possible to run Spark on the same nodes as Cassandra, and this is the default with DSE Analytics. Again, this is not advised on the main production cluster, but can be done on a second, separate cluster.

Spark in the Cloud

Spark at Google Cloud

Google Cloud offers Dataproc as a fully managed service for Spark (and Hadoop):

https://cloud.google.com/dataproc/

Spark at AWS

AWS supports Spark on EMR: https://aws.amazon.com/emr/features/spark/.

Spark Development

Coding Language Options

Spark code can be written in Python, Scala, Java, or R. SQL can also be used within much of Spark code.

Tuning Notes

Spark Connector Configuration

Slowing down the write throughput (spark.cassandra.output.throughput_mb_per_sec) can alleviate latency pressure on the Cassandra cluster.

For writing, the Spark batch size (spark.cassandra.output.batch.size.bytes) should stay within the batch size configured in Cassandra (batch_size_fail_threshold_in_kb).

Writing more frequently will reduce the size of the write, reducing the latency. Increasing the batch size will reduce the number of times Cassandra has to deal with timestamp management. Spark can cause allocation failures if the batch size is too big.
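
These connector settings are plain Spark configuration properties; the sketch below shows where they go when building a SparkConf. The values are placeholders, not recommendations.

import org.apache.spark.SparkConf

// Placeholder values; tune them against your own cluster and workload.
val conf = new SparkConf()
  .set("spark.cassandra.output.throughput_mb_per_sec", "5")   // throttle writes to protect read latencies
  .set("spark.cassandra.output.batch.size.bytes", "1024")     // keep batches within Cassandra's fail threshold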

For further documentation on connector settings, see  https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md.

Spark Security

Security has to be explicitly configured in Spark; it is not on by default. However, the configuration doesn’t cover all risk vectors, so review the options carefully. Also, most of these settings can be overridden in code accessing Spark, so it is important to audit your codebase, and most important of all to limit connections to specific hosts and further protect them with user authentication.

Enable authentication

Authentication is turned off by default. It is enabled through two configuration variables, using a shared secret.

The two configuration variables are spark.authenticate (default is false; set to true) and spark.authenticate.secret (set to string of shared secret). If YARN is used, then much of this is done by default. If not, then set these two in the spark-defaults.conf file.

Then use this secret key when submitting jobs. Note that the secret key can be used to submit jobs by anyone with the key, so protect it well.
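
In a non-YARN deployment, a spark-defaults.conf fragment for this would look roughly like the following; the secret is a placeholder, so generate and protect your own.

# spark-defaults.conf (when not running under YARN)
spark.authenticate           true
spark.authenticate.secret    replace-with-a-long-random-secret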

Enable logging for all submitted jobs

You can set spark.eventLog.enabled in the spark-defaults.conf file, but it can be overridden in a user’s code (e.g., in the SparkConf) or in shell commands, so it has to be enforced by business policy.

Note also that the log file itself (configured via spark.eventLog.dir) should be protected with filesystem permissions to avoid users snooping data within it.

Block Java debugging options in JVM

Make sure the JVM configuration does not include the following options: -Xdebug, -Xrunjdwp, -agentlib:jdwp.

Redact environment data in WebUI

You can disable the whole UI via spark.ui.enabled, but other than that, or overriding the EnvironmentListener with alternate custom code, there is no way to redact the information in the Environment tab of the UI specifically.

It is recommended to enable ACLs for both the WebUI and the history server, which will protect the entirety of the web-based information.

Enable and enforce SASL RPC encryption

Since Spark 2.2, the recommendation is to use AES encryption rather than SASL encryption, unless an external Shuffle service is used. To enable SASL, set the following to true: spark.authenticate.enableSaslEncryption and spark.network.sasl.serverAlwaysEncrypt.

Enable encryption on all UIs and clients

To enable AES encryption for data going across the wire, in addition to turning on authentication as above, also set the following to true: spark.network.crypto.enabled. Choose a key length and set via spark.network.crypto.keyLength, and choose an algorithm from those available in your JRE and set via spark.network.crypto.keyFactoryAlgorithm.

Don’t forget to also set configuration from any database (e.g., Cassandra) to Spark, to encrypt that traffic.

Enable encryption on Shuffle service

In addition to the above encryption configuration, set the following to true: spark.network.crypto.saslFallback.

To encrypt the temporary files created by the Shuffle service, set this to true: spark.io.encryption.enabled. The key size and algorithm can also be set via spark.io.encryption.keySizeBits and spark.io.encryption.keygen.algorithm, but these have reasonable defaults.
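
Pulling the settings from the last few subsections together, a spark-defaults.conf sketch might look like this; the key length and key factory algorithm are example values, so confirm what your JRE supports.

# spark-defaults.conf: RPC authentication plus SASL encryption
spark.authenticate                          true
spark.authenticate.enableSaslEncryption     true
spark.network.sasl.serverAlwaysEncrypt      true

# AES encryption for data on the wire
spark.network.crypto.enabled                true
spark.network.crypto.keyLength              256
spark.network.crypto.keyFactoryAlgorithm    PBKDF2WithHmacSHA256
spark.network.crypto.saslFallback           true

# Encrypt temporary files written by the shuffle service
spark.io.encryption.enabled                 true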

Disable Spark REST server

The REST server presents a serious risk, as it does not allow for encryption. Set the following to false: spark.master.rest.enabled.

Operations

Monitoring Spark

Spark has built-in monitoring: https://spark.apache.org/docs/latest/monitoring.html

Apache Cassandra 4.0 Benchmarks

Apache Cassandra 4.0 will reach beta shortly and is the first version that will support JDK 11 and onwards. Latency is an obvious concern for Apache Cassandra™ users and big hopes have been put into ZGC, the new low latency garbage collector introduced in JDK 11. It reached GA in JDK 14, which made us eager to evaluate how good of a fit it would be for Apache Cassandra clusters. We also wanted to compare Apache Cassandra 3.11.6 performance against 4.0 and see if Shenandoah, RedHat’s garbage collector, should be considered for production use. In this post we will see that Cassandra 4.0 brings strong performance improvements on its own which are massively amplified by the availability of new garbage collectors: ZGC and especially Shenandoah.

Benchmarking methodology

The following benchmarks were conducted using tlp-cluster for provisioning and configuring Apache Cassandra clusters in AWS and using tlp-stress for load generation and metrics collection. All used tools are available as open source and benchmarks are easily reproducible for anyone with an AWS account.

Clusters were composed of 3 nodes using r3.2xlarge instances and a single stress node using a c3.2xlarge instance.

Default settings were used for Apache Cassandra, with the exception of GC and heap.

Cluster provisioning and configuration was done using the latest release of tlp-cluster. We recently added some helper scripts to automate cluster creation and also the installation of Reaper and Medusa.

After installing and configuring tlp-cluster according to the documentation, you’ll be able to recreate the clusters we used to run the benchmarks:

# 3.11.6 CMS JDK8
build_cluster.sh -n CMS_3-11-6_jdk8 -v 3.11.6 --heap=16 --gc=CMS -s 1 -i r3.2xlarge --jdk=8 --cores=8

# 3.11.6 G1 JDK8
build_cluster.sh -n G1_3-11-6_jdk8 -v 3.11.6 --heap=31 --gc=G1 -s 1 -i r3.2xlarge --jdk=8 --cores=8

# 4.0 CMS JDK11
build_cluster.sh -n CMS_4-0_jdk11 -v 4.0~alpha4 --heap=16 --gc=CMS -s 1 -i r3.2xlarge --jdk=11 --cores=8

# 4.0 G1 JDK14
build_cluster.sh -n G1_4-0_jdk14 -v 4.0~alpha4 --heap=31 --gc=G1 -s 1 -i r3.2xlarge --jdk=14 --cores=8

# 4.0 ZGC JDK11
build_cluster.sh -n ZGC_4-0_jdk11 -v 4.0~alpha4 --heap=31 --gc=ZGC -s 1 -i r3.2xlarge --jdk=11 --cores=8

# 4.0 ZGC JDK14
build_cluster.sh -n ZGC_4-0_jdk14 -v 4.0~alpha4 --heap=31 --gc=ZGC -s 1 -i r3.2xlarge --jdk=14 --cores=8

# 4.0 Shenandoah JDK11
build_cluster.sh -n Shenandoah_4-0_jdk11 -v 4.0~alpha4 --heap=31 --gc=Shenandoah -s 1 -i r3.2xlarge --jdk=11 --cores=8

Note: in order to conduct all benchmarks under similar conditions, a single set of EC2 instances was used throughout the tests.

An upgrade from Cassandra 3.11.6 to Cassandra 4.0~alpha4 was executed and JDKs were switched when appropriate using the following script:

#!/usr/bin/env bash

OLD=$1
NEW=$2
curl -sL https://github.com/shyiko/jabba/raw/master/install.sh | bash
. ~/.jabba/jabba.sh
jabba uninstall $OLD
jabba install $NEW
jabba alias default $NEW
sudo update-alternatives --install /usr/bin/java java ${JAVA_HOME%*/}/bin/java 20000
sudo update-alternatives --install /usr/bin/javac javac ${JAVA_HOME%*/}/bin/javac 20000

The following JDK values were used when invoking jabba:

  • openjdk@1.11.0-2
  • openjdk@1.14.0
  • openjdk-shenandoah@1.8.0
  • openjdk-shenandoah@1.11.0

OpenJDK 8 was installed using Ubuntu apt.

Here are the java -version outputs for the different JDKs that were used during the benchmarks:

jdk8

openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)

jdk8 with Shenandoah

openjdk version "1.8.0-builds.shipilev.net-openjdk-shenandoah-jdk8-b712-20200629"
OpenJDK Runtime Environment (build 1.8.0-builds.shipilev.net-openjdk-shenandoah-jdk8-b712-20200629-b712)
OpenJDK 64-Bit Server VM (build 25.71-b712, mixed mode)

jdk11

openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

jdk11 with Shenandoah

openjdk version "11.0.8-testing" 2020-07-14
OpenJDK Runtime Environment (build 11.0.8-testing+0-builds.shipilev.net-openjdk-shenandoah-jdk11-b277-20200624)
OpenJDK 64-Bit Server VM (build 11.0.8-testing+0-builds.shipilev.net-openjdk-shenandoah-jdk11-b277-20200624, mixed mode)

jdk14

openjdk version "14.0.1" 2020-04-14
OpenJDK Runtime Environment (build 14.0.1+7)
OpenJDK 64-Bit Server VM (build 14.0.1+7, mixed mode, sharing)

CMS

CMS (Concurrent Mark Sweep) is the current default garbage collector in Apache Cassandra. It was removed from JDK 14 so all tests were conducted with either JDK 8 or 11.

The following settings were used for CMS benchmarks:

-XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-XX:+CMSParallelRemarkEnabled
-XX:SurvivorRatio=8
-XX:MaxTenuringThreshold=1
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-XX:CMSWaitDuration=10000
-XX:+CMSParallelInitialMarkEnabled
-XX:+CMSEdenChunksRecordAlways
-XX:+CMSClassUnloadingEnabled
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=8
-Xms16G
-Xmx16G
-Xmn8G

Note that the -XX:+UseParNewGC flag was removed in JDK 11, where it is implied by CMS; passing it explicitly would prevent the JVM from starting up.

We’ll keep CMS at 16GB of max heap as it could otherwise lead to very long pauses on major collections.

G1

G1GC (Garbage-First Garbage Collector) is easier to configure than CMS as it resizes the young generation dynamically, but it only delivers its best results with large heaps (>=24GB). This explains why it hasn’t been promoted to be the default garbage collector. It also shows higher latencies than a tuned CMS, but provides better throughput.

The following settings were used for G1 benchmarks:

-XX:+UseG1GC
-XX:G1RSetUpdatingPauseTimePercent=5
-XX:MaxGCPauseMillis=300
-XX:InitiatingHeapOccupancyPercent=70
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=8
-Xms31G
-Xmx31G

For 4.0 benchmarks, JDK 14 was used when running G1 tests.

We’re using 31GB of heap size to benefit from compressed oops and have the largest number of addressable objects for the smallest heap size.

ZGC

ZGC (Z Garbage Collector) is the latest GC from the JDK, which focuses on providing low latency with stop-the-world pauses shorter than 10ms. It is also supposed to guarantee that the heap size has no impact on pause times, allowing it to scale up to 16TB of heap. If these expectations are met, it could remove the need to use off-heap storage and simplify some development aspects of Apache Cassandra.

The following settings were used for ZGC benchmarks:

-XX:+UnlockExperimentalVMOptions
-XX:+UseZGC
-XX:ConcGCThreads=8
-XX:ParallelGCThreads=8
-XX:+UseTransparentHugePages
-verbose:gc
-Xms31G
-Xmx31G

We needed to use the -XX:+UseTransparentHugePages flag as a workaround to avoid enabling large pages on Linux. While the official ZGC documentation states it could (possibly) generate latency spikes, the results didn’t seem to show such behavior. It could be worth running the throughput tests using large pages to see how it could affect the results.

Note that ZGC cannot use compressed oops and is not affected by the “32GB threshold”. We’re using 31GB of heap to use the same sizing as G1 and allow the system to have the same amount of free RAM.

Shenandoah

Shenandoah is a low latency garbage collector developed at RedHat. It is available as a backport in JDK 8 and 11, and is part of the mainline builds of the OpenJDK starting with Java 13.

Like ZGC, Shenandoah is a mostly concurrent garbage collector which aims at making pause times not proportional to the heap size.

The following settings were used for Shenandoah benchmarks:

-XX:+UnlockExperimentalVMOptions
-XX:+UseShenandoahGC
-XX:ConcGCThreads=8
-XX:ParallelGCThreads=8
-XX:+UseTransparentHugePages
-Xms31G
-Xmx31G

Shenandoah should be able to use compressed oops and thus benefits from using heaps a little below 32GB.

Cassandra 4.0 JVM configuration

Cassandra version 4.0 ships with separate jvm.options files for Java 8 and Java 11.

These are the files:

  • conf/jvm-server.options
  • conf/jvm8-server.options
  • conf/jvm11-server.options

Upgrading to version 4.0 will work with an existing jvm.options file from version 3.11, so long as it is renamed to jvm-server.options and the jvm8-server.options and jvm11-server.options files are removed. This is not the recommended approach.

The recommended approach is to re-apply the settings found in the previous jvm.options file to the new jvm-server.options and jvm8-server.options files. The Java specific option files are mostly related to the garbage collection flags. Once these two files are updated and in place, it then becomes easier to configure the jvm11-server.options file, and simpler to switch from JDK 8 to JDK 11.

Workloads

Benchmarks were done using 8 threads running with rate limiting and 80% writes/20% reads. tlp-stress uses asynchronous queries extensively, which can easily overwhelm Cassandra nodes with a limited number of stress threads. The load tests were conducted with each thread sending 50 concurrent queries at a time. The keyspace was created with a replication factor of 3 and all queries were executed at consistency level LOCAL_ONE.

All garbage collectors and Cassandra versions were tested with growing rates of 25k, 40k, 45k and 50k operations per second to evaluate their performance under different levels of pressure.

The following tlp-stress command was used:

tlp-stress run BasicTimeSeries -d 30m -p 100M -c 50 --pg sequence -t 8 -r 0.2 --rate <desired rate> --populate 200000

All workloads ran for 30 minutes, loading approximately 5 to 16 GB of data per node and allowing a reasonable compaction load.

Note: The purpose of this test is not to evaluate the maximum performance of Cassandra, which can be tuned in many ways for various workloads. Neither is it to fine tune the garbage collectors which all expose many knobs to improve their performance for specific workloads. These benchmarks attempt to provide a fair comparison of various garbage collectors using mostly default settings when the same load is generated in Cassandra.

Benchmarks results

3.11.6 25k-40k ops/s:
3.11.6 latency charts - 25k-40k

4.0 25k-40k ops/s:
4.0-alpha4 latency charts - 25k-40k

4.0 45k-50k ops/s:
4.0-alpha4 latency charts - 45k-50k

Throughput

Throughput wise, Cassandra 3.11.6 maxed out at 41k ops/s while Cassandra 4.0 went up to 51k ops/s, which is a nice 25% improvement thanks to the upgrade, using CMS in both cases. There have been numerous performance improvements in 4.0 explaining these results, especially on heap pressure caused by compaction (check CASSANDRA-14654 for example).

Shenandoah in jdk8 on Cassandra 3.11.6 fails to deliver the maximum throughput in the 40k ops/s load test and starts showing failed requests at this rate. It behaves much better with jdk11 and Cassandra 4.0, where it nearly matches the throughput of CMS with a maximum of 49.6k ops/s. Both G1 and Shenandoah with jdk8 maxed out at 36k ops/s overall with Cassandra 3.11.6.

G1 seems to have been improved in jdk14 as well and beats jdk11 with a small improvement from 47k/s to 50k/s.

ZGC fails at delivering a throughput that matches its contenders in both jdk11 and jdk14, with at most 41k ops/s.

Average write p99 (ms)

Average read p99 (ms)

Max p99 (ms)

Shenandoah in jdk8 delivers some very impressive latencies under moderate load on Cassandra 3.11.6, but performance severely degrades when it gets under pressure.

Using CMS, Cassandra 4.0 manages to keep an average p99 between 11ms and 31ms with up to 50k ops/s. The average read p99 under moderate load went down from 17ms in Cassandra 3.11.6 to 11.5ms in Cassandra 4.0, which gives us a 30% improvement.

With 25% to 30% improvements in both throughput and latency, Cassandra 4.0 easily beats Cassandra 3.11.6 using the same garbage collectors.

Honorable mention to Shenandoah for the very low latencies under moderate load in Cassandra 3.11.6, but the behavior under pressure makes us worried about its capacity to handle spiky loads.

While ZGC delivers some impressive latencies under moderate load, especially with jdk14, it doesn’t keep up at higher rates when compared to Shenandoah. Average p99 latencies for both reads and writes are the lowest for Shenandoah in almost all load tests. These latencies combined with the throughput it can achieve in Cassandra 4.0 make it a very interesting GC to consider when upgrading. An average p99 read latency of 2.64ms under moderate load is pretty impressive, even more so knowing that these latencies are recorded by the client.

G1 mostly matches its configured maximum pause time of 300ms when looking at the max p99, but using lower target pause could have undesirable effects under high load and trigger even longer pauses.

Under moderate load, Shenandoah manages to lower average p99 latencies by 77%, with a top low at 2.64ms. This will be a major improvement for latency sensitive use cases. Compared to CMS in Cassandra 3.11.6, it’s a whopping 85% latency reduction for reads at p99!

Honorable mention to ZGC in jdk14 which delivers some great performance under moderate load but sadly can’t yet keep up at higher rates. We are optimistic that it will be improved in the coming months and might eventually compete with Shenandoah.

Final thoughts

G1 brought improvements in Cassandra’s usability by removing the need to fine-tune generation sizes, at the expense of some performance. The release of Apache Cassandra 4.0, which brings very impressive boosts on its own, will allow using new generation garbage collectors such as Shenandoah or ZGC, which both require minimal tuning and deliver much better latencies.

Shenandoah is hard to recommend on Cassandra 3.11.6 as nodes start to misbehave at high loads, but starting from jdk11 and Cassandra 4.0, this garbage collector offers stunning improvements in latencies while almost delivering the maximum throughput one can expect from the database.

Your mileage may vary as these benchmarks focused on a specific workload, but the results make us fairly optimistic in the future of Apache Cassandra for latency sensitive use cases, bringing strong improvements over what Cassandra 3.11.6 can currently deliver.

Download the latest Apache Cassandra 4.0 build and give it a try. Let us know if you have any feedback on the community mailing lists or in the ASF Slack.

Cassandra Data Modeling Best Practices Guide

Apache Cassandra is an open source non-relational, or NoSQL, distributed database that enables continuous availability, tremendous scale, and data distribution across multiple data centers and cloud availability zones. Simply put, it provides a highly reliable data storage engine for applications requiring immense scale.

Data modeling is a process used to analyze, organize, and understand the data requirements for a product or service. Data modeling creates the structure your data will live in. It defines how things are labeled and organized, and determines how your data can and will be used. The process of data modeling is similar to designing a house. You start with a conceptual model and add detail to produce the final blueprint. 

The ultimate goal of Cassandra data modeling and analysis is to develop a complete, well organized, and high performance Cassandra cluster. Following the five Cassandra data modeling best practices outlined below will hopefully help you meet that goal:

  1. Cassandra is not a relational database, don’t try to model it like one
  2. Design your model to meet 3 fundamental goals for data distribution
  3. Understand the importance of the Primary Key in the overall data structure 
  4. Model around your queries but don’t forget about your data
  5. Follow a six step structured approach to building your model. 

Cassandra Is Not a Relational Database

Do not try to design a Cassandra data model like you would with a relational database. 

Query first design: You must define how you plan to access the data tables at the beginning of the data modeling process not towards the end. 

No joins or derived tables: Tables cannot be joined so if you need data from more than one table, the tables must be merged into a denormalized table.   

Denormalization: Cassandra does not support joins or derived tables so denormalization is a key practice in Cassandra table design.

Designing for optimal storage: For relational databases this is usually transparent to the designer. With Cassandra, an important goal of the design is to optimize how data is distributed around the cluster. 

Sorting is a Design Decision: In Cassandra, sorting can be done only on the clustering columns specified in the PRIMARY KEY.

The Fundamental Goals of the Cassandra Data Model

Distributed data systems, such as Cassandra, distribute incoming data into chunks called partitions.  Cassandra groups data into distinct partitions by hashing a data attribute called partition key and distributes these partitions among the nodes in the cluster. 

(A detailed explanation can be found in Cassandra Data Partitioning.)

A good Cassandra data model is one that: 

  1. Distributes data evenly across the nodes in the cluster
  2. Places limits on the size of a partition
  3. Minimizes the number of partitions returned by a query.

Distributes Data Evenly Around the Cassandra Cluster

Choose a partition key that has a high cardinality to avoid hot spots—a situation where one or a few nodes are under heavy load while others are idle.   

Limits the Size of Partitions

For performance reasons choose partition keys whose number of possible values is bounded. For optimal performance, keep the size of a partition between 10 and 100MB. 

Minimize the Number of Partitions Read by a Single Query 

Ideally, each of your queries will read a single partition. Reading many partitions at one time is expensive because each partition may reside on a different node. The coordinator (this is the node in the cluster that first receives the request) will generally need to issue separate commands to separate nodes for each partition you request. This adds overhead and increases the variation in latency. Unless the data set is small, attempting to read an entire table, that is all the partitions, fails due to a read timeout. 

Understand the Importance of the Primary Key

Every table in Cassandra must have a  set of columns called the primary key. (In older versions of Cassandra, tables were called column families). In addition to determining the uniqueness of a row, the primary key also shapes the data structure of a table. The Cassandra primary key has two parts:

Partition key: The first column or set of columns in the primary key. This is required. The hashed value of the partition key determines where the partition will reside within the cluster.

Clustering key (aka clustering columns): The columns after the partition key. The clustering key is optional; it determines the default sort order of rows within a partition.

A very important part of the design process is to make sure a partition key will: 

  1. Distribute data evenly across all nodes in a cluster.  Avoid using keys that have a very small domain of possible values, such as gender, status, school grades, and the like.  The minimum number of possible values should always be greater  than the number of nodes in the cluster.  Also, avoid using keys where the distribution of possible values is highly skewed. Using such a key will create “hotspots” on the cluster. 
  2. Have a bounded range of values. Large partitions can increase read latency and cause stress on a node during a background process called compaction. Try to keep the size of partitions under 100MB. 

Model Around Your Queries

The Cassandra Query Language (CQL) is the primary language used to communicate with a Cassandra database. In syntax and function, CQL resembles SQL which makes it easy for those who know the latter to quickly learn how to write queries for Cassandra. But there are some important differences that affect your design choices. 

A well known one is that Cassandra does not support joins or derived tables. Whenever you require data from two or more tables, you must denormalize. 

Search conditions have restrictions that also impact the design. 

  • Only primary key columns can be used as query predicates. (Note: a predicate is an operation on expressions that evaluates to TRUE or FALSE.)
  • Partition key columns are limited to equality searches. Range searches can only be done on clustering columns.
  • If there are multiple partition key columns (i.e. a composite partition key), all partition key columns must be included in the search condition.
  • Not all clustering columns need to be included in the search condition. But there are some restrictions:
    • When omitting clustering columns, you must omit them starting from the rightmost column listed in the primary key definition;
    • An equality search cannot follow a range search.
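
Using the hypothetical sensor_readings table sketched earlier, these restrictions play out roughly as follows:

  -- Allowed: equality on the partition key, range on the clustering column
  SELECT value FROM sensor_readings
      WHERE sensor_id = 'sensor-42'
        AND reading_time >= '2020-06-01';

  -- Rejected: range predicate on the partition key
  SELECT value FROM sensor_readings WHERE sensor_id > 'sensor-42';

  -- Rejected: predicate on a column that is not part of the primary key
  -- (unless a secondary index exists or ALLOW FILTERING is used)
  SELECT value FROM sensor_readings WHERE value > 10.0;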

Don’t Forget About the Data

Creating a complete Cassandra data model involves more than knowing your queries. You can identify all the queries correctly, but if you miss some data, your model will not be complete. Attempting to refactor a mature Cassandra data model can be an arduous task.

Developing a good conceptual model (see below) will help identify the data your application needs. 

Take a Structured Approach to Design 

In order to create a data model that is complete and high performing, it helps to follow a big data modeling methodology for Apache Cassandra that can be summarized as: 

  1. Data Discovery (DD). This is a high level view of the data your application needs and identifies the entities (things), the attributes of the entities, and which attributes are the identifiers. This may be an iterative process as development proceeds.
  2. Identify the Access Patterns (AP). Identify and list the queries your application will want to perform. You need to answer: What data needs to be retrieved together, what are the search criteria, and what are the update patterns? This also may be an iterative process.
  3. Map data and queries (MDQ). Map the queries to the data identified in steps 1 and 2 to create logical tables, which are high level representations of Cassandra tables.
  4. Create the physical tables (PT). Convert the logical data model to a physical data model (PDM) by using CQL CREATE TABLE statements.
  5. Review and refine the physical data model. Confirm that the physical tables will meet the three basic goals for a Cassandra data model.

Structured approach to Cassandra data model design

A more detailed examination of these steps can be found in an earlier Instaclustr white paper: 6 Step Guide to Apache Cassandra Data Modelling.

If you have worked with relational database design, some steps will be familiar because they are also in the entity-relationship (ER) model.  At the conceptual stage, it can be useful to visually represent the data model by ER diagrams using either the Chen or Information Engineering (IE) notation. The Chebotko diagram uses a notation developed by Artem Chebotko to represent data and queries at the logical and physical modeling stages. 

Cassandra Model Example

Let’s assume that we have a simple logging system with two entities: LogSource and LogMessage.  For LogSource the key attribute is sourceName.  For the entity LogMessage, the key attribute is messageID.  

The query we want to execute is:  Q1) show the message information about the 10 most recent messages for a given source. 

The primary access entity is LogSource because it contains the equality search attribute (sourceName). We create a logical table named LogMessage_by_Source and push the attribute sourceName into it. That becomes the partition key (indicated by the K).

We need to sort by time, so messageTime becomes a clustering column in LogMessage_by_Source (indicated by C↑).

The secondary entity is LogMessage. Its key attribute messageID becomes a second clustering column of the primary key in LogMessage_by_Source to ensure uniqueness of the row. Finally, we add the remaining columns from the secondary entity to complete the data needed by the query.
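
One plausible CQL rendering of this logical table is sketched below. The column types are assumptions, and the clustering order is set to descending so that the 10 most recent messages can be read with a simple LIMIT:

  CREATE TABLE LogMessage_by_Source (
      sourceName  text,
      messageTime timestamp,
      messageID   uuid,
      messageText text,   -- assumed remaining attribute of LogMessage
      PRIMARY KEY ((sourceName), messageTime, messageID)
  ) WITH CLUSTERING ORDER BY (messageTime DESC, messageID ASC);

  -- Q1: the 10 most recent messages for a given source
  SELECT * FROM LogMessage_by_Source
      WHERE sourceName = 'app-server-01'
      LIMIT 10;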

An example of a Cassandra data model

Data Duplication 

Data duplication refers to the number of times data must be duplicated in different tables to satisfy access patterns. For example, if we wanted to search for a specific message by its unique identifier, we would duplicate the data by creating a new table called LogMessage_by_ID that uses messageID as the partition key.
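
A sketch of that duplicated table, reusing the column types assumed in the earlier example:

  CREATE TABLE LogMessage_by_ID (
      messageID   uuid,
      sourceName  text,
      messageTime timestamp,
      messageText text,
      PRIMARY KEY ((messageID))
  );

The application then has to write every message to both tables, which is where the issues below come from.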

Two issues can arise from duplication: 

  • Increased complexity to maintain  data integrity across multiple tables; 
  • If the data being duplicated is very large it puts size and write pressure on the database.

In a case where data duplication would cause more problems than it solves, an alternative is to duplicate only the lookup keys, that is, a lookup table. However, this approach requires the client to perform a second query to read the secondary data. The trade-off between read performance and data maintenance cost needs to be judged against the specific performance requirements of your application, and the solution should be benchmarked to confirm that it is workable.

Materialized Views

These are objects created by a query, which are copies of a base table but with a different partition key. The data between the materialized view and the base table is automatically synchronized by Cassandra. Their purpose is to make modeling for new query patterns easier and more flexible.
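
For illustration only, given the advice that follows, a materialized view keyed by messageID over the earlier LogMessage_by_Source sketch might be declared like this:

  CREATE MATERIALIZED VIEW LogMessage_by_ID_view AS
      SELECT * FROM LogMessage_by_Source
      WHERE messageID IS NOT NULL
        AND sourceName IS NOT NULL
        AND messageTime IS NOT NULL
      PRIMARY KEY ((messageID), sourceName, messageTime);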

Instaclustr’s advice is not to use them in Cassandra 3.x because of problems in keeping the view and the base table synchronized. The Apache Cassandra project has classified Materialized Views as an experimental feature for Cassandra 3.x. 

Summary

Cassandra data modeling is a process used to define and analyze the data requirements and access patterns needed to support a business process.

A data model helps define the problem, enabling you to consider different approaches and choose the best one.  It ensures that all necessary data is captured and stored efficiently. 

Models document important concepts and jargon, providing a basis for long-term maintenance.

Cassandra is a non-relational database; do not design for it as you would for a relational database. Don’t be afraid to denormalize data. Writes in Cassandra are relatively cheap compared to relational databases.

The goals of a successful Cassandra data model are to choose a partition key that (1) distributes data evenly across the nodes in the cluster, (2) minimizes the number of partitions read by one query, and (3) bounds the size of a partition.

Take a structured approach to your model. Your first steps are understanding your data and identifying the access patterns on that data. These are the most critical steps in developing a complete model.

Contact us to get expert advice on managing and deploying Apache Cassandra.


Cassandra Monitoring: A Best Practice Guide

Introduction to Cassandra Monitoring

Apache Cassandra is a NoSQL database designed to provide scalability, reliability, and availability with linear performance scaling. Cassandra database is designed as a distributed system, and aims to handle big data efficiently. Refer to what-is-apache-cassandra and cassandra-architecture for more information. Note that knowledge of Cassandra architecture and basic terminology is a prerequisite to understanding Cassandra monitoring. 

Cassandra monitoring is an essential area of database operations to ensure the good health of a cluster and optimal performance. Alerting is another crucial area for production systems, and it is complementary to monitoring. Good alerting in Cassandra can be achieved by utilization of the monitoring infrastructure and relevant toolset. Alerting and monitoring help create a robust environment for any Cassandra deployment.

This blog post aims to cover all the important aspects of Cassandra monitoring. We hope it provides the reader with crucial information about monitoring tools, components, and metrics.

Monitoring Terminologies

JVM Based Monitoring

Cassandra is developed in Java and is a JVM-based system. Each Cassandra node runs a single Cassandra process. JVM-based systems are enabled with JMX (Java Management Extensions) for monitoring and management. Cassandra exposes various metrics using MBeans which can be accessed through JMX. Cassandra monitoring tools are configured to scrape the metrics through JMX and then filter, aggregate, and render the metrics in the desired format. There are a few performance limitations in the JMX monitoring method, which are discussed later.

Metrics management in Cassandra is performed using the Dropwizard library. The metrics are collected per node in Cassandra; however, they can be aggregated by the monitoring system.

Cassandra Monitoring

Metrics 

There are a large number of metrics exposed by Cassandra to cover all possible areas including performance, resources, communication, and node and cluster state. The metrics are defined with distinct types, and they can also be categorized for operational ease.

Metrics Types

Cassandra metrics are defined with specific data types. These types are designed to correctly represent measurements such as latencies, counts, and others.

The metric types are not intuitive, and you might need some time to get familiar with them.

  • Gauge: A single value representing a metric at a specific point in time, e.g. the amount of memory allocated or the number of active tasks.
  • Counter: Counters are the same as a gauge but are used for value comparisons. Generally, a counter is only incremented, and it is reset when the functionality gets disrupted, e.g. by a node restart. An example is the cache_hit count.
  • Histogram: A histogram is a count of data elements from a data stream grouped in fixed intervals. A histogram gives a statistical distribution of values. The data elements are provided over min, max, mean, median, 75th, 90th, 95th, 98th, 99th, and 99.9th percentile value intervals.
  • Timer: A timer keeps the rate of execution and a histogram of duration for a metric.
  • Latency: This is a special type to measure latency. It includes a Timer, and the latency is reported in microseconds. There is also a TotalLatency with each latency metric; the total latency is the accumulated latency since the node started.
  • Meter: A meter is a unit to measure throughput. It also includes a weighted moving average for the first, fifth, and fifteenth minutes.

Metrics Categories

The metrics are categorised based on Cassandra domains, e.g. table, keyspace, storage, communication, JVM etc. Not all metrics need to be monitored all the time, but they should be available when required, e.g. during troubleshooting or application performance testing.

The metrics are further subdivided into broader areas like resources, network, internals, crucial data elements etc. Metrics can also be represented at different topology levels, such as cluster, node, and table level, to organize all the information.

The categorization becomes clear as we go through specific metrics and correlate those with specific Cassandra areas.

Metrics Format

The Cassandra Dropwizard metrics are specified in the format below:

Dropwizard Metric Name: org.apache.cassandra.metrics.<Metric scope>.<Metric type>.<MetricName>

MBean: org.apache.cassandra.metrics:type=<Metric type>,scope=<Metric scope>,name=<MetricName>

Metric Type: This is the category of the metric, e.g. table, keyspace, threadpool. Do not confuse this with the data type of the metric.

Metric scope: This is the metric sub-type, used for more granularity wherever required, e.g. the table name or keyspace name. The scope is therefore optional.

Metric name: The final metric name, like LiveSSTableCount.

Essential Metrics

Cassandra Metrics

Node Status 

The status of nodes must be monitored and alerted on immediately if a node is down. A Cassandra cluster’s availability directly depends on the uptime of all the nodes in the cluster. Although the anti-entropy mechanism in Cassandra helps protect data from inconsistency, there is no replacement for lost performance during node downtime. A down node puts pressure on other nodes in the data center to handle requests and store hints. Hence, downtime for a node should be kept to a minimum.

Cassandra operational activity sometimes requires node restarts or downtime, but these can be scheduled at the least busy times for the cluster. This alert helps keep track of any service disruption and the need to run a repair on a node. A node should be repaired if it is out of the cluster for longer than the hinted handoff window, which is three hours by default.

Client Request Metrics

The client request metrics provide information about client communication in the form of read and write requests per second between the client and a coordinator node. Other than normal read and write requests, there are special types of read and write operations (CAS, View, and RangeSlice) which have their own set of metrics. These metrics help to track the request count, latency, failures, and a few other statistics. The basic statistics to monitor are the number of requests per second, i.e. throughput, and the request latency.

Requests Per Second

The number of requests should be aggregated per data center and per node. There could be some nodes receiving more requests as compared to other nodes. This behaviour creates extra pressure for the nodes receiving more requests. The specific requests like CAS and RangeSlice should be tracked separately for clarity. These operations are resource-intensive and have a unique effect on the nodes. The ratio of read requests to write requests is crucial to understand the type of workload. There are specific configurations to optimize a read-heavy or a write-heavy workload. 

Each cluster can handle a certain number of client requests per second efficiently. If the number of requests exceeds the cluster capacity, it can result in undesirable outcomes like dropped messages, inconsistency, increased latency etc. CAS and RangeSlice requests in particular can cause increased latency.

Uneven load on a few nodes can be handled with optimal load balancing at the driver side. The read and write latency or throughput issues caused by constant overloading should be addressed by adding more nodes to the data center and revisiting the data model if required.

Alerting: Set alerts on thresholds for the number of requests served per node and per data center.

Client Request Latency

Latency tracked by these metrics is the read and write latency experienced by client applications. There are various percentiles of latency, as mentioned in the latency metric type. These should be tracked separately as well as overall, so that there is a clear view of system performance. Production systems generally have latency SLAs; the SLA on specific or overall latency should be tracked, with alerts raised when client latency breaches it.

There are various factors which affect latency, including the amount of load served by a node or cluster, system resources and tuning, GC settings and behaviour, and the type of requests. Troubleshooting latency issues mainly depends on accurate investigation of the root cause, and correlating latency metrics with other metrics helps to track it down. Using a graph solution like Grafana for visualization is the most efficient way to spot and track issues.

Alerting: Set alerts for latency SLA thresholds, if any, or for the expected latency range.

Request Timeout and Failure 

These metrics track the number of client requests that timed out or failed. Failed requests are a clear indication of errors, and those should be addressed immediately. The common causes of request failure are unavailability of data, failure to get a response from the required number of replicas, data inconsistency, and network errors. Troubleshooting errors is performed using the error messages and correlation with other metrics.

Alerting: Set alerts for more than a few failed requests on production systems.

Compaction Statistics 

This group of metrics includes the amount of data compacted, the number of active/completed compactions, and other relevant details. Compactions consume node resources and can consume disk space quickly. Monitoring compactions provides good insight into the compaction strategy used, as each strategy has a unique operational footprint. Specific Cassandra operations like repairs, high-volume data writes, and add/remove/replace node operations increase compaction activity. It is important to monitor compactions while performing such operations.

A common troubleshooting method for high compaction activities and high resource consumption is to throttle the compaction rate. In some scenarios, compactions can be temporarily stopped, but it requires a lot of caution and must be re-enabled at some point to keep the SSTable count low, and read latency optimal.

Alerting: Alerting is not essential for these metrics. However, alerts can be set if a high number of pending compactions is sustained for longer than the expected time interval.

Garbage Collector Metrics

The Garbage Collector (GC) is yet another crucial area for monitoring. The efficiency of Cassandra throughput and performance depends on the effective use of JVM resources and streamlined GC. The GC behaviour mainly depends on these factors: the garbage collector used, the workload served by Cassandra nodes, GC parameter settings, the heap size for the JVM, etc. A common issue with garbage collection is a long GC pause, i.e. the time taken to perform a garbage collection.

The GC works well with Cassandra’s default settings, but these can be tuned if required to suit a specific workload and the available resources. GC parameter tuning is a non-trivial task and requires knowledge of GC internals. However, sometimes GC issues can be resolved by fixing the data model, changing the workload, or adjusting JVM resources. It is essential to correlate bad GC behaviour with the exact root cause before applying a remedy. Also, any change in parameters impacting GC should be monitored carefully to ensure it is an improvement.

Alerting: Set alert on GC pauses for more than acceptable thresholds on production systems.

Memory Metrics

The memory metrics provide JVM heap, non-heap, and total memory used by Cassandra. The JVM heap storage is used heavily for a variety of purposes by Cassandra. The non-heap memory is also used a lot by later versions of Cassandra. Monitoring the heap and overall memory gives insight into memory usage. It can be used to correlate with any issues and determine memory requirements. 

Please note that Cassandra cannot scale with an indefinite amount of memory. This boils down to the fact that the JVM and GC cannot perform optimally for large heap sizes. The most common range of heap sizes for Cassandra is 8 GB to 32 GB, where the smaller sizes are configured with CMS GC and the larger sizes with G1GC.

Alerting: Set alerts to test specific memory thresholds and tuning.  

Threadpool Metrics

Cassandra works with numerous thread pools internally. This design aims to achieve asynchronous task handling, and it also helps to handle back pressure. Monitoring the thread pools makes it easy to understand the internal system behaviour. It also helps to identify specific pools under pressure via their active, pending, and blocked tasks.

The solution for constantly saturated pools generally is to provide more processing capacity to the node or the cluster. Other core issues, like a poor data model or query pattern, also impact the thread pools.

Alerting: Set alerts for more than a few blocked tasks on production systems. This enables preventive action to avoid performance impact.

Table Metrics 

Table metrics are useful in tracking each table independently. These can be used to monitor a specific set of tables which are performance-critical or host a large volume of data. There are various metrics for each table but some of the most important are discussed here:  

Partition Size

The partition size is a crucial factor in ensuring optimal performance. Cassandra uses partitions of data as a unit of data storage, retrieval, and replication. Hence, larger partitions impact overall performance. The ideal partition size is less than 10 MB, with an upper limit of 100 MB. These values are derived from operational experience in the Cassandra community.

The data model and table definition control the partition size. The partition key for a table determines how data is grouped into partitions. A partition key should be designed to accumulate data only up to acceptable size limits. Unfortunately, it is not easy to re-partition an existing table. But if the data model is still in the design phase, it is crucial to test all the table definitions for potentially large partition sizes. In existing tables, if large partitions are a major issue, they can be addressed by a complete data rewrite. This operation could be long-running, but it can solve many performance issues, and if configured correctly, it can be performed with minimal or no downtime for the table.
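
A common way to bound partition size, shown here as a sketch with hypothetical names, is to add a time bucket to the partition key:

  -- Unbounded: every reading for a sensor lands in one ever-growing partition
  CREATE TABLE readings_unbounded (
      sensor_id    text,
      reading_time timestamp,
      value        double,
      PRIMARY KEY ((sensor_id), reading_time)
  );

  -- Bounded: a per-day bucket caps how much data a single partition can accumulate
  CREATE TABLE readings_by_day (
      sensor_id    text,
      day          date,
      reading_time timestamp,
      value        double,
      PRIMARY KEY ((sensor_id, day), reading_time)
  );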

Alerting: Configure alerts on large partitions for tables with unbounded partitions. An unbounded partition is where the partition grows in size with new data insertion and does not have an upper bound.

Tombstone Scanned

Tombstones are the deletion markers in Cassandra. Tombstones are produced by data deletion, which can happen through various means such as delete queries, TTL expiry, null inserts, etc. The immutable design of SSTables and the way compaction operates make tombstone eviction difficult in some scenarios. Tombstone presence directly impacts read performance, and the effect increases with the number of tombstones scanned per operation. This metric provides a histogram of tombstones read by a table’s queries in recent time.
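
For example, each of the following statements produces tombstones (reusing the hypothetical readings_by_day table sketched in the partition size section):

  -- Explicit delete: writes a partition-level tombstone
  DELETE FROM readings_by_day WHERE sensor_id = 'sensor-42' AND day = '2020-06-01';

  -- Null insert: writes a cell tombstone for the value column
  INSERT INTO readings_by_day (sensor_id, day, reading_time, value)
      VALUES ('sensor-42', '2020-06-02', '2020-06-02 10:00:00', null);

  -- TTL: the cell becomes a tombstone once the TTL expires
  INSERT INTO readings_by_day (sensor_id, day, reading_time, value)
      VALUES ('sensor-42', '2020-06-02', '2020-06-02 11:00:00', 7.5) USING TTL 86400;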

The troubleshooting for tombstone eviction can be performed using various options like revisiting the compaction strategy, major compaction, nodetool garbagecollect etc. Note that all the mentioned remedies for tombstone eviction could operate on a large set of SSTables and are non-trivial operations. The operations must be well tested before executing on production. 

Alerting: Set alerts for tombstones-scanned per read metrics for performance-sensitive tables. 

SSTable Per Read

These metrics are related to the immutable design of SSTables and read operation. The SSTables are created per table, and the data is arranged sequentially in the order it is written. This results in multiple SSTable reads to complete a single read operation. The number of SSTables read contributes to the time consumed to complete the read operation. Hence, the number of SSTables per read should be minimized. 

A good number of SSTables per read is a relative value and depends on the data volume and compaction strategy. However, as a general rule, those should be less than 10. The compaction strategy used for a table plays a crucial role in this metric. A table should be configured with optimum compaction strategy as per the table usage. Repair operation plays a role in keeping the SSTables consistent and hence also indirectly impacts this metric. All the data in Cassandra should ideally be repaired once per gc_grace_seconds cycle. 
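
The compaction strategy is a per-table setting. As a hedged sketch (the table name is hypothetical), it can be changed with an ALTER TABLE statement:

  -- LeveledCompactionStrategy tends to keep the number of SSTables
  -- touched per read low, at the cost of more compaction I/O
  ALTER TABLE readings_by_day
      WITH compaction = {'class': 'LeveledCompactionStrategy'};

Changing the strategy causes existing SSTables to be reorganized over time, so such a change should be tested before it is applied to a production table.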

Alerting: Set alerts for all the read performance-sensitive and high data volume tables for SSTables per read. 

Additional Metrics

It is difficult to cover all the metrics present in Cassandra in this blog post, and it is also difficult to predict the most useful ones in general. I have tried to cover the most used metrics individually. But there are still some crucial metrics which are useful for getting insight in specific Cassandra areas. Let’s look at those briefly:

Dropped Messages

Cassandra handles many forms of messages corresponding to various functions. These messages can get dropped, mostly due to load or communication errors. Dropped messages cause data inconsistency between nodes, and if they are frequent, they can cause performance issues. It is necessary to identify the cause of dropped messages. If they occur frequently or in large numbers, the system resources and data model should be revisited. Alerts should be set for an unexpected occurrence or number of dropped messages.

Caches For Tables

Cassandra uses several caches, and these are configurable. The cache metrics are useful to track the effective use of a particular cache. A good example is the use of the row cache for frequently accessed rows in a table. If caching hot data in the row cache improves the cache hit rate, it is a successful use of the row cache.
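
Row and key caching are configured per table. A minimal sketch (the table name and values are illustrative, not recommendations):

  -- Cache all partition keys and the first 100 rows of each partition
  ALTER TABLE readings_by_day
      WITH caching = {'keys': 'ALL', 'rows_per_partition': '100'};

Note that the row cache also needs to be enabled globally (row_cache_size_in_mb in cassandra.yaml) before rows_per_partition has any effect.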

Data Streaming

Streaming is used while bootstrapping new nodes, during repair operations, and during some other cluster operations. Streaming operations can move a lot of data across a cluster and hence consume network bandwidth. The streaming metrics are useful for monitoring node additions and planned repairs. The streaming rate can be throttled if required to spare bandwidth for other operations.

Hinted Handoff 

Hints are a part of the anti-entropy mechanism, and they help protect nodes from data loss while those nodes are offline. Hints are stored and transferred, so metrics related to these attributes as well as delivery success, failure, delays, and timeouts are exposed.

The hints metrics are useful to monitor all hint activity. A large number of hints stored and replayed indicates nodes having been offline, while hint delays and failures indicate network or other communication issues.

CQL and Batch 

CQL metrics include the number of statements executed of each type. The batch metrics include the number of batch statements executed. These metrics help to monitor application activity and the query semantics used. The use of logged and unlogged batches has its caveats in Cassandra, and batches can cause a performance penalty if not used correctly.
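
For context, a minimal sketch of a logged batch over two hypothetical denormalized tables:

  BEGIN BATCH
      INSERT INTO messages_by_source (source, msg_time, msg_id, body)
          VALUES ('app-server-01', '2020-06-01 12:00:00', 123e4567-e89b-12d3-a456-426614174000, 'disk almost full');
      INSERT INTO messages_by_id (msg_id, source, msg_time, body)
          VALUES (123e4567-e89b-12d3-a456-426614174000, 'app-server-01', '2020-06-01 12:00:00', 'disk almost full');
  APPLY BATCH;

A logged batch guarantees that both writes eventually apply, but it adds coordinator and batchlog overhead; large multi-partition batches are a common source of the performance penalties mentioned above.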

System Metrics

These metrics are not exported by Cassandra but are obtained from the OS. They are just as important as the Cassandra metrics for obtaining system insight.

Disk Usage

Disk usage needs monitoring because Cassandra is optimized to write a lot of data in a short time. The real risk of filling up a disk comes from compactions. The default compaction strategy used by Cassandra is SizeTieredCompactionStrategy (STCS). This strategy merges many SSTables and outputs a single SSTable. The resulting SSTable can have a size equal to the combined size of all the SSTables merged into it. Also, until a compaction operation ends, both the old and new SSTables exist on disk.

The disk space guideline for a cluster where most tables use STCS is to utilise up to 50% of the disk space and leave the rest as room for compactions. Generally, disk space is cheaper than other resources, and there is no harm in keeping vacant space on nodes. However, if limited disk space is available, disk monitoring becomes even more crucial, as the free disk left for compactions may have to be reduced below the general guideline.

Remedies for high disk usage include snapshot deletion, as snapshots can consume a considerable amount of space. Another method is to stop a specific compaction operation; this frees the space consumed by the new SSTables. The time until the compaction starts again can be utilized to add more space.

Alerting: Set alerts for various stages of disk usage. The alerts can be categorized for severity based on the amount of free disk space on a node. 

CPU Usage

CPU capacity in a Cassandra cluster is the main processing capacity. The number of requests served by a node and the amount of data stored are the factors directly proportional to CPU utilization. CPU utilization should be monitored to ensure the nodes are not overloaded.

A Cassandra cluster or a single data center should have nodes of similar size. They should have an equal number of CPU cores, and the CPU utilization should also be similar. A single node or a few nodes with high CPU usage is an indication of uneven load or request processing across the nodes. Cassandra is not CPU bound in most cases; however, a cluster or data center with high CPU utilization most of the time should be considered for a node size upgrade.

Alerting: Set alerts for specific levels of CPU utilization on nodes or just for a single threshold. The levels can be defined as per expected CPU load, e.g. 80%, 90%, >95% etc. 

Monitoring tools

There are various tools available to set up Cassandra monitoring. I am describing here a few popular open-source tools used widely across the Cassandra community.

Prometheus

Prometheus is a metrics tool used for time-series based monitoring. It also has alerting capability, which works on the time-series metrics. Prometheus can be configured to collect Cassandra metrics from nodes as well as the system metrics of the nodes. Prometheus relies on exporters, which are installed on the nodes and expose metrics for Prometheus to scrape.

Prometheus runs with a time-series database to store metrics. The metrics are stored in the database and can be queried using PromQL, the query language for Prometheus. Prometheus also runs a web UI which can be used to visualise the actual metrics, graphs, alert rules, etc.

Alertmanager is the extension used for configuring alerts. Alertmanager has various integrations available for alerting, including email, Slack, HipChat, PagerDuty, etc. Prometheus has evolved over time, and it integrates well with the Dropwizard metrics library.

Prometheus: time-series based Cassandra monitoring

Grafana

Grafana is a visualisation tool which can be used to visualize any time-series metrics. Grafana has various panels to showcase the data. The most commonly used panel is the graph, which plots incoming data against time in two dimensions.

Grafana integrates with various data sources. These sources are queried in real-time by Grafana to obtain metrics. Each Grafana panel has one or more queries configured to query a data source; the result of the query is rendered on the panel. Grafana uses Prometheus as a well-integrated data source.

Grafana - Time series metrics visualization

Cassandra Exporter

Cassandra exporter is Instaclustr’s open-source solution for collecting Cassandra metrics efficiently. It is designed to integrate with Cassandra JVM and collect and publish metrics. Hence, Cassandra exporter is a replacement for the JMX metrics. 

JMX metrics in Cassandra have performance limitations and hence can cause issues if used on systems with a large number of nodes. The Cassandra exporter has been well tested for optimal monitoring performance. The metrics produced by the Cassandra exporter are also time-series and can be readily consumed by Prometheus. Please refer to the GitHub page for information regarding configuration and usage.

Conclusion

Cassandra monitoring is essential to get insight into the database internals. Monitoring is a must for production systems to ensure optimal performance, alerting, troubleshooting, and debugging. There are a large number of Cassandra metrics out of which important and relevant metrics can provide a good picture of the system. 

Finally, Instaclustr has the Cassandra monitoring expertise and capability with various options. 

  • Cassandra exporter is an excellent open source tool for optimal monitoring performance on large Cassandra clusters.
  • The Instaclustr Cassandra managed service uses a comprehensive monitoring and alerting service with 24×7 support; it is a good option for outsourcing all Cassandra operations, and it comes with a free trial.

Instaclustr Cassandra Consulting services can help you with any monitoring or other Cassandra operations.
