Flink Stream API
Apache Bahir 已经实现了 Redis 对应的连接器:
Flink streaming connector for Redis
Maven 依赖如下:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
Flink SQL
如果想在 Flink SQL 中使用, 还需要对 RedisSink 进行封装。下面给出一个简单的使用例子,
只需要两个文件
RedisDynamicTableSourceFactory
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
/**
 * Table factory that makes a Redis sink available to Flink SQL under the
 * connector identifier {@code redis}.
 *
 * <p>Despite the "Source" in its name, this class implements
 * {@link DynamicTableSinkFactory} and only creates sinks. The name and the
 * lowercase option constants are kept unchanged because other classes refer
 * to them.
 */
public class RedisDynamicTableSourceFactory implements DynamicTableSinkFactory {

    /** Redis host name ({@code 'host'} in the WITH clause); required, no default. */
    public static final ConfigOption<String> host =
            ConfigOptions.key("host").stringType().noDefaultValue();

    /** Redis port ({@code 'port'} in the WITH clause); optional, no default. */
    public static final ConfigOption<Integer> port =
            ConfigOptions.key("port").intType().noDefaultValue();

    /**
     * Validates the table options and creates the sink that carries them.
     *
     * @param context factory context of the table being created
     * @return a {@link RedisDynamicTableSink} configured with the validated options
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        // Validate before reading, so bad options are reported at table creation.
        helper.validate();
        final ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options);
    }

    /** @return the value to use for {@code 'connector'} in the DDL, i.e. {@code "redis"} */
    @Override
    public String factoryIdentifier() {
        return "redis";
    }

    /** @return the options that must be present: only {@link #host} */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        // Diamond operator instead of the raw HashSet type: no unchecked assignment.
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(host);
        return options;
    }

    /** @return the options that may be present: only {@link #port} */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(port);
        return options;
    }
}
RedisDynamicTableSink
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
/**
 * {@link DynamicTableSink} that writes each row to Redis with a {@code SET}
 * command through the Bahir {@link RedisSink}.
 *
 * <p>The Redis key is the string in column 0 of the row; the Redis value is
 * the row's {@code toString()} representation.
 */
public class RedisDynamicTableSink implements DynamicTableSink {

    /** Table options (host, optional port) as supplied by the factory. */
    private final ReadableConfig options;

    /**
     * @param options validated table options; read for
     *     {@link RedisDynamicTableSourceFactory#host} and
     *     {@link RedisDynamicTableSourceFactory#port}
     */
    public RedisDynamicTableSink(ReadableConfig options) {
        this.options = options;
    }

    /**
     * Always answers insert-only, regardless of the requested mode.
     *
     * @param changelogMode the mode requested by the planner (ignored)
     * @return {@link ChangelogMode#insertOnly()}
     */
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.insertOnly();
    }

    /**
     * Builds the runtime sink function from the table options.
     *
     * @param context sink context (unused)
     * @return a provider wrapping a {@link RedisSink} for {@link RowData}
     */
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        final String host = options.get(RedisDynamicTableSourceFactory.host);
        final FlinkJedisPoolConfig.Builder poolConfigBuilder =
                new FlinkJedisPoolConfig.Builder().setHost(host);

        // The port option has no default; only override the builder's own
        // port when the user actually supplied one.
        final Integer port = options.get(RedisDynamicTableSourceFactory.port);
        if (port != null) {
            poolConfigBuilder.setPort(port);
        }

        final RedisSink<RowData> redisSink =
                new RedisSink<>(poolConfigBuilder.build(), new RowDataSetMapper());
        return SinkFunctionProvider.of(redisSink);
    }

    /** @return a new sink sharing the same options object */
    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options);
    }

    /** @return the fixed summary string {@code "my_redis_sink"} */
    @Override
    public String asSummaryString() {
        return "my_redis_sink";
    }

    /**
     * Maps a row to a Redis {@code SET key value} command.
     *
     * <p>Declared as a static nested class rather than an anonymous class so
     * that the mapper held by the sink function does not keep a hidden
     * reference to the enclosing {@link RedisDynamicTableSink}.
     */
    private static final class RowDataSetMapper implements RedisMapper<RowData> {

        private static final long serialVersionUID = 1L;

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }

        @Override
        public String getKeyFromData(RowData rowData) {
            // NOTE(review): no null check on column 0 -- behaviour for a NULL
            // key column is unchanged from the original; confirm what is wanted.
            final StringData key = rowData.getString(0);
            return key.toString();
        }

        @Override
        public String getValueFromData(RowData rowData) {
            // NOTE(review): stores the row's toString(), whose format depends on
            // the RowData implementation -- confirm this is the intended value
            // rather than a specific column.
            return rowData.toString();
        }
    }
}
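配置
通过 Java SPI 注册工厂类: 在 resources 目录下新建文件 META-INF/services/org.apache.flink.table.factories.Factory, 文件内容为 RedisDynamicTableSourceFactory 的全限定类名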
使用
/**
 * Example job: declares a Kafka-backed source table and a Redis-backed sink
 * table through SQL DDL, then copies every source row into the sink.
 *
 * <p>The remaining Kafka options of the source table are elided ("...") in
 * this snippet, so it does not compile as written.
 */
public class Kafka2Redis {
public static void main(final String[] args) {
// Streaming table environment on top of the DataStream environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
// Source table: two varchar columns (a, b) with 'connector.type' = 'kafka'.
tableEnvironment.executeSql(" " +
" CREATE TABLE sourceTable ( " +
" a varchar, " +
" b varchar " +
" ) WITH ( " +
" 'connector.type' = 'kafka', "
...
" ) ");
// Sink table: 'connector' = 'redis' is the identifier returned by
// RedisDynamicTableSourceFactory.factoryIdentifier(); only 'host' is set.
tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
" a STRING," +
" b STRING" +
") WITH (\n" +
" 'connector' = 'redis',\n" +
" 'host' = 'localhost'\n" +
")");
// Copy all columns of every source row into the sink table.
tableEnvironment.executeSql(
"insert into sinktable " +
"select * " +
"from sourceTable");
}
}
测试