Apache NiFi: Automating tasks using NiPyAPI nipyapi

Apache NiFi: Automating tasks using NiPyAPI

Apache NiFi has a powerful web-based interface which provides a seamless experience between design, control, feedback, and monitoring. Sometimes however, you want to automate tasks instead of doing them manually using the UI. This does not only allow you to perform the tasks a lot quicker but it also helps make them more reproducible. It allows you to incorporate tasks in for example a CI/CD system without requiring human intervention. A NiFi feature to help you automate tasks is its powerful API. In order to more easily use this API from Python, NiPyAPI is available. In this blog post I’ll describe some things you can do with NiPyAPI, some challenges I encountered and how I fixed them.

Apache NiFi: Automating tasks using NiPyAPI nipyapi

You can find my sample code here. Please mind that this code is provided as an example only and should not be used in production without confirming it works well for your environment and use-case.

Getting started with NiPyAPI

TLS

Before you can do anything, you need to specify several properties which are used to connect to NiFi. These include certificate information (when using TLS) and user authentication details. This can be challenging and require some work. See for example here. You will need to create keys, keystores, register them with NiFi, make them available to your Python code. This is also a barrier for other developers to start experimenting with the NiFi API.

You can also decide it is safe enough to not explicitly validate the NiFi public key by using the following in your Python code:

urllib3.disable_warnings()
nipyapi.config.global_ssl_verify = False
nipyapi.config.nifi_config.verify_ssl = False

Your connection will still be encrypted but a man in the middle attack could fool you since the certificate and hostname are not verified. However, since I’m checking the certificate regularly by using a browser to access the NiFi webinterface (which uses the same certificate as the API) and the hostname is fixed in my script, the chances of that happening are negligible. I prefer an easy way to use the API and like to have my environment up and running quickly and easily.

Logging

In order to obtain some useful logging, I’ve added the following lines to my Python script:

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
logging.getLogger('nipyapi.utils').setLevel(logging.INFO)
logging.getLogger('nipyapi.security').setLevel(logging.INFO)
logging.getLogger('nipyapi.versioning').setLevel(logging.INFO)

Next you can use log.info for example to log messages.

Automate tasks

Authenticating to the API

I was having some difficulties when logging into NiFi at a customer. Sometimes it did not work after a single try. This can be devastating to automated processes. I build in some retries in my Python code to make it more robust.

#First specify connection details
nipyapi.config.nifi_config.host = 'https://NIFIHOST/nifi-api'
nipyapi.config.default_nifi_username = 'NIFIUSER'
nipyapi.config.default_nifi_password = 'NIFIPASSWORD'
#Next a login procedure which tries 3 times with a delay 
def nifi_login():
    loggedin = False
    i = 0
    attempts = 3
    delay = 10
    while (i < attempts) and (loggedin is False):
        log.info("Logging in. Attempt: " + str(i))
        loggedin = nipyapi.security.service_login(bool_response=True)
        if loggedin is False:
            time.sleep(delay)
            i = i + 1
    return loggedin

Stopping and starting controller services and process groups

When updating a process group to a new version, processes need to be stopped, queues need to be empty and controller services need to be turned off. If you do this manually, it takes quite some time. Especially the queues and controller services.

The below procedure disables controller services and process groups.

def disable_proc(app):
    app_pg_groups = nipyapi.canvas.list_all_process_groups(name_to_id[app])
    # Disable controllers, process groups and empty queues
    for app_pg_group in app_pg_groups:
        log.info("Disabling process group: " + str(app_pg_group.component.name))
        nipyapi.canvas.schedule_process_group(app_pg_group.id, False)
        log.info("Disabling controllers for: " + str(app_pg_group.component.name))
        acse = nipyapi.nifi.ActivateControllerServicesEntity()
        acse.state = "DISABLED"
        acse.id = app_pg_group.id
        nipyapi.nifi.FlowApi().activate_controller_services(id=app_pg_group.id, body=acse)

