Simple messaging framework using Go TCP server and Kafka
I needed to create a simple framework to provide my endpoint devices ( doesn’t matter which platform they run on ) the option to send and receive messages from my backend.
I require those messages to be managed by a message broker so that they can be processed in an asynchronous way.
The system contains 4 main layers, this article section is mainly about the first one:
1. TCP servers - Needs to maintain as many TCP sockets in synch with the endpoints as possible. All of the endpoints messages will be processed on a different layer by the message broker. This keeps the TCP servers layer very thin and effective. I also want to keep as many concurrent connection as possible, and Go is a good choice for this ( see this article)
2. Message broker - Responsible for delivering the messages between the TCP servers layer and the workers layer. I chose Apache Kafka for that purpose.
3. Workers layer - will process the messages through services exposed in the backend layer.
4. Backed services layer - An encapsulation of services required by your application such as DB, Authentication, Logging, external APIs and more.
So, this Go Server:
1. communicates with its endpoint clients by TCP sockets.
2. queues the messages in the message broker.
3. receives back messages from the broker after they were processed and sends response acknowledgment and/or errors to the TCP clients.
The full source code is available in : https://github.com/orrchen/go-messaging
I have also included a Dockerfile and a build script to push the image to your Docker repository.
Special thanks to the great go Kafka sarama library from Shopify.
The article is divided to sections representing the components of the system. Each component should be decoupled from the others in a way that allows you to read about a single component in a straight forward manner.
TCP Client
Its role is to represent a TCP client communicating with our TCP server.
|
|
Please notice that onConnectionEvent
and onDataEvent
are callbacks for the Struct that will obtain and manage Clients.
Our client will listen permanently using the listen()
function and response to new connections, new data received and connections terminations.
Kafka Consumer
Its role is to consume messages from our Kafka broker, and to broadcast them back to relevant clients by their uids.
In this example we are consuming from multiple topics using the cluster implementation of sarama.
Let’s define our Consumer
struct:
|
|
The constructor receives the callbacks and relevant details to connect to the topic:
|
|
It will consume permanently on a new goroutine inside the Consume()
function.
It reads from the Messages()
channel for new messages and the Notifications()
channel for events.
Kafka Producer
Its role is to produce messages to our Kafka broker.
In this example we are producing to a single topic.
This section is mainly inspired from the example in https://github.com/Shopify/sarama/blob/master/examples/http_server/http_server.go
Let’s define our Producer
Struct:
|
|
Producer
is constructed with the callbacks for error, and the details to connect to the Kafka broker including optional ssl configurations that are created with createTLSConfiguration
:
|
|
I decided to produce messages that are encoded to JSON and to ensure it before sending them:
|
|
And finally, we provide the functions to produce the message and close the producer:
|
|
TCP Server
Its role is to obtain and manage a set of Client
, and send and receive messages from them.
|
|
It is constructed simply with an address to bind to and the callbacks to send:
|
|
When a connection event occurs we process it and handle it, if it’s a new event we attach a new UID to the client.
If connection is terminated we delete this client.
In both cases we send the callbacks to notify about those events.
TcpServer
will listen permanently for new connections and new data with Listen()
, and support a graceful shutdown with Close()
.
We provide 2 options ot send data to our clients, by their device uid ( generated from the client side) with SendDataByDeviceUid
or by the client id which is generated in our system with SendDataByClientId
.
API
We need to create structs for the API that the tcp clients use, and the API for the messages sent to/from the messages broker.
For the TCP clients:
* DeviceRequest
* DeviceResponse
For the message broker:
* ServerRequest
* ServerResponse
Main function - putting it all together
Obtains and manages all the other components in this system. It will include the TCP server that holds an array of TCP clients, and a connection to the Kafka broker for consuming and sending messages to it.
Here are the main parts of main.go
file:
|
|
Build, run and deploy to Docker image
To build:
|
|
To run:
|
|
To build and run with Docker I first set this Dockerfile
:
|
|
And build and push to my Docker repository with the build.sh script.
|
|
Future improvements
Of course this is just a base framework, it lacks a few things mandatory for production environments which are mainly authentication, better logging, recovery from errors and input checking.
But I believe this might be a very useful start point for many developers who need this kind of a service, just like I needed it before implementing it :)
I will be very happy to read your thoughts and comments, happy holidays to all!
About the author:
Hi, my name is Orr Chen, a software engineer and a gopher for the past 3 years.
My first experience with Go was migrating the entire backend of my startup PushApps from Rails to Golang. Since then I am a big fun of the language!
Github: OrrChen
Twitter: OrrChen
LinkedIn: orrchen