xxxx is not found in PojoType<Order, fields = [amount: Integer, product: String, user: Long]>

2024-04-16

The code is as follows:

Table orders = tEnv.from("Orders");
Table result = orders.select($("product"), $("user"), $("amount").as("total"));
tEnv.toAppendStream(result, Order.class).print();
env.execute();

The full error is as follows:

Exception in thread "main" org.apache.flink.table.api.TableException: total is not found in PojoType<Order, fields = [amount: Integer, product: String, user: Long]>
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.$anonfun$expandPojoTypeToSchema$1(TableSinkUtils.scala:263)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.expandPojoTypeToSchema(TableSinkUtils.scala:260)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:240)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:258)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:292)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:283)
    at Select.main(Select.java:32)

Cause:

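The code uses as(), which gives the column a new field name. The result table therefore has a column called total, and the Order POJO has no field with that name: its fields are amount, product, and user.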

Because total cannot be found in Order.class, the target type passed to toAppendStream has to be changed to Row.class.
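To make the cause concrete, here is a minimal plain-Java sketch of the kind of name check that fails. It is not Flink's actual implementation: the class PojoFieldCheck, the helper missingFields, and the Order stand-in are all hypothetical names made up for illustration, and the sketch only assumes that result columns are matched to POJO fields by name, as the error message suggests.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PojoFieldCheck {

    // Hypothetical stand-in for the Order POJO from the post.
    public static class Order {
        public Long user;
        public String product;
        public Integer amount;
    }

    // Returns the column names that have no same-named field in the POJO class.
    // Simplified analogy only; Flink's own check lives in TableSinkUtils.
    public static List<String> missingFields(Class<?> pojo, List<String> columns) {
        Set<String> fieldNames = new HashSet<>();
        for (Field f : pojo.getDeclaredFields()) {
            fieldNames.add(f.getName());
        }
        List<String> missing = new ArrayList<>();
        for (String column : columns) {
            if (!fieldNames.contains(column)) {
                missing.add(column);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Columns after select(..., $("amount").as("total"))
        System.out.println(missingFields(Order.class, Arrays.asList("product", "user", "total")));  // prints [total]
        // Columns without the rename
        System.out.println(missingFields(Order.class, Arrays.asList("product", "user", "amount"))); // prints []
    }
}
```

With the rename, total has no counterpart in Order, which matches the "total is not found in PojoType<Order, ...>" message. Row carries no fixed field names, so it avoids this mismatch.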

Change the code to:

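// requires: import org.apache.flink.types.Row;
Table orders = tEnv.from("Orders");
Table result = orders.select($("product"), $("user"), $("amount").as("total"));
// Row.class instead of Order.class, so the renamed column "total" no longer has to match a POJO field
tEnv.toAppendStream(result, Row.class).print();
env.execute();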

 
