raw AMQP to IoT Hub and IoT Edge

It seems like lately my life has consisted mostly of trying to figure out how to connect “brownfield or legacy systems” (that’s MSFT-speak for “doesn’t use our IoT device SDKs” :-)) to Azure IoT Hub or Azure IoT Edge or both. I’ve previously in other posts shown how to do it with raw MQTT, Mosquitto, and Node-Red.

I was recently asked by a customer for a sample of connecting a raw AMQP client to IoT Edge. So with unbridled optimism, I quickly did a web search to hunt down what surely already existed as a sample. Both google and bing quickly dashed my hope for that (as they so often do). Even StackOverflow, the best hope for all development-kind failed me! So I waded in to figure it out myself.

Setting the stage

Just for simplicity, I used the python uamqp library for this. This is the AMQP library (or at least the C version of it) that we use ourselves underlying IoT Hub and IoT Edge (and service bus, event hub, etc), so it seemed like a natural fit. And it also came with a sample that I could start from and adapt. The code further below and information in this post is based on that sample. The primary two issues with the sample out of the box was that it used a ‘hub-level’ key vs. a device-scoped key for authentication to IoT Hub (don’t do that!) and for some reason it was written to show device-bound (cloud to device) connectivity vs. cloud-bound (device to cloud, aka ‘telemetry’) messaging. So I adapted the sample for my needs, and will show the adaptations below.

While it took me a little time to figure things out, the two most complicated parts where

  • Figuring out the right AMQP connection string format to connect to IoT Hub/Edge. This is normally handled under the covers with our SDKs, but getting it right without the SDKs took a little research and trial/error
  • Figuring out how to get the client to trust the certificate chain that edgeHub presents to connecting clients for TLS connections (for more details on how Edge uses certs, see this article by my very favorite author!). This second bullet is only needed if you are connecting to IoT Edge. The right root-ca certs (i.e. Baltimore) are embedded in the uamqp library for IoT Hub itself.

The format for the AMQP connection string is actually already documented here by our engineering team (under the “protocol specifics” section), but it’s not called out very obviously like the entire sub-article we have for MQTT, so I actually missed it for a while. If you use a device-scoped key (which you generally should), the correct format for the AMQP connection string is:


amqps://[device_id]@sas.[short-hub-name]:[sas-token]@[target-endpoint]/[operation]

where:

  • [device_id] is an iot-hub registered device id for an IoT device
  • [short-hub-name] is the name of your IoT Hub *without* the .azure-devices-net
    • NOTE: the combination of device_id and short-hub-name, which collectively is the ‘username’ in the connection string, must be URL encoded before sent
  • [sas-token] is a SAS token generated for your device
  • [target-endpoint] is either the name of your IoT Hub *with* the .azure-devices.net in the case of an IoT Hub direction connection OR it’s the FQDN of your IoT Edge box in the case of connecting to IoT Edge (i.e. mygateway.contoso.local)
  • [operation] is the desired operation. For example, to send telemetry data, operation is /devices/[device id]/messages/events

Just to show an example of what the connection string looks like with a live device and hub, below is an example of one of mine (with a few random characters in the sas-token changed to protect my hub :-))


amqps://amqptest%40sas.sdbiothub1:SharedAccessSignature+sr%3Dsdbiothub1.azure-devices.net%252Fdevices%252Famqptest%26sig%3DyfStnV4tfi3p7xeUg2DCTSauZowQ90Gplq3hKFzTY10%253D%26se%3D1552015962@mygateway.contoso.local/devices/amqptest/messages/events

where:

  • amqptest is the device id of my device registered in IoT Hub
  • sdbiothub1 is the name of my IoT Hub
  • mygateway.contoso.local is the FQDN of my IoT Edge device (not really, but you don’t need to know the real one…)
  • /devices/amqptest/messages/events is the ‘operation’ I’m invoking, which in the case of IoT Hub/Edge means to send device-to-cloud telemetry data

The code

