|
@@ -0,0 +1,404 @@
|
|
|
+package com.gyee.wisdom.dao.redistaos;
|
|
|
+
|
|
|
+import com.gyee.wisdom.common.data.timeseries.*;
|
|
|
+import com.gyee.wisdom.dao.redistaos.config.TaosConfig;
|
|
|
+import com.gyee.wisdom.dao.timeseries.ILatestDao;
|
|
|
+import com.gyee.wisdom.dao.timeseries.TaosDao;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import redis.clients.jedis.Jedis;
|
|
|
+import redis.clients.jedis.JedisPool;
|
|
|
+import redis.clients.jedis.Pipeline;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.sql.Connection;
|
|
|
+import java.sql.ResultSet;
|
|
|
+import java.sql.SQLException;
|
|
|
+import java.sql.Statement;
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.LinkedList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+@TaosDao
|
|
|
+public class TaosLatestDao implements ILatestDao {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private TaosConfig config;
|
|
|
+ @Autowired
|
|
|
+ private ThreadPoolTaskConfig taskConfig;
|
|
|
+ @Resource
|
|
|
+ private JedisPool jedisPool;
|
|
|
+
|
|
|
+ private static final ThreadLocal<SimpleDateFormat> sdf = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 查询多测点最新数据
|
|
|
+ * @param tsPoints
|
|
|
+ * @return
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Map<String, TsData> getTsDataLatest(List<TsPoint> tsPoints) throws Exception {
|
|
|
+ Map<Integer, List<TsPoint>> hh = new HashMap<>();
|
|
|
+ hh.put(0, tsPoints);
|
|
|
+ Map<String, TsData> redisData = getRedisData(hh);
|
|
|
+ return redisData;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, TsData> getRedisData(Map<Integer, List<TsPoint>> redisPoints) {
|
|
|
+ Map<String, TsData> results = new HashMap<>();
|
|
|
+ Map<String, Boolean> typeMap = new HashMap<>();
|
|
|
+ try (Jedis resource = jedisPool.getResource(); Pipeline pipeline = resource.pipelined()) {
|
|
|
+ for (Map.Entry<Integer, List<TsPoint>> kv : redisPoints.entrySet()) {
|
|
|
+ pipeline.select(kv.getKey());
|
|
|
+ final List<TsPoint> tps = kv.getValue();
|
|
|
+ for (TsPoint tp : tps) {
|
|
|
+ pipeline.hgetAll(tp.getId());
|
|
|
+ typeMap.put(tp.getId(), tp.getTsDataType() == TsDataType.BOOLEAN);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ final List<Object> objects = pipeline.syncAndReturnAll();
|
|
|
+ for (Object o : objects) {
|
|
|
+ if (o instanceof HashMap) {
|
|
|
+ HashMap<String, String> val = (HashMap) o;
|
|
|
+ if (!val.containsKey("value") || !val.containsKey("timestamp") || !val.containsKey("pointid")) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ double value = Double.parseDouble(val.get("value"));
|
|
|
+ long ts = Long.parseLong(val.get("timestamp")) * 1000;
|
|
|
+ String id = val.get("pointid");
|
|
|
+ TsData dt = null;
|
|
|
+ if (typeMap.get(id)) {
|
|
|
+ dt = new BooleanTsData(ts, (short) 0, value != 0);
|
|
|
+ } else {
|
|
|
+ dt = new DoubleTsData(ts, (short) 0, value);
|
|
|
+ }
|
|
|
+ results.put(id, dt);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ return results;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<String, TsData> getTsDataLatestOneUniformCode(TsPoint tsPoint) throws Exception {
|
|
|
+ String sql = "SELECT last_row(*) from gdnxxny.`" + tsPoint.getId() + "`";
|
|
|
+ final ResultSet rs = config.getConnect().createStatement().executeQuery(sql);
|
|
|
+ rs.next();
|
|
|
+ Map<String, TsData> result = new HashMap<>();
|
|
|
+ result.put(tsPoint.getUniformCode(), new DoubleTsData(rs.getLong(1), (short) 0, rs.getDouble(2)));
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<String, TsData> getTsDataLatestBatchOneUniformCode(List<TsPoint> pointList) throws Exception {
|
|
|
+ final StringBuilder sql = new StringBuilder("SELECT last_row(*) from stag_double where tbname in (");
|
|
|
+
|
|
|
+ Map<String, String> pointMap = new HashMap<>();
|
|
|
+ for (TsPoint tsPoint : pointList) {pointMap.put(tsPoint.getId(), tsPoint.getUniformCode());}
|
|
|
+ for (Map.Entry<String, String> entry : pointMap.entrySet()) {
|
|
|
+ sql.append("'");
|
|
|
+ sql.append(entry.getKey());
|
|
|
+ sql.append("',");
|
|
|
+ }
|
|
|
+ sql.append("') GROUP BY tbname");
|
|
|
+ final ResultSet rs = config.getConnect().createStatement().executeQuery(sql.toString());
|
|
|
+ Map<String, TsData> result = new HashMap<>();
|
|
|
+ while (rs.next()) {
|
|
|
+ result.put(pointMap.get(rs.getString(3)), new DoubleTsData(rs.getLong(1), (short) 0, rs.getDouble(2)));
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String joinSql(final int startIndex, final int endIndex, List<String> tagNameList) {
|
|
|
+ StringBuilder sb = new StringBuilder("SELECT last_row(*) FROM stag_double where tbname in ('");
|
|
|
+ sb.append(tagNameList.get(startIndex));
|
|
|
+ sb.append("'");
|
|
|
+ for (int i = startIndex + 1; i < endIndex; i++) {
|
|
|
+ sb.append(",'");
|
|
|
+ sb.append(tagNameList.get(i));
|
|
|
+ sb.append("'");
|
|
|
+ }
|
|
|
+ sb.append(") GROUP BY tbname");
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int writeDoubleLatest(List<TsPointData> list) throws Exception {
|
|
|
+ boolean flag = false;
|
|
|
+ Connection connection = config.getConnect();
|
|
|
+
|
|
|
+ try {
|
|
|
+ Statement st = connection.createStatement();
|
|
|
+
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append("insert into ");
|
|
|
+ for (TsPointData obj : list) {
|
|
|
+ long time = obj.getTsData().getTs();
|
|
|
+ String point = obj.getTagName();
|
|
|
+ double value = obj.getTsData().getDoubleValue().get();
|
|
|
+
|
|
|
+ sb.append("`").append(point).append("` USING stag_double (organization) TAGS(")
|
|
|
+ .append(obj.getTagName().substring(0,obj.getTagName().indexOf(".")))
|
|
|
+ .append(") values (").append(time).append(",").append(value).append(")");
|
|
|
+ }
|
|
|
+ sb.append(";");
|
|
|
+ flag = st.execute(sb.toString());
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ } finally {
|
|
|
+ if (connection != null)
|
|
|
+ connection.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ return flag == true ? list.size() : 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int writeStringLatest(List<TsPointData> list) throws Exception {
|
|
|
+ boolean flag = false;
|
|
|
+ Connection connection = config.getConnect();
|
|
|
+
|
|
|
+ try {
|
|
|
+ Statement st = connection.createStatement();
|
|
|
+
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append("insert into ");
|
|
|
+ for (TsPointData obj : list) {
|
|
|
+ long time = obj.getTsData().getTs();
|
|
|
+ String point = obj.getTagName();
|
|
|
+ String value = obj.getTsData().getStringValue().get();
|
|
|
+ sb.append(TaosCovertUtil.coverStationPrefix(point)).append(".");
|
|
|
+ sb.append(point).append(" values (");
|
|
|
+ sb.append(time).append(",'").append(value).append("') ");
|
|
|
+ }
|
|
|
+
|
|
|
+ flag = st.execute(sb.toString());
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ } finally {
|
|
|
+ if (connection != null)
|
|
|
+ connection.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ return flag == true ? list.size() : 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int writeBooleanLatest(List<TsPointData> list) throws Exception {
|
|
|
+ boolean flag = false;
|
|
|
+ Connection connection = config.getConnect();
|
|
|
+
|
|
|
+ try {
|
|
|
+ Statement st = connection.createStatement();
|
|
|
+
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append("insert into ");
|
|
|
+ for (TsPointData obj : list) {
|
|
|
+ long time = obj.getTsData().getTs();
|
|
|
+ String point = obj.getTagName();
|
|
|
+ boolean value = obj.getTsData().getBooleanValue().get();
|
|
|
+ sb.append(TaosCovertUtil.coverStationPrefix(point)).append(".");
|
|
|
+ sb.append(point).append(" values (");
|
|
|
+ sb.append(time).append(",").append(value).append(")");
|
|
|
+ }
|
|
|
+
|
|
|
+ flag = st.execute(sb.toString());
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ } finally {
|
|
|
+ if (connection != null)
|
|
|
+ connection.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ return flag == true ? list.size() : 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int writeLongLatest(List<TsPointData> list) throws Exception {
|
|
|
+ boolean flag = false;
|
|
|
+ Connection connection = config.getConnect();
|
|
|
+
|
|
|
+ try {
|
|
|
+ Statement st = connection.createStatement();
|
|
|
+
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append("insert into ");
|
|
|
+ for (TsPointData obj : list) {
|
|
|
+ long time = obj.getTsData().getTs();
|
|
|
+ String point = obj.getTagName();
|
|
|
+ long value = obj.getTsData().getLongValue().get();
|
|
|
+ sb.append(TaosCovertUtil.coverStationPrefix(point)).append(".");
|
|
|
+ sb.append(point).append(" values (");
|
|
|
+ sb.append(time).append(",").append(value).append(")");
|
|
|
+ }
|
|
|
+
|
|
|
+ flag = st.execute(sb.toString());
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ } finally {
|
|
|
+ if (connection != null)
|
|
|
+ connection.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ return flag == true ? list.size() : 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int writeBlobLatest(List<TsPointData> list) throws Exception {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int writeCoordinateLatest(List<TsPointData> list) throws Exception {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 由于taos数据库中全部是DOUBLE类型,故不需要区分类型
|
|
|
+ *
|
|
|
+ * @param dataList
|
|
|
+ * @return
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public boolean writeLatest(List<TsPointData> dataList) throws Exception {
|
|
|
+ for (TsPointData tsPointData : dataList) {
|
|
|
+ Map<String, String> mp = new HashMap<>(5);
|
|
|
+ mp.put("pointid", tsPointData.getTagName());
|
|
|
+ mp.put("value", String.valueOf(tsPointData.getTsData().getDoubleValue().get()));
|
|
|
+ long ts = tsPointData.getTsData().getTs();
|
|
|
+ mp.put("timestamp", String.valueOf(ts/1000));
|
|
|
+ mp.put("datetime", sdf.get().format(ts));
|
|
|
+ mp.put("status", "0");
|
|
|
+ try {
|
|
|
+ jedisPool.getResource().hmset(tsPointData.getTagName(), mp);
|
|
|
+ }catch (Exception e){
|
|
|
+ e.printStackTrace();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int writeCount = 0;
|
|
|
+
|
|
|
+ Map<TsDataType, List<TsPointData>> pointGroup = dataList.stream().collect(Collectors.groupingBy(TsPointData::findDataType));
|
|
|
+ for (Map.Entry<TsDataType, List<TsPointData>> entry : pointGroup.entrySet()) {
|
|
|
+ List<TsPointData> pointDataList = entry.getValue();
|
|
|
+ int count = 0;
|
|
|
+ if(entry.getKey()==TsDataType.DOUBLE){
|
|
|
+ count = writeDoubleLatest(pointDataList);
|
|
|
+ }else if(entry.getKey()==TsDataType.BOOLEAN){
|
|
|
+ count = writeIntLatest(pointDataList);
|
|
|
+ }
|
|
|
+ writeCount = writeCount + count;
|
|
|
+ }
|
|
|
+ return writeCount > 0 ? true : false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int writeIntLatest(List<TsPointData> list) throws SQLException {
|
|
|
+ boolean flag = false;
|
|
|
+ Connection connection = config.getConnect();
|
|
|
+
|
|
|
+ try {
|
|
|
+ Statement st = connection.createStatement();
|
|
|
+
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append("insert into ");
|
|
|
+ for (TsPointData obj : list) {
|
|
|
+ long time = obj.getTsData().getTs();
|
|
|
+ String point = obj.getTagName();
|
|
|
+ int value = obj.getTsData().getBooleanValue().get()?1:0;
|
|
|
+
|
|
|
+ sb.append("`").append(point).append("` USING stag_double (organization) TAGS(")
|
|
|
+ .append(obj.getTagName().substring(0,obj.getTagName().indexOf(".")))
|
|
|
+ .append(") values (").append(time).append(",").append(value).append(")");
|
|
|
+ }
|
|
|
+ sb.append(";");
|
|
|
+ flag = st.execute(sb.toString());
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ } finally {
|
|
|
+ if (connection != null)
|
|
|
+ connection.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ return flag == true ? list.size() : 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<String, TsData> getDoubleTsDataSnapshots(String... tagNames) throws Exception {
|
|
|
+ Map<String, TsData> tsDataMap = new HashMap<>();
|
|
|
+
|
|
|
+ //存储线程的返回值
|
|
|
+ List<Future<Map<String, TsData>>> results = new LinkedList<>();
|
|
|
+
|
|
|
+ for (String tag : tagNames) {
|
|
|
+ // String point = TaosCovertUtil.coverStationPrefix(tag) + "." + tag.replace(".", "_");
|
|
|
+ String sql = "select last_row(*) from gdnxxny.`" + tag +"`";
|
|
|
+
|
|
|
+ TaskCallable task = new TaskCallable(config.getConnect(), sql, 0, tag, TsDataType.DOUBLE);
|
|
|
+ Future<Map<String, TsData>> submit = taskConfig.getExecutor().submit(task);
|
|
|
+ results.add(submit);
|
|
|
+ }
|
|
|
+
|
|
|
+ //返回结果
|
|
|
+ for (int i = 0; i < results.size(); i++) {
|
|
|
+ tsDataMap.putAll(results.get(i).get());
|
|
|
+ }
|
|
|
+
|
|
|
+ return tsDataMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<String, TsData> getLongTsDataSnapshots(String... tagNames) throws Exception {
|
|
|
+ Map<String, TsData> tsDataMap = new HashMap<>();
|
|
|
+
|
|
|
+ //存储线程的返回值
|
|
|
+ List<Future<Map<String, TsData>>> results = new LinkedList<>();
|
|
|
+
|
|
|
+ for (String tag : tagNames) {
|
|
|
+ // String point = TaosCovertUtil.coverStationPrefix(tag) + "." + tag.replace(".", "_");
|
|
|
+ String sql = "select last_row(*) from " + tag;
|
|
|
+ TaskCallable task = new TaskCallable(config.getConnect(), sql, 0, tag, TsDataType.LONG);
|
|
|
+ Future<Map<String, TsData>> submit = taskConfig.getExecutor().submit(task);
|
|
|
+ results.add(submit);
|
|
|
+ }
|
|
|
+
|
|
|
+ //返回结果
|
|
|
+ for (int i = 0; i < results.size(); i++) {
|
|
|
+ tsDataMap.putAll(results.get(i).get());
|
|
|
+ }
|
|
|
+
|
|
|
+ return tsDataMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<String, TsData> getBooleanTsDataSnapshots(String... tagNames) throws Exception {
|
|
|
+ Map<String, TsData> tsDataMap = new HashMap<>();
|
|
|
+
|
|
|
+ //存储线程的返回值
|
|
|
+ List<Future<Map<String, TsData>>> results = new LinkedList<>();
|
|
|
+
|
|
|
+ for (String tag : tagNames) {
|
|
|
+// String point = TaosCovertUtil.coverStationPrefix(tag) + "." + tag.replace(".", "_");
|
|
|
+ String sql = "select last_row(*) from gdnxxny.`" + tag +"`";
|
|
|
+ log.info("1->" + sql);
|
|
|
+ TaskCallable task = new TaskCallable(config.getConnect(), sql, 0, tag, TsDataType.BOOLEAN);
|
|
|
+ Future<Map<String, TsData>> submit = taskConfig.getExecutor().submit(task);
|
|
|
+ results.add(submit);
|
|
|
+ }
|
|
|
+
|
|
|
+ //返回结果
|
|
|
+ for (int i = 0; i < results.size(); i++) {
|
|
|
+ tsDataMap.putAll(results.get(i).get());
|
|
|
+ }
|
|
|
+
|
|
|
+ return tsDataMap;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|