您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
RocketMQ之去重操作
 
  2228  次浏览      15
 2019-5-30
 
编辑推荐:

本文来自于liangzl,文章主要讨论了RocketMQ本身是否提供某些关键信息可以帮助我们去重,和业务上如何支持幂等。

之前聊过一个问题,RocketMQ的设计上,是不考虑消息去重的问题,即不考虑消息是否会重复的消费的问题,而是将这个问题抛给业务端自己去处理幂等的问题。

作为RocketMQ的使用者,现在去讨论RocketMQ为何不支持消息去重的问题,己经是无关痛痒,并且也意义不大。如果站在如何设计一个消息队列的角度去思考这个问题,这是设计上舍与得,无关对错。而现在要考虑的是,既然它不支持消息去重,那么就只能自己通过某些方式去保证消息去重。

RocketMQ本身是否提供某些关键信息可以帮助我们去重

业务上如何支持幂等

业务幂等

业务上支持幂等,实现起来定然不会很复杂,但是需要对现在的很多业务动刀,只有非常严谨的业务需要慎重考虑去重问题的时候才会去考虑改造,从业务上支持幂等,从它整个实现思路可能都不是特别优雅,因为业务方与组件耦合了。并且,RocketMQ不支持消息去重就可能猜到如果要支持去重,整个吞吐量可能都会有严重的下滑。

假设设计一层db来解决业务幂等(比如通过记录订单id),那么一条消息会有几种状态呢?

消息不存在 NONE

消息消费成功 SUCCESS

消息还在处理 PROCESS

消息消费失败 ERROR

基于以上几种状态去考虑设计一个严谨的业务幂等的解决方案,整个吞吐量下降了非常多,一条消息的消费至少涉及三次db操作,其中两次db writer,那么回过头来思考系统架构引入消息队列需要去解决什么样的问题?

解耦,提高响应速度

流量削峰填谷

数据异构

......

在消息的消费速度远远低于生产者的生产速度,直接会造成大量的消息堆积,带来的影响是非常严重的,比如很多业务的延迟大幅度提高,整个用户体验会很差,也许原先延迟仅在1s之内,突然上升到1h甚至更久。前几天阿里云生产事故,可以说是一片哀嚎,特别是消息队列,有的人泰然处之,有的人欲哭无泪,有的人准备删库跑路……

通过以下两个方案至少可以减轻如果组件方突然出事故的情况所带来的生产事故:

一致性方案

消息去重

其他

不通过业务上的各种唯一id来处理消息去重问题,而是基于原先对于RocketMQ的了解,下意识去考虑RocketMQ中的MsgId是否可以作为去重的关键点?

具备去重最紧要的首要因素是——>该值可以全局唯一的标识一条消息

大家都知道,在单机环境下想要生成唯一id是一件非常容易的事,比如通过数据库主键来生成一个唯一的id。但是在分布式作业环境下,想要生成一个全局唯一的id显然是比较困难的,不过目前也有非常多的成熟方案可以去处理,这里不作赘述。在这里更想提的是,RocketMQ提供全局唯一的id是如何做到的?

查看SendResult类可以看到其中有两个msgId

public class SendResult {
private SendStatus sendStatus;
private String msgId;// 客户端生成的id
private MessageQueue messageQueue;
private long queueOffset;
private String transactionId;
private String offsetMsgId;//服务端生成的id
private String regionId;
private boolean traceOn = true;

.....
}

客户端生成唯一id

客户端MsgId是怎么生成的呢?如下源码

public class MessageClientIDSetter {
private static final String TOPIC_KEY_SPLITTER = "#";
private static final int LEN;
private static final String FIX_STRING;
private static final AtomicInteger COUNTER;
private static long startTime;
private static long nextStartTime;

static {
LEN = 4 + 2 + 4 + 4 + 2;
ByteBuffer tempBuffer = ByteBuffer.allocate(10);
tempBuffer.position(2);
tempBuffer.putInt(UtilAll.getPid());// 进程id
tempBuffer.position(0);
try {
tempBuffer.put(UtilAll.getIP());// ip地址
} catch (Exception e) {
tempBuffer.put(createFakeIP());
}
tempBuffer.position(6);
tempBuffer.putInt(MessageClientIDSetter.class.
getClassLoader().hashCode());//
FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
}

private synchronized static void setStartTime(long
millis) {
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(millis);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
startTime = cal.getTimeInMillis();
cal.add(Calendar.MONTH, 1);
nextStartTime = cal.getTimeInMillis();
}

public static Date getNearlyTimeFromID(String
msgID) {
ByteBuffer buf = ByteBuffer.allocate(8);
byte[] bytes = UtilAll.string2bytes(msgID);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put(bytes, 10, 4);
buf.position(0);
long spanMS = buf.getLong();
Calendar cal = Calendar.getInstance();
long now = cal.getTimeInMillis();
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
long monStartTime = cal.getTimeInMillis();
if (monStartTime + spanMS >= now) {
cal.add(Calendar.MONTH, -1);
monStartTime = cal.getTimeInMillis();
}
cal.setTimeInMillis(monStartTime + spanMS);
return cal.getTime();
}

public static String getIPStrFromID(String
msgID) {
byte[] ipBytes = getIPFromID(msgID);
return UtilAll.ipToIPv4Str(ipBytes);
}

public static byte[] getIPFromID(String msgID) {
byte[] result = new byte[4];
byte[] bytes = UtilAll.string2bytes(msgID);
System.arraycopy(bytes, 0, result, 0, 4);
return result;
}

public static String createUniqID() {
StringBuilder sb = new StringBuilder(LEN * 2);
sb.append(FIX_STRING);
sb.append(UtilAll.bytes2string
(createUniqIDBuffer()));
return sb.toString();
}

private static byte[] createUniqIDBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}
buffer.position(0);
buffer.putInt((int) (System.currentTimeMillis()
- startTime));
buffer.putShort((short) COUNTER.getAndIncrement());
return buffer.array();
}

