Browse Source

数据导出增加多线程

chenminghua 3 years ago
parent
commit
d657b0fc9d

+ 56 - 0
src/main/java/com/gyee/frame/common/conf/ThreadPoolTaskConfig.java

@@ -0,0 +1,56 @@
+package com.gyee.frame.common.conf;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+@Configuration
+public class ThreadPoolTaskConfig {
+    /**
+     *   默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
+     *	当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
+     *  当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
+     */
+
+    /** 核心线程数(默认线程数) */
+    private static final int corePoolSize = 100;
+    /** 最大线程数 */
+    private static final int maxPoolSize = 500;
+    /** 允许线程空闲时间(单位:默认为秒) */
+    private static final int keepAliveTime = 60;
+    /** 缓冲队列大小 */
+    private static final int queueCapacity = 500;
+    /** 允许等待最长时间 */
+    private static final int awaitTime = 10;
+    /** 线程池名前缀 */
+    private static final String threadNamePrefix = "GYEE-Thread-";
+
+    private ThreadPoolTaskExecutor executor;
+
+    public ThreadPoolTaskExecutor getExecutor(){
+        if (executor == null)
+            executor = taskExecutor();
+
+        return executor;
+    }
+
+
+    private ThreadPoolTaskExecutor taskExecutor(){
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(corePoolSize);
+        executor.setMaxPoolSize(maxPoolSize);
+        executor.setQueueCapacity(queueCapacity);
+        executor.setKeepAliveSeconds(keepAliveTime);
+        executor.setThreadNamePrefix(threadNamePrefix);
+        executor.setAwaitTerminationSeconds(awaitTime);
+
+        // 线程池对拒绝任务的处理策略
+        // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        // 初始化
+        executor.initialize();
+        return executor;
+    }
+
+}

+ 14 - 14
src/main/java/com/gyee/frame/controller/export/GoldenController.java

