【C#】【Demo】消息队列RabbitMQ,和TCP监听Demo。

【C#】【Demo】消息队列RabbitMQ,和TCP监听Demo。
注意:连接队列服务器时,下面这些参数必须和服务器上该队列的配置一致,否则声明队列时可能报错:
private string queue;//队列名
private bool durable;//持久化
private bool exclusive;//独占
private bool autoDelete;//自动删除
默认帐号guest只允许从本机(localhost)连接,不能远程登录。
队列服务的默认端口是5672,管理后台网站的默认端口是15672。
1、实现发送和接收,类RabbitMQServerT
using System;
using MQServer;
using RabbitMQ.Client;
using System.Text;
using System.Configuration;
using RabbitMQ.Client.Events;
using Newtonsoft.Json;
namespace MQServer
{
/// <summary>
/// Thin wrapper around one RabbitMQ queue: publishes UTF-8 text messages to it and,
/// when a callback is supplied, consumes messages from it.
/// </summary>
public class RabbitMQServerT
{
    // Consumer callback (message text, pass-through state). Null means "publish only".
    protected readonly Action<string, object> receive;
    // Opaque state handed back to the consumer callback on every delivery.
    private object penetrate;
    // Queue name.
    private string queue;
    // Queue declaration flags; they are passed unchanged to QueueDeclare.
    private bool durable;
    private bool exclusive;
    private bool autoDelete;
    // True: run the callback through BeginInvoke (does not block the consumer thread).
    // False: run it inline, blocking the consumer until it returns.
    // The manual-parameter constructor requires exclusive=true when this is true.
    private bool isBeginInvoke;

    // Connection and channel owned by the consumer side (created by Receive, released by EndReceive).
    private IConnection connection;
    private IModel channel;
    // True while a consumer started by Receive is considered active.
    public bool IsReceive;
    private ConnectionFactory factory;

    private RabbitMQServerT()
    {
    }

    /// <summary>
    /// Creates an instance whose connection and queue settings are read from AppSettings
    /// (RabbitMQ_durable, RabbitMQ_exclusive, RabbitMQ_autoDelete, RabbitMQHostName,
    /// RabbitMQUserName, RabbitMQPassword, RabbitMQPort and optionally RabbitMQVirtualHost).
    /// The callback is always invoked synchronously with this constructor.
    /// </summary>
    /// <param name="_receive">Consumer callback; when null, <see cref="Receive"/> does nothing.</param>
    /// <param name="_queue">Queue name, usable to separate different businesses.</param>
    /// <param name="_penetrate">State object passed through to the consumer callback.</param>
    public RabbitMQServerT(Action<string, object> _receive, string _queue = "hello", object _penetrate = null)
    {
        queue = _queue;
        receive = _receive;
        penetrate = _penetrate;
        isBeginInvoke = false;

        durable = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_durable"].ToString());
        exclusive = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_exclusive"].ToString());
        autoDelete = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_autoDelete"].ToString());

        factory = new ConnectionFactory();
        factory.HostName = ConfigurationManager.AppSettings["RabbitMQHostName"];
        factory.UserName = ConfigurationManager.AppSettings["RabbitMQUserName"];
        factory.Password = ConfigurationManager.AppSettings["RabbitMQPassword"];
        factory.Port = int.Parse(ConfigurationManager.AppSettings["RabbitMQPort"].ToString());

        // The virtual host is optional; the factory default is kept when the setting is blank.
        string virtualHost = ConfigurationManager.AppSettings["RabbitMQVirtualHost"];
        if (!string.IsNullOrWhiteSpace(virtualHost))
        {
            factory.VirtualHost = virtualHost;
        }
    }

