|
@@ -0,0 +1,315 @@
|
|
|
+package com.gyee.wisdom.samplekudu.biz;
|
|
|
+
|
|
|
+import com.baomidou.mybatisplus.annotation.TableField;
|
|
|
+import com.gyee.wisdom.samplekudu.domain.data.LcingTagNameData;
|
|
|
+import com.gyee.wisdom.samplekudu.entity.LcingSampleEntity;
|
|
|
+import com.gyee.wisdom.samplekudu.entity.TsPointEntity;
|
|
|
+import com.gyee.wisdom.samplekudu.domain.input.InputLcing;
|
|
|
+import com.gyee.wisdom.samplekudu.kudu.KuduConnection;
|
|
|
+import com.gyee.wisdom.samplekudu.service.TsPointService;
|
|
|
+import com.gyee.wisdom.samplekudu.taos.TaosHistoryDao;
|
|
|
+import com.gyee.wisdom.samplekudu.timeseries.BooleanTsData;
|
|
|
+import com.gyee.wisdom.samplekudu.timeseries.DoubleTsData;
|
|
|
+import com.gyee.wisdom.samplekudu.timeseries.TsData;
|
|
|
+import com.gyee.wisdom.samplekudu.util.CsvExportUtil;
|
|
|
+import com.gyee.wisdom.samplekudu.util.SnowflakeGenerator;
|
|
|
+import com.gyee.wisdom.samplekudu.util.StringUtil;
|
|
|
+import org.apache.commons.lang3.SerializationUtils;
|
|
|
+import org.apache.kudu.ColumnSchema;
|
|
|
+import org.apache.kudu.client.*;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.io.OutputStream;
|
|
|
+import java.lang.reflect.Field;
|
|
|
+import java.lang.reflect.InvocationTargetException;
|
|
|
+import java.lang.reflect.Method;
|
|
|
+import java.util.*;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+import org.apache.kudu.client.KuduScanner.KuduScannerBuilder;
|
|
|
+
|
|
|
+
|
|
|
+ * @description:
|
|
|
+ * @auther: Wanghs
|
|
|
+ * @date: 2022-11-22
|
|
|
+ */
|
|
|
+@Component
|
|
|
+public class LcingSampleImportBiz {
|
|
|
+
|
|
|
+ @Value("${kudu_lcing_sample_table:impala::gyee_sample_kudu.lcing_sample}")
|
|
|
+ private String lcingSampleTableName;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private TsPointService tsPointService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private TaosHistoryDao taosHistoryDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private KuduConnection kuduConnection;
|
|
|
+
|
|
|
+
|
|
|
+ public LcingTagNameData initTagNameData(InputLcing lcing) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
|
|
|
+ LcingTagNameData data = new LcingTagNameData();
|
|
|
+ Field[] lcingFieldArrays = lcing.getClass().getDeclaredFields();
|
|
|
+ for (Field field :
|
|
|
+ lcingFieldArrays) {
|
|
|
+ String name = field.getName();
|
|
|
+
|
|
|
+ if (name.contains("UniformCode")) {
|
|
|
+ String getMethodStr_lcing = "get" + name.substring(0, 1).toUpperCase() + name.substring(1);
|
|
|
+ Method getMethod_lcing = lcing.getClass().getMethod(getMethodStr_lcing);
|
|
|
+ String uniformCode = (String) getMethod_lcing.invoke(lcing);
|
|
|
+
|
|
|
+ List<TsPointEntity> pointList = tsPointService.getPointList(lcing.getWindturbineId(), Arrays.asList(uniformCode));
|
|
|
+
|
|
|
+ String setMethodStr_lcing = "set" + name.substring(0, 1).toUpperCase() + name.substring(1);
|
|
|
+ String setMethodStr_tagNameData = setMethodStr_lcing.replace("UniformCode", "TsPointEntity");
|
|
|
+ Method setMethod_tagNameData = data.getClass().getMethod(setMethodStr_tagNameData, Object.class);
|
|
|
+ setMethod_tagNameData.invoke(data, pointList.get(0));
|
|
|
+ System.out.println(field.getName());
|
|
|
+ } else {
|
|
|
+ String getMethodStr_lcing = "get" + name.substring(0, 1).toUpperCase() + name.substring(1);
|
|
|
+ Method getMethod_lcing = lcing.getClass().getMethod(getMethodStr_lcing);
|
|
|
+ Object fileVaue = getMethod_lcing.invoke(lcing);
|
|
|
+
|
|
|
+ String setMethodStr = "set" + name.substring(0, 1).toUpperCase() + name.substring(1);
|
|
|
+ Method setMethod_tagNameData = data.getClass().getMethod(setMethodStr, Object.class);
|
|
|
+ setMethod_tagNameData.invoke(data, fileVaue);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return data;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public List<LcingSampleEntity> importLcingSample(InputLcing inputLcing) throws Exception {
|
|
|
+
|
|
|
+ LcingTagNameData lcingTagNameData = initTagNameData(inputLcing);
|
|
|
+
|
|
|
+ Map<String, List<TsData>> taosHistoryDataMap = getTaosHistoryData(lcingTagNameData);
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ Map<String, List<TsData>> stringListMap = rejectData(taosHistoryDataMap);
|
|
|
+
|
|
|
+
|
|
|
+ List<LcingSampleEntity> entityList = initLcingSample(stringListMap, inputLcing.getWindturbineId(), inputLcing.getModel(), inputLcing.getStationId());
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ saveKudu(lcingSampleTableName, entityList, lcingTagNameData.getCorrectSample());
|
|
|
+ return entityList;
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private List<LcingSampleEntity> initLcingSample(Map<String, List<TsData>> map, String windutbineId, String model, String stationId) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
|
|
|
+
|
|
|
+
|
|
|
+ List<LcingSampleEntity> list = new ArrayList<>();
|
|
|
+ Object o = map.keySet().toArray()[0];
|
|
|
+ List<TsData> dataList = map.get(o);
|
|
|
+ for (int i = 0; i < dataList.size(); i++) {
|
|
|
+
|
|
|
+ LcingSampleEntity entity = new LcingSampleEntity();
|
|
|
+
|
|
|
+ entity.setWindturbineId(windutbineId).setModel(model).setStationId(stationId).setId(SnowflakeGenerator.generateId());
|
|
|
+ entity.setTs(dataList.get(i).getTs());
|
|
|
+
|
|
|
+ Field[] entityFieldArrays = entity.getClass().getDeclaredFields();
|
|
|
+
|
|
|
+ for (Field field :
|
|
|
+ entityFieldArrays) {
|
|
|
+ String fieldName = field.getName();
|
|
|
+
|
|
|
+ if (fieldName.contains("Value") && !fieldName.contains("ts")) {
|
|
|
+ String setMethodStr = "set" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
|
|
|
+ Method setMethod = entity.getClass().getMethod(setMethodStr, double.class);
|
|
|
+ String key = fieldName + "TsPointEntity";
|
|
|
+ if (map.containsKey(key.replace("Value", ""))) {
|
|
|
+ List<TsData> otherDataList = map.get(key.replace("Value", ""));
|
|
|
+ DoubleTsData doubleTsData = (DoubleTsData) otherDataList.get(i);
|
|
|
+ setMethod.invoke(entity, doubleTsData.getDoubleValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ list.add(entity);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ return list;
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private Map<String, List<TsData>> getTaosHistoryData(LcingTagNameData lcingTagNameData) throws Exception {
|
|
|
+ Field[] lcingFieldArrays = lcingTagNameData.getClass().getDeclaredFields();
|
|
|
+
|
|
|
+ Map<String, List<TsData>> resultMap = new HashMap<>();
|
|
|
+
|
|
|
+ for (Field field :
|
|
|
+ lcingFieldArrays) {
|
|
|
+ String fieldName = field.getName();
|
|
|
+ if (fieldName.contains("TsPointEntity")) {
|
|
|
+ String getMethodStr_lcing = "get" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
|
|
|
+ Method getMethod_lcing = lcingTagNameData.getClass().getMethod(getMethodStr_lcing);
|
|
|
+ TsPointEntity tsPointEntity = (TsPointEntity) getMethod_lcing.invoke(lcingTagNameData);
|
|
|
+
|
|
|
+ List<TsData> tsDataHistory = taosHistoryDao.getTsDataHistory(tsPointEntity.getId(), tsPointEntity.getDataType(), lcingTagNameData.getStartTime(), lcingTagNameData.getEndTime(), 1);
|
|
|
+ resultMap.put(fieldName, tsDataHistory);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return resultMap;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private Map<String, List<TsData>> rejectData(Map<String, List<TsData>> map) {
|
|
|
+
|
|
|
+
|
|
|
+ List<Long> rejectTime = new ArrayList<>();
|
|
|
+ for (Map.Entry<String, List<TsData>> entry :
|
|
|
+ map.entrySet()) {
|
|
|
+ if (entry.getValue().size() > 0) {
|
|
|
+ TsData tsData = entry.getValue().get(0);
|
|
|
+ if (tsData instanceof BooleanTsData) {
|
|
|
+ if (entry.getKey().contains("Status")) {
|
|
|
+ entry.getValue().forEach(s -> {
|
|
|
+ BooleanTsData booleanTsData = (BooleanTsData) s;
|
|
|
+ if (booleanTsData.getBooleanValue() == false) {
|
|
|
+ rejectTime.add(booleanTsData.getTs());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ entry.getValue().forEach(s -> {
|
|
|
+ BooleanTsData booleanTsData = (BooleanTsData) s;
|
|
|
+ if (booleanTsData.getBooleanValue() == true) {
|
|
|
+ rejectTime.add(booleanTsData.getTs());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, List<TsData>> resultMap = new HashMap<>();
|
|
|
+
|
|
|
+ for (Map.Entry<String, List<TsData>> entry :
|
|
|
+ map.entrySet()) {
|
|
|
+ List<TsData> collect = entry.getValue().stream().filter(s -> !rejectTime.contains(s.getTs())).collect(Collectors.toList());
|
|
|
+ resultMap.put(entry.getKey(), collect);
|
|
|
+
|
|
|
+ }
|
|
|
+ return resultMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean saveKudu(String tableName, List<LcingSampleEntity> entityList, Boolean correctSample) throws KuduException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
|
|
|
+
|
|
|
+ KuduClient kuduClient = kuduConnection.getKuduClient();
|
|
|
+ KuduSession kuduSession = kuduClient.newSession();
|
|
|
+ kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
|
|
|
+ KuduTable lcingSampleTable = kuduClient.openTable(tableName);
|
|
|
+
|
|
|
+ for (LcingSampleEntity entity :
|
|
|
+ entityList) {
|
|
|
+
|
|
|
+ Insert insert = lcingSampleTable.newInsert();
|
|
|
+ PartialRow row = insert.getRow();
|
|
|
+ row.addBoolean("correct_sample", correctSample);
|
|
|
+
|
|
|
+ Field[] declaredFields = entity.getClass().getDeclaredFields();
|
|
|
+ for (Field field :
|
|
|
+ declaredFields) {
|
|
|
+ TableField methodAnno = field.getAnnotation(TableField.class);
|
|
|
+
|
|
|
+ String infoFiledName = methodAnno.value();
|
|
|
+
|
|
|
+ String fieldName = field.getName();
|
|
|
+ String getMethodStr_lcing = "get" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
|
|
|
+ Method getMethod_lcing = entity.getClass().getMethod(getMethodStr_lcing);
|
|
|
+ Object fileVaue = getMethod_lcing.invoke(entity);
|
|
|
+ if (fieldName == "id" || fieldName == "ts")
|
|
|
+ row.addLong(infoFiledName, (Long) fileVaue);
|
|
|
+ else if (fieldName.contains("Value")) {
|
|
|
+ row.addDouble(infoFiledName, (Double) fileVaue);
|
|
|
+ } else {
|
|
|
+ row.addString(infoFiledName, fileVaue.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+ kuduSession.apply(insert);
|
|
|
+ System.out.println("123");
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ kuduSession.flush();
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<LcingSampleEntity> getLcingSample(Boolean correct, Long startTs, Long endTs) throws KuduException {
|
|
|
+
|
|
|
+ KuduClient kuduClient = kuduConnection.getKuduClient();
|
|
|
+ KuduTable kuduTable = kuduClient.openTable(lcingSampleTableName);
|
|
|
+ KuduScannerBuilder kuduScannerBuilder = kuduClient.newScannerBuilder(kuduTable);
|
|
|
+
|
|
|
+
|
|
|
+ List<String> columnList = new ArrayList<>();
|
|
|
+ columnList.add("correct_sample");
|
|
|
+ for (Field field :
|
|
|
+ LcingSampleEntity.class.getDeclaredFields()) {
|
|
|
+
|
|
|
+ TableField methodAnno = field.getAnnotation(TableField.class);
|
|
|
+
|
|
|
+ String infoFiledName = methodAnno.value();
|
|
|
+ columnList.add(infoFiledName);
|
|
|
+ }
|
|
|
+
|
|
|
+ kuduScannerBuilder.setProjectedColumnNames(columnList);
|
|
|
+ if (correct != null) {
|
|
|
+ KuduPredicate predicate = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn("correct_sample"), KuduPredicate.ComparisonOp.EQUAL, correct);
|
|
|
+ kuduScannerBuilder.addPredicate(predicate);
|
|
|
+ }
|
|
|
+ if (startTs != null) {
|
|
|
+ KuduPredicate predicate = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn("ts"), KuduPredicate.ComparisonOp.GREATER_EQUAL, startTs);
|
|
|
+ kuduScannerBuilder.addPredicate(predicate);
|
|
|
+ }
|
|
|
+ if (endTs != null) {
|
|
|
+ KuduPredicate predicate = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn("ts"), KuduPredicate.ComparisonOp.LESS_EQUAL, endTs);
|
|
|
+ kuduScannerBuilder.addPredicate(predicate);
|
|
|
+ }
|
|
|
+ KuduScanner scanner = kuduScannerBuilder.build();
|
|
|
+ while (scanner.hasMoreRows()) {
|
|
|
+ RowResultIterator results = scanner.nextRows();
|
|
|
+
|
|
|
+ int numRows = results.getNumRows();
|
|
|
+ List<Map<String,Object>> lst=new ArrayList<>();
|
|
|
+ while (results.hasNext()) {
|
|
|
+
|
|
|
+ RowResult result = results.next();
|
|
|
+
|
|
|
+ Map<String,Object> mp=new HashMap<>();
|
|
|
+ for (String c:
|
|
|
+ columnList) {
|
|
|
+ Optional<ColumnSchema> first = result.getSchema().getColumns().stream().filter(s -> s.getName().equals(c)).findFirst();
|
|
|
+
|
|
|
+ String str = result.getString(c);
|
|
|
+ mp.put(c,str);
|
|
|
+ }
|
|
|
+ lst.add(mp);
|
|
|
+ }
|
|
|
+ System.out.println("--");
|
|
|
+
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|