Scylla Summit Preview: Grab and Scylla – Driving Southeast Asia Forward

In the run-up to Scylla Summit 2018, we’ll be featuring our speakers and providing sneak peeks at their presentations. This interview in our ongoing series is with Aravind Srinivasan, Staff Software Engineer at Grab, Southeast Asia’s leading on-demand and same-day logistics company. His presentation at Scylla Summit will be on Grab and Scylla: Driving Southeast Asia Forward.

Aravind, before we get into the details of your talk, we’d like to get to know you a little better. Outside of technology, what do you enjoy doing? What are your interests and hobbies?

I love hiking and biking. But now my (and my wife’s) world revolves around our 2-year-old son, who keeps us busy. 😃

How did you end up getting into database technologies? What path led you to getting hands-on with Scylla?

I started my career working on filesystems (for Isilon Systems, now part of Dell EMC), so I was always close to storage. After I decided to get out of the kernel world and into the services world, I moved to Uber, where I was fortunate to work on a team building a queueing system from scratch. We used Cassandra as our metadata store, which worked OK for a while before we ran into lots of operational headaches. Later, when I joined the Data Platform team at Grab, we needed a high-performing, low-overhead metadata store. That’s when we bumped into ScyllaDB, and that’s where our relationship with ScyllaDB started.

What will you cover in your talk?

This talk will give an overview of how Grab uses ScyllaDB, why we chose ScyllaDB over the alternatives for our use cases, and our experience so far with it.

Can you describe Grab’s data management environment for us? What other technologies are you using? What does Scylla need to connect and work with?

First and foremost, Grab is an AWS shop, and our predominant use case pairs ScyllaDB with Kafka, which serves as the ingestion point for ScyllaDB. A couple of use cases also have Spark jobs that talk to ScyllaDB directly.

What is unique about your use case?

The most distinctive characteristic of our use case is the scale-up pattern of the traffic volume (TPS). Traffic tends to spike sharply, so the store backing our use case has to scale fast and be able to handle bursts.

Is there anything Scylla Summit attendees need to know in order to get the most out of your talk? What technology or tools should they be familiar with?

Kafka, stream processing, and some terminology like TPS, QPS, p99, and other such stats.

Thanks, Aravind. By the way, that seems like a perfect segue to highlight that we will have Confluent’s Hojjat Jafarpour talking about Kafka.

If you are interested in learning more about how Grab scaled their hypergrowth across Asia, make sure you register for Scylla Summit today!

The post Scylla Summit Preview: Grab and Scylla – Driving Southeast Asia Forward appeared first on ScyllaDB.

Scylla Summit Preview: Rebuilding the Ceph Distributed Storage Solution with Seastar

In the run-up to Scylla Summit 2018, we’ll be featuring our speakers and providing sneak peeks at their presentations. This interview in our ongoing series is with Kefu Chai, Software Engineer at Red Hat. Before Red Hat, Kefu worked at Morgan Stanley, VMware, and EMC. His presentation at Scylla Summit will be on Rebuilding the Ceph Distributed Storage Solution with Seastar.

We’d like to get to know you a little better, Kefu. Outside of technology, what do you enjoy doing? What are your interests and hobbies?

I try to teach myself French in my spare time. And I read sci-fi and play FPS video games sometimes when I am tired of memorizing conjugations. 😃

You have broad experience developing kernel modules, client-side applications, middleware, testing frameworks and HTTP servers. What path led you to getting hands-on with Seastar?

Higher throughput, more IOPS, and predictably lower latency have become the “holy grail” of storage solutions. These words sound familiar, right? That’s because database and storage systems share almost the same set of problems nowadays, so it’s natural for us to look for solutions in database technologies. The Seastar framework behind Scylla is appealing, and that’s why we embarked on our journey of rebuilding Ceph with Seastar.

For those not familiar with Ceph, what are its primary features? How would you compare it to other storage options?

Ceph is an open-source distributed storage platform which offers block, object, and filesystem access to the data stored in the cluster. Unlike numbers, real-world entities are often difficult to compare; even one apple can look very different from another depending on your perspective. Instead, I’d like to highlight some things that differentiate Ceph from other software-defined storage solutions:

  • It has an active user and developer community.
  • It’s a unified storage solution. One doesn’t need to keep multiple different clusters for different use cases.
  • It’s designed to be decentralized and to avoid a single point of failure.

What will you cover in your talk?

I will introduce Ceph, the distributed storage system we are working on, explain the problems we are facing, and then talk about how we are rebuilding this system with Seastar.

Though you are sure to get into this deeper in your discussion, what advantages are you seeing in the Seastar framework?

As you might know, I worked on another C++ framework named mordor a couple of years ago, before C++11 brought futures/promises to us. Mordor, C++11, and Seastar all offer coroutine-style execution, so that when a fiber makes a blocking call, the library automatically moves another runnable fiber onto the thread and the thread itself is never blocked. But Seastar goes further by enforcing the share-nothing model with zero tolerance for locking. I think this alone differentiates Seastar from other coroutine frameworks. It forces developers to re-think their designs.

What were some unique challenges to integrate Seastar to Ceph?

Unlike some other projects based on Seastar, Ceph was not designed from scratch with Seastar, so we needed to overcome some more interesting difficulties. Also, Ceph is a very dynamic project, so it’s like trying to catch a guy who is ten miles ahead and running away from you.

Where are you in the process of integration?

We are rebuilding the infrastructure in Ceph using Seastar. It’s almost done.

What are you looking to do next? 

We want to get to the I/O path as soon as possible to understand how Seastar impacts performance.

Is there anything Scylla Summit attendees need to know in order to get the most out of your talk? What technology or tools should they be familiar with?

It would be ideal if attendees have a basic understanding of the typical threading models used by servers, but I will cover this briefly as well.

Thank you for your time, Kefu! Looking forward to seeing you on stage in November!

If you haven’t arranged your own travel plans to attend Scylla Summit 2018, it’s not too late! Don’t delay! Register today!

The post Scylla Summit Preview: Rebuilding the Ceph Distributed Storage Solution with Seastar appeared first on ScyllaDB.

Scylla Manager 1.2 Release Announcement

The Scylla Enterprise team is pleased to announce the release of Scylla Manager 1.2, a production-ready release of Scylla Manager for Scylla Enterprise customers.

Scylla Manager 1.2 focuses on better control of repair task scope, allowing users to run repairs on a subset of data centers (DCs), keyspaces, and tables; easier deployment and setup; and improvements in security.

Related links:

Upgrade to Scylla Manager 1.2

Read the upgrade guide carefully. In particular, you will need to redefine scheduled repairs. Please contact the Scylla support team for help installing and upgrading Scylla Manager.

New features in Scylla Manager 1.2

Debian 8 and Ubuntu 16 packages

Scylla Manager is now available for Debian 8 and Ubuntu 16; packages will be shared in the next few days.

Improved Repair Granularity

In previous releases, one could only use Scylla Manager to run a cluster-wide repair. Starting with Scylla Manager 1.2, scheduled and ad-hoc repairs can be limited to:

  • One or more data centers (DCs), with the --dc flag
  • One node (host), with the --host flag
  • A subset of hosts, with the --with-hosts flag
  • Primary or non-primary token ranges, with the --token-ranges [pr|npr] flag
  • Keyspace (existed in Scylla Manager 1.1 with a different syntax)
  • Table (existed in Scylla Manager 1.1 with a different syntax)

For example, the above elements can be used to:

  • Run a local DC Repair
  • Repair a specific node, for example after a node restart
  • Repair only a restarted node, and all nodes with overlapping token ranges
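To make the last example above concrete, a hypothetical invocation combining the new flags could look like this; the cluster name, IP addresses, and DC name are placeholders, and the exact value format for --with-hosts should be checked against the sctool 1.2 reference:

sctool repair --cluster prod-cluster --dc dc-us-east \
  --host 198.100.51.11 --with-hosts 198.100.51.12,198.100.51.13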

For more on running a granular repair, see the sctool 1.2 reference and the sctool updates below.

Sctool updates

  • You now need to provide only one node IP when adding a new cluster. For example
    sctool cluster add --host=198.100.51.11 --name=prod-cluster
  • New flags in sctool:
    • --dc, --host, --with-hosts, --token-ranges (see more granular repair above)
    • --fail-fast: Stops the repair process on the first error
  • sctool allows selecting multiple keyspaces and tables using glob pattern matching. For example:
  • Repair only the US East DC:
    sctool repair --cluster mycluster --dc dc-us-east
  • Repair all US DCs except US West:
    sctool repair --cluster mycluster --dc dc-us-*, !dc-us-west
  • Repair a subset of keyspaces:
    sctool repair --cluster mycluster -K mykeyspaces-*
  • Repair a single keyspace:
    sctool repair --cluster mycluster -K mykeyspace
  • sctool task start: a new --continue flag allows resuming a task from where it last stopped, rather than from the start.
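For example, resuming a previously stopped repair task might look like the sketch below; the task identifier and the --cluster argument placement are illustrative, so check the sctool 1.2 reference for the exact syntax:

sctool task start <task-id> --cluster prod-cluster --continue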

Security

  • One of the common problems with adding a managed Scylla cluster to Scylla Manager was the task of creating a scylla-manager user and sharing the Scylla Manager public key with all Scylla nodes. With Scylla Manager 1.2, this task is greatly simplified by the new scyllamgr_ssh_setup script. Read more about the new script and how to manage a new cluster with Scylla Manager 1.2.
  • Two new optional parameters for the sctool ‘cluster add’ command, ssh-user and ssh-identity-file, allow you to specify a different SSH user and key per cluster.
  • Scylla Manager now uses HTTPS by default to access the Scylla nodes’ REST API. Note that the Scylla REST API is still bound to a local IP only, and Scylla Manager uses SSH to connect to each node.
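For instance, adding a cluster with per-cluster SSH settings might look like the following sketch; the flag spellings mirror the parameter names above and the key path is a placeholder, so verify both against the sctool 1.2 reference:

sctool cluster add --host=198.100.51.11 --name=prod-cluster \
  --ssh-user=scylla-manager --ssh-identity-file=/path/to/scylla_manager.pem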

Monitoring

Scylla Grafana Monitoring 2.0 now includes a Scylla Manager 1.2 dashboard.

The following metrics have been updated in Scylla Manager 1.2

  • The total run tasks metric has been renamed from “status_total” to “run_total”
  • In the “repair” subsystem, “unit” has been renamed to “task”
  • In the “repair” subsystem, “keyspace” has been added

The following metric has been removed in Scylla Manager 1.2

  • Subsystem “log” has been removed

About Scylla Manager

Scylla Manager adds centralized cluster administration and recurrent task automation to Scylla Enterprise. Scylla Manager 1.x includes automation of periodic repair. Future releases will provide rolling upgrades, recurrent backup, and more. With time, Scylla Manager will become the focal point of Scylla Enterprise cluster management, including a GUI frontend. Scylla Manager is available for all Scylla Enterprise customers. It can also be downloaded from scylladb.com for a 30-day trial.

The post Scylla Manager 1.2 Release Announcement appeared first on ScyllaDB.

Scylla Summit Preview: Scalable Stream Processing with KSQL, Kafka and Scylla

In the run-up to Scylla Summit 2018, we’ll be featuring our speakers and providing sneak peeks at their presentations. This interview in our ongoing series is with Hojjat Jafarpour of Confluent. His presentation at Scylla Summit is entitled Scalable Stream Processing with KSQL, Kafka and Scylla.

Hojjat, before we get into your talk, tell us a little about yourself. What do you like to do for fun?

I’m a Software Engineer at Confluent and the creator of KSQL, the Streaming SQL engine for Apache Kafka. In addition to working on challenging problems in scalable data management, I like traveling and the outdoors. We are so lucky to have spectacular places like Yosemite, Lake Tahoe, and many more wonders of nature in California, and I take any opportunity to plan a getaway to such amazing places.

Kafka seems everywhere these days. What do you believe were the critical factors that drove its rapid adoption versus other options?

There are quite a few factors behind the extraordinary success of Kafka. I think one of the main ones is Kafka’s ability to decouple producers and consumers of data. Such decoupling can significantly simplify the data management architecture of enterprises. Kafka, along with its complementary technologies such as KSQL/Streams and Connect, can be the central nervous system for any data-driven enterprise.

We see many organizations that go through the transformational phase of breaking their monolith into a microservices architecture which has Kafka as the central part of their new architecture.

Real-time data and stream processing raise the spectre of bursty data traffic patterns. You can’t control throughput as easily as with batch processing. Will you cover how to ensure you’re not just dumping data into /dev/null?

Yes. KSQL uses Kafka Streams as its physical execution engine, which provides a powerful elasticity model. You can easily add new computing resources as needed for your stream processing applications in KSQL.

KSQL is a powerful tool to find and enrich data that’s coming in from live streams and topics. For people familiar with Cassandra Query Language (CQL), what are some key similarities or differences to note?

The main difference between KSQL and CQL is in their data processing model. Unlike CQL, KSQL is a streaming SQL engine whose queries are continuous: when you run a query in KSQL, it will keep reading input and generating output continuously unless you explicitly terminate the query. On the other hand, similar to CQL, one of our main goals in KSQL is to provide a familiar tool for our users to write stream processing applications. It’s much easier to learn SQL than Java. KSQL aims to make stream processing available to a much broader audience than only hardcore software engineers!

Besides a general understanding of Kafka and KSQL, is there anything else Scylla Summit attendees should familiarize themselves with to get the most out of your session?

Just a general understanding of Kafka is enough. I will provide a brief introduction to KSQL in the beginning of the talk.

Thanks very much for your time!

If you’d like to hear more of what Hojjat has to say, yet haven’t registered for Scylla Summit, here’s a handy button:

The post Scylla Summit Preview: Scalable Stream Processing with KSQL, Kafka and Scylla appeared first on ScyllaDB.

Registration Now Open for DataStax Accelerate!

I’m very excited to announce that registration is now open for DataStax Accelerate—the world’s premier Apache Cassandra™ conference. The call for papers is also open.

Today’s enterprises are facing massive shifts in the use of real-time data to create differentiated experiences through internally and externally facing applications. Simple, single-cloud deployments are insufficient for these globally distributed and scalable applications, which require a hybrid and multi-cloud approach.

This demands new ways to think about data management and how enterprises can make the most of their data while still keeping it portable and secure.

To discuss these trends and the technologies that underpin them, we are bringing together the best minds and most cutting-edge innovations in distributed database technology for DataStax Accelerate.  This premier conference will be held May 21-23, 2019 at the Gaylord National Resort and Convention Center in Oxon Hill, Maryland, just outside Washington, D.C.

Mark your calendar—because you will not want to miss it.

DataStax Accelerate will feature separate executive and technical tracks, as well as training, hands-on sessions, an exhibitor pavilion, networking, and a full presentation agenda from DataStax executives, customers, and partners.

You’ll be able to learn from your peers, industry experts, and thought leaders on how Apache Cassandra and DataStax can help you build and deploy game-changing enterprise applications and easily scale your data needs to fit your company’s growth and today’s hybrid and multi-cloud world.

Additionally, you can learn from:

  • Deep technical sessions for developers, administrators, and architects on DataStax and Apache Cassandra internals, theory, and operations
  • An informative and engaging showcase of some of the world’s largest enterprise companies that are running their strategic business initiatives with DataStax
  • In-depth and hands-on workshops around DataStax Enterprise 6.0 advanced technologies, like Graph, Solr, Kafka, and Spark
  • Face-time with DataStax engineers in various workshops and breakout sessions
  • Designer and innovator sessions on how to accelerate your hybrid and multi-cloud deployments using DataStax’s unique masterless database architecture that is always-on, active everywhere, and infinitely scalable

Register now!

Scylla Enterprise 2018.1.5 Release Announcement

The Scylla team is pleased to announce the release of Scylla Enterprise 2018.1.5, a production-ready Scylla Enterprise minor release. Scylla Enterprise 2018.1.5 is a bug fix release for the 2018.1 branch, the latest stable branch of Scylla Enterprise. In addition to bug fixes, 2018.1.5 includes major improvements in single partition scans. For more details, refer to the Efficient Query Paging blog post.

  • More about Scylla Enterprise here.

Scylla Enterprise customers are encouraged to upgrade to Scylla Enterprise 2018.1.5 in coordination with the Scylla support team. Note that the downgrade procedure from 2018.1.5, if required, is slightly different from previous releases. For instructions, refer to the Downgrade section in the Upgrade guide.

Related Links

Issues fixed by this release, with open source references, if applicable:

  • CQL: DISTINCT was ignored with IN restrictions #2837
  • CQL: Dropping a keyspace with a user-defined type (UDT) resulted in an error #3068
  • CQL: Selecting from a partition with no clustering restrictions (single partition scan) might have resulted in a temporary loss of writes #3608
  • CQL: Fixed a rare race condition when adding a new table, which could have generated an exception #3636
  • CQL: INSERT using a prepared statement with the wrong fields may have generated a segmentation fault #3688
  • CQL: MIN/MAX CQL aggregates were broken for timestamp/timeuuid values. For example SELECT MIN(date) FROM ks.hashes_by_ruid; where date is of type timestamp #3789
  • CQL: TRUNCATE request could have returned a success response even if it failed on some replicas #3796
  • CQL: In rare cases, SELECT with LIMIT could have returned fewer values than expected #3605
  • Performance: eviction of large partitions may have caused latency spikes #3289
  • Performance: a mistake in static row digest calculations may have led to redundant read repairs #3753, #3755
  • Performance: In some cases, Scylla nodes were stalling due to the max_task_backlog being exceeded. Preventive measures have been implemented to keep this from happening. Enterprise issue #555
  • Stability: In some cases following a restart, the coordinator was sending a write request with the same ID as a request sent prior to the restart, which triggered an assert in the coordinator. #3153
  • Stability: In rare cases, eviction from invalidated partitions may have caused an infinite loop. Enterprise issue #567
  • Monitoring: Added a counter for speculative retries #3030

The post Scylla Enterprise 2018.1.5 Release Announcement appeared first on ScyllaDB.

Anomalia Machina 2 – Automatic Provisioning: Massively Scalable Anomaly Detection with Apache Kafka and Apache Cassandra

Automatic provisioning of Apache Kafka and Apache Cassandra clusters using Instaclustr’s Provisioning API

1 Introduction

Anomalia Machina has kicked off, and as you might be aware, it is going to do some big things on Instaclustr’s open source based platform. The application will primarily use Apache Kafka and Apache Cassandra hosted on Instaclustr’s managed services. Building Anomalia Machina is going to be iterative and experimental, so to be ready to support this approach we want to develop some tools that help us set up the infrastructure for such a colossal task.

The first thing that comes to mind when working with a platform of clusters is a tool that automatically creates the required infrastructure: clusters, network connectivity, firewall rules, etc. Thanks to Instaclustr’s provisioning API, we can accomplish this with ease. The second thing to accomplish is to load the clusters with data.

Let us look into how provisioning (this blog) and load generation for Cassandra/Kafka clusters (next blog) can be done.

2 Cluster Provisioning

The provisioning program can be built with any programming language or scripting tool of your liking; it’s that simple! I chose to start with bash, and began by reading the Instaclustr Provisioning API support article. The provisioning API can be used to provision both Cassandra and Kafka clusters with the appropriate configuration parameters.

I planned to create a JSON template (or skeleton) which would be filled out by a simple bash script that could then hit the provisioning API to fire up a Cassandra/Kafka cluster. But first, I needed to create an account on the Instaclustr console to have the authority to do this. After creating an account, I am now equipped with a username and password to get started.

The first piece of the puzzle is a main script that drives the whole thing and invokes functions to get the provisioning done. I also need a configuration file that acts as my primary input and holds all the information about the cluster I want to create. Finally, there should be a JSON template used to create provisioning requests similar to those given in the support article.

The scripts can be executed from any machine: a server, a cloud instance, etc. I set them up on my laptop, and they are quite portable in case there is a need to move them. I created a base directory called ‘provisioner’ and a configuration file ‘configuration.sh’ that holds all the required parameters. Then I started on the master script ‘provisioner.sh’, which takes a few command line arguments to keep things flexible.

Now, after working out all the arguments, creating the necessary directories, and reading the configuration, the master script is ready to invoke the functions that take care of further tasks. I created a new script with a function that reads the JSON template and fills it in with parameters from the configuration file. The resulting JSON request needs to be posted to the Instaclustr API URL, and if everything, including my credentials, is good, it should start provisioning a brand new cluster for me.
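As a rough sketch of that function (not the actual Anomalia Machina code), filling in the template and posting it could look like this in bash; the endpoint path, template placeholders, and environment variable names are illustrative assumptions, and the authoritative request format is the one in the support article:

# fill the JSON template with values from configuration.sh and POST it to the
# provisioning API; the endpoint and placeholder names are illustrative only
provision_cluster() {
  local request_file="requests/${CLUSTER_NAME}.json"

  sed -e "s/{{CLUSTER_NAME}}/${CLUSTER_NAME}/g" \
      -e "s/{{NODE_SIZE}}/${NODE_SIZE}/g" \
      cluster-template.json > "${request_file}"

  curl -s -X POST "https://api.instaclustr.com/provisioning/v1" \
       -u "${IC_USERNAME}:${IC_API_KEY}" \
       -H "Content-Type: application/json" \
       -d @"${request_file}"
}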

Wait! Even though my program may look simple to a seasoned programmer, and it just does some basic stuff, it still gave me a hard time to get right the first few times. I stumbled over hurdles like unfamiliar syntax, (over)confidence in my bash skills, and incorrect parameters on the Instaclustr side. In addition, bash error reporting is not really great; for example, if you forget to initialize a variable, it is still used, just with a blank value.

Finally, I managed to get a cluster provisioned. But the program ended abruptly, and I had no way of knowing whether it was successful or not. The support article actually mentions a remedy: “If the JSON is valid (see allowed values below), the API will respond with 202 Accepted and a JSON containing the cluster ID”. That means I need to handle the response and look for success.

Handling a JSON response in a bash function sounded a bit difficult to me. Although I could grep the required message out of the response, I had already thought about a simple Python function to parse the response with the ‘JSON’ module and return the ‘Cluster Id’. Creating a small Python script was easy enough, and my bash function can now learn the Cluster Id on success.

I could have stopped at this point, but the success of creating a cluster with an ad hoc request encouraged me to do something more. Reading further down the support article, I realized that I can actually poll the cluster status and make the provisioner wait until the cluster is fully functional.

The reason for this is that cluster provisioning takes some time, depending on the size of the cluster. This just required one more Python function that looks for “clusterStatus” in the status JSON, plus one more bash function to call for the cluster status.
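A hedged sketch of that polling logic in bash is shown below; the status URL and the RUNNING value are assumptions based on the support article’s description, not code copied from the real provisioner:

# poll the cluster status until it reports RUNNING; the URL, field name, and
# status value are illustrative assumptions
wait_for_cluster() {
  local cluster_id=$1
  local status=""
  while [ "${status}" != "RUNNING" ]; do
    sleep 30
    status=$(curl -s -u "${IC_USERNAME}:${IC_API_KEY}" \
      "https://api.instaclustr.com/provisioning/v1/${cluster_id}" |
      python -c 'import json,sys; print(json.load(sys.stdin)["clusterStatus"])')
  done
}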

Here it is: a provisioner that uses a small configuration file and creates Cassandra/Kafka clusters on the fly. I gave some further thought to the layout of the provisioner and collected the scripts dealing with the Instaclustr platform in a new directory, ‘InstaclustrAPI’. This package can be used in any other program to perform provisioning, status checks, etc.

I hope my small contribution helps Paul in building ‘Anomalia Machina’, and if it becomes a milestone in the history of Instaclustr/open source software/human history, I can claim a small credit 😛

Please find the code for the provisioner here: https://github.com/instaclustr/provisioner

We would like to hear from you about your experience, comments, and suggestions.

The post Anomalia Machina 2 – Automatic Provisioning: Massively Scalable Anomaly Detection with Apache Kafka and Apache Cassandra appeared first on Instaclustr.

Custom commands in cstar

Welcome to the next part of the cstar post series. The previous post introduced cstar and showed how it can run simple shell commands using various execution strategies. In this post, we will teach you how to build more complex custom commands.

Basic Custom Commands

Out of the box, cstar comes with three commands:

$ cstar
usage: cstar [-h] {continue,cleanup-jobs,run} ...

cstar

positional arguments:
  {continue,cleanup-jobs,run}
    continue            Continue a previously created job (*)
    cleanup-jobs        Cleanup old finished jobs and exit (*)
    run                 Run an arbitrary shell command

Custom commands allow extending these three with anything one might find useful. Adding a custom command to cstar is as easy as placing a file in ~/.cstar/commands or /etc/cstar/commands. For example, we can create ~/.cstar/commands/status that looks like this:

#!/usr/bin/env bash
nodetool status

With this file in place, cstar now features a brand new status command:

$ cstar
usage: cstar [-h] {continue,cleanup-jobs,run,status} ...

cstar

positional arguments:
  {continue,cleanup-jobs,run,status}
    continue            Continue a previously created job (*)
    cleanup-jobs        Cleanup old finished jobs and exit (*)
    run                 Run an arbitrary shell command
    status

A command like this allows us to stop using:

cstar run --command "nodetool status" --seed-host <host_ip>

And use a shorter version instead:

cstar status --seed-host <host_ip>

We can also declare the command description and default values for cstar’s options in the command file. We can do this by including commented lines with a special prefix. For example, we can include the following lines in our ~/.cstar/commands/status file:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: all
# C* description: Run nodetool status

nodetool status

Once we do this, the status command will show up with a proper description in cstar’s help, and running cstar status --seed-host <host_ip> will be equivalent to:

cstar status --seed-host <host_ip> --cluster-parallel --dc-parallel --strategy all

When cstar begins the execution of a command, it will print a unique ID for the job being run. This ID is needed for resuming a job, but more on this later. We also need the job ID to examine the output of the commands. We can find the output in:

$ ~/.cstar/jobs/<job_id>/<hostname>/out

Parametrized Custom Commands

When creating custom commands, cstar allows declaring custom arguments as well. We will explain this feature by introducing a command that deletes snapshots older than a given number of days.

We will create a new file, ~/.cstar/commands/clear-snapshots, that will start like this:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: all
# C* description: Clear snapshots older than given number of days
# C* argument: {"option":"--days", "name":"DAYS", "description":"Snapshots older than this many days will be deleted", "default":"7", "required": false}

The new element here is the last line starting with # C* argument:. Upon seeing this prefix, cstar will parse the remainder of the line as a JSON payload describing the custom argument. In the case above, cstar will:

  • Use --days as the name of the argument.
  • Save the value of this argument into a variable named DAYS. We will see how to access this in a bit.
  • Associate a description with this argument.
  • Use 7 as a default value.
  • Do not require this option.

With this file in place, cstar already features the command in its help:

$ cstar
usage: cstar [-h] {continue,cleanup-jobs,run,status,clear-snapshots} ...

cstar

positional arguments:
  {continue,cleanup-jobs,run,status,clear-snapshots}
    continue            Continue a previously created job (*)
    cleanup-jobs        Cleanup old finished jobs and exit (*)
    run                 Run an arbitrary shell command
    status              Run nodetool status
    clear-snapshots     Clear snapshots older than given number of days
    

$ cstar clear-snapshots --help
usage: cstar clear-snapshots [-h] [--days DAYS]
                             [--seed-host [SEED_HOST [SEED_HOST ...]]]
                             ...
                             <other default options omitted>

optional arguments:
 -h, --help            show this help message and exit
 --days DAYS
                       Snapshots older than this many days will be deleted
 --seed-host [SEED_HOST [SEED_HOST ...]]
                        One or more hosts to use as seeds for the cluster
 ...
 <other default options omitted>

Now we need to add the command which will actually clear the snapshots. This command needs to do three things:

  • Find the snapshots that are older than a given number of days.
    • We will use the -mtime filter of the find utility:
    • find /var/lib/cassandra/data/*/*/snapshots/ -mtime +"$DAYS" -type d
    • Note we are using "$DAYS" to reference the value of the custom argument.
  • Extract the snapshot names from the findings.
    • We got absolute paths to the directories found. Snapshot names are the last portion of these paths. Also, we will make sure to keep each snapshot name only once:
    • sed -e 's#.*/##' | sort -u
  • Invoke nodetool clearsnapshot -t <snapshot_name> to clear each of the snapshots.

Putting this all together, the clear-snapshots file will look like this:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: all
# C* description: Clear up snapshots older than given number of days
# C* argument: {"option":"--days", "name":"DAYS", "description":"Snapshots older than this many days will be deleted", "default":"7", "required": false}

find /var/lib/cassandra/data/*/*/snapshots/ -mtime +"$DAYS" -type d |\
sed -e 's#.*/##' |\
sort -u |\
while read line; do nodetool clearsnapshot -t "${line}"; done

We can now run the clear-snapshots command like this:

$ cstar clear-snapshots --days 2 --seed-host <seed_host>

Complex Custom Commands

One of the main reasons we consider cstar so useful is that custom commands can be arbitrary shell scripts, not just the one-liners we have seen so far. To illustrate this, we are going to share two relatively complicated commands.

Upgrading Cassandra Version

The first command will cover a rolling upgrade of the Cassandra version. Generally speaking, the upgrade should happen as quickly as possible and with as little downtime as possible. This is the ideal application of cstar’s topology strategy: it will execute the upgrade on as many nodes as possible while ensuring a quorum of replicas stays up at any moment. Then, the upgrade of a node should follow these steps:

  • Create snapshots to allow rollback if the need arises.
  • Upgrade the Cassandra installation.
  • Restart the Cassandra process.
  • Check the upgrade happened successfully.

Clearing the snapshots or upgrading SSTables should not be part of the upgrade itself. Snapshots, being just hard links, will not consume excessive space, and Cassandra can (in most cases) operate with older SSTable versions. Once all nodes are upgraded, these actions are easy enough to perform with dedicated cstar commands.

The ~/.cstar/commands/upgrade command might look like this:

#!/usr/bin/env bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: topology
# C* description: Upgrade Cassandra package to given target version
# C* argument: {"option":"--snapshot-name", "name":"SNAPSHOT_NAME", "description":"Name of pre-upgrade snapshot", "default":"preupgrade", "required": false}
# C* argument: {"option":"--target-version", "name":"VERSION", "description":"Target version", "required": true}

# -x prints the executed commands to standard output
# -e fails the entire script if any of the commands fails
# -u fails the script if any of the variables is not bound
# -o pipefail instructs the interpreter to return the right-most non-zero status of a piped command in case of failure
set -xeuo pipefail

# exit if a node is already on the target version
if [[ $(nodetool version) == *$VERSION ]]; then
  exit 0
fi

# create Cassandra snapshots to allow rollback in case of problems
nodetool clearsnapshot -t "$SNAPSHOT_NAME"
nodetool snapshot -t "$SNAPSHOT_NAME"

# upgrade Cassandra version
sudo apt-get install -y cassandra="$VERSION"

# gently stop the cassandra process
nodetool drain && sleep 5 && sudo service cassandra stop

# start the Cassandra process again
sudo service cassandra start

# wait for Cassandra to start answering JMX queries
for i in $(seq 60); do
    if nodetool version > /dev/null 2>&1; then
        break
    fi
    sleep 1s
done

# fail if the upgrade did not happen
if ! [[ $(nodetool version) == *$VERSION ]]; then
  exit 1
fi

When running this command, we can be extra-safe and use the --stop-after option:

$ cstar upgrade --seed-host <host_name> --target-version 3.11.2 --stop-after 1

This will instruct cstar to upgrade only one node and exit the execution. Once that happens, we can take our time to inspect the node to see if the upgrade went smoothly. When we are confident enough, we can resume the command. Output of each cstar command starts with a highlighted job identifier, which we can use with the continue command:

$ cstar continue <job_id>

Changing Compaction Strategy

The second command we would like to share performs a compaction strategy change in a rolling fashion.

Compaction configuration is a table property. It needs an ALTER TABLE CQL statement execution to change. Running a CQL statement is effective immediately across the cluster. This means once we issue the statement, each node will react to the compaction change. The exact reaction depends on the change, but it generally translates to increased compaction activity. It is not always desirable to have this happen: compaction can be an intrusive process and affect the cluster performance.

Thanks to CASSANDRA-9965, there has been a way of altering the compaction configuration on a single node via JMX since Cassandra version 2.1.9. We can set the CompactionParametersJson MBean value to change the compaction configuration the node uses. Once we know how to change one node, we can have cstar do the same across the whole cluster.

Once we change the compaction settings, we should also manage the aftermath. Even though the change is effective immediately, it might take a very long time until each SSTable undergoes a newly configured compaction. The best way of doing this is to trigger a major compaction and wait for it to finish. After a major compaction, all SSTables are organised according to the new compaction settings and there should not be any unexpected compaction activity afterwards.

While cstar is excellent at checking which nodes are up or down, it does not check other aspects of node health. It does not have the ability to monitor compaction activity. Therefore, we should include the wait for the major compaction in the command we are about to build. The command will then follow these steps:

  • Stop any compactions that are currently happening.
  • Set the CompactionParametersJson MBean to the new value.
    • We will use jmxterm for this and assume the JAR file is already present on the nodes.
  • Run a major compaction to force Cassandra to organise SSTables according to the new setting and make cstar wait for the compactions to finish.
    • This step is not mandatory. Cassandra would re-compact the SSTables eventually.
    • Doing a major compaction will cost extra resources and possibly impact the node’s performance. We do not recommend doing this on all nodes in parallel.
    • We are taking advantage of the topology strategy which will guarantee a quorum of replicas free from this load at any time.

The ~/.cstar/commands/change-compaction command might look like this:

#! /bin/bash
# C* cluster-parallel: true
# C* dc-parallel: true
# C* strategy: topology
# C* description: Switch compaction strategy using jmxterm and perform a major compaction on a specific table
# C* argument: {"option":"--keyspace-name", "name":"KEYSPACE", "description":"Keyspace containing the target table", "required": true}
# C* argument: {"option":"--table", "name":"TABLE", "description":"Table to switch the compaction strategy on", "required": true}
# C* argument: {"option":"--compaction-parameters-json", "name":"COMPACTION_PARAMETERS_JSON", "description":"New compaction parameters", "required": true}
# C* argument: {"option":"--major-compaction-flags", "name":"MAJOR_COMPACTION_FLAGS", "description":"Flags to add to the major compaction command", "default":"", "required": false}
# C* argument: {"option":"--jmxterm-jar-location", "name":"JMXTERM_JAR", "description":"jmxterm jar location on disk", "required": true}

set -xeuo pipefail

echo "Switching compaction strategy on $KEYSPACE.$TABLE"
echo "Stopping running compactions"
nodetool stop COMPACTION
echo "Altering compaction through JMX..."
echo "set -b org.apache.cassandra.db:columnfamily=$TABLE,keyspace=$KEYSPACE,type=ColumnFamilies CompactionParametersJson $COMPACTION_PARAMETERS_JSON"  | java -jar $JMXTERM_JAR --url 127.0.0.1:7199 -e

echo "Running a major compaction..."
nodetool compact ${KEYSPACE} ${TABLE} $MAJOR_COMPACTION_FLAGS

The command requires options specifying which keyspace and table to apply the change on. The jmxterm location and the new value for the compaction parameters are another two required arguments. The command also allows passing flags to the major compaction. This is useful for cases when we are switching to SizeTieredCompactionStrategy, where the -s flag will instruct Cassandra to produce several size-tiered files instead of a single big file.

The nodetool compact command will not return until the major compaction finishes, so the execution on a node will not complete until then. Consequently, cstar will see this long execution and dutifully wait for it to complete before moving on to other nodes.

Here is an example of running this command:

$ cstar change-compaction --seed-host <host_name> --keyspace tlp_stress --table KeyValue --jmxterm-jar-location /usr/share/jmxterm-1.0.0-uber.jar --compaction-parameters-json "{\"class\":\"LeveledCompactionStrategy\",\"sstable_size_in_mb\":\"120\"}"

This command also benefits from the --stop-after option. Moreover, once all nodes are changed, we should not forget to persist the schema change by doing the actual ALTER TABLE command.
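For example, persisting the change for the table used above could be done with a single CQL statement along the lines of this sketch; adjust the keyspace, table, and compaction parameters to your own values, and quote the table name if it was created case-sensitively:

cqlsh <host_name> -e "ALTER TABLE tlp_stress.KeyValue WITH compaction = {'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb': '120'};"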

Conclusion

In this post we talked about cstar and its feature of adding custom commands. We have seen:

  • How to add a simple command to execute nodetool status on all nodes at once.
  • How to define custom parameters for our commands, which allowed us to build a command for deleting old snapshots.
  • That the custom commands are essentially regular bash scripts and can include multiple statements. We used this feature to do a safe and fast Cassandra version upgrade.
  • That the custom commands can call external utilities such as jmxterm, which we used to change compaction strategy for a table in a rolling fashion.

In the next post, we are going to look into cstar’s cousin called cstarpar. cstarpar differs in the way commands are executed on remote nodes and allows for heavier operations such as rolling reboots.

Hooking up Spark and Scylla: Part 3

Welcome back! Last time, we discussed how Spark executes our queries and how Spark’s DataFrame and SQL APIs can be used to read data from Scylla. That concluded the querying data segment of the series; in this post, we will see how data from DataFrames can be written back to Scylla.

As always, we have a code sample repository with a docker-compose.yaml file defining all the services we’ll need. After you’ve cloned it, start up the services with docker-compose:
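Assuming the compose file defines everything required, starting the stack is typically just:

docker-compose up -d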

After that is done, launch the Spark shell as in the previous posts in order to run the samples in the post:
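The exact spark-shell invocation is listed in the sample repository’s README; schematically it looks something like the sketch below, where the service name, connection host, and connector package version are assumptions rather than values taken from the repository:

docker-compose exec spark-master spark-shell \
  --conf spark.cassandra.connection.host=scylla \
  --packages datastax:spark-cassandra-connector:2.3.1-s_2.11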

Saving static DataFrames

Let’s start with a simple example. We’ll create a DataFrame from a list of objects of the same type, and see how it can be written to Scylla. Here’s the definition for our DataFrame; note that the comments in the snippet indicate the resulting values as you’d see them in the Spark shell:

The data types are fairly rich, containing primitives, lists and maps. It’ll be interesting to see how they are translated when writing the DataFrame into Scylla.

Next, add the following imports from the DataStax Cassandra connector for Spark to enrich the Spark API:

Now, similarly to how DataFrames are read from Scylla using spark.read.cassandraFormat (see the previous post for the details), they can be written into Scylla by using a DataFrameWriter available on the DataFrame:

Spark, as usual, is not running anything at this point. To actually run the write operation, we use writer.save:

Unfortunately (or fortunately), the Cassandra connector will not automatically create tables for us when saving to non-existent targets. Let’s fire up cqlsh in another terminal:

We can go back to our Spark shell and try running writer.save() again. If all went well, no exceptions should be thrown and nothing will be printed. When the Spark shell prompt returns, try running the following query in cqlsh – you should see similar results:

Great! So that was fairly easy. All we had to do was match the column names and types to the DataFrame’s schema. Let’s see what happens when a column’s type is mismatched; create another table with name’s type changed to bigint:

And write the DataFrame to it:

You should see a fairly large number of stack traces scrolling by and, embedded somewhere within them, the NumberFormatException we’ve shown here. It’s not very informative; we can infer from our simple example that the problem is with the name column, but with a larger application and schema this might be harder.

When column names are mismatched, the error message is slightly friendlier; this is the exception we’d get when name is misnamed:

Some type mismatches won’t even throw exceptions; the connector will happily coerce booleans to text, for example, when saving boolean fields from DataFrames into text columns. This is not great for data quality. The astute reader will also note that the NumberFormatException from before would not be thrown for strings that only contain numbers, which means that some datasets might behave perfectly well, while others would fail.

The connector contains some useful infrastructure that we could use to implement programmatic checks for schema compatibility. For example, the TableDef data type represents a Scylla table’s schema. We can convert a Spark DataFrame’s schema to a TableDef like so:

The fromDataFrame function mapped every Spark type to the corresponding Scylla type. It has also picked the first column in the DataFrame schema as the primary key for the resulting table definition.

Alternatively, we can also retrieve a TableDef from Scylla itself using the Schema data type. This time, we need to initialize the connector’s session manually and then use it to retrieve the schema:

The TableDef contains the definitions of all columns in the tables, the partition keys, indices and so forth. You should of course use those fields rather than compare the generated CQL.

So, we can formulate a naive solution based on these data types for checking whether our schemas are compatible. This could serve as runtime validation prior to starting our data transformation jobs. For a complete solution, you’d also need to take into consideration which data types are sensibly assignable to other data types; smallint is assignable to a bigint, for example, but not the other way around.

To end this section, let’s see how a new table can be created from a DataFrame. The createCassandraTable method can be used to create a new, empty table in Scylla using the DataFrame’s schema:

NOTE: The support for writing RDDs back to Scylla is more extensive than that for DataFrames; for example, when writing RDDs, one can specify that elements should be appended to list columns rather than replacing them. The DataFrame API is somewhat lagging behind in this regard. We will expand more on this in a subsequent post in this series, in which we will describe the Scylla Migrator project.

Execution details

Now that we know how to execute the write operations, it’d be good to understand what technically is happening as they are executed. As you might recall, the RDDs that underlie DataFrames are comprised of partitions; when executing a transformation on a DataFrame, the transformation is executed on each partition in parallel (assuming there are adequate compute resources).

Write operations are no different. They are executed in parallel on each partition by translating the rows of each partition into INSERT statements. This can be seen clearly by using Scylla’s tracing capabilities. Let’s truncate our table and turn on probabilistic tracing with a probability of 1 using nodetool:
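Concretely, those two steps can be run from the command line roughly as follows; the keyspace and table names are placeholders for whatever you created earlier in cqlsh:

cqlsh -e "TRUNCATE <keyspace>.<table>;"
nodetool settraceprobability 1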

We’ll execute the write operation again, but this time, we will create a much larger dataframe using a small helper function:

We can reset the tracing probability to 0 now using the same nodetool subcommand. To view the tracing results, we can query the system_traces.sessions table:
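For example, resetting the probability and then sampling the sessions table might look like this (any SELECT over system_traces.sessions will do):

nodetool settraceprobability 0
cqlsh -e "SELECT session_id, started_at, parameters FROM system_traces.sessions LIMIT 10;"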

The results on your local instance should look similar; you should see many entries in the sessions table for execution of INSERT statements. The connector will prepare the statement for inserting the data and execute batches of inserts that reuse the prepared statement.

Another interesting aspect of the execution of the table writes is the use of laziness. Say we’re reading back the big table we just wrote into a DataFrame, and we’d like to write it back to a new table, like so:

Instead of reading the entire source table into Spark’s memory and only then writing it back to Scylla, the connector will lazily fetch batches of rows from Scylla and pipe them into the writer. If you recall, RDDs are defined by a function Iterator[T] => Iterator[U]. When the DataFrame is created in the first line in the snippet, the connector creates an Iterator (see here) that when pulled, would fetch the next batch from Scylla.

When the DataFrame is written into Scylla on the last line, that iterator has not been pulled yet; no data has been fetched from Scylla. The TableWriter class in the connector will create another iterator (see here), on top of the source iterator, that will build batches of INSERT statements.

The overall effect is that the loop that will iterate the batch iterator and insert the batches will cause the source to lazily fetch data from Scylla. This means that only the data needed for the batch being inserted will be fetched into memory. That’s a very useful property that you can exploit for building ETL processes!

It should be noted that this property is only true if the stages between the source and the writer did not contain any wide transformations. Those transformations would cause a shuffle to be performed (see the previous post for more details on this) and subsequently the entire table would be loaded into memory.

Summary

You should now be equipped to write Spark jobs that can execute queries on ScyllaDB, create DataFrames from the results of those queries and write the DataFrames back to Scylla — into existing tables or tables that need to be created.

Up until now, all the workloads we’ve described are typically called batch workloads: the entirety of the dataset is available up front for processing and its size is known. In our next post, we will discuss streaming workloads, in which those two conditions aren’t necessarily true. We’ll put together an application that uses everything we’ve seen up until now to process streaming workloads, write them into Scylla, and serve queries using the data from Scylla. Stay tuned!

The post Hooking up Spark and Scylla: Part 3 appeared first on ScyllaDB.

Instaclustr releases support for new Google Cloud Platform regions

Instaclustr first introduced support for Google Cloud Platform in November 2016. Since then, we have seen steady take-up by customers, and the Instaclustr Managed Platform running on GCP is a well-proven solution.

In the time since our first release, Google has been very busy adding regions to GCP, and we have recently updated our systems to support provisioning of Apache Cassandra and Apache Kafka clusters in the following GCP regions:

  • US-Central-1 (Iowa)
  • US-West-1 (Oregon)
  • US-West-2 (Los Angeles)
  • US-East-1 (South Carolina)
  • US-East-4 (North Virginia)
  • NorthAmerica-NorthEast-1 (Montreal)
  • SouthAmerica-East-1 (Sao Paulo)
  • Europe-West-1 (Belgium)
  • Europe-West-2 (London)
  • Europe-West-3 (Frankfurt)
  • Europe-West-4 (Netherlands)
  • Europe-North-1 (Finland)
  • Asia-South-1 (Mumbai)
  • Asia-SouthEast-1 (Singapore)
  • Asia-East-1 (Taiwan)
  • Asia-NorthEast-1 (Tokyo)
  • Australia-SouthEast-1 (Sydney)

All regions are supported for both our Run In Instaclustr’s Account model (pricing inclusive of GCP charges) and Run In Your Own Account (you pay charges directly to Google). Full pricing for the Run In Instaclustr’s Account model is available via our console. For more details on the Run In Your Own Account model, please contact sales@instaclustr.com.

The post Instaclustr releases support for new Google Cloud Platform regions appeared first on Instaclustr.