From 486a7f930641b9d70a3b7d4e6a1518ff301ec38f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=AE=E7=AC=91=E7=9D=80=E9=9D=A2=E5=AF=B9=E6=98=8E?= =?UTF-8?q?=E5=A4=A9?= <752558143@qq.com> Date: Thu, 11 Jul 2024 11:45:55 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mes/pcn/api/base/IMesEquipmentLogService.java | 3 + .../pcn/api/busi/IMesEquipmentLogExtService.java | 2 + .../i3plus/ext/mes/pcn/api/mqtt/MqttService.java | 38 ++++ modules/i3plus-ext-mes-pcn-apiservice/pom.xml | 6 + .../ext/mes/pcn/apiservice/config/MqttConfig.java | 69 ++++++++ .../controller/mqtt/PcnMqttController.java | 55 ++++++ .../mes/pcn/apiservice/mqtt/PcnMqttCallback.java | 125 ++++++++++++++ .../ext/mes/pcn/apiservice/mqtt/PcnMqttClient.java | 191 +++++++++++++++++++++ .../serviceimpl/base/MesEquipmentLogService.java | 6 + .../base/MesPojoVersionServiceImpl.java | 8 + .../busi/MesShippingLoadingCheckService.java | 2 +- .../busi/MesSortShippingCheckService.java | 8 +- .../equiplog/MesEquipmentLogExtService.java | 7 + .../serviceimpl/mqtt/MqttServiceImpl.java | 60 +++++++ .../ext/mes/pcn/apiservice/util/SpringUtils.java | 129 ++++++++++++++ .../ext/mes/pcn/pojo/mqtt/EquipLogMqttMsg.java | 35 ++++ pom.xml | 9 +- 17 files changed, 748 insertions(+), 5 deletions(-) create mode 100644 modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/mqtt/MqttService.java create mode 100644 modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/config/MqttConfig.java create mode 100644 modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/controller/mqtt/PcnMqttController.java create mode 100644 modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttCallback.java create mode 100644 modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttClient.java create mode 100644 modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/mqtt/MqttServiceImpl.java create mode 100644 modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/util/SpringUtils.java create mode 100644 modules/i3plus-ext-mes-pcn-pojo/src/main/java/cn/estsh/i3plus/ext/mes/pcn/pojo/mqtt/EquipLogMqttMsg.java diff --git a/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/base/IMesEquipmentLogService.java b/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/base/IMesEquipmentLogService.java index c33812c..c9901ee 100644 --- a/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/base/IMesEquipmentLogService.java +++ b/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/base/IMesEquipmentLogService.java @@ -1,6 +1,7 @@ package cn.estsh.i3plus.ext.mes.pcn.api.base; import cn.estsh.i3plus.ext.mes.pcn.pojo.model.ActorMessage; +import cn.estsh.i3plus.ext.mes.pcn.pojo.mqtt.EquipLogMqttMsg; public interface IMesEquipmentLogService { @@ -8,4 +9,6 @@ public interface IMesEquipmentLogService { void saveEquipmentLogDetails(String organizeCode); + + void updateValue(EquipLogMqttMsg equipLogMqttMsg); } diff --git a/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/busi/IMesEquipmentLogExtService.java b/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/busi/IMesEquipmentLogExtService.java index c3af99d..644a5d6 100644 --- a/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/busi/IMesEquipmentLogExtService.java +++ b/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/busi/IMesEquipmentLogExtService.java @@ -20,6 +20,8 @@ public interface IMesEquipmentLogExtService { @ApiOperation(value = "根据设备ID,设备数据变量ID集合 修改设备ID分表采集数据的状态") void updateEquipmentLogList(String organizeCode, Integer equipId, List equipVariableIdList); + void updateEquipmentLogValue(String organizeCode, Integer equipId, Long equipVariableId, String value); + @ApiOperation(value = "获取设备数据变量对应的采集数据") MesEquipLogDispatchContext doHandleEquipmentLogList(MesCellEquipContext cellEquipContext, List equipmentVariableList, List equipmentVariableCfgList, Boolean isResetEquipVariable); diff --git a/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/mqtt/MqttService.java b/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/mqtt/MqttService.java new file mode 100644 index 0000000..1c58fe7 --- /dev/null +++ b/modules/i3plus-ext-mes-pcn-api/src/main/java/cn/estsh/i3plus/ext/mes/pcn/api/mqtt/MqttService.java @@ -0,0 +1,38 @@ +package cn.estsh.i3plus.ext.mes.pcn.api.mqtt; + +public interface MqttService { + + /** + * 添加订阅主题 + * + * @param topic 主题名称 + */ + void addTopic(String topic); + + /** + * 取消订阅主题 + * + * @param topic 主题名称 + */ + void removeTopic(String topic); + + /** + * 发布主题消息内容 + * + * @param msgContent + * @param topic + */ + void publish(String msgContent, String topic); + + /** + * 发布主题消息内容 + * + */ + void unconnect(); + + /** + * 发布主题消息内容 + * + */ + void connect(); +} diff --git a/modules/i3plus-ext-mes-pcn-apiservice/pom.xml b/modules/i3plus-ext-mes-pcn-apiservice/pom.xml index c7f9a76..37467ed 100644 --- a/modules/i3plus-ext-mes-pcn-apiservice/pom.xml +++ b/modules/i3plus-ext-mes-pcn-apiservice/pom.xml @@ -68,6 +68,12 @@ i3plus-platform-plugin + + org.springframework.integration + spring-integration-mqtt + 6.1.2 + + diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/config/MqttConfig.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/config/MqttConfig.java new file mode 100644 index 0000000..7e5ac44 --- /dev/null +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/config/MqttConfig.java @@ -0,0 +1,69 @@ +package cn.estsh.i3plus.ext.mes.pcn.apiservice.config; + + +import cn.estsh.i3plus.ext.mes.pcn.api.base.IMesEquipmentLogService; +import cn.estsh.i3plus.ext.mes.pcn.apiservice.mqtt.PcnMqttClient; +import cn.estsh.i3plus.ext.mes.pcn.apiservice.serviceimpl.base.MesEquipmentLogService; +import cn.estsh.i3plus.ext.mes.pcn.apiservice.util.SpringUtils; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Arrays; +import java.util.List; + +@Slf4j +@Configuration +public class MqttConfig { + + @Value("${mqtt.host}") + public String host; + @Value("${mqtt.username}") + public String username; + @Value("${mqtt.password}") + public String password; + @Value("${mqtt.clientId}") + public String clientId; + @Value("${mqtt.timeout}") + public int timeOut; + @Value("${mqtt.keepalive}") + public int keepAlive; + + @Value("${mqtt.clearSession}") + public boolean clearSession; + @Value("${mqtt.topic}") + public String topic; + @Value("${mqtt.topic.list}") + private String topicList; + + @Bean//注入Spring + public PcnMqttClient myMqttClient() { + PcnMqttClient myMqttClient = new PcnMqttClient(host, username, password, clientId, timeOut, keepAlive, clearSession); + //手动注入 + MesEquipmentLogService service = SpringUtils.getBean(MesEquipmentLogService.class); + + myMqttClient.setService(service); + for (int i = 0; i < 10; i++) { + try { + List list = Arrays.asList(topicList.split(",")); + myMqttClient.connect(); + list.forEach(topic -> myMqttClient.subscribe(topic)); + log.info("== PcnStartSystemInit ==> 订阅主题成功,topicList:{}", topicList); + + } catch (MqttException e) { + log.error("== PcnStartSystemInit ==> MQTT connect exception, connect time = {}", i); + try { + Thread.sleep(2000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + + return myMqttClient; + } + +} diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/controller/mqtt/PcnMqttController.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/controller/mqtt/PcnMqttController.java new file mode 100644 index 0000000..3600434 --- /dev/null +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/controller/mqtt/PcnMqttController.java @@ -0,0 +1,55 @@ +package cn.estsh.i3plus.ext.mes.pcn.apiservice.controller.mqtt; + +import cn.estsh.i3plus.ext.mes.pcn.api.mqtt.MqttService; +import cn.estsh.i3plus.ext.mes.pcn.pojo.constant.MesCommonConstant; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * + */ +@RestController +@Api(value = "MyMqttController", tags = {"MQTT相关操作接口"}) +@RequestMapping(MesCommonConstant.MES_YANFEN + "/mqtt") +@Slf4j +public class PcnMqttController { + @Autowired + private MqttService mqttService; + + @GetMapping("/addTopic") + @ApiOperation(value = "添加订阅主题接口") + public void addTopic(String topic) { + mqttService.addTopic(topic); + } + + @GetMapping("/removeTopic") + @ApiOperation(value = "取消订阅主题接口") + public void removeTopic(String topic) { + mqttService.removeTopic(topic); + } + + @PostMapping("/public") + @ApiOperation(value = "发布主题消息内容接口") + public void publicTopic(String msgContent, String topic) { + mqttService.publish(msgContent, topic); + } + + + @PostMapping("/unconnect") + @ApiOperation(value = "退出连接接口") + public void unconnect() { + mqttService.unconnect(); + } + + @PostMapping("/connect") + @ApiOperation(value = "手动连接接口") + public void connect() { + mqttService.connect(); + } +} diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttCallback.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttCallback.java new file mode 100644 index 0000000..1f8ab8c --- /dev/null +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttCallback.java @@ -0,0 +1,125 @@ +package cn.estsh.i3plus.ext.mes.pcn.apiservice.mqtt; + +import cn.estsh.i3plus.ext.mes.pcn.api.base.IMesEquipmentLogService; +import cn.estsh.i3plus.ext.mes.pcn.apiservice.config.MqttConfig; +import cn.estsh.i3plus.ext.mes.pcn.apiservice.util.SpringUtils; +import cn.estsh.i3plus.ext.mes.pcn.pojo.mqtt.EquipLogMqttMsg; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Slf4j +public class PcnMqttCallback implements MqttCallbackExtended { + + //手动注入 + private MqttConfig mqttConfig = SpringUtils.getBean(MqttConfig.class); + + + private PcnMqttClient myMqttClient; + + private IMesEquipmentLogService equipmentLogService; + + public PcnMqttCallback(PcnMqttClient myMqttClient) { + this.myMqttClient = myMqttClient; + } + + + public PcnMqttCallback(PcnMqttClient myMqttClient, IMesEquipmentLogService equipmentLogService) { + this.myMqttClient = myMqttClient; + this.equipmentLogService = equipmentLogService; + } + /** + * MQTT Broker连接成功时被调用的方法。在该方法中可以执行 订阅系统约定的主题(推荐使用)。 + * 如果 MQTT Broker断开连接之后又重新连接成功时,主题也需要再次订阅,将重新订阅主题放在连接成功后的回调方法中比较合理。 + * + * @param reconnect + * @param serverURI MQTT Broker的url + */ + @Override + public void connectComplete(boolean reconnect, String serverURI) { + String connectMode = reconnect ? "重连" : "直连"; + log.info("== MyMqttCallback ==> MQTT 连接成功,连接方式:{},serverURI:{}", connectMode, serverURI); + //订阅主题 + //myMqttClient.subscribe(mqttConfig.topic, 1); + } + + + /** + * 丢失连接,可在这里做重连 + * 只会调用一次 + * + * @param throwable + */ + @Override + public void connectionLost(Throwable throwable) { + log.error("== MyMqttCallback ==> connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage()); + long reconnectTimes = 1; + while (true) { + try { + if (PcnMqttClient.getClient().isConnected()) { + //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择 + log.warn("== MyMqttCallback ==> mqtt reconnect success end 重新连接 重新订阅成功"); + return; + } + reconnectTimes += 1; + log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes); + PcnMqttClient.getClient().reconnect(); + } catch (MqttException e) { + log.error("== MyMqttCallback ==> mqtt断连异常", e); + } + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + } + } + } + + /** + * 接收到消息(subscribe订阅的主题消息)时被调用的方法 + * + * @param topic + * @param mqttMessage + * @throws Exception 后得到的消息会执行到这里面 + */ + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + log.info("== pcnMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, new String(mqttMessage.getPayload())); + try{ + String resStr = new String(mqttMessage.getPayload()); + /** + * 根据订阅的主题分别处理业务。可以通过if-else或者策略模式来分别处理不同的主题消息。 + */ + //topic1主题 + if (topic.equals("ABC")) { + EquipLogMqttMsg equipLogMqttMsg = JSONObject.parseObject(resStr, EquipLogMqttMsg.class); + + equipmentLogService.updateValue(equipLogMqttMsg); + //业务处理 + //doSomething1(maps); + log.info("== pcnMqttCallback ==> messageArrived 接收消息主题: {},业务处理消息内容完成", topic); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 消息发送(publish)完成时被调用的方法 + * + * @param iMqttDeliveryToken + */ + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.info("== MyMqttCallback ==> deliveryComplete 消息发送完成,Complete= {}", iMqttDeliveryToken.isComplete()); + } + +} diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttClient.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttClient.java new file mode 100644 index 0000000..4c6fe2b --- /dev/null +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttClient.java @@ -0,0 +1,191 @@ +package cn.estsh.i3plus.ext.mes.pcn.apiservice.mqtt; + + +import cn.estsh.i3plus.ext.mes.pcn.api.base.IMesEquipmentLogService; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +@Slf4j +public class PcnMqttClient { + + /** + * MQTT Broker 基本连接参数,用户名、密码为非必选参数 + */ + private String host; + private String username; + private String password; + private String clientId; + private int timeout; + private int keepalive; + private boolean clearSession; + + /** + * MQTT 客户端 + */ + private static MqttClient client; + + /** + * 服务 + */ + private IMesEquipmentLogService equipmentLogService; + + public static MqttClient getClient() { + return client; + } + + public static void setClient(MqttClient client) { + PcnMqttClient.client = client; + } + public void setService(IMesEquipmentLogService equipmentLogService) { + this.equipmentLogService = equipmentLogService; + } + + public PcnMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) { + this.host = host; + this.username = username; + this.password = password; + this.clientId = clientId; + this.timeout = timeOut; + this.keepalive = keepAlive; + this.clearSession = clearSession; + } + + /** + * 设置 MQTT Broker 基本连接参数 + * + * @param username + * @param password + * @param timeout + * @param keepalive + * @return + */ + public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) { + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setConnectionTimeout(timeout); + options.setKeepAliveInterval(keepalive); + options.setCleanSession(clearSession); + options.setAutomaticReconnect(true); + return options; + } + + /** + * 连接 MQTT Broker,得到 MqttClient连接对象 + */ + public void connect() throws MqttException { + if (client == null) { + client = new MqttClient(host, clientId, new MemoryPersistence()); + // 设置回调 + client.setCallback(new PcnMqttCallback(PcnMqttClient.this, equipmentLogService)); + } + // 连接参数 + MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive); + if (!client.isConnected()) { + client.connect(mqttConnectOptions); + } else { + client.disconnect(); + client.connect(mqttConnectOptions); + } + log.info("== MyMqttClient ==> MQTT connect success");//未发生异常,则连接成功 + } + + + /** + * 连接 MQTT Broker,得到 MqttClient连接对象 + */ + public void unconnect() throws MqttException { + if (client == null) { + client = new MqttClient(host, clientId, new MemoryPersistence()); + // 设置回调 + client.setCallback(new PcnMqttCallback(PcnMqttClient.this)); + } + client.disconnect(); + + log.info("== MyMqttClient ==> MQTT unconnect success");//未发生异常,则连接成功 + } + /** + * 发布,默认qos为0,非持久化 + * + * @param pushMessage + * @param topic + */ + public void publish(String pushMessage, String topic) { + publish(pushMessage, topic, 0, false); + } + + /** + * 发布消息 + * + * @param pushMessage + * @param topic + * @param qos + * @param retained:留存 + */ + public void publish(String pushMessage, String topic, int qos, boolean retained) { + MqttMessage message = new MqttMessage(); + message.setPayload(pushMessage.getBytes()); + message.setQos(qos); + message.setRetained(retained); + MqttTopic mqttTopic = PcnMqttClient.getClient().getTopic(topic); + if (null == mqttTopic) { + log.error("== MyMqttClient ==> topic is not exist"); + } + MqttDeliveryToken token;//Delivery:配送 + synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 + try { + token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 + token.waitForCompletion(1000L); + } catch (MqttPersistenceException e) { + e.printStackTrace(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + } + + /** + * 订阅某个主题,qos默认为0 + * + * @param topic + */ + public void subscribe(String topic) { + subscribe(topic, 0); + } + + /** + * 订阅某个主题 + * + * @param topic + * @param qos + */ + public void subscribe(String topic, int qos) { + try { + PcnMqttClient.getClient().subscribe(topic, qos); + } catch (MqttException e) { + e.printStackTrace(); + } + log.info("== MyMqttClient ==> 订阅主题成功:topic = {}, qos = {}", topic, qos); + } + + + /** + * 取消订阅主题 + * + * @param topic 主题名称 + */ + public void cleanTopic(String topic) { + if (client != null && client.isConnected()) { + try { + client.unsubscribe(topic); + } catch (MqttException e) { + e.printStackTrace(); + } + } else { + log.error("== MyMqttClient ==> 取消订阅失败!"); + } + log.info("== MyMqttClient ==> 取消订阅主题成功:topic = {}", topic); + } + +} diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesEquipmentLogService.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesEquipmentLogService.java index cbcbab7..bb441fa 100644 --- a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesEquipmentLogService.java +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesEquipmentLogService.java @@ -3,6 +3,7 @@ package cn.estsh.i3plus.ext.mes.pcn.apiservice.serviceimpl.base; import cn.estsh.i3plus.ext.mes.pcn.api.base.IMesEquipmentLogService; import cn.estsh.i3plus.ext.mes.pcn.apiservice.serviceimpl.equiplog.MesEquipmentLogExtService; import cn.estsh.i3plus.ext.mes.pcn.pojo.model.ActorMessage; +import cn.estsh.i3plus.ext.mes.pcn.pojo.mqtt.EquipLogMqttMsg; import cn.estsh.i3plus.platform.common.convert.ConvertBean; import cn.estsh.i3plus.platform.common.util.CommonConstWords; import cn.estsh.i3plus.pojo.base.bean.DdlPackBean; @@ -144,4 +145,9 @@ public class MesEquipmentLogService implements IMesEquipmentLogService { // 删除key redisMesPcn.deleteKey(key); } + + @Override + public void updateValue(EquipLogMqttMsg equipLogMqttMsg) { + mesEquipmentLogExtService.updateEquipmentLogValue("CK01", Integer.valueOf(equipLogMqttMsg.getPTCode()), Long.valueOf(equipLogMqttMsg.getTagAddress()), equipLogMqttMsg.getValue()); + } } diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesPojoVersionServiceImpl.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesPojoVersionServiceImpl.java index 88fbe80..549f368 100644 --- a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesPojoVersionServiceImpl.java +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesPojoVersionServiceImpl.java @@ -217,6 +217,14 @@ public class MesPojoVersionServiceImpl implements IMesPojoVersionService { } } break; + case MES_LOADING_LIST: + statusList = Stream.of("status").collect(Collectors.toList()); + for (String status : statusList) { + if (mapDiff.containsKey(status)){ + isSave = true; + } + } + break; default: } if (!isSave && !isInsert) { diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/busi/MesShippingLoadingCheckService.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/busi/MesShippingLoadingCheckService.java index 3d78b26..2e63f96 100644 --- a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/busi/MesShippingLoadingCheckService.java +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/busi/MesShippingLoadingCheckService.java @@ -291,7 +291,7 @@ public class MesShippingLoadingCheckService implements IMesShippingLoadingCheckS @Override @MonitorLog public void update(MesLoadingListDetail item) { - vehiclesOrderRepository.update(item); + vehiclesOrderDetailRepository.update(item); } } diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/busi/MesSortShippingCheckService.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/busi/MesSortShippingCheckService.java index e9ee4ef..aaaed7f 100644 --- a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/busi/MesSortShippingCheckService.java +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/busi/MesSortShippingCheckService.java @@ -2,6 +2,7 @@ package cn.estsh.i3plus.ext.mes.pcn.apiservice.serviceimpl.busi; import cn.estsh.i3plus.ext.mes.pcn.api.busi.IMesJisShippingService; import cn.estsh.i3plus.ext.mes.pcn.api.busi.IMesProduceSnExtService; +import cn.estsh.i3plus.ext.mes.pcn.api.busi.IMesShippingLoadingCheckService; import cn.estsh.i3plus.ext.mes.pcn.api.busi.IMesSortShippingCheckService; import cn.estsh.i3plus.ext.mes.pcn.apiservice.aspect.MonitorLog; import cn.estsh.i3plus.ext.mes.pcn.apiservice.util.MesPcnException; @@ -61,6 +62,9 @@ public class MesSortShippingCheckService implements IMesSortShippingCheckService private MesLoadingListDetailRepository listDetailRepository; @Autowired + private IMesShippingLoadingCheckService shippingLoadingCheckService; + + @Autowired private MesJisShippingRepository jisShippingRepository; @Autowired @@ -69,6 +73,8 @@ public class MesSortShippingCheckService implements IMesSortShippingCheckService private MesLoadingListDetailRepository vehiclesOrderDetailRepository; @Autowired private MesLoadingListRepository vehiclesOrderRepository; + + @Override public MesSortShippingCheckModel queryShippingOrderNo(MesShippingOrderManagement shippingOrderManagement) { @@ -447,7 +453,7 @@ public class MesSortShippingCheckService implements IMesSortShippingCheckService if (!StringUtils.isEmpty(loadingList)) { loadingList.setStatus(MesExtEnumUtil.MES_LOADING_STATUS.ASN.getValue()); ConvertBean.serviceModelUpdate(loadingList, userName); - vehiclesOrderRepository.save(loadingList); + shippingLoadingCheckService.update(loadingList); } } diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/equiplog/MesEquipmentLogExtService.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/equiplog/MesEquipmentLogExtService.java index f213ccc..3693f3e 100644 --- a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/equiplog/MesEquipmentLogExtService.java +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/equiplog/MesEquipmentLogExtService.java @@ -61,7 +61,14 @@ public class MesEquipmentLogExtService implements IMesEquipmentLogExtService { else DdlPreparedPack.getInPackList(equipVariableIdList, MesPcnExtConstWords.EQUIP_VARIABLE_ID, packBean); equipmentLogRepository.updateByProperties(new String[]{MesPcnExtConstWords.EQUIP_VARIABLE_STATUS}, new Object[]{MesExtEnumUtil.EQUIP_VARIABLE_NEED_NEW_VALUE.TRUE.getValue()}, packBean); } + @Override + public void updateEquipmentLogValue(String organizeCode, Integer equipId, Long equipVariableId, String value) { + DdlPackBean packBean = DdlPackBean.getDdlPackBean(organizeCode); + DdlPreparedPack.getNumEqualPack(equipId, MesPcnExtConstWords.EQUIP_ID, packBean); + DdlPreparedPack.getNumEqualPack(equipVariableId, MesPcnExtConstWords.EQUIP_VARIABLE_ID, packBean); + equipmentLogRepository.updateByProperties(new String[]{"equipVariableValue",MesPcnExtConstWords.EQUIP_VARIABLE_STATUS}, new Object[]{value, MesExtEnumUtil.EQUIP_VARIABLE_NEED_NEW_VALUE.TRUE.getValue()}, packBean); + } @Override public Boolean checkEquipQuality(Integer quality) { return MesExtEnumUtil.EQUIP_LOG_QUALITY.checkQuality(quality); diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/mqtt/MqttServiceImpl.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/mqtt/MqttServiceImpl.java new file mode 100644 index 0000000..48e9385 --- /dev/null +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/mqtt/MqttServiceImpl.java @@ -0,0 +1,60 @@ +package cn.estsh.i3plus.ext.mes.pcn.apiservice.serviceimpl.mqtt; + +import cn.estsh.i3plus.ext.mes.pcn.api.mqtt.MqttService; +import cn.estsh.i3plus.ext.mes.pcn.apiservice.mqtt.PcnMqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class MqttServiceImpl implements MqttService { + + @Autowired + private PcnMqttClient myMqttClient; + + @Override + public void addTopic(String topic) { + myMqttClient.subscribe(topic); + } + + @Override + public void removeTopic(String topic) { + myMqttClient.cleanTopic(topic); + } + + @Override + public void publish(String msgContent, String topic) { + /*try { + myMqttClient.unconnect(); + } catch (MqttException e) { + e.printStackTrace(); + }*/ + /* //MyXxxMqttMsg 转Json + EquipLogMqttMsg myXxxMqttMsg = new EquipLogMqttMsg(); + myXxxMqttMsg.setContent(msgContent); + myXxxMqttMsg.setTimestamp(System.currentTimeMillis()); + // TODO Md5值 + myXxxMqttMsg.setMd5(UUID.randomUUID().toString()); + String msgJson = JSON.toJSONString(myXxxMqttMsg); + + //发布消息 + myMqttClient.publish(msgJson, topic);*/ + } + + @Override + public void unconnect() { + try { + myMqttClient.unconnect(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + @Override + public void connect() { + try { + myMqttClient.connect(); + } catch (MqttException e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/util/SpringUtils.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/util/SpringUtils.java new file mode 100644 index 0000000..d0cf3ee --- /dev/null +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/util/SpringUtils.java @@ -0,0 +1,129 @@ +package cn.estsh.i3plus.ext.mes.pcn.apiservice.util; + +import org.springframework.aop.framework.AopContext; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +@Component +public class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware { + /** + * Spring应用上下文环境 + */ + private static ConfigurableListableBeanFactory beanFactory; + + private static ApplicationContext applicationContext; + + @Override + public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { + SpringUtils.beanFactory = beanFactory; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + SpringUtils.applicationContext = applicationContext; + } + + /** + * 获取对象 + * + * @param name + * @return Object 一个以所给名字注册的bean的实例 + * @throws org.springframework.beans.BeansException + */ + @SuppressWarnings("unchecked") + public static T getBean(String name) throws BeansException { + return (T) beanFactory.getBean(name); + } + + /** + * 获取类型为requiredType的对象 + * + * @param clz + * @return + * @throws org.springframework.beans.BeansException + */ + public static T getBean(Class clz) throws BeansException { + T result = (T) beanFactory.getBean(clz); + return result; + } + + /** + * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true + * + * @param name + * @return boolean + */ + public static boolean containsBean(String name) { + return beanFactory.containsBean(name); + } + + /** + * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException) + * + * @param name + * @return boolean + * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + */ + public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { + return beanFactory.isSingleton(name); + } + + /** + * @param name + * @return Class 注册对象的类型 + * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + */ + public static Class getType(String name) throws NoSuchBeanDefinitionException { + return beanFactory.getType(name); + } + + /** + * 如果给定的bean名字在bean定义中有别名,则返回这些别名 + * + * @param name + * @return + * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + */ + public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { + return beanFactory.getAliases(name); + } + + /** + * 获取aop代理对象 + * + * @param invoker + * @return + */ + @SuppressWarnings("unchecked") + public static T getAopProxy(T invoker) { + return (T) AopContext.currentProxy(); + } + + /** + * 获取当前的环境配置,无配置返回null + * + * @return 当前的环境配置 + */ + public static String[] getActiveProfiles() { + return applicationContext.getEnvironment().getActiveProfiles(); + } + + /** + * 获取当前的环境配置,当有多个环境配置时,只获取第一个 + * + * @return 当前的环境配置 + */ + public static String getActiveProfile() { + final String[] activeProfiles = getActiveProfiles(); + if (activeProfiles == null) { + return null; + } + return activeProfiles[0]; + } + +} diff --git a/modules/i3plus-ext-mes-pcn-pojo/src/main/java/cn/estsh/i3plus/ext/mes/pcn/pojo/mqtt/EquipLogMqttMsg.java b/modules/i3plus-ext-mes-pcn-pojo/src/main/java/cn/estsh/i3plus/ext/mes/pcn/pojo/mqtt/EquipLogMqttMsg.java new file mode 100644 index 0000000..cf90a02 --- /dev/null +++ b/modules/i3plus-ext-mes-pcn-pojo/src/main/java/cn/estsh/i3plus/ext/mes/pcn/pojo/mqtt/EquipLogMqttMsg.java @@ -0,0 +1,35 @@ +package cn.estsh.i3plus.ext.mes.pcn.pojo.mqtt; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class EquipLogMqttMsg implements Serializable { + + private static final long serialVersionUID = -8303548938481407659L; + + /** + * 设备id + */ + private String PTCode; + + /** + * 点位地址 + */ + private String key; + + /** + * 点位地址 + */ + private String tagAddress; + /** + * 值 + */ + private String value; + + /** + * 时间戳 + */ + private String time; +} diff --git a/pom.xml b/pom.xml index 9e0fe1b..300c220 100644 --- a/pom.xml +++ b/pom.xml @@ -29,9 +29,7 @@ INFO true UTF-8 - + 1.0.1-YZ 1.0.0-yfai @@ -142,6 +140,11 @@ brave 5.6.4 + + org.springframework.integration + spring-integration-mqtt + 6.1.2 +