kafka-demo 2: Custom Serializer


2 Custom Serializer

Code: https://github.com/luslin1711/kafka_demo/tree/master/kafka_demo_02

1. Serialization and Deserialization

In most cases, the values we send are structures we define ourselves. For Kafka to handle these structures, we need to implement the org.apache.kafka.common.serialization.Serializer interface.

public interface Serializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    default void configure(Map<String, ?> configs, boolean isKey) {
        // intentionally left blank
    }

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    byte[] serialize(String topic, T data);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param headers headers associated with the record
     * @param data typed data
     * @return serialized bytes
     */
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    /**
     * Close this serializer.
     * <p>
     * This method must be idempotent as it may be called multiple times.
     */
    @Override
    default void close() {
        // intentionally left blank
    }
}

Implement the byte[] serialize(String topic, T data) method to convert the custom structure into a byte array. Here we can use the io.protostuff (Protostuff) library to do the serialization.

a. Define the Message class

public class Message {
    String Id;
    String Context;

    public Message(String id, String context) {
        Id = id;
        Context = context;
    }
}
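As written, Message has no toString(), so the consumer's System.out.println(record.value()) later in this post would only print an object hash. A variant with a toString() makes the output readable; this addition is my own suggestion, not necessarily what the repository contains:

public class Message {
    String Id;
    String Context;

    public Message(String id, String context) {
        Id = id;
        Context = context;
    }

    // Added for readability in this walkthrough: lets the consumer print the record contents.
    @Override
    public String toString() {
        return "Message{Id='" + Id + "', Context='" + Context + "'}";
    }
}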

b. Add the dependencies:

'io.protostuff:protostuff-runtime:1.7.2', 'io.protostuff:protostuff-core:1.7.2'

c. Create the serialization utility class

package com.luslin.demo.kafka.server.util;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProtostuffUtil {
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    /**
     * Serialize.
     *
     * @param message the object to serialize
     * @param tClass  the object's class
     * @param <T>     the object's type
     * @return byte[]
     */
    public static <T> byte[] serializer(T message, Class<T> tClass) {
        Schema<T> schema = getSchema(tClass);
        return ProtostuffIOUtil.toByteArray(message, schema, LinkedBuffer.allocate());
    }

    /**
     * Deserialize.
     *
     * @param bytes  serialized bytes
     * @param tClass the target class
     * @param <T>    the target type
     * @return T
     */
    public static <T> T deserializer(byte[] bytes, Class<T> tClass) {
        Schema<T> schema = getSchema(tClass);
        T message = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, message, schema);
        return message;
    }

    private static <T> Schema<T> getSchema(Class<T> tClass) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(tClass);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(tClass);
            cachedSchema.put(tClass, schema);
        }
        return schema;
    }
}
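A quick way to sanity-check the utility is a round trip in a plain main method, independent of Kafka. This is only an illustrative sketch; the class name ProtostuffUtilCheck and its output format are my own, not part of the repository:

package com.luslin.demo.kafka.server.util;

import com.luslin.demo.kafka.server.structs.Message;

// Illustrative round-trip check of ProtostuffUtil, outside of Kafka.
public class ProtostuffUtilCheck {
    public static void main(String[] args) {
        Message original = new Message("m:1", "context:1");
        byte[] bytes = ProtostuffUtil.serializer(original, Message.class);
        Message restored = ProtostuffUtil.deserializer(bytes, Message.class);
        System.out.println("bytes: " + bytes.length + ", restored: " + restored);
    }
}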

d. Create the serializer class

package com.luslin.demo.kafka.server.util;

import com.luslin.demo.kafka.server.structs.Message;
import org.apache.kafka.common.serialization.Serializer;

public class MessageSerializer implements Serializer<Message> {
    @Override
    public byte[] serialize(String topic, Message data) {
        return ProtostuffUtil.serializer(data, Message.class);
    }
}
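The consumer below also needs the reverse direction and imports a MessageDeserilizer, whose source is not reproduced in this post. A minimal sketch of what such a class could look like, implementing org.apache.kafka.common.serialization.Deserializer and mirroring MessageSerializer; the package and class name follow the consumer's imports, and the location of ProtostuffUtil is an assumption:

package com.luslin.demo.kafka.structs.serilizers;

import com.luslin.demo.kafka.structs.Message;
import com.luslin.demo.kafka.server.util.ProtostuffUtil;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch of the deserializer used by the consumer; the repository's version may differ.
public class MessageDeserilizer implements Deserializer<Message> {
    @Override
    public Message deserialize(String topic, byte[] data) {
        if (data == null) return null;                       // tombstone records carry a null value
        return ProtostuffUtil.deserializer(data, Message.class);
    }
}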

2. Producer

Rewrite the producer to send Message objects:

package com.luslin.demo.kafka.server;

import com.luslin.demo.kafka.server.structs.Message;
import com.luslin.demo.kafka.server.util.MessageSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("enable.idempotence", "true");
        props.put("retries", 5);
        props.put("max.in.flight.requests.per.connection", 1);

        KafkaProducer<String, Message> producer =
                new KafkaProducer<>(props, new StringSerializer(), new MessageSerializer());
        for (int i = 0; i < 100; i++) {
            Future<RecordMetadata> recordMetadataFuture = producer.send(
                    new ProducerRecord<String, Message>("message_topic", Integer.toString(i),
                            new Message("m:" + i, "context:" + i)));
            RecordMetadata recordMetadata = recordMetadataFuture.get();
            System.out.println("offset: " + recordMetadata.offset() + ", partition: " + recordMetadata.partition());
        }
        producer.close();
    }
}
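Calling get() on every Future makes each send synchronous, which keeps the demo simple but limits throughput. As a sketch of an alternative (not what the repository does), the loop body above could pass a callback to send() instead of blocking:

// Sketch: inside the same loop, an asynchronous send with a callback instead of Future.get().
producer.send(
        new ProducerRecord<>("message_topic", Integer.toString(i), new Message("m:" + i, "context:" + i)),
        (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();                 // send failed after retries
            } else {
                System.out.println("offset: " + metadata.offset() + ", partition: " + metadata.partition());
            }
        });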

3. Consumer

Rewrite the consumer to receive Message objects:

package com.luslin.demo.kafka.consumer;

import com.luslin.demo.kafka.structs.Message;
import com.luslin.demo.kafka.structs.serilizers.MessageDeserilizer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "consumers01");        // group id; consumers in the same group share offsets
        props.put("enable.auto.commit", "false");    // disable automatic offset commits
        props.put("fetch.min.bytes", "512");         // minimum number of bytes the broker should return per fetch
        props.put("fetch.max.wait.ms", "100");       // maximum wait; the broker responds once this elapses even if fetch.min.bytes is not reached
        props.put("auto.offset.reset", "earliest");  // when no valid offset exists: earliest reads the partition from the beginning, latest reads only records written after the consumer starts

        KafkaConsumer<String, Message> consumer =
                new KafkaConsumer<>(props, new StringDeserializer(), new MessageDeserilizer());
        try {
            consumer.subscribe(Collections.singletonList("message_topic")); // the topic the producer writes to
            while (true) {
                ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) continue;
                System.out.println("recordsLen: " + records.count());
                for (ConsumerRecord<String, Message> record : records) {
                    System.out.println("partition: " + record.partition() + " message: " + record.value() + " offset: " + record.offset());
                }
                consumer.commitAsync();
            }
        } finally {
            consumer.close();
        }
    }
}
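One caveat with commitAsync() alone: if the consumer shuts down, its last asynchronous commit may never reach the broker, so the group re-reads those records on restart. A common pattern, shown here only as a sketch of how the try/finally around the existing poll loop could be adjusted (not what the repository does), is to add a final synchronous commit on the way out:

// Sketch: commitAsync() during normal operation, commitSync() once before closing.
try {
    while (true) {
        ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, Message> record : records) {
            System.out.println("partition: " + record.partition() + " message: " + record.value());
        }
        consumer.commitAsync();           // fast, non-blocking commit in the hot path
    }
} finally {
    try {
        consumer.commitSync();            // blocking commit so the final offsets are not lost
    } finally {
        consumer.close();
    }
}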