What’s New for Search in DSE 6

DataStax Enterprise (DSE) 6 is built on the best distribution of Apache Cassandra™ and DSE 6 Search is built with a production-certified version of Apache Solr™ 6.

Let me provide a quick tour of some of the enhancements and features found in DSE 6 Search. We are continuing with the themes of improving performance and stability as well as eliminating complexity for users from the previous release.

One of the most profound topics for DSE 6 is its Advanced Performance architecture. A large swath of engineering focus was on re-integration of search with the new advanced performance version of Cassandra as part of this release.

The new core architecture created an opportunity for a cleaner redesign of the search indexing pipeline for DSE. The outcome of all of this work is less configuration required to operate search and improved search data consistency that also comes with more throughput. The redesign mechanisms include a more synchronous write path for indexing data with less moving pieces to tune and monitor or back-pressure mechanisms that can be bottlenecks. This is an overall win for DataStax that also has direct benefits for our users by making things faster, more stable, and easier to use.

NodeSync is another big new feature for DSE 6. While no additional engineering efforts were required to integrate with this functionality outside of testing, NodeSync will bring major benefits to DSE Search. Since search data is automatically handled by DSE, repaired stored data means repaired search data — a huge win for consistency for the entire DSE data layer.

Moving on to search-specific features, DSE 5.1 introduced a series of functionalities aimed at making search even more unified with DSE. As we continue on that journey with DSE 6, native CQL queries can now leverage search indexes for a wider array of CQL query functionality and indexing support. Search queries no longer require the use of ‘solr_query’ and CQL queries that require ‘ALLOWED FILTERING’ no longer have that requirement as search indexes will automatically be utilized.

Beyond some of the more basic Boolean queries and non-key lookups, there are couple of new keywords that are supported, and there are particular operators that deserves special attention. With DSE 6 Search, users can now use the ‘IS NOT NULL’  and ‘!=’ operators. Furthermore, users can also leverage the LIKE operator directly against search indexes along with the added benefit of tailoring the search behavior of the operator through configuration.

Let’s take a look at some of the queries we can now run with pure CQL in DSE.

In the DSE Search 5.1 announcement blog, we created a search index that was more suitable for Boolean and lookup queries over full text search. With DSE 6, the default search index configuration has slightly changed to provide functionality more inline with the ANSI SQL’s LIKE operator versus full-text analysis. This requires less processing to generate the data and less index data for this level of search. We’ll refer to the DSE 5.1 blog for creating and configuring a search index on a table.

Start with this simple CQL schema:

CREATE TABLE amazon.metadata (

   asin text PRIMARY KEY,

   also_bought set<text>,

   buy_after_viewing set<text>,

   categories set<text>,

   imurl text,

   price double,

   title text

);

Create a search index on this table:

CREATE SEARCH INDEX IF NOT EXISTS ON amazon.metadata;

Let’s go ahead and try a variety of simple CQL queries that will be labeled if they should or should not be executed based on what we know about CQL data modeling. First is a query on a field that is not part of the primary key. We have a simple partition key so that any query that isn’t on the asin field should fail.

 

cqlsh> SELECT asin, price, title FROM amazon.metadata WHERE price = 18.49;

asin       | price | title

————+——-+————————————————

002560810X | 18.49 | Needlepoint and Pattern: Themes and Variations

(1 rows)

 

We can also use one of the new operators on this field:

 

cqlsh> SELECT asin, price, title FROM amazon.metadata WHERE price != 0 LIMIT 10;

asin       | price | title

————+——–+——————————————————

0022841393 |  83.52 |             Science: A Closer Look – Grade 6

0028028651 |  197.2 |           Business Law with UCC Applications

B00E8M1HQM |   11.6 |

000716467X |   8.89 |                    Emotional Rollercoaster

0007044984 | 192.06 |                             Human Anatomy- Text Only

0007321198 |  45.31 | Collins English Dictionary: 30th Anniversary Edition

002560810X |  18.49 | Needlepoint and Pattern: Themes and Variations

002391341X | 153.33 |                   Descriptive Geometry (9th Edition)

0006392830 |   4.99 |                               Lesias Dream

B004TMC2FQ |   6.37 |

(10 rows)

 

Because we have a simple string index on the title field, we can do exact queries against that field. Of course, if we wanted more flexibility on our search capabilities we can simply reconfigure and rebuild our index to do more advanced search queries:

 

cqlsh> SELECT asin, title FROM amazon.metadata WHERE title = ‘Emotional Rollercoaster’;

asin       | title

————+————————-

000716467X | Emotional Rollercoaster

(1 rows)

 

We can query against collections as well:

 

cqlsh> SELECT count(*) FROM amazon.metadata WHERE categories CONTAINS ‘Crime’;

count

——-

  440

 

Combining conditions becomes as simple as stating them in CQL terms:

 

cqlsh> SELECT asin, title FROM amazon.metadata WHERE categories CONTAINS ‘Books’ AND title = ‘Emotional Rollercoaster’;

asin       | title

————+————————-

000716467X | Emotional Rollercoaster

 

As previously mentioned, there was a slight change to the default configuration of the search index to behave closer to ANSI SQL’s LIKE operator.

Here are a few examples that show that text search is still possible. In the first example, we are looking for any rows that are in the category ‘Books’ and contains the word ‘Emotional’ in the title where Emotional is case sensitive:

 

cqlsh> SELECT asin, title FROM amazon.metadata WHERE categories CONTAINS ‘Books’ AND title LIKE ‘%Emotional%’;

asin       | title

————+————————————————————————————

0007112580 |      Emotional Healing in Minutes: Simple Acupressure Techniques For Your Emotions

000716467X |                                                            Emotional Rollercoaster

0007197772 | Sedona Method: How to Get Rid of Your Emotional Baggage and Live the Life You Want

0028740173 |                                                  The Emotional Life of the Toddler

002921405X |         SHOULDN’T I BE HAPPY?: Emotional Problems of Pregnant and Postpartum Women

 

We can also search by the same parameters as before but instead of containing the word ‘Emotional’, it should start with the word:

 

cqlsh> SELECT asin, title FROM amazon.metadata WHERE categories CONTAINS ‘Books’ AND title LIKE ‘Emotional%’;

asin       | title

————+——————————————————————————-

0007112580 | Emotional Healing in Minutes: Simple Acupressure Techniques For Your Emotions

000716467X |                                                       Emotional Rollercoaster

(2 rows)

 

While the initial search-enabled CQL functionality exposure is limited to existing keywords, this is merely the first phase of search-enabled CQL enhancements to come. There was a lot of work that was done behind the scenes to establish the foundation for potential query optimizers, query analysis, and other, more sophisticated queries in general. Subsequent enhancements will see more search-orientated functionality such as relevance queries, spatial filters, and even facet queries! We know this direction will help make issuing search queries much easier for our users.

Some of the other work that went into this release was focused around easing some of the operational burdens that could arise with misuse of DSE Search.

Most importantly, we’ve made two key changes in an effort to add safety guardrails around the Solr HTTP API with regards to data writes and deletes that should be done in CQL.

First, we’ve disabled the ability to perform writes through the Solr HTTP interface. This has been branded as a very bad practice for quite some time and we were finally able to remove this capability.

And second, we’ve disabled the ability to execute deletes through the Solr HTTP interface which can be properly executed in CQL. This will reduce some of the pitfalls from the system that only cause issues. These changes enforce best practices to be followed through DSE.

More logging was also added around shard replica requests to improve our support’s troubleshooting turnaround times.

Finally, some default index behavior from Cassandra was overridden to improve native repair operations with regards to search indexes to further eliminate operational complexities.

You can download DSE 6 now and try out all that I’ve walked through above. If you’re interested in learning about additional features we introduced in DSE 6, check out our online documentation and these other blog posts:

 

Orientation to Cassandra Nodetool

Nodetool is a broadly useful tool for managing Cassandra clusters. A large percentage of questions concerning Cassandra can easily be answered with a nodetool function.

Having been developed over time by a diverse open source community, the nodetool commands can seem at first glance to be defined within a minimally consistent syntax. On closer inspection, the individual commands can be organized into several overlapping buckets.

The first grouping consists of commands to view (get) or change (set) configuration variables. An example pair is getlogginglevels and setlogginglevel. By default, logging is set to INFO, midway in the available range of ALL, TRACE, DEBUG, INFO, WARN, ERROR, and OFF. Running nodetool getlogginglevels will display the currently set value.

Other get/set (sometimes prefixed as enable/disable) commands can be set either at startup or while Cassandra is running. For example, incremental backups can be enabled in the startup configuration file cassandra.yaml by setting incremental_backups=true. Alternatively, they can be started or stopped using nodetool, with the commands nodetool enablebackup and nodetool disablebackup. In general, though, most configuration values are either set in startup configuration files or set dynamically using nodetool; there is little overlap.

Several nodetool commands can be used to get insight into status of the Cassandra node, cluster, or even data. Two very basic informational commands are nodetool status and nodetool info. Nodetool status provides a brief output of node state (up, down, joining cluster, etc.), IP addresses, and datacenter location. Nodetool info provides a less brief output of key status variables. It is a convenient way to see memory utilization, for example.

Although the tool is named nodetool, not all commands apply to nodes. For example, nodetool describecluster provides information about the cluster — snitch and partitioner type, name, and schema versions. For another example, nodetool netstats provides information about communication among nodes.

The nodetool can not only be used for basic configuration and information; it is also a powerful tool for cluster operations and data management. The operations tasks of shutting down a node within a cluster or doing maintenance on a live node are made easier with commands like nodetool drain (flushes writes from memory to disk, shuts off connections, replays commitlog) and nodetool disablegossip (makes node invisible to the cluster). Data management tasks are made easier with commands like nodetool repair to sync data among nodes (perhaps due to missed writes across the cluster) and nodetool garbagecollect to remove deleted data.

Now that I have provided an orientation to nodetool, in future posts I will describe how to combine various information, set/get, and management commands to do common tasks such as backups, performance tuning, and upgrades.


Learn more about Pythian services for Cassandra.

Apache Kafka “Kongo” Part 4.1: Connecting Kafka to Cassandra with Kafka Connect

Kafka Connect is an API and ecosystem of 3rd party connectors that enables Kafka to be easily integrated with other heterogeneous systems without having to write any extra code. This blog focuses on a use case extending the Kongo IoT application to stream events from Kafka to Apache Cassandra using a Kafka Connect Cassandra Sink.

For an introduction to Kafka Connect see Apache Kafka Connect Architecture Overview.

Here’s the Kongo code and sample connect property files for this blog.

1. The Problem: Streaming Kongo Events to Cassandra

In the previous blog (Apache Kafka “Kongo” Part 3: Kafkafying Kongo – Serialization, One or Many topics, Event Order Matters) we started the “Kafkification” process of the Kongo IoT application by adding some initial Kafka producers, topics and consumers for Sensor and RFID events. This enabled the rules to be checked for sensor events (for Goods in warehouses and trucks), and for RFID load events (to check that Goods are allowed to be transported on the same truck). However, we didn’t do anything as a result of these checks.

Imagine that we have a business requirement to keep track of every violation during the storage and transportation of Goods (for real-time notification), and then when Goods are eventually delivered (and potentially for arbitrary an arbitrary period after delivery) check for violations for each delivered Goods (for auditing, quality control, process improvement etc).

Because Kafka uses an immutable log store we could potentially do all this in Kafka “for free”, with appropriate retention policy settings. Nevertheless, let’s assume we need to persist the violation events in some other system. In practice Kafka may have to be integrated with existing enterprise systems anyway (particularly if it’s being used an integration platform!), so this use case gives us an excuse to investigate how events can be sent from Kafka to another application.

To visualise the high level event flow in the Kongo application we’ll use a Sankey Diagram. Sankey Diagrams visualise flows in systems (they have been used for everything from steam engines to Netflix). You read them from left to right and the vertical dimension is proportional to quantity. The following diagram shows relative event flows in Kongo assuming we add new Kafka producers for Sensor and RFIDLoad Checks that feed to a single new Kafka violations topic, and assumes that every check results in a violation event:

Sankey Diagram built online with http://sankeymatic.com/build/

Notice that the number of output events is much higher than the number of input events. On average, every input event can produce up to 100 output events, and exhibits event “amplification” – for every input event there can potentially be an avalanche of new events produced.  Both the event system (i.e. Apache Kafka) and any external Sink need to be highly scalable. Luckily as Kafka automatically provides buffering, the external system only has to keep up with the average flow rate rather than peaks. Apache Cassandra is designed for high write rates and is linearly scalable so it’s a good choice as a Sink.

2. The Solution: Kafka Connect

Having decided to write events from Kafka topics to Cassandra, how do we do it?  Well, we could write a custom Kafka consumer that reads events from a topic and writes them to Cassandra. Or we could use the part of the Kafka architecture that is specifically designed for scalable and reliable movement of data between Kafka and third party systems, Kafka Connect. Kafka connect featured (immediately below the kafka logo) in the 1st Kafka blog:

Pick‘n’Mix: Cassandra, Spark, Zeppelin, Elassandra, Kibana, & Kafka

What’s significant to realise is that Apache Kafka provides the Kafka Connect API, but it only comes with one basic example file system connector. Useful connectors are provided by 3rd parties to connect to specific systems, or you can write your own connector.  So if you want to connect Kafka to system X (say Cassandra), you have to find or write connectors for that specific system, and in the direction you want to the data to go: “Source connectors” pull data from an external system (the Source) and write it to Kafka topics; “Sink connectors” read data from Kafka topics and push it to an external system (the Sink). Each connector flavour is unidirectional, you can’t go against the flow. Here’s an even simpler diagram showing the high-level Kafka Connect architecture with Source (green) and Sink (blue) data flows:

High Level Kafka Connect Architecture showing Source and Sink Flows

For an introduction to the Kafka Connect Architecture (covering Source and Sink Connectors; Connectors, Plugins, Tasks and Workers; Clusters; and Converters) please see the related blog “Apache Kafka Connect Architecture Overview”.

3. Connecting Kongo to Cassandra

How did we get Kongo working with Kafka connect? The steps taken included: Selecting a Kafka Cassandra Connector; getting the simple Kafka File Sink connector working (standalone); adding a Violations topic to Kongo; connecting Kongo to the file Sink connector; connecting Kongo to the Kafka Cassandra connector (standalone); production deployment with distributed workers.

