123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559 |
- using NEIntelligentControl2.Models;
- using NEIntelligentControl2.Models.Messages;
- using NEIntelligentControl2.Models.Windturbine;
- using NEIntelligentControl2.Service.Windturbine;
- using System;
- using System.Collections.Generic;
- using System.Configuration;
- using System.Linq;
- using System.Text;
- using System.Text.Json;
- using System.Text.Json.Serialization;
- using System.Threading.Tasks;
- namespace NEIntelligentControl2.Service.WebSocket
- {
/// <summary>
/// Bridges the STOMP-over-WebSocket connections and in-process subscribers:
/// inbound MESSAGE frames are parsed according to their destination and handed
/// to every <see cref="IMessage"/> registered for that destination.
/// </summary>
public class MessageBridge
{
    private static readonly object _RegisterLocker = new object();

    // Shared by all instances. The DateTime converter is registered here, exactly once:
    // adding it from the instance constructor duplicated it per instance and fails once
    // the options object has already been used by the serializer.
    private static readonly JsonSerializerOptions SerializerOptions = new JsonSerializerOptions()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        Converters = { new DateTimeConverter() }
    };

    private readonly Dictionary<string, HashSet<IMessage>> _Observers; // subscribers keyed by destination
    private readonly WebSocketStomp _WebSocket; // service connection
    private readonly WebSocketStomp _Adapter; // data-adapter connection
    private readonly CacheManager _CacheManager;
    private readonly WEBHelper _WEBHelper;
    private readonly string _Url;

    /// <summary>
    /// The service connection, exposed so callers can send messages through it.
    /// </summary>
    public WebSocketStomp WebSocket => _WebSocket;

    /// <summary>
    /// Reads the socket addresses from the application settings, opens the service and
    /// adapter connections and starts the periodic custom-lock refresh.
    /// </summary>
    /// <param name="cm">Cache that receives the latest wind turbine, PV and custom-lock data.</param>
    /// <param name="um">Supplies the HTTP service base path.</param>
    /// <param name="web">HTTP helper used for the custom-lock refresh.</param>
    public MessageBridge(CacheManager cm, UrlManager um, WEBHelper web)
    {
        _Observers = new Dictionary<string, HashSet<IMessage>>();
        _CacheManager = cm;
        _WEBHelper = web;
        _Url = um.ServicePath;

        string calcurl = null;
        string adapterurl = null;
        try
        {
#if (DEBUG)
            calcurl = ConfigurationManager.AppSettings["ServiceSocketPathDebug"];
            adapterurl = ConfigurationManager.AppSettings["AdapterSocketPathDebug"];
#else
            calcurl = ConfigurationManager.AppSettings["ServiceSocketPath"];
            // Fixed: this value used to be written to calcurl, which overwrote the service
            // address and left the adapter address null in release builds.
            adapterurl = ConfigurationManager.AppSettings["AdapterSocketPath"];
#endif
        }
        catch (Exception e)
        {
            Console.WriteLine($"配置文件读取出现错误:{e.Message}");
        }

        // Service connection; its liveness is judged by the time of the last inbound frame.
        _WebSocket = new WebSocketStomp(WebSocket_OnMessage, calcurl,
            "SubscribeSuggestion".GetConfiguration(), "SubscribeRuntimeFault".GetConfiguration(),
            "SubscribeRuntimeAlarm".GetConfiguration(), "SubscribePopupAlarm".GetConfiguration());
        _WebSocket.IsCheckTime = true;

        // Data-adapter connection.
        _Adapter = new WebSocketStomp(WebSocket_OnMessage_Adapter, adapterurl,
            "WindturbineInformation".GetConfiguration(), "PVInformation".GetConfiguration());

        Task.Run(() => RefreshCustomLockInfo());
    }

    /// <summary>
    /// Refreshes the cached custom-lock information every five seconds, forever.
    /// </summary>
    private async Task RefreshCustomLockInfo()
    {
        while (true)
        {
            try
            {
                List<CustomLockInfo> ls = _WEBHelper.HttpGetJSON<List<CustomLockInfo>>($"{_Url}/api/windturbine/customer-lock");
                if (ls != null)
                {
                    _CacheManager.CustomLockInfos = ls.ToDictionary(c => c.WindturbineID, c => c);
                }
            }
            catch (Exception e)
            {
                // Best effort: report and try again on the next round.
                Console.WriteLine($"刷新自定义挂牌信息出现错误:{e.Message}");
            }

            // Always wait, including after a null response; skipping the delay in that
            // case made the loop spin without pause.
            await Task.Delay(5000);
        }
    }

    /// <summary>
    /// Handles frames from the service connection.
    /// </summary>
    private void WebSocket_OnMessage(object sender, WebSocketSharp.MessageEventArgs e)
    {
        var frame = StompMessage.Deserialize(e.Data);

        // Every inbound frame refreshes the heartbeat time, even one nobody listens to.
        // The field may still be unassigned if a frame arrives while the constructor runs.
        var socket = _WebSocket;
        if (socket != null)
        {
            socket.Time = DateTime.Now;
        }

        if (!TryGetSubscribers(frame, out var subscribers)) return;
        Notify(subscribers, GetValue(frame));
    }

    /// <summary>
    /// Handles frames from the data-adapter connection.
    /// </summary>
    private void WebSocket_OnMessage_Adapter(object sender, WebSocketSharp.MessageEventArgs e)
    {
        var frame = StompMessage.Deserialize(e.Data);
        if (!TryGetSubscribers(frame, out var subscribers)) return;

        // Here only frames that are actually dispatched refresh the time.
        var adapter = _Adapter;
        if (adapter != null)
        {
            adapter.Time = DateTime.Now;
        }

        Notify(subscribers, GetValue(frame));
    }

    /// <summary>
    /// Decides whether a frame should be dispatched and, if so, takes a snapshot of its subscribers.
    /// </summary>
    /// <param name="frame">The parsed frame.</param>
    /// <param name="subscribers">Snapshot of the subscribers for the frame's destination.</param>
    /// <returns>
    /// True when the frame is a MESSAGE with a destination that has been registered at
    /// least once; the snapshot may be empty if everyone has unregistered since.
    /// </returns>
    private bool TryGetSubscribers(StompMessage frame, out IMessage[] subscribers)
    {
        subscribers = null;
        if (frame == null || frame.Headers == null) return false;
        if (!string.Equals(frame.Command, "MESSAGE", StringComparison.OrdinalIgnoreCase)) return false;
        if (!frame.Headers.ContainsKey("destination")) return false;

        var destination = frame.Headers["destination"];
        if (destination == null) return false;

        // Register/Unregister mutate the collections under this lock, so read under it too.
        lock (_RegisterLocker)
        {
            if (!_Observers.TryGetValue(destination, out var set)) return false;
            subscribers = set.ToArray();
        }
        return true;
    }

    /// <summary>
    /// Delivers a value to each subscriber; one failing subscriber does not stop the others.
    /// </summary>
    private static void Notify(IMessage[] subscribers, object value)
    {
        foreach (var subscriber in subscribers)
        {
            try
            {
                subscriber.OnMessage(value);
            }
            catch (Exception ee)
            {
                Console.WriteLine(ee.ToString());
            }
        }
    }

    /// <summary>
    /// Registers a subscriber for all of its message types. Unregister it when messages
    /// are no longer needed, otherwise the bridge keeps it alive.
    /// </summary>
    /// <param name="message">The subscriber; ignored when null or without message types.</param>
    public void Register(IMessage message)
    {
        if (message == null || message.MessageTypes == null) return;
        lock (_RegisterLocker)
        {
            foreach (var type in message.MessageTypes)
            {
                if (type == null)
                {
                    Console.WriteLine("注册消息不能为空");
                    continue;
                }
                if (!_Observers.TryGetValue(type, out var set))
                {
                    set = new HashSet<IMessage>();
                    _Observers.Add(type, set);
                }
                set.Add(message);
            }
        }
    }

    /// <summary>
    /// Removes a subscriber from all of its message types.
    /// </summary>
    /// <param name="message">The subscriber; ignored when null or without message types.</param>
    public void Unregister(IMessage message)
    {
        if (message == null || message.MessageTypes == null) return;
        lock (_RegisterLocker)
        {
            foreach (var type in message.MessageTypes)
            {
                // A null type was never registered, and a null dictionary key would throw.
                if (type == null) continue;

                // The (possibly empty) set is kept on purpose: frames for a destination
                // keep being parsed, and the cache keeps being updated, once it has been
                // registered.
                if (_Observers.TryGetValue(type, out var set))
                {
                    set.Remove(message);
                }
            }
        }
    }

    /// <summary>
    /// Closes both WebSocket connections.
    /// </summary>
    public void Close()
    {
        try
        {
            _WebSocket?.Close();
        }
        finally
        {
            // Close the adapter even when closing the service connection throws.
            _Adapter?.Close();
        }
    }

    /// <summary>
    /// Converts a frame into the typed payload that belongs to its destination.
    /// </summary>
    /// <returns>
    /// The parsed payload, null when parsing failed, or the frame itself when it has no
    /// destination or the destination is not one of the known topics.
    /// </returns>
    private object GetValue(StompMessage v)
    {
        if (!v.Headers.ContainsKey("destination")) return v;
        var type = v.Headers["destination"];
        switch (type)
        {
            case "/topic/suggestion": // start/stop suggestions
                return GetSuggestion(v);
            case "/topic/windturbine": // wind turbine information
                return GetWindturbineInfo(v);
            case "/topic/pv": // photovoltaic information
                return GetPVInfo(v);
            case "/topic/fault-popup": // alarm popup
                return FalutPopupInfo(v);
            case "/topic/fault-count": // fault count
                return GetFaultCount(v);
            case "/topic/alarm-count": // alarm count
                return GetAlarmCount(v);
            default:
                return v;
        }
    }

    /// <summary>
    /// Deserializes a frame body, logging and returning the default value on any failure.
    /// </summary>
    /// <param name="sm">The frame whose body is parsed.</param>
    /// <param name="label">Name of the data kind, inserted into the error message.</param>
    private static T ParseBody<T>(StompMessage sm, string label)
    {
        try
        {
            return JsonSerializer.Deserialize<T>(sm.Body, SerializerOptions);
        }
        catch (Exception e)
        {
            Console.WriteLine($"解析{label}数据出现错误:{e.Message}");
        }
        return default(T);
    }

    /// <summary>
    /// Parses the alarm count payload.
    /// </summary>
    private object GetAlarmCount(StompMessage v)
    {
        // The log text used to be the copy-pasted "alarm popup" one.
        return ParseBody<List<Models.Alarm.AlarmInfo>>(v, "报警数量");
    }

    /// <summary>
    /// Parses the fault count payload.
    /// </summary>
    private object GetFaultCount(StompMessage v)
    {
        // The log text used to be the copy-pasted "alarm popup" one.
        return ParseBody<List<Models.Alarm.FaultInfoModel>>(v, "故障数量");
    }

    /// <summary>
    /// Parses the alarm popup payload.
    /// </summary>
    private object FalutPopupInfo(StompMessage v)
    {
        return ParseBody<List<Models.Alarm.FaultInfo>>(v, "报警弹窗");
    }

    /// <summary>
    /// Parses the photovoltaic payload and stores it in the cache (null when parsing failed
    /// leaves the cache untouched).
    /// </summary>
    private object GetPVInfo(StompMessage v)
    {
        try
        {
            var val = JsonSerializer.Deserialize<Dictionary<string, Models.PV.PVInfo>>(v.Body, SerializerOptions);
            _CacheManager.PVInfos = val;
            return val;
        }
        catch (Exception e)
        {
            Console.WriteLine($"解析光伏数据出现错误:{e.Message}");
        }
        return null;
    }

    /// <summary>
    /// Parses the wind turbine payload and stores it in the cache (a parsing failure
    /// leaves the cache untouched).
    /// </summary>
    private object GetWindturbineInfo(StompMessage sm)
    {
        try
        {
            var val = JsonSerializer.Deserialize<Dictionary<string, Models.Windturbine.WindturbineInfo>>(sm.Body, SerializerOptions);
            _CacheManager.Windturbineinfos = val;
            return val;
        }
        catch (Exception e)
        {
            Console.WriteLine($"解析风机数据出现错误:{e.Message}");
        }
        return null;
    }

    /// <summary>
    /// Parses the start/stop suggestion payload.
    /// </summary>
    private List<Models.Windturbine.WindturbineSuggestion> GetSuggestion(StompMessage sm)
    {
        return ParseBody<List<Models.Windturbine.WindturbineSuggestion>>(sm, "启停推荐");
    }
}
/// <summary>
/// Small STOMP client on top of a WebSocketSharp socket. It connects in the background,
/// subscribes to the configured destinations and runs a watchdog that reconnects when
/// the connection looks dead.
/// </summary>
public class WebSocketStomp
{
    private const int WatchdogIntervalMilliseconds = 10000; // pause between watchdog checks
    private const int HeartbeatTimeoutSeconds = 90; // silence tolerated when IsCheckTime is set

