using Infrastructure.Attribute;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;

namespace ZR.Common.MqttHelper
{
    /// <summary>
    /// Builds MQTT client connection options and topic subscription options
    /// from the "MqttConfig" section of the application configuration.
    /// </summary>
    [AppService(ServiceLifetime = LifeTime.Singleton)]
    public class MyMqttConfig
    {
        // Configuration constants.
        private const string MqttConfigSection = "MqttConfig";
        private const string TopicsConfigKey = "Topics";

        // NOTE(review): the three reconnect constants below are not referenced by any
        // member of this class; kept as-is in case they are intended for future use.
        private const int DefaultReconnectDelaySeconds = 5;
        private const int MaxReconnectAttempts = 10;
        private const int MaxReconnectDelaySeconds = 60;

        private readonly ILogger _logger;
        private readonly IConfiguration _configuration;

        /// <summary>
        /// Creates the configuration helper.
        /// </summary>
        /// <param name="logger">Logger used to report missing topic configuration.</param>
        /// <param name="configuration">Application configuration containing the "MqttConfig" section.</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="logger"/> or <paramref name="configuration"/> is null.
        /// </exception>
        // NOTE(review): a non-generic ILogger is injected here; confirm the DI container
        // actually registers it (ILogger<MyMqttConfig> is the more common registration).
        public MyMqttConfig(ILogger logger, IConfiguration configuration)
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            // Fail fast instead of hitting a NullReferenceException on first use.
            _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
        }

        /// <summary>
        /// Builds the MQTT client options from the "MqttConfig" section.
        /// </summary>
        /// <remarks>
        /// Reads "ClientId" (falls back to a new GUID string when absent), "Server",
        /// and "Port" (defaults to 1883). The clean-session flag is set to false.
        /// </remarks>
        /// <returns>The built <see cref="MqttClientOptions"/>.</returns>
        public MqttClientOptions BuildMqttClientOptions()
        {
            var mqttConfig = _configuration.GetSection(MqttConfigSection);

            var builder = new MqttClientOptionsBuilder()
                .WithClientId(mqttConfig["ClientId"] ?? Guid.NewGuid().ToString())
                .WithTcpServer(mqttConfig["Server"], mqttConfig.GetValue("Port", 1883))
                .WithCleanSession(false);

            return builder.Build();
        }

        /// <summary>
        /// Builds the subscribe options for every topic listed under "MqttConfig:Topics".
        /// </summary>
        /// <remarks>
        /// Despite the "Async" suffix this method is synchronous; the name is kept so
        /// existing callers continue to compile.
        /// </remarks>
        /// <returns>
        /// The subscribe options containing one topic filter per configured topic, or
        /// <c>null</c> when no topics are configured (callers must handle the null case).
        /// </returns>
        public MqttClientSubscribeOptions GetSubscribeToTopicsAsync()
        {
            var topicsSection = _configuration.GetSection($"{MqttConfigSection}:{TopicsConfigKey}");

            // Bind the section to a list of topic configs; a missing section binds to null.
            var topicConfigs = topicsSection.Get<List<TopicSubscriptionConfig>>()
                               ?? new List<TopicSubscriptionConfig>();

            if (!topicConfigs.Any())
            {
                _logger.LogInformation("没有配置要订阅的MQTT主题");
                return null; // Or return an empty options object, depending on business requirements.
            }

            var subscribeOptionsBuilder = new MqttClientSubscribeOptionsBuilder();

            foreach (var config in topicConfigs)
            {
                // Add each configured topic as its own topic filter.
                var topicFilter = config.ToMqttTopicFilter();
                subscribeOptionsBuilder.WithTopicFilter(topicFilter);
            }

            return subscribeOptionsBuilder.Build();
        }

        /// <summary>
        /// Configuration-bound description of a single topic subscription.
        /// </summary>
        private class TopicSubscriptionConfig
        {
            /// <summary>Topic (filter) string to subscribe to.</summary>
            public string Topic { get; set; }

            /// <summary>Requested QoS level; defaults to AtLeastOnce.</summary>
            public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } = MqttQualityOfServiceLevel.AtLeastOnce;

            /// <summary>Value passed to the filter's NoLocal option; defaults to false.</summary>
            public bool NoLocal { get; set; } = false;

            /// <summary>Value passed to the filter's RetainAsPublished option; defaults to false.</summary>
            public bool RetainAsPublished { get; set; } = false;

            /// <summary>Value passed to the filter's RetainHandling option; defaults to SendAtSubscribe.</summary>
            public MqttRetainHandling RetainHandling { get; set; } = MqttRetainHandling.SendAtSubscribe;

            /// <summary>
            /// Converts this configuration entry into an <see cref="MqttTopicFilter"/>.
            /// </summary>
            /// <returns>The topic filter built from the properties of this entry.</returns>
            public MqttTopicFilter ToMqttTopicFilter()
            {
                return new MqttTopicFilterBuilder()
                    .WithTopic(Topic)
                    .WithQualityOfServiceLevel(QualityOfServiceLevel)
                    .WithNoLocal(NoLocal)
                    .WithRetainAsPublished(RetainAsPublished)
                    .WithRetainHandling(RetainHandling)
                    .Build();
            }

            /// <summary>Returns the topic and QoS level in the form "topic@QoSlevel".</summary>
            public override string ToString()
            {
                return $"{Topic}@QoS{QualityOfServiceLevel}";
            }
        }
    }
}