Files
shgx_tz_mes_backend_sync/ZR.Common/MqttHelper/MqttService.cs

425 lines
15 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure.Attribute;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
namespace ZR.Common.MqttHelper
{
public class MqttService : IHostedService, IDisposable
{
private readonly ILogger<MqttService> _logger;
private readonly IConfiguration _configuration;
private readonly IMqttClient _mqttClient;
private readonly MyMqttConfig _mqttConfig; // 注入配置类
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1);
private readonly CancellationTokenSource _disposeCts = new CancellationTokenSource();
private Timer _reconnectTimer;
private bool _disposed = false;
private int _reconnectAttempts = 0;
// 配置常量
private const string MqttConfigSection = "MqttConfig";
private const string TopicsConfigKey = "Topics";
private const int DefaultReconnectDelaySeconds = 5;
private const int MaxReconnectAttempts = 10;
private const int MaxReconnectDelaySeconds = 60;
public MqttService(
ILogger<MqttService> logger,
IConfiguration configuration,
MyMqttConfig mqttConfig
)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_configuration =
configuration ?? throw new ArgumentNullException(nameof(configuration));
_mqttConfig = mqttConfig ?? throw new ArgumentNullException(nameof(mqttConfig)); // 注入配置类
// 创建MQTT客户端
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
// 注册事件处理程序
_mqttClient.ConnectedAsync += OnConnectedAsync;
_mqttClient.DisconnectedAsync += OnDisconnectedAsync;
_mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;
_logger.LogInformation($"MqttService 实例已创建,哈希值: {GetHashCode()}");
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("MQTT服务正在启动...");
try
{
await ConnectAsync(cancellationToken);
_logger.LogInformation("MQTT服务已成功启动");
}
catch (Exception ex)
{
_logger.LogCritical(ex, "MQTT服务启动失败将在后台尝试重连");
ScheduleReconnect(); // 即使启动失败,也应该尝试重连
}
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("MQTT服务正在停止...");
// 取消所有计划的重连
_disposeCts.Cancel();
_reconnectTimer?.Dispose();
await DisconnectAsync(cancellationToken);
_logger.LogInformation("MQTT服务已停止");
}
private async Task ConnectAsync(CancellationToken cancellationToken)
{
await _connectionLock.WaitAsync(cancellationToken);
try
{
if (_mqttClient.IsConnected)
{
_logger.LogDebug("MQTT客户端已连接跳过连接操作");
return;
}
// 直接使用MyMqttConfig获取客户端选项
var options = _mqttConfig.BuildMqttClientOptions();
_logger.LogInformation($"正在连接到MQTT代理服务器 {options.ChannelOptions}...");
try
{
await _mqttClient.ConnectAsync(options, cancellationToken);
_reconnectAttempts = 0; // 重置重连计数
_logger.LogInformation("已成功连接到MQTT代理服务器");
}
catch (OperationCanceledException)
{
_logger.LogWarning("MQTT连接操作被取消");
throw;
}
catch (Exception ex)
{
_reconnectAttempts++;
_logger.LogError(
ex,
$"连接MQTT代理服务器失败 (尝试次数: {_reconnectAttempts}/{MaxReconnectAttempts})"
);
if (_reconnectAttempts >= MaxReconnectAttempts)
{
_logger.LogCritical("达到最大重连次数,停止尝试");
return;
}
ScheduleReconnect();
throw;
}
}
finally
{
_connectionLock.Release();
}
}
private MqttClientOptions BuildMqttClientOptions(IConfigurationSection config)
{
var builder = new MqttClientOptionsBuilder()
.WithClientId(config["ClientId"] ?? Guid.NewGuid().ToString())
.WithTcpServer(config["Server"], config.GetValue<int>("Port", 1883))
.WithCleanSession();
return builder.Build();
}
private async Task DisconnectAsync(CancellationToken cancellationToken)
{
if (!_mqttClient.IsConnected)
{
_logger.LogDebug("MQTT客户端未连接跳过断开操作");
return;
}
try
{
await _mqttClient.DisconnectAsync(cancellationToken: cancellationToken);
_logger.LogInformation("已成功断开与MQTT代理服务器的连接");
}
catch (OperationCanceledException)
{
_logger.LogWarning("MQTT断开操作被取消");
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, "断开MQTT连接时出错");
// 即使断开连接失败,也应该释放资源
CleanupClientResources();
}
}
private Task OnConnectedAsync(MqttClientConnectedEventArgs e)
{
_logger.LogInformation("MQTT连接已建立");
return SubscribeToTopicsAsync();
}
private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e)
{
_logger.LogWarning($"MQTT连接已断开 - 客户端是否之前已连接: {e.ClientWasConnected}");
if (_disposed || _disposeCts.IsCancellationRequested)
{
_logger.LogDebug("MQTT断开连接是预期行为正在关闭服务");
return;
}
if (e.ClientWasConnected)
{
ScheduleReconnect();
}
}
private async Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
try
{
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
_logger.LogInformation(
$"收到MQTT消息 - 主题: {e.ApplicationMessage.Topic}, QoS: {e.ApplicationMessage.QualityOfServiceLevel}"
);
// 消息处理委托给专用的处理器
await ProcessMessageAsync(e.ApplicationMessage.Topic, payload);
}
catch (Exception ex)
{
_logger.LogError(ex, "处理MQTT消息时出错");
}
}
private async Task ProcessMessageAsync(string topic, string payload)
{
try
{
// 这里可以根据主题路由消息到不同的处理程序
switch (topic)
{
case string t when t.StartsWith("devices/"):
await HandleDeviceMessage(topic, payload);
break;
case "system/alert":
await HandleSystemAlert(payload);
break;
default:
_logger.LogDebug($"未处理的MQTT主题: {topic}");
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"处理主题 '{topic}' 的消息时出错");
}
}
private async Task SubscribeToTopicsAsync()
{
if (!_mqttClient.IsConnected)
{
_logger.LogWarning("无法订阅主题MQTT客户端未连接");
return;
}
try
{
// 获取配置类生成的订阅选项
var subscribeOptions = _mqttConfig.GetSubscribeToTopicsAsync();
if (subscribeOptions == null || !subscribeOptions.TopicFilters.Any())
{
_logger.LogInformation("没有配置要订阅的MQTT主题");
return;
}
_logger.LogInformation(
$"正在订阅MQTT主题: {string.Join(", ", subscribeOptions.TopicFilters.Select(f => f.Topic))}"
);
// 使用强类型的订阅选项进行订阅
var result = await _mqttClient.SubscribeAsync(subscribeOptions);
foreach (var item in result.Items)
{
_logger.LogInformation(
$"订阅主题 '{item.TopicFilter.Topic}' 结果: {item.ResultCode}"
);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "订阅MQTT主题时出错");
ScheduleReconnect();
}
}
private void ScheduleReconnect()
{
// 实现指数退避算法
var delaySeconds = Math.Min(
DefaultReconnectDelaySeconds * (int)Math.Pow(2, Math.Min(_reconnectAttempts, 5)),
MaxReconnectDelaySeconds
);
_logger.LogInformation($"计划在 {delaySeconds} 秒后尝试重新连接MQTT代理服务器");
// 使用Timer替代Task.Run更好地控制资源
_reconnectTimer?.Dispose();
_reconnectTimer = new Timer(
async _ =>
{
if (_disposed || _disposeCts.IsCancellationRequested)
{
_logger.LogDebug("跳过重连:服务正在关闭");
return;
}
try
{
await ConnectAsync(_disposeCts.Token);
}
catch (Exception ex)
{
_logger.LogError(ex, "计划的MQTT重连失败");
}
},
null,
TimeSpan.FromSeconds(delaySeconds),
Timeout.InfiniteTimeSpan
);
}
private Task HandleDeviceMessage(string topic, string payload)
{
_logger.LogInformation($"处理设备消息: {topic} - {payload}");
// 这里添加设备消息处理逻辑
return Task.CompletedTask;
}
private Task HandleSystemAlert(string payload)
{
_logger.LogWarning($"系统警报: {payload}");
// 这里添加系统警报处理逻辑
return Task.CompletedTask;
}
public async Task PublishAsync(
string topic,
string payload,
MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce,
bool retain = false
)
{
if (string.IsNullOrEmpty(topic))
{
throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
}
if (payload == null)
{
throw new ArgumentNullException(nameof(payload), "MQTT消息内容不能为空");
}
await _connectionLock.WaitAsync(_disposeCts.Token);
try
{
if (!_mqttClient.IsConnected)
{
_logger.LogWarning("发布消息前需要连接MQTT代理服务器");
//await ConnectAsync(_disposeCts.Token);
throw new Exception("发布消息前需要连接MQTT代理服务器");
}
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qos)
.WithRetainFlag(retain)
.Build();
_logger.LogDebug($"准备发布MQTT消息 - 主题: {topic}, QoS: {qos}, 保留: {retain}");
try
{
var result = await _mqttClient.PublishAsync(message, _disposeCts.Token);
_logger.LogInformation($"已发布MQTT消息 - 主题: {topic}, 结果: {result.ReasonCode}");
}
catch (OperationCanceledException)
{
_logger.LogWarning("MQTT发布操作被取消");
throw;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"发布MQTT消息失败 - 主题: {topic}");
throw;
}
finally
{
_connectionLock.Release();
}
}
private void CleanupClientResources()
{
try
{
_mqttClient.ApplicationMessageReceivedAsync -= OnMessageReceivedAsync;
_mqttClient.ConnectedAsync -= OnConnectedAsync;
_mqttClient.DisconnectedAsync -= OnDisconnectedAsync;
if (_mqttClient.IsConnected)
{
_mqttClient.DisconnectAsync().GetAwaiter().GetResult();
}
_mqttClient.Dispose();
}
catch (Exception ex)
{
_logger.LogError(ex, "清理MQTT客户端资源时出错");
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_disposeCts.Cancel();
_reconnectTimer?.Dispose();
_connectionLock.Dispose();
CleanupClientResources();
_disposeCts.Dispose();
}
_disposed = true;
_logger.LogDebug("MQTT服务已释放所有资源");
}
}
}
}