    // One shared generator instead of a new Random per call; guarded by a lock because
    // Random instances are not safe for concurrent use.
    private static readonly Random _Random = new Random();

    private WebSocketSharp.WebSocket _WebSocket; // current socket
    private int _Index; // sequence used for generated frame ids
    private volatile bool _IsInitConnecting = false; // a connection attempt is in progress
    private volatile bool _IsClosed = false; // Close() was called; no more reconnects
    private readonly string[] _Settings; // destinations subscribed after every connect
    private readonly string _Url; // server address
    private readonly EventHandler<WebSocketSharp.MessageEventArgs> _Message; // inbound message handler

    /// <summary>
    /// When true, the watchdog judges the connection by <see cref="Time"/> instead of the socket state.
    /// </summary>
    public bool IsCheckTime { get; set; } = false;

    /// <summary>
    /// Time of the last sign of life; set after a successful connect and by the owner on inbound frames.
    /// </summary>
    public DateTime Time { get; set; }

    /// <summary>
    /// Stores the settings and starts the first connection attempt and the watchdog in the background.
    /// </summary>
    /// <param name="onMessage">Handler attached to the socket's message event.</param>
    /// <param name="url">WebSocket address.</param>
    /// <param name="settings">Destinations to subscribe to after each successful connect.</param>
    public WebSocketStomp(EventHandler<WebSocketSharp.MessageEventArgs> onMessage, string url, params string[] settings)
    {
        _Settings = settings ?? new string[0];
        _Url = url;
        _Message = onMessage;

        // Connection errors happen on the background task, so they are handled there;
        // a try/catch around Task.Run could never observe them.
        Task.Run(() => TryConnect());

        // Watchdog that reconnects when the connection is lost.
        Task.Run(() => Protector());
    }

