Files
shgxtzcjhoudaosaomadayinwpf/linesider_screen_tool/Hepler/MQTTHepler.cs
2025-04-22 15:33:38 +08:00

172 lines
5.4 KiB
C#

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace linesider_screen_tool
{
/// <summary>
/// Thin wrapper around an MQTTnet <see cref="IMqttClient"/> that connects on demand,
/// publishes/subscribes on string topics, and attempts one reconnect after an
/// unexpected disconnect.
/// </summary>
/// <remarks>
/// The class performs no locking; concurrent calls from multiple threads are not
/// coordinated here.
/// </remarks>
public class MQTTHepler
{
    /// <summary>
    /// The underlying MQTTnet client. Null until <see cref="ConnectAsync"/> has run and
    /// again after <see cref="DisconnectAsync"/> has disposed it.
    /// NOTE(review): presumably public so callers can attach their own handlers
    /// (e.g. ApplicationMessageReceivedAsync) - confirm against callers.
    /// </summary>
    public IMqttClient _mqttClient;
    private MqttClientOptions _options;
    private readonly string _clientId;
    private readonly string _server;
    private readonly int _port;
    private readonly string _username;
    private readonly string _password;
    private readonly bool _cleanSession;

    // True once the caller has asked to disconnect; stops the Disconnected handler
    // and ReconnectAsync from re-opening a connection the caller closed on purpose.
    private volatile bool _disconnectRequested;

    /// <summary>
    /// Stores the connection settings. No network activity happens here.
    /// </summary>
    /// <param name="server">Host name or address of the MQTT broker.</param>
    /// <param name="port">TCP port of the broker.</param>
    /// <param name="clientId">Client identifier sent to the broker.</param>
    /// <param name="username">User name; when null or empty no credentials are sent.</param>
    /// <param name="password">Password sent together with <paramref name="username"/>.</param>
    /// <param name="cleanSession">Value passed to the clean-session option.</param>
    /// <remarks>
    /// NOTE(review): the default credentials are hard-coded in source; prefer supplying
    /// them from configuration.
    /// </remarks>
    public MQTTHepler(string server, int port = 1883, string clientId = "demo_test001",
        string username = "admin", string password = "public123", bool cleanSession = true)
    {
        _server = server;
        _port = port;
        _clientId = clientId;
        _username = username;
        _password = password;
        _cleanSession = cleanSession;
    }

    /// <summary>
    /// Connects to the MQTT broker. Does nothing when the client is already connected.
    /// </summary>
    /// <remarks>
    /// The client instance is created once and reused for later connects, so handlers
    /// attached to <see cref="_mqttClient"/> survive a reconnect and the Disconnected
    /// handler is registered exactly once per client.
    /// Exceptions thrown by the underlying connect call propagate to the caller.
    /// </remarks>
    public async Task ConnectAsync()
    {
        if (_mqttClient != null && _mqttClient.IsConnected)
        {
            return;
        }

        // An explicit connect cancels any earlier "stay disconnected" request.
        _disconnectRequested = false;

        if (_mqttClient == null)
        {
            var factory = new MqttFactory();
            var client = factory.CreateMqttClient();

            // Register the handler before connecting so no disconnect is missed.
            client.DisconnectedAsync += async e =>
            {
                Console.WriteLine($"MQTT 断开: {e.Reason}");

                // Skip the retry when the caller asked for the disconnect, or when this
                // client has since been replaced/disposed.
                if (_disconnectRequested || !ReferenceEquals(_mqttClient, client))
                {
                    return;
                }

                await Task.Delay(5000);
                await ReconnectAsync();
            };

            _mqttClient = client;
        }

        var builder = new MqttClientOptionsBuilder()
            .WithTcpServer(_server, _port)
            .WithClientId(_clientId) // always send an explicit ClientId
            .WithCleanSession(_cleanSession);

        if (!string.IsNullOrEmpty(_username))
        {
            builder.WithCredentials(_username, _password);
        }

        _options = builder.Build();

        await _mqttClient.ConnectAsync(_options, CancellationToken.None);
        Console.WriteLine("MQTT 连接成功");
    }

    /// <summary>
    /// Subscribes to a topic, connecting first when necessary, and logs the broker's
    /// result code for each returned subscription item.
    /// </summary>
    /// <param name="topic">Topic filter to subscribe to.</param>
    /// <param name="qos">Requested quality-of-service level.</param>
    public async Task SubscribeAsync(string topic, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce)
    {
        if (_mqttClient == null || !_mqttClient.IsConnected)
        {
            await ConnectAsync();
        }

        var topicFilter = new MqttTopicFilterBuilder()
            .WithTopic(topic)
            .WithQualityOfServiceLevel(qos)
            .Build();

        var result = await _mqttClient.SubscribeAsync(topicFilter, CancellationToken.None);

        // Plain iteration avoids a dependency on System.Linq (not imported by this file)
        // and cannot throw on an empty result list.
        foreach (var item in result.Items)
        {
            Console.WriteLine($"订阅结果: {item.ResultCode}");
        }
    }

    /// <summary>
    /// Publishes a text payload to a topic, connecting first when necessary.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="payload">Message body.</param>
    /// <param name="qos">Quality-of-service level for the message.</param>
    /// <param name="retain">Value of the retain flag on the message.</param>
    /// <remarks>A publish failure is logged and then rethrown to the caller.</remarks>
    public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce,
        bool retain = false)
    {
        if (_mqttClient == null || !_mqttClient.IsConnected)
        {
            await ConnectAsync();
        }

        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(payload)
            .WithQualityOfServiceLevel(qos)
            .WithRetainFlag(retain)
            .Build();

        try
        {
            await _mqttClient.PublishAsync(message, CancellationToken.None);
            Console.WriteLine($"已发布消息到主题 {topic}: {payload}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"发布消息失败: {ex.Message}");
            throw;
        }
    }

    /// <summary>
    /// Unsubscribes from a topic. Does nothing when the client is not connected.
    /// </summary>
    /// <param name="topic">Topic filter to unsubscribe from.</param>
    /// <remarks>A failure is logged and then rethrown to the caller.</remarks>
    public async Task UnsubscribeAsync(string topic)
    {
        if (_mqttClient == null || !_mqttClient.IsConnected)
        {
            return;
        }

        try
        {
            await _mqttClient.UnsubscribeAsync(topic, CancellationToken.None);
            Console.WriteLine($"已取消订阅主题: {topic}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"取消订阅主题失败: {ex.Message}");
            throw;
        }
    }

    /// <summary>
    /// Makes one reconnect attempt with the options from the last
    /// <see cref="ConnectAsync"/> call. Failures are logged, not thrown.
    /// </summary>
    private async Task ReconnectAsync()
    {
        var client = _mqttClient;

        // Nothing to do if the caller closed the connection, the client is gone,
        // or something else already reconnected it during the delay.
        if (_disconnectRequested || client == null || client.IsConnected)
        {
            return;
        }

        try
        {
            await client.ConnectAsync(_options, CancellationToken.None);
            Console.WriteLine("MQTT 重新连接成功");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"MQTT 重新连接失败: {ex.Message}");
        }
    }

    /// <summary>
    /// Disconnects from the broker and disposes the client. Failures are logged,
    /// not thrown.
    /// </summary>
    /// <remarks>
    /// Calling this also cancels any pending automatic reconnect, even when the client
    /// is currently not connected. A later <see cref="ConnectAsync"/> creates a fresh
    /// client if this one was disposed.
    /// </remarks>
    public async Task DisconnectAsync()
    {
        var client = _mqttClient;
        if (client == null)
        {
            return;
        }

        // Set before disconnecting so the Disconnected handler sees it and does not
        // schedule a reconnect against a client that is about to be disposed.
        _disconnectRequested = true;

        if (!client.IsConnected)
        {
            return;
        }

        try
        {
            await client.DisconnectAsync();
            Console.WriteLine("MQTT 已断开连接");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"断开连接失败: {ex.Message}");
        }
        finally
        {
            // Release the client either way and clear the field so nothing reuses
            // a disposed instance.
            client.Dispose();
            if (ReferenceEquals(_mqttClient, client))
            {
                _mqttClient = null;
            }
        }
    }
}
}