flink从入门到放弃之入门篇(二)-Source操作

it2024-06-27  45

​1.Flink预定义Source操作

在flink中,source主要负责数据的读取。

flink预定义的source中又分为「并行source(主要实现ParallelSourceFunction接口)「和」非并行source(主要实现了SourceFunction接口)」

附上官网相关的说明:

you can always write your own custom sources by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending the RichParallelSourceFunction for parallel sources

「1.基于File的数据源」

「1.1底层原理」

Flink在进行文件读取的时候会启动两个subTask,一个subTask用来监听,另外一个subTask用来数据读取。且监听的subTask是属于非并行的,即并行度为1,而数据读取的subTask是属于并行的 ,通常并行度和job的并行度是一致的

readTextFile(path):逐行读取文本文件,将符合TextInputFormat规范的文件,作为字符串返回 readFile(fileInputFormat, path):根据指定的文件输入格式读取 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):上面两个方法底层其实调用的就是该方法

「1.2 代码」

public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());         DataStreamSource<String> textSource = env.readTextFile("/data/GitCode/flink/src/main/resources/word.txt");         //如果不设置并行度的话,这里会跟机器的核数一致         int parallelism = textSource.getParallelism();         System.out.println("textFile并行度:" + parallelism);         textSource.print();         env.execute("TextFileSourceDemo");     }

「2.基于Socket的数据源(一般用于学习,测试)」

「2.1底层原理」

通常调用socketTextStream方法来接收socket数据,元素可以用分隔符分开,该方法底层实现了SourceFunction接口,是属于非并行source

「2.2代码」

   public static void main(String[] args) throws Exception {         //并行度和系统核数一致         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //非并行的source,只有1个并行度         DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));         int parallelism = lines.getParallelism();         System.out.println("socketTextStream并行度:"+parallelism);         SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> Arrays.stream(line.split(" ")).forEach(word -> out.collect(Tuple2.of(word, 1)))).returns(Types.TUPLE(Types.STRING, Types.INT));         SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1);         summed.print();         env.execute("WordCount");     }

「3.基于集合的数据源(学习,测试)」

从集合中创建一个数据流,集合中所有的元素类型都是一致的.其底层调用了SourceFunction接口,属于非并行source。注意:fromParallelCollection方法是属于并行的,底层调用了RichParallelSourceFunction接口

fromCollection(Collection)

val list=List(1,2,3,4,5,6,7,8,9) val inputStream=env.fromCollection(list)

fromCollection(Iterator, Class)

val iterator=Iterator(1,2,3,4) val inputStream=env.fromCollection(iterator)

fromElements(T ...)

从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的。

DataStreamSource<Integer> fromSource = env.fromElements(1, 2, 3, 4); int parallelism = fromSource.getParallelism(); System.out.println("fromElements并行度:" + parallelism);

generateSequence(from, to)

从给定的间隔中并行地产生一个数字序列。

val inputStream = env.generateSequence(1,10)

「4.基于Kafka的数据源(重点)」

这里给出简单的入门示例,实际工作场景中需要考虑到容错、数据一致性、分区发现、结合水印实现一些窗口操作等功能

public class kafkaSource {     private static Properties properties = new Properties();     private static final String BOOTSTRAPLIST = "hdp1:6667,hdp2:6667";     private static final String GROUPID = "metric-group";     private static final String TOPIC = "metric";     static {         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPLIST);         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");         properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");     }     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());         DataStreamSource<String> kafkaSource = env.addSource(new FlinkKafkaConsumer<String>(TOPIC, new SimpleStringSchema(), properties));         kafkaSource.print();         env.execute();     } }

2.Flink自定义Source操作

「2.1 非并行Source」

即实现SourceFunction接口

public class CustomNonParallelSource {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());         List<String> list = new ArrayList<>();         list.add("Spark");         list.add("Hadoop");         list.add("Flink");         list.add("HBase");         list.add("Es");         SingleOutputStreamOperator<String> singleOutputStreamOperator = env.addSource(new MyWordSource(list)).returns(String.class);         System.out.println("自定义非并行source对应的并行度:" + singleOutputStreamOperator.getParallelism());         singleOutputStreamOperator.print();         env.execute();     }     static class MyWordSource implements SourceFunction<String> {         private List<String> words;         private boolean isStop = false;         public MyWordSource() {         }         public MyWordSource(List<String> words) {             this.words = words;         }         @Override         public void run(SourceContext<String> ctx) throws Exception {             if (words != null && words.size() > 0) {                 while (!isStop) {                     words.forEach(word -> ctx.collect(word));                 }             }         }         @Override         public void cancel() {             isStop = true;         }     } }