    /// <summary>
    /// Runs one connection attempt and logs a failure instead of letting it escape.
    /// </summary>
    private void TryConnect()
    {
        _IsInitConnecting = true;
        try
        {
            Connect();
        }
        catch (Exception e)
        {
            Console.WriteLine($"连接WebSocket[{_Url}]出现错误:{e.Message}");
        }
        finally
        {
            _IsInitConnecting = false;
        }
    }

    /// <summary>
    /// Replaces the current socket with a new one, sends the STOMP CONNECT frame and
    /// subscribes to all configured destinations.
    /// </summary>
    private void Connect()
    {
        if (_IsClosed) return;

        var previous = _WebSocket;
        if (previous != null)
        {
            // Detach first so the discarded socket no longer references the handler.
            previous.OnMessage -= _Message;
            previous.Close();
        }

        var socket = new WebSocketSharp.WebSocket(_Url);
        socket.OnMessage += _Message;
        _WebSocket = socket;
        socket.Connect();

        if (_IsClosed)
        {
            // Close() ran while the connection was being established.
            socket.Close();
            return;
        }
        if (!socket.IsAlive)
        {
            Console.WriteLine($"WebSocket连接失败,{DateTime.Now}");
            return;
        }

        var connect = new StompMessage("CONNECT");
        connect["accept-version"] = "1.2";
        connect["host"] = "";
        socket.Send(connect.ToString());

        foreach (var destination in _Settings)
        {
            Subscribe(destination);
        }

        Time = DateTime.Now;
        Console.WriteLine($"WebSocket连接成功,{DateTime.Now}");
    }

