ソースを参照

1、集成sqlite数据库
2、支持读取csv文件存到缓存
3、build.gradle修改打成jar包问题

‘xugp 2 年 前
コミット
057a5af986

+ 12 - 0
bridge/build.gradle

@@ -1,6 +1,18 @@
 buildscript {
+    repositories {
+        mavenLocal()
+        maven {
+            allowInsecureProtocol = true
+            url "http://maven.aliyun.com/nexus/content/groups/public" }
+        mavenCentral()
+    }
+    dependencies {
+        classpath("$bootGroup:spring-boot-gradle-plugin:$springBootVersion")
+    }
 }
 
+apply plugin: "$bootGroup"
+apply plugin: 'io.spring.dependency-management'
 dependencies {
     api("commons-codec:commons-codec:$commonsCodecVersion")
     api("org.apache.commons:commons-lang3:$commonsLang3Version")

+ 14 - 1
gateway/build.gradle

@@ -1,6 +1,19 @@
 buildscript {
+    repositories {
+        mavenLocal()
+        maven {
+            allowInsecureProtocol = true
+            url "http://maven.aliyun.com/nexus/content/groups/public" }
+        mavenCentral()
+    }
+    dependencies {
+        classpath("$bootGroup:spring-boot-gradle-plugin:$springBootVersion")
+    }
 }
 
+apply plugin: "$bootGroup"
+apply plugin: 'io.spring.dependency-management'
+
 dependencies {
     implementation project(":common:utils")
     implementation("io.netty:netty-all:$nettyVersion")
@@ -13,6 +26,6 @@ dependencies {
     implementation("org.apache.rocketmq:rocketmq-client:$rocketMq")
     implementation("com.google.protobuf:protobuf-java:$protoBuf")
     implementation("net.sourceforge.javacsv:javacsv:$Javacsv")
-    implementation 'com.google.protobuf:protobuf-java-util:3.21.2'
+    implementation("org.xerial:sqlite-jdbc:$sqliteJdbc")
 
 }

+ 2 - 5
gateway/src/main/java/com/gyee/edge/gateway/ApplicationEventListener.java

@@ -2,11 +2,11 @@ package com.gyee.edge.gateway;
 
 import com.gyee.edge.gateway.bridge.rocketmq.producter.AsyncProducter;
 import com.gyee.edge.gateway.config.cache.PointCache;
-import com.gyee.edge.gateway.config.point.ReadCSVData;
+import com.gyee.edge.gateway.config.read.ReadPoint102SqliteData;
+import com.gyee.edge.gateway.config.read.ReadPointSqLiteData;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.event.ApplicationReadyEvent;
 import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
@@ -15,9 +15,6 @@ import org.springframework.stereotype.Component;
 @Slf4j
 public class ApplicationEventListener implements
         ApplicationListener<ApplicationReadyEvent> {
-    @Autowired
-    private PointCache pointCache;
-
 
     @SneakyThrows
     @Override

+ 2 - 2
gateway/src/main/java/com/gyee/edge/gateway/config/cache/PointCache.java

@@ -1,7 +1,7 @@
 package com.gyee.edge.gateway.config.cache;
 
 import com.gyee.edge.gateway.config.point.PointMapper;
-import com.gyee.edge.gateway.config.point.ReadCSVData;
+import com.gyee.edge.gateway.config.read.ReadPointCSVData;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
@@ -16,7 +16,7 @@ public class PointCache {
 
     public ArrayList<PointMapper> getPointMapper(){
         if (strList.size() <= 0){
-            strList = ReadCSVData.readConfig(filePath);
+            strList = ReadPointCSVData.readConfig(filePath);
             return strList;
         }
         else

+ 14 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/point/Point102Mapper.java

@@ -0,0 +1,14 @@
+package com.gyee.edge.gateway.config.point;
+
+import lombok.Data;
+
+@Data
+public class Point102Mapper {
+    private int iec102_addr;
+    private float iec102_coeff;
+    private float iec102_base;
+    private int gyfp_addr;
+    private int gyfp_valid;
+    private float gyfp_coeff;
+    private float gyfp_base;
+}

+ 45 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/read/ReadPoint102SqliteData.java

@@ -0,0 +1,45 @@
+package com.gyee.edge.gateway.config.read;
+
+
+import com.gyee.edge.gateway.config.point.Point102Mapper;
+import com.gyee.edge.gateway.config.point.PointMapper;
+import com.gyee.edge.gateway.config.sqlite.Database;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+public class ReadPoint102SqliteData {
+    @Autowired
+    private Database db;
+
+    public List<Point102Mapper> readData() {
+        List<Point102Mapper> point102MapperList = new ArrayList<>();
+        try {
+            Connection conn = db.getConnection();
+            String sql = "select * from key_table";
+            PreparedStatement ps = conn.prepareStatement(sql);
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                Point102Mapper point102Mapper = new Point102Mapper();
+                point102Mapper.setGyfp_base(rs.getFloat("gyfp_base"));
+                point102Mapper.setGyfp_addr(rs.getInt("gyfp_addr"));
+                point102Mapper.setGyfp_coeff(rs.getInt("gyfp_coeff"));
+                point102Mapper.setIec102_addr(rs.getInt("iec102_addr"));
+                point102Mapper.setIec102_base(rs.getFloat("iec102_base"));
+                point102Mapper.setIec102_coeff(rs.getFloat("iec102_coeff"));
+                point102Mapper.setGyfp_valid(rs.getInt("gyfp_valid"));
+                point102MapperList.add(point102Mapper);
+            }
+            db.releaseResource();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return point102MapperList;
+    }
+}

+ 3 - 2
gateway/src/main/java/com/gyee/edge/gateway/config/point/ReadCSVData.java

@@ -1,13 +1,14 @@
-package com.gyee.edge.gateway.config.point;
+package com.gyee.edge.gateway.config.read;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import com.csvreader.CsvReader;
+import com.gyee.edge.gateway.config.point.PointMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class ReadCSVData {
+public class ReadPointCSVData {
     public static ArrayList<PointMapper> readConfig(String filePath)  {
         //默认路径
         if (filePath.length()>0) {

+ 44 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/read/ReadPointSqLiteData.java

@@ -0,0 +1,44 @@
+package com.gyee.edge.gateway.config.read;
+
+
+import com.gyee.edge.gateway.config.point.PointMapper;
+import com.gyee.edge.gateway.config.sqlite.Database;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+public class ReadPointSqLiteData {
+    @Autowired
+    private Database db;
+
+    public List<PointMapper> readData() {
+        List<PointMapper> pointMapperList = new ArrayList<>();
+        try {
+            Connection conn = db.getConnection();
+            String sql = "select * from point";
+            PreparedStatement ps = conn.prepareStatement(sql);
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                PointMapper pointMapper = new PointMapper();
+                pointMapper.setRatio(rs.getDouble("ratio"));
+                pointMapper.setDataSource(rs.getString("datasource"));
+                pointMapper.setMessageKey(rs.getInt("messagekey"));
+                pointMapper.setSourceAddress(rs.getInt("sourceaddress"));
+                pointMapper.setMessageId(rs.getString("messageid"));
+                pointMapper.setProtocolSource(rs.getString("protocolsource"));
+                pointMapperList.add(pointMapper);
+            }
+            System.out.println(pointMapperList.get(0));
+            db.releaseResource();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return pointMapperList;
+    }
+}

+ 364 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/sqlite/DB.java

@@ -0,0 +1,364 @@
+package com.gyee.edge.gateway.config.sqlite;
+
+import java.io.Reader;
+import java.io.StringReader;
+import java.sql.*;
+import java.text.MessageFormat;
+import java.util.*;
+
+public class DB {
+    //数据库驱动,默认为Oracle
+    public static String JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver";
+
+    //数据库连接地址
+    public static String DB_URL = "";
+
+    public static String USER = "";
+
+    public static String PASS = "";
+
+    Connection CONN = null;
+
+    //PreparedStatement PSTM = null;
+
+    public DB(String url, String user, String pass) {
+        DB_URL = url;
+        USER = user;
+        PASS = pass;
+    }
+
+    public DB() {
+        /*Properties props = new Properties();
+        try {
+            String configFilePath = System.getProperty("antgis.web.oa.root")+"/WEB-INF/config.properties";
+            props.load(new FileInputStream(configFilePath));
+        }catch (FileNotFoundException e){
+
+        }catch (IOException e){
+
+        }*/
+
+
+//        JDBC_DRIVER = Config.DB_DRIVER;
+//        DB_URL = Config.DB_URL;
+//        USER = Config.DB_USER;
+//        PASS = Config.DB_PWD;
+    }
+
+    public Connection getConnection() {
+        try {
+            Class.forName(JDBC_DRIVER);
+            CONN = DriverManager.getConnection(DB_URL, USER, PASS);
+        } catch (ClassNotFoundException e) {
+
+        } catch (SQLException e) {
+
+        }
+        return CONN;
+    }
+
+    public ResultSet getResultSet(String sql) {
+        ResultSet resultSet = null;
+        //PreparedStatement pstm = null;
+        try {
+            CONN = getConnection();
+            PreparedStatement PSTM = CONN.prepareStatement(sql);
+            resultSet = PSTM.executeQuery();
+            return resultSet;
+        } catch (SQLException e) {
+            return resultSet;
+        } finally {
+
+        }
+    }
+
+    public void close() {
+        /*if(PSTM !=null) {
+            try {
+                PSTM.close();
+            } catch (SQLException e) {
+                e.printStackTrace();
+            }
+        }*/
+
+        try {
+            if (CONN != null && (!CONN.isClosed())) {
+                try {
+                    CONN.close();
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public List<Map<String, Object>> getList(String sql) {
+        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
+        ResultSet rs = getResultSet(sql);
+        list = convertToList(rs);
+        return list;
+    }
+
+    public List<Map<String, Object>> convertToList(ResultSet rs) {
+        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
+        try {
+            ResultSetMetaData md = rs.getMetaData(); //获得结果集结构信息,元数据
+            int columnCount = md.getColumnCount();   //获得列数
+            while (rs.next()) {
+                Map<String, Object> rowData = new HashMap<String, Object>();
+                for (int i = 1; i <= columnCount; i++) {
+                    rowData.put(md.getColumnName(i), rs.getObject(i));
+                }
+                list.add(rowData);
+            }
+        } catch (SQLException e) {
+
+        }
+
+        return list;
+    }
+
+    public ResultSet getResultSet(String tableName, String filter) {
+        String sql = MessageFormat.format("select * from {0} where {1}", tableName, filter);
+        return getResultSet(sql);
+    }
+
+    public ResultSet getResultSet(String tableName, String columnList, String filter) {
+        String sql = MessageFormat.format("select {2} from {0} where {1}", tableName, filter, columnList);
+        return getResultSet(sql);
+    }
+
+    public ResultSet getResultSet(String tableName, String columnList, String filter, String orderColumnList) {
+        String sql = MessageFormat.format("select {2} from {0} where {1} order by {3}", tableName, filter, columnList, orderColumnList);
+        return getResultSet(sql);
+    }
+
+    public Object getFirstColumn(String sql) {
+        ResultSet rs = getResultSet(sql);
+        try {
+            if (rs.next()) {
+                return rs.getObject(0);
+            }
+        } catch (SQLException e) {
+
+        }
+        return null;
+    }
+
+    public String getSqlForPageSize(String tableName, Integer pageSize, Integer pageIndex, String where, String order) {
+        return getSqlForPageSize(tableName, "*", pageSize, pageIndex, where, order);
+    }
+
+    public String getSqlForPageSize(String tableName, String sColumnList, int pageSize, int pageIndex, String where, String order) {
+        String sSql = "";
+        StringBuilder sb = new StringBuilder();
+        sb.append("select " + sColumnList + " from (select t.*,rownum rno ");
+        sb.append(" from (select * from " + tableName + " ");
+        sb.append(" where " + where);
+        sb.append(order);
+        sb.append(") t where rownum <= ");
+        sb.append((pageIndex + 1) * pageSize);
+        sb.append(" )");
+        sb.append(" where rno >= ");
+        sb.append(pageIndex * pageSize + 1);
+        sSql = sb.toString();
+        return sSql;
+    }
+
+    public boolean judgeRecordExist(String tableName, String filter) {
+        ResultSet rs = getResultSet(tableName, filter);
+        try {
+            if (rs.next())
+                return true;
+            else
+                return false;
+        } catch (SQLException e) {
+            return false;
+        }
+    }
+
+    public String formatTableName(String tableName) {
+        return tableName.replace("\"", "");
+    }
+
+    public boolean judgeTableOrViewExist(String tableName) {
+        String sql = "select count(*) from user_objects where object_type in ('TABLE','VIEW') AND upper(OBJECT_NAME)=upper('" + formatTableName(tableName) + "')";
+        ResultSet rs = getResultSet(sql);
+        return judgeCountValue(rs);
+    }
+
+    public boolean judgeColumnExist(String tableName, String columnName) {
+        String sql = "select count(*) from user_tab_cols where upper(table_name)=upper('" + formatTableName(tableName) + "') and upper(COLUMN_NAME)=upper('" + columnName + "') order by COLUMN_ID";
+        ResultSet rs = getResultSet(sql);
+        return judgeCountValue(rs);
+    }
+
+    private boolean judgeCountValue(ResultSet rs) {
+        try {
+            if (rs.next()) {
+                if (Integer.parseInt(rs.getObject(0).toString()) == 1)
+                    return true;
+                else
+                    return false;
+            } else {
+                return false;
+            }
+        } catch (SQLException e) {
+            return false;
+        }
+    }
+
+    public HashMap getTableColumnType(String tableName) {
+        String sql = "select COLUMN_NAME,DATA_TYPE,DATA_LENGTH,COLUMN_ID from user_tab_cols where upper(table_name)=upper('" + formatTableName(tableName) + "') order by COLUMN_ID";
+        ResultSet rs = getResultSet(sql);
+        HashMap hm = new HashMap();
+        try {
+            while (rs.next()) {
+                String name = rs.getString("COLUMN_NAME");
+                String value = rs.getString("DATA_TYPE");
+                hm.put(name, value);
+            }
+        } catch (SQLException e) {
+
+        }
+
+        return hm;
+    }
+
+    public boolean add(String tableName, LinkedHashMap<String, Object> hm) throws Exception {
+        if (tableName == null || tableName.equals("")) {
+            throw new Exception("表名不能为空字符串!");
+        }
+
+        String sColumns = "";
+        String sValues = "";
+
+        Iterator iter = hm.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry entry = (Map.Entry) iter.next();
+            sColumns += entry.getKey().toString() + ",";
+            sValues += "?,";
+        }
+        /*for(String key:hm.keySet()){
+            sColumns += key +",";
+            sValues += "?,";
+        }*/
+
+        sColumns = sColumns.substring(0, sColumns.length() - 1);
+        sValues = sValues.substring(0, sValues.length() - 1);
+        String sql = MessageFormat.format("insert into {0} ({1}) values({2})", tableName, sColumns, sValues);
+
+        return executeSql(sql, tableName, hm);
+    }
+
+    public boolean update(String tableName, LinkedHashMap<String, Object> hm, String filterColumns) throws Exception {
+        if (tableName == null || tableName.equals("")) {
+            throw new Exception("表名不能为空字符串!");
+        }
+
+        String filter = getFilterString(tableName, hm, filterColumns);
+
+        return update2(tableName, hm, filter);
+    }
+
+    public boolean update2(String tableName, LinkedHashMap<String, Object> hm, String filter) throws Exception {
+        if (tableName == null || tableName.equals("")) {
+            throw new Exception("表名不能为空字符串!");
+        }
+
+        String str = "";
+        Iterator iter = hm.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry entry = (Map.Entry) iter.next();
+            str += entry.getKey().toString() + "=?,";
+        }
+
+        str = str.substring(0, str.length() - 1);
+        String sql = MessageFormat.format("update {0} set {1} where {2}", tableName, str, filter);
+        return executeSql(sql, tableName, hm);
+    }
+
+    private String getFilterString(String tableName, LinkedHashMap<String, Object> hm, String filterColumns) {
+        String[] arr = filterColumns.split(",");
+        int length = arr.length;
+        HashMap columnsType = getTableColumnType(tableName);
+        String filter = "";
+        for (int ii = 0; ii < length; ii++) {
+            String columnType = columnsType.get(arr[ii]).toString();
+            switch (columnType) {
+                case "NUMBER":
+                    filter += "and " + arr[ii] + "=" + hm.get(arr[ii]) + " ";
+                    break;
+                case "DATE":
+                    filter += "and " + arr[ii] + "=to_date('" + hm.get(arr[ii]).toString() + "','yyyy-mm-dd hh24:mi:ss') ";
+                    break;
+                default:
+                    filter += "and " + arr[ii] + "='" + hm.get(arr[ii]).toString() + "' ";
+                    break;
+
+            }
+        }
+        filter = filter.substring(3);
+        return filter;
+    }
+
+
+    public boolean executeSql(String sql, String tableName, LinkedHashMap<String, Object> hm) {
+        boolean bl = false;
+        int i = 0;
+        try {
+            CONN = getConnection();
+            PreparedStatement pstm = CONN.prepareStatement(sql);
+            HashMap columnsType = getTableColumnType(tableName);
+
+            Iterator iter = hm.entrySet().iterator();
+            int index = 0;
+            while (iter.hasNext()) {
+                index += 1;
+                Map.Entry entry = (Map.Entry) iter.next();
+                String key = entry.getKey().toString();
+                Object val = entry.getValue();
+                String columnType = columnsType.get(key).toString();
+                switch (columnType) {
+                    case "NVARCHAR2":
+                    case "NCHAR":
+                    case "VARCHAR2":
+                    case "CHAR":
+                    case "NUMBER":
+                        pstm.setString(index, val.toString());
+                        break;
+                    case "DATE":
+                        Timestamp dateTime = Timestamp.valueOf(val.toString());
+                        pstm.setTimestamp(index, dateTime);
+                        break;
+                    /*case "BLOB":
+                        PSTM.setBlob(index,oracle.sql.BLOB.getEmptyBLOB());
+
+                        break;*/
+                    case "CLOB":
+                        String str = val.toString();
+                        Reader clobReader = new StringReader(str); // 将 text转成流形式
+                        pstm.setClob(index, clobReader, str.length());
+                        break;
+                    default:
+                        pstm.setString(index, val.toString());
+                        break;
+
+                }
+            }
+
+            i = pstm.executeUpdate();
+            if (i > 0)
+                bl = true;
+
+
+        } catch (SQLException e) {
+            String msg = e.getMessage();
+        }
+
+
+        return bl;
+    }
+}

+ 62 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/sqlite/Database.java

@@ -0,0 +1,62 @@
+package com.gyee.edge.gateway.config.sqlite;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.sql.*;
+
+@Component
+public class Database {
+
+    @Value("${spring.datasource.driver-class-name}")
+    private String driver;
+
+    @Value("${spring.datasource.url}")
+    private String url;
+
+    Connection connection = null;
+
+    PreparedStatement preparedStatement = null;
+    ResultSet rs = null;
+
+
+    public Connection getConnection() throws Exception {
+        try{
+            Class.forName(driver);
+            connection = DriverManager.getConnection(url);
+            System.out.println("数据库连接成功");
+
+        }catch (Exception e){
+            throw new Exception(e.getMessage());
+        }
+
+        return connection;
+    }
+
+    //释放资源
+    public void releaseResource(){
+        if(rs!=null){
+            try{
+                rs.close();
+            }catch (SQLException e){
+                e.printStackTrace();
+            }
+        }
+
+        if(preparedStatement!=null){
+            try{
+                preparedStatement.close();
+            }catch (SQLException e){
+                e.printStackTrace();
+            }
+        }
+
+        if(connection!=null){
+            try{
+                connection.close();
+            }catch (SQLException e){
+                e.printStackTrace();
+            }
+        }
+    }
+}

+ 6 - 1
gateway/src/main/resources/application.yaml

@@ -3,7 +3,12 @@ server:
 spring:
   application:
   name: gateway
-
+  datasource:
+    driver-class-name: org.sqlite.JDBC
+#    F:\\colud_ideaspace\\edge\\gateway\\src\\main\\resources\\myDb
+    url: jdbc:sqlite::resource:keytable.sqlite3
+    username:
+    password:
 
 filePath: D:\\data.csv
 

BIN
gateway/src/main/resources/keytable.sqlite3


+ 2 - 1
gradle.properties

@@ -40,4 +40,5 @@ gsonVersion=2.8.5
 rocketMq=4.8.0
 protoBuf=3.21.2
 openFeignVersion=11.8
-Javacsv=2.0
+Javacsv=2.0
+sqliteJdbc=3.8.11.2

+ 12 - 0
loader/build.gradle

@@ -1,6 +1,18 @@
 buildscript {
+    repositories {
+        mavenLocal()
+        maven {
+            allowInsecureProtocol = true
+            url "http://maven.aliyun.com/nexus/content/groups/public" }
+        mavenCentral()
+    }
+    dependencies {
+        classpath("$bootGroup:spring-boot-gradle-plugin:$springBootVersion")
+    }
 }
 
+apply plugin: "$bootGroup"
+apply plugin: 'io.spring.dependency-management'
 dependencies {
     implementation project(":common:utils")
     implementation("$bootGroup:spring-boot-starter:$springBootVersion")

+ 67 - 0
loader/src/main/java/com/gyee/edge/loader/rocketmq/comuser/BalanceComuser.java

@@ -0,0 +1,67 @@
+package com.gyee.edge.loader.rocketmq.comuser;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.gyee.edge.common.utils.util.ProtoJsonUtil;
+import com.gyee.edge.loader.golden.DataWrite;
+import com.gyee.edge.loader.protobuf.UserProto;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.List;
+
+@Slf4j
+@Component
+public class BalanceComuser {
+
+    private DataWrite dataWrite;
+        public void DataConsumer() throws Exception {
+            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_sonsumer");
+            consumer.setNamesrvAddr("127.0.0.1:9876");
+            consumer.subscribe("keyMessage", "*");
+            consumer.setMessageModel(MessageModel.CLUSTERING);
+            consumer.registerMessageListener(new MessageListenerConcurrently() {
+                @Override
+                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+                    try {
+                        for (MessageExt msg : msgs) {
+                            String topic = msg.getTopic();
+                            //String msgBody = new String(msg.getBody(), "utf-8");
+                            String tags = msg.getTags();
+                            try {
+                                //反序列化
+                                Date st = new Date();
+                                System.out.println("反序列化前时间"+(st.getTime()));
+                                UserProto.User user1 = UserProto.User.parseFrom(msg.getBody());
+                                String jsonObject = ProtoJsonUtil.toJson(user1);
+                                log.info("收到消息" + "topic" + topic + ",tags" + tags + ",msg" + user1);
+                                //数据写入
+                                Date et = new Date();
+                                System.out.println("反序列化后时间"+(et.getTime()));
+                                System.out.println("序列化时间"+(et.getTime()-st.getTime()));
+                                dataWrite.DataWriteGloden(jsonObject);
+                            } catch (InvalidProtocolBufferException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                    }
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                }
+            });
+            consumer.start();  //启动消费者
+            log.info("Consumer STARTTED");
+        }
+
+    public void setWriteGolden(DataWrite dataWrite) {
+        this.dataWrite = dataWrite;
+    }
+}