Why did I use the low level SDK here (nipyapi.nifi) and not the high level SDK (nipyapi.flow)? I got permission denied errors when using the high level SDK. Apparently I did not have sufficient permissions. By using the more fine grained low level SDK, many more things become possible. As a general recommendation, first try to do things with the easy to use high level SDK and if you encounter issues, try the low level SDK. The low level SDK is harder to use but sometimes requires less permissions since it is more specific.

Empty queues

In order to empty a queue, you require the ‘modify the data’ permission. Suppose you do not have this  permission and still want to automate emptying queues, can you? Yes, you can!

The trick is to set the flowfile expiration on the connection to 1 second and wait until the messages are cleared. Then you can restore the original setting to the connection and the queue will be empty. You can automate this. The below sample can be made more robust by but for example checking if the queue is empty every 3 seconds and if not wait a bit more.

def empty_queues(app):
    app_pg_groups = nipyapi.canvas.list_all_process_groups(name_to_id[app])
    # Empty queues
    for app_pg_group in app_pg_groups:
        log.info("Remove all messages on connection queues of: " + str(app_pg_group.component.name))
        for con in nipyapi.canvas.list_all_connections(app_pg_group.id,True):
            if (con.status.aggregate_snapshot.queued_count is not '0'):
                log.info("Empty queue: " + str(con.id))
                mycon = nipyapi.nifi.ConnectionsApi().get_connection(con.id)
                oldexp = mycon.component.flow_file_expiration
                mycon.component.flow_file_expiration = '1 sec'
                nipyapi.nifi.ConnectionsApi().update_connection(con.id, mycon)
                time.sleep(10)
                mycon.component.flow_file_expiration = oldexp
                nipyapi.nifi.ConnectionsApi().update_connection(con.id, mycon)

Cleaning up

When developing, your NiFi environment can become polluted with for example old parameters or controller services you do not use anymore. Wouldn’t it be nice if you can easily find them in order to evaluate whether you can remove them?

Finding unused controller services

You can find unused controller services with a method like below.

def get_unused_controller_services(app):
    app_pg = nipyapi.canvas.get_process_group(identifier_type='id',identifier=name_to_id[app])
    # Enable controllers and process groups
    cs = nipyapi.nifi.FlowApi().get_controller_services_from_group(id=app_pg.id, include_ancestor_groups=False, include_descendant_groups=True).controller_services
    for service in cs:
        if len(service.component.referencing_components)==0:
            print("Controller service " + str(service.component.name) + " has no referencing components. Parent pg: "+id_to_name[service.component.parent_group_id])

After identifying them, you should manually check if they can truly be removed and then you can clean them up.

Finding unused parameters

Parameters can at one point be used but after for example refactoring (moving them to a different parameter context or implementing inheritance for example), they might not be needed anymore. Also when parameters are introduced, newly introduced sensitive values need to be set manually. How can you find them without manually checking? This can be done with a method like below.

def get_unused_parameters(app):
    app_pg_groups = nipyapi.canvas.list_all_process_groups(name_to_id[app])
    # Enable controllers and process groups
    for pg in app_pg_groups:
        if (pg.parameter_context is not None):
            #print("Processing: "+ str(pg.parameter_context.component.name))
            param_context = nipyapi.nifi.ParameterContextsApi().get_parameter_context(pg.parameter_context.component.id)
            params=param_context.component.parameters
            for param in params:
                if len(param.parameter.referencing_components) == 0:
                    print("Context: " + str(pg.parameter_context.component.name)+". Parameter: " + str(param.parameter.name) + " is not being used")
                if param.parameter.sensitive is True and param.parameter.value is None:
                    print("Context: " + str(pg.parameter_context.component.name)+". Sensitive parameter " + str(param.parameter.name) + " has no value set")
         else:
             print("Process group: " + str(pg.component.name) + " has no parameter context assigned")

Finally

By using NiPyAPI in combination with some Python scripting, you can quite easily make your NiFi experience a lot better! Not only is this an enabler for automating deployments but it can also help reduce the effort required for cleaning up an environment of components which are no longer being used.

One Response

  1. oyeyemi February 5, 2024

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.