    /// <summary>
    /// Watchdog loop: every ten seconds, reconnects when the connection looks dead.
    /// Ends once <see cref="Close"/> has been called.
    /// </summary>
    private async Task Protector()
    {
        while (!_IsClosed)
        {
            await Task.Delay(WatchdogIntervalMilliseconds);
            if (_IsClosed) break;

            if (IsCheckTime)
            {
                // Alive as long as something was received within the timeout.
                if ((DateTime.Now - Time).TotalSeconds < HeartbeatTimeoutSeconds) continue;
            }
            else if ((_WebSocket != null && _WebSocket.IsAlive) || _IsInitConnecting)
            {
                continue;
            }

            Reconnect();
        }
    }

    /// <summary>
    /// Logs and performs a reconnection attempt; a failure is logged and retried on the next watchdog round.
    /// </summary>
    private void Reconnect()
    {
        Console.WriteLine($"WebSocket开始重新连接,{DateTime.Now}");
        TryConnect();
    }

    /// <summary>
    /// Returns the current sequence value and advances the sequence atomically.
    /// </summary>
    private int NextIndex()
    {
        return System.Threading.Interlocked.Increment(ref _Index) - 1;
    }

    /// <summary>
    /// Builds a frame and writes it to the current socket. Any failure (frame construction,
    /// missing socket, send error) is logged and not propagated, so callers are never
    /// interrupted by a transport problem.
    /// </summary>
    /// <param name="action">Short description of the operation, used in the log text.</param>
    /// <param name="buildFrame">Creates the frame to send.</param>
    private void SendFrame(string action, Func<StompMessage> buildFrame)
    {
        try
        {
            var frame = buildFrame();
            var socket = _WebSocket;
            if (socket == null)
            {
                Console.WriteLine($"WebSocket尚未连接,{action}失败");
                return;
            }
            socket.Send(frame.ToString());
        }
        catch (Exception e)
        {
            Console.WriteLine($"WebSocket{action}出现错误:{e.Message}");
        }
    }