ok, enough pre-amble, let’s get to the code

NOTE:  Please note - strangely enough, as of this writing (3/7/2019) the code and post below will NOT actually work today.  During my work and investigation, and working with one of the IoT Edge engineers, we discovered a small bug in edgeHub that prevented the raw AMQP connection string from being parsed correctly. The bug has already been fixed, per this pull request, but the fix won't be publicly available until later this month in the next official release.  But, since I'm internal MSFT and "it's good to be the king", I was able to get a private build of edgeHub to test against.  I'll update this post once the fix is publicly available.  (technically, if you really want it, you can do your own private build of edgeHub, since it's open source

The first step in using the sample is to install the uamqp library, the instructions for which can be found here.

Below is my modified version of the sample that i started with. I tried to annotate any change I made with a preceding comment that starts with #steve, so you can just search for them to understand what I changed, or just use the sample directly


#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import os
import logging
import sys
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
from time import time
from uuid import uuid4
try:
    from urllib import quote, quote_plus, urlencode #Py2
except Exception:
    from urllib.parse import quote, quote_plus, urlencode

import uamqp
from uamqp import utils, errors

#steve - added to share the SAS token and username broadly
sas_token = ''
auth_username = ''

def get_logger(level):
    uamqp_logger = logging.getLogger("uamqp")
    if not uamqp_logger.handlers:
        handler = logging.StreamHandler(stream=sys.stdout)
        handler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
        uamqp_logger.addHandler(handler)
    uamqp_logger.setLevel(level)
    return uamqp_logger

log = get_logger(logging.DEBUG)

def _generate_sas_token(uri, policy, key, expiry=None):

    if not expiry:
        expiry = time() + 3600  # Default to 1 hour.
    encoded_uri = quote_plus(uri)
    ttl = int(expiry)
    sign_key = '%s\n%d' % (encoded_uri, ttl)
    signature = b64encode(HMAC(b64decode(key), sign_key.encode('utf-8'), sha256).digest())
    result = {
        'sr': uri,
        'sig': signature,
        'se': str(ttl)}
    #if policy:
    #    result['skn'] = policy
    return 'SharedAccessSignature ' + urlencode(result)


def _build_iothub_amqp_endpoint_from_target(target, deviceendpoint):
#steve - reference global sas_token and auth_username because we will use it outside this function
    global sas_token
    global auth_username

    hub_name = target['hostname'].split('.')[0]

#steve - the format for a *device scoped* key for amqp is
# [deviceid]@sas.[shortiothubhubname]
# this is the same for both IoT Hub and IoT Edge.  This is a change from the original sample
# which used a 'hub scoped' key
    endpoint = "{}@sas.{}".format(target['device'], hub_name)
#steve - grab the username for use later..  before the URL-encoding below
    auth_username = endpoint

    endpoint = quote_plus(endpoint)
    sas_token = _generate_sas_token(target['hostname'] + deviceendpoint, target['key_name'],
                                    target['access_key'], time() + 36000)

#  steve - the first line below is used for talking to IoThub, the second for IoT Edge
#  basically we are just changing the connection endpoint
#    endpoint = endpoint + ":{}@{}".format(quote_plus(sas_token), target['hostname'])
    endpoint = endpoint + ":{}@{}".format(quote_plus(sas_token), target['edgehostname'])
    return endpoint

def test_iot_hub_send(live_iothub_config):
#steve - reference my globals set earlier
    global sas_token
    global auth_username

    msg_content = b"hello world"
    app_properties = {"test_prop_1": "value", "test_prop_2": "X"}
    msg_props = uamqp.message.MessageProperties()
#steve - honestly dunno what this property does :-), but we aren't going devicebound, so nuked it
#    msg_props.to = '/devices/{}/messages/devicebound'.format(live_iothub_config['device'])
    msg_props.message_id = str(uuid4())
    message = uamqp.Message(msg_content, properties=msg_props, application_properties=app_properties)

