Integration of a streaming system with batch oriented, slower downstream systems

written by Mik Quinlan on May 6, 2014 with 2 comments

We’re happy to share a great blog from Mik Quinlan, a Java Technical Architect and Agile Mentor with more than 16 years experience in software development, who has extensive experience deploying VoltDB in fast data ingestion applications for the retail market.  He is currently Director of Mobile Advertising at Thinknear.  The blog originally appeared here.

Background

Over the last two years, I have been involved in transforming a complex legacy processing system that gave rise to a unique solution that may be used in other contexts. Before continuing, it is important to note that the pattern of processing data quickly and buffering for downstream consumption is nothing new. What is different is the way in which the problem was approached, the principles used and the technologies selected.

The data to be processed was supplied by up to 14,000 merchants by way of feed files. Each feed file contained one or more offers. The merchant would update one or more feed files and make them available for processing. At peak, a feed file could contain up to 3MM offers (though 12MM was seen on occasion). Further, there were approximately 160 – 180MM individual offers in the ecosystem at any one time, with up to 80MM offers being updated per day.

As offers were processed, the goals included enriching these offers to enable better search results and analytics and updating the search index with those offers so they would be available to the various web sites. The SLA for this end-to-end process was 1 hour. For smaller organisations that do not have the budget for an extremely large Hadoop MR cluster (or similar), another solution needs to be found.

To meet the desired SLA, a streaming system was conceived. This meant that each individual offer would be extracted from feed files and be sent down the pipeline for processing individually. Calculations showed that this ingestion part of the system would run at 30-35,000 offers/second. Many downstream systems, however, could not run at that speed of update of individual offers including, for example, Solr master index updates, various legacy systems and the Hadoop ecosystem. As a result, an architectural solution had to be conceived that allowed the fast ingestion and storage of this feed data, and the provision of that feed data in a manner that allowed downstream systems to dictate their own ingestion speed.

During the design of the system, some principles came to the fore:

  • Processing of data had to conform to the following principles in order to ensure data integrity during both normal operations, error states, re-processing or other abnormal behavior:
      • Immutable – immutable data would be easier to scale.
      • Idempotent – replaying operations would not cause any unexpected side effects.
      • Associative – if different versions of an offer were processed, the final outcome would be the same regardless of the order in which they were processed.
      • Commutative – see Associative, above.
  • There would most likely be two repositories required: one to store the data after fast ingestion and processing (the system of record), and one to hold a subset of the data for downstream systems to ingest (the staging repository): note, this also clearly demarcated the responsibilities, and therefore workloads, of each repository.
  • The services that interacted would need to be easily scalable and needed to have the following properties:
      • Asynch IO
      • Stateless
      • Metadata (e.g. state) moves with the data it applies to in the same packet to avoid querying multiple stores for results.
  • The system of record would have to perform at a minimum of 35k TPS and, the faster the system of record, the less complex it would be, and the easier it would be to implement, as optimisations were less likely to be required (e.g. caching).
  • Slow clients always had to have access to the latest data, at the same time giving them the option to miss some updates to get the most recent version of a particular datum (in this case, an offer).

This blog entry aims to describe the architecture chosen at a high level while submitting the principles of the pattern that emerged.

How it works

Mik Quinlan - How It Works

As depicted in the diagram above, data was pushed or pulled from internal (i.e. same organisation) or external (i.e. different organisation) systems from the untrusted domain into the pipeline system where it entered the trusted domain. Data sanity and validation occurred in Pipeline Service 1 prior to triggering streamed data processing. This continued with data being processed and enriched along the pipeline until it was stored in the System of Record and then passed to the Pipeline Data Publisher.

Downstream clients could be either new or legacy. These typically would not have the high-performing SLA requirements of the pipeline, due to it either not being necessary or the nature of the system (e.g., batch processed machine learning). As a result, a staging repository had to be put in place that allowed the downstream clients to consume the data at their own rates. Note that this staging repository only contained a subset of data for multiple reasons including efficiency and cost. If the staging repository were to become corrupted for any reason, a mechanism to repopulate it from the System of Record had to be put in place.

Example

Mik Quinlan - Example

As can be seen from the above diagram, the pipeline system designed was any set of services necessary for the task at hand.

The System of Record needed to be able to handle a very high transactional load (35k TPS) at low latency to ensure that any queues in the system would be drained fast enough. Having looked at other solutions, including Clustrix, MongoDB, Riak, dbshards and Oracle, VoltDB was selected to perform this role as:
• It met our SLA with the ability to horizontally scale to meet increased demand;
• Its high throughput negated additional complexities such as caching;
• Its immediately consistent nature meant there was no need to deal with conflict resolution, programmatically or otherwise, which is a common feature of eventually consistent, NoSQL systems.

As a result, the Pipeline Data Persister chosen was a VoltDB client that connected to the VoltDB cluster to perform its duties. After successful persistence of the data, the persister collected all data required (joining across VoltDB tables where necessary) and sent a flat data structure to the Pipeline Data Publisher (PDP).

The Pipeline Data Publisher chosen was a Kafka Producer, which sent the data to the Kafka store. The size of the disks allocated to the Kafka store had to be sufficient not only to store data given the rate at which it was published, but also for long enough that downstream clients could consume everything they required. Finally, the downstream clients were all Kafka clients. This allowed them to take advantage of the architecture and hard decisions that have been made with the Kafka system, the fundamental difference with traditional messaging systems being that the clients can dictate the rate at which they consume messages. Of note is Kafka Consumer 2. This consumer was also an HDFS client. In this way, data streamed into the pipeline system could be ingested at a much slower rate into HDFS where much longer running processes worked on that data (e.g. Mahout). Optionally, the results of the long running processes could be added alongside the pipeline-ingested data, forming a coherent view of the data in the ecosystem.

Of course, the above is only an example. You should select those technologies that are suitable for your problem at hand when implementing this pattern, be it conformance to organizational guidelines, expertise etc.