/**
 * Demo entry point: starts a producer, sends 100 messages to {@code TopicDemo}
 * with tag {@code TagA}, and prints the result of every send.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if starting the producer, encoding a message body or sending fails
 */
public static void main(String[] args) throws Exception {
    // Instantiate with a producer group name.
    DefaultMQProducer producer = new DefaultMQProducer("GroupNameDemo");
    // Specify name server addresses.
    producer.setNamesrvAddr("localhost:9876");
    // Launch the instance.
    producer.start();
    try {
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicDemo" /* Topic */,
                                      "TagA" /* Tag */,
                                      ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                                     );
            // Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
    } finally {
        // Shut down once the producer instance is no longer in use.
        // Done in finally so a failed send does not leave the started producer running.
        producer.shutdown();
    }
}

消息发送的入口 DefaultMQProducer#send()

// DefaultMQProducer.java
/**
 * Entry point for sending a message: validates it, rewrites its topic through
 * {@code withNamespace(...)} and hands it to the producer implementation.
 *
 * @param msg the message to send; its topic is overwritten in place
 * @return the result returned by {@code defaultMQProducerImpl.send(msg)}
 */
@Override
public SendResult send(Message msg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Validate the message first.
    Validators.checkMessage(msg, this);

    // Replace the topic with its namespace-wrapped form.
    msg.setTopic(withNamespace(msg.getTopic()));

    // Delegate the actual send to the implementation object.
    final SendResult result = this.defaultMQProducerImpl.send(msg);
    return result;
}

消息检验

  1. 消息体(body)不能为 null
  2. 消息体长度不能为 0
  3. 默认情况下,消息体大小不能超过 4MB(1024 * 1024 * 4 字节)
// Validators.java
/**
 * Validates a message before it is sent.
 *
 * <p>Checks, in order: the message is not null, the topic passes
 * {@code Validators.checkTopic}, the body is not null, the body is not empty,
 * and the body is no larger than {@code defaultMQProducer.getMaxMessageSize()}.
 *
 * @param msg               the message to validate
 * @param defaultMQProducer producer whose configured maximum message size is the upper bound
 * @throws MQClientException if any of the checks above fails
 */
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    if (null == msg) {
        throw new MQClientException("the message is null", null);
    }
    // topic
    Validators.checkTopic(msg.getTopic());

    // The body must be present.
    if (null == msg.getBody()) {
        throw new MQClientException("the message body is null", null);
    }
    // The body must not be empty.
    if (0 == msg.getBody().length) {
        throw new MQClientException("the message body length is zero", null);
    }
    // By default the body may not exceed 4M = 1024 * 1024 * 4.
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize(), null);
    }
}
消息发送
// DefaultMQProducerImpl.java
// DEFAULT SYNC
/**
 * Sends {@code msg} using the send timeout configured on the producer.
 *
 * @param msg the message to send
 * @return the result of {@code send(msg, timeout)}
 */
public SendResult send(Message msg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Send timeout read from the producer configuration (the original note says it defaults to 3s).
    final long timeout = this.defaultMQProducer.getSendMsgTimeout();
    return send(msg, timeout);
}

/**
 * Sends {@code msg} in {@code CommunicationMode.SYNC} with the given timeout.
 *
 * @param msg     the message to send
 * @param timeout timeout forwarded unchanged to {@code sendDefaultImpl}
 * @return the result of {@code sendDefaultImpl}
 */
public SendResult send(Message msg, long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // No callback is passed on this path.
    final SendCallback noCallback = null;
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, noCallback, timeout);
}

/**
 * Shared send implementation (abridged excerpt: everything after the route
 * lookup is omitted here, including the return statement).
 *
 * The visible part calls makeSureStateOK(), re-validates the message, records
 * the start timestamps and looks up the publish info for the message's topic.
 *
 * @param msg               the message to send
 * @param communicationMode communication mode chosen by the caller
 * @param sendCallback      callback supplied by the caller (may be null)
 * @param timeout           timeout supplied by the caller
 */
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // Look up the route (publish) information for the topic.
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        // Remainder omitted......
}

查询主题路由信息

如果生产者已经缓存了 topic 的路由信息,则直接返回。如果没有缓存,则向 NameServer 查询该 topic 的路由信息。

如果最终未能查询到路由信息,则直接抛出异常。

// DefaultMQProducerImpl.java
/**
 * Returns the publish info for {@code topic}, refreshing it from the name
 * server when the cached entry is missing or not ok.
 *
 * @param topic the topic to look up
 * @return the entry held in {@code topicPublishInfoTable} after the refresh attempts
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 1. Try the local cache first.
    TopicPublishInfo info = this.topicPublishInfoTable.get(topic);

    final boolean cacheUsable = info != null && info.ok();
    if (!cacheUsable) {
        // Make sure an entry exists for the topic before refreshing it.
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 2. Ask the name server for this topic's route information.
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        info = this.topicPublishInfoTable.get(topic);
    }

    if (info.isHaveTopicRouterInfo() || info.ok()) {
        return info;
    }

    // Still nothing usable: refresh once more through the three-argument overload
    // (NOTE(review): presumably a fallback to default topic settings; confirm).
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    return this.topicPublishInfoTable.get(topic);
}
TopicPublishInfo 信息如下
/**
 * Publish-side information kept per topic: its message queues, the index used
 * when choosing a queue, and the route data it is associated with.
 */
public class TopicPublishInfo {
    // Whether this is an ordered-message topic (boolean fields default to false).
    private boolean orderTopic;
    private boolean haveTopicRouterInfo;
    // Message queues of this topic.
    private List<MessageQueue> messageQueueList = new ArrayList<>();
    // Used to choose a message queue. Original note: incremented by 1 on every
    // selection and reset to 0 once it reaches Integer.MAX_VALUE.
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

/**
 * Route data of a topic: queue metadata, broker metadata and the filter-server
 * table keyed by broker address.
 */
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    // Queue metadata of the topic.
    private List<QueueData> queueDatas;
    // Metadata of the brokers the topic is distributed on.
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
具体信息如下
{
    "haveTopicRouterInfo": true,
    "messageQueueList": [
        {
            "brokerName": "PQSZ-L0039",
            "queueId": 0,
            "topic": "TopicDemo"
        },
        {
            "brokerName": "PQSZ-L0039",
            "queueId": 1,
            "topic": "TopicDemo"
        },
        {
            "brokerName": "PQSZ-L0039",
            "queueId": 2,
            "topic": "TopicDemo"
        },
        {
            "brokerName": "PQSZ-L0039",
            "queueId": 3,
            "topic": "TopicDemo"
        }
    ],
    "orderTopic": false,
    "sendWhichQueue": {
        "andIncrement": 1497938501
    },
    "topicRouteData": {
        "brokerDatas": [
            {
                "brokerAddrs": {
                    "0": "10.178.42.122:10911"
                },
                "brokerName": "PQSZ-L0039",
                "cluster": "DefaultCluster"
            }
        ],
        "filterServerTable": {},
        "queueDatas": [
            {
                "brokerName": "PQSZ-L0039",
                "perm": 6,
                "readQueueNums": 4,
                "topicSynFlag": 0,
                "writeQueueNums": 4
            }
        ]
    }
}

智一面热门岗位面试题:

java实习(基础知识)

java实习(阿里巴巴实习生面经)

高级java开发工程师(微服务/Spring Cloud)

中高级PHP开发工程师(thinkphp/面向对象)

初中级运维工程师(linux/shell)

python数据分析师(数据分析/python)