前言
最近在忙一个高考项目,看着系统顺利完成了这次高考,终于可以松口气了。看到那些即将参加高考的学生,也想起当年高三的自己。
下面分享下RabbitMQ实战经验,希望对大家有所帮助:
一、生产消息
关于RabbitMQ的基础使用,这里不再介绍了,项目中使用的是Exchange中的topic模式。
先上发消息的代码
private bool MarkErrorSend(string[] lstMsg) { try { var factory = new ConnectionFactory() { UserName = \"guest\",//用户名 Password = \"guest\",//密码 HostName = \"localhost\",//ConfigurationManager.AppSettings[\"sHostName\"], }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); try { //定义一个Direct类型交换机 channel.ExchangeDeclare( exchange: \"TestTopicChange\", //exchange名称 type: ExchangeType.Topic, //Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete: false,//是否自动删除,一般设成false arguments: null//一些结构化参数,比如:alternate-exchange ); //定义测试队列 channel.QueueDeclare( queue: \"Test_Queue\", //队列名称 durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效) exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除 autoDelete: false,//是否自动删除,一般设成false arguments: null ); //将队列绑定到交换机 string routeKey = \"TestRouteKey.*\";//*匹配一个单词 channel.QueueBind( queue: \"Test_Queue\", exchange: \"TestTopicChange\", routingKey: routeKey, arguments: null ); //消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一起使用才有效) IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.ConfirmSelect();//发送确认机制 foreach (var itemMsg in lstMsg) { byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg); //发布消息 channel.BasicPublish( exchange: \"TestTopicChange\", routingKey: \"TestRouteKey.one\", basicProperties: properties, body: sendBytes ); } bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均发送才返回true return isAllPublished; } catch (Exception ex) { //写错误日志 return false; } finally { channel.Close(); connection.Close(); } } catch { //RabbitMQ.Client.Exceptions.BrokerUnreachableException: //When the configured hostname was not reachable. return false; } }
发消息没啥特别的。关于消息持久化的介绍这里也不再介绍,不懂的可以看上篇文章。发消息需要注意的地方是,可以选择多条消息一起发送,最后才确定消息发送成功,这样效率比较高;此外,需要尽量精简每条消息的长度(楼主在这里吃过亏),不然会因消息过长从而增加发送时间。在实际项目中一次发了4万多条数据没有出现问题。
二、接收消息
接下来说下消费消息的过程,我使用的是单个连接多个channel,每个channel每次只取一条消息方法。有人会问单个TCP连接,多个channel会不会影响通信效率。这个理论上肯定会有影响的,看影响大不大而已。我开的channel数一般去到30左右,并没有觉得影响效率,有可能是因为我每个channel是拿一条消息的原因。通过单个连接多个channel的方法,可以少开了很多连接。至于我为什么选每个channel每次只取一条消息,这是外界因素限制了,具体看自己需求。
接下接收消息的过程,首先定义一个RabbitMQHelper类,里面有个全局的conn连接变量,此外还有创建连接、关闭连接和验证连接是否打开等方法。程序运行一个定时器,当
检测到连接未打开的情况下,主动创建连接处理消息。
public class RabbitMQHelper { public IConnection conn = null; /// <summary> /// 创建RabbitMQ消息中间件连接 /// </summary> /// <returns>返回连接对象</returns> public IConnection RabbitConnection(string sHostName, ushort nChannelMax) { try { if (conn == null) { var factory = new ConnectionFactory() { UserName = \"guest\",//用户名 Password = \"guest\",//密码 HostName = sHostName,//ConfigurationManager.AppSettings[\"MQIP\"], AutomaticRecoveryEnabled = false,//取消自动重连,改用定时器定时检测连接是否存在 RequestedConnectionTimeout = 10000,//请求超时时间设成10秒,默认的为30秒 RequestedChannelMax = nChannelMax//与开的线程数保持一致 }; //创建连接 conn = factory.CreateConnection(); Console.WriteLine(\"RabbitMQ连接已创建!\"); } return conn; } catch { Console.WriteLine(\"创建连接失败,请检查RabbitMQ是否正常运行!\"); return null; } } /// <summary> /// 关闭RabbitMQ连接 /// </summary> public void Close() { try { if (conn != null) { if (conn.IsOpen) conn.Close(); conn = null; Console.WriteLine(\"RabbitMQ连接已关闭!\"); } } catch { } } /// <summary> /// 判断RabbitMQ连接是否打开 /// </summary> /// <returns></returns> public bool IsOpen() { try { if (conn != null) { if (conn.IsOpen) return true; } return false; } catch { return false; } } }
接下来我们看具体如何接收消息。
private static AutoResetEvent myEvent = new AutoResetEvent(false); private RabbitMQHelper rabbit = new RabbitMQHelper(); private ushort nChannel = 10;//一个连接的最大通道数和所开的线程数一致
首先初始化一个rabbit实例,然后通过RabbitConnection方法创建RabbitMQ连接。
当连接打开时候,用线程池运行接收消息的方法。注意了,这里开的线程必须和开的channel数量一致,不然会有问题(具体问题是,设了RabbitMQ连接超时时间为10秒,有时候不管用,原因未查明。RabbitMQ创建连接默认超时时间为30秒,假如在这个时间内再去调用创建的话,就有可能得到两倍的channel;)
/// <summary> /// 单个RabbitMQ连接开多个线程,每个线程开一个channel接受消息 /// </summary> private void CreateConnecttion() { try { rabbit.RabbitConnection(\"localhost\", nChannel); if (rabbit.conn != null) { ThreadPool.SetMinThreads(1, 1); ThreadPool.SetMaxThreads(100, 100); for (int i = 1; i <= nChannel; i++) { ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), \"\"); } myEvent.WaitOne();//等待所有线程工作完成后,才能关闭连接 rabbit.Close(); } } catch (Exception ex) { rabbit.Close(); Console.WriteLine(ex.Message); } }
接着就是接收消息的方法,处理消息的过程省略了。
/// <summary> /// 接收并处理消息,在一个连接中创建多个通道(channel),避免创建多个连接 /// </summary> /// <param name=\"con\">RabbitMQ连接</param> private void ReceiveMsg(object obj) { IModel channel = null; try { #region 创建通道,定义中转站和队列 channel = rabbit.conn.CreateModel(); channel.ExchangeDeclare( exchange: \"TestTopicChange\", //exchange名称 type: ExchangeType.Topic, //Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete: false,//是否自动删除,一般设成false arguments: null//一些结构化参数,比如:alternate-exchange ); //定义阅卷队列 channel.QueueDeclare( queue: \"Test_Queue\", //队列名称 durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效) exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除 autoDelete: false, arguments: null ); #endregion channel.BasicQos(0, 1, false);//每次只接收一条消息 channel.QueueBind(queue: \"Test_Queue\", exchange: \"TestTopicChange\", routingKey: \"TestRouteKey.*\"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; //处理消息方法 try { bool isMark = AutoMark(message); if (isMark) { //Function.writeMarkLog(message); //确认该消息已被消费,发消息给RabbitMQ队列 channel.BasicAck(ea.DeliveryTag, false); } else { if (MarkErrorSend(message))//把错误消息推到错误消息队列 channel.BasicReject(ea.DeliveryTag, false); else //消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者 channel.BasicReject(ea.DeliveryTag, true); } } catch (Exception ex) { try { Console.WriteLine(ex.Message); if (channel != null && channel.IsOpen)//处理RabbitMQ停止重启而自动评阅崩溃的问题 { //消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者 channel.BasicReject(ea.DeliveryTag, true); } } catch { } } }; //手动确认消息 channel.BasicConsume(queue: \"Test_Queue\", autoAck: false, consumer: consumer); } catch (Exception ex) { try { Console.WriteLine(\"接收消息方法出错:\" + ex.Message); if (channel != null && channel.IsOpen)//关闭通道 channel.Close(); if (rabbit.conn != null)//处理RabbitMQ突然停止的问题 rabbit.Close(); } catch { } } }
三、处理错误消息
把处理失败的消息放到“错误队列”,然后把原队列的消息删除(这里主要解决问题是,存在多个处理失败或处理不了的消息时,如果把这些消息都放回原队列,它们会继续分发到其他线程的channel,但结果还是处理不了,就会造成一个死循环,导致后面的消息无法处理)。把第一次处理不了的消息放到“错误队列”后,重新再开一个新的连接去处理“错误队列”的消息。
/// <summary> /// 把处理错误的消息发送到“错误消息队列” /// </summary> /// <param name=\"msg\"></param> /// <returns></returns> private bool MarkErrorSend(string msg) { RabbitMQHelper MQ = new RabbitMQHelper(); MQ.RabbitConnection(\"localhost\",1); //创建通道 var channel = MQ.conn.CreateModel(); try { //定义一个Direct类型交换机 channel.ExchangeDeclare( exchange: \"ErrorTopicChange\", //exchange名称 type: ExchangeType.Topic, //Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete: false,//是否自动删除,一般设成false arguments: null//一些结构化参数,比如:alternate-exchange ); //定义阅卷队列 channel.QueueDeclare( queue: \"Error_Queue\", //队列名称 durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效) exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除 autoDelete: false,//是否自动删除,一般设成false arguments: null ); //将队列绑定到交换机 string routeKey = \"ErrorRouteKey.*\";//*匹配一个单词 channel.QueueBind( queue: \"Error_Queue\", exchange: \"ErrorTopicChange\", routingKey: routeKey, arguments: null ); //消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一起使用才有效) IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.ConfirmSelect();//发送确认机制 byte[] sendBytes = Encoding.UTF8.GetBytes(msg); //发布消息 channel.BasicPublish( exchange: \"ErrorTopicChange\", routingKey: \"ErrorRouteKey.one\", basicProperties: properties, body: sendBytes ); bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均发送才返回true return isAllPublished; } catch (Exception ex) { //写错误日志 return false; } finally { channel.Close(); MQ.conn.Close(); } }
总结:RabbitMQ本身已经很稳定了,而且性能也很好,所有不稳定的因素都在我们处理消息的过程,所以可以放心使用。
Demo源码地址:https://github.com/Bingjian-Zhu/RabbitMQHelper