上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

开启物联网的魔法之门 - 深入探索发布订阅模式

guduadmin321月前

文章目录

  • MQTT 发布/订阅模式
  • MQTT 发布/订阅中的消息路由
  • MQTT 与 HTTP 请求响应
  • MQTT 与消息队列
  • Paho Java 使用示例
  • 结语

    开启物联网的魔法之门 - 深入探索发布订阅模式,在这里插入图片描述,第1张

    MQTT 发布/订阅模式

    发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它将发送消息的客户端(发布者)与接收消息的客户端(订阅者)解耦,使得两者不需要建立直接的联系也不需要知道对方的存在。

    MQTT 发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作,发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。

    在 MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者,所以只能将消息转发给当前的订阅者,如果当前不存在任何订阅,那么消息将被直接丢弃。

    MQTT 发布/订阅模式有 4 个主要组成部分:发布者、订阅者、代理和主题。

    • 发布者(Publisher)

      负责将消息发布到主题上,发布者一次只能向一个主题发送数据,发布者发布消息时也无需关心订阅者是否在线。

    • 订阅者(Subscriber)

      订阅者通过订阅主题接收消息,且可一次订阅多个主题。MQTT 还支持通过共享订阅的方式在多个订阅者之间实现订阅的负载均衡。

    • 代理(Broker)

      负责接收发布者的消息,并将消息转发至符合条件的订阅者。另外,代理也需要负责处理客户端发起的连接、断开连接、订阅、取消订阅等请求。

    • 主题(Topic)

      主题是 MQTT 进行消息路由的基础,它类似 URL 路径,使用斜杠 / 进行分层,比如 sensor/1/temperature。一个主题可以有多个订阅者,代理会将该主题下的消息转发给所有订阅者;一个主题也可以有多个发布者,代理将按照消息到达的顺序转发。

      MQTT 还支持订阅者使用主题通配符一次订阅多个主题。

      开启物联网的魔法之门 - 深入探索发布订阅模式,在这里插入图片描述,第2张

      MQTT 发布/订阅中的消息路由

      在 MQTT 发布/订阅模式中,一个客户端既可以是发布者,也可以是订阅者,也可以同时具备这两个身份。 当客户端发布一条消息时,它会被发送到代理,然后代理将消息路由到该主题的所有订阅者。 当客户端订阅一个主题时,它会收到代理转发到该主题的所有消息。

      一般来说,大多数发布/订阅系统主要通过以下两种方式过滤并路由消息。

      • 根据主题

        订阅者向代理订阅自己感兴趣的主题,发布者发布的所有消息中都会包含自己的主题,代理根据消息的主题判断需要将消息转发给哪些订阅者。

      • 根据消息内容

        订阅者定义其感兴趣的消息的条件,只有当消息的属性或内容满足订阅者定义的条件时,消息才会被投递到该订阅者。

        MQTT 协议是基于主题进行消息路由的,在这个基础上,EMQX 从 3.1 版本开始通过基于 SQL 的规则引擎提供了额外的按消息内容进行路由的能力。

        MQTT 与 HTTP 请求响应

        HTTP 是万维网数据通信的基础,其简单易用无客户端依赖,被广泛应用于各个行业。在物联网领域,HTTP 也可以用于连接物联网设备和 Web 服务器,实现设备的远程监控和控制。

        虽然使用简单、开发周期短,但是基于请求响应的 HTTP 在物联网领域的应用却有一定的局限性。首先,协议层面 HTTP 报文相较与 MQTT 需要占用更多的网络开销;其次,HTTP 是一种无状态协议,这意味着服务器在处理请求时不会记录客户端的状态,也无法实现从连接异常断开中恢复;最后,请求响应模式需要通过轮询才能获取数据更新,而 MQTT 通过订阅即可获取实时数据更新。

        发布订阅模式的松耦合特性,也给 MQTT 带来了一些副作用。由于发布者并不知晓订阅者的状态,因此发布者也无法得知订阅者是否收到了消息,或者是否正确处理了消息。为此,MQTT 5.0 增加了请求响应特性,以实现订阅者收到消息后向某个主题发送应答,发布者收到应答后再进行后续操作。

        MQTT 与消息队列

        尽管 MQTT 与消息队列的很多行为和特性非常接近,比如都采用发布/订阅模式,但是他们面向的场景却有着显著的不同。消息队列主要用于服务端应用之间的消息存储与转发,这类场景往往数据量大但客户端数量少。MQTT 是一种消息传输协议,主要用于物联网设备之间的消息传递,这类场景的特点是海量的设备接入、管理与消息传输。

        在一些实际的应用场景中,MQTT 与消息队列往往会被结合起来使用,以使 MQTT 服务器能专注于处理设备的连接与设备间的消息路由。比如先由 MQTT 服务器接收物联网设备上报的数据,然后再通过消息队列将这些数据转发到不同的业务系统进行处理。

        不同于消息队列,MQTT 主题不需要提前创建。MQTT 客户端在订阅或发布时即自动的创建了主题,开发者无需再关心主题的创建,并且也不需要手动删除主题。

        Paho Java 使用示例

        通过包管理工具 Maven 可以方便地安装 Paho Java 客户端库,截止目前最新版本安装如下:

        
          org.eclipse.paho
        	org.eclipse.paho.client.mqttv3
        	1.2.2
        
        

        Java 体系中 Paho Java 是比较稳定、广泛应用的 MQTT 客户端库,本示例包含 Java 语言的 Paho Java 连接 EMQX Broker,并进行消息收发完整代码:

        public class App {
            public static void main(String[] args) {
                String subTopic = "testtopic/#";
                String pubTopic = "testtopic/1";
                String content = "Hello World";
                int qos = 2;
                String broker = "tcp://broker.emqx.io:1883";
                String clientId = "emqx_test";
                MemoryPersistence persistence = new MemoryPersistence();
                try {
                    MqttClient client = new MqttClient(broker, clientId, persistence);
                    // MQTT 连接选项
                    MqttConnectOptions connOpts = new MqttConnectOptions();
                    connOpts.setUserName("emqx_test");
                    connOpts.setPassword("emqx_test_password".toCharArray());
                    // 保留会话
                    connOpts.setCleanSession(true);
                    // 设置回调
                    client.setCallback(new PushCallback());
                    // 建立连接
                    System.out.println("Connecting to broker: " + broker);
                    client.connect(connOpts);
                    System.out.println("Connected");
                    System.out.println("Publishing message: " + content);
                    // 订阅
                    client.subscribe(subTopic);
                    // 消息发布所需参数
                    MqttMessage message = new MqttMessage(content.getBytes());
                    message.setQos(qos);
                    client.publish(pubTopic, message);
                    System.out.println("Message published");
                    client.disconnect();
                    System.out.println("Disconnected");
                    client.close();
                    System.exit(0);
                } catch (MqttException me) {
                    System.out.println("reason " + me.getReasonCode());
                    System.out.println("msg " + me.getMessage());
                    System.out.println("loc " + me.getLocalizedMessage());
                    System.out.println("cause " + me.getCause());
                    System.out.println("excep " + me);
                    me.printStackTrace();
                }
            }
        }
        

        回调消息处理类 OnMessageCallback.java

        public class OnMessageCallback implements MqttCallback {
            public void connectionLost(Throwable cause) {
                // 连接丢失后,一般在这里面进行重连
                System.out.println("连接断开,可以做重连");
            }
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // subscribe后得到的消息会执行到这里面
                System.out.println("接收消息主题:" + topic);
                System.out.println("接收消息Qos:" + message.getQos());
                System.out.println("接收消息内容:" + new String(message.getPayload()));
            }
            public void deliveryComplete(IMqttDeliveryToken token) {
                System.out.println("deliveryComplete---------" + token.isComplete());
            }
        }
        

        结语

        MQTT 的发布/订阅机制可以很轻易地满足我们一对一、一对多、多对一的通信需要。这也在很大程度上拓宽了 MQTT 在 IoT 领域之外的应用,像网络直播互动、手机消息推送等行业场景,都非常适合使用 MQTT。

        链接来源:https://www.emqx.com/zh/blog/mqtt-5-introduction-to-publish-subscribe-model

网友评论

搜索
最新文章
热门文章
热门标签
 
 象征鸟  梦见拉屎周公解梦原版  梦见抬棺材出殡的队伍是什么征兆