Apache NiFi: Forwarding HTTP headers

Maarten Smeets

Apache NiFi can be used to expose various flavors of web services. Using NiFi this way provides benefits such as quick development in a GUI and, of course, data provenance: you know who called you with which data and where that data went. NiFi is very scalable, delivery can be guaranteed, and NiFi can apply back-pressure if a backend system cannot handle requests as quickly as they are offered. Exposing web services through NiFi can have additional benefits such as service virtualization (decoupling). When exposing HTTP(S) web services, a common requirement is to pass through HTTP headers. This blog post is about how you can do that using the NiFi processors ListenHTTP, InvokeHTTP, HandleHttpRequest and HandleHttpResponse. I’ve used the environment which is described here.

Forwarding HTTP headers using Apache NiFi

ListenHTTP processor

The ListenHTTP processor can act as an HTTP server and receive requests. However, this processor only receives messages and returns a status code (read here). It cannot reply with a body and you cannot provide a status message. This processor is typically used for fire-and-forget scenarios. In the ListenHTTP processor properties, you can specify a base path. You cannot use multiple ListenHTTP processors on different paths on the same port, since this will cause a port conflict. Using a regular expression, you can specify which headers are captured in FlowFile attributes. If you specify .*, every header will be captured.

HTTP Headers to receive as Attributes

To call the service, I used the following curl command:

curl -i -H "X-Custom-Header: test" -d "testdata" http://localhost:8123

My custom header is captured in an attribute, as you can see in the log file (written by the LogAttribute processor).

Custom header is forwarded

When you want to forward specific headers to another endpoint, you can use the InvokeHTTP processor. This processor has a property called “Attributes to Send”. In this property, you can specify a regular expression which determines the attributes to forward as HTTP headers. Be careful here: if you specify .*, you are adding NiFi information to the call and you will create new HTTP headers which were not present in the original call, such as the uuid. It helps to have a naming convention for custom HTTP headers which need to be forwarded, for example prefixing them with X-, so you can use a regular expression like X-.* to get the desired result. If you only want to forward specific headers (rather than select them with a regular expression), you can add dynamic properties to the processor, as in the screenshot below.

Custom header as dynamic property on the InvokeHTTP processor
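The difference between .* and X-.* can be tried outside NiFi. The sketch below is plain Python, not NiFi code: it assumes the regular expression has to match the whole attribute name, and the attribute names and values are made up for illustration. It models only the regular expression selection, not any attributes that InvokeHTTP may skip on its own.

```python
import re


def select_headers(attributes, pattern):
    """Return the attributes whose full name matches the pattern."""
    regex = re.compile(pattern)
    return {name: value for name, value in attributes.items()
            if regex.fullmatch(name)}


# Hypothetical FlowFile attributes: one captured header plus attributes
# that NiFi adds itself (values are placeholders).
attributes = {
    "X-Custom-Header": "test",
    "uuid": "placeholder-uuid",
    "filename": "placeholder-filename",
}

# .* selects everything, including the NiFi-specific attributes.
print(sorted(select_headers(attributes, r".*")))
# X-.* selects only the attributes that follow the naming convention.
print(sorted(select_headers(attributes, r"X-.*")))
```

With the X- convention, only X-Custom-Header is selected, while .* also selects uuid and filename.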

HandleHttpRequest and HandleHttpResponse

If you need to reply with a response which contains more than what ListenHTTP allows, you can use the HandleHttpRequest and HandleHttpResponse processors. A handy feature of the HandleHttpRequest processor is that HTTP headers are prefixed by the processor with “http.headers.”:

HandleHttpRequest captures headers in attributes which are prefixed

This makes it easy to identify which attributes are captured HTTP headers and which attributes were added by NiFi. Using the identified headers in the InvokeHTTP processor, however, is less straightforward: because the attributes have been prefixed, they need to be renamed if you want the outgoing InvokeHTTP request to carry the same header names as the original request.

If you specify “http\.headers\..*” in the “Attributes to Send” property of the InvokeHTTP processor, you will end up with HTTP headers like http.headers.X-Custom-Header instead of X-Custom-Header. The UpdateAttribute processor requires you to specify individual attributes and is thus not suitable as a generic solution for automatically renaming multiple attributes whose names you might not know before the request arrives.

In order to rename attributes you can use the ExecuteScript processor and create a small Groovy script like below (inspired by this).

def flowFile = session.get()
if (flowFile != null) {
    // Prefix that HandleHttpRequest puts in front of every captured header
    def prefix = "http.headers."

    // Attributes as they are when the script starts
    def attmap = flowFile.getAttributes()

    for (entry in attmap) {
        if (entry.key.startsWith(prefix)) {
            // Strip only the leading prefix to restore the original header name
            def mykey = entry.key.substring(prefix.length())
            flowFile = session.putAttribute(flowFile, mykey, entry.value)
            flowFile = session.removeAttribute(flowFile, entry.key)
        }
    }
    session.transfer(flowFile, REL_SUCCESS)
}
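To see what this renaming does to a set of attributes without running NiFi, the same logic can be sketched on a plain dictionary. This is a standalone Python illustration, not something to paste into ExecuteScript; the function name strip_header_prefix and the sample attributes are assumptions made for the example.

```python
PREFIX = "http.headers."


def strip_header_prefix(attributes, prefix=PREFIX):
    """Return a copy of attributes with the leading prefix removed from matching keys."""
    renamed = {}
    for name, value in attributes.items():
        if name.startswith(prefix):
            # Captured header: restore the original header name.
            renamed[name[len(prefix):]] = value
        else:
            # Any other attribute is kept unchanged.
            renamed[name] = value
    return renamed


# Hypothetical attributes as they might look after HandleHttpRequest.
attributes = {
    "http.headers.X-Custom-Header": "test",
    "http.headers.Accept": "*/*",
    "http.method": "POST",
}

print(strip_header_prefix(attributes))
# {'X-Custom-Header': 'test', 'Accept': '*/*', 'http.method': 'POST'}
```

After the rename, a pattern such as X-.* matches X-Custom-Header again. Note that if an attribute with the unprefixed name already exists, one of the two values is overwritten, both in this sketch and in the Groovy script.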

I’ve implemented a flow like this (you can download it here). To use it, you will probably need to change the hostname in the InvokeHTTP processor, since this sample contains a fixed IP address.

Overview of the entire flow including the script to rename attributes

First I receive the request in the HandleHttpRequest. Next I’ve used an ExecuteScript processor to execute my Groovy script. I’ve used wrk to perform a quick benchmark to determine how this processor affects performance.

Command used:

wrk -t12 -c50 -d30s http://localhost:8123

Without ExecuteScript:

Running 30s test @ http://localhost:8123
  12 threads and 50 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    50.06ms   37.47ms 195.99ms   62.55%
    Req/Sec    82.19     27.48   220.00     66.83%
  29560 requests in 30.05s, 3.44MB read
Requests/sec:    983.55
Transfer/sec:    117.18KB

With ExecuteScript:

Running 30s test @ http://localhost:8123
  12 threads and 50 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    63.05ms   48.22ms 471.12ms   73.14%
    Req/Sec    68.07     27.14   171.00     63.98%
  24375 requests in 30.07s, 2.84MB read
Requests/sec:    810.64
Transfer/sec:     96.58KB

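As you can see, using an ExecuteScript processor adds some overhead. In my case, the average latency went up by around 13ms per request (from 50.06ms to 63.05ms) and throughput dropped from about 984 to about 811 requests per second.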
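The overhead can be derived directly from the two wrk runs above. The short Python calculation below uses only the average latency and the Requests/sec figures from those outputs; the function names are chosen for this example.

```python
# Figures copied from the two wrk runs above.
latency_without_ms = 50.06
latency_with_ms = 63.05
rps_without = 983.55
rps_with = 810.64


def added_latency_ms(before, after):
    """Difference in average latency, in milliseconds."""
    return after - before


def throughput_drop_percent(before, after):
    """Relative drop in requests per second, as a percentage."""
    return (before - after) / before * 100


print(f"{added_latency_ms(latency_without_ms, latency_with_ms):.2f} ms")    # 12.99 ms
print(f"{throughput_drop_percent(rps_without, rps_with):.1f} %")            # 17.6 %
```

These numbers come from a single 30-second run with 50 connections per configuration, so they indicate the order of magnitude and are not a precise measurement.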

After the headers were renamed by the script, I used InvokeHTTP to call the flow on the right, which just logs the received request and headers. In the InvokeHTTP processor, I set the “Attributes to Send” property to “X-.*”. In the HandleHttpResponse processor, I configured the reply to include the X-.* headers as well. This is why we see X-Custom-Header both in the LogAttribute logging of my dummy service and in the response.

Custom header forwarded in a request and returned in a reply

Finally

Forwarding specific HTTP headers using Apache NiFi is relatively easy and customizable. It helps to use a naming convention for your HTTP headers so you can easily identify them, or to use a fixed list. There is no easy way, though, to forward all HTTP headers, since in the InvokeHTTP processor you need to explicitly specify the headers you want to forward using a regular expression or dynamic properties. If you need that functionality, you will have to write your own custom processor which takes the http.headers.* attributes created by HandleHttpRequest, renames them to their original names and sends them as HTTP headers.

It is harder to forward HTTP headers which have been captured in FlowFile attributes by the ListenHTTP processor. ListenHTTP creates attributes for HTTP headers which have the same name as the header itself. This makes it difficult to automatically distinguish the headers from other FlowFile attributes (you might not require this distinction though).

If you only need to forward entire HTTP requests, including headers, without much manipulation, a simpler proxy product like NGINX could of course also cover this.
