Files
shgxtzcjhoudaosaomadayinwpf/RIZO_Application/RIZO_Application.Infrastructure/Util/MqttHelper/MqttHelper.cs
2025-05-23 08:45:58 +08:00

297 lines
9.7 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
namespace RIZO_Helper.Tools
{
/// <summary>
/// Wrapper around an MQTTnet <see cref="IMqttClient"/> that adds argument validation,
/// boolean success results for the common operations, and automatic reconnection with
/// exponential back-off.
/// </summary>
public class MqttHelper : IDisposable
{
    // Maximum number of reconnect attempts made by a single back-off run.
    private const int MaxReconnectAttempts = 5;

    // Delay before the first reconnect attempt; doubled for each further attempt.
    private const int BaseReconnectDelayMs = 1000;

    private readonly IMqttClient _mqttClient;
    private readonly MqttClientOptions _options;
    private readonly object _reconnectLock = new object();

    // Read from library callback threads as well as caller threads, hence volatile.
    private volatile bool _isDisposed;

    // Set while the caller has explicitly asked to disconnect, so that the
    // disconnect handler does not immediately reconnect against the caller's wishes.
    private volatile bool _disconnectRequested;

    // Guards against more than one back-off loop running at a time (see _reconnectLock).
    private bool _isReconnecting;

    /// <summary>Raised for every application message received from the broker.</summary>
    public event EventHandler<MqttApplicationMessageReceivedEventArgs>? MessageReceived;

    /// <summary>
    /// Raised when a connect attempt throws, and once more when a back-off run has
    /// used up all of its attempts.
    /// </summary>
    public event EventHandler<Exception>? ConnectionFailed;

    /// <summary>
    /// Optional status callback: (message, current attempt, maximum attempts).
    /// Invoked with (reason, 0, 1) when the connection drops and with
    /// (text, attempt, max) before each reconnect attempt. May be left unassigned.
    /// </summary>
    public Action<string, int, int>? Disconnected;

    /// <summary>
    /// Creates the underlying client and its connection options. Does not connect.
    /// </summary>
    /// <param name="server">Broker host name or address; must not be null or empty.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="clientId">MQTT client identifier; must not be null or empty.</param>
    /// <param name="cleanSession">Value passed to the options builder's clean-session setting.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="server"/> or <paramref name="clientId"/> is null or empty.
    /// </exception>
    public MqttHelper(string server, int port = 1883, string clientId = "wpf-demo", bool cleanSession = true)
    {
        if (string.IsNullOrEmpty(server))
        {
            throw new ArgumentNullException(nameof(server), "MQTT服务器地址不能为空");
        }

        if (string.IsNullOrEmpty(clientId))
        {
            throw new ArgumentNullException(nameof(clientId), "MQTT客户端ID不能为空");
        }

        var factory = new MqttFactory();
        _mqttClient = factory.CreateMqttClient();
        _options = new MqttClientOptionsBuilder()
            .WithTcpServer(server, port)
            .WithClientId(clientId)
            .WithCleanSession(cleanSession)
            .Build();

        // Named methods (not lambdas) so that Dispose can really remove them:
        // "-=" with a newly created lambda never matches the delegate added here.
        _mqttClient.DisconnectedAsync += OnDisconnectedAsync;
        _mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;
    }

    /// <summary>Whether the underlying client currently reports a live connection.</summary>
    public bool IsConnected => _mqttClient.IsConnected;

    /// <summary>
    /// Connects to the broker if not already connected. When the attempt fails,
    /// <see cref="ConnectionFailed"/> is raised and a back-off reconnect run is started
    /// (unless one is already running) and awaited before this method returns.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the initial connect attempt.</param>
    /// <returns>
    /// <c>true</c> when the client is connected on return (including the case where the
    /// back-off run succeeded after the initial failure); otherwise <c>false</c>.
    /// </returns>
    /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
    /// <exception cref="OperationCanceledException">The connect attempt was cancelled.</exception>
    public async Task<bool> ConnectAsync(CancellationToken cancellationToken = default)
    {
        if (_isDisposed)
        {
            throw new ObjectDisposedException(nameof(MqttHelper));
        }

        if (_mqttClient.IsConnected)
        {
            return true;
        }

        // Any connect request re-enables automatic reconnection.
        _disconnectRequested = false;

        Debug.WriteLine($"正在连接MQTT服务器: {_options.ChannelOptions}");
        try
        {
            await _mqttClient.ConnectAsync(_options, cancellationToken);
            Debug.WriteLine("MQTT连接成功");
            return true;
        }
        catch (OperationCanceledException)
        {
            Debug.WriteLine("MQTT连接操作被取消");
            throw;
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"MQTT连接失败: {ex.Message}");
            ConnectionFailed?.Invoke(this, ex);

            // Start the back-off run. When this call itself comes from inside that
            // run, the guard in StartReconnectIfNeededAsync makes this a no-op.
            await StartReconnectIfNeededAsync();

            // Report the real state: the back-off run may have connected successfully.
            return !_isDisposed && _mqttClient.IsConnected;
        }
    }

