初识flink之自定义source

it2024-11-02  15

/*
 * Motivation: the Kafka record key is needed for downstream processing, so this
 * example reads Kafka through a hand-written source based on RichSourceFunction
 * instead of FlinkKafkaConsumer.
 *
 * NOTE(review): FlinkKafkaConsumer can also expose the key when it is constructed
 * with a KafkaDeserializationSchema (which receives the full ConsumerRecord);
 * prefer that in production because it integrates offsets with checkpointing.
 */
package xxx;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

/**
 * Demo job that consumes a Kafka topic through a custom {@link RichSourceFunction}
 * and prints every record as {@code "key:<key>, value: <value>"}.
 */
public class FlinkKafkaSource {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to start or terminates abnormally
     */
    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the data from Kafka through the custom source.
        DataStreamSource<String> data = env.addSource(new RichSourceFunction<String>() {

            // Created in open() on the worker; transient because KafkaConsumer is not
            // serializable and must never travel with the serialized function.
            private transient KafkaConsumer<String, String> kafkaConsumer;

            // cancel() is invoked from a different thread than run(), hence volatile.
            private volatile boolean running = true;

            /** Creates the Kafka consumer and subscribes it to the topic. */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                String topic = "xxx";
                Properties prop = new Properties();
                prop.setProperty("bootstrap.servers", "xxx:0");
                prop.setProperty("group.id", "con1");
                prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                kafkaConsumer = new KafkaConsumer<>(prop);
                kafkaConsumer.subscribe(Arrays.asList(topic));
            }

            /**
             * Polls Kafka until the source is cancelled and emits one string per record.
             * The 500 ms poll timeout bounds how long a cancellation can go unnoticed.
             */
            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                while (running) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        String key = record.key();
                        String value = record.value();
                        sourceContext.collect("key:" + key + ", value: " + value);
                    }
                }
            }

            /**
             * Asks run() to leave its loop. The consumer is deliberately not touched
             * here: KafkaConsumer is not safe for multi-threaded access.
             */
            @Override
            public void cancel() {
                System.out.println("cancel");
                running = false;
            }

            /** Releases the Kafka consumer created in open(), if any. */
            @Override
            public void close() throws Exception {
                try {
                    // Null when open() failed before the consumer was created.
                    if (kafkaConsumer != null) {
                        kafkaConsumer.close();
                    }
                } finally {
                    super.close();
                }
            }
        });

        // TODO: filter and cleanse the data by message key, write it to Redis,
        //       then read it back from Redis and store it in MySQL.
        // TODO: real-time statistics per application id / per attack (frequency).
        data.print().setParallelism(1);

        env.execute("StreamingKafkaSource");
    }
}

 

最新回复(0)