Skip to content

编解码举例 - Protobuf

本页通过一个例子来让您了解在 EMQX 中使用 Protobuf 编码的消息数据是如何通过编解码进行格式转换并在规则引擎中进行匹配的。

解码场景

设备发布一个使用 Protobuf 编码的二进制消息,需要通过规则引擎匹配过后,将消息重新发布到与 name 字段相关的主题上。主题的格式为 person/${name}

比如,将 name 字段为 Shawn 的消息重新发布到主题 person/Shawn

创建 Schema

为了使规则引擎能够正确地解码或编码 Protobuf 消息,您需要先在 Schema Registry 中注册一个用于定义 Protobuf 消息结构的 Schema。

  1. 在 Dashboard 左侧导航栏中选择数据智能中心 -> Schema Registry

  2. 内部 Schema 页签下点击创建

  3. 输入 Schema 的名称,例如:protobuf_person。名称将用于编解码函数中。

  4. 选择 Schema 类型:选择 Protobuf

  5. 选择一种创建方式。支持以下两种模式:

    TIP

    本页中的示例使用输入模式。

    • 输入模式(适用于简单的 schema):

      • 选择输入作为创建方式。

      • 直接在 Schema 输入框中粘贴 Protobuf 定义,例如:

        protobuf
        message Person {
          required string name = 1;
          required int32 id = 2;
          optional string email = 3;
        }
    • 上传 Protobuf 包(适用于复杂或多文件 schema):

      • 选择上传 Protobuf 包作为创建方式。
      • 点击选择文件上传包含多个 .proto 文件的 .tar.gz 压缩包。
      • 主入口文件字段中填写主入口的 .proto 文件名称(例如:person.proto)。该文件必须位于上传的压缩包的根目录下。
  6. 点击创建

创建规则

  1. 在 Dashboard 左侧导航栏中选择数据集成 -> 规则

  2. 规则页面,点击右上角的创建

  3. SQL 编辑器中,使用刚才创建好的 Schema 编写规则 SQL 语句:

    sql
    SELECT
      schema_decode('protobuf_person', payload, 'Person') as person, payload
    FROM
      "t/#"
    WHERE
      person.name = 'Shawn'

    这里的关键点在于 schema_decode('protobuf_person', payload, 'Person'):

    • schema_decode 函数将 payload 字段的内容按照 'protobuf_person' 这个 Schema 来做解码;
    • as person 将解码后的值保存到变量 person 里;
    • 最后一个参数 Person 指明了 payload 中的消息的类型是 protobuf schema 里定义的 'Person' 类型。
  4. 点击添加动作,在动作下拉框中选择消息重发布

  5. 主题 文本框中输入 person/${person.name} 作为目标主题。

  6. Payload 中使用消息内容模板 ${person}

这个动作将解码之后的 “Person” 消息 以 JSON 的格式发送到 person/${person.name} 这个主题。其中${person.name} 是个变量占位符,将在运行时被替换为消息内容中 name 字段的值。

准备设备端代码

规则创建好之后,您可以模拟数据进行测试。

下面的代码使用 Python 语言填充了一个 Person 消息并编码为二进制数据,然后将其发送到 t/1 主题。详见完整代码

python
def publish_msg(client):
    p = person_pb2.Person()
    p.id = 1
    p.name = "Shawn"
    p.email = "[email protected]"
    message = p.SerializeToString()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)

检查规则执行结果

  1. 在 Dashboard 页面,点击左侧导航目录中的 问题分析 -> WebSocket 客户端

  2. 填写当前 EMQX 的连接信息。

    • 如果 EMQX 在本地运行,可直接使用默认配置。
    • 如果您修改过 EMQX 的默认配置,如修改过访问规则的配置,则需要输入用户名和密码。
  3. 点击连接,作为 MQTT 客户端连接到 EMQX。

  4. 订阅区域,在主题 中输入 person/#,点击订阅

  5. 安装 python 依赖,并执行设备端代码:

    shell
    $ pip3 install protobuf
    $ pip3 install paho-mqtt
    
    $ python3 ./pb2_mqtt.py
    Connected with result code 0
    publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\[email protected]'
    t/1 b'\n\x05Shawn\x10\x01\x1a\[email protected]'
  6. 检查 Websocket 端收到主题为 person/Shawn 的消息:

    json
    {"email":"[email protected]","id":1,"name":"Shawn"}

编码场景

设备订阅了主题为 protobuf_out 的消息,希望收到使用 Protobuf 编码的二进制消息。规则引擎将对消息进行编码并发送到相关主题。

创建 Schema

使用在解码场景创建的 Schema

创建规则

  1. 在 Dashboard 左侧导航栏中选择数据集成 -> 规则

  2. 规则页面,点击右上角的创建

  3. SQL 编辑器中,使用刚才创建好的 Schema 编写规则 SQL 语句:

    sql
    SELECT
      schema_encode('protobuf_person', json_decode(payload), 'Person') as protobuf_person
    FROM
      "protobuf_in"

    这里的关键点在于 schema_encode('protobuf_person', json_decode(payload), 'Person'):

    • schema_encode 函数将 payload 字段的内容按照 protobuf_person 这个 Schema 来编码;
    • as protobuf_person 将编码后的值保存到变量 protobuf_person 里;
    • json_decode(payload) 用来对 payload 进行解码,因为 schema_encode 的输入必须是 Map 数据格式,而 payload 通常是一个 JSON 编码的二进制消息。
    • 最后一个参数 Person 指明了 payload 中的消息的类型是 protobuf schema 里定义的 Person 类型。
  4. 点击添加动作,在动作下拉框中选择消息重发布

  5. 主题文本框中输入 protobuf_out 作为目标主题。

  6. Payload 中使用消息内容模板 ${protobuf_person}

这个动作将 Protobuf 编码的消息发送到 protobuf_out 这个主题。其中${protobuf_person} 是个变量占位符,将在运行时被替换为经 schema_encode 编码后的值 (一个二进制的值)。

准备设备端代码

规则创建好之后,您可以模拟数据进行测试。

下面的代码使用 Python 语言填充了一个 Person 消息并编码为二进制数据,然后将其发送到 protobuf_in 主题。详 完整代码

python
def publish_msg(client):
    p = person_pb2.Person()
    p.id = 1
    p.name = "Shawn"
    p.email = "[email protected]"
    message = p.SerializeToString()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)

检查规则执行结果

  1. 在 Dashboard 页面,点击左侧导航目录中的问题分析 -> WebSocket 客户端

  2. 填写当前 EMQX 的连接信息。

    • 如果 EMQX 在本地运行,可直接使用默认配置。
    • 如果您修改过 EMQX 的默认配置,如修改过访问规则的配置,则需要输入用户名和密码。
  3. 点击连接,作为 MQTT 客户端连接到 EMQX。

  4. 发布区域,在主题中输入 protobuf_in,在 Payload 中输入以下消息:

    json
    {"name":"Shawn","id":1,"email":"[email protected]"}
  5. 点击发布

  6. 安装 python 依赖,并执行设备端代码:

    shell
    $ pip3 install protobuf paho-mqtt
    
    $ python3 protobuf_mqtt_sub.py
    Connected with result code 0
    msg payload b'\n\x05Shawn\x10\x01\x1a\[email protected]'
    protobuf_out name: "Shawn"
    id: 1
    email: "[email protected]"
OSZAR »