消息队列

本文最后更新于:1 年前

消息队列

一、MQ(Message Queue)应用场景分析

image-20220412223812235

消息队列中间件是分布式系统中的重要组件,主要解决异步消息、应用解耦、流量削峰等问题,从而实现高性能、高可用、可伸缩和最终一致性的架构。

使用较多的消息队列有ActiveMQ、RabbitMQ、Kafka、MetaMQ等。

1. 异步处理

场景说明:用户注册后,需要发送注册邮件和注册短信。传统的做法如下:

将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

image-20220412224208457

引入消息队列,异步处理,改造后的架构如下:

image-20220412224351295

2. 应用解耦

image-20220412224529754

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是:订单系统调用库存系统的接口。如下图

image-20220412225537861

传统模式的特点:

  • 假如库存系统无法访问,则订单减库存将失败,从而导致下单失败
  • 订单系统与库存系统耦合

引入消息队列后的方案,如下图

image-20220412224704331

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户,下单成功
  • 库存系统:订阅下单的消息,获取下单信息,库存系统根据下单信息,进行库存操作
  • 假如:在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息队列就不再关系其他的后续操作了,实现订单系统与库存系统的应用解耦

3. 流量削峰

流量削峰也就是消息队列中常用场景,一般在秒杀或団抢活动中广泛使用。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

  • 可以控制活动的人数
  • 可以缓解短时间内高流量压垮应用

image-20220412230406551

  • 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
  • 秒杀业务根据消息队列中的请求信息,再做后续处理

image-20220412230512068

image-20220412230741204

image-20220412230828945

image-20220412231039002

image-20220422103235328

image-20220422112431010

image-20220422112445116

image-20220422112517612

image-20220422113952109

TopicSubscriber源码解析:

https://blog.csdn.net/xueyehuilang/article/details/120814991

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Subscriber<T> {
//日志记录
private static final Log log = LogFactory.getLog(DefaultSubscriber.class);

//超时时间,subscriber超过这一段时间无操作后就会断开连接,即使此时仍有subscriberListener在等待监听结果
private static final int DEFAULT_SHUTDOWN_TIMEOUT = 5;
private static final TimeUnit DEFAULT_SHUTDOWN_TIMEOUT_UNITS = TimeUnit.SECONDS;
//节点标识符
private final NodeIdentifier nodeIdentifier;
//可以在规定时间或定期执行的服务
private final ScheduledExecutorService executorService;
//接收消息队列
private final IncomingMessageQueue<T> incomingMessageQueue;
//该subscriber订阅的publisher
private final Set<PublisherIdentifier> knownPublishers;
//TCP管理器
private final TcpClientManager tcpClientManager;
//信号量,用于同步
private final Object mutex;

/**
* 管理监听此subscriber的监听器
*/
private final ListenerGroup<SubscriberListener<T>> subscriberListeners;

//实现了newDefault方法
public static <S> DefaultSubscriber<S> newDefault(NodeIdentifier nodeIdentifier,
TopicDeclaration description, ScheduledExecutorService executorService,
MessageDeserializer<S> deserializer) {
return new DefaultSubscriber<S>(nodeIdentifier, description, deserializer, executorService);
}
//私有构造方法
private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration,
MessageDeserializer<T> deserializer, ScheduledExecutorService executorService) {
super(topicDeclaration);
this.nodeIdentifier = nodeIdentifier;
this.executorService = executorService;
incomingMessageQueue = new IncomingMessageQueue<T>(deserializer, executorService);
knownPublishers = Sets.newHashSet();
tcpClientManager = new TcpClientManager(executorService);
mutex = new Object();
SubscriberHandshakeHandler<T> subscriberHandshakeHandler =
new SubscriberHandshakeHandler<T>(toDeclaration().toConnectionHeader(),
incomingMessageQueue, executorService);
tcpClientManager.addNamedChannelHandler(subscriberHandshakeHandler);
subscriberListeners = new ListenerGroup<SubscriberListener<T>>(executorService);
//增加监听者的方法
subscriberListeners.add(new DefaultSubscriberListener<T>() {
@Override
public void onMasterRegistrationSuccess(Subscriber<T> registrant) {
log.info("Subscriber registered: " + DefaultSubscriber.this);
}

@Override
public void onMasterRegistrationFailure(Subscriber<T> registrant) {
log.info("Subscriber registration failed: " + DefaultSubscriber.this);
}

@Override
public void onMasterUnregistrationSuccess(Subscriber<T> registrant) {
log.info("Subscriber unregistered: " + DefaultSubscriber.this);
}

@Override
public void onMasterUnregistrationFailure(Subscriber<T> registrant) {
log.info("Subscriber unregistration failed: " + DefaultSubscriber.this);
}
});
}

public SubscriberIdentifier toIdentifier() {
return new SubscriberIdentifier(nodeIdentifier, getTopicDeclaration().getIdentifier());
}

public SubscriberDeclaration toDeclaration() {
return new SubscriberDeclaration(toIdentifier(), getTopicDeclaration());
}

public Collection<String> getSupportedProtocols() {
return ProtocolNames.SUPPORTED;
}

@Override
public boolean getLatchMode() {
return incomingMessageQueue.getLatchMode();
}

