using NEIntelligentControl2.Models;
using NEIntelligentControl2.Models.Messages;
using NEIntelligentControl2.Models.Windturbine;
using NEIntelligentControl2.Service.Windturbine;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;

namespace NEIntelligentControl2.Service.WebSocket
{
    // NOTE(review): the reviewed copy of this file had every generic type argument stripped
    // (e.g. "Dictionary>", "HttpGetJSON>", "JsonConverter"). Arguments that follow directly from
    // usage (observer dictionary, EventHandler, JsonConverter<DateTime>) were restored; the DTO type
    // names marked "NOTE(review)" below are best guesses and must be confirmed against the project.

    /// <summary>
    /// Receives STOMP messages over two WebSocket connections (calculation service and data adapter),
    /// converts their bodies into typed values and dispatches them to registered <see cref="IMessage"/> observers.
    /// </summary>
    public class MessageBridge
    {
        private static readonly object _RegisterLocker = new object();

        // Observers keyed by STOMP destination.
        private Dictionary<string, HashSet<IMessage>> _Observers;
        private WebSocketStomp _WebSocket;  // connection to the calculation service
        private WebSocketStomp _Adapter;    // connection to the data adapter

        /// <summary>Service connection, exposed so callers can send messages.</summary>
        public WebSocketStomp WebSocket { get => _WebSocket; }

        // The converter is registered here, once, instead of in the instance constructor:
        // JsonSerializerOptions rejects modification after its first use, so adding the converter
        // per instance would throw (or pile up duplicates) when a second bridge is created.
        private static readonly JsonSerializerOptions SerializerOptions = new JsonSerializerOptions()
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
            Converters = { new DateTimeConverter() }
        };

        private CacheManager _CacheManager;
        private WEBHelper _WEBHelper;
        private string _Url;

        /// <summary>
        /// Reads the socket URLs from configuration, opens both STOMP connections and starts the
        /// background refresh of the custom lock information.
        /// </summary>
        /// <param name="cm">Cache that receives the latest wind turbine / PV / lock data.</param>
        /// <param name="um">Provides the HTTP service base path.</param>
        /// <param name="web">HTTP helper used for the custom lock query.</param>
        public MessageBridge(CacheManager cm, UrlManager um, WEBHelper web)
        {
            _Observers = new Dictionary<string, HashSet<IMessage>>();
            _CacheManager = cm;
            _WEBHelper = web;
            _Url = um.ServicePath;
            string calcurl = null;
            string adapterurl = null;
            try
            {
#if (DEBUG)
                calcurl = ConfigurationManager.AppSettings["ServiceSocketPathDebug"];
                adapterurl = ConfigurationManager.AppSettings["AdapterSocketPathDebug"];
#else
                calcurl = ConfigurationManager.AppSettings["ServiceSocketPath"];
                // Fixed: this value used to be assigned to calcurl, leaving adapterurl null in release builds.
                adapterurl = ConfigurationManager.AppSettings["AdapterSocketPath"];
#endif
            }
            catch (Exception e)
            {
                Console.WriteLine($"配置文件读取出现错误:{e.Message}");
            }

            // Calculation service socket.
            _WebSocket = new WebSocketStomp(WebSocket_OnMessage, calcurl,
                "SubscribeSuggestion".GetConfiguration(),
                "SubscribeRuntimeFault".GetConfiguration(),
                "SubscribeRuntimeAlarm".GetConfiguration(),
                "SubscribePopupAlarm".GetConfiguration());
            _WebSocket.IsCheckTime = true;

            // Data adapter socket.
            _Adapter = new WebSocketStomp(WebSocket_OnMessage_Adapter, adapterurl,
                "WindturbineInformation".GetConfiguration(),
                "PVInformation".GetConfiguration());

            Task.Run(() => RefreshCustomLockInfo());
        }