    /// <summary>
    /// Creates an instance from explicitly supplied settings.
    /// </summary>
    /// <param name="_receive">Consumer callback; when null, <see cref="Receive"/> does nothing.</param>
    /// <param name="_queue">Queue name, usable to separate different businesses.</param>
    /// <param name="_penetrate">State object passed through to the consumer callback.</param>
    /// <param name="factory">Factory used to open connections to the broker.</param>
    /// <param name="durable">Declare the queue as durable.</param>
    /// <param name="exclusive">Declare the queue as exclusive.</param>
    /// <param name="autoDelete">Declare the queue as auto-delete.</param>
    /// <param name="isBeginInvoke">Run the callback asynchronously; requires <paramref name="exclusive"/> to be true.</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="isBeginInvoke"/> is true while <paramref name="exclusive"/> is false.
    /// </exception>
    public RabbitMQServerT(Action<string, object> _receive, string _queue, object _penetrate, ConnectionFactory factory
        , bool durable, bool exclusive, bool autoDelete, bool isBeginInvoke)
    {
        // Asynchronous handling is only allowed on an exclusive queue.
        // ArgumentException derives from Exception, so existing catch blocks keep working;
        // the single-argument overload keeps Message identical to the original text.
        if (isBeginInvoke && !exclusive)
        {
            throw new ArgumentException("接收消息队列对象RabbitMQServerT参数isBeginInvoke=true异步执⾏接收业务,如果要异步执⾏业务,请独占该消息exclusive=true,否则会被其他线程重复读取。");
        }

        queue = _queue;
        receive = _receive;
        penetrate = _penetrate;
        this.factory = factory;
        this.durable = durable;
        // Previously never assigned, so the queue was always declared non-exclusive.
        this.exclusive = exclusive;
        this.autoDelete = autoDelete;
        this.isBeginInvoke = isBeginInvoke;
    }

    /// <summary>
    /// Publishes a text message to the queue through the default exchange, using a
    /// short-lived connection and channel. Failures are logged and not rethrown.
    /// </summary>
    /// <param name="message">Message text; it is encoded as UTF-8.</param>
    public void Send(string message)
    {
        try
        {
            using (var publishConnection = factory.CreateConnection())
            using (var publishChannel = publishConnection.CreateModel())
            {
                // Make sure the queue exists before publishing to it.
                publishChannel.QueueDeclare(queue, durable, exclusive, autoDelete, null);
                var body = Encoding.UTF8.GetBytes(message);
                publishChannel.BasicPublish("", queue, null, body);
            }
        }
        catch (Exception ex)
        {
            TLogHelper.Error(ex.Message, "发送消息队列异常RabbitMQServerTSend:\n" + message);
        }
    }

    /// <summary>
    /// Serializes the model to JSON and publishes it.
    /// </summary>
    /// <param name="model">Message envelope to publish.</param>
    public void Send(RabbitMQMsgModel model)
    {
        Send(JsonConvert.SerializeObject(model));
    }

    /// <summary>
    /// Starts consuming from the queue. Does nothing when no callback was supplied.
    /// Calling it again replaces the previous consumer connection instead of leaking it.
    /// Start-up failures are logged and leave <see cref="IsReceive"/> false.
    /// </summary>
    public void Receive()
    {
        if (receive == null)
        {
            return;
        }

        IsReceive = true;
        try
        {
            ReleaseConsumer();

            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            channel.QueueDeclare(queue, durable, exclusive, autoDelete, null);
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            // Capture the channel so acknowledgements always go to the channel that
            // delivered the message, even if the field is replaced later.
            var consumerChannel = channel;
            var consumer = new EventingBasicConsumer(consumerChannel);
            consumer.Received += (sender, ea) => HandleDelivery(consumerChannel, ea);

            // autoAck must be false: the handler acknowledges manually, and acknowledging
            // an auto-acked delivery is a channel-level protocol error.
            consumerChannel.BasicConsume(queue, false, consumer);
        }
        catch (Exception ex)
        {
            TLogHelper.Error(ex.Message, "接收消息队列异常Receive");
            ReleaseConsumer();
            IsReceive = false;
        }
    }

    /// <summary>
    /// Stops consuming and releases the consumer channel and connection.
    /// Safe to call when <see cref="Receive"/> was never started, and safe to call twice.
    /// </summary>
    public void EndReceive()
    {
        IsReceive = false;
        ReleaseConsumer();
    }

    // Runs the consumer callback for one delivery and acknowledges it on success.
    private void HandleDelivery(IModel consumerChannel, BasicDeliverEventArgs ea)
    {
        try
        {
            // Decode before any asynchronous hand-off so the callback owns its own copy of the text.
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());

            if (isBeginInvoke)
            {
                // NOTE(review): the acknowledgement below runs on the BeginInvoke completion
                // thread; confirm that is acceptable for the client library version in use.
                receive.BeginInvoke(message, penetrate, asyncResult =>
                {
                    try
                    {
                        // EndInvoke surfaces any exception thrown by the callback.
                        receive.EndInvoke(asyncResult);
                        consumerChannel.BasicAck(ea.DeliveryTag, false);
                    }
                    catch (Exception ex)
                    {
                        TLogHelper.Error(ex.Message, "接收消息队列业务异常Received:" + queue);
                        Reject(consumerChannel, ea.DeliveryTag);
                    }
                }, null);
            }
            else
            {
                receive.Invoke(message, penetrate);
                consumerChannel.BasicAck(ea.DeliveryTag, false);
            }
        }
        catch (Exception ex)
        {
            TLogHelper.Error(ex.Message, "接收消息队列业务异常Received:" + queue);
            Reject(consumerChannel, ea.DeliveryTag);
        }
    }

