flink 自定义数据源的开发过程中遇到这样一个错误。错误原因:在于KafkaConsumer的初始化放在了定义处。解决: 应该放在open方法内初始化。 DataStreamSource<String> dataStreamSource = env.addSource(new KafkaSourceFunction());
//获取数据 kafka消费数据获取
DataStreamSource<String> data = env.addSource(new RichSourceFunction<String>() {
KafkaConsumer<String, String> kafkaConsumer;
// = new KafkaConsumer<>(prop);
//new StringDeserializer(), new StringDeserializer());
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
String topic = "xxx";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","xxx:0000");
prop.setProperty("group.id","con1");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaConsumer = new KafkaConsumer<>(prop);
kafkaConsumer.subscribe(Arrays.asList(topic));
}
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()){
ConsumerRecord<String, String> record = iterator.next();
String key = record.key();
String value = record.value();
sourceContext.collect("key:"+key+", value: "+value);
}
}
}
@Override
public void cancel() {
System.out.println("cancel");
}
});