#steve - the original sample was set up for cloud-to-device communication.  I changed the 'operation'
# to be device-to-cloud by changing the operation to /devices/[device id]/messages/events
    #operation = '/messages/devicebound'
    deviceendpoint='/devices/{}'.format(live_iothub_config['device'])
    operation = deviceendpoint + '/messages/events'
    endpoint = _build_iothub_amqp_endpoint_from_target(live_iothub_config, deviceendpoint)

    target = 'amqps://' + endpoint + operation
    log.info("Target: {}".format(target))

#steve - this is where the magic happens for Edge.  We need a way to specify the
# path to the root ca cert used for the TLS connection to IoT Edge.  So we need to
# manually created the SASLPlain authentication object to be able to specify that
# and then pass it to the SendClient method below.  All of that is not necessary
# just to talk directly to IoT Hub as the root Baltimore cert for IoT Hub itself is buried
# somewhere in the uamqp library
# if you are connecting to IoT Hub directly, you can remove/comment this line
    auth_settings = uamqp.authentication.SASLPlain(live_iothub_config['edgehostname'], auth_username, sas_token, verify=live_iothub_config['edgerootcacert'])

#steve - for iot hub  (simple because we don't have to worry about the edge TLS cert
#    send_client = uamqp.SendClient(target, debug=True)
# for iot edge
    send_client = uamqp.SendClient(target, debug=True, auth=auth_settings)
    send_client.queue_message(message)
    results = send_client.send_all_messages()
    assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
    log.info("Message sent.")

if __name__ == '__main__':
    config = {}
#steve - changed from environment variables to hardcoded, just for this sample
#    config['hostname'] = os.environ['IOTHUB_HOSTNAME']
#    config['device'] = os.environ['IOTHUB_DEVICE']
#    config['key_name'] = os.environ['IOTHUB_SAS_POLICY']
#    config['access_key'] = os.environ['IOTHUB_SAS_KEY']
    config['hostname'] = 'your long iothub name'  # e.g. 'sdbiothub1.azure-devices.net'
    config['device'] = 'your device id'  # e.g. 'amqptest'
    config['key_name'] = ''  # leave empty string
    config['access_key'] = 'your primary or secondary device key'   # e.g 'P38y2x3vdWNGu7Fd9Tqq9saPgDry/kZTyaKmpy1XYhg='
#steve - the FQDN of your edge box (i.e. mygateway.local)
# it MUST match the 'hostname' parameter in config.yaml on your edge box
# otherwise TLS certificate validation will fail
    config['edgehostname'] = 'your FQDN for your edge box'  # e.g. 'mygateway.contoso.local'
#steve - path to the 'root ca' certificate used for the IoT Edge TLS cert
    config['edgerootcacert'] = 'the path to your root ca cert for edge'  # e.g. '/home/stevebus/edge/certs/azure-iot-test-only.root.ca.cert.pem'

    test_iot_hub_send(config)

The code is a little hard to read in blog format, so feel free to copy/paste into your favorite python editor to view it.

The key changes are to the line that generates the ‘username’ for the connection string, changing it from a ‘hub level’ key to a device-level key


endpoint = "{}@sas.{}".format(target['device'], hub_name)

and the two lines that allow me to customize the SASLPlain authentication information to add in the path to the IoT Edge root CA cert


auth_settings = uamqp.authentication.SASLPlain(live_iothub_config['edgehostname'], auth_username, sas_token, verify=live_iothub_config['edgerootcacert'])

send_client = uamqp.SendClient(target, debug=True, auth=auth_settings)

When you run the sample, you’ll see a ton of debug output. At the top you should see your connection string dumped out, but most importantly, if it works, you should see a line like this somewhere in the middle of the output


2019-03-07 19:00:15,893 uamqp.client DEBUG Message sent: <MessageSendResult.Ok: 0>, []

This indicates a successful sending of a message to IoT Hub/Edge.

Enjoy, and as always, feel free to ping me via my contact page and/or via comments here