使用RedisMQ 做分布式改造,从此放心安全迭代

释放双眼,带上耳机,听听看~!

引言

  熟悉TPL Dataflow博文的朋友可能记得这是个单体程序,使用TPL Dataflow 处理工作流任务, 在使用Docker部署的过程中, 有一个问题一直无法回避:

       在单体程序部署的瞬间会有少量 流量无法处理;更糟糕的情况下,迭代部署的这个版本有问题,上线后无法运作, 更多的流量没有得到处理。

      背负神圣使命(巨大压力)的程序猿心生一计, 为何不将单体程序改成分布式:服务A只接受数据,服务B只处理数据。

 使用RedisMQ 做分布式改造,从此放心安全迭代

 

知识储备:

    消息队列和订阅发布作为老生常谈的两个知识点被反复提及,按照JMS的规范, 官方称为点对点(point to point, queue) 和 订阅发布(publish/subscribe,topic ),

点对点:

  消息生产者生产消息发送到queue中,然后消费者从queue中取出并且消费消息。

注意:

消息被消费以后,queue中不再有存储,所以消费者不可能消费到已经被消费的消息。

Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者。

发布/订阅

  消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

注意:

发布者将消息发布到通道中,而不用知道订阅者是谁(不关注是否存在订阅者);订阅者可收听自己感兴趣的多个通道, 也不需要知道发布者是谁(不关注是哪个发布者)。

故如果没有消费者,发布的消息将得不到处理;

使用RedisMQ 做分布式改造,从此放心安全迭代

头脑风暴 

本次采用的消息队列模型:

  •    解耦业务:  新建Receiver程序作为生产者,专注于接收并发送到队列;原有的webapp作为消费者专注数据处理。
  •    起到削峰填谷的作用, 若建立多个消费者webapp容器,还能形成负载均衡的效果。 

Redis 原生支持发布/订阅 模型,内置的List数据结构亦能形成轻量级MQ的效果。

    需要关注Redis 两个命令( 左进右出,右进左出同理):

    LPUSH  &  RPOP/BRPOP

Brpop 中的B 表示 “Block”, 是一个rpop命令的阻塞版本:若指定List没有新元素,在给定时间内,该命令会阻塞当前redis客户端连接,直到超时返回nil

编程实践

本次使用 AspNetCore 完成RedisMQ的实践。引入Redis国产第三方开源库 CSRedisCore.

不使用著名的StackExchange.Redis 组件库的原因:

  • 之前一直使用StackExchange.Redis, 参考了很多资料,做了很多优化,并未完全解决RedisTimeoutException问题 

  • StackExchange.Redis基于其多路复用机制,不支持阻塞式命令, 故采用了 CSRedisCore,该库强调了API 与Redis官方命令一致,很容易上手

生产者Receiver:

------------------截取自Startup.cs------------------------------ 
      public void ConfigureServices(IServiceCollection services)
      {
            var csredis = new CSRedisClient(Configuration.GetConnectionString(\"redis\"));
            RedisHelper.Initialization(csredis);
            services.AddMvc();
      }

---------------------截取自数据接收Controller-------------------
     [Route(\"batch\")]
     [HttpPost]
     public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs)
     {
            if (!ModelState.IsValid)
                throw new ArgumentException(\"Http Body Payload Error.\");
            var redisKey = $\"{DateTime.Now.ToString(\"yyyyMMdd\")}\"; 
            eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);
            if (eqidPairs != null && eqidPairs.Any())
                RedisHelper.LPush(redisKey, eqidPairs.ToArray());
            await Task.CompletedTask;
     }

 消费者webapp:

     根据RedisMQ的事件推送方式,需要轮询Redis  List 数据结构,这里使用AspNetCore内置的BackgroundService 实现了 后台轮询任务。

    public class BackgroundJob : BackgroundService
    {
        private readonly IEqidPairHandler _eqidPairHandler;
        private readonly ILogger _logger;
        public BackgroundJob(IEqidPairHandler eqidPairHandler, ILoggerFactory loggerFactory)
        {
            _eqidPairHandler = eqidPairHandler;
            _logger = loggerFactory.CreateLogger(nameof(BackgroundJob));
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation(\"Service starting\");

            while (!stoppingToken.IsCancellationRequested)
            {
                var key = $\"eqidpair:{DateTime.Now.ToString(\"yyyyMMdd\")}\";
                var eqidpair = RedisHelper.BRPop(5, key);
                if (eqidpair != null)
                    await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));
                else
                    await Task.Delay(1000, stoppingToken);
                
            }
            _logger.LogInformation(\"Service stopping\");
        }
    }

使用RedisMQ 做分布式改造,从此放心安全迭代
使用RedisMQ 做分布式改造,从此放心安全迭代

var redis = new CSRedisClient[16]; //定义成单例
            for (var a = 0; a < redis.Length; a++)
                redis[a] = new CSRedisClient(Configuration.GetConnectionString(\"redis\") + \",defualtDatabase=\" + a);
            services.AddSingleton<CSRedisClient[]>(redis);
            RedisHelper.Initialization(redis[0]);

注册CSRedisCore服务

 最后依照引言中的部署原理图,将Nginx,Receiver, WebApp dockerize, 并且让 webapp 依赖于Nginx,Receiver

-------------------截取自docker-compose.yml文件---------------------- 

 app:
    build:
      context: ./app
      dockerfile: Dockerfile
    expose:
      - \"80\"
    extra_hosts:
      - \"dockerhost:172.18.0.1\"
    environment:
      TZ: Asia/Shanghai
    volumes:
      - type: bind
        source: /mnt/eqidmanager/eqidlogs
        target: /app/eqidlogs
      - type: bind
        source: /mnt/eqidmanager/applogs
        target: /app/logs
      - type: bind
        source: /home/huangjun/eqidmanager/EqidManager.db
        target: /app/EqidManager.db
    healthcheck:
      test: [\'CMD\',\'curl\',\'-f\',\'http://localhost/healthcheck\']
      interval: 1m30s
      timeout: 10s
      retries: 3 depends_on: - receiver - proxy
    logging:
      options:
        max-size: \"200k\"
        max-file: \"10\"
    privileged: true 

       ① 根据docker-compsoe up命令的用法,若Receiver容器正在运行且服务配置并未改变,该容器不会被停止。

  ② 根据官方文档对于depends_on 指令的说明,该指定决定了容器启动和停止的顺序,因此引言中需要 【暂存流量】刚性需求可以得到满足
  使用RedisMQ 做分布式改造,从此放心安全迭代

  改造上线之后,效果很明显,现在可以放心安全的迭代 TPL DataFlow数据处理程序。

作者:
JulianHuang

码甲拙见,如有问题请下方留言大胆斧正;码字+Visio制图,均为原创,看官请不吝好评+关注,  ~。。~

本文欢迎转载,请转载页面明显位置注明原作者及原文链接

 

给TA打赏
共{{data.count}}人
人已打赏
随笔日记

WinForm开发中通用附件管理控件设计开发参考

2020-11-9 6:18:39

随笔日记

(14)ASP.NET Core 中的日志记录

2020-11-9 6:18:41

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索