Java客户端使用


我们通常使用Kafka有两种方式,一种是通过Kafka自带的命令行工具,一种是通过Kafka提供的API。

Kafka提供了五类API:

  1. Producer API:生产者api
  2. Consumer API:消费者api
  3. Admin API:管理api
  4. Streams API:大数据流api
  5. Connect API:从别的数据库导入数据到kafka中或从kafka导出数据的api

以Java为例,首先导入Kafka客户端:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.8.0</version>
</dependency>

Admin API

admin api用于管理kafka,比如topic管理,下面举几个例子

创建adminClient实例:

private AdminClient adminClient;

@BeforeEach
public void initAdminClient(){
  Properties properties = new Properties();
  properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.3:9092");
  adminClient = AdminClient.create(properties);
}

AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG即kafka集群地址,其他配置参数参考AdminClientConfig

创建topic

@Test
public void createTopic() throws ExecutionException, InterruptedException {
  int numPartitions = 1; // partition数量
  short replication = 1; // 副本数
  NewTopic topic = new NewTopic("product", numPartitions, replication);
  CreateTopicsResult result = adminClient.createTopics(Collections.singleton(topic));
  System.out.println(result.all().get());
}

列举topic

@Test
public void listTopic() throws ExecutionException, InterruptedException {
  System.out.println(adminClient.listTopics().listings().get());
}

从结果上可以看到:

[(name=product, internal=false)]

其中internal=false表示这个主题不是一个内部主题,所谓内部主题,就是由kafka自己创建的,内部使用的主题

删除topic

@Test
public void deleteTopic() throws ExecutionException, InterruptedException {
  DeleteTopicsResult result = adminClient.deleteTopics(Collections.singleton("product"));
  System.out.println(result.all().get());
}

查看topic详情

@Test
public void describeTopic() throws ExecutionException, InterruptedException {
  DescribeTopicsResult result = adminClient.describeTopics(Collections.singleton("product"));
  System.out.println(result.all().get());
}

可以看到名称和分区信息等:

{product=(
  	name=product, 
  	internal=false, 
  	partitions=(
      	partition=0, 
      	leader=192.168.0.103:9092 (id: 1001 rack: null), 
      	replicas=192.168.0.103:9092 (id: 1001 rack: null), 
      	isr=192.168.0.103:9092 (id: 1001 rack: null)
    ), 
  	authorizedOperations=null
)}

查看主题配置

@Test
public void describeTopicConfig() throws ExecutionException, InterruptedException {
  ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "product");
  DescribeConfigsResult result = adminClient.describeConfigs(Collections.singleton(configResource));
  System.out.println(result.all().get());
}

配置内容很长,就不贴出来了,

修改主题配置

@Test
public void modifyConfig() throws ExecutionException, InterruptedException {
    Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "product");
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry("min.insync.replicas", "0"), AlterConfigOp.OpType.SET);
    configs.put(configResource, Collections.singleton(op));
    System.out.println(adminClient.incrementalAlterConfigs(configs).all().get());
}

修改其他配置

支持修改其他配置,具体查看ConfigResource.Type:

其他API

adminClient还有很多API,比如

adminClient.listPartitionReassignments()
adminClient.listOffsets()
adminClient.createPartitions()

不一一列举,具体查看adminClient源码

Producer API

生产数据

这是生产者API,用于生产数据

@Test
public void testProducer() throws Exception {
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.3:9092");
    // key和value的序列化方法
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<String, String> producer = new KafkaProducer<>(properties);

    // 构造kafka消息,key可以不指定。
    ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", null, "hello word");

    // 发送消息
    RecordMetadata recordMetadata = producer.send(record).get();
    System.out.println("partition: " + recordMetadata.partition());
    System.out.println("offset: " + recordMetadata.offset());
}

构造kafka消息时,key可以不传。

如果有传,在多个partition情况下,将会用key进行hash来决定这条消息应该存放在哪一个partition中。

指定分区

# 指定0分区
new ProducerRecord<>("mytopic", 0, null, "hello word");

异步发送

上面的例子,producer.send(record).get()会同步等待发送结果,为了效率,我们也可以异步发送:

// 异步发送消息
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.out.println("发送消息失败:" + exception.getMessage());
    } else {
        System.out.println("partition: " + metadata.partition());
        System.out.println("offset: " + metadata.offset());
    }
});

ack参数配置

在生产者中,ack参数是一个重要参数,配置方法如下:

properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");

该配置含义如下:

配置值 说明 优缺点
acks=0 不用等待broker确认收到消息 性能高,但容易丢数据
acks=1 等待leader成功将数据写入磁盘,不等待follower 如果follower备份数据失败,同时leader又刚好挂了,就会丢数据
acks=-1或all 等待一定数量的broker都成功写入数据。可以使用min.insync.replicas来配置数量。 可靠性高,但性能较低。比较少用,除非对数据可靠性有很高要求,例如与钱相关的业务。

Consumer API

消费数据

@Test
public void testConsumer() {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.3:9092");

    // key和value的反序列化方法
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
    // 消费策略
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    //消费组
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // 订阅主题,动态分配分区
    consumer.subscribe(Collections.singletonList("mytopic"));

    while (true) {
        // 消费数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(record -> {
            System.out.println("消费到消息:" + record.value());
        });
    }
}

示例代码中ConsumerConfig.AUTO_OFFSET_RESET_CONFIG这个参数需要注意。

当消费者启动时,如果所在消费组已经有offset(之前消费过),那么就按offset继续消费。

如果没有offset,则按ConsumerConfig.AUTO_OFFSET_RESET_CONFIG配置消费,它有两个取值:

  • latest(默认):旧数据不消费,只消费启动之后才产生的新数据
  • earliest:第一次从头消费,以后按照offset消费

指定分区消费

我们要先将原来的订阅方法注释掉,consumer.subscribe其实就是动态分配分区进行消费。改用consumer.assign手动分配分区。

# 原来的订阅方法注释掉。
# consumer.subscribe(Collections.singletonList("mytopic"));

# 指定0分区消费
consumer.assign(Collections.singletonList(new TopicPartition("mytopic", 0)));

自动提交offset

自动提交offset就是consumer在poll到数据时就自动提交offset到__consumer_offset这个topic中,但这可能导致数据丢失。

例如consumer poll到消息并提交offset了,但是此时服务挂了,业务未处理,重启consumer以后,上一条未处理的消息就无法就消费不到了。

手动提交offset

默认是自动提交,为了解决可能丢数据的情况,我们可以使用以下配置关闭自动提交:

properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

然后我们在消费数据之后,进行手动提交offset,这样就可以保证每条消息都被处理:

while (true) {
    // 消费数据
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    records.forEach(record -> {
        System.out.println("消费到消息:" + record.value());
    });
    if (records.count() > 0) {
        try {
            // 同步提交,阻塞
            consumer.commitSync();
        } catch (Exception e) {
            System.out.println("提交失败:" + e.getMessage());
        }
    }
}

也可以异步提交:

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.out.println("提交失败:" + exception.getMessage());
    } else {
        System.out.println("提交成功");
    }
});

一般情况同步提交即可

指定分区消费

consumer.assign(Collections.singletonList(new TopicPartition("mytopic", 0)));

指定offset消费:

# 注意,要先指定分区
consumer.assign(Collections.singletonList(new TopicPartition("mytopic", 0)));

# 指定分区,从offset 5开始消费
consumer.seek(new TopicPartition("mytopic", 0 ), 5 );

由于每个分区都有各自的offset,因此指定offset时必须指定分区。

由于要指定分区,所以seek之前也需要使用consumer.assign手动分配指定分区到当前消费者。

从头消费

和指定offset消费同理:

# 注意,手动指定分区
consumer.assign(Collections.singletonList(new TopicPartition("mytopic", 0)));

# 指定分区,从头开始消费
consumer.seekToBeginning(Collections.singletonList(new TopicPartition("mytopic",0 )));

根据时间消费

根据时间消费的原理很简单,就是先使用consumer.offsetsForTimes()根据时间找到对应的offset,然后再根据offset进行消费:

// 获取所有分区
List<PartitionInfo> partitions = consumer.partitionsFor("mytopic");

Map<TopicPartition, Long> offsetMap = new HashMap<>();
// 从10分钟前开始消费
Long consumeTimestamp = System.currentTimeMillis() - 10 * 60 * 1000;
partitions.forEach(partitionInfo -> {
    offsetMap.put(new TopicPartition("mytopic", partitionInfo.partition()), consumeTimestamp);
});

// 根据时间找到对应的offset
Map<TopicPartition, OffsetAndTimestamp> offsetPartMap = consumer.offsetsForTimes(offsetMap);

// 根据offset消费(所有分区)
offsetPartMap.forEach((partition, offsetTimestamp) -> {
    consumer.assign(Collections.singletonList(partition));
    consumer.seek(partition, offsetTimestamp.offset());
});
文章作者: 周君
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 周君 !
评论