    /// <summary>
    /// Subscribes to a destination with a generated subscription id ("sub-N").
    /// </summary>
    /// <param name="destination">Destination to subscribe to.</param>
    public void Subscribe(string destination)
    {
        SendFrame("订阅", () =>
        {
            var sub = new StompMessage("SUBSCRIBE");
            sub["id"] = "sub-" + NextIndex();
            sub["destination"] = destination;
            return sub;
        });
    }

    /// <summary>
    /// Subscribes to a destination with a caller-supplied subscription id.
    /// </summary>
    /// <param name="subid">Subscription id.</param>
    /// <param name="destination">Destination to subscribe to.</param>
    public void Subscribe(string subid, string destination)
    {
        SendFrame("订阅", () =>
        {
            // The sequence advances even though the id is supplied, as it always did,
            // so generated ids keep the same numbering.
            NextIndex();
            var sub = new StompMessage("SUBSCRIBE");
            sub["id"] = subid;
            sub["destination"] = destination;
            return sub;
        });
    }

    /// <summary>
    /// Subscribes to a destination with additional headers. An id is generated when the
    /// headers do not contain one; the destination argument always wins over a
    /// "destination" header.
    /// </summary>
    /// <param name="destination">Destination to subscribe to.</param>
    /// <param name="headers">Extra headers; null is treated as no extra headers.</param>
    public void Subscribe(string destination, Dictionary<string, string> headers)
    {
        SendFrame("订阅", () =>
        {
            // Kept so the id numbering stays what it was.
            NextIndex();
            var sub = new StompMessage("SUBSCRIBE");
            if (headers != null)
            {
                foreach (var kv in headers)
                {
                    sub[kv.Key] = kv.Value;
                }
            }
            if (headers == null || !headers.ContainsKey("id"))
            {
                sub["id"] = "sub-" + NextIndex();
            }
            sub["destination"] = destination;
            return sub;
        });
    }

