TaosLatestDao.java 10 KB


  1. package com.gyee.wisdom.adapter.dao;
import com.gyee.wisdom.adapter.common.config.TaosConfiguration;
import com.gyee.wisdom.adapter.common.config.ThreadPoolTaskConfig;
import com.gyee.wisdom.adapter.common.exception.WisdomException;
import com.gyee.wisdom.adapter.common.util.TaosCovertUtil;
import com.gyee.wisdom.adapter.model.adapter.*;
import com.gyee.wisdom.adapter.timeseries.dao.ILatestDao;
import com.gyee.wisdom.adapter.timeseries.TaosDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
  20. @Slf4j
  21. @Component
  22. @TaosDao
  23. public class TaosLatestDao implements ILatestDao {
  24. @Autowired
  25. private TaosConfiguration config;
  26. @Autowired
  27. private ThreadPoolTaskConfig taskConfig;
  28. /**
  29. * 查询多测点最新数据
  30. * @param tsPoints
  31. * @return
  32. * @throws Exception
  33. */
  34. @Override
  35. public Map<String, TsData> getTsDataLatest(List<TsPoint> tsPoints) throws Exception {
  36. Map<String, TsData> result = new HashMap<>();
  37. Map<TsDataType, List<TsPoint>> pointGroup = tsPoints.stream().collect(Collectors.groupingBy(TsPoint::getTsDataType));
  38. for (Map.Entry<TsDataType, List<TsPoint>> entry : pointGroup.entrySet()) {
  39. String[] tagNames = entry.getValue().stream().map(TsPoint::getId).toArray(String[]::new);
  40. if (entry.getKey() == TsDataType.DOUBLE) {
  41. result.putAll(getDoubleTsDataSnapshots(tagNames));
  42. } else if (entry.getKey() == TsDataType.LONG) {
  43. result.putAll(getLongTsDataSnapshots(tagNames));
  44. } else if (entry.getKey() == TsDataType.BOOLEAN) {
  45. result.putAll(getBooleanTsDataSnapshots(tagNames));
  46. } else {
  47. throw new WisdomException("Taos不支持数据类型:" + entry.getKey());
  48. }
  49. }
  50. return result;
  51. }
  52. @Override
  53. public int writeDoubleLatest(List<TsPointData> list) throws Exception {
  54. boolean flag = false;
  55. Connection connection = config.getConnect();
  56. try {
  57. Statement st = connection.createStatement();
  58. StringBuilder sb = new StringBuilder();
  59. sb.append("insert into ");
  60. for (TsPointData obj : list) {
  61. long time = obj.getTsData().getTs();
  62. String point = obj.getTagName();
  63. double value = obj.getTsData().getDoubleValue().get();
  64. sb.append(TaosCovertUtil.coverStationPrefix(point)).append(".");
  65. sb.append(point).append(" values (");
  66. sb.append(time).append(",").append(value).append(")");
  67. }
  68. flag = st.execute(sb.toString());
  69. } catch (Exception e) {
  70. } finally {
  71. if (connection != null)
  72. connection.close();
  73. }
  74. return flag == true ? list.size() : 0;
  75. }
  76. @Override
  77. public int writeStringLatest(List<TsPointData> list) throws Exception {
  78. boolean flag = false;
  79. Connection connection = config.getConnect();
  80. try {
  81. Statement st = connection.createStatement();
  82. StringBuilder sb = new StringBuilder();
  83. sb.append("insert into ");
  84. for (TsPointData obj : list) {
  85. long time = obj.getTsData().getTs();
  86. String point = obj.getTagName();
  87. String value = obj.getTsData().getStringValue().get();
  88. sb.append(TaosCovertUtil.coverStationPrefix(point)).append(".");
  89. sb.append(point).append(" values (");
  90. sb.append(time).append(",'").append(value).append("') ");
  91. }
  92. flag = st.execute(sb.toString());
  93. } catch (Exception e) {
  94. } finally {
  95. if (connection != null)
  96. connection.close();
  97. }
  98. return flag == true ? list.size() : 0;
  99. }
  100. @Override
  101. public int writeBooleanLatest(List<TsPointData> list) throws Exception {
  102. boolean flag = false;
  103. Connection connection = config.getConnect();
  104. try {
  105. Statement st = connection.createStatement();
  106. StringBuilder sb = new StringBuilder();
  107. sb.append("insert into ");
  108. for (TsPointData obj : list) {
  109. long time = obj.getTsData().getTs();
  110. String point = obj.getTagName();
  111. boolean value = obj.getTsData().getBooleanValue().get();
  112. sb.append(TaosCovertUtil.coverStationPrefix(point)).append(".");
  113. sb.append(point).append(" values (");
  114. sb.append(time).append(",").append(value).append(")");
  115. }
  116. flag = st.execute(sb.toString());
  117. } catch (Exception e) {
  118. } finally {
  119. if (connection != null)
  120. connection.close();
  121. }
  122. return flag == true ? list.size() : 0;
  123. }
  124. @Override
  125. public int writeLongLatest(List<TsPointData> list) throws Exception {
  126. boolean flag = false;
  127. Connection connection = config.getConnect();
  128. try {
  129. Statement st = connection.createStatement();
  130. StringBuilder sb = new StringBuilder();
  131. sb.append("insert into ");
  132. for (TsPointData obj : list) {
  133. long time = obj.getTsData().getTs();
  134. String point = obj.getTagName();
  135. long value = obj.getTsData().getLongValue().get();
  136. sb.append(TaosCovertUtil.coverStationPrefix(point)).append(".");
  137. sb.append(point).append(" values (");
  138. sb.append(time).append(",").append(value).append(")");
  139. }
  140. flag = st.execute(sb.toString());
  141. } catch (Exception e) {
  142. } finally {
  143. if (connection != null)
  144. connection.close();
  145. }
  146. return flag == true ? list.size() : 0;
  147. }
  148. /**
  149. * 由于taos数据库中全部是DOUBLE类型,故不需要区分类型
  150. *
  151. * @param dataList
  152. * @return
  153. * @throws Exception
  154. */
  155. @Override
  156. public boolean writeLatest(List<TsPointData> dataList) throws Exception {
  157. int writeCount = 0;
  158. Map<TsDataType, List<TsPointData>> pointGroup = dataList.stream().collect(Collectors.groupingBy(TsPointData::findDataType));
  159. for (Map.Entry<TsDataType, List<TsPointData>> entry : pointGroup.entrySet()) {
  160. List<TsPointData> pointDataList = entry.getValue();
  161. // if (entry.getKey() == TsDataType.DOUBLE) {
  162. int count = writeDoubleLatest(pointDataList);
  163. writeCount = writeCount + count;
  164. // } else if (entry.getKey() == TsDataType.BOOLEAN) {
  165. // int count = writeBooleanLatest(pointDataList);
  166. // writeCount = writeCount + count;
  167. // } else if (entry.getKey() == TsDataType.STRING) {
  168. // int count = writeStringLatest(pointDataList);
  169. // writeCount = writeCount + count;
  170. // } else if (entry.getKey() == TsDataType.BLOB) {
  171. // int count = writeBlobLatest(pointDataList);
  172. // writeCount = writeCount + count;
  173. // } else if (entry.getKey() == TsDataType.COORDINATE) {
  174. // int count = writeCoordinateLatest(pointDataList);
  175. // writeCount = writeCount + count;
  176. // } else if (entry.getKey() == TsDataType.LONG) {
  177. // int count = writeLongLatest(pointDataList);
  178. // writeCount = writeCount + count;
  179. // }
  180. }
  181. return writeCount > 0 ? true : false;
  182. }
  183. public Map<String, TsData> getDoubleTsDataSnapshots(String... tagNames) throws Exception {
  184. Map<String, TsData> tsDataMap = new HashMap<>();
  185. //存储线程的返回值
  186. List<Future<Map<String, TsData>>> results = new LinkedList<>();
  187. for (String tag : tagNames) {
  188. String point = TaosCovertUtil.coverStationPrefix(tag) + "." + tag.replace(".", "_");
  189. String sql = "select last_row(*) from " + point;
  190. TaskCallable task = new TaskCallable(config.getConnect(), sql, 0, tag, TsDataType.DOUBLE);
  191. Future<Map<String, TsData>> submit = taskConfig.getExecutor().submit(task);
  192. results.add(submit);
  193. }
  194. //返回结果
  195. for (int i = 0; i < results.size(); i++) {
  196. tsDataMap.putAll(results.get(i).get());
  197. }
  198. return tsDataMap;
  199. }
  200. public Map<String, TsData> getLongTsDataSnapshots(String... tagNames) throws Exception {
  201. Map<String, TsData> tsDataMap = new HashMap<>();
  202. //存储线程的返回值
  203. List<Future<Map<String, TsData>>> results = new LinkedList<>();
  204. for (String tag : tagNames) {
  205. String point = TaosCovertUtil.coverStationPrefix(tag) + "." + tag.replace(".", "_");
  206. String sql = "select last_row(*) from " + point;
  207. TaskCallable task = new TaskCallable(config.getConnect(), sql, 0, tag, TsDataType.LONG);
  208. Future<Map<String, TsData>> submit = taskConfig.getExecutor().submit(task);
  209. results.add(submit);
  210. }
  211. //返回结果
  212. for (int i = 0; i < results.size(); i++) {
  213. tsDataMap.putAll(results.get(i).get());
  214. }
  215. return tsDataMap;
  216. }
  217. public Map<String, TsData> getBooleanTsDataSnapshots(String... tagNames) throws Exception {
  218. Map<String, TsData> tsDataMap = new HashMap<>();
  219. //存储线程的返回值
  220. List<Future<Map<String, TsData>>> results = new LinkedList<>();
  221. for (String tag : tagNames) {
  222. String point = TaosCovertUtil.coverStationPrefix(tag) + "." + tag.replace(".", "_");
  223. String sql = "select last_row(*) from " + point;
  224. TaskCallable task = new TaskCallable(config.getConnect(), sql, 0, tag, TsDataType.BOOLEAN);
  225. Future<Map<String, TsData>> submit = taskConfig.getExecutor().submit(task);
  226. results.add(submit);
  227. }
  228. //返回结果
  229. for (int i = 0; i < results.size(); i++) {
  230. tsDataMap.putAll(results.get(i).get());
  231. }
  232. return tsDataMap;
  233. }
  234. }