作者:Philipp Sacha 2023年1月4日在 AWS IoT Core 的 技术指南、思想领导力 中。永久链接
AWS IoT Core 连接了互联网设备IoT与 AWS IoT 及其他 AWS 服务。设备和客户端可以通过 MQTT 协议发布和订阅消息。其中,像 AWS IoT 设备 SDK 这样的 MQTT 库,提供了开源库、带示例的开发者指南以及移植指南,让您可以在选择的硬件平台上构建创新的 IoT 产品或解决方案。
用户常常询问如何使用 AWS IoT Core 消息代理与标准 MQTT 库。这种请求可能出于希望从其他 MQTT 代理迁移到 AWS IoT Core,同时仍在使用标准 MQTT 库,或者他们可能已经在使用标准 MQTT 库。
在本文中,您将学习如何使用 Python、Nodejs 或 Java 等不同编程语言的标准 MQTT 库与 AWS IoT Core 消息代理进行交互。本文涵盖的 MQTT 库支持 MQTT 协议版本 5。要开始使用并了解有关 AWS IoT Core 的更多信息,请参考 技术文档。
阅读时间: 8分钟学习水平: 300涉及服务: AWS IoT Core
要执行本文中的操作,您需要拥有 AWS 账户 和管理 IoT 设备的权限。
在以下示例中,您将使用名为 mqtt5 的设备进行演示。为了进行设备身份验证,您将使用不是由 AWS IoT Core 颁发的 X509 证书。请使用 openssl 创建证书,并将其注册到 AWS IoT Core无需 CA。为了方便使用,创建一个开放的 IoT 策略。通常,您应使用遵循 最低权限原则 的权限。
使用以下命令创建设备。
bash
THINGNAME=mqtt5
aws iot creatething thingname THINGNAME
openssl req x509 newkey rsa2048 keyout THINGNAMEprivatekey out THINGNAMEcertificatepem sha256 days 365 nodes subj /CN=THINGNAME
aws iot registercertificatewithoutca certificatepem file//THINGNAMEcertificatepem status ACTIVE gt /tmp/registercertificatejsonCERTIFICATEARN=(jq r certificateArn /tmp/registercertificatejson)CERTIFICATEID=(jq r certificateId /tmp/registercertificatejson)
POLICYNAME={THINGNAME}Policyaws iot createpolicy policyname POLICYNAME policydocument {Version20121017Statement[{EffectAllowAction iotResource}]}
aws iot attachpolicy policyname POLICYNAME target CERTIFICATEARN
aws iot attachthingprincipal thingname THINGNAME principal CERTIFICATEARN
将您的 AWS IoT Core 端点赋值给一个 shell 变量。这使在本文示例中更容易使用您的端点。
bashexport IOTENDPOINT=(aws iot describeendpoint endpointtype iotDataATS query endpointAddress output text)
下载用于签署 AWS IoT Core 服务器证书的 根 CA 证书,并将其保存在文件 AmazonRootCA1pem 中。
MQTT CLI 是一个由 HiveMQ 支持的开源项目。它支持 MQTT 311 和 MQTT 50。您可以使用 MQTT CLI 与 AWS IoT Core 消息代理进行交互。HiveMQ MQTT CLI 命令为 mqtt。
订阅
使用 MQTT 版本 5 订阅主题 hivemq/with/aws。
bashmqtt sub h IOTENDPOINT p 8883 cafile AmazonRootCA1pem cert mqtt5certificatepem key mqtt5privatekey d V 5 q 0 t hivemq/with/aws
发布
让订阅者保持运行并向主题 hivemq/with/aws 发布消息。您应该能够在订阅者上看到收到的消息。以下命令发布一条消息。可以多次执行该命令以发布任意数量的消息。
bashmqtt pub h IOTENDPOINT p 8883 cafile AmazonRootCA1pem cert mqtt5certificatepem key mqtt5privatekey d V 5 q 0 t hivemq/with/aws m {mqtt5 arrived client lib hivemq date (date)}
您将看到所有发布的消息都出现在订阅者上。
Eclipse Mosquitto 是一个开源消息代理,提供发布 mosquittopub 和订阅 mosquittosub 客户端。
通过 mosquittosub 订阅主题
使用 mosquittosub 订阅主题,同时使用 mosquittopub 向同一主题发布。AWS IoT Core 会将消息从发布者路由到订阅者。
订阅
使用 mosquittosub 客户端订阅主题 mosquitto/with/aws。
bashmosquittosub cafile AmazonRootCA1pem cert THINGNAMEcertificatepem key THINGNAMEprivatekey h IOTENDPOINT p 8883 q 0 t mosquitto/with/aws i {THINGNAME}sub tlsversion tlsv12 d V mqttv5
发布

