This content originally appeared on Level Up Coding - Medium and was authored by Nicholas Basker
A message queue is an important tool for building microservice systems whose components run in separate process spaces. It provides non-blocking communication between any two entities and supports asynchronous handling, which increases concurrency. In an IoT Edge project I worked on recently, there was a need to enrich data produced by multiple sources. Several enrichment methods were needed, and each was carried out by a different process. This article describes the scenario in which a Multi-Input-Multi-Output (MIMO) message queue was used and gives a detailed overview of an implementation that multiplexes messages between multiple publishers (or producers) and multiple subscribers (or consumers).
Data Enrichment at IoT Edge Devices
Now that IoT is mainstream and the number of devices producing data keeps increasing, much of the processing is expected to happen at the “Edge”. This thinking led to the creation of the Linux Foundation Edge consortium (https://www.lfedge.org/projects/), which has at least 9 projects focussed on IoT Edge. Further, IoT devices at the edge extract and transform data, which is then consolidated and made available for processing at the next higher software layer in the IoT device hierarchy. As a result, IoT Edge software has to be fairly sophisticated: it both collects data and enriches it.
Consider the generic Edge Device architecture from IBM shown in Figure-2. A device typically has multiple sensors and actuators, which are the publishers (aka producers) of data. This data is consumed by subscribers such as the “Edge Analytics Service”, the “AI Service” and the “Edge Application”. The figure can be simplified and visualised in terms of producers and consumers, as shown in Figure-3.
Here all three functional modules (Analytics, AI/ML and Application) need the data. The data could come from a single laser or from any other component that is being monitored. Figure-3 is a simplified diagram, as an IoT Edge device may collect data from hundreds of sensors. The important point is that every functional module needs data from every producer, and each module enriches the collected data in its own way. The enriched data can be reused by another module or stored in a database for further use. A single robust message queue suits this case, since it avoids the n x m (publishers x subscribers) point-to-point connections that would otherwise be needed. With this understanding of the problem to solve, the next section looks at the tools and options available.
Message Queue options for IoT Edge Device
A number of open source message queues (aka brokers), such as Apache Kafka, Apache Pulsar and RabbitMQ, are available to software developers. In addition, cloud infrastructure vendors provide message queue services; popular ones include SQS from AWS and Google Cloud Pub/Sub. Kafka, Pulsar and RabbitMQ are typically used in distributed setups where the producers and consumers are themselves large, scaled-out microservices. The message queue service itself also scales to very high data rates and runs as a set of cooperating services. These brokers provide topics, partitions and similar schemes, and some of them rely on a coordination service such as ZooKeeper to manage configuration. Such a heavy-duty message queue is not needed on an IoT edge device, which is a single-node setup. Cloud services are also a stretch for this specific IoT edge application.
One open source implementation that could be used in this context is ZeroMQ (ZMQ, https://zeromq.org/). MQTT (https://mqtt.org/) is also a very popular publish/subscribe messaging protocol in the IoT context and can be considered for the use case described in the previous section. ZMQ [REF-3] is a good option and is the one recommended for an IoT Edge application. In this article, however, a simple message queue implementation using a linked list is discussed. Skeleton implementations in Python and Go are provided with comments. Concurrency is exploited using asyncio (async/await) in Python and goroutines in Go.
Message Queue Using Linked List
The Message Queue data structure should support taking in multiple inputs via multiple publishers (aka producers) and be able to supply the produced messages to multiple outputs when many subscribers (aka consumers) are connected to it. Every produced message should be supplied to every consumer registered with the Message Queue entity. The order of messages coming from a specific publisher should be maintained across all consumers.
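The two requirements above (every message reaches every registered consumer, and messages from one publisher keep their order) can be sketched in a few lines of Python. This is only an illustration of the contract, not the implementation discussed later: the `BroadcastLog` class, the `per_publisher_order` helper and the publisher and subscriber names are all hypothetical.

```python
from collections import defaultdict


class BroadcastLog:
    """Append-only log; each subscriber reads from its own offset (hypothetical)."""

    def __init__(self):
        self._log = []       # (publisher, payload) tuples in arrival order
        self._offsets = {}   # subscriber name -> index of the next unread message

    def subscribe(self, name):
        # A new subscriber sees only messages published from now on.
        self._offsets[name] = len(self._log)

    def publish(self, publisher, payload):
        self._log.append((publisher, payload))

    def poll(self, name):
        """Return every message this subscriber has not seen yet."""
        start = self._offsets[name]
        self._offsets[name] = len(self._log)
        return self._log[start:]


def per_publisher_order(messages):
    """Group payloads by publisher, keeping the order in which they were read."""
    grouped = defaultdict(list)
    for publisher, payload in messages:
        grouped[publisher].append(payload)
    return dict(grouped)


log = BroadcastLog()
log.subscribe("analytics")
log.subscribe("ai")

# Two publishers interleave their messages.
for i in range(3):
    log.publish("sensor-a", i)
    log.publish("sensor-b", i * 10)

analytics_seen = log.poll("analytics")
ai_seen = log.poll("ai")
```

Both subscribers receive all six messages, and grouping what either of them read by publisher gives back each publisher's original sequence.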
As depicted in Figure-4, the message queue is implemented as a singly linked list with an Add-Pointer, a Remove-Pointer and a per-consumer Last-Consumed-Pointer. The queue is governed by two settings: a retention period and a garbage-collection interval. A message that has passed the retention period is eligible for removal, provided every consumer present has consumed it. Eligible messages are cleaned up once per garbage-collection interval. Every publisher/producer adds messages through the Add-Pointer, so the order of messages from each producer is maintained. Each subscriber/consumer is scheduled independently and consumes messages at its own pace. A separate garbage collector routine is scheduled to clean up messages based on the retention time and on consumption by the subscriber/consumer processes.
The message queue described here can be implemented in several ways, depending on the programming language, platform, operating system, virtualisation and containerisation features supported by the IoT Edge system. This article covers two methods: the first uses the Python 3 asyncio (async/await) model and the second uses the goroutines and channels provided by the Go programming language.
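The pointer scheme described above can be sketched as follows. This is a minimal single-threaded sketch under stated assumptions, not the code from the repositories in the references: the class and method names are hypothetical, a sequence number per node is added so the garbage collector can compare positions, and the clock is injectable so that retention can be demonstrated without waiting.

```python
import time


class Node:
    """One message in the singly linked list."""

    def __init__(self, seq, payload, created):
        self.seq = seq            # position in arrival order (an added assumption)
        self.payload = payload
        self.created = created    # timestamp used for the retention check
        self.next = None


class LinkedMessageQueue:
    def __init__(self, retention, clock=time.monotonic):
        self._retention = retention
        self._clock = clock
        # The Remove-Pointer rests on a placeholder; live messages start at its .next.
        self._remove_ptr = Node(0, None, clock())
        self._add_ptr = self._remove_ptr
        self._last_consumed = {}  # consumer name -> last node it has read

    def register(self, consumer):
        # A new consumer starts at the oldest retained message.
        self._last_consumed[consumer] = self._remove_ptr

    def add(self, payload):
        node = Node(self._add_ptr.seq + 1, payload, self._clock())
        self._add_ptr.next = node
        self._add_ptr = node

    def consume(self, consumer):
        """Return the next unread payload for this consumer, or None."""
        node = self._last_consumed[consumer].next
        if node is None:
            return None
        self._last_consumed[consumer] = node
        return node.payload

    def collect_garbage(self):
        """Drop messages that are past retention and read by every consumer."""
        now = self._clock()
        slowest = min((n.seq for n in self._last_consumed.values()),
                      default=self._add_ptr.seq)
        removed = 0
        while True:
            oldest = self._remove_ptr.next
            if oldest is None or oldest.seq > slowest:
                break             # some consumer has not read this message yet
            if now - oldest.created < self._retention:
                break             # still inside the retention period
            self._remove_ptr = oldest   # the old placeholder becomes unreachable
            removed += 1
        return removed

    def __len__(self):
        count, node = 0, self._remove_ptr.next
        while node is not None:
            count, node = count + 1, node.next
        return count


now = [0.0]                                  # fake clock for the demonstration
q = LinkedMessageQueue(retention=5.0, clock=lambda: now[0])
q.register("fast")
q.register("slow")
for reading in ("r1", "r2", "r3"):
    q.add(reading)

fast_read = [q.consume("fast") for _ in range(3)]   # reads everything
slow_read = [q.consume("slow")]                     # reads only r1

now[0] = 10.0                    # all three messages are now past retention
removed = q.collect_garbage()    # only r1 has been read by both consumers
```

In this run the collector removes only the first message: the other two are past retention but are still held for the slow consumer. In a real system `collect_garbage` would be called on a timer at the garbage-collection interval.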
MIMO Message Queue Using Python
One possible way to realise the strategy described in the previous section is to use a generic message structure that can be held in a singly linked list. Producers add messages and consumers read them. Since there are multiple producers and consumers, asyncio is used with appropriate locks to make the queue concurrent. Each consumer may also consume messages at a different frequency, so a per-consumer tracker needs to be maintained. All of this can be captured in a MessageQueue class that is supplied to the consumers and producers, which then use its generic interfaces to send and receive messages. A sample implementation and a usage example are available in the same git repository; [REF-5] has more details.
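As a rough illustration of this approach, the sketch below uses an `asyncio.Condition` as the lock and a dictionary as the per-consumer tracker. It is not the code from [REF-5]: the class name `AsyncMessageQueue`, the `register`, `send` and `receive` methods and the producer and consumer names are assumptions made for the example, and retention and garbage collection are left out for brevity.

```python
import asyncio


class Node:
    def __init__(self, payload=None):
        self.payload = payload
        self.next = None


class AsyncMessageQueue:
    """Hypothetical asyncio-based queue; create it inside a running event loop."""

    def __init__(self):
        self._tail = Node()                   # placeholder; real messages follow it
        self._trackers = {}                   # consumer name -> last node it has read
        self._changed = asyncio.Condition()   # lock plus wake-up signal

    def register(self, consumer):
        self._trackers[consumer] = self._tail

    async def send(self, payload):
        async with self._changed:
            node = Node(payload)
            self._tail.next = node
            self._tail = node
            self._changed.notify_all()        # wake every waiting consumer

    async def receive(self, consumer):
        async with self._changed:
            # Wait until a node exists past this consumer's tracker.
            await self._changed.wait_for(
                lambda: self._trackers[consumer].next is not None)
            node = self._trackers[consumer].next
            self._trackers[consumer] = node
            return node.payload


async def producer(queue, name, count):
    for i in range(count):
        await queue.send((name, i))
        await asyncio.sleep(0)                # yield so that producers interleave


async def consumer(queue, name, expected):
    return [await queue.receive(name) for _ in range(expected)]


async def main():
    queue = AsyncMessageQueue()
    for name in ("analytics", "ai"):
        queue.register(name)
    results = await asyncio.gather(
        producer(queue, "sensor-a", 3),
        producer(queue, "sensor-b", 3),
        consumer(queue, "analytics", 6),
        consumer(queue, "ai", 6),
    )
    return results[2], results[3]


analytics_seen, ai_seen = asyncio.run(main())
```

Because both consumers walk the same linked list, they see the same sequence of messages, and the messages from each producer arrive in the order they were sent. The exact interleaving between the two producers depends on how the event loop schedules them.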
MIMO Message Queue Using Go
In Go, the Message Queue is realised using container/list and channels. Concurrency is achieved through goroutines. The message node structure and the queue implementation are available in the git repository; [REF-6] has more details.
Conclusion
A simple Multi-Input-Multi-Output queue can be realised using the technique above. It is lightweight and can be used in systems with a very small resource footprint. However, I would recommend using ZMQ, since new requirements and scalability needs tend to appear in any software project that starts small but grows quickly.
References
- [REF-1] Asynchronous Messaging Primer https://docs.microsoft.com/en-us/previous-versions/msp-n-p/dn589781(v=pandp.10)#sending-and-receiving-messages-by-using-a-message-queue
- [REF-2] IoT Edge Device Reference Architecture https://www.ibm.com/cloud/blog/architecting-at-the-edge
- [REF-3] Distributed Tasks with Python ZeroMQ https://betterprogramming.pub/create-zero-point-failure-distributed-tasks-with-python-and-zeromq-e2a20941d85b
- [REF-4] ZMQ https://zeromq.org/
- [REF-5] https://github.com/nbasker/tools/tree/master/pymsgmuxer
- [REF-6] https://github.com/nbasker/tools/tree/master/gomsgmuxer
Message Queue Supporting Multiple Publisher/Subscriber Clients was originally published in Level Up Coding on Medium, where people are continuing the conversation by highlighting and responding to this story.
Nicholas Basker | Sciencx (2021-08-06T18:42:19+00:00) Message Queue Supporting Multiple Publisher/Subscriber Clients. Retrieved from https://www.scien.cx/2021/08/06/message-queue-supporting-multiple-publisher-subscriber-clients/