Browse Source

线程池对拒绝任务的处理策略等

xushili 2 years ago
parent
commit
719440babd

+ 1 - 1
build.gradle

@@ -38,7 +38,7 @@ dependencies {
     implementation('org.apache.poi:poi-ooxml:5.2.3')
     implementation 'cn.hutool:hutool-poi:5.8.12'
     implementation 'cn.hutool:hutool-core:5.8.13'
-    implementation("com.taosdata.jdbc:taos-jdbcdriver:3.0.3")
+    implementation("com.taosdata.jdbc:taos-jdbcdriver:3.1.0")
 }
 
 tasks.named('test') {

+ 5 - 1
src/main/java/com/gyee/redis2taos/config/RedisConfig.java

@@ -26,7 +26,11 @@ public class RedisConfig {
             jedisPoolConfig.setMaxIdle(64);
             // 最小空闲连接
             jedisPoolConfig.setMinIdle(0);
-            jedisPool = new JedisPool(jedisPoolConfig, host, port, 5000, password);
+            if("".equals(password)){
+                jedisPool = new JedisPool(jedisPoolConfig, host, port, 5000);
+            }else {
+                jedisPool = new JedisPool(jedisPoolConfig, host, port, 5000,password);
+            }
         }
         return jedisPool.getResource();
     }

+ 1 - 1
src/main/java/com/gyee/redis2taos/config/ThreadPoolTaskConfig.java

