using System;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Hncore.Infrastructure.Common;
using MQTTnet;
using MQTTnet.Client;
using Polly;

namespace Hncore.Infrastructure.Mqtt
{
    /// <summary>
    /// Thin wrapper around an MQTTnet client that connects lazily to an Aliyun MQTT
    /// endpoint using signature authentication, reconnects after a disconnect, and
    /// dispatches received payloads to per-topic callbacks.
    /// </summary>
    public class MQTTClient : IDisposable
    {
        // NOTE(security): these defaults embed real-looking credentials in source.
        // They are kept only so that existing `new MQTTClient()` callers keep working;
        // move them to configuration / a secret store, rotate the key pair, and use the
        // constructor overload that accepts the values explicitly.

        // Instance ID, obtained from the console after purchase.
        private const string DefaultInstanceId = "post-cn-v0h12xbue09";

        // MQTT endpoint domain name obtained on purchase.
        private const string DefaultBrokerUrl = "post-cn-v0h12xbue09.mqtt.aliyuncs.com";

        // Aliyun account AccessKey.
        private const string DefaultAccessKey = "LTAIaJpHI68JfX2c";

        // Aliyun account SecretKey.
        private const string DefaultSecretKey = "f17za6FRggVzwlSqzFHl8GndQ59SGV";

        // Client id prefix; the part before "@@@" is the GroupId, which must be created
        // in the MQ console beforehand. A GUID suffix keeps the full id globally unique.
        private const string DefaultClientIdPrefix = "GID_DOOR@@@MA_";

        private readonly string _instanceId;
        private readonly string _brokerUrl;
        private readonly string _accessKey;
        private readonly string _secretKey;
        private readonly string _clientId;

        private readonly IMqttClient _mqttClient;

        // Serializes connection attempts so that concurrent callers do not connect twice.
        private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);

        // Topic (exactly as passed to SubscribeAsync) -> callback receiving the UTF-8 payload.
        private readonly ConcurrentDictionary<string, Action<string>> _handlers =
            new ConcurrentDictionary<string, Action<string>>();

        // 0 = live, 1 = Dispose has been called.
        private int _disposed;

        /// <summary>
        /// Creates a client using the built-in default endpoint, credentials and a
        /// freshly generated client id.
        /// </summary>
        public MQTTClient()
            : this(DefaultInstanceId, DefaultBrokerUrl, DefaultAccessKey, DefaultSecretKey,
                DefaultClientIdPrefix + Guid.NewGuid())
        {
        }

        /// <summary>
        /// Creates a client for an explicitly supplied endpoint and credentials.
        /// </summary>
        /// <param name="instanceId">MQTT instance id; used as part of the user name.</param>
        /// <param name="brokerUrl">Host name of the MQTT endpoint.</param>
        /// <param name="accessKey">Access key; used as part of the user name.</param>
        /// <param name="secretKey">Secret key; used to sign the client id for the password.</param>
        /// <param name="clientId">Client id sent to the broker; must be globally unique.</param>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public MQTTClient(string instanceId, string brokerUrl, string accessKey, string secretKey, string clientId)
        {
            if (instanceId == null) throw new ArgumentNullException(nameof(instanceId));
            if (brokerUrl == null) throw new ArgumentNullException(nameof(brokerUrl));
            if (accessKey == null) throw new ArgumentNullException(nameof(accessKey));
            if (secretKey == null) throw new ArgumentNullException(nameof(secretKey));
            if (clientId == null) throw new ArgumentNullException(nameof(clientId));

            _instanceId = instanceId;
            _brokerUrl = brokerUrl;
            _accessKey = accessKey;
            _secretKey = secretKey;
            _clientId = clientId;

            _mqttClient = new MqttFactory().CreateMqttClient();
            _mqttClient.Disconnected += OnDisconnected;
            _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
        }

        private bool IsDisposed
        {
            get { return Volatile.Read(ref _disposed) != 0; }
        }

        /// <summary>
        /// Connects if necessary, then publishes <paramref name="payload"/> to <paramref name="topic"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
        public async Task PublishAsync(string topic, string payload)
        {
            if (topic == null) throw new ArgumentNullException(nameof(topic));
            ThrowIfDisposed();

            await Conn();
            await _mqttClient.PublishAsync(topic, payload);
        }

        /// <summary>
        /// Connects if necessary, subscribes to <paramref name="topic"/> and registers
        /// <paramref name="action"/> to be invoked with the UTF-8 decoded payload of each
        /// message received on that topic. A later call for the same topic replaces the callback.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="topic"/> or <paramref name="action"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
        public async Task SubscribeAsync(string topic, Action<string> action)
        {
            if (topic == null) throw new ArgumentNullException(nameof(topic));
            if (action == null) throw new ArgumentNullException(nameof(action));
            ThrowIfDisposed();

            await Conn();

            // Register the callback BEFORE subscribing: messages can be delivered as soon
            // as the broker acknowledges the subscription, and would otherwise be dropped.
            Action<string> previous;
            bool hadPrevious = _handlers.TryGetValue(topic, out previous);
            _handlers[topic] = action;

            try
            {
                var option = new TopicFilterBuilder().WithTopic(topic).Build();
                await _mqttClient.SubscribeAsync(option);
            }
            catch
            {
                // Subscription failed: restore the registration state from before this call.
                if (hadPrevious)
                {
                    _handlers[topic] = previous;
                }
                else
                {
                    Action<string> removed;
                    _handlers.TryRemove(topic, out removed);
                }

                throw;
            }
        }

