Analysis of Java development RocketMQ producer high availability example
1 Message
public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主题名字 private String topic; //消息扩展信息,Tag,keys,延迟级别都存在这里 private Map<String, String> properties; //消息体,字节数组 private byte[] body; //设置消息的key, public void setKeys(String keys) {} //设置topic public void setTopic(String topic) {} //延迟级别 public int setDelayTimeLevel(int level) {} //消息过滤的标记 public void setTags(String tags) {} //扩展信息存放在此 public void putUserProperty(final String name, final String value) {} }
The message is the children. These children have their own characteristics but also have commonalities. Two children sent by the same parent can go to the same place, or they can go to different places.
1.1 topic
First of all, each child message has an attribute topic, which we mentioned above, is a waiting hall. After the children come in, they walk to the designated area of their designated waiting hall (don’t they also take the high-speed train at the designated platform when they go out), and sit in the message queue seats Wait, wait for the trip.
Broker has one or more topics, and messages will be stored in the message queue in the topic, waiting to be consumed.
1.2 Body
Child news, there is also a Body attribute, this is his ability, he can draw, he can sing, he Whatever you do is recorded in the Body attribute. When you go out, the place where value is reflected is also the Body attribute.
Body is the message body, and consumers will perform corresponding operations based on the message body.
1.3 tag
We mentioned this tag in the previous section. It is a mark. Some children carry drawing boards and cameras on their backs, and some cruise ships will specially find it. These kids pull away and complete their mission.
You can set tag attributes for messages, and consumers can choose messages containing specific tag attributes for consumption.
1.4 key
The key is the name of each child’s message. If you want to find a child, just call him by name.
Set the Key for the sent message, and you can search for messages based on this Key later. For example, if the message is abnormal or the message is lost, it will be very convenient to search.
1.5 Delay Level
Of course, some children don’t rush to leave when they come. They have thought about it before coming. It will take 30 minutes to have a meal, so they will wait when they come. Picked up after 30 minutes.
Setting the delay level can specify how long the message can be consumed.
2 Producer High Availability
Every parent who sends their children hopes to send them to the waiting hall, and they don’t want their children to be lost, this At this time, the waiting hall needs some guarantee mechanisms.
2.1 The client ensures high availability of the producer
2.1.1 Retry mechanism
That is to say, after the parents send the child to the waiting hall, If you fail to sit in the message queue seat, the staff will arrange to try again to see if there are seats available. The default number of retries is 2 times, that is to say, the message child has a total of 3 opportunities to find a seat.
Looking at the source code, I specially added comments so that it can be roughly understood.
//这里取到了重试的次数 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //获取消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试 throw e; } } else { break; } }
The retry code is as above. In this sendDefaultImpl
method, it will try to send the message three times. If it fails, the corresponding error will be thrown.
2.1.2 Client Fault Tolerance
If there are multiple Broker waiting halls, the service staff will arrange to message the child to choose one relatively less crowded, comparison Easy to enter to enter. Of course, we will not enter those that have been closed, have had power outages, and have no service capabilities. MQ Client will maintain a Broker's sending delay information, and based on this information will select a Broker with a relatively low delay to send the message. Will actively eliminate those Brokers that are down, unavailable or have a higher sending delay level.
Selecting
Broker is selecting message queue
. The corresponding code is as follows: Here we will first determine whether the
is turned on. This switch is turned off by default. If it is turned on, lower delay## will be selected first. #Broker.
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判断发送延迟容错开关是否开启 if (this.sendLatencyFaultEnable) { try { //选择一个延迟上可以接受,并且和上次发送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延迟时间可以接受,则返回这个Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前边都没选中一个Broker,就随机选一个Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
off state, the code executed is as follows: In order to evenly distribute the pressure on the Broker, it will choose A different Broker
from before.public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是没有上次的Brokername做参考,就随机选一个 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那么就选一个其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //这里判断遇上一个使用的Broker不是同一个 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上边的都没选中,那么就随机选一个 return selectOneMessageQueue(); } }
accurately receive
message children in the Broker waiting hall, there will be at least two halls, oneMain Hall There is a Deputy Hall. Generally speaking, children will enter the main hall, and then after a while, the card should be busy believing the machine (the art of shadow clone) , and then let the clone enter the deputy hall, so that when the main hall has a power outage and stops working, the clone in the deputy hall will be fine as long as it completes the task. Generally speaking, it is the message from the main hall that the child goes to take the boat to complete the task.
The above is the detailed content of Analysis of Java development RocketMQ producer high availability example. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

Guide to Smith Number in Java. Here we discuss the Definition, How to check smith number in Java? example with code implementation.

In this article, we have kept the most asked Java Spring Interview Questions with their detailed answers. So that you can crack the interview.

Java 8 introduces the Stream API, providing a powerful and expressive way to process data collections. However, a common question when using Stream is: How to break or return from a forEach operation? Traditional loops allow for early interruption or return, but Stream's forEach method does not directly support this method. This article will explain the reasons and explore alternative methods for implementing premature termination in Stream processing systems. Further reading: Java Stream API improvements Understand Stream forEach The forEach method is a terminal operation that performs one operation on each element in the Stream. Its design intention is

Guide to TimeStamp to Date in Java. Here we also discuss the introduction and how to convert timestamp to date in java along with examples.

Capsules are three-dimensional geometric figures, composed of a cylinder and a hemisphere at both ends. The volume of the capsule can be calculated by adding the volume of the cylinder and the volume of the hemisphere at both ends. This tutorial will discuss how to calculate the volume of a given capsule in Java using different methods. Capsule volume formula The formula for capsule volume is as follows: Capsule volume = Cylindrical volume Volume Two hemisphere volume in, r: The radius of the hemisphere. h: The height of the cylinder (excluding the hemisphere). Example 1 enter Radius = 5 units Height = 10 units Output Volume = 1570.8 cubic units explain Calculate volume using formula: Volume = π × r2 × h (4

PHP and Python each have their own advantages, and the choice should be based on project requirements. 1.PHP is suitable for web development, with simple syntax and high execution efficiency. 2. Python is suitable for data science and machine learning, with concise syntax and rich libraries.

PHP is a scripting language widely used on the server side, especially suitable for web development. 1.PHP can embed HTML, process HTTP requests and responses, and supports a variety of databases. 2.PHP is used to generate dynamic web content, process form data, access databases, etc., with strong community support and open source resources. 3. PHP is an interpreted language, and the execution process includes lexical analysis, grammatical analysis, compilation and execution. 4.PHP can be combined with MySQL for advanced applications such as user registration systems. 5. When debugging PHP, you can use functions such as error_reporting() and var_dump(). 6. Optimize PHP code to use caching mechanisms, optimize database queries and use built-in functions. 7

Java is a popular programming language that can be learned by both beginners and experienced developers. This tutorial starts with basic concepts and progresses through advanced topics. After installing the Java Development Kit, you can practice programming by creating a simple "Hello, World!" program. After you understand the code, use the command prompt to compile and run the program, and "Hello, World!" will be output on the console. Learning Java starts your programming journey, and as your mastery deepens, you can create more complex applications.
