Friday, October 17, 2014

Kafka Part 1

Just a blog post to document some things I learned as I started to play with Kafka.  Hopefully others doing similar work can find this through Google and benefit ...

I wanted to try to repeat (as a starting point) the results of the performance blog post on Kafka.  Here are the issues I ran into and how I fixed them.

I first started to play with Kafka on a simple instance and chose m3.medium.  While this gave me a place to play with Kafka functionally, the on-instance storage is only 4 GB.  If I was going to push 75 MB/sec to the Kafka logs, I was going to run out of storage in 4*1024/75 = 55 seconds.  I decided to move up to i2.xlarge, which gives me 800 GB, which is 800*1024/75 = 11000 seconds = about 3 hours.
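As a quick check of the arithmetic above, here is a minimal sketch in Python.  The function name is my own, and it assumes 1 GB = 1024 MB and a constant write rate, the same simplifications used in the back-of-the-envelope numbers.

```python
def seconds_until_full(capacity_gb, write_mb_per_sec):
    """Seconds until a disk of capacity_gb fills at a constant write rate.

    Assumes 1 GB = 1024 MB, matching the rough estimate in the text.
    """
    return capacity_gb * 1024 / write_mb_per_sec


# Instance storage sizes are the ones quoted in the text.
for name, capacity_gb in [("m3.medium", 4), ("i2.xlarge", 800)]:
    secs = seconds_until_full(capacity_gb, 75)
    print(f"{name}: {secs:,.0f} seconds (~{secs / 3600:.2f} hours)")
```

This ignores log retention and anything else on the disk, so treat it as an upper bound on how long a test can run.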

After I got basic functional testing working, I started to look for the performance testing tools.  The aforementioned blog shows a new performance test client available in Kafka versions after 0.8.1.  Unfortunately 0.8.2 isn't available pre-compiled yet and the performance client uses the new Java client.  So off I went to compile Kafka from source (using trunk).

Kafka builds using gradle -- something I'm used to.  Weirdly, Kafka doesn't have the gradle wrapper checked in.  This means you have to pay attention to the readme, which says clearly that you need to install gradle separately and run gradle once to get the gradle wrapper.  Unfortunately, I tried to "fix" the lack of a wrapper without reading the docs (doh!).  I just added another version of the gradle wrapper to kafka/gradle/wrapper.  This let me build, but I immediately got the error:

Execution failed for task ':core:compileScala'.
> com.typesafe.zinc.Setup.create(Lcom/typesafe/zinc/ScalaLocation;Lcom/typesafe/zinc/SbtJars;Ljava/io/File;)Lcom/typesafe/zinc/Setup;

Running gradle with --info showed me that there was a class mismatch error launching the zinc plugin.  I was able to get rid of the error by changing the dependency in build.gradle to the latest zinc compiler.  Doing this made me wonder whether there was a dependency issue between gradle and the zinc plugin.  Once I realized this, I re-read the readme where it asks you to install gradle first and then run gradle once to get the correct wrapper.  Oops.  After following the directions (TM), compiling trunk worked.

I then did some basic single-host testing and I was up and running on Kafka trunk.  Next I created multiple servers, separated the client from the server, and created replicated topics.  This fails horribly if you have issues with hostnames.  Specifically, upon creating a replicated topic you will see the following error hundreds of times before a server crashes, and then the next server will do the same thing:


[ReplicaFetcherThread-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 4; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [my-replicated-topic,0] -> PartitionFetchInfo(0,1048576). Possible cause: java.nio.channels.ClosedChannelException (kafka.server.ReplicaFetcherThread)

Upon looking at the more detailed logs, I noticed it was trying to connect to the other servers by the hostname they knew themselves (vs. DNS).  I'm guessing that the server upon startup does an InetAddress.getLocalHost().getHostName() and then registers that name in Zookeeper.  When the other servers then try to connect, it just won't work unless that hostname is resolvable (mine wasn't).  You'll see similar issues with the client-to-server connection, saying certain servers aren't resolvable.  For now, I have solved this by adding the hostname-to-IP-address mappings to /etc/hosts on all clients and servers.  I'll certainly need to see how this can be fixed automatically without the need for DNS (hopefully leveraging Eureka discovery eventually).
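Here is a rough sketch of how the /etc/hosts workaround could be checked and generated with Python.  The broker hostnames and IP addresses are hypothetical placeholders, not my actual machines, and both function names are made up for illustration.  The resolver is passed in as a parameter so the check can be tried without touching real DNS.

```python
import socket


def unresolvable(hostnames, resolve=socket.gethostbyname):
    """Return the hostnames that this machine cannot resolve to an address."""
    failed = []
    for name in hostnames:
        try:
            resolve(name)
        except OSError:  # socket.gaierror is a subclass of OSError
            failed.append(name)
    return failed


def hosts_entries(ip_by_hostname):
    """Render /etc/hosts lines, one "<ip><TAB><hostname>" pair per line."""
    return "\n".join(
        f"{ip}\t{name}" for name, ip in sorted(ip_by_hostname.items())
    )


# Hypothetical broker hostnames and private IPs; substitute your own.
BROKERS = {
    "kafka-broker-0": "10.0.0.10",
    "kafka-broker-1": "10.0.0.11",
    "kafka-broker-2": "10.0.0.12",
}

print(hosts_entries(BROKERS))
```

Running unresolvable(BROKERS) on each client and server would list the names that still need an entry; the printed lines are what would get appended to /etc/hosts.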

The aforementioned blog says there was no tuning done beyond "out of the box", but upon inspection of server.properties, it seems there are differences that are either trivial or have changed in trunk since the blog was written.  Something to come back to eventually, but for now I decided to just use the out-of-box settings so I could get a baseline and then start tweaking.

I was then able to spin up five c3.xlarge instances as clients which should have enough CPU to push the servers.

I was able to get the following results:

Single producer thread, no replication - 657431 records/sec @ 62.70 MB/sec.

This is about 80% of the result in the original blog post.  I'm guessing I'll be able to tune a bit to get there, as I've seen initial peaks hit 710K records/sec.  First performance results are always low or wrong.

Then I tried three producer threads, no replication:

client 1 - 679412 records/sec @ 64.79 MB/sec
client 2 - 678398 records/sec @ 64.70 MB/sec
client 3 - 678150 records/sec @ 64.67 MB/sec
total ~ 2035000 records/sec @ 194 MB/sec

This is about 3X the previous result as expected.  I also did vmstat and netstat -a -n |grep 9092 to confirm what was going on to help me understand the clustering technology.
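To double-check the totals, here is a small Python sketch that sums the per-client figures reported above and compares them with the single-producer run.  The implied record size is an inference on my part and assumes the tool reports MB as 1024*1024 bytes.

```python
# (records/sec, MB/sec) exactly as reported by the performance client.
single = (657431, 62.70)
clients = [
    (679412, 64.79),
    (678398, 64.70),
    (678150, 64.67),
]

total_records = sum(records for records, _ in clients)
total_mb = sum(mb for _, mb in clients)

# How close the three-producer total is to 3x the single-producer result.
scaling = total_records / single[0]

# Implied record size, assuming 1 MB = 1024 * 1024 bytes.
bytes_per_record = single[1] * 1024 * 1024 / single[0]

print(f"total: {total_records} records/sec @ {total_mb:.2f} MB/sec")
print(f"scaling vs. single producer: {scaling:.2f}x")
print(f"implied record size: {bytes_per_record:.1f} bytes")
```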

At five clients, things dropped off in an unexpected way.  I've hit some bottleneck.  I basically get lower results per client, which in total add up to about the same as the three-client result.

I have also played with replicated topics, but won't post those numbers yet.

Things I'm thinking I need to investigate (in order of priority):

1.  Networking speed.  It looks like I am limited by network (or disk, but I doubt it) at this point.  I am new to Amazon, so when the instance type table says "moderate" network performance, I'm not sure how much that means.  I'm more used to 100 Mbit/s or 1 Gbit/s, etc.

2.  Enhanced networking performance.  While I launched HVM images, I'm not sure yet whether the base image I used has the additional drivers.

3.  Tuning of the settings for Kafka.

4.  Java tuning.

5.  Different instance types.  HS1 would give me a way to split the traffic between volumes.

6.  Kafka code.
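For the networking item, it helps to put the measured MB/sec into the units I'm used to for links.  A minimal sketch, assuming the performance client reports MB as 1024*1024 bytes and ignoring protocol overhead on the wire (so real link usage would be somewhat higher); the function name is my own:

```python
def mb_per_sec_to_mbit_per_sec(mb_per_sec, bytes_per_mb=1024 * 1024):
    """Convert a payload rate in MB/sec to Mbit/sec (1 Mbit = 1,000,000 bits)."""
    return mb_per_sec * bytes_per_mb * 8 / 1_000_000


# Per-client payload rate from the single-producer run above.
per_client_mbit = mb_per_sec_to_mbit_per_sec(62.70)

for label, link_mbit in [("100 Mbit/s", 100), ("1 Gbit/s", 1000)]:
    share = per_client_mbit / link_mbit
    print(f"{per_client_mbit:.0f} Mbit/s is {share:.0%} of a {label} link")
```

This says nothing about what "moderate" actually means on a given instance type; it only makes the measured numbers comparable to a known link speed.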

All this said, I also need to play with message sizes (larger and mixed) and look at some of the expected application access patterns, expanding into consumers as well.

More in part 2 coming soon.