|
@@ -2,18 +2,24 @@ package com.gyee.impala.service.impl.master;
|
|
|
|
|
|
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
+import com.gyee.impala.common.config.datasource.KuduDataSourceConfig;
|
|
|
import com.gyee.impala.common.exception.CustomException;
|
|
|
import com.gyee.impala.common.result.ResultCode;
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
+import com.gyee.impala.common.spring.InitialRunner;
|
|
|
+import com.gyee.impala.common.util.DateUtil;
|
|
|
import com.gyee.impala.mapper.master.WindturbineMapper;
|
|
|
import com.gyee.impala.model.master.Windturbine;
|
|
|
import com.gyee.impala.service.master.WindturbineService;
|
|
|
+import org.apache.kudu.client.*;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
import java.util.List;
|
|
|
|
|
|
/**
|
|
|
* <p>
|
|
|
- * 服务实现类
|
|
|
+ * 服务实现类
|
|
|
* </p>
|
|
|
*
|
|
|
* @author chenmh
|
|
@@ -22,6 +28,11 @@ import java.util.List;
|
|
|
@Service
|
|
|
public class WindturbineServiceImpl extends ServiceImpl<WindturbineMapper, Windturbine> implements WindturbineService {
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private InitialRunner initialRunner;
|
|
|
+ @Autowired
|
|
|
+ private KuduDataSourceConfig kuduConfig;
|
|
|
+
|
|
|
@Override
|
|
|
public List<Windturbine> getWindTurbineId(String station) {
|
|
|
try {
|
|
@@ -29,9 +40,90 @@ public class WindturbineServiceImpl extends ServiceImpl<WindturbineMapper, Windt
|
|
|
wrapper.eq("station", station);
|
|
|
wrapper.orderByAsc("id");
|
|
|
return baseMapper.selectList(wrapper);
|
|
|
- } catch (CustomException e){
|
|
|
+ } catch (CustomException e) {
|
|
|
+ log.error(e.getMessage());
|
|
|
+ throw new CustomException(ResultCode.ERROR_DATA);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void insertBatch(List<Windturbine> list) {
|
|
|
+ if (list == null || list.size() == 0)
|
|
|
+ return;
|
|
|
+ try {
|
|
|
+ insert(list);
|
|
|
+ initialRunner.cacheStation();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error(e.getMessage());
|
|
|
+ throw new CustomException(ResultCode.ERROR_DATA);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void editItem(Windturbine obj) {
|
|
|
+ try {
|
|
|
+ baseMapper.updateById(obj);
|
|
|
+ initialRunner.cacheStation();
|
|
|
+ } catch (CustomException e) {
|
|
|
log.error(e.getMessage());
|
|
|
throw new CustomException(ResultCode.ERROR_DATA);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deleteItem(String id) {
|
|
|
+ QueryWrapper<Windturbine> query = new QueryWrapper<>();
|
|
|
+ query.eq("id", id);
|
|
|
+
|
|
|
+ try {
|
|
|
+ baseMapper.delete(query);
|
|
|
+ initialRunner.cacheStation();
|
|
|
+ } catch (CustomException e) {
|
|
|
+ log.error(e.getMessage());
|
|
|
+ throw new CustomException(ResultCode.ERROR_DATA);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 由于mybatis-plus存储的中文乱码
|
|
|
+ * 采用原生写法
|
|
|
+ *
|
|
|
+ * @param list
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private void insert(List<Windturbine> list) throws KuduException {
|
|
|
+ KuduTable kuduTable = kuduConfig.kuduClient.openTable("impala::gyee_sample_kudu.windturbine");
|
|
|
+ KuduSession kuduSession = kuduConfig.kuduSession();
|
|
|
+ int i = 0;
|
|
|
+ for (Windturbine obj : list) {
|
|
|
+ Insert insert = kuduTable.newInsert();
|
|
|
+ // 获取Row对象,设置插入的值
|
|
|
+ PartialRow row = insert.getRow();
|
|
|
+ row.addObject("id", obj.getId());
|
|
|
+ row.addObject("station", obj.getStation());
|
|
|
+ row.addObject("longitude", obj.getLongitude());
|
|
|
+ row.addObject("latitude", obj.getLatitude());
|
|
|
+ row.addObject("model", obj.getModel());
|
|
|
+ row.addObject("projectid", obj.getProjectid());
|
|
|
+ row.addObject("lineid", obj.getLineid());
|
|
|
+ row.addObject("name", obj.getName());
|
|
|
+ row.addObject("remark", obj.getRemark());
|
|
|
+ row.addObject("category", obj.getCategory());
|
|
|
+ row.addObject("manufacturer", obj.getManufacturer());
|
|
|
+ row.addObject("time", DateUtil.getCurrentDate());
|
|
|
+
|
|
|
+ // 先不提交kudu
|
|
|
+ kuduSession.apply(insert);
|
|
|
+ i++;
|
|
|
+ if (i % kuduConfig.getCount() == 0)
|
|
|
+ kuduSession.flush(); //批量写入kudu
|
|
|
+ }
|
|
|
+
|
|
|
+ // 最后,补加一条批量写入
|
|
|
+ kuduSession.flush();
|
|
|
+ // 关闭和kudu的会话
|
|
|
+ kuduSession.close();
|
|
|
+ }
|
|
|
}
|