using System; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; namespace RIZO_Helper.Tools { public class MqttHelper : IDisposable { private readonly IMqttClient _mqttClient; private MqttClientOptions _options; private bool _isDisposed; // 用于确保只有一个后台重连任务运行 private bool _isReconnecting; private readonly object _reconnectLock = new object(); // 定义消息接收事件 public event EventHandler? MessageReceived; public event EventHandler? ConnectionFailed; public Action Disconnected; public MqttHelper(string server, int port = 1883, string clientId = "wpf-demo", bool cleanSession = false) { if (string.IsNullOrEmpty(server)) throw new ArgumentNullException(nameof(server), "MQTT服务器地址不能为空"); if (string.IsNullOrEmpty(clientId)) throw new ArgumentNullException(nameof(clientId), "MQTT客户端ID不能为空"); var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); _options = new MqttClientOptionsBuilder() .WithTcpServer(server, port) .WithClientId(clientId) .WithCleanSession(cleanSession) .Build(); _mqttClient.DisconnectedAsync += async e => { Disconnected.Invoke($"MQTT连接断开,原因: {e.Reason}", 0, 1); Debug.WriteLine($"MQTT连接断开,原因: {e.Reason}"); await StartReconnectIfNeededAsync(); }; _mqttClient.ApplicationMessageReceivedAsync += e => { MessageReceived?.Invoke(this, e); return Task.CompletedTask; }; } public bool IsConnected => _mqttClient.IsConnected; public async Task ConnectAsync(CancellationToken cancellationToken = default) { if (_isDisposed) throw new ObjectDisposedException(nameof(MqttHelper)); if (_mqttClient.IsConnected) return true; Debug.WriteLine($"正在连接MQTT服务器: {_options.ChannelOptions}"); try { await _mqttClient.ConnectAsync(_options, cancellationToken); Debug.WriteLine("MQTT连接成功"); return true; } catch (OperationCanceledException) { Debug.WriteLine("MQTT连接操作被取消"); throw; } catch (Exception ex) { Debug.WriteLine($"MQTT连接失败: {ex.Message}"); ConnectionFailed?.Invoke(this, ex); // 连接失败时启动重连 await StartReconnectIfNeededAsync(); return false; } } public async Task SubscribeAsync(string topic, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce) { if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空"); if (!await EnsureConnectedAsync()) return false; try { var topicFilter = new MqttTopicFilterBuilder() .WithTopic(topic) .WithQualityOfServiceLevel(qos) .Build(); await _mqttClient.SubscribeAsync(topicFilter); Debug.WriteLine($"成功订阅主题: {topic} (QoS: {qos})"); return true; } catch (Exception ex) { Debug.WriteLine($"订阅主题失败: {topic}, 错误: {ex.Message}"); return false; } } public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce, bool retain = false) { if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空"); if (payload == null) throw new ArgumentNullException(nameof(payload), "MQTT消息内容不能为空"); if (!await EnsureConnectedAsync()) return false; try { var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) .WithQualityOfServiceLevel(qos) .WithRetainFlag(retain) .Build(); await _mqttClient.PublishAsync(message); Debug.WriteLine($"成功发布消息到主题: {topic} (QoS: {qos}, 保留: {retain})"); return true; } catch (Exception ex) { Debug.WriteLine($"发布消息失败: {topic}, 错误: {ex.Message}"); return false; } } public async Task UnsubscribeAsync(string topic) { if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic), "MQTT主题不能为空"); if (!await EnsureConnectedAsync()) return false; try { await _mqttClient.UnsubscribeAsync(topic); Debug.WriteLine($"成功取消订阅主题: {topic}"); return true; } catch (Exception ex) { Debug.WriteLine($"取消订阅失败: {topic}, 错误: {ex.Message}"); return false; } } public async Task DisconnectAsync() { if (_isDisposed) return; if (!_mqttClient.IsConnected) return; try { await _mqttClient.DisconnectAsync(); Debug.WriteLine("MQTT客户端已断开连接"); } catch (Exception ex) { Debug.WriteLine($"断开MQTT连接时出错: {ex.Message}"); } } // 启动重连任务(如果没有正在运行的重连任务) private async Task StartReconnectIfNeededAsync() { if (_isDisposed) return; // 使用锁确保只有一个重连任务启动 lock (_reconnectLock) { if (_isReconnecting) return; _isReconnecting = true; } try { await ReconnectWithBackoffAsync(); } finally { // 无论重连成功或失败,都标记为重连已完成 lock (_reconnectLock) { _isReconnecting = false; } } } private async Task ReconnectWithBackoffAsync() { if (_isDisposed) return; int retries = 0; const int maxRetries = 5; const int baseDelayMs = 1000; while (retries < maxRetries && !_isDisposed) { // 指数退避算法,避免频繁重试 int delayMs = baseDelayMs * (int)Math.Pow(2, retries); Disconnected.Invoke($"将在 {delayMs}ms 后尝试重新连接MQTT服务器 (尝试 {retries + 1}/{maxRetries})", retries+1, maxRetries); Debug.WriteLine($"将在 {delayMs}ms 后尝试重新连接MQTT服务器 (尝试 {retries + 1}/{maxRetries})"); await Task.Delay(delayMs); if (await ConnectAsync()) return; retries++; } Debug.WriteLine($"达到最大重试次数 ({maxRetries}),停止尝试重新连接MQTT服务器"); ConnectionFailed?.Invoke(this, new Exception("MQTT重新连接失败,达到最大重试次数")); } private async Task EnsureConnectedAsync() { if (_isDisposed) throw new ObjectDisposedException(nameof(MqttHelper)); if (_mqttClient.IsConnected) return true; Debug.WriteLine("检测到MQTT连接断开,尝试重新连接..."); return await ConnectAsync(); } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { if (_isDisposed) return; _isDisposed = true; if (disposing) { try { // 先取消事件订阅,防止在释放过程中触发事件 _mqttClient.DisconnectedAsync -= async e => await StartReconnectIfNeededAsync(); _mqttClient.ApplicationMessageReceivedAsync -= e => { MessageReceived?.Invoke(this, e); return Task.CompletedTask; }; // 断开连接 if (_mqttClient.IsConnected) _mqttClient.DisconnectAsync().GetAwaiter().GetResult(); // 释放资源 _mqttClient.Dispose(); } catch (Exception ex) { Debug.WriteLine($"释放MQTT资源时出错: {ex.Message}"); } } } } }