// File: shgxtzcjhoudaosaomadayinwpf/RIZO_Application/RIZO_Application.Infrastructure/Util/MqttHelper/MqttHelper.cs
// Language: C# (266 lines, 8.8 KiB)

using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
namespace RIZO_Helper.Tools
{
/// <summary>
/// Wrapper around an MQTTnet <see cref="IMqttClient"/> that offers connect, subscribe,
/// publish, unsubscribe and disconnect operations, forwards received messages through
/// <see cref="MessageReceived"/>, and retries the connection with exponential backoff
/// after the client reports a disconnect.
/// </summary>
public class MqttHelper : IDisposable
{
    private readonly IMqttClient _mqttClient;
    private readonly MqttClientOptions _options;
    private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1);

    // Read from background event handlers as well as from callers, hence volatile.
    private volatile bool _isDisposed;

    // Set while a caller-requested disconnect is in effect so that the disconnect
    // notification does not immediately re-establish the connection.
    private volatile bool _manualDisconnect;

    // 0 = idle, 1 = a reconnect loop is running. Ensures at most one backoff loop at a time.
    private int _reconnecting;

    /// <summary>Raised for every application message delivered by the underlying client.</summary>
    public event EventHandler<MqttApplicationMessageReceivedEventArgs>? MessageReceived;

    /// <summary>
    /// Raised when a connection attempt fails, and once more when the automatic
    /// reconnect gives up after the maximum number of retries.
    /// </summary>
    public event EventHandler<Exception>? ConnectionFailed;

    /// <summary>
    /// Creates the client and its options. No connection is opened here; call
    /// <see cref="ConnectAsync"/> (or any operation that connects on demand).
    /// </summary>
    /// <param name="server">Broker host name or address; must not be null or empty.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="clientId">MQTT client identifier; must not be null or empty.</param>
    /// <param name="cleanSession">Value passed to the options builder's clean-session setting.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="server"/> or <paramref name="clientId"/> is null or empty.
    /// </exception>
    public MqttHelper(string server, int port = 1883, string clientId = "wpf-demo", bool cleanSession = true)
    {
        if (string.IsNullOrEmpty(server))
        {
            throw new ArgumentNullException(nameof(server), "MQTT服务器地址不能为空");
        }

        if (string.IsNullOrEmpty(clientId))
        {
            throw new ArgumentNullException(nameof(clientId), "MQTT客户端ID不能为空");
        }

        var factory = new MqttFactory();
        _mqttClient = factory.CreateMqttClient();
        _options = new MqttClientOptionsBuilder()
            .WithTcpServer(server, port)
            .WithClientId(clientId)
            .WithCleanSession(cleanSession)
            .Build();

        // Named methods (not lambdas) so that Dispose can remove exactly these delegates.
        _mqttClient.DisconnectedAsync += OnDisconnectedAsync;
        _mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;
    }

    /// <summary>Whether the underlying client currently reports a live connection.</summary>
    public bool IsConnected => _mqttClient.IsConnected;