//新增消息监听器
@Override
public void addMessageListener(MessageListener<T> messageListener, int limit) {
incomingMessageQueue.addListener(messageListener, limit);
}

@Override
public void addMessageListener(MessageListener<T> messageListener) {
addMessageListener(messageListener, 1);
}

//移除消息监听器
@Override
public boolean removeMessageListener(MessageListener<T> messageListener) {
return incomingMessageQueue.removeListener(messageListener);
}

@Override
public void removeAllMessageListeners() {
incomingMessageQueue.removeAllListeners();
}

@VisibleForTesting
public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) {
synchronized (mutex) {
// TODO(damonkohler): If the connection is dropped, knownPublishers should
// be updated.
if (knownPublishers.contains(publisherIdentifier)) {
return;
}
tcpClientManager.connect(toString(), address);
knownPublishers.add(publisherIdentifier);
signalOnNewPublisher(publisherIdentifier);
}
}

/**
* 更新与该subscriber订阅的topic关联的publisher列表
*/
public void updatePublishers(Collection<PublisherIdentifier> publisherIdentifiers) {
for (final PublisherIdentifier publisherIdentifier : publisherIdentifiers) {
executorService.execute(new UpdatePublisherRunnable<T>(this, nodeIdentifier,
publisherIdentifier));
}
}

//关闭与该subscriber关联的所有对象
@Override
public void shutdown(long timeout, TimeUnit unit) {
signalOnShutdown(timeout, unit);
incomingMessageQueue.shutdown();
tcpClientManager.shutdown();
subscriberListeners.shutdown();
}

//关闭该subscriber
@Override
public void shutdown() {
shutdown(DEFAULT_SHUTDOWN_TIMEOUT, DEFAULT_SHUTDOWN_TIMEOUT_UNITS);
}

//增加监听器
@Override
public void addSubscriberListener(SubscriberListener<T> listener) {
subscriberListeners.add(listener);
}

/**
* 通知所有的SubscriberListener,该Subscriber在ROS master的注册成功
* 每个监听器都由单独的线程唤醒
*/
@Override
public void signalOnMasterRegistrationSuccess() {
final Subscriber<T> subscriber = this;
subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
@Override
public void run(SubscriberListener<T> listener) {
listener.onMasterRegistrationSuccess(subscriber);
}
});
}

/**
* 通知所有的SubscriberListener,该Subscriber在ROS master的注册失败
* 每个监听器都由单独的线程唤醒
*/
@Override
public void signalOnMasterRegistrationFailure() {
final Subscriber<T> subscriber = this;
subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
@Override
public void run(SubscriberListener<T> listener) {
listener.onMasterRegistrationFailure(subscriber);
}
});
}

/**
* 通知所有的SubscriberListener,该Subscriber在ROS master的注销成功
* 每个监听器都由单独的线程唤醒
*/
@Override
public void signalOnMasterUnregistrationSuccess() {
final Subscriber<T> subscriber = this;
subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
@Override
public void run(SubscriberListener<T> listener) {
listener.onMasterUnregistrationSuccess(subscriber);
}
});
}

/**
* 通知所有的SubscriberListener,该Subscriber在ROS master的注销失败
* 每个监听器都由单独的线程唤醒
*/
@Override
public void signalOnMasterUnregistrationFailure() {
final Subscriber<T> subscriber = this;
subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
@Override
public void run(SubscriberListener<T> listener) {
listener.onMasterUnregistrationFailure(subscriber);
}
});
}

/**
* 通知所有的SubscriberListener,该Subscriber连接到一个新的publisher
* 每个监听器都由单独的线程唤醒
*/
public void signalOnNewPublisher(final PublisherIdentifier publisherIdentifier) {
final Subscriber<T> subscriber = this;
subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
@Override
public void run(SubscriberListener<T> listener) {
listener.onNewPublisher(subscriber, publisherIdentifier);
}
});
}

/**
* 通知所有的SubscriberListener,该Subscriber已关闭
* 每个监听器都由单独的线程唤醒
*/
private void signalOnShutdown(long timeout, TimeUnit unit) {
final Subscriber<T> subscriber = this;
try {
subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
@Override
public void run(SubscriberListener<T> listener) {
listener.onShutdown(subscriber);
}
}, timeout, unit);
} catch (InterruptedException e) {
// Ignored since we do not guarantee that all listeners will finish before
// shutdown begins.
}
}

//重写打印方法
@Override
public String toString() {
return "Subscriber<" + getTopicDeclaration() + ">";
}
}

二、消息队列

消息中间件有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等

ActiveMQ: https://www.bilibili.com/video/BV1vJ41177j1

RabbitMQ: https://www.bilibili.com/video/BV15k4y1k7Ep

Kafka: https://www.bilibili.com/video/BV19y4y1b7Uo

RocketMQ: https://www.bilibili.com/video/BV1L4411y7mn

尚硅谷ActiveMQ教程: https://www.bilibili.com/video/BV1hK4y1U7sc


消息队列
https://chris-z-su.github.io/2022/04/20/java/消息队列/
作者
Chris
发布于
2022年4月20日
许可协议