代码如下:
Table orders = tEnv.from("Orders"); Table result = orders.select($("product"), $("user"),$("amount").as("total")); tEnv.toAppendStream(result, Order.class).print(); env.execute();完整报错如下:
Exception in thread "main" org.apache.flink.table.api.TableException: total is not found in PojoType<Order, fields = [amount: Integer, product: String, user: Long]> at org.apache.flink.table.planner.sinks.TableSinkUtils$.$anonfun$expandPojoTypeToSchema$1(TableSinkUtils.scala:263) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.sinks.TableSinkUtils$.expandPojoTypeToSchema(TableSinkUtils.scala:260) at org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:240) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:258) at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:292) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:283) at Select.main(Select.java:32)原因:
由于代码中使用了AS,也就是说新建了一个field名,所以原来的Order这个pojo中已经找不到你AS以后的名字了.
total没有在Order.class中查到,所以要改成Row.class
代码改成:
Table orders = tEnv.from("Orders"); Table result = orders.select($("product"), $("user"),$("amount").as("total")); tEnv.toAppendStream(result, Order.class).print(); env.execute();