        /// <summary>
        /// Polls the custom lock list every five seconds and publishes it to the cache.
        /// </summary>
        private async Task RefreshCustomLockInfo()
        {
            while (true)
            {
                try
                {
                    // NOTE(review): element type reconstructed from CacheManager.CustomLockInfos — confirm.
                    List<CustomLockInfo> ls = _WEBHelper.HttpGetJSON<List<CustomLockInfo>>($"{_Url}/api/windturbine/customer-lock");
                    if (ls != null)
                    {
                        _CacheManager.CustomLockInfos = ls.ToDictionary(c => c.WindturbineID, c => c);
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine($"刷新自定义挂牌信息出现错误:{e.Message}");
                }

                // Always wait, including after a null result; the old "continue" skipped this delay
                // and turned the loop into a tight spin against the HTTP endpoint.
                await Task.Delay(5000);
            }
        }

        /// <summary>
        /// Handles a frame from the calculation service. Any frame that deserializes refreshes the heartbeat time.
        /// </summary>
        private void WebSocket_OnMessage(object sender, WebSocketSharp.MessageEventArgs e)
        {
            var v = StompMessage.Deserialize(e.Data);
            _WebSocket.Time = DateTime.Now;
            var observers = SnapshotObservers(v);
            if (observers == null)
            {
                return;
            }
            Notify(observers, GetValue(v));
        }

        /// <summary>
        /// Handles a frame from the data adapter. Only MESSAGE frames with registered observers refresh the heartbeat time.
        /// </summary>
        private void WebSocket_OnMessage_Adapter(object sender, WebSocketSharp.MessageEventArgs e)
        {
            var v = StompMessage.Deserialize(e.Data);
            var observers = SnapshotObservers(v);
            if (observers == null)
            {
                return;
            }
            _Adapter.Time = DateTime.Now;
            Notify(observers, GetValue(v));
        }

        /// <summary>
        /// Returns a copy of the observers registered for the frame's destination, or null when the
        /// frame is not a MESSAGE frame, has no destination, or nobody registered for that destination.
        /// </summary>
        private IMessage[] SnapshotObservers(StompMessage v)
        {
            if (v == null || v.Headers == null
                || !string.Equals(v.Command, "MESSAGE", StringComparison.OrdinalIgnoreCase)
                || !v.Headers.ContainsKey("destination"))
            {
                return null;
            }

            var destination = v.Headers["destination"];
            // Register/Unregister mutate the dictionary under this lock, so reads must take it too.
            lock (_RegisterLocker)
            {
                return _Observers.TryGetValue(destination, out var registered) ? registered.ToArray() : null;
            }
        }

        /// <summary>
        /// Delivers a value to each observer; a failing observer is logged and does not stop the others.
        /// </summary>
        private static void Notify(IMessage[] observers, object value)
        {
            foreach (var observer in observers)
            {
                try
                {
                    observer.OnMessage(value);
                }
                catch (Exception ee)
                {
                    Console.WriteLine(ee.ToString());
                }
            }
        }

        /// <summary>
        /// Registers an observer for every destination in its MessageTypes.
        /// Unregister it when it is no longer needed, otherwise the bridge keeps it alive.
        /// </summary>
        public void Register(IMessage message)
        {
            if (message == null || message.MessageTypes == null)
            {
                return;
            }

            lock (_RegisterLocker)
            {
                foreach (var v in message.MessageTypes)
                {
                    if (v == null)
                    {
                        Console.WriteLine("注册消息不能为空");
                        continue;
                    }
                    if (!_Observers.TryGetValue(v, out var registered))
                    {
                        registered = new HashSet<IMessage>();
                        _Observers.Add(v, registered);
                    }
                    registered.Add(message);
                }
            }
        }

        /// <summary>
        /// Removes an observer from every destination in its MessageTypes.
        /// </summary>
        public void Unregister(IMessage message)
        {
            if (message == null || message.MessageTypes == null)
            {
                return;
            }

            lock (_RegisterLocker)
            {
                foreach (var v in message.MessageTypes)
                {
                    // A null key would make the dictionary lookup throw; Register skips it as well.
                    if (v != null && _Observers.TryGetValue(v, out var registered))
                    {
                        registered.Remove(message);
                    }
                }
            }
        }

        /// <summary>
        /// Closes both WebSocket connections.
        /// </summary>
        public void Close()
        {
            _WebSocket.Close();
            _Adapter.Close();
        }

        /// <summary>
        /// Converts a frame into the typed payload for its destination; unknown destinations yield the frame itself.
        /// </summary>
        private object GetValue(StompMessage v)
        {
            if (!v.Headers.ContainsKey("destination"))
            {
                return v;
            }

            var type = v.Headers["destination"];
            switch (type)
            {
                case "/topic/suggestion":   // start/stop suggestions
                    return GetSuggestion(v);
                case "/topic/windturbine":  // wind turbine information
                    return GetWindturbineInfo(v);
                case "/topic/pv":           // photovoltaic information
                    return GetPVInfo(v);
                case "/topic/fault-popup":  // alarm popup
                    return GetFaultPopupInfo(v);
                case "/topic/fault-count":  // fault count
                    return GetFaultCount(v);
                case "/topic/alarm-count":  // alarm count
                    return GetAlarmCount(v);
                default:
                    return v;
            }
        }

        /// <summary>
        /// Parses the alarm count payload; returns null when parsing fails.
        /// </summary>
        private object GetAlarmCount(StompMessage v)
        {
            try
            {
                // NOTE(review): payload type was lost in the reviewed copy; Dictionary<string, int> is a guess — confirm.
                var val = JsonSerializer.Deserialize<Dictionary<string, int>>(v.Body, SerializerOptions);
                return val;
            }
            catch (Exception e)
            {
                // Message corrected: it previously reported an "alarm popup" parse error.
                Console.WriteLine($"解析报警数量数据出现错误:{e.Message}");
            }
            return null;
        }

        /// <summary>
        /// Parses the fault count payload; returns null when parsing fails.
        /// </summary>
        private object GetFaultCount(StompMessage v)
        {
            try
            {
                // NOTE(review): payload type was lost in the reviewed copy; Dictionary<string, int> is a guess — confirm.
                var val = JsonSerializer.Deserialize<Dictionary<string, int>>(v.Body, SerializerOptions);
                return val;
            }
            catch (Exception e)
            {
                // Message corrected: it previously reported an "alarm popup" parse error.
                Console.WriteLine($"解析故障数量数据出现错误:{e.Message}");
            }
            return null;
        }

        /// <summary>
        /// Parses the alarm popup payload; returns null when parsing fails.
        /// </summary>
        private object GetFaultPopupInfo(StompMessage v)
        {
            try
            {
                // NOTE(review): payload type was lost in the reviewed copy; List<FaultInfo> is a guess — confirm.
                var val = JsonSerializer.Deserialize<List<FaultInfo>>(v.Body, SerializerOptions);
                return val;
            }
            catch (Exception e)
            {
                Console.WriteLine($"解析报警弹窗数据出现错误:{e.Message}");
            }
            return null;
        }

        /// <summary>
        /// Parses the photovoltaic payload and stores it in the cache; returns null when parsing fails.
        /// </summary>
        private object GetPVInfo(StompMessage v)
        {
            try
            {
                // NOTE(review): payload type reconstructed from CacheManager.PVInfos; List<PVInfo> is a guess — confirm.
                var val = JsonSerializer.Deserialize<List<PVInfo>>(v.Body, SerializerOptions);
                _CacheManager.PVInfos = val;
                return val;
            }
            catch (Exception e)
            {
                Console.WriteLine($"解析光伏数据出现错误:{e.Message}");
            }
            return null;
        }

        /// <summary>
        /// Parses the wind turbine payload and stores it in the cache; returns null when parsing fails.
        /// </summary>
        private object GetWindturbineInfo(StompMessage sm)
        {
            try
            {
                // NOTE(review): payload type reconstructed from CacheManager.Windturbineinfos; List<WindturbineInfo> is a guess — confirm.
                var val = JsonSerializer.Deserialize<List<WindturbineInfo>>(sm.Body, SerializerOptions);
                _CacheManager.Windturbineinfos = val;
                return val;
            }
            catch (Exception e)
            {
                Console.WriteLine($"解析风机数据出现错误:{e.Message}");
            }
            return null;
        }

        /// <summary>
        /// Parses the start/stop suggestion payload; returns null when parsing fails.
        /// </summary>
        // NOTE(review): element type was lost in the reviewed copy; SuggestionInfo is a guess — confirm.
        private List<SuggestionInfo> GetSuggestion(StompMessage sm)
        {
            try
            {
                var val = JsonSerializer.Deserialize<List<SuggestionInfo>>(sm.Body, SerializerOptions);
                return val;
            }
            catch (Exception e)
            {
                Console.WriteLine($"解析启停推荐数据出现错误:{e.Message}");
            }
            return null;
        }
    }