「2.2 并行Source」

通过实现ParallelSourceFunction接口或者继承RichParallelSourceFunction来定义并行source。

其中RichParallelSourceFunction功能更加丰富些,可以获取上下文信息,提供open和close方法等更加强大的功能

「简单的实时流」

public class CustomSource {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         List<String> words = new ArrayList<>();         words.add("spark");         words.add("hadoop");         words.add("flink"); //        DataStreamSource<String> wordsSource = env.addSource(new MyWordSource(words));         SingleOutputStreamOperator singleOutputStreamOperator = env.addSource(new MyWordParallelSource(words)).returns(String.class); //      如果不设置并行度,则和系统核数一致         System.out.println("自定义并行source对应parallelism:" + singleOutputStreamOperator.getParallelism());         singleOutputStreamOperator.print();         env.execute("CustomSource");     }     static class MyWordParallelSource extends RichParallelSourceFunction {         private boolean isStop = false;         private List<String> words;         public MyWordParallelSource() {         }         /**          * 通常在该方法做一些初始化的工作,如创建数据库连接等          * @param parameters          * @throws Exception          */         @Override         public void open(Configuration parameters) throws Exception {             super.open(parameters);         }         /**          * 释放资源          * @throws Exception          */         @Override         public void close() throws Exception {             super.close();         }         public MyWordParallelSource(List<String> word) {             this.words = word;         }         @Override         public void run(SourceContext ctx) throws Exception {             int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();             while (!isStop) {                 if (words != null && words.size() > 0) {                     words.forEach(word -> ctx.collect(numberOfParallelSubtasks + "--->" + word));                 }                 Thread.sleep(2000);             }         }         /**          * 该方法对无界数据流有作用          */         @Override         public void cancel() {             isStop = true;         }     }     static class MyWordSource extends RichSourceFunction<String> {         private List<String> words;         private boolean flag = true;         public MyWordSource(List<String> words) {             this.words = words;         }         /**          * 产生数据,用sourceContext将数据发生出去          *          * @param sourceContext          * @throws Exception          */         @Override         public void run(SourceContext sourceContext) throws Exception {             //获取当前subTask的运行上下文             RuntimeContext runtimeContext = getRuntimeContext();             int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();             while (flag) {                 if (words != null && words.size() > 0) {                     words.forEach(word -> sourceContext.collect(indexOfThisSubtask + "--->" + word));                 }                 Thread.sleep(2000);             }         }         /**          * 停止source,对于无界数据流有用          */         @Override         public void cancel() {             flag = false;         }     } }

「自定义source读取mysql」

public class MysqlSource extends RichSourceFunction<Student> {     private static Logger logger = LoggerFactory.getLogger(MysqlSource.class);     private static final String URL ="jdbc:mysql://localhost:3306/hibernate?useUnicode=true&characterEncoding=UTF-8";     private static final String USERNAME="root";     private static final String PASSWORD="123456";     PreparedStatement preparedStatement;     private Connection connection;     private static Connection getConnection() {         Connection connection = null;         try {             Class.forName("com.mysql.jdbc.Driver");             connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);         } catch (Exception e) {             logger.error("创建Mysql连接异常,信息:[{}]",e);             e.printStackTrace();         }         return  connection;     }     /**      * open()方法中建立连接,这样不用每次invoke的时候建立和释放连接      *      * @param parameters      * @throws Exception      */     @Override     public void open(Configuration parameters) throws Exception {         super.open(parameters);         connection = getConnection();         String sql = "select * from student";         preparedStatement = this.connection.prepareStatement(sql);     }     /**      * DataStream调用一次run方法用来获取数据      * @param ctx      * @throws Exception      */     @Override     public void run(SourceContext<Student> ctx) throws Exception {         ResultSet resultSet =                 preparedStatement.executeQuery();         while (resultSet.next()){             Student student = new Student(                     resultSet.getInt("id"),                     resultSet.getString("name").trim(),                     resultSet.getString("password").trim(),                     resultSet.getInt("age"));             ctx.collect(student);         }     }     @Override     public void cancel() {     }     /**      * 关闭时释放连接      * @throws Exception      */     @Override     public void close() throws Exception {         super.close();         if(connection!=null){             connection.close();         }         if(preparedStatement!=null){             preparedStatement.close();         }     } }

3.总结

最新回复(0)