Files
juipnet/Infrastructure/Hncore.Infrastructure/Mqtt/MQTTClient.cs
wanyongkang d318014316 初始提交
2020-10-07 20:25:03 +08:00

151 lines
5.5 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Hncore.Infrastructure.Common;
using MQTTnet;
using MQTTnet.Client;
using Polly;
namespace Hncore.Infrastructure.Mqtt
{
/// <summary>
/// Wrapper around an MQTTnet client that connects to a fixed broker endpoint,
/// reconnects when the connection drops, and dispatches received messages to
/// per-topic callbacks registered through <see cref="SubscribeAsync"/>.
/// </summary>
public class MQTTClient : IDisposable
{
    // NOTE(security): the endpoint and credentials below are hard-coded in source.
    // They are kept unchanged here to preserve behavior, but they should be moved
    // to configuration / a secret store and the exposed keys rotated.

    // Instance ID; obtained from the console after purchase.
    private readonly string _instanceId = "post-cn-v0h12xbue09";
    // MQTT endpoint domain name obtained on purchase.
    private readonly string _brokerUrl = "post-cn-v0h12xbue09.mqtt.aliyuncs.com";
    // Alibaba Cloud account AccessKey.
    private readonly string _accessKey = "LTAIaJpHI68JfX2c";
    // Alibaba Cloud account SecretKey.
    private readonly string _secretKey = "f17za6FRggVzwlSqzFHl8GndQ59SGV";
    // Client ID; must be globally unique. The prefix part (the GroupId) must
    // first be created in the MQ console.
    private readonly string _clientId = "GID_DOOR@@@MA_" + Guid.NewGuid();

    private readonly IMqttClient _mqttClient;
    // Serializes connection attempts so only one caller connects at a time.
    private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
    // Maps a topic to the callback invoked with the UTF-8 decoded payload.
    private readonly ConcurrentDictionary<string, Action<string>> cmdMap = new ConcurrentDictionary<string, Action<string>>();
    // 0 = live, 1 = Dispose() has been called.
    private int _disposed;

    /// <summary>
    /// Creates the underlying MQTT client and wires up the reconnect and
    /// message-dispatch event handlers. No connection is made until
    /// <see cref="PublishAsync"/> or <see cref="SubscribeAsync"/> is called.
    /// </summary>
    public MQTTClient()
    {
        _mqttClient = new MqttFactory().CreateMqttClient();

        _mqttClient.Disconnected += async (s, e) =>
        {
            // An async lambda used as an event handler behaves like "async void":
            // an exception escaping it cannot be observed by anyone, so it is
            // caught and logged here instead.
            try
            {
                if (Volatile.Read(ref _disposed) != 0)
                {
                    // Disconnect was requested by Dispose(); do not reconnect.
                    return;
                }

                await Task.Delay(TimeSpan.FromSeconds(2));

                if (Volatile.Read(ref _disposed) != 0)
                {
                    return;
                }

                await Conn();
            }
            catch (Exception ex)
            {
                LogHelper.Error("Mqtt:reconnect", ex.Message);
            }
        };

        _mqttClient.ApplicationMessageReceived += (s, e) =>
        {
            var topic = e.ApplicationMessage.Topic.TrimEnd('/');
            var data = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]);
            Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
            Console.WriteLine($"+ Topic = {topic}");
            Console.WriteLine($"+ Payload = {data}");
            Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
            Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
            Console.WriteLine();

