Kafka ProducerRecord 是 Apache Kafka 中用于发送消息到 Kafka 主题(Topic)的一个类。要处理 Kafka ProducerRecord,你需要遵循以下步骤:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String topic = "my-topic";
String key = "my-key";
String value = "my-value";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
send()
方法发送 ProducerRecord 实例。这个方法是一个异步方法,它将立即返回一个 Future 对象。你可以选择等待这个操作完成,或者继续执行其他任务。Future<RecordMetadata> future = producer.send(record);
如果你希望等待消息发送完成并获取一个确认消息,可以调用 future.get()
方法。这将抛出 ExecutionException,你需要捕获并处理它。同时,你还可以获取一个 RecordMetadata 对象,其中包含消息的元数据(如分区、偏移量等)。
try {
RecordMetadata metadata = future.get();
System.out.println("Message sent to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
producer.close();
总结一下,处理 Kafka ProducerRecord 的步骤如下:
send()
方法发送消息。辰迅云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
推荐阅读: springboot 整合 kafka 性能调优