public static void setUniqID(final Message
msg) {
if (msg.getProperty(MessageConst.
PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
msg.putProperty(MessageConst.
PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
createUniqID());
}
}

public static String getUniqID(final Message
msg) {
return msg.getProperty(MessageConst.
PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
}

public static byte[] createFakeIP() {
ByteBuffer bb = ByteBuffer.allocate(8);
bb.putLong(System.currentTimeMillis());
bb.position(4);
byte[] fakeIP = new byte[4];
bb.get(fakeIP);
return fakeIP;
}
}

从源码中可以看到

|value|part| |--|--| |FIX_STRING | ip地址 + 进程id + MessageClientIDSetter.class.getClassLoader().hashCode()组成 | |COUNTER | 是AtomicInteger值,可保证并发操作下的安全性| |TIME | System.currentTimeMillis() - 当前月开始的时间|

MsgId = FIX_STRING + bytes2string(TIME + COUNTER)

ip地址决定了分布式作业环境下生产的id值唯一

进程id决定了单机上多个客户端实例间生产的id值唯一

count作为原子Integer类型,决定了单实例运行时高并发下生产的值唯一

time 乃当前时间戳 - 当月开始时间戳的long值,保证应用月内重启不会重复。

什么情况下会出现id重复?

应用不重启,id不可能重复

月初重启,在各个条件均不变的情况下,得到的值可能跟上个月开始的值相等。但是RocketMQ另外的一个机制保证不会出现重复的数据,即默认删除三天前的数据。(可配置)

从这个里可以看到,通过ip+进程+自增值+时间戳达到了一个月内的数据时不会重复的,又通过默认清理数据的机制保证整个MQ运行时MsgId不会重复出现。但是总体来说,算法本身依赖两个条件达到的唯一性,一个是数据月内唯一性,以及数据清理机制。这个算法不适合所有的分布式唯一id生成场景,但是它非常适合消息队列这个场景,简单并且性能好(比如相较分布式锁生成id)。

服务端生成唯一id

offsetMsgId生成方式更加简单。

public class MessageDecoder {

public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
input.flip();
input.limit(MessageDecoder.MSG_ID_LENGTH);//长度16位

input.put(addr);// ip地址
input.putLong(offset);// 物理分区的偏移量offset

return UtilAll.bytes2string(input.array());
}
}

通过服务端ip地址+服务端消息的物理分区偏移量来达到唯一值id。

通过MsgId或者offsetMsgId去重

在平常的开发工作中,我们常常会提醒自己,不能去信任前端的关键数据,为何?因为它传输到服务端的过程是极大可能被修改的,所以它不是可信任的。

MsgId是客户端生成的id,它可不可靠?先不说被串改这样的问题,从算法的角度分析它是可靠的,但是它存在一个致命的问题:客户端发送至服务端消息时,有没有可能重复发送一条消息?正常情况下不可能,但是当存在网络波动,网路延时等诸多问题时,消息从客户端发送至服务端过程中,服务端正常写入了commit-log,可在响应客户端(ACK)的时候失败了……

结果如何?

可能是两条一样的消息内容,却有了不一样的MsgId跟OffsetMsgId,最终它还是重复消费了(这种情况极少出现,适合那些业务较为宽松的场景),但是由于在消费端无法直接取到MsgId的值(亦或者我还没看到),所以如果要以之作为去重id,过程需要自己实现。

OffsetMsgId是服务端生成的id,它可不可靠?很明显它也存在前文中说的客户端id的情况,但是它的好处是消费端可以通过api直接取到。从代码实现的角度来讲,以OffsetMsgId作为去重id是更为优雅的,RocketMQ 作为第三方组件嵌入系统,类似去重这样的工作如果可以与业务隔离开,无疑是最合适不过的。

另,忍不住吐槽,在RocketMQ-Console的客户端上重试消息的时候,拿到的MsgId跟消息正常消息的Id竟然不相同。前者为客户端Id,后端为服务端Id。

比如可以考虑最简单的方案,如下:

总结

1、极为严谨的业务必须业务幂等。 2、宽松业务可以考虑使用OffsetMsgId作为去重id。 3、唯一id的两种方式非常值得借鉴与思考,简单而且优雅。

 
   
2228 次浏览       15
相关文章

企业架构、TOGAF与ArchiMate概览
架构师之路-如何做好业务建模?
大型网站电商网站架构案例和技术架构的示例
完整的Archimate视点指南(包括示例)
相关文档

数据中台技术架构方法论与实践
适用ArchiMate、EA 和 iSpace进行企业架构建模
Zachman企业架构框架简介
企业架构让SOA落地
相关课程

云平台与微服务架构设计
中台战略、中台建设与数字商业
亿级用户高并发、高可用系统架构
高可用分布式架构设计与实践