|
@@ -0,0 +1,273 @@
|
|
|
|
+package com.gyee.alarm.service;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+import com.gyee.alarm.feigns.IAlarmService;
|
|
|
|
+import com.gyee.alarm.init.CacheContext;
|
|
|
|
+import com.gyee.alarm.model.auto.ProBasicEquipmentPoint;
|
|
|
|
+import com.gyee.alarm.model.vo.AlarmCustomTag;
|
|
|
|
+import com.gyee.alarm.service.auto.IAlarmTsService;
|
|
|
|
+import com.gyee.alarm.task.thread.AlarmThread;
|
|
|
|
+import com.gyee.alarm.task.thread.ReadWtDataThread;
|
|
|
|
+import com.gyee.alarm.util.realtimesource.IEdosUtil;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.concurrent.*;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@Service
|
|
|
|
+@Slf4j
|
|
|
|
+public class AlarmCustomAsyncService {
|
|
|
|
+
|
|
|
|
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
|
|
+
|
|
|
|
+ @Value("${task-count}")
|
|
|
|
+ private Integer taskCount;
|
|
|
|
+
|
|
|
|
+ @Value("${interval}")
|
|
|
|
+ private Integer interval;
|
|
|
|
+
|
|
|
|
+ @Value("${read-count}")
|
|
|
|
+ private Integer readCount;
|
|
|
|
+
|
|
|
|
+ @Value("${read-rows}")
|
|
|
|
+ private Integer readRows;
|
|
|
|
+
|
|
|
|
+ @Value("${alarmType}")
|
|
|
|
+ private String alarmType;
|
|
|
|
+ @Resource
|
|
|
|
+ private IEdosUtil edosUtil;
|
|
|
|
+ @Resource
|
|
|
|
+ private IAlarmService alarmService;
|
|
|
|
+
|
|
|
|
+ private List<AlarmCustomTag> alarmTags = new CopyOnWriteArrayList<>();
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ private IAlarmTsService alarmTsService;
|
|
|
|
+ @Resource
|
|
|
|
+ private CustomAsyncService customAsyncService;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 初始化
|
|
|
|
+ */
|
|
|
|
+ public void init() {
|
|
|
|
+
|
|
|
|
+ alarmTags = alarmService.findTagsByCt();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void taskJobs(boolean runStarted) throws Exception {
|
|
|
|
+
|
|
|
|
+ while (runStarted) {
|
|
|
|
+
|
|
|
|
+ readTaskJobs(runStarted);
|
|
|
|
+
|
|
|
|
+ Thread.sleep((interval * 1000));
|
|
|
|
+
|
|
|
|
+ calTaskJobs(runStarted);
|
|
|
|
+
|
|
|
|
+ Thread.sleep((interval * 1000));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 读取测点数据,缓存到内存
|
|
|
|
+ */
|
|
|
|
+ public void readTaskJobs(boolean runStarted) throws Exception {
|
|
|
|
+
|
|
|
|
+ List<ProBasicEquipmentPoint> wtPointList = CacheContext.wtPointList;// 设备测点集合
|
|
|
|
+
|
|
|
|
+ List<ProBasicEquipmentPoint> fdPointList =new ArrayList<>();
|
|
|
|
+ List<ProBasicEquipmentPoint> gdPointList =new ArrayList<>();
|
|
|
|
+ if(!wtPointList.isEmpty())
|
|
|
|
+ {
|
|
|
|
+ for(ProBasicEquipmentPoint tag:wtPointList)
|
|
|
|
+ {
|
|
|
|
+ if(tag.getNemCode().startsWith("GF"))
|
|
|
|
+ {
|
|
|
|
+ gdPointList.add(tag);
|
|
|
|
+ }else
|
|
|
|
+ {
|
|
|
|
+ fdPointList.add(tag);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+// List<ProBasicPowerstationPoint> wsPointList = CacheContext.wsPointList; //升压站测点集合
|
|
|
|
+
|
|
|
|
+ //读取设备测点
|
|
|
|
+ List<ProBasicEquipmentPoint> wtpols = new ArrayList<>();
|
|
|
|
+
|
|
|
|
+ int wtlistNumber = fdPointList.size() / readCount;
|
|
|
|
+// int wslistNumber = wsPointList.size() / readCount;
|
|
|
|
+
|
|
|
|
+ if (fdPointList.size() % readCount != 0) {
|
|
|
|
+ wtlistNumber=wtlistNumber+1;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+// if (wsPointList.size() % readCount != 0) {
|
|
|
|
+// wslistNumber = wslistNumber+1;
|
|
|
|
+// }
|
|
|
|
+ List<CompletableFuture<String>> futures = new ArrayList<>();
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ int time = 0;
|
|
|
|
+ StringBuilder str = new StringBuilder();
|
|
|
|
+ str.append("设备读取总数:").append(fdPointList.size()).append(",线程数:").append(wtlistNumber ).append(",分块大小:").append(readCount);
|
|
|
|
+ log.info(String.valueOf(str));
|
|
|
|
+
|
|
|
|
+ for (int i = 0; i < fdPointList.size(); i++) {
|
|
|
|
+ wtpols.add(fdPointList.get(i));
|
|
|
|
+ if (i != 0 && (i) % readCount == 0) {
|
|
|
|
+
|
|
|
|
+ time++;
|
|
|
|
+ str.setLength(0);
|
|
|
|
+ str.append("数据读取").append(time).append("线程");
|
|
|
|
+ futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str)));
|
|
|
|
+ wtpols = new ArrayList<>();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (!wtpols.isEmpty()) {
|
|
|
|
+
|
|
|
|
+ time++;
|
|
|
|
+ str.setLength(0);
|
|
|
|
+ str.append("数据读取").append(time).append("线程");
|
|
|
|
+ futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str)));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+/***************************************************************************************************************************/
|
|
|
|
+
|
|
|
|
+ //读取设备测点
|
|
|
|
+ wtpols = new ArrayList<>();
|
|
|
|
+
|
|
|
|
+ wtlistNumber = gdPointList.size() / readCount;
|
|
|
|
+// int wslistNumber = wsPointList.size() / readCount;
|
|
|
|
+
|
|
|
|
+ if (gdPointList.size() % readCount != 0) {
|
|
|
|
+ wtlistNumber=wtlistNumber+1;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+// if (wsPointList.size() % readCount != 0) {
|
|
|
|
+// wslistNumber = wslistNumber+1;
|
|
|
|
+// }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ time = 0;
|
|
|
|
+ str = new StringBuilder();
|
|
|
|
+ str.append("设备读取总数:").append(fdPointList.size()).append(",线程数:").append(wtlistNumber ).append(",分块大小:").append(readCount);
|
|
|
|
+ log.info(String.valueOf(str));
|
|
|
|
+
|
|
|
|
+ for (int i = 0; i < gdPointList.size(); i++) {
|
|
|
|
+ wtpols.add(gdPointList.get(i));
|
|
|
|
+ if (i != 0 && (i) % readCount == 0) {
|
|
|
|
+
|
|
|
|
+ time++;
|
|
|
|
+ str.setLength(0);
|
|
|
|
+ str.append("数据读取").append(time).append("线程");
|
|
|
|
+ futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str)));
|
|
|
|
+ wtpols = new ArrayList<>();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (!wtpols.isEmpty()) {
|
|
|
|
+
|
|
|
|
+ time++;
|
|
|
|
+ str.setLength(0);
|
|
|
|
+ str.append("数据读取").append(time).append("线程");
|
|
|
|
+ futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str)));
|
|
|
|
+ }
|
|
|
|
+/***************************************************************************************************************************/
|
|
|
|
+
|
|
|
|
+ //读取升压站测点
|
|
|
|
+
|
|
|
|
+// List<ProBasicPowerstationPoint> wspols = new ArrayList<>();
|
|
|
|
+// for (int i = 0; i < wsPointList.size(); i++) {
|
|
|
|
+// wspols.add(wsPointList.get(i));
|
|
|
|
+// if (i != 0 && (i) % readCount == 0) {
|
|
|
|
+// time++;
|
|
|
|
+// str.setLength(0);
|
|
|
|
+// str.append("数据读取").append(time).append("线程");
|
|
|
|
+// new Thread(new ReadWsDataThread(executor, edosUtil, readRows, wspols, interval, str, countDownLatch)).start();
|
|
|
|
+// wspols = new ArrayList<>();
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// if (!wspols.isEmpty()) {
|
|
|
|
+// time++;
|
|
|
|
+// str.setLength(0);
|
|
|
|
+// str.append("数据读取").append(time).append("线程");
|
|
|
|
+// new Thread(new ReadWsDataThread(executor, edosUtil, readRows, wspols, interval, str, countDownLatch)).start();
|
|
|
|
+//
|
|
|
|
+// }
|
|
|
|
+ // 等待所有任务完成
|
|
|
|
+ CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]);
|
|
|
|
+ CompletableFuture.allOf(futuresArray).join();
|
|
|
|
+ log.info("数据读取完成!");
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ public void calTaskJobs(boolean runStarted) throws Exception {
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ List<CompletableFuture<String>> futures = new ArrayList<>();
|
|
|
|
+
|
|
|
|
+ List<AlarmCustomTag> alarmls = new ArrayList<>();
|
|
|
|
+// int number = 0;
|
|
|
|
+ int listNumber = alarmTags.size() / taskCount;
|
|
|
|
+
|
|
|
|
+ if (alarmTags.size() % taskCount == 0) {
|
|
|
|
+ listNumber = listNumber+1;
|
|
|
|
+ }
|
|
|
|
+ StringBuilder str = new StringBuilder();
|
|
|
|
+ str.append("自定义分析总数:").append(alarmTags.size()).append(",线程数:").append(listNumber).append(",分块大小:").append(taskCount);
|
|
|
|
+ log.info(String.valueOf(str));
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ int time = 0;
|
|
|
|
+ for (int i = 0; i < alarmTags.size(); i++) {
|
|
|
|
+ alarmls.add(alarmTags.get(i));
|
|
|
|
+ if (i != 0 && (i) % taskCount == 0) {
|
|
|
|
+
|
|
|
|
+ time++;
|
|
|
|
+ str.setLength(0);
|
|
|
|
+ str.append("自定义分析").append(time).append("线程");
|
|
|
|
+
|
|
|
|
+ futures.add( customAsyncService.checkCall(alarmService, alarmls, str, readRows, alarmTsService));
|
|
|
|
+ alarmls = new ArrayList<>();
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (!alarmls.isEmpty()) {
|
|
|
|
+
|
|
|
|
+ time++;
|
|
|
|
+ str.setLength(0);
|
|
|
|
+ str.append("自定义分析").append(time).append("线程");
|
|
|
|
+ futures.add( customAsyncService.checkCall(alarmService, alarmls, str, readRows, alarmTsService));
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]);
|
|
|
|
+ CompletableFuture.allOf(futuresArray).join();
|
|
|
|
+
|
|
|
|
+ log.info("自定义分析完成!");
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+//
|
|
|
|
+// }
|
|
|
|
+}
|