简单说一下,scala代码添加了自定义分区器,一直报错
对应的构造器,不知道哪里写错
java代码是没问题的:
已经解决:
对应的构造器
对应的代码:
val kafkaProducer = new FlinkKafkaProducer[TopicAndValuePojo](
"",
new DwdKafkaProducerSerializationSchema(),
GlobalConfig.getProperties,
java.util.Optional.of[FlinkKafkaPartitioner[TopicAndValuePojo]](new kafkaPartitioner())
)
TopicAndValuePojo是我自己定义的类。
执行报错:
Caused by: org.apache.kafka.common.errors.SerializationException
是因为配置参数加入了序列化,注释掉这2行代码就可以了。