|
@@ -0,0 +1,327 @@
|
|
|
+package com.gyee.gaia.state.wind.service;
|
|
|
+
|
|
|
+import com.gyee.gaia.state.wind.config.Status8Properties;
|
|
|
+import com.gyee.gaia.state.wind.entity.StationInfo;
|
|
|
+import com.gyee.gaia.state.wind.entity.TagInfo;
|
|
|
+import com.gyee.gaia.state.wind.entity.WindturbineInfo;
|
|
|
+import com.gyee.gaia.state.wind.restful.RestfulClient;
|
|
|
+import com.gyee.wisdom.common.data.timeseries.DoubleTsData;
|
|
|
+import com.gyee.wisdom.common.data.timeseries.GeneralTsData;
|
|
|
+import com.gyee.wisdom.common.data.timeseries.TsPointData;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class CalculateServer {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private Status8Properties status8Properties;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private Status8Service status8Service;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private RestfulClient restfulClient;
|
|
|
+
|
|
|
+ private boolean serverStarted = false;
|
|
|
+ private boolean readThreadFlag = false;
|
|
|
+ private boolean calcThreadFlag = false;
|
|
|
+ //只有数据加载线程执行成功一次后,才开始执行计算线程
|
|
|
+ private boolean readRtdbSuccess = false;
|
|
|
+
|
|
|
+ private Map<String, StationInfo> stationInfoMap;
|
|
|
+ private Map<String, DoubleTsData> result;
|
|
|
+
|
|
|
+ public boolean start() {
|
|
|
+ if (serverStarted) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ log.info("明细状态计算服务正在启动...... ");
|
|
|
+ log.info("开始加载配置及测点数据......");
|
|
|
+ stationInfoMap = status8Service.getStationInfoMap();
|
|
|
+ log.info("配置数据加载完成,场站数:{}", stationInfoMap.size());
|
|
|
+
|
|
|
+ loadCurrentStatus();
|
|
|
+
|
|
|
+ readThreadFlag = true;
|
|
|
+ getReadThread().start();
|
|
|
+ sleep(1000);
|
|
|
+ calcThreadFlag = true;
|
|
|
+ getCalcThread().start();
|
|
|
+
|
|
|
+ serverStarted = true;
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("------------异常---------------", ex.getMessage());
|
|
|
+ serverStarted = false;
|
|
|
+ stop();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void stop() {
|
|
|
+ readThreadFlag = false;
|
|
|
+ calcThreadFlag = false;
|
|
|
+ try {
|
|
|
+ Thread.sleep(3000);
|
|
|
+ } catch (Exception ex) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void restart() {
|
|
|
+ }
|
|
|
+
|
|
|
+ Thread getReadThread() {
|
|
|
+ return new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ log.info("数据加载线程启动...");
|
|
|
+ while (readThreadFlag) {
|
|
|
+ try {
|
|
|
+ for (StationInfo stInfo : stationInfoMap.values()) {
|
|
|
+ try {
|
|
|
+ String keys = stInfo.getReadTagKeys();
|
|
|
+ Map<String, DoubleTsData> result = restfulClient.getLatest(keys);
|
|
|
+ for (Map.Entry<String, DoubleTsData> entry : result.entrySet()) {
|
|
|
+ if (stInfo.getReadMap().containsKey(entry.getKey())) {
|
|
|
+ TagInfo tagInfo = stInfo.getReadMap().get(entry.getKey());
|
|
|
+
|
|
|
+ if (tagInfo.getLastUpdateTsData() == null ||
|
|
|
+ tagInfo.getLastUpdateTsData().getDoubleValue() != entry.getValue().getDoubleValue())
|
|
|
+ stInfo.getReadMap().get(entry.getKey()).setLastUpdateTsData(entry.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (readRtdbSuccess == false)
|
|
|
+ readRtdbSuccess = true;
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error(ex.getMessage());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ System.out.print('.');
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error(ex.getMessage());
|
|
|
+ }
|
|
|
+ sleep(status8Properties.getReadThreadInterval());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ Thread getCalcThread() {
|
|
|
+ return new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ log.info("状态计算线程启动...");
|
|
|
+ while (calcThreadFlag) {
|
|
|
+ if (readRtdbSuccess == false) {
|
|
|
+ sleep(status8Properties.getCalcThreadInterval());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ try {
|
|
|
+ for (StationInfo stInfo : stationInfoMap.values()) {
|
|
|
+ Map<String, WindturbineInfo> wtMap = stInfo.getWindturbineMap();
|
|
|
+ for (Map.Entry<String, WindturbineInfo> entry : wtMap.entrySet()) {
|
|
|
+ String wtId = entry.getKey();
|
|
|
+ WindturbineInfo wtInfo = entry.getValue();
|
|
|
+ wtInfo.resetStatusArray();
|
|
|
+ Map<String, TagInfo> tagMap = wtInfo.getTagMap();
|
|
|
+ for (TagInfo tagInfo : tagMap.values()) {
|
|
|
+ if (tagInfo.getLastUpdateTsData() == null) {
|
|
|
+ log.info("empty value, tagId = " + tagInfo.getId());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String strStatus = tagInfo.getRelateStatus();
|
|
|
+ if (strStatus.equals("YGGL")) {
|
|
|
+ wtInfo.setYGGL(tagInfo.getLastUpdateTsData().getDoubleValue());
|
|
|
+ if (wtInfo.getLastUpdateTs() < tagInfo.getLastUpdateTsData().getTs()) {
|
|
|
+ wtInfo.setLastUpdateTs(tagInfo.getLastUpdateTsData().getTs());
|
|
|
+ wtInfo.setSystemTs(System.currentTimeMillis());
|
|
|
+ }
|
|
|
+ } else if (strStatus.equals("LX")) {
|
|
|
+ if (wtInfo.getLastUpdateTs() < tagInfo.getLastUpdateTsData().getTs()) {
|
|
|
+ wtInfo.setLastUpdateTs(tagInfo.getLastUpdateTsData().getTs());
|
|
|
+ wtInfo.setSystemTs(System.currentTimeMillis());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ wtInfo.updateStatus(tagInfo.getRelateStatus(), tagInfo.getLastUpdateTsData().getDoubleValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ List<TsPointData> writeList = new ArrayList<>();
|
|
|
+ //List<String> shutdownWtList = new ArrayList<>();
|
|
|
+ stInfo.resetCntStatus();
|
|
|
+ long ts = new Date().getTime();
|
|
|
+ for (WindturbineInfo wtInfo : wtMap.values()) {
|
|
|
+ String wtId = wtInfo.getId();
|
|
|
+ double wtStatus = wtInfo.getStatus8(status8Properties.getOfflineInterval());
|
|
|
+ TagInfo tagInfo = stInfo.getFjztMap().get(wtId);
|
|
|
+ if (tagInfo.getLastUpdateTsData() == null)
|
|
|
+ tagInfo.setLastUpdateTsData(new DoubleTsData(ts, (short) 0, 0));
|
|
|
+
|
|
|
+ if (tagInfo.getLastUpdateTsData().getDoubleValue() != wtStatus) {
|
|
|
+
|
|
|
+ log.info(wtId + "状态变化,N:" + wtStatus + ", O:" + tagInfo.getLastUpdateTsData().getDoubleValue());
|
|
|
+
|
|
|
+ TsPointData tsPointData = new TsPointData();
|
|
|
+ tsPointData.setTagName(tagInfo.getId());
|
|
|
+ GeneralTsData generalTsData = new GeneralTsData();
|
|
|
+ generalTsData.setTs(ts);
|
|
|
+ generalTsData.setDoubleValue(Optional.ofNullable(wtStatus));
|
|
|
+ tsPointData.setTsData(generalTsData);
|
|
|
+ writeList.add(tsPointData);
|
|
|
+
|
|
|
+ //写故障状态点
|
|
|
+ if (wtStatus == 4.0 || wtStatus == 5.0) {
|
|
|
+ //log.info("aa");
|
|
|
+ TagInfo tagInfo2 = stInfo.getGzztMap().get(wtId);
|
|
|
+ TsPointData tsPointData2 = new TsPointData();
|
|
|
+ tsPointData2.setTagName(tagInfo2.getId());
|
|
|
+ GeneralTsData generalTsData2 = new GeneralTsData();
|
|
|
+ generalTsData2.setTs(ts);
|
|
|
+ if (wtStatus == 4.0)
|
|
|
+ generalTsData2.setDoubleValue(Optional.ofNullable(0.0));
|
|
|
+ else
|
|
|
+ generalTsData2.setDoubleValue(Optional.ofNullable(1.0));
|
|
|
+ tsPointData2.setTsData(generalTsData2);
|
|
|
+ writeList.add(tsPointData2);
|
|
|
+ //log.info("bb");
|
|
|
+ }
|
|
|
+
|
|
|
+ // if (wtStatus == 0)
|
|
|
+ // shutdownWtList.add(wtId);
|
|
|
+
|
|
|
+ tagInfo.setLastUpdateTsData(new DoubleTsData(ts, (short) 0, wtStatus));
|
|
|
+ }
|
|
|
+
|
|
|
+ stInfo.updateCntStatus(wtStatus);
|
|
|
+ }
|
|
|
+
|
|
|
+ //Map<String, TagInfo> zttsMap = stInfo.getZttsMap();
|
|
|
+ for (TagInfo tagInfo : stInfo.getZttsMap().values()) {
|
|
|
+ TsPointData tsPointData = new TsPointData();
|
|
|
+ tsPointData.setTagName(tagInfo.getId());
|
|
|
+ GeneralTsData generalTsData = new GeneralTsData();
|
|
|
+ generalTsData.setTs(ts);
|
|
|
+
|
|
|
+ switch (tagInfo.getUniformCode()) {
|
|
|
+ case "TJTS":
|
|
|
+ generalTsData.setDoubleValue(Optional.ofNullable((double) stInfo.getCntTJ()));
|
|
|
+ break;
|
|
|
+ case "SDTS":
|
|
|
+ generalTsData.setDoubleValue(Optional.ofNullable((double) stInfo.getCntSD()));
|
|
|
+ break;
|
|
|
+ case "DJTS":
|
|
|
+ generalTsData.setDoubleValue(Optional.ofNullable((double) stInfo.getCntDJ()));
|
|
|
+ break;
|
|
|
+ case "QDTS":
|
|
|
+ generalTsData.setDoubleValue(Optional.ofNullable((double) stInfo.getCntQD()));
|
|
|
+ break;
|
|
|
+ case "GZTS":
|
|
|
+ generalTsData.setDoubleValue(Optional.ofNullable((double) stInfo.getCntGZ()));
|
|
|
+ break;
|
|
|
+ case "WHTS":
|
|
|
+ generalTsData.setDoubleValue(Optional.ofNullable((double) stInfo.getCntWH()));
|
|
|
+ break;
|
|
|
+ case "BWTS":
|
|
|
+ generalTsData.setDoubleValue(Optional.ofNullable((double) stInfo.getCntBW()));
|
|
|
+ break;
|
|
|
+ case "LXTS":
|
|
|
+ generalTsData.setDoubleValue(Optional.ofNullable((double) stInfo.getCntLX()));
|
|
|
+ break;
|
|
|
+ // default:
|
|
|
+ // generalTsData.setDoubleValue(Optional.ofNullable((double)0));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ //变化存
|
|
|
+ double preValue = tagInfo.getLastUpdateTsData().getDoubleValue();
|
|
|
+ if (generalTsData.getDoubleValue().get() != preValue) {
|
|
|
+ tsPointData.setTsData(generalTsData);
|
|
|
+ writeList.add(tsPointData);
|
|
|
+ tagInfo.setLastUpdateTsData(new DoubleTsData(ts, (short) 0, generalTsData.getDoubleValue().get()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (writeList.size() > 0) {
|
|
|
+ try {
|
|
|
+ restfulClient.writeLatestPointList(writeList);
|
|
|
+ log.info("*******状态变化:写入数据库" + writeList.size() + "个点**********");
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error(ex.getMessage());
|
|
|
+ loadCurrentStatus();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // if (shutdownWtList.size() > 0) {
|
|
|
+ // snapService.saveFaultSnaps(shutdownWtList);
|
|
|
+ // log.info("停机风机:" + shutdownWtList.size());
|
|
|
+ // }
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.info(ex.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ sleep(status8Properties.getCalcThreadInterval());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sleep(int milliseconds) {
|
|
|
+ try {
|
|
|
+ TimeUnit.MILLISECONDS.sleep(milliseconds);
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.info(ex.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void loadCurrentStatus() {
|
|
|
+ log.info("从实时库读取风机当前状态及状态台数......");
|
|
|
+ for (StationInfo stInfo : stationInfoMap.values()) {
|
|
|
+ try {
|
|
|
+ Map<String, TagInfo> tagMap = new HashMap<>();
|
|
|
+ for (TagInfo tagInfo : stInfo.getFjztMap().values()) {
|
|
|
+ if (tagMap.containsKey(tagInfo.getId()) == false)
|
|
|
+ tagMap.put(tagInfo.getId(), tagInfo);
|
|
|
+ }
|
|
|
+ for (TagInfo tagInfo : stInfo.getZttsMap().values()) {
|
|
|
+ if (tagMap.containsKey(tagInfo.getId()) == false)
|
|
|
+ tagMap.put(tagInfo.getId(), tagInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (tagMap.size() > 0) {
|
|
|
+ String[] arr = new String[tagMap.size()];
|
|
|
+ tagMap.keySet().toArray(arr);
|
|
|
+ String tagKeys = StringUtils.join(arr, ",");
|
|
|
+ Map<String, DoubleTsData> result = restfulClient.getLatest(tagKeys);
|
|
|
+ for (Map.Entry<String, DoubleTsData> entry : result.entrySet()) {
|
|
|
+ if (tagMap.containsKey(entry.getKey())) {
|
|
|
+ tagMap.get(entry.getKey()).setLastUpdateTsData(entry.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.info("加载" + stInfo.getStationId() + "风机状态和状态台数完成。");
|
|
|
+ }
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error(ex.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|