This guest blog was penned by Jay Kreps, CEO at Confluent.
Apache Kafka is a system you increasingly see as a central element of a company’s big data stack. As one of the original authors of Kafka, I’ll use the next few blog posts to discuss why it was made, what it is for, and how to put it to use for capturing and processing real-time data streams.
Kafka is actually not a new system; it was originally built by a small team of people (myself and two of my co-founders at Confluent) at LinkedIn starting in 2009 and has been under continuous development since – growing to include contributions from hundreds of engineers at dozens of companies. Building a new data system from the ground up is a massive effort, so let me start by describing the problems that Kafka was designed to solve.
Experience at LinkedIn
In 2009, the team I led was focused on Hadoop adoption and getting a complete, high-quality representation of our business in HDFS for processing and analysis. The term “enterprise data hub” wasn’t around yet, but, in effect, that is what we were putting in place. As a data-driven business we thought that being able to produce a complete representation of LinkedIn’s data in a system that supported scalable processing would be a bet that was sure to pay off. This turned out to be correct, and Hadoop succeeded as a platform at LinkedIn beyond even our ambitious expectations.
However, getting a complete copy of our data in Hadoop proved to be harder than we expected. There were several core challenges:
- We ran a variety of data systems, each of which needed special integration to pull data into Hadoop.
- The data we wanted to represent was quite large, representing the full data and traffic of a consumer-scale website.
We attacked this problem with vigor, working to get logs of events, database data, and various other data sources copied into HDFS so we could make use of them for processing.
We got enough data in place to build a series of highly successful data products, including some popular site features. We had Hadoop jobs to score the strength of connections, predict people you might know, compute the similarity between users, recommend jobs, discover new skills, analyze advertising data, and run all manner of internal analytics.
However, all was not well. We found that once we had reliable data in HDFS, building products on top of it was not too hard, but the process of getting data was taking up almost all our time. We were doing manual work to configure data loads, each of which would arrive in some loosely typed text format. Then we would configure mappings for that data into appropriate Hive tables and an efficient HDFS format (this was before Parquet or ORC, so we were using something custom). It sounded simple enough, and we did get this working well enough to get a handful of our major data sets available and regularly updated, but that was when the problems started.
Even pushing very hard for many months, we found that we had only captured a small percentage of the overall business in Hadoop, and even keeping this limited subset flowing smoothly was stretching us too thin.
Any business that moves fast will generate data sets that evolve quickly. And analytics, by their very nature, bring together lots of data from different corners of the organization. This meant that any given process might depend on dozens of data sources, and a glitch in any of these integrations or any change in the format or schema of incoming data (a lot of which was loosely typed CSVs or other text formats) would regularly bring our Hadoop processing to its knees.
Additionally, this problem was only getting worse the more we worked on it. LinkedIn was simultaneously adopting a wider variety of data systems, from NoSQL databases to monitoring systems, and each needed special data piping. Increasingly, the types of analytics and processing we were doing were not focused on what was in our database tables, but rather on what had happened: the events and logs of activities that contained the raw data about what our users and customers were doing second by second throughout the day. This event data was much larger, and required horizontally scalable piping just to transport it.
Being engineers, we decided that the core problem with our approach to data ingestion was that the level of manual effort was O(N) in the number of data sources. By this we meant that if we doubled the number of Hadoop tables we ingested, we would roughly double the amount of work, and double the number of people involved in keeping it all running. Our Hadoop team found itself at the tail end of a large, growing, fast-moving organization, and any change or new system upstream would create more work downstream for the Hadoop team. For us, the solution wasn’t just to optimize some part of how we were doing this but rather to move to a model that decoupled our efforts from the number of different systems that we integrated with.
In short, we decided to rethink how we handled data movement. We realized that data flow was not a problem that was particular to our warehouse environment (Hadoop and a relational data warehouse); it was common across all the company’s applications and systems.
What we saw was that data throughout the organization moved in a variety of ways and each process for moving data was fragile, hard to scale, often lossy, and poorly monitored.
Although each of these processes was broken in a different way, the requirements for each were actually pretty simple. When any data was generated anywhere in the organization, we needed to be able to propagate this change out to each system that needed it. Some of these systems would be batch systems that would pull data only periodically, but others would be real-time systems that would continually ingest data.
A Better Approach
As we started to think about this problem, we realized that the solution had long been known to distributed system designers – it just hadn’t been applied in this way. Distributed systems internally have the problem of synchronizing data between nodes. The basic abstraction distributed systems often use is a log of requests that is used to authoritatively record and broadcast out an ordered stream of updates for multiple replicas within the system.
Systems have long used this kind of “log” abstraction as a mechanism for synchronizing data. From traditional relational databases to newer distributed systems like HBase, Google’s Spanner, and Yahoo’s PNUTS, all have a log abstraction internally that they use to capture and distribute data changes.
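To make the log idea concrete, here is a minimal Python sketch of replicas that synchronize by replaying an ordered log of updates. It is an illustration only, not how any of the systems named above are implemented; the `Replica` class, its `catch_up` method, and the sample keys and values are all hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    """A hypothetical replica that rebuilds its state by replaying the log."""
    state: dict = field(default_factory=dict)
    applied: int = 0  # number of log entries applied so far

    def catch_up(self, log):
        # Apply every entry not yet seen, strictly in log order.
        for key, value in log[self.applied:]:
            self.state[key] = value
        self.applied = len(log)


# The log is the authoritative, ordered record of updates.
log = []
log.append(("alice", "engineer"))
log.append(("bob", "designer"))
log.append(("alice", "manager"))  # a later update to the same key

fast, slow = Replica(), Replica()
fast.catch_up(log)       # reads everything at once
slow.catch_up(log[:1])   # has only seen the first entry so far
slow.catch_up(log)       # later catches up from where it left off

print(fast.state == slow.state)
```

Because both replicas apply the same entries in the same order, they end up in the same state even though one of them fell behind for a while. The only thing a replica has to remember is how far into the log it has read.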
Our idea was to apply this mechanism not as an implementation detail of a distributed database, but rather to conceive our whole complex mess of systems and applications as one big distributed system in its own right and use the same technique at the datacenter level to synchronize.
This may sound esoteric but the concept is actually straightforward. This representation of data is not unlike a Twitter feed that represents each data system, application, or other data generating process as a kind of persistent stream of changes. These changes aren’t tweets of course, but rather occurrences in the business, database updates, or other data generating processes. Like Twitter, each of these feeds progresses in real-time and can fan out to any number of subscribers that want to consume the updates.
Enter Apache Kafka
To implement this idea, we started to develop a piece of infrastructure with the codename “Kafka.” The name came from the fact that our system would be keeping a log or “journal” of changes, so we thought a writer’s name would be appropriate. Plus anyone who has experienced the reality of large-scale data integration knows that it is nothing if not Kafkaesque.
Kafka implements this log abstraction at large scale. A Kafka cluster can accept tens of millions of writes per second and persist them for days, months, or indefinitely. Each write is replicated over multiple nodes for fault tolerance, and a Kafka cluster can support thousands of concurrent readers and writers. This journal acts as the subscription mechanism for other systems to feed off of. The Kafka cluster can expand dynamically to handle greater load or retain more data. This log abstraction allows systems to synchronize themselves off the central stream at a rate they control: they may do so in real time or in periodic batch loads.
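The point about each system reading at its own rate can be sketched in a few lines of Python. This is a toy in-memory model, not Kafka's client API, and it leaves out replication, partitioning, and retention entirely; the `Journal` and `Subscriber` classes and the event names are hypothetical.

```python
class Journal:
    """Hypothetical in-memory stand-in for a single ordered log."""

    def __init__(self):
        self._entries = []

    def append(self, event):
        self._entries.append(event)
        return len(self._entries) - 1  # offset of the new entry

    def read(self, offset, max_events=None):
        # Return entries starting at `offset`, optionally capped in count.
        end = None if max_events is None else offset + max_events
        return self._entries[offset:end]


class Subscriber:
    """Each subscriber remembers its own position; the journal does not."""

    def __init__(self, journal):
        self.journal = journal
        self.offset = 0

    def poll(self, max_events=None):
        events = self.journal.read(self.offset, max_events)
        self.offset += len(events)
        return events


journal = Journal()
realtime = Subscriber(journal)
batch = Subscriber(journal)

seen_realtime = []
for event in ["click", "profile_update", "service_call", "restart"]:
    journal.append(event)
    seen_realtime.extend(realtime.poll())  # consumes each event as it arrives

seen_batch = batch.poll()  # one periodic load picks up everything at once

print(seen_realtime == seen_batch)
```

The writer never needs to know who is reading or how quickly. Both subscribers see the same events in the same order, and the only per-reader state is an offset into the journal.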
This usage scaled at LinkedIn until Kafka became the record of virtually everything happening in the company: every click, service call, profile update, and application restart is recorded in Kafka. This totaled over 800 billion events per day, with 175 TB of daily writes and over 650 TB of reads (since each write fans out to multiple readers).
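For a sense of scale, the figures above can be turned into a few back-of-the-envelope ratios. The calculation below assumes decimal terabytes (10^12 bytes) and treats the "over" figures as round numbers, so the results are rough estimates rather than measurements.

```python
SECONDS_PER_DAY = 24 * 60 * 60

events_per_day = 800e9   # "over 800 billion events per day"
bytes_written = 175e12   # 175 TB of daily writes (decimal TB assumed)
bytes_read = 650e12      # "over 650 TB" of daily reads

avg_event_bytes = bytes_written / events_per_day        # bytes per event
read_fan_out = bytes_read / bytes_written               # bytes read per byte written
avg_events_per_second = events_per_day / SECONDS_PER_DAY

print(f"average event size: {avg_event_bytes:.0f} bytes")
print(f"read fan-out:       {read_fan_out:.1f}x")
print(f"average event rate: {avg_events_per_second / 1e6:.1f} million/s")
```

Under these assumptions an average event is a little over 200 bytes, each byte written is read between three and four times, and the cluster sustains on the order of nine million events per second averaged over the day.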
After the initial deployment at LinkedIn, we open sourced Kafka, originally as a GitHub project and then as an Apache project. We continued to work on hardening it, and usage grew across hundreds of companies in virtually every industry. A few of these have publicly described their usage here.
In my next post, I will describe how Kafka evolved to handle stream processing use cases and what some of the other common use cases are.
For more information on how to get started with Kafka and Cloudera, read “How to Deploy and Configure Apache Kafka in Cloudera Enterprise” or download Apache Kafka now.
Jay Kreps is the CEO of Confluent, a company that provides a platform for streaming data built around Apache Kafka. He is one of the original creators of Apache Kafka as well as several other popular open source projects. He was formerly the lead architect for data infrastructure at LinkedIn.