Apache NiFi: File extraction, validation, transformation and upload flow (Real use case)

The data flow described in this article makes use of the RemoveMetadata processor, developed from scratch in the previous publication, Apache Nifi: Validating File Transfers.

Starting Apache NiFi

Once the development is finished, we need a NiFi installation on which to build our workflow. The steps to start it in a Linux environment are available here; for Windows, they are as follows:

  1. Download Apache NiFi (available here)
  2. Unzip the file
  3. Access the bin folder
  4. Run run-nifi.bat
  5. Open http://localhost:8080/nifi in the browser.

Create Workflow

Configuration Reader Development

The properties of the XML file are read independently of the rest of the flow, so that the read runs only once and the values are stored in memory.

This flow reads one or more XML configuration files through the ListFile and FetchFile processors, which list and consume the files, respectively.

Subsequently, the value of each property is extracted and a key is defined to store it in memory. This is done using:

  • The EvaluateXQuery processor, configured with the XPath of the property to extract.
  • The UpdateAttribute processor, which creates a new attribute, name.property, whose value is the key under which the configuration property is stored.

Finally, the PutDistributedMapCache processor saves the properties in memory, using the DistributedMapCacheClientService controller service.
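As a rough illustration of what this reader flow does, the following Python sketch mimics it outside NiFi: it extracts property values from an XML document using path expressions and stores each one under a key in an in-memory map. The XML layout, the property names and the load_properties helper are hypothetical; in the flow itself this work is done by the EvaluateXQuery, UpdateAttribute and PutDistributedMapCache processors.

```python
import xml.etree.ElementTree as ET

# Hypothetical configuration file; the real XML layout is not shown in the article.
CONFIG_XML = """
<configuration>
    <antivirus><enabled>true</enabled></antivirus>
    <admin><email>admin@example.com</email></admin>
</configuration>
"""

# Cache key -> path of the property to extract (assumed names).
PROPERTY_PATHS = {
    "antivirus.enabled": "./antivirus/enabled",
    "admin.email": "./admin/email",
}


def load_properties(xml_text, property_paths):
    """Extract each configured property and return it as a key/value cache."""
    root = ET.fromstring(xml_text.strip())
    cache = {}
    for key, path in property_paths.items():
        node = root.find(path)
        # Skip properties that are missing or empty in the file.
        if node is not None and node.text is not None:
            cache[key] = node.text.strip()
    return cache


cache = load_properties(CONFIG_XML, PROPERTY_PATHS)
```

The dictionary here only stands in for the distributed map cache; it has none of its cluster-wide behaviour.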

Throughout the flow, LogMessage processors are added to write custom log traces. By default, they write to the nifi-app.log file.

Development of the rest of the NiFi flow

The last step, once we have all the necessary elements for this flow available, is to carry out its development in NiFi.

The flow starts by getting the files from local folders or via FTP connections, with the GetFile and GetFTP processors, respectively.

These processors are configured to run only on the primary node of the cluster, to avoid concurrent access errors. Right after they run, the load is balanced among all the nodes, since the success relationship is configured with round-robin load balancing.
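To make the balancing step concrete, here is a minimal Python sketch of round-robin distribution. It only illustrates the idea of handing each flowfile to the next node in turn; the node names and the distribute_round_robin helper are hypothetical, and NiFi performs this internally.

```python
from itertools import cycle


def distribute_round_robin(flowfiles, nodes):
    """Assign each flowfile to the next node in turn."""
    assignment = {node: [] for node in nodes}
    # cycle() repeats the node list indefinitely; zip stops at the last flowfile.
    for flowfile, node in zip(flowfiles, cycle(nodes)):
        assignment[node].append(flowfile)
    return assignment


assignment = distribute_round_robin(
    ["a.txt", "b.txt", "c.txt", "d.txt", "e.txt"],
    ["node-1", "node-2"],
)
```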

Subsequently, the properties stored in memory are read. This is done with the LookupAttribute processor, which adds a new attribute to the flowfile with the value obtained.

This processor uses the DistributedMapCacheLookupService controller service, which is in turn configured with a DistributedMapCacheClientService controller service. The latter is also used by the PutDistributedMapCache processor that writes the properties in memory.

The next step is to execute the validations and transformations on the file.

The first thing to do is to check, with the RouteOnAttribute processor, whether the associated property is set to true. This check is made before each group of processors, and the group is executed only if it is true.
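The gating logic can be sketched in a few lines of Python. The attribute names below are hypothetical, since the article does not list the real keys; the sketch only shows that a group runs when its flag attribute is 'true'.

```python
# Hypothetical mapping from processor group to the attribute that enables it.
GROUP_FLAGS = {
    "antivirus": "antivirus.enabled",
    "name_validator": "name.validator.enabled",
    "extensions": "extensions.enabled",
    "metadata": "metadata.enabled",
}


def enabled_groups(attributes):
    """Return the groups whose flag attribute is set to 'true'."""
    return [
        group
        for group, flag in GROUP_FLAGS.items()
        if attributes.get(flag, "").lower() == "true"
    ]


groups = enabled_groups({"antivirus.enabled": "true", "metadata.enabled": "false"})
```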

Antivirus

The file is written to a temporary folder, accessible by McAfee, with the PutFile processor.

The antivirus application is executed via the command line, with the ExecuteStreamCommand processor. In addition, the original flowfile is left waiting in a Wait processor, and the console output is reported with the Notify processors.

If any of the Notify processors fails, an entry is recorded in the log file with the LogMessage processor.

Finally, if the response was not 'OK', the file is written to an audit folder and the error is processed.
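The following Python sketch mirrors this group outside NiFi: it runs a command-line scanner on a file, captures the console output and routes on it. The article does not show the actual antivirus command line, so the sketch uses a stand-in command that always prints 'OK'; scan_file and route_scan_result are hypothetical helper names.

```python
import subprocess
import sys

# Stand-in for the antivirus command line, which the article does not show.
# It ignores the file path and always prints 'OK'.
STAND_IN_SCANNER = [sys.executable, "-c", "print('OK')"]


def scan_file(command, path):
    """Run the scanner on `path` and return its console output."""
    completed = subprocess.run(
        command + [path], capture_output=True, text=True, check=False
    )
    return completed.stdout.strip()


def route_scan_result(output):
    """Mimic the final routing: anything other than 'OK' goes to audit."""
    return "continue" if output == "OK" else "audit"


result = route_scan_result(scan_file(STAND_IN_SCANNER, "/tmp/example.pdf"))
```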

Name validator

The first level of validation is performed with a RouteOnAttribute processor, which checks if the filename has the structure 'YY-XX-ZZ'. If this is not the case, a new attribute 'message.error' is added with the UpdateAttribute processor and the error is processed.

If this condition is met, the first and second segments of the name are extracted. Then, the original flowfile is left waiting, with the Wait processor, until the database query has been executed.

Once the result of the query is reported by the processor group, if its value is not 'OK' the error is processed.
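A minimal Python sketch of the first validation level follows. It assumes that 'YY-XX-ZZ' means three non-empty segments separated by hyphens, optionally followed by a file extension; the article does not specify the exact pattern, and validate_name and the 'first' and 'second' keys are hypothetical names.

```python
import re

# Assumption: three hyphen-separated segments, optionally followed by an extension.
NAME_PATTERN = re.compile(r"^([^-.]+)-([^-.]+)-([^-.]+)(\.[^.]+)?$")


def validate_name(filename):
    """Return the first two segments, or a 'message.error' attribute on failure."""
    match = NAME_PATTERN.match(filename)
    if match is None:
        return {"message.error": f"Invalid file name: {filename}"}
    return {"first": match.group(1), "second": match.group(2)}


valid = validate_name("AA-BB-CC.pdf")
invalid = validate_name("report.pdf")
```

The two extracted segments are what the database query would then receive as parameters.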

Database Query

The select is executed with the ExecuteSQL processor, the result is then converted to JSON with the ConvertAvroToJSON processor, and the value is extracted with the EvaluateJsonPath processor.

Finally, with the Notify processor, the original flowfile waiting in the previous Wait processor is notified of this result.
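The query, JSON conversion and value extraction can be sketched in Python with an in-memory SQLite database. The table, its columns and the 'NOK' fallback for a missing row are assumptions made for illustration; the article does not show the real query or schema.

```python
import json
import sqlite3


def query_status(connection, first, second):
    """Run the select, serialise the rows to JSON and extract the status value."""
    rows = connection.execute(
        "SELECT status FROM allowed_names WHERE first = ? AND second = ?",
        (first, second),
    ).fetchall()
    # Stand-in for the Avro-to-JSON conversion step: one JSON object per row.
    result_json = json.dumps([{"status": row[0]} for row in rows])
    records = json.loads(result_json)
    # Assumption: no matching row is treated as a failed validation.
    return records[0]["status"] if records else "NOK"


# Hypothetical table used only for this sketch.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE allowed_names (first TEXT, second TEXT, status TEXT)")
connection.execute("INSERT INTO allowed_names VALUES ('AA', 'BB', 'OK')")
```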

Extensions

To extract the actual file extension from the processed file, the IdentifyMimeType processor is used. Next, the RouteOnAttribute processor checks whether the extracted value matches the extension in the file name.

Afterwards, another RouteOnAttribute processor determines whether the allowed or the denied extensions must be checked. In the first case, it validates that the file extension is among the configured ones, and in the second case, that it is not.
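The two checks can be sketched in Python as follows. The sketch detects the real type from a small, hypothetical table of leading bytes (a much simpler stand-in for content-based detection), compares it with the extension in the file name, and then applies either an allowed or a denied list. The validate_extension helper and its mode parameter are assumptions for illustration.

```python
from pathlib import PurePath

# Tiny illustrative signature table: leading bytes -> extension.
SIGNATURES = {
    b"%PDF": "pdf",
    b"\x89PNG\r\n\x1a\n": "png",
    b"PK\x03\x04": "zip",
}


def detect_extension(content):
    """Guess the real extension from the first bytes of the content."""
    for magic, extension in SIGNATURES.items():
        if content.startswith(magic):
            return extension
    return None


def validate_extension(filename, content, extensions, mode="allowed"):
    """Check the declared extension against the content, then the configured list."""
    declared = PurePath(filename).suffix.lstrip(".").lower()
    actual = detect_extension(content)
    if actual is None or actual != declared:
        return False
    if mode == "allowed":
        return actual in extensions
    return actual not in extensions
```

For example, a file named a.pdf whose content starts with the PNG signature is rejected before either list is consulted.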

Metadata

First, the file is written to a temporary folder with the PutFile processor. Second, this path is added as an attribute of the flowfile using the UpdateAttribute processor. Finally, the metadata is removed from the file with the RemoveMetadata processor.

Finally, if the file processing is successful:

  • The RouteOnAttribute processor is used to check whether the output is local or FTP.
  • The file is written to its destination path, with the PutFile or PutFTP processors.

If, on the other hand, an error occurs during the process, an entry is left in the log file with the LogMessage processor. A '.NOK.txt' file is also generated; both contain the content of the 'message.error' attribute. In addition, the administrator's email address is looked up in memory with the LookupAttribute processor, and an email is sent to that address with the PutEmail processor.

Finally, only if the error does not come from the Antivirus processor group, the file will be written to an error path, with the PutFile processor.

Generate .NOK.txt file

A file with the contents of the 'message.error' attribute is generated using the GenerateFlowFile processor. Then, with the UpdateAttribute processor, it is assigned the same name as the original file, but with the '.NOK.txt' extension.

Then, the RouteOnAttribute processor checks whether the output is local or FTP, so that the file is written to the destination path with the PutFile or PutFTP processor, respectively.
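The naming rule can be sketched in Python. This assumes that '.NOK.txt' replaces the original extension rather than being appended to it, which the article does not state explicitly; build_nok_file is a hypothetical helper name.

```python
from pathlib import PurePath


def build_nok_file(original_filename, attributes):
    """Return the name and content of the error file for a failed flowfile."""
    # Assumption: '.NOK.txt' replaces the original extension.
    nok_name = PurePath(original_filename).stem + ".NOK.txt"
    content = attributes.get("message.error", "")
    return nok_name, content


nok_name, nok_content = build_nok_file(
    "AA-BB-CC.pdf", {"message.error": "Invalid extension"}
)
```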

And with this, the whole process is complete.