@@ -49,20 +49,20 @@ public class GoldenController {
      * @param interval 时间间隔
      * @return
      */
-    @GetMapping("/history/snap")
-    public AjaxResult getHistory(
-            @RequestParam(value = "station") String station,
-            @RequestParam(value = "wtId", required = false) String wtId,
-            @RequestParam(value = "templateId") Integer id,
-            @RequestParam(value = "startTs") Long startTs,
-            @RequestParam(value = "endTs") Long endTs,
-            @RequestParam(value = "interval", required = false) Optional<Integer> interval) {
-
-        int val = interval.isPresent() ? interval.get() : 1800;
-
-        List<Object> list = goldenService.getHistoryDataSingle(station, wtId, id, startTs, endTs, val);
-        return AjaxResult.successData(AjaxStatus.success.code, list);
-    }
+//    @GetMapping("/history/snap")
+//    public AjaxResult getHistory(
+//            @RequestParam(value = "station") String station,
+//            @RequestParam(value = "wtId", required = false) String wtId,
+//            @RequestParam(value = "templateId") Integer id,
+//            @RequestParam(value = "startTs") Long startTs,
+//            @RequestParam(value = "endTs") Long endTs,
+//            @RequestParam(value = "interval", required = false) Optional<Integer> interval) {
+//
+//        int val = interval.isPresent() ? interval.get() : 1800;
+//
+//        List<Object> list = goldenService.getHistoryDataSingle(station, wtId, id, startTs, endTs, val);
+//        return AjaxResult.successData(AjaxStatus.success.code, list);
+//    }
 
     /**
      * 通过模板导出所有风机数据

+ 74 - 60
src/main/java/com/gyee/frame/service/export/GoldenService.java

@@ -1,22 +1,21 @@
 package com.gyee.frame.service.export;
 
 import com.gyee.frame.common.conf.ExportConfig;
+import com.gyee.frame.common.conf.ThreadPoolTaskConfig;
 import com.gyee.frame.common.exception.QiNiuException;
 import com.gyee.frame.common.exception.enums.QiNiuErrorEnum;
 import com.gyee.frame.common.feign.RemoteServiceBuilder;
-import com.gyee.frame.model.auto.WindTurbineTestingPointAi2;
 import com.gyee.frame.model.custom.export.TsPointData;
 import com.gyee.frame.service.WindTurbineTestingPointAiService;
 import com.gyee.frame.service.WindturbineService;
 import com.gyee.frame.util.DateUtils;
+import com.gyee.frame.util.task.TaskTemplateCallable;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 
@@ -28,6 +27,8 @@ public class GoldenService {
     @Resource
     private ExportConfig exportConfig;
     @Resource
+    private ThreadPoolTaskConfig taskConfig;
+    @Resource
     private RemoteServiceBuilder remoteService;
     @Resource
     private WindturbineService windturbineService;
@@ -50,14 +51,14 @@ public class GoldenService {
      * @param interval
      * @return
      */
-    public List<Object> getHistoryDataSingle(String station, String wtId, int templateId, long startTs, long endTs, int interval) {
-        switch (templateId) {
-            case 1:
-                return getTemplateHistory1(station, wtId, startTs, endTs, interval);
-            default:
-                throw new QiNiuException(QiNiuErrorEnum.TEMPLATE_NO_SUPPORT);
-        }
-    }
+//    public List<Object> getHistoryDataSingle(String station, String wtId, int templateId, long startTs, long endTs, int interval) {
+//        switch (templateId) {
+//            case 1:
+//                return getTemplateHistory1(station, wtId, startTs, endTs, interval);
+//            default:
+//                throw new QiNiuException(QiNiuErrorEnum.TEMPLATE_NO_SUPPORT);
+//        }
+//    }
 
 
     /**
@@ -100,11 +101,24 @@ public class GoldenService {
             if (winds == null)
                 return map;
 
+            // 异步返回结果
+            List<Future<Map<String, List<Object>>>> list = new LinkedList<>();
             for (String wind : winds) {
-                List<Object> data = getTemplateHistory1(station, wind, startTs, endTs, interval);
-                map.put(wind, data);
+                TaskTemplateCallable task = new TaskTemplateCallable(windService, remoteService.ShardingService(),
+                        exportConfig.getTemplate1(), station, wind, startTs, endTs, interval);
+                Future submit = taskConfig.getExecutor().submit(task);
+                list.add(submit);
             }
-        } catch (QiNiuException e) {
+
+            // 返回结果处理
+            for (Future<Map<String, List<Object>>> future: list){
+                Map<String, List<Object>> wtMap = future.get();
+                for (Map.Entry<String, List<Object>> entry : wtMap.entrySet()) {
+                    map.put(entry.getKey(), entry.getValue());
+                }
+            }
+            
+        } catch (Exception e) {
             log.error(e.getMessage());
             throw new QiNiuException(QiNiuErrorEnum.ERROR_DATA);
         }
@@ -123,50 +137,50 @@ public class GoldenService {
      * @param interval
      * @return
      */
-    private List<Object> getTemplateHistory1(String station, String wtId, long startTs, long endTs, int interval) {
-        List<Object> list = new ArrayList<>();
-        List<String> codes = exportConfig.getTemplate1();
-
-        // 第一列添加时间
-        List<String> intervals = dateInterval(startTs, endTs, interval);
-        list.add(intervals);
-
-        try {
-            for (String code : codes) {
-                List<WindTurbineTestingPointAi2> windPoints = windService.findPointsByUniformCodeAndStation(station, wtId, code);
-                if (windPoints == null || windPoints.size() == 0)
-                    continue;
-
-                WindTurbineTestingPointAi2 windPoint = windPoints.get(0);
-                List<TsPointData> data = remoteService.ShardingService().getHistorySnap(windPoint.getId(), startTs, endTs, interval);
-
-                List<Object> collect = new ArrayList<>();
-                // 故障状态或限电状态都是整型0或1
-                if (code.equals("GZZT") || code.equals("XDZT")) {
-                    if (data.size() == 0) {
-                        for (int i = 0; i < intervals.size(); i++)
-                            collect.add(0);
-                    } else {
-                        collect = data.stream().map(point -> (int) point.getDoubleValue()).collect(Collectors.toList());
-                    }
-                    list.add(collect);
-                } else {
-                    if (data.size() == 0) {
-                        for (int i = 0; i < intervals.size(); i++)
-                            collect.add(0.0);
-                    } else {
-                        collect = data.stream().map(TsPointData::getDoubleValue).collect(Collectors.toList());
-                    }
-                    list.add(collect);
-                }
-            }
-        } catch (QiNiuException e) {
-            log.error(e.getMessage());
-            throw new QiNiuException(QiNiuErrorEnum.ERROR_DATA);
-        }
-
-        return list;
-    }
+//    private List<Object> getTemplateHistory1(String station, String wtId, long startTs, long endTs, int interval) {
+//        List<Object> list = new ArrayList<>();
+//        List<String> codes = exportConfig.getTemplate1();
+//
+//        // 第一列添加时间
+//        List<String> intervals = dateInterval(startTs, endTs, interval);
+//        list.add(intervals);
+//
+//        try {
+//            for (String code : codes) {
+//                List<WindTurbineTestingPointAi2> windPoints = windService.findPointsByUniformCodeAndStation(station, wtId, code);
+//                if (windPoints == null || windPoints.size() == 0)
+//                    continue;
+//
+//                WindTurbineTestingPointAi2 windPoint = windPoints.get(0);
+//                List<TsPointData> data = remoteService.ShardingService().getHistorySnap(windPoint.getId(), startTs, endTs, interval);
+//
+//                List<Object> collect = new ArrayList<>();
+//                // 故障状态或限电状态都是整型0或1
+//                if (code.equals("GZZT") || code.equals("XDZT")) {
+//                    if (data.size() == 0) {
+//                        for (int i = 0; i < intervals.size(); i++)
+//                            collect.add(0);
+//                    } else {
+//                        collect = data.stream().map(point -> (int) point.getDoubleValue()).collect(Collectors.toList());
+//                    }
+//                    list.add(collect);
+//                } else {
+//                    if (data.size() == 0) {
+//                        for (int i = 0; i < intervals.size(); i++)
+//                            collect.add(0.0);
+//                    } else {
+//                        collect = data.stream().map(TsPointData::getDoubleValue).collect(Collectors.toList());
+//                    }
+//                    list.add(collect);
+//                }
+//            }
+//        } catch (QiNiuException e) {
+//            log.error(e.getMessage());
+//            throw new QiNiuException(QiNiuErrorEnum.ERROR_DATA);
+//        }
+//
+//        return list;
+//    }
 
 
     /**

+ 113 - 0
src/main/java/com/gyee/frame/util/task/TaskTemplateCallable.java

@@ -0,0 +1,113 @@
+package com.gyee.frame.util.task;
+
+
+import com.gyee.frame.common.exception.QiNiuException;
+import com.gyee.frame.common.exception.enums.QiNiuErrorEnum;
+import com.gyee.frame.common.feign.IAdapterService;
+import com.gyee.frame.model.auto.WindTurbineTestingPointAi2;
+import com.gyee.frame.model.custom.export.TsPointData;
+import com.gyee.frame.service.WindTurbineTestingPointAiService;
+import com.gyee.frame.util.DateUtils;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class TaskTemplateCallable implements Callable {
+
+    private WindTurbineTestingPointAiService windService;
+    private IAdapterService service;
+    private List<String> codes;
+    private String station;
+    private String wtId;
+    private long startTs;
+    private long endTs;
+    private int interval;
+
+
+    public TaskTemplateCallable(WindTurbineTestingPointAiService windService, IAdapterService service, List<String> codes,
+                                String station, String wtId, long startTs, long endTs, int interval) {
+        this.windService = windService;
+        this.service = service;
+        this.codes = codes;
+        this.station = station;
+        this.wtId = wtId;
+        this.startTs = startTs;
+        this.endTs = endTs;
+        this.interval = interval;
+    }
+
+
+    @Override
+    public Map<String, List<Object>> call()  {
+
+        Map<String, List<Object>> map = new HashMap<>();
+        List<Object> list = new ArrayList<>();
+
+        // 第一列添加时间
+        List<String> intervals = dateInterval(startTs, endTs, interval);
+        list.add(intervals);
+
+        try {
+            for (String code : this.codes) {
+                List<WindTurbineTestingPointAi2> windPoints = this.windService.findPointsByUniformCodeAndStation(station, wtId, code);
+                if (windPoints == null || windPoints.size() == 0)
+                    continue;
+
+                WindTurbineTestingPointAi2 windPoint = windPoints.get(0);
+                List<TsPointData> data = this.service.getHistorySnap(windPoint.getId(), startTs, endTs, interval);
+
+                List<Object> collect = new ArrayList<>();
+                // 故障状态或限电状态都是整型0或1
+                if (code.equals("GZZT") || code.equals("XDZT")) {
+                    if (data.size() == 0) {
+                        for (int i = 0; i < intervals.size(); i++)
+                            collect.add(0);
+                    } else {
+                        collect = data.stream().map(point -> (int) point.getDoubleValue()).collect(Collectors.toList());
+                    }
+                    list.add(collect);
+                } else {
+                    if (data.size() == 0) {
+                        for (int i = 0; i < intervals.size(); i++)
+                            collect.add(0.0);
+                    } else {
+                        collect = data.stream().map(TsPointData::getDoubleValue).collect(Collectors.toList());
+                    }
+                    list.add(collect);
+                }
+            }
+
+            map.put(this.wtId, list);
+
+        } catch (QiNiuException e) {
+            log.error(e.getMessage());
+            throw new QiNiuException(QiNiuErrorEnum.ERROR_DATA);
+        }
+
+        return map;
+    }
+
+
+    /**
+     * 分割时间
+     *
+     * @param st
+     * @param et
+     * @param interval
+     * @return
+     */
+    private List<String> dateInterval(long st, long et, int interval) {
+        List<String> list = new ArrayList<>();
+        int val = interval * 1000;
+        while (st < et) {
+            list.add(DateUtils.format(st, DateUtils.DATE_TIME_PATTERN));
+            st += val;
+        }
+
+        return list;
+    }
+
+}