Gravity Tech: Centralized logging (and more) with Apache Kafka

With numerous large enterprise customers (and several smaller ones), recommendation engines in the Gravity SaaS infrastructure generate around a terabyte of application logs a day, scattered across more than fifty hosts. This counts as moderate in today’s “big data” world, but it already exceeds the size at which naive approaches to log aggregation and processing (such as scripts copying files to a central host, or grepping on individual hosts) work effectively: what you really want is to see and process all logs at once, in near real time. Generally speaking, this is a distributed system, and hence it needs a messaging solution.

While messaging systems abound (see queues.io for a long list), Apache Kafka is one designed specifically for log transmission. Beyond the human-readable text lines emitted by applications for debugging purposes (commonly referred to as application logs), the word “log” here refers to any stream of events or updates, for instance user activity streams or snapshots of application metrics. Thus, when Gravity developers were looking for a log aggregation solution, Kafka was a natural choice.

Kafka basics

Kafka has a simple publish-subscribe architecture: producers generate messages and send them to the Kafka cluster, composed of several servers (called brokers in messaging terminology), where the messages are stored. Consumers connect to the cluster and read messages; consuming a message does not delete it from the server, so multiple consumers can get the same message. This allows online and offline processing to be treated uniformly. Kafka servers share information via a Zookeeper cluster.

The Kafka server application is written in Scala, a JVM language, and so are the primary implementations of a producer and a consumer. Apart from those, there are producer and consumer implementations in many other languages, see the clients list on the Kafka site.

[Figure: a Kafka cluster with producers, brokers, and consumers]
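As a minimal illustration of the client side, here is a sketch of sending a log line with the Java producer; the broker address and the topic name (“app-logs”) are placeholders, not Gravity’s actual configuration:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // The key ("host-01") and the log line are made-up examples.
        producer.send(new ProducerRecord<>("app-logs", "host-01",
                "2015-06-01 12:00:00 INFO recommendation served in 12 ms"));
        producer.close();
    }
}
```

Any number of consumers, written in any of the supported languages, can then read this message independently of one another.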

Apache Kafka is named after a writer because it is optimized for writing. It is essentially linear storage: a partition of messages in Kafka is simply a linearly ordered sequence, in which messages are identified by their index (called the offset in Kafka terms). All the data in a Kafka cluster is the disjoint union of such partitions. Incoming messages are appended at the end of a partition, and reads are typically sequential. Messages are written to a file immediately on arrival (though buffers are not flushed), so Kafka entirely avoids application-level cache management and the trouble it brings. Durability is provided by replicating messages to different brokers rather than by immediately persisting them to disk. Caching is left to the operating system, which is much more adept at file caching than any JVM application could possibly be. Also, linear reads and writes are very fast on hard disk drives.

Another essential element of Kafka’s design is that no consumer state is kept on the brokers: it is the responsibility of consumers to provide the offset at which they start reading. Consumer client libraries offer some support for using Zookeeper for offset storage and consumer coordination, but this is still a changing area, with a new consumer library being designed for the 0.9 release.
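To make this concrete, here is a rough sketch from the consumer side, using the new Java consumer API mentioned above; the broker address, topic, partition, and starting offset are all made-up values:

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "log-readers");             // placeholder consumer group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition("app-logs", 0);
        consumer.assign(Arrays.asList(partition));
        // The broker keeps no consumer state: it is up to the consumer to say where to start.
        consumer.seek(partition, 42000L);

        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
        consumer.close();
    }
}
```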

With this setup, Kafka achieves very high throughput. There are a handful of benchmarks available, from Kafka authors (at the Kafka site and on LinkedIn’s engineering blog) as well as from other sources, with slightly different outcomes, but results suggest that you can write at least 50 MB of messages per second with several producers and a single broker on commodity hardware.

Note, however, that in order to achieve this high throughput, Kafka trades delivery guarantees for performance. Delivery guarantees in Kafka are fairly weak: for instance, if you write a message to Kafka and get back an error, you cannot be sure whether the write has actually happened or not. Also, a consumer that has failed and is restarted will typically reread a few messages it has already consumed. As long as Kafka brokers do not fail, we have at-least-once delivery. If Kafka brokers start to fail, that is no longer true, and in a worst-case scenario some messages may get lost.

Thus, with Kafka it is best to design your consumers to be idempotent: reading the same message a second time should not change the consumer-side state. As for at-least-once delivery, keep in mind that Kafka is designed for logs: losing a few log messages is certainly not the end of the world. On the other hand, Kafka has configuration parameters to tune delivery guarantees: you can set the number of brokers a partition is replicated on and the number of broker receipt acknowledgments. Increasing these numbers keeps messages available even in the face of broker failures.
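As an illustration (not Gravity’s actual settings), the acknowledgment side of this looks as follows on the Java producer; the replication factor itself is a per-topic setting chosen when the topic is created:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class DurableProducerConfig {
    public static Producer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");   // wait for all in-sync replicas; "1" waits for the leader only
        props.put("retries", "3");  // retry transient send failures instead of dropping the message
        return new KafkaProducer<>(props);
    }
}
```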

If your use case does not allow losing any messages, you need a custom mechanism to ensure delivery: for instance, LinkedIn operates a dedicated audit topic to guarantee that the numbers of messages produced and consumed are equal. The plan is to add stronger delivery guarantees around version 1.0 (see Jay Kreps’ talk about the future of Kafka), but it seems no audit trail will be included in Kafka by default.

Kafka@Gravity

Kafka has been operating at Gravity for over a year now, and it has proved to be a useful and stable element of the infrastructure. We started using it for log aggregation, but it gradually found new uses, in line with its original purpose, as the middleware for sending streams of events from application servers to backend services. These include business data as well as metrics: for instance, logs from Nginx servers are sent to Kafka and consumed by processes that parse them and insert them into a time series database, which in turn feeds Grafana dashboards displaying response times. With the upcoming release of Reco, the Gravity solution for smaller e-commerce businesses, Kafka is becoming increasingly important, since Reco recommendation engines run somewhere in a cloud rather than on predetermined, dedicated hardware from which you could easily fetch logs. Gravity’s Kafka clusters consist of two or three brokers with a replication factor of two and a single receipt acknowledgment: even this minimally safe setup has proved reliable.

Kafka is production ready at the current version, 0.8.2, but, as always, there are some features that we miss. First, very little is currently done on the producer side to prepare for situations where connectivity to the brokers fails: once an in-memory queue fills up, messages simply get dropped. An issue about this has been open in the Kafka JIRA for three and a half years now, with no plans to fix it. We developed our own Log4j appender to write logs to local files if brokers are permanently unavailable, but that is clearly not an optimal solution. Secondly, Kafka has no timestamp abstraction. That would be convenient for storing log and event messages, as most of them do contain a timestamp: often, consumers would rather specify a time interval than an offset interval to consume from. There is some support now for getting an offset based on a timestamp, but it is hopelessly broken; the real thing, a time-based segment index, is not there.
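To give an idea of what such a fallback looks like, here is a rough sketch (not our actual appender) built on the log4j 1.x API, with placeholder broker, topic, and file path:

```java
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;

// Illustrative appender: try Kafka first, spill to a local file when the brokers are unreachable.
public class FallbackKafkaAppender extends AppenderSkeleton {

    private final Producer<String, String> producer;
    private final String topic = "app-logs";                                // placeholder topic
    private final String fallbackFile = "/var/log/app/kafka-fallback.log";  // placeholder path

    public FallbackKafkaAppender() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    protected void append(LoggingEvent event) {
        String line = getLayout() != null
                ? getLayout().format(event) : event.getRenderedMessage() + "\n";
        try {
            // Block briefly for an acknowledgment; an unreachable broker makes this throw.
            producer.send(new ProducerRecord<String, String>(topic, line)).get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            writeLocally(line);
        }
    }

    private void writeLocally(String line) {
        try (FileWriter out = new FileWriter(fallbackFile, true)) {
            out.write(line);
        } catch (IOException ignored) {
            // Last resort: there is nothing else we can do with this message.
        }
    }

    @Override
    public void close() {
        producer.close();
    }

    @Override
    public boolean requiresLayout() {
        return true;
    }
}
```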

As for performance, we wanted more log channels, and hence more partitions on one broker, than the available benchmarks cover (on the order of thousands), so we ran a small test to check how throughput changes as a function of the number of partitions. The result, with a fully saturated producer, is shown below:

[Figure: producer throughput as a function of the number of partitions]

We see that the throughput decreases slightly as the number of partitions grows toward 400, and beyond that the behaviour becomes rather unpredictable. A similar pattern emerges when using more producers. The reason is that Kafka keeps a separate file open for each partition, and when a lot of partitions are written simultaneously, HDD writes are no longer actually linear. Thus, one of the factors behind the high throughput, linearity, is lost. Whenever this limit is hit, new brokers have to be added.

confluent.io

Kafka started as a project at LinkedIn; it was open sourced in 2011 and graduated as an Apache project. In 2014, the core Kafka developers founded a new company, Confluent, to commercialize the ideas around Kafka. They now offer a stream data platform that aims to make all data in a company available as real-time streams. Using Kafka as a backbone, the Confluent platform includes a REST proxy, tools for Avro (de)serialization at the sending and receiving ends, and the Camus application to consume data from Kafka and write it to HDFS. It would definitely be worth trying at some point.

To sum up, a log aggregation solution is indispensable for distributed event processing systems. Kafka is a stable and performant solution to this specific problem; it is not a general purpose messaging system. Although it is already widely used in large production setups, it is still evolving, and when applying it to particular use cases in an enterprise environment, you might miss a few features.
