Updated: January 24, 2023

Suitability of Apache NiFi as a Metrics Collection Tool

Pavel Zverev

CCO, Software Engineer

"If you can't measure it, you can't manage it"

Peter Drucker, Father of Management Thinking

Metrics are essential for a successful company. By a metric, we mean a measure used to assess and compare a company's performance. Metrics are usually combined into a dashboard that is regularly reviewed by management or analysts.

There are several use cases where a good metrics program is practically indispensable:

The data behind these metrics lives in different systems, databases, and records. So how do you define, collect, and monitor them? This article explores the capabilities of Apache NiFi as a means of implementing a metrics collection system.

Tool overview

Apache NiFi (hereinafter "NiFi") is a system for implementing ETL (Extract, Transform, Load) processes. It is maintained by the Apache Software Foundation and distributed under the open-source Apache 2.0 license. NiFi collects data from various sources, transforms it "on the fly", and sends it to a receiving system. It has a wide range of integration capabilities; the source or recipient of data can be:

To describe processes, NiFi offers a web-based designer that lets you model receiving, transforming, and sending data as blocks and the links between them. In other words, NiFi implements the flow-based programming (FBP) concept. The interface looks like this:

NiFi describes processes with the following components:

FlowFile – a data packet that carries content and its metadata in the form of attributes.

Processor – a handler that can generate, route, and transform flow files, or perform other actions. At the time of writing, the current version shipped 288 processor types. The following are worth noting separately:

Connection – transfers flow files between processors.

Process group – a group of processors that form a single process.

Input port / Output port – used to pass data into and out of process groups, for example between different project teams.

Each individual block of a process group, as well as process groups themselves, can be stopped and started, which gives control over the running process. NiFi is installed on-premise and supports clustering (via Apache ZooKeeper) for fault tolerance and horizontal scalability. Versioning of changes is supported through the Apache NiFi Registry, so process definitions can be saved and rolled back.

Practical task

As part of developing a metrics collection system, the Mad Devs team was tasked with integrating several systems: Jira, GitLab, and SonarQube. Each of them exposes a REST-based HTTP interface. For Jira there is already an article, "Automating Jira Analytics with Apache NiFi", so we decided to cover integration with GitLab using a publicly accessible repository. The following requirements were defined for the practical task:

Implementation of a practical task

Conventions

To interact with GitLab, we will use the GitLab Pipelines API v4 and the current Apache NiFi release, 1.14.0, in the form of a Docker container. "AutowareAuto" will serve as the publicly available GitLab project.

Installation

Installation and launch:

# Pull the image and start the container
docker pull apache/nifi
docker run --name nifi -p 8443:8443 -d apache/nifi:latest

# Stop and remove the container when finished
docker stop nifi
docker rm nifi

The system will be available at https://localhost:8443/nifi (it may take a couple of minutes for the container to fully start) and will ask you to log in.

A username and password are generated when the system starts for the first time. To find them, open a shell inside the container:

docker exec -ti nifi /bin/bash

Once in the container shell, grep the logs/nifi-app.log file for the phrase 'Generated':

grep Generated logs/nifi-app.log

We get:

Generated Username [username]
Generated Password [password]

To exit the shell, execute the command:

exit

Enter these credentials (without the brackets) on the login page at https://localhost:8443/nifi/login.

Process description

The first step is to add a new process group and name it "Build Status Metric":

Double-click the process group to switch to its local context. To access the GitLab Pipelines API, we need to build the following URL:

https://gitlab.com/api/v4/projects/{PROJECT_ID}/pipelines?page={CURRENT_PAGE}&per_page={PER_PAGE}

Where:

• PROJECT_ID – the ID of the GitLab project;

• CURRENT_PAGE – the number of the page to request;

• PER_PAGE – the number of items returned per page.
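
As a quick sanity check, the same request can be made from the command line; the project ID below is the one used later in this article, and page 1 with 20 items per page is just an example:

curl "https://gitlab.com/api/v4/projects/8229519/pipelines?page=1&per_page=20"

The response is a JSON array of pipeline objects, which is exactly what the flow below will process.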

To start a process in NiFi, an initiating trigger must be defined. The "GenerateFlowFile" processor suits this purpose: it generates a flow file on a timer. Let's place the "GenerateFlowFile" processor and define the following properties:

• Name "Trigger: dispatch GitLab project ID".

• Run schedule "60 seconds" - this can be adjusted so that flow files are not generated too often.

• In the properties add the attribute "PROJECT_ID" with the value "8229519" (ID of the selected GitLab project).

We have now defined an initiating trigger that generates a flow file with the project ID as a metadata attribute. Since it is not possible to get all the data in one request, let's add a processor that additionally sets pagination attributes. To do this, place the "UpdateAttribute" processor and define the following settings for it:
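
A minimal sketch of possible settings; the attribute names must match those used in the request URL, while the name and initial values are assumptions and can be adjusted:

• Name "Set pagination attributes" (an arbitrary, descriptive name).

• In the properties add the attribute "CURRENT_PAGE" with the value "1".

• In the properties add the attribute "PER_PAGE" with the value "20".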


Let's connect the processors with the "success" link (by dragging the arrow icon from the first processor to the second):

At this stage, we have all the data needed to make an HTTP request to the GitLab API. The most capable processor for this in NiFi is "InvokeHTTP". Let's place this processor and define the following settings:

https://gitlab.com/api/v4/projects/${PROJECT_ID}/pipelines?page=${CURRENT_PAGE}&per_page=${PER_PAGE}

Make sure that "Ignore response's content" is set to "false".

Connect the processors with the "success" link:

If there are elements to return, the GitLab API returns them as a JSON array. If there are no elements, an empty array is returned - in that case, no further action is required. The length of the array can be computed with the "EvaluateJsonPath" processor. Let's place this processor and define the following settings:
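
A minimal sketch of possible settings, assuming the attribute is named "ARRAY_LENGTH" (any name works as long as the routing rule below refers to the same one):

• Destination "flowfile-attribute", so that the result is written to an attribute rather than replacing the content.

• In the properties add the attribute "ARRAY_LENGTH" with the JsonPath expression "$.length()".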

After that, we connect the processors with the "Response" link:

Now we route the flow files onward only if the length of the array is greater than zero. Let's place the "RouteOnAttribute" processor and define the following settings for it:
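
A sketch of possible rules, assuming the "ARRAY_LENGTH" attribute from the previous step; the rule names "empty" and "filled" correspond to the relationships referenced further below:

• Routing Strategy "Route to Property name".

• Rule "empty" with the expression "${ARRAY_LENGTH:equals(0)}".

• Rule "filled" with the expression "${ARRAY_LENGTH:gt(0)}".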

Configure automatic termination of flow files for the "empty" and "unmatched" relationships.

Then we connect the processors with a "matched" link:

If the array arrives empty, the "empty" rule stops the flow files from going any further. If the length of the array is positive, two kinds of actions are needed: requesting the next page of data, and processing the data already received.

First, let's loop the data retrieval using one more "UpdateAttribute" processor that increments the page number. We place the processor and define the following settings for it:
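
A minimal sketch of a possible setting, assuming the "CURRENT_PAGE" attribute defined earlier; the NiFi Expression Language increments the page number so that the next request fetches the next page:

• Name "Increment page" (an arbitrary, descriptive name).

• In the properties set the attribute "CURRENT_PAGE" to the value "${CURRENT_PAGE:plus(1)}".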

We also connect the routing, pagination-update, and GitLab-request processors as shown in the screenshot below:

Now let's continue working with the data we already have. The array returned by the GitLab API contains elements with the following structure:

[
  {
    "id": 381036067,
    "project_id": 8229519,
    "sha": "2b5d26934666bb2f7cec933e4542b6b5012c26f6",
    "ref": "1352-add-offset-to-2d-ground-truth",
    "status": "success",
    "source": "push",
    "created_at": "2021-10-01T16:22:40.870Z",
    "updated_at": "2021-10-01T17:24:00.798Z",
    "web_url": "https://gitlab.com/autowarefoundation/autoware.auto/AutowareAuto/-/pipelines/381036067"
  }
]

We can see that there is more data than required. To strip the redundant fields, first split the flow file containing the array into small flow files, one per element. Let's do this with the "SplitJson" processor. We place the processor and define the following settings for it:
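
A minimal sketch of a possible configuration; since the flow file content is a top-level JSON array, a JsonPath expression pointing at that array is enough (the remaining properties can keep their defaults):

• JsonPath Expression "$".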

Connect the processors with the "filled" link (just as we did for the pagination-update processor):

Now several flow files will be generated, each containing a separate pipeline data object. Let's finish removing the redundant fields using the "JoltTransformJSON" processor, which applies the Jolt transformation notation to the contents of flow files (there is an online sandbox for experimenting with Jolt). Let's place the processor and define the following settings for it:

[
  {
    "operation": "shift",
    "spec": {
      "id": "id",
      "sha": "sha",
      "status": "status",
      "source": "source"
    }
  }
]
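
Applied to the pipeline object shown above, this shift specification keeps only the four listed fields, producing roughly the following output:

{
  "id": 381036067,
  "sha": "2b5d26934666bb2f7cec933e4542b6b5012c26f6",
  "status": "success",
  "source": "push"
}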

Then we connect the processors with a "split" link:

At this stage, all that remains is to send the transformed data to the receiving system. To keep the experiment simple, let's implement a basic web server on the Node.js platform with the Express framework:

npm init

{
  "name": "test-server",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "author": "",
  "license": "ISC",
  "dependencies": {
    "express": "^4.17.1"
  }
}

npm install

const express = require('express')
const app = express()
const address = "0.0.0.0";
const port = 3000

// Parse incoming JSON request bodies (built into Express 4.16+,
// so the separate body-parser package is not needed)
app.use(express.json());

// Log the NiFi attributes passed as headers and the pipeline data in the body
app.post('/', (request, response) => {
    console.log("PROJECT:", request.headers.header_current_project);
    console.log("PAGE:", request.headers.header_current_page);
    console.log(request.body);

    response.send('');
})

app.listen(port, address, () => {
    console.log(`Example app listening at http://localhost:${port}`);
})

node index.js

Add one more "InvokeHTTP" processor to NiFi and define the following settings for it:
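
A minimal sketch of possible settings; since NiFi runs inside a Docker container, "localhost" would point at the container itself, so the URL below assumes the Docker host is reachable as host.docker.internal (replace it with your host's address if needed):

• HTTP Method "POST".

• Remote URL "http://host.docker.internal:3000/".

• Attributes to Send - a regular expression such as "header_current_.*", assuming attributes with the names expected by the test server were added to the flow files earlier.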

Connect the processors with the "success" link:

Open the context menu by right-clicking on an empty space in the workspace:

Items "Start"/"Stop" should lead to the start and stop of data transfer. The item "Empty all queues" should empty the queues that could have accumulated during stops. The item "Download flow definition" will save the current process as a JSON file. It can be loaded when adding a process group:

Configurator competencies and qualities

Let's consider the competencies and qualities required of the user who will be responsible for configuring and maintaining the system:

Key competencies:

Optional competencies:

Conclusion

When your company is already swimming in data, metrics collection helps you focus only on the data that is useful for improving processes and achieving business goals. We considered Apache NiFi for this purpose. The process definition built above illustrates its capabilities as a tool for a metrics collection system. The system is suitable because it provides the functionality and characteristics needed to implement such processes.