Java: Developing an in-memory data transformation pipeline

Joe Honour
Nov 17, 2019

I have often found the need to take a source of data and apply a series of transformations over it. For instance, suppose I have a series of weather readings I need to read in from a file. To process these, I would generally need to perform the following steps (a rough code sketch follows the list):

  1. Read in each line of the file (where each line represents an individual weather reading).
  2. Parse the line into some Java object (POJO).
  3. Perform some data manipulation (for example, converting rainfall in millimetres to centimetres).
  4. Send each individual weather reading to a downstream service (for example, store the result in a database).
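
A rough one-off version of those steps might look like the following. The file name, the CSV layout ("station,rainfallInMillimetres"), and the final println (standing in for a database write) are all assumptions made for this example.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class WeatherImport {
    public static void main(String[] args) throws IOException {
        for (String line : Files.readAllLines(Paths.get("weather-readings.csv"))) { // 1. read each line
            String[] parts = line.split(",");                                       // 2. parse the line
            String station = parts[0];
            double rainfallInMillimetres = Double.parseDouble(parts[1]);
            double rainfallInCentimetres = rainfallInMillimetres / 10.0;            // 3. convert mm to cm
            System.out.println(station + ": " + rainfallInCentimetres + "cm");      // 4. send downstream
        }
    }
}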

We could quite easily deliver something that meets this exact use case, but this pattern really appears multiple times throughout a project. For instance, we may work over the same source data multiple times but need to apply different changes to it (maybe rainfall has to be in nanometres for one use case). Another common deviation is that the data comes from a different source, but we want to apply the same transformations over it. So the question arises: can we generalise this pattern into something more reusable?

If you wish, you can skip to the end of this article to see the full implementation and example.

The Architecture

The goal of this article is to end up with a generic framework that lets us define a data transformation pipeline. It should take any data input, apply any set of transformations over this data, and then perform some final action on the result of the pipeline. It is also really important that we make use of the Java type system to provide type safety over our transformations (we should always know the output type of the previous transformation, in order to know whether the transformation we are adding can work on the data in its current shape).

In order to generalise this pattern, we need to define what a pipeline over a data source consists of (Figure 1).

Figure 1: a data transformation pipeline

Data Input: all our pipelines start with some input source. This could be a file, some in-memory data structure, or possibly an infinite stream.

Data Transformation: a data transformation stage should take a source type S, and return an output type O, where O is the result of applying the transformation function on S. We also want this to use Java's type system to give us type safety on our transformations (we should always be able to verify at compile time that a transformation is correct for its stage in the pipeline). If the transformation expects an input of type String and produces an output of type Integer, we should only be able to add it to the pipeline at a point where the previous stage produces a String output.

Data Sink: this is the final stage of the pipeline. It should take the result of the data input, plus any transformations, and perform some final action over the data. This can be thought of as a transformation with no return type. We also want this to be type safe, so if the final action requires a String input, we must make sure the result of our Data Input (and any subsequent transformations) produces a final type of String to feed to our data sink.

With this structure, we will now implement our data transformation pipeline.

Stage 1: Data Input

In order to handle data input, we need an abstraction that can present each starting message from our data source. For instance, we could read each line from a data file as an individual input to the pipeline, in String format. Luckily, Java already has an interface we can use as our starting point for this: Iterable<T>.

Java’s Iterable<T> represents a (possibly infinite) sequence of items of type T. The interface forces us to provide an Iterator<T> object. An Iterator<T> has two main functions: to identify if there is another message to read, and to get the next message in the stream of input messages.
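
As a quick reminder of the shape of these interfaces, consuming an Iterable<T> by hand looks like this:

import java.util.Arrays;
import java.util.Iterator;

public class IterableDemo {
    public static void main(String[] args) {
        // Any collection is already an Iterable<String>
        Iterable<String> source = Arrays.asList("first", "second", "third");

        Iterator<String> iterator = source.iterator();
        while (iterator.hasNext()) {          // is there another message to read?
            String message = iterator.next(); // get the next message
            System.out.println(message);
        }
    }
}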

If we rely on the Iterable<T> interface being the input to our data pipeline, any user of our pipeline can implement this interface in order to provide us with a source of data. This makes use of objects built into the Java framework, meaning our pipeline becomes easier to adopt, as we don't force our consumers to write adapters to put data into the format our pipeline expects (all collections in Java already implement this interface, meaning they immediately work as a source to our pipeline with no custom logic required).

For instance, if we wanted to provide something that reads each line of a local file individually, and presents each line as a String, we could implement the following class as a source of data for our pipeline (Figure 2).

Figure 2: an example file line reader class using the Iterable interface.
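
The original class is embedded as a gist; a minimal sketch of such a reader, with error handling deliberately simplified (IO failures are rethrown as unchecked exceptions, and the reader is closed once the last line has been read), might look like this:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class FileLineReader implements Iterable<String> {

    private final Path path;

    public FileLineReader(String path) {
        this.path = Paths.get(path);
    }

    @Override
    public Iterator<String> iterator() {
        final BufferedReader reader = open();
        return new Iterator<String>() {
            // read one line ahead so hasNext() can answer cheaply
            private String nextLine = readLine(reader);

            @Override
            public boolean hasNext() {
                return nextLine != null;
            }

            @Override
            public String next() {
                if (nextLine == null) {
                    throw new NoSuchElementException();
                }
                String current = nextLine;
                nextLine = readLine(reader);
                return current;
            }
        };
    }

    private BufferedReader open() {
        try {
            return Files.newBufferedReader(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private String readLine(BufferedReader reader) {
        try {
            String line = reader.readLine();
            if (line == null) {
                reader.close(); // no more lines: release the file handle
            }
            return line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}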

If we expect the consumer of our pipeline to provide us an Iterable<T> for our data source, we then need to create the first class of our pipeline: something that provides access to that Iterable (Figure 3).

Figure 3: The Data Source class of our pipeline.
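
Again, the original code lives in a gist; based on the description below, a minimal sketch of the class could be as simple as:

import java.util.Iterator;

// Wraps the user-supplied Iterable<T> and, in doing so, captures the
// starting type T of the pipeline.
public class DataSource<T> {

    private final Iterable<T> input;

    public DataSource(Iterable<T> input) {
        this.input = input;
    }

    // expose the underlying iterator so the next stage can pull messages from it
    public Iterator<T> iterator() {
        return input.iterator();
    }
}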

As you can see, our DataSource class takes in an Iterable<T> and captures this information in the class's type parameter T. This means the DataSource can provide type information for the starting type of the data being fed into the pipeline. If we were to pass the FileLineReader as our DataSource, we would have a source of type String.

With this, the next stage is to implement the capability to provide transformations over the data.

Stage 2: Transformations

Now we have a data source, we want to be able to safely apply transformations to that data. To make these type safe, and make best use of Java's type system, we need to capture the input type and output type of a transformation. With this information we can enforce that the next transformation in the pipeline always accepts as its input the output type of the previous transformation (Figure 4).

Figure 4: The input type must match the output of the previous stage in the pipeline.

In order to provide this, our transformation class will need to capture some information:

  • The input type <T>.
  • The output type <D>, which is the result of applying the transformation to the input <T>.
  • The source of the input T (which is the previous stage's Iterator<T>).

With this information, we can implement a transformation using the same Iterator<D> interface: we execute the transformation by taking the next element of our input, applying the transformation function, and producing an element of type D (Figure 5).

Figure 5: the data transformation stages.
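
The gist isn't reproduced here; a minimal sketch of this stage, assuming an ApplyFunction interface and a Transformation class that wraps the previous stage's iterator, might be:

import java.util.Iterator;

// ApplyFunction is the consumer-supplied mapping from an input T to an output D.
interface ApplyFunction<T, D> {
    D apply(T input);
}

// Transformation wraps the previous stage's iterator and applies the function
// lazily as the pipeline is pulled.
class Transformation<T, D> implements Iterator<D> {

    private final Iterator<T> input;            // the previous stage (source or transformation)
    private final ApplyFunction<T, D> function; // the consumer-supplied mapping

    Transformation(Iterator<T> input, ApplyFunction<T, D> function) {
        this.input = input;
        this.function = function;
    }

    @Override
    public boolean hasNext() {
        return input.hasNext();
    }

    @Override
    public D next() {
        // pull the next message from the previous stage and transform it
        return function.apply(input.next());
    }
}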

In the transformation implementation there are 2 key areas:

  • ApplyFunction: something the consumer of our pipeline needs to implement in order to transform an input element T into a desired output element D.
  • Transformation: this class allows us to use the underlying iterator pattern to control the execution of the ApplyFunction. When the transformation is iterated (its next method is called), you can see that it takes the input Iterator<T> (from either the original DataSource or a previous transformation), gets the next message T, executes the ApplyFunction on it, then returns the resulting type D.

With this pattern in place, we can now define as many transformations as we like in our pipeline with complete type safety. Our final stage is then to provide the Sink interface.

Stage 3: Sinks

In order to execute our pipeline, we need a final stage that takes the Iterator<T> from the last transformation stage and forces it to execute. We also want to perform some final action over the result of the pipeline. For instance, we may want to store or print the result of the data transformation. In order to provide this functionality, we need to implement a Sink<T> (Figure 6).

Figure 6: the data sink stage.
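
As before, a minimal sketch of this stage (an ApplyResult interface plus a Sink class that drains the final iterator) might look like this:

import java.util.Iterator;

// ApplyResult consumes a final value of the pipeline.
interface ApplyResult<T> {
    void apply(T result);
}

// Sink drains the last iterator in the chain, which drives the whole pipeline
// to execute one message at a time.
class Sink<T> {

    private final Iterator<T> input;      // the last stage of the pipeline
    private final ApplyResult<T> action;  // the consumer-supplied final action

    Sink(Iterator<T> input, ApplyResult<T> action) {
        this.input = input;
        this.action = action;
    }

    public void execute() {
        while (input.hasNext()) {
            action.apply(input.next()); // pull, transform (upstream), then act
        }
    }
}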

We again have 2 key areas here:

  • ApplyResult: takes in the final type T of the pipeline, and returns void. This is the function that handles the result of the pipeline's execution for a single message.
  • Sink: this class provides the ability to execute the pipeline. You can see that, by calling execute, we take the final Iterator (which causes the DataSource's Iterable to produce the next message), apply each transformation in order, and apply the final ApplyResult function.

We have now built a fully customisable in-memory data pipeline, capable of taking any input source (anything that implements the built-in Java Iterable interface) and applying 1 to N data transformations to it.

Example and Full Implementation Code

To see how we might transform a List of Strings, turning them into Integers, then Doubles, and then printing them out, take a look at the following example (Figure 7).

Figure 7: full data transformation pipeline with example.
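
The complete gist isn't reproduced here, but wiring the sketches from the earlier sections together for that example might look roughly like this (the explicit constructor calls, and the arbitrary Integer-to-Double conversion, stand in for however the original code chains the stages):

import java.util.Arrays;
import java.util.List;

public class PipelineExample {
    public static void main(String[] args) {
        // Data input: a plain List already satisfies Iterable<String>
        List<String> lines = Arrays.asList("1", "2", "3");
        DataSource<String> source = new DataSource<>(lines);

        // Transformations: String -> Integer -> Double. Each stage's input type
        // is checked at compile time against the previous stage's output type.
        Transformation<String, Integer> toInt =
                new Transformation<>(source.iterator(), Integer::parseInt);
        Transformation<Integer, Double> toDouble =
                new Transformation<>(toInt, i -> i * 1.5);

        // Sink: print each result, forcing the pipeline to run
        Sink<Double> sink = new Sink<>(toDouble, System.out::println);
        sink.execute();
    }
}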

I hope this is a useful insight into how the use of Java's Iterable<T> interface can provide powerful, type-safe functionality to transform streams of data.

Disclaimer: this is very similar to how the underlying Java Streams API works. If this is something you need to do, investigate that API first; best not to reinvent the wheel if you don't need to! :)

If you would like to find out more, please feel free to contact me.


Joe Honour

Software Engineer, with interests in Distributed Computing.