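In Flink, a source is responsible for reading data.
Flink's predefined sources fall into two groups: parallel sources (which implement the ParallelSourceFunction interface) and non-parallel sources (which implement the SourceFunction interface).
The official documentation describes them as follows: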
You can always write your own custom sources by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending the RichParallelSourceFunction for parallel sources.
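「1. File-based sources」
「1.1 How it works」
When Flink reads files, it splits the work into two subtasks: one monitors the path and the other reads the data. The monitoring subtask is non-parallel (its parallelism is 1), while the reading subtask is parallel, and its parallelism normally equals the job parallelism.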
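To make the split between one monitoring task and several reading tasks more concrete, here is a minimal plain-Java simulation. It is not Flink's implementation: the class `FileReadSketch`, the method `readAll`, the `END` marker and the in-memory "files" map are all hypothetical names invented for this sketch, and the readers simply pull paths from a shared queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch (not Flink code): one monitor thread feeds several reader threads. */
class FileReadSketch {
    // Marker telling a reader that no more work will arrive (an assumption of this sketch)
    private static final String END = "__END__";

    /** `files` maps a path to its lines and stands in for a real directory. */
    static List<String> readAll(Map<String, List<String>> files, int readerParallelism) {
        BlockingQueue<String> splits = new LinkedBlockingQueue<>();
        List<String> output = Collections.synchronizedList(new ArrayList<>());

        // The single, non-parallel monitoring task: discovers paths and hands them out
        Thread monitor = new Thread(() -> {
            splits.addAll(files.keySet());
            for (int i = 0; i < readerParallelism; i++) {
                splits.add(END); // one end marker per reader
            }
        }, "monitor");

        // The parallel reading tasks: each one takes paths until it sees the end marker
        ExecutorService readers = Executors.newFixedThreadPool(readerParallelism);
        for (int i = 0; i < readerParallelism; i++) {
            readers.execute(() -> {
                try {
                    String path;
                    while (!(path = splits.take()).equals(END)) {
                        output.addAll(files.get(path));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            monitor.start();
            monitor.join();
            readers.shutdown();
            readers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for readers", e);
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, List<String>> files = new LinkedHashMap<>();
        files.put("a.txt", Arrays.asList("spark", "flink"));
        files.put("b.txt", Arrays.asList("hadoop"));
        System.out.println(readAll(files, 2).size() + " lines read");
    }
}
```

The order of the collected lines depends on which reader picks up which path, which mirrors the fact that parallel readers give no global ordering guarantee across files.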
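readTextFile(path): reads a text file line by line, that is, a file that follows the TextInputFormat specification, and returns each line as a string.
readFile(fileInputFormat, path): reads the file according to the specified file input format.
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo): the method that the two methods above call internally.
「1.2 Code」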
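import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TextFileSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> textSource =
                env.readTextFile("/data/GitCode/flink/src/main/resources/word.txt");
        // If no parallelism is set, it matches the number of CPU cores of the machine
        int parallelism = textSource.getParallelism();
        System.out.println("textFile parallelism: " + parallelism);
        textSource.print();
        env.execute("TextFileSourceDemo");
    }
}
「2. Socket-based sources (mainly for learning and testing)」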
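「2.1 How it works」
Socket data is usually received by calling the socketTextStream method, and elements can be separated by a delimiter. Under the hood the method is built on an implementation of the SourceFunction interface, so it is a non-parallel source.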
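The idea of cutting a character stream into elements at a delimiter can be sketched in plain Java. This is only an illustration under stated assumptions, not the code Flink runs: `DelimitedReadSketch` and `readRecords` are hypothetical names, a `Reader` stands in for the socket, and emitting a trailing element that has no closing delimiter is a choice made for this sketch.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch (not Flink code): split a character stream into records. */
class DelimitedReadSketch {

    static List<String> readRecords(Reader in, String delimiter) {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        char[] chunk = new char[4]; // deliberately small so a record can span several reads
        try {
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.append(chunk, 0, n);
                int idx;
                // Emit every complete record currently held in the buffer
                while ((idx = buffer.indexOf(delimiter)) != -1) {
                    records.add(buffer.substring(0, idx));
                    buffer.delete(0, idx + delimiter.length());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Assumption of this sketch: text left after the last delimiter is also a record
        if (buffer.length() > 0) {
            records.add(buffer.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(readRecords(new StringReader("hello flink\nhello spark\n"), "\n"));
    }
}
```

Because a single reader consumes the stream from start to end, there is nothing to divide among several subtasks, which is consistent with a socket source being non-parallel.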
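「2.2 Code」
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // The default parallelism matches the number of CPU cores
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Non-parallel source: its parallelism is always 1
        DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));
        int parallelism = lines.getParallelism();
        System.out.println("socketTextStream parallelism: " + parallelism);
        // Split each line into words and emit (word, 1) pairs
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) ->
                        Arrays.stream(line.split(" ")).forEach(word -> out.collect(Tuple2.of(word, 1))))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Group by word and keep a running sum of the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(t -> t.f0).sum(1);
        summed.print();
        env.execute("WordCount");
    }
}
「3. Collection-based sources (learning and testing)」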
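These methods create a data stream from a collection; all elements in the collection must be of the same type. Under the hood they are built on the SourceFunction interface, so they are non-parallel sources. Note: the fromParallelCollection method is parallel and is built on RichParallelSourceFunction.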
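A parallel collection source only works if the data can be divided among the subtasks. The following plain-Java sketch shows one way a numeric range could be cut into contiguous chunks, one per subtask. It is an illustration, not Flink's own splitting code: `RangeSplitSketch` and `split` are hypothetical names, and the even-chunks-plus-remainder scheme is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch (not Flink code): divide [from, to] among parallel subtasks. */
class RangeSplitSketch {

    /**
     * Returns one {start, end} pair (both inclusive) per subtask.
     * A subtask that receives no elements gets a pair with end < start.
     */
    static List<long[]> split(long from, long to, int parallelism) {
        List<long[]> parts = new ArrayList<>();
        long total = to - from + 1;
        long base = total / parallelism;      // minimum number of elements per subtask
        long remainder = total % parallelism; // the first `remainder` subtasks get one extra
        long start = from;
        for (int i = 0; i < parallelism; i++) {
            long size = base + (i < remainder ? 1 : 0);
            parts.add(new long[] {start, start + size - 1});
            start += size;
        }
        return parts;
    }

    public static void main(String[] args) {
        List<long[]> parts = split(1, 10, 3);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("subtask " + i + " -> [" + parts.get(i)[0] + ", " + parts.get(i)[1] + "]");
        }
    }
}
```

A plain Collection or Iterator offers no such split operation, which is one way to see why fromCollection stays at parallelism 1 while fromParallelCollection can run in parallel.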
fromCollection(Collection)
// Scala API
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
val inputStream = env.fromCollection(list)

fromCollection(Iterator, Class)
// Scala API
val iterator = Iterator(1, 2, 3, 4)
val inputStream = env.fromCollection(iterator)

fromElements(T ...)
Creates a data stream from a given sequence of objects. All objects must be of the same type.
DataStreamSource<Integer> fromSource = env.fromElements(1, 2, 3, 4);
int parallelism = fromSource.getParallelism();
System.out.println("fromElements parallelism: " + parallelism);

generateSequence(from, to)
Generates the sequence of numbers in the given interval, in parallel.
// Scala API
val inputStream = env.generateSequence(1, 10)
「4. Kafka-based source (important)」
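The example below is only a simple introduction. In production you also need to consider fault tolerance, data consistency, partition discovery, and combining watermarks with window operations.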
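import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KafkaSourceDemo {
    private static final Properties properties = new Properties();
    private static final String BOOTSTRAPLIST = "hdp1:6667,hdp2:6667";
    private static final String GROUPID = "metric-group";
    private static final String TOPIC = "metric";

    static {
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPLIST);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
        // Start from the latest offset when the group has no committed offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // SimpleStringSchema turns each Kafka record value into a String
        DataStreamSource<String> kafkaSource =
                env.addSource(new FlinkKafkaConsumer<String>(TOPIC, new SimpleStringSchema(), properties));
        kafkaSource.print();
        env.execute();
    }
}
「2.1 Non-parallel Source」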
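A non-parallel custom source is one that implements the SourceFunction interface.
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CustomNonParallelSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        List<String> list = new ArrayList<>();
        list.add("Spark");
        list.add("Hadoop");
        list.add("Flink");
        list.add("HBase");
        list.add("Es");
        SingleOutputStreamOperator<String> singleOutputStreamOperator =
                env.addSource(new MyWordSource(list)).returns(String.class);
        System.out.println("Parallelism of the custom non-parallel source: "
                + singleOutputStreamOperator.getParallelism());
        singleOutputStreamOperator.print();
        env.execute();
    }

    static class MyWordSource implements SourceFunction<String> {
        private List<String> words;
        // volatile because cancel() is called from a different thread than run()
        private volatile boolean isStop = false;

        public MyWordSource() {
        }

        public MyWordSource(List<String> words) {
            this.words = words;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            if (words != null && words.size() > 0) {
                // Keep emitting the word list until the source is cancelled
                while (!isStop) {
                    words.forEach(word -> ctx.collect(word));
                }
            }
        }

        @Override
        public void cancel() {
            isStop = true;
        }
    }
}
「2.2 Parallel Source」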
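A parallel source is defined by implementing the ParallelSourceFunction interface or by extending RichParallelSourceFunction.
RichParallelSourceFunction is the richer of the two: it gives access to the runtime context and provides the open and close lifecycle methods.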
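What "parallel" means for a source, and why the stop flag matters, can be illustrated without Flink. In the plain-Java sketch below each thread plays the role of one subtask running its own copy of the source, and cancellation happens from a different thread. All names (`ParallelSourceSketch`, `WordEmitter`, `runFor`) are hypothetical and the thread-per-subtask model is a simplification for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical sketch (not Flink code): several copies of one source running in parallel. */
class ParallelSourceSketch {

    /** Stand-in for one parallel instance of a source. */
    static class WordEmitter implements Runnable {
        private final int subtaskIndex;
        private final List<String> words;
        private final List<String> output;
        // volatile: cancel() is invoked from a different thread than run()
        private volatile boolean running = true;

        WordEmitter(int subtaskIndex, List<String> words, List<String> output) {
            this.subtaskIndex = subtaskIndex;
            this.words = words;
            this.output = output;
        }

        @Override
        public void run() {
            // Emit at least one round, then continue until cancelled
            do {
                for (String word : words) {
                    output.add(subtaskIndex + "--->" + word);
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            } while (running);
        }

        void cancel() {
            running = false;
        }
    }

    /** Starts `parallelism` emitters, lets them run for `millis`, cancels them, returns the output. */
    static List<String> runFor(int parallelism, List<String> words, long millis) {
        List<String> output = new CopyOnWriteArrayList<>();
        List<WordEmitter> emitters = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            WordEmitter emitter = new WordEmitter(i, words, output);
            emitters.add(emitter);
            Thread thread = new Thread(emitter, "subtask-" + i);
            threads.add(thread);
            thread.start();
        }
        try {
            Thread.sleep(millis);
            for (WordEmitter emitter : emitters) {
                emitter.cancel();
            }
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for subtasks", e);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> out = runFor(3, Arrays.asList("spark", "flink"), 50);
        System.out.println(out.size() + " records emitted, first: " + out.get(0));
    }
}
```

Every word shows up once per subtask per round, so a parallel source that simply replays the same list emits duplicates; a real parallel source would typically give each subtask its own share of the input.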
「A simple real-time stream」
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class CustomSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> words = new ArrayList<>();
        words.add("spark");
        words.add("hadoop");
        words.add("flink");
        // DataStreamSource<String> wordsSource = env.addSource(new MyWordSource(words));
        SingleOutputStreamOperator<String> singleOutputStreamOperator =
                env.addSource(new MyWordParallelSource(words)).returns(String.class);
        // If no parallelism is set, it matches the number of CPU cores
        System.out.println("Parallelism of the custom parallel source: "
                + singleOutputStreamOperator.getParallelism());
        singleOutputStreamOperator.print();
        env.execute("CustomSource");
    }

    static class MyWordParallelSource extends RichParallelSourceFunction<String> {
        // volatile because cancel() is called from a different thread than run()
        private volatile boolean isStop = false;
        private List<String> words;

        public MyWordParallelSource() {
        }

        public MyWordParallelSource(List<String> word) {
            this.words = word;
        }

        /**
         * Typically used for initialization work, such as creating a database connection.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        /**
         * Releases resources.
         */
        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Total number of parallel subtasks of this source (not the index of this subtask)
            int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            while (!isStop) {
                if (words != null && words.size() > 0) {
                    words.forEach(word -> ctx.collect(numberOfParallelSubtasks + "--->" + word));
                }
                Thread.sleep(2000);
            }
        }

        /**
         * Stops the source; this matters for unbounded streams.
         */
        @Override
        public void cancel() {
            isStop = true;
        }
    }

    static class MyWordSource extends RichSourceFunction<String> {
        private List<String> words;
        // volatile because cancel() is called from a different thread than run()
        private volatile boolean flag = true;

        public MyWordSource(List<String> words) {
            this.words = words;
        }

        /**
         * Produces data and sends it out through the source context.
         */
        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            // Get the runtime context of the current subtask
            RuntimeContext runtimeContext = getRuntimeContext();
            int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
            while (flag) {
                if (words != null && words.size() > 0) {
                    words.forEach(word -> sourceContext.collect(indexOfThisSubtask + "--->" + word));
                }
                Thread.sleep(2000);
            }
        }

        /**
         * Stops the source; this matters for unbounded streams.
         */
        @Override
        public void cancel() {
            flag = false;
        }
    }
}
「Custom source that reads from MySQL」
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Student is a POJO defined elsewhere, with the constructor (int id, String name, String password, int age)
public class MysqlSource extends RichSourceFunction<Student> {
    private static final Logger logger = LoggerFactory.getLogger(MysqlSource.class);
    private static final String URL =
            "jdbc:mysql://localhost:3306/hibernate?useUnicode=true&characterEncoding=UTF-8";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "123456";

    private PreparedStatement preparedStatement;
    private Connection connection;

    private static Connection getConnection() throws Exception {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            return DriverManager.getConnection(URL, USERNAME, PASSWORD);
        } catch (Exception e) {
            // Pass the exception as the last argument so that SLF4J logs the stack trace,
            // then rethrow instead of returning null and failing later with a NullPointerException
            logger.error("Failed to create the MySQL connection", e);
            throw e;
        }
    }

    /**
     * Opens the connection once in open(), so that run() does not have to create and release it.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from student";
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * Called once by the DataStream to fetch the data.
     */
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        // try-with-resources closes the ResultSet when the query has been consumed
        try (ResultSet resultSet = preparedStatement.executeQuery()) {
            while (resultSet.next()) {
                Student student = new Student(
                        resultSet.getInt("id"),
                        resultSet.getString("name").trim(),
                        resultSet.getString("password").trim(),
                        resultSet.getInt("age"));
                ctx.collect(student);
            }
        }
    }

    @Override
    public void cancel() {
        // Nothing to do: run() ends by itself once the result set is exhausted
    }

    /**
     * Releases the statement and the connection on shutdown.
     */
    @Override
    public void close() throws Exception {
        // Close the statement before the connection that created it
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}