kafkaapi第一天

it2023-03-23  78

常用api 创建和查看topic

public static void main(String[] args) throws ExecutionException, InterruptedException { Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"node05kafka:9092,node06kafka:9092,node07kafka:9092"); KafkaAdminClient client =(KafkaAdminClient) KafkaAdminClient.create(properties); //创建topic信息 kafka默认的创建方式是异步,直接获取不到返回结果 CreateTopicsResult clientTopics = client.createTopics(Arrays.asList(new NewTopic("topic04", 3, (short) 3))); //clientTopics.all().get();//由异步改为同步 //获取topic列表 ListTopicsResult topicsResult = client.listTopics(); Set<String> names = topicsResult.names().get(); for (String name : names) { System.out.println(name); } client.close(); }

结果

INFO 2020-10-20 14:41:11 org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.2.0 INFO 2020-10-20 14:41:11 org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 05fcfde8f69b0349 topic01 topic02 topic03

看结果发现topic04是没有获取到的,但是已经调用了create方法,这是因为kafka的创建时异步的,不能 马上获取结果,要想获取到结果要添加 clientTopics.all().get();//由异步改为同步

public static void main(String[] args) throws ExecutionException, InterruptedException { Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"node05kafka:9092,node06kafka:9092,node07kafka:9092"); KafkaAdminClient client =(KafkaAdminClient) KafkaAdminClient.create(properties); //创建topic信息 kafka默认的创建方式是异步,直接获取不到返回结果 CreateTopicsResult clientTopics = client.createTopics(Arrays.asList(new NewTopic("topic03", 3, (short) 3))); **clientTopics.all().get();//由异步改为同步** //获取topic列表 ListTopicsResult topicsResult = client.listTopics(); Set<String> names = topicsResult.names().get(); for (String name : names) { System.out.println(name); } client.close(); }

删除topic

DeleteTopicsResult deleteTopics = client.deleteTopics(Arrays.asList( "topic03")); deleteTopics.all().get();//同步删除
最新回复(0)