using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace linesider_screen_tool
{
    /// <summary>
    /// Thin convenience wrapper around an MQTTnet <see cref="IMqttClient"/> that
    /// connects on demand, publishes/subscribes, and tries to reconnect after an
    /// unexpected disconnect.
    /// </summary>
    public class MQTTHepler
    {
        /// <summary>
        /// The underlying MQTT client. It is created on the first call to
        /// <see cref="ConnectAsync"/> and reused afterwards, so handlers attached to it
        /// stay registered until <see cref="DisconnectAsync"/> disposes it.
        /// </summary>
        public IMqttClient _mqttClient;

        /// <summary>Delay between a disconnect notification and the reconnect attempt.</summary>
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromMilliseconds(5000);

        /// <summary>
        /// Serializes connect, reconnect and disconnect so that the background
        /// reconnect handler and an on-demand connect never run at the same time.
        /// </summary>
        private readonly SemaphoreSlim _connectGate = new SemaphoreSlim(1, 1);

        private MqttClientOptions _options;
        private readonly string _clientId;
        private readonly string _server;
        private readonly int _port;
        private readonly string _username;
        private readonly string _password;
        private readonly bool _cleanSession;

        /// <summary>True once the caller asked to disconnect; suppresses automatic reconnects.</summary>
        private volatile bool _disconnectRequested;

        /// <summary>True once the current <see cref="_mqttClient"/> instance has been disposed.</summary>
        private volatile bool _clientDisposed;

        /// <summary>
        /// Initializes the MQTT helper. No network activity happens here.
        /// </summary>
        /// <param name="server">Host name or IP address of the MQTT broker.</param>
        /// <param name="port">TCP port of the broker.</param>
        /// <param name="clientId">
        /// Client identifier. When null or empty, a random GUID-based identifier is
        /// generated so that a client ID is always sent to the broker.
        /// </param>
        /// <param name="username">Optional user name; credentials are only sent when this is non-empty.</param>
        /// <param name="password">Password sent together with <paramref name="username"/>.</param>
        /// <param name="cleanSession">Value passed to the clean-session option of the connection.</param>
        public MQTTHepler(string server, int port = 1883, string clientId = null, string username = null, string password = null, bool cleanSession = true)
        {
            _server = server;
            _port = port;
            _clientId = string.IsNullOrEmpty(clientId) ? Guid.NewGuid().ToString("N") : clientId;
            _username = username;
            _password = password;
            _cleanSession = cleanSession;
        }

        /// <summary>
        /// Connects to the MQTT broker. Returns immediately when already connected.
        /// </summary>
        /// <returns>A task that completes when the connection has been established.</returns>
        public async Task ConnectAsync()
        {
            if (IsClientConnected())
            {
                return;
            }

            await _connectGate.WaitAsync();
            try
            {
                // Re-check under the gate: a concurrent connect/reconnect may have finished meanwhile.
                if (IsClientConnected())
                {
                    return;
                }

                // An explicit connect re-enables automatic reconnects.
                _disconnectRequested = false;

                if (_mqttClient == null || _clientDisposed)
                {
                    _mqttClient = new MqttFactory().CreateMqttClient();
                    _clientDisposed = false;
                    _mqttClient.DisconnectedAsync += HandleDisconnectedAsync;
                }

                if (_options == null)
                {
                    _options = BuildOptions();
                }

                await _mqttClient.ConnectAsync(_options, CancellationToken.None);
                Console.WriteLine("MQTT 连接成功");
            }
            finally
            {
                _connectGate.Release();
            }
        }

        /// <summary>
        /// Subscribes to a topic, connecting first when necessary.
        /// </summary>
        /// <param name="topic">Topic (filter) to subscribe to.</param>
        /// <param name="qos">Requested quality-of-service level.</param>
        /// <returns>A task that completes when the broker has answered the subscription.</returns>
        public async Task SubscribeAsync(string topic, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce)
        {
            await ConnectAsync();

            var topicFilter = new MqttTopicFilterBuilder()
                .WithTopic(topic)
                .WithQualityOfServiceLevel(qos)
                .Build();

            var result = await _mqttClient.SubscribeAsync(topicFilter, CancellationToken.None);
            Console.WriteLine($"订阅结果: {result.Items.First().ResultCode}");
        }

        /// <summary>
        /// Publishes a text payload to a topic, connecting first when necessary.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="payload">Message payload.</param>
        /// <param name="qos">Quality-of-service level of the message.</param>
        /// <param name="retain">Whether the retain flag is set on the message.</param>
        /// <returns>A task that completes when the publish call has finished.</returns>
        /// <remarks>Failures are logged to the console and rethrown to the caller.</remarks>
        public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce, bool retain = false)
        {
            await ConnectAsync();

            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithQualityOfServiceLevel(qos)
                .WithRetainFlag(retain)
                .Build();

            try
            {
                await _mqttClient.PublishAsync(message, CancellationToken.None);
                Console.WriteLine($"已发布消息到主题 {topic}: {payload}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"发布消息失败: {ex.Message}");
                throw;
            }
        }

        /// <summary>
        /// Unsubscribes from a topic. Does nothing when the client is not connected.
        /// </summary>
        /// <param name="topic">Topic to unsubscribe from.</param>
        /// <returns>A task that completes when the unsubscribe call has finished.</returns>
        /// <remarks>Failures are logged to the console and rethrown to the caller.</remarks>
        public async Task UnsubscribeAsync(string topic)
        {
            if (!IsClientConnected())
            {
                return;
            }

            try
            {
                await _mqttClient.UnsubscribeAsync(topic, CancellationToken.None);
                Console.WriteLine($"已取消订阅主题: {topic}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"取消订阅主题失败: {ex.Message}");
                throw;
            }
        }

        /// <summary>
        /// Disconnects from the broker and disposes the underlying client.
        /// Automatic reconnects are suppressed until <see cref="ConnectAsync"/> is called again.
        /// </summary>
        /// <returns>A task that completes when the disconnect has finished.</returns>
        /// <remarks>Failures are logged to the console and not rethrown.</remarks>
        public async Task DisconnectAsync()
        {
            // Set the flag first so that a pending or future reconnect attempt is abandoned,
            // even when the client is currently not connected.
            _disconnectRequested = true;

            await _connectGate.WaitAsync();
            try
            {
                if (!IsClientConnected())
                {
                    return;
                }

                await _mqttClient.DisconnectAsync();
                _mqttClient.Dispose();
                _clientDisposed = true;
                Console.WriteLine("MQTT 已断开连接");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"断开连接失败: {ex.Message}");
            }
            finally
            {
                _connectGate.Release();
            }
        }

        /// <summary>Builds the connection options from the constructor arguments.</summary>
        private MqttClientOptions BuildOptions()
        {
            var builder = new MqttClientOptionsBuilder()
                .WithTcpServer(_server, _port)
                .WithClientId(_clientId)
                .WithCleanSession(_cleanSession);

            if (!string.IsNullOrEmpty(_username))
            {
                builder.WithCredentials(_username, _password);
            }

            return builder.Build();
        }

        /// <summary>Returns true when a live (non-disposed) client exists and reports being connected.</summary>
        private bool IsClientConnected()
        {
            return _mqttClient != null && !_clientDisposed && _mqttClient.IsConnected;
        }

        /// <summary>
        /// Handles the client's disconnected notification: logs the reason and, unless
        /// the disconnect was requested through <see cref="DisconnectAsync"/>, waits
        /// <see cref="ReconnectDelay"/> and then attempts to reconnect.
        /// </summary>
        private async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine($"MQTT 断开: {e.Reason}");

            if (_disconnectRequested)
            {
                return;
            }

            await Task.Delay(ReconnectDelay);
            await ReconnectAsync();
        }

        /// <summary>
        /// Attempts a single reconnect with the existing options. Failures are logged
        /// and swallowed because this runs from an event handler with no caller to observe them.
        /// </summary>
        private async Task ReconnectAsync()
        {
            if (_disconnectRequested)
            {
                return;
            }

            await _connectGate.WaitAsync();
            try
            {
                // Re-check under the gate: the caller may have disconnected, or another
                // connect may already have succeeded, while this attempt was waiting.
                if (_disconnectRequested || _clientDisposed || _mqttClient.IsConnected)
                {
                    return;
                }

                await _mqttClient.ConnectAsync(_options, CancellationToken.None);
                Console.WriteLine("MQTT 重新连接成功");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"MQTT 重新连接失败: {ex.Message}");
            }
            finally
            {
                _connectGate.Release();
            }
        }
    }
}