3.1 We’re going on a Kafka Cassandra Connector Hunt!

One of the challenges with understanding and using Kafka Connect is that only the Connect API is provided by Apache Kafka, and the Apache documentation is therefore limited to the API. Doing a quick Google search it seems that there were at least a few candidates for a Kafka Cassandra Connector. This was good otherwise the rest of the blog would have been about writing one from scratch. What was I looking for in a connector? Not much really!? Here’s my wishlist (yours may be different):

  • A Kafka Cassandra Sink Connector (as we want to write data to Cassandra)
  • Open Source, with an Apache License
  • Works with recent versions of Apache Kafka (1.1.0) and Cassandra (3.11)
  • Shows some recent maintenance activity
  • Works with the minimal amount of extra software
  • Has reasonable documentation
  • Has at least one external article describing or reviewing it
  • Has some documented examples
  • and is maybe certified for Kafka, Cassandra or both (is that a thing?)

To cut a longish story short, we settled on a connector from Landoop, now part of their Lenses platform (see all the connectors that were found in the resources section at the end).

This may seem like an odd choice, as even though it is open source, in theory, it requires you to run both their Lenses platform and a Schema Registry service. However,  after a bit of research, I had a suspicion that it was possible to run the Landoop Kafka Cassandra connector with only the default Apache Kafka connect functionality. See How to use Kafka Connect for Cassandra without Confluent, Kafka Connect – Import Export for Apache Kafka, and “To install Kafka Connect outside of the Lenses Development Docker follow the instructions from Kafka.”

What do you need to install? Assuming we have Kafka and Cassandra installed, you also need to download the Kafka Cassandra Connector and download Stream Reactor (for the connector property files).

However, before jumping headfirst into the Kafka Cassandra Sink connector let’s dip our toes in the water with the Apache Kafka built-in example file connector to get some experience with configuring a connector, modifying Kongo and getting them to work together.

3.2 Kafka File Connector

The Kafka file connector is simple, comes with Apache Kafka, and you can test it out just using the Kafka command line tools.  I followed the instructions for the File Sink Connector here. The File Sink Connector will simply read records from a Kafka topic and append them to a file.

If you have Apache Kafka installed and running that’s all you’ll need to try it out.  You need to find two property files, both are located in config. The property file for the single (standalone) worker is called connect-standalone.properties, and the connector-specific property file is called connect-file-sink.properties.   Copy them and make some changes.  The connector property file is the simplest and has lines to: set the connector name, the connector class, the maximum number of tasks (1 in this case), the name of the file that will act as the sink (where records will be appended), and the name of the Kafka topic that records will be read from:

name=test-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test-sink.txt
topics=connect-test

The worker property file is longer.  It has settings for the Kafka bootstrap servers (which you will need to set if you are running a real Kafka cluster) and Converters. The file only needs two changes from the defaults for this example. Change the default key and value converters from json.JsonConvert to storage.StringConverter:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

To start Kafka connect with the worker and sink property files, run connect-standalone from the command line, with the two property files as the arguments. This starts a Kafka connect worker which in turn starts a connector and a task:

> bin/connect-standalone.sh connect-standalone.properties connect-file-sink.properties

This produces lots of useful info messages and eventually confirmation that a connector and a worker sink task have been started. From now on the worker will be continuously polling the connect-test topic for records, and passing them to the file sink task which will append them to the /tmp/test-sink.txt file. You can check that it’s working by writing some random data to the topic using the console producer like this:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test

>one two three

>I live up a tree

And the file now contains the lines:

one two three

I live up a tree

What if you don’t change the key and value converters and instead leave them as the JSON defaults? Then the worker throws an exception and dies. Why is this? It is expecting JSON formatted data on the Kafka topic. To try JSON data, change the key and value schemas to false:

key.converter.schemas.enable=false

value.converter.schemas.enable=false

You can then send data to the topic which the file sink connector can interpret as JSON:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test

>{"message":"one two three"}

>{"message":"I live up a tree"}

And the file contains the new lines:

{message=one two three}

{message=I live up a tree}

Note that for this example we have not used a Kafka topic key (just a value).

3.3 Adding a Violations Topic to Kongo

The Kongo code was modified to write violation messages to a violation topic in preparation for connecting the topic to Cassandra with a sink connector. As with previous topics we needed classes for the event type and serialization: ViolationEvent and ViolationEventSerializer. I initially modelled these on the RFIDEventSerializer which formatted multiple event fields as comma separated Strings. Unlike the other event types we don’t need a ViolationEventConsumer as we are only writing data to the topics, not consuming it in the application itself.

The main changes were in Goods. I added a new KafkaProducer for ViolationEvent. Recall that there are two sorts of violations in Kongo. The first is if a Goods in a location (warehouse or truck) has any rules violated due to a sensor event value being out of range. If a violation is detected then a new ViolationEvent is created and sent to the topic, with a key being the Goods Id, and the value being a String with the violation category (“sensor”) and details of all the rules that were violated and why.

The second violation event may occur as a result of a RFID load event If a Goods is loaded onto a truck and there is a co-location violation with any of the Goods already loaded (a conflict of categories allowed to be transported together). Again a ViolationEvent is created and sent, with the key equal to the Goods loaded, a violation category of “location” and a String containing the details.

Due to the new use case requiring notifications to be produced and written to Cassandra, for every Goods violation during storage and transport, I noticed that the original version didn’t check or produce warnings for the Goods already loaded on trucks. As the Sankey diagrams in the introduction revealed, a side-effect of this is that there may be a large number of violation events produced for a single RFID load event.

3.4 Connecting Kongo to a File Sink

We can now ramp up our experiments and see if we can get the Kafka File Sink connector working with the new Kongo violations topic to write violations to a file. To do this, just change the topic name in the connect-file-sink.properties file to the new violations topic, and check that the converters are set to StringConverter in connect-standalone.properties:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Start the connect worker as above and then start the Kongo application (from KafkaRun). Check the contents of the sink file and you’ll see the violation messages.

3.5 Connecting Kongo to a Cassandra Sink

Now let’s try and write the violations events to Cassandra.  What do you need to do? Get the connector and get the property files. Modify the property files. Run the connector.  

The most important links are as follows (see resources below for more):

Copy/move the kafka-connect-cassandra jar file to libs, and the connector property file, cassandra-sink.properties, to config.

One of the major issues to consider is what data format to use, as there seem to be a number of options, and some of them require a Schema Registry service. The simplest approach seemed to be JSON. See the No Schema and a JSON payload instructions for setting the converters in the worker property file (just use a copy of the one used for the file connector worker).  

To try this out without an existing Cassandra cluster running, you can set up a local database with these instructions. The Cassandra connector Sink expects a keyspace and table to exist to write the data into so you can create them first with the cqlsh command line:

CREATE KEYSPACE kongo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TABLE kongo.violations (

goods text,

error text,

PRIMARY KEY (goods, error)

)

Table design is important in Cassandra, and you need to think about what queries you want to use. We assume that for this use case we will want to query if a particular Goods Id has any violations, and find all violations for a Goods Id. This means that the partition key will be the Goods Id. But given that there can be multiple violations for a Goods during its storage and transportation lifetime, we need another key as well, making a compound primary key. The additional column is the clustering key and we use the complete error String message.  This allows a query like this:

select error from violations where goods=’GoodsId’;

Which will return all the errors for GoodsId.

Now configure the cassandra-sink.properties file with Connector class, topic, and Cassandra settings:

connect.progress.enabled=true

name=cassandra-sink-orders

connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector

tasks.max=1

topics=violations-topic

connect.cassandra.kcql=INSERT INTO violations SELECT goods, error from violations-topic

connect.cassandra.contact.points=localhost

connect.cassandra.port=9042

connect.cassandra.key.space=kongo

connect.cassandra.username=me

connect.cassandra.password=nottelling

Note the kcql line. KCQL stands for Kafka Connect Query Language (not to be confused with KSQL which was designed for Kafka stream processing). This tells the connector how to map the Kafka topic data to the Cassandra table data. The topic key is not used by the connector, just the topic value. The KCQL above tells the connector to read data from violations-topic, parse the value and find the goods and error fields, and then write the goods and error values to the goods and error columns of the Cassandra table, called violations in the kongo keyspace.  This requires that the Goods Id appears as data in the topic value, even though it may already be used as the topic key. However, reading the KCQL documentation, it appears to be possible to access and use the topic key value directly (with _key.*).

KCQL may seem like overkill for this simple example, but the claim is that it can enable more complex use cases, e.g for selecting and renaming fields, and even content based delivery – i.e. different events/columns to be sent to different Cassandra tables dynamically.  E.g. sensor violations to one table and rfid load violations to another.

To test everything we have so far, before connecting it to Kongo, start the Cassandra sink connector using the standalone worker (as above for the file sink) with the new connector property file:

> bin/connect-standalone.sh connect-standalone.properties cassandra-sink.properties

And then test it out with some “fake” JSON data:

> bin/kafka-console-producer.sh –broker-list localhost:9092 –topic violations-topic

>{“goods”:”goods1″,”error”:”temperature > 50″}

>{“goods”:”goods1″,”error”:”humidity > 90″}

>{“goods”:”goods2″,”error”:”Bulky and Fragile Goods co-located”}

> etc

Surprisingly (to me) this worked and the data appeared in Cassandra!

The final change required to Kongo for the actual violation events to appear in Cassandra was to modify the ViolationEventSerializer serializer method to turn ViolationEvents into a JSON formatted String, with the two required fields (goods and error with String values), before serializing as a byte array. Given the incremental development and testing approach taken, this also worked and real violation events were written to Cassandra.

Part 4.2 is next and covers distributed workers, and useful Kafka Connect resources.

The post Apache Kafka “Kongo” Part 4.1: Connecting Kafka to Cassandra with Kafka Connect appeared first on Instaclustr.

Using Cassandra Stress to model a time series workload

Motivation

When examining whether Cassandra is a good fit for your needs, it is good practice to stress test Cassandra using a workload that looks similar to the expected workload in Production.

In the past we have examined the richness of features using YAML profiles in Cassandra’s stress tool – if you haven’t seen the previous post or are unfamiliar with YAML profiles in Cassandra stress, I’d recommend checking it out now.

YAML profiles are all fine and dandy when it comes to mixed or general workloads using SizeTieredCompactionStrategy (STCS) or LeveledCompactionStrategy (LCS), but sometimes we may want to model a time series workload using TimeWindowCompactionStrategy (TWCS). How would we do that with the current options available to us in stress? Ideally, we would be able to do such a thing without having to schedule cassandra-stress instances every X minutes.

Native functions

As it turns out, Cassandra has a native function now() that returns the current time as a timeuuid, which is a unique representation of time. Cassandra also ships with the function toTimestamp() that accepts a timeuuid. Putting the two together, we are able to obtain the following result:

So we can use that to our advantage in a YAML profile:

table_definition: |

CREATE TABLE twcstest (

id text,

time timestamp,

metric int,

value blob,

PRIMARY KEY((id), time)

) WITH CLUSTERING ORDER BY (time DESC)

AND compaction = { 'class':'TimeWindowCompactionStrategy', 'compaction_window_unit':'MINUTES', 'compaction_window_size':'20' }

AND comment='A table to see what happens with TWCS & Stress'


columnspec:

- name: id

  size: fixed(64)

  population: uniform(1..1500M)

- name: time

  cluster: fixed(288)

- name: value

  size: fixed(50)


queries:

putindata:

  cql: insert into twcstest (id, time, metric, value) VALUES (?, toTimestamp(now()), ?, ?)

Based on that YAML above, we can now insert time series data as part of our stress. Additionally, please be aware that the compaction_window_unit property has been deliberately kept much smaller than is typical of a normal production compaction strategy!

cassandra-stress user profile=stressspectwcs.yaml n=50000000 cl=QUORUM ops\(putindata=1\) -node file=nodelist.txt -rate threads=500 -log file=insert.log -pop seq=1..1500M

The only snag to be aware of is that stress will insert timestamps rapidly, so you may want to tweak the values a little to generate suitably sized partitions with respect to your production workload.

That’s great, now how do I select data?

Well, intuitively we would just make use of the same helpful native functions that got us out from the tight spot before. So we may try this:

queries:

putindata:

  cql: insert into twcstest (id, time, metric, value) VALUES (?, toTimestamp(now()), ?, ?)

simple1:

  cql: select * from twcstest where id = ? and time <= toTimestamp(now()) and time >= hmmm….

We appear to be a little stuck because selects may not be as straightforward as we had expected.

  1. We could try qualifying with just <=, but then that would be a whole lot of data we select (You aren’t going to do this in Production, are you?), unless id is bucketed…but it isn’t in our situation.
  2. We could try qualifying with just >=, but then nothing will be returned (You aren’t testing a case like this either, surely).

Unfortunately for us, it doesn’t look like Cassandra has anything available to help us out here natively. But it certainly has something we can leverage.

UDFs for the win

User defined functions (UDFs) have been added to Cassandra since 2.2. If you aren’t familiar with them, there are examples of them available in a previous blog post and the official cassandra documentation. Since Cassandra doesn’t have any other native functions to help us, we can just write our own UDF, as it should be.

Typically we may expect to want to select a slice up to a certain number of minutes ago. So we want to write a UDF to allow us to do that.

CREATE OR REPLACE FUNCTION stresscql2.minutesAgo ( arg int )

  RETURNS NULL ON NULL INPUT

  RETURNS bigint

  LANGUAGE java

  AS $$

  return (System.currentTimeMillis() - arg * 60 * 1000); $$;

This UDF is quite self explanatory so I won’t go into too much detail. Needless to say, it returns a bigint of arg minutes ago.

Here is a test to illustrate just to be safe:

Here is our new and improved YAML profile:

simple1:

cql: select * from twcstest where id = ? and time <= toTimestamp(now()) and time >= minutesAgo(5)          

Now, when we execute cassandra-stress with simple1, we can expect just data within a certain time frame instead of selecting the whole partition. We can also keep varying the query to select older data if we like, for example, time >= minutesAgo(600) and time <= minutesAgo(590) for data up to 10 hours ago.

A variation with bucketing

We can also create UDFs that model bucketing behaviour. For example, suppose now we have a schema that has data bucketed, like this:

