使用新框架与技术代替旧框架与技术,实现涂装车间后道标签扫码程序
This commit is contained in:
@@ -0,0 +1,266 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MQTTnet;
|
||||
using MQTTnet.Client;
|
||||
using MQTTnet.Protocol;
|
||||
|
||||
namespace RIZO_Helper.Tools
|
||||
{
|
||||
public class MqttHelper : IDisposable
|
||||
{
|
||||
private readonly IMqttClient _mqttClient;
|
||||
private MqttClientOptions _options;
|
||||
private bool _isDisposed;
|
||||
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1);
|
||||
|
||||
// 定义消息接收事件
|
||||
public event EventHandler<MqttApplicationMessageReceivedEventArgs>? MessageReceived;
|
||||
public event EventHandler<Exception>? ConnectionFailed;
|
||||
|
||||
public MqttHelper(string server, int port = 1883, string clientId = "wpf-demo", bool cleanSession = true)
|
||||
{
|
||||
if (string.IsNullOrEmpty(server))
|
||||
throw new ArgumentNullException(nameof(server), "MQTT服务器地址不能为空");
|
||||
|
||||
if (string.IsNullOrEmpty(clientId))
|
||||
throw new ArgumentNullException(nameof(clientId), "MQTT客户端ID不能为空");
|
||||
|
||||
var factory = new MqttFactory();
|
||||
_mqttClient = factory.CreateMqttClient();
|
||||
_options = new MqttClientOptionsBuilder()
|
||||
.WithTcpServer(server, port)
|
||||
.WithClientId(clientId)
|
||||
.WithCleanSession(cleanSession)
|
||||
.Build();
|
||||
|
||||
_mqttClient.DisconnectedAsync += async e =>
|
||||
{
|
||||
Console.WriteLine($"MQTT连接断开,原因: {e.Reason}");
|
||||
await ReconnectWithBackoffAsync();
|
||||
};
|
||||
|
||||
_mqttClient.ApplicationMessageReceivedAsync += e =>
|
||||
{
|
||||
MessageReceived?.Invoke(this, e);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
}
|
||||
|
||||
public bool IsConnected => _mqttClient.IsConnected;
|
||||
|
||||
public async Task<bool> ConnectAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_isDisposed)
|
||||
throw new ObjectDisposedException(nameof(MqttHelper));
|
||||
|
||||
if (_mqttClient.IsConnected)
|
||||
return true;
|
||||
|
||||
await _connectionLock.WaitAsync(cancellationToken);
|
||||
try
|
||||
{
|
||||
if (_mqttClient.IsConnected)
|
||||
return true;
|
||||
|
||||
Console.WriteLine($"正在连接MQTT服务器: {_options.ChannelOptions}");
|
||||
|
||||
try
|
||||
{
|
||||
await _mqttClient.ConnectAsync(_options, cancellationToken);
|
||||
Console.WriteLine("MQTT连接成功");
|
||||
return true;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Console.WriteLine("MQTT连接操作被取消");
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"MQTT连接失败: {ex.Message}");
|
||||
ConnectionFailed?.Invoke(this, ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_connectionLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> SubscribeAsync(string topic, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce)
|
||||
{
|
||||
if (string.IsNullOrEmpty(topic))
|
||||
throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
|
||||
|
||||
if (!await EnsureConnectedAsync())
|
||||
return false;
|
||||
|
||||
try
|
||||
{
|
||||
var topicFilter = new MqttTopicFilterBuilder()
|
||||
.WithTopic(topic)
|
||||
.WithQualityOfServiceLevel(qos)
|
||||
.Build();
|
||||
|
||||
await _mqttClient.SubscribeAsync(topicFilter);
|
||||
Console.WriteLine($"成功订阅主题: {topic} (QoS: {qos})");
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"订阅主题失败: {topic}, 错误: {ex.Message}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce, bool retain = false)
|
||||
{
|
||||
if (string.IsNullOrEmpty(topic))
|
||||
throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
|
||||
|
||||
if (payload == null)
|
||||
throw new ArgumentNullException(nameof(payload), "MQTT消息内容不能为空");
|
||||
|
||||
if (!await EnsureConnectedAsync())
|
||||
return false;
|
||||
|
||||
try
|
||||
{
|
||||
var message = new MqttApplicationMessageBuilder()
|
||||
.WithTopic(topic)
|
||||
.WithPayload(payload)
|
||||
.WithQualityOfServiceLevel(qos)
|
||||
.WithRetainFlag(retain)
|
||||
.Build();
|
||||
|
||||
await _mqttClient.PublishAsync(message);
|
||||
Console.WriteLine($"成功发布消息到主题: {topic} (QoS: {qos}, 保留: {retain})");
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"发布消息失败: {topic}, 错误: {ex.Message}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> UnsubscribeAsync(string topic)
|
||||
{
|
||||
if (string.IsNullOrEmpty(topic))
|
||||
throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空");
|
||||
|
||||
if (!await EnsureConnectedAsync())
|
||||
return false;
|
||||
|
||||
try
|
||||
{
|
||||
await _mqttClient.UnsubscribeAsync(topic);
|
||||
Console.WriteLine($"成功取消订阅主题: {topic}");
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"取消订阅失败: {topic}, 错误: {ex.Message}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task DisconnectAsync()
|
||||
{
|
||||
if (_isDisposed)
|
||||
return;
|
||||
|
||||
if (!_mqttClient.IsConnected)
|
||||
return;
|
||||
|
||||
try
|
||||
{
|
||||
await _mqttClient.DisconnectAsync();
|
||||
Console.WriteLine("MQTT客户端已断开连接");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"断开MQTT连接时出错: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReconnectWithBackoffAsync()
|
||||
{
|
||||
if (_isDisposed)
|
||||
return;
|
||||
|
||||
int retries = 0;
|
||||
const int maxRetries = 5;
|
||||
const int baseDelayMs = 1000;
|
||||
|
||||
while (retries < maxRetries && !_isDisposed)
|
||||
{
|
||||
int delayMs = baseDelayMs * (int)Math.Pow(2, retries);
|
||||
Console.WriteLine($"将在 {delayMs}ms 后尝试重新连接MQTT服务器 (尝试 {retries + 1}/{maxRetries})");
|
||||
await Task.Delay(delayMs);
|
||||
|
||||
if (await ConnectAsync())
|
||||
return;
|
||||
|
||||
retries++;
|
||||
}
|
||||
|
||||
Console.WriteLine($"达到最大重试次数 ({maxRetries}),停止尝试重新连接MQTT服务器");
|
||||
ConnectionFailed?.Invoke(this, new Exception("MQTT重新连接失败,达到最大重试次数"));
|
||||
}
|
||||
|
||||
private async Task<bool> EnsureConnectedAsync()
|
||||
{
|
||||
if (_isDisposed)
|
||||
throw new ObjectDisposedException(nameof(MqttHelper));
|
||||
|
||||
if (_mqttClient.IsConnected)
|
||||
return true;
|
||||
|
||||
Console.WriteLine("检测到MQTT连接断开,尝试重新连接...");
|
||||
return await ConnectAsync();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
protected virtual void Dispose(bool disposing)
|
||||
{
|
||||
if (_isDisposed)
|
||||
return;
|
||||
|
||||
_isDisposed = true;
|
||||
|
||||
if (disposing)
|
||||
{
|
||||
try
|
||||
{
|
||||
// 先取消事件订阅,防止在释放过程中触发事件
|
||||
_mqttClient.DisconnectedAsync -= async e => await ReconnectWithBackoffAsync();
|
||||
_mqttClient.ApplicationMessageReceivedAsync -= e =>
|
||||
{
|
||||
MessageReceived?.Invoke(this, e);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
// 断开连接
|
||||
if (_mqttClient.IsConnected)
|
||||
_mqttClient.DisconnectAsync().GetAwaiter().GetResult();
|
||||
|
||||
// 释放资源
|
||||
_mqttClient.Dispose();
|
||||
_connectionLock.Dispose();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"释放MQTT资源时出错: {ex.Message}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user