@@ -61,7 +61,7 @@ public class ThreadPoolTaskConfig {
 
         // 线程池对拒绝任务的处理策略
         // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
-        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
         // 初始化
         executor.initialize();
         return executor;

+ 18 - 2
src/main/java/com/gyee/redis2taos/controller/TaosController.java

@@ -1,18 +1,24 @@
 package com.gyee.redis2taos.controller;
 
+import com.gyee.redis2taos.service.ReadAndWriteService;
 import com.gyee.redis2taos.service.RedisService;
 import com.gyee.redis2taos.service.TaosService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.annotation.Resource;
+
 @RestController("/taos")
 public class TaosController {
 
-    @Autowired
+    @Resource
     private RedisService redisService;
-    @Autowired
+    @Resource
     private TaosService taosService;
+    @Resource
+    private ReadAndWriteService readAndWriteService;
 
     //TODO 升级用mybatis-plus
 
@@ -67,4 +73,14 @@ public class TaosController {
     private void tableSelect() {
 
     }
+
+    @GetMapping("/2redis/start")
+    private void taos2RedisStart() throws Exception {
+        readAndWriteService.start();
+    }
+
+    @GetMapping("/2redis/stop")
+    private void taos2RedisStop() {
+        readAndWriteService.stop();
+    }
 }

+ 41 - 14
src/main/java/com/gyee/redis2taos/service/ReadAndWriteService.java

@@ -9,6 +9,7 @@ import com.gyee.redis2taos.timeseries.TsPoint;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.DefaultApplicationArguments;
 import org.springframework.core.annotation.Order;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
@@ -33,16 +34,45 @@ public class ReadAndWriteService implements ApplicationRunner {
     @Resource
     private TaosConfig taosConfig;
 
+    private List<List<String>> boolStrll;
+    private List<List<String>> douStrll;
+
+    private boolean active;
+
     @Override
-    public void run(ApplicationArguments args) throws Exception {
+    public void run(ApplicationArguments args) {
 
+        getKeys();
+
+        while (active) {
+            try{
+                for (List<String> strings : boolStrll) {
+                    taskExecutor.execute(new TaskRedis2Taos(strings, redisConfig, taosConfig, TsDataType.BOOLEAN, taskExecutor));
+                }
+                for (List<String> strings : douStrll) {
+                    taskExecutor.execute(new TaskRedis2Taos(strings, redisConfig, taosConfig, TsDataType.DOUBLE, taskExecutor));
+                }
+            }catch (Exception e){
+                e.printStackTrace();
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            log.info("\n#");
+        }
+    }
+
+    private void getKeys() {
         //long l1 = System.currentTimeMillis();
         Set<String> keys = redisService.getKeys();
+        if(keys.size()==0) return;
         //long l2 = System.currentTimeMillis();
         log.info("读取rediskeys成功!!!," + keys.size() + "个点。");
         //按类型分点
         Map<TsDataType, List<BasicTsPoint>> di = keys.stream().map(ss -> {
-            if (ss.contains("DI")) {
+            if (ss.contains("DI") || ss.contains("di")) {
                 return new BasicTsPoint(ss, TsDataType.BOOLEAN);
             } else {
                 return new BasicTsPoint(ss, TsDataType.DOUBLE);
@@ -54,19 +84,16 @@ public class ReadAndWriteService implements ApplicationRunner {
         List<String> douCollect = di.get(TsDataType.DOUBLE).stream().map(k1 -> k1.getId()).collect(Collectors.toList());
 
         //按数量分组,di点,分为9216个1组,redis单机可以控制在1s内
-        List<List<String>> boolStrll = ListUtil.split(boolCollect, 9216);
+        boolStrll = ListUtil.split(boolCollect, 9216);
         //按数量分组,ai点
-        List<List<String>> douStrll = ListUtil.split(douCollect, 9216);
+        douStrll = ListUtil.split(douCollect, 9216);
+    }
 
-        while (true) {
-            for (List<String> strings : boolStrll) {
-                taskExecutor.execute(new TaskRedis2Taos(strings, redisConfig, taosConfig, TsDataType.BOOLEAN, taskExecutor));
-            }
-            for (List<String> strings : douStrll) {
-                taskExecutor.execute(new TaskRedis2Taos(strings, redisConfig, taosConfig, TsDataType.DOUBLE, taskExecutor));
-            }
-            Thread.sleep(1000);
-            log.info("\n#");
-        }
+    public void stop(){
+        this.active = false;
+    }
+    public void start() throws Exception {
+        this.active = true;
+        run(new DefaultApplicationArguments());
     }
 }

+ 1 - 1
src/main/java/com/gyee/redis2taos/service/RedisService.java

@@ -30,7 +30,7 @@ public class RedisService {
                     map = new LinkedHashMap<>();
                     map.put("pointid", excelRedi);
                     map.put("value", "0");
-                    map.put("timestamp", String.valueOf(date.getTime()));
+                    map.put("timestamp", String.valueOf(date.getTime()/1000));
                     map.put("datetime", sdf.get().format(date));
                     map.put("status", "0");
                     jedis.hmset(excelRedi, map);

+ 31 - 12
src/main/java/com/gyee/redis2taos/service/TaskRedis2Taos.java

@@ -4,6 +4,7 @@ import com.gyee.redis2taos.config.RedisConfig;
 import com.gyee.redis2taos.config.TaosConfig;
 import com.gyee.redis2taos.timeseries.TsDataType;
 import com.gyee.redis2taos.util.ChangedSave;
+import com.taosdata.jdbc.TSDBPreparedStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -12,6 +13,8 @@ import redis.clients.jedis.Jedis;
 import redis.clients.jedis.Pipeline;
 
 import java.sql.Connection;
+import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -39,11 +42,13 @@ public class TaskRedis2Taos implements Runnable {
 
         String pointid = null;
 
+        //String sql = "insert into ? values(?,?)";
+
         try (Jedis jedis = redisConfig.getJedis(); Pipeline pipelined = jedis.pipelined();) {
-            //TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class);
 
             Connection conn = taosConfig.getInstance();
 
+            //TSDBPreparedStatement pstmt = (TSDBPreparedStatement)conn.prepareStatement(sql);
             for (String key : keys) {
                 //pipelined.hscan(key, "0");
                 pipelined.hmget(key, "pointid", "value", "timestamp");
@@ -74,16 +79,30 @@ public class TaskRedis2Taos implements Runnable {
                     pointid = response.get(0);
                     if (StringUtils.isEmpty(pointid)) continue;
 
-                    if(ChangedSave.map.containsKey(pointid)){
-                        if(!Objects.equals(ChangedSave.map.get(pointid),value)){
-                            ChangedSave.map.put(pointid, value);
+
+                    /*ts = Long.valueOf(datetime) * 1000;
+                    bl = Double.valueOf(value)==1;
+                    pstmt.setTableName(pointid.toLowerCase());
+                    ArrayList<Long> objects1 = new ArrayList<>();
+                    objects1.add(ts);
+                    pstmt.setTimestamp(0,objects1);
+                    ArrayList<Boolean> objects2 = new ArrayList<>();
+                    objects2.add(false);
+                    pstmt.setBoolean(1,objects2);
+                    pstmt.columnDataAddBatch();
+                    pstmt.columnDataExecuteBatch();*/
+
+
+                    if (ChangedSave.map.containsKey(pointid)) {
+                        if (!Objects.equals(ChangedSave.map.get(pointid), datetime)) {
+                            ChangedSave.map.put(pointid, datetime);
                             ts = Long.valueOf(datetime) * 1000;
-                            bl = "0".equals(value) ? false : true;
+                            bl = Double.valueOf(value) == 1;
 
                             sb.append(pointid).append(" VALUES (").append(ts).append(",").append(bl).append(") ");
                         }
-                    }else {
-                        ChangedSave.map.put(pointid, value);
+                    } else {
+                        ChangedSave.map.put(pointid, datetime);
                     }
 
                     i++;
@@ -109,16 +128,16 @@ public class TaskRedis2Taos implements Runnable {
                     pointid = response.get(0);
                     if (StringUtils.isEmpty(pointid)) continue;
 
-                    if(ChangedSave.map.containsKey(pointid)){
-                        if(!Objects.equals(ChangedSave.map.get(pointid),value)){
-                            ChangedSave.map.put(pointid, value);
+                    if (ChangedSave.map.containsKey(pointid)) {
+                        if (!Objects.equals(ChangedSave.map.get(pointid), datetime)) {
+                            ChangedSave.map.put(pointid, datetime);
                             ts = Long.valueOf(datetime) * 1000;
                             val = Double.valueOf(value);
 
                             sb.append(pointid).append(" VALUES (").append(ts).append(",").append(val).append(") ");
                         }
-                    }else {
-                        ChangedSave.map.put(pointid, value);
+                    } else {
+                        ChangedSave.map.put(pointid, datetime);
                     }
 
                     i++;

+ 8 - 0
src/test/java/com/gyee/redis2taos/MyTest.java

@@ -0,0 +1,8 @@
+package com.gyee.redis2taos;
+
+public class MyTest {
+    public static void main(String[] args) {
+        double a = 1.0d;
+        System.out.println(a==1);
+    }
+}