实践kafka

Kafka is a distributed streaming platform.

Kafka是一个分布式流式处理平台。

Confluent Apache Kafka & Streaming Platform for the Enterprise.

Confluent是Kafka三位作者离开LinkedIn后创办了Confluence.io后商业化Kafka的产品。

基本概念和术语

broker

消息

采用ByteBuffer存储

Key:消息键,对消息做 partition 时使用,即决定消息被保存在某 topic 下的哪个 partition 。

Value:

Timestamp:

消息交付语义

at most once

at least once

exactly once

topic

auto.create.topics.enable

自动创建主题,默认值为true

delete.topic.enable

是否可以删除主题,默认值为false

partition

一个topic可以设置多个partition,可以实现负载

num .partitions

默认分区数,默认值为1

一个分区只能被一个消费者组中的一个消费者处理。

segment

partition被划分成多个segment来组织数据

一个segment写满,会新建一个segment,segment的名称以该segment的base offset为名称

包涵log和index文件

replication

default.replication.factor

默认副本数,默认值1,副本数不能大于broker集群节点数

leader和follower

partition leader election

AR

assigned replicas

ISR

in-sync replicas

OSR

out-of-sync replicas

offset

topic partition下的每一条消息都有一个offset,用于记录消息的位置。

消费者端也有一个offset,用于记录消费者消费的位置。

生产者

bootstrap.servers

group.id

key.serializer

value.serializer

acks

broker应答模式

0

1

all或-1

buffer.memery

缓冲区大小,单位:字节,默认值32MB

compression.type

压缩类型,可取值:none、gzip、snappy、lz4

retries

batch.size

批量提交大小,单位:字节,默认值16KB

linger.ms

延时提交,可以聚集成批后提交,减少交互,提高吞吐量

max.request.size

消息最大长度,单位:字节,默认值1048576B

request.timeout.ms

请求超时时间,默认值30秒

enable.idempotence

是否启用幂等性producer

分区器(Partitioner)

实现接口Partitioner自定义分区

消费者

bootstrap.servers

group.id

key.deserializer

value.deserializer

session.timeout.ms

max.poll.interval.ms

auto.offset.reset

kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest和largest(offest保存在zk中)

kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为earliest,latest和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面)

如果存在已经提交的offest时,不管设置为earliest或者latest 都会从已经提交的offest处开始消费

如果不存在已经提交的offest时,earliest 表示从头开始消费,latest 表示从最新的数据消费,也就是新产生的数据.

none:topic各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常

enable.auto.commit

auto.commit.interval.ms

自动提交时间间隔

fetch.max.bytes

max.poll.records

heartbeat.interval.ms

connections.max.idle.ms

消费者组

1个partition只能被同组的一个consumer消费,同组的consumer则起到均衡效果,即同组的不同consumer可以分别消费不同的partition。

kafka-partition-consumer

事务

概要设计

吞吐量和延时

  • 操作系统页缓存是在内存中分配的(page cache,默认flush时间间隔是5秒),所以消息写入的速度非常快。
  • Kafka不必直接与底层的文件系统打交道。所有烦琐的 1/0 操作都交由操作系统来处理 。
  • Kafka写入操作采用追加写入( append )的方式,避免了磁盘随机写操作。
  • 使用以 sendfile 为代表的零拷贝技术加强网络间的数据传输效率。

持久化

负载均衡和故障转移

伸缩性

状态保存于zookeeper

快速开始

启动kafka自带的zookeeper

1
$ bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka

1
$ bin/kafka-server-start.sh config/server.properties

帮助信息

1
2
3
# help
$ kafka-server-start.sh 
USAGE: /app/software/kafka/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*

使用独立的zookeeper,需要修改kafka配置文件config/server.propertieszookeeper.connect

1
2
# zookeeper.connect=localhost:2181
zookeeper.connect=192.168.1.23:2181

在bin目录下新建kafka deamon启动脚本start-kafka.sh

1
2
3
#!/bin/bash
BIN_HOME=$(cd `dirname $0`;pwd)
kafka-server-start.sh -daemon "$BIN_HOME/../config/server.properties"

创建topic

1
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看topic

1
2
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

发送消息

1
2
3
4
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 回车发送消息
> This is a message
> This is another message

消费消息

1
2
3
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

生产者

老版本

使用scala编写,从0.9.0.0版本开始不再推荐使用

1
2
3
4
5
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.0</version>
</dependency>

样例(github[sample-old-kafka])

1
2
3
4
5
6
7
Properties properties = new Properties();
properties.put("metadata.broker.list", "localhost:9092");
properties.put("serializer.class", StringEncoder.class.getName());

Producer producer = new Producer<Integer, String>(new ProducerConfig(properties));
// 默认同步发送
producer.send(new KeyedMessage<Integer, String>("topic", "message-" + i++));

新版本

0.8.2.x

使用java编写

