Caused by: java.io.NotSerializableException: org.apache.kafka.common.metrics.MetricConfig


Ran into this error while developing a custom Flink data source.

Cause: the KafkaConsumer was initialized at its declaration site. Flink serializes the whole SourceFunction instance when it distributes the job to the task managers, and KafkaConsumer (its internal MetricConfig, specifically) is not serializable.

Fix: only declare the field there and initialize it inside the open() method, which runs on the task manager after the function has already been deserialized.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Original (failing) usage: KafkaSourceFunction created its KafkaConsumer at the declaration site
// DataStreamSource<String> dataStreamSource = env.addSource(new KafkaSourceFunction());

// Fixed version: consume from Kafka with a custom source
DataStreamSource<String> data = env.addSource(new RichSourceFunction<String>() {

    // Declare only; do NOT initialize here, or the consumer gets serialized
    // with the function. transient keeps the field out of serialization entirely.
    transient KafkaConsumer<String, String> kafkaConsumer; // = new KafkaConsumer<>(prop); <- this caused the exception
    volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String topic = "xxx";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "xxx:0000");
        prop.setProperty("group.id", "con1");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaConsumer = new KafkaConsumer<>(prop); // safe here: open() runs on the task manager
        kafkaConsumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (running) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                sourceContext.collect("key: " + record.key() + ", value: " + record.value());
            }
        }
    }

    @Override
    public void cancel() {
        // Stop the poll loop instead of just logging, so the job can shut down cleanly.
        running = false;
    }
});
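For context, a minimal driver showing where the serialization actually happens. This is a sketch, not code from the original post: it assumes the RichSourceFunction above has been extracted into the KafkaSourceFunction class mentioned in the failing call, and uses a default local execution environment.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceJob {
    public static void main(String[] args) throws Exception {
        // env is the variable the snippet above assumes to be in scope
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // addSource(...) is where Flink serializes the function to ship it to
        // the task managers -- the step that threw NotSerializableException
        // when the consumer already existed as a field value.
        env.addSource(new KafkaSourceFunction())
           .print();

        env.execute("custom-kafka-source");
    }
}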

 
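Worth noting: for plain Kafka consumption, the bundled Kafka connector avoids this pitfall altogether, since it defers all non-serializable setup to open() internally. A minimal sketch, assuming Flink 1.x with the flink-connector-kafka dependency (FlinkKafkaConsumer; newer releases replace it with KafkaSource), reusing the placeholder topic and broker address from above:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "xxx:0000");
prop.setProperty("group.id", "con1");

// The connector creates its consumers on the task managers, so the job graph
// serializes cleanly; no deserializer properties are needed, as it manages
// byte-array deserialization itself and applies the given schema to values.
DataStreamSource<String> data = env.addSource(
        new FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), prop));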