    /// <summary>
    /// Sends a prepared frame after stamping it with id, content type, version and client id.
    /// </summary>
    /// <param name="stomp">Frame to send; null is ignored.</param>
    public void Send(StompMessage stomp)
    {
        if (stomp == null) return;
        SendFrame("发送消息", () =>
        {
            Stamp(stomp);
            return stomp;
        });
    }

    /// <summary>
    /// Sends an object, serialized as JSON, in a SEND frame.
    /// </summary>
    /// <param name="destination">Destination of the frame.</param>
    /// <param name="content">Object serialized into the frame body.</param>
    public void Send(string destination, object content)
    {
        SendFrame("发送消息", () =>
        {
            var stomp = new StompMessage("SEND", System.Text.Json.JsonSerializer.Serialize(content));
            stomp["id"] = "cs-" + NextIndex();
            stomp["destination"] = destination;
            StampCommonHeaders(stomp);
            return stomp;
        });
    }

    /// <summary>
    /// Sends a frame with a custom command, body and headers.
    /// </summary>
    /// <param name="destination">Destination of the frame.</param>
    /// <param name="command">STOMP command.</param>
    /// <param name="body">Frame body; a line break is appended.</param>
    /// <param name="headers">Initial headers of the frame.</param>
    public void Send(string destination, string command, string body, Dictionary<string, string> headers)
    {
        SendFrame("发送消息", () =>
        {
            var stomp = new StompMessage(command, body + "\r\n", headers);
            stomp["id"] = "cs-" + NextIndex();
            stomp["destination"] = destination;
            StampCommonHeaders(stomp);
            return stomp;
        });
    }

    /// <summary>
    /// Sets the generated id plus the common headers on an outgoing frame.
    /// </summary>
    private void Stamp(StompMessage stomp)
    {
        stomp["id"] = "cs-" + NextIndex();
        StampCommonHeaders(stomp);
    }

    /// <summary>
    /// Sets content type, protocol version and a random client id on an outgoing frame.
    /// </summary>
    private static void StampCommonHeaders(StompMessage stomp)
    {
        stomp["content-type"] = "application/json;charset=utf-8";
        stomp["accept-version"] = "1.2";
        stomp["clientId"] = RandomString(20);
    }

    /// <summary>
    /// Creates a random string of upper-case letters and digits.
    /// </summary>
    /// <param name="length">Length of the string.</param>
    /// <returns>The random string.</returns>
    private static string RandomString(int length)
    {
        const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        var buffer = new char[length];
        lock (_Random)
        {
            for (int i = 0; i < buffer.Length; i++)
            {
                buffer[i] = chars[_Random.Next(chars.Length)];
            }
        }
        return new string(buffer);
    }

    /// <summary>
    /// Closes the connection and stops the watchdog, so the socket is not reopened afterwards.
    /// </summary>
    internal void Close()
    {
        _IsClosed = true;
        _WebSocket?.Close();
    }
}
/// <summary>
/// JSON converter that writes <see cref="DateTime"/> values as "yyyy-MM-dd HH:mm:ss"
/// and reads them back leniently.
/// </summary>
public class DateTimeConverter : JsonConverter<DateTime>
{
    private const string Format = "yyyy-MM-dd HH:mm:ss";

    /// <summary>
    /// Reads a date from a JSON string.
    /// </summary>
    /// <returns>The parsed value, or the default <see cref="DateTime"/> when the text cannot be parsed.</returns>
    public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        string text = reader.GetString();

        // First try the exact wire format with the invariant culture, so that the text
        // produced by Write is read back identically whatever the current culture or
        // calendar is.
        if (DateTime.TryParseExact(text, Format, System.Globalization.CultureInfo.InvariantCulture,
            System.Globalization.DateTimeStyles.None, out DateTime exact))
        {
            return exact;
        }

        // Otherwise keep the previous permissive, culture-sensitive parsing; on failure
        // dt stays at its default value.
        DateTime.TryParse(text, out DateTime dt);
        return dt;
    }

    /// <summary>
    /// Writes a date as a JSON string in the fixed wire format.
    /// </summary>
    public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
    {
        // Invariant culture: the current culture's calendar must not leak into the output.
        writer.WriteStringValue(value.ToString(Format, System.Globalization.CultureInfo.InvariantCulture));
    }
}
- }
|