flink的tablesql api的多种写法汇总

it2023-06-17  70

这个记载是为了方便转化网络中各种资料的写法,

所以每个阶段都收集了各种写法.

并且用代码进行了运行验证.

 

DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(             new OrderStream(1L, "beer", 3, Timestamp.valueOf("2017-09-16 10:30:00")),             new OrderStream(3L, "rubber", 2,Timestamp.valueOf("2017-09-16 10:10:00")),             new OrderStream(1L, "diaper", 4,Timestamp.valueOf("2017-09-16 10:20:00"))     ));

 

步骤写法初始化环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);建立表格

下面4种写法大多数情况下完全等效(任选1种即可)

①tEnv.registerDataStream("orderA", orderA, "user,product,amount");

 

②Deprecated写法

tEnv.createTemporaryView("orderA", orderA, "user,product,amount");

 

最新写法,②的替代写法

tEnv.createTemporaryView("orderA", orderA, $("user,product,amount"));

Table tableA = tEnv.fromDataStream(orderA, "user,product,amount"); tEnv.registerTable("orderA", tableA);

 

⑤当后续有使用as函数的需要时,必须采用下面这种写法,此时禁止采用③的写法

tEnv.createTemporaryView("Orders", orderA, $("user"), $("product"), $("amount"));

注册UDF

下面3种写法完全等效(任选1种即可)

①tEnv.registerFunction("hashcode", new HashCode(10));

②tEnv.createTemporaryFunction("hashcode", new HashCode(10));

 

③tEnv.createTemporarySystemFunction("hashcode", new HashCode(10));

 

②③在概念上肯定会有差别,但是一般应用中,效果没区别.

SQL查询与输出

下面4种写法完全等效(任选1种即可)

①需要注册UDF 的情况下

Table result = tEnv.from("orderA").select("product,hashcode(product)"); tEnv.toAppendStream(result, Row.class).print(); env.execute();

 

②需要注册UDF 的情况下

Table result=tEnv.from("orderA").select($("product"),call("hashcode","product")); tEnv.toAppendStream(result, Row.class).print(); env.execute();

 

③需要注册UDF 的情况下

Table result=tEnv.sqlQuery("select product,hashcode(product) from orderA"); tEnv.toAppendStream(result,Row.class).print(); env.execute();

 

④需要注册UDF 的情况下

tableA.select($("product"),call("hashcode","product")).execute().print();

 

 

⑤不注册UDF情况下,直接使用UDF

Table result=tEnv.from("orderA").select($("product"),call(HashCode.class,"product")); tEnv.toAppendStream(result, Row.class).print(); env.execute();

 

 

 

 

把上述步骤中,每个步骤任意抽取一个,拼接起来,

就能构成完整的Flink SQL程序.

 

Reference:

[1]Flink基础(二十):Table API 和 Flink SQL(五)函数

最新回复(0)