    /// <summary>Subscribes to <paramref name="topic"/>, connecting first if necessary.</summary>
    /// <param name="topic">Topic filter; must not be null or empty.</param>
    /// <param name="qos">Requested quality-of-service level.</param>
    /// <returns><c>true</c> on success; <c>false</c> when connecting or subscribing failed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null or empty.</exception>
    /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
    public async Task<bool> SubscribeAsync(string topic, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce)
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
        }

        if (!await EnsureConnectedAsync())
        {
            return false;
        }

        try
        {
            var topicFilter = new MqttTopicFilterBuilder()
                .WithTopic(topic)
                .WithQualityOfServiceLevel(qos)
                .Build();
            await _mqttClient.SubscribeAsync(topicFilter);
            Debug.WriteLine($"成功订阅主题: {topic} (QoS: {qos})");
            return true;
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"订阅主题失败: {topic}, 错误: {ex.Message}");
            return false;
        }
    }

    /// <summary>Publishes a string payload to <paramref name="topic"/>, connecting first if necessary.</summary>
    /// <param name="topic">Target topic; must not be null or empty.</param>
    /// <param name="payload">Message body; must not be null (empty is allowed).</param>
    /// <param name="qos">Quality-of-service level for the message.</param>
    /// <param name="retain">Value of the message's retain flag.</param>
    /// <returns><c>true</c> on success; <c>false</c> when connecting or publishing failed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="topic"/> is null or empty, or <paramref name="payload"/> is null.
    /// </exception>
    /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
    public async Task<bool> PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce, bool retain = false)
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
        }

        if (payload == null)
        {
            throw new ArgumentNullException(nameof(payload), "MQTT消息内容不能为空");
        }

        if (!await EnsureConnectedAsync())
        {
            return false;
        }

        try
        {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithQualityOfServiceLevel(qos)
                .WithRetainFlag(retain)
                .Build();
            await _mqttClient.PublishAsync(message);
            Debug.WriteLine($"成功发布消息到主题: {topic} (QoS: {qos}, 保留: {retain})");
            return true;
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"发布消息失败: {topic}, 错误: {ex.Message}");
            return false;
        }
    }

    /// <summary>Removes the subscription for <paramref name="topic"/>, connecting first if necessary.</summary>
    /// <param name="topic">Topic filter to unsubscribe; must not be null or empty.</param>
    /// <returns><c>true</c> on success; <c>false</c> when connecting or unsubscribing failed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null or empty.</exception>
    /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
    public async Task<bool> UnsubscribeAsync(string topic)
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
        }

        if (!await EnsureConnectedAsync())
        {
            return false;
        }

        try
        {
            await _mqttClient.UnsubscribeAsync(topic);
            Debug.WriteLine($"成功取消订阅主题: {topic}");
            return true;
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"取消订阅失败: {topic}, 错误: {ex.Message}");
            return false;
        }
    }

    /// <summary>
    /// Disconnects from the broker if connected. Errors are logged, not thrown.
    /// Automatic reconnection stays suppressed until <see cref="ConnectAsync"/> is
    /// called again (directly, or indirectly through publish/subscribe/unsubscribe).
    /// </summary>
    public async Task DisconnectAsync()
    {
        if (_isDisposed)
        {
            return;
        }

        if (!_mqttClient.IsConnected)
        {
            return;
        }

        // Must be set before disconnecting so the disconnect handler sees it.
        _disconnectRequested = true;
        try
        {
            await _mqttClient.DisconnectAsync();
            Debug.WriteLine("MQTT客户端已断开连接");
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"断开MQTT连接时出错: {ex.Message}");
        }
    }

    /// <summary>
    /// Handler for the client's DisconnectedAsync event: reports the reason through
    /// <see cref="Disconnected"/> and, unless the disconnect was requested via
    /// <see cref="DisconnectAsync"/>, starts a reconnect run.
    /// </summary>
    private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e)
    {
        string reasonText = $"MQTT连接断开原因: {e.Reason}";

        // The callback is optional; invoking it unguarded would throw when unset.
        Disconnected?.Invoke(reasonText, 0, 1);
        Debug.WriteLine(reasonText);

        // NOTE(review): assumes the library raises this event for client-initiated
        // disconnects as well — confirm against the MQTTnet version in use.
        if (_disconnectRequested)
        {
            return;
        }

        await StartReconnectIfNeededAsync();
    }

    /// <summary>
    /// Handler for the client's ApplicationMessageReceivedAsync event: forwards the
    /// message to <see cref="MessageReceived"/> subscribers.
    /// </summary>
    private Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        MessageReceived?.Invoke(this, e);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Runs a back-off reconnect loop unless one is already running or the helper is
    /// disposed. Completes when the loop it started has finished.
    /// </summary>
    private async Task StartReconnectIfNeededAsync()
    {
        if (_isDisposed)
        {
            return;
        }

        // The lock makes check-and-set atomic so only one loop can start.
        lock (_reconnectLock)
        {
            if (_isReconnecting)
            {
                return;
            }

            _isReconnecting = true;
        }

        try
        {
            await ReconnectWithBackoffAsync();
        }
        finally
        {
            // Clear the marker whether the run succeeded, failed or threw.
            lock (_reconnectLock)
            {
                _isReconnecting = false;
            }
        }
    }

    /// <summary>
    /// Tries to reconnect up to <see cref="MaxReconnectAttempts"/> times, waiting
    /// 1s, 2s, 4s, ... before successive attempts. Raises <see cref="ConnectionFailed"/>
    /// when every attempt failed; returns silently when the helper is disposed meanwhile.
    /// </summary>
    private async Task ReconnectWithBackoffAsync()
    {
        for (int attempt = 1; attempt <= MaxReconnectAttempts && !_isDisposed; attempt++)
        {
            // Exponential back-off: base delay doubled for every previous attempt.
            int delayMs = BaseReconnectDelayMs << (attempt - 1);
            string notice = $"将在 {delayMs}ms 后尝试重新连接MQTT服务器 (尝试 {attempt}/{MaxReconnectAttempts})";
            Disconnected?.Invoke(notice, attempt, MaxReconnectAttempts);
            Debug.WriteLine(notice);

            await Task.Delay(delayMs);

            // Dispose may have happened during the delay; ConnectAsync would then
            // throw ObjectDisposedException into the library's event dispatcher.
            if (_isDisposed)
            {
                return;
            }

            if (await ConnectAsync())
            {
                return;
            }
        }

        // Leaving the loop because of disposal is not a "retries exhausted" failure.
        if (_isDisposed)
        {
            return;
        }

        Debug.WriteLine($"达到最大重试次数 ({MaxReconnectAttempts})停止尝试重新连接MQTT服务器");
        ConnectionFailed?.Invoke(this, new Exception("MQTT重新连接失败达到最大重试次数"));
    }

    /// <summary>
    /// Returns <c>true</c> when connected, otherwise attempts to connect and returns
    /// the outcome.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
    private async Task<bool> EnsureConnectedAsync()
    {
        if (_isDisposed)
        {
            throw new ObjectDisposedException(nameof(MqttHelper));
        }

        if (_mqttClient.IsConnected)
        {
            return true;
        }

        Debug.WriteLine("检测到MQTT连接断开尝试重新连接...");
        return await ConnectAsync();
    }

    /// <summary>Disconnects if necessary and releases the underlying client.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Core of the dispose pattern. Marks the helper disposed and, when
    /// <paramref name="disposing"/> is <c>true</c>, detaches the event handlers,
    /// disconnects and disposes the client. Errors are logged, never thrown.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;
        if (!disposing)
        {
            return;
        }

        try
        {
            // Detach first so the disconnect below cannot raise callbacks or
            // trigger a reconnect while the client is being torn down.
            _mqttClient.DisconnectedAsync -= OnDisconnectedAsync;
            _mqttClient.ApplicationMessageReceivedAsync -= OnApplicationMessageReceivedAsync;

            // Dispose is synchronous, so the disconnect has to be waited for here.
            if (_mqttClient.IsConnected)
            {
                _mqttClient.DisconnectAsync().GetAwaiter().GetResult();
            }

            _mqttClient.Dispose();
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"释放MQTT资源时出错: {ex.Message}");
        }
    }
}
}