我们通常使用Kafka有两种方式,一种是通过Kafka自带的命令行工具,一种是通过Kafka提供的API。
Kafka提供了五类API:
- Producer API:生产者api
- Consumer API:消费者api
- Admin API:管理api
- Streams API:大数据流api
- Connect API:从别的数据库导入数据到kafka中或从kafka导出数据的api
以Java为例,首先导入Kafka客户端:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
Admin API
admin api用于管理kafka,比如topic管理,下面举几个例子
创建adminClient实例:
private AdminClient adminClient;
@BeforeEach
public void initAdminClient(){
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.3:9092");
adminClient = AdminClient.create(properties);
}
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG
即kafka集群地址,其他配置参数参考AdminClientConfig
创建topic
@Test
public void createTopic() throws ExecutionException, InterruptedException {
int numPartitions = 1; // partition数量
short replication = 1; // 副本数
NewTopic topic = new NewTopic("product", numPartitions, replication);
CreateTopicsResult result = adminClient.createTopics(Collections.singleton(topic));
System.out.println(result.all().get());
}
列举topic
@Test
public void listTopic() throws ExecutionException, InterruptedException {
System.out.println(adminClient.listTopics().listings().get());
}
从结果上可以看到:
[(name=product, internal=false)]
其中internal=false
表示这个主题不是一个内部主题,所谓内部主题,就是由kafka自己创建的,内部使用的主题
删除topic
@Test
public void deleteTopic() throws ExecutionException, InterruptedException {
DeleteTopicsResult result = adminClient.deleteTopics(Collections.singleton("product"));
System.out.println(result.all().get());
}
查看topic详情
@Test
public void describeTopic() throws ExecutionException, InterruptedException {
DescribeTopicsResult result = adminClient.describeTopics(Collections.singleton("product"));
System.out.println(result.all().get());
}
可以看到名称和分区信息等:
{product=(
name=product,
internal=false,
partitions=(
partition=0,
leader=192.168.0.103:9092 (id: 1001 rack: null),
replicas=192.168.0.103:9092 (id: 1001 rack: null),
isr=192.168.0.103:9092 (id: 1001 rack: null)
),
authorizedOperations=null
)}
查看主题配置
@Test
public void describeTopicConfig() throws ExecutionException, InterruptedException {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "product");
DescribeConfigsResult result = adminClient.describeConfigs(Collections.singleton(configResource));
System.out.println(result.all().get());
}
配置内容很长,就不贴出来了,
修改主题配置
@Test
public void modifyConfig() throws ExecutionException, InterruptedException {
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "product");
AlterConfigOp op = new AlterConfigOp(new ConfigEntry("min.insync.replicas", "0"), AlterConfigOp.OpType.SET);
configs.put(configResource, Collections.singleton(op));
System.out.println(adminClient.incrementalAlterConfigs(configs).all().get());
}
修改其他配置
支持修改其他配置,具体查看ConfigResource.Type
:
其他API
adminClient还有很多API,比如
adminClient.listPartitionReassignments()
adminClient.listOffsets()
adminClient.createPartitions()
不一一列举,具体查看adminClient源码
Producer API
生产数据
这是生产者API,用于生产数据
@Test
public void testProducer() throws Exception {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.3:9092");
// key和value的序列化方法
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(properties);
// 构造kafka消息,key可以不指定。
ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", null, "hello word");
// 发送消息
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println("partition: " + recordMetadata.partition());
System.out.println("offset: " + recordMetadata.offset());
}
构造kafka消息时,key可以不传。
如果有传,在多个partition情况下,将会用key进行hash来决定这条消息应该存放在哪一个partition中。
指定分区
# 指定0分区
new ProducerRecord<>("mytopic", 0, null, "hello word");
异步发送
上面的例子,producer.send(record).get()
会同步等待发送结果,为了效率,我们也可以异步发送:
// 异步发送消息
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.out.println("发送消息失败:" + exception.getMessage());
} else {
System.out.println("partition: " + metadata.partition());
System.out.println("offset: " + metadata.offset());
}
});
ack参数配置
在生产者中,ack参数是一个重要参数,配置方法如下:
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
该配置含义如下:
配置值 | 说明 | 优缺点 |
---|---|---|
acks=0 | 不用等待broker确认收到消息 | 性能高,但容易丢数据 |
acks=1 | 等待leader成功将数据写入磁盘,不等待follower | 如果follower备份数据失败,同时leader又刚好挂了,就会丢数据 |
acks=-1或all | 等待一定数量的broker都成功写入数据。可以使用min.insync.replicas 来配置数量。 |
可靠性高,但性能较低。比较少用,除非对数据可靠性有很高要求,例如与钱相关的业务。 |
Consumer API
消费数据
@Test
public void testConsumer() {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.3:9092");
// key和value的反序列化方法
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 消费策略
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题,动态分配分区
consumer.subscribe(Collections.singletonList("mytopic"));
while (true) {
// 消费数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
System.out.println("消费到消息:" + record.value());
});
}
}
示例代码中ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
这个参数需要注意。
当消费者启动时,如果所在消费组已经有offset(之前消费过),那么就按offset继续消费。
如果没有offset,则按ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
配置消费,它有两个取值:
- latest(默认):旧数据不消费,只消费启动之后才产生的新数据
- earliest:第一次从头消费,以后按照offset消费
指定分区消费
我们要先将原来的订阅方法注释掉,consumer.subscribe
其实就是动态分配分区进行消费。改用consumer.assign
手动分配分区。
# 原来的订阅方法注释掉。
# consumer.subscribe(Collections.singletonList("mytopic"));
# 指定0分区消费
consumer.assign(Collections.singletonList(new TopicPartition("mytopic", 0)));
自动提交offset
自动提交offset就是consumer在poll到数据时就自动提交offset到__consumer_offset
这个topic中,但这可能导致数据丢失。
例如consumer poll到消息并提交offset了,但是此时服务挂了,业务未处理,重启consumer以后,上一条未处理的消息就无法就消费不到了。
手动提交offset
默认是自动提交,为了解决可能丢数据的情况,我们可以使用以下配置关闭自动提交:
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
然后我们在消费数据之后,进行手动提交offset,这样就可以保证每条消息都被处理:
while (true) {
// 消费数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
System.out.println("消费到消息:" + record.value());
});
if (records.count() > 0) {
try {
// 同步提交,阻塞
consumer.commitSync();
} catch (Exception e) {
System.out.println("提交失败:" + e.getMessage());
}
}
}
也可以异步提交:
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.out.println("提交失败:" + exception.getMessage());
} else {
System.out.println("提交成功");
}
});
一般情况同步提交即可
指定分区消费
consumer.assign(Collections.singletonList(new TopicPartition("mytopic", 0)));
指定offset消费:
# 注意,要先指定分区
consumer.assign(Collections.singletonList(new TopicPartition("mytopic", 0)));
# 指定分区,从offset 5开始消费
consumer.seek(new TopicPartition("mytopic", 0 ), 5 );
由于每个分区都有各自的offset,因此指定offset时必须指定分区。
由于要指定分区,所以seek之前也需要使用consumer.assign
手动分配指定分区到当前消费者。
从头消费
和指定offset消费同理:
# 注意,手动指定分区
consumer.assign(Collections.singletonList(new TopicPartition("mytopic", 0)));
# 指定分区,从头开始消费
consumer.seekToBeginning(Collections.singletonList(new TopicPartition("mytopic",0 )));
根据时间消费
根据时间消费的原理很简单,就是先使用consumer.offsetsForTimes()
根据时间找到对应的offset,然后再根据offset进行消费:
// 获取所有分区
List<PartitionInfo> partitions = consumer.partitionsFor("mytopic");
Map<TopicPartition, Long> offsetMap = new HashMap<>();
// 从10分钟前开始消费
Long consumeTimestamp = System.currentTimeMillis() - 10 * 60 * 1000;
partitions.forEach(partitionInfo -> {
offsetMap.put(new TopicPartition("mytopic", partitionInfo.partition()), consumeTimestamp);
});
// 根据时间找到对应的offset
Map<TopicPartition, OffsetAndTimestamp> offsetPartMap = consumer.offsetsForTimes(offsetMap);
// 根据offset消费(所有分区)
offsetPartMap.forEach((partition, offsetTimestamp) -> {
consumer.assign(Collections.singletonList(partition));
consumer.seek(partition, offsetTimestamp.offset());
});