// File listing: 94 lines, 3.4 KiB, C#

using Infrastructure.Attribute;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
namespace ZR.Common.MqttHelper
{
/// <summary>
/// Builds MQTTnet client options and subscription options from the
/// <c>MqttConfig</c> section of the application configuration.
/// </summary>
[AppService(ServiceLifetime = LifeTime.Singleton)]
public class MyMqttConfig
{
    // Configuration section / key names.
    private const string MqttConfigSection = "MqttConfig";
    private const string TopicsConfigKey = "Topics";

    // Port used when "MqttConfig:Port" is not configured.
    private const int DefaultMqttPort = 1883;

    // NOTE(review): the three reconnect constants below are not referenced anywhere
    // in this class; presumably reserved for reconnect logic - confirm or remove.
    private const int DefaultReconnectDelaySeconds = 5;
    private const int MaxReconnectAttempts = 10;
    private const int MaxReconnectDelaySeconds = 60;

    private readonly ILogger<MyMqttConfig> _logger;
    private readonly IConfiguration _configuration;

    /// <summary>
    /// Creates the configuration helper.
    /// </summary>
    /// <param name="logger">Logger used for configuration diagnostics.</param>
    /// <param name="configuration">Application configuration containing the <c>MqttConfig</c> section.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="logger"/> or <paramref name="configuration"/> is <c>null</c>.
    /// </exception>
    public MyMqttConfig(ILogger<MyMqttConfig> logger, IConfiguration configuration)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        // Fail fast here instead of with a NullReferenceException on first use.
        _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
    }

    /// <summary>
    /// Builds the MQTT client options from <c>MqttConfig:ClientId</c>,
    /// <c>MqttConfig:Server</c> and <c>MqttConfig:Port</c>.
    /// </summary>
    /// <returns>
    /// Client options targeting the configured TCP server with clean session disabled.
    /// The port falls back to 1883 and the client id falls back to a new GUID.
    /// </returns>
    public MqttClientOptions BuildMqttClientOptions()
    {
        var mqttConfig = _configuration.GetSection(MqttConfigSection);

        var server = mqttConfig["Server"];
        if (string.IsNullOrWhiteSpace(server))
        {
            // The value is still passed through unchanged; the warning only makes
            // the missing setting visible before the connection attempt.
            _logger.LogWarning("MQTT配置缺少服务器地址: {Section}:Server", MqttConfigSection);
        }

        // An empty client id is not usable together with CleanSession = false,
        // so blank values are treated the same as a missing value.
        var clientId = mqttConfig["ClientId"];
        if (string.IsNullOrWhiteSpace(clientId))
        {
            clientId = Guid.NewGuid().ToString();
            // A broker keys persistent sessions by client id, so a generated id
            // cannot resume a session created by an earlier run.
            _logger.LogWarning("未配置MQTT客户端ID，已使用随机生成的ID: {ClientId}", clientId);
        }

        return new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithTcpServer(server, mqttConfig.GetValue<int>("Port", DefaultMqttPort))
            .WithCleanSession(false)
            .Build();
    }

    /// <summary>
    /// Builds the subscribe options from the entries under <c>MqttConfig:Topics</c>.
    /// Despite the <c>Async</c> suffix this method runs synchronously; the name is
    /// kept so existing callers continue to compile.
    /// </summary>
    /// <returns>
    /// Subscribe options containing one topic filter per usable entry, or
    /// <c>null</c> when no usable topic is configured.
    /// </returns>
    public MqttClientSubscribeOptions GetSubscribeToTopicsAsync()
    {
        var topicsSection = _configuration.GetSection($"{MqttConfigSection}:{TopicsConfigKey}");
        var topicConfigs = topicsSection.Get<List<TopicSubscriptionConfig>>() ?? new List<TopicSubscriptionConfig>();

        var subscribeOptionsBuilder = new MqttClientSubscribeOptionsBuilder();
        var usableTopicCount = 0;

        foreach (var config in topicConfigs)
        {
            // An entry without a topic cannot form a valid subscription. Skip it so
            // that one malformed entry does not prevent the remaining topics from
            // being subscribed.
            if (config == null || string.IsNullOrWhiteSpace(config.Topic))
            {
                _logger.LogWarning("已跳过未指定主题名称的MQTT订阅配置项");
                continue;
            }

            subscribeOptionsBuilder.WithTopicFilter(config.ToMqttTopicFilter());
            usableTopicCount++;
        }

        if (usableTopicCount == 0)
        {
            _logger.LogInformation("没有配置要订阅的MQTT主题");
            return null; // Callers must handle null as "nothing to subscribe".
        }

        return subscribeOptionsBuilder.Build();
    }

    /// <summary>
    /// One entry of the <c>MqttConfig:Topics</c> list, bound from configuration.
    /// </summary>
    private sealed class TopicSubscriptionConfig
    {
        /// <summary>Topic (filter) string to subscribe to.</summary>
        public string Topic { get; set; }

        /// <summary>Requested QoS level; defaults to <c>AtLeastOnce</c>.</summary>
        public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } = MqttQualityOfServiceLevel.AtLeastOnce;

        /// <summary>Value forwarded to <c>WithNoLocal</c>; defaults to <c>false</c>.</summary>
        public bool NoLocal { get; set; } = false;

        /// <summary>Value forwarded to <c>WithRetainAsPublished</c>; defaults to <c>false</c>.</summary>
        public bool RetainAsPublished { get; set; } = false;

        /// <summary>Value forwarded to <c>WithRetainHandling</c>; defaults to <c>SendAtSubscribe</c>.</summary>
        public MqttRetainHandling RetainHandling { get; set; } = MqttRetainHandling.SendAtSubscribe;

        /// <summary>
        /// Converts this entry into an MQTTnet topic filter carrying all configured options.
        /// </summary>
        /// <returns>The topic filter built from this entry.</returns>
        public MqttTopicFilter ToMqttTopicFilter()
        {
            return new MqttTopicFilterBuilder()
                .WithTopic(Topic)
                .WithQualityOfServiceLevel(QualityOfServiceLevel)
                .WithNoLocal(NoLocal)
                .WithRetainAsPublished(RetainAsPublished)
                .WithRetainHandling(RetainHandling)
                .Build();
        }

        /// <summary>Returns <c>{Topic}@QoS{QualityOfServiceLevel}</c> for diagnostics.</summary>
        public override string ToString() => $"{Topic}@QoS{QualityOfServiceLevel}";
    }
}
}