    // Rejects a failed delivery without requeueing, so a failed message is not delivered
    // again (as before) and the prefetch window of 1 is freed for the next message.
    private void Reject(IModel consumerChannel, ulong deliveryTag)
    {
        try
        {
            consumerChannel.BasicNack(deliveryTag, false, false);
        }
        catch (Exception ex)
        {
            TLogHelper.Error(ex.Message, "RabbitMQServerT BasicNack failed:" + queue);
        }
    }

    // Disposes the consumer channel and connection if they exist.
    private void ReleaseConsumer()
    {
        channel?.Dispose();
        channel = null;
        connection?.Dispose();
        connection = null;
    }
}
}
View Code
消息格式RabbitMQMsgModel
namespace MQServer
{
    /// <summary>
    /// Message envelope that is serialized to JSON before being published to the queue.
    /// </summary>
    public class RabbitMQMsgModel
    {
        /// <summary>
        /// Business name of the message.
        /// </summary>
        public string BLLName { get; set; }

        /// <summary>
        /// Business data carried by the message.
        /// </summary>
        public object Data { get; set; }
    }
}
View Code
2、BLL实现消息业务
BaseMQBLL
using System;
namespace MQServer
{
/// <summary>
/// Base class for business components that send and receive through one RabbitMQ queue.
/// </summary>
public abstract class BaseMQBLL : IDisposable
{
    /// <summary>
    /// Mirrors the IsReceive flag of the underlying queue helper.
    /// </summary>
    public bool IsReceive { get { return MQ.IsReceive; } }

    // Queue helper used for both sending and receiving.
    protected readonly RabbitMQServerT MQ;

    private BaseMQBLL() { }

    /// <summary>
    /// Binds this business component to a queue.
    /// </summary>
    /// <param name="queue">Queue name.</param>
    /// <param name="_penetrate">State object handed to <see cref="ReceiveBack"/> on every message.</param>
    protected BaseMQBLL(string queue, object _penetrate)
    {
        // The state object is captured by the lambda rather than routed through the
        // queue helper, so the helper's own pass-through argument is left null.
        MQ = new MQServer.RabbitMQServerT((string source, object o) =>
        {
            ReceiveBack(source, _penetrate);
        }, queue, _penetrate: null);
    }

    /// <summary>
    /// Starts receiving messages.
    /// </summary>
    public void Receive()
    {
        MQ.Receive();
    }

    /// <summary>
    /// Stops receiving messages.
    /// </summary>
    public void EndReceive()
    {
        MQ.EndReceive();
    }

    /// <summary>
    /// Receive callback that every concrete business class must implement.
    /// </summary>
    /// <param name="source">Message text as received from the queue.</param>
    /// <param name="receiveO">State object supplied to the constructor.</param>
    protected abstract void ReceiveBack(string source, object receiveO);

    /// <summary>
    /// Stops receiving if a consumer was started. Doing nothing otherwise keeps Dispose
    /// from throwing when Receive was never called, and makes repeated Dispose calls harmless.
    /// </summary>
    public void Dispose()
    {
        if (IsReceive)
        {
            EndReceive();
        }
    }
}
}
View Code
MQTestHello: BaseMQBLL
using MQServer;
using Newtonsoft.Json;
namespace BLL
{
/// <summary>
/// Sample business class bound to the "hello" queue.
/// </summary>
public class MQTestHello : BaseMQBLL
{
    public MQTestHello()
        : base("hello", null)
    {
    }

    /// <summary>
    /// Receive callback: parses the JSON envelope and dispatches on its BLLName.
    /// Messages with an unrecognized BLLName are ignored.
    /// </summary>
    /// <param name="source">JSON text of a <see cref="RabbitMQMsgModel"/>.</param>
    /// <param name="receiveO">State object from the base class; not used here.</param>
    /// <exception cref="System.ArgumentException">
    /// <paramref name="source"/> deserializes to null (for example the JSON literal null).
    /// </exception>
    protected override void ReceiveBack(string source, object receiveO)
    {
        RabbitMQMsgModel model = JsonConvert.DeserializeObject<RabbitMQMsgModel>(source);
        if (model == null)
        {
            // Fail with a descriptive error instead of a NullReferenceException below.
            throw new System.ArgumentException("Message does not contain a RabbitMQMsgModel: " + source, nameof(source));
        }

        switch (model.BLLName)
        {
            case "Hello":
                Hello(model.Data);
                break;
            default:
                break;
        }
    }