    /// <summary>
    /// STOMP client on top of a WebSocketSharp socket. Connects in the background, subscribes to the
    /// configured destinations and reconnects when the connection is considered lost.
    /// </summary>
    public class WebSocketStomp
    {
        // One shared generator: a new Random per call can repeat values on time-seeded runtimes.
        private static readonly Random _Random = new Random();

        private WebSocketSharp.WebSocket _WebSocket;    // current socket
        private int _Index;                             // running sequence used in frame ids
        private bool _IsInitConnecting = false;         // true while a reconnect is in progress
        private string[] _Settings;                     // destinations to subscribe to after connecting
        private string _Url;                            // server address
        private EventHandler<WebSocketSharp.MessageEventArgs> _Message; // handler for incoming frames

        /// <summary>
        /// When true, the connection is judged by <see cref="Time"/> (no update for 90 seconds means lost)
        /// instead of by the socket's IsAlive state.
        /// </summary>
        public bool IsCheckTime { get; set; } = false;

        /// <summary>
        /// Time of the last heartbeat; set on successful connect and by the message handler.
        /// </summary>
        public DateTime Time { get; set; }

        /// <summary>
        /// Stores the settings and starts the background connect and the reconnect watchdog.
        /// </summary>
        /// <param name="onMessage">Handler invoked for every incoming frame.</param>
        /// <param name="url">WebSocket server address.</param>
        /// <param name="settings">Destinations subscribed to after each successful connect.</param>
        public WebSocketStomp(EventHandler<WebSocketSharp.MessageEventArgs> onMessage, string url, params string[] settings)
        {
            try
            {
                _Settings = settings;
                _Url = url;
                _Message = onMessage;
                Task.Run(() => ConnectSafely());
                // Watchdog that reconnects when the connection is lost.
                Task.Run(() => Protector());
            }
            catch (Exception e)
            {
                Console.WriteLine($"连接WebSocket[{_Url}]出现错误:{e.Message}");
            }
        }