使用 mosquittopub 客户端发布多条消息。要发布多条消息,请创建一个包含消息的文件,mosquittopub 将从该文件中读取消息。
创建名为 messagesjson 的文件,并填入以下内容。
json{mqtt5 arrived message 1}{mqtt5 arrived message 2}{mqtt5 arrived message 3}
使用主题别名发布消息。主题别名是可以替代主题名称的整数。在以下示例中,使用主题别名 2 替代主题 mosquitto/with/aws。
bashcat messagesjson mosquittopub cafile AmazonRootCA1pem cert THINGNAMEcertificatepem key THINGNAMEprivatekey h IOTENDPOINT p 8883 q 0 t mosquitto/with/aws i THINGNAME tlsversion tlsv12 d V mqttv5 D publish topicalias 2 l
发布请求的输出应如下所示:
textClient mqtt5 sending CONNECTClient mqtt5 received CONNACK (0)Client mqtt5 sending PUBLISH (d0 q0 r0 m1 mqtt5 (36 bytes))Client mqtt5 sending PUBLISH (d0 q0 r0 m2 (null) (36 bytes))Client mqtt5 sending PUBLISH (d0 q0 r0 m3 (null) (36 bytes))Client mqtt5 sending DISCONNECT
只有第一次发布请求包含主题名称,后续请求中将收到 (null),表示使用了主题别名。
在订阅者处,您可以观察到发布者发送的消息。
以下代码片段展示了如何使用 AWS IoT Core Eclipse Paho Python 客户端 库。首先连接到 AWS IoT Core。成功连接后,可以计算主题别名并订阅主题,然后向主题发布消息。
MQTT 客户端
创建一个 MQTT 版本 5 客户端,设置成功连接和消息到达的处理程序,并连接到 AWS IoT Core 端点。在此示例中,根 CA 证书、设备证书、设备密钥及端点信息由命令行选项传入。
pythonmqttc = mqttClient(protocol=mqttMQTTv5)mqttctlsset( cacerts=argsca certfile=argscertificate keyfile=argskey tlsversion=2)mqttconconnect = onconnectmqttconmessage = onmessagemqttcconnect(argsendpoint 8883 60)
连接处理程序
连接后,MQTT 客户端将收到来自 AWS IoT Core 的 CONNACK 消息,其中包括最大可用主题别名。根据最大可用主题别名,代码会随机生成一个在 0 到最大主题别名之间的主题别名。
pythondef onconnect(mqttc userdata flags rc properties=None) global TOPICALIASMAX LOGGERinfo(connected to endpoint s with result code s argsendpoint rc) LOGGERinfo(userdata s flags s properties s userdata flags properties) LOGGERinfo(topicaliasmaximum s propertiesTopicAliasMaximum) TOPICALIASMAX = propertiesTopicAliasMaximum mqttcisconnected = True
LOGGERinfo(subscribing to topic s argstopic)mqttcsubscribe(argstopic qos=0 options=None properties=None)
topicalias = randomSystemRandom()randint(0 TOPICALIASMAX)
当 onmessage 处理程序接收到消息时,消息将被记录。
pythondef onmessage(mqttc userdata msg) LOGGERinfo(received message topic s payload s msgtopic msgpayloaddecode())
为您的主题名称定义主题别名,您可以发布第一条消息,包括主题别名和主题名。所有后续消息将在循环中使用主题别名发布。
pythonpropertiesTopicAlias = topicaliasmessage = jsondumps({mqttv5 has arrived datetime datetimenow()strftime(Ymd HMS) topicalias topicalias})LOGGERinfo(publish topic s message s argstopic message)mqttcpublish(argstopic payload=message qos=0 retain=False properties=properties)
while True message = jsondumps({mqttv5 has arrived datetime datetimenow()strftime(Ymd HMS) topicalias topicalias}) LOGGERinfo(publish topicalias s message s topicalias message) mqttcpublish( payload=message qos=0 retain=False properties=properties) timesleep(2)
在前面一节中,您学习了如何使用 Paho Python 客户端。Paho 还提供了一个 JavaScript 客户端,它使用 WebSockets 连接到 MQTT 代理。 MQTTjs 客户端库不仅支持 WebSockets,还支持使用证书认证的 TLS 连接。以下 Nodejs 代码片段假设您通过命令行提供根 CA、设备私钥和证书,以及 AWS IoT Core 端点和主题。在该示例中,您还将使用主题别名来发布消息。
MQTT 客户端
构建一个连接到 AWS IoT Core 的客户端,使用 MQTT 版本 5。
javascriptconsolelog(building client)const client = mqttconnect( mqtts// argve 8883 { key fsreadFileSync(argvk) cert fsreadFileSync(argvc) ca [fsreadFileSync(argvca)] protocolId MQTT protocolVersion 5 })
加速器官方下载连接处理程序
成功连接后,获取 AWS IoT Core 通知的最大主题别名。用随机函数计算主题别名,并设置发布属性以使用主题别名。订阅您要发布的主题并发布消息。
javascriptclienton(connect function () { topicAliasMax = clienttopicAliasSendnumberAllocatormax topicAlias = Mathfloor(Mathrandom() (topicAliasMax 0 1) 0) consolelog(topicAliasMax topicAliasMax topicAlias topicAlias) consolelog(subscribe to argvt) publishOptionspropertiestopicAlias = topicAlias
consolelog(subscribe topic argvt)clientsubscribe(argvt function (err) { if (err) { consolelog(subscribe error err) } else { var message = generateMessage() consolelog(publish first message to set topicAlias topicAlias topic argvt message message) clientpublish(argvt message publishOptions) }})})
接收消息
接收到的消息将被记录到控制台。
javascriptclienton(message (topic message) =gt { consolelog(message received subscription topic argvt topic topic message messagetoString())})
发布消息
使用主题别名持续发布消息。
javascriptsetInterval(function () { var message = generateMessage() consolelog(publish topic argvt message message) clientpublish( message publishOptions)} 5000)
您应该能够看到消息抵达订阅者,并显示在控制台中。
删除您创建的设备,以及相关的证书和 IoT 策略。您可以在 如何使用注册表管理设备 文档中找到详细步骤。
在本文中,您学习了如何使用标准 MQTT 库与 AWS IoT Core 配合使用,这些库已经支持 MQTT5。使用 AWS IoT 设备 SDK 简化并加速了运行在连接设备上的代码开发,因为它们包括了便于使用 AWS IoT 功能的方法,例如 AWS IoT Greengrass 发现、自定义身份验证 和 设备影子。AWS IoT 设备 SDK 免费作为开源项目提供。
Philipp Sacha 是亚马逊网络服务的合作伙伴解决方案架构师,专注于制造领域的合作伙伴。他于2015年加入AWS,并担任过多个角色,包括解决方案架构师和物联网领域的专家。
标签 AWS IoT,[AWS IoT 最佳实践](https