    /// <summary>
    /// Sends a Hello message.
    /// </summary>
    /// <param name="msg">Text carried as the message data.</param>
    public void SendHello(string msg)
    {
        MQ.Send(new RabbitMQMsgModel() { BLLName = "Hello", Data = msg });
    }

    /// <summary>
    /// Handler for a received Hello message; writes the data to the log as JSON.
    /// </summary>
    /// <param name="data">Data taken from the received envelope.</param>
    public void Hello(object data)
    {
        TLogHelper.Info(JsonConvert.SerializeObject(data), "读取消息在MQTestHello");
    }
}
}
View Code
3、记录日志
TLogHelper
using Newtonsoft.Json;
using System;
using System.IO;
using System.Messaging;
using System.Text;
namespace MQServer.Log
{
/// <summary>
/// Minimal text-file logger. Entries go to one file per day, named yyyyMMdd.txt,
/// inside the directory configured by the MQServerTLogPath app setting.
/// </summary>
public class TLogHelper
{
    // Serializes file access between Write and Read.
    public static object _lock = new object();

    /// <summary>
    /// Logs the body of a System.Messaging message, serialized as JSON, under the title "MessageQueue".
    /// </summary>
    /// <param name="myMessage">Message whose Body is logged.</param>
    /// <param name="detail">Optional extra line written before the message.</param>
    public static void MQ(Message myMessage, string detail = "")
    {
        string msg = JsonConvert.SerializeObject(myMessage.Body);
        Write(msg, detail, "MessageQueue");
    }

    /// <summary>
    /// Logs a text entry under the title "Info".
    /// </summary>
    public static void Info(string msg, string detail = "")
    {
        Write(msg, detail, "Info");
    }

    /// <summary>
    /// Logs an object, serialized as JSON, under the title "Info".
    /// </summary>
    public static void Info(object msg, string detail = "")
    {
        Write(JsonConvert.SerializeObject(msg), detail, "Info");
    }

    /// <summary>
    /// Logs a text entry under the title "Error".
    /// </summary>
    public static void Error(string msg, string detail = "")
    {
        Write(msg, detail, "Error");
    }

    // Appends one entry to today's log file: a blank line, timestamp + title,
    // the optional detail line, then the message.
    private static void Write(string msg, string detail = "", string title = "Info")
    {
        DateTime now = DateTime.Now;
        string logDirectory = System.Configuration.ConfigurationManager.AppSettings["MQServerTLogPath"];
        if (!Directory.Exists(logDirectory))
        {
            Directory.CreateDirectory(logDirectory);
        }

        string logFile = BuildLogFilePath(logDirectory, now);
        lock (_lock)
        {
            // Append mode replaces the manual seek-to-end; using guarantees the file
            // handle is released even when a write throws.
            using (var writer = new StreamWriter(logFile, true))
            {
                writer.WriteLine();
                writer.WriteLine(now.ToString("yyyyMMddHHmmssfff") + title);
                if (!string.IsNullOrWhiteSpace(detail))
                {
                    writer.WriteLine(detail);
                }
                writer.WriteLine(msg);
            }
        }
    }

    /// <summary>
    /// Returns the full text of today's log file, or an empty string when nothing has been logged today.
    /// </summary>
    public static string Read()
    {
        string logDirectory = System.Configuration.ConfigurationManager.AppSettings["MQServerTLogPath"];
        string logFile = BuildLogFilePath(logDirectory, DateTime.Now);
        lock (_lock)
        {
            if (!File.Exists(logFile))
            {
                return "";
            }

            using (var reader = new StreamReader(logFile, Encoding.UTF8))
            {
                return reader.ReadToEnd();
            }
        }
    }

    // Builds the path of the daily log file. Path.Combine inserts the directory separator
    // when the configured path lacks a trailing one, so the file always lands inside the
    // directory that Write creates. A missing setting falls back to a relative file name.
    private static string BuildLogFilePath(string logDirectory, DateTime day)
    {
        return Path.Combine(logDirectory ?? "", day.ToString("yyyyMMdd") + ".txt");
    }
}
}
View Code
4、Form窗体测试
RabbitMQForm : Form
using BLL;
using Newtonsoft.Json;
using System;
using System.Windows.Forms;

本文发布于:2024-09-21 17:30:03,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/4/183100.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   接收   队列
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议