        /// <summary>
        /// Computes HMAC-SHA1 of <paramref name="dataToSign"/> (UTF-8) keyed with
        /// <paramref name="key"/> (UTF-8) and returns the digest Base64-encoded.
        /// </summary>
        /// <exception cref="ArgumentNullException">Either argument is null.</exception>
        public static string HMACSHA1(string key, string dataToSign)
        {
            if (key == null) throw new ArgumentNullException(nameof(key));
            if (dataToSign == null) throw new ArgumentNullException(nameof(dataToSign));

            byte[] secretBytes = Encoding.UTF8.GetBytes(key);
            byte[] dataBytes = Encoding.UTF8.GetBytes(dataToSign);

            // The HMAC object is disposable; release it deterministically.
            using (var hmac = new HMACSHA1(secretBytes))
            {
                return Convert.ToBase64String(hmac.ComputeHash(dataBytes));
            }
        }

        /// <summary>
        /// Marks the client as disposed (which stops automatic reconnection) and starts a
        /// background disconnect followed by disposal of the underlying MQTT client.
        /// Returns without waiting for the disconnect to finish. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            // Intentionally not awaited: Dispose must not block on async I/O. All failures
            // are caught and logged inside the helper, so the task never faults.
            DisconnectAndDisposeAsync();
        }

        // Disconnects and disposes the underlying client, logging instead of throwing.
        private async Task DisconnectAndDisposeAsync()
        {
            try
            {
                await _mqttClient.DisconnectAsync();
            }
            catch (Exception ex)
            {
                LogHelper.Error("Mqtt:Dispose", ex);
            }

            try
            {
                _mqttClient.Dispose();
            }
            catch (Exception ex)
            {
                LogHelper.Error("Mqtt:Dispose", ex);
            }
        }

        private void ThrowIfDisposed()
        {
            if (IsDisposed)
            {
                throw new ObjectDisposedException(nameof(MQTTClient));
            }
        }

        // Reconnects two seconds after the connection drops, unless the client was disposed.
        // This is an async-void event handler, so every exception must be handled here.
        private async void OnDisconnected(object s, EventArgs e)
        {
            if (IsDisposed)
            {
                return;
            }

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(2));

                if (IsDisposed)
                {
                    return;
                }

                await Conn();
            }
            catch (Exception ex)
            {
                LogHelper.Error("Mqtt:Reconnect", ex);
            }
        }

        // Dumps the received message to the console and invokes the callback registered
        // for its topic, if any. Callback exceptions are logged and not propagated.
        private void OnApplicationMessageReceived(object s, MqttApplicationMessageReceivedEventArgs e)
        {
            var rawTopic = e.ApplicationMessage.Topic;
            var topic = rawTopic.TrimEnd('/');
            var data = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]);

            Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
            Console.WriteLine($"+ Topic = {topic}");
            Console.WriteLine($"+ Payload = {data}");
            Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
            Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
            Console.WriteLine();

            // Look up by the trimmed topic first; fall back to the untrimmed topic so a
            // callback registered with a trailing '/' is still found.
            Action<string> handler;
            if (!_handlers.TryGetValue(topic, out handler) && !_handlers.TryGetValue(rawTopic, out handler))
            {
                return;
            }

            try
            {
                handler(data);
            }
            catch (Exception ex)
            {
                LogHelper.Error($"Mqtt:{topic}", ex);
            }
        }

        // Ensures the underlying client is connected. Attempts are serialized by _lock and
        // retried up to 10 times on an exception or on a result without a present session.
        private async Task Conn()
        {
            if (_mqttClient.IsConnected)
            {
                return;
            }

            await _lock.WaitAsync();
            try
            {
                int attempt = 0;
                bool connectedNow = false;

                await Policy.Handle<Exception>()
                    .OrResult<MqttClientConnectResult>(res => !res.IsSessionPresent)
                    .RetryAsync(10)
                    .ExecuteAsync(async () =>
                    {
                        // Already connected (by an earlier pass or another caller): return a
                        // synthetic "session present" result so the policy stops retrying.
                        if (_mqttClient.IsConnected)
                        {
                            return new MqttClientConnectResult(true);
                        }

                        attempt++;
                        try
                        {
                            // User name is "Signature|<accessKey>|<instanceId>"; the password
                            // is the client id signed with the secret key.
                            string userName = "Signature|" + _accessKey + "|" + _instanceId;
                            string passWord = HMACSHA1(_secretKey, _clientId);

                            var options = new MqttClientOptionsBuilder()
                                .WithClientId(_clientId)
                                .WithTcpServer(_brokerUrl)
                                .WithCredentials(userName, passWord)
                                .WithCleanSession()
                                .Build();

                            var result = await _mqttClient.ConnectAsync(options);
                            connectedNow = true;
                            return result;
                        }
                        catch (Exception e)
                        {
                            LogHelper.Error($"mqtt连接失败,第{attempt}次连接", e);
                            throw;
                        }
                    });

                // The connection uses a clean session, so subscriptions made on a previous
                // connection are not carried over; restore them after a real (re)connect.
                if (connectedNow)
                {
                    await ResubscribeAsync();
                }
            }
            finally
            {
                _lock.Release();
            }
        }

        // Re-issues a subscription for every registered topic. A failure for one topic is
        // logged and does not prevent the remaining topics from being re-subscribed.
        private async Task ResubscribeAsync()
        {
            foreach (var topic in _handlers.Keys)
            {
                try
                {
                    var option = new TopicFilterBuilder().WithTopic(topic).Build();
                    await _mqttClient.SubscribeAsync(option);
                }
                catch (Exception ex)
                {
                    LogHelper.Error($"Mqtt:{topic}", ex);
                }
            }
        }
    }
}