CREATE TABLE twcstestbucket (

id text,

bucket timestamp,

time timestamp,

metric int,

value blob,

PRIMARY KEY((id, bucket), time)

) WITH CLUSTERING ORDER BY (time DESC)

AND compaction = { 'class':'TimeWindowCompactionStrategy', 'compaction_window_unit':'MINUTES', 'compaction_window_size':'20' }

AND comment='A table to see what happens with TWCS & Stress'

And we want to be able to insert data in 5 minute buckets. We can create UDFs like so:

CREATE OR REPLACE FUNCTION stresscql2.nowInMilliSec()

  RETURNS NULL ON NULL INPUT

  RETURNS bigint

  LANGUAGE java

  AS $$

  return (System.currentTimeMillis()); $$;

CREATE OR REPLACE FUNCTION stresscql2.bucket( arg bigint )

  RETURNS NULL ON NULL INPUT

  RETURNS bigint

  LANGUAGE java

  AS $$

   java.time.ZonedDateTime time = java.time.ZonedDateTime.ofInstant(java.time.Instant.ofEpochMilli(arg),     java.time.ZoneOffset.UTC);

   java.time.ZonedDateTime lastFiveMinutes = time.truncatedTo(java.time.temporal.ChronoUnit.HOURS)

                  .plusMinutes(5 * (time.getMinute()/5));

   return (lastFiveMinutes.toEpochSecond() * 1000);

   $$;

CREATE OR REPLACE FUNCTION stresscql2.randomBucket(lowerbound bigint, upperbound bigint)

  RETURNS NULL ON NULL INPUT

  RETURNS bigint

  LANGUAGE java

  AS $$

  java.time.ZonedDateTime lower = java.time.ZonedDateTime.ofInstant(java.time.Instant.ofEpochMilli(lowerbound), java.time.ZoneOffset.UTC);

  java.util.Random random = new java.util.Random();

  int numberOfBuckets = (int) (upperbound - lowerbound) / (5 * 60 * 1000);

  int targetBucket = random.nextInt(numberOfBuckets);

  return (lower.truncatedTo(java.time.temporal.ChronoUnit.HOURS).plusMinutes(5 * (lower.getMinute()/5)).plusMinutes(5 * targetBucket).toEpochSecond() * 1000);

  $$;

The UDF bucket is quite self explanatory as well – it just returns the nearest 5 minute bucket smaller than arg. This assumes UTC time and 5 minute buckets, but the code can easily be tailored to be more general.

However, our UDF doesn’t understand timeuuid. Which is why we need another helper function, which is the function nowInMilliSec().

The final UDF generates a random bucket based on a lower and upper bound time. The expected input bounds should be in epoch milliseconds. This will help in selecting old/random data bucketed to within 5 minutes in a range.

And now here is our new and modified YAML profile to accommodate our desires of having stress follow a bucketed workload:

putindata:

cql: insert into twcstestbucket (id, bucket, time, metric, value) VALUES (?, bucket(nowInMilliSec()), toTimestamp(now()), ?, ?)

simple1:

cql: select * from twcstestbucket where id = ? and bucket = bucket(nowInMilliSec()) and time <= toTimestamp(now()) and time >= minutesAgo(5)

fields: samerow            

selectold:

cql: select * from twcstestbucket where id = ? and bucket = randomBucket(1524115200000, 1524129600000)

1524117600000 happens to be Thursday, April 19, 2018 5:20:00 AM in GMT time while 1524129600000 happens to be Thursday, April 19, 2018 9:20:00 AM. It can be tailored to suit needs. It’s kind of ugly, but it will do the job.

And there we go: Tap into UDFs to be able to model a TWCS workload with Cassandra stress.

cassandra-stress user profile=stressspectwcs.yaml n=50000000 cl=QUORUM ops\(putindata=5,simple1=5,selectold=1\) -node file=nodelist.txt -rate threads=500 -log file=mixed.log -pop seq=1..1500M

There’s always an option of writing your own client and using that to perform stress instead, with the obvious benefit that there’s no need to write UDFs and you have control over everything. The downside is that you would have to write code that includes rate limiting and reporting of metrics whereas cassandra stress is the stressing tool that comes with Cassandra out of the box and has very rich statistics, down to latency for each query.

The post Using Cassandra Stress to model a time series workload appeared first on Instaclustr.

Apache Kafka Connect Architecture Overview

Kafka Connect is an API and ecosystem of 3rd party connectors that enables Apache Kafka to be scalable, reliable, and easily integrated with other heterogeneous systems (such as Cassandra, Spark, and Elassandra) without having to write any extra code. This blog is an overview of the main Kafka Connect components and their relationships. We’ll cover Source and Sink Connectors; Connectors, Plugins, Tasks and Workers; Clusters; and Converters.

For an example of how to use Kafka Connect see Apache Kafka “Kongo” Part 4.1 and 4.2: Connecting Kafka to Cassandra with Kafka Connect.

1. Source and Sink Connectors

At a high level, “Source connectors” pull data from an external system (the Source) and write it to Kafka topics. “Sink connectors” read data from Kafka topics and push it to an external system (the Sink). Each connector flavour is unidirectional, you can’t go against the flow. Here’s a simple diagram showing the high level Kafka Connect architecture with Source (green) and Sink (blue) data flows:

High Level Kafka Connect Architecture showing Source and Sink Flows

2. Connectors, Plugins, Tasks & Workers

There are three main components to the Kafka Connect API, each with a different role: Connectors, Tasks and Workers.

Connectors are either Source or Sink Connectors, and are responsible for a some of the Task management, but not the actual data movement.

Tasks come in two corresponding flavours as well, Source and Sink Tasks. A Source Task will contain custom code to get data from the Source system (in the pull() method) and uses a Kafka producer which sends the data to Kafka topics. A Sink Task uses a Kafka consumer to poll Kafka topics and read data, and custom code to push data to the Sink system (in the put() method). Each Sink Task has a thread, and they belong to the same consumer group for load balancing.

The components work together like this (with inspiration from “Kafka: The Definitive Guide”):

Connector “Plugins” (Collections of Connectors and Tasks)

A Connector Plugin is a collection of Connectors and Tasks deployed to each Worker.

Connector

Connectors are responsible for the number of tasks, splitting work between tasks, getting configurations for the tasks from the workers and passing it to the Tasks. E.g. to decide how many tasks to run for a Sink, a Connector could use the minimum of max.tasks set in the configuration and the number of partitions of the Kafka topic it is reading from). The workers actually start the Tasks.

Tasks

Tasks are responsible for getting data into and out of Kafka (but only on the Source or Sink side, the Workers manage data flow to/from Kafka topics). Once started, Source Tasks poll Source systems and get the data that the Workers send to Kafka topics, and Sink Tasks get records from Kafka via the Worker, and write the records to the Sink system.

Workers

Workers are the processes that execute the Connectors and Tasks. They handle the REST requests that define connectors and configurations, start the connectors and tasks and pass configurations to them. If using distributed workers, and a worker process dies, then the connectors and tasks associated with the failed worked will be taken over and load balanced among the remaining workers.

3. Kafka Connect Clusters

A Kafka Connect Cluster has one (standalone) or more (distributed) Workers running on one or multiple servers, and the Workers manage Connectors and Tasks, distributing work among the available Worker processes. Note that Kafka Connect does not automatically handle restarting or scaling of Workers, so this must be handled with some other solution.

The following diagram shows the main relationships and functions of each component in a connect cluster. A Kafka connect cluster can be run on one or more servers (for production these will be separate to the servers that the Kafka Brokers are running on), and one (but potentially more) workers on each server. Data movement is shown with green lines:

Apache Kafka Connect Architecture UML Diagram

4. Kafka Connect Converters

Just like Catalytic Converters for cars, converters are also a key part of the Kafka connector pipeline! I initially found converters perplexing as Kafka consumers and producers already have (De-)Serializers. Are converters the same or different?  Kafka doesn’t know anything about the data format of topic keys and value, it just treats them as byte arrays. So consumers and producers need to be able to convert objects to and from byte arrays, and that’s exactly what the (De-)Serializers do.

Doing some more research on Converters I found that the converter interface docs say:

“The Converter interface provides support for translating between Kafka Connect’s runtime data format and byte. Internally, this likely includes an intermediate step to the format used by the serialization layer.”

I also found that Converter has fromConnectData() and toConnectData() method that must be implemented for converting byte arrays to/from Kafka Connect Data Objects.  Connect “Data Objects” have schemas and values, and a SchemaBuilder which provides a fluent API for constructing Schema objects. Schemas are optional to support cases with schema-free data. ConnectRecords (subclasses SinkRecord and SourceRecord) are analogous to Kafka’s ConsumerRecord and ProducerRecord classes, and contain the data read from or written to Kafka.

In conclusion, here’s how Sources, Tasks, Converters, Topics, (De-)Serializers and Sinks fit together to give a complete end-to-end Kafka data pipeline:

Complete end-to-end Kafka Data Pipeline

Finally, one nice feature of the Kafka Connect architecture is that because converters are decoupled from connectors, you can reuse any Kafka Connect Converter with any Kafka Connect Connector.

The post Apache Kafka Connect Architecture Overview appeared first on Instaclustr.

Scylla Release 2.0.4

release

The Scylla team is pleased to announce the release of Scylla 2.0.4, a bugfix release of the Scylla 2.0 stable branch. Release 2.0.4, like all past and future 2.x.y releases, is backward compatible and supports rolling upgrades.  Please note the latest stable open source release is Scylla 2.1.3, and you are encouraged to upgrade to it.

Related Links:

Bugs Fixed in this Release:

    • Streaming sender-side failures were not propagated to the receiving side. #1743
    • Streaming many column families at the same time generated many small sstables on the receiving side. #3065
    • Streaming may overload a new node which fails to boot as a result. #3258, 3311
    • CQL prepared statements are now properly evicted, thus preventing server OOM when preparing many unique statements. #2747
    • Authentication requests might return expired permission from cache. #2590
    • Dropping a keyspace with a User Defined Type(UDT) resulted in an error: “InvalidRequest: Error from server: code=2200 [Invalid query] message=”Unknown type…”. #3068
    • Scylla aborted with an “Assertion `end >= _stream_position’ failed” exception. This occurred when querying a partition with no clustering ranges (happened on a counter table with no live rows) which also didn’t have static columns. #3304
    • In some rare cases when one or more nodes were down, a Scylla gossip bug caused Scylla to exit with a segmentation fault. #3299
    • In some cases, a removed node (removed using nodetool) failed to bootstrap. #3331
    • A TLS session can be closed while an IO continuation is still in flight, causing a “system:32, Broken pipe” error #3358
    • In a case where gossip information is not available yet, for example during a cluster init, nodetool gossipinfo may fail with NullPointerException. #3330
    • sstableloader does not support User Defined Type(UDT). scylla-tools-java #57
    • sstableloader cannot bind prepared statement for a table using “date” type. scylla-tools-java #59

The post Scylla Release 2.0.4 appeared first on ScyllaDB.

Introducing DataStax Bulk Loader

With DataStax Enterprise (DSE) 6 we’ve introduced one of the most requested features for DSE, a bulk loading (and unloading) utility called the DataStax Bulk Loader (abbreviated as DSBulk).

Loading data into a database shouldn’t be something someone spends a lot of time thinking about or doing. There should just be a tool that is simple, fast, robust, and flexible. And that’s what DSBulk is! It’s as much as 4x faster than loading data via COPY commands and community tools.

What’s the Problem?

There are a number of common use cases that need to either put data into the database in bulk, or extract data out of the database in bulk. Here are just a few:

  • A new user just installed DSE and wants to load a large data set they’re familiar with so they can explore DSE.
  • In order to migrate an application to use DSE, the existing data needs to be moved from the legacy system into DSE.
  • In order to make an offline copy of the database, the data in DSE needs to be exported to local files.
  • A production workflow has data arrive from disparate locations into a file system on one machine, which then needs to be loaded into DSE.

What all of these cases have in common is that they are trying to take a file, or set of files, on their local machine and efficiently load them into DSE. Or, they are trying to do the reverse, efficiently unload data in DSE onto the local machine.

The goal is simple: The tool should be:

  • Easy to use.
  • Able to support common incoming data formats.
  • Able to support multiple field formats, such as dates and times.
  • Able to support advanced security configurations.
  • Able to gracefully handle badly parsed data and database insertion errors.
  • Able to report on the status and completion of loading tasks, including summary statistics (such as the load rate).
  • Efficient and fast.

For all the things it can do for data ingestion, it should be able support the analogous features for data extraction.

DataStax Bulk Loader

The first thing to point out, right up front, is that while bulk loading is by far the most dominant use case, DataStax Bulk Loader was also designed to equally cover the bulk unloading use case. A core tenet of the design is that for every loading option there is an analogous unloading option, and vice versa, resulting in a very easy-to-understand tool that can be applied to a variety of use cases.

DSBulk is a command-line tool for both Linux and Windows environments. It’s included in DataStax Enterprise and DataStax Basic, and also available as a standalone download. It supports all DSE Advanced Security features, including Kerberos and password authentication, as well as SSL security. It loads data up to 4x faster than cqlsh’s COPY command, and faster than existing community tools.


Figure 1: Load Performance of DSBulk vs Cqlsh

DSBulk is designed to load files as they’re presented into existing database tables. That is, DSBulk uses existing tables and will not create new tables. To create tables in DSE, other tools should be used, such as the CQL shell tool, cqlsh.

Additionally, DSBulk does not perform any data transformations, except for interpreting input data, such as converting a string representation of a date into a CQL data type. Operations such as converting strings to lowercase, arithmetic on input columns, or filtering out rows based on some criteria, are not supported. To accomplish these tasks, users should leverage other tools, such as DSE Analytics or scripting languages like Bash, Perl, or Python. DSBulk can accept data on STDIN, so users can pipe the output from the transformation step directly into DSBulk for loading. Analogously, DSBulk can export data to STDOUT, which can be piped into a transformation step directly, as well.

DSBulk supports input data in JSON or delimited format from a single file or a directory of files (you can even provide a pattern for the filename, such as “/path/to/load/input*.csv”). In both cases, there are a number of input parsing options that can be set, enabling input in a wide variety of formats. These include setting the delimiter (e.g., comma), the date format string, the decimal delimiter (e.g., to support European formats), the string equivalent of NULL, etc. Users can also specify the mapping from input fields to database columns in a flexible manner.

But DSBulk goes beyond simply loading the data in files. The user can set a time-to-live on the data being inserted, or add the current timestamp to the data as a column of input. Advanced users can provide specific CQL to be used for the insert, allowing for complex loading scenarios. This is even more useful for taking data out, as the user can specify a subset of the data, rows or columns, to be unloaded.

As previously mentioned, DSBulk supports loading data to DSE and DataStax Basic, and supports unloading data from DSE, DataStax Basic, and CQL-driver-compatible sources such as open-source Apache CassandraTM.

One note with respect to the other options that exist to load data into DSE: another approach to loading data would be to leverage DSE Analytics’ integrated Apache SparkTM. The benefit here is that multiple processes will be loading data in parallel to DSE. The only requirement in this situation is that the data needs to be in a distributed file system, such as DSEFS or Amazon S3. If the data does reside in those locations, DSE Analytics may be a more efficient approach. But for data that exists in one place, such as on a laptop or a server in the data center, DSBulk is the right choice.

Simple Task, Robust Solution

Data loading is not something that a user should have to spend a lot of time thinking about, and with DSBulk you no longer have to. It’s an efficient, flexible, easy-to-use tool that excels at the simple task of loading and unloading data.

DSBulk is available as a standalone download or as part of DSE and DataStax Basic. You can download it from the DataStax download page and read the documentation for more information.

 

Cassandra 4.0 Data Center Security Enhancements

Apache Cassandra versions 3.x and below have an all or nothing approach when it comes the datacenter user authorization security model. That is, a user has access to all datacenters in the cluster or no datacenters in the cluster. This has changed to something a little more fine grained for versions 4.0 and above, all thanks to Blake Eggleston and the work he has done on CASSANDRA-13985.

Cassandra 4.0 Data Centers Are On A Role

The Cassandra 4.0 feature added via CASSANDRA-13985 allows an operator to restrict the access of a Cassandra role to specific datacenters. This new shiny feature is effectively datacenter authorization for roles and will help provide better security and protection for multi-datacenter clusters.

Consider the example scenario where a cluster has two datacenters; dc1 and dc2. In this scenario datacenter dc1 backs a web server application that performs Online Transaction Processing, and datacenter dc2 backs an analytics application. The web server application could be restricted via a role to access dc1 only and similarly, the analytics application could be restricted via a role to access dc2 only. The advantage here is that it minimises the reach that each application has to the cluster. If the analytics application was configured incorrectly to connect to dc1 it would fail, rather than quietly running and increasing the load on the dc1 nodes.

The behaviour of the new datacenter authorization feature can be controlled via the cassandra.yaml file using the new setting named network_authorizer. Out of the box it can be set to one of two values:

  • AllowAllNetworkAuthorizer - allows any role to access any datacenter effectively disabling datacenter authorization; which is the current behaviour.
  • CassandraNetworkAuthorizer - allows the ability to store permissions which restrict role access to specific datacenters.

Important Notes

  • For the network_authorizer setting work when set to CassandraNetworkAuthorizer, the authenticator setting must be set to PasswordAuthenticator. Otherwise, the node will fail to start.
  • When enabling any authorization feature in Cassandra including this one, always increase the system_auth keyspace replication factor. Failure to do this may result in being locked out of the cluster!
  • Further values can be added for custom behaviour by implementing the INetworkAuthorizer interface.
  • Apache Cassandra 4.0 will ship with network_authorizer set to a value of AllowAllNetworkAuthorizer in the cassandra.yaml file. This is similar to the existing authorizer setting in Cassandra where no authorization restrictions are applied by default.

Issuing A Statement

When network_authorizer is set to CassandraNetworkAuthorizer, the CQL syntax can be used to set the datacenter access for a role in a cluster. To help with the setting of permissions in CQL, its keyword vocabulary has been extended to include the clauses ACCESS TO ALL DATACENTERS and ACCESS TO DATACENTERS. Both clause can be added to CQL ROLE statements when either creating or altering a role.

To create a role that has access to all datacenters in a cluster use the ACCESS TO ALL DATACENTERS clause. For example:

CREATE ROLE foo WITH PASSWORD = '...' AND LOGIN = true AND ACCESS TO ALL DATACENTERS;

Similarly a role can be altered to have access to all datacenters. For example:

ALTER ROLE foo WITH ACCESS TO ALL DATACENTERS;

To create a role that is restricted to specific datacenters use the clause ACCESS TO DATACENTERS followed by a set containing the datacenters the role is authorized to access. The datacenter names are literal values i.e. quoted and comma separated. For example, use the following CQL to restrict the access of a role to datacenters dc1 and dc3 only:

CREATE ROLE foo WITH PASSWORD = '...' AND LOGIN = true
    AND ACCESS TO DATACENTERS {'dc1', 'dc3'};

Similarly a role can be altered to have restricted access. For example:

ALTER ROLE foo WITH ACCESS TO DATACENTERS {'dc1', 'dc3'};

If the ACCESS TO DATACENTERS {...} clause is omitted from a CREATE ROLE command, then the new role will have access to all data centers in the cluster. In this specific case, it is equivalent to adding the ACCESS TO ALL DATACENTERS clause on the CREATE ROLE command.

Simple Example

Here is a quick demo of the feature in action. The following demo uses ccm to launch a cluster running the trunk version of Apache Cassandra commit Id 2fe4b9d. The cluster will have two datacenters with a single node in each, and the network_authorizer feature will be enabled on each node. The scripts to set up ccm and cluster are included inline as well.

Set up ccm to use the local build of commit 2fe4b9d for the Cassandra libraries, by running the following script.

#!/bin/bash

set -e

if [ -z "${1}" ]
then
  echo "Apache Cassandra repository path required."
  exit 1
fi

CCM_CASSANDRA_VERSION="4.0.0"
CCM_CASSANDRA_REPOSITORY_PATH=".ccm/repository/${CCM_CASSANDRA_VERSION}"

CASSANDRA_DIR_PATH=${1}
CASSANDRA_SUB_DIR_LIST="bin build conf lib pylib tools"

echo "Building CCM ${CCM_CASSANDRA_VERSION} repository"
mkdir -p ~/${CCM_CASSANDRA_REPOSITORY_PATH}

echo ${CCM_CASSANDRA_VERSION} > ~/${CCM_CASSANDRA_REPOSITORY_PATH}/0.version.txt

for dir_name in ${CASSANDRA_SUB_DIR_LIST}
do
  echo "Copying directory ${CASSANDRA_DIR_PATH}/${dir_name} to CCM ${CCM_CASSANDRA_VERSION} repository"
  mkdir -p ~/${CCM_CASSANDRA_REPOSITORY_PATH}/${dir_name}
  cp -r ${CASSANDRA_DIR_PATH}/${dir_name}/* ~/${CCM_CASSANDRA_REPOSITORY_PATH}/${dir_name}
done

Create the ccm cluster which uses the libraries from commit 2fe4b9d by running the following script.

#!/bin/bash

set -e

CLUSTER_NAME="${1:-dc-security-demo}"

ccm remove ${CLUSTER_NAME}

echo "Creating cluster '${CLUSTER_NAME}'"
ccm create ${CLUSTER_NAME} -v 4.0.0

# Modifies the configuration of a node in the CCM cluster.
function update_node_config {
  CASSANDRA_YAML_SETTINGS="authenticator:PasswordAuthenticator \
                          endpoint_snitch:GossipingPropertyFileSnitch \
                          network_authorizer:CassandraNetworkAuthorizer \
                          num_tokens:32 \
                          seeds:127.0.0.1,127.0.0.2"

  for key_value_setting in ${CASSANDRA_YAML_SETTINGS}
  do
    setting_key=$(echo ${key_value_setting} | cut -d':' -f1)
    setting_val=$(echo ${key_value_setting} | cut -d':' -f2)
    sed -ie "s/${setting_key}\:\ .*/${setting_key}:\ ${setting_val}/g" \
      ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra.yaml
  done

  sed -ie "s/dc=.*/dc=dc${1}/g" \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-rackdc.properties

  sed -ie 's/\#MAX_HEAP_SIZE=\"4G\"/MAX_HEAP_SIZE=\"1G\"/g' \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-env.sh
  sed -ie 's/\#HEAP_NEWSIZE=\"800M\"/HEAP_NEWSIZE=\"250M\"/g' \
    ~/.ccm/${CLUSTER_NAME}/node${1}/conf/cassandra-env.sh
}

NUMBER_NODES=2

for node_num in $(seq ${NUMBER_NODES})
do
  echo "Adding 'node${node_num}'"
  ccm add node${node_num} \
    -i 127.0.0.${node_num} \
    -j 7${node_num}00 \
    -r 0 \
    -b \
    -s

  update_node_config ${node_num}

  # Localhost aliases
  echo "ifconfig lo0 alias 127.0.0.${node_num} up"
  sudo ifconfig lo0 alias 127.0.0.${node_num} up
done

sed -ie 's/use_vnodes\:\ false/use_vnodes:\ true/g' \
  ~/.ccm/${CLUSTER_NAME}/cluster.conf

Check the cluster nodes were created.

anthony@Anthonys-MacBook-Pro ~/ > ccm status
Cluster: 'dc-security-demo'
---------------------------
node1: DOWN (Not initialized)
node2: DOWN (Not initialized)

Start the nodes in the cluster.

anthony@Anthonys-MacBook-Pro ~/ > ccm node1 start
anthony@Anthonys-MacBook-Pro ~/ > ccm node2 start

Check that the cluster is up and running as expected.

anthony@Anthonys-MacBook-Pro ~/ > ccm node1 nodetool status

Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load        Tokens  Owns (effective)  Host ID                              Rack
UN  127.0.0.1  115.46 KiB  32      100.0%            7dafff97-e2c5-4e70-a6a9-523f5594671b rack1
Datacenter: dc2
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.2  67.05 KiB  32      100.0%            437e3bca-d0b7-4102-bc56-201b96856f01  rack1

Start a CQL session with the cluster and increase the system_auth replication.

anthony@Anthonys-MacBook-Pro ~/ > ccm node1 cqlsh -u cassandra -p cassandra
Connected to dc-security-demo at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 4.0-SNAPSHOT | CQL spec 3.4.5 | Native protocol v4]
Use HELP for help.
cassandra@cqlsh>
cassandra@cqlsh> ALTER KEYSPACE system_auth
    WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1' : 1, 'dc2' : 1};

Warnings :
When increasing replication factor you need to run a full (-full) repair to distribute the data.

cassandra@cqlsh> exit;

Repair the system_auth keyspace on both nodes.

anthony@Anthonys-MacBook-Pro ~/ > ccm node1 nodetool repair system_auth
...
anthony@Anthonys-MacBook-Pro ~/ > ccm node2 nodetool repair system_auth
...

Start another CQL session and create a few roles with different datacenter restrictions.

anthony@Anthonys-MacBook-Pro ~/ > ccm node1 cqlsh -u cassandra -p cassandra
Connected to dc-security-demo at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 4.0-SNAPSHOT | CQL spec 3.4.5 | Native protocol v4]
Use HELP for help.
cassandra@cqlsh>
cassandra@cqlsh> CREATE ROLE foo WITH PASSWORD = 'foo' AND LOGIN = true
    AND ACCESS TO DATACENTERS {'dc1'};
cassandra@cqlsh> CREATE ROLE bar WITH PASSWORD = 'bar' AND LOGIN = true
    AND ACCESS TO DATACENTERS {'dc2'};
cassandra@cqlsh> SELECT * FROM system_auth.network_permissions;

 role      | dcs
-----------+---------
 roles/foo | {'dc1'}
 roles/bar | {'dc2'}

(2 rows)
cassandra@cqlsh> exit;

Test the datacenter access for the newly created roles.

anthony@Anthonys-MacBook-Pro ~/ > ccm node1 cqlsh -u foo -p foo
Connected to dc-security-demo at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 4.0-SNAPSHOT | CQL spec 3.4.5 | Native protocol v4]
Use HELP for help.
foo@cqlsh> exit;

anthony@Anthonys-MacBook-Pro ~/ > ccm node2 cqlsh -u foo -p foo
Connection error: ('Unable to connect to any servers', {'127.0.0.2': Unauthorized('Error from server: code=2100 [Unauthorized] message="You do not have access to this datacenter"',)})

anthony@Anthonys-MacBook-Pro ~/ > ccm node1 cqlsh -u bar -p bar
Connection error: ('Unable to connect to any servers', {'127.0.0.1': Unauthorized('Error from server: code=2100 [Unauthorized] message="You do not have access to this datacenter"',)})

anthony@Anthonys-MacBook-Pro ~/ > ccm node2 cqlsh -u bar -p bar
Connected to dc-security-demo at 127.0.0.2:9042.
[cqlsh 5.0.1 | Cassandra 4.0-SNAPSHOT | CQL spec 3.4.5 | Native protocol v4]
Use HELP for help.
bar@cqlsh>

As can be seen from the output above, a role is unable to establish a CQL session on a node in a particular datacenter unless it has been granted permissions to do so.

Summary

Apache Cassandra 4.0 is definitely shaping up to be an exciting new release of the database! The datacenter authorization feature is a useful for hardening the security of a cluster by limiting the reach of roles and applications talking to the cluster. It is designed to be used in conjunction with other authorization features to create roles that have specific purposes in a cluster. Stay tuned as we post more new features and updates that will be part of Cassandra 4.0.

Cassandra open-source log analysis in Kibana, using filebeat, modeled in Docker

I was recently asked to set up a solution for Cassandra open-source log analysis to include in an existing Elasticsearch-Logstash-Kibana (ELK) stack. After some research on more of the newer capabilities of the technologies, I realized I could use “beats” in place of the heavier logstash processes for basic monitoring. This basic monitoring would not involve extensive log transformation.

The code to run this demo is available to clone or fork at https://github.com/pythian/cassandra-elk. The only other requirement is Docker (I am using Docker version 18.05.0-ce-rc1) — using Docker for Mac or Docker for Windows will be most convenient.

In a typical production system, you would already have Cassandra running, but all the pieces are included in the Docker stack here so you can start from zero. The model here assumes ELK and a Cassandra cluster are running in your environment, and you need to stream the Cassandra logs into your monitoring system. 

In this setup, the Cassandra logs are being ingested into Elasticsearch and visualized via Kibana. I have included some ways to see data at each step of the workflow in the final section below.

Start the containers:

docker-compose up -d 

(Note: The cassandra-env.sh included with this test environment limits the memory used by the setup via MAX_HEAP_SIZE and HEAP_NEWSIZE, allowing it to be run on a laptop with small memory. This would not be the case in production.)

Set up the test Cassandra cluster:

As the Docker containers are starting up, it can be convenient to see resource utilization via ctop:

Example of ctop resource monitor for Docker containers in open-source log analysis for Cassandra

Set up the filebeat software

Do the following on each Cassandra node.

1. Download the software

You would likely not need to install curl in your environment, but the Docker images used here are bare-bones by design. The apt update statement is also necessary since typically repos are cleared of files after the requested packages are installed via the Dockerfile.

apt update

apt install curl -y

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.3-amd64.deb

dpkg -i filebeat-6.2.3-amd64.deb

For other operating systems, see: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation.html.

 

2. Configure filebeat

The beats software allows for basic filtering and transformation via this configuration file. Put the below in /etc/filebeat/filebeat.yml.

(This is edited from an example at: https://github.com/thelastpickle/docker-cassandra-bootstrap/blob/master/cassandra/config/filebeat.yml.)

The values in the output.elasticsearch and setup.kibana are their respective IP addresses and port numbers. For filebeat.prospectors — a prospector manages all the log inputs — two types of logs are used here, the system log and the garbage collection log. For each, we will exclude any compressed (.zip) files. The multiline* settings define how multiple lines in the log files are handled. Here, the log manager will find files that start with any of the patterns shown and append the following lines not matching the pattern until it reaches a new match. More options available at: https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html.

output.elasticsearch:

    enabled: true

    hosts: ["172.16.238.31:9200"]

setup.kibana:

    host: "172.16.238.33:5601"

filebeat.prospectors:

    - input_type: log

      paths:

        - "/var/log/cassandra/system.log*"

      document_type: cassandra_system_logs

      exclude_files: ['\.zip$']

      multiline.pattern: '^TRACE|DEBUG|WARN|INFO|ERROR'

      multiline.negate: true

      multiline.match: after

    - input_type: log

      paths:

        - "/var/log/cassandra/debug.log*"

      document_type: cassandra_debug_logs

      exclude_files: ['\.zip$']

      multiline.pattern: '^TRACE|DEBUG|WARN|INFO|ERROR'

      multiline.negate: true

      multiline.match: after

 

3. Set up Kibana dashboards

filebeat setup --dashboards

 

Example output:

Loaded dashboards

 

4. Start the beat

service filebeat start

 

Example output:

2018-04-12T20:43:03.798Z INFO instance/beat.go:468 Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]

2018-04-12T20:43:03.799Z INFO instance/beat.go:475 Beat UUID: 2f43562f-985b-49fc-b229-83535149c52b

2018-04-12T20:43:03.800Z INFO instance/beat.go:213 Setup Beat: filebeat; Version: 6.2.3

2018-04-12T20:43:03.801Z INFO elasticsearch/client.go:145 Elasticsearch url: http://172.16.238.31:9200

2018-04-12T20:43:03.802Z INFO pipeline/module.go:76 Beat name: C1

Config OK

 

View the graphs:

Then view the Kibana graphs in a local browser at: http://localhost:5601.

 

Run some sample load against one of the nodes to get more logs to experiment with:

cassandra-stress write n=20000 -pop seq=1..20000 -rate threads=4

Example output from Cassandra-stress being used to populate test data

Here are some sample queries to run in Kibana:

  • message:WARN*
  • message:(ERROR* OR WARN*)
  • message:(ERROR* OR WARN*) AND beat.hostname:DC1C2

 

You can also filter the display by choosing from the available fields on the left.

Kibana dashboard example display

 

If you would like to see what the logs look at each step of the workflow, view logs within the Cassandra container in /var/log/cassandra like this:

tail /var/log/cassandra/debug.log

Example output:

WARN  [PERIODIC-COMMIT-LOG-SYNCER] 2018-05-07 14:01:09,216 NoSpamLogger.java:94 - Out of 0 commit log syncs over the past 0.00s with average duration of Infinityms, 1 have exceeded the configured commit interval by an average of 80.52ms

 

View this data stored in Elasticsearch (in JSON format) in a browser like this:

http://localhost:9200/_search?q=(message:(ERROR*%20OR%20WARN*)%20AND%20beat.hostname:DC1C2)

Example output:

Scylla Release 2.1.3

scylla release

The Scylla team is pleased to announce the release of Scylla 2.1.3, a bugfix release of the Scylla 2.1 stable branch. Release 2.1.3, like all past and future 2.x.y releases, is backward compatible and supports rolling upgrades.

Related Links:

Bugs Fixed in this Release:

    • Dropping a keyspace with a User Defined Type (UDT) results in an error: “InvalidRequest: Error from server: code=2200 [Invalid query] message=”Unknown type…”. #3068
    • sstableloader: the SSTable files to Scylla upload tool does not support User Defined Type(UDT). scylla-tools-java #57

The post Scylla Release 2.1.3 appeared first on ScyllaDB.

Introducing AlwaysOn SQL for DSE Analytics

With DataStax Enterprise (DSE) 6 we have introduced AlwaysOn SQL, a new, highly-available, secure SQL service for analytical queries.

This service will allow BI, ETL, and other tools to connect via the standard ODBC and JDBC protocols to analyze data in DSE, including tabular data from the database, graph data from DSE Graph, and files stored in DSEFS. This service is not just highly available, a necessary requirement for production, but also fully secure, with security integrated with the rest of DSE.

Why do We Need It?

SQL is a standard that has been around for over 40 years and has grown to be the de-facto language for querying data. Many new database vendors have based their own query languages, such as the Cassandra Query Language (CQL) for Apache CassandraTM and many others, on SQL. Many of these languages, including CQL, support a subset of the SQL standard, since some of the standard is at odds with the goals and designs of these new databases. That said, SQL remains an important tool for data analysis, and an entire ecosystem of tools and expertise has grown around it over the past 40 years.

ODBC and JDBC are the two main programming interfaces to submit SQL to a database and retrieve results. These APIs enable SQL-based tools, such as BI tools, to support many databases at once, rallying around a single API against which to design.

In addition to tooling, SQL is a common language of choice for data analysis in general. Most data and business analysts are familiar with SQL and interact daily with data via SQL. While Apache SparkTM has other interfaces, such as Scala and Python, many data analysis tasks do not require the heavy weight of a Scala or Python application and can be easily accomplished with SQL. Analysts are looking for a simple way to ask questions of the data and receive results, without having to program in Spark.

So, what does an SQL service need to provide to the end user?

  1. It must be 100% reliable, meaning it must be always on and not depend on manual intervention to handle failures.
  2. It should leverage the common interfaces that exist, namely ODBC and JDBC, and thereby easily integrate with existing tools and allow users to leverage their existing knowledge.
  3. It should offer robust security, including authentication and authorization, and this security should be integrated with the rest of the data platform, leveraging the same users, permissions, and user management.
  4. It should be fully integrated into the database. It should not feel like a bolt-on addition but integrated and managed like the rest of the platform.

A Little History

DSE has shipped with SQL access to data for many releases, first via Hive and Hiveserver, then Shark and Sharkserver, and finally with Spark and the Spark SQL Thriftserver, which is what is available in DSE 5.1. However, neither of these solutions were worry-free in production environments.

In DSE 5.1, the role of ODBC/JDBC service for DSE was handled by the Spark SQL Thriftserver. This is the same approach taken by open-source Apache Spark and the various Spark vendors. However, the Spark SQL Thriftserver leaves a bit to be desired, in terms of being enterprise-class.

First, security in the Spark SQL Thriftserver has some notable shortcomings. While it’s true that you can enable LDAP authentication for the Spark SQL Thriftserver, this is not integrated with DSE, and does not support the variety of authentication schemes that DSE supports. More importantly, the Spark SQL Thriftserver runs as a single, long-running Spark application, and as such it connects to the DSE database as a single user. That is, once any user has authenticated to the Spark SQL Thriftserver, it will query the underlying DSE database as a single user regardless of which user was authenticated. This is a limitation of the open source Spark SQL Thriftserver and the Spark Cassandra Connector, and it’s clearly sub-par for security-conscious customers.

Second, the Spark SQL Thriftserver is not highly available. It must be manually started, and it’s left to the administrator to keep an eye on it. If it stops for some reason, the administrator has to restart the service again, potentially on another node (in case of node failure). This lack of high availability is prohibitive to putting it into production. Furthermore, if the service is restarted, then analytical SQL applications need to update their connection to whichever machine the service was restarted on.

The Spark SQL Thriftserver supports caching tables in memory to support even faster query times. However, if the service stops, intentionally or unintentionally, then the cached data is not automatically re-cached for the user. Moreover, if the user did issue a manual re-cache operation, when it took the data out of the data source this second time there is no guarantee that the underlying data set hasn’t changed, resulting in a different cached data set and thus different results for queries.

AlwaysOn SQL

AlwaysOn SQL was designed with these issues in mind, and is extremely conscious of the shortcomings of previous approaches.

High Availability

The first thing we want to ensure is that this service is highly available and does not require any manual monitoring or intervention. AlwaysOn SQL is enabled to start when DSE starts without any manual steps. When it’s enabled, the DSE nodes in the data center will elect a node to serve as the connection point for AlwaysOn SQL, and the service will be started. If the service stops, either on purpose or due to a failure, the DSE nodes will elect a new leader and that node will start the service and act as the new connection point.

How will client applications know which node is serving as the AlwaysOn SQL connection point? There’s an API call that can return the active connection point via the “dse client-tool” command-line tool, CQL, or a REST call to any of the nodes in the data center. Furthermore, we worked with Magnitude Software to enhance the ODBC and JDBC drivers for Spark SQL to seamlessly leverage this REST API so that clients are transparently connected to the active AlwaysOn SQL service node.

One challenge with running Spark applications is ensuring that the necessary resources are available, otherwise your application will queue and wait for other applications to finish. In DSE 5.1 we introduced the concept of Workpools, subsets of computing resources that users can be authorized to submit applications to use. For AlwaysOn SQL, we leverage this construct to create a Workpool just for AlwaysOn SQL, ensuring that the resources are always available when AlwaysOn SQL needs to start or restart.

Another challenge is how to give a good, consistent cache experience in the face of service restarts. To handle this case, we introduced new SQL syntax to support taking a persistent cache. When you create this new cache, a snapshot of the data from the database will be saved to DSEFS, DSE’s continuously available distributed file system, and then pulled into memory. Therefore, when the AlwaysOn SQL service is restarted, the cache can be repopulated from the DSEFS snapshot, ensuring the same data in the cache as before. This allows applications to reliably query the cached data without having to worry if the service has been restarted or if the data is not consistent from query to query.

Advanced Security

Production applications require strict security, which is why AlwaysOn SQL was built to integrate with DSE’s Advanced Security features. The users for AlwaysOn SQL are managed along with the users of the DSE database, via CQL as another GRANT-able privilege. When a user authenticates via ODBC/JDBC, AlwaysOn SQL uses DSE to determine permissions and grant access to the service. Users do not need additional usernames or passwords to use AlwaysOn SQL, and, since AlwaysOn SQL leverages DSE Advanced Security, the various Unified Authentication schemes are supported.

Authentication is one component to security; the other is authorization. Unlike the Spark SQL Thriftserver, which queries the DSE database as a single user regardless of the authenticated user, AlwaysOn SQL always queries the underlying database as the authenticated user. AlwaysOn SQL uses DSE’s Proxy Execution feature to properly query database tables as the authenticated user, thereby ensuring the end user only has access to tables and data they have permissions to read.

All the Data

AlwaysOn SQL provides SQL access not just to DSE database tables, which are automatically registered in the SQL catalog for analysis. DSE Graph data is also available via SQL in the form of vertex tables and edge tables, just like in DseGraphFrames. These tables are also automatically registered in the SQL catalog.

Data stored in DSEFS can be manually registered as external tables in AlwaysOn SQL. This allows for a simple way to load data from DSEFS into DSE tables via SQL – and ODBC or JDBC – only, and similarly for exporting data from DSE tables and graphs to DSEFS. External data sources can also be registered with AlwaysOn SQL, allowing simple data migration from legacy systems to DSE.

Production-Ready SQL for Analytics

AlwaysOn SQL represents an important step to providing always-on SQL access to data, in support of business analysts and the tools they use. By enabling SQL users on DSE, we invite a broad group of developers to analyze data in DSE without having to climb the learning curve of Spark programming, including (potentially) learning a new programming language. But beyond enabling tools and developers, AlwaysOn SQL was designed from the ground up to support production deployments in enterprise environments, focusing on availability and security as the two core tenets.

You can download DSE 6 now and read through our updated documentation for more information.

Mutant Monitoring System Day 10 – Backup and Restore

mms
This is part 10 of a series of blog posts that provides a story arc for Scylla Training.

In the previous post, we learned how developers at Division 3 can create applications that can interact with the Mutant Monitoring System by showing how to use the Cassandra libraries available in programming languages. Due to recent violent events in the mutant community, Division 3 implemented a new policy for Scylla Administrators to learn how to backup and restore the mutant data in the cluster. In this post, we will learn how to backup and restore data from the Mutant Monitoring System.

Starting the Scylla Cluster

The Scylla Cluster should be up and running with the data imported from the previous blog posts.

The MMS Git repository has been updated to provide the ability to automatically import the keyspaces and data. If you have the Git repository cloned already, you can simply do a “git pull” in the scylla-code-samples directory.

git clone https://github.com/scylladb/scylla-code-samples.git
cd scylla-code-samples/mms

Modify docker-compose.yml and add the following line under the environment: section of scylla-node1:

- IMPORT=IMPORT

Now we can build and run the containers:

docker-compose build
docker-compose up -d

After roughly 60 seconds, the existing MMS data will be automatically imported. When the cluster is up and running, we can run our application code.

Backing Up the Data

First, we need to backup our schema with the following commands:

docker exec -it mms_scylla-node1_1 bash
cqlsh -e "DESC SCHEMA" > /schema.cql

With the schema backed up, we can create a snapshot of the two keyspaces used for the Mutant Monitoring System: tracking and catalog. Snapshots are taken using nodetool snapshot. The command first flushes the MemTables from memory to SSTables on disk, then creates a hard link for each SSTable in each keyspace. With time, SSTables are compacted, but the hard link keeps a copy of each file. This takes up and increasing amount of disk space.

nodetool snapshot tracking

Requested creating snapshot(s) for [tracking] with snapshot name [1524840557127] Snapshot directory: 1524840557127

nodetool snapshot catalog

Requested creating snapshot(s) for [catalog] with snapshot name [1524840566260] Snapshot directory: 1524840566260

The snapshot is created in the Scylla data directory /var/lib/scylla/data and It will have the following structure: keyspace_name/table_name-UUID/snapshots/snapshot_name.

Simulating Data Loss

All of the data is backed up on node1 only. Division 3 must now prepare to handle cyber attacks from the mutants and other external organizations. In this scenario, we will simulate such an attack by using cqlsh to delete the Mutant Monitoring Keyspaces.

cqlsh
drop keyspace tracking;
drop keyspace catalog;

To verify that the keyspaces are gone, run the following command:

describe keyspaces;

The output should be:

system_traces system_schema system

Great, we only have the default keyspaces now. We can now learn how to restore the data.

Restoring the Data

To restore the data, we first need to re-create the keyspaces from the backup done previously:

docker exec -it mms_scylla-node1_1 bash
cqlsh -e "SOURCE '/schema.cql'"

Run the nodetool drain command to ensure the data is flushed to the SSTables:

nodetool drain

Next, we will need to kill the Scylla processes in each container and delete the contents of the commit log. In each container, run the following commands:

pkill scylla
rm -rf /var/lib/scylla/commitlog/*

Repeat for containers mms_scylla-node2_1 and mms_scylla-node3_1.

Return back to mms_scylla-node1_1:

docker exec -it mms_scylla-node1_1 bash

The first snapshot restore will be for the catalog keyspace. We first need to find the original snapshot folder which is the oldest directory in /var/lib/scylla/data/catalog:

cd /var/lib/scylla/data/catalog/
ls -al

total 16
drwx------ 4 root root 4096 Apr 27 15:03 .
drwxr-xr-x 1 scylla scylla 4096 Apr 27 14:40 ..
drwx------ 3 root root 4096 Apr 27 15:03 mutant_data-32afab304a2c11e8b776000000000002
drwx------ 4 root root 4096 Apr 27 14:58 mutant_data-dcffd0a04a2811e89412000000000000

Luckily the directories are numbered sequentially. The original data directory with the snapshot is mutant_data-dcffd0a04a2811e89412000000000000 and the current data directory created after importing the schema is mutant_data-32afab304a2c11e8b776000000000002. We can now copy the contents from the snapshot directory to the new data directory:

cp -rf mutant_data-dcffd0a04a2811e89412000000000000/snapshots/1524840566260/* mutant_data-32afab304a2c11e8b776000000000002/

Repeat the process for restoring the tracking keyspace in /var/lib/scylla/data/tracking. When that is complete, we can start Scylla in each container with the following commands:

export SCYLLA_CONF='/etc/scylla'
/usr/bin/scylla --developer-mode 1 --options-file /etc/scylla/scylla.yaml&
/usr/lib/scylla/jmx/scylla-jmx -l /usr/lib/scylla/jmx&

Repeat for mms_scylla-node2_1 and mms_scylla-node3_1. It will take a few minutes for the nodes to form a cluster. You should see messages like this:

INFO 2018-04-27 17:00:35,574 [shard 0] gossip - Connect seeds again ... (11 seconds passed)
INFO 2018-04-27 17:00:36,574 [shard 0] gossip - Connect seeds again ... (12 seconds passed)
INFO 2018-04-27 17:00:37,575 [shard 0] gossip - Connect seeds again ... (13 seconds passed)

When the nodes form a cluster, you should see similar messages as shown below:

INFO 2018-04-27 17:05:27,927 [shard 0] storage_service - Node 172.18.0.3 state jump to normal
INFO 2018-04-27 17:05:28,024 [shard 0] gossip - Favor newly added node 172.18.0.3
INFO 2018-04-27 17:05:28,058 [shard 0] storage_service - Node 172.18.0.2 state jump to normal
INFO 2018-04-27 17:05:29,034 [shard 0] gossip - Favor newly added node 172.18.0.2

After the nodes are online, we can run a repair and then verify that our data is restored properly with the following commands:

nodetool repair

You should see many messages like this:

INFO 2018-04-27 17:31:28,744 [shard 1] repair - Repair 302 out of 498 ranges, id=1, shard=1, keyspace=system_traces, table={sessions_time_idx, node_slow_log_time_idx, events, sessions, node_slow_log}, range=(1506223645386967449, 1537077568234766066]

This means that repair is syncing the data across the cluster. The entire process may take a few minutes. When the repair process is complete, we can run queries to ensure that the data is restored to both keyspaces:

cqlsh -e 'select * from tracking.tracking_data;'
cqlsh -e 'select * from catalog.mutant_data;'

Conclusion

In conclusion, we followed the directive from Division 3 to teach each Scylla Administrator to learn how to backup and restore a cluster. The process began with backing up the schema followed by creating a snapshot of the data. We simulated an attack by deleting the keyspaces and then restoring and repairing the cluster. Stay safe out there and backup as much as you can!

Next Steps

  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Mutant Monitoring System Day 10 – Backup and Restore appeared first on ScyllaDB.

Intel’s Memory Group on the Advantages of Scylla

intel

The Intel Memory Group is behind the revolutionary Optane SSD drive that provides breakthrough performance and is 5-8x faster at Low Queue Depths than traditional SSD’s. Intel began working with ScyllaDB staff last year to build a big memory system at high-volume scale. They chose Scylla because they needed a solution that can fully leverage the hardware to derive the best possible performance.

The Intel team likes that Scylla is drop-in compatible with Apache Cassandra and didn’t require any changes to their applications. They also appreciate that Scylla optimizes workloads without manual user intervention or downtime.

“With Scylla you’ve got open source development methodology providing that very fast delivery so you’ve got the best of today’s worlds.”  Frank Ober, Solutions Architect, Intel

Watch the video below to see what Frank Ober of Intel has to share about his experience with Scylla.

Next Steps

    • Learn more about Scylla from our product page.
    • See what our users are saying about Scylla.
    • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
    • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Intel’s Memory Group on the Advantages of Scylla appeared first on ScyllaDB.

Scylla Enterprise Release 2018.1.0

release
The Scylla team is pleased to announce the release of Scylla Enterprise 2018.1.0, a production-ready Scylla Enterprise major release.

The Scylla Enterprise 2018.1 release is based on Scylla Open Source 2.1 and includes new features which are only offered to our Enterprise customers, Enterprise version bug fixes, and backported bug fixes from upstream releases 1.7, 2.0, 2.1 and master.

Read more about Scylla Enterprise here.

Related Links

Scylla Enterprise customers are encouraged to upgrade to Scylla Enterprise 2018.1 and are welcome to contact our Support Team with questions.

New Features in Scylla Enterprise 2018.1

Enterprise only features in 2018.1

  • Auditing
    Scylla Enterprise includes an auditing capability, which allows administrators to track system and users activity for deployments with multiple users.
    Scylla auditing can work in Keyspace, Table or Category granularity. Categories are:
    AUTH Logs login events
    DML Logs insert, update, delete, and other data manipulation language (DML) events
    DDL Logs object and role create, alter, drop, and other data definition language (DDL) events
    DCL Logs grant, revoke, create a role, drop a role, and list roles events
    QUERY Logs all queriesThe audit log can be output the system log, or to a Scylla table. More on Auditing in Scylla Enterprise: http://docs.scylladb.com/operating-scylla/auditing.
  • IBM Power support
    Scylla Enterprise 2018.1 is also available on the IBM POWER8 architecture, running on CentOS 7 and RHEL.

New Features in 2018.1
(Where mentioned, issue numbers are from the upstream open source project)

  • Counters. Available in Scylla open source from 1.7, Counters are now part of Scylla Enterprise
    More on Counters: http://docs.scylladb.com/using-scylla/counters/
  • Time Window Compaction Strategy (TWCS). An improvement and simplification to Date Tiered Compaction Strategy (DTCS) as the go-to strategy for time series data. TWCS uses Size-Tiered Compaction Strategy (STCS) on “time windowed“ groups of SSTables, using the maximum timestamp of the SSTable to decide which time window each SSTable will be part of. #1432
  • Row Cache. In Pre 2.0 releases, Scylla cached full partitions only and we chose not to cache partitions larger than 10MB. With the introduction of row cache, Scylla can cache a single CQL row out of a large partition, removing this limitation. This change will significantly improve Scylla performance for use cases with large partitions such as Time Series.
  • Heat Weighted Load Balancing. When a Scylla node restarts for any reason (upgrade, config update, error), it starts with an empty cache. This means the node read path will always hit the disk until it’s cache is repopulated. During this time, this node will be slower, and as a result, can slow down the entire cluster. Heat Weighted Load Balancing solves this issue by optimizing the node selection in the coordinator based on the heat of each node. A hot node, with a good cache-hit-ratio, will get more reads requests than a cold node with the same data and a lower cache hit ratio. After some time, the cold node will become hot, and the load balancing will become uniform again. #1455. More on Heat Weighted Load Balancing
  • Improved Performance for CPU-bound workloads. More here.
  • DNS resolution. You can now use DNS names (FQDN), not only IPs, in scylla.yaml for seed nodes, listen_address, broadcast_address and broadcast_rpc_address #2357.
  • CQL additions:
    • Cast functions between types, for example: SELECT avg(cast(count as double)) FROM myTable.
    • Support Duration data type #2240.
    • CQL prepared statements are now properly evicted so that you’re not able to OOM a server just by preparing many unique statements. See #2474
    • Support non-reserved keywords as columns (without quotes) #2507, for example, a column named “frozen”.
    • Update to CQL 3.3.1, to align with Cassandra 2.2 release
    • New data types: tinyint, smallint, date, and time #1284
      Binary protocol v4 (required by the new data types).
    • Support UNSET in prepared statements #2039
    • CQL aggregate functions now support floating point data types #2168
  • Transitional Auth – To enable access control without downtime, use transitional modes to temporarily support applications and users that do not have accounts without interrupting services. More here
  • New REST API /storage_service/force_terminate_repair allows aborting of a running repair and all the data streams related to it. #2105.
  • Ext4 support. Scylla now supports the ext4 file system in production, not just in dev mode. Using XFS is still recommended for best performance #2381.
  • CPU isolation. Improve latency using isolation between background tasks like memtable flushes and compaction and foreground tasks like CQL read/write. More here
  • Java tools. Java tools provided with Scylla are upgraded to match Cassandra 3.0. In particular:
    • sstableloader supports Cassandra 3.0 sstable format, allowing easy migration from Cassandra 2.2 and 3.x clusters to Scylla
    • sstable2json is renamed to sstabledump
    • sstablesplit, sstableupgrade, sstablescrub, json2sstable, sstablekeys, and token-generator are temporarily removed (all break compatibility to current SSTable format)
  • CompressionInfo.db files, part of the SSTable format, are stored more efficiently in memory, allowing higher disk: RAM ratios #1946.

System Impacts

  • Installation updates
    Starting from Scylla Enterprise 2018.1, Scylla packages for Ubuntu 16.04 and Debian 8 are signed. You can find the keys and instructions per Linux distribution here.
  • Upgrade procedure. You can only upgrade to Scylla Enterprise 2018.1 from Scylla Enterprise 2017.1.6 or later, or from Scylla Open Source 2.1 or later. Upgrade from earlier Scylla releases is not supported.
  • Kernel support
    Scylla Enterprise 2018.1 requires Linux kernel 3.10.0-514 or later.
  • Driver support
    Relevant for upgrading from Scylla Enterprise 2017.1 (ignore if you are upgrading from Scylla Open Source 2.1).Certain older versions of Cassandra drivers are no longer supported. With the introduction of MV, Scylla has moved to Cassandra 3.0 compatible system tables. This means that certain old Cassandra drivers, which do not work with Cassandra 3.0, will not work with Scylla.To ensure your application will work properly after a Scylla upgrade, we highly recommend upgrading your Cassandra driver before upgrading to Scylla 2018.1. The latest drivers are backward compatible and will work with all Scylla 2017.1, 1.x and 2.x releases, past, and future.At the very least, your Cassandra driver should be in the list below, using the version listed or later.
    – C++ : 2.3+
    – C#: 3.0+
    – Java: 3.0+
    – Node.js: 3.0+
    – PHP: 1.2+
    – Python: 3.5+
    – Ruby: 3.0+
    – GoCQL commit 8d27cdeae224463cadc1ee973386862baaf6662e (Mar 1, 2016) or later.
  • Counter Migration. Scylla does not support Counter SSTables created by Apache Cassandra 2.0 or older. Note that if you upgraded from Cassandra 2.0 to 2.1, you may still have old Counters in the SSTables.The proper way to migrate Counters data from Cassandra to Scylla is to run a major compaction on the Cassandra node, and use sstableloader to load the SSTable to Scylla.
  • Deprecated partitioners. Byte-ordered and random partitioners were deprecated in Scylla Open Source 2.0, and Scylla Enterprise 2018.1 and will be removed in a future version. Please open a support ticket if you use or are planning to use these partitioners.
  • New intra-node sharding algorithmThe intra-node sharding algorithm determines how data is divided among cores in a node. The change results in a better distribution of data among cores, especially in large clusters. This improves performance across the cluster. When upgrading from 2017.1, please check scylla.yaml for murmur3_partitioner_ignore_msb_bits. If you do *not* have this line, please consult Scylla support on how to safely upgrade.

Metrics updates from 2017.1

Scylla Monitoring stack now includes Scylla 2018.1 dashboard. For a full list of metrics, update see here.

Known Issues

When installing Scylla Enterprise on Power Architecture, scylla_setup fail to install node_exporter. Without node_exporter OS level metric is not reported to Scylla Monitoring Stack. Bypass: install node_exporter packages using the public repo.

Next Steps

  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Scylla Enterprise Release 2018.1.0 appeared first on ScyllaDB.

What’s New for DataStax Enterprise Analytics 6

An always-on, distributed cloud database needs non-stop analytics to manage workflows, derive insights, and enable analysis for business applications and business analysts. With that in mind, for this release we focused our attention for DSE Analytics on nonstop availability and ease-of-use for operational analytics workloads, resulting in some significant and impactful new features:

  • AlwaysOn SQL, a highly-available, enterprise-grade service to support production analytical SQL applications
  • DataStax Enterprise (DSE)-specific enhancements to the Spark Catalyst optimizer, including automatic use of DSE Search indices to give automatic performance benefits of DSE Search to Spark operations
  • Upgrade of the DSE Analytics engine to Apache Spark™ 2.2, including Structured Streaming to enable improved streaming analytics for DSE

Building on the Goodness of DSE 5.1

Before diving into the new features of DSE 6, it’s worth highlighting a few items from DSE 5.1 that DSE 6 builds on. The most notable of these is the performance gains we introduced via Continuous Paging — namely, up to a 3x improvement for scans from the DSE database. This greatly accelerates operational analytic workloads on data in DSE.

Another big enhancement was the general availability of DSEFS, a continuously available, HDFS-compatible, distributed file system that integrates seamlessly with Spark and is capable of scaling to 40TB per node. DSEFS provides not just checkpointing for Spark Streaming applications but also supports general use cases, including data reception, lambda-architecture-type data flows, and scan-heavy operational analysis.

Another enhancement to call out is the improvements to DSE’s Spark Resource Manager. The Resource Manager in DSE has been highly available for several versions, but in DSE 5.1 significant improvements were made to its fault-tolerance, security, and ease of use. In DSE 5.1, all nodes in the DSE datacenter can accept a Spark job submission, and all communications — client to the cluster, within the cluster, etc. — are protected by encryption.

Introducing DSE AlwaysOn SQL

Given the performance improvements in DSE 5.1, we turned our focus to making the developer experience significantly simpler with analytic queries in DSE 6, and we wanted to ensure these improvements addressed the issues with running analytics in production.  

One ubiquitous API for analysis is good old SQL. It’s been around for a long time and a complete and extensive industry has built up to include tools, applications, and expertise around doing data analysis and data flows with SQL. It’s important to remember that despite being built in Scala, using Spark does not require Scala skills. Spark has always included an SQL component, and DSE Analytics inherits that benefit, too.  

The Spark community has put a large amount of effort into making a strong SQL engine, but it has largely avoided addressing what it takes to build an enterprise-class ODBC/JDBC service that can be put into production. The service needs to be highly available, simple to use, and implement production-ready security measures for networking, user authentication, and user authorization.

This is what AlwaysOn SQL was designed and built to achieve. It’s a production-ready ODBC/JDBC service that provides SQL access to data in DSE, both the database and DSEFS.  This allows ODBC/JDBC analytical applications to be put into production, worry-free. This service will automatically restart in the face of failures, and cached datasets will also be refreshed automatically. Client applications will connect seamlessly to the service without having to know the details of where in the data center the service is running.

Authentication and authorization of SQL users will occur via the same users managed within DSE Advanced Security, removing the need for extra, error-prone security setup steps. Queries against the underlying DSE database will be executed as the authenticated SQL user, providing fine-grained security to the data.

More details on this will be coming soon in a later blog post, so please keep a look out for that.

Improved Spark Analytic Engine

DSE 6 includes a number of improvements “under the hood” to support not just AlwaysOn SQL but also general Spark applications. DataStax has invested in a number of areas to improve the performance of these Spark applications in DSE Analytics.  

First, DSE 6 upgrades the DSE Analytics engine to Apache Spark 2.2.  This landmark release sees the graduation of the new Structured Streaming component from an “experimental” feature to a full, first-class citizen. Aside from Structured Streaming, Spark 2.2 focuses more on usability, stability, and polish. To support the new Structured Streaming API, DSE 6 includes a new Structured Streaming sink exclusively for DSE, enabling simple, efficient, and robust streaming of data into DSE from Apache Kafka, file systems, or other sources.

DSE Analytics also now automatically leverages any DSE Search indices and pushes down the DSE Search query to let the search engine efficiently perform the query. This allows for not just free-text search but also for Boolean predicates to be efficiently evaluated by the Lucene-based engine that is well-suited to process these queries. In some cases, namely if the query will return a large portion of the data, it is less efficient to get the data via the index and more efficient to simply scan the whole data set and let Spark do the evaluation of the predicates. DSE Analytics will actually query the DSE Search component to determine how much data would be returned, and accordingly decide which approach is more efficient for this query — automatically.

DSE Analytics also introduces a new join strategy exclusively for DSE, the Direct Join. Those of you familiar with Spark Streaming with Apache Cassandra™ are already familiar with the joinWithCassandraTable method for doing lookup to join with data in the database. The new direct join is the Dataset analogue to the RDD method, joinWithCassandraTable. Moreover, the direct join will be chosen automatically in situations where it is the preferred approach to joining with DSE database data. The direct join has clear use for Spark Streaming applications, but it is equally useful for batch applications, as well.

Finally, DSE GraphFrames, introduced in DSE 5.1, has been expanded to provide even more capabilities and improved performance in DSE 6. Additionally, all graphs in DSE Graph will automatically appear in the Spark SQL catalog as vertex and edge tables. This enables simple access to basic SQL and Dataset operations on the table representation of graph data, including via ODBC/JDBC and AlwaysOn SQL.

And Away We Go!

These new enhancements to DSE Analytics deliver improved simplicity, reliability, flexibility, and performance to the DSE platform. AlwaysOn SQL brings production-ready, enterprise-grade ODBC/JDBC to DSE Analytics, enabling a large ecosystem of tools, applications, and expertise.  DSE Analytics 6 continues the enhancements and optimizations to DSE’s Spark engine that customers have come to expect from DataStax.

You can download DSE 6 now and read through our updated documentation for more information.

7 Reasons Why Open-Source Elassandra (Cassandra + Elasticsearch) Is Worth a Look

For organizations that rely on the Cassandra NoSQL database but require more efficient search capabilities, Elassandra offers a compelling open-source solution. Elassandra combines the powers of Elasticsearch and Cassandra by utilizing Elasticsearch as a Cassandra secondary index.

The post 7 Reasons Why Open-Source Elassandra (Cassandra + Elasticsearch) Is Worth a Look appeared first on Instaclustr.

Mutant Monitoring Systems (MMS) Day 9 – Interacting Programmatically

mms

This is part 9 of a series of blog posts that provides a story arc for Scylla Training.

In the previous post, we setup the Scylla Monitoring stack for the Mutant Monitoring System so we can monitor the performance, latency, and status of nodes in the Scylla Cluster. Those of us in Division 3 now want to teach our development team how to create applications that can interact with the Scylla Cluster so we can build the next-generation tools for the Mutant Monitoring System. In this post, we will explore how to connect to a Scylla cluster using the available APIs.

When creating applications that communicate with a database such as Scylla, it is crucial that the programming language being used has support for database connectivity. Since Scylla is compatible with Cassandra, we can use any of the Cassandra libraries available. For example, in Go, there is Gocql and Gocqlx. In Node.js, there is cassandra-driver. Since Division 3 consists mostly of Node.js developers, let’s begin by writing a sample Node.js application.

Creating the Application

Switch to the terminal and create a new directory on your computer that can be used for this project. We will need to create a file called package.json. Package.json is used by the npm command to download the listed library dependencies such as cassandra-driver for this project. Using your favorite text editor, create the following file:

Next, we can begin coding the main application that will interact with the Mutant Monitoring System’s Scylla cluster. Using a text editor, create a new file called app.js. At the beginning of the file, we need to start by importing the cassandra-driver as shown below:

var cassandra = require('cassandra-driver');

In the next line, we can begin creating functions that will run queries on the Scylla cluster such as INSERT, DELETE, and SELECT in the tracking keyspace. The example below will insert data into the cluster with the following Strings: first_name, last_name, timestamp, heat, location, speed, telepathy_powers, date.

The client variable will create a new connection to the Scylla Cluster and also give the application the ability to communicate with all three nodes. By being able to specify multiple nodes, the application can be more resilient if a Scylla node fails, and provides performance by load-balancing the requests. The keyspace entry specifies the keyspace to use.

The query variable can be thought of as using cqlsh in your program. You can put any query that you want in there and have your application execute it on the Scylla cluster. By using ?’s in the VALUES section of the query variable, we can have the values populated by parameters which are specified in the next line. All of the parameters in the constant parms is what we gathered as input in the insertData function. When the query is executed with the client.execute call, you can see there is an entry for the query and parms that were set previously. If any errors are reported, they will be visible in the console.

With the application query done, we can now create the code that will call it and put data into it:

The setTimeout loop is used to randomly insert data every 50ms. Since Jim Jeffries is a popular mutant that we like to monitor at Division 3, we will manually put his information in for first_name, last_name, and location. The other required values use the Math and Date functions to automate the values. Finally, we call the insertData function that we added earlier to actually do the work. When the insert is complete, we will repeat the function. When the application starts, we call the load function loop with load();.

Now that that is done, we will need to get the Scylla Cluster up and running before we run the application.

Starting the Scylla Cluster

The Scylla Cluster should be up and running with the data imported from the previous blog posts.

The MMS Git repository has been updated to provide the ability to automatically import the keyspaces and data. If you have the Git repository cloned already, you can simply do a “git pull” in the scylla-code-samples directory.

git clone https://github.com/scylladb/scylla-code-samples.git
cd scylla-code-samples/mms

Modify docker-compose.yml and add the following line under the environment: section of scylla-node1:

- IMPORT=IMPORT

Now we can build and run the containers:

docker-compose build
docker-compose up -d

After roughly 60 seconds, the existing MMS data will be automatically imported. When the cluster is up and running, we can run our application code.

Running the Application

Since this application will run in Docker, we will need to create a file called Dockerfile to build and run the code. Using a text editor, paste the contents below into Dockerfile:

Next, run the following commands to build and run the application:

docker build -t app .
docker run -d --net=mms_web --name app app
docker logs -f app

We can see if everything is working properly by accessing the Scylla cluster and counting the rows:

docker exec -it mms_scylla-node1_1 cqlsh
select count(*) from tracking.tracking_data;

If you keep running the select count(*) query, you should be able to see the count increase every second.

Conclusion

In this post, we explained how developers at Division 3 can create applications that interact with the Mutant Monitoring System by showing them how to use the Cassandra libraries available in programming languages. Since Scylla is compatible with Cassandra, developers coming from Cassandra will not have to make changes to their existing code. Please continue to innovate to help Division 3 create new tools that can track Mutants and keep the public safe from tyranny.

Next Steps

  • Learn more about Scylla from our product page.
  • See what our users are saying about Scylla.
  • Download Scylla. Check out our download page to run Scylla on AWS, install it locally in a Virtual Machine, or run it in Docker.
  • Take Scylla for a Test drive. Our Test Drive lets you quickly spin-up a running cluster of Scylla so you can see for yourself how it performs.

The post Mutant Monitoring Systems (MMS) Day 9 – Interacting Programmatically appeared first on ScyllaDB.

Stuff The Internet Says On Scalability For April 13th, 2018

Hey, it's HighScalability time:

 

Bathroom tile? Grandma's needlepoint? Nope. It's a diagram of the dark web. Looks surprisingly like a tumor.

If you like this sort of Stuff then please support me on Patreon. And I'd appreciate if you would recommend my new book—Explain the Cloud Like I'm 10—to anyone who needs to understand the cloud (who doesn't?). I think they'll learn a lot, even if they're already familiar with the basics. 

  • $23 billion: Amazon spend on R&D in 2017; $0.04: cost to unhash your email address; $35: build your own LIDAR; 66%: links to popular sites on Twitter come from bots; 60.73%: companies report JavaScript as primary language; 11,000+: object dataset provide real objects with associated depth information; 150 years: age of the idea of privacy; 30%~ AV1's better video compression; 100s of years: rare-earth materials found underneath Japanese waters; 67%: better image compression using Generative Adversarial Networks; 1000 bit/sec: data exfiltrated from air-gapped computers through power lines using conducted emissions; 

  • Quotable Quotes:
    • @Susan_Hennessey: Less than two months ago, Apple announced its decision to move mainland Chinese iCloud data to state-run servers.
    • @PaulTassi: Ninja's New 'Fortnite' Twitch Records: 5 Million Followers, 250,000 Subs, $875,000+ A Month via @forbes
    • @iamtrask: Anonymous Proof-of-Stake and Anonymous, Decentralized Betting markets are fundamentally rule by the rich. If you can write a big enough check, you can cause anything to happen. I fundamentally disagree that these mechanisms create fair and transparent markets.
    • David Rosenthal: The redundancy needed for protection is frequently less than the natural redundancy in the uncompressed file. The major threat to stored data is economic, so compressing files before erasure coding them for storage will typically reduce cost and thus enhance data survivability.
    • @mjpt777: The more I program with threads the more I come to realise they are a tool of last resort.
    • JPEG XS~ For the first time in the history of image coding, we are compressing less in order to better preserve quality, and we are making the process faster while using less energy. Expected to be useful for virtual reality, augmented reality, space imagery, self-driving cars, and professional movie editing.
    • Martin Thompson: 5+ years ago it was pretty common for folks to modify the Linux kernel or run cut down OS implementations when pushing the edge of HFT. These days the really fast stuff is all in FPGAs in the switches. However there is still work done on isolating threads to their own exclusive cores. This is often done by exchanges or those who want good predictable performance but not necessarily be the best. A simple way I have to look at it. You are either predator or prey. If predator then you are mostly likely on FPGAs and doing some pretty advanced stuff. If prey then you don't want to be at the back of the herd where you get picked off. For the avoidance of doubt if you are not sure if you are prey or predator then you are prey. ;-)
    • Brian Granatir: serverless now makes event-driven architecture and microservices not only a reality, but almost a necessity. Viewing your system as a series of events will allow for resilient design and efficient expansion. DevOps is dead. Serverless systems (with proper non-destructive, deterministic data management and testing) means that we’re just developers again! No calls at 2am because some server got stuck? 
    • @chrismunns: I think almost 90% of the best practices of #serverless are general development best practices. be good at DevOps in general and you'll be good at serverless with just a bit of effort
    • David Gerard: Bitcoin has failed every aspiration that Satoshi Nakamoto had for it. 
    • @joshelman: Fortnite is a giant hit. Will be bigger than most all movies this year. 
    • @swardley: To put it mildly, the reduction in obscurity of cost through serverless will change the way we develop, build, refactor, invest, monitor, operate, organise & commercialise almost everything. Micro services is a storm in a tea cup compared to this category 5.
    • James Clear: The 1 Percent Rule is not merely a reference to the fact that small differences accumulate into significant advantages, but also to the idea that those who are one percent better rule their respective fields and industries. Thus, the process of accumulative advantage is the hidden engine that drives the 80/20 Rule.
    • Ólafur Arnalds: MIDI is the greatest form of art.
    • Abraham Lincoln: Give me six hours to chop down a tree and I will spend the first four sharpening the axe.
    • @RichardWarburto: Pretty interesting that async/await is listed as essentially a sequential programming paradigm.
    • @PatrickMcFadin: "Most everyone doing something at scale is probably using #cassandra" Oh. Except for @EpicGames and @FortniteGame They went with MongoDB. 
    • Meetup: In the CloudWatch screenshot above, you can see what happened. DynamoDB (the graph on the top) happily handled 20 million writes per hour, but our error rate on Lambda (the red line in the graph on the bottom) was spiking as soon as we went above 1 million/hour invocations, and we were not being throttled. Looking at the logs, we quickly understood what was happening. We were overwhelming the S3 bucket with PUT requests
    • Sarah Zhang: By looking at the polarization pattern in water and the exact time and date a reading was taken, Gruev realized they could estimate their location in the world. Could marine animals be using these polarization patterns to navigate through the ocean? 
    • Vinod Khosla: I have gone through an exercise of trying to just see if I could find a large innovation coming out of big companies in the last twenty five years, a major innovation (there’s plenty of minor innovations, incremental innovations that come out of big companies), but I couldn’t find one in the last twenty five years.
    • Click through for lots more quotes.

Don't miss all that the Internet has to say on Scalability, click below and become eventually consistent with all scalability knowledge (which means this post has many more items to read so please keep on reading)...

Garbage Collection Tuning for Apache Cassandra

This is our second post in our series on performance tuning with Apache Cassandra. In the first post, we examined a fantastic tool for helping with performance analysis, the flame graph. We specifically looked at using Swiss Java Knife to generate them.

In this post, we’re going to focus on optimizing Garbage Collection. First though, it’s important to answer the question, why bother tuning the JVM? Don’t the defaults work well enough out of the box? Isn’t this a bit of premature optimization?

Unfortunately, if your team cares at all about meeting an SLA, keeping costs to a minimum, or simply getting decent performance out of Cassandra, it’s absolutely necessary. We’ve seen properly tuned clusters exhibit a 5-10x improvement in latency and throughput from correctly tuning the JVM. For read heavy workloads where the number of nodes in the cluster is often determined by the number of queries per second it can handle rather than the size of the data, this equates to real cash. A fifty node cluster of r3.2xlarge instances in AWS, billing on demand will run you about $325,000 a year in instance costs alone. Tuning the JVM with a 5x improvement will save a quarter million dollars a year.

Let’s get started.

What is garbage collection?

One thing that’s nice about Java is, for the most part, you don’t have to worry about managing the memory used to allocate objects. Typically you create objects using the new keyword, you use the object for a little while, and you don’t need to worry about doing anything with it when you’re done. This is because the JVM handles keeping track of which objects are still in use and which objects are no longer needed. When an object is no longer needed, it’s considered “garbage”, and the memory used can be reallocated and used for something else. This is in contrast to older languages like C where you would allocate some memory using malloc(size), and use free(ptr) when you were done with it. While it may not seem like a trivial process to remember to call free() on pointers you no longer need, forgetting to do so can cause a memory leak, which can be difficult to track down in large codebases.

There’s quite a few options when configuring garbage collection, so it can seem a little daunting. Cassandra has as of recent always shipped using the Parallel New (ParNew) and Concurrent Mark and Sweep (CMS) garbage collection algorithms, so they will be the focus of this post. There is alternatively the option of using G1GC, but the tradeoff for its simplicity is a higher memory requirement and longer GC times, albeit with reasonable throughput. For cases where we care about the p99 (or p99.9), we’re better off using ParNew + CMS and getting the lower latency, at least for the time being. This may be improved with versions of Java above 8, but at the moment that’s what Cassandra requires.

JVM Spaces

The JVM is a generational, meaning objects are created in an “Eden” space and over time promoted up into the Survivor space and later the Old Generation. This is an attempt to reduce memory fragmentation by keeping objects with a high churn rate limited to a specific memory space. High object churn and fragmentation is problematic because it requires quite a bit of housekeeping to defragment the space. Remember how painful defragmenting a hard drive was?

Defrag

The JVM has many options for startup settings related to Garbage Collection. First, we’re able to set how much memory the JVM is allowed to use in total. The total space is governed by -Xmx (x for max). We generally set -Xms as well (think ‘s’ for startup) which is how much memory is allocated at startup. For performance reasons, Xmx and Xms should be the same value. If not explicitly set, the Cassandra start script will calculate how much memory to use, with a max of 8GB. This is a safe default that hasn’t changed over the years, but there are many workloads that these settings are not optimal. There’s additional settings that govern the behavior of the collectors, but we will limit the scope of this post to explaining how to correctly size each of the regions, and address the additional parameters in a follow up post.

The first two spaces, Eden and survivor, are considered the New Gen, and the sizing is determined by the -Xmn flag. Try to remember n for new. Objects are allocated in the Eden space. Eden is not sized directly, rather it is the remainder of space after we allocate our next region, the survivor spaces.

The survivor space is split into two smaller spaces, referred to S0 and S1. Only one survivor space is active at a given time. We copy objects between the two survivor generations until we eventually promote them to the old generation. The amount of memory in each survivor space is governed by the startup flag -XX:SurvivorRatio. Each survivor space has a 1:SurvivorRatio relationship to the Eden space. If we use the setting -XX:SurvivorRatio=2 with a NewGen of 4GB, Eden will be allocated 2GB of space with each survivor allocated 1GB of space, giving us our 1:2 ratio.

The old generation is where objects eventually land that have survived multiple copies. The idea here is that if they’ve been able to survive this far, they probably will survive a while longer.

Overview

Collection and Promotion

Understanding what happens to objects during their lifetime is essential to tuning the JVM optimally. Generally speaking, during each phase the JVM looks at the object graph, determine the objects that are live and which are garbage.

Parallel New

As mentioned above, Eden is where objects are first allocated. This is a fixed size space determined by the -Xmn flag. As objects are allocated in Eden, the space fills up. Once we’ve filled up Eden space, we have our first GC pause. At this time, the JVM stops all threads and inspects the objects in memory. We refer to this as a “stop the world” pause.

We mentioned above only one survivor space is active at any given time. The first thing the JVM does during a ParNew pause is take any objects that are in the current survivor space that are live objects and either copy them to the other survivor space or the old gen, depending on how long they have lived. The number of times is is guided by the XX:MaxTenuringThreshold option, but there is no guarantee that an object has to survive this many times. Any objects that are garbage the JVM does nothing with, they are simply overwritten later. The process of copying objects between generations takes significantly longer than doing nothing, there’s a direct relationship between the number and size of objects copied and how long it takes to copy them. This process all occurs during the stop the world pause. The more we copy, the longer it takes. The bigger the space, the more potential copying we could do. A massive NewGen filled with live objects that need to be promoted would result in massive pauses. On the other hand, a massive new gen filled with garbage that can be disregarded will have very short pauses.

After the surviving objects are copied from one survivor generation to the other, we copy any live objects from the Eden space to the new active survivor space. As with the first phase, copying is slow, cleaning up garbage is fast.

Concurrent Mark and Sweep

In our old generation, we use a different algorithm, Concurrent Mark and Sweep (CMS). The pauses here are shorter, because most of the work is done concurrently with the application. We have two short pauses here, the mark and remark phases. During these pauses we identify the live and dead objects. We’re able to clean up the garbage while the application is running. CMS kicks in when we reach a certain threshold of fullness, specified by the -XX:CMSInitiatingOccupancyFraction parameter. This default is around 92, and somewhat dangerous, we risk memory filling up before CMS can complete. Cassandra sets this to be a bit lower, -XX:CMSInitiatingOccupancyFraction=75 by default is signficantly safer.

Full GC

Full GC happens when we can’t promote objects to the Old Generation. It cleans up all the spaces, freeing up as much space as possible, and does the dreaded defragmention. Full GC pauses are awful, they can lock up the JVM for several minutes, more than enough time for a node to appear as offline. We want to do our best to avoid Full GC pauses. In many cases it’s faster to reboot a node than let it finish a pause lasting several minutes.

Performance Profiles

The key takeaways from the previous section:

  • Cleaning up garbage is fast.
  • Promoting objects is slow.
  • The more promotion, the longer the pause.
  • Big spaces are capable of more promotion, and thus, longer pauses.

Figuring out the optimal settings for your Cassandra installation is dependent on the workload. Generally speaking, we can categorize a workload as write heavy, read heavy, or an evenly mixed workload. Almost every cluster we’ve analyzed has fallen into one of the first two workloads, with over 99% of the operations being either reads or writes.

Write Heavy Workloads

Write heavy workloads are the Bread and butter of Cassandra. Writes are optimized by treating Memtables as a write back cache, avoiding random reads and in place updates in favor of bulk writes to memtables and a compaction process (the defining characteristics of an LSM)

During a write heavy workload, there are two primary sources of allocations that we must keep in mind when tuning Garbage Collection: memtables and compaction.

The amount of space used by Memtables is configurable and is a significant source of allocation. Mutations are stored in memtables and flushed to disk as SSTables. If you’re storing large text fields or blobs, you can keep a significant amount of data off heap by enabling the following:

memtable_allocation_type: offheap_objects

Since objects assocated with memtables are being allocated often and kept around for a while, we want to limit the size of our Memtables by explicitly specifying memtable_heap_space_in_mb, otherwise it’s set automatically to 1/4 of the heap. If we limit the space used by Memtables we can safely increase the size of the new gen, which will reduce the frequency of GC pauses.

Compaction can generate a significant amount of garbage, depending on the setting for compaction_throughput_mb_per_sec. Compaction can generate a significant amount of short lived objects. For time series data (the most common write heavy use case), LeveledCompactionStrategy not only generates more I/O and takes up a ton of CPU but also generates significantly more garbage than the more appropriate TimeWindowCompactionStrategy. Alex goes into detail on the behavior of TWCS in this post. Picking the right compaction strategy for the right workload can mean orders of magnitude difference in performance.

Generally speaking, with faster disks and higher compaction throughput, it’s a good idea to increase the size of the new generation to account for faster allocation.

Read Heavy Workloads

Read heavy workloads are a little different. Under a read heavy workload, we have significantly less memory churn related to Memtables. Instead, we have a lot of short lived objects as a result of reads pulling data off disk and creating temporary objects. These objects typically last less than a second, sometimes only a few milliseconds. The faster our disks, the higher the allocation rate.

The trouble with the Cassandra defaults and read heavy workloads is that the new generation is capped at 800MB, and the object allocation rate with fast disks can easily cause objects to be prematurely pushed into the old generation. This is bad because:

  • Promotion is slow
  • Lots of promotion can easily fill the old gen

With read heavy workloads and fast disks, we need to increase our heap size and new gen or we will suffer incredibly long pauses. A good starting point is to bump up the heap to 12GB and set the size of the new generation to 6GB. This allows us to keep objects associated with reads in the new generation while keeping premature promotion to a minimum. We can also see a significant benefit by increasing XX:MaxTenuringThreshold, which will keep objects in the new gen rather than promoting them. Using larger survivor spaces helps here as well. Starting with -XX:SurvivorRatio=4 instead of the default 8 and XX:MaxTenuringThreshold=6 will keep objects in the new gen a little longer. If your systems have plenty of RAM, 16GB heap with 10GB new gen will decrease the frequency of pauses even more and should keep roughly the same pause times. There’s very little benefit from having a large old generation as we aren’t keeping more long term objects around with the larger heaps.

Final Thoughts

At this point you should have a better understanding of how different workloads behave in the JVM. No matter the workload, the goal is to keep object promotion to a minimum to limit the length of GC pauses. This is one way we keep our important read and write latency metrics low.

This post is by no means an exhaustive list of everything going on inside the JVM, and there’s quite a few opportunities for improvements after this. That said, we’ve seen some really impressive results just from tuning a handful of settings, so this is a great starting point. Significantly decreasing the objects promoted to old gen can reduce GC pauses by an order of magnitude. We’ll be circling back in the future to examine each of the workloads in greater detail.

Which Cassandra Version Should I Use (2018)?

If you’re starting new or in the 3.0.x series: 3.11.2

  • Apache Cassandra 3.0 is supported until 6 months after 4.0 release (date TBD)

If you’re in 2.x, update to the latest in the series (2.1.20, 2.2.12)

  • Apache Cassandra 2.2 is supported until 4.0 release (date TBD)
  • Apache Cassandra 2.1 is supported until 4.0 release (date TBD). Critical fixes only

 

Long Version

– If you’re starting new or in the 3.0.x series: 3.11.2

Stability wise, both 3.0.16 and 3.11.2 are stable at this point. The biggest advantage of 3.11.2 vs 3.0.16 is the additional features that went into the 3.x series (with x>0).
Not all features are desirable though. (Move away from Materialized Views, since they are marked as experimental on the latest releases).

Despite this, the Slow Query Log and Change-Data-Capture are examples of really useful ones that might make you consider jump to 3.11.2, as you will not get them in the 3.0.x series. JBOD users should also look at CASSANDRA-6696 might be interesting.

– If you’re in 2.x, update to the latest in the series (2.1.20, 2.2.12)

As you might expect, these two releases are very stable, since they have a lot of development time on top of them. If a cluster is still running these Cassandra versions, the best is to upgrade to
the latest releases in the respective series (either 2.1.20 or 2.2.12).

To me, the biggest downside of using these versions, it the fact that they will probably be the last releases of either Cassandra series. The support for critical bugs is here until 4.0 is released (https://cassandra.apache.org/download/) but besides that no major changes or improvements will come.

An additional possible thing to consider, that if there may not be a direct upgrade to the 4.x series, an upgrade may need to be done via 2.x -> 3.x -> 4.x.

But for now, I would stick with the recommendation keep your current major version if you’re already there and not needing anything new!

 


Find out how Pythian can help with all of your Cassandra needs.