        /// <summary>
        /// Runs <see cref="Connect"/> and logs any failure, so a bad URL or a network error
        /// cannot escape from a background task unobserved.
        /// </summary>
        private void ConnectSafely()
        {
            try
            {
                Connect();
            }
            catch (Exception e)
            {
                Console.WriteLine($"连接WebSocket[{_Url}]出现错误:{e.Message}");
            }
        }

        /// <summary>
        /// Replaces the current socket, connects, sends the STOMP CONNECT frame and subscribes to all configured destinations.
        /// </summary>
        private void Connect()
        {
            _WebSocket?.Close();
            _WebSocket = new WebSocketSharp.WebSocket(_Url);
            _WebSocket.OnMessage += _Message;
            _WebSocket.Connect();
            if (!_WebSocket.IsAlive)
            {
                Console.WriteLine($"WebSocket连接失败,{DateTime.Now}");
                return;
            }

            var connect = new StompMessage("CONNECT");
            connect["accept-version"] = "1.2";
            connect["host"] = "";
            _WebSocket.Send(connect.ToString());
            foreach (var v in _Settings)
            {
                Subscribe(v);
            }
            Time = DateTime.Now;
            Console.WriteLine($"WebSocket连接成功,{DateTime.Now}");
        }

        /// <summary>
        /// Checks the connection every ten seconds and reconnects when it is considered lost.
        /// </summary>
        private async Task Protector()
        {
            while (true)
            {
                await Task.Delay(10000);
                if (IsCheckTime)
                {
                    if ((DateTime.Now - Time).TotalSeconds < 90)
                    {
                        continue;
                    }
                }
                else if ((_WebSocket != null && _WebSocket.IsAlive) || _IsInitConnecting)
                {
                    continue;
                }
                Reconnect();
            }
        }

        /// <summary>
        /// Reconnects the socket; failures are logged by <see cref="ConnectSafely"/> so the watchdog keeps running.
        /// </summary>
        private void Reconnect()
        {
            Console.WriteLine($"WebSocket开始重新连接,{DateTime.Now}");
            try
            {
                _IsInitConnecting = true;
                ConnectSafely();
            }
            finally
            {
                _IsInitConnecting = false;
            }
        }

        /// <summary>
        /// Sends raw frame text on the current socket.
        /// </summary>
        /// <exception cref="InvalidOperationException">No socket has been created yet.</exception>
        private void SendRaw(string frame)
        {
            var socket = _WebSocket ?? throw new InvalidOperationException("WebSocket尚未连接");
            socket.Send(frame);
        }

        /// <summary>
        /// Adds the headers shared by all outgoing frames and sends the frame.
        /// </summary>
        private void CompleteAndSend(StompMessage stomp)
        {
            stomp["content-type"] = "application/json;charset=utf-8";
            stomp["accept-version"] = "1.2";
            stomp["clientId"] = RandomString(20);
            SendRaw(stomp.ToString());
        }

        /// <summary>
        /// Subscribes to a destination with a generated subscription id. Failures are logged.
        /// </summary>
        public void Subscribe(string destination)
        {
            try
            {
                var sub = new StompMessage("SUBSCRIBE");
                sub["id"] = "sub-" + _Index++;
                sub["destination"] = destination;
                SendRaw(sub.ToString());
            }
            catch (Exception e)
            {
                Console.WriteLine($"WebSocket订阅[{destination}]出现错误:{e.Message}");
            }
        }

