Since we want to access the Kafka message key during processing, the stock FlinkKafkaConsumer with a plain string schema cannot be used as-is: SimpleStringSchema deserializes only the value and drops the key. We therefore build a custom source on top of RichSourceFunction. The code is as follows:

package xxx;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
/**
 * Custom Kafka source that emits both the message key and the message value.
 */
public class FlinkKafkaSource {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Consume data from Kafka with a hand-rolled source so the key stays available.
        DataStreamSource<String> data = env.addSource(new RichSourceFunction<String>() {

            // Raw Kafka consumer; created in open() so it is built on the task manager.
            private KafkaConsumer<String, String> kafkaConsumer;
            // Volatile flag that lets cancel() stop the polling loop in run().
            private volatile boolean running = true;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                String topic = "xxx";
                Properties prop = new Properties();
                prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:0");
                prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "con1");
                prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                kafkaConsumer = new KafkaConsumer<>(prop);
                kafkaConsumer.subscribe(Arrays.asList(topic));
            }
            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                while (running) {
                    // Poll for up to 500 ms; each ConsumerRecord carries both key and value.
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
                    Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                    while (iterator.hasNext()) {
                        ConsumerRecord<String, String> record = iterator.next();
                        String key = record.key();
                        String value = record.value();
                        sourceContext.collect("key:" + key + ", value: " + value);
                    }
                }
                // Release the consumer once cancel() has stopped the loop.
                kafkaConsumer.close();
            }
            @Override
            public void cancel() {
                // Signal the polling loop in run() to exit.
                running = false;
            }
        });
        // Next: filter/clean the stream by message key, write it to Redis,
        // then read it back from Redis and persist it to MySQL.
        // Real-time statistics per application id and per attack type, e.g. event frequency.
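        // A hypothetical sketch of the key-based filtering mentioned above; the
        // "key:app-1" prefix is an assumed convention, not part of the original pipeline:
        // data.filter(line -> line.startsWith("key:app-1"));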
        // Print with parallelism 1 so the console output stays in order.
        data.print().setParallelism(1);
        env.execute("StreamingKafkaSource");
    }
}
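For reference, the stock connector can also surface the key: FlinkKafkaConsumer accepts a KafkaDeserializationSchema, which hands the whole ConsumerRecord (key included) to user code, and it keeps Flink's offset and checkpoint handling that the hand-rolled source above lacks. A minimal sketch, assuming the flink-connector-kafka dependency and UTF-8 keys/values; the class name KeyedStringSchema is ours:

package xxx;

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Turns each record into the same "key:..., value: ..." string as the custom source above. */
public class KeyedStringSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // the Kafka stream is unbounded
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
        String value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
        return "key:" + key + ", value: " + value;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

With this schema, env.addSource(new FlinkKafkaConsumer<>("xxx", new KeyedStringSchema(), prop)) produces the same strings as the RichSourceFunction version above.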