    /// <summary>
    /// Connects to the broker if not already connected. Concurrent callers are
    /// serialized so only one connection attempt runs at a time.
    /// </summary>
    /// <param name="cancellationToken">Cancels waiting for the lock and the connect call.</param>
    /// <returns>
    /// <c>true</c> when connected (or already connected); <c>false</c> when the attempt
    /// failed, in which case <see cref="ConnectionFailed"/> has been raised.
    /// </returns>
    /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
    /// <exception cref="OperationCanceledException">The operation was cancelled.</exception>
    public async Task<bool> ConnectAsync(CancellationToken cancellationToken = default)
    {
        if (_isDisposed)
        {
            throw new ObjectDisposedException(nameof(MqttHelper));
        }

        if (_mqttClient.IsConnected)
        {
            return true;
        }

        await _connectionLock.WaitAsync(cancellationToken);
        try
        {
            // Another caller may have connected while this one waited for the lock.
            if (_mqttClient.IsConnected)
            {
                return true;
            }

            if (_isDisposed)
            {
                throw new ObjectDisposedException(nameof(MqttHelper));
            }

            // An explicit connect attempt ends any previous caller-requested disconnect.
            _manualDisconnect = false;

            Console.WriteLine($"正在连接MQTT服务器: {_options.ChannelOptions}");
            try
            {
                await _mqttClient.ConnectAsync(_options, cancellationToken);
                Console.WriteLine("MQTT连接成功");
                return true;
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("MQTT连接操作被取消");
                throw;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"MQTT连接失败: {ex.Message}");
                ConnectionFailed?.Invoke(this, ex);
                return false;
            }
        }
        finally
        {
            _connectionLock.Release();
        }
    }

    /// <summary>Subscribes to a topic, connecting first if necessary.</summary>
    /// <param name="topic">Topic filter; must not be null or empty.</param>
    /// <param name="qos">Requested quality-of-service level.</param>
    /// <returns><c>true</c> on success; <c>false</c> if connecting or subscribing failed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null or empty.</exception>
    /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
    public async Task<bool> SubscribeAsync(string topic, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce)
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
        }

        if (!await EnsureConnectedAsync())
        {
            return false;
        }

        try
        {
            var topicFilter = new MqttTopicFilterBuilder()
                .WithTopic(topic)
                .WithQualityOfServiceLevel(qos)
                .Build();
            await _mqttClient.SubscribeAsync(topicFilter);
            Console.WriteLine($"成功订阅主题: {topic} (QoS: {qos})");
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"订阅主题失败: {topic}, 错误: {ex.Message}");
            return false;
        }
    }

    /// <summary>Publishes a text payload to a topic, connecting first if necessary.</summary>
    /// <param name="topic">Target topic; must not be null or empty.</param>
    /// <param name="payload">Message body; must not be null (empty is allowed).</param>
    /// <param name="qos">Quality-of-service level for the message.</param>
    /// <param name="retain">Value of the message's retain flag.</param>
    /// <returns><c>true</c> on success; <c>false</c> if connecting or publishing failed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="topic"/> is null or empty, or <paramref name="payload"/> is null.
    /// </exception>
    /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
    public async Task<bool> PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce, bool retain = false)
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
        }

        if (payload == null)
        {
            throw new ArgumentNullException(nameof(payload), "MQTT消息内容不能为空");
        }

        if (!await EnsureConnectedAsync())
        {
            return false;
        }

        try
        {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithQualityOfServiceLevel(qos)
                .WithRetainFlag(retain)
                .Build();
            await _mqttClient.PublishAsync(message);
            Console.WriteLine($"成功发布消息到主题: {topic} (QoS: {qos}, 保留: {retain})");
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"发布消息失败: {topic}, 错误: {ex.Message}");
            return false;
        }
    }

    /// <summary>Removes the subscription for a topic, connecting first if necessary.</summary>
    /// <param name="topic">Topic filter; must not be null or empty.</param>
    /// <returns><c>true</c> on success; <c>false</c> if connecting or unsubscribing failed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null or empty.</exception>
    /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
    public async Task<bool> UnsubscribeAsync(string topic)
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
        }

        if (!await EnsureConnectedAsync())
        {
            return false;
        }

        try
        {
            await _mqttClient.UnsubscribeAsync(topic);
            Console.WriteLine($"成功取消订阅主题: {topic}");
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"取消订阅失败: {topic}, 错误: {ex.Message}");
            return false;
        }
    }

    /// <summary>
    /// Disconnects from the broker at the caller's request. Does nothing when the helper
    /// is disposed or not connected. Errors are logged, not thrown. The automatic
    /// reconnect is suppressed until the next connect attempt.
    /// </summary>
    public async Task DisconnectAsync()
    {
        if (_isDisposed || !_mqttClient.IsConnected)
        {
            return;
        }

        // Must be set before disconnecting: the disconnect notification checks it
        // to tell a requested disconnect from a lost connection.
        _manualDisconnect = true;
        try
        {
            await _mqttClient.DisconnectAsync();
            Console.WriteLine("MQTT客户端已断开连接");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"断开MQTT连接时出错: {ex.Message}");
        }
    }

    /// <summary>
    /// Handles the client's disconnect notification: logs the reason and, unless the
    /// helper is disposed or the caller asked for the disconnect, starts the reconnect loop.
    /// </summary>
    private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e)
    {
        Console.WriteLine($"MQTT连接断开原因: {e.Reason}");
        if (_isDisposed || _manualDisconnect)
        {
            return;
        }

        await ReconnectWithBackoffAsync();
    }

    /// <summary>Forwards a received application message to <see cref="MessageReceived"/>.</summary>
    private Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        MessageReceived?.Invoke(this, e);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Tries to reconnect up to five times, waiting 1s, 2s, 4s, 8s and 16s before the
    /// respective attempts. Raises <see cref="ConnectionFailed"/> when all attempts fail.
    /// Stops silently when the helper is disposed or a caller-requested disconnect
    /// takes effect in the meantime.
    /// </summary>
    private async Task ReconnectWithBackoffAsync()
    {
        if (_isDisposed)
        {
            return;
        }

        // Single-flight guard: the disconnect notification can arrive again while a loop
        // is already running (NOTE(review): presumably also after each failed connect
        // attempt - confirm against the MQTTnet version in use); without the guard
        // every notification would start another concurrent loop.
        if (Interlocked.CompareExchange(ref _reconnecting, 1, 0) != 0)
        {
            return;
        }

        try
        {
            const int maxRetries = 5;
            const int baseDelayMs = 1000;

            for (int retries = 0; retries < maxRetries; retries++)
            {
                if (_isDisposed || _manualDisconnect)
                {
                    return;
                }

                int delayMs = baseDelayMs << retries; // 1000 * 2^retries
                Console.WriteLine($"将在 {delayMs}ms 后尝试重新连接MQTT服务器 (尝试 {retries + 1}/{maxRetries})");
                await Task.Delay(delayMs);

                // State may have changed during the delay; ConnectAsync would throw
                // on a disposed helper, so check again before calling it.
                if (_isDisposed || _manualDisconnect)
                {
                    return;
                }

                try
                {
                    if (await ConnectAsync())
                    {
                        return;
                    }
                }
                catch (ObjectDisposedException)
                {
                    // Disposed between the check above and the connect call.
                    return;
                }
            }

            Console.WriteLine($"达到最大重试次数 ({maxRetries})停止尝试重新连接MQTT服务器");
            ConnectionFailed?.Invoke(this, new Exception("MQTT重新连接失败达到最大重试次数"));
        }
        finally
        {
            Interlocked.Exchange(ref _reconnecting, 0);
        }
    }

    /// <summary>
    /// Returns <c>true</c> if connected, otherwise performs one connection attempt
    /// and returns its result.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
    private async Task<bool> EnsureConnectedAsync()
    {
        if (_isDisposed)
        {
            throw new ObjectDisposedException(nameof(MqttHelper));
        }

        if (_mqttClient.IsConnected)
        {
            return true;
        }

        Console.WriteLine("检测到MQTT连接断开尝试重新连接...");
        return await ConnectAsync();
    }

    /// <summary>Disconnects if needed and releases the client and the connection lock.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases resources once; later calls do nothing. Errors are logged, never thrown.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; managed resources are only
    /// released in that case.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;
        if (!disposing)
        {
            return;
        }

        try
        {
            // Detach the handlers first so that nothing fires while shutting down.
            _mqttClient.DisconnectedAsync -= OnDisconnectedAsync;
            _mqttClient.ApplicationMessageReceivedAsync -= OnApplicationMessageReceivedAsync;

            if (_mqttClient.IsConnected)
            {
                // Run on the thread pool so that blocking here cannot deadlock on a
                // captured synchronization context (e.g. a UI thread).
                Task.Run(() => _mqttClient.DisconnectAsync()).GetAwaiter().GetResult();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"释放MQTT资源时出错: {ex.Message}");
        }

        // Separate step: a failed disconnect must not prevent releasing the resources.
        try
        {
            _mqttClient.Dispose();
            _connectionLock.Dispose();
        }
        catch (Exception ex)
        {
            Console.WriteLine($"释放MQTT资源时出错: {ex.Message}");
        }
    }
}
}