123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- package com.gyee.alarm.service;
- import com.gyee.alarm.feigns.IAlarmService;
- import com.gyee.alarm.init.CacheContext;
- import com.gyee.alarm.model.auto.ProBasicEquipmentPoint;
- import com.gyee.alarm.model.vo.AlarmCustomTag;
- import com.gyee.alarm.service.auto.IAlarmTsService;
- import com.gyee.alarm.task.thread.AlarmThread;
- import com.gyee.alarm.task.thread.ReadWtDataThread;
- import com.gyee.alarm.util.realtimesource.IEdosUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.*;
- @Service
- @Slf4j
- public class AlarmCustomAsyncService {
- private Logger logger = LoggerFactory.getLogger(this.getClass());
- @Value("${task-count}")
- private Integer taskCount;
- @Value("${interval}")
- private Integer interval;
- @Value("${read-count}")
- private Integer readCount;
- @Value("${read-rows}")
- private Integer readRows;
- @Value("${alarmType}")
- private String alarmType;
- @Resource
- private IEdosUtil edosUtil;
- @Resource
- private IAlarmService alarmService;
- private List<AlarmCustomTag> alarmTags = new CopyOnWriteArrayList<>();
- @Resource
- private IAlarmTsService alarmTsService;
- @Resource
- private CustomAsyncService customAsyncService;
- /**
- * 初始化,从服务端获得自定义报警
- */
- public void init() {
- alarmTags = alarmService.findTagsByCt();
- }
- public void taskJobs(boolean runStarted) throws Exception {
- while (runStarted) {
- //读取庚顿数据库实时数据
- readTaskJobs(runStarted);
- Thread.sleep((interval * 1000));
- //针对读取的数据库进行,自定义报警判定
- calTaskJobs(runStarted);
- Thread.sleep((interval * 1000));
- }
- }
- /**
- * 读取测点数据,缓存到内存
- */
- public void readTaskJobs(boolean runStarted) throws Exception {
- List<ProBasicEquipmentPoint> wtPointList = CacheContext.wtPointList;// 设备测点集合
- List<ProBasicEquipmentPoint> fdPointList =new ArrayList<>();
- List<ProBasicEquipmentPoint> gdPointList =new ArrayList<>();
- if(!wtPointList.isEmpty())
- {
- for(ProBasicEquipmentPoint tag:wtPointList)
- {
- if(tag.getNemCode().startsWith("GF"))
- {
- gdPointList.add(tag);
- }else
- {
- fdPointList.add(tag);
- }
- }
- }
- // List<ProBasicPowerstationPoint> wsPointList = CacheContext.wsPointList; //升压站测点集合
- //读取设备测点
- List<ProBasicEquipmentPoint> wtpols = new ArrayList<>();
- int wtlistNumber = fdPointList.size() / readCount;
- // int wslistNumber = wsPointList.size() / readCount;
- if (fdPointList.size() % readCount != 0) {
- wtlistNumber=wtlistNumber+1;
- }
- // if (wsPointList.size() % readCount != 0) {
- // wslistNumber = wslistNumber+1;
- // }
- List<CompletableFuture<String>> futures = new ArrayList<>();
- int time = 0;
- StringBuilder str = new StringBuilder();
- str.append("设备读取总数:").append(fdPointList.size()).append(",线程数:").append(wtlistNumber ).append(",分块大小:").append(readCount);
- log.info(String.valueOf(str));
- for (int i = 0; i < fdPointList.size(); i++) {
- wtpols.add(fdPointList.get(i));
- if (i != 0 && (i) % readCount == 0) {
- time++;
- str.setLength(0);
- str.append("数据读取").append(time).append("线程");
- futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str)));
- wtpols = new ArrayList<>();
- }
- }
- if (!wtpols.isEmpty()) {
- time++;
- str.setLength(0);
- str.append("数据读取").append(time).append("线程");
- futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str)));
- }
- /***************************************************************************************************************************/
- //读取设备测点
- wtpols = new ArrayList<>();
- wtlistNumber = gdPointList.size() / readCount;
- // int wslistNumber = wsPointList.size() / readCount;
- if (gdPointList.size() % readCount != 0) {
- wtlistNumber=wtlistNumber+1;
- }
- // if (wsPointList.size() % readCount != 0) {
- // wslistNumber = wslistNumber+1;
- // }
- time = 0;
- str = new StringBuilder();
- str.append("设备读取总数:").append(fdPointList.size()).append(",线程数:").append(wtlistNumber ).append(",分块大小:").append(readCount);
- log.info(String.valueOf(str));
- for (int i = 0; i < gdPointList.size(); i++) {
- wtpols.add(gdPointList.get(i));
- if (i != 0 && (i) % readCount == 0) {
- time++;
- str.setLength(0);
- str.append("数据读取").append(time).append("线程");
- futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str)));
- wtpols = new ArrayList<>();
- }
- }
- if (!wtpols.isEmpty()) {
- time++;
- str.setLength(0);
- str.append("数据读取").append(time).append("线程");
- futures.add( customAsyncService.readCall( edosUtil, readRows, wtpols,String.valueOf(str)));
- }
- /***************************************************************************************************************************/
- //读取升压站测点
- // List<ProBasicPowerstationPoint> wspols = new ArrayList<>();
- // for (int i = 0; i < wsPointList.size(); i++) {
- // wspols.add(wsPointList.get(i));
- // if (i != 0 && (i) % readCount == 0) {
- // time++;
- // str.setLength(0);
- // str.append("数据读取").append(time).append("线程");
- // new Thread(new ReadWsDataThread(executor, edosUtil, readRows, wspols, interval, str, countDownLatch)).start();
- // wspols = new ArrayList<>();
- // }
- // }
- //
- // if (!wspols.isEmpty()) {
- // time++;
- // str.setLength(0);
- // str.append("数据读取").append(time).append("线程");
- // new Thread(new ReadWsDataThread(executor, edosUtil, readRows, wspols, interval, str, countDownLatch)).start();
- //
- // }
- // 等待所有任务完成
- CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]);
- CompletableFuture.allOf(futuresArray).join();
- log.info("数据读取完成!");
- }
- public void calTaskJobs(boolean runStarted) throws Exception {
- List<CompletableFuture<String>> futures = new ArrayList<>();
- List<AlarmCustomTag> alarmls = new ArrayList<>();
- // int number = 0;
- int listNumber = alarmTags.size() / taskCount;
- if (alarmTags.size() % taskCount == 0) {
- listNumber = listNumber+1;
- }
- StringBuilder str = new StringBuilder();
- str.append("自定义分析总数:").append(alarmTags.size()).append(",线程数:").append(listNumber).append(",分块大小:").append(taskCount);
- log.info(String.valueOf(str));
- int time = 0;
- for (int i = 0; i < alarmTags.size(); i++) {
- alarmls.add(alarmTags.get(i));
- if (i != 0 && (i) % taskCount == 0) {
- time++;
- str.setLength(0);
- str.append("自定义分析").append(time).append("线程");
- futures.add( customAsyncService.checkCall(alarmService, alarmls, str, readRows, alarmTsService));
- alarmls = new ArrayList<>();
- }
- }
- if (!alarmls.isEmpty()) {
- time++;
- str.setLength(0);
- str.append("自定义分析").append(time).append("线程");
- futures.add( customAsyncService.checkCall(alarmService, alarmls, str, readRows, alarmTsService));
- }
- CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]);
- CompletableFuture.allOf(futuresArray).join();
- log.info("自定义分析完成!");
- }
- //
- // }
- }
|