java程序怎么获取kafka的topic

998
2023/9/22 20:16:40
栏目: 编程语言
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

Java程序可以通过Kafka提供的Java客户端来获取Kafka的topic。以下是一个获取topic列表的示例代码:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaFuture;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaTopicExample {
public static void main(String[] args) {
// Kafka配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// 创建AdminClient对象
try (AdminClient adminClient = AdminClient.create(properties)) {
// 获取topic列表
ListTopicsResult topicsResult = adminClient.listTopics();
// 获取Future对象
KafkaFuture<Collection<TopicListing>> topicListingFuture = topicsResult.listings();
// 获取topic列表
Collection<TopicListing> topicListings = topicListingFuture.get();
// 遍历输出每个topic
for (TopicListing topicListing : topicListings) {
System.out.println(topicListing.name());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}

在上述代码中,我们首先创建了一个AdminClient对象,并传入Kafka的配置。然后,我们通过listTopics方法获取一个ListTopicsResult对象,该对象包含了获取topic列表的方法。我们通过调用listings方法获取一个KafkaFuture对象,该对象代表了一个异步的获取topic列表的过程。最后,我们通过调用get方法获取真正的topic列表,并遍历输出每个topic的名称。

请注意,这里的配置中使用了bootstrap.servers参数来指定Kafka集群的地址,你需要根据你实际的Kafka集群配置来修改该参数。

辰迅云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

推荐阅读: java中timestamp转化为date怎么实现