151 lines
5.3 KiB
C#
151 lines
5.3 KiB
C#
using System;
|
||
using System.Collections.Concurrent;
|
||
using System.Security.Cryptography;
|
||
using System.Text;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
using Hncore.Infrastructure.Common;
|
||
using MQTTnet;
|
||
using MQTTnet.Client;
|
||
using Polly;
|
||
|
||
namespace Hncore.Infrastructure.Mqtt
|
||
{
|
||
/// <summary>
/// Wrapper around an MQTTnet client that connects to a fixed Alibaba Cloud MQTT
/// endpoint, reconnects when the connection drops, and dispatches received
/// payloads (decoded as UTF-8) to per-topic callbacks.
/// </summary>
/// <remarks>
/// The connection is opened lazily by <see cref="PublishAsync"/> and
/// <see cref="SubscribeAsync"/>. Because the client connects with a clean session,
/// every successful (re)connect re-issues all subscriptions registered so far.
/// </remarks>
public class MQTTClient : IDisposable
{
    // NOTE(review): the credentials below are hard-coded in source. They should be
    // moved to configuration / a secret store and rotated; left as-is here so that
    // existing deployments keep working.

    // Instance ID, obtained from the console after purchase.
    private readonly string _instanceId = "post-cn-v0h12xbue09";

    // Domain name of the purchased MQTT access point.
    private readonly string _brokerUrl = "post-cn-v0h12xbue09.mqtt.aliyuncs.com";

    // Alibaba Cloud account AccessKey.
    private readonly string _accessKey = "LTAIaJpHI68JfX2c";

    // Alibaba Cloud account SecretKey.
    private readonly string _secretKey = "f17za6FRggVzwlSqzFHl8GndQ59SGV";

    // Client id; must be globally unique. The prefix (the GroupId) has to be
    // created in the MQ console beforehand.
    private readonly string _clientId = "GID_DOOR@@@MA_" + Guid.NewGuid();

    private readonly IMqttClient _mqttClient;

    // Serialises connection attempts so concurrent callers do not connect twice.
    private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);

    // Callbacks keyed by topic with any trailing '/' removed (see NormalizeTopic).
    private readonly ConcurrentDictionary<string, Action<string>> cmdMap =
        new ConcurrentDictionary<string, Action<string>>();

    // Topic filters exactly as passed to SubscribeAsync; replayed after a reconnect.
    // The value is unused - the dictionary serves as a concurrent set.
    private readonly ConcurrentDictionary<string, byte> _topicFilters =
        new ConcurrentDictionary<string, byte>();

    // 0 while the instance is usable, 1 once Dispose has been called.
    private int _disposed;

    /// <summary>
    /// Creates the underlying MQTT client and wires up the reconnect and
    /// message-dispatch handlers. No connection is opened here.
    /// </summary>
    public MQTTClient()
    {
        _mqttClient = new MqttFactory().CreateMqttClient();

        _mqttClient.Disconnected += async (s, e) =>
        {
            // This lambda is an async-void event handler: an exception escaping it
            // cannot be observed by anyone, so everything is caught and logged.
            try
            {
                if (IsDisposed)
                {
                    return;
                }

                await Task.Delay(TimeSpan.FromSeconds(2)).ConfigureAwait(false);

                // Dispose may have run while we were waiting.
                if (IsDisposed)
                {
                    return;
                }

                await Conn().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                LogHelper.Error("Mqtt:Reconnect", ex);
            }
        };

        _mqttClient.ApplicationMessageReceived += (s, e) =>
        {
            var topic = NormalizeTopic(e.ApplicationMessage.Topic);
            var data = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]);

            Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
            Console.WriteLine($"+ Topic = {topic}");
            Console.WriteLine($"+ Payload = {data}");
            Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
            Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
            Console.WriteLine();

            Action<string> handler;
            if (!cmdMap.TryGetValue(topic, out handler))
            {
                return;
            }

            try
            {
                handler(data);
            }
            catch (Exception ex)
            {
                // A faulty callback must not break the receive loop.
                LogHelper.Error($"Mqtt:{topic}", ex.Message);
            }
        };
    }

    // True once Dispose has been called.
    private bool IsDisposed => Volatile.Read(ref _disposed) != 0;

    // Lookup key for cmdMap: the topic without trailing '/' characters, so that the
    // key stored by SubscribeAsync matches the key computed for received messages.
    private static string NormalizeTopic(string topic)
    {
        return topic.TrimEnd('/');
    }

    private void ThrowIfDisposed()
    {
        if (IsDisposed)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
    }

    /// <summary>
    /// Ensures the client is connected, retrying a failed connect up to 10 more
    /// times, and restores the registered subscriptions after a successful connect.
    /// Returns without doing anything if already connected or disposed.
    /// </summary>
    private async Task Conn()
    {
        if (IsDisposed || _mqttClient.IsConnected)
        {
            return;
        }

        await _lock.WaitAsync().ConfigureAwait(false);

        try
        {
            // Re-check: another caller may have connected while we waited for the lock.
            if (IsDisposed || _mqttClient.IsConnected)
            {
                return;
            }

            // Credentials and options do not change between attempts; build them once.
            string userName = "Signature|" + _accessKey + "|" + _instanceId;
            string passWord = HMACSHA1(_secretKey, _clientId);

            var options = new MqttClientOptionsBuilder()
                .WithClientId(_clientId)
                .WithTcpServer(_brokerUrl)
                .WithCredentials(userName, passWord)
                .WithCleanSession()
                .Build();

            int attempt = 0;

            // Retry on exceptions only. The connect result is deliberately not
            // inspected: with a clean session "session present" is always false,
            // which is not a failure.
            await Policy.Handle<Exception>()
                .RetryAsync(10)
                .ExecuteAsync(async () =>
                {
                    if (IsDisposed || _mqttClient.IsConnected)
                    {
                        return;
                    }

                    attempt++;

                    try
                    {
                        await _mqttClient.ConnectAsync(options).ConfigureAwait(false);
                    }
                    catch (Exception e)
                    {
                        LogHelper.Error($"mqtt连接失败,第{attempt}次连接", e);
                        throw;
                    }
                })
                .ConfigureAwait(false);

            if (!IsDisposed && _mqttClient.IsConnected)
            {
                await ResubscribeAsync().ConfigureAwait(false);
            }
        }
        finally
        {
            _lock.Release();
        }
    }

    // Re-issues every subscription registered through SubscribeAsync. Needed because
    // the connection uses a clean session, so the broker keeps no subscriptions
    // across connections.
    private async Task ResubscribeAsync()
    {
        foreach (var filter in _topicFilters.Keys)
        {
            var option = new TopicFilterBuilder().WithTopic(filter).Build();
            await _mqttClient.SubscribeAsync(option).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Publishes <paramref name="payload"/> to <paramref name="topic"/>, connecting first if necessary.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="payload">Message body.</param>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    public async Task PublishAsync(string topic, string payload)
    {
        ThrowIfDisposed();

        await Conn().ConfigureAwait(false);

        await _mqttClient.PublishAsync(topic, payload).ConfigureAwait(false);
    }

    /// <summary>
    /// Subscribes to <paramref name="topic"/> and registers <paramref name="action"/> to be
    /// invoked with the UTF-8 decoded payload of each message received on it.
    /// A later call for the same topic replaces the previous callback.
    /// </summary>
    /// <param name="topic">Topic filter to subscribe to.</param>
    /// <param name="action">Callback receiving the message payload as text.</param>
    /// <exception cref="ArgumentNullException"><paramref name="topic"/> or <paramref name="action"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    public async Task SubscribeAsync(string topic, Action<string> action)
    {
        if (topic == null)
        {
            throw new ArgumentNullException(nameof(topic));
        }

        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        ThrowIfDisposed();

        await Conn().ConfigureAwait(false);

        var option = new TopicFilterBuilder().WithTopic(topic).Build();
        await _mqttClient.SubscribeAsync(option).ConfigureAwait(false);

        // Registered only after the broker accepted the subscription.
        // The handler key is normalised the same way as incoming topics; the raw
        // filter is remembered separately so a reconnect can subscribe to it again.
        cmdMap[NormalizeTopic(topic)] = action;
        _topicFilters[topic] = 0;
    }

    /// <summary>
    /// Computes the HMAC-SHA1 of <paramref name="dataToSign"/> keyed with
    /// <paramref name="key"/> (both encoded as UTF-8) and returns it Base64-encoded.
    /// </summary>
    /// <param name="key">Secret key.</param>
    /// <param name="dataToSign">Text to sign.</param>
    /// <returns>The Base64 string of the 20-byte HMAC-SHA1 digest.</returns>
    public static string HMACSHA1(string key, string dataToSign)
    {
        byte[] secretBytes = Encoding.UTF8.GetBytes(key);
        byte[] dataBytes = Encoding.UTF8.GetBytes(dataToSign);

        // Fully qualified: inside this class the simple name HMACSHA1 is also this method.
        using (var hmac = new System.Security.Cryptography.HMACSHA1(secretBytes))
        {
            return Convert.ToBase64String(hmac.ComputeHash(dataBytes));
        }
    }

    /// <summary>
    /// Stops automatic reconnection and starts disconnecting and disposing the
    /// underlying client. Returns without waiting for the disconnect to finish;
    /// failures during shutdown are logged, never thrown. Safe to call repeatedly.
    /// </summary>
    public void Dispose()
    {
        // Only the first call performs the shutdown.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        // Not awaited on purpose: Dispose stays non-blocking. ShutdownAsync handles
        // its own exceptions, so the returned task never faults.
        ShutdownAsync();
    }

    // Disconnects (if connected) and disposes the MQTT client, logging any failure.
    // _lock is intentionally not disposed: an in-flight Conn() may still release it.
    private async Task ShutdownAsync()
    {
        try
        {
            if (_mqttClient.IsConnected)
            {
                await _mqttClient.DisconnectAsync().ConfigureAwait(false);
            }
        }
        catch (Exception ex)
        {
            LogHelper.Error("Mqtt:Dispose", ex);
        }
        finally
        {
            try
            {
                _mqttClient.Dispose();
            }
            catch (Exception ex)
            {
                LogHelper.Error("Mqtt:Dispose", ex);
            }
        }
    }
}
|
||
} |