|
@@ -0,0 +1,220 @@
|
|
|
+package com.gyee.gaia.realtime.wind.job;
|
|
|
+
|
|
|
+import cn.hutool.core.date.DateUtil;
|
|
|
+import cn.hutool.core.util.NumberUtil;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
+import com.gyee.gaia.common.data.point.PointData;
|
|
|
+import com.gyee.gaia.common.data.point.TestingPoint;
|
|
|
+import com.gyee.gaia.common.data.taos.RealtimeAverageTarget;
|
|
|
+import com.gyee.gaia.dao.sql.point.ITestingPointService;
|
|
|
+import com.gyee.gaia.dao.sql.taos.IRealtimeAverageTargetService;
|
|
|
+import com.gyee.gaia.realtime.wind.adapter.IAdapterApi;
|
|
|
+import com.gyee.gaia.realtime.wind.config.AppConfig;
|
|
|
+import com.gyee.gaia.realtime.wind.init.CacheContext;
|
|
|
+import org.springframework.boot.ApplicationArguments;
|
|
|
+import org.springframework.boot.ApplicationRunner;
|
|
|
+import org.springframework.core.annotation.Order;
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+@Order(2)
|
|
|
+@Component
|
|
|
+public class CauseJobHandler implements ApplicationRunner {
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ITestingPointService testingPointService;
|
|
|
+ @Resource
|
|
|
+ private AppConfig appConfig;
|
|
|
+ @Resource
|
|
|
+ private IRealtimeAverageTargetService realtimeAverageTargetService;
|
|
|
+ @Resource
|
|
|
+ private IAdapterApi adapterApi;
|
|
|
+ @Resource
|
|
|
+ private ThreadPoolTaskExecutor taskExecutor;
|
|
|
+ private String pointCodes;
|
|
|
+ private Map<String, ArrayDeque<PointData>> pdaqMap = new ConcurrentHashMap<>();
|
|
|
+ /**
|
|
|
+ * 风机id,uniformcode,点名
|
|
|
+ */
|
|
|
+ private Map<String, Map<String, String>> equipUcMap = new HashMap<>();
|
|
|
+ private Map<String, String> pjzb1fzMap;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run(ApplicationArguments args) throws Exception {
|
|
|
+
|
|
|
+ CacheContext.equipMap.keySet().forEach(em -> equipUcMap.put(em, new HashMap<>()));
|
|
|
+
|
|
|
+ Collection<String> uniformCodeList = appConfig.getUniformcodeOne().values();
|
|
|
+
|
|
|
+ //根据uniformcode获取测点
|
|
|
+ QueryWrapper<TestingPoint> tpWrapper = new QueryWrapper<>();
|
|
|
+ tpWrapper.eq("thing_type", "windturbine");
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+
|
|
|
+ //获取最新更新时间
|
|
|
+ /*QueryWrapper<RealtimeAverageTarget> ratWrapper = new QueryWrapper<>();
|
|
|
+ ratWrapper.select("max(time)");
|
|
|
+ Map<String, Object> map = realtimeAverageTargetService.getMap(ratWrapper);
|
|
|
+ Date max;
|
|
|
+ if (map != null && map.size() > 0) {
|
|
|
+ max = (Date) map.get("max");
|
|
|
+ } else {
|
|
|
+ max = DateUtil.beginOfDay(DateUtil.date());
|
|
|
+ }*/
|
|
|
+
|
|
|
+ //风机号,时间,RealtimeAverageTarget
|
|
|
+ Map<String, Map<Long, RealtimeAverageTarget>> ratssmm = new HashMap<>();
|
|
|
+ List<RealtimeAverageTarget> rats = new ArrayList<>();
|
|
|
+ //5个指标
|
|
|
+ for (String value : uniformCodeList) {
|
|
|
+ //每个指标407个风机
|
|
|
+ List<TestingPoint> list = testingPointService.list(tpWrapper.clone().eq("uniform_code", value));
|
|
|
+
|
|
|
+ List<PointData> rawByKey;
|
|
|
+ RealtimeAverageTarget target = null;
|
|
|
+ //每个指标每个风机
|
|
|
+ for (TestingPoint point : list) {
|
|
|
+ String thingId = point.getThingId();
|
|
|
+ sb.append(",").append(point.getCode());
|
|
|
+ equipUcMap.get(thingId).put(value, point.getCode());
|
|
|
+ /*rawByKey = adapterApi.getRawByKey(point.getCode(), max.getTime(), System.currentTimeMillis());
|
|
|
+ Map<Long, Double> collect = rawByKey.stream().collect(Collectors.groupingBy(data -> data.getTs() / 60000, Collectors.averagingDouble(PointData::getDoubleValue)));
|
|
|
+ //TODO 补缺的时间
|
|
|
+ for (Map.Entry<Long, Double> entry : collect.entrySet()) {
|
|
|
+ if(target==null){
|
|
|
+ target = new RealtimeAverageTarget();
|
|
|
+ target.setTime(new Timestamp(entry.getKey()*60000));
|
|
|
+ target.setEquipmentId(thingId);
|
|
|
+ setRats(target, value,entry.getValue());
|
|
|
+ rats.add(target);
|
|
|
+
|
|
|
+ if(!ratssmm.containsKey(thingId)) ratssmm.put(thingId, new HashMap<>());
|
|
|
+ ratssmm.get(thingId).put(entry.getKey(), target);
|
|
|
+ }else {
|
|
|
+ RealtimeAverageTarget averageTarget = ratssmm.get(thingId).get(entry.getKey());
|
|
|
+ if(averageTarget!=null) setRats(averageTarget, value, entry.getValue());
|
|
|
+ }
|
|
|
+ }*/
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //realtimeAverageTargetService.saveBatch(rats, 3000);
|
|
|
+ QueryWrapper<RealtimeAverageTarget> ratWrapper = new QueryWrapper<>();
|
|
|
+ ratWrapper.select("tbname", "equipment_id").eq("uniform_code", "1FZPJZB");
|
|
|
+ List<RealtimeAverageTarget> ratList = realtimeAverageTargetService.list(ratWrapper);
|
|
|
+ pjzb1fzMap = ratList.stream().collect(Collectors.toMap(RealtimeAverageTarget::getEquipmentId, RealtimeAverageTarget::getTbname));
|
|
|
+
|
|
|
+ pointCodes = sb.delete(0, 1).toString();
|
|
|
+ init();
|
|
|
+ taskExecutor.submit(this::refreshQueue);
|
|
|
+ taskExecutor.submit(this::calcRealtimeAverageTarget);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setRats(RealtimeAverageTarget target, String uc, double value) {
|
|
|
+ if (uc.equals(appConfig.getUniformcodeOne().get("wind-speed"))) {
|
|
|
+ target.setWindSpeed(value);
|
|
|
+ } else if (uc.equals(appConfig.getUniformcodeOne().get("active-power"))) {
|
|
|
+ target.setPower(value);
|
|
|
+ } else if (uc.equals(appConfig.getUniformcodeOne().get("generator_speed"))) {
|
|
|
+ target.setGeneratorSpeed(value);
|
|
|
+ } else if (uc.equals(appConfig.getUniformcodeOne().get("impeller_speed"))) {
|
|
|
+ target.setImpellerSpeed(value);
|
|
|
+ } else if (uc.equals(appConfig.getUniformcodeOne().get("wind_direction"))) {
|
|
|
+ target.setWindDirection(value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void init() throws InterruptedException {
|
|
|
+
|
|
|
+ //缓存1分钟的值填满pdaqMap
|
|
|
+ for (int i = 0; i < 60; i++) {
|
|
|
+ Map<String, PointData> latest = adapterApi.getLatest(pointCodes);
|
|
|
+ if (i == 0) {
|
|
|
+ latest.forEach((k, v) -> {
|
|
|
+ ArrayDeque<PointData> pdaq = new ArrayDeque<>(60);
|
|
|
+ pdaq.offer(v);
|
|
|
+ pdaqMap.put(k, pdaq);
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ latest.forEach((k, v) -> {
|
|
|
+ pdaqMap.get(k).offer(v);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void refreshQueue() {
|
|
|
+
|
|
|
+ try {
|
|
|
+ while (true) {
|
|
|
+ Map<String, PointData> latest = adapterApi.getLatest(pointCodes);
|
|
|
+ latest.forEach((k, v) -> {
|
|
|
+ pdaqMap.get(k).poll();
|
|
|
+ pdaqMap.get(k).offer(v);
|
|
|
+ });
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void calcRealtimeAverageTarget() {
|
|
|
+
|
|
|
+ String pointCode = null;
|
|
|
+ try {
|
|
|
+ while (true) {
|
|
|
+ if (LocalDateTime.now().getSecond() == 59) {
|
|
|
+
|
|
|
+ RealtimeAverageTarget target;
|
|
|
+ double v;
|
|
|
+ List<RealtimeAverageTarget> targets = new ArrayList<>();
|
|
|
+ for (Map.Entry<String, Map<String, String>> entry : equipUcMap.entrySet()) {
|
|
|
+ target = new RealtimeAverageTarget();
|
|
|
+ target.setTime(DateUtil.beginOfMinute(DateUtil.date()).toTimestamp());
|
|
|
+ //target.setEquipmentId(entry.getKey());
|
|
|
+ target.setTbname(pjzb1fzMap.get(entry.getKey()));
|
|
|
+
|
|
|
+ pointCode = entry.getValue().get(appConfig.getUniformcodeOne().get("wind-speed"));
|
|
|
+ if (pointCode != null) {
|
|
|
+ v = pdaqMap.get(pointCode).stream().mapToDouble(data -> data.getDoubleValue()).average().orElse(0);
|
|
|
+ target.setWindSpeed(NumberUtil.round(v, 2).doubleValue());
|
|
|
+ }
|
|
|
+ pointCode = entry.getValue().get(appConfig.getUniformcodeOne().get("active-power"));
|
|
|
+ if (pointCode != null) {
|
|
|
+ v = pdaqMap.get(pointCode).stream().mapToDouble(data -> data.getDoubleValue()).average().orElse(0);
|
|
|
+ target.setPower(NumberUtil.round(v, 2).doubleValue());
|
|
|
+ }
|
|
|
+ pointCode = entry.getValue().get(appConfig.getUniformcodeOne().get("generator_speed"));
|
|
|
+ if (pointCode != null) {
|
|
|
+ v = pdaqMap.get(pointCode).stream().mapToDouble(data -> data.getDoubleValue()).average().orElse(0);
|
|
|
+ target.setGeneratorSpeed(NumberUtil.round(v, 2).doubleValue());
|
|
|
+ }
|
|
|
+ pointCode = entry.getValue().get(appConfig.getUniformcodeOne().get("impeller_speed"));
|
|
|
+ if (pointCode != null) {
|
|
|
+ v = pdaqMap.get(pointCode).stream().mapToDouble(data -> data.getDoubleValue()).average().orElse(0);
|
|
|
+ target.setImpellerSpeed(NumberUtil.round(v, 2).doubleValue());
|
|
|
+ }
|
|
|
+ pointCode = entry.getValue().get(appConfig.getUniformcodeOne().get("wind_direction"));
|
|
|
+ if (pointCode != null) {
|
|
|
+ v = pdaqMap.get(pointCode).stream().mapToDouble(data -> data.getDoubleValue()).average().orElse(0);
|
|
|
+ target.setWindDirection(NumberUtil.round(v, 2).doubleValue());
|
|
|
+ }
|
|
|
+ targets.add(target);
|
|
|
+ }
|
|
|
+ realtimeAverageTargetService.saveBatch(targets);
|
|
|
+ }
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|