Spark Powered by Scylla – Your Questions Answered


Spark workloads are common in Scylla deployments. Spark helps users run analytic workloads on top of their transactional data. Using the open source Spark-Cassandra connector, Scylla gives Spark access to that data.

During our recent webinar, ‘Analytics Showtime: Spark Powered by Scylla’ (now available on-demand), there were several questions that we found worthy of additional discussion.

Do you recommend deploying Spark nodes on top of Scylla nodes?

In general, we recommend separating the two deployments. There are several reasons for this recommendation:

  • Resource Utilization. Both Scylla and Spark are resource-hungry applications. Co-deploying them can lead to resource depletion and contention between Scylla and Spark.
  • Dynamic allocations. Scylla is in most cases a static deployment: you deploy the number of nodes planned for your capacity (throughput and/or latency SLA). Spark jobs, on the other hand, are ad-hoc in nature. Users benefit from being able to deploy and decommission Spark nodes without incurring cost or performance penalties.
  • Data locality impact is minimal. Since Scylla hashes the partition keys, the probability that multiple Scylla partitions belonging to the same RDD partition are placed contiguously is slim. Without true collocation of data on the same node, data is transferred over the network from the Scylla nodes to the Spark nodes whether or not the two are co-deployed.

What tuning options would you recommend for very high write workloads to Scylla from Spark?

We recommend looking at the following settings:

  • The number of connections opened between your Spark executors and Scylla. You can monitor the number of open connections using Scylla’s monitoring solution.
  • Look at your data model. Can the Spark connector use its batching efficiently? Make sure that all statements in a batch share the same Scylla partition key. On the other hand, if you are about to create a huge partition, also consider the amount of time it will take Scylla to fetch that large partition.
  • If you change the buffering behavior so that writes are not batched, set output.batch.grouping.key to none.
  • If your Spark nodes have enough processing power and the network bandwidth available between your Spark and Scylla instances is 10 Gbps or higher, increase the number of concurrent writes by raising “output.concurrent.writes” from its default of 5 (a sketch of passing these options follows the list).
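As a rough illustration, here is a minimal PySpark sketch of passing these write-side options to the Spark-Cassandra connector. The contact point, keyspace, and table names are placeholders, the DataFrame is a toy one, the connector package is assumed to be on the classpath, and the exact option keys should be verified against the connector version you run:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("scylla-write-tuning")
         .config("spark.cassandra.connection.host", "10.0.0.1")   # assumed Scylla contact point
         .getOrCreate())

# Toy DataFrame standing in for the data you actually want to persist
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])

(df.write
   .format("org.apache.spark.sql.cassandra")
   .options(keyspace="my_keyspace", table="my_table")              # hypothetical target table
   .option("spark.cassandra.output.batch.grouping.key", "none")    # batch grouping key set to none, as discussed above
   .option("spark.cassandra.output.concurrent.writes", "10")       # raised from the default of 5
   .mode("append")
   .save())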

Does Scylla send the data compressed or uncompressed over the network to Spark?

We recommend compressing all communication that goes through the Spark-Cassandra connector. Users can define the compression algorithm in the connector’s configuration. Set “connection.compression” to LZ4 or Snappy to achieve the desired compression and reduction in network traffic.
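For example, a minimal sketch of enabling compression when building the Spark session (the contact point is a placeholder and the property name follows the spark.cassandra.* prefix convention used by the connector; verify it against your connector version):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("scylla-compressed-connection")
         .config("spark.cassandra.connection.host", "10.0.0.1")      # assumed Scylla contact point
         .config("spark.cassandra.connection.compression", "LZ4")    # or "SNAPPY"
         .getOrCreate())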

What does the setting input.split.size_in_mb help with?

This setting controls the input split size and defaults to 64MB, which is also the basic size of an RDD partition. It means that every fetch of information from Scylla will “fetch” 64MB. Such a setting is less efficient for Scylla’s architecture, as it means a single coordinator has to deal with a fairly sizable read transaction while its counterparts sit idle. By reducing the split size to 1MB, we achieve several benefits (a short sketch follows the list):

  • Coordination and data fetching is distributed among several coordinators
  • Higher concurrency of requests, which translates to a higher number of connections between the Spark nodes and Scylla nodes
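A minimal sketch of lowering the split size for a specific read through the Dataset API (the keyspace and table names are placeholders; the same key can also be set globally on the Spark configuration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("scylla-read-splits").getOrCreate()

df = (spark.read
        .format("org.apache.spark.sql.cassandra")
        .options(keyspace="my_keyspace", table="my_table")         # hypothetical source table
        .option("spark.cassandra.input.split.size_in_mb", "1")     # default is 64
        .load())

print(df.count())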

Does your demo use Spark standalone?

Yes, in our demo we are using Spark standalone. In most installations in which Scylla is involved, we see Spark deployed in standalone mode.

Miss the webinar or want to see it again? ‘Analytics Showtime: Spark Powered by Scylla’ is available for viewing on-demand.

Next Steps

  • Scylla Summit 2018 is around the corner. Register now!
  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Spark Powered by Scylla – Your Questions Answered appeared first on ScyllaDB.

More Efficient Query Paging with Scylla 2.2


In this blog post, we will look into Scylla’s paging, address some of the earlier problems with it, and describe how we solved those issues in our recently released Scylla 2.2. In particular, we will look at performance problems related to big partitions and how we alleviated them to improve throughput by as much as 2.5X. We will restrict ourselves to queries of a single partition or a discrete set of partitions (i.e., “IN” queries). We will address scanning queries, which return a range of partitions, in a separate post.

Prior to Scylla 2.2, Scylla’s paging was stateless. This meant that at the end of each page, all objects related to the query were discarded and all associated resources were freed. While this approach led to simple code when compounded with how Scylla works (no page-cache or any other OS cache are used), it also led to a lot of duplicate work. Before we cover what this duplicate work is and how we can avoid it, let’s first look into what exactly paging is and how it works in Scylla.

What is Paging?

Queries can return any amount of data. The amount of data that is returned is known only when the query is executed. This creates all sorts of resource management problems for the client as well as for the database. To avoid these problems, query results are transmitted in pages of limited size, one page at a time. After transmitting each page, the database stops and waits for the client to request the next one. This is repeated until the entire result set is transmitted. The size of pages can be limited by the client by limiting the number of rows that they can contain. There is also a built-in (non-optional) size limit of 1MB.
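To make the mechanics concrete, here is a small sketch that drives paging explicitly from a client using the Python driver (the contact point, keyspace, table, and partition key value are placeholders; in everyday code you would usually just iterate over the result set and let the driver fetch pages transparently):

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

session = Cluster(["10.0.0.1"]).connect("my_keyspace")

# Ask for at most 1,000 rows per page; the built-in 1MB page size limit still applies on top of this.
query = SimpleStatement("SELECT * FROM my_table WHERE pk = %s", fetch_size=1000)

result = session.execute(query, (42,))
print str(len(result.current_rows)) + " rows in the first page"

while result.has_more_pages:
    # The opaque paging state cookie is handed back to the database to request the next page.
    result = session.execute(query, (42,), paging_state=result.paging_state)
    print str(len(result.current_rows)) + " rows in the next page"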

How Does Paging Work?

On each page, the coordinator selects a list of replicas for each partition to send read requests to. The set of replicas is selected such that it satisfies the required Consistency Level (CL). All read requests are sent concurrently. To quickly recap, the coordinator is the node that receives the query request from the client and the replicas are the nodes that have data that pertains to the subject of the query. Note that the coordinator can be a replica itself especially in cases where the client application uses a token-aware driver.

The replicas execute the read request and send the results back to the coordinator, which merges them. For each partition in the list, the coordinator requests a page worth of data from the replicas. If, after merging, the merged result set contains more data than the page limit allows, all extra data is discarded.

To be able to continue the query on the next page, the database must remember where it stopped. It does this by recording the position where the query was interrupted in an opaque (to the client) cookie called the paging state. This cookie is transmitted with every page to the client and then retransmitted by the client to the database on every page request. Since the paging state is completely opaque (just a binary blob) to the client, it can be used to store other query-related states in addition to the page end position.

At the beginning of the next page, the coordinator examines the queried partition list and drops partitions that were already read. To determine which partitions were already read, the coordinator uses the position stored in the paging-state mentioned above. It then proceeds with the query as if it were a new query, selecting the replicas, sending them the read requests, and so on. From the replicas’ perspective, each page is like a brand new query. On each page, the replicas locate all of the sources that contain data for the queried partition (memtable, row-cache, and sstables) and at the end of the page, all this work is thrown away. Note that especially in the case of sstables, this is a non-trivial amount of work that involves a fair amount of disk I/O on top of reading the actual partition data. This results in increased latency and lower throughput as the replicas are spending all of those extra CPU cycles and disk bandwidth on each page.

Making Queries Stateful

The solution is to make queries stateful. That is, don’t throw away all of the work done when initializing the query on the replicas. Instead, keep this state around until the next page and just continue where the previous one left off. This state is, in fact, an object and is called the querier. So after a page is filled, the query state (querier object) is saved in a special querier-cache. On the next page, the saved querier object is looked up and the query continues where it was left off. Note that there is a querier object created on every shard of every replica from which any of the queried partitions is read.

Although this sounds simple enough, there are quite a few details to get right in order for it to work. First off, a querier created for a certain query should not be used for another query. Different queries can have different restrictions, orderings, query times, etc. It would be nearly impossible and highly error-prone to validate all this to test whether a querier can be used for a given query. To avoid having to do this at all, each query is assigned a unique identifier at the beginning of the first page. This identifier is remembered throughout the query using the paging-state and is used as the key under which queriers are saved and looked up on replicas.

Validating the Saved State

It’s not enough to make sure that queriers are used for only the query they were created for. A lot can go wrong in a distributed database. To make sure a querier can be safely reused, a thorough validation is done on lookup to decide whether it can be reused or if it needs to be dropped and a new one should be created instead.

Each read request sent to a replica contains a read position from which it has to start reading the data. This is used to continue from where the previous page left off. On lookup, the querier’s position is matched against the request’s start position. If there is a mismatch, the querier is dropped and a new one with the correct position is created instead. Position mismatch can happen for a number of reasons — transient network partition, mismatching data, the coordinator discarding results from the previous page to avoid overfilling the page, etc.

Schema upgrades can run concurrently with ongoing reads. The coordinator will always use the current (latest) schema version for the read requests. So it is possible that a replica will receive a read request with a schema version that is more recent than that of the cached querier. To keep things simple in situations like that, the cached querier is dropped and a new one with the new schema is created instead.

Sticking to the Same Replicas

Saved queries are useful only if they are reused. To ensure this happens as much as possible, Scylla will try to stick with the same set of replicas that were used to serve the first page. This is done by saving the set of replicas that served the first page in the paging-state. Then, on each subsequent page, these replicas will be preferred over other ones.

Figure 1: Simplified sequence diagram of a paged query. Note how Node_1 is both a coordinator and a replica while serving the first page. Also note that Node_2 sticks to reading from Node_1, even though it also has the data, so that the existing cached querier can be reused.

Managing Resource Consumption

Although they are inactive, cached querier objects consume resources — primarily memory, but they can also pin sstables on disk. Additionally, Scylla has a reader concurrency control mechanism that limits the number of concurrently active sstable readers. Each reader has to obtain a permit in order to start reading. Cached readers will hold on to their permits, possibly preventing new readers from being created.

To keep resource consumption under control, several eviction strategies are implemented:

  • Time-based cache eviction: To avoid abandoned queriers sitting in the cache indefinitely, each cache entry has a time-to-live. When this expires, it is evicted from the cache. Queriers can be abandoned due to the client abandoning the query or the coordinator switching to another replica (for example, a transient network partition).
  • Memory-based cache eviction: The memory consumption of cache entries is kept under a threshold (currently 4% of the shard’s memory) by evicting older entries when inserting a new one would push consumption over that limit.
  • Permit-based cache eviction: When available permits run out, older cache entries are evicted to free up permits for creating new readers.

Diagnostics

To help observe the effectiveness of stateful queries, as well as to aid in finding any problems, a number of counters were added:

  1. querier_cache_lookups counts the total number of querier cache lookups. Not all read requests will result in a querier lookup. For example, the first page of a query will not do a lookup as there was no previous page from which to reuse the querier. The second, and all subsequent pages, however, should attempt to reuse the querier from the previous page.
  2. querier_cache_misses counts the subset of (1) where the reads have missed the querier cache (failed to find a saved querier).
  3. querier_cache_drops counts the subset of (1) where a saved querier was found but it failed the validation so it had to be dropped.
  4. querier_cache_time_based_evictions counts the cached entries that were evicted due to their time-to-live expiring.
  5. querier_cache_resource_based_evictions counts the cached entries that were evicted due to a shortage of reader permits.
  6. querier_cache_memory_based_evictions counts the cached entries that were evicted due to reaching the cache’s memory limits.
  7. querier_cache_querier_population is the current number of querier entries in the cache.

Note:

  • All counters are per shard.
  • The count of cache hits can be derived from these counters as (1) – (2).
  • A cache drop (3) also implies a cache hit (see above). This means that the number of actually reused queriers is (1) – (2) – (3), as sketched below.
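A tiny sketch of deriving those numbers from the raw counters (the variable names mirror the counter names above; fetching the values from your monitoring stack is left out):

def querier_cache_stats(lookups, misses, drops):
    """Derive per-shard hit and reuse counts from the querier cache counters."""
    hits = lookups - misses             # (1) - (2)
    reused = lookups - misses - drops   # (1) - (2) - (3): found, validated, and actually reused
    return hits, reused

hits, reused = querier_cache_stats(lookups=1000, misses=50, drops=10)
print str(hits) + " hits, " + str(reused) + " reused"   # 950 hits, 940 reused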

Performance

We’ve finally arrived at the part of this blog post you’ve all been waiting for! Since the primary goal of stateful queries is to improve efficiency and throughput, we made this the focus of our benchmarking tests. To gauge the throughput gains, we populated a cluster of 3 nodes with a dataset consisting entirely of large partitions, then exercised it with read-from-disk and read-from-cache workloads to find out how it fares. The nodes were n1-standard-4 (4 vCPUs, 15GB memory) GCE nodes using local SSD disks. The dataset was composed of large partitions, each having 10k rows and each row being 10KB in size. This should exercise paging well as each partition will require about 100 pages to transfer.

Read-from-disk

Let’s first look at the read-from-disk scenario. This is the more interesting one as this is the scenario where stateless queries were hurting the most. And stateful queries don’t disappoint. After measuring the throughput on a cluster running at disk speed, the version with stateful queries performed almost 2.5X (+246%) better than the one without. This is a massive improvement, indeed.

Figure 2: OPS of stateless queries (before) and stateful queries (after).

Read-from-cache

Workloads that are served from cache didn’t suffer from stateless queries as much as those served from disk. This is because cache readers are significantly cheaper to create than disk readers. Still, we were curious and wanted to also catch any potential regressions. For the read-from-cache scenario, we measured the throughput of the cluster while driving it at network speed. To our surprise, the throughput was slightly worse for the stateful queries version. After some investigation, this turned out to be caused by an extra hop introduced by sticky replicas. For clusters whose performance is limited by network bandwidth — like our test cluster — this introduces additional network traffic and thus reduces throughput. To eliminate the disturbance introduced by the additional network hop, we employed a modified Java driver that would send each page request to the same coordinator throughout the duration of an entire query (on top of selecting a coordinator that is also a replica). This ensures that no additional hop is required to stick to the same set of replicas through the query, as opposed to the current status-quo of selecting new coordinators for each page in a round-robin manner. After eliminating the extra hop, throughput improved slightly.

Figure 3: OPS of stateless queries (before), stateful queries (after 1) and stateful queries with sticky coordinator driver fix (after 2)

Summary

With stateful queries, the effort of preparing to serve a query is not thrown away after each page. Not having to redo the significant work of preparing the query from scratch on each page greatly improves the efficiency and thus the throughput of paged queries, especially those reading large partitions. In the case of read-from-disk queries, this throughput improvement was measured to be as much as 2.5X compared to the previous stateless queries.

Next Steps

  • Scylla Summit 2018 is around the corner. Register now!
  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post More Efficient Query Paging with Scylla 2.2 appeared first on ScyllaDB.

Scylla Open Source Release 2.2


The Scylla team is pleased to announce the release of Scylla 2.2, a production-ready Scylla Open Source minor release.

The Scylla 2.2 release includes significant improvements in performance and latencies, improved security with Role-Based Access Control (RBAC), improved high availability with hinted-handoff, and many others. More information on performance gains will be shared in a follow-up blog post.

Moving forward, Scylla 2.3 and beyond will contain new features and bug fixes of all types, while future Scylla 2.2.x and 2.1.x releases will only contain critical bug fixes. Scylla 2.0 and older versions will no longer be supported.

Related Links

New features

  • Role Based Access Control (RBAC) – compatible with Apache Cassandra 2.2. RBAC is a method of reducing lists of authorized users to a few roles assigned to multiple users. RBAC is sometimes referred to as role-based security.

For example:

CREATE ROLE agent;
GRANT CREATE ON customer.data TO agent;
GRANT DESCRIBE ON customer.data TO agent;
GRANT SELECT ON ALL KEYSPACES TO agent;
GRANT MODIFY ON customer.data TO agent;
CREATE ROLE supervisor;
GRANT agent TO supervisor;

Related Documentation:

Scylla RBAC example
CQL RBAC reference

Note that you need to have Authentication enabled to use roles.

  • Hinted Handoff – Experimental. Hinted handoff is a Scylla feature that improves cluster consistency and is compatible with Apache Cassandra’s 2.1 Hinted handoff feature. When a replica node is not available for any reason, the coordinator keeps a buffer of writes (hints) to this replica. When the node becomes available again, hints are replayed to the replica. The buffer size is limited and configurable. You can enable or disable hinted handoff in the scylla.yaml file. More on Hinted Handoff.
  • GoogleCloudSnitch – Experimental. Scylla now supports GCE snitch and it is compatible with Apache Cassandra’s GoogleCloudSnitch. You can now use GoogleCloudSnitch when deploying Scylla on Google Compute Engine across one or more regions. As with EC2 snitches, regions are handled as Data Centers and availability zones as racks. More on GCE snitch #1619.
  • CQL: Support for timeuuid functions: now, currentTimeUUID (alias of now), minTimeuuid, and maxTimeuuid #2950.
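As a quick illustration of these functions (the table, columns, and time range below are invented for the example), a time-range query issued through the Python driver could look like this sketch:

from cassandra.cluster import Cluster

session = Cluster(["10.0.0.1"]).connect("my_keyspace")

# Assumes a schema like:
#   CREATE TABLE events (device int, ts timeuuid, payload text, PRIMARY KEY (device, ts));
# minTimeuuid/maxTimeuuid build boundary timeuuids so the clustering column can be range-restricted.
rows = session.execute("""
    SELECT payload FROM events
    WHERE device = %s
      AND ts > maxTimeuuid('2018-08-01 00:00+0000')
      AND ts < minTimeuuid('2018-09-01 00:00+0000')
""", (17,))
for row in rows:
    print row.payload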

Performance Improvements

We continue to invest in increasing Scylla’s throughput and reducing latency, and in particular, improving consistent latency.

  • Row-level cache eviction. Partitions are now evicted from in-memory cache with row granularity which improves the effectiveness of caching and reduces the impact of eviction on latency for workloads which have large partitions with many rows.
  • Improved paged single partition queries #1865. Paged single partition queries are now stateful, meaning they save their state between pages so they don’t have to redo the work of initializing the query at the beginning of each page. This results in improved latency and vastly improved throughput. This optimization is mostly relevant for workloads that hit the disk, as initializing such queries involves extra I/O. The results are 246% better throughput with lower latency when selecting by partition key with an empty cache. More here.
  • Improved row digest hash #2884. The algorithm used to calculate a row’s digest was changed from md5 to xxHash, improving throughput and latency for big cells. See the issue’s comment for a microbenchmark result. For an example of how row digest is used in a Scylla Read Repair, see here.
  • CPU Scheduler and Compaction controller for Size Tiered Compaction Strategy (STCS). With Scylla’s thread-per-core architecture, many internal workloads are multiplexed on a single thread. These internal workloads include compaction, flushing memtables, serving user reads and writes, and streaming. The CPU scheduler isolates these workloads from each other, preventing, for example, a compaction from using all of the CPU and starving normal read and write traffic of its fair share. The CPU scheduler complements the I/O scheduler, which solves the same problem for disk I/O. Together, these two are the building blocks for the compaction controller. More on using Control Theory to keep compactions Under Control.
  • Promoted index for wide partitions #2981. Queries seeking through a partition used to allocate memory for the entire promoted index of that partition. In case of really huge partitions, those allocations would also grow large and cause ‘oversized allocation’ warnings in the logs. Now, the promoted index is consumed incrementally so that the memory allocation does not grow uncontrollably.
  • Size-based sampling rate in SSTable summary files – automatically tune min_index_interval property of a table based on the partition sizes. This significantly reduces the amount of index data that needs to be read in tables with large partitions and speeds up queries. #1842

Known Issues

As noted above, Scylla 2.2 ships with a dynamic compaction controller for the Size-Tiered Compaction Strategy. Other compaction strategies have a static controller in this release.

Scylla 2.1 had a static controller for all compaction strategies (disabled by default on most configurations; enabled by default on AWS i3 instances), but the 2.2 static controller may allocate more resources than the 2.1 static controller. This can result in reduced throughput for users of Leveled Compaction Strategy and Time Window Compaction Strategy.

If you are impacted by this change, you may set the compaction_static_shares configuration variable to reduce compaction throughput. Contact the mailing list for guidance.

Scylla 2.3 will ship with dynamic controllers for all compaction strategies.

Metrics Updates

The Scylla Grafana Monitoring project now includes a Scylla 2.2 dashboard. See here for all of the metrics that changed in 2.2.

Next Steps

  • Scylla Summit 2018 is around the corner. Register now!
  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Scylla Open Source Release 2.2 appeared first on ScyllaDB.

Scylla Open Source Release 2.1.6


The Scylla team is pleased to announce the release of Scylla 2.1.6, a bugfix release of the Scylla 2.1 stable branch. Release 2.1.6, like all past and future 2.x.y releases, is backward compatible and supports rolling upgrades.

Scylla 2.1.6 fixes one critical issue with the caching of data for DateTiered or TimeWindow compaction strategies. If you are using either of these strategies, we recommend upgrading ASAP. The critical issue is: In some cases when using DateTiered or TimeWindow compaction strategies, a partition can start to appear empty until Scylla is restarted. The problem is in the cache logic, while the data itself is safely persistent on disk. #3552

Related Links

Get Scylla 2.1.6 – Docker, binary packages, and EC2 AMI
Get started with Scylla 2.1
Upgrade from 2.1.x to 2.1.y
Please let us know if you encounter any problems.

Additional Issue Resolved in This Release

  • Reads that use clustering key restrictions (including paging reads) may miss some of the deletions and static row writes for the DateTiered and TimeWindow compaction strategies. Similar to #3552, the data on disk is not affected. #3553

Next Steps

  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Scylla Open Source Release 2.1.6 appeared first on ScyllaDB.

New Benchmark: Scylla 2.2 (i3.Metal x 4-nodes) vs Cassandra 3.11 (i3.4xlarge x 40-nodes)

 

Benchmarking is no easy task, especially when comparing databases with different “engines” under the hood. You want your benchmark to be fair, to run each database on its optimal setup and hardware, and to keep the comparison as apples-to-apples as possible. (For more on this topic, see our webinar on the “Do’s and Don’ts of Benchmarking Databases.”) We kept this in mind when conducting this Scylla versus Cassandra benchmark, which compares Scylla and Cassandra on AWS EC2, using cassandra-stress as the load generator.

Most benchmarks compare different software stacks on the same hardware and try to max out the throughput. However, that is often the wrong approach. A better way is to think about the comparison from the user’s perspective. That is, start by considering the volume, throughput, and latency a database user needs in order to meet his or her business needs. The goal here is to gauge the resources each database needs to meet established requirements and to see how Scylla and Cassandra handle the challenge.

Scylla and its underlying Seastar infrastructure fully leverage powerful modern hardware. With Scylla’s linear scale-up capabilities, the more cores and RAM the better, so we chose AWS EC2’s strongest instance available, i3.metal, for our Scylla cluster. Cassandra, on the other hand, is limited in its ability to scale up due to its reliance on the JVM. Its threads and locks are too heavy and slow down as the number of cores grows, and with no NUMA awareness, performance will suffer. As a result, using the i3.metal instance would not yield optimal results for Cassandra (read more about Cassandra’s sweet spot here). To make a fair comparison with Cassandra, we created a 40-node Cassandra cluster on i3.4xlarge instances. This recreates a use case we saw for one of our Fortune-50 accounts.

We defined a workload of 38.85 billion partitions, replicated 3 times, 50:50 read/write ratio, a latency of up to 10 msec for the 99th percentile, and throughput requirements of 300k, 200k, and 100k IOPS.

We used our load generator, cassandra-stress, to populate each database with approximately 11TB (38.85B partitions), using the default cassandra-stress schema and RF=3 (see Appendix-A for the schema and cassandra-stress commands used in this benchmark). Once population and compaction completed, we restarted the Scylla/Cassandra service on all nodes, making sure the actual tests started with a cold cache and after all compactions were finished.

We tested the latency for various throughputs to have a more comprehensive comparison. We used a Gaussian distribution (38.85B partitions, Median: 19.425B and Standard Deviation: 6.475B) in all latency tests to achieve as much disk access as possible instead of reading from RAM. Each of the tests ran for 90 minutes to make sure we were in a steady state.

Conclusions and Summary

  • Compared to the 40-node Cassandra cluster, the Scylla 4-node cluster provided:
    • 10X reduction in administration overhead. This means ALL operations such as upgrade, logging, monitoring and so forth will take a fraction of the effort.
    • 2.5X reduction in AWS EC2 costs. In fact, had we been more strict with Cassandra, we would have increased its cluster size to meet the required latency.
  • Scylla was able to meet the SLA of 99% latency < 10ms in all tested workloads (with the one exception of 12ms in the read workload under 300K OPS).
  • Cassandra was able to meet the SLA of 99% latency < 10ms only for the 100K OPS workload.
  • Scylla demonstrated superior 99.9% latency in ALL cases, in some cases showing an improvement of up to 45X.
  • Scylla also demonstrated better 99% latency in all cases but one. In the one case where Cassandra was better (write @100K OPS) both clusters demonstrated very low single-digit latency.
  • Cassandra demonstrated better 95% latency in most cases. This is an area for improvement for Scylla. Read more about this in the monitoring section below.

 

Comparison of AWS Server Costs

Scylla 2.2: Year-term estimated cost ~$112K
Cassandra 3.11: Year-term estimated cost ~$278.6K

 

Setup and Configuration

Scylla Cluster
  EC2 instance type: i3.Metal (72 vCPU | 512 GiB RAM)
  Storage (ephemeral disks): 8 NVMe drives, each 1900GB
  Network: 25Gbps
  Cluster size: 4-node cluster on single DC
  Total CPU and RAM: CPU count: 288 | RAM size: 2TB
  DB SW version: Scylla 2.2
  Loaders (population): 4 x m4.2xlarge (8 vCPU | 32 GiB RAM), 8 c-s clients, 2 per instance
  Loaders (latency tests): 7 x i3.8xlarge (up to 10Gb network), 14 c-s clients, 2 per instance

Cassandra Cluster
  EC2 instance type: i3.4xlarge (16 vCPU | 122 GiB RAM)
  Storage (ephemeral disks): 2 NVMe drives, each 1900GB
  Network: Up to 10Gbps
  Cluster size: 40-node cluster on single DC
  Total CPU and RAM: CPU count: 640 | RAM size: ~4.76TB
  DB SW version: Cassandra 3.11.2 (OpenJDK build 1.8.0_171-b10)
  Loaders (population): 16 x m4.2xlarge (8 vCPU | 32 GiB RAM), 16 c-s clients, 1 per instance
  Loaders (latency tests): 8 x i3.8xlarge (up to 10Gb network), 16 c-s clients, 2 per instance

Cassandra Optimizations

It’s no secret that Cassandra’s out-of-the-box performance leaves much to be desired. Cassandra requires quite a bit of tuning to get good results. Based on recommendations from Datastax and Amy Tobey’s guide to Cassandra tuning, we applied the following optimizations to the Cassandra cluster.

Originally we applied the changes listed below only to the cassandra.yaml and jvm.options files; that yielded poor performance results. Despite multiple attempts with various numbers of cassandra-stress clients and threads per client, we could not get more than 30K operations per second of throughput. After applying the IO tuning settings, Cassandra started performing much better.

cassandra.yaml:

    buffer_pool_use_heap_if_exhausted: true
    disk_optimization_strategy: ssd
    row_cache_size_in_mb: 10240
    concurrent_compactors: 16
    compaction_throughput_mb_per_sec: 960

jvm.options:

    -Xms48G
    -Xmx48G
    -XX:+UseG1GC
    -XX:G1RSetUpdatingPauseTimePercent=5
    -XX:MaxGCPauseMillis=500
    -XX:InitiatingHeapOccupancyPercent=70
    -XX:ParallelGCThreads=16
    -XX:PrintFLSStatistics=1
    -Xloggc:/var/log/cassandra/gc.log
    #-XX:+CMSClassUnloadingEnabled
    #-XX:+UseParNewGC
    #-XX:+UseConcMarkSweepGC
    #-XX:+CMSParallelRemarkEnabled
    #-XX:SurvivorRatio=8
    #-XX:MaxTenuringThreshold=1
    #-XX:CMSInitiatingOccupancyFraction=75
    #-XX:+UseCMSInitiatingOccupancyOnly
    #-XX:CMSWaitDuration=10000
    #-XX:+CMSParallelInitialMarkEnabled
    #-XX:+CMSEdenChunksRecordAlways

IO tuning:

    echo 1 > /sys/block/md0/queue/nomerges
    echo 8 > /sys/block/md0/queue/read_ahead_kb
    echo deadline > /sys/block/md0/queue/scheduler

Scylla Optimizations

There is no need to optimize the Scylla configuration. Scylla automatically configures the kernel, the OS, and itself to dynamically adjust to the best setup.

Dataset Used and Disk Space Utilization

Because we wanted the latency tests to be based primarily on disk access, we populated each database with a large dataset of ~11TB consisting of 38.85B partitions using the default cassandra-stress schema, where each partition is ~310 bytes in size. A replication factor of 3 was used. Each Cassandra node holds a data set that is 5.5 times bigger than its RAM, whereas each Scylla node holds a data set 16.25 times bigger than its RAM. Note that the Cassandra 3.x file format consumes less disk space. The September/October release of Scylla will include full compatibility with Cassandra’s 3.x format, bringing further improvements to volume and performance.

Scylla
  Total used storage: ~32.5 TB
  nodetool status server load (avg.): ~8.12 TB / node
  /dev/md0 (avg.): ~8.18 TB / node
  Data size / RAM ratio: ~16.25 : 1

Cassandra
  Total used storage: ~27 TB
  nodetool status server load (avg.): ~690.9 GB / node
  /dev/md0 (avg.): ~692 GB / node
  Data size / RAM ratio: ~5½ : 1

Performance Results (Graphs)

 

Performance Results (Data)

The following table summarizes the results for each of the latency tests conducted.

 

Scylla Monitoring Screenshots

Scylla version 2.2 introduces several new capabilities, including the Compaction Controller for the Size Tiered Compaction Strategy (STCS). This new controller gives Scylla an understanding of just how many CPU shares it can allocate to compactions. In all the tested workloads (300K OPS, 200K OPS, and 100K OPS) the incoming traffic load on the CPU-reactor averaged 70%, 50%, and 25% respectively. The compaction controller determines whether there are enough unused/free CPU shares to allocate to compactions. This enables Scylla to complete compactions quickly and aggressively while ensuring that the foreground load is maintained and the throughput is unaffected. The spikes you see in the CPU-reactor graph for each of the workloads correspond exactly to the execution of compaction jobs, as can be seen in the compaction graph.

When the workload is bigger (300K OPS), SSTables are created faster and more frequent compactions are needed, which is why we see more frequent CPU-reactor spikes to 100%. When the workload is smaller (100K OPS), SSTables are created more slowly and compactions are needed less frequently, resulting in very few CPU-reactor spikes during that run.

Latency test (300K OPS): Mixed 50% WR/RD workload (CL=Q)


 

Latency test (200K OPS): Mixed 50% WR/RD workload (CL=Q)


 

Latency test (100K OPS): Mixed 50% WR/RD workload (CL=Q)


 

Future Work

Scylla’s compaction controller code is new, as is the CPU scheduler. Looking at the graphs, we see that it is possible to smooth compactions further and automatically reduce latency to a third of its current value, and thus push more throughput while still meeting the SLA of this use case (10ms for the 99th percentile).

Appendix-A

Scylla Schema (RF=3)

Cassandra Schema (RF=3)

C-S commands – Scylla

  • Population (~11TB | 38.85B partitions | CL=ONE) x 8 clients
    nohup cassandra-stress write no-warmup n=4856250000 cl=one -mode native cql3 -node [IPs] -rate threads=200 -log file=[log_file] -pop seq=1..4856250000 &

 

  • Latency tests: Mixed 50% WR/RD workload (CL=Q) x 14 clients
    7 clients X nohup taskset -c 1-15 cassandra-stress mixed ratio\(write=1,read=1\) no-warmup duration=90m cl=quorum -pop dist=gaussian\(1..38850000000,19425000000,6475000000\) -mode native cql3 -node [IPs] -log file=[log_file] -rate threads=200 limit=21650/s | 14285/s | 7142/s & (300K | 200K | 100K Ops)

    7 clients X 
    nohup taskset -c 17-31 cassandra-stress mixed ratio\(write=1,read=1\) no-warmup duration=90m cl=quorum -pop dist=gaussian\(1..38850000000,19425000000,6475000000\) -mode native cql3 -node [IPs] -log file=[log_file] -rate threads=200 limit=21650/s | 14285/s | 7142/s & (300K | 200K | 100K Ops)

C-S commands – Cassandra

  • Population (~11TB | 38.85B partitions | CL=ONE) x 16 clients
    nohup cassandra-stress write n=2428125000 cl=one -mode native cql3 -node [IPs] -rate threads=200 -log file=[log file] -pop seq=0..2428125000 &

 

  • Latency test: Mixed 50% WR/RD workload (CL=Q) x 16 clients
    nohup cassandra-stress mixed ratio\(write=1,read=1\) no-warmup duration=90m cl=quorum -pop dist=gaussian\(1..38850000000,19425000000,6475000000\) -mode native cql3 -node [IPs] -log file=[log_file] -rate threads=200 limit=19000/s | 12500/s | 6250/s & (300K | 200K | 100K Ops)

The post New Benchmark: Scylla 2.2 (i3.Metal x 4-nodes) vs Cassandra 3.11 (i3.4xlarge x 40-nodes) appeared first on ScyllaDB.

Scylla Enterprise Release 2018.1.4


The Scylla team is pleased to announce the release of Scylla Enterprise 2018.1.4, a production-ready Scylla Enterprise minor release. Scylla Enterprise 2018.1.4 is a bug fix release for the 2018.1 branch, the latest stable branch of Scylla Enterprise.

More about Scylla Enterprise here.

Scylla Enterprise 2018.1.4 fixes one critical issue on the caching of data for DateTiered or TimeWindow compaction strategies. If you are using either of these strategies, we recommend upgrading ASAP. The critical issue is: In some cases when using DateTiered or TimeWindow compaction strategies, a partition can start to appear empty until Scylla is restarted. The problem is in the cache logic, while the data itself is safely persistent on disk. #3552

Related Links

Scylla Enterprise customers are encouraged to upgrade to Scylla Enterprise 2018.1.4 in coordination with the Scylla support team.

Additional Issues Solved in This Release

  • Reads that use clustering key restrictions (including paging reads) may miss some of the deletions and static row writes for the DateTiered and TimeWindow compaction strategies. Similar to #3552, the data on disk is not affected. #3553

Next Steps

  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Scylla Enterprise Release 2018.1.4 appeared first on ScyllaDB.

Undetectable tombstones in Apache Cassandra

One of the usual suspects for performance issues in the read path of Apache Cassandra is the presence of tombstones. We are used to checking how many tombstones are accessed per read early in the process, to identify the possible cause of excessive GC pauses or high read latencies.
While trying to understand unexpectedly high read latencies for a customer a few months ago, we found out that one special (although fairly common) kind of tombstone was neither counted in the metrics nor traced in the logs: primary key deletes.

A tale of two tombstones

Tombstones in Cassandra fall into two main categories:

  • cell tombstone
  • range tombstone

The former is a tombstone that shadows a single cell. This kind of tombstone is usually generated by TTLs: once a cell expires, it turns into a tombstone. It can also be the result of updating a cell to a null value or of deleting a single cell.

The latter is a tombstone that can shadow multiple cells. This kind of tombstone is generated by DELETE statements that can have different spans:

  • Partition deletes: a single tombstone will shadow the whole partition
  • Multiple rows deletes: a single tombstone will shadow a subset of the partition
  • Single row deletes: a single tombstone will shadow a single row only

To be fully accurate, range tombstones are always composed of two tombstones, one for each bound of the shadowed range.

Considering the following table:

CREATE TABLE test.tombstones (
    id int,
    clust1 text,
    clust2 text,
    val1 text,
    val2 text,
    PRIMARY KEY (id, clust1, clust2)
) WITH CLUSTERING ORDER BY (clust1 ASC, clust2 ASC)

Placing a partition level range tombstone would be achieved using the following query:

DELETE FROM test.tombstones WHERE id = ?

Placing a multi-row range tombstone would require the following query:

DELETE FROM test.tombstones WHERE id = ? AND clust1 = ?

Since Apache Cassandra 3.0.0 it is also possible to perform the following kind of range delete thanks to CASSANDRA-6237:

DELETE FROM test.tombstones WHERE id = ? AND clust1 > ? AND clust1 <= ?

Creating a single row range tombstone would be done as follows:

DELETE FROM test.tombstones WHERE id = ? AND clust1 = ? AND clust2 = ?

And we can create cell tombstones by either deleting/nulling a single column (or setting a TTL at write time):

DELETE val1 from test.tombstones WHERE id = ? AND clust1 = ? AND clust2 = ? 

or:

UPDATE test.tombstones SET val1 = null WHERE id = ? AND clust1 = ? AND clust2 = ? 

Detecting the bug

Our customer had some pretty powerful hardware with top notch SSD drives and more than enough CPU and RAM to get the best out of Apache Cassandra 3.11.1.

All tables but one exhibited correct read performance, and the cluster was going through a lot of long GC pauses (> 1s):

Keyspace     Table             p50           p75           p95
ks1          table1       24985.15      62174.25     670692.26
ks1          table2         772.01       1965.33       9462.06
ks1          table3          39.03         56.19       2807.62

All 3 tables had the same data model and nodetool tablestats output showed no tombstones per read. Since what we were seeing made no sense, we suspected that one of the metrics we were using was not accurate.

Although we weren’t seeing any tombstone read according to the logs and the metrics, the customer confirmed that DELETE statements were performed regularly on the full primary key, so we experimented to find out why Cassandra behaved this way.

Reproducing the problem was fairly easy and we used CCM to test different versions of C*, using the test.tombstones table defined above.

We wrote a short Python script to populate the table with a configurable number of rows, on a single partition spreading over 10 clustering keys:

import sys

from cassandra.cluster import Cluster

# Connect to the local CCM cluster; the number of rows to insert is passed as the first argument
session = Cluster(['127.0.0.1']).connect()

futures = []
for i in range(int(sys.argv[1])):
    futures.append(session.execute_async("""
        insert into test.tombstones(id, clust1, clust2, val1, val2)
        values(1,'""" + str(i%10) + """','""" + str(i) + """','test','test')"""))

    if i%1000==0 and i>0:
        # Wait for the in-flight inserts before issuing the next 1,000
        for future in futures:
            rows = future.result()
        print str(i+1) + " rows..."
        futures = []

Then we made another script to delete a subset of the rows by primary key (single row deletes):

import sys

from cassandra.cluster import Cluster

# Same connection setup as the insert script above
session = Cluster(['127.0.0.1']).connect()

futures = []
for i in range(int(sys.argv[1])):
    futures.append(session.execute_async("""
        DELETE FROM test.tombstones
        WHERE id=1
        AND clust1 = '""" + str(i%10) + """'
        AND clust2='""" + str(i) + """'"""))

    if i%1000==0 and i>0:
        # Wait for the in-flight deletes before issuing the next 1,000
        for future in futures:
            rows = future.result()
        print str(i+1) + " rows..."
        futures = []

The procedure was to run the first script to add 10,000 rows, flush all memtables and then run the deletion script to remove 1,000 rows (so that tombstones are stored in different memtables/SSTables than the shadowed data).
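For reference, a small sketch of driving that procedure against a single-node CCM cluster (the two script file names are hypothetical names for the insert and delete snippets above):

import subprocess

# Populate, flush so the rows land in an SSTable, then delete a subset by full primary key
subprocess.check_call(["python", "insert_tombstones.py", "10000"])
subprocess.check_call(["ccm", "node1", "flush"])            # runs nodetool flush on that node
subprocess.check_call(["python", "delete_tombstones.py", "1000"])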

Reading the table in Apache Cassandra 2.0.13 gave the following traces:

cqlsh> tracing on
Now tracing requests.

cqlsh> select count(*) from test.tombstones where id = 1;

 count
-------
  9000

(1 rows)


Tracing session: 9849e2b0-7f8f-11e8-9faa-4f54a5a22ee7

 activity                                                                  | timestamp    | source    | source_elapsed
---------------------------------------------------------------------------+--------------+-----------+----------------
                                                        execute_cql3_query | 15:39:37,762 | 127.0.0.1 |              0
    Parsing select count(*) from test.tombstones where id = 1 LIMIT 10000; | 15:39:37,762 | 127.0.0.1 |             66
                                                       Preparing statement | 15:39:37,762 | 127.0.0.1 |            162
                            Executing single-partition query on tombstones | 15:39:37,763 | 127.0.0.1 |           1013
                                              Acquiring sstable references | 15:39:37,763 | 127.0.0.1 |           1035
                                               Merging memtable tombstones | 15:39:37,763 | 127.0.0.1 |           1080
                       Partition index with 16 entries found for sstable 3 | 15:39:37,763 | 127.0.0.1 |           1786
                               Seeking to partition beginning in data file | 15:39:37,763 | 127.0.0.1 |           1803
 Skipped 0/1 non-slice-intersecting sstables, included 0 due to tombstones | 15:39:37,764 | 127.0.0.1 |           2308
                                Merging data from memtables and 1 sstables | 15:39:37,764 | 127.0.0.1 |           2331
                                  Read 9000 live and 3000 tombstoned cells | 15:39:37,796 | 127.0.0.1 |          34053
                                                          Request complete | 15:39:37,831 | 127.0.0.1 |          69508                                                        

Although we deleted 1000 rows, Apache Cassandra 2.0 reports 3000 tombstones were read: Read 9000 live and 3000 tombstoned cells.
Dumping one of the SSTables to JSON, we see that each row is composed of 3 cells:

[
        {"key": "00000001","columns": [
        ...
                ["0:100:","",1530711392080000], ["0:100:val1","test",1530711392080000], ["0:100:val2","test",1530711392080000],
                ["0:1000:","",1530711392233000], ["0:1000:val1","test",1530711392233000], ["0:1000:val2","test",1530711392233000],
        ...
        }
]

So C* 2.0 does not count the tombstones but rather the individual cells that are shadowed by the tombstones (at least in this specific case).

Now with Apache Cassandra 2.1.18 to 3.11.1:

cqlsh> paging off
Disabled Query paging.
cqlsh> tracing on
Now Tracing is enabled
cqlsh> select count(*) from test.tombstones where id = 1;

 count
-------
  9000

(1 rows)

Tracing session: e39af870-7e14-11e8-a105-4f54a5a22ee7

 activity                                                                                        | timestamp                  | source    | source_elapsed
-------------------------------------------------------------------------------------------------+----------------------------+-----------+----------------
                                                                              Execute CQL3 query | 2018-07-02 18:28:01.783000 | 127.0.0.1 |              0
                Parsing select count(*) from test.tombstones where id = 1; [SharedPool-Worker-1] | 2018-07-02 18:28:01.784000 | 127.0.0.1 |             53
...
...
                                      Read 9000 live and 0 tombstone cells [SharedPool-Worker-2] | 2018-07-02 18:28:01.810000 | 127.0.0.1 |          27004
                                                                                Request complete | 2018-07-02 18:28:01.825313 | 127.0.0.1 |          42313

From 2.1.x all the way to 3.11.1 (with 2.2.x and 3.0.x behaving the same way), tombstones are no longer reported: Read 9000 live and 0 tombstone cells.

The issue did not affect the other kinds of range tombstones (partition and multi-row deletes), and cell tombstones were correctly counted as well.

It was then necessary to go down the read path to understand why some tombstones could be missed.

Merging mechanisms in Apache Cassandra

Being built on top of an LSM-tree storage engine, C* has to merge cells that can be spread across memtables and several SSTables in order to return a consistent view of the queried data.
While the local node has to merge both cells and tombstones together to get the current state of the rows, it also has to exchange tombstones with other nodes in case of a digest mismatch in order to perform read repairs.
The output of the merge will be:

  • a list of rows, containing a list of cells which can each be live or not (tombstones are cells that aren’t live anymore)
  • a list of tombstones which contains only range tombstones.

Different kinds of tombstones get merged differently with the data they shadow though:

  • Partition and multi row tombstones: data doesn’t survive the merge and the range tombstones are returned as such so that they can be used for digest comparison and read repair.
  • Cell tombstones: they are returned as cells with liveness set to false.
  • Single row tombstones: they are merged upstream and aren’t returned as tombstones. The row will be returned with no cell and liveness set to false.

After a redesign of the read path in 2.1 that aimed at optimizing performance by taking out unnecessary tombstoned cells during the merge phase, single row tombstones stopped being counted and became impossible to detect. Only range tombstones that survived the merge and individually tombstoned cells were counted, but a row with no cell would be skipped silently.

Patched for 3.11.2 and 4.0

CASSANDRA-8527 was committed for 3.11.2 and the upcoming 4.0 in order to count all empty (non-live) rows as tombstones:

cqlsh> SELECT count(*) from test.tombstones where id = 1;

 count
-------
  9000

(1 rows)

Tracing session: c0c19e20-7f91-11e8-90d3-b70479b8c91e

 activity                                                                                 | timestamp                  | source    | source_elapsed | client
------------------------------------------------------------------------------------------+----------------------------+-----------+----------------+-----------
                                                                       Execute CQL3 query | 2018-07-04 15:54:21.570000 | 127.0.0.1 |              0 | 127.0.0.1
 Parsing SELECT count(*) from test.tombstones where id = 1; [Native-Transport-Requests-1] | 2018-07-04 15:54:21.570000 | 127.0.0.1 |            173 | 127.0.0.1
                                        Preparing statement [Native-Transport-Requests-1] | 2018-07-04 15:54:21.570000 | 127.0.0.1 |            315 | 127.0.0.1
                             Executing single-partition query on tombstones [ReadStage-2] | 2018-07-04 15:54:21.571000 | 127.0.0.1 |            947 | 127.0.0.1
                                               Acquiring sstable references [ReadStage-2] | 2018-07-04 15:54:21.571000 | 127.0.0.1 |           1061 | 127.0.0.1
  Skipped 0/1 non-slice-intersecting sstables, included 0 due to tombstones [ReadStage-2] | 2018-07-04 15:54:21.571000 | 127.0.0.1 |           1271 | 127.0.0.1
                                                Key cache hit for sstable 1 [ReadStage-2] | 2018-07-04 15:54:21.572000 | 127.0.0.1 |           1698 | 127.0.0.1
                                  Merged data from memtables and 1 sstables [ReadStage-2] | 2018-07-04 15:54:21.613000 | 127.0.0.1 |          42574 | 127.0.0.1
                               Read 9000 live rows and 1000 tombstone cells [ReadStage-2] | 2018-07-04 15:54:21.613000 | 127.0.0.1 |          42746 | 127.0.0.1
                                                                         Request complete | 2018-07-04 15:54:21.620624 | 127.0.0.1 |          50624 | 127.0.0.1

This allows us to safely rely on traces and metrics when troubleshooting high read latencies in Apache Cassandra 3.11.2+.

Takeaways

Tombstones generated by full primary key deletes are neither reported in the logs nor counted in the metrics from Apache Cassandra 2.1.0 up to 3.11.1.

You can only guess at their presence if you observe unexpected latencies that cannot be explained by anything else.
As a general rule, it is advisable to ask the dev teams whether they are performing DELETE statements, and precisely which kind, even if the metrics suggest that there are no tombstones.

Upgrading to 3.11.2+ allows you to detect those tombstones in both the logs and the metrics. As a consequence, they will now be counted against the failure threshold above which C* aborts in-flight queries, whereas those queries were succeeding before the upgrade.

How to migrate data from Cassandra to Elassandra in Docker containers

A client recently asked us to migrate a Cassandra cluster running in Docker containers to Elassandra, with the data directory persisted via a bind mount. Elassandra is a fork of Cassandra integrated closely with Elasticsearch, to allow for a highly scalable search infrastructure.

To prepare the maintenance plan, we tested some of the methods as shown below.

The following are the commands used if you would like to test the process locally. Docker commands are used on one node at a time throughout the process to execute test statements. The Cassandra container is named my_cassandra_container, and the test Elassandra container is called my_elassandra_container. Replace the local directory /Users/youruser below as appropriate.

Start the Cassandra Container

First, start a container with the latest Cassandra version (3.11.2), binding the data volume locally as datadir.

In our use case, variables such as data center were pre-determined, but note that Cassandra and Elassandra have different default values in the container startup scripts for some of the variables. In the example below, data center, rack, snitch, and token number will be sent explicitly via environment variables flags (-e), but you can alternatively adjust these in the configuration files before starting Elassandra.

It will take about 15 seconds for this to start up before Cassandra is ready to accept the write statement following this. If you’re following the logs, look for “Created default superuser role ‘cassandra'” before proceeding.

docker run --name my_cassandra_container -e CASSANDRA_DC=DC1 -e CASSANDRA_RACK=RAC1 -e CASSANDRA_ENDPOINT_SNITCH=SimpleSnitch  -e CASSANDRA_NUM_TOKENS=8 -v /Users/youruser/mytest/datadir:/var/lib/cassandra -d cassandra:latest

Copy Configuration Files

Copy the Cassandra configuration files to a local location for ease of editing.

docker cp my_cassandra_container:/etc/cassandra/ /Users/youruser/mytest/cassandra

Create and Validate Test Data

Next, create some data in Cassandra using cassandra-stress as a data generator.

docker exec -it my_cassandra_container cassandra-stress write n=20000 -pop seq=1..20000 -rate threads=4

For comparison later, do a simple validation of the data by executing count and sample queries.

docker exec -it my_cassandra_container cqlsh -e "select count(*) from keyspace1.standard1"
docker exec -it my_cassandra_container cqlsh -e "select * from keyspace1.standard1 limit 1"

Stop and Remove the Cassandra Container

To prepare for the migration, stop Cassandra and remove the container.

docker exec -it my_cassandra_container nodetool flush
docker stop my_cassandra_container
docker rm my_cassandra_container

Install Elassandra Container

On a new container, install the latest Elassandra version using the same local data and configuration file paths as above. Again, it will take 15 seconds or so before the next statement can be run. If you are following the logs, look for “Elassandra started.”

docker run --name my_elassandra_container -e CASSANDRA_DC=DC1 -e CASSANDRA_RACK=RAC1 -e CASSANDRA_ENDPOINT_SNITCH=SimpleSnitch  -e CASSANDRA_NUM_TOKENS=8 -v /Users/youruser/mytest/datadir:/var/lib/cassandra -d strapdata/elassandra:latest

Validate Data

Now that Elassandra is running, re-validate the data. Note that at this point only the Cassandra fork is running; it is not yet integrated with Elasticsearch.

docker exec -it my_elassandra_container cqlsh -e "select count(*) from keyspace1.standard1"
docker exec -it my_elassandra_container cqlsh -e "select * from keyspace1.standard1 limit 1"

Repeat the above steps on remaining nodes.

Enable Elasticsearch

To enable the Elasticsearch part of Elassandra, stop Cassandra on all nodes. A rolling update does not work for this step. Enable Elasticsearch by updating the elasticsearch.yml configuration file as below. (Note that you have linked it to your local filesystem via the cp statement, so edit it directly on your local machine.)

docker stop my_elassandra_container
docker cp my_elassandra_container:/opt/elassandra-6.2.3.1/conf/elasticsearch.yml /Users/youruser/mytest/cassandra

vi /Users/youruser/mytest/cassandra/elasticsearch.yml

cluster.name: Test Cluster  ## Name of cluster
network.host: 172.17.0.2  ## Listen address
http.port: 9200
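
Depending on how you run the container, the conf directory may not be bind-mounted, in which case the edited file needs to be copied back into the (stopped) container before restarting. A sketch reversing the docker cp above:

docker cp /Users/youruser/mytest/cassandra/elasticsearch.yml my_elassandra_container:/opt/elassandra-6.2.3.1/conf/elasticsearch.yml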

Restart and Validate Elassandra

Finally, restart and test the Elassandra container.

docker start my_elassandra_container

docker exec -it my_elassandra_container curl -X GET http://localhost:9200/

Sample output:

Elassandra GET Output
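
Beyond this banner response, another optional check (not in the original post) is to ask Elasticsearch for its cluster health, which should report the number of nodes and a green or yellow status:

docker exec -it my_elassandra_container curl -X GET "http://localhost:9200/_cluster/health?pretty"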

Thank you to Valerie Parham-Thompson for assistance in testing.

Case Study: Grab Hails Scylla for Performance and Ease of Use

case study

Grab is one of the most frequently used mobile platforms in Southeast Asia, providing the everyday services that matter most to consumers. Its customers commute, eat, arrange shopping deliveries, and pay with one e-wallet. Grab believes that every Southeast Asian should benefit from the digital economy, and the company provides access to safe and affordable transport, food and package delivery, mobile payments and financial services. Grab currently offers services in Singapore, Indonesia, the Philippines, Malaysia, Thailand, Vietnam, Myanmar and Cambodia.

When handling operations for more than 6 million on-demand rides per day, there’s a lot that must happen in near-real time. Any latency issues could result in millions of dollars in losses.

Performance Challenges

Like many other on-demand transportation companies, Grab relies on Apache Kafka, the data streaming technology underlying all of Grab’s systems. The engineering teams within Grab aggregate these multiple Kafka streams – or a subset of streams – to meet various business use cases. Doing so calls for reading the streams, using a powerful, low-latency metadata store to perform aggregations, and then writing the aggregated data into another Kafka stream.

The Grab development team initially used Redis as its aggregation store, only to find that it couldn’t handle the load. “We started to notice lots of CPU spikes,” explained Aravind Srinivasan, Software Engineer at Grab. “So we kept scaling it vertically, kept adding more processing power, but eventually we said it’s time to look at another technology and that’s when we started looking at Scylla.”

Easier to Use and Less Expensive than Apache Cassandra and Other Solutions

In deciding on a NoSQL database, Grab evaluated Scylla, Apache Cassandra, and other solutions. They performed extensive tests with a focus on read and write performance and fault tolerance. Their test environment was a 3-node cluster that used basic AWS EC2 machines.

“Most of our use cases are write heavy,” said Srinivasan. “So we launched different writer groups to write to the Scylla cluster with 1,000,000 records and looked at the overall TPS and how many errors occurred. Scylla performed extremely well. Read performance was one of the major bottlenecks we had when using Redis, so we wanted to test this thoroughly. We launched multiple readers from the Scylla cluster and evaluated the overall throughput and how long it took to scan the entire table. We’d populate the table with 1,000,000 rows and then figure out how long the entire table scan took.”

“For fault-tolerance, we had a 5-node cluster and we’d bring down a node at the same time we were adding another node and doing other things to the cluster to see how it behaves. Scylla was able to handle everything we threw at it. On the operational side, we tested adding a new node to an existing cluster and that was good as well.”

“Running the same workload on other solutions would have cost us more than three times as much as Scylla.”
– Aravind Srinivasan, Software Engineer, Grab

Growing Use of Scylla at Grab

Scylla came out on top of extensive performance tests and is now in production at Grab. “Scylla is working really well as our aggregation metadata store,” says an enthusiastic Srinivasan. “It’s handling our peak load of 40K operations per second. It’s write-heavy right now but the latency numbers on both reads and writes are very, very impressive.”

The Grab team points to a few things that they especially like about Scylla:

  • Performance: “Scylla is on par with Redis, which is in-memory. We are seeing write performances that are extremely good.”
  • Cost: “We are running one of our heaviest streams on Scylla and we’re doing it with just a 5-node cluster using AWS i3.4xlarge instances. And that is very, very good for us in terms of resource efficiency. Running the same workload on other solutions would have cost more than three times as much.”
  • Easier than Cassandra: “The administrative burden with Cassandra was too great. There were too many knobs I needed to tweak to get it performing properly. Adding and removing nodes was very heavy in Cassandra. With Scylla, everything has been easy – it works just like it’s supposed to.”
  • No Hot Partitions: “This was one of the major issues with other solutions. We used to get hot partition/shard issues with other approaches which would take a long time to sort out. With Scylla, there are no hot partitions at all. It’s almost unbelievable when you look at the metrics because all the nodes are getting exactly the same amount of traffic.”
  • Support: “Scylla’s support team has truly impressive response times. It shows their commitment to their users and to making ScyllaDB successful.”

Grab is now looking to extend its use of Scylla. Other teams at Grab are hearing about the success of Scylla as an aggregation store and are looking to migrate additional use cases to it, such as statistics tracking, time-series storage, and more.

Next Steps

  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test Drive. Our Test Drive lets you quickly spin up a running Scylla cluster so you can see for yourself how it performs.


Cassandra’s Journey — Via the Five Stages of Grief

New technologies usually need to fight their way into the hearts of the people who will end up using them. This fight is often long and hard, and Apache Cassandra didn’t have it any easier than any of the other technological developments of our time.

In 2008 the database world was a wild place. Large data infrastructures were testing the limits of relational databases, and companies like Google and Amazon were about to run out of options on how to handle their massive data volumes.

At the time I was working at an education company called Hobsons, and was one of those infrastructure engineers trying to get more scale out of my tired old databases. Cassandra caught my eye as something with a great foundation in computer science that also solved many of the issues I was having.

But not everyone was as convinced as I was.

If you're not familiar with the Kübler-Ross model of grieving, also known as The Five Stages of Grief, it describes the way most people end up dealing with loss and change. Looking back, I realize now that the mass abandonment of relational databases in favor of something more appropriate for the new world of big data (Cassandra) very much followed this same model.

Here’s how it happened from my POV in the trenches of data infrastructure.

Stage 1: Denial – The individual believes the prognosis is somehow mistaken and clings to a false, preferable reality.

In 2008, Apache Cassandra was the closing curtain on a 30-year era of database technology, so denial was an easy and obvious response to it in the early years. Of course, many of the new databases being released weren’t exactly of the highest quality. Coming from a database with years and years of production vetting, it was easy to throw some shade at the newcomers.

Cassandra was in that camp.

But it could do things relational databases couldn’t, like stay online when physical nodes fail or scale online by just adding more servers. Administrators called it a toy and developers called it a fad — just some kids trying to be cool. Cassandra kept growing, though — and solving real problems. The replication story was unmatched and was catching a lot of attention. There were ways to replicate a relational database, but it was hard and didn’t work well. Data integrity required one primary database with all others being secondary or read-only, and failure modes contributed to a lot of offline pages displayed on web sites. But generally speaking people only want to make the effort to fix things when they absolutely have to, and for now, relational databases weren’t really broken.

Stage 2: Anger – The individual recognizes that denial cannot continue and becomes frustrated.

Slowly but surely people started to move notable use cases with real production workloads over to Cassandra. There were happy users talking about incredible stories of scale and resiliency! The company names attached to these stories became less cutting edge and more mainstream and it was becoming clear to many that this wasn’t just a fad. It was starting to make a real impact and could be coming to a project meeting soon.

I remember one of my first consulting gigs at a big-name company. I was working with the development team on some data models and in the back of the room was a group of engineers, arms crossed, not looking happy. When I talked to them, they made it quite clear that this change was not welcome, and that “This is going to ruin the company.” They were the Oracle database administrators and they saw this at best as a bad idea and at worst as a threat to their livelihood. In the ensuing months I experienced similar tense moments with other groups of engineers.

Stage 3: Bargaining – The individual tries to postpone the inevitable and searches for an alternate route.

Despite roadblocks and delay tactics, the needs of businesses everywhere dictated a move to high-scaling technologies like Apache Cassandra. It was solving real problems in a way no other database could, no matter how much "tuning" you did on your existing solutions.

This led to situations where teams started negotiating the terms of a Cassandra roll-out. One team I worked with wasn’t allowed to put Cassandra in any critical path close to customers. Ironically, when the systems in the critical path started failing, the only system that could withstand the conditions that led to their failure was the much-maligned Cassandra cluster.

Then, a new breed of database appeared that tried to capitalize on the fear of non-relational databases. It was called NewSQL and promised full ACID transactions along with Cassandra-like resiliency, but NewSQL never quite worked out when real-world failures presented themselves. That’s how infrastructure goes: It burns half-baked ideas to the ground and calls in a welcoming party for the good ideas.

Stage 4: Depression – "I'm so sad, why bother with anything?"

Cassandra started gaining traction in every corner of the tech world. As the solutions implemented to avoid this inevitability failed, fighting the future became less and less appealing. There was a massive period of growth as late adopters joined the early adopters, and they were all talking. The relational database holdouts finally just stopped talking about it and did something else. Many decided to move to data warehousing, where they could put their amazing SQL skills to use via complex queries.

Stage 5: Acceptance – The individual embraces the inevitable future.

And then, there was a moment, and nobody knows exactly when it was, that Cassandra became a mainstream database. It might have been when everywhere you looked there was yet another great use case being talked about. As the saying went, anyone doing something at scale on the Internet was probably using Cassandra. For me, the moment I realized Cassandra had finally been accepted was when I saw large numbers of database administrators signing up for training on DataStax Academy. It was like a big shift had occurred in the day-in, day-out world of databases. Application developers were always pushing the cutting edge, but administrators had to keep those applications running until they were replaced, and their new foundation of choice was Cassandra.

When you think about it, you really see the same reaction to every new paradigm-shifting technology. The early days of the computer, the Internet, and now blockchain all faced the same fear and doubt as the early days of Cassandra. Collectively, we deny the truth, rage at inevitability, scramble for an alternative, fall into despair, and finally accept and embrace our new reality. What comes after Cassandra is anyone's guess, but as with people, usually the best kind of change comes little by little and goes almost completely unnoticed until it's staring you in the face and you say, "Wow, you've changed!" Here's to the Cassandra of the past, the present, and the future.

Cassandra Turns 10!

CELEBRATE HERE