Flux7 engineer Ahsan Ali and CTO Ali Hussain collaborated on this post
The rise of IoT has created a new generation of needs in the world of big data processing: we now need to ingest data from many sensors around the world and make real-time decisions to be executed by those devices. It is no surprise, then, that we see new services for processing streaming data, such as Amazon Kinesis.
Recently we implemented a streaming data processor for a customer using Amazon Kinesis and wanted to share what we learned. Our customer is a telecom business which, yes, you guessed it, means that they needed to process real-time data in a distributed manner. Weighing popularity and manageability, the telecom had originally opted for Amazon SQS.
Producers created different queues for different organizations – let’s just assume organizations are different entities that can produce multiple events at the same time. Upon receiving an event, a producer would enqueue it in its specific queue. From there a listener to that queue, created by the processor, would pick it up and process it.
This solution was living the dream until the client realized that Amazon SQS, because of its distributed nature, does not guarantee FIFO ordering, despite what the name suggests. Long story short, all hell broke loose, and in came Flux7.
After looking at different solutions, and given our CTO’s overwhelming love for Amazon Web Services, we chose Kinesis. The real challenge was to process events from different organizations in parallel while keeping them ordered within each organization. To tackle this, we used organization IDs as partition keys for Kinesis, so that all events from a single organization end up in the same shard, as shown in figure 2.0 below.
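To make the partition-key idea concrete, here is a minimal sketch of how a producer might build its PutRecord parameters. The stream name and event shape are hypothetical, not taken from the actual project:

```javascript
// Sketch: build Kinesis PutRecord parameters so that all events from one
// organization land on the same shard. Kinesis hashes the partition key to
// pick a shard, so a stable per-organization key gives per-organization order.
function buildPutRecordParams(event) {
  return {
    StreamName: 'telecom-events',               // hypothetical stream name
    PartitionKey: String(event.organizationId), // same org -> same shard
    Data: JSON.stringify(event),                // payload is opaque to Kinesis
  };
}

// With the AWS SDK for Node.js, these params would be passed to
// `new AWS.Kinesis().putRecord(params, callback)`.
const params = buildPutRecordParams({ organizationId: 42, type: 'call_started' });
```

Note that ordering is only guaranteed per shard, which is exactly why the partition key must capture the unit within which order matters.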
The next challenge was putting distributed processors in place. It turns out Amazon has already built a wonderful library for this, the Kinesis Client Library (KCL). KCL is written in Java, and there is a Python port of it. Our project, however, was in Node JS, so we ended up using an unofficial clone. Below is some basic terminology taken from the project website:
- Consumer: A single process that is responsible for consuming a single Kinesis shard. Consumers are created and managed by clusters; they should not be created directly.
- Cluster: A process that manages consumers. A single cluster can have many consumers (or 1, or 0) that all run on the same machine. Clusters should be created by the CLI.
- Network: All of the consumers processing a single Kinesis stream for a single application. Networks can have many clusters that are run on one or many machines.
Networks are meant to be distributed, though they don’t have to be, and will automatically rebalance shards across clusters. Each cluster will try to process an equal number of shards, so it’s a good idea to make sure each cluster has equivalent resources e.g. clusters should have similar memory and CPU available. Networks will automatically pick up new shards as splits or merges happen.
Multiple networks can independently process the same stream as long as the stream has enough capacity. In this case, you should be sure to give each set of processors a unique table flag.
KCL uses Amazon DynamoDB to maintain its state. A Kinesis consumer fetches events from the stream in order of sequence numbers (assigned by Kinesis in ascending order) and maintains state in DynamoDB to ensure that (i) no two consumers are listening on the same shard and (ii) no event gets processed twice, as shown in figure 3.0.
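The checkpointing idea can be sketched in a few lines. This is an illustration of the mechanism, not the KCL implementation itself; a plain in-memory Map stands in for the DynamoDB checkpoint table:

```javascript
// Sketch of per-shard checkpointing: store the last processed sequence
// number for each shard, and only handle records with a higher one.
const checkpoints = new Map(); // shardId -> last processed sequence number

function processRecord(shardId, record, handler) {
  const last = checkpoints.get(shardId);
  // Kinesis assigns sequence numbers in ascending order within a shard;
  // they are long numeric strings, so compare them as big integers.
  if (last !== undefined && BigInt(record.SequenceNumber) <= BigInt(last)) {
    return false; // at or before the checkpoint: already processed, skip
  }
  handler(record);
  checkpoints.set(shardId, record.SequenceNumber); // checkpoint after success
  return true;
}
```

Because the checkpoint is written only after the handler succeeds, a crash between the two steps causes a replay on restart, which is why deduplication (below) is still needed.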
The only remaining problem was deduplicating events. According to Amazon’s documentation, duplicates can arise for two reasons:
- Producer Retries: A producer may retry after a network-related timeout following a call to PutRecord, even though the original record was written, producing the same record twice.
- Consumer Retries: Consumer retries happen when record processors restart. Record processors for the same shard restart in the following cases:
  - A worker terminates unexpectedly
  - Worker instances are added or removed
  - The application is deployed
  - Shards are merged or split
To address this problem, we started appending a UUID to every event. Processors then used Memcached nodes on an Amazon ElastiCache cluster to deduplicate events by these UUIDs.
At the time, the customer did not need to dynamically scale their Kinesis stream to match demand; a fixed-capacity stream is sufficient for many IoT vendors, since their devices send data at a steady rate. We did sketch out an architecture to support dynamic scaling by detecting shards with high traffic and splitting them, and as our customer’s needs change we may implement it.
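The split decision in that sketched architecture could look something like this. The threshold and shard shape here are illustrative assumptions; the per-shard write limit of 1,000 records per second is the Kinesis limit at the time of writing:

```javascript
// Sketch: if a shard's observed write rate nears the per-shard limit,
// split it at the midpoint of its hash-key range.
const WRITE_LIMIT_PER_SHARD = 1000; // records/sec write limit per shard
const HOT_THRESHOLD = 0.8;          // illustrative: split at 80% of the limit

function splitPointIfHot(shard, observedRecordsPerSec) {
  if (observedRecordsPerSec <= WRITE_LIMIT_PER_SHARD * HOT_THRESHOLD) {
    return null; // shard is not hot, leave it alone
  }
  const lo = BigInt(shard.StartingHashKey);
  const hi = BigInt(shard.EndingHashKey);
  // The midpoint becomes the NewStartingHashKey passed to the Kinesis
  // SplitShard API; the shard becomes two shards covering the two halves.
  return ((lo + hi) / 2n).toString();
}
```

Splitting at the hash-range midpoint is the simple choice; a smarter splitter could pick the point that best balances the observed key distribution.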
If We Did It Again
At the time we did this project, Kinesis stream support in AWS Lambda was not generally available. If we were to do it again, we would use AWS Lambda instead of our own EC2 instances for the consumers, which would have simplified some of the work on our end.
The End Result
Our client was plagued by reliability issues with their old setup, and by using Kinesis we were able to work through many of them. They still had issues with other aspects of the application that they wanted to address. At Flux7, however, we strive to limit ourselves to projects where we can provide an excellent experience to our customers, and we did not believe we could do that in this case, so we turned down that work. We did provide them with DevOps and distributed-systems guidance to enable them to fix the issues themselves.
The Kinesis setup performed admirably. While it is true that the same result could have been obtained with SQS, RabbitMQ, or several other platforms, each would have required a significant amount of development work, an extreme burden for our client, a small startup. Using Kinesis, they got a more stable solution at a lower cost.
Post Date: 04/29/2015