Scylla University: Take the Challenge and Save the Planet!

Take the Scylla University Challenge!

You’ve probably already heard about Scylla University. It’s our training resource center offering free courses you can take online at your own pace.

All you need to do is register and start taking lessons.

To keep you motivated, the first 5 users who complete all the available lessons with the most points will receive a cool Scylla t-shirt!

Points are awarded for:

  • Completing lessons including quizzes and hands-on labs, with a high score
  • Submitting a feedback form
  • General suggestions, feedback, and input (just shoot me a message on the #scylla-university channel on Slack)

Winners will be announced on Wednesday, August 14.

New Course: The Mutant Monitoring System

We recently published The Mutant Monitoring System (MMS), a new course which covers key concepts such as setup, failover, compactions, multiple data centers, and integrations with third-party applications, all within the context of saving Earth from a mutant invasion!

This course is suitable for administrators and developers who want to learn how to use Scylla within their applications in various programming languages. If you are new to Scylla, we recommend completing the Scylla Essentials course first. Each lesson includes hands-on examples. Practice makes perfect!

The Mutant Monitoring System Backstory

Mutants have emerged from the shadows and are now wreaking havoc on Earth! Increased levels of malicious mutant behavior pose a threat to national security and to the general public. To better protect its citizens and understand more about the mutants, the Government has enacted the Mutant Registration Act. The overall mission of the course is to help the Government keep the Mutants under control by building a Mutant Monitoring System. Finish this course, and we just may save the Earth! So let’s get to it!

Course Lessons

Building a Mutant Monitoring System: Learn how to create the Scylla infrastructure to start building out the MMS. You’ll set up a three-node Scylla cluster, use cqlsh, create a keyspace and table, and read and write data. The Mutant Catalog is the first keyspace that will be created to gather basic metrics for each mutant such as first name, last name, address, and photo.

Create the Tracking System: This lesson covers how to build the tracking system keyspace and table that will allow us to keep track of the following mutant metrics: Name, Timestamp, Location, Speed, Velocity, Heat, and Telepathy powers. It also covers primary keys, clustering keys, compaction, and some more cqlsh.

Set the Consistency Level: Learn about consistency levels, a node failure scenario, adding a node and repairing a cluster.

Configure Multi-Datacenter Replication: This lesson covers using multiple data centers, configuring keyspaces for multi-DC, and what happens if a DC is down in a multi-DC environment.

Configure Multi-datacenter Consistency Levels: Learn about consistency level (CL) options that are available when using Scylla with multiple data centers and how the CL affects queries in a failure scenario with multiple data centers.
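As a rough illustration of how the consistency level interacts with a data center failure, here is a minimal Python sketch. It assumes a quorum is a majority of replicas (floor(RF / 2) + 1), that whole data centers fail at once, and that every replica in a surviving data center is reachable. The data center names and replication factors are made up for the example and are not taken from the course.

```python
# Sketch: replicas needed per consistency level, and whether a request can
# still succeed when whole data centers are unavailable.
# DC names and replication factors used below are illustrative assumptions.


def quorum(replication_factor):
    """A quorum is a majority of replicas: floor(RF / 2) + 1."""
    return replication_factor // 2 + 1


def required_replicas(cl, rf_by_dc, local_dc):
    """Number of replicas that must respond for the given consistency level."""
    total_rf = sum(rf_by_dc.values())
    if cl == "ONE":
        return 1
    if cl == "LOCAL_QUORUM":
        return quorum(rf_by_dc[local_dc])
    if cl == "QUORUM":
        return quorum(total_rf)
    if cl == "ALL":
        return total_rf
    raise ValueError("unsupported consistency level: %s" % cl)


def can_satisfy(cl, rf_by_dc, local_dc, down_dcs=()):
    """True if enough replicas remain reachable with the listed DCs down."""
    alive = {dc: rf for dc, rf in rf_by_dc.items() if dc not in down_dcs}
    if cl == "LOCAL_QUORUM":
        # Only replicas in the coordinator's own data center count.
        available = alive.get(local_dc, 0)
    else:
        available = sum(alive.values())
    return available >= required_replicas(cl, rf_by_dc, local_dc)


if __name__ == "__main__":
    rf = {"DC1": 3, "DC2": 3}
    for level in ("ONE", "LOCAL_QUORUM", "QUORUM", "ALL"):
        print(level, can_satisfy(level, rf, "DC1", down_dcs={"DC2"}))
```

With two data centers of three replicas each and one data center down, a LOCAL_QUORUM request from the surviving data center needs two of its three local replicas and can proceed, while QUORUM needs four of six replicas and cannot.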

Monitoring: Set up the Scylla Monitoring stack to monitor nodes and examine important details such as performance, latency, node availability, and more. The lesson covers setting up the monitoring stack and configuring it, using cassandra-stress, different metrics, and dashboards.

Backup and Restore: Learn how to back up and restore data from the Mutant Monitoring System. We’ll see how to back up the data, simulate data loss, and restore the data.

Implement a Java Client, Part 1: See how to use the Java driver to interact with a Scylla cluster. You’ll create a sample Java application, set up the cluster, and run the application to perform simple queries. When creating applications that communicate with Scylla, it is crucial that the programming language being used has support for database connectivity. Since Scylla is compatible with Cassandra, we can use any of the available Cassandra libraries.

Implement a Java Client, Part 2: Learn how to use the Java driver to interact with Scylla in a more efficient way, using Prepared Statements. A prepared statement is a query that is parsed by Scylla and then saved for later use. One of the valuable benefits is that you can continue to reuse that query and modify variables in the query to match variables such as names, addresses, and locations. You’ll get a Java application example that you can run.

Set Up Materialized Views: This lesson covers Materialized Views, which automate the tedious and inefficient work that must be done when an application maintains several tables with the same data that’s organized differently.

Implement a Java Client, Part 3: Learn how to work with the blob datatype as we explore the Java programming language a bit further. We’ll use an interesting feature of Scylla that allows us to store files in the database. With this ability, we can store images of the mutants in the catalog keyspace.

Analyze Data with Spark (coming soon): This lesson demonstrates how to use Spark with Scylla. We will go over how to use Apache Spark to analyze and visualize the data from the Mutant Monitoring system. As our mutant data increases in size over time, we need to be able to quickly analyze it using a cluster of machines and that is where Apache Spark comes in. Apache Spark is a distributed system and unified analytics engine for large-scale data processing.

Join the #scylla-university channel on Slack for more training-related discussions.

The post Scylla University: Take the Challenge and Save the Planet! appeared first on ScyllaDB.

Storing More Data and Saving Money with DSE vs. Open Source Cassandra

When the NoSQL movement began years ago, there was intense debate on which NoSQL databases were best for heavy-lifting applications with lots of data. Today, that discussion is settled, with those in the know acknowledging that Apache Cassandra™ is the top NoSQL engine for tackling large volumes of operational data.  

Cassandra’s masterless, divide-and-conquer architecture easily sails past all other databases that are either master-slave or multi-master in design, and ensures that your applications are future-proofed where scale is concerned. 

That said, open source Cassandra has a recommended storage limitation of around one terabyte of data per machine (or virtual machine for those carving up larger hardware). This is due to the overhead involved in streaming operations when nodes are added, removed or replaced, or for standard operational activities such as compaction and repairs. For extraordinarily large databases, this restriction can lead to negative management and cost implications. 

At DataStax, we’re acutely aware of this limitation and have a number of projects in the works to address it. In fact, some of our recent internal testing has shown that, with DataStax Enterprise and the improvements delivered in our advanced performance suite, you can store 2-4X the amount of compressed data per node (physical or virtual) and therefore realize some nice productivity and cost savings when building out your on-premises and cloud applications, all the while maintaining optimized levels of performance.

The Details 

Our technical teams have confirmed that, thanks to our advanced performance enhancements and storage engine optimizations, DSE can store out-of-the-box more than double the amount of compressed data per node for general applications compared with open source Cassandra. For time-series-styled applications (e.g., IoT), DSE can handle 4X the amount of compressed data over open source.

Note that this is compressed data and not raw data size. Compression in DSE can reduce your overall data footprint by 25-33%, plus net you some nice read/write performance benefits as well. Keep that in mind when doing the deployment math for your clusters (i.e., you can store more data per node than you think).
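The deployment math mentioned above can be sketched in a few lines of Python. This is a back-of-the-envelope model, not a DataStax sizing tool: it assumes the per-node limits quoted in this post (about 1 TB for open source Cassandra, 2-4X that for DSE) apply to compressed, replicated data, and the raw data size and replication factor in the example are invented for illustration.

```python
import math


def nodes_needed(raw_tb, replication_factor, compression_saving,
                 compressed_tb_per_node):
    """Estimate node count from raw data size.

    raw_tb                 -- uncompressed data size before replication (TB)
    replication_factor     -- number of copies kept (assumed, not from the post)
    compression_saving     -- fraction removed by compression, e.g. 0.25
    compressed_tb_per_node -- compressed data each node is allowed to hold (TB)
    """
    compressed_tb = raw_tb * replication_factor * (1.0 - compression_saving)
    return math.ceil(compressed_tb / compressed_tb_per_node)


if __name__ == "__main__":
    # Hypothetical workload: 40 TB raw, 3 replicas, 25% compression saving.
    for label, per_node in (("1 TB/node", 1.0), ("2 TB/node", 2.0),
                            ("4 TB/node", 4.0)):
        print(label, nodes_needed(40, 3, 0.25, per_node))
```

Under these assumptions the same data set needs 90 nodes at 1 TB per node, 45 at 2 TB, and 23 at 4 TB, which is where the cost difference in the following examples comes from.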

How does DSE pull this off? Long story short, it involves a number of the performance architecture enhancements and storage engine optimizations made in DSE 6, which you can read about here. We’ve got even more goodness coming on this front in soon-to-be-released versions of DSE, but that’s all I’ll say about that right now. 

Customer Examples

So what are some real-life examples of this in action? 

One of our customers is a large media content delivery company. Like many, they started on open source Cassandra and cobbled together other supporting open source technologies while taking advantage of cloud hosting from one of the “big 3” cloud service providers. 

As they grew, so did their environment along with a massive increase in the operational expenses to maintain an environment in excess of 300 open source Cassandra nodes. Worse, they expected their cloud costs to triple in the next three years.  

They came to DataStax in hopes of reducing their forecasted expenses and were not disappointed. With DSE’s advanced performance suite and ability to store more data than open source, they were able to save $3.2 million. OpsCenter and other management automation saved them another $900K, and an added bonus was that they were able to eliminate a MongoDB search cluster with DSE’s integrated search that saved them another $2.7 million. 

A nearly $7 million savings over three years: not bad! 

Another customer we’re currently doing something similar with is a major name in the oil and gas industry. As part of their focus on moving to standardized technologies, they have been comparing the true cost of open source Cassandra vs. a solution like DSE.  

We were brought in to conduct a collaborative multi-year build vs. partner analysis that looked at multiple areas, with some eye-popping conclusions:

  • They will be able to reduce cloud spend by 30-40% based on DSE advanced performance and improved storage, with the estimate being approximately $13M over five-plus years
  • The reduction of development costs and gain of self-management tools to manage, monitor and provision Apache Cassandra yields a $3M savings over five years 
  • Formal support and services provide another couple of million in savings coupled with a six- to nine-month reduction in getting their applications to market.

Again, nothing to sneeze at. 

Scale Out vs. Up Considerations 

One caveat on this topic is worth mentioning. While it’s tempting to put as much data as possible on every machine in a cluster, you need to ensure you don’t jeopardize other aspects of your deployment such as uptime and overall capacity potential. 

For example, maybe you can get away with only three or four nodes from a data volume standpoint, but you should keep in mind that if one of those nodes goes down, you immediately lose 25-33% of your capacity, and that could be a big deal breaker where your application is concerned.
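The capacity figures above follow from simple arithmetic, sketched here in Python under the assumption that data and load are spread evenly across nodes (real clusters may be less balanced).

```python
def capacity_lost_fraction(node_count, nodes_down=1):
    """Fraction of cluster capacity lost when nodes_down nodes fail."""
    if node_count <= 0 or not 0 <= nodes_down <= node_count:
        raise ValueError("invalid node counts")
    return nodes_down / node_count


def extra_load_on_survivors(node_count, nodes_down=1):
    """Relative load increase on each remaining node after the failure."""
    survivors = node_count - nodes_down
    if node_count <= 0 or nodes_down < 0 or survivors <= 0:
        raise ValueError("at least one node must survive")
    return node_count / survivors - 1.0


if __name__ == "__main__":
    for n in (3, 4, 12):
        print(n, round(capacity_lost_fraction(n), 3),
              round(extra_load_on_survivors(n), 3))
```

A three-node cluster loses a third of its capacity and each survivor takes on 50% more load, whereas a twelve-node cluster loses about 8% and each survivor takes on about 9% more.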

Wrap Up

Today, the smart database choice for managing large amounts of distributed data at scale remains Cassandra. With DataStax Enterprise, you can manage more data per node than open source, saving yourself both time and money.

You can download DataStax Enterprise now, try it out with no restrictions and see how much of both you can save. Make sure to check out our online documentation as well as our free online courses at DataStax Academy if you’re just getting started with us.

Learn More: DataStax Accelerate Sessions

Evolution of Netflix Conductor: v2.0 and beyond

By Anoop Panicker and Kishore Banala

Conductor is a workflow orchestration engine developed and open-sourced by Netflix. If you’re new to Conductor, this earlier blogpost and the documentation should help you get started and acclimatized to Conductor.

Netflix Conductor: A microservices orchestrator

In the last two years since inception, Conductor has seen wide adoption and is instrumental in running numerous core workflows at Netflix. Many of the Netflix Content and Studio Engineering services rely on Conductor for efficient processing of their business flows. The Netflix Media Database (NMDB) is one such example.

In this blog, we would like to present the latest updates to Conductor, address some of the frequently asked questions and thank the community for their contributions.

How we’re using Conductor at Netflix

Deployment

Conductor is one of the most heavily used services within Content Engineering at Netflix. Of the multitude of modules that can be plugged into Conductor as shown in the image below, we use the Jersey server module, Cassandra for persisting execution data, Dynomite for persisting metadata, DynoQueues as the queuing recipe built on top of Dynomite, Elasticsearch as the secondary datastore and indexer, and Netflix Spectator + Atlas for metrics. Our cluster ranges from 12 to 18 AWS EC2 m4.4xlarge instances, typically running at ~30% capacity.

Components of Netflix Conductor
* — Cassandra persistence module is a partial implementation.

We do not maintain an internal fork of Conductor within Netflix. Instead, we use a wrapper that pulls in the latest version of Conductor and adds Netflix infrastructure components and libraries before deployment. This allows us to proactively push changes to the open source version while ensuring that the changes are fully functional and well-tested.

Adoption

As of writing this blog, Conductor orchestrates 600+ workflow definitions owned by 50+ teams across Netflix. While we’re not (yet) actively measuring the nth percentiles, our production workloads speak for Conductor’s performance. Below is a snapshot of our Kibana dashboard which shows the workflow execution metrics over a typical 7-day period.

Typical Conductor usage at Netflix over a 7 day period.

Use Cases

Some of the use cases served by Conductor at Netflix can be categorized under:

  • Content Ingest and Delivery
  • Content Quality Control
  • Content Localization
  • Encodes and Deployments
  • IMF Deliveries
  • Marketing Tech
  • Studio Engineering

What’s New

gRPC Framework

One of the key features in v2.0 was the introduction of the gRPC framework as an alternative/auxiliary to REST. This was contributed by our counterparts at GitHub, thereby strengthening the value of community contributions to Conductor.

Cassandra Persistence Layer

To enable horizontal scaling of the datastore for a large volume of concurrent workflow executions (millions of workflows/day), Cassandra was chosen to provide elastic scaling and meet throughput demands.

External Payload Storage

External payload storage was implemented to prevent the usage of Conductor as a data persistence system and to reduce the pressure on its backend datastore.

Dynamic Workflow Executions

Some use cases need to execute a large or arbitrary number of varying workflow definitions, or to run a one-time ad hoc workflow for testing or analytical purposes. Registering definitions with the metadata store only to execute them once adds a lot of overhead. The ability to dynamically create and execute workflows removes this friction. This was another great addition that stemmed from our collaboration with GitHub.

Workflow Status Listener

Conductor can be configured to publish notifications to external systems or queues upon completion/termination of workflows. The workflow status listener provides hooks to connect to any notification system of your choice. The community has contributed an implementation that publishes a message on a dyno queue based on the status of the workflow. An event handler can be configured on these queues to trigger workflows or tasks to perform specific actions upon the terminal state of the workflow.

Bulk Workflow Management

There has always been a need for bulk operations at the workflow level from an operability standpoint. When running at scale, it becomes essential to perform workflow level operations in bulk due to bad downstream dependencies in the worker processes causing task failures or bad task executions. Bulk APIs enable the operators to have macro-level control on the workflows executing within the system.

Decoupling Elasticsearch from Persistence

The inter-dependency between Elasticsearch indexing and the persistence layer was removed by moving the indexing layer into separate persistence modules and exposing a property (workflow.elasticsearch.instanceType) to choose the type of indexing engine. Further, the indexer and persistence layer have been decoupled by moving this orchestration from within the primary persistence layer to a service layer through the ExecutionDAOFacade.

ES5/6 Support

Support for Elasticsearch versions 5 and 6 has been added as part of the major version upgrade to v2.x. This addition also provides the option to use the Elasticsearch RestClient instead of the Transport Client, which was enforced in the previous version. This opens the route to using a managed Elasticsearch cluster (a la AWS) as part of the Conductor deployment.

Task Rate Limiting & Concurrent Execution Limits

Task rate limiting helps achieve bounded scheduling of tasks. The task definition parameter rateLimitFrequencyInSeconds sets the duration window, while rateLimitPerFrequency defines the number of tasks that can be scheduled in a duration window. concurrentExecLimit, on the other hand, is not tied to a time window: the total number of tasks scheduled at any given time stays within concurrentExecLimit. These parameters can be used in tandem to achieve the desired throttling and rate limiting.
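To make the interplay of the three parameters concrete, here is a small Python model. It is only a sketch of the semantics described above and not Conductor's implementation: it assumes a fixed (non-sliding) window, treats concurrentExecLimit as an inclusive cap, and uses a hypothetical TaskThrottle class and made-up limit values.

```python
class TaskThrottle:
    """Toy model of rate limiting plus a concurrent execution cap."""

    def __init__(self, rate_limit_per_frequency,
                 rate_limit_frequency_in_seconds, concurrent_exec_limit):
        self.per_window = rate_limit_per_frequency
        self.window = rate_limit_frequency_in_seconds
        self.concurrent = concurrent_exec_limit
        self._window_start = None       # start time of the current window
        self._scheduled_in_window = 0   # tasks scheduled in this window
        self._in_progress = 0           # tasks scheduled but not yet finished

    def try_schedule(self, now):
        """Return True if a task may be scheduled at time `now` (seconds)."""
        if self._window_start is None or now - self._window_start >= self.window:
            # Start a fresh window; the concurrency count is not reset.
            self._window_start = now
            self._scheduled_in_window = 0
        if self._scheduled_in_window >= self.per_window:
            return False  # rate limit for this window is exhausted
        if self._in_progress >= self.concurrent:
            return False  # too many tasks are still running
        self._scheduled_in_window += 1
        self._in_progress += 1
        return True

    def complete(self):
        """Record that one in-progress task has finished."""
        if self._in_progress > 0:
            self._in_progress -= 1


if __name__ == "__main__":
    throttle = TaskThrottle(rate_limit_per_frequency=3,
                            rate_limit_frequency_in_seconds=10,
                            concurrent_exec_limit=2)
    print([throttle.try_schedule(0) for _ in range(3)])
```

In this model the third request is refused by the concurrency cap even though the window still has room, which shows why the two kinds of limit are configured separately.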

API Validations

Validation was one of the core features missing in Conductor 1.x. To improve usability and operability, we added validations, which in practice have greatly helped find bugs during the creation of workflow and task definitions. Validations require the user to create and register their task definitions before registering the workflow definitions that use these tasks. They also ensure that the workflow definition is well-formed, with correct wiring of inputs and outputs in the various tasks within the workflow. Any anomalies found are reported to the user with a detailed error message describing the reason for failure.

Developer Labs, Logging and Metrics

We have been continually improving logging and metrics, and have revamped the documentation to reflect the latest state of Conductor. To provide a smooth onboarding experience, we have created developer labs, which guide the user through creating task and workflow definitions, managing a workflow lifecycle, configuring advanced workflows with eventing, etc., and give a brief introduction to the Conductor API, UI and other modules.

New Task Types

System tasks have proven to be very valuable in defining the Workflow structure and control flow. As such, Conductor 2.x has seen several new additions to System tasks, mostly contributed by the community:

Lambda

Lambda Task executes ad-hoc logic at Workflow run-time, using the Nashorn Javascript evaluator engine. Instead of creating workers for simple evaluations, Lambda task enables the user to do this inline using simple Javascript expressions.

Terminate

The Terminate task is useful when workflow logic should terminate with a given output. For example, if a decision task evaluates to false and we do not want to execute the remaining tasks in the workflow, a Terminate task can end the workflow execution within that decision branch, instead of requiring a DECISION task with a list of tasks in one case and an empty list in the other.

ExclusiveJoin

The Exclusive Join task helps capture task output from a DECISION task’s flow. This is useful to wire task inputs from the outputs of one of the cases within a decision flow. This data will only be available during workflow execution time, and the ExclusiveJoin task can be used to collect the output from one of the tasks in any of the decision branches.

For in-depth implementation details of the new additions, please refer to the documentation.

What’s next

There are a lot of features and enhancements we would like to add to Conductor. The wish list below could be considered a long-term road map. It is by no means exhaustive, and we very much welcome ideas and contributions from the community. Some of these, listed in no particular order, are:

Advanced Eventing with Event Aggregation and Distribution

At the moment, event generation and processing is a very simple implementation. An event task can create only one message, and a task can wait for only one event.

We envision an Event Aggregation and Distribution mechanism that would open up Conductor to a multitude of use-cases. A coarse idea is to allow a task to wait for multiple events, and to progress several tasks based on one event.

UI Improvements

While the current UI provides a neat way to visualize and track workflow executions, we would like to enhance this with features like:

  • Creating metadata objects from UI
  • Support for starting workflows
  • Visualize execution metrics
  • Admin dashboard to show outliers

New Task types like Goto, Loop etc.

Conductor has been using a Directed Acyclic Graph (DAG) structure to define a workflow. The Goto and Loop on tasks are valid use cases, which would deviate from the DAG structure. We would like to add support for these tasks without violating the existing workflow execution rules. This would help unlock several other use cases like streaming flow of data to tasks and others that require repeated execution of a set of tasks within a workflow.
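To see why Goto and Loop tasks deviate from the DAG structure, consider the following Python sketch. It checks whether a workflow graph is acyclic using Kahn's topological sort; the graph representation and task names are hypothetical and do not reflect how Conductor stores workflow definitions.

```python
from collections import deque


def is_dag(edges):
    """Return True if the task graph has no cycles.

    edges maps each task name to the list of tasks that run after it.
    """
    nodes = set(edges)
    for targets in edges.values():
        nodes.update(targets)

    indegree = {node: 0 for node in nodes}
    for targets in edges.values():
        for target in targets:
            indegree[target] += 1

    # Kahn's algorithm: repeatedly remove tasks with no unmet predecessors.
    ready = deque(node for node, degree in indegree.items() if degree == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for target in edges.get(node, ()):
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)

    # Any task left unvisited sits on a cycle or behind one.
    return visited == len(nodes)


if __name__ == "__main__":
    linear = {"ingest": ["encode"], "encode": ["publish"]}
    looped = {"ingest": ["encode"], "encode": ["publish"],
              "publish": ["encode"]}  # a "loop back to encode" edge
    print(is_dag(linear), is_dag(looped))
```

Adding a single back edge from a later task to an earlier one is enough to make the check fail, which is the constraint that Loop and Goto support would have to work around.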

Support for reusable commonly used tasks like Email, DatabaseQuery etc.

Similarly, we’ve seen the value of shared reusable tasks that do a specific thing. In Netflix’s internal deployment of Conductor, we’ve added tasks specific to services that users can leverage instead of recreating the tasks from scratch. For example, we provide a TitusTask which enables our users to launch a new Titus container as part of their workflow execution.

We would like to extend this idea such that Conductor can offer a repository of commonly used tasks.

Push based task scheduling interface

The current Conductor architecture is based on workers polling for the tasks they will execute. We need to enhance the gRPC modules to leverage the bidirectional channel to push tasks to workers as and when they are scheduled, thus reducing network traffic, load on the server and redundant client calls.

Validating Task inputKeys and outputKeys

This is to provide type safety for tasks and define a parameterized interface for task definitions such that tasks are completely re-usable within Conductor once registered. This provides a contract allowing the user to browse through available task definitions to use as part of their workflow where the tasks could have been implemented by another team/user. This feature would also involve enhancing the UI to display this contract.
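As a rough idea of what such a contract check might look like, here is a hypothetical Python sketch. The inputKeys and outputKeys fields come from the task definition; the helper functions, task names and key names are invented for illustration, and this wish-list feature may look quite different if and when it is implemented.

```python
def missing_inputs(task_def, task_input):
    """Declared input keys that are absent from the supplied input."""
    return sorted(key for key in task_def.get("inputKeys", [])
                  if key not in task_input)


def unsatisfied_inputs(producer_def, consumer_def):
    """Input keys the consumer declares that the producer never outputs."""
    produced = set(producer_def.get("outputKeys", []))
    return sorted(key for key in consumer_def.get("inputKeys", [])
                  if key not in produced)


if __name__ == "__main__":
    # Hypothetical task definitions owned by two different teams.
    encode = {"name": "encode_video",
              "inputKeys": ["sourceUrl", "bitrate"],
              "outputKeys": ["encodedUrl"]}
    publish = {"name": "publish_video",
               "inputKeys": ["encodedUrl", "region"],
               "outputKeys": []}
    print(missing_inputs(encode, {"sourceUrl": "s3://bucket/movie"}))
    print(unsatisfied_inputs(encode, publish))
```

A check of this kind could run at workflow registration time, so that a missing or misnamed key is reported before any worker picks up the task.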

Implementing MetadataDAO in Cassandra

As mentioned here, the Cassandra module provides a partial implementation for persisting only the workflow executions. A metadata persistence implementation is not available yet and is something we are looking to add soon.

Pluggable Notifications on Task completion

Similar to the Workflow status listener, we would like to provide extensible interfaces for notifications on task execution.

Python client in PyPI

We have seen wide adoption of the Python client within the community. However, there is no official Python client in PyPI, and the existing client lacks some of the newer additions to the Java client. We would like to achieve feature parity, publish a client from the Conductor GitHub repository, and automate the client release to PyPI.

Removing Elasticsearch from critical path

While Elasticsearch is greatly useful in Conductor, we would like to make this optional for users who do not have Elasticsearch set-up. This means removing Elasticsearch from the critical execution path of a workflow and using it as an opt-in layer.

Pluggable authentication and authorization

Conductor doesn’t support authentication and authorization for the API or UI. This is a frequent request in the community, and something we feel would add great value.

Validations and Testing

Dry runs, i.e., the ability to evaluate workflow definitions without actually running them through worker processes and all the relevant set-up, would make it much easier to test and debug execution paths.

If you would like to be a part of the Conductor community and contribute to one of the Wishlist items or something that you think would provide a great value add, please read through this guide for instructions or feel free to start a conversation on our Gitter channel, which is Conductor’s user forum.

We also highly encourage you to polish, genericize and share with the community any customizations that you may have built on top of Conductor.

We really appreciate and are extremely proud of the community’s involvement; its members have made several important contributions to Conductor. We would like to take this further and make Conductor widely adopted with strong community backing.

Netflix Conductor is maintained by the Media Workflow Infrastructure team. If you like the challenges of building distributed systems and are interested in building the Netflix Content and Studio ecosystem at scale, connect with Charles Zhao to get the conversation started.

Thanks to Alexandra Pau, Charles Zhao, Falguni Jhaveri, Konstantinos Christidis and Senthil Sayeebaba.


Evolution of Netflix Conductor: v2.0 and beyond was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Scylla Manager 1.4.2 Release Announcement

The Scylla team is pleased to announce the release of Scylla Manager 1.4.2.

Scylla Manager is a management system that automates maintenance tasks on a Scylla cluster.

Release 1.4.2 is a bug fix release for Scylla Manager 1.4.

Bugs fixed in this release

  • Scylla Manager on Docker: fail to access the store certificates directory, Manager fails with “invalid memory address or nil pointer dereference” error.
  • More informative error message when the connection to Scylla node fails
  • Connect to a Scylla cluster backend using token aware policy
  • Manager Go driver update from 1.0.1 to 1.2.0

The post Scylla Manager 1.4.2 Release Announcement appeared first on ScyllaDB.

Kiwi.com Takes Flight on Scylla

About Kiwi.com

Kiwi.com is an online flight booking platform that builds customized travel itineraries by assembling flight combinations from multiple airlines. Using this approach, Kiwi.com saves travellers money on airline tickets by generating itineraries that mix and match global airlines with local carriers, finding the best price for the trip as a whole.

At Scylla Summit 2018, we were joined by two speakers from Kiwi.com covering both the technical and business aspects of their migration from Cassandra to Scylla. The topics they covered include the Cassandra to Scylla migration, benchmarking on two popular cloud providers’ bare metal instances, and an analysis of performance results that focuses on full table scans.

In his presentation, Jan Plhak, Head of C++ Development, discussed how the nature of Kiwi.com’s data creates scaling challenges. Kiwi.com stores data on 100,000 flights a day, and 35 million flights a year. That’s not much data. In fact, as Jan pointed out, your phone can store that. What makes it challenging is that Kiwi.com stores flight combinations. This results in 7 billion flight entries, and a replicated dataset of 20 terabytes. A phone can’t store that.

With this background, Jan related Kiwi.com’s journey to Scylla. The team initially used PostgreSQL, but in order to scale, PostgreSQL required custom sharding, with 60 database instances and 60 Redis caches. Jan ironically described this topology as ‘pure joy’.

The team reasoned that a NoSQL database was more appropriate to their use case, so they turned to Apache Cassandra. Ultimately, Cassandra proved unable to scale up, even as the team added more and more nodes. Even worse, the team was required to write custom code to read Cassandra SSTables, creating problems with maintenance and upgrades.

“If you’re considering moving from Cassandra to Scylla, I don’t know what’s holding you back!”

Martin Strycek, Engineering Manager, Kiwi.com

Massive Full Table Scans

After discussing the journey to Scylla, Jan went into some detail about the requirements for full table scans, and why Cassandra was not up to the task. Cassandra’s limitations forced the team to implement a custom scanning service to read newly created SSTables and stream updates to the cache, whereas Scylla made it easy and safe to do performant full table scans.

Kiwi.com’s precomputation engine requires all of the data, updated every hour. That load, combined with secondary production and testing, put a strain on the production databases. With Cassandra, the team saw CPU overload and massive latency spikes. Jan ascribed this to Cassandra’s underlying Java implementation, as well as the inability to write a query that would read only the most recently updated data.

The Kiwi.com team attempted a Cassandra workaround. Since Cassandra stores immutable data in SSTables during compaction, they could create a service to parse new SSTables, and then stream that data to the cache. The data from the cache could in turn be used to feed the preprocessing engine while sidestepping Cassandra. Jan described this workaround as ‘opening a Pandora’s box’.

Luckily, Scylla made it possible to close this Pandora’s box for good. Scylla enables continuous full table scans that filter for last-update-timestamp. Scylla can also handle token ranges without overloading. This solved many of Kiwi.com’s problems, in particular building workarounds on Cassandra’s internal, undocumented, unsupported format.
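One common way to run a full table scan over token ranges is to split the partitioner's token space into contiguous slices and issue one range query per slice, so that the work can be spread across workers. The Python sketch below only generates the slices and a query template. It assumes the default Murmur3 token space (signed 64-bit integers); the table and partition key names are hypothetical, and the last-update-timestamp filter mentioned above is left out because it depends on Kiwi.com's schema, which is not shown here.

```python
MIN_TOKEN = -(2 ** 63)      # smallest Murmur3 token
MAX_TOKEN = 2 ** 63 - 1     # largest Murmur3 token


def split_token_ring(parts):
    """Split the full token space into `parts` contiguous inclusive ranges."""
    if parts < 1:
        raise ValueError("parts must be at least 1")
    total = MAX_TOKEN - MIN_TOKEN + 1
    ranges = []
    start = MIN_TOKEN
    for i in range(parts):
        # Integer arithmetic keeps the boundaries exact for 64-bit tokens.
        end = MIN_TOKEN + total * (i + 1) // parts - 1
        ranges.append((start, end))
        start = end + 1
    return ranges


def scan_query(table, partition_key):
    """CQL template for one slice; the two bind markers are the range bounds."""
    return ("SELECT * FROM {t} WHERE token({pk}) >= ? AND token({pk}) <= ?"
            .format(t=table, pk=partition_key))


if __name__ == "__main__":
    # Hypothetical table and key names, for illustration only.
    print(scan_query("flights.combinations", "route_id"))
    for low, high in split_token_ring(4):
        print(low, high)
```

Each (low, high) pair would be bound to the query and executed independently, which lets a scanner control how many slices are in flight at once instead of issuing one unbounded query.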

The Migration from Cassandra

Martin Strycek, Engineering Manager at Kiwi.com, spoke about the migration process from Cassandra to Scylla, and provided some context involving TCO. Martin said that Kiwi.com first migrated to Cassandra from a big PostgreSQL cluster to get better performance and scalability, but their demands never stopped growing.

Martin covered the way his team approached testing of Scylla, the migration plan, how it impacts the business and Kiwi.com’s high-level application and infrastructure architecture. In Martin’s view, Scylla has had a significant impact on disaster recovery and availability of the overall system.

According to Martin, Kiwi.com quickly settled on Scylla as a drop-in replacement for Cassandra, but they wanted to prove it out under real-life conditions before making the leap. With a healthy scepticism for vendor benchmarks, Kiwi.com set out to independently evaluate Cassandra versus Scylla. To do so, the team defined equivalent configurations, traffic volumes, and workloads based on the Cassandra benchmark.

The goal was to test Scylla’s raw speed and performance, along with Scylla’s support for Kiwi.com’s specific workloads. They also wanted some insight into running on bare metal or on a cloud platform, testing GCP versus OVH, a popular cloud provider in Europe. The final goal of the POC was to evaluate Scylla’s cost relative to the Cassandra cluster they were running.

Overall, Martin used three approaches to testing:

  • synthetic benchmarks
  • shadowing production traffic
  • internal benchmarking tool for reads

Kiwi.com worked closely with the Scylla team to establish success criteria for the POC. Once the test bed of five nodes each was set up, Kiwi.com ran a set of synthetic benchmarks, shadowed production traffic, and used internal monitoring tools for reads.

Their tests demonstrated a stark difference between the two databases. With a replication factor of 4, Cassandra required 100 nodes to achieve 40K reads per second. With only 21 nodes, Scylla was able to achieve 900K reads per second.
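To put those figures on a per-node basis, here is a quick Python calculation. It is plain arithmetic on the numbers quoted above and ignores any differences in hardware between the two clusters.

```python
def reads_per_node(total_reads_per_second, node_count):
    """Average read throughput contributed by each node."""
    return total_reads_per_second / node_count


if __name__ == "__main__":
    cassandra = reads_per_node(40_000, 100)
    scylla = reads_per_node(900_000, 21)
    print(round(cassandra), round(scylla), round(scylla / cassandra))
```

That works out to roughly 400 reads per second per Cassandra node versus roughly 42,857 per Scylla node, a difference of about two orders of magnitude per node.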

Scylla vs. Cassandra Table

Best of all, Kiwi.com discovered that the running cost of Scylla would be about 25% of the cost of Cassandra. Martin provided a detailed breakdown of the hardware costs of running Cassandra versus Scylla, on bare metal and Google Cloud Platform:

Comparison Table between Cassandra and Scylla on cloud platform

A comparison of Kiwi.com’s hardware costs between Cassandra and Scylla on cloud platforms

Having made the decision to go with Scylla, the team undertook the migration to GCP and OVH instances running in multiple cities and geographical regions. In fact, Martin’s team installed the final server in the Scylla cluster just before the presentation, displaying shadow traffic from the live system.

Martin pointed out that Kiwi.com is also excited about Scylla’s roadmap. The ability to prioritize production traffic over analytics will be a huge advantage, since the many algorithms that Kiwi.com runs against the Scylla clusters will have no discernable impact on the customer experience.

Martin wrapped up his Scylla Summit talk by encouraging the audience to “never stop innovating”, stating, “This is the bottom line. If you are considering going from Cassandra to Scylla, I don’t know why you didn’t do that last week!”

You can watch Jan’s full presentation (with slides), Kiwi.com Takes Flight with Scylla, and Martin’s Kiwi.com’s Migration to Scylla: The Why, the How, the Fails and the Status, from Scylla Summit 2018 in our Tech Talks section. And if you enjoy these in-depth technical insights from Kiwi.com as well as other NoSQL industry leaders, this is also a good reminder that registration is now open for Scylla Summit 2019.





Register Now for Scylla Summit 2019!

If you enjoyed reading about Kiwi.com’s use case, and want to learn more about how to get the most out of Scylla and your big data infrastructure, sign up for Scylla Summit 2019, coming up this November 5-6, 2019 in San Francisco, California.

Register for Scylla Summit Now!

The post Kiwi.com Takes Flight on Scylla appeared first on ScyllaDB.

OpenNMS Newts: A Time-Series Database for Network Monitoring, Built on Scylla

Open NMS Blog Graphic

The CTO of OpenNMS, Jesse White, joined us at our Scylla Summit last year to talk about the OpenNMS platform and specifically Newts, a time-series database embedded within the platform that relies on Scylla as a high-performance, scalable storage engine.

OpenNMS is an open source, enterprise-grade platform for building network monitoring solutions. It helps IT teams keep an eye on the routers, switches, servers, and software applications that make up modern networks. Designed to scale from day one, OpenNMS is a complete platform that encompasses multiple applications, SQL and NoSQL databases, as well as various vendor integrations. OpenNMS ships under the AGPL v3.0 license.

Jesse’s path to Scylla began when he started using OpenNMS in 2012, became a contributor, and then joined the OpenNMS group in 2014. Beginning work on Newts, Jesse had his first run-in with Cassandra. He thereafter encountered Scylla in 2017. Just a year later he was presenting the Scylla-based solution at Scylla Summit in 2018.

“OpenNMS has to scale since it ingests events from a huge variety of sources,” Jesse explained. “It supports poll-based approaches to collecting metrics via JMX, JDBC, and SNMP. It also supports streaming telemetry, where devices stream metrics to the platform. For example, OpenNMS parses syslog messages, transforms them into faults when needed, and issues alerts and notifications, as well as reporting on SLA violations.”

“Using Scylla, we were able to get almost double the results that we saw with Cassandra.”

-Jesse White, CTO of OpenNMS

According to Jesse, these datasets are growing exponentially. Collection, storage, analysis, and reporting are becoming more challenging, while the results are increasingly valuable. “For time-series metrics and network monitoring, people tend to collect everything in case there’s a problem. For that reason, we realized that we write a lot more data than we read. We wind up collecting much more data than our users ever see. So we need a solution that’s optimized for writes.”

Jesse’s team developed Newts to meet these requirements for scale. Newts started as a time-series storage strategy based on Cassandra and designed for high throughput and linear scale. It was also designed for grouped access: it collects, stores, and retrieves groups of related metrics together.

Another feature of Newts is ‘late aggregation’; instead of aggregating data when it is stored, Newts aggregates data when it is read. Other time-series solutions perform in-line aggregations of the entire dataset, even when the ratio of reads to writes is small. With late aggregation, Newts removes one bottleneck to storage, eliminating the overhead of unused aggregate values.
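The idea behind late aggregation can be illustrated with a toy in-memory store. This is a minimal sketch of the general technique, not Newts’ actual API or storage layout; the class, method, and metric names are hypothetical.

```python
from collections import defaultdict
from statistics import mean


class LateAggregatingStore:
    """Toy store: writes keep raw samples, reads aggregate them on demand."""

    def __init__(self):
        # metric name -> list of (timestamp_seconds, value)
        self._samples = defaultdict(list)

    def write(self, metric, timestamp, value):
        # The write path only appends; no aggregates are computed here.
        self._samples[metric].append((timestamp, value))

    def read_average(self, metric, start, end, step):
        """Average raw samples in [start, end) into buckets of `step` seconds."""
        buckets = defaultdict(list)
        for ts, value in self._samples[metric]:
            if start <= ts < end:
                bucket_start = start + ((ts - start) // step) * step
                buckets[bucket_start].append(value)
        return {b: mean(vs) for b, vs in sorted(buckets.items())}


store = LateAggregatingStore()
for ts, value in [(0, 10.0), (60, 20.0), (300, 40.0), (360, 60.0)]:
    store.write("ifInOctets", ts, value)

# Aggregation cost is paid only when (and if) the data is read.
print(store.read_average("ifInOctets", start=0, end=600, step=300))
```

Because most collected metrics are never displayed, deferring the averaging to read time keeps the write path as cheap as a single append.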

In his Scylla Summit presentation, Jesse put particular emphasis on the graphing capabilities and presented a visual sample of time-series data in OpenNMS.

Time Series Graph

A graph of data in OpenNMS. The graph displays bits in and out over a network interface over a 12-hour period.

Before Newts, OpenNMS relied on RRDtool, a set of open source utilities for time-series storage and graphing. RRDtool stores time-series metrics in flat files or binary files on disk. These files are of a fixed size, allocated whenever an RRD file is created.  According to Jesse, “A decade ago, RRDTool’s algorithms were well-suited to our requirements, but they fell short of scaling to current demands. We needed to take a new direction, one that prioritizes write-optimized storage, and that scales beyond a single host.”

Jesse explained the process: “You define the file name, how many different data sources, how data is aggregated, how frequently you expect to push data to it, and so on.” Once you define the file, you can push metrics, retrieve data, and graph it. This was how OpenNMS was originally instrumented.

To scale the system, a new layer of tooling was needed. For the Newts use case, write performance was paramount. With that in mind, the Newts team wanted to compare Cassandra with Scylla. They installed Cassandra on 5 i3.4xlarge AWS instances and achieved just over 1M samples per second. They then tested Scylla in a similar environment.  “Using Scylla, we were able to get almost double (1.8x) the results that we saw with Cassandra, with almost no changes at all. Out-of-the-box, swapping Cassandra for Scylla with no optimizations, we were able to achieve almost 2x throughput.”

OpenNMS has been using Newts since 2015. The Newts project is hosted on GitHub and published under the Apache License v2.0. It’s available for all OpenNMS users who want to scale out their time-series data beyond a single host.

Jesse wrapped up his Scylla Summit talk by stating that “Newts is relatively stable and complete, and it makes for a strong engine that can be used to retrieve and write time-series data from applications.”

You can watch Jesse’s full presentation (with slides) from Scylla Summit 2018 in our Tech Talks section. And if you enjoy these in-depth technical insights from OpenNMS as well as other NoSQL industry leaders, this is also a good reminder that registration is now open for Scylla Summit 2019.



Register Now for Scylla Summit 2019!

If you enjoyed reading about OpenNMS’ implementation of a time series database on top of Scylla, and want to learn more about how to get the most out of Scylla and your big data infrastructure, sign up for Scylla Summit 2019, coming up this November 5-6, 2019 in San Francisco, California.

Register for Scylla Summit Now!

The post OpenNMS Newts: A Time-Series Database for Network Monitoring, Built on Scylla appeared first on ScyllaDB.

Hardening Cassandra Step by Step - Part 2 Hostname Verification for Internode Encryption

Overview

This article looks at hostname verification for internode encryption, which is designed to prevent man-in-the-middle attacks. This is a follow-up to the first hardening Cassandra post that explored internode encryption. If you have not already done so, take a moment and read through the earlier post before proceeding.

Hostname verification for internode encryption was added in CASSANDRA-9220 and made available in Cassandra 3.6. Clusters that solely rely on importing all node certificates into each truststore are not affected. However, clusters that use the same, common certificate authority (CA) to sign node certificates are potentially affected. When the CA signing process allows other parties to generate certificates for different purposes, those certificates can in turn be used for man-in-the-middle attacks. In the interest of hardening your Cassandra clusters, the instructions below will walk through enabling hostname verification so that we can:

  • check that the node certificate is valid
  • check that the certificate has been created for the node to which we are about to connect

Hostname verification can only be done in an application layer on top of Transport Layer Security (TLS) so HTTPS is required, as the process is fully integrated. Here, the HTTPS client verifies that communication is with the correct server. The client checks that the dnsName in the subjectAltName field in the certificate sent from the server matches the host part of the URL. This prevents attackers impersonating or redirecting communications between nodes in the cluster.
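The matching step described above can be sketched in a few lines. This is a simplified illustration, not the logic of any particular TLS stack: the function name is hypothetical, the (kind, value) tuples are an assumed input shape similar to what Python’s ssl.getpeercert() reports for subjectAltName, and wildcard DNS names are deliberately not handled.

```python
import ipaddress


def matches_san(peer, san_entries):
    """Return True if `peer` (hostname or IP string) matches a SAN entry.

    `san_entries` is a list of (kind, value) tuples such as
    ("DNS", "node1.example.com") or ("IP Address", "172.31.7.37").
    """
    try:
        peer_ip = ipaddress.ip_address(peer)
    except ValueError:
        peer_ip = None  # not an IP literal, so treat it as a DNS name

    for kind, value in san_entries:
        if peer_ip is not None and kind == "IP Address":
            if ipaddress.ip_address(value) == peer_ip:
                return True
        elif peer_ip is None and kind == "DNS":
            if value.lower() == peer.lower():
                return True
    return False


san = [("IP Address", "172.31.7.37")]
print(matches_san("172.31.7.37", san))   # address listed in the SAN
print(matches_san("172.31.7.158", san))  # a different node
print(matches_san("172.31.7.37", []))    # certificate without any SAN
```

A certificate with no SAN entries can never match, which is why certificates generated without the extension fail verification.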

In this article we will demonstrate this set up by going step by step with a cluster in AWS that is equipped with internode encryption, including:

  • client certificate authentication
  • hostname verification

There are a lot of steps involved in setting everything up. Some steps need to be performed locally, while others need to be performed on the EC2 instances running in AWS. Because doing everything manually is tedious and error prone, we will use a number of tools to automate the whole process, namely tlp-cluster and Ansible.

The tlp-cluster tool is used to provision EC2 instances and the Cassandra cluster. Ansible is used to generate all of the certificates and to apply the necessary configuration changes to the Cassandra nodes.

The next couple of sections provide instructions on how to install the tools and how to run the automation. After running through the automation you should have a fully operational cluster that is configured with internode encryption and hostname verification.

The remainder of the post will then highlight and explain key steps carried out by the automation.

Setup

This section outlines the prerequisites needed for the automation.

Make sure that the following are installed:

Clone the tlp-cluster and the tlp-ansible repos in the same parent directory:

$ cd ~/Development
$ git clone https://github.com/thelastpickle/tlp-cluster
$ git clone https://github.com/thelastpickle/tlp-ansible

Install tlp-cluster

tlp-cluster requires that you have an AWS access key and secret. To get started, add the tlp-cluster/bin directory to your $PATH to avoid having to always type the path to the tlp-cluster executable. For example:

$ export PATH="$PATH:/path/to/tlp-cluster/bin"
$ cd /path/to/tlp-cluster
$ ./gradlew assemble

Run setup.sh

Next we need to run the setup.sh script, which lives in the tlp-ansible repo. The script will run tlp-cluster to provision a cluster in AWS and then generate an Ansible inventory file at tlp-ansible/inventory/hosts.tlp_cluster.

Note that /tmp/cassandra-tls-lab is used as a working directory. setup.sh will create the directory if it does not exist. All work is done in this directory.

If you have not run tlp-cluster before it will prompt you for some input, notably your AWS credentials.

setup.sh assumes tlp-cluster and tlp-ansible live in the same parent directory. If this is true, then it can simply be run without any arguments:

$ cd tlp-ansible
$ ./playbooks/tls_lab/setup.sh

However, if tlp-cluster and tlp-ansible have different parent directories, then you will have to provide the path to tlp-cluster:

$ cd tlp-ansible
$ ./playbooks/tls_lab/setup.sh
Usage: setup.sh <path-to-tlp_cluster-repo>

After setup.sh finishes, you will have a three node cluster set up in AWS. Cassandra however is not started on any of the machines since we will be applying further configuration changes for internode encryption.

To access the EC2 instances do the following:

$ cd /tmp/cassandra-tls-lab
$ alias ssh="ssh -F sshConfig"

Then you can conveniently log into your EC2 instances with:

$ ssh cassandra[0|1|2]  # e.g., ssh cassandra2

Run the Ansible Playbooks

Next, we will run Ansible playbooks, which will do the following:

  1. Create a Certificate Authority (CA)
  2. Create a truststore
  3. Create node keystores
  4. Configure the cluster

The node certificates need to have a Subject Alternative Name (SAN) in order for hostname verification to work.

We will first configure the cluster using certificates that do not have a SAN. This will allow us to see what kinds of errors may occur.

Run the following from the tlp-ansible directory:

$ ansible-playbook -i inventory/hosts.tlp_cluster playbooks/tls_lab/internode_tls_no_ssl_ext.yml

The internode_tls_no_ssl_ext.yml playbook generates all of the keys, certificates, and keystores locally. It then copies the certificates and keystores to the EC2 machines. Lastly, it updates /etc/cassandra/cassandra.yaml and then starts Cassandra on each machine.

It is worth reiterating that certificates and keystores are created locally. Creating them on the EC2 instances would require copying the CA private key to those machines, which we want to avoid doing. In a production environment, we would want additional measures in place that provide things like:

  • lifecycle management of keys
  • automatic key rotation
  • audit logging
  • protection of keys using a hardware security module (HSM)
  • strict policy controls to prevent misuse of keys

Now log into one of the nodes and check the cluster status.

$ ssh cassandra0
$ nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.42.245  114.48 KiB  256          100.0%            367efca2-664c-4e75-827c-24238af173c9  rack1

It only reports one node. You will find the same if you check the other nodes. This is because the nodes are unable to gossip with one another. This error from /var/log/cassandra/system.log reveals the problem:

ERROR [MessagingService-Outgoing-/172.31.7.158-Gossip] 2019-06-06 17:11:30,991 OutboundTcpConnection.java:538 - SSL handshake error for outbound connection to 22081e9e[SSL_NULL_WITH_NULL_NULL: Socket[addr=/172.31.7.158,port=7001,localport=40560]]
javax.net.ssl.SSLHandshakeException: java.security.cert.CertificateException: No subject alternative names present
        at sun.security.ssl.Alerts.getSSLException(Alerts.java:192) ~[na:1.8.0_212]
        at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1946) ~[na:1.8.0_212]
        at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:316) ~[na:1.8.0_212]
        at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:310) ~[na:1.8.0_212]
        at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1639) ~[na:1.8.0_212]
        at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:223) ~[na:1.8.0_212]
        at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1037) ~[na:1.8.0_212]
        at sun.security.ssl.Handshaker.process_record(Handshaker.java:965) ~[na:1.8.0_212]
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1064) ~[na:1.8.0_212]
        at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1367) ~[na:1.8.0_212]
        at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:750) ~[na:1.8.0_212]
        at sun.security.ssl.AppOutputStream.write(AppOutputStream.java:123) ~[na:1.8.0_212]
        at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458) ~[na:1.8.0_212]
        at org.apache.cassandra.io.util.BufferedDataOutputStreamPlus.doFlush(BufferedDataOutputStreamPlus.java:323) ~[apache-cassandra-3.11.4.jar:3.11.4]
        at org.apache.cassandra.io.util.BufferedDataOutputStreamPlus.flush(BufferedDataOutputStreamPlus.java:331) ~[apache-cassandra-3.11.4.jar:3.11.4]
        at org.apache.cassandra.net.OutboundTcpConnection.connect(OutboundTcpConnection.java:462) [apache-cassandra-3.11.4.jar:3.11.4]
        at org.apache.cassandra.net.OutboundTcpConnection.run(OutboundTcpConnection.java:262) [apache-cassandra-3.11.4.jar:3.11.4]
Caused by: java.security.cert.CertificateException: No subject alternative names present
        at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:145) ~[na:1.8.0_212]
        at sun.security.util.HostnameChecker.match(HostnameChecker.java:94) ~[na:1.8.0_212]
        at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455) ~[na:1.8.0_212]
        at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436) ~[na:1.8.0_212]
        at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:200) ~[na:1.8.0_212]
        at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:124) ~[na:1.8.0_212]
        at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1621) ~[na:1.8.0_212]
        ... 12 common frames omitted

Near the top of the message we see the exact cause of the error, which happens during the SSL handshake:

java.security.cert.CertificateException: No subject alternative names present

The internode_tls_no_ssl_ext.yml playbook generated certificates that do not have a Subject Alternative Name (SAN). If we were to disable hostname verification on each node (and restart), everything would work fine. We, of course, want to leave hostname verification enabled; so, we will run a playbook that generates SAN certificates.
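When several nodes are affected, it can help to pull the failing peers out of system.log rather than reading stack traces by hand. The following is a rough sketch that assumes the log line layout shown in the error above; the regular expression and function name are illustrative and may need adjusting for other Cassandra versions.

```python
import re

# Matches the peer address in lines such as
# "ERROR [MessagingService-Outgoing-/172.31.7.158-Gossip] ... SSL handshake error ..."
HANDSHAKE_RE = re.compile(
    r"ERROR \[MessagingService-Outgoing-/(?P<peer>[0-9.]+)-\w+\]"
    r".*SSL handshake error"
)


def peers_with_handshake_errors(log_lines):
    """Return the set of peer addresses that appear in SSL handshake errors."""
    peers = set()
    for line in log_lines:
        match = HANDSHAKE_RE.search(line)
        if match:
            peers.add(match.group("peer"))
    return peers


sample_log = [
    "ERROR [MessagingService-Outgoing-/172.31.7.158-Gossip] 2019-06-06 17:11:30,991 "
    "OutboundTcpConnection.java:538 - SSL handshake error for outbound connection",
    "INFO  [main] 2019-06-06 17:25:13,962 MessagingService.java:704 - "
    "Starting Encrypted Messaging Service on SSL port 7001",
]
print(peers_with_handshake_errors(sample_log))
```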

Run the following from the tlp-ansible directory:

$ ansible-playbook -i inventory/hosts.tlp_cluster playbooks/tls_lab/internode_tls.yml

The internode_tls.yml playbook does everything that was done previously except that it generates SAN certificates.

Now log onto one of the machines and check the cluster status:

$ ssh cassandra1
$ nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.29.239  280.9 KiB  256          67.1%             26a5b2e1-1079-4879-9e3f-31366852f095  rack1
UN  172.31.7.158   281.17 KiB  256          66.4%             0fab0bb3-0326-48f1-a49f-e4d8196e460b  rack1
UN  172.31.42.245  280.36 KiB  256          66.5%             367efca2-664c-4e75-827c-24238af173c9  rack1

Nodes are now able to gossip with each other. Look for this message in the log to verify that internode encryption is in fact enabled:

INFO  [main] 2019-06-06 17:25:13,962 MessagingService.java:704 - Starting Encrypted Messaging Service on SSL port 7001

We now have a functioning cluster that uses two-way (or mutual) TLS authentication with hostname verification.
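A check like this is easy to script. The sketch below parses nodetool status output of the shape shown above and counts the nodes reported as UN (Up/Normal); the function name is hypothetical and the expected cluster size of three is taken from this lab setup.

```python
def up_normal_nodes(nodetool_output):
    """Return addresses of nodes whose status/state column is 'UN'."""
    addresses = []
    for line in nodetool_output.splitlines():
        fields = line.split()
        if len(fields) >= 2 and fields[0] == "UN":
            addresses.append(fields[1])
    return addresses


sample = """\
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.29.239  280.9 KiB  256          67.1%             26a5b2e1-1079-4879-9e3f-31366852f095  rack1
UN  172.31.7.158   281.17 KiB  256          66.4%             0fab0bb3-0326-48f1-a49f-e4d8196e460b  rack1
UN  172.31.42.245  280.36 KiB  256          66.5%             367efca2-664c-4e75-827c-24238af173c9  rack1
"""

expected_nodes = 3  # size of the lab cluster
nodes = up_normal_nodes(sample)
print(f"{len(nodes)} of {expected_nodes} nodes are Up/Normal: {nodes}")
```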

Cleanup

When we are all done with the cluster, run:

$ cd /tmp/cassandra-tls-lab
$ tlp-cluster down -a

This will destroy the EC2 instances.

Review the Steps

One goal of this post is to make it easy to set up a cluster with internode encryption along with hostname verification. We accomplished this by using tlp-cluster and Ansible. We used tlp-cluster to provision EC2 instances and a Cassandra cluster. We then ran Ansible playbooks to configure the Cassandra nodes with internode encryption enabled. This involved generating keys, certificates, and keystores and then applying the necessary configuration changes to Cassandra.

Another goal of this post is to detail how to set up internode encryption and hostname verification without having to be well versed in Ansible. The following sections highlight the key steps for setting up a cluster with internode encryption that includes both client authentication and hostname verification.

Note that everything discussed in the following sections is implemented in the Ansible playbooks that we previously ran; however, not all details from the playbooks are covered.

Create Certificate Authority

Ansible generates our own CA for signing all of our node certificates. As in the earlier post we use openssl to generate and sign certificates.

Ansible Note: Most of the work done to generate the CA is in the cassandra_ca role which is located at tlp-ansible/roles/cassandra_ca.

First we need a working directory. The setup.sh script creates this for us if it does not already exist at:

/tmp/cassandra-tls-lab/cassandra

Next we need an SSL configuration file. Ansible generates that for us at /tmp/cassandra-tls-lab/cassandra/ssl.cnf. It looks something like:

[ req ]
distinguished_name     = req_distinguished_name
prompt                 = no
output_password        = cassandra
default_bits           = 2048

[ req_distinguished_name ]
C                      = US
ST                     = NC
L                      = Clayton
O                      = The Last Pickle
OU                     = tls_lab
CN                     = CassandraCA
emailAddress           = info@thelastpickle.com

Ansible Note: Ansible uses the Jinja2 templating engine for templates.

Ansible Note: You can update variables in tlp-ansible/roles/tls_common/defaults/main.yml to change the generated output in ssl.cnf.

Next, the CA certificate and private key are created with:

$ openssl req -config /tmp/cassandra-tls-lab/cassandra/ssl.cnf -new -x509 -keyout /tmp/cassandra-tls-lab/cassandra/ca.key -out /tmp/cassandra-tls-lab/cassandra/ca.crt -days 365

We can verify the contents of the certificate with:

$ cd /tmp/cassandra-tls-lab/cassandra
$ openssl x509 -in ca.crt -text -noout
Certificate:
    Data:
        Version: 1 (0x0)
        Serial Number:
            b7:b6:74:45:b1:99:26:3b
    Signature Algorithm: sha256WithRSAEncryption
        Issuer: C=US, ST=NC, L=Clayton, O=The Last Pickle, OU=sslverify, CN=CassandraCA/emailAddress=info@thelastpickle.com
        Validity
            Not Before: May 30 17:47:57 2019 GMT
            Not After : May 29 17:47:57 2020 GMT
        Subject: C=US, ST=NC, L=Clayton, O=The Last Pickle, OU=sslverify, CN=CassandraCA/emailAddress=info@thelastpickle.com
        Subject Public Key Info:
            Public Key Algorithm: rsaEncryption
                Public-Key: (2048 bit)
                Modulus:
                    00:cf:00:4e:a0:20:07:a8:e8:d7:7e:14:a5:7d:ad:
                    38:cc:bd:99:a1:8b:02:ed:9f:27:52:a7:50:59:5b:
                    8e:e9:ee:e6:42:74:30:06:fb:f3:f9:5c:68:93:93:
                    35:4c:26:b1:b7:c6:9e:e3:50:25:ad:e2:43:90:12:
                    68:c4:05:98:e8:9d:74:18:d3:f5:09:a1:71:10:60:
                    aa:48:a0:7d:fe:d7:9a:0c:25:ae:16:e9:5f:ca:b0:
                    8d:70:be:5b:b3:80:8e:33:b8:6e:7e:9f:3d:d8:31:
                    7e:ca:85:cc:be:c5:50:82:99:cb:16:ab:6c:84:ec:
                    9c:5f:cd:ed:b1:58:b5:5e:b3:be:56:41:f8:7e:72:
                    17:5b:9e:78:8f:9c:be:8b:f8:56:f9:b5:90:b5:84:
                    b4:74:e8:da:9e:dd:fd:07:db:85:b3:f2:fd:9e:af:
                    4e:e1:5e:da:23:4f:ec:7b:1b:fa:87:51:86:60:9c:
                    af:00:79:55:8c:b1:50:e9:a8:b0:9f:e3:e4:93:82:
                    77:94:78:f9:6e:ea:7d:6b:41:a5:29:29:d2:1b:70:
                    c3:dd:6d:5d:b7:1b:a4:70:70:af:55:2f:62:b3:dc:
                    93:a7:f8:6c:08:24:44:de:de:67:33:dd:bf:12:73:
                    91:e9:b8:84:60:a5:b2:ba:1f:21:36:fa:0b:5e:dc:
                    d6:0d
                Exponent: 65537 (0x10001)
    Signature Algorithm: sha256WithRSAEncryption
         57:9e:3c:46:96:92:ce:0d:d1:c5:ad:63:d0:60:25:77:83:f2:
         43:78:47:8d:26:80:00:7f:b9:4c:a5:a1:4a:92:23:4c:63:fb:
         ec:1d:a2:35:c7:10:65:4c:75:4f:bb:a2:4b:13:fe:7e:6a:19:
         d0:9c:b2:e9:48:0d:3c:ac:94:8f:65:be:f5:e1:c1:6b:f1:ba:
         d5:06:90:b1:37:4d:ef:88:57:da:3b:08:b5:72:fd:e7:db:0f:
         fe:da:1e:c0:fc:76:c1:3b:00:8e:fd:b5:c2:79:c8:a0:94:93:
         48:3d:94:9d:47:f6:8a:96:04:a2:53:9c:cd:2c:13:d6:e8:b3:
         0d:08:cf:16:ce:5d:37:15:ca:88:4b:ea:d5:5c:5b:a2:c8:fc:
         44:83:fa:7e:78:87:4f:5b:21:e0:03:c8:5f:7e:7a:01:a0:fc:
         f5:22:46:1d:48:3d:e6:12:78:93:b5:74:6f:f6:0e:99:1b:f9:
         44:ea:90:a3:04:cb:cd:9b:1a:36:02:fa:38:be:08:ca:fc:53:
         cd:2a:0b:09:26:0e:45:d1:7d:dc:ea:3d:76:40:e6:58:3c:c1:
         a1:86:b4:6e:10:9c:c9:cf:e2:3c:a2:b0:63:2d:c1:a0:9f:39:
         f8:c1:36:99:a3:b4:02:78:20:05:cb:ae:a4:9b:24:9a:13:84:
         22:43:b1:03

Things to note

  • The line Signature Algorithm: sha256WithRSAEncryption tells us that the certificate was signed with an RSA key over a SHA-256 digest of its contents.
  • The issuer and the subject will be the same for root certificates.
  • This task is performed locally, not on the remote EC2 instances.
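The second point, that issuer and subject are identical for a root certificate, can be checked against the text printed by openssl x509 -text. The sketch below is only a heuristic with hypothetical function names: it compares the two name lines and does not verify the signature, which is what actually proves a certificate is self-signed.

```python
import re


def issuer_and_subject(cert_text):
    """Extract the Issuer and Subject lines from `openssl x509 -text` output."""
    issuer = re.search(r"^\s*Issuer:\s*(.+)$", cert_text, re.MULTILINE)
    subject = re.search(r"^\s*Subject:\s*(.+)$", cert_text, re.MULTILINE)
    if not issuer or not subject:
        raise ValueError("Issuer or Subject line not found")
    return issuer.group(1).strip(), subject.group(1).strip()


def looks_self_signed(cert_text):
    """Name comparison only; this does not validate the signature."""
    issuer, subject = issuer_and_subject(cert_text)
    return issuer == subject


ca_excerpt = """\
        Issuer: C=US, ST=NC, L=Clayton, O=The Last Pickle, OU=sslverify, CN=CassandraCA
        Validity
        Subject: C=US, ST=NC, L=Clayton, O=The Last Pickle, OU=sslverify, CN=CassandraCA
        Subject Public Key Info:
"""
node_excerpt = """\
        Issuer: C=US, ST=NC, L=Clayton, O=The Last Pickle, OU=tls_lab, CN=CassandraCA
        Subject: C=US, ST=NC, L=Clayton, O=The Last Pickle, OU=tls_lab, CN=34.217.54.220
"""
print(looks_self_signed(ca_excerpt))    # root CA: names match
print(looks_self_signed(node_excerpt))  # node certificate: names differ
```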

Create the Truststore

Each node needs a truststore to verify requests from other nodes, i.e., to perform client certificate authentication (also referred to as mutual TLS). Because all of our node certificates will be signed by the CA, our nodes can share the truststore.

Ansible Note: The cassandra_ca role located at tlp-ansible/roles/cassandra_ca performs the tasks to generate the truststore.

The truststore file is created with the keytool command by importing the CA certificate. Ansible runs the following command:

$ keytool -keystore /tmp/cassandra-tls-lab/cassandra/truststore.p12 -alias CassandraCARoot -importcert -file /tmp/cassandra-tls-lab/cassandra/ca.crt -keypass cassandra -storepass cassandra -storetype pkcs12 -noprompt
Certificate was added to keystore

We can verify the contents of the truststore with:

$ keytool -v -list -keystore /tmp/cassandra-tls-lab/cassandra/truststore.p12 -storepass cassandra
Keystore type: PKCS12
Keystore provider: SUN

Your keystore contains 1 entry

Alias name: cassandracaroot
Creation date: Jun 5, 2019
Entry type: trustedCertEntry

Owner: EMAILADDRESS=info@thelastpickle.com, CN=CassandraCA, OU=tls_lab, O=The Last Pickle, L=Clayton, ST=NC, C=US
Issuer: EMAILADDRESS=info@thelastpickle.com, CN=CassandraCA, OU=tls_lab, O=The Last Pickle, L=Clayton, ST=NC, C=US
Serial number: c733567b30c14a81
Valid from: Tue Jun 04 12:37:58 EDT 2019 until: Wed Jun 03 12:37:58 EDT 2020
Certificate fingerprints:
         MD5:  D3:29:03:D3:00:11:A3:C2:B0:E2:B9:5F:A1:CB:C0:F0
         SHA1: 88:18:44:FC:F6:76:A0:2E:A8:D1:02:2E:E8:C5:FF:5D:EB:72:C4:9D
         SHA256: 33:12:AF:A7:9B:29:90:28:E2:01:7C:9C:DA:88:4F:23:7C:1A:DC:90:99:78:FD:79:D5:CB:14:E8:5F:0E:94:67
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 1


*******************************************
*******************************************

Notice that the keystore type is PKCS12. We are using the PKCS #12 (.p12) format for our keystore files instead of the default, JKS. PKCS #12 is an archive file format in cryptography for storing keys and certificates. Because it is a standardized format, it can be used not only in Java but also in other languages.

Create Node Keystores

Each node needs a keystore which will store its certificate. A keystore is needed for one-way TLS authentication while the truststore is needed to enable two-way authentication.

Ansible performs the following tasks:

  • create a directory for storing keys, certificates, and keystores
  • create a config file that is used when generating the key and certificate
  • generate the private key
  • generate the PKCS #12 file, i.e., the keystore
  • generate a Certificate Signing Request (CSR)
  • sign the certificate with the CA
  • import the CA and the signed certificate into the keystore

Ansible Note: Most of this work is implemented in the node_keystores role located at tlp-ansible/roles/node_keystores.

Create Directory and Config File

Ansible will create a directory for each EC2 instance. Each directory is named after the public IP address of its machine. You should see something like this:

$ ls -1 /tmp/cassandra-tls-lab/cassandra/
34.219.169.240
54.188.182.229
54.201.133.12
ca.crt
ca.key
ca.srl
ssl.cnf
truststore.p12

Inside each host subdirectory you will find an ssl.cnf that Ansible generated. It should look like:

[ req ]
distinguished_name     = req_distinguished_name
prompt                 = no
output_password        = cassandra
default_bits           = 2048
req_extensions         = v3_req

[ req_distinguished_name ]
C                      = US
ST                     = NC
L                      = Clayton
O                      = The Last Pickle
OU                     = tls_lab
CN                     = 34.219.169.240
emailAddress           = tls_lab@thelastpickle.com

[ v3_req ]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names

[ alt_names ]
IP.1 = 172.31.41.144

Things to note

  • The common name field, CN, is set to the EC2 instance’s public IP address.
  • The certificate uses a SAN, which is specified by the subjectAltName field. This is necessary for hostname verification.
  • The first alternative IP address, IP.1, is set to the private address of the EC2 instance.
  • This task is performed locally, not on the remote EC2 instances.

Ansible note: The same Jinja2 template that was used to generate the ssl.cnf file for the CA is used here. The variable ssl_extensions_enabled controls whether or not the SAN is included in ssl.cnf.
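The effect of such a flag can be illustrated with plain Python string building. This is not the Jinja2 template from tlp-ansible, which is not reproduced here; it is a hypothetical stand-in that shows how a single boolean can switch the SAN-related sections on or off. Most distinguished-name fields are omitted for brevity.

```python
def render_ssl_cnf(common_name, private_ip, ssl_extensions_enabled):
    """Build an ssl.cnf-style string; the SAN sections are optional."""
    lines = [
        "[ req ]",
        "distinguished_name     = req_distinguished_name",
        "prompt                 = no",
        "output_password        = cassandra",
        "default_bits           = 2048",
    ]
    if ssl_extensions_enabled:
        lines.append("req_extensions         = v3_req")
    lines += [
        "",
        "[ req_distinguished_name ]",
        f"CN                     = {common_name}",  # other DN fields omitted
    ]
    if ssl_extensions_enabled:
        lines += [
            "",
            "[ v3_req ]",
            "basicConstraints = CA:FALSE",
            "keyUsage = nonRepudiation, digitalSignature, keyEncipherment",
            "subjectAltName = @alt_names",
            "",
            "[ alt_names ]",
            f"IP.1 = {private_ip}",
        ]
    return "\n".join(lines) + "\n"


with_san = render_ssl_cnf("34.219.169.240", "172.31.41.144", True)
without_san = render_ssl_cnf("34.219.169.240", "172.31.41.144", False)
print(with_san)
```

With the flag off, the output has no v3_req or alt_names sections, so a certificate generated from it carries no SAN.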

Generate the RSA Key

We need to generate a private key for each host machine. The key will be used to create the node’s certificate signing request; the resulting certificate is then signed with the CA key. Ansible uses the following openssl command to generate the private key for each host:

$ openssl genrsa -des3 -out <host-dir>/node.key -passout pass:cassandra 2048

where <host-dir> might be /tmp/cassandra-tls-lab/cassandra/34.219.169.240.

The -des3 option specifies that the triple DES cipher is used to encrypt the key.

Generate the Certificate Signing Request (CSR)

Next Ansible generates the CSR with:

$ openssl req -config <host-dir>/ssl.cnf -new -key <host-dir>/node.key -out <host-dir>/node.csr -passin pass:cassandra

where <host-dir> might be /tmp/cassandra-tls-lab/cassandra/34.219.169.240.

Sign the Certificate

$ openssl x509 -req -CA /tmp/cassandra-tls-lab/cassandra/ca.crt -CAkey /tmp/cassandra-tls-lab/cassandra/ca.key -in <host-dir>/node.csr -out <host-dir>/node.crt -days 365 -CAcreateserial -extensions v3_req -extfile <host-dir>/ssl.cnf -passin pass:cassandra

where <host-dir> might be /tmp/cassandra-tls-lab/cassandra/34.219.169.240.

We can verify the contents of the certificate with:

$ openssl x509 -in node.crt -text -noout
Certificate:
    Data:
        Version: 3 (0x2)
        Serial Number:
            95:a6:a4:1d:64:4c:21:a2
    Signature Algorithm: sha256WithRSAEncryption
        Issuer: C=US, ST=NC, L=Clayton, O=The Last Pickle, OU=tls_lab, CN=CassandraCA/emailAddress=info@thelastpickle.com
        Validity
            Not Before: Jun  4 21:52:15 2019 GMT
            Not After : Jun  3 21:52:15 2020 GMT
        Subject: C=US, ST=NC, L=Clayton, O=The Last Pickle, OU=tls_lab, CN=34.217.54.220/emailAddress=info@thelastpickle.com
        Subject Public Key Info:
            Public Key Algorithm: rsaEncryption
                Public-Key: (2048 bit)
                Modulus:
                    00:e3:50:56:e8:d3:e7:75:0a:fd:8a:30:14:dd:51:
                    b7:b0:51:c6:d1:d5:61:ed:0d:bd:ae:b6:57:a7:58:
                    5c:cf:50:48:a5:cc:d6:e7:5a:d3:87:19:d8:0c:0c:
                    5f:2b:7d:0f:1f:0d:eb:d8:48:6b:41:79:d8:2c:fe:
                    87:ad:da:c8:8b:54:49:94:36:1e:10:00:a9:99:bd:
                    7d:6e:bd:91:d6:35:70:df:36:aa:74:3b:64:09:e3:
                    1b:03:36:2c:55:8b:26:8b:10:ed:7f:fa:98:89:4d:
                    94:9c:db:3e:65:8e:2c:29:3f:c1:c1:19:1b:6a:8b:
                    c4:9d:29:7a:ac:9e:a8:48:93:6e:45:ba:a1:5d:7b:
                    a0:7c:41:8a:22:4d:1e:47:2b:a9:8d:80:bc:b4:12:
                    df:d5:80:9f:5b:ec:73:94:9d:b4:a5:bd:9a:b6:ff:
                    a0:34:5c:ad:23:b3:51:a7:45:6f:35:a7:2c:78:bc:
                    a4:4a:9e:1d:da:97:8f:57:4c:6f:67:70:72:f9:19:
                    2e:c0:9a:34:f5:23:5c:09:b6:05:c4:4d:4c:a4:e7:
                    59:bf:0c:76:63:92:6e:c9:bc:24:59:85:ac:24:5a:
                    20:54:1c:ac:ef:83:cc:2f:fa:8e:bb:fc:e6:10:6d:
                    5d:57:63:a2:0b:e9:3e:10:83:05:1d:1c:c0:64:fa:
                    df:31
                Exponent: 65537 (0x10001)
        X509v3 extensions:
            X509v3 Basic Constraints:
                CA:FALSE
            X509v3 Key Usage:
                Digital Signature, Non Repudiation, Key Encipherment
            X509v3 Subject Alternative Name:
                IP Address:172.31.7.37
    Signature Algorithm: sha256WithRSAEncryption
         96:52:42:a7:1c:26:10:4b:c6:d9:6e:45:55:2c:a8:43:e7:37:
         13:1e:10:fc:b6:30:f4:11:54:5f:89:db:9e:20:b8:9e:78:1f:
         69:bf:78:74:10:68:c4:4b:7c:40:5d:a6:7c:e9:9f:d5:90:77:
         68:b6:24:33:4d:02:95:83:00:79:43:41:02:8c:4f:ff:de:19:
         16:90:b0:f0:7e:4f:ec:ea:7d:8e:a5:f3:e8:a1:91:07:0d:88:
         b1:71:b6:af:a8:6e:5e:3b:9b:39:36:28:3a:3c:93:d1:bb:07:
         f7:1a:b5:e1:c7:5f:68:45:28:80:f4:14:43:6c:23:f1:4f:49:
         4f:d1:3d:8a:3a:5d:68:e2:13:dc:39:96:43:eb:25:dc:7f:72:
         ec:54:31:a3:2f:ed:e3:70:0d:f7:31:16:54:96:e1:ce:db:c6:
         29:12:d5:b4:15:3d:c6:11:8a:43:58:05:5a:1c:46:72:35:10:
         04:fc:1f:89:f0:d7:82:03:93:c8:1e:9e:20:1a:74:0a:77:99:
         c5:c2:ba:5f:e1:9f:3d:2f:8b:2e:41:df:56:af:cb:20:73:23:
         63:76:d6:ef:c0:e6:7f:04:1a:a6:5c:6d:30:25:20:7c:1e:bd:
         fd:65:e8:39:b1:59:eb:4d:c1:d3:7e:c3:4b:30:11:c1:dd:a2:
         9a:8b:f3:4c

Things to note

  • Make sure that you see the X509v3 extensions section that includes a Subject Alternative Name.
  • The signing command includes -extensions v3_req. Without it, the certificate will not include the SAN.
  • This task is performed locally, not on the remote EC2 instances.
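
The SAN check in the first note can be automated. The following is a minimal sketch, assuming the certificate text has the layout shown in the openssl x509 -text output above; the function name subject_alt_names is ours, and the sample text is an excerpt of that output.

```python
def subject_alt_names(cert_text):
    """Return the SAN entries listed in `openssl x509 -text` output.

    Assumes the entries sit on the line directly after the
    "X509v3 Subject Alternative Name" header, as in the output above.
    """
    lines = cert_text.splitlines()
    for index, line in enumerate(lines):
        if "X509v3 Subject Alternative Name" in line and index + 1 < len(lines):
            entries = lines[index + 1].split(",")
            return [entry.strip() for entry in entries if entry.strip()]
    return []


SAMPLE_CERT_TEXT = """\
        X509v3 extensions:
            X509v3 Basic Constraints:
                CA:FALSE
            X509v3 Key Usage:
                Digital Signature, Non Repudiation, Key Encipherment
            X509v3 Subject Alternative Name:
                IP Address:172.31.7.37
"""

print(subject_alt_names(SAMPLE_CERT_TEXT))
```

An empty list means the certificate was signed without the v3_req extensions and needs to be regenerated.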

Generate the Keystore

Ansible generates the keystore with:

$ openssl pkcs12 -export -out <host-dir>/keystore.p12 -inkey <host-dir>/node.key -in <host-dir>/node.crt -name <host> -passin pass:cassandra -passout pass:cassandra

where <host> might be 34.219.169.240 and <host-dir> would then be /tmp/cassandra-tls-lab/cassandra/34.219.169.240.

Things to note

  • The .p12 file is our keystore.
  • We are using the PKCS #12 file format instead of the default, JKS.
  • This task is performed locally, not on the remote EC2 instances.

Import the CA Certificate

We need to import the CA certificate into the keystore in order to properly establish the trust chain. We do this with the keytool command.

Ansible runs the following for each host machine:

$ keytool -keystore <host-dir>/keystore.p12 -alias CassandraCARoot -import -file /tmp/cassandra-tls-lab/cassandra/ca.crt -noprompt -keypass cassandra -storepass cassandra
Certificate was added to keystore

where <host-dir> might be /tmp/cassandra-tls-lab/cassandra/34.219.169.240.

We can verify the contents of the keystore:

$ keytool -list -keystore keystore.p12 -storepass cassandra
Keystore type: PKCS12
Keystore provider: SUN

Your keystore contains 2 entries

Alias name: 34.217.54.220
Creation date: Jun 5, 2019
Entry type: PrivateKeyEntry
Certificate chain length: 2
Certificate[1]:
Owner: EMAILADDRESS=info@thelastpickle.com, CN=34.217.54.220, OU=tls_lab, O=The Last Pickle, L=Clayton, ST=NC, C=US
Issuer: EMAILADDRESS=info@thelastpickle.com, CN=CassandraCA, OU=tls_lab, O=The Last Pickle, L=Clayton, ST=NC, C=US
Serial number: 95a6a41d644c21a2
Valid from: Tue Jun 04 17:52:15 EDT 2019 until: Wed Jun 03 17:52:15 EDT 2020
Certificate fingerprints:
         MD5:  89:FC:08:8D:15:E7:A0:84:AE:6A:3C:CF:88:B3:E4:24
         SHA1: 0A:BA:A1:EF:A6:9F:A0:77:C8:89:B4:79:78:F5:2F:51:3B:8F:E9:F7
         SHA256: AA:54:78:C8:13:73:65:A3:AF:05:68:8E:45:7F:8E:70:3E:0C:6C:43:3A:17:07:84:D3:88:49:56:0C:61:BC:F5
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

Extensions:

#1: ObjectId: 2.5.29.19 Criticality=false
BasicConstraints:[
  CA:false
  PathLen: undefined
]

#2: ObjectId: 2.5.29.15 Criticality=false
KeyUsage [
  DigitalSignature
  Non_repudiation
  Key_Encipherment
]

#3: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
  IPAddress: 172.31.7.37
]

Certificate[2]:
Owner: EMAILADDRESS=info@thelastpickle.com, CN=CassandraCA, OU=tls_lab, O=The Last Pickle, L=Clayton, ST=NC, C=US
Issuer: EMAILADDRESS=info@thelastpickle.com, CN=CassandraCA, OU=tls_lab, O=The Last Pickle, L=Clayton, ST=NC, C=US
Serial number: c9f85a64b75bf080
Valid from: Tue Jun 04 17:52:09 EDT 2019 until: Wed Jun 03 17:52:09 EDT 2020
Certificate fingerprints:
         MD5:  78:A6:01:CA:46:FE:01:F3:A7:AC:EB:62:02:69:37:57
         SHA1: F0:CE:99:21:20:9E:FF:6A:0B:88:D3:DF:62:37:54:22:73:87:D7:CD
         SHA256: D8:63:B2:D7:6D:5E:A1:15:92:0A:17:41:9A:47:E5:64:40:F0:03:FF:7B:00:78:34:D6:AC:7B:F0:2C:2B:D1:65
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 1


*******************************************
*******************************************


Alias name: cassandracaroot
Creation date: Jun 5, 2019
Entry type: trustedCertEntry

Owner: EMAILADDRESS=info@thelastpickle.com, CN=CassandraCA, OU=tls_lab, O=The Last Pickle, L=Clayton, ST=NC, C=US
Issuer: EMAILADDRESS=info@thelastpickle.com, CN=CassandraCA, OU=tls_lab, O=The Last Pickle, L=Clayton, ST=NC, C=US
Serial number: c9f85a64b75bf080
Valid from: Tue Jun 04 17:52:09 EDT 2019 until: Wed Jun 03 17:52:09 EDT 2020
Certificate fingerprints:
         MD5:  78:A6:01:CA:46:FE:01:F3:A7:AC:EB:62:02:69:37:57
         SHA1: F0:CE:99:21:20:9E:FF:6A:0B:88:D3:DF:62:37:54:22:73:87:D7:CD
         SHA256: D8:63:B2:D7:6D:5E:A1:15:92:0A:17:41:9A:47:E5:64:40:F0:03:FF:7B:00:78:34:D6:AC:7B:F0:2C:2B:D1:65
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 1


*******************************************
*******************************************

Things to note

  • This task is performed locally, not on the remote EC2 instances.
  • The keystore should have two entries. The first entry should be a PrivateKeyEntry that includes the node certificate and private key.
  • The first entry should include a SAN with the EC2 instance’s private IP address.
  • The second keystore entry should be a trustedCertEntry that contains the CA certificate.
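
As a rough sketch, the entry checks in these notes could be scripted by parsing the keytool listing. This assumes the listing has the same "Alias name:" and "Entry type:" lines as the output above; the helper names are hypothetical.

```python
def keystore_entries(listing):
    """Map each alias to its entry type, based on a keytool listing."""
    entries = {}
    alias = None
    for raw_line in listing.splitlines():
        line = raw_line.strip()
        if line.startswith("Alias name:"):
            alias = line.split(":", 1)[1].strip()
        elif line.startswith("Entry type:") and alias is not None:
            entries[alias] = line.split(":", 1)[1].strip()
            alias = None
    return entries


def looks_like_node_keystore(entries):
    """True when there is exactly one private key entry and one trusted cert."""
    return sorted(entries.values()) == ["PrivateKeyEntry", "trustedCertEntry"]


SAMPLE_LISTING = """\
Your keystore contains 2 entries

Alias name: 34.217.54.220
Creation date: Jun 5, 2019
Entry type: PrivateKeyEntry
Certificate chain length: 2

Alias name: cassandracaroot
Creation date: Jun 5, 2019
Entry type: trustedCertEntry
"""

entries = keystore_entries(SAMPLE_LISTING)
print(entries, looks_like_node_keystore(entries))
```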

Copy the Keystore Files

After the keystores have been updated, Ansible copies both the keystore and truststore files to the host machines. The files are stored in /etc/cassandra.

Ansible Note: The copy tasks are defined in tlp-ansible/roles/node_keystores/tasks/copy_keystores.yml.

Configure the Cluster

The Ansible playbooks update /etc/cassandra/cassandra.yaml on each host machine. The server_encryption_options property is set to:

server_encryption_options:
  internode_encryption: all
  keystore: ./conf/keystore.p12
  keystore_password: cassandra
  truststore: ./conf/truststore.p12
  truststore_password: cassandra
  require_client_auth: true
  require_endpoint_verification: true
  store_type: PKCS12

After updating cassandra.yaml, Ansible restarts the node.

Ansible Note: The tasks for updating the configuration and restarting Cassandra are defined in the cassandra_configuration role located at tlp-ansible/roles/cassandra_configuration.

Verify Cluster State

Finally, check that the cluster is in a healthy state:

$ cd cassandra0
$ nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load        Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.29.239  280.9 KiB   256          67.1%             26a5b2e1-1079-4879-9e3f-31366852f095  rack1
UN  172.31.7.158   281.17 KiB  256          66.4%             0fab0bb3-0326-48f1-a49f-e4d8196e460b  rack1
UN  172.31.42.245  280.36 KiB  256          66.5%             367efca2-664c-4e75-827c-24238af173c9  rack1

Look for this message in the log to verify that internode encryption is enabled:

INFO  [main] 2019-06-06 17:25:13,962 MessagingService.java:704 - Starting Encrypted Messaging Service on SSL port 7001

We now have a functioning cluster that uses two-way (or mutual) TLS authentication with hostname verification.

Ansible Note: The playbooks do not verify the cluster state. That would be a good enhancement!
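
One possible shape for such a check is sketched below. It is not part of the playbooks: it assumes the nodetool status output and the log line have the format shown above, and the function names are ours.

```python
import re

# A node row starts with a two-letter code: Up/Down, then Normal/Leaving/Joining/Moving.
NODE_ROW = re.compile(r"^([UD][NLJM])\s+(\S+)\s")
ENCRYPTION_MARKER = "Starting Encrypted Messaging Service on SSL port"


def node_states(status_output):
    """Return {address: state} for every node row in `nodetool status` output."""
    states = {}
    for line in status_output.splitlines():
        match = NODE_ROW.match(line)
        if match:
            states[match.group(2)] = match.group(1)
    return states


def cluster_healthy(status_output, expected_nodes):
    """True when all expected nodes are listed and every one is Up/Normal."""
    states = node_states(status_output)
    return len(states) == expected_nodes and all(s == "UN" for s in states.values())


def internode_encryption_enabled(log_text):
    """True when the log contains the encrypted messaging startup message."""
    return ENCRYPTION_MARKER in log_text


SAMPLE_STATUS = """\
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.31.29.239  280.9 KiB  256          67.1%             26a5b2e1-1079-4879-9e3f-31366852f095  rack1
UN  172.31.7.158   281.17 KiB  256          66.4%             0fab0bb3-0326-48f1-a49f-e4d8196e460b  rack1
UN  172.31.42.245  280.36 KiB  256          66.5%             367efca2-664c-4e75-827c-24238af173c9  rack1
"""

print(cluster_healthy(SAMPLE_STATUS, expected_nodes=3))
```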

Conclusion

Transport Layer Security is necessary for securing a cluster. TLS without hostname verification leaves the cluster vulnerable. This article explained how to set up mutual TLS along with hostname verification and walked through all the steps so that you can secure internode communication.

In the next security post, we will look at client-to-node encryption. Stay tuned!

Momentum, Change, and Moving to the Future

For several years DataStax has worked with customers in hybrid and multi-cloud environments. Recently, our roadmap has intensified and accelerated in response to customer demands. We all know that cloud is changing every aspect of business—both for our customers, and for ourselves. We must continue to challenge ourselves to rethink the products we build and develop new ways to take them to market.

To that end, we made some big product announcements in May at our Accelerate conference. Our most exciting news by far was the announcement of DataStax Constellation, our new cloud data platform designed to make it easy for developers to build modern applications, and easier for ops teams to manage those applications—regardless of where they are deployed. These are entirely new opportunities for DataStax and have required us to make changes in every part of our business.

Realizing the full potential of what we can accomplish with Constellation will require adjusting every function to this new landscape. As with all meaningful change, some will be fun and exciting, and some will be incredibly hard. In our case, we need to quickly add many new positions in order to expand our cloud-native offerings. Our belief as a company is, and always has been, that we will grow the business responsibly—and that means that we cannot just add new costs and increase expenses. Therefore, we have made the difficult decision to restructure our organization.

Today, we said a heartfelt goodbye to some employees who have made incredible contributions to our success. This is not something that we take lightly. We said goodbye to people who are valued colleagues and, in many cases, close friends. Where we could, we moved people from previous roles into new teams that are part of the future plan. Unfortunately, at our size that does not work for all positions, and as a result, we needed to remove a little less than 10% of the company as part of the restructuring. This has been a time of tough decisions and we have done everything we can to transition these valued people with dignity and compassion.

Going forward, we are now structured and hiring to successfully bring the Constellation platform and cloud-native offerings to market. Working with our cloud partners, this fall we will deliver as-a-service offerings for both Cassandra and Insights, and we will contribute to open-source developer communities to pioneer advances in Apache Cassandra™ and our own portfolio. To accomplish all of this and as part of the restructuring, we need to hire about 100 new employees to bring the talent needed to deliver our roadmap.

Our next generation of products is off to a good start. At Accelerate, we ran our developer bootcamp using an early, hands-on preview of our first offering on Constellation, DataStax Apache Cassandra as a Service. The result was a tremendous amount of enthusiasm and positive feedback on its power and ease of use. That real-world reaction to Constellation accelerated our own decision to invest even more aggressively.

So we move forward, with deep gratitude to all those who helped us accomplish all that has been done, and with excitement for all that is to come.

Using the Object Mapper with Scala

DataStax's Java driver for Apache Cassandra and DSE has a very useful feature called Object Mapper that greatly simplifies the development of code that accesses the database. It allows you to map Cassandra tables directly into your business objects, so you don't need to write numerous calls such as getInt and getString to extract data from returned rows and explicitly convert that data into your classes.



A Dozen New Features in tlp-stress

It’s been a while since we introduced our tlp-stress tool, and we wanted to give a status update to showcase the improvements we’ve made since our first blog post in October. In this post we’ll show you what we’ve been working on these last 8 months.

  1. Duration based test runs

    One of the first things we realized was that tlp-stress needed a way of specifying how long a test should run, rather than a number of operations. We added a -d flag which can be combined with human readable shorthand, such as 4h for four hours or 3d for three days. The feature allows the user to specify any combination of days (d), hours (h), minutes (m), and seconds (s). This has been extremely helpful when running tests over several days to evaluate the performance during development of Cassandra 4.0.

     tlp-stress run KeyValue -d 3d
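
To illustrate the shorthand, here is a small Python sketch that converts such a string into seconds. It is not tlp-stress's own parser (tlp-stress is not written in Python), and it assumes that combined values are written back to back, for example 1d12h.

```python
import re

UNIT_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1}
VALID = re.compile(r"(?:\d+[dhms])+")
TOKEN = re.compile(r"(\d+)([dhms])")


def parse_duration(text):
    """Convert shorthand such as '3d' or '1d12h30m' into a number of seconds."""
    text = text.strip().lower()
    if not VALID.fullmatch(text):
        raise ValueError(f"unrecognized duration: {text!r}")
    return sum(int(amount) * UNIT_SECONDS[unit] for amount, unit in TOKEN.findall(text))


print(parse_duration("3d"), parse_duration("4h"))
```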
    
  2. Dynamic Workload Parameters

    When writing a workload, it became very apparent that we would have to either create thousands of different workloads or make workload parameters customizable to the user. We opted for the latter. Workloads can now annotate their variables to be overridden at runtime, allowing for extremely flexible operations.

    For example, one of our workloads, BasicTimeSeries, has a SELECT that looks like this:

     getPartitionHead = session.prepare("SELECT * from sensor_data_udt WHERE sensor_id = ? LIMIT ?")
    

    The limit clause above was hard coded initially:

     var limit = 500
    

    Changing it required recompiling tlp-stress. This was clearly not fun. We’ve added an annotation that can be applied to variables in the stress workload:

     @WorkloadParameter("Number of rows to fetch back on SELECT queries")
     var limit = 500
    

    Once this annotation is applied, the field can be overridden at run time via the --workload command line parameter:

     $ tlp-stress run BasicTimeSeries --workload.limit=1000
    

    You can find out what workload parameters are available by running tlp-stress info with the workload name:

     $ tlp-stress info BasicTimeSeries
     CREATE TABLE IF NOT EXISTS sensor_data (
                                 sensor_id text,
                                 timestamp timeuuid,
                                 data text,
                                 primary key(sensor_id, timestamp))
                                 WITH CLUSTERING ORDER BY (timestamp DESC)
     Default read rate: 0.01 (override with -r)
    
     Dynamic workload parameters (override with --workload.name=X)
    
     Name  | Description                                    | Type
     limit | Number of rows to fetch back on SELECT queries | kotlin.Int
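
The idea behind the override mechanism can be sketched in Python as an analogy. This is not how tlp-stress implements it (the real tool uses the @WorkloadParameter annotation shown above); the dataclass and the apply_overrides helper are hypothetical stand-ins.

```python
from dataclasses import dataclass, field, fields


@dataclass
class BasicTimeSeriesParams:
    # Stand-in for a variable annotated with @WorkloadParameter.
    limit: int = field(
        default=500,
        metadata={"description": "Number of rows to fetch back on SELECT queries"},
    )


def apply_overrides(params, argv):
    """Apply --workload.<name>=<value> arguments to the matching fields."""
    known = {f.name for f in fields(params)}
    prefix = "--workload."
    for arg in argv:
        if not arg.startswith(prefix):
            continue
        name, separator, raw_value = arg[len(prefix):].partition("=")
        if not separator or name not in known:
            raise ValueError(f"unknown or malformed workload parameter: {arg}")
        # Convert the string using the type of the current (default) value.
        current = getattr(params, name)
        setattr(params, name, type(current)(raw_value))
    return params


params = apply_overrides(BasicTimeSeriesParams(), ["--workload.limit=1000"])
print(params.limit)
```

Keeping the description next to the field is what makes a listing like the info output above possible without maintaining separate documentation.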
    
  3. TTL Option

    When tlp-stress finally matured to be usable for testing Cassandra 4.0, we immediately realized that the biggest problem with long running tests was if they were constantly inserting new data, we’d eventually run out of disk. This happens sooner rather than later - tlp-stress can easily saturate a 1GB/s network card. We needed an option to add a TTL to the table schema. Providing a --ttl with a time in seconds will add a default_time_to_live option to the tables created. This is to limit the amount of data that will live in the cluster and allow for long running tests.

  4. Row and Key Cache Support

    On many workloads we found disabling key cache can improve performance by up to 20%. Being able to dynamically configure this to test different workloads has been extremely informative. The nuance here is that the data model matters a lot. A key value table with a 100 byte text field has a much different performance profile than a time series table spanning 12 SSTables. Figuring out the specific circumstances where these caches can be beneficial (or harmful) is necessary to correctly optimize your cluster.

    You can configure row cache or key cache by doing the following when running tlp-stress:

     $ tlp-stress run KeyValue --rowcache 'ALL' --keycache 'NONE' 
    

    We’ll be sure to follow up with a detailed post on this topic to show the results of our testing of these caches.

  5. Populate Data Before Test

    It’s not useful to run a read heavy workload without any data, so we added a --populate flag that can load up the cluster with data before starting a measured workload. Consider a case where we want to examine heap allocations when running a read dominated workload across ten thousand partitions:

     $ tlp-stress run KeyValue --populate 10k -p 10k -r .99 -d 1d
    

    The above will run the KeyValue workload, performing 99% reads for one day, but only after inserting the initial ten thousand rows.

    Workloads can define their own custom logic for prepopulating fields or allow tlp-stress to do it on its own.

  6. Coordinator Only Mode

    This allows us to test a lesser known and used feature known as “coordinator nodes”. Eric Lubow described how using dedicated nodes as coordinators could improve performance at the 2016 Cassandra summit (slides here). We thought it would be useful to be able to test this type of workload. Passing the --coordinatoronly flag allows us to test this scenario.

    We will post a follow up with detailed information on how and why this works.

  7. Pass Extra CQL

    We know sometimes it would be helpful to test a specific feature in the context of an existing workload. For example, we might want to test how a Materialized View or a SASI index works with a time series workload. If we were to create an index on our keyvalue table we might want to do something like this:

    --cql "CREATE CUSTOM INDEX fn_prefix ON keyvalue(value) USING 'org.apache.cassandra.index.sasi.SASIIndex';"

    For now, we won’t be able to query the table using this workload. We’re only able to see the impact of index maintenance, but we’re hoping to improve this in the future.

  8. Random Access Workload

    As you can see above, the new workload parameters have already been extremely useful, allowing you to customize workloads even further than before. This will allow us to model a new pattern and data model: the random access workload.

    Not every Cassandra use case is time series. In fact, we see a significant number of non time series workloads. A friends list is a good example of this pattern. We may want to select a single friend out of a friends list, or read the entire partition out at once. This new workload allows us to do either.

    We’ll check the workload to see what workload specific parameters exist:

     $ tlp-stress info RandomPartitionAccess
     CREATE TABLE IF NOT EXISTS random_access (
                                partition_id text,
                                row_id int,
                                value text,
                                primary key (partition_id, row_id)
                             )
     Default read rate: 0.01 (override with -r)
        
     Dynamic workload parameters (override with --workload.name=X)
        
     Name   | Description                                                                   | Type
     rows   | Number of rows per partition, defaults to 100                                 | kotlin.Int
     select | Select random row or the entire partition.  Acceptable values: row, partition | kotlin.String
    

    We can override the number of rows per partition as well as how we’ll read the data. We can either select a single row (a fast query) or the entire partition, which gets slower and more memory hungry as the partition gets bigger. If we were to consider a case where the users in our system have 500 friends, and we want to test the performance of selecting the entire friends list, we can do something like this:

     $ tlp-stress run RandomPartitionAccess --workload.rows=500 --workload.select=partition
    

    We could, of course, run a second workload that queries individual rows if we wanted to, throttling it, having it be read heavy, or whatever we needed that will most closely simulate our production environment.

  9. LWT Locking Workload

    This workload is a mix of a pattern and a feature - using LWT for locking. We did some work researching LWT performance for a customer who was already making heavy use of them. The goal was to identify the root cause of performance issues on clusters using a lot of lightweight transactions. We were able to find some very interesting results which led to the creation of CASSANDRA-15080.

    We will dive into the details of tuning a cluster for LWTs in a separate post, as there’s a lot going on there.

  10. CSV Output

    Sometimes it’s easiest to run basic reports across CSV data rather than using reporting dashboards, especially if there’s only a single stress server. One stress server can push a 9 node cluster pretty hard, so for small tests you’ll usually only use a single instance. CSV is easily consumed from Python with Pandas or gnuplot, so we added it in as an option. Use the --csv option to log all results to a text file in CSV format which you can save and analyze later.

  11. Prometheus Metrics

    Prometheus has quickly become the new standard in metrics collection, and we wanted to make sure we could aggregate and graph statistics from multiple stress instances.

    We expose an HTTP endpoint on port 9500 to let Prometheus scrape metrics.

  12. Special Bonus: We have a mailing list!

    Last, but not least, we’ve set up a mailing list on Google Groups for Q&A as well as development discussion. We’re eager to hear your questions and feedback, so please join and help us build a better tool and community.

    This list is also used to discuss another tool we’ve been working on, tlp-cluster, which we’ll cover in a follow up blog post.

We’re committed to improving the tlp-stress tooling and are looking forward to our first official release of the software. We’ve got a bit of cleanup to do and documentation to write. We don’t expect the architecture or functionality to significantly change before then.

KillrVideo Python Pt. 6— Cassandra with Python: Simple to Complex

Keeping it simple

Geospatial Anomaly Detection (Terra-Locus Anomalia Machina) Part 4: Cassandra, meet Lucene! – The Cassandra Lucene Index Plugin

Massively Scalable Geospatial Anomaly Detection with Apache Kafka and Cassandra

After being Lost in (3D) Space in the last blog, in this final part of the geospatial anomaly detection series we come back down to Earth and try out the Cassandra Lucene Plugin (this was our destination all along, but it’s a long way to Alpha Centauri!). We’ll also reveal how well a subset of the alternative solutions, from this and the previous three parts, actually worked.

1. The Cassandra Lucene Index

The Cassandra Lucene Index is a plugin for Apache Cassandra:

“… that extends its index functionality to provide near real-time search, including full-text search capabilities and free multivariable, geospatial and bitemporal search. It is achieved through an Apache Lucene based implementation of Cassandra secondary indexes, where each node of the cluster indexes its own data.”

Recently we announced that Instaclustr is supporting the Cassandra Lucene Index plugin, and providing it as part of our managed Cassandra service.  The full documentation for the plugin is here. 

The documentation says that “the plugin is not intended to replace Apache Cassandra denormalized tables, indexes, or secondary indexes, as it is just a tool to perform some kind of queries which are really hard to be addressed using Apache Cassandra out of the box features.”

Given that in the past 3 blogs we’ve investigated using all of these approaches to add geospatial queries to Cassandra, it’s finally time to take the Cassandra Lucene Index for a spin. 

How does Lucene work? Lucene is built on a very old manual data mining technique, originally used by monks in 1230 to build a concordance of the Bible. It helped them find all the verses which had specific words in them (e.g. “Nephilim”: not many verses; “a”: lots of verses). A more recent (1890s) concordance was Strong’s “Exhaustive Concordance”, so called not because it was heavy (it was: at 3 kg it was bigger and heavier than the book it indexed, the Bible) but because it indexed every word, including “a”:

Geospatial Anomaly Detection 4 - Exhaustive Concordance of the Bible

This is a good overview of Lucene (Lucene: The Good Parts), which helps make sense of this overview of the Lucene Cassandra plugin.

How is the Lucene Cassandra plugin used? It’s supported as an optional add-on in Instaclustr’s managed service offering. Just select it in the Instaclustr Management Console when you are creating a Cassandra cluster and it is automatically deployed for you.

Lucene indexes are an extension of the Cassandra secondary indexes, so they are created using the CQL CREATE CUSTOM INDEX statement like this:

CREATE CUSTOM INDEX (IF NOT EXISTS)? <index_name>
ON <table_name> ()
USING 'com.stratio.cassandra.lucene.Index'
WITH OPTIONS = <options>

To search using the index, this syntax is used.

SELECT * FROM test WHERE expr(<idx_name>, '<expr>');

Note that <options> and <expr> are JSON objects which actually specify what indexes are created and how they are searched. Also note that you only need to create a single Lucene index for each Cassandra table, as Lucene handles indexing all the requested columns for you.

Lucene indexes are queried using a custom JSON syntax defining the kind of search to be performed:

SELECT ( <fields> | * ) FROM <table_name> WHERE expr(<index_name>, '{
   (filter: ( <filter> )* )?
   (, query: ( <query>  )* )?
   (, sort: ( <sort>   )* )?
   (, refresh: ( true | false ) )?
}');

You can combine multiple filters, queries, and sorts (including sorting by geo distance).  Note that queries are sent to all nodes in the Cassandra cluster. However, filters may find the results from a subset of the nodes, so filters will have better throughput than queries.

2. Geospatial searches

Geospatial Anomaly Detection 4 - Throughput Results - Geospatial searches
How does this help for Geospatial queries? The Lucene plugin has very rich geospatial semantics including support for geo points, geo shapes, geo distance search, geo bounding box search, geo shape search, as well as multiple distance units, geo transformations, and complex geo shapes. We only need to use a subset of these for our geospatial anomaly detection use case. 

The simplest concept is the geo point, which is a pair of <latitude, longitude> coordinates. Interestingly, under the hood, indexing is done using a tree structure with geohashes (with configurable precision). Here’s how to create an index over latitude and longitude columns:

CREATE CUSTOM INDEX test_idx ON test()
USING 'com.stratio.cassandra.lucene.Index'
WITH OPTIONS = {
   'refresh_seconds': '1',
   'schema': '{
      fields: {
         geo_point: {
            type: "geo_point",
            validated: true,
            latitude: "lat",
            longitude: "long",
            max_levels: 8
         }
      }
   }'
};

This index can be used to search for rows within a distance range from a specific point with this syntax:

SELECT ( <fields> | * ) FROM <table> WHERE expr(<index_name>, '{
   (filter | query): {
      type: "geo_distance",
      field: <field_name> ,
      latitude: <latitude> ,
      longitude: <longitude> ,
      max_distance: <max_distance>
      (, min_distance: <min_distance> )?
   }
}');

There’s also a bounding box search with syntax like this. Don’t forget you first have to convert distance to latitude and longitude for the bounding box corners:

SELECT ( <fields> | * ) FROM <table> WHERE expr(<index_name>, '{
   (filter | query): {
      type: "geo_bbox",
      field: <field_name>,
      min_latitude: <min_latitude>,
      max_latitude: <max_latitude>,
      min_longitude: <min_longitude>,
      max_longitude: <max_longitude>
   }
}');
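
The distance-to-corners conversion mentioned above can be sketched as follows. This is a simple spherical approximation of our own, not code from the plugin or from our test application: it assumes a mean Earth radius of 6371 km and clamps at the poles and the antimeridian instead of wrapping around.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius (spherical approximation)


def bounding_box(lat, lon, distance_km):
    """Return (min_lat, max_lat, min_lon, max_lon) for a box around a point.

    The box extends distance_km north, south, east and west of the point.
    Values are clamped to valid ranges; wrapping across the antimeridian
    is not handled in this sketch.
    """
    delta_lat = math.degrees(distance_km / EARTH_RADIUS_KM)
    cos_lat = math.cos(math.radians(lat))
    if cos_lat < 1e-9:
        # At the poles every longitude is within range.
        delta_lon = 180.0
    else:
        # A degree of longitude shrinks by cos(latitude).
        delta_lon = min(180.0, math.degrees(distance_km / (EARTH_RADIUS_KM * cos_lat)))
    return (
        max(-90.0, lat - delta_lat),
        min(90.0, lat + delta_lat),
        max(-180.0, lon - delta_lon),
        min(180.0, lon + delta_lon),
    )


print(bounding_box(60.0, 10.0, 50.0))
```

The four values map directly onto min_latitude, max_latitude, min_longitude and max_longitude in the geo_bbox search.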

Partition-directed searches will be routed to a single partition, increasing performance. However, token range searches without filters over the partitioning column will be routed to all the partitions, with a slightly lower performance. This example queries all nodes and all partitions, so it will be slower:

SELECT ( <fields> | * ) FROM <table> WHERE expr(<index_name>, '<expr>');

But this example fetches a single partition and will be faster:

SELECT ( <fields> | * ) FROM <table> WHERE expr(<index_name>, '<expr>') AND <partition_key> = <value>;

You can also limit the result set:

SELECT ( <fields> | * ) FROM <table> WHERE expr(<index_name>, '<expr>') AND <partition_key> = <value> limit <limit>;

Sorting is sophisticated, but note that you can’t use the CQL ORDER BY clause with the Lucene indexes. Here’s what the documentation says:

  1. When searching by filter, without any query or sort defined, then the results are returned in Cassandra’s natural order, which is defined by the partitioner and the column name comparator.
  2. When searching by query, results are returned sorted by descending relevance.
  3. Sort option is used to specify the order in which the indexed rows will be traversed.
  4. When simple_sort_field sorting is used, the query scoring is delayed.

Finally, prefix search is useful for searching larger areas over a single geohash column as you can search for a substring:

SELECT ( <fields> | * ) FROM <table> WHERE expr(<index_name>, '{
   (filter | query): {
      type: "prefix",
      field: <field_name> ,
      value: <value>
   }
}');
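
Prefix search works on geohashes because a shorter geohash is always a prefix of every longer geohash inside the same cell. The sketch below uses the standard public geohash encoding to show this; it is an illustration we added, not code from the plugin, and the coordinates are arbitrary example values.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)


def geohash_encode(lat, lon, precision=8):
    """Encode a latitude/longitude pair as a geohash of the given length."""
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]
    chars = []
    bit_count = 0
    value = 0
    use_longitude = True  # bits alternate, starting with longitude
    while len(chars) < precision:
        bounds, coord = (lon_range, lon) if use_longitude else (lat_range, lat)
        middle = (bounds[0] + bounds[1]) / 2
        if coord >= middle:
            value = (value << 1) | 1
            bounds[0] = middle
        else:
            value <<= 1
            bounds[1] = middle
        use_longitude = not use_longitude
        bit_count += 1
        if bit_count == 5:  # every 5 bits become one base32 character
            chars.append(BASE32[value])
            bit_count = 0
            value = 0
    return "".join(chars)


fine = geohash_encode(57.64911, 10.40744, precision=8)
coarse = geohash_encode(57.64911, 10.40744, precision=5)
print(coarse, fine.startswith(coarse))
```

Dropping characters from the end of the search value therefore widens the search area, which is what makes a single geohash column usable for progressively larger searches.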

For example, to search for a 5 character geohash:

filter: {
      type: "prefix",
      field: "geohash",
      value: "every"
}

Here are the Cassandra table and Lucene indexes we created to evaluate the performance:

CREATE TABLE latlong_lucene (
   geohash1 text,
   value double,
   time timestamp,
   latitude double,
   longitude double,
   Primary key (geohash1, time)
) WITH CLUSTERING ORDER BY (time DESC);

CREATE CUSTOM INDEX latlong_index ON latlong_lucene ()
USING 'com.stratio.cassandra.lucene.Index'
WITH OPTIONS = {
   'refresh_seconds': '1',
   'schema': '{
      fields: {
         geohash1: {type: "string"},
         value: {type: "double"},
         time: {type: "date", pattern: "yyyy/MM/dd HH:mm:ss.SSS"},
         place: {type: "geo_point", latitude: "latitude", longitude: "longitude"}
      }
   }'
};

The simplest possible search just uses the partition key and a sort, where <lat> and <long> are the location of the current event to check for an anomaly.

SELECT value FROM latlong_lucene WHERE expr(latlong_index,
'{ sort: [  {field: "place", type: "geo_distance", latitude: " + <lat> + ", longitude: " + <long> + "}, {field: "time", reverse: true} ] }') and geohash1=<geohash> limit 50;

The prefix search starts from geohash8 and increases the area until 50 rows are found:

SELECT value FROM latlong_lucene WHERE expr(latlong_index, '{ filter: [ {type: "prefix", field: "geohash1", value: <geohash>} ] }') limit 50;

This query shows how the geo distance search works (distances are increased incrementally until 50 rows are found):

SELECT value FROM latlong_lucene WHERE expr(latlong_index, '{ filter: { type: "geo_distance", field: "place", latitude: <lat>, longitude: <long>, max_distance: "<distance>km" } }') and geohash1='<hash1>' limit 50;

Finally, here’s the bounding box search, where the min and max bounding box latitudes and longitudes are computed based on increasing distance until 50 rows are found:

SELECT value FROM latlong_lucene WHERE expr(latlong_index, '{ filter: { type: "geo_bbox", field: "place", min_latitude: <minLat>, max_latitude: <maxLat>, min_longitude: <minLon>, max_longitude: <maxLon> } }') limit 50;
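
The "increase the area until 50 rows are found" loop behind the bounding box search can be sketched in Python as follows. This is only an illustration under stated assumptions, not the code used for the tests: the names bounding_box, search_with_growing_box and run_query are hypothetical, the doubling step is a guess (the post does not say how the distance was increased), a degree of latitude is assumed to be about 111.32km, and wrap-around at the antimeridian is ignored.

```python
import math

KM_PER_DEGREE_LAT = 111.32  # assumed mean length of one degree of latitude


def bounding_box(lat, lon, distance_km):
    """Return (min_lat, max_lat, min_lon, max_lon) reaching distance_km from the point."""
    d_lat = distance_km / KM_PER_DEGREE_LAT
    # A degree of longitude shrinks with the cosine of the latitude.
    cos_lat = max(math.cos(math.radians(lat)), 1e-6)
    d_lon = distance_km / (KM_PER_DEGREE_LAT * cos_lat)
    return (max(lat - d_lat, -90.0), min(lat + d_lat, 90.0),
            max(lon - d_lon, -180.0), min(lon + d_lon, 180.0))


def search_with_growing_box(lat, lon, run_query, start_km=1.0, max_km=20000.0, limit=50):
    """Widen the box (doubling is an assumption) until `limit` rows come back.

    run_query(box, limit) is a hypothetical stand-in for executing the
    geo_bbox query with the computed minimum and maximum coordinates.
    """
    distance = start_km
    rows = []
    while distance <= max_km:
        rows = run_query(bounding_box(lat, lon, distance), limit)
        if len(rows) >= limit:
            break
        distance *= 2
    return rows[:limit]
```

Every extra iteration is another round trip to Cassandra, which is one reason densely distributed data (where the first, smallest box often already holds 50 rows) gives higher throughput.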

3. And the Winner is …

We tested a subset of the geospatial implementation alternatives we explored in this geospatial anomaly detection blog series, and ran throughput tests on identical Cassandra, Kafka and Kubernetes clusters to those we used in Anomalia Machina Blog 9.

Some high-level observations are that:

  1. There are many alternative ways of implementing geospatial proximity queries in Cassandra
  2. The geospatial alternatives show significant variation (the graph below shows best, average and worst; the best geospatial results are 62 times better than the worst)
  3. The original non-geospatial results (18,000 TPS) are better than all the geospatial results, and
  4. The best geospatial results only achieve ⅓ the throughput of the original results, and the average geospatial results are only 1/10th of the original results.

However, this is not entirely unexpected, as the geospatial problem is a lot more demanding: (1) the queries are more complex, being over 2 or 3 dimensions and requiring calculation of proximity between locations, and (2) more than one query is typically needed due to increasing the search area until 50 events are found.

Following are the detailed results. For some options, we tried both spatially uniform distributions of data and dense data (clumped within 100km of a few locations); otherwise the default was a uniform distribution. We also tried a dynamic optimisation which kept track of the distance that returned 50 results and tried this distance first (which only works for uniform distributions).

3.1 Cassandra and Geohashes

The following graph shows the results for the “pure” Cassandra implementations (Blog 2) including basic secondary indexes, all using geohashes. For most of the options both 2D and 3D geohashes are supported (Blog 3), with the exception of the option using “inequality over hash8 clustering column” which is 2D only (it relies on ordering which is not maintained for the 3D geohash). The results from worst to best are: 

  • Using 1 index for each geohash length 2D/3D (uniform, 1300 TPS, dense, 1690 TPS)
  • 1 table for each geohash length 2D/3D (uniform, 2700 TPS), inequality over hash8 clustering column (2D only) (uniform, 2700 TPS)
  • 1 table for each geohash length 2D/3D (dense, 3600 TPS)
  • And the winner is, inequality over hash8 clustering column (2D only) (dense, 6200 TPS). 

3.2 Cassandra Lucene Plugin/SASI and latitude/longitude

In this section, we evaluate the Cassandra Lucene Plugin (this blog) and SASI options (5.3). These options enabled the spatial searches over full latitude/longitude coordinates, using either bounding boxes or increasing distances.  The exception was the Lucene prefix option (see above) which used a single geohash and the Lucene prefix operator to search shorter geohashes (and larger areas).

The most obvious thing to note is that the SASI options were the worst:  SASI lat long clustering columns bounded box (dense, 100TPS, and uniform, 120TPS). 

The Lucene options involved combinations of dense or uniform distributions, filtering by distance or bounding box, and sorted or not (in which case the client can sort the results). The results (from worst to best) were:

  • filter by increasing distances, distance sort (uniform data, 200TPS)
  • distance sort, no filter (uniform data, 300TPS)
  • filter by increasing distances, no sort (uniform data, 400TPS)
  • filter by bounded box increasing distance (700TPS)
  • filter by bounded box, automatically optimised for most likely distance (1000TPS), equal with filter by increasing distances, no sort (dense data, 1000TPS)
  • prefix filter (using geohashes, uniform data, 2300TPS, and with dense data 4600TPS).  

Not surprisingly, the best result from this bunch used geohashes in conjunction with the prefix filter and Lucene indexes. There was not much to pick between the bounding box vs. distance filter options, with both achieving 1000 TPS (with distance optimisation or densely distributed data), probably because the geo point indexes use geohashes and both distance search types can take advantage of search optimisations using them.

3.3 Best Geohash Results

The following graph shows the best six results (all using geohashes) from worst to best:

  • The Lucene plugin prefix filter (uniform data, 2300 TPS) performed well, but was beaten by both the
  • Cassandra 1 table per hash 2D/3D (uniform data, 2700 TPS) and Cassandra inequality over a single geohash8 clustering column 2D (uniform, 2700 TPS).
  • The impact of assumptions around the data density are apparent as dense data reduces the number of queries required and increases the throughput dramatically with Cassandra 1 table per hash 2D/3D (dense data, 3600 TPS),
  • and Lucene plugin prefix filter (dense data, 4600 TPS).
  • And the overall winner is Cassandra inequality over a single geohash8 clustering column 2D (dense, 6200 TPS).

However, the worst and best results from this bunch are limited to 2D geospatial queries (but see below) whereas the rest all work just as well for 3D as 2D. The best result is the simplest implementation, using a single geohash as a clustering column. 

Lucene options that use full latitude/longitude coordinates are all slower than geohash options. This is to be expected as the latitude/longitude spatial queries are more expensive, even with the Lucene indexing. There is, however, a tradeoff between the speed and approximation of geohashes, and the power but computational complexity of using latitude/longitude. For simple spatial proximity geohashes may be sufficient for some use cases. However, latitude/longitude has the advantage that spatial comparisons can be more precise and sophisticated, and can include arbitrary polygon shapes and checking if shapes overlap.

Finally, higher throughput can be achieved for any of the alternatives by adding more nodes to the Cassandra, Kafka and Kubernetes (worker nodes) clusters, which is easy with the Instaclustr managed Apache Cassandra and Kafka services. 

4. “Drones Causing Worldwide Spike In UFO Sightings!”

If you really want to check for Drone proximity anomalies correctly, and not cause unnecessary panic in the general public by claiming to have found UFOs instead, then it is possible to use a more theoretically correct 3D version of geohashes. If the original 2D geohash algorithm is directly modified so that all three spatial coordinates (latitude, longitude, altitude), are used to encode a geohash, then geohash ordering is also ensured.  Here’s example code in gist for a 3D geohash encoding which produces valid 3D geohashes for altitudes from 13km below sea level to geostationary satellite orbit. Note that it’s just sample code and only implements the encoding method, a complete implementation would also need a decoder and other helper functions.

The tradeoffs between the previous “2D geohash+rounded altitude” approach and this one are: (1) this approach is theoretically correct, whereas with the previous approach (2) the 2D part remains compatible with the original 2D geohashes (this 3D geohash is not), and (3) the altitude is explicit in the geohash string (in this 3D geohash it is encoded into the string and is therefore only implicit).

Geospatial Anomaly Detection 4 - UFO Sighting

5. Conclusions and Further Information

In conclusion, using Geohashes with a Cassandra Clustering Column is the fastest approach for simple geospatial proximity searches, and also gives you the option of 3D for free, but Cassandra with the Lucene plugin may be more powerful for more complex use cases (but doesn’t support 3D). If the throughput is not sufficient a solution is to increase the cluster size.

The Instaclustr Managed Platform includes Apache Cassandra and Apache Kafka, and the (optional add-on) Cassandra Lucene Plugin can be selected at cluster creation and will be automatically provisioned along with Cassandra.

Try the Instaclustr Managed Platform with our Free Trial.

The post Geospatial Anomaly Detection (Terra-Locus Anomalia Machina) Part 4: Cassandra, meet Lucene! – The Cassandra Lucene Index Plugin appeared first on Instaclustr.

Geospatial Anomaly Detection (Terra-Locus Anomalia Machina) Part 3: 3D Geohashes (and Drones)

Massively Scalable Geospatial Anomaly Detection with Apache Kafka and Cassandra

In this blog we discover that we’ve been trapped in Flatland, and encounter a Dimension of the Third Kind. We introduce 3D geohashes, see how far up (and down) we can go (geostationary satellite orbits, the moon, and beyond), revisit Cassandra partition sizes, and come back down to earth with a suggested 3D geohash powered hazard proximity anomaly detector application for Drones!

1. What goes Up…

Geospatial Anomaly Detector 3 - Flatland

As if changing from believing that the earth is “flat” to round isn’t hard enough for some people, there’s also the challenge of moving from 2D to 3D!

“Either this is madness or it is Hell.” “It is neither,” calmly replied the voice of the Sphere, “it is Knowledge; it is Three Dimensions: open your eye once again and try to look steadily.”
Edwin A. Abbott, Flatland: A Romance of Many Dimensions

In the world of Flatland there are only 2 dimensions, and the Sphere (who inhabits 3D Spaceland) tries to explain height to the 2D creatures that inhabit it!

So let’s try and imagine a 3D world, so we can go Up:

Geospatial Anomaly Detector 3 - Disney Pixar - Up

And Down! (No, there wasn’t actually a sequel):

Geospatial Anomaly Detector 3 - Disney Pixar - Down

It’s a big jump from a flatworld (e.g. a 2D map) to even just a surface on a round ball (i.e. a sphere). But introducing a 3rd, vertical, dimension has more challenges. Where do you measure elevation from? The centre of the earth? The surface of the earth? And guess what, the earth isn’t actually spherical (white line), it’s an ellipsoid (orange line), and even has dips and bumps (geoid height, red line). This diagram shows the vertical complications of a (cross section of a) real planet:

Geospatial Anomaly Detector 3 - Vertical complications of a planet

Does the vertical dimension matter? Well, there are actually lots of things below the nominal surface of the earth. For example, the deepest borehole, the Kola Superdeep Borehole, has a depth of 12,226m (as the top of the hole proclaims):

Geospatial Anomaly Detector 3 - Kola Superdeep Borehole

It’s 11km to the bottom of the sea – the deepest dive ever was made recently to 10,928 m down in the Mariana Trench, 16m deeper than the previous record set way back in 1960, finding a plastic bag and a few weird fish:

Geospatial Anomaly Detector 3 - Eel at the bottom of Mariana Trench

An arrowtooth eel swims at the bottom of the Mariana Trench.

By comparison, it’s only 4km down the deepest mine, 2.2km down the deepest cave, and the Dead Sea is 400m below sea level.

Geospatial Anomaly Detector 3 - Dead Sea

Going in the other direction, altitude above the earth can range over essentially infinite distances (the galaxy is big), but coming back closer to earth there are some more useful altitudes. Planes and some drones operate up to 10km up, jet planes up to 37km, balloons up to 40km, and rocket planes up to 112km!

Geospatial Anomaly Detector 3 - Altitude

Further out, the orbits of satellites can extend more than 30,000 km above sea level:

Geospatial Anomaly Detector 3 - Orbits of a satellite

2. 3D Geohashes

How can we add altitude (and depth) to geohashes? A theoretically correct approach would be to modify the geohash encoding and decoding algorithms to incorporate altitude directly into the geohash string, making it a true 3D geohash (See postscript below). A simpler approach, used by Geohash-36, is to append altitude data onto the end of the 2D geohash. However, given that we want the ability to use different length geohashes to find nearby events, just using the raw altitude won’t work. We also want the 3D geohash cells to have similar lengths in each dimension (including altitude), and in proportion to the geohash length. The approach we used was to round the altitude value to the same precision as the 2D geohash (which depends on the length) and append it to the 2D geohash with a special character as a separator (e.g. “+”/“-” for altitudes above/below sea level). Note that this approach is only approximate as cubes further away from the earth will be larger than those nearer. The function to round an altitude (a) to a specified precision (p) is as follows:

roundToPrecision(a, p) = round( a / p ) x p

The following table shows the 2D geohash lengths, precision, area and volume for the corresponding 3D geohashes:

2D geohash length | precision (m) | precision (km) | area (km^2) | volume (km^3)
1 | 5000000 | 5000 | 25000000 | 1.25E+11
2 | 1260000 | 1260 | 1587600 | 2000376000
3 | 156000 | 156 | 24336 | 3796416
4 | 40000 | 40 | 1600 | 64000
5 | 4800 | 4.8 | 23.04 | 110.592
6 | 1220 | 1.22 | 1.4884 | 1.815848
7 | 152 | 0.152 | 0.023104 | 0.003511808
8 | 38 | 0.038 | 0.001444 | 0.000054872

Note that we refer to the length of the 3D geohash as the base 2D geohash length, even though the 3D geohash is obviously longer due to the appended altitude characters. For an 8 character 2D geohash, the 3D geohash represents a cube with 38m sides. At the other extreme, for 1 character, the cube is a massive 5000km on each side. For a 2 character geohash, this is what the cube of approximately 1,000km on each side (representing all the world’s water) looks like:

Geospatial Anomaly Detector 3 - Earth's Water

Some example altitudes encoded as a 3D geohash are as follows:

altitude (m) | 3D geohash | notes
0 | “sv95xs01+0” | Dead Sea, sea level
-500 | “sv95xs01-494” | Dead Sea, below sea level
50 | “sv95xs01+38” | Dead Sea, 50m altitude
100 | “sv95xs01+114” | Dead Sea, 100m altitude
8848 | “tuvz4+9600” | Mount Everest, 3D geohash cube with 4.8km sides
8848 | “tuvz+0” | Mount Everest, 3D geohash cube with 40km sides
35786000 | “x+35000000” | geostationary satellite orbit, 3D geohash cube with 5,000km sides
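
The rounding-and-append scheme behind these examples can be sketched in a few lines of Python. This is a minimal illustration rather than the code used in the pipeline: the function name geohash_3d is made up here, and the precision table is copied from the geohash length table earlier in this section.

```python
# Precision in metres for each 2D geohash length (from the table above).
PRECISION_M = {1: 5_000_000, 2: 1_260_000, 3: 156_000, 4: 40_000,
               5: 4_800, 6: 1_220, 7: 152, 8: 38}


def round_to_precision(a, p):
    """roundToPrecision(a, p) = round(a / p) x p"""
    return round(a / p) * p


def geohash_3d(geohash_2d, altitude_m):
    """Append the altitude, rounded to the 2D geohash precision, to the 2D geohash.

    "+" separates altitudes at or above sea level, "-" those below.
    """
    precision = PRECISION_M[len(geohash_2d)]
    rounded = round_to_precision(altitude_m, precision)
    separator = "+" if rounded >= 0 else "-"
    return f"{geohash_2d}{separator}{abs(rounded)}"


print(geohash_3d("tuvz4", 8848))    # Mount Everest, 4.8km cube
print(geohash_3d("x", 35_786_000))  # geostationary orbit, 5,000km cube
```

Because the altitude is rounded with the same precision as the 2D part, shortening the 2D geohash by one character also coarsens the altitude, which keeps the cells roughly cube shaped.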

Geostationary satellites appear to be fixed over the same position on the earth, which has to be close to the equator, and because they all have to be 35,786 km up it gets a bit crowded (an average 73km separation). So the above geohash example, “x+35000000”, could “net” around 68 satellites in a cube 5,000km on each side.

Because the 3D geohash is based on latitude/longitude it’s an example of an Earth-centered rotational coordinate system, as the coordinates remain fixed on the earth’s surface (i.e. the frame of reference is the earth’s surface, and rotates with it). For objects that are in space, it’s easier to use an Earth-centered inertial coordinate system which doesn’t move with the earth’s rotation, so (for objects further away) their location doesn’t change rapidly.

I was curious about what coordinate systems were used for the Apollo moon landings, now almost exactly 50 years ago.

Geospatial Anomaly Detector 3 - Apollo Lunar Mission

The answer was “lots”. All the stages of the rocket, including the command/service module and lunar module each used a local coordinate system. This made it easier for the astronauts to compute course correction burns. But there were also many other coordinate systems (the guidance computer translated between them): Earth basic, Moon basic, Earth launch site, passive thermal control (the “barbecue roll”), preferred lunar orbital insertion, lunar landing site, preferred lunar orbital plane change, LM ascent, preferred trans-Earth injection, and Earth entry.

Geospatial Anomaly Detector 3 - Rendezvous radar and CSM target orientation

And given that everyone knows the earth is not really the centre of the universe, there are sun centred coordinate systems too (and Galactic and Supergalactic!). This brings us full circle back to 2D again, as typically distance isn’t relevant for astronomical objects (because it’s unknown or infinite), so stars, black holes, galaxies etc are assumed to be located on the imaginary celestial sphere (the glass sphere in this old example):

Geospatial Anomaly Detector 3 - Celestial Globe

A celestial globe enclosing a terrestrial globe, from 1739

3. Do 3D geohashes impact partition cardinality?

By using 3D geohashes I realised that we are increasing the cardinality, which may go some of the way to solving the issue we identified at the end of Blog 2 (Geospatial Anomaly Detection: Geohashes (2D)), where partitions based on shorter geohashes were too large. Doing some calculations of the 3D geohash cardinality reveals that the increase in cardinality depends on the altitude range actually in use. For example, if the altitude range extends as far as the geostationary satellite orbits, then the cardinality of the 3D geohash based on the 3 character 2D geohash is now high enough to use as the partition key (previously the 4 character 2D geohash was the shortest geohash that would work by itself).

2D geohash length | 2D cardinality | 2D cardinality > 100000 | 3D cardinality (0-30,000km altitude) | 3D cardinality > 100000
1 | 32 | FALSE | 192 | FALSE
2 | 1024 | FALSE | 24576 | FALSE
3 | 32768 | FALSE | 6324224 | TRUE
4 | 1048576 | TRUE | 786432000 | TRUE
5 | 33554432 | TRUE | 2.09715E+11 | TRUE
6 | 1073741824 | TRUE | 2.64044E+13 | TRUE
7 | 34359738368 | TRUE | 6.78155E+15 | TRUE
8 | 1.09951E+12 | TRUE | 8.68036E+17 | TRUE

However, this is highly sensitive to the subrange of altitudes in use. A 0-100km altitude subrange has no useful impact on the cardinality, but a subrange of 0-400,000km (the approximate distance from the earth to the moon) gives the 3D geohash based on the 2 character 2D geohash high enough cardinality to act as the partition key.
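
The numbers above appear to be consistent with a simple calculation: the 2D cardinality (32 to the power of the geohash length) multiplied by the number of altitude slices, i.e. the altitude range divided by the precision and rounded up. That formula is inferred from the table values rather than stated in the text, so treat this Python sketch (with made-up function names) as an assumption:

```python
# Precision in metres for each 2D geohash length (from the earlier table).
PRECISION_M = {1: 5_000_000, 2: 1_260_000, 3: 156_000, 4: 40_000,
               5: 4_800, 6: 1_220, 7: 152, 8: 38}

MIN_PARTITION_CARDINALITY = 100_000  # threshold used in the table


def cardinality_2d(length):
    """Each geohash character selects one of 32 sub-cells."""
    return 32 ** length


def cardinality_3d(length, altitude_range_m):
    """2D cardinality times the number of altitude slices (rounded up)."""
    slices = -(-altitude_range_m // PRECISION_M[length])  # integer ceiling division
    return cardinality_2d(length) * slices


# 0-30,000km: the 3 character geohash becomes usable as a partition key.
print(cardinality_3d(3, 30_000_000) > MIN_PARTITION_CARDINALITY)
# 0-100km: a single altitude slice, so no gain over the 2D cardinality.
print(cardinality_3d(3, 100_000) == cardinality_2d(3))
# 0-400,000km (roughly earth to moon): the 2 character geohash is enough.
print(cardinality_3d(2, 400_000_000) > MIN_PARTITION_CARDINALITY)
```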

4. A Drone Proximity Anomaly Detection Example

Geospatial Anomaly Detector 3 - Drone Proximity Anomaly Detection Example

To test the 3D geohash with the Anomalia Machina pipeline we generated random 3D location data, but rather than shooting for the moon, we limited it to the low altitudes legally permitted for drones (a few hundred metres high). At these altitudes the 8 and 7 character geohashes can adequately distinguish locations with nearby altitudes (with a precision of 38m and 152m respectively). This is a realistic simulation of future piloted or automated Drone delivery or monitoring platforms, perhaps working in complex built up environments like this diagram, where there are many hazards that drones need to navigate around:

Geospatial Anomaly Detector 3 - Low altitude airspace

An example of a more concrete drone anomaly detection problem is when the value to be checked is a count of the number of proximity alerts for each drone and location. The proximity alerts could jump if drones started operating near built up or congested areas, near exclusion zones, or near temporary flight restriction areas. The rules for drone proximity are complex as permitted distances from drones to different types of hazards vary (and vary across the world). Some hazards are likely to be in fixed locations (e.g. airports and built up areas), others will themselves be moving as will the drones (e.g. vehicles). We’ll focus on fixed hazards only (moving hazards could be processed by a Kafka streams and state store application), and assume that proximity is measured in 3D. The rules for fixed hazards (in the UK) are (1) > 50m from people and property (2) > 150m from congested areas (3) > 1000m from airports, and (4) > 5000m (a guess) from emergency exclusion zones (e.g. fires).

Geospatial Anomaly Detector 3 - Drone Safety

How could we generate proximity alerts for rules like this? Geohashes are again a good fit due to their natural ability to model locations at different scales. A different Cassandra table can be used for each type of hazard, with a 3D geohash as the partition key, where the partition key for each table is chosen to be based on the 2D geohash length closest to the hazard scale. We can simply query each table with the drone location to check if (and how many) hazards of each type are too close to the drone. I.e. geohash8 for people and property (38m precision), geohash7 for congested areas (152m precision), geohash6 for airports (1220m precision), and geohash5 for emergency exclusion zones (4800m precision). The resulting counts would then be fed to the 3D geospatial anomaly detector for checking.
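
Choosing the geohash length "closest to the hazard scale" can be expressed as a small lookup. The following Python sketch is illustrative only: the hazard keys and the function name are hypothetical, and the distances and precisions are the ones quoted in this section.

```python
# Precision in metres for each 2D geohash length (from the earlier table).
PRECISION_M = {1: 5_000_000, 2: 1_260_000, 3: 156_000, 4: 40_000,
               5: 4_800, 6: 1_220, 7: 152, 8: 38}

# Minimum permitted distances in metres (UK rules quoted above; 5000m is a guess).
HAZARD_DISTANCE_M = {
    "people_and_property": 50,
    "congested_area": 150,
    "airport": 1000,
    "emergency_exclusion_zone": 5000,
}


def geohash_length_for(distance_m):
    """Return the 2D geohash length whose precision is closest to distance_m."""
    return min(PRECISION_M, key=lambda length: abs(PRECISION_M[length] - distance_m))


for hazard, distance in HAZARD_DISTANCE_M.items():
    print(hazard, "-> geohash length", geohash_length_for(distance))
```

Each hazard table would then be keyed by a 3D geohash of that length, so a single lookup with the drone's own 3D geohash answers "is anything of this type too close?".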

Also note that if we want to increase the precision of the altitude for Drone operations, perhaps to enable altitude to be accurate to within 1m to ensure Drones are hovering outside the correct floor of a skyscraper (e.g. to support emergency services, maintenance, etc), then a 3D geohash constructed from a 10 character 2D geohash, with altitude rounded to 1m appended, can be used (which is a 1 cubic metre box).


Next blog: We investigate using some of the powerful geospatial features of the Lucene Cassandra index plugin, and implement a subset of the alternatives from all the blogs, and run load tests with the Anomalia Machina system. The winner will be revealed in the next blog.

Postscript – Example 3D Geohash code

If you really want to check for Drone proximity anomalies correctly, and not cause unnecessary panic in the general public by claiming to have found UFOs instead, then it is possible to use a more theoretically correct 3D version of geohashes. If the original 2D geohash algorithm is directly modified so that all three spatial coordinates (latitude, longitude, altitude) are used to encode a geohash, then geohash ordering is also ensured. Here’s example code in gist for a 3D geohash encoding which produces valid 3D geohashes for altitudes from 13km below sea level to geostationary satellite orbit. Note that it’s just sample code and only implements the encoding method; a complete implementation would also need a decoder and other helper functions.

The tradeoffs between the previous “2D geohash+rounded altitude” approach and this one are: (1) this approach is theoretically correct, whereas with the previous approach (2) the 2D part remains compatible with the original 2D geohashes (this 3D geohash is not), and (3) the altitude is explicit in the geohash string (in this 3D geohash it is encoded into the string and is therefore only implicit).

Geospatial Anomaly Detector 3 - 3D Hilbert Cube

A 3D Hilbert Cube (the Maths behind geohashes) made from pipes!


The post Geospatial Anomaly Detection (Terra-Locus Anomalia Machina) Part 3: 3D Geohashes (and Drones) appeared first on Instaclustr.

Geospatial Anomaly Detection (Terra-Locus Anomalia Machina) Part 2: Geohashes (2D)

Massively Scalable Geospatial Anomaly Detection with Apache Kafka and Cassandra

In this blog, we continue exploring how to build a scalable Geospatial Anomaly Detector. In the previous blog, we introduced the problem and tried an initial Cassandra data model with locations based on latitude and longitude. We now try another approach, Geohashes (of the 2D kind, to start with), and have some close encounters of another kind (Space vs. Time). Note that Geohashes are easily confused with Geohashing, which is an outdoor adventure sport based on trying to reach a random Geohash, and Geocaching, which is a worldwide treasure hunt to attempt to find geocaches hidden at different locations. So, on with the adventure of the Geohashes kind!

1 Geohashes are “everywhere”

In the previous blog (Geospatial Anomaly Detection: Part 1) we discovered that efficient proximity querying over <latitude, longitude> location coordinates using inequalities is challenging in Cassandra.

Geospatial Anomaly Detector 2 - Proximity Retrieval Ratio

Is there an alternative?  Perhaps the experiments in the previous blog, using a country string as the partition key, give us a valuable clue? The Earth can be divided up in a lot of different  (but somewhat arbitrary) ways such as by countries, continents, or even tectonic plates!

Geospatial Anomaly Detector 2 - Major Tectonic Plates

The major tectonic plates

What if there was a more “scientific” way of dividing the world up into different named areas, ideally hierarchical, with decreasing sizes, with a unique name for each? Perhaps more like how an address works. For example, using a combination of plate, continent, and country, what famous building is located here?

North American Plate ->

Continent of North America ->

U.S.A. ->

District of Columbia ->

Washington ->

Northwest ->

Pennsylvania Avenue ->

1600

“The” White House! Actually, tectonic plates are hierarchical, as there are major, minor and microplates (but only sometimes and at geological timescales).  

It turns out that there are many options for systematically dividing the planet up into nested areas.  One popular method is a Geohash. Geohashes use “Z-order curves” to reduce multiple dimensions to a single dimension with fixed ordering (which turns out to be useful for database indexing). A geohash is a variable length string of alphanumeric characters in base-32 (using the digits 0-9 and lower case letters except a, i, l and o). E.g. “gcpuuz2x” (try pronouncing that!) is the geohash for Buckingham Palace, London. A geohash identifies a rectangular cell on the Earth: at each level, each extra character identifies one of 32 sub-cells as shown in the following diagram.

Geospatial Anomaly Detector 2 - Geohash, 32 sub-cells

Shorter geohashes have larger areas, while longer geohashes have smaller areas. A single character geohash represents a huge 5,000 km by 5,000 km area, while an 8 character geohash is a much smaller area, 40m by 20m. Greater London is “gcpu”, while “gc” includes Ireland and most of the UK. To use geohashes you encode <latitude, longitude> locations to a geohash (which is not a point, but an area), and decode a geohash to an approximate <latitude, longitude> (the accuracy depends on the geohash length).  Some geohashes are even actual English words. You can look up geohashes online (which is an easier type of adventure than geohashing or geocaching): “here” is in Antarctica, “there” is in the Middle East, and “everywhere” is (a very small spot) in Morocco! And here’s an online zoomable geohash map which makes it easy to see the impact of adding more characters to the geohash.
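
To make the encoding step concrete, here is a minimal Python sketch of the standard geohash encoding: longitude and latitude ranges are halved alternately, each halving contributes one bit, and every 5 bits become one base-32 character. It is an illustration only, not the Java library used for this blog, and the function name geohash_encode is made up for the example.

```python
# Geohash base-32 alphabet: digits plus lower case letters except a, i, l and o.
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def geohash_encode(lat, lon, length=8):
    """Encode a <latitude, longitude> pair as a geohash of the given length."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    bits = 0
    bit_count = 0
    use_lon = True  # bits alternate, starting with longitude
    while len(chars) < length:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits = (bits << 1) | 1
                lon_lo = mid
            else:
                bits <<= 1
                lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits = (bits << 1) | 1
                lat_lo = mid
            else:
                bits <<= 1
                lat_hi = mid
        use_lon = not use_lon
        bit_count += 1
        if bit_count == 5:  # 5 bits select one of the 32 sub-cells
            chars.append(BASE32[bits])
            bits = 0
            bit_count = 0
    return "".join(chars)


# Approximate coordinates of Buckingham Palace (an assumption for this example).
print(geohash_encode(51.5014, -0.1419, 4))  # the Greater London cell
```

A shorter geohash is always a prefix of the longer geohash for the same point, which is the property the rest of this blog relies on when widening the search area.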

Geohashes are a simpler version of bounding boxes that we tried in the previous blog, as locations with the same geohash will be near each other, as they are in the same rectangular area. However, there are some limitations to watch out for including edge cases and non-linearity near the poles.

To use geohashes with our Anomalia Machina application, we needed a Java implementation, so we picked this one (which was coincidentally developed by someone geospatially nearby to the Instaclustr office in Canberra).  We modified the original Anomalia Machina code as follows. The Kafka key is a <latitude, longitude> pair. Once an event reaches the Kafka consumer at the start of the anomaly detection pipeline we encode it as geohash and write it to Cassandra. The query to find nearby events now uses the geohash. The modified anomaly detection pipeline looks like this:

Geospatial Anomaly Detector 2 - Geospatial Anomaly Detector with Geohashes

But how exactly are we going to use the geohash in Cassandra? There are a number of options.

2 Implementation Alternatives

2.1 Geohash Option 1 – Multiple indexed geohash columns

The simplest approach that I could think of to use geohashes with Cassandra, was to have multiple secondary indexed columns, one column for each geohash length, from 1 character to 8 characters long (which gives a precision of +/- 19m which we assume is adequate for this example).

The schema is as follows, with the 1 character geohash as the partition key, time as the clustering key, and the longer (but smaller area) geohashes as secondary indexes:

CREATE TABLE geohash1to8 (
    geohash1 text,
    time timestamp,
    geohash2 text,
    geohash3 text,
    geohash4 text,
    geohash5 text,
    geohash6 text,
    geohash7 text,
    geohash8 text,
    value double,
    PRIMARY KEY (geohash1, time)
) WITH CLUSTERING ORDER BY (time DESC);

CREATE INDEX i8 ON geohash1to8 (geohash8);
CREATE INDEX i7 ON geohash1to8 (geohash7);
CREATE INDEX i6 ON geohash1to8 (geohash6);
CREATE INDEX i5 ON geohash1to8 (geohash5);
CREATE INDEX i4 ON geohash1to8 (geohash4);
CREATE INDEX i3 ON geohash1to8 (geohash3);
CREATE INDEX i2 ON geohash1to8 (geohash2);

In practice the multiple indexes are used by searching from smallest to largest areas. To find the (approximately) nearest 50 events to a specific location (e.g. “everywhere”, shortened to an 8 character geohash, “everywhe”), we start querying with smallest area first, the 8 character geohash, and increase the area by querying over shorter geohashes, until 50 events are found, then stop:

select * from geohash1to8 where geohash1='e' and geohash8='everywhe' limit 50;
select * from geohash1to8 where geohash1='e' and geohash7='everywh' limit 50;
select * from geohash1to8 where geohash1='e' and geohash6='everyw' limit 50;
select * from geohash1to8 where geohash1='e' and geohash5='every' limit 50;
select * from geohash1to8 where geohash1='e' and geohash4='ever' limit 50;
select * from geohash1to8 where geohash1='e' and geohash3='eve' limit 50;
select * from geohash1to8 where geohash1='e' and geohash2='ev' limit 50;
select * from geohash1to8 where geohash1='e' limit 50;
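
The client-side loop that drives these queries can be sketched in Python as below. It is a simplified illustration, not the pipeline code: nearest_events is a made-up name, and fetch is a hypothetical callable standing in for whatever executes the CQL query for a given geohash length and prefix.

```python
def nearest_events(geohash8, fetch, limit=50):
    """Query from the smallest area (longest geohash) to the largest, then stop.

    fetch(length, prefix, limit) is a hypothetical stand-in for running
    "select * ... where geohash<length>=<prefix> limit <limit>" and
    returning the rows as a list.
    """
    rows = []
    for length in range(len(geohash8), 0, -1):
        prefix = geohash8[:length]  # a shorter prefix means a larger area
        rows = fetch(length, prefix, limit)
        if len(rows) >= limit:
            break  # enough nearby events found
    return rows


# In-memory stand-in for the table, just to show the control flow.
events = ["everywhe", "everywh1", "everyabc", "evzzzzzz", "e0000000"]


def fake_fetch(length, prefix, limit):
    return [e for e in events if e.startswith(prefix)][:limit]


print(nearest_events("everywhe", fake_fetch, limit=3))
```

In the worst case all 8 queries run before enough rows are found, which is the "overhead of multiple reads" tradeoff discussed next.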

Spatial Distribution and Spatial Density

What are the tradeoffs with this approach? The extra data storage overhead of having multiple geohash columns, the overhead of multiple secondary indexes, the overhead of multiple reads, due to (potentially) searching multiple areas (up to 8) to find 50 events, and the approximate nature of the spatial search due to the use of geohashes.  How likely we are to find 50 nearby events on the first search depends on spatial distribution (how spread out in space events are, broad vs. narrow) and spatial density (how many events there are in a given area, which depends on how sparse or clumped together they are).

For example, broad distribution and sparse density:

Geospatial Anomaly Detector 2 - Broad distribution and sparse density

Broad distribution and clumped:

Geospatial Anomaly Detector 2 - Broad Distribution and Clumped

Narrow distribution and sparse:

Geospatial Anomaly Detector 2 - Narrow distribution and sparse

Narrow distribution and clumped:

Geospatial Anomaly Detector 2 - Narrow distribution and clumped

There’s a potentially nice benefit of using geohashes for location in our anomaly detection application. Because they are areas rather than highly specific locations, once an anomaly has been detected it’s automatically associated with an area which can be included with the anomaly event reporting. This may be more useful than just a highly specific location in some cases (e.g. for setting up buffer and exclusion zones, triggering more sophisticated but expensive anomaly detection algorithms on all the data in the wider area, etc). This is the flip side of the better-known fact that geohashes are good for privacy protection by virtue of anonymizing the exact location of an individual. Depending on the hash length the actual location of an event can be made as vague as required to hide the location (and therefore the identity) of the event producer.

Note that in theory, we don’t have to include the partition key in the query if we are using secondary indexes, i.e. this will work:

select * from geohash1to8 where geohash8='everywhe' limit 50;

The downside of this query is that every node is involved. The upside is that we can choose a partition key with sufficient cardinality to avoid having a few large partitions, which are not a good idea in Cassandra (see note below).

2.2 Geohash Option 2 – Denormalized Multiple Tables

There are a couple of different possible implementations of this basic idea. One is to denormalise the data and use multiple Cassandra tables, one for each geohash length. Denormalisation by duplicating data across multiple tables to optimise for queries is common in Cassandra (“In Cassandra, denormalization is, well, perfectly normal”), so we’ll definitely try this approach.

We create 8 tables, one for each geohash length:

CREATE TABLE geohash1 (
    geohash text,
    time timestamp,
    value double,
    PRIMARY KEY (geohash, time)
) WITH CLUSTERING ORDER BY (time DESC);

… 
CREATE TABLE geohash8 (
    geohash text,
    time timestamp,
    value double,
    PRIMARY KEY (geohash, time)
) WITH CLUSTERING ORDER BY (time DESC);

For each new event, we compute geohashes from 1 to 8 characters long, and write the geohash and the value to each corresponding table.  This is fine as Cassandra is optimised for writes. The queries are now directed to each table from smallest to largest area geohashes until 50 events are found:

select * from geohash8 where geohash='everywhe' limit 50;
select * from geohash7 where geohash='everywh' limit 50;
select * from geohash6 where geohash='everyw' limit 50;
select * from geohash5 where geohash='every' limit 50;
select * from geohash4 where geohash='ever' limit 50;
select * from geohash3 where geohash='eve' limit 50;
select * from geohash2 where geohash='ev' limit 50;
select * from geohash1 where geohash='e' limit 50;
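The write path for this option (one geohash per length, one table per length) can be sketched as follows. This is a minimal Python illustration, not the code used in this blog series: `geohash_encode` is a textbook implementation of the standard geohash algorithm, and `rows_for_event` is a hypothetical helper name. The table names `geohash1` to `geohash8` follow the schema above.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # standard geohash alphabet


def geohash_encode(lat, lon, length=8):
    """Encode a latitude/longitude pair as a geohash of the given length."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    bits = 0
    bit_count = 0
    use_lon = True  # bits alternate between longitude and latitude, longitude first
    while len(chars) < length:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits = (bits << 1) | 1
                lon_lo = mid
            else:
                bits <<= 1
                lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits = (bits << 1) | 1
                lat_lo = mid
            else:
                bits <<= 1
                lat_hi = mid
        use_lon = not use_lon
        bit_count += 1
        if bit_count == 5:  # every 5 bits become one base-32 character
            chars.append(BASE32[bits])
            bits = 0
            bit_count = 0
    return "".join(chars)


def rows_for_event(lat, lon, time, value):
    """Return one (table, geohash, time, value) tuple per geohash length 1..8."""
    full = geohash_encode(lat, lon, 8)
    return [(f"geohash{n}", full[:n], time, value) for n in range(1, 9)]
```

Because a shorter geohash is always a prefix of a longer one, the eight values come from a single encoding step followed by slicing.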

2.3 Geohash Option 3 – Multiple Clustering Columns

Did you know that Cassandra supports multiple clustering columns? I had forgotten. So, another idea is to use clustering columns for the geohashes, i.e. instead of having multiple indexes, one for each length geohash column, we could have multiple clustering columns:

CREATE TABLE geohash1to8_clustering (
    geohash1 text,
    time timestamp,
    geohash2 text,
    geohash3 text,
    geohash4 text,
    geohash5 text,
    geohash6 text,
    geohash7 text,
    geohash8 text,
    value double,
    PRIMARY KEY (geohash1, geohash2, geohash3, geohash4, geohash5, geohash6, geohash7, geohash8, time)
) WITH CLUSTERING ORDER BY (geohash2 DESC, geohash3 DESC, geohash4 DESC, geohash5 DESC, geohash6 DESC, geohash7 DESC, geohash8 DESC, time DESC);

Clustering columns work well for modelling and efficiently querying hierarchically organised data, so geohashes are a good fit. I.e. a single clustering column is often used to retrieve data in a particular order (e.g. for time series data) but multiple clustering columns are good for nested relationships, as Cassandra stores and locates clustering column data in nested sort order. The data is stored hierarchically, which the query must traverse (either partially or completely). To avoid “full scans” of the partition (and to make queries more efficient), a select query must include the higher level columns (in the sort order) restricted by the equals operator. Ranges are only allowed on the last column in the query. A query does not need to include all the clustering columns, as it can omit lower level clustering columns.

Time Series data is often aggregated into increasingly longer bucket intervals (e.g. seconds, minutes, hours, days, weeks, months, years), and accessed via multiple clustering columns in a similar way.  However, maybe only time and space (and other as yet undiscovered dimensions) are good examples of the use of multiple clustering columns? Are there other good examples? Maybe organisational hierarchies (e.g. military ranks are very hierarchical after all), biological systems, linguistics and cultural/social systems, and even engineered and built systems. Unfortunately, there doesn’t seem to be much written about modelling hierarchical data in Cassandra using clustering columns, but it does look as if you are potentially only limited by your imagination.

The query is then a bit trickier as you have to ensure that to query for a particular length geohash, all the previous columns have an equality comparison. For example, to query a length 3 geohash, all the preceding columns (geohash1, geohash2) must be included first:

select * from geohash1to8_clustering where geohash1='e' and geohash2='ev' and geohash3='eve' limit 50;
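Since every shorter geohash column must appear with an equality restriction, the query text can be generated from the geohash itself. The sketch below is an illustration only: `clustering_query` is a hypothetical helper, and it assumes the `?` bind markers used with prepared statements rather than inlined literals.

```python
def clustering_query(geohash):
    """Build the CQL text and bind values for a geohash of length 1..8.

    Every column from geohash1 up to the requested length gets an
    equality restriction, as the clustering-column rules require.
    """
    if not 1 <= len(geohash) <= 8:
        raise ValueError("geohash must be 1 to 8 characters long")
    lengths = range(1, len(geohash) + 1)
    where = " and ".join(f"geohash{n}=?" for n in lengths)
    params = [geohash[:n] for n in lengths]
    cql = f"select * from geohash1to8_clustering where {where} limit 50;"
    return cql, params
```

For example, `clustering_query("eve")` yields restrictions on geohash1, geohash2 and geohash3 with the bind values 'e', 'ev' and 'eve'.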

2.4 Geohash Option 4 – Single Geohash Clustering Column

Another approach is to have a single full-length geohash as a clustering column. This blog (Z Earth, it is round?! In which visualization helps explain how good indexing goes weird) explains why this is a good idea:

“The advantage of [a space filling curve with] ordering cells is that columnar data stores [such as Cassandra] provide range queries that are easily built from the linear ordering that the curves impose on grid cells.”

Got that? It’s easier to understand an example:

CREATE TABLE geohash_clustering (
    geohash1 text,
    time timestamp,
    geohash8 text,
    lat double,
    long double,
    PRIMARY KEY (geohash1, geohash8, time)
) WITH CLUSTERING ORDER BY (geohash8 DESC, time DESC);

Note that we still need a partition key, and we will use the shortest geohash with 1 character for this. There are two clustering keys, geohash8 and time. This enables us to use an inequality range query with decreasing length geohashes to replicate the above search from smallest to largest areas as follows. The range bounds are padded to the full 8 characters ('0' is the smallest and 'z' the largest character in the geohash alphabet) so that every cell under the prefix falls inside the range:

select * from geohash_clustering where geohash1='e' and geohash8='everywhe' limit 50;
select * from geohash_clustering where geohash1='e' and geohash8>='everywh0' and geohash8<='everywhz' limit 50;
select * from geohash_clustering where geohash1='e' and geohash8>='everyw00' and geohash8<='everywzz' limit 50;
select * from geohash_clustering where geohash1='e' and geohash8>='every000' and geohash8<='everyzzz' limit 50;
select * from geohash_clustering where geohash1='e' and geohash8>='ever0000' and geohash8<='everzzzz' limit 50;
select * from geohash_clustering where geohash1='e' and geohash8>='eve00000' and geohash8<='evezzzzz' limit 50;
select * from geohash_clustering where geohash1='e' and geohash8>='ev000000' and geohash8<='evzzzzzz' limit 50;
select * from geohash_clustering where geohash1='e' limit 50;
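The range bounds for a given prefix can be computed rather than typed by hand. The sketch below assumes plain lexicographic string comparison, and `prefix_bounds` is a hypothetical helper name. It pads both bounds to the full geohash length: under string comparison 'everywz5' sorts after 'everywz', so an upper bound shorter than 8 characters would miss some cells.

```python
FIRST, LAST = "0", "z"  # smallest and largest characters in the geohash alphabet


def prefix_bounds(prefix, full_length=8):
    """Return (lower, upper) bounds covering every full-length geohash
    that starts with the given prefix."""
    pad = full_length - len(prefix)
    if pad < 0:
        raise ValueError("prefix is longer than the full geohash length")
    return prefix + FIRST * pad, prefix + LAST * pad
```

For the prefix 'everyw' this gives the bounds 'everyw00' and 'everywzz'.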

3 Space vs. Time (Potential Partition Problems)

Geospatial Anomaly Detector 2 - Space vs. Time (Potential Partition Problems)

Just like some office partitions, there can be issues with Cassandra partition sizes. In the above geohash approaches we used sub-continental (or even continental scale as “6” is almost the whole of South America!) scale partition keys. Is there any issue with having large partitions like this?

Recently I went along to our regular Canberra Big Data Meetup and was reminded of some important Cassandra schema design rules in Jordan’s talk (“Storing and Using Metrics for 3000 nodes – How Instaclustr use a Time Series Cassandra Data Model to store 1 million metrics a minute”).

Even though some of the above approaches may work (but possibly only briefly as it turns out), another important consideration in Cassandra is how long they will work for, and if there are any issues with long term cluster maintenance and node recovery. The relevant rule is related to the partition key. In the examples above we used the shortest geohash with a length of 1 character, and therefore a cardinality of 32, as the partition key. This means that we have a maximum of 32 partitions, some of which may be very large (depending on the spatial distribution of the data). In Cassandra you shouldn’t have unbounded partitions (partitions that keep on growing fast forever), partitions that are too large (> 100MB), or uneven partitions (partitions that have a lot more rows than others). Bounded, small, even partitions are Good. Unbounded, large, uneven partitions are Bad. It appears that we have broken all three rules.

However, there are some possible design refinements that come to the rescue to stop the partitions closing in on you, including (1) a composite partition key to reduce partition sizes, (2) a longer geohash as the partition key to increase the cardinality, (3) TTLs to reduce the partition sizes,  and (4) sharding to reduce the partition sizes. We’ll have a brief look at each.

3.1 Composite Partitions

A common approach in Cassandra to limit partition size is to use a composite partition key, with a bucket as the 2nd column. For example:

CREATE TABLE geohash_clustering (
    geohash1 text,
    bucket text,
    time timestamp,
    geohash8 text,
    lat double,
    long double,
    PRIMARY KEY ((geohash1, bucket), geohash8, time)
) WITH CLUSTERING ORDER BY (geohash8 DESC, time DESC);

The bucket represents a fixed-duration time range (e.g. from minutes to potentially days), chosen based on the write rate to the table, to keep the partitions under the recommended 100MB. To query, you now have to use both the geohash1 and the time bucket, and multiple queries are used to go back further in time. Assuming we have a “day” length bucket:

select * from geohash_clustering where geohash1='e' and bucket='today-date' and geohash8='everywhe' limit 50;
select * from geohash_clustering where geohash1='e' and bucket='yesterday-date' and geohash8='everywhe' limit 50;
etc
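Stepping back through the day buckets until enough rows are found could look like the sketch below. It is an illustration under assumptions: the bucket value is taken to be an ISO date string (the queries above only show placeholders), and `fetch` is a hypothetical callable standing in for one bucketed CQL query.

```python
from datetime import date, timedelta


def day_buckets(today, days_back):
    """Return bucket names from today back to days_back days ago, newest first."""
    return [(today - timedelta(days=i)).isoformat() for i in range(days_back + 1)]


def collect_rows(buckets, fetch, limit=50):
    """Query one bucket at a time, newest first, until `limit` rows are found.

    `fetch(bucket, n)` must return at most n rows for that bucket.
    """
    rows = []
    for bucket in buckets:
        rows.extend(fetch(bucket, limit - len(rows)))
        if len(rows) >= limit:
            break
    return rows[:limit]
```

The `days_back` argument is the point at which the search gives up on the current area, which is the Space vs. Time question discussed next.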

Space vs. Time

But this raises the important (and ignored so far) question of Space vs. Time. Which is more important for Geospatial Anomaly Detection? Space? Or Time?

Geospatial Anomaly Detector 2 - Space-time scales of major processes occurring in the ocean

Space-time scales of major processes occurring in the ocean (spatial and temporal sampling on a wide range of scales)

The answer is that it really depends on the specific use case, what’s being observed, how and why. In the natural world  some phenomena are Big but Short (e.g. tides), others are Small but Longer (e.g. biological), while others are Planetary in scale and very long (e.g. climate change).

So far we have assumed that space is more important, and that the queries will find the 50 nearest events no matter how far back in time they may be. This is great for detecting anomalies in long term trends (and potentially over larger areas), but not so good for more real-time, rapidly changing and localised problems. If we relax the assumption that space is more important than time, we can then add a time bucket and solve the partition size problem. For example, assuming the maximum sustainable throughput we achieved for the benchmark in Anomalia Machina Blog 10 of 220,000 events per second, 24 bytes per row, uniform spatial distribution, and an upper threshold of 100MB per partition, then the time bucket for the shortest geohash (geohash1) can be at most 10 minutes, geohash2 can be longer at just over 5 hours, and geohash3 is 7 days.
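The bucket durations quoted above can be reproduced with a few lines of arithmetic. The sketch below assumes a rate of 220,000 events per second (the sustained rate quoted in section 3.2), 24 bytes per row, 100MB taken as 100,000,000 bytes, and events spread uniformly over all 32^n partitions. `seconds_to_fill` is a hypothetical helper name.

```python
EVENTS_PER_SECOND = 220_000        # sustained rate assumed for the whole table
BYTES_PER_ROW = 24
MAX_PARTITION_BYTES = 100_000_000  # 100MB threshold, decimal megabytes


def seconds_to_fill(geohash_length):
    """Seconds until an average partition reaches the size threshold,
    assuming events are spread uniformly over 32**geohash_length partitions."""
    partitions = 32 ** geohash_length
    rows_per_partition = MAX_PARTITION_BYTES / BYTES_PER_ROW
    rate_per_partition = EVENTS_PER_SECOND / partitions
    return rows_per_partition / rate_per_partition


for length, unit, seconds_per_unit in [(1, "minutes", 60), (2, "hours", 3600), (3, "days", 86400)]:
    print(f"geohash{length}: {seconds_to_fill(length) / seconds_per_unit:.1f} {unit}")
```

Under these assumptions the results are about 10.1 minutes, 5.4 hours and 7.2 days. Clumped data would fill some partitions much faster than this average.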

What impact does this have on the queries? The question then is how far back in time do we go before we decide to give up on the current search area and increase the area? Back to the big bang? To the dinosaurs? Yesterday? Last minute?  The logical possibilities are as follows.

We could just go back to some maximum constant threshold time for each area before increasing the search area. For example, this diagram shows searches going from left to right, starting from the smallest area (1), increasing the area for each search, but with a constant maximum time threshold of 100 for each:

Geospatial Anomaly Detector 2 - Constant time search with increasing space search

Constant time search with increasing space search

Alternatively, we could alternate between increasing time and space.  For example, this diagram shows searches going from left to right, again starting from the smallest area and increasing, but with an increasing time threshold for each:

Geospatial Anomaly Detector 2 - Increasing time search with increasing space search

Increasing time search with increasing space search

However, a more sensible approach (based on the observation that the larger the geohash area the more events there will be), is to change the time threshold based on the space being searched (i.e. time scale inversely depends on space scale). Very small areas have longer times (as there may not be very many events given in a minuscule location), but bigger areas have shorter times (as there will be massively more events at larger scales). This diagram again shows searches increasing from the smaller areas on the left to larger areas on the right, but starting with the longest time period, and reducing as space increases:

Geospatial Anomaly Detector 2 - Decreasing time search with increasing space search

Decreasing time search with increasing space search

3.2 Partition Cardinality

Another observation is that in practice the shorter geohashes are only useful for bootstrapping the system. As more and more data is inserted the longer geohashes will increasingly have sufficient data to satisfy the query, and the shorter geohashes are needed less frequently. So, another way of thinking about the choice of correct partition key is to compute the maximum cardinality. A minimum cardinality of 100,000 is recommended for the partition key. Here’s a table of the cardinality of each geohash length:

geohash length  cardinality    cardinality > 100000
1               32             FALSE
2               1024           FALSE
3               32768          FALSE
4               1048576        TRUE
5               33554432       TRUE
6               1073741824     TRUE
7               34359738368    TRUE
8               1099511627776  TRUE

From this data, we see that a minimum geohash length of 4 (cells of roughly 39km x 20km) is required to satisfy the cardinality requirements. In practice, we could, therefore, make the geohash4 the partition key. At a rate of 220,000 checks per second the partitions could hold 230 days of data before exceeding the maximum partition size threshold. Note, however, that the partitions are still technically unbounded, so a composite key and/or TTL (see next) may also be required.
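The cardinality column follows directly from the 32-character geohash alphabet: a geohash of length n can take 32^n values. A minimal sketch, with `MIN_CARDINALITY` set to the 100,000 recommendation quoted above and hypothetical helper names:

```python
MIN_CARDINALITY = 100_000  # recommended minimum partition key cardinality


def cardinality(geohash_length):
    """Number of distinct geohashes of the given length (base-32 alphabet)."""
    return 32 ** geohash_length


def shortest_usable_length(max_length=8):
    """Shortest geohash length whose cardinality exceeds the minimum, or None."""
    for n in range(1, max_length + 1):
        if cardinality(n) > MIN_CARDINALITY:
            return n
    return None


for n in range(1, 9):
    print(n, cardinality(n), cardinality(n) > MIN_CARDINALITY)
```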

3.3 TTLs

A refinement of these approaches, which still allows for queries over larger areas, is to use different TTLs for each geohash length. This would work where we have a separate table for each geohash length. The TTLs are set for each table to ensure that the average partition size is under the threshold: the larger areas will have shorter TTLs to limit the partition sizes, while the smaller areas can have much longer TTLs before they get too big (on average, that is; there may still be issues with specific partitions being too big if data is clumped in a few locations). For the longer geohashes the times are in fact so long that disk space will become the problem well before partition sizes (e.g. geohash5 could retain data for 20 years before the average partition size exceeds 100MB).
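One way to derive a per-table TTL is sketched below. It assumes 220,000 events per second, 24 bytes per row, a 100,000,000 byte threshold and a uniform spatial distribution, and it generates a statement setting the table-level `default_time_to_live` option. The cap reflects Cassandra’s 20-year maximum TTL. The helper names are hypothetical, and the values are averages, not guarantees for clumped data.

```python
EVENTS_PER_SECOND = 220_000
BYTES_PER_ROW = 24
MAX_PARTITION_BYTES = 100_000_000
CASSANDRA_MAX_TTL = 20 * 365 * 24 * 60 * 60  # 630,720,000 seconds (20 years)


def ttl_seconds(geohash_length):
    """Whole seconds until an average partition reaches the threshold,
    capped at the maximum TTL Cassandra accepts."""
    partitions = 32 ** geohash_length
    fill_time = MAX_PARTITION_BYTES * partitions // (BYTES_PER_ROW * EVENTS_PER_SECOND)
    return min(fill_time, CASSANDRA_MAX_TTL)


def ttl_statement(geohash_length):
    """CQL statement setting the default TTL on the table for this geohash length."""
    return (f"ALTER TABLE geohash{geohash_length} "
            f"WITH default_time_to_live = {ttl_seconds(geohash_length)};")


for n in range(1, 9):
    print(ttl_statement(n))
```

From geohash5 onwards the computed fill time exceeds the cap, which is consistent with the observation that disk space becomes the limit before partition size does.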

3.4 Manual Sharding

By default, Cassandra is already “sharded” (the word appears to originate with Shakespeare) by the partition key. I wondered how hard it would be to add manual sharding to Cassandra in a similar way to using a compound partition key (option 1), but where the extra sharding key is computed by the client to ensure that partitions are always close to some fixed size. It turns out that this is possible, and could provide a way to ensure that the system works dynamically irrespective of data rates and clumped data. Here’s a couple of good blogs on sharding in Cassandra (avoid pitfalls in scaling, synthetic sharding). Finally, partitions in Cassandra do take some effort to design, so here’s some advice for data modelling recommended practices.
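A client-computed shard key might be sketched as follows. This is one possible approach under stated assumptions, not the method from the linked blogs: `ShardAssigner` is a hypothetical class, the per-geohash row counts are kept in client memory (a real multi-client deployment would need a shared or persisted counter), and the partition key is assumed to become (geohash1, shard).

```python
from collections import defaultdict


class ShardAssigner:
    """Assign a shard number per geohash so each partition holds a bounded row count."""

    def __init__(self, rows_per_shard):
        self.rows_per_shard = rows_per_shard
        self.counts = defaultdict(int)  # rows written so far, per geohash

    def shard_for_write(self, geohash):
        """Return the shard for the next row; a new shard starts when one fills up."""
        shard = self.counts[geohash] // self.rows_per_shard
        self.counts[geohash] += 1
        return shard

    def shards_to_read(self, geohash):
        """Return shard numbers to query, newest first."""
        newest = max(self.counts[geohash] - 1, 0) // self.rows_per_shard
        return list(range(newest, -1, -1))
```

With `rows_per_shard` derived from the 100MB threshold and the row size, a busy geohash simply rolls over to new shards faster than a quiet one.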

 

Next blog: We add a Dimension of the Third Kind to geohashes – i.e. we go Up and Down!

Subscribe to our newsletter below or Follow us on LinkedIn or Twitter to get notified when we add the Third Dimension.

The post Geospatial Anomaly Detection (Terra-Locus Anomalia Machina) Part 2: Geohashes (2D) appeared first on Instaclustr.

Get better price-to-performance with the new AWS M5 instance types on the Instaclustr Managed Platform

Instaclustr is pleased to announce the support for AWS EC2 M5 instance type for Apache Cassandra clusters on the Instaclustr Managed Platform.

The M-series AWS EC2 instances are general purpose compute instances and are quite popular due to their versatile configurations that can cater to most generic application workloads. In case you are wondering what’s the 5 in M5, it’s the generation number. Every few years AWS announces a new generation of instance type for each series as hardware costs become cheaper by the day and economies of scale apply. This 5th generation M-series is much more powerful and cheaper than its predecessor. It’s a no brainer for technology teams to upgrade their cloud deployments to the latest generation instance available and this is an ongoing pursuit with Cloud environments.

In order to assist customers in upgrading their Cassandra clusters from M4 to M5 instance type nodes, the Instaclustr technical operations team has built several tried and tested node replacement strategies to provide zero-downtime, non-disruptive migrations for our customers. Read the Advanced Node Replacement blog for more details on one such strategy.

To compare how well M5s perform against M4s, we ran our standard Cassandra benchmarking tests. The results showed significant performance gains with M5s. The improvements ranged from 33% for a medium-sized write-only test to a massive 375% for a medium-sized read-only test. Below are the benchmarking details.

We had two 3-node clusters, one with m4.large and the other with m5.large instance type nodes. Each node had 250GB EBS backed SSDs attached. The graph below shows the throughput difference between M4 and M5 for small-sized I/O.

Small writes (insert operations) saw around 81% throughput increase, small reads saw around 47% throughput increase and small reads+writes (mixed workload) saw around 70% throughput increase. (We benchmark throughput achieved for a constant median latency between tests. For details, see our Apache Cassandra certification test plan.)

We noticed that the performance gains increased with I/O size. The small I/O tests had a read or write size of 0.6kb per row. The medium I/O tests, which had a read/write size of 12kb per row, showed several-fold better performance for read and mixed workloads but only a meagre improvement in writes (insert operations). The graph below shows the comparison for medium-sized I/O.

Medium writes saw only around 33% throughput increase, whereas medium reads and medium mixed workload saw around 375% and 300% throughput increase respectively.

M5 instance pricing

We have added four variants of M5 instance types to the platform: one instance of m5.large and 3 instances of m5.xlarge with different storage capacities. All of them come with EBS backed SSD drives. Compared to M4s, there is a slight increase in price. The biggest increase of 23% is for the lowest configuration, i.e. the m5.large instance. But evidently, from the benchmarks, the price-to-performance ratio is much better for m5.large compared to m4.large. What’s even better for customers is that the higher configs of M5 are only marginally more expensive than their M4 equivalents, which means even better price-to-performance.
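The price-to-performance claim can be checked with simple arithmetic. The sketch below assumes the quoted percentages are increases over the m4.large baseline (so 375% means 4.75 times the throughput) and combines them with the 23% price increase for m5.large. `throughput_per_dollar_gain` is a hypothetical helper name.

```python
def throughput_per_dollar_gain(throughput_increase_pct, price_increase_pct):
    """Ratio of M5 to M4 throughput per dollar, given percentage increases."""
    return (1 + throughput_increase_pct / 100) / (1 + price_increase_pct / 100)


PRICE_INCREASE_PCT = 23  # m5.large vs m4.large

for name, gain_pct in [("small writes", 81), ("medium writes", 33), ("medium reads", 375)]:
    ratio = throughput_per_dollar_gain(gain_pct, PRICE_INCREASE_PCT)
    print(f"{name}: {ratio:.2f}x throughput per dollar")
```

Under that reading, even the weakest case (medium writes) comes out slightly ahead per dollar, and medium reads come out nearly four times ahead.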

Below are their specifications and pricing (for US East – Ohio). Pricing for the rest of the regions is available on the Instaclustr Console when you log in.

#  Instaclustr Instance Type  AWS Instance Name  CPU Cores  Memory  Storage Capacity  Price/node/month, US East (Ohio)
1  Tiny                       m5.large           2          8 GiB   250 GB SSD EBS    $309.18
2  Small                      m5.xlarge          4          16 GiB  400 GB SSD EBS    $576.59
3  Balanced                   m5.xlarge          4          16 GiB  800 GB SSD EBS    $747.59
4  Bulk                       m5.xlarge          4          16 GiB  1600 GB SSD EBS   $1,089.59

To know more about this benchmarking or if you need clarification on when to use M5 instance type for Cassandra, reach out to our Support team (if you are an existing customer), or contact our Sales team.

The post Get better price-to-performance with the new AWS M5 instance types on the Instaclustr Managed Platform appeared first on Instaclustr.

Massive scale Kafka and Cassandra deployment for real-time anomaly detection: 19 Billion events per day

“The limits of the possible can only be defined by going beyond them into the impossible.”

– Arthur C. Clarke

At Instaclustr, we constantly push ourselves to test the limits of the performance and scalability of the open source technologies our Managed Platform offers. Even at massive scale, our platform ensures that the reliability of the system is intact and continues to be available 24×7. After all, this is our promise to our customers – Performance and Reliability at scale.

In the past few months, our Technology Evangelist, Paul Brebner has been relentlessly testing the limits of massive-scale deployment of a data pipeline consisting of Apache Kafka and Apache Cassandra on the Instaclustr Managed Platform in conjunction with an example Anomaly detection application running on a Kubernetes cluster deployed on AWS EKS. Before we get into the details of the experiment, let me brag a bit on our achievements –

The system achieved peak Kafka writes of 2.3 million/s, while the rest of the pipeline ran at a sustainable 220,000 anomaly checks/s.

If you are looking for a business metric, from the business application standpoint, here it is –

The system processed a massive 19 Billion events per day (19 x 10^9).

 

As far as we know, this is nearly 500 times higher throughput, with lower real-time latency, than any previously published benchmark for an anomaly detection system.

With Anomaly detection as an example application, we architected a streaming data pipeline and demonstrated the scalability, performance and cost-effectiveness of data layer technologies like Apache Kafka and Apache Cassandra on the Instaclustr Managed Platform. The experiment is narrated through a series of 10 blogs along with a white paper (links at the end). Each goes through a different stage of the experiment and provides in-depth discussion of the problem being solved, the thought process in architecting the system, the alternative choices evaluated, the experimentation process, tools, integrations, reporting, and finally how we scaled the system step-by-step to this massive size and recorded performance that could cater to the operational needs of any business running today. Even the largest reported e-commerce traffic is about 1 Billion transactions per day, so this system could cater to 19 such massive e-commerce businesses. What’s also impressive here is the affordability through the economies-of-scale benefit of the Cloud. We derived that a system like this could process roughly 1.9 Million events per dollar spent (considering cost for infrastructure, managed service, anomaly detection application R&D and ongoing maintenance).

That’s enough of bragging! Let’s look into a bit more detail about the application and the system itself.

The Anomalia Machina Experiment

Anomaly Detection Application

Anomaly detection is a method used to detect unusual events in an event stream. It is widely used in a range of applications such as financial fraud detection, security, threat detection, website user analytics, sensors, IoT, system health monitoring, etc. Streaming data (events) from these applications are inspected for anomalies or irregularities, and when an anomaly is detected, alerts are raised either to trigger an automated process to handle the exception or for manual intervention. The logic to determine if an event is an anomaly depends on the application but, typically, such detection systems look for historically known patterns (that were previously classified as anomalies, for supervised anomaly detection, or that are significantly different to past events, for unsupervised systems) in the streaming data. Anomaly detection systems involve a combination of technologies such as machine learning, statistical analysis, algorithmic optimisation techniques and data layer technologies to ingest, process, analyse, disseminate and store streaming data. When such applications operate at massive scale, generating millions or billions of events, they impose significant computational challenges on anomaly detection algorithms, and performance and scalability challenges on data layer technologies.

A simple type of unsupervised anomaly detection is Break or Changepoint analysis. We used a simple CUSUM (CUmulative SUM) algorithm which takes a stream of events and analyses them to see if the most recent event(s) are “different” to previous ones.
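To make the idea concrete, here is a generic textbook two-sided CUSUM on standardised values. It is a sketch for illustration, not the implementation used in the experiment: `cusum_detect` is a hypothetical name, and the `drift` and `threshold` values are arbitrary choices that a real application would need to tune.

```python
from statistics import mean, pstdev


def cusum_detect(values, drift=0.5, threshold=5.0):
    """Return the index at which a shift is first detected, or None.

    Each value is standardised against the mean and standard deviation of
    the whole window. Two cumulative sums track upward and downward
    deviations beyond `drift`; a change is flagged when either sum
    exceeds `threshold`.
    """
    if len(values) < 2:
        return None
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        return None  # all values identical, nothing to detect
    high = low = 0.0
    for i, x in enumerate(values):
        z = (x - mu) / sigma
        high = max(0.0, high + z - drift)
        low = max(0.0, low - z - drift)
        if high > threshold or low > threshold:
            return i
    return None
```

For a window of forty values of 10 followed by ten values of 20, the upward sum grows by 1.5 per event once the level shifts and crosses the threshold on the fourth shifted event.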

Architecture and Technology Choices

We combined Kafka, Cassandra, and the application to form a simple type of “Lambda” architecture, with Kafka and the streaming data pipeline as the “speed layer”, and Cassandra as the “batch” and “serving” layer. Kafka is a good choice for fast, scalable ingestion of streaming data. It supports multiple heterogeneous data sources with linear scalability, and supports data persistence and replication by design. It is also a great fit for store-and-forward use cases where a different technology is used as the persistent storage layer, such as Cassandra, which is great for storing high velocity streaming data (particularly time series data) as it’s optimised for both writes and reads, providing linear scalability and reliability by design.

Application Design

The diagram below shows the anomaly detection application design where the main components are the Kafka load generator (Kafka producer), Kafka cluster, Anomaly detection pipeline, and Cassandra cluster. The anomaly detection pipeline consists of two components: (1) the Kafka consumer, which reads messages from the Kafka cluster, and (2) the processing stages which, for each event received from the Kafka consumer, write to Cassandra, read historic data from Cassandra, and run the detection algorithm to check if the event is an anomaly or not:

Anomalia Machina Summary - Anomaly Detection Application Design
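The three processing steps (write, read history, detect) can be sketched with an in-memory stand-in for Cassandra. Everything here is hypothetical and for illustration only: `InMemoryStore` replaces the real cluster, `process_event` is an invented name, the detector is passed in as a plain function, and the history size of 50 is an assumption.

```python
from collections import defaultdict, deque


class InMemoryStore:
    """Stand-in for the Cassandra table: values per key, newest first."""

    def __init__(self):
        self.rows = defaultdict(deque)

    def write(self, key, value):
        self.rows[key].appendleft(value)

    def read_latest(self, key, limit=50):
        return list(self.rows[key])[:limit]


def process_event(store, key, value, detector, history_size=50):
    """Write the event, read back recent history, and run the detector."""
    store.write(key, value)                         # 1. write the event
    history = store.read_latest(key, history_size)  # 2. read historic data
    history.reverse()                               # oldest first for the detector
    return detector(history)                        # 3. check for an anomaly
```

Because the new event is written before the read, the detector always sees it as the last element of the history.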

Application and Data Pipeline Deployment

We used the Instaclustr Managed Platform for automated provisioning, deployment, scaling and monitoring of Kafka and Cassandra clusters. It enables rapid and painless creation of arbitrarily sized clusters, as well as their management and monitoring. To automate provisioning, deployment, and scaling of the application, we used AWS EKS. We also used Open Source Prometheus to monitor performance and other metrics, and OpenTracing and Jaeger for distributed tracing. The application was secured with VPC peering from the application cluster running on EKS to the Kafka and Cassandra clusters running on the Instaclustr Managed Platform.

Anomalia Machina Architecture

Experiment Specification and Results

We used the following clusters to achieve our biggest results:

  • Instaclustr managed Kafka cluster – EBS: high throughput 1500 9 x r4.2xlarge-1500 (1,500 GB Disk, 61 GB RAM, 8 cores), Apache Kafka 2.1.0, Replication Factor=3
  • Instaclustr managed Cassandra cluster – Extra Large, 48 x i3.2xlarge (1769 GB (SSD), 61 GB RAM, 8 cores), Apache Cassandra 3.11.3, Replication Factor=3
  • AWS EKS Kubernetes cluster Worker Nodes – 2 x c5.18xlarge (72 cores, 144 GB RAM, 25 Gbps network), Kubernetes Version 1.10, Platform Version eks.3

The system achieved peak Kafka writes of 2.3 million/s, while the rest of the pipeline ran at a sustainable 220,000 anomaly checks/s, which is a massive 19 Billion events processed per day (19 x 10^9). It used 574 cores in total, and can easily be scaled up or down. The system was processing roughly 400 events per second per core.

Anomalia Machina Summary - Kafka writes of 2.3million:s
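The headline figures follow from the sustained rate and the core count given above. A short check, using only numbers from this post:

```python
CHECKS_PER_SECOND = 220_000  # sustained anomaly checks per second
TOTAL_CORES = 574            # cores used across the whole system
SECONDS_PER_DAY = 24 * 60 * 60

events_per_day = CHECKS_PER_SECOND * SECONDS_PER_DAY
checks_per_core = CHECKS_PER_SECOND / TOTAL_CORES

print(f"{events_per_day:,} events per day")
print(f"{checks_per_core:.0f} checks per second per core")
```

This gives 19,008,000,000 events per day and about 383 checks per second per core, in line with the rounded figures of 19 Billion and 400.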

Further Information

For more technical information, see our blog series Anomalia Machina: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra:

  1. Anomalia Machina 1 – Introduction
  2. Anomalia Machina 2 – Automatic Provisioning of Cassandra and Kafka clusters
  3. Anomalia Machina 3 – Kafka Load Generation
  4. Anomalia Machina 4 – Prototype application
  5. Anomalia Machina 5 – Monitoring with Prometheus
  6. Anomalia Machina 6 – Application Tracing with OpenTracing and Jaeger
  7. Anomalia Machina 7 – Kubernetes Cluster Creation and Application Deployment
  8. Anomalia Machina 8 – Production Application Deployment with Kubernetes
  9. Anomalia Machina 9 – Anomaly Detection at Scale
  10. Anomalia Machina 10 –  Final Results

We have published a white paper covering these 10 chapters, and you can download it here: Anomalia Machina White Paper.

The sample Open Source Anomalia Machina application code is available from the Instaclustr GitHub.


Further work extending Anomalia Machina POC
We have started extending the Anomalia Machina POC to a few real-world use cases. Specifically, we are building a fraud detector for the FSI sector and an anomaly detector for geospatial applications. We have plans to publish more white papers and blog series around these topics.

Watch this space by subscribing to our newsletter, or follow us on Twitter or LinkedIn for more interesting POC stories.

The post Massive scale Kafka and Cassandra deployment for real-time anomaly detection: 19 Billion events per day appeared first on Instaclustr.