            // Single lookup instead of ContainsKey + indexer.
            Action<string> handler;
            if (cmdMap.TryGetValue(topic, out handler))
            {
                try
                {
                    handler(data);
                }
                catch (Exception ex)
                {
                    // A failing callback must not break the receive pipeline.
                    LogHelper.Error($"Mqtt:{topic}", ex.Message);
                }
            }
        };
    }

    /// <summary>
    /// Ensures the client is connected. Connection attempts are serialized by
    /// a semaphore and retried up to 10 times; after a fresh connection all
    /// topics that have a registered callback are subscribed again.
    /// </summary>
    private async Task Conn()
    {
        if (_mqttClient.IsConnected)
        {
            return;
        }

        await _lock.WaitAsync();
        try
        {
            int i = 0;
            bool connectedNow = false;

            // The policy retries on any exception, and also when the returned
            // result has IsSessionPresent == false. On such a retry the client
            // is normally already connected, so the delegate returns a synthetic
            // result with the flag set to true, which ends the retry loop.
            await Policy.Handle<Exception>()
                .OrResult<MqttClientConnectResult>(res => !res.IsSessionPresent)
                .RetryAsync(10)
                .ExecuteAsync(async () =>
                {
                    if (!_mqttClient.IsConnected)
                    {
                        i++;
                        try
                        {
                            string userName = "Signature|" + _accessKey + "|" + _instanceId;
                            string passWord = HMACSHA1(_secretKey, _clientId);
                            var options = new MqttClientOptionsBuilder()
                                .WithClientId(_clientId)
                                .WithTcpServer(_brokerUrl)
                                .WithCredentials(userName, passWord)
                                .WithCleanSession()
                                .Build();
                            var result = await _mqttClient.ConnectAsync(options);
                            connectedNow = true;
                            return result;
                        }
                        catch (Exception e)
                        {
                            LogHelper.Error($"mqtt连接失败第{i}次连接", e);
                            throw;
                        }
                    }
                    return new MqttClientConnectResult(true);
                });

            // The connection is opened with a clean session, so the broker keeps
            // no subscriptions from the previous connection. Restore them, or the
            // registered callbacks would silently stop firing after a reconnect.
            if (connectedNow && _mqttClient.IsConnected)
            {
                await ResubscribeAsync();
            }
        }
        finally
        {
            _lock.Release();
        }
    }

    /// <summary>
    /// Subscribes again to every topic that currently has a callback in
    /// <c>cmdMap</c>. A failure for one topic is logged and does not prevent
    /// the remaining topics from being subscribed.
    /// </summary>
    private async Task ResubscribeAsync()
    {
        foreach (var topic in cmdMap.Keys)
        {
            try
            {
                var filter = new TopicFilterBuilder().WithTopic(topic).Build();
                await _mqttClient.SubscribeAsync(filter);
            }
            catch (Exception ex)
            {
                LogHelper.Error($"Mqtt:{topic}", ex.Message);
            }
        }
    }

    /// <summary>
    /// Connects if necessary and publishes <paramref name="payload"/> to
    /// <paramref name="topic"/>.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="payload">Message body.</param>
    public async Task PublishAsync(string topic, string payload)
    {
        await Conn();
        await _mqttClient.PublishAsync(topic, payload);
    }

    /// <summary>
    /// Connects if necessary, subscribes to <paramref name="topic"/> and
    /// registers <paramref name="action"/> as the callback for messages whose
    /// topic (with trailing '/' characters removed) equals that key.
    /// </summary>
    /// <param name="topic">Topic filter to subscribe to.</param>
    /// <param name="action">Callback receiving the UTF-8 decoded payload.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="topic"/> or <paramref name="action"/> is null.
    /// </exception>
    public async Task SubscribeAsync(string topic, Action<string> action)
    {
        if (topic == null)
        {
            throw new ArgumentNullException(nameof(topic));
        }
        if (action == null)
        {
            // A null callback would be stored and then fail on every message.
            throw new ArgumentNullException(nameof(action));
        }

        await Conn();
        var option = new TopicFilterBuilder().WithTopic(topic).Build();
        await _mqttClient.SubscribeAsync(option);
        cmdMap[topic] = action;
    }

    /// <summary>
    /// Computes the HMAC-SHA1 of <paramref name="dataToSign"/> keyed with
    /// <paramref name="key"/> (both UTF-8 encoded).
    /// </summary>
    /// <param name="key">Secret key.</param>
    /// <param name="dataToSign">Text to sign.</param>
    /// <returns>The hash encoded as a Base64 string.</returns>
    public static string HMACSHA1(string key, string dataToSign)
    {
        byte[] secretBytes = Encoding.UTF8.GetBytes(key);
        byte[] dataBytes = Encoding.UTF8.GetBytes(dataToSign);

        // Fully qualified because this method shares its name with the type;
        // the using block releases the hash algorithm's resources.
        using (var hmac = new System.Security.Cryptography.HMACSHA1(secretBytes))
        {
            byte[] calcHash = hmac.ComputeHash(dataBytes);
            return Convert.ToBase64String(calcHash);
        }
    }

    /// <summary>
    /// Disconnects from the broker and disposes the underlying client.
    /// Safe to call more than once. The disconnect runs in the background,
    /// so this method returns without waiting for it to finish.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        // Deliberately not awaited: Dispose() must return void and must not
        // block on async work. Errors are handled inside the helper.
        DisconnectAndDisposeAsync();
    }

    /// <summary>
    /// Disconnects the client, logging (rather than propagating) any failure,
    /// and then disposes it.
    /// </summary>
    private async Task DisconnectAndDisposeAsync()
    {
        try
        {
            await _mqttClient.DisconnectAsync();
        }
        catch (Exception ex)
        {
            LogHelper.Error("Mqtt:dispose", ex.Message);
        }
        finally
        {
            _mqttClient.Dispose();
        }
    }
}
}