|
@@ -0,0 +1,125 @@
|
|
|
+package com.gyee.impala.service.impl.master;
|
|
|
+
|
|
|
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
+import com.gyee.impala.common.base.ExcludeQueryWrapper;
|
|
|
+import com.gyee.impala.common.config.datasource.KuduDataSourceConfig;
|
|
|
+import com.gyee.impala.common.exception.CustomException;
|
|
|
+import com.gyee.impala.common.result.ResultCode;
|
|
|
+import com.gyee.impala.common.spring.InitialRunner;
|
|
|
+import com.gyee.impala.common.util.SnowFlakeUtil;
|
|
|
+import com.gyee.impala.mapper.master.KnowinspectrecordMapper;
|
|
|
+import com.gyee.impala.model.master.Knowinspectrecord;
|
|
|
+import com.gyee.impala.service.gdoa.EqJxjlService;
|
|
|
+import com.gyee.impala.service.master.KnowinspectService;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.hadoop.mapreduce.ID;
|
|
|
+import org.apache.kudu.client.*;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+@Service
|
|
|
+public class KnowinspectrecordServiceImpl extends ServiceImpl<KnowinspectrecordMapper, Knowinspectrecord> implements KnowinspectService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private KuduDataSourceConfig kuduConfig;
|
|
|
+ @Resource
|
|
|
+ private EqJxjlService eqJxjlService;
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void insertBatch(String date) {
|
|
|
+ List<Knowinspectrecord> list = eqJxjlService.getEquipmentList(date);
|
|
|
+ if (list == null || list.size() == 0)
|
|
|
+ return;
|
|
|
+
|
|
|
+ list.stream().filter(a -> a.getStationcn().contains("风电场") && StringUtils.isNotEmpty(a.getStarttime())
|
|
|
+ && a.getWindturbinename().contains("风机") && StringUtils.isNotEmpty(a.getWindturbineid())
|
|
|
+ && a.getWindturbineid().contains("G"))
|
|
|
+ .forEach(item -> {
|
|
|
+ if (!InitialRunner.stationenMap.containsKey(item.getStationcn()) || !item.getWindturbineid().contains("G")) {
|
|
|
+ list.remove(item);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ item.setStationen(InitialRunner.stationenMap.get(item.getStationcn()));
|
|
|
+ item.setWindturbineid(item.getWindturbineid().substring(0, 2) + "01_" + item.getWindturbineid().substring(2));
|
|
|
+ });
|
|
|
+
|
|
|
+ try{
|
|
|
+ insert(list);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error(e.getMessage());
|
|
|
+ throw new CustomException(ResultCode.ERROR_DATA);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String selectMaxTime() {
|
|
|
+ String time = baseMapper.getMaxByTime();
|
|
|
+ return time;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<Knowinspectrecord> getListByStationAndTime(String station, String st, String et) {
|
|
|
+ ExcludeQueryWrapper<Knowinspectrecord> wrapper = new ExcludeQueryWrapper<>();
|
|
|
+ wrapper.eq("stationen", station)
|
|
|
+ .ge("starttime", st)
|
|
|
+ .le("endtime", et)
|
|
|
+ .orderByDesc("starttime");
|
|
|
+
|
|
|
+ List<Knowinspectrecord> list = new ArrayList<>();
|
|
|
+ try {
|
|
|
+ list = baseMapper.selectList(wrapper);
|
|
|
+ } catch (CustomException e){}
|
|
|
+
|
|
|
+ return list;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 由于mybatis-plus存储的中文乱码
|
|
|
+ * 采用原生写法
|
|
|
+ * @param list
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private void insert(List<Knowinspectrecord> list) throws KuduException {
|
|
|
+ KuduTable kuduTable = kuduConfig.kuduClient.openTable("impala::gyee_sample_kudu.knowinspectrecord");
|
|
|
+ KuduSession kuduSession = kuduConfig.kuduSession();
|
|
|
+ int i = 0;
|
|
|
+ for (Knowinspectrecord obj : list){
|
|
|
+ Insert insert = kuduTable.newInsert();
|
|
|
+ // 获取Row对象,设置插入的值
|
|
|
+ PartialRow row = insert.getRow();
|
|
|
+ row.addObject("id", SnowFlakeUtil.generateId());
|
|
|
+ row.addObject("stationcn", obj.getStationcn());
|
|
|
+ row.addObject("stationen", obj.getStationen());
|
|
|
+ row.addObject("windturbineid", obj.getWindturbineid());
|
|
|
+ row.addObject("windturbinename", obj.getWindturbinename());
|
|
|
+ row.addObject("inspectname", obj.getInspectname());
|
|
|
+ row.addObject("inspecttype", obj.getInspecttype());
|
|
|
+ row.addObject("starttime", obj.getStarttime());
|
|
|
+ row.addObject("endtime", obj.getEndtime());
|
|
|
+ row.addObject("inspectuser", obj.getInspectuser());
|
|
|
+ row.addObject("record", obj.getRecord());
|
|
|
+ row.addObject("bjghqk", obj.getBjghqk());
|
|
|
+ row.addObject("remark", obj.getRemark());
|
|
|
+ row.addObject("result", obj.getResult());
|
|
|
+ row.addObject("problem", obj.getProblem());
|
|
|
+ row.addObject("category", obj.getCategory());
|
|
|
+
|
|
|
+ // 先不提交kudu
|
|
|
+ kuduSession.apply(insert);
|
|
|
+ i ++;
|
|
|
+ if (i % kuduConfig.getCount() == 0)
|
|
|
+ kuduSession.flush(); //批量写入kudu
|
|
|
+ }
|
|
|
+
|
|
|
+ // 最后,补加一条批量写入
|
|
|
+ kuduSession.flush();
|
|
|
+ // 关闭和kudu的会话
|
|
|
+ kuduSession.close();
|
|
|
+ }
|
|
|
+}
|