1.11 flinksql自定义redis connector 连接器

it2023-11-28  94

Flink Stream Api

Apache Bahir已经实现了redis对应的连接器

Flink streaming connector for Redis

maven仓库如下 

<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>

 Flinksql

如果想在 Flink SQL 中使用，还需要对 RedisSink 进行一层封装。下面给出一个简单的使用示例。

只需要两个文件

RedisDynamicTableSourceFactory

import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.FactoryUtil; import java.util.HashSet; import java.util.Set; public class RedisDynamicTableSourceFactory implements DynamicTableSinkFactory { public static final ConfigOption<String> host = ConfigOptions.key("host").stringType().noDefaultValue(); public static final ConfigOption<Integer> port = ConfigOptions.key("port").intType().noDefaultValue(); @Override public DynamicTableSink createDynamicTableSink(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); helper.validate(); ReadableConfig options = helper.getOptions(); return new RedisDynamicTableSink(options); } @Override public String factoryIdentifier() { return "redis"; } @Override public Set<ConfigOption<?>> requiredOptions() { Set<ConfigOption<?>> options = new HashSet(); options.add(host); return options; } @Override public Set<ConfigOption<?>> optionalOptions() { Set<ConfigOption<?>> options = new HashSet(); options.add(port); return options; } }

RedisDynamicTableSink

import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.SinkFunctionProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; public class RedisDynamicTableSink implements DynamicTableSink { private ReadableConfig options; public RedisDynamicTableSink(ReadableConfig options) { this.options = options; } @Override public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { return ChangelogMode.insertOnly(); } @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { String host = options.get(RedisDynamicTableSourceFactory.host); FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder().setHost(host); Integer port = options.get(RedisDynamicTableSourceFactory.port); if(port != null){ builder.setPort(port); } FlinkJedisPoolConfig build = builder.build(); RedisMapper<RowData> stringRedisMapper = new RedisMapper<RowData>() { @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.SET); } @Override public String getKeyFromData(RowData rowData) { StringData string = rowData.getString(0); return string.toString(); } @Override public String getValueFromData(RowData rowData) { String s = rowData.toString(); return s; } }; RedisSink<RowData> stringRedisSink = new RedisSink<>(build, stringRedisMapper); return SinkFunctionProvider.of(stringRedisSink); } @Override public DynamicTableSink 
copy() { return new RedisDynamicTableSink(this.options); } @Override public String asSummaryString() { return "my_redis_sink"; } }

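配置

Flink 通过 Java SPI 发现自定义的表工厂，因此需要在 resources 目录下新建文件 META-INF/services/org.apache.flink.table.factories.Factory，内容为 RedisDynamicTableSourceFactory 的全限定类名。这样 SQL 中的 'connector' = 'redis' 才能匹配到该工厂。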

使用

/**
 * Example job that copies rows from a Kafka-backed table into the custom Redis sink table.
 *
 * Steps performed by main:
 *   1. Creates a StreamExecutionEnvironment and a StreamTableEnvironment using the
 *      Blink planner in streaming mode.
 *   2. Registers "sourceTable" (columns a, b) with 'connector.type' = 'kafka'; the
 *      remaining Kafka options are elided ("...") in this listing and must be filled
 *      in before the code compiles.
 *   3. Registers "sinktable" (columns a, b) with 'connector' = 'redis' and
 *      'host' = 'localhost'; no port option is given.
 *   4. Executes "insert into sinktable select * from sourceTable".
 */
public class Kafka2Redis { public static void main(final String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings); tableEnvironment.executeSql(" " + " CREATE TABLE sourceTable ( " + " a varchar, " + " b varchar " + " ) WITH ( " + " 'connector.type' = 'kafka', " ... " ) "); tableEnvironment.executeSql("CREATE TABLE sinktable (\n" + " a STRING," + " b STRING" + ") WITH (\n" + " 'connector' = 'redis',\n" + " 'host' = 'localhost'\n" + ")"); tableEnvironment.executeSql( "insert into sinktable " + "select * " + "from sourceTable"); } }

测试

最新回复(0)