RocketMQ--生产者消息发送

it2023-12-20  62

异步发消息

// 2.2 异步发送消息 // producer.send(message, new SendCallback() { // //rabbitmq急速入门的实战: 可靠性消息投递 // @Override // public void onSuccess(SendResult sendResult) { // System.err.println("msgId: " + sendResult.getMsgId() + ", status: " + sendResult.getSendStatus()); // } // @Override // public void onException(Throwable e) { // e.printStackTrace(); // System.err.println("------发送失败"); // } // });

异步发消息不能将producer关掉

同步发消息

=========================================================================================================

@Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg); }

=========================================================================================================

public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }

=========================================================================================================

public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }

=========================================================================================================

private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer);

=========================================================================================================

=========================================================================================================

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.client.impl; public enum CommunicationMode { SYNC, 同步 ASYNC, 异步 ONEWAY, 单向 }

=========================================================================================================

=========================================================================================================

=========================================================================================================

=========================================================================================================

最新回复(0)