1
2
3
4
5
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.0.0</version>
</dependency>
1
2
3
org.apache.kafka.clients.producer.Producer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>)

org.apache.kafka.clients.producer.Producer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>, org.apache.kafka.clients.producer.Callback)

样例(github[sample-kafka])

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Properties properties = new Properties();
// 三个必填
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 默认异步发送
kafkaProducer.send(new ProducerRecord<>(topic, sendMsg), new Callback() {
  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      e.printStackTrace();
    }
    log.info("Producer Message: Partition:" + recordMetadata.partition() + ",Offset:" + recordMetadata.offset());
    }
});

消费者

老版本

使用scala编写,从0.10.0.0版本开始不再推荐使用

kafka.consumer. Consumer.ConsumerConnector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
int threadCount = 3;

Properties props= new Properties();
props.put ("zookeeper.connect", zkConnect); //必须指定
props.put ("group.id", groupID) ; //必须指定
           
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

Map<String , Integer> topicMap =new HashMap<> ();
// topic及其对应的消费线程数,小于或等于分区数
topicMap.put("topic", threadCount);

Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicMap);

List<KafkaStream<byte[], byte[]> streams= consumerMap.get(topic);

int threadNumber = 0;
// 为每个消息流启动一个线程迭代读取消息
executor= Executors.newFixedThreadPool(threadCount);
for (final KafkaStream stream : streams) {
    executor.submit(new ConsumerWork(stream, threadNumber++));
}

class ConsumerWork implements Runnable {
	private final KafkaStream<byte[], byte[]> stream;
	private int threadNumber;
	public ConsumerWork(KafkaStream stream, int threadNumber){
		this.stream= stream;
		this.threadNumber = threadNumber;
    }
	@Override
    public void run () {
		ConsumerIterator<byte[], byte[]> iter =stream.iterator();
		while (iter.hasNext()) {
			System.out.println(String.format("Thread %d consumed message: %s", threadNumber, String.valueOf(iter.next().message())));
			System.out.println(String.format ("Thread %d finished, exiting...", threadNumber));
        }
    }
}

新版本

0.9.0.x,org.apache.kafka.clients.consumer.KafkaConsumer

1
2
3
4
5
6
7
8
// 订阅topic
org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection<java.lang.String>)
    
// 订阅topicPartition
org.apache.kafka.clients.consumer.KafkaConsumer#assign

// 读取消息
org.apache.kafka.clients.consumer.KafkaConsumer#poll
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Properties properties = new Properties();
// 四个必填
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "testgroup");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("testtopic"));

while (true) {
  ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
  log.debug("Consumer {} messages", consumerRecords.count());
  for (ConsumerRecord<String, String> item : consumerRecords) {
    log.info("Consumer Message:" + item.value() + ",Partition:" + item.partition() + ",Offset:" + item.offset());
  }
}

集群

在集群环境中创建多副本的topic

1
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看集群环境中topic的信息

1
2
3
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,0,2	Isr: 1,0,2

–list

–alter

–delete

查看分区的消息数

1
2
3
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic HelloWorld1
HelloWorld1:1:47
HelloWorld1:0:68

伪集群

在一台服务器上部署3个节点,实现伪集群

config/server-1.properties:

1
2
3
4
# 各节点broker.id不能相同
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:

1
2
3
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

config/server-2.properties:

1
2
3
broker.id=3
listeners=PLAINTEXT://:9095
log.dir=/tmp/kafka-logs-3

真实集群

在三台服务器(192.168.7.100、192.168.7.101和192.168.7.102)上分别部署一个节点,实现真实集群

config/server.properties

1
2
3
4
5
6
# 各节点broker.id不能相同
broker.id=1
# 各主机IP
host.name=192.168.7.100
# 设置zookeeper的连接端口,可以指定zookeeper的chroot,起到更好的隔离作用
zookeeper.connect=192.168.7.100:2181,192.168.7.101:2181,192.168.7.102:2181/kafka_cluster1

Kafka Connect

Kafka Streams

工具

CMAK

CMAK

Kafka Eagle

kafka-eagle

Kafka Tool

Kafka Manager

Kafka Manager

Kafka Web Console

Kafka Web Console已停止维护,推荐使用KafkaManager

KafkaOffsetMonitor

KafkaOffsetMonitor

参考

《kafka并不难学!入门、进阶、商业实战》,kafka 0.10.2.0

《Kafka技术内幕》,kafka 0.10.0.0

《Kafka入门与实践》,kafka 0.10.1.1

《Apache Kafka 实战》,kafka 1.0.0,作者胡夕的个人博客

《Kafka权威指南》,kafka 0.9.0.1

《Apache Kafka源码剖析》,kafka 0.10.0

《深入理解Kafka:核心设计与实践原理》,kafka 2.0.0

Apache kafka是如何实现消息的精确一次(Exactly-once-semantics)语义的?