kafka

Documentation

https://kafka.apache.org/documentation/

1. GETTING STARTED

Apache Kafka™ is a distributed streaming platform. What exactly does that mean?

We think of a streaming platform as having three key capabilities:

  1. It lets you publish and subscribe to streams of records. In this respect it is similar to a message queue or enterprise messaging system.
  2. It lets you store streams of records in a fault-tolerant way.
  3. It lets you process streams of records as they occur.

TODO: What does fault-tolerant refer to here?

TODO: How is "process streams of records as they occur" different from what a message queue does?

What is Kafka good for?

It gets used for two broad classes of application:

  • Building real-time streaming data pipelines that reliably get data between systems or applications
  • Building real-time streaming applications that transform or react to the streams of data

TODO: Still don't understand the difference between streaming applications and streaming data pipelines.

To understand how Kafka does these things, let’s dive in and explore Kafka’s capabilities from the bottom up.

First a few concepts:

  • Kafka is run as a cluster on one or more servers.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
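
As a rough illustration of the Producer and Consumer APIs, here is a minimal sketch using the Java client; the broker address, topic name, and serializer choices are assumptions for the example, not part of the original text:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Producer API: publish a record to the (hypothetical) topic "events".
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("events", "key-1", "hello"));
producer.close();

// Consumer API: subscribe to the same topic and process the records.
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "demo-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("events"));
for (ConsumerRecord<String, String> r : consumer.poll(1000)) {
    System.out.printf("offset=%d key=%s value=%s%n", r.offset(), r.key(), r.value());
}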

In Kafka the communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol. This protocol is versioned and maintains backwards compatibility with older versions. We provide a Java client for Kafka, but clients are available in many languages.

Topics and Logs

Let’s first dive into the core abstraction Kafka provides for a stream of records — the topic.

A topic is a category or feed name to which records are published. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.

# TODO: With multiple subscribers, who is responsible for eventually deleting consumed content from the topic?

For each topic, the Kafka cluster maintains a partitioned log that looks like this:

https://kafka.apache.org/0110/images/log_anatomy.png

Each partition is an ordered, immutable sequence of records that is continually appended to—a structured commit log. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

The Kafka cluster retains all published records—whether or not they have been consumed—using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space. Kafka’s performance is effectively constant with respect to data size so storing data for a long time is not a problem.

https://kafka.apache.org/0110/images/log_consumer.png

In fact, the only metadata retained on a per-consumer basis is the offset or position of that consumer in the log. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads records, but, in fact, since the position is controlled by the consumer it can consume records in any order it likes. For example, a consumer can reset to an older offset to reprocess data from the past or skip ahead to the most recent record and start consuming from “now”.
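
A minimal sketch of a consumer controlling its own position, using the Java client's seek methods (the topic, partition number, and offset value here are assumptions; consumer is a KafkaConsumer configured as in the earlier sketch):

import java.util.Collections;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("events", 0);
consumer.assign(Collections.singletonList(tp));          // take manual control of this partition

consumer.seek(tp, 42L);                                   // reset to an older offset to reprocess the past
consumer.seekToBeginning(Collections.singletonList(tp));  // or replay everything still retained
consumer.seekToEnd(Collections.singletonList(tp));        // or skip ahead and consume from "now"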

This combination of features means that Kafka consumers are very cheap—they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to “tail” the contents of any topic without changing what is consumed by any existing consumers.

The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second, they act as the unit of parallelism — more on that in a bit.

# TODO: So does the offset also need to be kept consistent across the multiple parallel partitions?

Distribution

The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.

Each partition has one server which acts as the “leader” and zero or more servers which act as “followers”. The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.
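
As an aside, a replicated, partitioned topic can be created programmatically with the Java AdminClient (added in 0.11); the topic name, partition count, and replication factor below are arbitrary example values:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);
// 4 partitions spread over the brokers, each replicated to 3 servers.
// createTopics is asynchronous; the returned result can be waited on if needed.
admin.createTopics(Collections.singleton(new NewTopic("events", 4, (short) 3)));
admin.close();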

Producers

Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). More on the use of partitioning in a second!
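
A sketch of key-based (semantic) partitioning with the Java producer: the default partitioner hashes the record key, so records with the same key always land in the same partition (producer is a KafkaProducer<String, String> as configured earlier; topic and key values here are made up):

import org.apache.kafka.clients.producer.ProducerRecord;

// Same key -> same partition, so all events for "user-42" keep their relative order.
producer.send(new ProducerRecord<>("events", "user-42", "logged_in"));
producer.send(new ProducerRecord<>("events", "user-42", "searched"));

// No key -> records are spread across partitions to balance load.
producer.send(new ProducerRecord<>("events", "anonymous_page_view"));

// The partition can also be chosen explicitly by the producer.
producer.send(new ProducerRecord<>("events", 3, "user-42", "booked"));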

# TODO: Surprising! Partitions are not actually transparent to the producer.

Consumers

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

# TODO: Can we think of a topic as being bound to a consumer group, rather than to an individual consumer instance?

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

# TODO: Does "delivered to one consumer instance" mean that only one instance within a given consumer group receives the record? Then what is the broadcast below about?

# TODO: Why broadcast instead of delivering precisely to the single consumer instance corresponding to the consumer group? Is the component that dispatches records unable to recognize consumer groups?

https://kafka.apache.org/0110/images/consumer-groups.png

# TODO: I don't understand this diagram either.

A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.

More commonly, however, we have found that topics have a small number of consumer groups, one for each “logical subscriber”. Each group is composed of many consumer instances for scalability and fault tolerance. This is nothing more than publish-subscribe semantics where the subscriber is a cluster of consumers instead of a single process.

The way consumption is implemented in Kafka is by dividing up the partitions in the log over the consumer instances so that each instance is the exclusive consumer of a “fair share” of partitions at any point in time. This process of maintaining membership in the group is handled by the Kafka protocol dynamically. If new instances join the group they will take over some partitions from other members of the group; if an instance dies, its partitions will be distributed to the remaining instances.
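
A sketch of the group mechanism with the Java consumer: instances that share a group.id divide the topic's partitions among themselves, while a consumer with a different group.id gets its own full copy of the stream (group and topic names are made up):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "billing");   // every instance started with group.id "billing" shares the partitions
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("events"));
// Start a second process with the same group.id and Kafka rebalances the partitions
// between the two instances; start one with group.id "analytics" instead and it
// receives every record independently of the "billing" group.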

Kafka only provides a total order over records within a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over records this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.

Guarantees

At a high-level Kafka gives the following guarantees:

  • Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a record M1 is sent by the same producer as a record M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
  • A consumer instance sees records in the order they are stored in the log.
  • For a topic with replication factor N, we will tolerate up to N-1 server failures without losing any records committed to the log.

More details on these guarantees are given in the design section of the documentation.

Kafka as a Messaging System

How does Kafka’s notion of streams compare to a traditional enterprise messaging system?

Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe the record is broadcast to all consumers. Each of these two models has a strength and a weakness. The strength of queuing is that it allows you to divide up the processing of data over multiple consumer instances, which lets you scale your processing. Unfortunately, queues aren’t multi-subscriber—once one process reads the data it’s gone. Publish-subscribe allows you to broadcast data to multiple processes, but has no way of scaling processing since every message goes to every subscriber.

The consumer group concept in Kafka generalizes these two concepts. As with a queue the consumer group allows you to divide up processing over a collection of processes (the members of the consumer group). As with publish-subscribe, Kafka allows you to broadcast messages to multiple consumer groups.

The advantage of Kafka’s model is that every topic has both these properties—it can scale processing and is also multi-subscriber—there is no need to choose one or the other.

Kafka has stronger ordering guarantees than a traditional messaging system, too.

A traditional queue retains records in-order on the server, and if multiple consumers consume from the queue then the server hands out records in the order they are stored. However, although the server hands out records in order, the records are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the records is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of “exclusive consumer” that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing.

Kafka does it better. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances in a consumer group than partitions.

# TODO: Still don't get it 😓

Kafka as a Storage System

Any message queue that allows publishing messages decoupled from consuming them is effectively acting as a storage system for the in-flight messages. What is different about Kafka is that it is a very good storage system.

Data written to Kafka is written to disk and replicated for fault-tolerance. Kafka allows producers to wait on acknowledgement so that a write isn’t considered complete until it is fully replicated and guaranteed to persist even if the server written to fails.
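
A sketch of a producer that waits for a fully replicated acknowledgement; acks is a real producer setting, while the topic and values are made up:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");   // the write is acknowledged only once all in-sync replicas have it
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// send() is asynchronous; blocking on the returned Future makes the write synchronous,
// i.e. "not considered complete until it is fully replicated".
// (Call from code that handles InterruptedException/ExecutionException.)
RecordMetadata meta = producer.send(new ProducerRecord<>("payments", "k1", "v1")).get();
System.out.println("written to partition " + meta.partition() + " at offset " + meta.offset());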

The disk structures Kafka uses scale well—Kafka will perform the same whether you have 50 KB or 50 TB of persistent data on the server.

As a result of taking storage seriously and allowing the clients to control their read position, you can think of Kafka as a kind of special purpose distributed filesystem dedicated to high-performance, low-latency commit log storage, replication, and propagation.

Kafka for Stream Processing

It isn’t enough to just read, write, and store streams of data; the purpose is to enable real-time processing of streams.

In Kafka a stream processor is anything that takes continual streams of data from input topics, performs some processing on this input, and produces continual streams of data to output topics.

For example, a retail application might take in input streams of sales and shipments, and output a stream of reorders and price adjustments computed off this data.

It is possible to do simple processing directly using the producer and consumer APIs. However for more complex transformations Kafka provides a fully integrated Streams API. This allows building applications that do non-trivial processing that compute aggregations off of streams or join streams together.
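
To make the Streams API concrete, a minimal aggregation sketch (the topic names and application id are made up; the classes shown match the current Kafka Streams Java API, and very old releases named some of them differently):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "page-view-counter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> views = builder.stream("page-views");   // input topic
KTable<String, Long> counts = views.groupByKey().count();       // stateful aggregation per key
counts.toStream().to("page-view-counts", Produced.with(Serdes.String(), Serdes.Long())); // output topic

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();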

This facility helps solve the hard problems this type of application faces: handling out-of-order data, reprocessing input as code changes, performing stateful computations, etc.

The Streams API builds on the core primitives Kafka provides: it uses the producer and consumer APIs for input, uses Kafka for stateful storage, and uses the same group mechanism for fault tolerance among the stream processor instances.

Putting the Pieces Together

This combination of messaging, storage, and stream processing may seem unusual but it is essential to Kafka’s role as a streaming platform.

A distributed file system like HDFS allows storing static files for batch processing. Effectively a system like this allows storing and processing historical data from the past.

A traditional enterprise messaging system allows processing future messages that will arrive after you subscribe. Applications built in this way process future data as it arrives.

Kafka combines both of these capabilities, and the combination is critical both for Kafka usage as a platform for streaming applications as well as for streaming data pipelines.

By combining storage and low-latency subscriptions, streaming applications can treat both past and future data the same way. That is, a single application can process historical, stored data, but rather than ending when it reaches the last record, it can keep processing as future data arrives. This is a generalized notion of stream processing that subsumes batch processing as well as message-driven applications.

Likewise for streaming data pipelines the combination of subscription to real-time events makes it possible to use Kafka for very low-latency pipelines; but the ability to store data reliably makes it possible to use it for critical data where the delivery of data must be guaranteed or for integration with offline systems that load data only periodically or may go down for extended periods of time for maintenance. The stream processing facilities make it possible to transform data as it arrives.

For more information on the guarantees, APIs, and capabilities Kafka provides, see the rest of the documentation.

What are the differences between Apache Kafka and RabbitMQ?

https://www.quora.com/What-are-the-differences-between-Apache-Kafka-and-RabbitMQ

by Stuart Charlton, Pivotal Software

(Updated May 2017 - it’s been 4.5 years!)

Kafka is a general purpose message broker, like RabbitMQ, with similar distributed deployment goals, but with very different assumptions on message model semantics. I would be skeptical of the “AMQP is more mature” argument and look at the facts of how either solution solves your problem.

TL;DR,

  1. Use Kafka if you have a fire hose of events (20k+/sec per producer) you need delivered in partitioned order ‘at least once’ with a mix of online and batch consumers, but most importantly you’re OK with your consumers managing the state of your “cursor” on the Kafka topic.

Kafka’s main superpower is that it is less like a queue system and more like a circular buffer that scales as much as your disk on your cluster, and thus allows you to be able to re-read messages.

  2. Use Rabbit if you have messages (20k+/sec per queue) that need to be routed in complex ways to consumers, you want per-message delivery guarantees, you need one or more features of protocols like AMQP 0.9.1, 1.0, MQTT, or STOMP, and you want the broker to manage the state of which consumer has been delivered which message.

TODO: Does this mean Kafka cannot provide per-message delivery guarantees?

RabbitMQ’s main superpowers are that it’s a scalable, high performance queue system with well-defined consistency rules, and the ability to create interesting exchange topologies.

Neither offers “filter/processing” capabilities - if you need that, consider using a data flow or stream processing framework - there are many: Apache Beam (which is an abstraction on top of Google Dataflow, Flink, Spark, or Apex), Storm, NiFi, direct use of Apex, Flink, or Spark or Spring Cloud Data Flow on top of one of these solutions to add computation, filtering, querying, on your streams. You may also want to use something like Apache Cassandra or Geode or Ignite as your queryable stream cache.

Kafka traditionally hasn’t offered transactional semantics in its writes, though this is changing in 0.11.

TODO: What are transactional semantics?
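
For reference, the transactional producer API that ships with 0.11 looks roughly like this (a sketch; the transactional.id, topics, and values are made-up examples):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "payments-writer-1");   // enables transactions (and idempotent writes)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", "order-1", "created"));
    producer.send(new ProducerRecord<>("payments", "order-1", "charged"));
    producer.commitTransaction();   // both records become visible atomically
} catch (Exception e) {
    producer.abortTransaction();    // neither record is exposed to transactional consumers
}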

Pivotal has recently published a reasonably fair post on when to use RabbitMQ or Kafka, which I provided some input into. Pivotal is the owner of RabbitMQ but is also a fan of using the right tool for the job, and encouraging open source innovation … and thus is a fan of Kafka!

Details:

Firstly, on RabbitMQ vs. Kafka. They are both excellent solutions, RabbitMQ being more mature, but both have very different design philosophies. Fundamentally, I’d say RabbitMQ is broker-centric, focused around delivery guarantees between producers and consumers, with transient preferred over durable messages. Whereas Kafka is producer-centric, based around partitioning a fire hose of event data into durable message brokers with cursors, supporting batch consumers that may be offline, or online consumers that want messages at low latency.

RabbitMQ uses the broker itself to maintain state of what’s consumed (via message acknowledgements) - it uses Erlang’s Mnesia to maintain delivery state around the broker cluster. Kafka doesn’t have message acknowledgements; it assumes the consumer tracks what’s been consumed so far. Kafka brokers use Zookeeper to reliably maintain their state across a cluster.
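
A sketch of what "the consumer tracks what's been consumed" looks like with the Java client: auto-commit is disabled and the consumer commits its offsets only after processing. The configuration keys are real; the process() handler is hypothetical:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "payments");
props.put("enable.auto.commit", "false");   // the application decides when an offset is "done"
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("events"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> r : records) {
        process(r);                 // hypothetical application handler
    }
    consumer.commitSync();          // commit after processing -> at-least-once delivery
}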

RabbitMQ presumes that consumers are mostly online, and any messages “in wait” (persistent or not) are held opaquely (i.e. no cursor). Kafka was based from the beginning around both online and batch consumers, and also has producer message batching - it’s designed for holding and distributing large volumes of messages.

TODO: In batch operations, how is consistency maintained if part of the batch fails?

RabbitMQ provides rich routing capabilities with AMQP 0.9.1’s exchange, binding and queuing model. Kafka has a very simple routing approach - in AMQP parlance it uses topic exchanges only.

TODO AMQP, binding and queuing model

Both solutions run as distributed clusters, but RabbitMQ’s philosophy is to make the cluster transparent, as if it were a virtual broker. Kafka makes partitions explicit, by forcing the producer to know it is partitioning a topic’s messages across several nodes. This has the benefit of preserving ordered delivery within a partition.

TODO: How is ordering guaranteed?

RabbitMQ ensures queued messages are stored in published order even in the face of requeues or channel closure. One can set up a topology and ordered delivery similar to Kafka’s using the consistent hash exchange or sharding plugin, or even more interesting topologies.

Put another way, Kafka presumes that producers generate a massive stream of events on their own timetable - there’s no room for throttling producers because consumers are slow, since the data is too massive. The whole job of Kafka is to provide the “shock absorber” between the flood of events and those who want to consume them in their own way – some online, others offline - only batch consuming on an hourly or even daily basis.

Performance-wise, both are excellent performers, but have major architectural differences. RabbitMQ has demonstrated setups of over a million messages/sec, Kafka has demonstrated setups of several million messages/sec. The primary architectural difference is that RabbitMQ handles its messages largely in-memory and thus uses a large cluster in these benchmarks (30+ nodes), whereas Kafka proudly leverages the powers of sequential disk I/O and requires less hardware (this benchmark uses 3x 6 core / 32 GB RAM nodes).

This older paper indicates Kafka handled 500,000 messages published per second and 22,000 messages consumed per second on a 2-node cluster with 6-disk RAID 10. http://research.microsoft.com/en

Now, a word on AMQP. Frankly, it seems the standard was a mess but has stabilized. Officially there is a 1.0 specification standardized by OASIS. In practice it is a forked standard, with 0.9.1 being broadly deployed in production, and a smaller number of users of 1.0.

AMQP has lost some of its sheen and momentum, but it has already succeeded in its goal of helping to break the hold TIBCO had on high performance, low latency messaging through 2007 or so. Now there are many options.

airbnb - some kafka facts

  • 90 small brokers
  • 70 topics
  • replication factor of 3
  • IN: 80MB/second
  • OUT: 1.5GB / second
  • MessagesInPerSec: 60K

airbnb - jitney

The role of the Message Bus:

At-least-once delivery: what does this mean?

User Activity Logging

  • site and image load times, OOM events
  • searches, bookings, etc.
  • Data ingestion should be reliable: timely and complete

Challenges

  • without schemas
  • easy to break events during evolution/code changes
  • one topic overall for 800+ event types
  • improper producer configs
  • lack of monitoring

lead to:

  • too many data outages and data loss incidents
  • lack of trust in data systems

Jitney Components

  • Schema Repository
  • Topic Repository

Schema Repository

A Thrift schema repository which ships jars and gems

use Semantic Versioning: 1.0.0 = MODEL.REVISION.ADDITION

  • MODEL is a change which breaks the rules of backward compatibility. Example: changing the type of a field.
  • REVISION is a change which is backward compatible but not forward compatible. Example: adding a new field to a union type.
  • ADDITION is a change which is both backward compatible and forward compatible. Example: adding a new optional field.

Topic Repository

Declare all Jitney topics

Aggregate all characteristics of a topic:

  • name
  • ordering (partitioning function)
  • white list of accepted schemas

Great for documentation purposes

DRY (Don’t Repeat Yourself: a software development principle aimed at reducing duplicated information. Every part of a system should have a single, unambiguous, authoritative representation. When DRY is applied successfully, modifying any single element does not require changes to logically unrelated elements, while changes to logically related elements are predictable, uniform, and kept in sync.)

Example of a Topic:

JitneyTopic.builder()
    .withRouter(SpinaltapHelper::getRouter) // TODO: what does router mean?
    .setName("spinaltap")
    .addWhitelistedSchemas(
            "com.airbnb.jitney.event.spinaltap.v1",
            "com.airbnb.jitney.event.spinaltap.v2"
    )
    .withKeyProvider(SpinaltapHelper::getPartitionKey)
    .build();

Jitney Clients

  • Kafka clients are hard to use correctly. It’s better with 0.9
  • Committing offsets is tricky; someone will get it wrong, even with 0.9
  • Configuration is a mess

Consumer: // TODO

  • offset management to implement at-least once delivery
  • polymorphic dispatching to event handler

Example of a Java Producer:

JitneyProducer producer = JitneyProducerFactory.create(
        new ProducerConfig("my-client-id"),
        new JitneyBusConfig("localhost"), // TODO: you have to specify the bus yourself
        JitneyRepository.getTopic("canary")
    );
producer.startAsync();

// publish
final TBase event = new CanaryEvent(...); // TODO: CanaryEvent
producer.publish(event);

producer.stopAsync();

Example of a Java Consumer:

final Dispatcher dispatcher = ClassBasedDispatcher.builder()
    .addHandler(CanaryEvent.class, new Handler<CanaryEvent>() {  // TODO: what syntax is this? (anonymous inner class)
        public void handler(Message<CanaryEvent> message) {
            System.out.println("message received!");
        }
    })
    .build();

consumer = JitneyConsumerFactory.create(
        ...,
        ...,
        Arrays.asList(Subscription.of(Route.of("canary"), dispatcher))); // TODO

consumer.startAsync();
// worker threads are created and process incoming messages

HTTP Proxy

Warehouse Integration

// TODO: What is warehouse integration?

Data Ingestion Pipeline

  • Stack: Jitney, Spark Streaming, HBase, HDFS
  • Spark Streaming 1.5 with Kafka “direct” connect
  • Process 1 minute batches
  • Write to HBase after deserializing with the right schema
  • Dump data to HDFS every hour (with dedup) and add a Hive partition // TODO Hive partition
  • But live data can be queried via “current” partition

Use cases currently powered

  • User activity ingestion
  • DB change ingestion
  • Payment processing via pub/sub
  • Experimentation
  • Cache invalidation // TODO: Why log cache invalidation events? What problem does it solve?

The Myth of “Exactly Once”

https://blog.treasuredata.com/blog/2012/09/05/takeaways-from-the-kafka-talk-at-airbnb-the-power-of-structured-data-and-the-myth-of-exactly-once/

The holy grail of messaging systems is “exactly once”, meaning that every message is always delivered (“at least once”) and never duplicated (“at most once”). And just like any other “holy grail”, it’s pretty unrealistic without major drawbacks.

While I cannot remember the exact line, Jay remarked how most systems that boast an “exactly once” guarantee come with a dubious footnote that goes something like “it is exactly once as long as consumers do not go down”. He went on to say that while exactly once semantics is not impossible (for example, with two-phase commits), it is not often worth it because it results in reduced performance and availability.

Kafka Streams - Stream processing Made Simple with Kafka

Stream Processing isn’t (necessarily) // TODO

  • Transient, approximate, lossy…
  • … that you must have batch processing as a safety net

Stream Processing

  • A different programming paradigm
  • … that brings computation to unbounded data
  • … with tradeoffs between latency / cost / correctness

Kafka: Real-time Platforms

  • Persistent Buffering
  • Logical Ordering // TODO
  • Scalable “source-of-truth” // TODO

DIY Stream Processing is Hard

  • Ordering
  • State Management
  • Partitioning & Scalability
  • Time, Window & Out-of-order Data
  • Fault tolerance
  • Re-processing