|
@@ -3,6 +3,7 @@ package com.gyee.redis2taos.service;
|
|
|
import com.gyee.redis2taos.config.RedisConfig;
|
|
|
import com.gyee.redis2taos.config.TaosConfig;
|
|
|
import com.gyee.redis2taos.timeseries.TsDataType;
|
|
|
+import com.gyee.redis2taos.util.ChangedSave;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
@@ -12,6 +13,7 @@ import redis.clients.jedis.Pipeline;
|
|
|
|
|
|
import java.sql.Connection;
|
|
|
import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
|
|
|
public class TaskRedis2Taos implements Runnable {
|
|
|
private final List<String> keys;
|
|
@@ -72,10 +74,17 @@ public class TaskRedis2Taos implements Runnable {
|
|
|
pointid = response.get(0);
|
|
|
if (StringUtils.isEmpty(pointid)) continue;
|
|
|
|
|
|
- ts = Long.valueOf(datetime) * 1000;
|
|
|
- bl = "0".equals(value) ? false : true;
|
|
|
+ if(ChangedSave.map.containsKey(pointid)){
|
|
|
+ if(!Objects.equals(ChangedSave.map.get(pointid),value)){
|
|
|
+ ChangedSave.map.put(pointid, value);
|
|
|
+ ts = Long.valueOf(datetime) * 1000;
|
|
|
+ bl = "0".equals(value) ? false : true;
|
|
|
|
|
|
- sb.append(pointid).append(" VALUES (").append(ts).append(",").append(bl).append(") ");
|
|
|
+ sb.append(pointid).append(" VALUES (").append(ts).append(",").append(bl).append(") ");
|
|
|
+ }
|
|
|
+ }else {
|
|
|
+ ChangedSave.map.put(pointid, value);
|
|
|
+ }
|
|
|
|
|
|
i++;
|
|
|
//分为1000个1组,taos单机可以控制在1s内
|
|
@@ -100,10 +109,17 @@ public class TaskRedis2Taos implements Runnable {
|
|
|
pointid = response.get(0);
|
|
|
if (StringUtils.isEmpty(pointid)) continue;
|
|
|
|
|
|
- ts = Long.valueOf(datetime) * 1000;
|
|
|
- val = Double.valueOf(value);
|
|
|
+ if(ChangedSave.map.containsKey(pointid)){
|
|
|
+ if(!Objects.equals(ChangedSave.map.get(pointid),value)){
|
|
|
+ ChangedSave.map.put(pointid, value);
|
|
|
+ ts = Long.valueOf(datetime) * 1000;
|
|
|
+ val = Double.valueOf(value);
|
|
|
|
|
|
- sb.append(pointid).append(" VALUES (").append(ts).append(",").append(val).append(") ");
|
|
|
+ sb.append(pointid).append(" VALUES (").append(ts).append(",").append(val).append(") ");
|
|
|
+ }
|
|
|
+ }else {
|
|
|
+ ChangedSave.map.put(pointid, value);
|
|
|
+ }
|
|
|
|
|
|
i++;
|
|
|
if (i % 1000 == 0 || i == objects.size()) {
|