IM设计和实现
以下都是手敲,不涉密。 只是做这个系统的一些想法和踩得一些坑的一些记录。
# 架构
入侵监测系统,利用APISIX的API,分析他的请求响应日志分析,比如一些ip总是token不合法的,频繁访问验证码的,频繁触发缓存穿透的。 再把这些ip都给进黑名单。
分布式日志系统、
监控系统
# 分层
- apisix 层 代替nginx做域名解析,原则上不允许变动。
- getWay层 , 2台以上机器做灰度发布,切换流量使用,尽量不发布,除非网关的代码或配置变动。
- 消息API层,做消息处理层相关业务(如单聊/群聊等)的一些实现的门面
- netty/ws层 ,消息处理层,只做消息转发操作重试,不做任何业务处理。
- MQ,Redis层做中间临时数据存储层。
- 具体相关业务处理层,和相关对外提供接口。
# 主体流程
- 调用登录服务接口,获得token授权
- 设置客户端的一些个性化设置(连接类型,开启搜索,持久化时间等级,未读数统计方式,超时重发等级),调用获得IM连接服务初始化,利用nacos的rabbion获得地址,并放到redis登记(string记用户和hash记服务器),获得连接的类型ws/tcp和地址。
- 调用未读数,未读消息等等上线相关接口,并做各自的业务操作。
- 调用单聊/群聊等发送消息接口,往kafka中放入消息,再从redis中获得到对应的消息服务并且发送消息。(发消息可以不ok,MQ操作必须ok)
- MQ,下面存储、搜索、离线消息、未读数服务订阅消费。
- 接受到的客户端,调用ACK已读接口,只往MQ发送消息已读topic。
- MQ,下面的存储,未读数,离线服务做相关业务。
- 调用下线或者服务端保活失效,发送下线消息到MQ,同时关闭连接。 MQ对应的做移除redis记录,用户状态变更等等。
- MQ中的离线服务,做删除redis注册表。
# 表结构设计
- 发件箱和收件箱
- 群已读功能,使用读扩散,群已读表存放.
查看用户1跟用户2的聊天记录,首先可以先分页查询聊天消息索引的id,box_type 代表是收的还是发的,select mid,box_type from im_user_msg_box t where t.owner_uid = 1 and t.other_uid = 2 order by mid;(注意要分页查),然后再for循环在im_msg_content表查每条消息内容展示。
# 内部项目中的调用消息接口.
为了不走网关直连消息API层,利用feign和Rabbion的轮询配置。
(发消息和kafka都在消息接口中)
# 离线消息服务设计
减少数据库压力,且把有些业务上不需要存储的数据,不落数据库和ES存储,按照客户端的配置,做的区分。
极大的减少了业务的存储压力。就这样还是做了分表处理。
# Redis数据
receiverId 是用户id + to + 群组 或者 发送用户
mid 是客户端生成的时间戳的UUID
msg 是消息内容
- 添加消息:zadd offline_msg_#{receiverId} #{mid} #{msg} // score就存储消息的id _
- 查询消息:zrevrange offline_msg_#{receiverId} 0 9 // 按消息id从大到小排序取最新的 十条消息,上拉刷新继续查 _
- 删除消息:zremrangebyscore offline_msg_#{receiverId} 0 max_mid // 删除客户端已 读取过的介于最小的消息id和最大的消息id之间的所有消息
- 如果单个key消息存储过大,目前采用的是定时晚上查询数据库大字段做的处理,也可以考虑按周或者按月针对同一个receiverId多搞几个key分段来存储。
# 未读数设计
- 该用户下所有的未读数。
- 每个离线集合的key String 自增。
- 群未读数设计,hincrby msg:noreadcount:gid uid 1 (gid为群id,uid为用户id)。
- 用lua脚本保证里面的原子性。
# 消息服务某节点下线
需要对应连接的客户端主动去重连并且拉离线消息接口等业务操作。
# 正常下线
如果是正常发布的话,再脚本里面有触发spring-的shutdown机制:
- 注销nacos的节点,不再接受新的连接
- 循环关闭所有客户端连接,以便客户端重连到其他节点。
- 睡眠一会,以保证上面都是好的。
- 循环redis中的hash判断,之前所有的客户端,是否还是本机ip,是的话就发送下线消息到MQ。
- 删除上面的hash。
# 类似宕机
xxl-job跑类似获得,不健康的节点,操作上面的3操作。不及时也没关系。
# 异常情况
- 在消息API层中,发消息重试机制中,做最后的判断,是否删除reids对应的string
- 再消息API层中,报错未知发送消息错误,会触发客户端的主动重连操作。
# 其他问题
redis存客户端的uuid时间戳,有效时间半小时,来保证幂等性问题。
剔除长时间空闲的连接,目前采用的是客户端自己剔除。
如果业务端掉线或者清了他们的nextID,这种情况怎么办???
# 有序性的保证
# 基于客户端的有序性
利用序号和下次序号,实现了一套保证全局有序性,同时拥有背压敏感协议限流、和消息顺序阻塞报警和黑名单控制的可选方案。
我们后台有个接口专门获得他的nextId。 一般都是在他们上线后调用,以保证下次nextId不会断。 正常调用下线接口会有nextId不会存在客户端了。(如果业务端掉线或者清了他们的nextID,这种情况怎么办??? )
他发送消息会有这个id和自己生成的nextId,再后端会存入下次需要的nextId,以防客户端丢失,放到hash集合里面去,通过nextId找一只找,没有找到再判断这次消费是不是的,都不是就退出循环。
# 群聊已读功能
群聊要有已读用户列表,每条消息都要有这些已读功能。
# 读扩散
用户在群里发一条消息只存一份数据,群里所有人都读同一份消息数据。之前的实现,实现不了群聊已读功能
# 写扩散
群聊消息表,消息索引已读表。
就是说用户在群里发一条消息会针对群里每个用户都存一条消息索引,然后再单独 存储一份消息内容,这样可以针对用户是否已读做一些处理,但是写扩散有一个问题就是群的人数不能太 多,否则性能会有问题,而且会有大量存储浪费,比如万人群聊,要是用写扩散,每个用户发一条消息, 要存储上万条索引,这个对性能以及存储耗费太大。
# 根据bitmap
一条消息就是一条数组,数组大小就是用户的顺序,很可惜redis没有获得所有bitmap的内容,这个需要自己实现。
# 存消息对应的位置
一个群消息表。一个群消息用户表它里面2个字段,上次读取一批消息最大值id(next),最近读取一批消息最大值id ,过滤条件就可以是当前消息id是否再这之间。
这样就变成了读扩散了。
# 百万级别的群聊,直播间
# 未读数更新高并发问题
- 问题:一条消息就1百万次redis,这肯定不能这样做。
- 解决: 所以我们可以不实时更新服务端的未读数,而改为jvm内存操作+定时批量更新redis。
- 思考: 当然中间可能会出现未读数服务宕机导致丢失部分未读数, 一般来说业务是能够允许的。
# 消息发送高并发难题
- 问题: 按照上面的架构,要给群里百万个用户转发消息,意味着一条消息就要要查询1百万次redis里的netty服务路由信息,再去发送
- 解决: 把用户连接均匀的分配给每台netty,直接给所有netty发消息(不查redis了),在每台netty里根据群聊id找到所有对应用户连接,发送消息。
思考:
- 如果有台服务器,没有分配到群聊id的用户,消息怎么处理呢?? 直接丢弃。
- netty服务器多了,那么你同步轮询往netty发消息,就慢了,最终服务就挂了吧?? 这个可以用MQ解耦/异步/削峰吧;
- 上述的MQ异步之后,但还是同步的,意味着还是有顺序的??? 那么就把MQ放到每个netty上去,重新搭建一个MQ集群专门给这些群聊消息,这些只给netty监听。
- 并发高了,怎么办?? 放心上面的MQ机制已经平稳的处理netty的群聊消息了,但是其他服务我们可以用限流、熔断做处理。
- 能不能用UDP,减少TCP的ACK机制呢??? 不可以,群聊消息要保证可靠。直播间可以做的。
# 百万直播间
- 同时发送1万个人发消息的高并发问题??? 在业务上是,看不过来的,可以丢掉一些消息的,可以采用上面的UDP,甚至过载了不重要的用户直接丢弃不发消息也是可以的,但是这里最好做用户画像区分对待,避免错过金主用户。
# 百万关注的消息流
# 读扩散
实现: redis存消息id,再回查mysql的消息表。存储的一份,需要多次回查。
问题: 这种对于一般的是可以,用户查询频繁回查mysql就多了,这样mysql会有问题。
思考: 把消息放redis,不再回查mysql ??? 那也不行,用户都频繁查redis是不是也会有问题呀,同时把消息放到redis是不是存储也大,这个思路就是下面的写扩散。
总结: 这种对查询要求高,节省存储, 适合给那些不咋上线的用户。 回查总是慢的
# 写扩散
实现: 利用上面私聊的思路,发件箱和收件箱 ,他发一条消息,同时往他的粉丝收件箱写一份。
问题: 粉丝几百万的话,就会写几百万条消息,直接GG.
总结: 这个适合小粉丝量的。
# 读写混合扩散
- 实现: 活跃用户写扩散,非活跃用户读扩散,非活跃用户读取消息后把这些消息写到自己的收件箱(改进)
- 思考: 这个方案业务相关,真正活跃的粉丝其他并不多,所以用写扩散(以空间换时间ok),但是对非活跃的粉丝给他存就是浪费空间存储。
- 问题: 非活跃用户用读扩散方案,那么第一次查询也慢呀??? 这个没有办法,但是这里对读扩散做了优化,之后的重复查询就会快了,同时该用户经常上线后用户状态也会变成活跃用户,就有写扩散的机制了。
- 总结: 这套方案复杂,需要根据用户标签做不同方案处理。
# 总结
- 大V 采用读写混合扩散.
- 小V 采用写扩散.
- 读扩散查询慢就不适合。
- 收件箱可以是mysql、redis等等。但是发件箱最好是关系型数据库。