Spring Data Cassandra vs. Native Driver

Intro

Spring Data with Cassandra has been getting more and more popular for some time now. My main concern with the framework is its performance characteristics compared to the native CQL driver. After all, with the driver everything is under your control, and one can probably squeeze much more juice out of the cluster. OK, I admit it's not always about performance. If that were the case, we would all be writing software in C or assembler. Still, I think it's good practice to be aware of the drawbacks.

To be honest, Spring Data Cassandra is relatively new to me. I did the performance comparison at the lowest level, without using repositories and other high-level concepts that come with Spring Data Cassandra. My focus in this post is on the generics that decode the data coming out of the driver. To make the comparison I'm going to use a simple Cassandra table (skinny row), run query after query (5000 and 10000 of them) against Cassandra, and then decode the results. Once again, the focus is not on the performance characteristics of higher-order functionality like paged queries. I just wanted a rule of thumb for what to expect from Spring Data Cassandra.

Setup

    -- simple skinny row
    CREATE TABLE activities (
        activity_id uuid,
        activity_model_id bigint,
        activity_state text,
        asset_id text,
        attrs map<text, text>,
        creation_time timestamp,
        customer_id text,
        end_time timestamp,
        last_modified_time timestamp,
        person_id text,
        poi_id text,
        start_time timestamp,
        PRIMARY KEY (activity_id)
    );

    
To eliminate all possible side effects, I used just a single skinny row:
    activity_id 72b493f0-e59d-11e3-9bd6-0050568317c1
    activity_model_id 66
    activity_state DONE
    asset_id 8400848739855200000
    attrs {
        'businessDrive': '1:1',
        'customer': '4:test_test_test',
        'distance': '3:180', 
        'endLocation': '6:15.7437466839,15.9846853333,0.0000000000',
        'fromAddress': '4:XX1', 
        'locked': '1:0', 
        'reason': '4:Some reason 2', 
        'startLocation': '6:15.7364385831,15.0071729736,0.0000000000',
        'toAddress': '4:YY2'
        }
    creation_time 2014-05-27 14:50:14+0200
    customer_id 8400768435301400000
    end_time 2014-05-27 12:15:40+0200
    last_modified_time 2014-05-29 21:30:44+0200
    person_id 8401111750365200000
    poi_id null
    start_time 2014-05-27 12:13:05+0200
    
This row is fetched every time; to detect differences we'll see how long the iterations take. Network and cluster effects are also out of scope, so everything was tested on a locally running DataStax Community Cassandra (2.0.16) instance.
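A timing loop of this kind can be sketched in a few lines. This is not the code from the gists, only a minimal illustration written in Python for brevity; `fetch_and_decode` is a hypothetical stand-in for "query the row once and decode it", and `time_iterations` is a name introduced here.

```python
import time
from typing import Callable


def time_iterations(fetch_and_decode: Callable[[], object], iterations: int) -> float:
    """Call fetch_and_decode `iterations` times and return the elapsed milliseconds."""
    start = time.perf_counter()
    for _ in range(iterations):
        fetch_and_decode()
    return (time.perf_counter() - start) * 1000.0


if __name__ == "__main__":
    # Hypothetical stand-in for one query plus decoding; swap in a real call.
    fake_row = {"activity_id": "72b493f0-e59d-11e3-9bd6-0050568317c1", "activity_model_id": 66}
    elapsed_ms = time_iterations(lambda: dict(fake_row), 5000)
    print(f"5000 iterations took {elapsed_ms:.0f} ms")
```

Running the same loop three times and averaging, as in the results below, smooths out some of the run-to-run noise.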

The code

To separate all possible interfering effects I used two separate projects. I once had a situation where I used the old Thrift API together with the CQL driver, and it significantly affected performance. It also required additional configuration parameters. The main code snippets are located on gist. This is not the focus here, but if somebody is interested:

spring-data
native-drivers

Results in milliseconds

    3 fields - 5000 items
        spring-data
        5381
        5282
        5385
        avg: 5339

        driver
        4426
        4280
        4469
        avg: 4390

        result: driver faster 21.6%

    3 fields - 10000 items
        spring-data
        8560
        8133
        8144
        avg: 8279

        driver
        6822
        6770
        6875
        avg: 6822
        
        result: driver faster 21.3%

    12 fields - 5000 items
        spring-data
        5911
        5920
        5928
        avg: 5920 - 10.88 % slower than with 3 fields!

        driver
        4687
        4669
        4606
        avg: 4654 - 6 % slower than with 3 fields

        result: driver faster 27%
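The percentages above can be reproduced with a one-line formula. The sketch below assumes each percentage is the time difference taken relative to the faster average, which matches the reported 21.6%, 10.88% and 27% figures; `percent_slower` is a name introduced here for illustration, and Python is used only for brevity.

```python
def percent_slower(slow_ms: float, fast_ms: float) -> float:
    """Return how much longer slow_ms is than fast_ms, as a percentage of fast_ms."""
    return (slow_ms - fast_ms) / fast_ms * 100.0


if __name__ == "__main__":
    # Reported averages in milliseconds, taken from the results above.
    print(round(percent_slower(5339, 4390), 1))  # spring-data vs driver, 3 fields, 5000 items
    print(round(percent_slower(5920, 5339), 2))  # spring-data, 12 fields vs 3 fields
    print(round(percent_slower(5920, 4654), 1))  # spring-data vs driver, 12 fields, 5000 items
```

Put differently, "driver faster 21.6%" here means Spring Data needed 21.6% more time than the driver for the same work.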

Conclusions

Spring Data Cassandra may be very interesting if you want to learn something new. It might also speed up development when prototyping or doing something similar. I didn't test higher-order functionality like pagination. This was just a rule-of-thumb test to see what to expect. Basically, the bigger the classes you have to decode, the bigger the deserialization cost. At least this is the effect I'm noticing in my basic tests.

Follow up with Object Mapping available in Cassandra driver 2.1

There was an interesting follow-up discussion on reddit. At the suggestion of reddit user v_krishna, another candidate was added to the comparison: the Object-mapping API.

Let's see the results:

    3 fields - 5000 items
        spring-data
        5438
        5453
        5576
        avg: 5489

        object-map
        5390
        5299
        5476
        avg: 5388

        driver
        4382
        4410
        4249
        avg: 4347

    conclusion
        - driver 26% faster than spring data
        - object map just under 2% faster than spring data

    3 fields - 10000 items
        spring-data
        8792
        8507
        8473
        avg: 8591

        object-map
        8435
        8494
        8365
        avg: 8431

        driver
        6632
        6760
        6646
        avg: 6679

    conclusion
        - driver 28.6% faster than spring data
        - object map just under 2% faster than spring data

    12 fields - 5000 items
        spring-data
        6193
        5999
        5938
        avg: 6043

        object-map
        6062
        5936
        5911
        avg: 5970

        driver
        4910
        4955
        4596
        avg: 4820

    conclusion
        - driver 25% faster than spring data
        - object mapping 1.2% faster than spring data

To keep everything fair: there was some deviation between these test runs and the previous ones. Here are the deviations:

Comparison with the first run:
    3 fields - 5000 items
        spring-data
        avg1: 5339
        avg2: 5489
        2.7% deviation

        driver
        avg1: 4390
        avg2: 4347
        1% deviation

    3 fields - 10000 items
        spring-data
        avg1: 8279
        avg2: 8591
        3.6% deviation

        driver
        avg1: 6822
        avg2: 6679
        2.1% deviation

    12 fields - 5000 items
        spring-data
        avg1: 5920
        avg2: 6043
        2% deviation

        driver
        avg1: 4654
        avg2: 4820
        3.4% deviation
Object mapping from Spring Data seems to be just a bit slower than the object mapping available in the new driver. I can't wait to see how the two compare in future versions. Initially I expected object mapping to cost around 5-10% compared to the plain driver. It surprised me a bit that the difference was more on the level of 25%. So if you are planning on using object mapping capabilities, there is a performance penalty.

Enhance Apache Cassandra Logging

Cassandra usually writes all its logs to a system.log file. Cassandra 2.0 uses the old log4j 1.2, and since 2.1 it uses logback, which of course has a different syntax :)
Logs can be enhanced with some configuration. These explanations work with Cassandra 2.0.x and Cassandra 2.1.x; I haven't tested other versions yet.

I wanted to split logs into different files depending on their “sources” (repair, compaction, tombstones, etc.) to ease debugging, while keeping system.log as usual.

For example, to declare 2 new files to handle, say, Repair and Tombstones logs :

Cassandra 2.0 :

You need to declare each new log file in the log4j-server.properties file.

[...]
## Repair
log4j.appender.Repair=org.apache.log4j.RollingFileAppender
log4j.appender.Repair.maxFileSize=20MB
log4j.appender.Repair.maxBackupIndex=50
log4j.appender.Repair.layout=org.apache.log4j.PatternLayout
log4j.appender.Repair.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
## Edit the next line to point to your logs directory
log4j.appender.Repair.File=/var/log/cassandra/repair.log

## Tombstones
log4j.appender.Tombstones=org.apache.log4j.RollingFileAppender
log4j.appender.Tombstones.maxFileSize=20MB
log4j.appender.Tombstones.maxBackupIndex=50
log4j.appender.Tombstones.layout=org.apache.log4j.PatternLayout
log4j.appender.Tombstones.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
### Edit the next line to point to your logs directory
log4j.appender.Tombstones.File=/home/log/cassandra/tombstones.log
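Since every appender repeats the same seven lines, the stanzas can also be generated. The following is only a convenience sketch in Python, not something Cassandra ships: `log4j_appender` is a hypothetical helper, and the sizes and pattern are copied from the example above.

```python
def log4j_appender(name: str, log_file: str, max_size: str = "20MB", backups: int = 50) -> str:
    """Build a log4j 1.2 RollingFileAppender stanza like the ones shown above."""
    prefix = f"log4j.appender.{name}"
    lines = [
        f"## {name}",
        f"{prefix}=org.apache.log4j.RollingFileAppender",
        f"{prefix}.maxFileSize={max_size}",
        f"{prefix}.maxBackupIndex={backups}",
        f"{prefix}.layout=org.apache.log4j.PatternLayout",
        # Doubled braces keep the literal %d{ISO8601} inside an f-string.
        f"{prefix}.layout.ConversionPattern=%5p [%t] %d{{ISO8601}} %F (line %L) %m%n",
        f"{prefix}.File={log_file}",
    ]
    return "\n".join(lines)


if __name__ == "__main__":
    print(log4j_appender("Repair", "/var/log/cassandra/repair.log"))
```

The output is meant to be pasted into log4j-server.properties; adjust the file path to your own logs directory.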

Cassandra 2.1 :

The equivalent declarations go in the logback.xml file.

  <appender name="Repair" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${cassandra.logdir}/repair.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
      <fileNamePattern>${cassandra.logdir}/repair.log.%i.zip</fileNamePattern>
      <minIndex>1</minIndex>
      <maxIndex>20</maxIndex>
    </rollingPolicy>

    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
      <maxFileSize>20MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
      <!-- old-style log format
      <pattern>%5level [%thread] %date{ISO8601} %F (line %L) %msg%n</pattern>
      -->
    </encoder>
  </appender>

  <appender name="Tombstones" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${cassandra.logdir}/tombstones.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
      <fileNamePattern>${cassandra.logdir}/tombstones.log.%i.zip</fileNamePattern>
      <minIndex>1</minIndex>
      <maxIndex>20</maxIndex>
    </rollingPolicy>

    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
      <maxFileSize>20MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
      <!-- old-style log format
      <pattern>%5level [%thread] %date{ISO8601} %F (line %L) %msg%n</pattern>
      -->
    </encoder>
  </appender>

Now that these new files are declared, we need to fill them with logs. To do that, simply route a Java class to the right file. To send WARN-level messages from the class org.apache.cassandra.db.filter.SliceQueryFilter to the Tombstones file, simply add :

Cassandra 2.0 :

log4j.logger.org.apache.cassandra.db.filter.SliceQueryFilter=WARN,Tombstones

Cassandra 2.1 :

<logger name="org.apache.cassandra.db.filter.SliceQueryFilter" level="WARN">
    <appender-ref ref="Tombstones"/>
</logger>

The configuration is applied on the fly, so there is no need to restart Cassandra !
Now you will have dedicated files for each kind of log.

A list of interesting Cassandra classes :

org.apache.cassandra.service.StorageService, WARN : Repair
org.apache.cassandra.net.OutboundTcpConnection, DEBUG : Repair (handy for those annoying stuck repairs)
org.apache.cassandra.repair, INFO : Repair
org.apache.cassandra.db.HintedHandOffManager, DEBUG : Repair
org.apache.cassandra.streaming.StreamResultFuture, DEBUG : Repair 
org.apache.cassandra.cql3.statements.BatchStatement, WARN : Statements
org.apache.cassandra.db.filter.SliceQueryFilter, WARN : Tombstones

You can find which Java class a log message comes from by adding “%c” to the log4j/logback “ConversionPattern” :

org.apache.cassandra.db.ColumnFamilyStore INFO  [BatchlogTasks:1] 2015-09-18 16:43:48,261 ColumnFamilyStore.java:939 - Enqueuing flush of batchlog: 226172 (0%) on-heap, 0 (0%) off-heap
org.apache.cassandra.db.Memtable INFO  [MemtableFlushWriter:4213] 2015-09-18 16:43:48,262 Memtable.java:347 - Writing Memtable-batchlog@1145616338(195.566KiB serialized bytes, 205 ops, 0%/0% of on/off-heap limit)
org.apache.cassandra.db.Memtable INFO  [MemtableFlushWriter:4213] 2015-09-18 16:43:48,264 Memtable.java:393 - Completed flushing /home/cassandra/data/system/batchlog/system-batchlog-tmp-ka-4267-Data.db; nothing needed to be retained.  Commitlog position was ReplayPosition(segmentId=1442331704273, position=17281204)
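Once “%c” is in the pattern, it is easy to see which classes are the noisiest and therefore worth routing to their own file. A small sketch in Python, assuming “%c” is the first token on each line as in the sample output above; `count_by_logger` is a hypothetical helper.

```python
from collections import Counter
from typing import Iterable


def count_by_logger(lines: Iterable[str]) -> Counter:
    """Count log lines per logger class, assuming the class name is the first token."""
    counts: Counter = Counter()
    for line in lines:
        parts = line.split(None, 1)
        # Skip blank lines and continuation lines such as stack traces.
        if parts and parts[0].startswith("org.apache.cassandra."):
            counts[parts[0]] += 1
    return counts


if __name__ == "__main__":
    sample = [
        "org.apache.cassandra.db.ColumnFamilyStore INFO  [BatchlogTasks:1] Enqueuing flush of batchlog",
        "org.apache.cassandra.db.Memtable INFO  [MemtableFlushWriter:4213] Writing Memtable-batchlog",
        "org.apache.cassandra.db.Memtable INFO  [MemtableFlushWriter:4213] Completed flushing",
    ]
    for logger, count in count_by_logger(sample).most_common():
        print(count, logger)
```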

You can disable “additivity” (i.e. avoid also writing the messages to system.log, for example) in log4j for a specific class by adding :

log4j.additivity.org.apache.cassandra.db.filter.SliceQueryFilter=false

For logback, you can add additivity="false" to the <logger .../> elements.

To migrate a log4j configuration to logback.xml, you can use the translator at http://logback.qos.ch/translator/

Note: you can add http://blog.alteroot.org/feed.cassandra.xml to your rss aggregator to follow all my Cassandra posts :)

Analysis of Cassandra powered Greenhouse with Apache Spark

Intro

In the previous post we went over the steps for gathering the data on the Raspberry Pi.

  1. Gather Data on Raspberry Pi with Cassandra and Arduino
  2. Arduino Greenhouse
In this post I'm going to go over the steps necessary to get the data into Cassandra and then process it with Apache Spark.

Cassandra queries

    -- we'll keep the data on just one node
    CREATE KEYSPACE home
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };
    
    -- create statement, bucketed by date
    CREATE TABLE greenhouse (
        source text,
        day text,
        time timestamp,
        temperaturein decimal,
        temperatureout decimal,
        temperaturecheck decimal,
        humidity decimal,
        light int,
        PRIMARY KEY ((source, day), time)
    )
    WITH CLUSTERING ORDER BY (time DESC);
    
    -- example insert, just to check everything out
    INSERT INTO greenhouse (
        source, day, time, temperaturein,
        temperatureout, temperaturecheck,
        humidity, light)
    VALUES ('G', '2015-04-04', dateof(now()), 0,
        0, 0, 0, 0);
    
    -- check if everything is inserted
    SELECT * FROM greenhouse WHERE source = 'G' AND day = '2015-04-04';
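On the writer side, the `day` bucket has to be derived from the measurement time. A minimal sketch in Python, assuming the bucket is simply the calendar date formatted as `YYYY-MM-DD` (as in the example insert); `partition_key` and `group_by_partition` are hypothetical helpers, not part of any driver.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List, Tuple


def partition_key(source: str, measured_at: datetime) -> Tuple[str, str]:
    """Return the (source, day) pair that forms the partition key of the greenhouse table."""
    return (source, measured_at.strftime("%Y-%m-%d"))


def group_by_partition(source: str, times: Iterable[datetime]) -> Dict[Tuple[str, str], List[datetime]]:
    """Group measurement times by partition to see how many rows land in each bucket."""
    buckets: Dict[Tuple[str, str], List[datetime]] = defaultdict(list)
    for measured_at in times:
        buckets[partition_key(source, measured_at)].append(measured_at)
    return dict(buckets)


if __name__ == "__main__":
    times = [datetime(2015, 4, 4, 17, 4, 41), datetime(2015, 4, 4, 23, 59, 59), datetime(2015, 4, 5, 0, 0, 1)]
    for key, rows in group_by_partition("G", times).items():
        print(key, len(rows))
```

With one partition per source and day, a query for a single day touches exactly one partition, which keeps reads bounded.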
    

Analysis results

I wanted to keep the partitions relatively small because I didn't know how the Raspberry Pi was going to handle the data. Timeouts are possible if the rows get too big, so I went with partitioning the data by day. The analysis for April showed that the project paid off. Here are the results of the analysis:

Total data points (not much, but it's a home DIY solution after all)
172651

First record
Measurement{source='G', day='2015-04-04', time=Sat Apr 04 17:04:41 CEST 2015, temperaturein=11.77, temperatureout=10.43, temperaturecheck=15.0, humidity=46.0, light=57}

Last record
Measurement{source='G', day='2015-05-04', time=Mon May 04 09:37:35 CEST 2015, temperaturein=22.79, temperatureout=20.49, temperaturecheck=23.0, humidity=31.0, light=68}

Cold nights (below 2 C outside)
2015-04-06
2015-04-07
2015-04-10
2015-04-16
2015-04-17
2015-04-18
2015-04-19
2015-04-20

Lowest In
Measurement{source='G', day='2015-04-06', time=Mon Apr 06 06:22:25 CEST 2015, temperaturein=2.28, temperatureout=2.39, temperaturecheck=4.0, humidity=41.0, light=8}

Highest In
Measurement{source='G', day='2015-04-22', time=Wed Apr 22 14:52:26 CEST 2015, temperaturein=75.53, temperatureout=43.53, temperaturecheck=71.0, humidity=21.0, light=84}

Average In
19.45

Lowest Out
Measurement{source='G', day='2015-04-20', time=Mon Apr 20 04:42:16 CEST 2015, temperaturein=4.48, temperatureout=-2.88, temperaturecheck=6.0, humidity=31.0, light=0}

Highest Out
Measurement{source='G', day='2015-04-22', time=Wed Apr 22 15:58:32 CEST 2015, temperaturein=57.69, temperatureout=45.07, temperaturecheck=56.0, humidity=24.0, light=71}

Average Out
14.71

Average Difference
4.75

Biggest Diff
Measurement{source='G', day='2015-04-20', time=Mon Apr 20 15:11:53 CEST 2015, temperaturein=69.93, temperatureout=28.36, temperaturecheck=62.0, humidity=21.0, light=83}
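The aggregates above are simple minimum, maximum and average computations. The linked analysis uses Apache Spark; the sketch below is a plain-Python stand-in that only illustrates the logic on four of the records shown above. The `Measurement` class, the `summarize` function and the strict "below 2 C" comparison are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Measurement:
    day: str
    temperaturein: float
    temperatureout: float


def summarize(measurements: List[Measurement], cold_below: float = 2.0) -> Dict[str, object]:
    """Compute the extremes, averages and cold days for a non-empty list of measurements."""
    if not measurements:
        raise ValueError("no measurements to summarize")
    count = len(measurements)
    return {
        "count": count,
        "lowest_in": min(measurements, key=lambda m: m.temperaturein),
        "highest_in": max(measurements, key=lambda m: m.temperaturein),
        "average_in": sum(m.temperaturein for m in measurements) / count,
        "lowest_out": min(measurements, key=lambda m: m.temperatureout),
        "highest_out": max(measurements, key=lambda m: m.temperatureout),
        "average_out": sum(m.temperatureout for m in measurements) / count,
        # A day counts as cold if any measurement that day was below the threshold.
        "cold_days": sorted({m.day for m in measurements if m.temperatureout < cold_below}),
        "biggest_diff": max(measurements, key=lambda m: m.temperaturein - m.temperatureout),
    }


# Four records taken from the results above.
SAMPLE = [
    Measurement("2015-04-06", 2.28, 2.39),
    Measurement("2015-04-22", 75.53, 43.53),
    Measurement("2015-04-20", 4.48, -2.88),
    Measurement("2015-04-20", 69.93, 28.36),
]

if __name__ == "__main__":
    for name, value in summarize(SAMPLE).items():
        print(name, value)
```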

The code

  1. Spark analysis code

Gather Data on Raspberry Pi with Cassandra and Arduino

Intro

In the previous post we went over the steps necessary to make a sensor for a small greenhouse for the balcony.

  1. Arduino Greenhouse
In this section we are going to concentrate on how to gather the data coming in from the greenhouse. The approach is applicable to any kind of telemetry data. The parts list is simpler than in the previous section, but as a "concentrator" node we are going to use a Raspberry Pi. Here are the parts:
  • Arduino Uno
  • USB cable
  • Raspberry Pi
  • nRF24L01+
  • 7 Wires
To install Arduino libraries please consult the previous post. The wiring for the nRF24 is the same as in the previous post.

Persisting the data

To persist the data I opted for Apache Cassandra. It's a good fit even for a low-powered Raspberry Pi. Cassandra is a Java technology, so before installing Cassandra you have to install Java. It's all written up nicely in the following posts:

  1. Install Java
  2. Installing Cassandra

Overview of the process

The code

  1. Data Gathering in Arduino
  2. Python serial to Cassandra bridge
To be continued ...

How to change Cassandra compaction strategy on a production cluster

I’ll talk about changing the Cassandra CompactionStrategy on a live production cluster.
First of all, an extract from the Cassandra documentation :

Periodic compaction is essential to a healthy Cassandra database because Cassandra does not insert/update in place. As inserts/updates occur, instead of overwriting the rows, Cassandra writes a new timestamped version of the inserted or updated data in another SSTable. Cassandra manages the accumulation of SSTables on disk using compaction. Cassandra also does not delete in place because the SSTable is immutable. Instead, Cassandra marks data to be deleted using a tombstone.

By default, Cassandra uses SizeTieredCompactionStrategy (STCS). This strategy triggers a minor compaction when there are a number of similar-sized SSTables on disk, as configured by the table subproperty (4 by default).

Another compaction strategy, available since Cassandra 1.0, is LeveledCompactionStrategy (LCS), based on LevelDB.
Since 2.0.11, DateTieredCompactionStrategy is also available.

Depending on your needs, you may have to change the compaction strategy on a running cluster. Changing this setting involves rewriting ALL SSTables with the new strategy, which may take a long time and can be CPU and I/O intensive.

I needed to change the compaction strategy on my production cluster to LeveledCompactionStrategy because of our workload : lots of updates and deletes, wide rows, etc.
Moreover, with the default STCS, the largest SSTable that is created will not be compacted again until the amount of actual data increases four-fold. So it can take a long time before old data is really deleted !

Note: You can test a new compaction strategy on one new node with the write_survey bootstrap option. See the DataStax blog post about it.

The basic procedure to change the CompactionStrategy is to alter the table via CQL :

cqlsh> ALTER TABLE mykeyspace.mytable  WITH compaction = { 'class' :  'LeveledCompactionStrategy'  };

If you run ALTER TABLE to change to LCS like that, all nodes will recompact data at the same time, so performance problems can occur for hours or days…

A better solution is to migrate node by node !

You need to change the compaction strategy locally, on the fly, via JMX, as in write_survey mode.
I use jmxterm for that. I think I’ll write articles about all these JMX things :)
For example, to change to LCS on the mytable table with jmxterm :

~ java -jar jmxterm-1.0-alpha-4-uber.jar --url instance1:7199                                                      
Welcome to JMX terminal. Type "help" for available commands.
$>domain org.apache.cassandra.db
#domain is set to org.apache.cassandra.db
$>bean org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies
#bean is set to org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies
$>get CompactionStrategyClass
#mbean = org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies:
CompactionStrategyClass = org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
$>set CompactionStrategyClass "org.apache.cassandra.db.compaction.LeveledCompactionStrategy" 
#Value of attribute CompactionStrategyClass is set to "org.apache.cassandra.db.compaction.LeveledCompactionStrategy" 

A nice one-liner :

~ echo "set -b org.apache.cassandra.db:columnfamily=mytable,keyspace=mykeyspace,type=ColumnFamilies CompactionStrategyClass org.apache.cassandra.db.compaction.LeveledCompactionStrategy" | java -jar jmxterm-1.0-alpha-4-uber.jar --url instance1:7199
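For more than a couple of nodes it helps to generate the one-liner per node instead of typing it. The sketch below, in Python, only builds the command strings and does not run anything; the jar name and port are the ones from the example above, while `jmxterm_set_strategy` and `rollout_plan` are hypothetical helper names.

```python
from typing import List


def jmxterm_set_strategy(keyspace: str, table: str, strategy: str = "LeveledCompactionStrategy") -> str:
    """Build the jmxterm 'set' command that switches one table's compaction strategy."""
    bean = f"org.apache.cassandra.db:columnfamily={table},keyspace={keyspace},type=ColumnFamilies"
    return f"set -b {bean} CompactionStrategyClass org.apache.cassandra.db.compaction.{strategy}"


def rollout_plan(nodes: List[str], keyspace: str, table: str,
                 jar: str = "jmxterm-1.0-alpha-4-uber.jar", port: int = 7199) -> List[str]:
    """Return one shell command per node, to be run one node at a time."""
    command = jmxterm_set_strategy(keyspace, table)
    return [f'echo "{command}" | java -jar {jar} --url {node}:{port}' for node in nodes]


if __name__ == "__main__":
    for line in rollout_plan(["instance1", "instance2"], "mykeyspace", "mytable"):
        print(line)
```

Run the commands one node at a time, and wait for that node's compactions to finish before moving on to the next.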

On the next commitlog flush, the node will start compacting to rewrite all of its mytable SSTables with the new strategy.

You can follow the progress with nodetool :

~ nodetool compactionstats
pending tasks: 48
compaction type        keyspace           table       completed           total      unit  progress
Compaction        mykeyspace       mytable      4204151584     25676012644     bytes    16.37%
Active compaction remaining time :   0h23m30s

You need to wait for the node to recompact all of its SSTables, then change the strategy on instance2, and so on.
The transition will take multiple compactions if you have lots of data. By default, new SSTables will be 160MB in size.

You can monitor your table with nodetool cfstats too :

~ nodetool cfstats mykeyspace.mytable
[...]
Pending Tasks: 0
        Table: mytable
        SSTable count: 31
        SSTables in each level: [31/4, 0, 0, 0, 0, 0, 0, 0, 0]
[...]

You can see the 31/4 : it means that there are 31 SSTables in L0, whereas Cassandra tries to have only 4 in L0.

Taken from the code ( src/java/org/apache/cassandra/db/compaction/LeveledManifest.java )

[...]
// L0: 988 [ideal: 4]
// L1: 117 [ideal: 10]
// L2: 12  [ideal: 100]
[...]
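To watch the levels settle during the migration, the "SSTables in each level" line from cfstats can be parsed. A small sketch in Python, assuming the format shown above, where a level over its target is printed as `count/ideal` and the others as a plain count; `parse_sstable_levels` and `levels_over_target` are hypothetical helpers.

```python
from typing import List, Optional, Tuple

Level = Tuple[int, Optional[int]]


def parse_sstable_levels(line: str) -> List[Level]:
    """Parse 'SSTables in each level: [31/4, 0, ...]' into (count, ideal or None) pairs."""
    inside = line[line.index("[") + 1 : line.rindex("]")]
    levels: List[Level] = []
    for item in inside.split(","):
        item = item.strip()
        if "/" in item:
            count, ideal = item.split("/")
            levels.append((int(count), int(ideal)))
        else:
            levels.append((int(item), None))
    return levels


def levels_over_target(levels: List[Level]) -> List[int]:
    """Return the indexes of levels that are reported with an ideal size, i.e. over target."""
    return [index for index, (_count, ideal) in enumerate(levels) if ideal is not None]


if __name__ == "__main__":
    parsed = parse_sstable_levels("SSTables in each level: [31/4, 0, 0, 0, 0, 0, 0, 0, 0]")
    print(parsed[0], levels_over_target(parsed))
```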

When all nodes have the new strategy, go for the global ALTER TABLE. /!\ If a node restarts before the final ALTER TABLE, it will recompact to the default strategy (SizeTiered) !

~ cqlsh 
cqlsh> ALTER TABLE mykeyspace.mytable  WITH compaction = { 'class' :  'LeveledCompactionStrategy'  };

Et voilà, I hope this article helps you :)

My last Cassandra blog post was a year ago… I have several in mind (JMX things !) so stay tuned !

Cassandra Community Handling 100 000 req per second

Intro

Recently I got an assignment to prove that a Cassandra cluster can handle up to 100 000 requests per second. All this had to be done on a budget and without spending much time on developing the whole application. The setup had to be as close to the real thing as possible. We will go through the details soon. Here is just the basic overview of the experiment:

Amazon

Generating and handling load on this scale requires infrastructure that is usually not available within a personal budget, so I turned to Amazon EC2. I had been hearing about EC2 for quite some time, and it turned out to be really easy to use. Basically, all you have to do is set up a security group and store the "pem" file for that security group. It's really easy, and if you haven't tried it yet, there is a free micro instance available for a whole year after registering. I won't go into the details of how to set up the security group. It's all described in the DataStax documentation. Note that the security definition there is a bit extensive; defining the port range 1024-65535 is sufficient for communication within the group, and I didn't expose any ports to the public as described in the documentation. The second part is generating the key pair. In the rest of the document I'll refer to this file as "cassandra.pem".

Load

Generating load on that scale is not as easy as it might seem. After some searching I stumbled upon the following. I came to the conclusion that the best solution is to use Tsung. I set up the load-generating machines with the following snippet. Note that I placed the "cassandra.pem" file on the node from which I start tsung. Read the node addresses from the AWS console. The rest is pretty much here:

        # do this only for the machine from which you'll initiate tsung
        scp -i cassandra.pem cassandra.pem ec2-user@tsung_machine:~

        # connect to every load machine and install erlang and tsung
        ssh -i cassandra.pem ec2-user@every_load_machine

        # repeat this on every node
        sudo yum install erlang

        wget http://tsung.erlang-projects.org/dist/tsung-1.5.1.tar.gz
        tar -xvzf tsung-1.5.1.tar.gz
        cd tsung-1.5.1
        ./configure
        make
        sudo make install

        # you can close other load nodes now
        # go back to the first node. and move cassandra.pem to id_rsa
        mv cassandra.pem .ssh/id_rsa

        # now make an ssh connection from first tsung node to every
        # load generating machine (to add the host key) so that
        # the first tsung node won't have any problem connecting to
        # other nodes and issuing erlang commands to them
        ssh ip-a-b-c-d
        exit

        # create the basic.xml file on the first tsung node
        vi basic.xml
    

The second part of setting up the load-generating machines is editing the basic.xml file. To make it more interesting, we are going to send various kinds of messages with a timestamp. The user list is predefined in the file userlist.csv. Note that the password is the same for all users; you can adapt this to your own needs or remove the password completely:

        0000000001;pass
        0000000002;pass
        0000000003;pass
        ...
        ...
        ...
    

The tsung tool is well documented. The configuration I used is similar to this:

        <?xml version="1.0" encoding="utf-8"?>
        <!DOCTYPE tsung SYSTEM "/usr/share/tsung/tsung-1.0.dtd" []>
        <tsung loglevel="warning">

        <clients>
            <client host="ip-a-b-c-d0" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d1" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d2" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d3" cpu="8" maxusers="25"/>
        </clients>

        <servers>
            <server host="app-servers-ip-addresses-internal" port="8080" type="tcp"/>
            <!-- enter the rest of the app servers here-->
        </servers>

        <load>
            <arrivalphase phase="1" duration="11" unit="minute">
                <users maxnumber="100" arrivalrate="100" unit="second"/>
            </arrivalphase>
        </load>

        <options>
            <option name="file_server" id='id' value="userlist.csv"/>
        </options>

        <sessions>
            <session probability="100" name="load_session" type="ts_http">
                <setdynvars sourcetype="file" fileid="id" delimiter=";" order="iter">
                    <var name="username" />
                    <var name="pass" />
                </setdynvars>
                <setdynvars sourcetype="eval"
                            code="fun({Pid,DynVars}) -&gt;
                            {Mega, Sec, Micro} = os:timestamp(),
                            (Mega*1000000 + Sec)*1000 + round(Micro/1000)
                            end.
                            ">
                    <var name="millis" />
                </setdynvars>
                <for from="1" to="10000000" var="i">
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%ABC41.7127837,42.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%DEF43.7127837,44.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%GHI45.7127837,46.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%JKL47.7127837,48.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%MNO49.7127837,50.71278370000.0"  method="GET"/>
                    </request>
                </for>
            </session>
        </sessions>
        </tsung>
    

Resources

  • 3x c3.xlarge
  • 1x c4.xlarge
Note: I added a c4 node because Amazon limited the number of instances I could boot.

App

I spent most of the development time on the app part. The basis of the component handling the requests was a netty listener. In one of my previous posts I described how to use netty to handle HTTP requests and acknowledge them with a HELLO message. Here I acknowledged them with OK.

The most complicated part was sending the messages to Cassandra as fast as possible. The fastest way to send them is to use executeAsync. Initially I had trouble with it and was losing messages. Some of the issues were due to concurrency. Some were due to a poor understanding of the DataStax driver.

Concurrency - Basically, I tried to save on instantiating BoundStatement instances for the sake of overall speed. BoundStatement is not thread safe, and calling the bind method returns "this". It took me some time to figure this out because when used in loops this behavior is not dangerous. Anyway, thanks to a colleague I figured it out.

        // always instantiate new in concurrent code
        // don't reuse and make multiple calls with .bind()!

        BoundStatement bs = new BoundStatement(insertStatement);
    

Asynchronous execution - also a bit tricky. executeAsync returns a future. Initially I was just attaching a callback to it with Futures.

        // don't do this under heavy load with the result of executeAsync
        // in Cassandra you will start to lose data

        Futures.addCallback(future, ...
    

After some trial and error I found a pattern where I didn't lose any data:

        // here we are going to keep the futures
        private ArrayBlockingQueue<ResultSetFuture> queue = 
            new ArrayBlockingQueue<>(10000);

        // in the handling code
        queue.add(session.executeAsync(bs));

        // when reaching 1000th element in the queue
        // start emptying it
        if (queue.size() % 1000 == 0) {
            ResultSetFuture elem;
            do {
                elem = queue.poll();
                if (elem != null) {
                    elem.getUninterruptibly();
                }
            } while (elem != null);
        }

        // this will make your insertions around
        // 4x faster when compared to normal execute
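The underlying idea is to cap the number of requests in flight so that the producer cannot outrun the cluster. The queue-draining code above is one way to get there. The sketch below shows a different, commonly used variant, a semaphore that blocks the caller once the limit is reached, written in Python purely as an illustration. It does not use the DataStax driver; `BoundedSubmitter` and `fake_insert` are hypothetical names, and a thread pool stands in for executeAsync.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor


class BoundedSubmitter:
    """Submit work to an executor while allowing at most max_in_flight unfinished tasks."""

    def __init__(self, executor: ThreadPoolExecutor, max_in_flight: int) -> None:
        self._executor = executor
        self._slots = threading.BoundedSemaphore(max_in_flight)

    def submit(self, fn, *args) -> Future:
        # Blocks the caller when too many requests are pending (backpressure).
        self._slots.acquire()
        try:
            future = self._executor.submit(fn, *args)
        except BaseException:
            self._slots.release()
            raise
        # Free the slot once the task has completed, successfully or not.
        future.add_done_callback(lambda _done: self._slots.release())
        return future


def fake_insert(message_id: int) -> int:
    """Hypothetical stand-in for an asynchronous insert."""
    time.sleep(0.001)
    return message_id


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=8) as pool:
        submitter = BoundedSubmitter(pool, max_in_flight=4)
        futures = [submitter.submit(fake_insert, i) for i in range(20)]
        print(sum(1 for f in futures if f.result() is not None), "inserts completed")
```

Compared with draining a queue every 1000 elements, the semaphore keeps the pipeline steadily full instead of alternating between bursts and waits. Whether that is faster in a given setup is something to measure.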
    

App setup

The instances come with OpenJDK installed. This doesn't guarantee the best performance, so I installed Oracle Java. In order not to lose time on firewall setup, I simply copied the "cassandra.pem" file to every node.

        # copy ".jar" and "cassandra.pem" file to a single app node
        # copy the two files from single node to other nodes
        # it's a lot faster than uploading to every node (at least on my connection)

        # setup the machine
        wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u71-b14/jdk-7u71-linux-x64.tar.gz"
        
        tar -xvzf jdk-7u71-linux-x64.tar.gz

        sudo update-alternatives --install "/usr/bin/java" "java" "/home/ec2-user/jdk1.7.0_71/jre/bin/java" 1
        
        # pick the new java number in this step
        sudo update-alternatives --config java

        # check with this
        java -version
    

Resources

  • 2x c4.xlarge
  • 2x c4.2xlarge
  • 4x c3.xlarge
Note: I added c4 nodes because Amazon limited the number of instances I could boot. I also had to request the increase from customer service, but I couldn't predict how many instances of each type I'd use, so the load and app servers are not all of the same instance type.

Cassandra

Setting up Cassandra is the easiest part of the whole undertaking. All I did was follow this guide by DataStax.

Resources

  • 7x c3.2xlarge

After hanging at 90 000 req/s for a while I came to the conclusion that a replication factor of two might be too much for the resources I had available. I would probably have needed more Cassandra nodes, but since I couldn't get any more instances up I set the replication factor to 1. Note that with this replication factor you cannot lose a node in the cluster without losing data. But the goal here is 100 000 req/s on a budget :)

Results

In the end it took me around $30 to reach the 100k limit. I'm afraid to calculate how much this setup would cost on a monthly or yearly basis.

The successful run looked like this:

Total messages: 31 145 914 messages
Checked number: 31 145 914 messages
Average: 103 809 req/s
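
As a rough sanity check on these numbers: assuming the average is simply the total message count divided by the length of the run (the post does not state the duration, so this is an inference), the run lasted about five minutes.

```javascript
// Figures taken from the results above.
const totalMessages = 31145914;
const averagePerSecond = 103809;

// Assumption: average = total / duration, so duration = total / average.
const durationSeconds = totalMessages / averagePerSecond;

console.log(Math.round(durationSeconds));      // about 300 seconds
console.log(Math.round(durationSeconds / 60)); // about 5 minutes
```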

Don't be afraid to send me an email if you have any questions whatsoever ;)

Setting up Cassandra Cluster in Virtual Machines

Intro

From time to time having just one Cassandra instance installed on your machine is not enough, because you want to test certain behaviors of a running Cassandra cluster. Having spare hardware on the side or paying for processing time on Amazon is not always an option, so it's a good idea to set up a simple cluster on your own machine with the instances running in virtual machines. This post is going to show you how to do it with VirtualBox.

Getting VirtualBox Images

The reason I chose VirtualBox is that there are a lot of free virtual images available. Most of the time you'll be installing Cassandra on a Linux machine, and I decided to go with CentOS. Head over to http://virtualboxes.org/images/centos/ and download CentOS-6.6-x86_64-minimal. The default settings are fine for every machine. Create a couple of them and give them names so that you can tell them apart (Node1, Node2, etc.).

Perhaps the best idea is to set up one node first and make copies afterwards. Do not forget to set the network to bridged adapter. The username and password for the virtual machines are probably set to "root/reverse", but check this when downloading the VirtualBox image. To keep it short I'll just continue using the root user. In production this is extremely bad practice.

Setup networking

When importing the .ova file VirtualBox is going to ask you whether you want to reinitialize the MAC address. Check that option. There is a certain amount of buggy behavior when it comes to networking, so to prevent those errors run the following command after logging in to the virtual machine (root/reverse):

        rm /etc/udev/rules.d/70-persistent-net.rules
    
When VirtualBox initializes the networking on the virtual machine it writes a new MAC address to a file. There seems to be a bug where this MAC address is not transferred from that file to the virtual machine settings. Run the following command and copy the MAC address.
        cat /etc/sysconfig/network-scripts/ifcfg-eth0
    
Shut down the machine and set the MAC address under Settings > Network > Advanced > MAC Address.

Install Java

Just to make things a bit easier we're going to install wget:

        yum install wget
    
Now we are going to install java:
        $ cd /opt/
        $ wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u72-b14/jdk-7u72-linux-x64.tar.gz"
        $ tar xzf jdk-7u72-linux-x64.tar.gz
        $ rm jdk-7u72-linux-x64.tar.gz

        $ cd /opt/jdk1.7.0_72/

        $ alternatives --install /usr/bin/java java /opt/jdk1.7.0_72/bin/java 2
        $ alternatives --config java

        $ alternatives --install /usr/bin/jar jar /opt/jdk1.7.0_72/bin/jar 2
        $ alternatives --install /usr/bin/javac javac /opt/jdk1.7.0_72/bin/javac 2
        $ alternatives --set jar /opt/jdk1.7.0_72/bin/jar
        $ alternatives --set javac /opt/jdk1.7.0_72/bin/javac

        $ vi /etc/profile.d/java.sh
        export JAVA_HOME=/opt/jdk1.7.0_72
        export JRE_HOME=/opt/jdk1.7.0_72/jre
        export PATH=$PATH:/opt/jdk1.7.0_72/bin:/opt/jdk1.7.0_72/jre/bin
    
Reboot, then check the result with echo $JAVA_HOME.

Install Cassandra

Cassandra is installed and run by the following commands:

        $ cd /opt/
        $ wget http://downloads.datastax.com/community/dsc-cassandra-2.1.2-bin.tar.gz
        $ tar xzf dsc-cassandra-2.1.2-bin.tar.gz
        $ rm dsc-cassandra-2.1.2-bin.tar.gz

        [check ip address with ifconfig]

        $ cd dsc-cassandra-2.1.2/conf

        $ vi cassandra.yaml
            rpc_address: ip address of the node
            broadcast_address: ip address of the node
            - seeds: ip_address of the first node

        $ cd ../bin
        $ ./cassandra
    

Firewall settings

The cluster will not work out of the box because of the firewall settings. To get everything running you will need to open the following ports:

        $ iptables -I INPUT -p tcp -m tcp --dport 9042 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7000 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7001 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7199 -j ACCEPT

        $ /etc/init.d/iptables save

        $ service iptables restart
    
Now make copies of this machine and update the cassandra.yaml file on each copy with the IP addresses of the new machines. Also check /var/log/cassandra/system.log to see whether the other nodes are joining.

Installing Cassandra on MINIX NEO X5 mini (Android multimedia player)

Intro

I started doing some DIY home automation projects. Although I have the mega popular Raspberry Pi available, I decided to use the MINIX NEO X5 mini because I felt this device would be put to much better use as some sort of home automation server. The first part of this story is getting a more server oriented OS on the device, and I decided to go with Linux. After a lot of searching and trial and error I settled on an application called Linux Deploy and described it in my previous blog post. Through the rest of this tutorial I'll assume you managed to install a Linux instance on your MINIX. I am going to gather a lot of telemetry data with the solution I am building, so installing Cassandra seems like a natural choice to me. There will be a lot of writes, and Cassandra is good at writing at an incredible scale.

Installing Java

        $ echo "deb http://ppa.launchpad.net/webupd8team/java/ubuntu trusty main" | sudo tee /etc/apt/sources.list.d/webupd8team-java.list
        $ echo "deb-src http://ppa.launchpad.net/webupd8team/java/ubuntu trusty main" | sudo tee -a /etc/apt/sources.list.d/webupd8team-java.list
        $ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys EEA14886
        $ sudo apt-get update
        $ sudo apt-get install oracle-java8-installer
        # you'll need to accept license agreement

        # set environment variables
        $ sudo apt-get install oracle-java8-set-default

        # login once again just in case
        $ exit
    

Installing python

Cassandra comes with a very nice tool called cqlsh. The Linux instance we currently have installed will not run it unless Python is available on the system, so we have to install that first.

        $ sudo apt-get install python2.7
    

Let's start Cassandra

Configuring Cassandra is a chapter of its own. We'll make minimal adjustments before starting: we'll configure Cassandra to respond to queries from other hosts, and while we are at it we'll enable virtual nodes (this will make it easier to scale later).

        $ cd CASSANDRA_INSTALL_DIRECTORY
        $ nano conf/cassandra.yaml

        # uncomment
        num_tokens: 256

        # change to 0.0.0.0
        # this will enable you to contact the cassandra
        # from other computers etc.
        rpc_address: 0.0.0.0

        # save file
        # (we are still in CASSANDRA_INSTALL_DIRECTORY)

        $ ./bin/cassandra

        # after seeing something like
        # Startup completed! Now serving reads.
        # press ^C (don't be afraid cassandra still runs)

        $ bin/cqlsh

        Connected to Test Cluster at localhost:9160.
        [cqlsh 3.1.8 | Cassandra 1.2.18 | CQL spec 3.0.5 | 
        Thrift protocol 19.36.2]
        Use HELP for help.
        cqlsh>
    

Shutting cassandra down:

        # find PID of a cassandra process

        $ ps -ef | grep cassandra

        # run kill -9 [the PID number, e.g. 8212]
    

Running Cassandra on an Android multimedia player is fun :)

Cassandra with Node.js and Arduino

Intro

This post continues where an earlier post stopped. The Cassandra setup used here is more or less the same, so please read that post first if you are interested in the Cassandra setup.

Arduino

Learning big data stuff is most exciting when the data represents something from the real world and not something generated by a big loop with randomized values in it. To create data for this example I've used the following components:

  1. arduino uno
  2. Photoresistor GL5528 LDR
  3. 10K OHM NTC Thermistor 5mm
  4. 2x 10k resistor
  5. Protoboard
  6. Wires

A couple of these inexpensive components combined with an Arduino give us a nice big data sensor / generator. It might not seem that complicated, but sampling any data at one second intervals will run into Cassandra's limitations after about a month of sampling if not done right, so a simple Arduino setup is a fun and motivating way to tackle learning Cassandra. For now let's concentrate on the Arduino part. The wiring is shown here:


The Arduino sketch will be on GitHub, so we'll concentrate on the important parts. The light level in this example is read at analog pin 0. Reading analog values on the Arduino results in values ranging from 0 to 1023. We'll define the light level as a mapping from 0-1023 into 0-100. Arduino already has a built in function for this called map. I also had some trouble in my initial experiments with Arduino serial communication and reading pin values: the data written to the serial port simply got corrupted after a while. I've read a couple of forums on this subject and found out that it actually helps to delay execution for 1 ms after reading a pin value. To keep things as stable as possible we'll also pause the execution for 1 second after writing to the serial port, as shown here:

  int light = map(analogRead(0), 0, 1023, 0, 100);
  delay(1);

    ....

  sprintf(sOut, "%d,%s", light, deblank(sTemp));

  Serial.println(sOut);
  delay(1000);
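
To make the mapping concrete, here is a small JavaScript re-implementation of the integer formula that Arduino's map function documents. It is a sketch for checking values on the receiving side, not part of the original sketch; the name `arduinoMap` is made up for this example.

```javascript
// Mirrors Arduino's documented map() formula:
//   (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
// Arduino uses integer division, which truncates toward zero.
function arduinoMap(x, inMin, inMax, outMin, outMax) {
  return Math.trunc(((x - inMin) * (outMax - outMin)) / (inMax - inMin)) + outMin;
}

console.log(arduinoMap(0, 0, 1023, 0, 100));    // 0
console.log(arduinoMap(512, 0, 1023, 0, 100));  // 50
console.log(arduinoMap(1022, 0, 1023, 0, 100)); // 99
console.log(arduinoMap(1023, 0, 1023, 0, 100)); // 100
```

Because of the truncation, a light level of 100 is reported only for the maximum raw reading of 1023.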
 

Node.js and Cassandra

Parsing the messages that come from measuring devices is pretty repetitive and tends to produce pretty ugly code. I've learned that the hard way. To make parsing these messages as easy as possible I've written a small utility package for it, and it's available on npm.

Using serial ports in node.js doesn't take many steps to set up:

  var serial = require( "serialport" );
  var SerialPort = serial.SerialPort;

  var portName = "/dev/tty.usb-something";

  var sp = new SerialPort(portName, {
      baudrate:9600,
      parser:serial.parsers.readline("\n")
  });

  sp.on("data", function ( data ) {
  var arduinoData = translator.parse(data);
  //...
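
For readers who want to see what parsing such a line involves without the npm package, here is a minimal hand-written sketch. It is not the package's API: `parseReading` is a hypothetical helper, and it assumes the "light,temperature" line format produced by the sprintf call in the Arduino sketch, with the temperature written as a plain decimal number.

```javascript
// Assumed line format: "<light>,<temperature>", e.g. "42,23.50".
const READING = /^(\d+),(-?\d+(?:\.\d+)?)$/;

// Hypothetical helper: returns { light, temperature } or null when the
// line does not match (for example when the serial data got corrupted).
function parseReading(line) {
  // Serial.println ends lines with "\r\n"; splitting on "\n" leaves the "\r".
  const match = READING.exec(line.trim());
  if (!match) {
    return null;
  }
  return { light: Number(match[1]), temperature: Number(match[2]) };
}

console.log(parseReading("42,23.50\r")); // { light: 42, temperature: 23.5 }
console.log(parseReading("4#,2x"));      // null
```

Returning null for malformed lines makes it easy to skip corrupted readings instead of writing them to Cassandra.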
 

To make the data handling easier and more in line with Cassandra best practices, the readings will be partitioned by the date on which they were recorded.

  CREATE TABLE room_data (
    day text,
    measurementtime timestamp,
    light int,
    temperature float,
    PRIMARY KEY (day, measurementtime)
  ) WITH CLUSTERING ORDER BY (measurementtime DESC);
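
A quick back-of-the-envelope calculation shows what the daily partitioning buys. The sketch below only does the arithmetic for one reading per second; `rowsPerPartition` is a name made up for this example.

```javascript
const SECONDS_PER_DAY = 24 * 60 * 60;

// Number of rows that end up in one partition for a given sampling rate
// and a given number of days stored per partition.
function rowsPerPartition(samplesPerSecond, daysPerPartition) {
  return samplesPerSecond * SECONDS_PER_DAY * daysPerPartition;
}

console.log(rowsPerPartition(1, 1));  // 86400 rows in a daily partition
console.log(rowsPerPartition(1, 30)); // 2592000 rows if a partition held 30 days
```

With the day as partition key every partition stays at a fixed, modest size no matter how long the sensor keeps running.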
 

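The data will probably be fetched most often for recent timestamps, with queries that have limits set on them. To make this fetching easier we've added the clustering order statement above. To get the current light and temperature level we then just have to run the following query (no WHERE clause combined with the now function is needed). Note that without a WHERE clause Cassandra returns partitions in token order, so this returns the latest reading only while the table holds a single day; once several days are stored, add WHERE day = '...' with the current day to the query.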

  SELECT * FROM room_data LIMIT 1;
 

After setting up Cassandra, reading the data from the serial port and parsing it, it's time to write the data into Cassandra. Analyzing the data and doing something useful with it is left for future posts; for now I'll stop at writing the data into Cassandra:

  client.execute('INSERT INTO room_data ' + 
   '(day, measurementtime, light, temperature)' + 
   ' VALUES (?, dateof(now()), ?, ?)',
   [
    moment().format('YYYY-MM-DD'),
    arduinoData.light,
    arduinoData.temperature
   ],
   function(err, result) {
    if (err) {
     console.log('insert failed', err);
    }
   }
  );
 

On the fifth line I've used moment.js to format the current time into the string representation of the current date that is used for partitioning in Cassandra. The rest of the code is pretty much the usual SQL stuff found in other database environments.
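
If you would rather not pull in moment.js just for this, the partition key can also be built with plain JavaScript. This is a minimal sketch under the assumption that the key should use the client's local date, which is what moment().format('YYYY-MM-DD') produces; `dayKey` is a hypothetical helper name.

```javascript
// Hypothetical helper: formats a Date as 'YYYY-MM-DD' in local time,
// the same shape as the moment().format('YYYY-MM-DD') call above.
function dayKey(date) {
  const pad = (n) => String(n).padStart(2, "0");
  return `${date.getFullYear()}-${pad(date.getMonth() + 1)}-${pad(date.getDate())}`;
}

console.log(dayKey(new Date(2014, 0, 5, 23, 59))); // 2014-01-05
```

One thing to keep in mind: the day is computed on the client, while dateof(now()) is evaluated by Cassandra, so around midnight (or with differing time zones) a reading can land in a partition whose day differs from the date of its timestamp.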

I recorded a couple of hours' worth of data here, just in case anybody wants a sneak peek without having to set everything up. I've exported the data from Cassandra through cql using this command:

  COPY room_data (day, measurementtime, light, temperature) 
   TO 'room_data.csv';
 

The rest of the example is located on GitHub.

Replace a dead node in Cassandra

Note (June 2020): this article is old and not really relevant anymore. If you use a modern version of Cassandra, look at the -Dcassandra.replace_address_first_boot option!

I want to share some tips from my experiments with Cassandra (version 2.0.x).

I found some documentation on the DataStax website about replacing a dead node, but it is not suitable for our needs, because in case of a hardware crash we will set up a new node with exactly the same IP (replace “in place”). Update: the documentation is now up to date on the DataStax website!

If you try to start the new node with the same IP, Cassandra fails to start with:

java.lang.RuntimeException: A node with address /10.20.10.2 already exists, cancelling join. Use cassandra.replace_address if you want to replace this node.

So we need to use the “cassandra.replace_address” directive (which is not really documented? :(). See this commit and this bug report. It is available since 1.2.11/2.0.0, it is an easier solution, and it works.

+    - New replace_address to supplant the (now removed) replace_token and
+      replace_node workflows to replace a dead node in place.  Works like the
+      old options, but takes the IP address of the node to be replaced.

It is a JVM directive, so we can add it at the end of /etc/cassandra/cassandra-env.sh (Debian package), for example:

JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=10.20.10.2" 

Of course, 10.20.10.2 is the IP of your dead/new node.

Now start Cassandra, and in the logs you will see:

INFO [main] 2014-03-10 14:58:17,804 StorageService.java (line 941) JOINING: schema complete, ready to bootstrap
INFO [main] 2014-03-10 14:58:17,805 StorageService.java (line 941) JOINING: waiting for pending range calculation
INFO [main] 2014-03-10 14:58:17,805 StorageService.java (line 941) JOINING: calculation complete, ready to bootstrap
INFO [main] 2014-03-10 14:58:17,805 StorageService.java (line 941) JOINING: Replacing a node with token(s): [...]
[...]
INFO [main] 2014-03-10 14:58:17,844 StorageService.java (line 941) JOINING: Starting to bootstrap...
INFO [main] 2014-03-10 14:58:18,551 StreamResultFuture.java (line 82) [Stream #effef960-6efe-11e3-9a75-3f94ec5476e9] Executing streaming plan for Bootstrap

The node is in bootstrapping mode and will retrieve its data from the cluster. This may take a lot of time.
If the node is a seed node, a warning will indicate that the node did not auto bootstrap. This is normal; in that case you need to run nodetool repair on the node.

On the new node:

# nodetool netstats

Mode: JOINING
Bootstrap effef960-6efe-11e3-9a75-3f94ec5476e9
    /10.20.10.1
        Receiving 102 files, 17467071157 bytes total
[...]

After some time, you will see some information in the logs!
On the new node:

 INFO [STREAM-IN-/10.20.10.1] 2014-03-10 15:15:40,363 StreamResultFuture.java (line 215) [Stream #effef960-6efe-11e3-9a75-3f94ec5476e9] All sessions completed
 INFO [main] 2014-03-10 15:15:40,366 StorageService.java (line 970) Bootstrap completed! for the tokens [...]
[...]
 INFO [main] 2014-03-10 15:15:40,412 StorageService.java (line 1371) Node /10.20.10.2 state jump to normal
 WARN [main] 2014-03-10 15:15:40,413 StorageService.java (line 1378) Not updating token metadata for /10.20.30.51 because I am replacing it
 INFO [main] 2014-03-10 15:15:40,419 StorageService.java (line 821) Startup completed! Now serving reads.

And on the other nodes:

 INFO [GossipStage:1] 2014-03-10 15:15:40,625 StorageService.java (line 1371) Node /10.20.10.2 state jump to normal

Et voilà, the dead node has been replaced!
Don't forget to REMOVE the modification in cassandra-env.sh once the bootstrap is complete!

Enjoy!

Hello Cassandra in node.js

Intro

Since I started working in a team that deals with big data I have come into contact with Apache Cassandra. After years in the relational world it took me some time to get used to the many concepts that Cassandra relies on. In the relational world some of these concepts would actually be heavy anti-patterns. I went over a couple of tutorials; as an intro to the Cassandra data model I would recommend this video by Patrick McFadin:

C* Summit 2013: The World's Next Top Data Model


Basic setup

The easiest way to get Cassandra is to download it from here: http://planetcassandra.org/Download/StartDownload

I dislike it when applications write to /var/something and need root access to install unless that is absolutely necessary, so I followed this manual to avoid the problem.


cassandra.yaml

Out of the box Cassandra is set up to support queries coming from the cql shell ("cqlsh"). The goal of this blog entry is to show how to make a simple connection from node.js to Cassandra, so a bit of tweaking has to be done to get all this working. The necessary configuration is located in this file:

 install_dir/conf/cassandra.yaml
The properties I had to change were the following (basically this allows logging in with users other than the default one):
 authenticator: PasswordAuthenticator
 authorizer: CassandraAuthorizer
After that, running cqlsh from the bin directory requires a username and password:
 ./cqlsh -u cassandra -p cassandra

Cassandra keyspace setup

 CREATE KEYSPACE test WITH REPLICATION = 
  {'class': 'SimpleStrategy', 'replication_factor': 1};

 --check if it's created with this
 DESCRIBE KEYSPACES;

 USE test;

 CREATE TABLE test_table (
  id text,
  test_value text,
  PRIMARY KEY (id)
 );

 INSERT INTO test_table (id, test_value) VALUES ('1', 'a');

 INSERT INTO test_table (id, test_value) VALUES ('2', 'b');

 INSERT INTO test_table (id, test_value) VALUES ('3', 'c');

 SELECT * FROM test_table;
 
If everything is o.k. you should see something like:
  id  | test_value
 ----+------------
   3 |          c
   2 |          b
   1 |          a

 (3 rows)
Add a testuser to make the hello world example work:
  create user testuser with password 'testuser';

  grant all on test.test_table to testuser;

node-cassandra-cql

I tried several Cassandra connection libraries for node.js from GitHub, and the one I found easiest to work with (and to set up) was node-cassandra-cql by jorgebay. The rest of the story is pretty much standard: go into a new, empty project directory, initialize it with npm init and then install the module with npm.

 npm init

 npm install node-cassandra-cql

 #copy hellocassandra.js from
 #https://github.com/msval/hellocassandrainnodejs

 node hellocassandra.js
 

Anyway, here's my example on GitHub.