package com.gyee.alarm.service; import com.gyee.alarm.feigns.IAlarmService; import com.gyee.alarm.init.CacheContext; import com.gyee.alarm.model.auto.ProBasicEquipmentPoint; import com.gyee.alarm.model.vo.AlarmCustomTag; import com.gyee.alarm.service.auto.IAlarmTsService; import com.gyee.alarm.task.thread.AlarmThread; import com.gyee.alarm.task.thread.ReadWtDataThread; import com.gyee.alarm.util.realtimesource.IEdosUtil; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; @Service @Slf4j public class AlarmCustomAsyncService { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${task-count}") private Integer taskCount; @Value("${interval}") private Integer interval; @Value("${read-count}") private Integer readCount; @Value("${read-rows}") private Integer readRows; @Value("${alarmType}") private String alarmType; @Resource private IEdosUtil edosUtil; @Resource private IAlarmService alarmService; private List alarmTags = new CopyOnWriteArrayList<>(); @Resource private IAlarmTsService alarmTsService; @Resource private CustomAsyncService customAsyncService; /** * 初始化,从服务端获得自定义报警 */ public void init() { alarmTags = alarmService.findTagsByCt(); } public void taskJobs(boolean runStarted) throws Exception { while (runStarted) { //读取庚顿数据库实时数据 readTaskJobs(runStarted); Thread.sleep((interval * 1000)); //针对读取的数据库进行,自定义报警判定 calTaskJobs(runStarted); Thread.sleep((interval * 1000)); } } /** * 读取测点数据,缓存到内存 */ public void readTaskJobs(boolean runStarted) throws Exception { List wtPointList = CacheContext.wtPointList;// 设备测点集合 List fdPointList =new ArrayList<>(); List gdPointList =new ArrayList<>(); if(!wtPointList.isEmpty()) { for(ProBasicEquipmentPoint tag:wtPointList) { if(tag.getNemCode().startsWith("GF")) { gdPointList.add(tag); }else { fdPointList.add(tag); } } } // List wsPointList = CacheContext.wsPointList; //升压站测点集合 //读取设备测点 List wtpols = new ArrayList<>(); int wtlistNumber = fdPointList.size() / readCount; // int wslistNumber = wsPointList.size() / readCount; if (fdPointList.size() % readCount != 0) { wtlistNumber=wtlistNumber+1; } // if (wsPointList.size() % readCount != 0) { // wslistNumber = wslistNumber+1; // } List> futures = new ArrayList<>(); int time = 0; StringBuilder str = new StringBuilder(); str.append("设备读取总数:").append(fdPointList.size()).append(",线程数:").append(wtlistNumber ).append(",分块大小:").append(readCount); log.info(String.valueOf(str)); for (int i = 0; i < fdPointList.size(); i++) { wtpols.add(fdPointList.get(i)); if (i != 0 && (i) % readCount == 0) { time++; str.setLength(0); str.append("数据读取").append(time).append("线程"); futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str))); wtpols = new ArrayList<>(); } } if (!wtpols.isEmpty()) { time++; str.setLength(0); str.append("数据读取").append(time).append("线程"); futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str))); } /***************************************************************************************************************************/ //读取设备测点 wtpols = new ArrayList<>(); wtlistNumber = gdPointList.size() / readCount; // int wslistNumber = wsPointList.size() / readCount; if (gdPointList.size() % readCount != 0) { wtlistNumber=wtlistNumber+1; } // if (wsPointList.size() % readCount != 0) { // wslistNumber = wslistNumber+1; // } time = 0; str = new StringBuilder(); str.append("设备读取总数:").append(fdPointList.size()).append(",线程数:").append(wtlistNumber ).append(",分块大小:").append(readCount); log.info(String.valueOf(str)); for (int i = 0; i < gdPointList.size(); i++) { wtpols.add(gdPointList.get(i)); if (i != 0 && (i) % readCount == 0) { time++; str.setLength(0); str.append("数据读取").append(time).append("线程"); futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str))); wtpols = new ArrayList<>(); } } if (!wtpols.isEmpty()) { time++; str.setLength(0); str.append("数据读取").append(time).append("线程"); futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str))); } /***************************************************************************************************************************/ //读取升压站测点 // List wspols = new ArrayList<>(); // for (int i = 0; i < wsPointList.size(); i++) { // wspols.add(wsPointList.get(i)); // if (i != 0 && (i) % readCount == 0) { // time++; // str.setLength(0); // str.append("数据读取").append(time).append("线程"); // new Thread(new ReadWsDataThread(executor, edosUtil, readRows, wspols, interval, str, countDownLatch)).start(); // wspols = new ArrayList<>(); // } // } // // if (!wspols.isEmpty()) { // time++; // str.setLength(0); // str.append("数据读取").append(time).append("线程"); // new Thread(new ReadWsDataThread(executor, edosUtil, readRows, wspols, interval, str, countDownLatch)).start(); // // } // 等待所有任务完成 CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]); CompletableFuture.allOf(futuresArray).join(); log.info("数据读取完成!"); } public void calTaskJobs(boolean runStarted) throws Exception { List> futures = new ArrayList<>(); List alarmls = new ArrayList<>(); // int number = 0; int listNumber = alarmTags.size() / taskCount; if (alarmTags.size() % taskCount == 0) { listNumber = listNumber+1; } StringBuilder str = new StringBuilder(); str.append("自定义分析总数:").append(alarmTags.size()).append(",线程数:").append(listNumber).append(",分块大小:").append(taskCount); log.info(String.valueOf(str)); int time = 0; for (int i = 0; i < alarmTags.size(); i++) { alarmls.add(alarmTags.get(i)); if (i != 0 && (i) % taskCount == 0) { time++; str.setLength(0); str.append("自定义分析").append(time).append("线程"); futures.add( customAsyncService.checkCall(alarmService, alarmls, str, readRows, alarmTsService)); alarmls = new ArrayList<>(); } } if (!alarmls.isEmpty()) { time++; str.setLength(0); str.append("自定义分析").append(time).append("线程"); futures.add( customAsyncService.checkCall(alarmService, alarmls, str, readRows, alarmTsService)); } CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]); CompletableFuture.allOf(futuresArray).join(); log.info("自定义分析完成!"); } // // } }