+package com.gyee.wisdom.dao.taoscz;
+import com.gyee.wisdom.common.data.timeseries.*;
+import com.gyee.wisdom.common.exception.WisdomException;
+import com.gyee.wisdom.dao.timeseries.IHistoryDao;
+import com.gyee.wisdom.dao.timeseries.TaosDao;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+public class TaosHistoryDao implements IHistoryDao {
+ private long day_time = 86400000L;
+ private long month_time = 2592000000L;
+ private long half_year_time = 15552000000L * 4;
+ private long year_time = 31536000000L;
+ @Autowired
+ private TaosConfiguration tdUtil;
+ @Autowired
+ private ThreadPoolTaskConfig taskConfig;
+ /**
+ * 查询单个测点历史数据
+ * @param tsQuery
+ * @return
+ * @throws Exception
+ */
+ @Override
+ public List<TsData> getTsDataHistory(TsQuery tsQuery) throws Exception {
+ if (tsQuery.getTsPoint().getTsDataType() == TsDataType.DOUBLE) {
+ return getDoubleTsDataHistory(tsQuery);
+ } else //(tsQuery.getTsPoint().getTsDataType() == TsDataType.BOOLEAN) {
+ return getBooleanTsDataHistory(tsQuery);
+ /* } else if (tsQuery.getTsPoint().getTsDataType() == TsDataType.STRING) {
+ return getStringTsDataHistory(tsQuery);
+ } else if (tsQuery.getTsPoint().getTsDataType() == TsDataType.LONG) {
+ return getLongTsDataHistory(tsQuery);
+ }*/
+ }
+ /**
+ * 查询单测点min、max、avg值
+ * @param tsQuery
+ * @return
+ * @throws Exception
+ */
+ @Override
+ public List<DoubleStatData> getDoubleStatDataHistory(TsQuery tsQuery) throws Exception {
+ if (tsQuery.getTsPoint().getTsDataType() != TsDataType.DOUBLE) {
+ throw new WisdomException("无效的数据类型:" + tsQuery.getTsPoint().getTsDataType());
+ }
+ List<DoubleStatData> result = new ArrayList<>();
+ Connection connect = tdUtil.getConnect();
+ try {
+ Statement st = connect.createStatement();
+ String point = tsQuery.getTsPoint().getId();
+ StringBuilder sql = new StringBuilder();
+ sql.append("select avg(value),max(value),min(value) from ");
+ // sb.append(TaosCovertUtil.coverStationPrefix(point) + "." + point.replace(".", "_"));
+ sql.append("gdnxxny.`");
+ sql.append(point);
+ sql.append("` where ts between").append(tsQuery.getStartTs());
+ sql.append(" and ").append(tsQuery.getEndTs());
+ sql.append(" interval(").append(tsQuery.getInterval()).append("s)");
+// sb.append(" where point_time>='").append(DateFormatUtils.format(tsQuery.getStartTs(), "yyyy-MM-dd HH:mm:ss:SSS"));
+// sb.append("' and point_time<='").append(DateFormatUtils.format(tsQuery.getEndTs(), "yyyy-MM-dd HH:mm:ss:SSS"));
+// sb.append("' interval(").append(tsQuery.getInterval()).append("s)");
+ // select avg(value),max(value),min(value) from `NSSDJL.NX_GD_NSSF_DD_P1_L1_001_FXWG004` where ts>=1654012800000 and ts<=1656604800000 interval(86400s)
+ log.info("4->" + sql);
+ ResultSet rs = st.executeQuery(sql.toString());
+ while (rs.next()) {
+ DoubleTsData avgData = new DoubleTsData(rs.getLong(1), (short) 1, rs.getDouble(2));
+ DoubleTsData maxData = new DoubleTsData(rs.getLong(1), (short) 1, rs.getDouble(3));
+ DoubleTsData minData = new DoubleTsData(rs.getLong(1), (short) 1, rs.getDouble(4));
+ DoubleStatData statData = new DoubleStatData(avgData, maxData, minData);
+ result.add(statData);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ } finally {
+ if (connect != null)
+ connect.close();
+ }
+ return result;
+ }
+ @Override
+ public boolean writeHistoryValue(List<TsPointData> dataList) throws Exception {
+ boolean flag = false;
+ Connection connect = tdUtil.getConnect();
+ try {
+ Statement st = connect.createStatement();
+ StringBuilder sb = new StringBuilder();
+ sb.append("insert into ");
+ for (TsPointData obj : dataList) {
+ long time = obj.getTsData().getTs();
+ String point = obj.getTagName();
+ double value = obj.getTsData().getDoubleValue().get();
+ sb.append(TaosCovertUtil.coverStationPrefix(point)).append(".");
+ sb.append(point).append(" values (");
+ sb.append(time).append(",").append(value).append(")");
+ }
+ flag = st.execute(sb.toString());
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ } finally {
+ if (connect != null)
+ connect.close();
+ }
+ return flag;
+ }
+ /**
+ * 查询多测点某个时间点的数值
+ * @param tsPoints 标签点
+ * @param ts 时间点(秒级)
+ * 上一个最近的数据
+ * @return
+ * @throws Exception
+ */
+ @Override
+ public Map<String, TsData> getHistorySection(List<TsPoint> tsPoints, Long ts) throws Exception {
+ Map<String, TsData> result = new HashMap<>();
+ Map<TsDataType, List<TsPoint>> pointGroup = tsPoints.stream().collect(Collectors.groupingBy(TsPoint::getTsDataType));
+ //存储线程的返回值
+ List<Future<Map<String, TsData>>> results = new LinkedList<>();
+ for (Map.Entry<TsDataType, List<TsPoint>> entry : pointGroup.entrySet()) {
+ String[] tagNames = entry.getValue().stream().map(TsPoint::getId).toArray(String[]::new);
+ if (entry.getKey() == TsDataType.DOUBLE)
+ for (String tag : tagNames) {
+ String sql = getSectionSql(tag, ts);
+ log.info("5->" + sql);
+ TaskCallable task = new TaskCallable(tdUtil.getConnect(), sql, ts, tag, TsDataType.DOUBLE);
+ Future<Map<String, TsData>> submit = taskConfig.getExecutor().submit(task);
+ results.add(submit);
+ }
+ if (entry.getKey() == TsDataType.BOOLEAN) {
+ for (String tag : tagNames) {
+ String sql = getSectionSql(tag, ts);
+ TaskCallable task = new TaskCallable(tdUtil.getConnect(), sql, ts, tag, TsDataType.BOOLEAN);
+ Future<Map<String, TsData>> submit = taskConfig.getExecutor().submit(task);
+ results.add(submit);
+ }
+ }
+ }
+ //返回结果
+ for (int i = 0; i < results.size(); i++) {
+ result.putAll(results.get(i).get());
+ }
+// for (Map.Entry<TsDataType, List<TsPoint>> entry : pointGroup.entrySet()) {
+// String[] tagNames = tsPoints.stream().map(TsPoint::getId).toArray(String[]::new);
+// for (String tag : tagNames) {
+// TsData tsData = null;
+// String point = TaosCovertUtil.coverStationPrefix(tag) + "." + tag.replace(".", "_");
+// String sql = "select last_row(*) from " + point + " where point_time>='"
+// + DateFormatUtils.format(ts - year_time, "yyyy-MM-dd HH:mm:ss:SSS") + "'"
+// + " and point_time <='" + DateFormatUtils.format(ts, "yyyy-MM-dd HH:mm:ss:SSS") + "'";
+// ResultSet rs = config.getConnect().createStatement().executeQuery(sql);
+// while (rs.next()) {
+// if (entry.getKey() == TsDataType.DOUBLE)
+// tsData = new DoubleTsData(ts, (short) 0, rs.getDouble(2));
+// if (entry.getKey() == TsDataType.BOOLEAN)
+// tsData = new BooleanTsData(ts, (short) 0, rs.getBoolean(2));
+// result.put(tag, tsData);
+// }
+// }
+// }
+ return result;
+ }
+ public List<TsData> getDoubleTsDataHistory(TsQuery tsQuery) throws Exception {
+ List<TsData> tsDataList = new ArrayList<>();
+ try {
+ //存储线程的返回值
+ List<Future<List<TsData>>> results = new LinkedList<>();
+ String sql = getHistory(tsQuery);
+ log.info("2->" + sql);
+ TaskCallableList task = new TaskCallableList(tdUtil.getConnect(), sql, TsDataType.DOUBLE,tsQuery.getStartTs());
+ Future<List<TsData>> submit = taskConfig.getExecutor().submit(task);
+ results.add(submit);
+ //返回结果
+ tsDataList = results.get(0).get();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return tsDataList;
+ }
+ //DoubleTsData{ts='', status='0', value='442.01'}
+ public List<TsData> getLongTsDataHistory(TsQuery tsQuery) throws Exception {
+ List<TsData> tsDataList = new ArrayList<>();
+// Connection connect = config.getConnect();
+ try {
+ //存储线程的返回值
+ List<Future<List<TsData>>> results = new LinkedList<>();
+ String sql = getHistory(tsQuery);
+ TaskCallableList task = new TaskCallableList(tdUtil.getConnect(), sql, TsDataType.DOUBLE,tsQuery.getStartTs());
+ Future<List<TsData>> submit = taskConfig.getExecutor().submit(task);
+ results.add(submit);
+ //返回结果
+ tsDataList = results.get(0).get();
+// Statement st = connect.createStatement();
+// String sql = getHistory(tsQuery);
+// ResultSet rs = st.executeQuery(sql);
+// while (rs.next()) {
+// tsDataList.add(new LongTsData(rs.getLong(1), (short) 0, rs.getLong(2)));
+// }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ }
+ finally {
+// if (connect != null)
+// connect.close();
+ }
+ return tsDataList;
+ }
+ public List<TsData> getBooleanTsDataHistory(TsQuery tsQuery) throws Exception {
+ List<TsData> tsDataList = new ArrayList<>();
+// Connection connect = config.getConnect();
+ try {
+ //存储线程的返回值
+ List<Future<List<TsData>>> results = new LinkedList<>();
+ String sql = getHistory(tsQuery);
+ log.info("2->" + sql);
+ TaskCallableList task = new TaskCallableList(tdUtil.getConnect(), sql, TsDataType.DOUBLE,tsQuery.getStartTs());
+ Future<List<TsData>> submit = taskConfig.getExecutor().submit(task);
+ results.add(submit);
+ //返回结果
+ tsDataList = results.get(0).get();
+// Statement st = connect.createStatement();
+// String sql = getHistory(tsQuery);
+// ResultSet rs = st.executeQuery(sql);
+// while (rs.next()) {
+// tsDataList.add(new BooleanTsData(rs.getLong(1), (short) 0, rs.getBoolean(2)));
+// }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ }
+ finally {
+// if (connect != null)
+// connect.close();
+ }
+ return tsDataList;
+ }
+ public List<TsData> getStringTsDataHistory(TsQuery tsQuery) throws Exception {
+ List<TsData> tsDataList = new ArrayList<>();
+// Connection connect = config.getConnect();
+ try {
+ //存储线程的返回值
+ List<Future<List<TsData>>> results = new LinkedList<>();
+ String sql = getHistory(tsQuery);
+ TaskCallableList task = new TaskCallableList(tdUtil.getConnect(), sql, TsDataType.DOUBLE,tsQuery.getStartTs());
+ Future<List<TsData>> submit = taskConfig.getExecutor().submit(task);
+ results.add(submit);
+ //返回结果
+ tsDataList = results.get(0).get();
+// Statement st = connect.createStatement();
+// String sql = getHistory(tsQuery);
+// ResultSet rs = st.executeQuery(sql);
+// while (rs.next()) {
+// tsDataList.add(new StringTsData(rs.getLong(1), (short) 0, rs.getString(2)));
+// }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ }
+ finally {
+// if (connect != null)
+// connect.close();
+ }
+ return tsDataList;
+ }
+ /**
+ * 拼接sql
+ *
+ * @param tsQuery
+ * @return
+ * @throws WisdomException
+ */
+ private String getHistory(TsQuery tsQuery) throws WisdomException {
+ String point = tsQuery.getTsPoint().getId();
+ StringBuilder sb = new StringBuilder();
+ if (tsQuery.getInterpolation() == Interpolation.RAW) {
+ sb.append("select * from gdnxxny.`");
+ sb.append(point);
+ sb.append("` where ts between ").append(tsQuery.getStartTs());
+ sb.append(" and ").append(tsQuery.getEndTs());
+ } else if (tsQuery.getInterpolation() == Interpolation.SNAP) {
+ if (tsQuery.getDateArray() != null && tsQuery.getDateArray().length > 0) {
+ sb.append("select interp(value) from gdnxxny.`");
+ sb.append(point);
+ sb.append("` ts >= ").append(tsQuery.getStartTs());
+ sb.append(" and ts<= ").append(tsQuery.getEndTs());
+ sb.append(" every(").append(tsQuery.getInterval()).append("s) fill(prev)");
+ } else {
+ log.warn("无效的查询条件!");
+ throw new WisdomException("无效的查询条件!");
+ }
+ } else {
+ log.warn("Taos不支持历史数据插值方法:" + tsQuery.getInterpolation());
+ throw new WisdomException("Taos不支持历史数据插值方法:" + tsQuery.getInterpolation());
+ }
+ return sb.toString();
+ }
+ /**
+ * 拼接sql
+ *
+ * @param tag
+ * @param ts
+ * @return
+ */
+ private String getSectionSql(String tag, long ts) {
+ // String point = TaosCovertUtil.coverStationPrefix(tag) + "." + tag.replace(".", "_");
+ StringBuilder sb = new StringBuilder();
+ // TODO: select interp(*) from xxx where _c0 = xxxx:xx:xx fill(prev)
+ // sb.append("select last_row(*) from ").append(point);
+// sb.append(" where point_time <="+ts);
+ sb.append("select last_row(*) from gdnxxny.`").append(tag);
+ sb.append("` where ts>=");
+ sb.append(ts);
+// sb.append(" fill(prev)");
+// sb.append("select last_row(*) from ").append(point);
+// sb.append(" where point_time>='");
+// sb.append(ts - half_year_time);
+// sb.append("'");
+// sb.append(" and point_time <='" + DateFormatUtils.format(ts, "yyyy-MM-dd HH:mm:ss:SSS") + "'");
+ return sb.toString();
+ }
+ /**
+ * 将blob转化为byte[]
+ *
+ * @param blob
+ * @return
+ */
+ private byte[] getBytes(Blob blob) {
+ try {
+ InputStream ins = blob.getBinaryStream();
+ byte[] b = new byte[1024];
+ int num = 0;
+ String res = "";
+ while ((num = ins.read(b)) != -1) {
+ res += new String(b, 0, num);
+ }
+ return res.getBytes();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }