kafka-demo 3: Synchronous and Asynchronous Message Sending with the Producer


3 Synchronous and Asynchronous Message Sending with the Producer

Code: https://github.com/luslin1711/kafka_demo/tree/master/kafka_demo_03

producer.send() returns a Future. Through that Future the result of the send can be retrieved either synchronously (by blocking on it) or asynchronously.

public interface Future<V> {

    /**
     * Attempts to cancel execution of this task. The attempt fails if the task has
     * already completed, has already been cancelled, or cannot be cancelled for some
     * other reason. If the task has not started, it should never run; if it has
     * started, {@code mayInterruptIfRunning} determines whether the executing thread
     * is interrupted. After this method returns, {@link #isDone} always returns true.
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Returns {@code true} if this task was cancelled before it completed normally.
     */
    boolean isCancelled();

    /**
     * Returns {@code true} if this task completed, whether by normal termination,
     * an exception, or cancellation.
     */
    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then retrieves its result.
     *
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation to complete,
     * and then retrieves its result, if available.
     *
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}
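Blocking on the returned Future is how a synchronous send is done in practice. Below is a minimal, hedged sketch (the standalone class, the topic name "demo-topic", and plain String values are assumptions for illustration, not part of the repo) that waits for the broker acknowledgement with a bounded timeout:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SyncSendWithTimeout {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            // The timed get() bounds the wait; the no-argument get() used later in
            // SyncSendProducer would block until the broker acknowledges the record.
            RecordMetadata metadata = future.get(10, TimeUnit.SECONDS);
            System.out.println("partition=" + metadata.partition()
                    + ", offset=" + metadata.offset());
        }
    }
}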

The producer overloads send(): you can pass only a ProducerRecord, or a ProducerRecord together with a Callback that is invoked once the send completes.

public interface Callback {
    /**
     * A callback method the user can implement to provide asynchronous handling of request
     * completion. This method will be called when the record sent to the server has been
     * acknowledged. When exception is not null in the callback, metadata will contain the
     * special -1 value for all fields except for topicPartition, which will be valid.
     *
     * @param metadata  The metadata for the record that was sent (i.e. the partition and
     *                  offset). An empty metadata with -1 for all fields except
     *                  topicPartition is returned if an error occurred.
     * @param exception The exception thrown during processing of this record, or null if no
     *                  error occurred. Possible exceptions include:
     *
     *                  Non-retriable exceptions (fatal, the message will never be sent):
     *                  InvalidTopicException, OffsetMetadataTooLargeException,
     *                  RecordBatchTooLargeException, RecordTooLargeException,
     *                  UnknownServerException, UnknownProducerIdException
     *
     *                  Retriable exceptions (transient, may be covered by increasing retries):
     *                  CorruptRecordException, InvalidMetadataException,
     *                  NotEnoughReplicasAfterAppendException, NotEnoughReplicasException,
     *                  OffsetOutOfRangeException, TimeoutException,
     *                  UnknownTopicOrPartitionException
     */
    void onCompletion(RecordMetadata metadata, Exception exception);
}
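Because Callback declares a single abstract method, onCompletion can also be supplied as a lambda instead of a dedicated class. A minimal, hedged sketch (broker address, topic name, and String values are assumptions, not from the repo):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class LambdaCallbackProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            // send() returns immediately; the lambda runs when the broker acknowledges
            // the record or the send fails permanently.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    System.out.println("send error: " + exception.getMessage());
                } else {
                    System.out.println("sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            });
            // close() (implicit via try-with-resources) flushes buffered records,
            // so the callback fires before the program exits.
        }
    }
}

The demo below uses a dedicated CallBackHandler class instead, which is convenient when the same completion handling is reused across several producers.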

AbstractProducer

import com.luslin.demo.kafka.structs.Message;
import com.luslin.demo.kafka.structs.serilizers.MessageSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public abstract class AbstractProducer {
    KafkaProducer<String, Message> producer;

    public void init() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");                              // wait for all in-sync replicas to acknowledge
        props.put("enable.idempotence", "true");               // avoid duplicates introduced by retries
        props.put("retries", 5);
        props.put("max.in.flight.requests.per.connection", 1); // preserve ordering when retries happen

        producer = new KafkaProducer<>(props, new StringSerializer(), new MessageSerializer());
        send();
        producer.close();
    }

    protected abstract void send() throws ExecutionException, InterruptedException;
}
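Message and MessageSerializer come from the repository and are not reproduced in this post. For orientation only, here is a hypothetical sketch of what they might look like, assuming Message is a two-field POJO encoded as a UTF-8 string; the field names and encoding are guesses, and the actual classes in the repo may differ:

import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in: the real com.luslin.demo.kafka.structs.Message may differ.
class Message {
    private final String title;
    private final String context;

    Message(String title, String context) {
        this.title = title;
        this.context = context;
    }

    @Override
    public String toString() {
        return title + "|" + context;
    }
}

// Hypothetical serializer: Kafka calls serialize() to turn each record value into bytes.
class MessageSerializer implements Serializer<Message> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { /* no configuration needed */ }

    @Override
    public byte[] serialize(String topic, Message data) {
        return data == null ? null : data.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() { /* nothing to release */ }
}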

SyncSendProducer

import com.luslin.demo.kafka.structs.Message;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SyncSendProducer extends AbstractProducer {
    public SyncSendProducer() throws ExecutionException, InterruptedException {
        init();
    }

    @Override
    protected void send() throws ExecutionException, InterruptedException {
        if (producer == null) throw new InterruptedException("producer is null");
        for (int i = 0; i < 10; i++) {
            Future<RecordMetadata> recordMetadataFuture = producer.send(
                    new ProducerRecord<String, Message>("topic03", Integer.toString(i),
                            new Message("m:" + i, "context:" + i)));
            // Block on get(): the next record is not sent until this one is acknowledged.
            RecordMetadata recordMetadata = recordMetadataFuture.get();
            System.out.println("offset: " + recordMetadata.offset()
                    + ", partition: " + recordMetadata.partition());
        }
    }
}

AsyncSendProducer

import com.luslin.demo.kafka.structs.Message;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.concurrent.ExecutionException;

public class AsyncSendProducer extends AbstractProducer {
    public AsyncSendProducer() throws ExecutionException, InterruptedException {
        init();
    }

    @Override
    protected void send() throws ExecutionException, InterruptedException {
        if (producer == null) throw new InterruptedException("producer is null");
        for (int i = 0; i < 10; i++) {
            // send() returns immediately; the result is handled in CallBackHandler.onCompletion().
            producer.send(
                    new ProducerRecord<String, Message>("topic03", Integer.toString(i),
                            new Message("m:" + i, "context:" + i)),
                    new CallBackHandler());
        }
    }
}
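AsyncSendProducer never blocks on the returned Future; it relies on the producer.close() call in AbstractProducer.init(), which waits for buffered records to be delivered (and their callbacks to run) before returning. If the producer needs to stay open, KafkaProducer.flush() gives a similar guarantee. A small fragment (assumes an existing producer; not a complete program):

// flush() blocks until every record sent so far has completed, successfully or with an error,
// so the registered callbacks have been invoked by the time it returns.
producer.flush();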

CallBackHandler

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackHandler implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.out.println("send error: " + exception.getMessage());
        } else {
            System.out.println("send success, partition: " + metadata.partition()
                    + ", offset: " + metadata.offset());
        }
    }
}

Producer

import java.util.concurrent.ExecutionException;

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("sync producer");
        new SyncSendProducer();

        System.out.println("async producer");
        new AsyncSendProducer();
    }
}