6 Reasons to Switch from DataStax to Scylla

Scylla and DataStax Enterprise have a common lineage in Apache Cassandra. Yet whereas DataStax built their enterprise product around the existing Java code base of Cassandra, Scylla is a complete rewrite from the ground up in C++, compatible in every way from disk formats to wire protocols, yet optimized for Linux and modern multiprocessor, multicore NUMA architecture servers.

Not being tied to legacy code gives Scylla specific advantages in terms of performance and reliability — such as no pauses for JVM Garbage Collection.

To help you understand some of the architectural design decisions and operational benefits of Scylla we’ve written a whitepaper on the 6 Reasons to Switch from DataStax to Scylla, highlighting our advantages and comparing features between the two databases. It covers everything: price-performance, consistency of performance, operational simplicity, maintainability, improved functionality, ease of migration and a side-by-side feature comparison.

If you are considering DataStax Enterprise and it seems a good fit for your use case, also consider reading this whitepaper, and then benchmarking it side-by-side with Scylla.

DISCOVER 6 REASONS TO SWITCH FROM DATASTAX TO SCYLLA

Deepen Your Knowledge

If after reading this paper you still have questions, we invite you to contact us, or to interact directly with our engineers and your fellow big data practitioners in our Slack channel. Also feel free to check out our on-demand webinars, such as our Scylla Virtual Workshops. Or take a free course in Scylla University to learn everything from how to get started to advanced topics in database monitoring and administration.

The post 6 Reasons to Switch from DataStax to Scylla appeared first on ScyllaDB.

Getting the Most out of Lightweight Transactions in Scylla

Lightweight transactions (abbreviated as LWT) appeared in Cassandra back in 2012, when they were a revolutionary feature. Since that time we have seen comparable implementations in other NoSQL databases and a new protocol for consensus, Raft. So how is Scylla's addition of LWT now relevant to you?

Well, for one, Cassandra's implementation of LWT hasn't necessarily worked as intended. There are many issues with the implementation that the Cassandra team has admitted they won't fix. In addition, Cassandra requires four round trips to process each query using LWT, which generally causes poor performance and can throw timeout exceptions.

Cassandra’s implementation of LWT requires four round trips to conduct a single transaction.

Scylla’s implementation of LWT requires only three round trips, making it more efficient than Cassandra’s LWT.

We believe we solved how to do LWTs far more efficiently in Scylla. If you have been using Cassandra’s LWTs as-is, you may find performance under Scylla to be a breath of fresh air. If you avoided using LWTs because of poor performance, you may want to reconsider them now.

Lightweight transactions in Scylla are not just for the sake of Cassandra compatibility. Our product has its own unique voice with focus on performance, observability and also — versatility. SERIAL mode is the backbone of Scylla’s DynamoDB compatible interface, known as Alternator, and will serve internally as a building block for many upcoming features.

What is a Lightweight Transaction?

While users of Apache Cassandra may already know the term, let's define what a "lightweight transaction" is so we're all on the same page. In a traditional SQL RDBMS a "transaction" is a logical unit of work — a group of tasks. It comes with an entire set of expectations, which have become known as ACID properties. In Scylla, as in Cassandra, a lightweight transaction is limited to a single conditional statement, which allows an "atomic compare and set" operation. That is, it checks if a condition is true, and if so, it conducts the transaction. If the condition is not met, the transaction does not go through.

They were called "lightweight" because they do not truly lock the database for the transaction. Instead, Scylla uses a consensus protocol, in this case Paxos, to ensure the nodes agree to commit the change.

While there are many long discussions on whether LWTs for Cassandra and Scylla make them a “true” ACID-compliant database, for a user, the most important thing is the capacity to do conditional updates. And that is what we will focus on in this blog post.

A Practical Example: An LWT Banking App

Is transaction support necessary when working with Scylla? After all, Scylla QUORUM writes are pretty durable, and QUORUM reads guarantee monotonicity.

Let’s take a look at an example. Rumour has it, ACID transactions are required for financial applications, so I’ll write a small banking application if only to see if it’s true or not. Besides, many apps for businesses with lower regulatory demands have a similar data model: a cryptocurrency exchange, an online game, a mobile operator customer portal. My application will be extremely scalable: both in transactions per second and the number of client accounts.

At the core of the app will reside an account ledger — a registry of accounts of all existing clients and all of the transactions associated with each account. It can be difficult to partition horizontally, since no matter how it is partitioned, most transactions will require changes at multiple database nodes:

CREATE TABLE lightest.accounts (
     bic TEXT, -- bank identifier code
     ban TEXT, -- bank account number within the bank
     balance DECIMAL, -- account balance
     pending_transfer UUID, -- will be used later
     pending_amount DECIMAL, -- will be used later
     PRIMARY KEY((bic, ban))
)

As you can see, I chose to partition the ledger by both Bank Identification Code (BIC) and Bank Account Number (BAN) — bank and account identifiers. I could have chosen the bank identifier alone. The choice impacts the ratio of transfers which use a single partition, and thus require a single lightweight transaction, vs transfers between partitions, possibly residing on different data nodes, which require multiple lightweight transactions.

The fewer columns are used in a partition key, the larger each partition can become. Scylla isn't particularly in love with oversized partitions, since they can lead to imbalance in load distribution among shards. Lightweight transactions don't play well with them either. When a partition is highly contended (i.e. used by a lot of concurrent requests), a statement may have to retry to obtain a quorum of promises. Even with small partitions, real-world transactions may surprise us, e.g. a single player or organization may participate in most transactions, and thus become a hotspot. I'll try to look into this scenario as well, by generating an appropriate transaction distribution. This will highlight some dark corners of LWT design.

The supporting code for the app is available at https://github.com/kostja/lightest. It's a Go app, and I chose Go for its excellent support for concurrency. It also uses Scylla's shard-aware GoCQL driver, which performs much better for any kind of Scylla workload, including LWT. Whatever you tell lightest to do, it will do using a lot of workers — as many workers as there are cores, multiplied by four. lightest aims to be close to a real world workload, part of which is using a normal random distribution for the data and queries it generates. The central test implemented by lightest is money transfers between two accounts, especially if they reside on different partitions. But I will begin with some simpler tests.

First, let’s populate the ledger by registering accounts. To register an account lightest uses the following conditional insert:

INSERT INTO accounts (bic, ban, balance, pending_amount)
VALUES (?, ?, ?, 0) IF NOT EXISTS

The words IF NOT EXISTS make this command different from a standard tunable-consistency insert. Before such an insert is applied, Scylla reads the existing data and evaluates a condition against it, in our case checking whether the row is not yet present. This ensures that an existing row is never overwritten. If you're curious why the standard INSERT doesn't behave in the same way, please watch my talk at the Scylla Summit, where I discuss how eventual consistency complements log-structured merge trees to provide the best write throughput.
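For illustration, here is a minimal sketch of issuing this conditional insert and checking the resulting [applied] flag, written in Python with the DataStax/Scylla Python driver (lightest itself is a Go program); the contact point and account values are taken from examples elsewhere in this post:

from cassandra.cluster import Cluster
from decimal import Decimal

cluster = Cluster(['db1'])                # contact point from the examples above
session = cluster.connect('lightest')

insert = session.prepare(
    "INSERT INTO accounts (bic, ban, balance, pending_amount) "
    "VALUES (?, ?, ?, 0) IF NOT EXISTS")

rs = session.execute(insert, ('DCCDIN51', '30000000000000', Decimal('42716')))
if rs.was_applied:                        # the [applied] column of the LWT result
    print("account registered")
else:
    print("row already exists:", rs.one())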

The IF clause is the magic phrase which makes this statement conditional. That's why lightweight transactions are also known as conditional statements. I personally find the latter name more telling since, as will be seen, a lightweight transaction isn't that lightweight after all. But they are powerful enough, and also massively scalable, as you can see if you check the lightest pop(ulate) output (which I am running here on 3 AWS EC2 i3.8xlarge machines):

$ ./lightest pop --host db1 -n 1000000
Creating 1000000 accounts using 384 workers on 96 cores
0.76% done, RPS 7577, Latency min/max/med: 0.002s/0.141s/0.013s
3.11% done, RPS 23562, Latency min/max/med: 0.002s/0.119s/0.010s
…
99.98% done, RPS 676, Latency min/max/med: 0.002s/0.026s/0.002s
100.00% done, RPS 174, Latency min/max/med: 0.002s/0.003s/0.002s
Inserted 1000000 accounts, 13 errors, 1637283 duplicates
Total time: 122.042s, 8193 inserts/sec

We can get more work done if we increase the number of workers:

$ ./lightest pop --host db1 -n 1000000 -w 2000
Creating 1000000 accounts using 2000 workers on 96 cores
0.74% done, RPS 7401, Latency min/max/med: 0.002s/0.186s/0.026s
7.30% done, RPS 65647, Latency min/max/med: 0.003s/0.150s/0.024s
...

The key difference between conditional and standard statements is that conditional statements implement more natural semantics: despite a huge number of workers, if a record is already present in the database, it's not re-inserted. These semantics are an example of what is known as linearizability.

Linearizability means being able to process requests in a strict sequential order. Basically, Scylla takes a stream of concurrent writes and orders it into a single sequential history. This is important because each request's condition must be checked, and the request completed, before processing of the next one begins; otherwise the underlying data may change (and the condition may no longer be true). This is also why client statement timestamps are not permitted in conditional statements: each write gets a unique server timestamp which is automatically assigned when/if the write is actually stored.

Dealing with Failure

The client has a way to find out that a statement was not applied because the associated condition was false:

cqlsh:lightest> INSERT INTO accounts (bic, ban, balance, pending_transfer) VALUES (...) IF NOT EXISTS;

 [applied] | bic | ban | balance | pending_amount | pending_transfer
-----------+-----+-----+---------+----------------+------------------
     False | ... | ... |       0 |           null |             null

During population, my goal is to insert a lot of random accounts, which are then used to perform payments. I'd like to avoid having to load the accounts from the database during the payment workload, so instead I save the pseudo-random generator seed, which is then reused during payments to generate the same pseudo-random data. This approach has a caveat: since populate runs with hundreds of workers, all using the same seed, the chance of a collision on an account identifier is quite high. The collision rate gets even higher as we approach the end of the benchmark, since all workers choose their pseudo-random account numbers from a finite set.
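The seed trick can be sketched roughly as follows; this is illustrative Python, not lightest's actual Go code, and the account format is made up:

import random
import string

def account_stream(seed, count):
    # Deterministically regenerate the same pseudo-random accounts from a seed.
    rng = random.Random(seed)
    for _ in range(count):
        bic = ''.join(rng.choices(string.ascii_uppercase + string.digits, k=8))
        ban = ''.join(rng.choices(string.digits, k=14))
        yield bic, ban

# populate and pay share the seed, so both phases see the same accounts
assert list(account_stream(42, 3)) == list(account_stream(42, 3))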

As you can observe in the lightest output, thanks to deduplication of concurrent inserts there were more queries than inserted rows, and the transaction throughput fell significantly towards the end of the test. The number of skipped statements is shown in the LWT dashboard of the Scylla Monitoring Stack, in the cas_condition_not_met chart.

OK, it's clear that we need to try another account if we get a duplicate key error, but what shall we do if we get some other error? Should we retry with the current account or with a new one?

People are used to transactions having either of the two outcomes: success or failure. The case of failure is hard to come by in a staged setting and there is a strong temptation to ignore it.

To simulate potential real-world errors, when running my test I tweaked the cluster by sending STOP and CONTINUE signals to server processes. Database users commonly think errors have no impact, i.e. do not change the database state. Indeed, aren’t we using transactions to achieve atomic, all or nothing semantics?

query failure != transaction failure

A common misconception coming from non-distributed databases is that a failure always means a transaction failure, which in turn always ends with a transaction rollback. For an embedded database like SQLite, an error of any other kind simply has no occasion to happen. Even for the most popular client-server databases like MySQL or PostgreSQL, a connection failure after a transaction commit is an extremely rare event, so applications simply ignore it.

Scylla, as a distributed database, executes a transaction at multiple nodes, some of which are allowed to fail. With CL=SERIAL, just like with CL=QUORUM, as long as the majority of replicas succeed, the transaction is committed even if the node which initiated the transaction fails. A failure doesn't even have to be a node crash or network error: the client query can simply time out.

LWT Timeout due to uncertainty

lightest is well aware of this property. When an INSERT fails, be it because of a timeout, lack of quorum, or contention, lightest retries, and a retry can then fail, apply, or find an existing record. Luckily, the IF predicate makes the LWT statement safe to retry. As a result, some of the failures actually stand for successful account registrations, and retries of these transactions do not satisfy the IF condition and thus do not apply.

Note: if the statement condition doesn't become false after the statement is applied, retrying it may lead to a double apply. In such cases, instead of retrying, one can read the latest data back and check whether it was actually updated. The SELECT statement must then use the SERIAL consistency level to guarantee it returns the most recently committed data. Scylla made an effort to make SERIAL SELECTs truly linearizable, having fixed issue CASSANDRA-12126, which is still open in Cassandra.
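A rough sketch of such a read-back against the accounts table, using the Python driver and assuming the schema above (SERIAL consistency routes the read through the Paxos path, so it observes the latest committed LWT write):

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

def read_back_account(session, bic, ban):
    # After an uncertain LWT outcome (e.g. a timeout), read the row back with
    # SERIAL consistency to observe the most recently committed value.
    stmt = SimpleStatement(
        "SELECT balance, pending_amount, pending_transfer "
        "FROM accounts WHERE bic = %s AND ban = %s",
        consistency_level=ConsistencyLevel.SERIAL)
    return session.execute(stmt, (bic, ban)).one()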

Let’s check how much overhead SERIAL consistency incurs by running lightest populate with QUORUM consistency:

$ ./lightest pop --host db1 -n 3000000 -c QUORUM -w 8000
Creating 3000000 accounts using 8000 workers on 96 cores
1.70% done, RPS 51048, Latency min/max/med: 0.000s/0.415s/0.002s
11.30% done, RPS 287915, Latency min/max/med: 0.000s/0.045s/0.001s
..
Inserted 3000000 accounts, 0 errors, 0 duplicates
Total time: 19.766s, 151774 t/sec
Latency min/max/avg: 0.000s/0.415s/0.002s
Latency 95/99/99.9%: 0.007s/0.025s/0.180s

The throughput is nearly 4x! Where is the LWT overhead coming from?

The way Paxos achieves many of its properties is by introducing a concept of a ballot – an effectively unique identifier associated with each transaction. In Scylla, a ballot is a 128-bit UUID based on 64-bit clock reading, a random bit sequence and the machine id. This makes all ballots unique and chronologically orderable. Paxos ballots are tracked independently for each partition key. It is both a strength and a weakness: absence of coordination between transactions against different partitions increases the overall throughput and availability, but provides no mutual order, so transactions cannot span across partitions.

Ballots, as well as other protocol state, are stored in a local system table, system.paxos at all replicas. When a transaction is complete, most of this state is pruned away. If a transaction fails mid-way, however, the state is kept for gc_grace_seconds (set to 10 days by default), and DBAs are expected to run nodetool repair before it expires.

The node which is performing the transaction, also known as the coordinator, begins by creating a new ballot and asking the nodes owning data for the respective token range to store it. Replicas refuse to store the ballot if it is older than the one they already know, while the coordinator refuses to proceed if it doesn’t have a majority of responses from replicas. This majority rule ensures only a single transaction is modifying the data at a time, and that the coordinator gets up to date with the most recent changes before it suggests a new one. Remember I mentioned that an LWT statement first reads the data from the table, and only then executes an update? So Scylla actually does two things at the same time: reads the conditioned row from the majority of replicas, and, by storing the ballot, extracts a promise from replicas to accept a new value only from a higher ballot (not from a lower ballot) when/if it comes along.

After the coordinator gets a majority of promises to accept a new value, it evaluates the lightweight transaction condition and sends a new row to replicas. Finally, when a majority of replicas accept and persist the new row in their system.paxos table, the coordinator instructs them to apply the change to the base table. At any point some of the replicas may fail, and even in absence of failure replicas may refuse a ballot because they have made a promise to a different coordinator. In all such cases the coordinator retries, possibly with a new ballot. After all steps are complete, the coordinator requests the participants to prune system.paxos from intermediate protocol state. The pruning is done as a background task.

In total, Scylla performs 3 foreground and 1 background write for each Paxos transaction at each replica. While this is better than the current Cassandra implementation (here’s a good summary of differences between Scylla and Cassandra), it is still 4x more than an average QUORUM write. What’s more, each LWT write in Scylla flushes the commit log to disk, regardless of the commitlog_sync setting, to ensure LWT writes are truly durable.

Why does Scylla have to make all these rounds? Isn’t it possible to reduce the algorithm to a single round per transaction on average? Let’s look at why each phase is important.

  • Step 1, the prepare phase, is necessary to ensure there is only one coordinator making the change. We could avoid selecting a coordinator independently for each transaction. However, it would require maintaining a record of the current leader for each partition key, which is a lot of additional metadata. On the positive side, Scylla LWT availability does not depend on a single leader, and node failures have low impact on cluster availability.
  • Step 2, storing the interim result in system.paxos is necessary to work well with eventually consistent reads and writes. Scylla can’t assume people will not be using QUORUM reads and writes of the same data along with LWT, even though it’s not recommended. So a transaction should not be visible for these until it’s committed, and this is why it’s first made durable in system.paxos. We could have avoided this step by not permitting eventually consistent and conditional writes of the same data. That would break compatibility with Cassandra, so we decided not to do it. Cassandra perhaps followed the same reasoning originally and wanted to simplify adoption of lightweight transactions in existing eventually consistent apps.
  • Step 3 moves the data from system.paxos to the base table after a transaction is known to have succeeded. This one can’t be avoided 🙂
  • Step 4, the pruning step, is distinct to Scylla. We’ve heard a lot of complaints from Cassandra users that system.paxos can grow out of control. There are no nodetool operations for repairing or pruning LWT state, so at least we make an effort to keep it small. Besides, keeping system.paxos small reduces its write amplification during compaction, and hence speeds up LWT performance quite a bit.

But let's continue with the banking application. Not only INSERTs, but also UPDATEs and DELETEs support conditions. I will jump right to the core of my app, making a stab at implementing a money transfer between two accounts.

A transfer, naturally, needs to subtract an amount from the source account and add it to the destination account. The transaction shouldn’t proceed if the source account would be overdrawn — even if it is debited concurrently. A transfer must never be partial — if we debit the source, we must ensure we credit the destination. Finally, the application has to deal with failures, i.e. retry on error. Let me sum this up. A transfer routine must:

  • never lead to an overdraft; the client should get an error about possible overdraft
  • change source and destination accounts atomically
  • be safe to re-try, for example, re-trying should be a no-op if the transfer is already done.

One could observe that with a classical ACID transaction an atomic transfer procedure would seem to be trivial to write:

BEGIN
  UPDATE accounts
    WHERE bic=$bic AND ban=$src_ban
    SET balance = $balance - $amount
  UPDATE accounts
    WHERE bic=$bic AND ban=$dst_ban
    SET balance = balance + $amount
COMMIT (*)

Note: one would still need to take care of a possible overdraft and, more importantly, of making the transaction safe to retry. The default transaction isolation level is virtually never SERIALIZABLE, so this also has to be taken into account.

In Scylla, since each account resides on its own partition, we'll need to use multiple conditional statements. It may console you that a real-world ledger rarely resides in a single database. Besides, modifying it may require accessing a physical device, like a tape printer, which is hardly aware of the concept of "rollback". In all such cases wrapping a transfer into a single transaction is impossible as well.

Adding transfer history

The technique to ensure we never make a partial or duplicate transfer despite the transfer procedure consisting of multiple steps is perhaps as old as clay tablets. However I will use a trendy name for it, derived from microservice architecture patterns: “event sourcing.”

Jokes aside, let’s add a history of all transfers, and make sure we register the intent first, to be able to carry out unfinished transfers reliably:

CREATE TABLE lightest.transfers (
    transfer_id UUID, -- transfer UUID
    src_bic TEXT, -- source bank identification code
    src_ban TEXT, -- source bank account number
    dst_bic TEXT, -- destination bank identification code
    dst_ban TEXT, -- destination bank account number
    amount DECIMAL, -- transfer amount
    state TEXT, -- 'new', 'locked', 'complete'
    client_id UUID, -- the client performing the transfer
    PRIMARY KEY (transfer_id)
)

With that, to start a transfer it's enough to add it to the history. It's now possible to return to the history record any time and retrieve the transfer state. Eventually we may prune the history of completed transfers — just like the Paxos algorithm prunes system.paxos of completed lightweight transactions.

INSERT INTO transfers
  (transfer_id, src_bic, src_ban, dst_bic, dst_ban, amount, state)
  VALUES (?, ?, ?, ?, ?, ?, 'new')
  IF NOT EXISTS

Thanks to using conditions this INSERT satisfies all our requirements: it’s atomic, safe to retry, and, as we’ll see in a second, it doesn’t lead to an overdraft.

Inserting a history record doesn't guarantee that the transfer is or will ever be carried out: this is why, besides the transfer details, we have a transfer state column, and the initial state is 'new'. If funds are sufficient, which will be known once we fetch the source balance, the algorithm will proceed with the withdrawal; if they are not, it will cut the party short by setting the state straight to 'complete'. Otherwise the state will change to 'complete' only once the funds have been transferred.

The next step is to change the debit and credit accounts. To ensure this is safe in the presence of client failure, I'll make sure all algorithm steps are idempotent, i.e. safe to carry out more than once. A simple trick to make a series of LWT statements idempotent is to condition each next statement on the results of the previous one. This will allow any client to resume a transfer even if the client that started it fails. To prevent multiple clients from working on the same transfer, we register the client responsible for the transfer in the history:

UPDATE transfers USING TTL 30
  SET client_id = ?
  WHERE transfer_id = ?
  IF amount != NULL AND client_id = NULL

This update is TTLed: as long as the transfer is not being worked on, any client should be able to take it. It ensures the transfer doesn't deadlock if the client that started it disappears. If the TTL were set on the original INSERT, it would expire the entire transfer, which is why I use a separate update. When the TTL expires, Scylla will "cover" our UPDATE with a DELETE (also known as a tombstone) which has the same timestamp. Using the original timestamp is a good idea, since it guarantees nothing but the UPDATE itself is erased: if two mutations have the same timestamp, DELETE trumps UPDATE. With LWT, each write gets a unique monotonically growing timestamp, so no other mutation is at risk.

For correctness, we need to guarantee that the transfer procedure never lasts longer than TTL — otherwise two clients could compete for the same transfer. Scylla’s default request timeout is only 1 second. Most modern systems have a functioning NTP daemon, so clock skew is very unlikely to grow beyond 10-15 seconds. Based on this, 30 seconds should be more than enough — but perhaps a larger value could be used in production.

Now we can prepare source and destination accounts, by setting a pending transfer against them:

UPDATE accounts
  SET pending_transfer = ?, pending_amount = ?
  WHERE bic = ? AND ban = ?
IF balance != NULL AND pending_amount != NULL AND pending_transfer = NULL

The statement does nothing if there is a transfer which is already in progress. Note, the pending amount is actually negative for the debit account, since funds are withdrawn from it.

I use a != NULL check against balance and pending_amount for a couple of reasons:

  • It ensures the account is not accidentally created if it doesn’t exist
  • In Scylla, it returns back the previous balance and pending amount, which saves one query:
cqlsh:lightest> UPDATE accounts
  SET pending_transfer = b22cfef0-9078-11ea-bda5-b306a8f6411c,
  pending_amount = -24.12
  WHERE bic = 'DCCDIN51' AND ban = '30000000000000'
  IF balance != NULL AND pending_amount != NULL
  AND pending_transfer = NULL;

[applied] | balance | pending_amount | pending_transfer
----------+---------+----------------+------------------
     True |   42716 |              0 |             null

Notice the change in the output when I run the same query one more time:

cqlsh:lightest> UPDATE accounts
  SET pending_transfer = b22cfef0-9078-11ea-bda5-b306a8f6411c,
  pending_amount = -24.12
  WHERE bic = 'DCCDIN51' AND ban = '30000000000000'
  IF balance != NULL AND pending_amount != NULL
  AND pending_transfer = NULL;

[applied] | balance | pending_amount | pending_transfer
----------+---------+----------------+-------------------------------------
    False |   42716 |         -24.12 | b22cfef0-9078-11ea-bda5-b306a8f6411c

(In Cassandra I would have to make a separate SELECT to query these fields after the update above is applied).

This seals the debit and credit accounts from concurrent work. I will touch in a moment on what should be done if either of the two steps above finds a competing transfer.

For now let’s see how the transfer is completed if this improvised locking succeeds. First of all, since the locking statement returns the previous row, we can check if the source account has sufficient funds. If not, we could segue to setting the transfer to ‘complete’. Otherwise, we need to prepare for updating the balances:

UPDATE transfers
  SET state = 'locked'
  WHERE transfer_id = ?
  IF amount != NULL AND client_id = ?

Notice the same trick with != NULL to avoid accidentally inserting a transfer that has already finished and been removed. Changing the state to 'locked' helps us avoid moving the money for the same transfer twice if it is resumed after only one of the two statements below succeeds:

-- run for both debit and credit accounts
UPDATE accounts
  SET pending_amount = 0, balance = ?
  WHERE bic = ? AND ban = ?
  IF balance != NULL AND pending_transfer = ?

If the source account has insufficient funds, or one of the accounts is missing, or any other pre-condition is violated, the above step is skipped. If we resume the transfer after only one of the accounts has been modified, we do not debit or credit it twice; the current value of pending_amount serves as a sentinel. Once both accounts are updated successfully, we can change the transfer state to complete:

UPDATE transfers
  SET state = 'complete'
  WHERE transfer_id = ?
  IF amount != NULL AND client_id = ?

An updated transfer state indicates the money has been moved, so we won’t attempt to do it again if the transfer is resumed after one of the unlocking steps fails:

UPDATE accounts
  SET pending_transfer = NULL, pending_amount = 0
  WHERE bic = ? AND ban = ?
  IF balance != NULL AND pending_transfer = ?

Finally, once the transfer has been completed, its record can be removed from the transfer history.

Thanks to the power of lightweight transactions it’s safe to execute each step more than once, as well as repeat all steps as many times as we please, without risking breaking anything.

Error handling

The beauty of idempotency is that error handling is fairly straightforward: in most cases, we can simply retry from the first failed step. For example, if the transfer state is already ‘locked’, we proceed directly to updating the balance, and it’s safe to skip the update if we see the state is ‘complete’.

But what if we find that there is another transfer against one of the accounts, carried out by another client? Normally, we should let it finish, and then try again. All transfer preconditions, such as the absence of overdraft, will need to be evaluated again. In the rare case when the client which initiated the transfer is gone (we can always consult the transfers.client_id cell), we can carry out the transfer on its behalf; after all, this is what the record in the transfer history is for. Repairing an abandoned transfer uses the exact same steps as making our own. Alternatively, we could create a materialized view for abandoned transfers:

CREATE MATERIALIZED VIEW orphaned_transfers AS
  SELECT * FROM transfers WHERE client_id=null
  PRIMARY KEY (transfer_id);

… and have a background process finishing them up.
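A hedged sketch of what such a background process might look like, assuming the orphaned_transfers view above and a hypothetical resume_transfer() routine that re-runs the idempotent recovery steps described in this post:

import time

def recover_orphaned_transfers(session, my_client_id, resume_transfer):
    # Periodically scan the view and resume abandoned transfers; resume_transfer
    # is assumed to take over the transfer with the TTLed client_id update,
    # lock both accounts, and move the money, exactly as described above.
    while True:
        for transfer in session.execute("SELECT * FROM orphaned_transfers"):
            resume_transfer(session, my_client_id, transfer)
        time.sleep(30)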

There is one last thing to take care of. When trying to update the accounts table with our transfer UUID, we may encounter a conflicting transfer making the exact opposite move: from the credit to the debit account. We mustn't simply wait for it then, otherwise the two transfers will deadlock. To avoid the situation altogether, lightest reorders the "locking" steps of the algorithm to always update accounts in lexicographical order. This makes deadlocks impossible.
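The reordering itself is tiny; an illustrative Python sketch (lightest does the equivalent in Go):

def lock_order(src, dst):
    # Return the two (bic, ban) keys in lexicographical order so that every
    # client locks accounts in the same order, making deadlocks impossible.
    return (src, dst) if src <= dst else (dst, src)

first, second = lock_order(('BICB', '0002'), ('BICA', '0001'))
assert first == ('BICA', '0001')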

If you’re curious about the nitty-gritty details of deadlock detection and failed transfer recovery, it’s all implemented in lightest. You’re more than welcome to take a look.

  • An additional challenge: there is a very improbable race if a transfer takes longer than 30 seconds. Drop me a message at kostja@scylladb.com if you can guess what it is.

Let's run a test workload to see how many transfers per second we can squeeze out of our cluster. This time I filled the database with 100,000,000 accounts, which took me ~90 minutes, mostly because of the high (~62%) fraction of duplicates.

Now let’s make 1,000,000 payments:

$ ./lightest pay -n 1000000 -w 200 --host db1
Total time: 384.056s, 2515 t/sec
Latency min/max/avg: 0.018s/46.906s/0.077s
Latency 95/99/99.9%: 0.120s/0.166s/0.221s
Errors: 0, Retries: 650, Recoveries: 274,
Not found: 41889, Overdraft: 23086

The latency is much higher now: a single transfer translates into 8 LWT statements. A huge maximum latency suggests a few clients were racing to lock one or a few hot accounts. The retries counter is another indicator of the same.

p99 Latency for LWTs

Let’s see how bad it could get if we change the account selection to a Zipfian distribution:

$ ./lightest pay -n 1000000 -w 4000 --host db1 --zipfian
Total time: 1443.835s, 693 t/sec
Latency min/max/avg: 0.009s/1327.792s/3.378s
Latency 95/99/99.9%: 1.072s/64.676s/551.144s
Errors: 0, Retries: 879726, Recoveries: 11150,
Not found: 178461, Overdraft: 19131

The results are not pleasing to the eye, but this was the purpose of the exercise. The good part is that our transfer algorithm stands the test of contention and makes steady progress. The bad part is that thanks to a naive retry strategy it could take over 500 seconds to make a transfer for the most contended accounts. We should also be interested in Paxos metrics: how much of a performance penalty did we pay thanks to Paxos internal contention, and how much was it simply because the clients slept between repeating attempts to lock accounts?

In issue #6259 Scylla extended the Cassandra binary protocol to allow the driver to detect conditional statements at prepare time and direct them to the primary replica if it's available. We also opened a corresponding Cassandra ticket. Additionally, when requests from different clients arrive at the same coordinator, they don't contend with each other, but wait in line. These optimizations helped reduce coordinator contention, a very unpleasant artefact of the straightforward Paxos algorithm. While they significantly improve the worst-case performance, LWT is still not the best choice for highly contended workloads. Queued-up requests take up memory, and can still time out and then have to retry. This adds useless work and leads to performance jitter.

Conclusion

Lightweight transactions provide conditional updates through linearizability — a database system property similar to the widely known serializability. While linearizable statements are more expensive and may seem a bit harder to use than statements in classical serializable transactions, they are no less powerful. By using a leader-less architecture for LWT, the Scylla implementation provides unprecedented availability, at the cost of possibly lower throughput. When implementing LWT, the Scylla engineering team made an effort to address some of the well-known gotchas of Cassandra:

  • Consistency of SELECT statements
  • Excessive contention issues and retries
  • Uncontrolled growth of the Paxos table.

We also invested in making LWT durable out of the box, and less expensive, to the extent possible without breaking Cassandra semantics.

We also wanted to ensure that LWTs had strong visibility to users. We want you to be able to see how LWTs are impacting your system as you try them out. Scylla Monitoring Stack can be used to track any production issues with LWT.

Apart from the high cost, a responsible implementation also needs to be careful not to create hotspots.

If you want to provide feedback on our implementation of LWT, please feel free to contact us directly, drop into our Slack channel, or write to our email list and let us know what you think.

CHECK OUT LIGHTEST ON GITHUB

DOWNLOAD SCYLLA OPEN SOURCE

The post Getting the Most out of Lightweight Transactions in Scylla appeared first on ScyllaDB.

Scylla Enterprise Release 2019.1.10

The ScyllaDB team announces the release of Scylla Enterprise 2019.1.10, a production-ready Scylla Enterprise patch release. As always, Scylla Enterprise customers are encouraged to upgrade to Scylla Enterprise 2019.1.10 in coordination with the Scylla support team.

The focus of Scylla Enterprise 2019.1.10 is improving stability and bug fixes. More below.

Fixed issues in this release are listed below, with open source references, if present:

  • Performance: A few of the local system tables in the `system` namespace, like large_partitions, did not set their gc grace period to 0, which may result in millions of tombstones being needlessly kept for these tables and can cause read timeouts. Local system tables use LocalStrategy replication, so they do not need a gc grace period. #6325
  • CQL: ALTERing compaction settings for a table also sets default_time_to_live to 0 #5048
  • Stability: Scylla freezes when casting a decimal with a negative scale to a float #6720
  • Stability: When using partition or clustering keys whose in-memory representation is larger than 12.8 KB (10% of the LSA segment size), linearization of the large (fragmented) keys may cause undefined behavior #6637
  • Correctness: Materialized view updates with future partition tombstones are not correctly generated. When the base table does not have any rows, but it does have a partition tombstone with a given timestamp, inserting a base row with a smaller timestamp will generate an incorrect materialized view update #5793

The post Scylla Enterprise Release 2019.1.10 appeared first on ScyllaDB.

Byte Down: Making Netflix’s Data Infrastructure Cost-Effective

By Torio Risianto, Bhargavi Reddy, Tanvi Sahni, Andrew Park

Background on data efficiency

At Netflix, we invest heavily in our data infrastructure which is composed of dozens of data platforms, hundreds of data producers and consumers, and petabytes of data.

At many other organizations, an effective way to manage data infrastructure costs is to set budgets and other heavy guardrails to limit spending. However, due to the highly distributed nature of our data infrastructure and our emphasis on freedom and responsibility, those processes are counter-cultural and ineffective.

Our efficiency approach, therefore, is to provide cost transparency and place the efficiency context as close to the decision-makers as possible. Our highest leverage tool is a custom dashboard that serves as a feedback loop to data producers and consumers — it is the single holistic source of truth for cost and usage trends for Netflix’s data users. This post details our approach and lessons learned in creating our data efficiency dashboard.

Netflix’s data platform landscape

Netflix’s data platforms can be broadly classified as data at rest and data in motion systems. Data at rest stores such as S3 Data Warehouse, Cassandra, Elasticsearch, etc. physically store data and the infrastructure cost is primarily attributed to storage. Data in motion systems such as Keystone, Mantis, Spark, Flink, etc. contribute to data infrastructure compute costs associated with processing transient data. Each data platform contains thousands of distinct data objects (i.e. resources), which are often owned by various teams and data users.

Creating usage and cost visibility

To get a unified view of cost for each team, we need to be able to aggregate costs across all these platforms while retaining the ability to break them down by a meaningful resource unit (table, index, column family, job, etc.).

Data flow

  • S3 Inventory: Provides a list of objects and their corresponding metadata like the size in bytes for S3 buckets which are configured to generate the inventory lists.
  • Netflix Data Catalog (NDC): In-house federated metadata store which represents a single comprehensive knowledge base for all data resources at Netflix.
  • Atlas: Monitoring system which generates operational metrics for a system (CPU usage, memory usage, network throughput, etc.)

Cost calculations and business logic

As the source of truth for cost data, AWS billing is categorized by service (EC2, S3, etc) and can be allocated to various platforms based on AWS tags. However, this granularity is not sufficient to provide visibility into infrastructure costs by data resource and/or team. We have used the following approach to further allocate these costs:

EC2-based platforms: First, determine the bottleneck metric for the platform, namely CPU, memory, storage, IO, throughput, or a combination. For example, Kafka data streams are typically network bound, whereas Spark jobs are typically CPU and memory bound. Next, identify the consumption of the bottleneck metric per data resource using Atlas, platform logs, and various REST APIs. Cost is then allocated based on the consumption of the bottleneck metric per resource (e.g., % CPU utilization for Spark jobs). The detailed calculation logic can vary from platform to platform depending on their architecture. The following is an example of cost attribution for jobs running in a CPU-bound compute platform:
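As a rough illustration of the idea only (not Netflix's actual pipeline, and with made-up numbers), the cost of a CPU-bound platform can be split across jobs in proportion to each job's share of the bottleneck metric:

def allocate_platform_cost(platform_cost, bottleneck_usage_by_job):
    # Split a platform's bill across jobs in proportion to each job's share
    # of the bottleneck metric (CPU hours here); purely illustrative.
    total = sum(bottleneck_usage_by_job.values())
    return {job: platform_cost * usage / total
            for job, usage in bottleneck_usage_by_job.items()}

# e.g. a $1,000 platform bill split across three jobs by CPU hours consumed
print(allocate_platform_cost(1000.0, {"job_a": 120.0, "job_b": 60.0, "job_c": 20.0}))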

S3-based platforms: We use AWS’s S3 Inventory (which has object level granularity) in order to map each S3 prefix to the corresponding data resource (e.g. hive table). We then translate storage bytes per data resource to cost based on S3 storage prices from AWS billing data.

Dashboard view

We use a druid-backed custom dashboard to relay cost context to teams. The primary target audiences for our cost data are the engineering and data science teams as they have the best context to take action based on such information. In addition, we provide cost context at a higher level for engineering leaders. Depending on the use case, the cost can be grouped based on the data resource hierarchy or org hierarchy. Both snapshots and time-series views are available.

Note: The following snippets containing costs, comparable business metrics, and job titles do not represent actual data and are for ILLUSTRATIVE purposes only.

Illustrative summary facts showing annualized costs and comparable business metrics
Illustrative annualized data cost split by organization hierarchy
Illustrative annualized data cost split by resource hierarchy for a specific team
Illustrative time-series showing week over week cost (annualized) for a specific team by platform

Automated storage recommendations — Time to live (TTL)

In select scenarios where the engineering investment is worthwhile, we go beyond providing transparency and provide optimization recommendations. Since data storage has a lot of usage and cost momentum (i.e. save-and-forget build-up), we automated the analysis that determines the optimal duration of storage (TTL) based on data usage patterns. So far, we have enabled TTL recommendations for our S3 big data warehouse tables.

Our big data warehouse allows individual owners of tables to choose the length of retention. Based on these retention values, data stored in date-partitioned S3 tables is cleaned up by a data janitor process which drops partitions older than the TTL value on a daily basis. Historically, most data owners did not have a good way of understanding usage patterns in order to decide on an optimal TTL.

Data flow

  • S3 Access logs: AWS-generated logging for any S3 requests made, which provides detailed records about what S3 prefix was accessed, the time of access, and other useful information.
  • Table Partition Metadata: Generated from an in-house metadata layer (Metacat) which maps a hive table and its partitions to a specific underlying S3 location and stores this metadata. This is useful to map the S3 access logs to the DW table which was accessed in the request.
  • Lookback days: The difference between the date of the partition that was accessed and the date on which it was accessed.

Cost calculations and business logic

The largest S3 storage cost comes from transactional tables, which are typically partitioned by date. Using S3 access logs and the S3 prefix-to-table-partition mapping, we are able to determine which date partitions are accessed on any given day. Next, we look at access (read/write) activity over the last 180 days and identify the maximum lookback days. This maximum value of lookback days determines the ideal TTL of a given table. In addition, we calculate the potential annual savings (based on today's storage level) that can be realized with the optimal TTL.
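A simplified sketch of that logic, with made-up numbers and the assumption that storage cost scales linearly with the number of retained date partitions:

def recommend_ttl(lookback_days, current_ttl_days, annual_storage_cost):
    # Recommend a TTL equal to the maximum lookback observed in the access
    # window, and estimate the potential annual savings versus the current TTL.
    optimal_ttl = max(lookback_days, default=1)
    if optimal_ttl >= current_ttl_days:
        return current_ttl_days, 0.0
    savings = annual_storage_cost * (current_ttl_days - optimal_ttl) / current_ttl_days
    return optimal_ttl, savings

# 180 days of access logs show nothing older than 30 days was ever read
print(recommend_ttl([3, 7, 30, 12], current_ttl_days=365, annual_storage_cost=50000))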

Dashboard view

From the dashboard, data owners can look at the detailed access patterns, recommended vs. current TTL values, as well as the potential savings.

An illustrative example of a table with sub-optimal TTL

Communication and alerting users

Checking data costs should not be part of any engineering team's daily job, especially for teams with insignificant data costs. To that end, we invested in email push notifications to increase data cost awareness among teams with significant data usage. Similarly, we send automated TTL recommendations only for tables with material cost-saving potential. Currently, these emails are sent monthly.

Learnings and challenges

Identifying and maintaining metadata of assets is critical for cost allocation

What is a resource? What is the complete set of data resources we own?
These questions form the primary building blocks of cost efficiency and allocation. We are extracting metadata for a myriad of platforms across in-motion and at-rest systems as described earlier. Different platforms store their resource metadata in different ways. To address this, Netflix is building a metadata store called the Netflix Data Catalog (NDC). NDC enables easier data access and discovery to support data management requirements for both existing and new data. We use the NDC as the starting point for cost calculations. Having a federated metadata store ensures that we have a universally understood and accepted concept of defining what resources exist and which resources are owned by individual teams.

Time trends are challenging

Time trends carry a much higher maintenance burden than point-in-time snapshots. In the case of data inconsistencies and latencies in ingestion, showing a consistent view over time is often challenging. Specifically, we dealt with the following two challenges:

  • Changes in resource ownership: for a point-in-time snapshot view, this change should be automatically reflected. However, for a time-series view, any change in ownership should be reflected in the historical metadata as well.
  • Loss of state in case of data issues: resource metadata is extracted from a variety of sources, many of which are API extractions, so it's possible to lose state if a job fails during data ingestion. API extractions in general have drawbacks because the data is transient. It's important to explore alternatives, like pumping events to Keystone, so that we can persist data for a longer period.

Conclusion

When faced with a myriad of data platforms and a highly distributed, decentralized data user base, consolidating usage and cost context to create feedback loops via dashboards provides great leverage in tackling efficiency. When reasonable, creating automated recommendations to further reduce the efficiency burden is warranted — in our case, there was high ROI in data warehouse table retention recommendations. So far, these dashboards and TTL recommendations have contributed to over a 10% decrease in our data warehouse storage footprint.

What’s next?

In the future, we plan to further push data efficiency by using different storage classes for resources based on usage patterns as well as identifying and aggressively deleting upstream and downstream dependencies of unused data resources.

Interested in working with large scale data? Platform Data Science & Engineering is hiring!


Byte Down: Making Netflix’s Data Infrastructure Cost-Effective was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Scylla Manager 2.1.1


The ScyllaDB team announces the release of Scylla Manager 2.1.1, a production-ready Scylla Manager patch release of the stable Scylla Manager 2.1 branch. As always, Scylla Manager customers and users are encouraged to upgrade to Scylla Manager 2.1.1 in coordination with the Scylla support team.

Scylla Manager is a centralized cluster administration and recurrent tasks automation tool.
Note that this upgrade only affects the Manager Server and sctool, not the Manager Agent.

The release fixes one bug in the Manager --intensity flag, an option introduced in Manager 2.1 to control the repair speed.

Using an intensity value lower than 1 is supposed to set the percentage of shards repaired in parallel. Instead, it repaired only one shard at a time, resulting in a slower than expected repair. In addition, the intensity value was not correctly displayed in the task progress.

The post Scylla Manager 2.1.1 appeared first on ScyllaDB.

Upgrading a Large Cassandra Cluster with cstar

I recently did an upgrade of 200+ nodes of Cassandra across multiple environments sitting behind multiple applications using the cstar tool. We chose cstar because, out of all the automation options, it has topology awareness specific to Cassandra. Here are some things I noticed.

1. The sister program cstarpar is sometimes required

The cstar tool is used to run commands on servers in a distributed way. The alternate cstarpar is used if you need to run the commands on the originating server instead. The Last Pickle detailed a fine example in their 3-part series on cstarpar [https://thelastpickle.com/blog/2018/12/11/cstar-reboots.html]. In our case, the individual nodes didn’t have the same access to a configuration management server that the jump host did. The cstarpar script was used to issue a command to the configuration management server, and then send ssh commands to the individual nodes (to change files, restart, etc.).

2. The cstar jobs folder can be used to view output

The jobs folder is on the originating server under ~/.cstar/jobs, with a UUID-labeled directory for each job, and server hostname directories underneath. The output is in a file named “out” under each hostname directory. Grepping through ~/.cstar/jobs/[UUID]/server*/out is a handy way to view desired info in the output.

3. Use verbose output

The cstar output can be a little too quiet, and we know that sometimes means trouble. Tack on a -v flag so you have lots of output to grep through, as above.

4. Ask for the output

Relatedly, you also have to ask for some output. One of the pre-checks was to verify that specifically named files didn't exist. Long story short, the most efficient way to do this particular check was to grep through directories. In test, the command worked, and in staging, the command worked. In production, cstar was marking each node as failed. Much troubleshooting later, we realized that the files existed in test and staging, but not in production, so the script wasn't finding anything and therefore "failing." Piping the output into a 'wc -l' allowed each check to have some kind of response, and the script succeeded.

5. The Cassandra nodes have to be up

It’s documented that all of the nodes in a cluster have to be registering as up, or cstar will fail. The automated process we used was to shut down Cassandra, pull the new config and binary, and restart Cassandra, node by node. With a lot of Cassandra nodes, even with a brief sleep time in between nodes, I was hitting the permissions server too often and too quickly for its comfort, and about 75% of the way through, it started blocking me after Cassandra was shut down on every 10th node or so. The only way I detected this was that cstar paused for long enough that I noticed; there was no error message. I had to wait for the permissions server to stop limiting me, and then manually issue the commands on the node. On the plus side, cstar didn’t fail while waiting for me and continued on with the list of nodes automatically after I took care of the individual node.

6. It really is topology-aware

I saved the best for last. It’s a trick to make other automation tools aware of Cassandra topology. In this upgrade environment, we had multiple data centers with varying numbers of nodes within each, and cstar was smart about distributing the work so that generally the same percentage of nodes were completed for each data center at any point in time. That meant that in the end, the largest data center wasn’t being hit repeatedly with remaining upgrades.

Overall, the gotchas were minor, and I’m happy we used the cstar tool on this upgrade. It allowed flexibility to run custom scripts in a unique environment and certainly shortened the amount of time required to upgrade a large cluster.

Check out the cstar tool here https://github.com/spotify/cstar.

Spark + Cassandra Best Practices

Spark Overview

Spark was created in 2009 as a response to difficulties with map-reduce in Hadoop, particularly in supporting machine learning and other interactive data analysis. Spark simplifies the processing and analysis of data, reducing the number of steps and allowing ease of development. It also provides for reduced latency, since processing is done in-memory.

Spark can be used to process and analyze data to and from a variety of data storage sources and destinations. In this blog, we will discuss Spark in conjunction with data stored in Cassandra.

Querying and manipulating data in Spark has several advantages over doing so directly in Cassandra, not the least of which is being able to join data performantly. This feature is useful for analytics projects.

Spark Use Cases

Typical use cases for Spark when used with Cassandra are: aggregating data (for example, calculating averages by day or grouping counts by category) and archiving data (for example, sending external data to cold storage before deleting from Cassandra). Spark is also used for batched inserts to Cassandra. Other use cases not particular to Cassandra include a variety of machine learning topics.

Spark in the Data Lifecycle

A data analysis project starts with data ingestion into data storage. From there, data is cleansed and otherwise processed. The resulting data is analyzed, reviewing for patterns and other qualities. It may then be further analyzed using a variety of machine learning methods. End-users will be able to run ad hoc queries and use interfaces to visualize data patterns. Spark has a star role within this data flow architecture.

Ingestion

Spark can be used independently to load data in batches from a variety of data sources (including Cassandra tables) into distributed data structures (RDDs) used in Spark to parallelize analytic jobs. Since one of the key features of RDDs is the ability to do this processing in memory, loading large amounts of data without server-side filtering will slow down your project. The spark-cassandra-connector has this filtering and other capabilities. (See https://github.com/datastax/spark-cassandra-connector/.) The limitation on memory resources also implies that, once the data is analyzed, it should be persisted (e.g., to a file or database).
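For example, a batch read from Cassandra with a filter that the connector can push down to the server side might look like the following sketch (the keyspace, table, column, and host names are hypothetical, and the spark-cassandra-connector package must be on the classpath):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("cassandra-batch-read")
         .config("spark.cassandra.connection.host", "cassandra-dc2")
         .getOrCreate())

orders = (spark.read
          .format("org.apache.spark.sql.cassandra")
          .options(keyspace="shop", table="orders")
          .load()
          .filter("order_date >= '2020-01-01'"))   # pushed down to Cassandra where possible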

To avoid some of the limitations of this batch processing, streaming functionality was added to Spark. In Spark 1, this functionality was offered through DStreams. (See https://spark.apache.org/docs/latest/streaming-programming-guide.html.)

Spark 2 — a more robust version of Spark in general — includes Structured Streaming. (See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.) With Structured Streaming, instead of creating a static table from a batch input, think of a table that is constantly updated with new data from the source. The data is stored in a dataframe which is continuously extended as new data arrives. (Another benefit of using dataframes over RDDs is that the data is intuitively abstracted into columns and rows.)

Available data sources on the source side for streaming include the commonly used Apache Kafka. Kafka buffers the ingest, which is key for high-volume streams. See https://kafka.apache.org/ for more details on Kafka.
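A minimal Structured Streaming sketch reading from Kafka (the topic and broker names are hypothetical, and the spark-sql-kafka package is required):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-stream").getOrCreate()

# Continuously updated dataframe fed from a Kafka topic
events = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "kafka1:9092")
          .option("subscribe", "events")
          .load())

query = (events.selectExpr("CAST(value AS STRING) AS json")
         .writeStream
         .format("console")      # in practice: Parquet, Cassandra via foreachBatch, etc.
         .outputMode("append")
         .start())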

Data Storage

Although we are focusing on Cassandra as the data storage in this presentation, other storage sources and destinations are possible. Another frequently used data storage option is Hadoop HDFS. The previously mentioned spark-cassandra-connector has capabilities to write results to Cassandra, and in the case of batch loading, to read data directly from Cassandra.

Native data output formats available include both JSON and Parquet. The Parquet format in particular is useful for writing to AWS S3. See https://aws.amazon.com/about-aws/whats-new/2018/09/amazon-s3-announces-new-features-for-s3-select/ for more information on querying S3 files stored in Parquet format. A good use case for this is archiving data from Cassandra.
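An illustrative archival job along these lines, reading a Cassandra table and writing it out as Parquet on S3 (the keyspace, table, and bucket names are made up, and connector/credentials setup is assumed):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.cassandra.connection.host", "cassandra-dc2")
         .getOrCreate())

# Dump a Cassandra table to Parquet on S3 before deleting it from Cassandra
archive = (spark.read
           .format("org.apache.spark.sql.cassandra")
           .options(keyspace="shop", table="orders_2018")
           .load())

archive.write.mode("overwrite").parquet("s3a://my-archive-bucket/orders_2018/")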

Data Cleansing

Data cleansing involves dealing with questionable data (such as null values) and other preprocessing tasks (such as converting categorical data to mapped integers). Once data is stored in a data frame, it can be transformed into new dataframes based on filters. Other than the fact you have the capability to do this cleansing within the same code (e.g., the Scala script running Spark), Spark does not provide magic to clean data; after all, this takes knowledge about the data and the business to understand and code particular transformation tasks.

Pattern Analysis

Spark dataframes can be easily explored for basic patterns using commands like describe, which will show the count, mean, standard deviation, minimum value, and maximum value of selected columns. Dataframes can be further transformed with functions like groupBy, pivot, and agg (aggregate). Spark SQL can be used for complex joins and similar tasks, using the SQL language that will be familiar to many data analysts.
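Continuing with the cleaned data from the previous sketch (the sensor_id and event_time columns are further assumptions), the same exploration looks like this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("exploration").getOrCreate()
val cleaned = spark.read.parquet("/data/sensor_data_clean")

cleaned.describe("reading").show()        // count, mean, stddev, min, max

cleaned
  .groupBy("sensor_id")
  .pivot("status")                        // one column per status value
  .agg(avg("reading"))
  .show()

// Spark SQL for analysts who prefer SQL syntax
cleaned.createOrReplaceTempView("sensor_data")
spark.sql("""
  SELECT sensor_id, date_trunc('day', event_time) AS day, avg(reading) AS avg_reading
  FROM sensor_data
  GROUP BY sensor_id, date_trunc('day', event_time)
""").show()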

Machine Learning and Data Mining

Machine learning and data mining encompass a broad range of data modeling algorithms intended to make predictions or to discover unknown meaning within data.

From Spark 2 onward, the main library for Spark machine learning is based on data frames instead of RDDs. You may see this new data frame-based library referred to as Spark ML, but the library name hasn’t changed — it is still MLlib. (See https://spark.apache.org/docs/latest/ml-guide.html.) Some things that are possible with Spark in this area are recommendation engines, anomaly detection, semantic analysis, and risk analysis.
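As a toy sketch of the DataFrame-based MLlib API (the feature columns are assumptions, and clustering here merely stands in for a first pass at anomaly detection):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("mllib-sketch").getOrCreate()
val cleaned = spark.read.parquet("/data/sensor_data_clean")

// Assemble numeric columns into a feature vector, then cluster the rows;
// points far from their cluster centre are candidates for anomaly review.
val assembler = new VectorAssembler()
  .setInputCols(Array("reading", "status_code"))
  .setOutputCol("features")

val kmeans = new KMeans().setK(3).setFeaturesCol("features")

val model = new Pipeline().setStages(Array(assembler, kmeans)).fit(cleaned)
model.transform(cleaned).select("sensor_id", "prediction").show()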

Ad Hoc Queries

Spark SQL is available to use within any code used with Spark, or from the command line interface; however, the requirement to run ad hoc queries generally implies that business end-users want to access a GUI to both ask questions of the data and create visualizations. This activity could take place using the eventual destination datastore as the backend. Directly from Spark, there are enterprise options such as Tableau, which has a Spark connector. For query speed, a memory-based cache such as Apache Ignite could be used as the analytics backend; to maintain that speed by avoiding disk i/o, the data being used for queries should fit into memory.

Visualization

Depending on the programming language and platform used, there may be libraries available to directly visualize results. For example, if Python is used in a Jupyter notebook, then Matplotlib will be available. (See https://matplotlib.org/.) In general, except for investigating data in order to clean it, etc., visualization will be done on the data after it is written to the destination. For business end-users, the above discussion in Ad Hoc Queries applies.

Architecture

The general architecture for a Spark + Cassandra project is apparent from the discussion of the Data Lifecycle above. The core elements are source data storage, a queueing technology, the Spark cluster, and destination data storage.

In the case of Cassandra, the source data storage is of course a cluster. Spark will only query a single data center, and to avoid load on a production cluster, this is what you want. Most installations will set up a second data center replicating data from the production data center cluster and attach Spark to this second data center. If you are unable to set up a separate data center to connect to Spark (and we strongly recommend setting it up), be sure to carefully tune the write variables in the spark-cassandra-connector. In addition, if Datastax Enterprise is being used, then DSE Search should be isolated on a third data center.

Another consideration is whether to set up Spark on dedicated machines. It is possible to run Spark on the same nodes as Cassandra, and this is the default with DSE Analytics. Again, this is not advised on the main production cluster, but can be done on a second, separate cluster.

Spark in the Cloud

Spark at Google Cloud

Google Cloud offers Dataproc as a fully managed service for Spark (and Hadoop):

https://cloud.google.com/dataproc/

Spark at AWS

AWS supports Spark on EMR: https://aws.amazon.com/emr/features/spark/.

Spark Development

Coding Language Options

Spark code can be written in Python, Scala, Java, or R. SQL can also be used within much of Spark code.

Tuning Notes

Spark Connector Configuration

Slowing down the write throughput (spark.cassandra.output.throughput_mb_per_sec) can alleviate latency issues.

For writing, the Spark batch size (spark.cassandra.output.batch.size.bytes) should stay within the batch size configured in Cassandra (batch_size_fail_threshold_in_kb).

Writing more frequently will reduce the size of the write, reducing the latency. Increasing the batch size will reduce the number of times Cassandra has to deal with timestamp management. Spark can cause allocation failures if the batch size is too big.
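A sketch of setting these connector properties when building the session (the values shown are illustrative starting points, not recommendations):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tuned-writes")
  .config("spark.cassandra.output.throughput_mb_per_sec", "5")  // throttle writes per core
  .config("spark.cassandra.output.batch.size.bytes", "4096")    // stay under batch_size_fail_threshold_in_kb
  .config("spark.cassandra.output.concurrent.writes", "5")      // limit in-flight batches per task
  .getOrCreate()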

For further documentation on connector settings, see  https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md.

Spark Security

Security has to be explicitly configured in Spark; it is not on by default. However, the configuration doesn’t cover all risk vectors, so review the options carefully. Also, most of these settings can be overridden in code accessing Spark, so it is important to audit your codebase and, above all, to limit connections to specific hosts that are further protected by user authentication.

Enable authentication

Authentication is turned off by default. It is enabled through two configuration variables, using a shared secret.

The two configuration variables are spark.authenticate (default is false; set to true) and spark.authenticate.secret (set to string of shared secret). If YARN is used, then much of this is done by default. If not, then set these two in the spark-defaults.conf file.

Then use this secret when submitting jobs. Note that anyone who holds the secret can submit jobs, so protect it well.

Enable logging for all submitted jobs

You can set spark.eventLog.enabled in the spark-defaults.conf file, but it can be overridden in a user’s code (e.g., in the SparkConf) or in shell commands, so it has to be enforced by business policy.

Note also that the log file itself (configured via spark.eventLog.dir) should be protected with filesystem permissions to avoid users snooping data within it.

Block Java debugging options in JVM

Make sure the JVM configuration does not include the following options: -Xdebug, -Xrunjdwp, -agentlib:jdwp.

Redact environment data in WebUI

You can disable the whole UI via spark.ui.enabled, but short of that, or of overriding the EnvironmentListener with custom code, there is no way to redact the information in the Environment tab of the UI specifically.

It is recommended to enable ACLs for both the WebUI and the history server, which will protect the entirety of the web-based information.

Enable and enforce SASL RPC encryption

Since Spark 2.2, the recommendation is to use AES encryption, unless you are using an external shuffle service. To enable SASL encryption, set the following to true: spark.authenticate.enableSaslEncryption and spark.network.sasl.serverAlwaysEncrypt.

Enable encryption on all UIs and clients

To enable AES encryption for data going across the wire, in addition to turning on authentication as above, also set the following to true: spark.network.crypto.enabled. Choose a key length and set via spark.network.crypto.keyLength, and choose an algorithm from those available in your JRE and set via spark.network.crypto.keyFactoryAlgorithm.

Don’t forget to also configure encryption for traffic between Spark and any database (e.g., Cassandra), so that traffic is protected as well.

Enable encryption on Shuffle service

In addition to the above encryption configuration, set the following to true: spark.network.crypto.saslFallback.

To encrypt the temporary files created by the Shuffle service, set this to true: spark.io.encryption.enabled. The key size and algorithm can also be set via spark.io.encryption.keySizeBits and spark.io.encryption.keygen.algorithm, but these have reasonable defaults.

Disable Spark REST server

The REST server presents a serious risk, as it does not allow for encryption. Set the following to false: spark.master.rest.enabled.
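Pulling the settings above together, a spark-defaults.conf might look like the following sketch (the shared secret and log directory are placeholders, and the values should be reviewed against your own risk assessment):

spark.authenticate                       true
spark.authenticate.secret                REPLACE_WITH_SHARED_SECRET
spark.authenticate.enableSaslEncryption  true
spark.network.sasl.serverAlwaysEncrypt   true
spark.network.crypto.enabled             true
spark.network.crypto.keyLength           256
spark.network.crypto.saslFallback        true
spark.io.encryption.enabled              true
spark.eventLog.enabled                   true
spark.eventLog.dir                       hdfs:///spark-logs
spark.acls.enable                        true
spark.master.rest.enabled                false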

Operations

Monitoring Spark

Spark has built-in monitoring: https://spark.apache.org/docs/latest/monitoring.html

Apache Cassandra 4.0 Benchmarks

Apache Cassandra 4.0 will reach beta shortly and is the first version that will support JDK 11 and onwards. Latency is an obvious concern for Apache Cassandra™ users and big hopes have been put into ZGC, the new low latency garbage collector introduced in JDK 11. It reached GA in JDK 14, which made us eager to evaluate how good of a fit it would be for Apache Cassandra clusters. We also wanted to compare Apache Cassandra 3.11.6 performance against 4.0 and see if Shenandoah, RedHat’s garbage collector, should be considered for production use. In this post we will see that Cassandra 4.0 brings strong performance improvements on its own which are massively amplified by the availability of new garbage collectors: ZGC and especially Shenandoah.

Benchmarking methodology

The following benchmarks were conducted using tlp-cluster for provisioning and configuring Apache Cassandra clusters in AWS and using tlp-stress for load generation and metrics collection. All used tools are available as open source and benchmarks are easily reproducible for anyone with an AWS account.

Clusters were composed of 3 nodes using r3.2xlarge instances and a single stress node using a c3.2xlarge instance.

Default settings were used for Apache Cassandra, with the exception of GC and heap.

Cluster provisioning and configuration was done using the latest release of tlp-cluster. We recently added some helper scripts to automate cluster creation and also the installation of Reaper and Medusa.

After installing and configuring tlp-cluster according to the documentation, you’ll be able to recreate the clusters we used to run the benchmarks:

# 3.11.6 CMS JDK8
build_cluster.sh -n CMS_3-11-6_jdk8 -v 3.11.6 --heap=16 --gc=CMS -s 1 -i r3.2xlarge --jdk=8 --cores=8

# 3.11.6 G1 JDK8
build_cluster.sh -n G1_3-11-6_jdk8 -v 3.11.6 --heap=31 --gc=G1 -s 1 -i r3.2xlarge --jdk=8 --cores=8

# 4.0 CMS JDK11
build_cluster.sh -n CMS_4-0_jdk11 -v 4.0~alpha4 --heap=16 --gc=CMS -s 1 -i r3.2xlarge --jdk=11 --cores=8

# 4.0 G1 JDK14
build_cluster.sh -n G1_4-0_jdk14 -v 4.0~alpha4 --heap=31 --gc=G1 -s 1 -i r3.2xlarge --jdk=14 --cores=8

# 4.0 ZGC JDK11
build_cluster.sh -n ZGC_4-0_jdk11 -v 4.0~alpha4 --heap=31 --gc=ZGC -s 1 -i r3.2xlarge --jdk=11 --cores=8

# 4.0 ZGC JDK14
build_cluster.sh -n ZGC_4-0_jdk14 -v 4.0~alpha4 --heap=31 --gc=ZGC -s 1 -i r3.2xlarge --jdk=14 --cores=8

# 4.0 Shenandoah JDK11
build_cluster.sh -n Shenandoah_4-0_jdk11 -v 4.0~alpha4 --heap=31 --gc=Shenandoah -s 1 -i r3.2xlarge --jdk=11 --cores=8

Note: in order to conduct all benchmarks under similar conditions, a single set of EC2 instances was used throughout the tests.

An upgrade from Cassandra 3.11.6 to Cassandra 4.0~alpha4 was executed and JDKs were switched when appropriate using the following script:

#!/usr/bin/env bash

OLD=$1
NEW=$2
curl -sL https://github.com/shyiko/jabba/raw/master/install.sh | bash
. ~/.jabba/jabba.sh
jabba uninstall $OLD
jabba install $NEW
jabba alias default $NEW
sudo update-alternatives --install /usr/bin/java java ${JAVA_HOME%*/}/bin/java 20000
sudo update-alternatives --install /usr/bin/javac javac ${JAVA_HOME%*/}/bin/javac 20000

The following JDK values were used when invoking jabba:

  • openjdk@1.11.0-2
  • openjdk@1.14.0
  • openjdk-shenandoah@1.8.0
  • openjdk-shenandoah@1.11.0

OpenJDK 8 was installed using Ubuntu apt.

Here are the java -version outputs for the different JDKs that were used during the benchmarks:

jdk8

openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)

jdk8 with Shenandoah

openjdk version "1.8.0-builds.shipilev.net-openjdk-shenandoah-jdk8-b712-20200629"
OpenJDK Runtime Environment (build 1.8.0-builds.shipilev.net-openjdk-shenandoah-jdk8-b712-20200629-b712)
OpenJDK 64-Bit Server VM (build 25.71-b712, mixed mode)

jdk11

openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

jdk11 with Shenandoah

openjdk version "11.0.8-testing" 2020-07-14
OpenJDK Runtime Environment (build 11.0.8-testing+0-builds.shipilev.net-openjdk-shenandoah-jdk11-b277-20200624)
OpenJDK 64-Bit Server VM (build 11.0.8-testing+0-builds.shipilev.net-openjdk-shenandoah-jdk11-b277-20200624, mixed mode)

jdk14

openjdk version "14.0.1" 2020-04-14
OpenJDK Runtime Environment (build 14.0.1+7)
OpenJDK 64-Bit Server VM (build 14.0.1+7, mixed mode, sharing)

CMS

CMS (Concurrent Mark Sweep) is the current default garbage collector in Apache Cassandra. It was removed from JDK 14 so all tests were conducted with either JDK 8 or 11.

The following settings were used for CMS benchmarks:

-XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-XX:+CMSParallelRemarkEnabled
-XX:SurvivorRatio=8
-XX:MaxTenuringThreshold=1
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-XX:CMSWaitDuration=10000
-XX:+CMSParallelInitialMarkEnabled
-XX:+CMSEdenChunksRecordAlways
-XX:+CMSClassUnloadingEnabled
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=8
-Xms16G
-Xmx16G
-Xmn8G

Note that the -XX:+UseParNewGC flag was removed in JDK 11, where its behavior is implicit; passing the flag there would prevent the JVM from starting up.

We’ll keep CMS at 16GB of max heap as it could otherwise lead to very long pauses on major collections.

G1

G1GC (Garbage-First Garbage Collector) is easier to configure than CMS as it resizes the young generation dynamically, but it only delivers its best with large heaps (>=24GB), which explains why it hasn’t been promoted to default garbage collector. It also shows higher latencies than a well-tuned CMS, but provides better throughput.

The following settings were used for G1 benchmarks:

-XX:+UseG1GC
-XX:G1RSetUpdatingPauseTimePercent=5
-XX:MaxGCPauseMillis=300
-XX:InitiatingHeapOccupancyPercent=70
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=8
-Xms31G
-Xmx31G

For 4.0 benchmarks, JDK 14 was used when running G1 tests.

We’re using 31GB of heap size to benefit from compressed oops and have the largest number of addressable objects for the smallest heap size.

ZGC

ZGC (Z Garbage Collector) is the latest GC from the JDK, which focuses on providing low latency with stop-the-world pauses shorter than 10ms. It is also supposed to guarantee that the heap size has no impact on pause times, allowing it to scale up to 16TB of heap. If these expectations are met, it could remove the need to use off-heap storage and simplify some development aspects of Apache Cassandra.

The following settings were used for ZGC benchmarks:

-XX:+UnlockExperimentalVMOptions
-XX:+UseZGC
-XX:ConcGCThreads=8
-XX:ParallelGCThreads=8
-XX:+UseTransparentHugePages
-verbose:gc
-Xms31G
-Xmx31G

We needed to use the -XX:+UseTransparentHugePages flag as a workaround to avoid enabling large pages on Linux. While the official ZGC documentation states this could (possibly) generate latency spikes, the results didn’t show such behavior. It could be worth rerunning the throughput tests with large pages enabled to see how that affects the results.

Note that ZGC cannot use compressed oops and is not affected by the “32GB threshold”. We’re using 31GB of heap to use the same sizing as G1 and allow the system to have the same amount of free RAM.

Shenandoah

Shenandoah is a low latency garbage collector developed at RedHat. It is available as a backport in JDK 8 and 11, and is part of the mainline builds of the OpenJDK starting with Java 13.

Like ZGC, Shenandoah is a mostly concurrent garbage collector which aims at making pause times not proportional to the heap size.

The following settings were used for Shenandoah benchmarks:

-XX:+UnlockExperimentalVMOptions
-XX:+UseShenandoahGC
-XX:ConcGCThreads=8
-XX:ParallelGCThreads=8
-XX:+UseTransparentHugePages
-Xms31G
-Xmx31G

Shenandoah should be able to use compressed oops and thus benefits from using heaps a little below 32GB.

Cassandra 4.0 JVM configuration

Cassandra version 4.0 ships with separate jvm.options files for Java 8 and Java 11.

These are the files:

  • conf/jvm-server.options
  • conf/jvm8-server.options
  • conf/jvm11-server.options

Upgrading to version 4.0 will work with an existing jvm.options file from version 3.11, so long as it is renamed to jvm-server.options and the jvm8-server.options and jvm11-server.options files are removed. This is not the recommended approach.

The recommended approach is to re-apply the settings found in the previous jvm.options file to the new jvm-server.options and jvm8-server.options files. The Java specific option files are mostly related to the garbage collection flags. Once these two files are updated and in place, it then becomes easier to configure the jvm11-server.options file, and simpler to switch from JDK 8 to JDK 11.
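As a sketch, carrying forward the G1 settings used earlier in this post, the garbage collection flags would land in conf/jvm11-server.options (heap sizing can remain wherever it already lives, e.g. conf/jvm-server.options):

# conf/jvm11-server.options (GC flags only; values are the ones used in these benchmarks)
-XX:+UseG1GC
-XX:G1RSetUpdatingPauseTimePercent=5
-XX:MaxGCPauseMillis=300
-XX:InitiatingHeapOccupancyPercent=70
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=8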

Workloads

Benchmarks were done using 8 threads running with rate limiting and 80% writes/20% reads. tlp-stress uses asynchronous queries extensively, which can easily overwhelm Cassandra nodes with a limited number of stress threads. The load tests were conducted with each thread sending 50 concurrent queries at a time. The keyspace was created with a replication factor of 3 and all queries were executed at consistency level LOCAL_ONE.

All garbage collectors and Cassandra versions were tested with growing rates of 25k, 40k, 45k and 50k operations per second to evaluate their performance under different levels of pressure.

The following tlp-stress command was used:

tlp-stress run BasicTimeSeries -d 30m -p 100M -c 50 --pg sequence -t 8 -r 0.2 --rate <desired rate> --populate 200000

All workloads ran for 30 minutes, loading approximately 5 to 16 GB of data per node and allowing a reasonable compaction load.

Note: The purpose of this test is not to evaluate the maximum performance of Cassandra, which can be tuned in many ways for various workloads. Neither is it to fine tune the garbage collectors which all expose many knobs to improve their performance for specific workloads. These benchmarks attempt to provide a fair comparison of various garbage collectors using mostly default settings when the same load is generated in Cassandra.

Benchmarks results

3.11.6 25k-40k ops/s:
3.11.6 latency charts - 25k-40k

4.0 25k-40k ops/s:
4.0-alpha4 latency charts - 25k-40k

4.0 45k-50k ops/s:
4.0-alpha4 latency charts - 45k-50k

Throughput

Throughput wise, Cassandra 3.11.6 maxed out at 41k ops/s while Cassandra 4.0 went up to 51k ops/s, which is a nice 25% improvement thanks to the upgrade, using CMS in both cases. There have been numerous performance improvements in 4.0 explaining these results, especially on heap pressure caused by compaction (check CASSANDRA-14654 for example).

Shenandoah on jdk8 with Cassandra 3.11.6 fails to deliver the maximum throughput in the 40k ops/s load test and starts showing failed requests at this rate. It behaves much better with jdk11 and Cassandra 4.0, where it nearly matches the throughput of CMS with a maximum of 49.6k ops/s. On Cassandra 3.11.6 with jdk8, both G1 and Shenandoah maxed out at 36k ops/s overall.

G1 also seems to have been improved in jdk14, edging past jdk11 with a small gain from 47k ops/s to 50k ops/s.

ZGC fails to deliver throughput matching its contenders on both jdk11 and jdk14, topping out at 41k ops/s.

Average write p99 (ms)

Average read p99 (ms)

Max p99 (ms)

Shenandoah in jdk8 delivers some very impressive latencies under moderate load on Cassandra 3.11.6, but performance severely degrades when it gets under pressure.

Using CMS, Cassandra 4.0 manages to keep an average p99 between 11ms and 31ms with up to 50k ops/s. The average read p99 under moderate load went down from 17ms in Cassandra 3.11.6 to 11.5ms in Cassandra 4.0, which gives us a 30% improvement.

With 25% to 30% improvements in both throughput and latency, Cassandra 4.0 easily beats Cassandra 3.11.6 using the same garbage collectors.

Honorable mention to Shenandoah for the very low latencies under moderate load in Cassandra 3.11.6, but the behavior under pressure makes us worried about its capacity to handle spiky loads.

While ZGC delivers some impressive latencies under moderate load, especially with jdk14, it doesn’t keep up at higher rates when compared to Shenandoah. Average p99 latencies for both reads and writes are the lowest for Shenandoah in almost all load tests. These latencies combined with the throughput it can achieve in Cassandra 4.0 make it a very interesting GC to consider when upgrading. An average p99 read latency of 2.64ms under moderate load is pretty impressive! Even more knowing that these are recorded by the client.

G1 mostly matches its configured maximum pause time of 300ms when looking at the max p99, but using lower target pause could have undesirable effects under high load and trigger even longer pauses.

Under moderate load, Shenandoah manages to lower average p99 latencies by 77%, with a top low at 2.64ms. This will be a major improvement for latency sensitive use cases. Compared to CMS in Cassandra 3.11.6, it’s a whopping 85% latency reduction for reads at p99!

Honorable mention to ZGC in jdk14 which delivers some great performance under moderate load but sadly can’t yet keep up at higher rates. We are optimistic that it will be improved in the coming months and might eventually compete with Shenandoah.

Final thoughts

G1 improved Cassandra’s usability by removing the need to fine-tune generation sizes, at the expense of some performance. The release of Apache Cassandra 4.0, which brings very impressive boosts on its own, will allow the use of new-generation garbage collectors such as Shenandoah and ZGC, which are both simple to adopt with minimal tuning and deliver lower latencies.

Shenandoah is hard to recommend on Cassandra 3.11.6 as nodes start to misbehave at high loads, but starting from jdk11 and Cassandra 4.0, this garbage collector offers stunning improvements in latencies while almost delivering the maximum throughput one can expect from the database.

Your mileage may vary as these benchmarks focused on a specific workload, but the results make us fairly optimistic in the future of Apache Cassandra for latency sensitive use cases, bringing strong improvements over what Cassandra 3.11.6 can currently deliver.

Download the latest Apache Cassandra 4.0 build and give it a try. Let us know if you have any feedback on the community mailing lists or in the ASF Slack.

Cassandra Data Modeling Best Practices Guide

Apache Cassandra is an open source non-relational, or NoSQL, distributed database that enables continuous availability, tremendous scale, and data distribution across multiple data centers and cloud availability zones. Simply put, it provides a highly reliable data storage engine for applications requiring immense scale.

Data modeling is a process used to analyze, organize, and understand the data requirements for a product or service. Data modeling creates the structure your data will live in. It defines how things are labeled and organized, and determines how your data can and will be used. The process of data modeling is similar to designing a house. You start with a conceptual model and add detail to produce the final blueprint. 

The ultimate goal of Cassandra data modeling and analysis is to develop a complete, well organized, and high performance Cassandra cluster. Following the five Cassandra data modeling best practices outlined will hopefully help you meet that goal:

  1. Cassandra is not a relational database, don’t try to model it like one
  2. Design your model to meet 3 fundamental goals for data distribution
  3. Understand the importance of the Primary Key in the overall data structure 
  4. Model around your queries but don’t forget about your data
  5. Follow a six step structured approach to building your model. 

Cassandra Is Not a Relational Database

Do not try to design a Cassandra data model like you would with a relational database. 

Query first design: You must define how you plan to access the data tables at the beginning of the data modeling process not towards the end. 

No joins or derived tables: Tables cannot be joined so if you need data from more than one table, the tables must be merged into a denormalized table.   

Denormalization: Cassandra does not support joins or derived tables so denormalization is a key practice in Cassandra table design.

Designing for optimal storage: For relational databases this is usually transparent to the designer. With Cassandra, an important goal of the design is to optimize how data is distributed around the cluster. 

Sorting is a Design Decision: In Cassandra, sorting can be done only on the clustering columns specified in the PRIMARY KEY.

The Fundamental Goals of the Cassandra Data Model

Distributed data systems, such as Cassandra, distribute incoming data into chunks called partitions. Cassandra groups data into distinct partitions by hashing a data attribute called the partition key and distributes these partitions among the nodes in the cluster.

(A detailed explanation can be found in Cassandra Data Partitioning.)

A good Cassandra data model is one that: 

  1. Distributes data evenly across the nodes in the cluster
  2. Places limits on the size of a partition
  3. Minimizes the number of partitions returned by a query.

Distributes Data Evenly Around the Cassandra Cluster

Choose a partition key that has a high cardinality to avoid hot spots—a situation where one or a few nodes are under heavy load while others are idle.   

Limits the Size of Partitions

For performance reasons choose partition keys whose number of possible values is bounded. For optimal performance, keep the size of a partition between 10 and 100MB. 

Minimize the Number of Partitions Read by a Single Query 

Ideally, each of your queries will read a single partition. Reading many partitions at one time is expensive because each partition may reside on a different node. The coordinator (this is the node in the cluster that first receives the request) will generally need to issue separate commands to separate nodes for each partition you request. This adds overhead and increases the variation in latency. Unless the data set is small, attempting to read an entire table, that is all the partitions, fails due to a read timeout. 

Understand the Importance of the Primary Key

Every table in Cassandra must have a  set of columns called the primary key. (In older versions of Cassandra, tables were called column families). In addition to determining the uniqueness of a row, the primary key also shapes the data structure of a table. The Cassandra primary key has two parts:

Partition key: The first column or set of columns in the primary key. This is required. The hashed value of the partition key value determines where the partition will reside within the cluster.

Clustering key (aka clustering columns): The columns after the partition key. The clustering key is optional and determines the default sort order of rows within a partition.

A very important part of the design process is to make sure a partition key will: 

  1. Distribute data evenly across all nodes in a cluster.  Avoid using keys that have a very small domain of possible values, such as gender, status, school grades, and the like.  The minimum number of possible values should always be greater  than the number of nodes in the cluster.  Also, avoid using keys where the distribution of possible values is highly skewed. Using such a key will create “hotspots” on the cluster. 
  2. Have a bounded range of values. Large partitions can increase read latency and cause stress on a node during a background process called compaction. Try to keep the size of partitions under 100MB. 

Model Around Your Queries

The Cassandra Query Language (CQL) is the primary language used to communicate with a Cassandra database. In syntax and function, CQL resembles SQL which makes it easy for those who know the latter to quickly learn how to write queries for Cassandra. But there are some important differences that affect your design choices. 

A well known one is that Cassandra does not support joins or derived tables. Whenever you require data from two or more tables, you must denormalize. 

Search conditions have restrictions that also impact the design; a couple of example queries follow the list below.

  • Only primary key columns can be used as query predicates. (Note: a predicate is an operation on expressions that evaluates to TRUE, FALSE).
  • Partition key columns are limited to equality searches. Range searches can only be done on clustering columns.
  • If there are multiple partition key columns (i.e. a composite partition key), all partition columns must be included in the search condition.
  • Not all clustering columns need to be included in the search condition. But there are some restrictions: 
    • When omitting columns you must start with the rightmost column listed in the primary key definition;
    • An equality search cannot follow a range search.
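For example, against a hypothetical table with PRIMARY KEY ((source_name, day), message_time, message_id), the following CQL sketches show how these restrictions play out:

-- Valid: every partition key column restricted by equality,
-- range search on the first clustering column, rightmost clustering column omitted
SELECT * FROM log_message_by_source_day
WHERE source_name = 'app1'
  AND day = '2020-07-01'
  AND message_time >= '2020-07-01 00:00:00';

-- Invalid: a partition key column (day) is missing,
-- so the partition cannot be located
SELECT * FROM log_message_by_source_day
WHERE source_name = 'app1'
  AND message_time >= '2020-07-01 00:00:00';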

Don’t Forget About the Data

Creating a complete Cassandra data model involves more than knowing your queries. You can identify all the queries correctly but if you miss some data, your model will not be complete. Attempting to refactor a mature Cassandra data model can be an arduous task.

Developing a good conceptual model (see below) will help identify the data your application needs. 

Take a Structured Approach to Design 

In order to create a data model that is complete and high performing, it helps to follow a big data modeling methodology for Apache Cassandra that can be summarized as: 

  1. Data Discovery (DD). This is a high level view of the data your application needs and identifies the entities (things), the attributes of the entities, and which attributes are the identifiers. This may be an iterative process as development proceeds.
  2. Identify the Access Patterns (AP).  Identify and list the queries your application will want to perform.  You need to answer: What data needs to be retrieved together, what are the search criteria, and what are the update patterns? This also may be an iterative process. 
  3. Map data and queries (MDQ).  Maps the queries to the data identified in steps 1 and 2 to create logical tables which are high level representations of Cassandra tables.
  4. Create the physical tables (PT).  Convert the logical data model to a physical data model (PDM) by using CQL CREATE TABLE statements. 
  5. Review and Refine physical data model.  Confirm that the physical tables will meet the 3 Basic Goals for Cassandra Data Model.

Structured approach to cassandra data model design

A more detailed examination of these steps can be found in an earlier Instaclustr Whitepaper: 6 Step Guide to Apache Cassandra Data Modelling

If you have worked with relational database design, some steps will be familiar because they are also in the entity-relationship (ER) model.  At the conceptual stage, it can be useful to visually represent the data model by ER diagrams using either the Chen or Information Engineering (IE) notation. The Chebotko diagram uses a notation developed by Artem Chebotko to represent data and queries at the logical and physical modeling stages. 

Cassandra Model Example

Let’s assume that we have a simple logging system with two entities: LogSource and LogMessage.  For LogSource the key attribute is sourceName.  For the entity LogMessage, the key attribute is messageID.  

The query we want to execute is:  Q1) show the message information about the 10 most recent messages for a given source. 

The primary access entity is LogSource because it contains the equality search attribute (sourceName).  We create a logical table named LogMessage_by_Source and push the attribute sourceName into it. That becomes the partition key (indicated by the K).

We need to sort by time so messageTime becomes the clustering column in  LogMessage_by_Source.  (Indicated by C↑) 

The secondary entity is LogMessage. The key attribute messageID becomes a 2nd clustering column of the primary key in  LogMessage_By_Source to ensure uniqueness of the row.  Finally, we add the remaining columns from the secondary source to complete the data needed by the query. 
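A hedged CQL sketch of the resulting physical table and of query Q1 (the messagetext column is illustrative, and the clustering order is left ascending to match the C↑ notation, so Q1 reverses the order at query time):

CREATE TABLE logmessage_by_source (
    sourcename   text,
    messagetime  timestamp,
    messageid    uuid,
    messagetext  text,          -- illustrative payload column
    PRIMARY KEY ((sourcename), messagetime, messageid)
);

-- Q1: the 10 most recent messages for a given source
SELECT * FROM logmessage_by_source
WHERE sourcename = 'app1'
ORDER BY messagetime DESC
LIMIT 10;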

An example of Cassandra data model

Data Duplication 

Data duplication refers to the number of times data must be duplicated in different tables to satisfy access patterns.   For example, if  we wanted to search for a specific message by its  unique identifier we would duplicate the data by creating a new table called LogMessage_by_ID that uses  messageID as the partition key.

Two issues can arise from duplication: 

  • Increased complexity to maintain  data integrity across multiple tables; 
  • If the data being duplicated is very large it puts size and write pressure on the database.

In a case where data duplication would cause more problems than it solves, an alternative is to duplicate only lookup keys, that is, a lookup table. However, this solution requires the client to perform a second query to read the secondary data. The trade-off between read performance and data maintenance cost needs to be judged in the context of the specific performance requirements of your application, and such a solution would need to be benchmarked to ensure that it is workable.

Materialized Views

These are objects created by a query which are copies of a base table but with a different partition key. The data between the materialized view and the base table is automatically synchronized by Cassandra. Their purpose is to make modeling for new query patterns easier and more flexible.

Instaclustr’s advice is not to use them in Cassandra 3.x because of problems in keeping the view and the base table synchronized. The Apache Cassandra project has classified Materialized Views as an experimental feature for Cassandra 3.x. 

Summary

Cassandra Data modeling is a process used to define and analyze data requirements and access patterns on the data needed to support a business process. 

A data model helps define the problem, enabling you to consider different approaches and choose the best one.  It ensures that all necessary data is captured and stored efficiently. 

Models document important concepts and jargon, providing a basis for long-term maintenance.

Cassandra is a non-relational database. Do not design it as you would a relational database. Don’t be afraid to denormalize data; writes in Cassandra are relatively cheap compared to relational databases.

 The goals of a successful Cassandra Data Model are to choose a partition key that (1)  distributes data evenly across the nodes in the cluster; (2) minimizes the number of partitions read by one query, and (3) bounds the size of a partition.

Take a structured approach to your model. Your first steps are understanding your data and identifying the access patterns on that data. These are the most critical steps in developing a complete model.

Contact us to get expert advice on managing and deploying Apache Cassandra.

The post Cassandra Data Modeling Best Practices Guide appeared first on Instaclustr.

Cassandra Monitoring: A Best Practice Guide

Introduction to Cassandra Monitoring

Apache Cassandra is a NoSQL database designed to provide scalability, reliability, and availability with linear performance scaling. Cassandra database is designed as a distributed system, and aims to handle big data efficiently. Refer to what-is-apache-cassandra and cassandra-architecture for more information. Note that knowledge of Cassandra architecture and basic terminology is a prerequisite to understanding Cassandra monitoring. 

Cassandra monitoring is an essential area of database operations to ensure the good health of a cluster and optimal performance. Alerting is another crucial area for production systems, and it is complementary to monitoring. Good alerting in Cassandra can be achieved by utilization of the monitoring infrastructure and relevant toolset. Alerting and monitoring help create a robust environment for any Cassandra deployment.

This blog post aims to touch all important aspects of Cassandra monitoring. We hope it provides the reader with crucial information about monitoring tools, components, and metrics.

Monitoring Terminologies

JVM Based Monitoring

Cassandra is developed in Java and is a JVM based system. Each Cassandra node runs a single Cassandra process. JVM based systems are enabled with JMX (Java Management Extensions) for monitoring and management. Cassandra exposes various metrics using MBeans which can be accessed through JMX. Cassandra monitoring tools are configured to scrape the metrics through JMX and then filter, aggregate, and render the metrics in the desired format. There are a few performance limitations in the JMX monitoring method, which are referred to later. 

Metrics management in Cassandra is performed using the Dropwizard library. The metrics are collected per node in Cassandra; however, they can be aggregated by the monitoring system.

Cassandra Monitoring

Metrics 

There are a large number of metrics exposed by Cassandra to cover all possible areas including performance, resources, communication, node, and cluster state etc. The metrics are defined with distinct types, and those can be categorized as well for operational ease.    

Metrics Types

Cassandra metrics are defined with specific data types. These types are designed to correctly represent measurements like latency, counts, and others.

The metric types are not all intuitive and you might need some time to get familiar with them.

  • Gauge: A single value representing a metric at a specific point in time, e.g. value of memory allocated or a number of active tasks. 
  • Counter: Counters are similar to gauges but are used for values that are compared over time. Generally, a counter is only incremented, and it is reset when the underlying functionality is disrupted, e.g. by a node restart. An example is the cache hit count.
  • Histogram: Histogram is a count of data elements from a data stream grouped in fixed intervals. A histogram gives a statistical distribution of values. The data elements are provided over min, max, mean, median, 75th, 90th, 95th, 98th, 99th, 99.9th percentile value intervals. 
  • Timer: Timer keeps the rate of execution and histogram of duration for a metric. 
  • Latency: This is a special type to measure latency. It includes a Timer, and the latency is reported in microseconds. There is also a TotalLatency with each latency metric; the total latency is the accumulated latency since the node started.
  • Meter: A meter measures throughput. It also includes weighted moving averages over the last one, five, and fifteen minutes.

Metrics Categories

The metrics are categorised based on Cassandra domains, e.g. table, keyspace, storage, communication, JVM etc. Not all metrics should be monitored all the time, but those should be available in case required, i.e. during troubleshooting or application performance testing. 

The metrics are further subdivided in terms of broader areas like resources, network, internals, crucial data elements etc. Metrics can be represented as per topology levels like cluster level, node level, table level etc. to organize all the information. 

The categorization becomes clear as we go through specific metrics and correlate those with specific Cassandra areas.

Metrics Format

The Cassandra Dropwizard metrics are specified in the format below:

Dropwizard Metric Name: org.apache.cassandra.metrics.<Metric scope>.<Metric type>.<MetricName>

MBean: org.apache.cassandra.metrics:type=<Metric type> scope=<Metric scope> name=<MetricName>

Metric Type: This is the category of metrics e.g. table, keyspace, threadpool. Do not confuse this with the data type of metrics.

Metric scope: This is the metric sub type for more granularity wherever required. The scope is hence optional. E.g. the table name or keyspace name. 

Metric name: The final metric name like LiveSSTableCount. 

Essential Metrics

Cassandra Metrics

Node Status 

The status of nodes must be monitored and alerted immediately if a node is down. Cassandra cluster’s availability directly depends on the uptime of all the nodes in the cluster. Although the anti-entropy mechanism in Cassandra helps protect data from inconsistency, there is no replacement for lost performance during a node downtime. A down node puts pressure on other nodes in the data center to handle requests and store hints. Hence, downtime for a node should be minimum. 

Cassandra operational activity sometimes requires node restarts or downtime, but these can be scheduled during the least busy times for the cluster. This alert helps keep track of any service disruption and of the need to run repair on a node. A node should be repaired if it is out of the cluster for longer than the hinted handoff window, which is three hours by default.

Client Request Metrics

The client requests metrics provide information about client communication in forms of read and write requests per second between the client and a coordinator node. Other than normal read and write requests, there are special types of read and write operations CAS, View, and RangeSlice which have their own set of metrics. These metrics help to track the request count, latency, failures, and a few other statistics. The basic statistic to monitor is the number of requests per seconds, i.e. throughput and request latency.

Requests Per Second

The number of requests should be aggregated per data center and per node. There could be some nodes receiving more requests as compared to other nodes. This behaviour creates extra pressure for the nodes receiving more requests. The specific requests like CAS and RangeSlice should be tracked separately for clarity. These operations are resource-intensive and have a unique effect on the nodes. The ratio of read requests to write requests is crucial to understand the type of workload. There are specific configurations to optimize a read-heavy or a write-heavy workload. 

Each cluster can handle a certain amount of client requests per second efficiently. If the number of requests exceeds the cluster capacity, it can result in undesirable results like dropped messages, inconsistency, increased latency etc. The CAS and RangeSlice request can cause increased latency. 

Uneven load on a few nodes can be handled with optimal load balancing at the driver side. The read and write latency or throughput issues caused by constant overloading should be addressed by adding more nodes to the data center and revisiting the data model if required.

Alerting: Set alerts on the number of requests threshold served per node and data center. 

Client Request Latency

Latency tracked by these metrics is the read and write latency experienced by client applications. There are various percentiles of latency, as mentioned in the latency metric type. These should be tracked separately as well as at overall values so that there is a clear view of system performance. Production systems generally have latency SLAs; the SLA on specific or overall latency should be tracked, with alerts raised when client latency breaches it.

There are various factors which affect latency including, the amount of load served by a node or cluster, system resources and tuning, GC settings and behaviour, type of requests. Troubleshooting latency issues mainly depends on the accurate investigation of the root cause. Correlating latency metrics with other metrics helps to track down root causes. Using a graph solution like Grafana for visualization is the most efficient way to sight and track issues.

Alerting: Set alerts for latency SLA thresholds if any or expected latency range.

Request Timeout and Failure 

These metrics are the number of client requests timed out or failed. Failed requests are a clear indication of errors, and those should be addressed immediately. The common causes for request failure are unavailability of data, failure to get a response from the required number of replicas, data inconsistency, and network error. Troubleshooting errors is performed using the error messages and correlation with other metrics.

Alerting: Set alerts for more than a few failure requests on production systems.

Compaction Statistics 

This group of metrics includes the amount of data compacted, the number of active/completed compactions, and other relevant details. Compactions consume node resources and can consume disk space quickly. Monitoring compactions provides a good insight into the compaction strategy used, as each strategy has a unique operational footprint. Specific Cassandra operations like repairs, high volume data writes, add/remove/replace nodes etc. increase compaction activity. It is important to monitor compactions while performing such operations.

A common troubleshooting method for high compaction activities and high resource consumption is to throttle the compaction rate. In some scenarios, compactions can be temporarily stopped, but it requires a lot of caution and must be re-enabled at some point to keep the SSTable count low, and read latency optimal.

Alerting: Alerting is not essential for these metrics. However, alerts can be set if there are a higher number of pending compactions sustained for longer than expected time interval.

Garbage Collector Metrics

The Garbage Collector (GC) is yet another crucial area for monitoring. The efficiency of Cassandra throughput and performance depends on the effective use of JVM resources and streamlined GC. The GC behaviour mainly depends on these factors—the garbage collector used, the workload served by Cassandra nodes, GC parameter settings, the heap size for JVM etc. A common issue with garbage collection is long GC pause or the time taken to perform garbage collection. 

The GC works well with the default settings shipped by Cassandra, but those can be tuned if required to suit a specific workload and the available resources. GC parameter tuning is a non-trivial task and requires knowledge of GC internals. However, sometimes GC issues can be resolved by fixing the data model, changing the workload, or adjusting JVM resources. It is essential to correlate bad GC behaviour with the exact root cause before applying a remedy. Also, any change in parameters impacting GC should be monitored carefully to ensure improvements.

Alerting: Set alert on GC pauses for more than acceptable thresholds on production systems.

Memory Metrics

The memory metrics provide JVM heap, non-heap, and total memory used by Cassandra. The JVM heap storage is used heavily for a variety of purposes by Cassandra. The non-heap memory is also used a lot by later versions of Cassandra. Monitoring the heap and overall memory gives insight into memory usage. It can be used to correlate with any issues and determine memory requirements. 

Please note, Cassandra cannot scale with an indefinite amount of memory. This boils down to the fact that JVM and GC cannot perform optimally for large heap size. The most common range of heap size for Cassandra is 8GB-32GB where the smaller size is configured with CMS GC and the larger size with G1GC.

Alerting: Set alerts to test specific memory thresholds and tuning.  

Threadpool Metrics

Cassandra works with numerous thread pools internally. This design is aimed to achieve asynchronous tasks, and it also helps to handle back pressure. Monitoring for the thread pools makes it easy to understand the internal system behaviour. It also helps to understand  specific pools under pressure with active, pending, and blocked tasks. 

The solution for constantly saturated pools is generally to provide more processing capacity to the node or the cluster. Other core issues like a poor data model or query pattern also impact the thread pools.

Alerting: Set alerts for more than a few blocked tasks on the production system. This helps take preventive action to help avoid performance impact.

Table Metrics 

Table metrics are useful in tracking each table independently. These can be used to monitor a specific set of tables which are performance-critical or host a large volume of data. There are various metrics for each table but some of the most important are discussed here:  

Partition Size

The partition size is a crucial factor in ensuring optimal performance. Cassandra uses partitions of data as a unit of data storage, retrieval, and replication. Hence, if the partition size is larger it impacts overall performance. The ideal range of partition size is less than 10MB with an upper limit of 100MB. These values are derived from operational experience from the Cassandra community. 

The data model and table definition control the partition size. The partition key for a table determines how data is grouped into partitions. A partition key should be designed to accumulate data only up to acceptable size limits. Unfortunately, it is not easy to change the partitioning of an existing table. But if the data model is in the design phase, it is crucial to test all the table definitions for potentially large partition sizes. For existing tables where large partitions are a major issue, they can be addressed by a complete data rewrite. This operation could be long-running, but it can solve many performance issues, and if configured correctly, it can be performed with minimal or no downtime for the table.

Alerting: Configure alerts on large partitions for tables with unbounded partitions. An unbounded partition is where the partition grows in size with new data insertion and does not have an upper bound.

Tombstone Scanned

Tombstones are the deletion markers in Cassandra. Tombstones are produced by data deletion, and it could be performed using various means like delete queries, TTL expiry, null inserts etc. The immutable design of SSTables and compaction operations makes tombstone eviction difficult in some scenarios. Tombstone presence directly impacts read performance; its effect increases with the number of tombstones scanned per operation. This metric provides a histogram of tombstones read for a table’s queries in recent time. 

The troubleshooting for tombstone eviction can be performed using various options like revisiting the compaction strategy, major compaction, nodetool garbagecollect etc. Note that all the mentioned remedies for tombstone eviction could operate on a large set of SSTables and are non-trivial operations. The operations must be well tested before executing on production. 

Alerting: Set alerts for tombstones-scanned per read metrics for performance-sensitive tables. 

SSTable Per Read

These metrics are related to the immutable design of SSTables and read operation. The SSTables are created per table, and the data is arranged sequentially in the order it is written. This results in multiple SSTable reads to complete a single read operation. The number of SSTables read contributes to the time consumed to complete the read operation. Hence, the number of SSTables per read should be minimized. 

A good number of SSTables per read is a relative value and depends on the data volume and compaction strategy. However, as a general rule, those should be less than 10. The compaction strategy used for a table plays a crucial role in this metric. A table should be configured with optimum compaction strategy as per the table usage. Repair operation plays a role in keeping the SSTables consistent and hence also indirectly impacts this metric. All the data in Cassandra should ideally be repaired once per gc_grace_seconds cycle. 

Alerting: Set alerts for all the read performance-sensitive and high data volume tables for SSTables per read. 

Additional Metrics

It is difficult to cover all the metrics present in Cassandra in this blog post, and it is also difficult to predict the most useful ones in general. I have tried to cover the most used metrics individually. But there are still some crucial metrics which are useful for getting insight in specific Cassandra areas. Let’s look at those briefly:

Dropped Messages

Cassandra handles many forms of messages corresponding to various functions. These messages can get dropped mostly due to load or communication error etc. The dropping of messages causes data inconsistency between nodes, and if those are frequent, it can cause performance issues. It is necessary to identify the cause of dropped messages. If those occur frequently or if those are in large numbers, the system resources and data model should be revisited. Alerts should be set for an unexpected occurrence or number of dropped messages. 

Caches For Tables

Cassandra uses several caches, and they are configurable. The cache metrics are useful to track effective use of a particular cache. A good example is the use of the row cache for frequently accessed rows in a table. If caching hot data in the row cache improves cache hits, it is a successful use of the row cache.

Data Streaming

Streaming is used while bootstrapping new nodes, during repair operations, and during some other cluster operations. Streaming operations can move a lot of data across a cluster and hence consume network bandwidth. The streaming metrics are useful for monitoring node activities and planned repairs. The streaming rate can be controlled if required to spare bandwidth for other operations.

Hinted Handoff 

Hints are a part of the anti-entropy mechanism, and they help protect nodes from data loss while nodes are offline. Hints are stored and transferred, so metrics related to these attributes, along with delivery success, failure, delays, and timeouts, are exposed.

The hints metrics are useful for monitoring all hint activity. A large number of hints stored and replayed indicates nodes going offline, while hint delays and failures indicate network or other communication issues.

CQL and Batch 

CQL metrics include the number of statements executed of each type. The batch metrics include the number of batch statements executed. These metrics help to monitor the application activity and the query semantics used. Logged and unlogged batches have their caveats in Cassandra, and they can cause a performance penalty if not used correctly.

System Metrics

These metrics are not exported by Cassandra but are obtained from the OS. They are equally important as the Cassandra metrics for obtaining system insights.

Disk Usage

Disk usage needs monitoring because Cassandra is optimized to write a lot of data in a short time. The real risk of filling up a disk comes from compactions. The default compaction strategy used by Cassandra is SizeTieredCompactionStrategy (STCS). This strategy merges many SSTables and outputs a single SSTable. The resulting SSTable can have a size equal to the combined size of all the SSTables merged into it. Also, until a compaction operation ends, both the old and new SSTables exist on disk.

The disk space guideline for a cluster where most tables use STCS is to utilise up to 50% of the disk space and leave the rest as room for compactions. Generally, disk space is cheaper than other resources and there is no harm in keeping vacant space on nodes. However, if limited disk space is available, disk monitoring becomes even more crucial as the free disk left for compactions may shrink below the general guideline.

Remedies for high disk usage include snapshot deletion, as snapshots can consume a considerable amount of space. Another method is to stop a specific compaction operation; this frees the space consumed by the new SSTables. The time until the compaction starts again can be utilized to add more space.

Alerting: Set alerts for various stages of disk usage. The alerts can be categorized for severity based on the amount of free disk space on a node. 

CPU Usage

CPU capacity in a Cassandra cluster contributes as the main processing capacity. The number of requests served by a node and the amount of data stored are the factors directly proportional to the CPU utilization. CPU utilization should be monitored to ensure the nodes are not overloaded. 

A Cassandra cluster or a single data center should have all the nodes of similar size. Those should have an equal number of CPU cores, and the CPU utilization should also be equivalent. A single node or a few nodes with high CPU is an indication of uneven load or request processing across the nodes. It is observed that Cassandra is not CPU bound in most cases. However, a cluster or data center with high CPU utilization at most times should be considered for node size upgrade. 

Alerting: Set alerts for specific levels of CPU utilization on nodes or just for a single threshold. The levels can be defined as per expected CPU load, e.g. 80%, 90%, >95% etc. 

Monitoring tools

There are various tools available to set up Cassandra monitoring. I am describing here a few popular open-source tools used widely across the Cassandra community.

Prometheus

Prometheus is a metrics tool for time-series based monitoring. It also has alerting capability, which works on the time-series metrics. Prometheus can be configured to collect Cassandra metrics from nodes as well as the system metrics of the nodes. Prometheus uses exporters, which are installed on the nodes and expose metrics for the Prometheus server to scrape.

Prometheus stores metrics in its own time-series database, which can be queried using PromQL, the Prometheus query language. Prometheus also provides a web UI which can be used to visualise the actual metrics, graphs, alert rules, etc.

Alertmanager is the component used for configuring and routing alerts. It has various integrations available for alerting, including email, Slack, HipChat, PagerDuty, etc. Prometheus has evolved over time, and it integrates well with the Dropwizard metrics library.

Prometheus - time-series based Cassandra monitoring

Grafana

Grafana is a visualisation tool which can be used to visualise any time-series metrics. Grafana has various panels to display the data; the most commonly used panel is the graph, which plots incoming data against time in two dimensions.

Grafana integrates with various data sources. These sources are queried in real-time by Grafana to obtain metrics. Each Grafana panel has one or more queries configured to query a data source; the result of the query is rendered on the panel. Grafana uses Prometheus as a well-integrated data source.

Grafana - Time series metrics visualization

Cassandra Exporter

Cassandra exporter is Instaclustr’s open-source solution for collecting Cassandra metrics efficiently. It is designed to integrate with the Cassandra JVM to collect and publish metrics. Hence, Cassandra exporter is a replacement for collecting metrics over JMX.

JMX metrics in Cassandra have performance limitations and can cause issues on clusters with a large number of nodes. The Cassandra exporter has been well tested for optimal monitoring performance. The metrics it produces are also time-series and can be readily consumed by Prometheus. Please refer to the GitHub page for information on configuration and usage.
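
For comparison, pulling a single metric over JMX from plain Java looks roughly like the sketch below. It assumes the default Cassandra JMX port 7199, no JMX authentication, and the commonly used ClientRequest read-latency MBean; adjust the host, port, and MBean name for your own cluster.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxMetricRead {
    public static void main(String[] args) throws Exception {
        // Default Cassandra JMX endpoint; assumes JMX is enabled without authentication.
        JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Widely used client request latency metric (a Dropwizard Timer exposed via JMX).
            ObjectName readLatency = new ObjectName(
                "org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency");
            Object count = mbs.getAttribute(readLatency, "Count");
            System.out.println("Completed read requests: " + count);
        }
    }
}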

Conclusion

Cassandra monitoring is essential to gain insight into the database internals. Monitoring is a must for production systems to ensure optimal performance, alerting, troubleshooting, and debugging. There are a large number of Cassandra metrics, and selecting the important and relevant ones provides a good picture of the system.

Finally, Instaclustr offers Cassandra monitoring expertise and capability through various options:

  • Cassandra exporter is an excellent open source tool for optimal monitoring performance on large Cassandra clusters. 
  • The Instaclustr Cassandra managed service includes a comprehensive monitoring and alerting service with 24×7 support. It is a good option for outsourcing all Cassandra operations, and it comes with a free trial.

Instaclustr Cassandra Consulting services can help you with any monitoring or other Cassandra operations.

The post Cassandra Monitoring: A Best Practice Guide appeared first on Instaclustr.

Building a Low-Latency Distributed Stock Broker Application: Part 3

In the third blog of the “Around the World” series, focusing on globally distributed storage, streaming, and search, we build a Stock Broker Application.

1. Place Your Bets!

London Stock Exchange 1800’s

How did Phileas Fogg make his fortune? Around the World in Eighty Days describes Phileas Fogg in this way: 

Was Phileas Fogg rich? Undoubtedly. But those who knew him best could not imagine how he had made his fortune, and Mr. Fogg was the last person to whom to apply for the information.

I wondered if he had made his fortune on the Stock Market, until I read this:

Certainly an Englishman, it was more doubtful whether Phileas Fogg was a Londoner. He was never seen on ‘Change, nor at the Bank, nor in the counting-rooms of the “City“‘

Well, even if Fogg wasn’t seen in person at the ‘Change (London Stock Exchange), by 1872 (the year the story is set), it was common to use the telegraph (the internet of the Victorian age, which features regularly in the story) to play the market.

Victorian era Telegraph Office

(Not) a real Victorian era Telegraph Office

In fact the ability of the telegraph to send and receive information faster than horses/trains/boats etc. had been used for stock market fraud as early as 1814! (The “Great Stock Exchange Fraud of 1814”). Coincidentally (or not?), the famous London Stock Exchange Forgery, also involving the telegraph, also occurred in 1872! Perhaps this explains the ambiguity around the origin of Fogg’s wealth!

What is certain is that Phileas Fogg became the subject of intense betting, and he was even listed on the London Stock Exchange (Chapter V – IN WHICH A NEW SPECIES OF FUNDS, UNKNOWN TO THE MONEYED MEN, APPEARS ON ‘CHANGE):

Not only the members of the Reform, but the general public, made heavy wagers for or against Phileas Fogg, who was set down in the betting books as if he were a race-horse. Bonds were issued, and made their appearance on ‘Change; “Phileas Fogg bonds” were offered at par or at a premium, and a great business was done in them. But five days after the article in the bulletin of the Geographical Society appeared, the demand began to subside: “Phileas Fogg” declined. They were offered by packages, at first of five, then of ten, until at last nobody would take less than twenty, fifty, a hundred!”

The 1870’s also saw the introduction of a new technological innovation in Stock Trading, the Stock Ticker machine. Stock tickers were a special type of telegraph receiver designed to print an alphabetical company symbol and the current price of that company’s stock on a paper roll called ticker tape. This enabled stock prices to be communicated closer to real-time across vast distances, and revolutionized trading. 

1870’s Stock Ticker Machine

1870’s Stock Ticker Machine

2. Let’s Build a Stock Broker Application

Fast forward 128 years from 1872 to 2000 and technology looked a bit different. I’m taking inspiration from an earlier project I worked with from 2000-2003 at CSIRO (Australia’s national science research agency) called “StockOnline”. This was an online Stock Broker application designed to benchmark new component-based middleware technologies, including Corba and Enterprise Java (J2EE). The original version simulated traders checking their stock holdings and current stock prices, and then buying and selling stocks, resulting in revised stock holdings. The benchmark could be configured with different workload mixes, and the number of concurrent traders could be scaled up to stress the system under test. Metrics captured included the relative number, throughput, and response time of each of the operations. 

Some of the technology innovations that the project was designed to give insights into included: 

  • the use of application servers to provide easy to manage and scalable container resourcing (yes, containers are at least 20 years old); 
  • how portable the application was across multiple different vendors application servers (sort of);
  •  the impact of JVM choice and settings (lots); 
  • explicit support for component configuration (e.g. wiring components together); and 
  • deployment into containers, rich container services, and multiple persistence models to manage state and allow database portability (e.g. Container Managed Persistence vs. Bean Managed Persistence). 

At the end of the project we made the StockOnline code and documentation open source, and I recently rediscovered it and made it available on GitHub. I was surprised to learn that Enterprise Java is still going strong and is now run by the Eclipse Foundation and called “Jakarta EE”. Also interesting is that there is support for persistence to Cassandra.

So let’s move on to the present day.

3. High-Frequency Low-Latency Trading

Modern stock markets are fast-moving, ultra-competitive environments. Orders are sent to the market by high speed networks and executed almost instantly. This makes low-latency trading a key to profitability.  Trade related latency is any delay in the time it takes for a trader to interact with the market, and includes distance related network delays, delays in receiving and acting on information, and delays caused by brokers (e.g. queuing of orders, delays interacting with the stock exchange to trade the orders, etc.). Some of the key solutions to reducing latency are broker side hosting of orders (orders are hosted on brokers and automatically traded when conditions are met), and Direct Market Access (brokers are as close as possible to stock exchanges, with super-fast network connections).

A new type of US Stock Exchange (IEX) was even created to address some of the issues around fairness of stock trading due to latency. Some brokers are able to take advantage of even small latency differences (“price sniping”), or use so-called “dark pools”, which fill orders from within a pool rather than via public stock exchanges, to make huge profits. Somewhat oddly, the IEX levelled the playing field by introducing delays to ensure that no one playing the market has more up-to-date information than anyone else.

Latency is partially caused by the global location of stock exchanges. Where are stock exchanges located?  There are 60 exchanges around the world on every continent with a total value of $69 Trillion, and 16 worth more than $1 Trillion each!

4. Initial Application Design

I had already started writing a new version of StockOnline before I rediscovered the original, so the new version doesn’t share any of the original code. However, it does turn out to have similar entities, but with some significant differences to model multiple StockExchanges and Brokers.  Here’s the UML Class diagram of my new prototype code:

The first major difference is that it’s designed to model and simulate distributed stock brokers across multiple global “georegions”. We introduced the concept of georegions in blog 1 (called “latency regions”) and blog 2 (called “georegions”). A georegion is a geographic region that has at least two AWS regions (for data center redundancy) and ensures that applications within it stay within 100ms latency of each other and of users in the same georegion, even if one region fails. Here’s the map from the previous blogs showing the eight sub-100ms latency georegions that we identified (North America, South America, Atlantic, Europe, Middle East, Central Asia, East Asia, Australasia):

This means that I have to explicitly model multiple StockExchanges. Each StockExchange is in a particular location in a georegion.  Each StockExchange is responsible for listing some stocks, providing meta-data about the stocks,  publishing changes to stock prices as StockTickers, and matching buy/sell orders (i.e. executing trades).  For simplicity we assume that each stock is only listed on a single StockExchange. 

Each georegion has one or more StockBrokers which are co-located in the same georegion as some StockExchanges to ensure low latency (and potentially redundancy). The StockBrokers are responsible for discovering StockExchanges, all the stocks listed for trading, obtaining StockTickers to update current price data, and computing trends and longer term Stock Statistics that inform traders making decisions about buying and selling. They are also responsible for keeping track of trader data, updating StockHoldings for Traders, keeping track of orders and trading them on the appropriate StockExchanges, and keeping records of trades (transactions). Also different to the original version (which only had a single Market Order type), I wanted to have multiple different order types including Market, Limit and Stop Orders. This is important for the latency story: Market Orders are traded “manually” and immediately, but Limit and Stop Orders are traded automatically when they meet specific conditions, so they can be traded very quickly and in larger volumes (this is a good explanation).

We assume that traders connect to their nearest StockBroker (to reduce latency and possibly to satisfy local financial rules). There is a series of operations supported by StockBrokers for traders, and also for interacting with the StockExchanges as follows.  First let’s look at the workflow for trading Market Orders, “Place Market Order”. These are essentially synchronous and immediate trades. The trader connects to the nearest broker, gets their current StockHoldings and current StockStatistics (for their holdings and possibly for other stocks they don’t currently own). Based on this information they decide what stocks to trade, whether to buy or sell, and the quantity of stocks, and create a Market Order. The broker then processes the Market Order (which may involve sending it to another broker), and receives confirmation that the trade occurred (including price, quantity, transaction ID etc.), and finally updates the trader’s StockHoldings for the stock traded. 

The steps to “Process Market Order” are as follows.  The order is sent to a broker in the same Georegion as the StockExchange listing the stock. This broker then immediately executes the trade (buys or sells) with the StockExchange, gets the transaction details, marks the order as filled (so it isn’t processed more than once), and updates the StockHolding amounts for the trader. 

The “Execute Trade with StockExchange” assumes that a trade is always possible (at the current price) and will occur instantaneously and completely, and has the following sub steps:

Market Orders are potentially a slow process due to all the synchronous steps, “think time” for the trader, and cumulative latencies due to the trader to broker, broker to broker, and broker to StockExchange communication paths.

As an alternative we also provide some asynchronous Order types: Limit and Stop. These order types are only traded when their conditions are met, but then need to be executed as quickly as possible to prevent losses in a fast-moving market.

We assume that the initial steps are mostly the same as “Place Market Order”, but with the added choice of a Limit or Stop Order and the limit price, and the final step (notification of the eventual Trade) is asynchronous:

Once the Order is placed, it is processed by sending it to the correct broker (as for Market Orders), and then that broker is responsible for continuously checking orders to see if they match:

This is done as follows (“Trade Matching Orders”) and relies on each broker receiving a stream of StockTicker updates from the StockExchanges in the same georegion. For each StockTicker the broker finds orders for that stock, and checks which orders meet the buy/sell conditions (the logic depends on the order type, whether the price is rising or dropping, and whether the current price is less than or greater than the order limit price). If the matching order(s) are Sell Orders then an extra step is to check that the trader still has sufficient holdings of that stock (they may have already sold some stock due to other orders being executed first). If all conditions are met then the broker initiates an immediate “Market” trade with the StockExchange as before.
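
A minimal Java sketch of that matching loop is shown below. The class and method names (StockTicker, Order, Broker, and so on) are illustrative only and are not taken from the actual StockOnline or prototype code; the real application would also need to handle the rising/falling price checks, persistence, concurrency, and failure cases.

import java.util.List;

// Illustrative types only; the real prototype models many more attributes.
enum OrderType { MARKET, LIMIT, STOP }

record StockTicker(String symbol, double price) {}

record Order(String symbol, OrderType type, boolean buy, double limitPrice, int quantity) {}

class Broker {
    // Called for every StockTicker received from StockExchanges in this broker's georegion.
    void onTicker(StockTicker ticker, List<Order> openOrders) {
        for (Order order : openOrders) {
            if (!order.symbol().equals(ticker.symbol())) {
                continue; // only consider orders for the ticker's stock
            }
            if (matches(order, ticker) && hasSufficientHoldings(order)) {
                executeMarketTrade(order, ticker.price()); // immediate trade with the StockExchange
            }
        }
    }

    // Simplified condition check: a Limit buy triggers at or below the limit price and a
    // Limit sell at or above it; Stop orders trigger on the opposite side of their price.
    private boolean matches(Order order, StockTicker ticker) {
        return switch (order.type()) {
            case LIMIT -> order.buy() ? ticker.price() <= order.limitPrice()
                                      : ticker.price() >= order.limitPrice();
            case STOP  -> order.buy() ? ticker.price() >= order.limitPrice()
                                      : ticker.price() <= order.limitPrice();
            case MARKET -> true; // Market Orders are traded immediately, not matched here
        };
    }

    private boolean hasSufficientHoldings(Order order) { return true; } // placeholder check
    private void executeMarketTrade(Order order, double price) { /* trade via StockExchange */ }
}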

The initial prototype application code is written in pure Java and just simulates the expected behaviour of the traders, brokers, and StockExchanges. It creates a specified number of georegions, brokers, StockExchanges, stocks, and traders with initial data. Then the simulation runs lots of rounds (seconds) for a specified period (e.g. a day). Each round results in traders checking their holdings and StockStatistics, and creating orders (about ⅓ of each type, but only if the type matches the specific stock and market conditions). The orders are sent to the correct brokers. Each round the brokers receive simulated StockTickers from StockExchanges in the same georegion (using a pseudo-random walk which keeps the stock direction for the majority of the time, but occasionally changes direction). Some stocks are more volatile than others and so change faster. Each round the brokers immediately trade Market Orders, and check the conditions and trade matching Limit or Stop Orders.

5. Initial Simulation Results—“Time Is Money”!

The simulation computes business-level metrics including number of trades, value of trades, etc., and expected end-to-end latency based on deploying brokers in 8 georegions and using the AWS inter-region latencies from blog 1 of this series. This gives us a baseline to eventually compare the real results with. The average latency to get the current stock price from a remote StockExchange and “Process Market Orders” is 700ms (max 1.2s), which includes multiple intra-region and inter-region network hops. The average latency for Limit and Stop “Trade Matching” Orders is shorter at 100ms (max 200ms), as it only includes the time to get StockTicker updates and the time to trade; i.e. it doesn’t include any AWS inter-region latencies, as the operation is asynchronous and processed entirely within the georegion of the broker/StockExchange (we expect this to be slightly higher in practice due to the eventual overhead of database lookups and condition checking on the broker).

So is the saying “Time Is Money!” true in the context of low-latency trading, and how much money exactly? I added a calculation to the simulation to compute potential profit losses assuming high volatility in the prices of some stocks and higher latency times to trade. Due to potentially high volumes of trades, even a small profit loss per trade can add up to big losses in profit very quickly. For one simulation run with 2,101 completed trades, the potential profit loss for the higher latency Market Orders was 0.7% of the trade value, but for the lower latency Limit and Stop Orders it was significantly less at 0.1% of the trade value. For an average order size of $20,000 this corresponds to a $140 profit loss per Market Order, compared with only $20 profit loss for each Limit and Stop Order. Over hundreds or even thousands of trades per day (typical of HFT) this would quickly add up to significant amounts of money! Moreover, to make a profit, High Frequency Trading (HFT) relies on conducting a high volume of trades to rapidly take advantage of very small movements in prices, with potentially smaller profits per trade. So it’s easy to see why minimizing latency is a worthwhile goal in Fintech applications such as this.
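
The per-order arithmetic behind those figures is simple; a small sketch using the quoted numbers from the simulation run (0.7%, 0.1%, and a $20,000 average order):

public class ProfitLossCalc {
    // Profit loss per order = order value x latency-related loss rate.
    static double lossPerOrder(double orderValue, double lossRate) {
        return orderValue * lossRate;
    }

    public static void main(String[] args) {
        double orderValue = 20_000.0;
        System.out.println(lossPerOrder(orderValue, 0.007)); // Market Orders: 140.0
        System.out.println(lossPerOrder(orderValue, 0.001)); // Limit/Stop Orders: 20.0
    }
}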

6. What Next?

In the next few blogs we’ll continue our journey “Around the World” and explore how to refine the initial simple design of the application so as to deploy and run it across multiple georegions using multiple AWS regions.

Initially this will involve mapping the components to Cassandra tables on a single data center. Once it’s working correctly with a single Cassandra data center, we’ll extend it to use multiple Cassandra data centers, which will require the use of multiple keyspaces for different replication topologies (e.g. replication across pairs of data centers vs. all data centers). We’ll also work out if, and how, to load-balance the application across multiple data centers in the same georegions, and how to enable redundancy, failover, and recovery at the application level.  It’s possible that a Kubernetes federated microservices mesh framework will help in doing this. We also plan to put Kafka to use to enable streaming StockTickers, so we’ll be investigating Kafka multi data center replication. 

7. Further Resources

IBM also has a StockTrader demonstration application, and an in-depth series about deploying it using Cassandra, Kafka, Redis, and Kubernetes.

There’s an example of stock analysis using Elasticsearch (I’m planning on using Elasticsearch to analyse the stock trends, and provide some map-based visualisation of the data).

This is an interesting article on “Hacking a HFT System”!

The original CSIRO StockOnline code and documentation is now on GitHub.

The post Building a Low-Latency Distributed Stock Broker Application: Part 3 appeared first on Instaclustr.

A Comprehensive Guide to Cassandra Architecture

Introduction

The Apache Cassandra architecture is designed to provide scalability, availability, and reliability to store massive amounts of data. If you are new to Cassandra, we recommend going through the high-level concepts covered in what is Cassandra before diving into the architecture.  

This blog post aims to cover all the architecture components of Cassandra. After reading the post, you will have a basic understanding of the components. This can be used as a basis to learn about the Cassandra Data Model, to design your own Cassandra cluster, or simply for Cassandra knowledge.

Cluster Topology and Design

Cassandra is based on a distributed system architecture. In its simplest form, Cassandra can be installed on a single machine or in a Docker container, and it works well for basic testing. A single Cassandra instance is called a node. Cassandra supports horizontal scalability, achieved by adding more than one node as part of a Cassandra cluster. The scalability works with linear performance improvement if the resources are configured optimally.

Cassandra works with a peer-to-peer architecture, with each node connected to all other nodes. Each Cassandra node performs all database operations and can serve client requests without the need for a master node. As a result of this peer-to-peer distributed architecture, a Cassandra cluster has no single point of failure.

Nodes in a cluster communicate with each other for various purposes. There are various components used in this process:

  • Seeds: Each node configures a list of seeds which is simply a list of other nodes. A seed node is used to bootstrap a node when it is first joining a cluster. A seed does not have any other specific purpose, and it is not a single point of failure. A node does not require a seed on subsequent restarts after bootstrap. It is recommended to use two to three seed nodes per Cassandra data center (data centers are explained below), and keep the seeds list uniform across all the nodes. 
  • Gossip: Gossip is the protocol used by Cassandra nodes for peer-to-peer communication. Gossip informs a node about the state of all other nodes. A node performs gossip with up to three other nodes every second. The gossip messages follow a specific format and version numbers to make communication efficient.

A cluster is subdivided into racks and data centers. These terminologies are Cassandra’s representation of a real-world rack and data center. A physical rack is a group of bare-metal servers sharing resources like a network switch, power supply, etc. In Cassandra, nodes can be grouped into racks and data centers with snitch configuration. Ideally, the node placement should follow the placement of nodes in actual data centers and racks. Data replication and placement depend on the rack and data center configuration.

Cluster subdivided into Racks and Data centers

Multiple Data Centers

A rack in Cassandra holds a complete replica of the data if there are enough replicas and the configuration uses NetworkTopologyStrategy, which is explained later. This allows Cassandra to survive a rack failure without losing a significant level of replication, so the cluster can continue to perform optimally.

There are various scenarios for using multiple data centers in Cassandra. A few common scenarios are:

  • Build a Cassandra cluster with geographically distinct data centers which cater to clients from distinct locations, e.g. a cluster with three data centers in the US, EU, and APAC serving local clients with low latency.
  • Separate Cassandra data centers which cater to distinct workloads using the same data, e.g. separate data centers to serve client requests and to run analytics jobs.
  • Active disaster recovery by creating geographically distinct data centers, e.g. a cluster with data centers in each US AWS region to support disaster recovery.

Database Structures

Cassandra stores data in tables, where each table is organized in rows and columns like any other database. A Cassandra table was formerly referred to as a column family. Tables are grouped in keyspaces. A keyspace can be used to group tables serving a similar purpose from a business perspective, like all transactional tables, metadata tables, user information tables, etc. Data replication is configured per keyspace, in terms of a replication factor per data center and the replication strategy. See the replication section for more details.

Each table has a defined primary key. The primary key is divided into a partition key and clustering columns; the clustering columns are optional. There is no uniqueness constraint enforced for any of the keys: writing a row with an existing primary key overwrites the existing row (an upsert).

The partition key is used by Cassandra to index the data. All rows which share a common partition key make a single data partition which is the basic unit of data partitioning, storage, and retrieval in Cassandra.  

Refer to cassandra-data-partitioning for detailed information about this topic. 

Partitioning

A partition key is converted to a token by a partitioner. There are various partitioner options available in Cassandra, of which Murmur3Partitioner is used by default. Tokens are signed integer values between -2^63 and +2^63-1, and this range is referred to as the token range. Each Cassandra node owns a portion of this range and primarily owns the data corresponding to that portion. A token is used to precisely locate the data among the nodes and within the data storage of the corresponding node.

It is evident that when there is only one node in a cluster, it owns the complete token range. As more nodes are added, the token range ownership is split between the nodes, and each node is aware of the range of all the other nodes. 

Here is a simplified example to illustrate token range assignment. Suppose only 100 tokens are used for a Cassandra cluster with three nodes. Each node is assigned approximately 33 tokens:

 node1: 0-33 node2: 34-66 node3: 67-99. 

If nodes are added or removed, the token range distribution must be shuffled to suit the new topology. This process requires a lot of calculation and configuration changes for each such cluster operation.
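
Purely as an illustration of the simplified example above (not how Cassandra actually stores the ring), the token-to-node lookup can be pictured as a sorted map of range start tokens:

import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative only: a fixed 0-99 token space split across three nodes.
public class TokenRingExample {
    public static void main(String[] args) {
        // Map from the first token of each range to the owning node.
        NavigableMap<Integer, String> ring = new TreeMap<>();
        ring.put(0, "node1");   // tokens 0-33
        ring.put(34, "node2");  // tokens 34-66
        ring.put(67, "node3");  // tokens 67-99

        int token = 42; // token produced by the partitioner for some partition key
        String owner = ring.floorEntry(token).getValue();
        System.out.println("Token " + token + " is owned by " + owner); // node2

        // Adding or removing a node means recalculating and reassigning these ranges -
        // the manual work that Vnodes (next section) are designed to avoid.
    }
}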

Virtual nodes/Vnodes

To reduce the complexity of token calculation and other token assignment difficulties, Cassandra uses the concept of virtual nodes, referred to as Vnodes. A cluster is divided into a large number of virtual nodes for token assignment, and each physical node is assigned an equal number of virtual nodes. In our previous example, if each node is assigned three Vnodes and each Vnode owns 11 tokens:

 v1: 0-9, v2: 10-19, v3: 20-29, and so on 

 Each physical node is assigned these vnodes as:

 node1: v1, v4, v7 node2: v2, v5, v8 node3: v3, v6, v9 

Virtual Nodes or Vnodes

The default number of Vnodes owned by a node in Cassandra is 256, which is set by the num_tokens property. When a node is added to a cluster, the token allocation algorithm allocates tokens to it, selecting random token values to ensure a uniform distribution. The num_tokens property can be changed to tune data distribution. The default of 256 Vnodes per physical node is intended to achieve uniform data distribution for clusters of any size and with any replication factor. In some large clusters, 256 Vnodes do not perform well; please refer to the blog cassandra-vnodes-how-many-should-i-use for more information.

Replication

The data in each keyspace is replicated with a replication factor. The most common replication factor used is three. There is one primary replica of the data which resides with the token owner node, as explained in the data partitioning section. The remaining replicas are placed by Cassandra on specific nodes using the replica placement strategy. All replicas are equally important for all database operations, except for a few cluster mutation operations.

There are two settings which mainly impact replica placement. The first is the snitch, which determines the data center and the rack a Cassandra node belongs to, and is set at the node level. Snitches inform Cassandra about the network topology so that requests are routed efficiently, and allow Cassandra to distribute replicas by grouping machines into data centers and racks. GossipingPropertyFileSnitch is the go-to snitch for any cluster deployment. It uses a configuration file called cassandra-rackdc.properties on each node, which contains the rack and data center name hosting the node. There are also cloud-specific snitches available for AWS and GCP.

The second setting is the replication strategy, which is set at the keyspace level. There are two strategies: SimpleStrategy and NetworkTopologyStrategy. SimpleStrategy does not consider racks or multiple data centers; it places data replicas on nodes sequentially. NetworkTopologyStrategy is rack aware and data center aware. SimpleStrategy should only be used for temporary and small cluster deployments; for all other clusters NetworkTopologyStrategy is highly recommended. A keyspace definition using NetworkTopologyStrategy specifies the number of replicas per data center as:

cqlsh> create keyspace ks with replication = {'class' : 'NetworkTopologyStrategy', 'dc_1' : 3, 'dc_2' : 1};

Here, the keyspace named ks is replicated in dc_1 with factor three and in dc_2 with factor one.

Consistency and Availability

Every distributed system is subject to the CAP theorem, which states that a distributed system can strongly deliver only two out of the three properties: Consistency, Availability, and Partition-tolerance. Cassandra provides flexibility for choosing between consistency and availability while querying data. In other words, data can be highly available with a low consistency guarantee, or it can be highly consistent with lower availability. For example, if there are three data replicas, a query reading or writing data can ask for acknowledgements from one, two, or all three replicas to mark the completion of the request. For a read request, Cassandra requests the data from the required number of replicas and compares their write-timestamps; the replica with the latest write-timestamp is considered to hold the correct version of the data. Hence, involving more replicas in a read operation strengthens the consistency guarantee. For write requests, the write completes once the requested number of replicas have acknowledged it.

Naturally, the time required to get acknowledgements is directly proportional to the number of replicas asked to acknowledge, so consistency and availability are traded off against each other. The concept of requesting a certain number of acknowledgements is called tunable consistency, and it can be applied at the individual query level.

There are a few considerations related to data availability and consistency: 

  • The replication factor should ideally be an odd number. The common replication factor used is three, which provides a balance between replication overhead, data distribution, and consistency for most workloads.    
  • The number of racks in a data center should be in multiples of the replication factor. The common number used for nodes is in multiples of three. 
  • There are various terms used to refer to the consistency levels – 
    • One, two, three: Specified number of replicas must acknowledge the operation.
    • Quorum: A strict majority of the replicas is called a quorum; the majority is one more than half (using integer division). This consistency level ensures that most of the replicas confirm the operation without having to wait for all of them, balancing operational efficiency and good consistency. E.g. quorum for a replication factor of three is (3/2)+1=2; for a replication factor of five it is (5/2)+1=3 (see the sketch after this list).
    • Local_*: This is a consistency level scoped to the local data center in a multi-data center cluster. The local data center is the one containing the coordinator node the client is connected to. Examples are LOCAL_ONE and LOCAL_QUORUM.
    • Each_*: This level is also related to multi data center setup. It denotes the consistency to be achieved in each of the data centers independently, e.g. each_quorum means quorum consistency in each data center. 
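
Since the quorum arithmetic above comes up constantly when reasoning about consistency, here it is as a one-method Java sketch:

public class QuorumCalc {
    // Quorum is a strict majority of the replicas: (RF / 2) + 1 using integer division.
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    public static void main(String[] args) {
        System.out.println(quorum(3)); // 2
        System.out.println(quorum(5)); // 3
    }
}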

Writing and reading data at a low consistency level does not mean giving up the advantages of replication. Cassandra still keeps the data consistent across all replicas, but this happens in the background; the concept is referred to as eventual consistency. In the three-replica example, if a user queries data at consistency level one, the query is acknowledged as soon as the read/write happens on a single replica. For a read operation, this could mean relying on a single data replica as the source of truth for the data. For a write operation, the remaining replicas receive the data later and are made consistent eventually. If replication fails, some replicas might not get the data at all. Cassandra handles these replication shortcomings with a mechanism called anti-entropy, which is covered later in the post.

Query Interface

The Cassandra Query Language (CQL) is the interface for querying Cassandra over a binary protocol. Earlier versions of Cassandra supported Thrift, which has now been entirely replaced by CQL. CQL is designed to be similar to SQL for a quicker learning curve and familiar syntax. DDL operations allow creating keyspaces and tables; the CRUD operations are select, insert, update, and delete, where select is a Cassandra read operation and all others are Cassandra write operations.

A table definition includes column definitions and primary, partition, and clustering keys. The table definition also contains several settings for data storage and maintenance. The primary key is a combination of partition key and clustering columns. The clustering columns are optional. The partition key can be a single column or a composite key. 

The query set available in CQL is quite limited as compared to SQL. A few highlights: 

  • Cassandra does not support join operations and nested queries. 
  • Each select query should specify a complete partition key. It is possible to query multiple partitions, but it is not recommended; refer to cassandra scalability.
  • Cassandra supports a limited set of data aggregation operations.
  • The ORDER BY clause can be used only for columns in the clustering key, and they must be used in the correct order of precedence.

The reason for the limited query set in Cassandra comes from specific data modelling requirements. The data model for a Cassandra database should aim to create denormalized tables which cater to the select query patterns. Cassandra data modelling is one of the essential steps in designing the database; all the features provided by the Cassandra architecture, like scalability and reliability, depend directly on an optimal data model. Refer to cassandra-data-modelling for details on the topic.

The Cassandra driver provides a toolset for connection management, pooling, and querying. The driver creates a connection with a Cassandra node, which is then referred to as the coordinator node for the query. The coordinator is responsible for query execution and for aggregating partial results.

The DataStax Java Driver is the most popular, efficient, and feature-rich driver available for Cassandra. Drivers for several other languages provide similar functionality.
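
A minimal sketch of connecting and issuing a query with a per-query consistency level, assuming the 3.x DataStax Java Driver (the 4.x driver has a different API) and the ks.tb table used in the examples below:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

public class DriverExample {
    public static void main(String[] args) {
        // The contact point is any reachable node; the driver discovers the rest of the cluster.
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {

            // The node the driver routes this statement to acts as the coordinator.
            SimpleStatement select = new SimpleStatement(
                "SELECT id, col1 FROM ks.tb WHERE id = ?", 1);
            // Tunable consistency applied at the individual query level.
            select.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

            ResultSet rs = session.execute(select);
            for (Row row : rs) {
                System.out.println(row.getInt("id") + " -> " + row.getString("col1"));
            }
        }
    }
}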

Data Storage

Cassandra uses a commit log for each incoming write request on a node. The commit log is a write-ahead log, and it can be replayed in case of failure. The on-disk data structure is called an SSTable. SSTables are created per table in the database.

Example:

Consider a sample keyspace and table created as follows.

cqlsh> create keyspace ks with replication = {'class' : 'NetworkTopologyStrategy','datacenter_1' : 3};

cqlsh> CREATE TABLE ks.tb (
    id int PRIMARY KEY,
    col1 text);

And insert some data:
cqlsh> insert into ks.tb (id, col1) values (1, 'first_row');
cqlsh> insert into ks.tb (id, col1) values (2, 'second_row');
cqlsh> insert into ks.tb (id, col1) values (3, 'third_row');

The data we inserted looks like this in an SSTable.

Note that this representation is obtained by a utility to generate human-readable data from SSTables. The actual data in SSTables is in binary format and compressed for efficiency.

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 33,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:07.756013Z" },
        "cells" : [
          { "name" : "col1", "value" : "first_row" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "2" ],
      "position" : 34
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 71,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:29.923397Z" },
        "cells" : [
          { "name" : "col1", "value" : "second_row" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "3" ],
      "position" : 72
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 108,
        "liveness_info" : { "tstamp" : "2020-04-14T13:22:39.282459Z" },
        "cells" : [
          { "name" : "col1", "value" : "third_row" }
        ]
      }
    ]
  }
]

Cassandra maintains immutability in its data storage to provide optimal performance; hence, SSTables are immutable. Updates and deletes are handled by writing a new version of the data. This strategy results in multiple versions of data existing at any given time. Cassandra is optimized for write operations compared with read operations. A read operation consolidates all versions of the data and returns the most recent one. Each data cell is written with a write-timestamp which specifies when that data was written. This timestamp is used to find the latest version of the data when retrieving it for a read operation.

In the above example, we update data for a column of id 1 and see the result:

cqlsh> update ks.tb set col1='updated_row_one' where id=1;

The resulting data in the SSTable for this update looks like:

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 39,
        "cells" : [
          { "name" : "col1", "value" : "updated_row_one", "tstamp" : "2020-04-14T13:38:37.794697Z" }
        ]
      }
    ]
  }
]

The data looks precisely the same as the newly inserted data. Cassandra identifies this and considers the updated value, as it has the greater timestamp value.
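
Conceptually, this reconciliation is last-write-wins on the cell write-timestamp. A toy sketch of the idea (this is not Cassandra's actual implementation; the timestamps are the microsecond tstamp values from the dumps above):

import java.util.Comparator;
import java.util.List;

public class LastWriteWins {
    // A cell version as found in some SSTable or memtable: value plus write-timestamp.
    record CellVersion(String value, long writeTimestampMicros) {}

    // A read consolidates all versions found and returns the one with the latest timestamp.
    static CellVersion reconcile(List<CellVersion> versions) {
        return versions.stream()
                .max(Comparator.comparingLong(CellVersion::writeTimestampMicros))
                .orElseThrow();
    }

    public static void main(String[] args) {
        CellVersion original = new CellVersion("first_row", 1_586_870_527_756_013L);
        CellVersion updated  = new CellVersion("updated_row_one", 1_586_871_517_794_697L);
        System.out.println(reconcile(List.of(original, updated)).value()); // updated_row_one
    }
}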

Deletes are handled uniquely in Cassandra to make them compatible with immutable data. Each delete is recorded as a new record which marks the deletion of the referenced data. This special data record is called a tombstone. A Cassandra read operation discards all the information for a row or cell if a tombstone exists, as it denotes deletion of the data. There are various types of tombstones to denote data deletion for each element, e.g. cell, row, partition, range of rows, etc. Cassandra also allows setting a Time To Live (TTL) on a data row to expire it after a specified amount of time following insertion. Data past its TTL is regarded as a tombstone in Cassandra. Refer to managing-tombstones-in-cassandra for operational information and efficiency considerations around tombstones.

Now with the SSTable example, a cell delete looks like:

cqlsh> delete col1 from ks.tb where id=1;

[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 24,
        "cells" : [
          { "name" : "col1", "deletion_info" : { "local_delete_time" : "2020-04-14T13:44:27Z" },
            "tstamp" : "2020-04-14T13:44:27.179254Z"
          }
        ]
      }
    ]
  }
]

The deletion_info indicates that the cell is deleted. This data is the tombstone for the original data and all the data versions. 

Cassandra performs compaction operations on SSTables, which consolidate two or more SSTables to form a new SSTable. This process combines all versions of data in the participating SSTables and outputs a single version of the data in the resulting SSTable. Compactions also purge the data associated with a tombstone if all the required conditions for purging are met. There are various strategies to trigger and perform compaction; refer to apache-cassandra-compactions:

  • SizeTieredCompactionStrategy (STCS): This is the default compaction strategy. It is triggered using the size of SSTables on-disk. 
  • LevelledCompactionStrategy (LCS): This strategy is used to optimize read performance. This strategy considers the data partitions present in SSTables, and arranges SSTables in levels. Each level has a fixed set of tables and those are compacted with each other.  
  • TimeWindowCompactionStrategy (TWCS): This is a specialized strategy for time series data. It arranges SSTables in time window buckets defined in the table definition. The SSTables within a time window are only compacted with each other. 

The other crucial set of operations performed in Cassandra is anti-entropy. The aim of these operations is to keep data as consistent as possible, and they enable Cassandra to provide the eventual consistency model.

  • Hinted Handoff: If a node in Cassandra is not available for a short period, the data which is supposed to be replicated on the node is stored on a peer node. This data is called hints. Once the original node becomes available, the hints are transferred to the node, and the node is caught up with missed data. There are time and storage restrictions for hints. If a node is not available for a longer duration than configured, no hints are saved for it. Hints cannot be considered as a primary anti-entropy mechanism.
  • Read Repair: A read operation is used as an opportunity to repair inconsistent data across replicas. The latest write-timestamp is used as a marker for the correct version of data. A read repair is performed only on a portion of the total reads to avoid performance degradation. Read repairs are opportunistic operations, not a primary anti-entropy operation.
  • Repair: Repair is the primary anti-entropy operation for making data consistent across replicas. Repairs are performed by creating specialized data structures called Merkle trees, which are trees of hashes computed over the data in a replica. These are exchanged between replicas and compared to detect inconsistencies. The correct data is then streamed across nodes to repair the inconsistencies.

Repairs need to be scheduled manually as these are intensive operations that consume a significant amount of cluster resources. 

Write Path

The Cassandra write path is the process followed by a Cassandra node to store data in response to a write operation. A coordinator node initiates the write path and is responsible for the completion of the request.

The high-level steps are as follows:

  1. The partitioner applies hash to the partition key of an incoming data partition and generates a token.
  2. The node that owns the partition is identified, along with all the nodes holding replicas of the partition.
  3. The write request is forwarded to all replica nodes, and acknowledgements are awaited. 
  4. Once the number of nodes required to fulfil the write consistency level have acknowledged the request, the write operation completes. 

Consider an example with a six-node cluster, a replication factor of three, and a write request consistency of quorum. 

Quorum for RF 3 = (3/2)+1 = 2

Common error scenarios:

  1. If the number of nodes required to fulfil the request are not available, or do not acknowledge the request, the coordinator throws an exception.  
  2. Even after the request is satisfied with the required number of replica acknowledgements, if an additional node which stores a replica of the data is not available, the data can be saved as a hint on another node. 

In a multi-data center cluster, the coordinator forwards write requests to all applicable local replica nodes. For remote data centers, the write request is forwarded to a single node per data center; that node then replicates the data to the required number of nodes within its data center to satisfy the consistency level. 

The Anatomy of a Write Operation on a Node

This operation involves the commit log, memtable, and SSTable. The commit log is a write-ahead log stored on disk. The write operation is recorded in the commit log of the node and applied to a memtable, an in-memory structure representing the SSTable data, after which the acknowledgement is returned. The memtable is flushed to disk after reaching a memory threshold, which creates a new SSTable. The SSTables are eventually compacted to consolidate the data and optimize read performance.
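
A toy, conceptual sketch of that flow is shown below; these are not Cassandra's real classes, and the commit log, memtable, and flush threshold are simplified stand-ins.

import java.util.Map;
import java.util.TreeMap;

// Conceptual model only - not Cassandra's actual implementation.
public class WriteAnatomy {
    private final StringBuilder commitLog = new StringBuilder();     // stand-in for the on-disk commit log
    private final Map<Integer, String> memtable = new TreeMap<>();   // in-memory, sorted like an SSTable
    private static final int FLUSH_THRESHOLD = 3;                    // stand-in for the memory threshold

    void write(int partitionKey, String value) {
        // 1. Append to the commit log (write-ahead log) so the write can be replayed after a crash.
        commitLog.append(partitionKey).append('=').append(value).append('\n');
        // 2. Apply the write to the memtable; the acknowledgement can be returned now.
        memtable.put(partitionKey, value);
        // 3. When the memtable reaches its threshold, flush it to disk as a new immutable SSTable.
        if (memtable.size() >= FLUSH_THRESHOLD) {
            flushToSSTable();
        }
    }

    private void flushToSSTable() {
        System.out.println("Flushing new SSTable: " + memtable);
        memtable.clear(); // the flushed data now lives in the (immutable) SSTable on disk
    }

    public static void main(String[] args) {
        WriteAnatomy node = new WriteAnatomy();
        node.write(1, "first_row");
        node.write(2, "second_row");
        node.write(3, "third_row"); // triggers a flush
    }
}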

Read Path 

The Cassandra read path is the process followed by a Cassandra node to retrieve data in response to a read operation. The read path has more steps than the write path. The actions performed to serve a read request are as follows:

  1. The coordinator generates a hash using the partition key and gathers the replica nodes which are responsible for storing the data.
  2. The coordinator checks if replicas required to satisfy the read consistency level are available. If not, an exception is thrown, and the read operation ends.
  3. The coordinator then sends a read data request to the fastest-responding replica; the fastest replica could be the coordinator itself. The fastest replica is determined by the dynamic snitch, which keeps track of node latencies dynamically.
  4. The coordinator then sends a digest request to the other replicas of the data. The digest is a hash calculated over the requested data by the replica nodes.
  5. The coordinator compares all the digests to determine whether all the replicas have a consistent version of the data. If those are equal, it returns the result obtained from the fastest replica.

If the digests from all the replicas are not equal, it means some replicas do not have the latest version of the data. In this case, read data requests for all the replicas are issued, and the data with the latest timestamp is returned to the client. Also, read repair requests are issued for the replicas which do not have the latest data version.

Components involved in a read operation on a node:

  • Row cache: This is a cache for frequently read data rows, also referred to as hot data. It stores a complete data row which can be returned directly to the client if requested by a read operation. This is an optional feature and works best if there are a small number of hot rows which can fit in the row cache.
  • Partition key cache: This component caches the partition index entries per table which are frequently used. In other words, it stores the location of partitions which are commonly queried but not the complete rows. This feature is used by default in Cassandra, but it can be optimized more.
  • Bloom filter: A bloom filter is a data structure which indicates whether a data partition could be included in a given SSTable. A positive result from a bloom filter can be a false signal, but negative results are always accurate, so it saves a lot of seek time for read operations (see the sketch after this list).
  • Partition index and summary: A partition index contains offset of all partitions for their location in SSTable. The partition summary is a summary of the index. These components enable locating a partition exactly in an SSTable rather than scanning data.
  • Memtable: The memtable is an in-memory representation of SSTables. If a data partition is present in the memtable, it can be read directly for specific data rows and returned to the client.
  • Compression offset map: This is the map for locating data in SSTables when it is compressed on-disk. 
  • SSTable: The on-disk data structure which holds all the data once flushed from memory. 
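
As referenced in the bloom filter item above, here is a toy bloom filter in Java to illustrate the "maybe present / definitely absent" behaviour; real Cassandra bloom filters are sized per SSTable and use different hash functions.

import java.util.BitSet;

// Toy bloom filter: false positives are possible, false negatives are not.
public class ToyBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    ToyBloomFilter(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    void add(String partitionKey) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(indexFor(partitionKey, i));
        }
    }

    // "false" means the partition is definitely not in the SSTable (no disk seek needed);
    // "true" means it might be, so the partition index must still be consulted.
    boolean mightContain(String partitionKey) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(indexFor(partitionKey, i))) {
                return false;
            }
        }
        return true;
    }

    // Simple seeded hash for illustration; not the hashing Cassandra uses.
    private int indexFor(String key, int seed) {
        return Math.floorMod(key.hashCode() * 31 + seed * 0x9E3779B9, size);
    }

    public static void main(String[] args) {
        ToyBloomFilter filter = new ToyBloomFilter(1024, 3);
        filter.add("partition-1");
        System.out.println(filter.mightContain("partition-1"));  // true
        System.out.println(filter.mightContain("partition-42")); // almost certainly false
    }
}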

Anatomy of Read Operation on a Node

  1. Cassandra checks the row cache for data presence. If present, the data is returned, and the request ends.
  2. The request flow includes checking the bloom filters. If a bloom filter indicates data presence in an SSTable, Cassandra continues to look for the required partition in that SSTable.
  3. The key cache is checked for the presence of the partition key. A cache hit provides the offset of the partition in the SSTable; this offset is then used to retrieve the partition, and the request completes.
  4. Otherwise, Cassandra continues to seek the partition via the partition summary and partition index. These structures also provide the partition offset in the SSTable, which is then used to retrieve the partition and return it. The caches, where enabled, are updated with the latest data read. 

Conclusion

The Cassandra architecture is uniquely designed to provide scalability, reliability, and performance. It is based on a distributed system architecture and operates within the constraints of the CAP theorem. Cassandra’s unique architecture needs careful configuration and tuning, and it is essential to understand its components in order to use Cassandra efficiently.

Contact us to get expert advice on managing and deploying Apache Cassandra.

The post A Comprehensive Guide to Cassandra Architecture appeared first on Instaclustr.

Dial C* for Operator - Creating a Cassandra Cluster with Cass Operator

In this post we are going to take a deep dive look at provisioning a Cassandra cluster using the DataStax Kubernetes operator for Cassandra, Cass Operator. We will set up a multi-rack cluster with each rack in a different availability zone.

For the examples, I will use a nine node, regional cluster in Google Kubernetes Engine (GKE) that is spread across three zones. Here is what my Kubernetes cluster looks like:

$ kubectl get nodes --label-columns failure-domain.beta.kubernetes.io/region,failure-domain.beta.kubernetes.io/zone | awk {'print $1" "$6" "$7'} | column -t
NAME                                     REGION    ZONE
gke-cass-dev-default-pool-3cab2f1f-3swp  us-east1  us-east1-d
gke-cass-dev-default-pool-3cab2f1f-408v  us-east1  us-east1-d
gke-cass-dev-default-pool-3cab2f1f-pv6v  us-east1  us-east1-d
gke-cass-dev-default-pool-63ec3f9d-5781  us-east1  us-east1-b
gke-cass-dev-default-pool-63ec3f9d-blrh  us-east1  us-east1-b
gke-cass-dev-default-pool-63ec3f9d-g4cb  us-east1  us-east1-b
gke-cass-dev-default-pool-b1ee1c3c-5th7  us-east1  us-east1-c
gke-cass-dev-default-pool-b1ee1c3c-ht20  us-east1  us-east1-c
gke-cass-dev-default-pool-b1ee1c3c-xp2v  us-east1  us-east1-c

Operator Concepts

Without getting into too much detail, I want to quickly cover some fundamental concepts for some of the things we will discuss in this post. Kubernetes is made up of controllers. A controller manages the state of one or more Kubernetes resource types. The controller executes an infinite loop, continually trying to converge the desired state of resources with their actual state. The controller watches for changes of interest in the Kubernetes cluster, i.e., a resource added, deleted, or updated. When there is a change, a key uniquely identifying the affected resource is added to a work queue. The controller eventually gets the key from the queue and begins whatever work is necessary.

Sometimes a controller has to perform potentially long-running operations like pulling an image from a remote registry. Rather than blocking until the operation completes, the controller usually requeues the key so that it can continue with other work while the operation completes in the background. When there is no more work to do for a resource, i.e. the desired state matches the actual state, the controller removes the key from the work queue.

An operator consists of one or more controllers that manage the state of one or more custom resources. Every controller has a Reconciler object that implements a reconcile loop. The reconcile loop is passed a request, which is the resource key.

A Few Words on Terminology

A Kubernetes worker node, Kubernetes worker, or worker node is a machine that runs services necessary to run and manage pods. These services include:

  • kubelet
  • kube-proxy
  • container runtime, e.g., Docker

A Cassandra node is the process running in a container.

A Cassandra container is the container, i.e., Docker container, in which the Cassandra node is running.

A Cassandra pod is a Kubernetes pod that includes one or more containers. One of those containers is running the Cassandra node.

Installing the Operator

Apply the cass-operator-manifests.yaml manifests as follows:

$ kubectl create -f https://raw.githubusercontent.com/datastax/cass-operator/b96bfd77775b5ba909bd9172834b4a56ef15c319/docs/user/cass-operator-manifests.yaml
namespace/cass-operator created
serviceaccount/cass-operator created
secret/cass-operator-webhook-config created
customresourcedefinition.apiextensions.k8s.io/cassandradatacenters.cassandra.datastax.com created
clusterrole.rbac.authorization.k8s.io/cass-operator-cluster-role created
clusterrolebinding.rbac.authorization.k8s.io/cass-operator created
role.rbac.authorization.k8s.io/cass-operator created
rolebinding.rbac.authorization.k8s.io/cass-operator created
service/cassandradatacenter-webhook-service created
deployment.apps/cass-operator created
validatingwebhookconfiguration.admissionregistration.k8s.io/cassandradatacenter-webhook-registration created

Note: The operator is deployed in the cass-operator namespace.

Make sure that the operator has deployed successfully. You should see output similar to this:

$ kubectl -n cass-operator get deployments
NAME            READY   UP-TO-DATE   AVAILABLE   AGE
cass-operator   1/1     1            1           2m8s

Create a Storage Class

We need to create a StorageClass that is suitable for Cassandra. Place the following in a file named server-storageclass.yaml:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: server-storage
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd
  replication-type: none
volumeBindingMode: WaitForFirstConsumer
reclaimPolicy: Delete

One thing to note here is volumeBindingMode: WaitForFirstConsumer. The default value is Immediate and should not be used. It can prevent Cassandra pods from being scheduled on a worker node. If a pod fails to run and its status reports a message like had volume node affinity conflict, then check the volumeBindingMode of the StorageClass being used. See Topology-Aware Volume Provisioning in Kubernetes for more details.

Create the StorageClass with:

$ kubectl -n cass-operator apply -f server-storageclass.yaml
storageclass.storage.k8s.io/server-storage created

The Spec

Most Kubernetes resources define spec and status properties. The spec declares the desired state of a resource which includes configuration settings provided by the user, default values expanded by the system, and other properties initialized by other internal components after resource creation. We will talk about the status in a little bit.

The manifest below declares a CassandraDatacenter custom resource. It does not include all possible properties. It includes the minimum necessary to create a multi-zone cluster.

apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
  name: multi-rack
spec:
  clusterName: multi-rack
  serverType: cassandra
  serverVersion: 3.11.6
  managementApiAuth:
    insecure: {}
  size: 9
  racks:
  - name: us-east1-b
    zone: us-east1-b
  - name: us-east1-c
    zone: us-east1-c
  - name: us-east1-d
    zone: us-east1-d    
  storageConfig:
    cassandraDataVolumeClaimSpec:
      storageClassName: server-storage
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi

This spec declares a single Cassandra datacenter. Cass Operator does support multi-DC clusters. It requires creating a separate CassandraDatacenter for each datacenter. Discussion of multi-DC clusters is outside the scope of this post.

The size property specifies the total number of Cassandra nodes in the datacenter.

racks is an array of Rack objects which consist of name and zone properties. The zone should be the name of a zone in GCP (or an AWS zone if the cluster was running in AWS for example). The operator will use this to pin Cassandra pods to Kubernetes workers in the zone. More on this later.

Create the CassandraDatacenter

Put the above manifest in a file named multi-rack-cassdc.yaml and then run:

$ kubectl -n cass-operator apply -f multi-rack-cassdc.yaml
cassandradatacenter.cassandra.datastax.com/multi-rack created

This creates a CassandraDatacenter object named multi-rack in the Kubernetes API server. The API server provides a REST API with which clients, like kubectl, interact. The API server maintains state in etcd. Creating a Kubernetes resource ultimately means persisting state in etcd. When the CassandraDatacenter object is persisted, the API server notifies any clients watching for changes, namely Cass Operator. From here the operator takes over. The new object is added to the operator’s internal work queue. The job of the operator is to make sure the desired state, i.e., the spec, matches the actual state of the CassandraDatacenter.

Now that we have created the CassandraDatacenter, it is time to focus our attention on what Cass Operator is doing to build the Cassandra cluster.

Monitor the Progress

We will look at a couple of things to monitor the progress of the provisioning, or scaling up, of the cluster:

  • Changes in the status of the CassandraDatacenter
  • Kubernetes events emitted by the operator

We have already discussed that the spec describes a resource’s desired state. The status on the other hand, describes the object’s current, observed state. Earlier I mentioned that the Kubernetes API server provides a REST API to clients. A Kubernetes object or resource is a REST resource. The status of a Kubernetes resource is typically implemented as a REST subresource that can only be modified by internal, system components. In the case of a CassandraDatacenter, Cass Operator manages the status property.

An event is a Kubernetes resource that is created when objects like pods change state, or when an error occurs. Like other resources, events get stored in the API server. Cass Operator generates a number of events for a CassandraDatacenter.

Understanding both the changes in a CassandraDatacenter’s status and the events emitted by the operator provides valuable insight into what is actually happening during the provisioning process. That understanding also makes it easier to resolve issues when things go wrong. This applies not only to CassandraDatacenter, but to other Kubernetes resources as well.

Monitor Status Updates

We can watch for changes in the status with:

$ kubectl -n cass-operator get -w cassdc multi-rack -o yaml

In the following sections we will discuss each of the status updates that occur while the operator works to create the Cassandra cluster.

Initial Status Update

Here is what the status looks like initially after creating the CassandraDatacenter:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  nodeStatuses: {}

cassandraOperatorProgress can have one of two values, Ready or Updating. It will change to Ready when the operator has no more work to do for the resource. This simple detail is really important, particularly if you are performing any automation with Cass Operator. For example, I have used Cassandra operators to provision clusters for integration tests. With Cass Operator my test setup code could simply poll cassandraOperatorProgress to know when the cluster is ready.
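
For example, a minimal polling loop (just a sketch, assuming the CassandraDatacenter is named multi-rack and lives in the cass-operator namespace) could look like this:

$ until [ "$(kubectl -n cass-operator get cassdc multi-rack -o jsonpath='{.status.cassandraOperatorProgress}')" = "Ready" ]; do sleep 10; done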

conditions is an array of DatacenterCondition objects. A lot of Kubernetes resources use conditions in their statuses. Conditions represent the latest observations of an object’s state. They should minimally include type and status fields. The status field can have as its value either True, False, or Unknown. lastTransitionTime is the time the condition transitioned from one status to another. type identifies the condition. CassandraDatacenter currently has the following condition types:

  • Ready
  • Initialized
  • ReplacingNodes
  • ScalingUp
  • Updating
  • Stopped
  • Resuming
  • RollingRestart

Implementing, understanding, and using conditions are often points of confusion. It is intuitive to think of and model a resource’s state as a state machine. Reminding yourself that conditions are observations and not a state machine will go a long way in avoiding some of that confusion. It is worth noting there has been a lot of debate in the Kubernetes community about whether conditions should be removed. Some of the latest discussions in this ticket indicate that they will remain.

lastRollingRestart is only updated when a rolling restart is explicitly requested. As we will see, its value will remain unchanged, so we will be ignoring it for this post.

nodeStatuses is a map that provides some details for each node. We will see it get updated as nodes are deployed.

Cassandra Node Starting

With the next update we see that a lastServerNodeStarted property has been added to the status:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:41:24Z"
  nodeStatuses: {}

lastServerNodeStarted gets updated when a Cassandra node is starting up. The operator also adds the label cassandra.datastax.com/node-state: Starting to the Cassandra pod. The astute reader may have noted that I said lastServerNodeStarted is updated when the Cassandra node is starting up, rather than when the pod is starting up. For Cass Operator, there is an important distinction between the Cassandra node and the Cassandra container. The Cassandra Container section at the end of the post goes over this in some detail.

Cassandra Node Started

In the next update lastServerNodeStarted is modified and another entry is added to nodeStatuses:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:41:50Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5

The entry is keyed by the pod name, multi-rack-multi-rack-us-east1-b-sts-2. The value consists of two fields - the node’s host id and its IP address.

When Cass Operator determines the Cassandra node is up and running, it updates the node-state label to cassandra.datastax.com/node-state: Started. After the label update, the operator uses a label selector query to see which pods have been started and are running. When the operator finds another node running, its host ID and IP address will be added to nodeStatuses.
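
You can run the same kind of label selector query yourself to see which pods the operator currently considers started, for example:

$ kubectl -n cass-operator get pods -l cassandra.datastax.com/node-state=Started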

Remaining Nodes Started

In this section we follow the progression of the rest of the Cassandra cluster being started. lastServerNodeStarted is changed with each of these status updates in addition to nodeStatuses being updated.

multi-rack-multi-rack-us-east1-c-sts-0 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:42:49Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3

Next, multi-rack-multi-rack-us-east1-d-sts-0 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:43:53Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4

Next, multi-rack-multi-rack-us-east1-c-sts-2 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:44:54Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4

Next, multi-rack-multi-rack-us-east1-d-sts-1 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:45:50Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3

With five out of the nine nodes started, now is a good time to point out a couple of things. First, we see that one node at a time is added to nodeStatuses. Based on this it stands to reason that Cass Operator is starting nodes serially. That is precisely what is happening.

Secondly, there is roughly a minute difference between the values of lastServerNodeStarted in each status update. It is taking about a minute or so to start each node, which means it should take somewhere between nine and ten minutes for the cluster to be ready. These times will almost certainly vary depending on a number of factors like the type of disks used, the machine type, etc. It is helpful though, particularly for larger cluster sizes, to be able to gauge how long it will take to get the entire cluster up and running.

Next, multi-rack-multi-rack-us-east1-d-sts-2 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:46:51Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Next, multi-rack-multi-rack-us-east1-b-sts-0 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:00Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Next, multi-rack-multi-rack-us-east1-c-sts-1 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Finally, multi-rack-multi-rack-us-east1-b-sts-1 is started:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4

Although all nine nodes are now started, the operator still has more work to do. This is evident based on the ScalingUp condition still being True and cassandraOperatorProgress still having a value of Updating.

Cassandra Super User Created

With the next update the superUserUpserted property is added to the status:

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:40:51Z"
    status: "True"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

superUserUpserted is the timestamp at which the operator creates a super user in Cassandra. We will explore this in a little more detail when we go through the events.

ScalingUp Transition

In this update the ScalingUp condition transitions to False. This condition changes only after all nodes have been started and after the super user has been created.

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "False"
    type: ScalingUp
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

Add Initialized and Ready Conditions

Next, the operator adds the Initialized and Ready conditions to the status. Initialized means the CassandraDatacenter was successfully created. The transition for this condition should only happen once. Ready means the cluster can start serving client requests. The Ready condition will remain True during a rolling restart for example but will transition to False when all nodes are stopped. See The Cassandra Container section at the end of the post for more details on starting and stopping Cassandra nodes.

status:
  cassandraOperatorProgress: Updating
  conditions:
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "False"
    type: ScalingUp
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Initialized
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Ready
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

Final Status Update

In the last update, the value of cassandraOperatorProgress is changed to Ready:

status:
  cassandraOperatorProgress: Ready
  conditions:
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "False"
    type: ScalingUp
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Initialized
  - lastTransitionTime: "2020-05-06T16:49:55Z"
    status: "True"
    type: Ready
  lastRollingRestart: "2020-05-06T16:40:51Z"
  lastServerNodeStarted: "2020-05-06T16:48:57Z"
  nodeStatuses:
    multi-rack-multi-rack-us-east1-b-sts-0:
      hostID: 3b1b60e0-62c6-47fb-93ff-3d164825035a
      nodeIP: 10.32.1.4
    multi-rack-multi-rack-us-east1-b-sts-1:
      hostID: d7246bca-ae64-45ec-8533-7c3a2540b5ef
      nodeIP: 10.32.2.6
    multi-rack-multi-rack-us-east1-b-sts-2:
      hostID: 62399b3b-80f0-42f2-9930-6c4f2477c9bd
      nodeIP: 10.32.0.5
    multi-rack-multi-rack-us-east1-c-sts-0:
      hostID: dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76
      nodeIP: 10.32.6.3
    multi-rack-multi-rack-us-east1-c-sts-1:
      hostID: a55082ba-0692-4ee9-97a2-a1bb16383d31
      nodeIP: 10.32.7.6
    multi-rack-multi-rack-us-east1-c-sts-2:
      hostID: facbbaa0-ffa7-403c-b323-e83e4cab8756
      nodeIP: 10.32.8.5
    multi-rack-multi-rack-us-east1-d-sts-0:
      hostID: c7e43757-92ee-4ca3-adaa-46a128045d4d
      nodeIP: 10.32.4.4
    multi-rack-multi-rack-us-east1-d-sts-1:
      hostID: 785e30ca-5772-4a57-b4bc-4bd7b3b24ebf
      nodeIP: 10.32.3.3
    multi-rack-multi-rack-us-east1-d-sts-2:
      hostID: 8e8733ab-6f7b-4102-946d-c855adaabe49
      nodeIP: 10.32.5.4
  superUserUpserted: "2020-05-06T16:49:55Z"

We now know the operator has completed its work to scale up the cluster. We also know the cluster is initialized and ready for use. Let’s verify that the desired state of the CassandraDatacenter matches the actual state. We can do this with nodetool status and kubectl get pods.

$ kubectl -n cass-operator exec -it multi-rack-multi-rack-us-east1-b-sts-0 -c cassandra -- nodetool status
Datacenter: multi-rack
======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  10.32.4.4  84.43 KiB  1            4.8%              c7e43757-92ee-4ca3-adaa-46a128045d4d  us-east1-d
UN  10.32.1.4  70.2 KiB   1            7.4%              3b1b60e0-62c6-47fb-93ff-3d164825035a  us-east1-b
UN  10.32.6.3  65.36 KiB  1            32.5%             dfd6ebfb-2e2c-4f7a-92f8-9fe60fb24e76  us-east1-c
UN  10.32.3.3  103.54 KiB  1            34.0%             785e30ca-5772-4a57-b4bc-4bd7b3b24ebf  us-east1-d
UN  10.32.7.6  70.34 KiB  1            18.1%             a55082ba-0692-4ee9-97a2-a1bb16383d31  us-east1-c
UN  10.32.8.5  65.36 KiB  1            19.8%             facbbaa0-ffa7-403c-b323-e83e4cab8756  us-east1-c
UN  10.32.2.6  65.36 KiB  1            36.5%             d7246bca-ae64-45ec-8533-7c3a2540b5ef  us-east1-b
UN  10.32.0.5  65.36 KiB  1            39.9%             62399b3b-80f0-42f2-9930-6c4f2477c9bd  us-east1-b
UN  10.32.5.4  65.36 KiB  1            7.0%              8e8733ab-6f7b-4102-946d-c855adaabe49  us-east1-d

nodetool status reports nine nodes up across three racks. That looks good. Now let’s verify the pods are running where we expect them to be.

$ kubectl -n cass-operator get pods -l "cassandra.datastax.com/cluster=multi-rack" -o wide | awk {'print $1" "$7'} | column -t
NAME                                    NODE
multi-rack-multi-rack-us-east1-b-sts-0  gke-cass-dev-default-pool-63ec3f9d-5781
multi-rack-multi-rack-us-east1-b-sts-1  gke-cass-dev-default-pool-63ec3f9d-blrh
multi-rack-multi-rack-us-east1-b-sts-2  gke-cass-dev-default-pool-63ec3f9d-g4cb
multi-rack-multi-rack-us-east1-c-sts-0  gke-cass-dev-default-pool-b1ee1c3c-5th7
multi-rack-multi-rack-us-east1-c-sts-1  gke-cass-dev-default-pool-b1ee1c3c-ht20
multi-rack-multi-rack-us-east1-c-sts-2  gke-cass-dev-default-pool-b1ee1c3c-xp2v
multi-rack-multi-rack-us-east1-d-sts-0  gke-cass-dev-default-pool-3cab2f1f-3swp
multi-rack-multi-rack-us-east1-d-sts-1  gke-cass-dev-default-pool-3cab2f1f-408v
multi-rack-multi-rack-us-east1-d-sts-2  gke-cass-dev-default-pool-3cab2f1f-pv6v

Look carefully at the output, and you will see each pod is in fact running on a separate worker node. Furthermore, the pods are running on worker nodes in the expected zones.
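
If you want to confirm the zones yourself, you can ask Kubernetes to print the zone label alongside each worker node (this uses the same well-known label the operator relies on, which we will see again shortly):

$ kubectl get nodes -L failure-domain.beta.kubernetes.io/zone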

Monitor Events

The operator reports a number of events useful for monitoring and debugging the provisioning process. As we will see, the events provide additional insights absent from the status updates alone.

There are some nuances with events that can make working with them a bit difficult. First, events are persisted with a TTL. They expire after one hour. Secondly, events can be listed out of order. The ordering appears to be done on the client side with a sort on the Age column. We will go through the events in the order in which the operator generates them. Lastly, while working on this post, I discovered that some events can get dropped. I created this ticket to investigate the issue. Kubernetes has in place some throttling mechanisms to prevent the system from getting overwhelmed by too many events. We won’t go through every single event as there are a lot. We will however cover enough, including some that may be dropped, in order to get an overall sense of what is going on.
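
If you prefer to see the raw events in chronological order, one option (a rough sketch; it lists events for everything in the namespace, not just the CassandraDatacenter) is to have them sorted by creation time:

$ kubectl -n cass-operator get events --sort-by=.metadata.creationTimestamp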

We can list all of the events for the CassandraDatacenter with the describe command as follows:

$ kubectl -n cass-operator describe cassdc multi-rack
Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-b
  Normal  CreatedResource    12m    cass-operator  Created service multi-rack-seed-service
  Normal  CreatedResource    12m    cass-operator  Created service multi-rack-multi-rack-all-pods-service
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-b-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-c-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-d-sts
  Normal  CreatedResource    12m    cass-operator  Created service multi-rack-multi-rack-service
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-c
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-d
  Normal  LabeledPodAsSeed   12m    cass-operator  Labeled pod a seed node multi-rack-multi-rack-us-east1-b-sts-2
  Normal  StartingCassandra  12m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2
  Normal  StartedCassandra   11m    cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2
  Normal  StartingCassandra  11m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-0
  Normal  StartingCassandra  10m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-0
  Normal  StartedCassandra   10m    cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-0
  Normal  LabeledPodAsSeed   10m    cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-c-sts-0
  Normal  LabeledPodAsSeed   9m44s  cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-d-sts-0
  Normal  StartedCassandra   9m43s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-0
  Normal  StartingCassandra  9m43s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-2
  Normal  StartedCassandra   8m43s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-c-sts-2
  Normal  StartingCassandra  8m43s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  Normal  StartedCassandra   7m47s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  Normal  StartingCassandra  7m46s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-2
  Normal  StartedCassandra   6m45s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-2
  Normal  StartingCassandra  6m45s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-0
  Normal  LabeledPodAsSeed   5m36s  cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-b-sts-0

In the following sections we will go through several of these events as well as some that are missing.

Create Headless Services

The first thing that Cass Operator does during the initial reconciliation loop is create a few headless services:

  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedResource    10m    cass-operator  Created service multi-rack-seed-service
  Normal  CreatedResource    10m    cass-operator  Created service multi-rack-multi-rack-all-pods-service
  Normal  CreatedResource    10m    cass-operator  Created service multi-rack-multi-rack-service    

multi-rack-seed-service exposes all pods running seed nodes. This service is used by Cassandra to configure seed nodes.

multi-rack-multi-rack-all-pods-service exposes all pods that are part of the CassandraDatacenter, regardless of whether they are ready. It is used to scrape metrics with Prometheus.

multi-rack-multi-rack-service exposes ready pods. CQL clients should use this service to establish connections to the cluster.
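
You can list the three services the operator created with:

$ kubectl -n cass-operator get services

In-cluster CQL clients can then reach the ready-pods service through standard Kubernetes DNS, e.g. multi-rack-multi-rack-service.cass-operator.svc.cluster.local (assuming the default cluster.local cluster domain).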

Create StatefulSets

Next the operator creates three StatefulSets, one for each rack:

  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-b-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-c-sts
  Normal  CreatedResource    12m    cass-operator  Created statefulset multi-rack-multi-rack-us-east1-d-sts

I mentioned earlier the operator will use the zone property specified for each rack to pin pods to Kubernetes workers in the respective zones. The operator uses affinity rules to accomplish this.

Let’s take a look at the spec for multi-rack-multi-rack-us-east1-c-sts to see how this is accomplished:

$ kubectl -n cass-operator get sts multi-rack-multi-rack-us-east1-c-sts -o yaml
...
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: failure-domain.beta.kubernetes.io/zone
                operator: In
                values:
                - us-east1-c
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: cassandra.datastax.com/cluster
                operator: Exists
              - key: cassandra.datastax.com/datacenter
                operator: Exists
              - key: cassandra.datastax.com/rack
                operator: Exists
            topologyKey: kubernetes.io/hostname
...            

The nodeAffinity property constrains the worker nodes on which pods in the StatefulSet can be scheduled. requiredDuringSchedulingIgnoredDuringExecution is a NodeSelector which basically declares a query based on labels. In this case, if a node has the label failure-domain.beta.kubernetes.io/zone with a value of us-east1-c, then pods can be scheduled on that node.

Note: failure-domain.beta.kubernetes.io/zone is one of a number of well known labels that are used by the Kubernetes runtime.

I added emphasis to “can be” above because of the podAntiAffinity property that is declared. It constrains the worker nodes on which the pods can be scheduled based on the labels of pods already running on those nodes. The requiredDuringSchedulingIgnoredDuringExecution property is a PodAffinityTerm that defines labels determining which pods cannot be co-located on a particular host. In short, this prevents pods from being scheduled on any node on which pods from a CassandraDatacenter are already running. In other words, no two Cassandra nodes should be running on the same Kubernetes worker node.

Note: You can run multiple Cassandra pods on a single worker node by setting .spec.allowMultipleNodesPerWorker to true.
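
Enabling that is just a matter of adding the field to the spec. A sketch, showing only the relevant part of the manifest:

apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
  name: multi-rack
spec:
  allowMultipleNodesPerWorker: true
  # ... remainder of the spec as shown earlier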

Scale up the Racks

The next events involve scaling up the racks:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-b
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-c
  Normal  ScalingUpRack      12m    cass-operator  Scaling up rack us-east1-d

The StatefulSets are initially created with zero replicas. They are subsequently scaled up to the desired replica count, which is three (per StatefulSet) in this case.

Label the First Seed Node Pod

After the StatefulSet controller starts creating pods, Cass Operator applies the following label to a pod to designate it as a Cassandra seed node:

cassandra.datastax.com/seed-node: "true"

At this stage in the provisioning process, no pods have the seed-node label. The following event indicates that the operator designates the pod to be a seed node:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  LabeledPodAsSeed   12m    cass-operator  Labeled pod a seed node multi-rack-multi-rack-us-east1-b-sts-2   

Note: You can use a label selector to query for all seed node pods, e.g., kubectl -n cass-operator get pods -l cassandra.datastax.com/seed-node="true".

Start the First Seed Node

Next the operator starts the first seed node:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  StartingCassandra  12m    cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2

The operator applies the label cassandra.datastax.com/node-state: Starting to the pod. The operator then requeues the request with a short delay, allowing time for the Cassandra node to start. Requeuing the request ends the current reconciliation.

If you are familiar with Kubernetes, this step of starting the Cassandra node may seem counter-intuitive because pods/containers cannot exist in a stopped state. See The Cassandra Container section at the end of the post for more information.

Update Status of First Seed Node Pod

In a subsequent reconciliation loop the operator finds that multi-rack-multi-rack-us-east1-b-sts-2 has been started and records the following event:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  StartedCassandra   11m    cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-b-sts-2

Then the cassandra.datastax.com/node-state label is updated to a value of Started to indicate the Cassandra node is now running. The event is recorded and the label is updated only when the Cassandra container’s readiness probe passes. If the readiness probe fails, the operator will requeue the request, ending the current reconciliation loop.

Start One Node Per Rack

After the first node, multi-rack-multi-rack-us-east1-b-sts-2, is running, the operator makes sure there is a node per rack running. Here is the sequence of events for a given node:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  StartingCassandra  8m43s  cass-operator  Starting Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  Normal  StartedCassandra   7m47s  cass-operator  Started Cassandra for pod multi-rack-multi-rack-us-east1-d-sts-1
  Normal  LabeledPodAsSeed   9m44s  cass-operator  Labeled as seed node pod multi-rack-multi-rack-us-east1-d-sts-1

Let’s break down what is happening here.

  • The cassandra.datastax.com/node-state: Starting label is applied to multi-rack-multi-rack-us-east1-d-sts-1
  • Cassandra is started
  • The request is requeued
  • On a subsequent reconciliation loop when Cassandra is running (as determined by the readiness probe), two things happen
    • The cassandra.datastax.com/seed-node="true" label is applied to the pod, making it a seed node
    • The cassandra.datastax.com/node-state label is updated to a value of Started

The operator will repeat this process for another rack which does not yet have a node running.

Now is a good time to discuss how the operator determines how many seeds there should be in total for the datacenter as well as how many seeds there should be per rack.

If the datacenter consists of only one or two nodes, then there will be one or two seeds respectively. If there are more than three racks, then the number of seeds will be set to the number of racks. If neither of those conditions hold, then there will be three seeds.

The seeds per rack are calculated as follows:

seedsPerRack = totalSeedCount / numRacks
extraSeeds = totalSeedCount % numRacks

For the example cluster in this post totalSeedCount will be three. Then seedsPerRack will be one, and extraSeeds will be zero.

Start Remaining Nodes

After we have a Cassandra node up and running in each rack, the operator proceeds to start the remaining non-seed nodes. I will skip over listing events here because they are the same as the previous ones. At this point the operator iterates over the pods without worrying about the racks. For each pod in which Cassandra is not already running, it will start Cassandra following the same process previously described.

Create a PodDisruptionBudget

After all Cassandra nodes have been started, Cass Operator creates a PodDisruptionBudget. It generates an event like this:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedResource    10m6s  cass-operator  Created PodDisruptionBudget multi-rack-pdb

Note: This is one of the dropped events.

A PodDisruptionBudget limits the number of pods that can be down from a voluntary disruption. Examples of voluntary disruptions include accidentally deleting a pod or draining a worker node for upgrade or repair.

All Cassandra pods in the CassandraDatacenter are managed by the disruption budget. When creating the PodDisruptionBudget, Cass Operator sets the .spec.minAvailable property. This specifies the number of pods that must be available after a pod eviction. Cass Operator sets this to the total number of Cassandra nodes minus one.
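
You can inspect the generated budget to verify this; for the nine-node datacenter in this post, minAvailable should therefore be eight:

$ kubectl -n cass-operator get pdb multi-rack-pdb -o yaml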

Create a Cassandra Super User

The final thing that Cass Operator does is to create a super user in Cassandra:

Events:
  Type    Reason             Age    From           Message
  ----    ------             ----   ----           -------
  Normal  CreatedSuperuser   10m6s  cass-operator  Created superuser

Earlier in the provisioning process, Cass Operator creates the super user credentials and stores them in a secret. The secret name can be specified by setting .spec.superuserSecretName.

The username is set to <.spec.clusterName>-superuser, which will be multi-rack-superuser for our example. The password is a random UTF-8 string less than or equal to 55 characters in length.
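
One way to retrieve the generated credentials is to read the secret back and base64-decode its fields. This is a sketch; the secret name used here is an assumption based on the <clusterName>-superuser naming, so list the secrets first if you have set .spec.superuserSecretName or are unsure:

$ kubectl -n cass-operator get secrets
$ kubectl -n cass-operator get secret multi-rack-superuser -o jsonpath='{.data.username}' | base64 --decode
$ kubectl -n cass-operator get secret multi-rack-superuser -o jsonpath='{.data.password}' | base64 --decode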

Note: Cass Operator disables the default super user, cassandra.

The Cassandra Container

Each Cassandra pod runs a container named cassandra. We need to talk about sidecars before we talk about the cassandra container. The sidecar pattern is a well-known and widely used architectural pattern in Kubernetes. A pod consists of one or more containers. The containers in a pod share the same volumes and network interfaces. Examples of sidecars include log aggregation, gRPC proxies, and backup/restore agents, to name a few. Cass Operator utilizes the sidecar pattern, but in a rather unconventional manner.

We can take a look at the spec of one of the Cassandra pods to learn more about the cassandra container. Because we are only focused on this one part, most of the output is omitted.

$ kubectl -n cass-operator get pod multi-rack-multi-rack-us-east1-b-sts-0 -o yaml
apiVersion: v1
kind: Pod
...
spec:
...
      containers:
      - env:
        - name: DS_LICENSE
          value: accept
        - name: DSE_AUTO_CONF_OFF
          value: all
        - name: USE_MGMT_API
          value: "true"
        - name: MGMT_API_EXPLICIT_START
          value: "true"
        - name: DSE_MGMT_EXPLICIT_START
          value: "true"
        image: datastax/cassandra-mgmtapi-3_11_6:v0.1.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /api/v0/probes/liveness
            port: 8080
            scheme: HTTP
          initialDelaySeconds: 15
          periodSeconds: 15
          successThreshold: 1
          timeoutSeconds: 1
        name: cassandra
        ports:
        - containerPort: 9042
          name: native
          protocol: TCP
        - containerPort: 8609
          name: inter-node-msg
          protocol: TCP
        - containerPort: 7000
          name: intra-node
          protocol: TCP
        - containerPort: 7001
          name: tls-intra-node
          protocol: TCP
        - containerPort: 8080
          name: mgmt-api-http
          protocol: TCP
        readinessProbe:
          failureThreshold: 3
          httpGet:
            path: /api/v0/probes/readiness
            port: 8080
            scheme: HTTP
          initialDelaySeconds: 20
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /config
          name: server-config
        - mountPath: /var/log/cassandra
          name: server-logs
        - mountPath: /var/lib/cassandra
          name: server-data
...          

There are two lines in the output on which we want to focus. The first line is:

name: cassandra

This is the name of the container. There are other containers listed in the output, but we are only concerned with the cassandra one.

The second line that we are interested in is:

image: datastax/cassandra-mgmtapi-3_11_6:v0.1.0

The image property specifies the image that the cassandra container is running. This is different from the Cassandra images such as the ones found on Docker Hub. This image is for the Management API Sidecar. There have been lots of discussions on the Cassandra community mailing lists about management sidecars. In fact there is even a Cassandra Enhancement Proposal (CEP) for providing an official, community based sidecar. The Management API Sidecar, or management sidecar for short, was not designed specifically for Kubernetes.

The process started in the cassandra container is the management sidecar rather than the CassandraDaemon process. The sidecar is responsible for starting/stopping the node. In addition to providing lifecycle management, the sidecar also provides configuration management, health checks, and per node actions (i.e., nodetool).
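
Because the sidecar listens for HTTP on port 8080 (the same port the probes in the pod spec above point at), you can poke at it directly. A sketch, assuming curl is available inside the container image:

$ kubectl -n cass-operator exec -it multi-rack-multi-rack-us-east1-b-sts-0 -c cassandra -- curl -s localhost:8080/api/v0/probes/readiness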

There is plenty more to say about the management sidecar, but that is for another post.

Wrapping Up

Hopefully this post gives you a better understanding of Cass Operator and Kubernetes in general. While we covered a lot of ground, there is plenty more to discuss, like multi-DC clusters and the management sidecar. If you want to hear more about Cassandra and Kubernetes, Patrick McFadin put together a series of interviews where he talks to early adopters in the field. Check out “Why Tomorrow’s Cassandra Deployments Will Be on Kubernetes.” It will be available for streaming as part of the DataStax Accelerate online conference: https://dtsx.io/3ex1Eop

Comparing stress tools for Apache Cassandra

Editor’s Note: The Last Pickle was recently acquired by DataStax, and as part of the new DataStax mission of reorienting to embrace open source Apache Cassandra, this is the first in a series of blog posts that will compare new open source offerings, particularly those now coming out of the new DataStax. In the open source spirit we want to embrace you, the community, in choosing the right tool for the right job.

Benchmarking and stress testing Apache Cassandra are important exercises that both operators and developers do regularly. Approaches come in numerous flavours, from “look how my code is better than everyone else’s”, to “what do I need to run this?” and “how much money will this save me?”, and my favourite, “when this dies a horrible death what will that look like?”.

Knowing what you want to achieve and how to interpret the results of these tests is a big part of the challenge. With a run through of these available tools, hopefully that will become easier.

This blog post will look at and compare the following three stress tools:

  • cassandra-stress
  • tlp-stress
  • nosqlbench

With these three tools we will step through a number of basic use cases:

  1. Just Generate Some Load
  2. Using a Real Cluster
  3. Iteration and Thread Counts
  4. Observability
  5. Batch sizes and Overwrites
  6. Predefined Workloads
  7. Custom Workloads
  8. Client Contention and Server Saturation

The versions of the tools used in these steps are 3.11.6 for cassandra-stress, 4.0.0 for tlp-stress, and 3.12.77 for nosqlbench.


1. Just Generate Some Load

Sometimes all you want to do is generate some load or data. This is useful when all we want is a Cassandra node that is doing something. It can be just to raise the CPU, or to generate some commitlogs, memtables, or sstables on disk.

Each tool will generate a slightly different load configuration for these tests:

$ cassandra-stress write

      Performs over a million writes (after an initial 50k warmup writes), iterating a number of times and increasing the number of client threads with each iteration, starting with four threads.

$ tlp-stress run BasicTimeSeries -i 1M

      Performs exactly one million requests with a 9:1 write-to-read ratio.

$ nb cql-iot write_cl=LOCAL_ONE

      Performs ten million writes during a warmup phase and then ten million requests with a 9:1 write-to-read ratio.

All of them execute writes connected to a localhost Cassandra node, using the java-driver and consistency level LOCAL_ONE.

There is a difference in the model, however, as cassandra-stress uses a simple key value table, while tlp-stress and nosqlbench are using time-series data models.


2. Using a Real Cluster

This repeats the exercise of just generating any load or data, but is used when you have an actual cluster you are targeting.

$ cassandra-stress write -node cassandra1

$ tlp-stress run BasicTimeSeries --host cassandra1

$ nb cql-iot host=cassandra1

Note: There is no need to specify multiple hosts with any of these stress tools. These are contact hosts that are passed to the java driver, and unlike a coded application where you would want multiple contact hosts specified for reliability during deployment and startup, with a stress tool invocation it is reasonable to expect the single contact host specified to be up and running.


3. Iteration and Thread Counts

The following shows how to specify the number of iterations and the number of threads to use, in each of the tools.

$ cassandra-stress write n=100000 -rate threads=16

$ tlp-stress run BasicTimeSeries -n 100k -t 16

$ nb cql-iot write_cl=LOCAL_ONE main-cycles=100k threads=16


4. Observability

Even with well-designed workloads there is a lot more to benchmarking than the final throughput numbers. We want to see how the cluster operates over time, from spikes in traffic to the many background operations Cassandra performs. Taking a closer look at how Cassandra behaves helps plan for a healthy and stable cluster over a longer period of time than what we are able to benchmark.


cassandra-stress
$ cassandra-stress write -graph file=example-benchmark.html title=example revision=benchmark-0

      For more information on this, read our previous blog post on Cassandra-Stress and Graphs.  


nosqlbench
$ nb cql-iot write_cl=LOCAL_ONE --docker-metrics

      Then open http://localhost:3000 in your browser. Note this only works on linux and requires docker to be running on the host. For more info see here.

 

tlp-stress

      Note: tlp-stress has no similar observability feature, but does export Prometheus metrics on port 9501.
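      If you want to eyeball or scrape those metrics while a tlp-stress run is in progress, something like the following should work (a sketch; it assumes the standard Prometheus /metrics path on that port):

$ curl -s localhost:9501/metrics | head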

The out-of-the-box generated graphs from cassandra-stress are a nice feature. For any serious benchmarking activity, though, you will want to have metrics from Cassandra graphed and to have insight into the stress tool’s behaviour beyond just performance numbers.


5. Batch sizes and Overwrites

The following invocation is of particular interest because it has been a pain point for those using cassandra-stress. In Cassandra, unlogged batches are not normal and are not recommended except for very small groupings (10-20 rows) within a single partition.

cassandra-stress, by default, puts all writes for any partition into single batches, which makes for poor and unrealistic results. It is impossible to get cassandra-stress to not use batches, and quite convoluted to get it to write batches that consist only of single inserts. More info on this can be read in ticket CASSANDRA-11105.

Overwrites and deletes are not something we see a lot of among published Cassandra benchmarks because they are harder to implement. Often this makes sense, as workloads like key-value and time-series are likely not overwrite data models. Yet there are plenty of data models out there that do require these patterns and that we would like to benchmark.


cassandra-stress

      First download batch_too_large.yaml.

$ cassandra-stress user profile=batch_too_large.yaml ops\(insert=1\) -insert visits=FIXED\(10M\)  


tlp-stress

      tlp-stress does not perform unlogged batches by default like cassandra-stress. If unlogged batches are desired you need to write your own workload, see the Custom Workloads section.

      tlp-stress does make deletes very easy, treating them in a similar fashion to the read rate flag. This will make 10% of the operations deletes of previously written data:

$ tlp-stress run KeyValue --deletes 0.1

      tlp-stress does overwrites in a similar way to cassandra-stress. This will write 100k operations over 100 partitions. Without clustering keys, this is roughly 1k overwrites on each partition:

$ tlp-stress run KeyValue -p 100 -n 100k  


nosqlbench

      nosqlbench can handle overwrites in the same manner as cassandra-stress and tlp-stress by providing a smaller partition count than the iteration count. nosqlbench does not currently provide any deletes or unlogged batch examples. Logged batches have been implemented with custom workloads, so deletes and unlogged batches are probably possible with a custom implemented workload.  


6. Predefined Workloads

cassandra-stress

      cassandra-stress does not have built in workloads. You need to specify the user mode and supply your own configuration as shown in the next section.  


tlp-stress

      tlp-stress has the most extensive list of workloads. These workloads have been used at TLP to demonstrate real limitations with certain features and to provide a hands on approach to recommending the best production solutions.

$ tlp-stress list

    Available Workloads:

    AllowFiltering
    BasicTimeSeries
    CountersWide
    KeyValue
    LWT
    Locking
    Maps
    MaterializedViews
    RandomPartitionAccess
    Sets
    UdtTimeSeries

$ tlp-stress run CountersWide  


nosqlbench

      nosqlbench lists the workloads from its predefined yaml workload files. Within these workloads it lists the different phases that are used, and that can be combined. This offers us our first glimpse of how complex and specific a workload can be defined. It also lists the sequences workload, which is not based on the cql driver.

$ nb --list-workloads

    from: cql-keyvalue.yaml
        …
    from: cql-iot.yaml
        …
    from: cql-iot-dse.yaml
        …
    from: cql-tabular.yaml
        …
    from: sequences.yaml
        …


$ nb cql-tabular  


7. Custom Workloads

A benchmark that is part of a feasibility or capacity planning exercise for production environments will nearly always require a custom defined workload.  


cassandra-stress

      For cassandra-stress an example of this was done for the Zipkin project. cassandra-stress can not benchmark more than one table at a time, so there is a separate workload yaml for each table and these have to run as separate invocations. Here we see that cassandra-stress does not support Zipkin’s original schema, specifically UDTs and collections, so the folder above also contains some cql files to create a schema we can stress.

      Create the zipkin test schema

$ cqlsh -f zipkin2-test-schema.cql

      Fill this schema with some data, throttle as appropriate

$ cassandra-stress  user profile=span-stress.yaml ops\(insert=1\) no-warmup duration=1m  -rate threads=4 throttle=50/s

      Now benchmark a mixed read and write workload, again throttle as appropriate

$ cassandra-stress  user profile=span-stress.yaml ops\(insert=1,by_trace=1,by_trace_ts_id=1,by_annotation=1\)  duration=1m  -rate threads=4 throttle=50/s  

As can be seen above, creating custom workloads in cassandra-stress has always been an involved and difficult experience. While tlp-stress and nosqlbench improve on this situation, they each do so in different ways.


nosqlbench

      nosqlbench provides all of its workload configurations via yaml files. Getting the hang of these can be daunting for the newcomer, but with the documentation provided, and by practicing first on the predefined workloads and tweaking them, there is a wealth of possibility here.


tlp-stress

      tlp-stress on the other hand focuses on writing workloads in the code. tlp-stress is written in Kotlin, so if you find Kotlin enjoyable you will find it quick and intuitive to write workloads. The existing workloads can be found here, take a peek and you will see that they are quite simple to write.  


8. Client Contention and Server Saturation

Which benchmark tool is faster? That may sound like a weird question, but it raises some real concerns. Not just in choosing what hardware to run the client on, or how many clients are required, but in knowing when the results you are getting are nonsense. Understanding the load you want to generate versus what you need to measure is as important to benchmarking as the workload itself.

It is important to avoid saturating the server. Any benchmark that pushes throughput to its absolute limit is meaningless. A real-world (and overly simplified) comparison of this is in OLAP clusters, like those paired with Apache Spark, where without appropriate thresholds put onto the spark-cassandra-connector you can get a yo-yo effect on throughput as the cluster saturates, jams up, and then accepts writes again. With tuning and throughput thresholds put into place, higher throughput is sustainable over time. A Responsiveness Under Load (RUL) benchmark is one where we apply such throughput limits and observe responsiveness instead.

These problems extend into the client stress tool as well. Unlike the server that can block or load-shed at the defined throughput threshold, the client’s throughput of operations can be either limited or scheduled. This difference can be important, but explaining it goes beyond this blog post. For those interested I’d recommend reading this post on Fixing Coordinated Omission in Cassandra Stress.  


cassandra-stress
$ cassandra-stress write -rate threads=4 fixed=50/s  


nosqlbench

      nosqlbench has no scheduler per se, but deals with reducing coordinated omission via asynchronous requests and a non-fixed thread count. More information on nosqlbench’s timing terminology can be found here.

$ nb cql-iot cyclerate=50 async=256 threads=auto

      Very few production clusters ever demonstrate constant throughput like this, so benchmarking bursts is a real thing. Currently only nosqlbench does this in-process.

$ nb cql-iot cyclerate=50,1.5 async=256 threads=auto

      This specifies a rate threshold of 50 operations per second, with bursts of up to 50%. More information on bursts is available here.


tlp-stress

      tlp-stress does not deal with Coordinated Omission. Its --rate flag relies on Google’s Guava RateLimiter and limits the throughput, but does not schedule.
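
      For comparison with the commands above, a rate-limited run in tlp-stress looks roughly like this (the KeyValue workload ships with tlp-stress; flag spellings are taken from its documentation, so treat the exact invocation as illustrative):

$ tlp-stress run KeyValue --rate 50 -d 1m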


Documentation

Looking through the documentation for each of the tools, it is easy to see that nosqlbench offers substantially more. The tlp-stress docs are elegant and easy for the beginner, though they are still missing information on how to implement your own workload (or profile, as tlp-stress calls them).


Wrap Up

cassandra-stress is an advanced tool for very narrow applications against Cassandra. It quickly becomes a clumsy user experience and often requires adventures into some awkward code to understand it and get things working as expected.

tlp-stress was written as a replacement for cassandra-stress. Apart from not dealing with Coordinated Omission, it succeeds in that goal in every respect: good documentation, a rich command-line interface, and a codebase that is easy to understand and contribute to.

nosqlbench takes the next step, aiming to be a YCSB replacement. It feels like a power user’s tool and comes with the features and capacity to earn that title. Expect to see more and more workloads made available for testing many different technologies in the NoSQL world.

An Introduction to Cassandra Multi-Data Centers: Part 2

In this second blog of the “Around the World in (Approximately) 8 Data Centers” series we catch our first mode of transportation (Cassandra) and explore how it works to get us started on our journey to multiple destinations (Data Centers).

1. What Is a (Cassandra) Data Center?

What does a Data Center (DC) look like? Here are some cool examples of DCs (old and new)!

Arguably the first “electronic” data center was ENIAC, circa 1946. It was, however, just a single (albeit monster) machine! It weighed more than 30 tonnes, occupied 1,800 square feet, consumed 200kW of power, got up to 50°C inside, and was rumoured to cause blackouts in neighboring Philadelphia when it was switched on!

first electronic data centre

Jumping forward to the present, a photo of a Google DC shows mainly cooling pipes. In common with ENIAC, power and heat are still a feature of modern DCs:

Google Data Center (plumbing)

So what is a Cassandra Data Center? Ever since I started using Cassandra I’ve been puzzled about Cassandra Data Centers (DCs). When you create a keyspace you typically also specify a DC name, for example:

CREATE KEYSPACE here
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DCHere' : 3};

The NetworkTopologyStrategy is a production-ready replication strategy that allows you to specify an explicit DC name. But why do you need an explicit DC name? The reason is that you can actually have more than one data center in a Cassandra cluster, and each DC can have a different replication factor. For example, here’s a keyspace with two DCs:

CREATE KEYSPACE here_and_there
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DCHere' : 3, 'DCThere' : 3};

So what does having multiple DCs achieve? Powerful automatic global replication of data! Essentially you can easily create a globally distributed Cassandra cluster where data written to the local DC is asynchronously replicated to all the other DCs in the keyspace. You can have multiple keyspaces with different DCs and replication factors depending on how many copies of your data you want and where you want it replicated to.

But where do the Cassandra DCs come from? Well, it’s easy to create a cluster in a given location and Data Center name in Instaclustr Managed Cassandra!

When creating a Cassandra cluster using the Instaclustr console, there is a section called “Data Center” where you can select from options including: 

Infrastructure Provider, Region, Custom Name, Data Center Network address block, Node Size, EBS Encryption option, Replication Factor, and number of nodes.

The Custom Name is a logical name you can choose for a data center within Cassandra, and is how you reference the data center when you create a keyspace with NetworkTopologyStrategy.

So that explains the mystery of single Cassandra Data Center creation. What does this look like once it’s provisioned and running? Well, you can use CQLSH to connect to a node in the cluster, and then discover the data center you are connected to as follows:

cqlsh> use system;
cqlsh:system> select data_center from local;

data_center
-------------
DCHere

How about multiple DCs?  Using Instaclustr Managed Cassandra the simplest way of creating multiple DC Cassandra clusters is to create a single DC cluster first (call it ‘DCHere’). Then in the management console for this cluster, click on “Add a DC”.  You can add one DC at a time to create a cluster with the total number of DCs you need, just follow our support documentation here and here.

2. Multi-Data Center Experiments With CQLSH

So, to better understand how Cassandra DCs work I created a test cluster with 3 nodes in each of three DCs, located in Sydney, Singapore, and North Virginia (USA) AWS regions (9 nodes in total) as follows:

Cassandra test cluster with 3 nodes in each of three DCs
For this experiment, I used cqlsh running on my laptop, located in Canberra (close to Sydney). My initial goal was simply to explore latencies and try out DC failures.

To measure latency I turned “tracing on”, and to simulate DC failures I created multiple keyspaces, connected cqlsh to different DCs, and used different consistency levels. 
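
Both of these are built-in cqlsh commands. A minimal illustration looks like this (the table name and values are hypothetical):

cqlsh> CONSISTENCY ONE;
cqlsh> TRACING ON;
cqlsh> INSERT INTO sydney.test (id, value) VALUES (1, 'hello');

With tracing on, cqlsh prints a trace for each request, including the total elapsed time, which is where the latency numbers below come from.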

I created three separate keyspaces, one for each DC location. This doesn’t result in data replication across DCs, but instead directs all data written via any local DC to the single DC with RF >= 1. I.e. all data will be written to (and read from) the DCSydney DC for the “sydney” keyspace:

CREATE KEYSPACE "sydney" WITH replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 3, 'DCSingapore' : 0, 'DCUSA' : 0 };

CREATE KEYSPACE "singapore" WITH replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 0, 'DCSingapore' : 3, 'DCUSA' : 0 };

CREATE KEYSPACE "usa" WITH replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 0, 'DCSingapore' : 0, 'DCUSA' : 3 };

I used a fourth keyspace for replication. Because this has multiple DCs with RF >= 1 the data will be replicated across all of the DCs, i.e. data written to any local DC will be written locally as well as to all other DCs:

CREATE KEYSPACE "everywhere" WITH replication = {'class': 'NetworkTopologyStrategy', 'DCSydney' : 3, 'DCSingapore' : 3, 'DCUSA' : 3 };

2.1 Latency

First let’s look at latency.  To run latency tests I connected cqlsh to the Sydney Data Center.

I varied which keyspace I was writing to or reading from (the Keyspace column), and used consistency level ONE for all of these. ONE means that a write must be written to, and acknowledged by, at least one replica node in any DC, so we don’t expect any write/read errors due to writing or reading in a local DC that is different from the DCs in the keyspace. The results show that latency increases from a minimum of 1-2ms (Sydney), to 200ms (Singapore) and 231ms (USA). Compared to the average inter-region network latencies I reported in the previous blog, these latencies are around 14% higher: the Singapore latency is 200ms (c.f. 174ms), and the USA latency is 231ms (c.f. 204ms). Longer times are to be expected as there is a Cassandra write or read included in this time, on top of the basic network latency. As expected (using consistency ONE) all of the operations succeeded. This table shows the results:

What does this reveal about how Cassandra keyspaces and DCs work? cqlsh is connected to the Sydney DC as the local DC. For the keyspaces that have just a single DC, the write or read operation can only use that DC and therefore includes the overhead of the network latency for the local DC to communicate with the remote DC (with no network overhead for sydney). However, for the “everywhere” keyspace, which contains all three DCs, it behaves as if it’s just using the local DC and therefore has a low latency indistinguishable from the results for the “sydney” keyspace. The difference is that the row is also written to all the other DCs asynchronously, which does not impact the operation time. This picture shows the latencies on a map:

Latencies across the three Cassandra DCs

2.2 Failures

I also wanted to understand what happens if a DC is unavailable. This was tricky to achieve acting as a typical user of Instaclustr Managed Cassandra (as there’s no way for users to stop/start Cassandra nodes), so I simulated it using permutations of local DC, keyspaces, and a consistency level of LOCAL_ONE (a write must be sent to, and successfully acknowledged by, at least one replica node in the local DC). This also allows customers to try it out for themselves. Using LOCAL_ONE means that if cqlsh is connected to the Sydney DC, and the keyspace has a Sydney DC with RF >= 1, then writes and reads will succeed. But if the keyspace only has DCs in other regions (Singapore or USA) then the writes and reads will fail (simulating the failure of remote DCs). This table shows the results of this experiment:

The results are almost identical to before, but with the key difference that we get a NoHostAvailable error (and therefore no latencies) when the keyspace is singapore or usa. The sydney and everywhere keyspaces still work, as the Sydney DC is available in both cases.
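
To make that concrete, this is the kind of interaction you would see from cqlsh connected to the Sydney DC (the table and values are hypothetical, and the error output is abbreviated):

cqlsh> CONSISTENCY LOCAL_ONE;
cqlsh> INSERT INTO sydney.test (id, value) VALUES (1, 'ok');     -- succeeds
cqlsh> INSERT INTO singapore.test (id, value) VALUES (1, 'ok');  -- fails
NoHostAvailable: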

Note that Cassandra consistency levels are highly tunable, and there are more options that are relevant to multi-DC Cassandra operation. For example, ALL and EACH_QUORUM (writes only) work across all the DCs, and have stronger consistency, but at the penalty of higher latency and lower availability.

3. Multi-Data Centers Experiments With the Cassandra Java Client

Around the world in 80 days

In our journey “Around the World” it’s important to always have the latest information! As Phileas Fogg discovered, train timetables, like Cassandra documentation, can get out of date very quickly.

I was also interested in testing out the Cassandra Java client with my multi-DC cluster. I had previously read that, by default, it supports automatic failover across multiple DCs, which I thought would be interesting to see happening in practice. The DCAwareRoundRobinPolicy was recommended in the O’Reilly book “Learning Apache Cassandra” (2nd edition, 2017), which says that “this policy is datacenter aware and routes queries to the local nodes first”. This is also the policy I used in my first attempt to connect with Cassandra way back in 2017!

However, a surprise was lying in wait! It turns out that since version 4.0 of the Java client there is no longer a DCAwareRoundRobinPolicy!  

Instead, the default policy now only connects to a single data center, so naturally there is no failover across DCs. You must provide the local DC name, and that is the only DC the client connects to. This also means that it behaves exactly like the last (LOCAL_ONE) results with cqlsh. This prevents the potentially undesirable data consistency issues that can arise if you are using DC-local consistency levels but transparently fail over to a different DC.
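
With the 4.x Java client the local DC is therefore part of the configuration. It can be set programmatically on the session builder or in the driver’s application.conf; a minimal sketch of the configuration file approach, assuming the standard driver 4.x option names (the contact point address is a placeholder):

datastax-java-driver {
  basic.contact-points = [ "10.0.1.10:9042" ]
  basic.load-balancing-policy {
    local-datacenter = DCSydney
  }
}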

You can either handle any failures in the Java client code (e.g. if a DC is unavailable, pick the backup Cassandra DC and connect to that), or, probably the intended approach, fail the entire application stack in the DC with the Cassandra failure over to a complete copy in the backup region. I tried detecting a failure in the Java client code and then failing over to the backup DC. This worked as expected. However, in the future I will need to explore how to recover from the failure (e.g. how do you detect when the original DC is available and consistent again, and switch back to using it).

3.1 Redundant Low-Latency DCs

This brings us full circle back to the first blog in the series, where we discovered that there are 8 georegions in AWS that provide sub-100ms latency to clients in the same georegion:

The 8 AWS georegions with sub-100ms latency for geolocated clients

We then suggested this could be serviced with 8 DCs to provide DC failover in each georegion, as follows:

8 DCs to provide DC failover in each georegion

The matched pairs of DCs to provide High Availability (HA) and failover for each georegion look like this in table form. These are pairs of DCs that the application code will need to have knowledge of and fail over between:

Table - Matched pairs of DCs to provide High Availability (HA) and failover for each georegion

In practice, the read load of the application/client would need to be load balanced over both of the data centers for some georegions (e.g. the North America georegion load balances across both the West and East Coast DCs). Depending on the data replication strategy (replicating data written in each georegion to just its redundant DC pair, or to all DCs in the cluster; this really depends on the actual use case) and the expected extra load on each DC due to failover, DC cluster sizes will need to be sufficient to cope with the normal read/write load on each DC, the replication write load (write amplification), and load spikes due to DC failures and failover to another DC.

Based on these limited experiments we are now ready for the next blog in the series, where we’ll try out multi-DC Cassandra in the context of a realistic globally distributed example application, potentially with multiple keyspaces, data centers, and replication factors, to achieve goals including low latency, redundancy, and replication across georegions and even Around The World.

The post An Introduction to Cassandra Multi-Data Centers: Part 2 appeared first on Instaclustr.

Apache Cassandra 4.0 – Audit

Apache Cassandra 4.0 brings a long-awaited feature for tracking and logging database user activity. Primarily aimed at providing a robust set of audit capabilities that allow operators of Cassandra to meet external compliance obligations, it brings yet another enterprise feature into the database. Building on the work done for the full query log capability, the audit log capability provides operators with the ability to audit all DML, DDL, and DCL changes to either a binary file or a user-configurable source (including the new diagnostic events facility).

This capability will go a long way toward helping Cassandra operators meet their SOX and PCI requirements.  If you are interested in reading about the development of the feature you can follow along here: https://issues.apache.org/jira/browse/CASSANDRA-12151

From a performance perspective, the changes appear to have only a fairly minor impact on throughput and latency when enabled, and no discernible impact when disabled. Expect to see a 10% to 15% impact on mixed-workload throughput and p99 latency.

By default, audit logs are written in the BinLog format, and Cassandra comes with tools for parsing and processing them into human-readable formats. Cassandra also supports executing an archive command for simple processing of audit logs. Audited keyspaces, users, and command categories can be whitelisted and blacklisted. Audit logging can be enabled in cassandra.yaml.
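
The cassandra.yaml side of this is the audit_logging_options block. A minimal sketch, with illustrative filter values, looks something like the following; the filters take comma-separated lists of keyspaces, users, and categories (QUERY, DML, DDL, DCL, AUTH, PREPARE, ERROR, OTHER):

audit_logging_options:
    enabled: true
    logger:
      - class_name: BinAuditLogger
    # audit_logs_dir: /var/lib/cassandra/audit/
    excluded_keyspaces: system, system_schema, system_virtual_schema
    included_categories: AUTH, DDL, DCL

The binary files it produces can be turned back into a human-readable form with the auditlogviewer tool that ships with Cassandra.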

What’s the Difference Between Audit Logging, Full Query Logging and Diagnostic Events? 

Both audit logging (BinAuditLogger) and full query logging are managed internally by Apache Cassandra’s AuditLogManager. Both implement IAuditLogger, but are predefined in Apache Cassandra. The main difference is that the full query log receives AuditLogEntries before they are processed by the AuditLogFilter. Both the FQL and BAL leverage the same BinLog format and share a common implementation of it.

Diagnostic events are effectively a queue of internal events that happen in the node. There is an IAuditLogger implementation that publishes filtered LogEntries to the Diagnostics queue if users choose to consume audit records this way.

So think of it this way: Cassandra has an audit facility that enables both configurable auditing of actions and a full query log, and you can have as many AuditLoggers enabled as you want. Diagnostic events are a way of pushing events to client drivers using the CQL protocol, and you can pipe AuditEvents to the Diagnostics system!
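
For example, both facilities can also be switched on at runtime with nodetool (the full query log path below is just a placeholder):

$ nodetool enableauditlog
$ nodetool enablefullquerylog --path /tmp/fqllogs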

How Is This Different From Cassandra’s Change Data Capture (CDC) Mechanism?

Apache Cassandra has supported CDC on tables for some time now; however, the implementation has always been a fairly low-level and hard-to-consume mechanism. CDC in Cassandra is largely just an index into commitlog files that points to data relevant to the tables with CDC enabled. It is then up to the consumer to read the commitlog format and do something with it. It also only captures mutations that were persisted to disk.
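
For comparison, CDC is opt-in per table via a table property, e.g. (keyspace and table names are placeholders):

ALTER TABLE ks.tbl WITH cdc = true;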

The audit logging capability will log all reads, writes, login attempts, schema changes, etc. Both features could be leveraged to build a proper CDC stream; I would hazard a guess that it’s probably easier to do with the IAuditLogger interface than by consuming the CDC files!

The post Apache Cassandra 4.0 – Audit appeared first on Instaclustr.