        /// <summary>
        /// Subscribes to a destination with the given subscription id. Failures are logged.
        /// </summary>
        public void Subscribe(string subid, string destination)
        {
            // No longer "async void": the only await was a delay after the frame had already been
            // sent, and an exception in an async void method cannot be caught by the caller.
            try
            {
                _Index++;
                var sub = new StompMessage("SUBSCRIBE");
                sub["id"] = subid;
                sub["destination"] = destination;
                SendRaw(sub.ToString());
            }
            catch (Exception e)
            {
                Console.WriteLine($"WebSocket订阅[{destination}]出现错误:{e.Message}");
            }
        }

        /// <summary>
        /// Subscribes to a destination with extra headers; an id is generated when the headers contain none.
        /// Failures are logged.
        /// </summary>
        // NOTE(review): the headers dictionary's type arguments were lost in the reviewed copy;
        // <string, string> is assumed from the string key lookup and the header assignment — confirm.
        public void Subscribe(string destination, Dictionary<string, string> headers)
        {
            try
            {
                _Index++;
                var sub = new StompMessage("SUBSCRIBE");
                if (headers != null)
                {
                    foreach (var kv in headers)
                    {
                        sub[kv.Key] = kv.Value;
                    }
                }
                if (headers == null || !headers.ContainsKey("id"))
                {
                    sub["id"] = "sub-" + _Index++;
                }
                sub["destination"] = destination;
                SendRaw(sub.ToString());
            }
            catch (Exception e)
            {
                Console.WriteLine($"WebSocket订阅[{destination}]出现错误:{e.Message}");
            }
        }

        /// <summary>
        /// Sends a prepared frame after stamping it with id, content type, version and client id. Failures are logged.
        /// </summary>
        public void Send(StompMessage stomp)
        {
            if (stomp == null)
            {
                return;
            }

            try
            {
                stomp["id"] = "cs-" + _Index++;
                CompleteAndSend(stomp);
            }
            catch (Exception e)
            {
                Console.WriteLine($"WebSocket发送消息出现错误:{e.Message}");
            }
        }

        /// <summary>
        /// Serializes <paramref name="content"/> to JSON and sends it as a SEND frame. Failures are logged.
        /// </summary>
        public void Send(string destination, object content)
        {
            try
            {
                var stomp = new StompMessage("SEND", System.Text.Json.JsonSerializer.Serialize(content));
                stomp["id"] = "cs-" + _Index++;
                stomp["destination"] = destination;
                CompleteAndSend(stomp);
            }
            catch (Exception e)
            {
                Console.WriteLine($"WebSocket发送消息出现错误:{e.Message}");
            }
        }

        /// <summary>
        /// Sends a frame with an explicit command, body and headers. Failures are logged.
        /// </summary>
        // NOTE(review): headers type arguments assumed to be <string, string> — confirm.
        public void Send(string destination, string command, string body, Dictionary<string, string> headers)
        {
            try
            {
                var stomp = new StompMessage(command, body + "\r\n", headers);
                stomp["id"] = "cs-" + _Index++;
                stomp["destination"] = destination;
                CompleteAndSend(stomp);
            }
            catch (Exception e)
            {
                Console.WriteLine($"WebSocket发送消息出现错误:{e.Message}");
            }
        }

        /// <summary>
        /// Builds a random string of upper-case letters and digits.
        /// </summary>
        /// <param name="length">Number of characters.</param>
        /// <returns>The random string.</returns>
        private string RandomString(int length)
        {
            const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
            var buffer = new char[length];
            // Random is not thread-safe and Send may be called from several threads.
            lock (_Random)
            {
                for (int i = 0; i < buffer.Length; i++)
                {
                    buffer[i] = chars[_Random.Next(chars.Length)];
                }
            }
            return new string(buffer);
        }

        /// <summary>
        /// Closes the current socket, if one has been created.
        /// </summary>
        internal void Close()
        {
            _WebSocket?.Close();
        }
    }

    /// <summary>
    /// JSON converter that writes <see cref="DateTime"/> as "yyyy-MM-dd HH:mm:ss" and reads any parseable
    /// date string, yielding default(DateTime) when the text cannot be parsed.
    /// </summary>
    public class DateTimeConverter : JsonConverter<DateTime>
    {
        public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
        {
            string text = reader.GetString();
            // Wire data is culture-neutral, so try the invariant culture first; fall back to the
            // current culture so everything that parsed before still parses.
            if (DateTime.TryParse(text, System.Globalization.CultureInfo.InvariantCulture,
                    System.Globalization.DateTimeStyles.None, out DateTime dt))
            {
                return dt;
            }
            DateTime.TryParse(text, out dt);
            return dt;
        }

        public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
        {
            // Invariant culture keeps the Gregorian calendar and ASCII digits regardless of machine locale.
            writer.WriteStringValue(value.ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture));
        }
    }
}