using NEIntelligentControl2.Models;
using NEIntelligentControl2.Models.Messages;
using NEIntelligentControl2.Models.Windturbine;
using NEIntelligentControl2.Service.Windturbine;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
namespace NEIntelligentControl2.Service.WebSocket
{
///
/// 用于消息收发,WebSocket
///
public class MessageBridge
{
private static readonly object _RegisterLocker = new object();
private Dictionary> _Observers; // 订阅者
private WebSocketStomp _WebSocket; // WebSocket
private WebSocketStomp _Adapter; // 适配器
public WebSocketStomp WebSocket { get => _WebSocket; }// 用于消息发送
private static JsonSerializerOptions SerializerOptions = new JsonSerializerOptions() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
private CacheManager _CacheManager;
private WEBHelper _WEBHelper;
private string _Url;
public MessageBridge(CacheManager cm, UrlManager um, WEBHelper web)
{
_Observers = new Dictionary>();
_CacheManager = cm;
_WEBHelper = web;
_Url = um.ServicePath;
string calcurl = null;
string adapterurl = null;
try
{
#if (DEBUG)
calcurl = ConfigurationManager.AppSettings["ServiceSocketPathDebug"];
adapterurl = ConfigurationManager.AppSettings["AdapterSocketPathDebug"];
#else
calcurl = ConfigurationManager.AppSettings["ServiceSocketPath"];
calcurl = ConfigurationManager.AppSettings["AdapterSocketPath"];
#endif
}
catch (Exception e)
{
Console.WriteLine($"配置文件读取出现错误:{e.Message}");
}
// WebSocket
_WebSocket = new WebSocketStomp(WebSocket_OnMessage, calcurl,
"SubscribeSuggestion".GetConfiguration(), "SubscribeRuntimeFault".GetConfiguration(),
"SubscribeRuntimeAlarm".GetConfiguration(), "SubscribePopupAlarm".GetConfiguration());
_WebSocket.IsCheckTime = true;
// DataAdapter
_Adapter = new WebSocketStomp(WebSocket_OnMessage_Adapter, adapterurl, "WindturbineInformation".GetConfiguration(), "PVInformation".GetConfiguration());
SerializerOptions.Converters.Add(new DateTimeConverter());
Task.Run(RefreshCustomLockInfo);
}
///
/// 定时刷新自定义挂牌信息
///
private async void RefreshCustomLockInfo()
{
while (true)
{
try
{
List ls = _WEBHelper.HttpGetJSON>($"{_Url}/api/windturbine/customer-lock");
if (ls == null) continue;
_CacheManager.CustomLockInfos = ls.ToDictionary(c => c.WindturbineID, c => c);
}
catch { }
await Task.Delay(5000);
}
}
private void WebSocket_OnMessage(object sender, WebSocketSharp.MessageEventArgs e)
{
var v = StompMessage.Deserialize(e.Data);
_WebSocket.Time = DateTime.Now;
if (v.Command.ToUpper() != "MESSAGE" || !v.Headers.ContainsKey("destination") || !_Observers.ContainsKey(v.Headers["destination"])) return;
var value = GetValue(v);
var vs = _Observers[v.Headers["destination"]].ToArray();
foreach (var iv in vs)
{
try
{
iv.OnMessage(value);
}
catch (Exception ee)
{
Console.WriteLine(ee.ToString());
}
}
}
private void WebSocket_OnMessage_Adapter(object sender, WebSocketSharp.MessageEventArgs e)
{
var v = StompMessage.Deserialize(e.Data);
if (v.Command.ToUpper() != "MESSAGE" || !v.Headers.ContainsKey("destination") || !_Observers.ContainsKey(v.Headers["destination"])) return;
_Adapter.Time = DateTime.Now;
var value = GetValue(v);
var vs = _Observers[v.Headers["destination"]].ToArray();
foreach (var iv in vs)
{
try
{
iv.OnMessage(value);
}
catch (Exception ee)
{
Console.WriteLine(ee.ToString());
}
}
}
///
/// 注册,在不需要使用消息的时候请取消注册,避免造成内存泄露
///
public void Register(IMessage message)
{
if (message == null || message.MessageTypes == null) return;
lock (_RegisterLocker)
{
foreach (var v in message.MessageTypes)
{
if (v == null)
{
Console.WriteLine("注册消息不能为空");
continue;
}
if (!_Observers.ContainsKey(v))
{
_Observers.Add(v, new HashSet());
}
_Observers[v].Add(message);
}
}
}
///
/// 取消注册
///
public void Unregister(IMessage message)
{
if (message == null || message.MessageTypes == null) return;
lock (_RegisterLocker)
{
foreach (var v in message.MessageTypes)
{
if (_Observers.ContainsKey(v))
{
_Observers[v].Remove(message);
}
}
}
}
///
/// 关闭WebSocket连接
///
public void Close()
{
_WebSocket.Close();
_Adapter.Close();
}
///
/// 获取不同类型的数据
///
///
private object GetValue(StompMessage v)
{
if (!v.Headers.ContainsKey("destination")) return v;
var type = v.Headers["destination"];
switch (type)
{
case "/topic/suggestion":// 启停推荐
return GetSuggestion(v);
case "/topic/windturbine":// 风机信息
return GetWindturbineInfo(v);
case "/topic/pv":// 光伏信息
return GetPVInfo(v);
case "/topic/fault-popup":// 报警弹窗
return FalutPopupInfo(v);
case "/topic/fault-count":// 故障数量
return GetFaultCount(v);
case "/topic/alarm-count":// 报警数量
return GetAlarmCount(v);
default: return v;
}
}
private object GetAlarmCount(StompMessage v)
{
try
{
var val = JsonSerializer.Deserialize>(v.Body, SerializerOptions);
return val;
}
catch (Exception e)
{
Console.WriteLine($"解析报警弹窗数据出现错误:{e.Message}");
}
return null;
}
///
/// 故障数量信息解析
///
private object GetFaultCount(StompMessage v)
{
try
{
var val = JsonSerializer.Deserialize>(v.Body, SerializerOptions);
return val;
}
catch (Exception e)
{
Console.WriteLine($"解析报警弹窗数据出现错误:{e.Message}");
}
return null;
}
///
/// 报警弹窗信息解析
///
private object FalutPopupInfo(StompMessage v)
{
try
{
var val = JsonSerializer.Deserialize>(v.Body, SerializerOptions);
return val;
}
catch (Exception e)
{
Console.WriteLine($"解析报警弹窗数据出现错误:{e.Message}");
}
return null;
}
///
/// 光伏信息解析
///
private object GetPVInfo(StompMessage v)
{
try
{
var val = JsonSerializer.Deserialize>(v.Body, SerializerOptions);
_CacheManager.PVInfos = val;
return val;
}
catch (Exception e)
{
Console.WriteLine($"解析光伏数据出现错误:{e.Message}");
}
return null;
}
///
/// 风机信息解析
///
private object GetWindturbineInfo(StompMessage sm)
{
try
{
var val = JsonSerializer.Deserialize>(sm.Body, SerializerOptions);
_CacheManager.Windturbineinfos = val;
return val;
}
catch (Exception e)
{
Console.WriteLine($"解析风机数据出现错误:{e.Message}");
}
return null;
}
///
/// 启停推荐数据解析
///
private List GetSuggestion(StompMessage sm)
{
try
{
var val = JsonSerializer.Deserialize>(sm.Body, SerializerOptions);
return val;
}
catch (Exception e)
{
Console.WriteLine($"解析启停推荐数据出现错误:{e.Message}");
}
return null;
}
}
public class WebSocketStomp
{
private WebSocketSharp.WebSocket _WebSocket; // 当前WebSocket
private int _Index; // 发送序列
private bool _IsInitConnecting = false; // WebSocket是否正在重新连接
private string[] _Settings; // 要注册的数据类型
private string _Url; // 连接地址
private EventHandler _Message; // 消息发送
///
/// 是否检测时间,用于确定websocket是否断开连接
///
public bool IsCheckTime { get; set; } = false;
///
/// 当前心跳时间
///
public DateTime Time { get; set; }
public WebSocketStomp(EventHandler onMessage, string url, params string[] settings)
{
try
{
_Settings = settings;
_Url = url;
_Message = onMessage;
Task.Run(Connect);
// 用于定时检测WebSocket是否连接,如果断开则重新连接
Task.Factory.StartNew(Protector, TaskCreationOptions.LongRunning);
}
catch (Exception e)
{
Console.WriteLine($"连接WebSocket[{_Url}]出现错误:{e.Message}");
}
}
///
/// 连接WebSocket
///
private void Connect()
{
_WebSocket?.Close();
_WebSocket = new WebSocketSharp.WebSocket(_Url);
_WebSocket.OnMessage += _Message;
_WebSocket.Connect();
if (!_WebSocket.IsAlive)
{
Console.WriteLine($"WebSocket连接失败,{DateTime.Now}");
return;
}
var connect = new StompMessage("CONNECT");
connect["accept-version"] = "1.2";
connect["host"] = "";
_WebSocket.Send(connect.ToString());
foreach (var v in _Settings)
{
Subscribe(v);
}
Time = DateTime.Now;
Console.WriteLine($"WebSocket连接成功,{DateTime.Now}");
}
///
/// 用于检测WebSocket连接是否被断开,如果断开则重新连接
///
private async void Protector()
{
while (true)
{
await Task.Delay(10000);
if (IsCheckTime)
{
if ((DateTime.Now - Time).TotalSeconds < 90) continue;
}
else if ((_WebSocket != null && _WebSocket.IsAlive) || _IsInitConnecting)
{
continue;
}
Reconnect();
}
}
///
/// 重新连接WebSocket
///
private void Reconnect()
{
Console.WriteLine($"WebSocket开始重新连接,{DateTime.Now}");
try
{
_IsInitConnecting = true;
Connect();
}
finally
{
_IsInitConnecting = false;
}
}
///
/// 向服务器请求注册
///
public void Subscribe(string destination)
{
try
{
var sub = new StompMessage("SUBSCRIBE");
sub["id"] = "sub-" + _Index++;
sub["destination"] = destination;
_WebSocket.Send(sub.ToString());
}
catch { }
}
///
/// 向服务器请求注册
///
public async void Subscribe(string subid, string destination)
{
_Index++;
var sub = new StompMessage("SUBSCRIBE");
sub["id"] = subid;
sub["destination"] = destination;
_WebSocket.Send(sub.ToString());
await Task.Delay(500);
}
///
/// 向服务器请求注册
///
public async void Subscribe(string destination, Dictionary headers)
{
try
{
_Index++;
var sub = new StompMessage("SUBSCRIBE");
foreach (var kv in headers)
{
sub[kv.Key] = kv.Value;
}
if (!headers.ContainsKey("id"))
{
sub["id"] = "sub-" + _Index++;
}
sub["destination"] = destination;
_WebSocket.Send(sub.ToString());
await Task.Delay(500);
}
catch { }
}
///
/// 发送消息
///
public async void Send(StompMessage stomp)
{
if (stomp == null) return;
stomp["id"] = "cs-" + _Index++;
stomp["content-type"] = "application/json;charset=utf-8";
stomp["accept-version"] = "1.2";
stomp["clientId"] = RandomString(20);
_WebSocket.Send(stomp.ToString());
await Task.Delay(500);
}
///
/// 发送消息
///
public async void Send(string destination, object content)
{
var stomp = new StompMessage("SEND", System.Text.Json.JsonSerializer.Serialize(content));
stomp["id"] = "cs-" + _Index++;
stomp["destination"] = destination;
stomp["content-type"] = "application/json;charset=utf-8";
stomp["accept-version"] = "1.2";
stomp["clientId"] = RandomString(20);
_WebSocket.Send(stomp.ToString());
await Task.Delay(500);
}
///
/// 发送消息
///
public async void Send(string destination, string command, string body, Dictionary headers)
{
var stomp = new StompMessage(command, body + "\r\n", headers);
stomp["id"] = "cs-" + _Index++;
stomp["destination"] = destination;
stomp["content-type"] = "application/json;charset=utf-8";
stomp["accept-version"] = "1.2";
stomp["clientId"] = RandomString(20);
string stompText = stomp.ToString();
_WebSocket.Send(stompText);
await Task.Delay(500);
}
///
/// 获取随机字符串
///
/// 字符串长度
/// 随机字符串
private string RandomString(int length)
{
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
var random = new Random();
return new string(Enumerable.Repeat(chars, length)
.Select(s => s[random.Next(s.Length)]).ToArray());
}
///
/// 关闭WebSocket连接
///
internal void Close()
{
_WebSocket.Close();
}
}
public class DateTimeConverter : JsonConverter
{
public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
DateTime.TryParse(reader.GetString(), out DateTime dt);
return dt;
}
public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
{
writer.WriteStringValue(value.ToString("yyyy-MM-dd HH:mm:ss"));
}
}
}