
Apache NiFi: Validating File Transfers (Real Use Case)


What is Apache NiFi?

Apache NiFi is a dataflow engine for routing and transforming data: it collects data from a source, processes it through a series of processors, and writes it to a destination.

For an introduction to the tool, you can read the article Your data is moving house? Apache NiFi helps you with the move.

Other concepts

Processors are the basic building blocks of a data flow: each one consumes or generates flowfiles and provides a different piece of functionality.

A FlowFile is a data record consisting of a pointer to its content (payload) plus attributes that describe that content.

Description of the Apache NiFi Use Case

The files must be moved from an input directory (FILE or FTP) to an output directory (FILE or FTP). Along the way, an antivirus scan detects corrupt files, metadata is removed from WORD, EXCEL and PDF files, and the file is validated to have a correct name and an allowed extension.

If a file is not valid, a $(unknown)_NOK.txt file shall be written to the output path, the file itself shall be moved to an alternative path, and an email shall be sent reporting the error.

All the configuration will be available in an XML file.
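The article does not show that file, but as a rough sketch (all element and attribute names below are hypothetical illustrations, not the project's actual schema), such a configuration could look like:

```xml
<!-- Hypothetical sketch of the configuration file; names are illustrative only -->
<configuration>
    <input type="FTP" path="/data/input"/>
    <output type="FILE" path="/data/output"/>
    <errorPath>/data/error</errorPath>
    <allowedExtensions>doc,docx,xls,xlsx,pdf</allowedExtensions>
    <errorNotification email="ops@example.com"/>
</configuration>
```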

Development of Java processors

If we look through the NiFi documentation, we find that none of the processors available in its standard libraries removes the metadata from a file.

Having downloaded the Java code (available on GitHub), we create a new class, RemoveMetadata.java, in the org.apache.nifi.processors.standard package, which extends AbstractProcessor.

We add the annotations: @Tags, for the associated search tags; @CapabilityDescription, for the processor's description; @InputRequirement, to indicate whether it runs only when a new flowfile arrives at its input; @ReadsAttributes, for the attributes it reads from the incoming flowfile; and @WritesAttributes, for the new attributes it adds to the outgoing flowfile(s).


@Tags({"remove", "extract", "metadata", "file"})
@CapabilityDescription("Process that generates a new flowfile by removing the metadata from the original flowfile if it is WORD or EXCEL. You can configure the absolute path of the file and the attributes of the original flowfile that you want to keep in the generated flowfile.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ReadsAttributes({
        @ReadsAttribute(attribute = "filename", description = "Filename of the original flowfile"),
        @ReadsAttribute(attribute = "absolute.path", description = "Absolute path of the original flowfile")})
@WritesAttributes({
        @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on disk"),
        @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on disk. For example, "
                + "if the <Input Directory> property is set to /tmp, files picked up from /tmp will have the path attribute set to ./. If "
                + "the <Recurse Subdirectories> property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will "
                + "be set to abc/1/2/3"),
        @WritesAttribute(attribute = "file.creationTime", description = "The date and time that the file was created. May not work on all file systems"),
        @WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the file was last modified. May not work on all "
                + "file systems"),
        @WritesAttribute(attribute = "file.lastAccessTime", description = "The date and time that the file was last accessed. May not work on all "
                + "file systems"),
        @WritesAttribute(attribute = "file.owner", description = "The owner of the file. May not work on all file systems"),
        @WritesAttribute(attribute = "file.group", description = "The group owner of the file. May not work on all file systems"),
        @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the file. May not work on all file systems"),
        @WritesAttribute(attribute = "num.file", description = "Sequential index of file number"),
        @WritesAttribute(attribute = "absolute.path", description = "The full/absolute path from where a file was picked up. The current 'path' "
                + "attribute is still populated, but may be a relative path"),
        @WritesAttribute(attribute = "process.error", description = "Error message when extracting metadata")})
public class RemoveMetadata extends AbstractProcessor {

We add the properties and relationships that the processor will have.

private static final PropertyDescriptor ABSOLUTE_FILE_PATH = new PropertyDescriptor
		.Builder().name("ABSOLUTE_FILE_PATH")
		.displayName("Absolute File Path")
		.description("Path where the file is located.")
		.required(false)
		.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
		.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
		.build();

public static final PropertyDescriptor ATTRIBUTES_INHERIT = new PropertyDescriptor.Builder()
		.name("Attributes To Inherit")
		.description("All attributes (separated by comma ',') that must be maintained in the new flowfile")
		.required(false)
		.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
		.build();

private static final Relationship SUCCESS = new Relationship.Builder()
		.name("success")
		.description("Success removing file metadata.")
		.build();

private static final Relationship FAILURE = new Relationship.Builder()
		.name("failure")
		.description("Failure removing file metadata.")
		.build();

private static final Relationship ORIGINAL = new Relationship.Builder()
		.name("original")
		.description("Original file with metadata.")
		.build();

private List<PropertyDescriptor> descriptors;

private Set<Relationship> relationships;

@Override
protected void init(final ProcessorInitializationContext context) {
	final List<PropertyDescriptor> listDescriptors = new ArrayList<>();
	listDescriptors.add(ABSOLUTE_FILE_PATH);
	listDescriptors.add(ATTRIBUTES_INHERIT);
	this.descriptors = Collections.unmodifiableList(listDescriptors);

	final Set<Relationship> setRelationships = new HashSet<>();
	setRelationships.add(SUCCESS);
	setRelationships.add(FAILURE);
	setRelationships.add(ORIGINAL);
	this.relationships = Collections.unmodifiableSet(setRelationships);
}

@Override
public Set<Relationship> getRelationships() {
	return this.relationships;
}

@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
	return descriptors;
}

The processor logic is implemented in the onTrigger method. First, it checks that the file represented by the flowfile exists on disk,

// Check that the absolute path has been given by the user
String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
String absFilePath = context.getProperty(ABSOLUTE_FILE_PATH).evaluateAttributeExpressions(flowFile).getValue();
if (absFilePath == null) {
    absFilePath = flowFile.getAttribute(CoreAttributes.ABSOLUTE_PATH.key()) + filename;
}
getLogger().debug("Removing metadata from: " + absFilePath);

File file = new File(absFilePath);
getLogger().debug("File path: " + file.getPath());

it reads,

try (FileInputStream inputStream = new FileInputStream(absFilePath)) {

its extension is extracted to check if it is DOC, DOCX, XLS, XLSX or PDF.

//Extract the extension and check whether it is DOC, DOCX, XLS, XLSX or PDF
String extension = filename.substring(filename.lastIndexOf('.'));
switch (extension) {
	case ".doc":
		try (HWPFDocument doc = new HWPFDocument(inputStream)) {
			removeMetadata(doc.getSummaryInformation(), doc.getDocumentSummaryInformation());
			outputStream = new FileOutputStream(file);
			doc.write(outputStream);
			outputStream.close();
			getLogger().debug("The metadata of a .doc file has been deleted.");
		}
		break;
	case ".docx":
		try (XWPFDocument doc = new XWPFDocument(inputStream)) {
			props = doc.getProperties();
			removeMetadata(props);
			outputStream = new FileOutputStream(file);
			doc.write(outputStream);
			outputStream.close();
			getLogger().debug("The metadata of a .docx file has been deleted.");
		}
		break;
	case ".xls":
		try (HSSFWorkbook xls = new HSSFWorkbook(inputStream)) {
			removeMetadata(xls.getSummaryInformation(), xls.getDocumentSummaryInformation());
			outputStream = new FileOutputStream(file);
			xls.write(outputStream);
			outputStream.close();
			getLogger().debug("The metadata of a .xls file has been deleted.");
		}
		break;
	case ".xlsx":
		try (XSSFWorkbook xlsx = new XSSFWorkbook(inputStream)) {
			props = xlsx.getProperties();
			removeMetadata(props);
			outputStream = new FileOutputStream(file);
			xlsx.write(outputStream);
			outputStream.close();
			getLogger().debug("The metadata of a .xlsx file has been deleted.");
		}
		break;
	case ".pdf":
		removeMetadata(absFilePath);
		getLogger().debug("The metadata of a .pdf file has been deleted.");
		break;
	default:
		break;
}
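A note on the extension extraction: `indexOf('.')` would take everything after the *first* dot, which misbehaves for names like `report.v1.docx`, so `lastIndexOf('.')` is the safer choice. A minimal, self-contained sketch (the `extensionOf` helper is illustrative, not part of the processor):

```java
public class ExtensionDemo {
    // Illustrative helper (not in the processor): returns the extension
    // including the dot, in lower case, or "" when the name has no dot.
    static String extensionOf(String filename) {
        int dot = filename.lastIndexOf('.');
        return dot < 0 ? "" : filename.substring(dot).toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(extensionOf("report.v1.docx")); // .docx
        System.out.println(extensionOf("book.PDF"));       // .pdf
        System.out.println(extensionOf("README"));         // (empty)
    }
}
```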

and the metadata is removed.

/**
 * Title, Subject, Creator, Category, Keywords, Description, Last modified User, Status, Manager, Company and HyperlinkBase
 * metadata are removed from DOCX or XLSX files.
 *
 * @param props File properties
 */
private void removeMetadata(POIXMLProperties props) {
	POIXMLProperties.CoreProperties coreProps = props.getCoreProperties();
	POIXMLProperties.ExtendedProperties extProps = props.getExtendedProperties();

	coreProps.setTitle("");
	coreProps.setSubjectProperty("");
	coreProps.setCreator("");
	coreProps.setCategory("");
	coreProps.setKeywords("");
	coreProps.setDescription("");
	coreProps.setLastModifiedByUser("");
	coreProps.setContentStatus("");

	extProps.getUnderlyingProperties().setManager("");
	extProps.getUnderlyingProperties().setCompany("");
	extProps.getUnderlyingProperties().setHyperlinkBase("");
}

/**
 * Title, Subject, Creator, Category, Keywords, Description, Last modified User, Status, Manager, Company and HyperlinkBase
 * metadata are removed from DOC or XLS files.
 *
 * @param summary File properties
 * @param docSummary Other file properties
 */
private void removeMetadata(SummaryInformation summary, DocumentSummaryInformation docSummary) {
	summary.setTitle("");
	summary.setSubject("");
	summary.setAuthor("");
	summary.setKeywords("");
	summary.setComments("");
	summary.setLastAuthor("");

	docSummary.setContentStatus("");
	docSummary.setManager("");
	docSummary.setCompany("");
	docSummary.setCategory("");
	if (docSummary.getSectionCount() >= 2) {
		CustomProperties customProperties = new CustomProperties();
		docSummary.setCustomProperties(customProperties); //HyperlinkBase is removed
	}
}

/**
 * Removes metadata from PDF files. The PDF file is read into a byte array and
 * the Title, Author, Subject, Keywords, Creator, Producer, CreationDate, ModDate and Trapped
 * fields are removed. The XMP metadata is also removed.
 *
 * @param src absolute path of the file to be processed
 * @throws IOException if the PDF file cannot be processed
 * @throws DocumentException if the fields of the PDF file cannot be read or edited
 */
private void removeMetadata(String src) throws IOException, DocumentException {
	// read existing pdf document
	Path pdfPath = Paths.get(src);
	byte[] pdf = Files.readAllBytes(pdfPath);
	PdfReader reader = new PdfReader(pdf);
	PdfStamper stamper = new PdfStamper(reader, new FileOutputStream(src));

	// get and edit meta-data
	HashMap<String, String> hMap = reader.getInfo();
	hMap.put("Title", null);
	hMap.put("Author", null);
	hMap.put("Subject", null);
	hMap.put("Keywords", null);
	hMap.put("Creator", null);
	hMap.put("Producer", null);
	hMap.put("CreationDate", null);
	hMap.put("ModDate", null);
	hMap.put("Trapped", null);

	// add updated meta-data to pdf
	stamper.setMoreInfo(hMap);

	// update xmp meta-data
	ByteArrayOutputStream baos = new ByteArrayOutputStream();
	XmpWriter xmp = new XmpWriter(baos, hMap);
	xmp.close();
	stamper.setXmpMetadata(baos.toByteArray());

	stamper.close();
	reader.close();
}

Finally, a new flowfile is created that represents the current version of the file,

// Create a new flowfile for the file without metadata
file = new File(absFilePath);
final long importStart = System.nanoTime();
FlowFile newFlowFile = session.create();
newFlowFile = session.importFrom(file.toPath(), true, newFlowFile);
final long importNanos = System.nanoTime() - importStart;
final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
newFlowFile = session.putAttribute(newFlowFile, CoreAttributes.FILENAME.key(), file.getName());
newFlowFile = session.putAttribute(newFlowFile, CoreAttributes.PATH.key(), file.getPath());
newFlowFile = session.putAttribute(newFlowFile, CoreAttributes.ABSOLUTE_PATH.key(), file.getAbsolutePath());
Map<String, String> attributes = getAttributesFromFile(file.toPath());
if (attributes.size() > 0) {
	newFlowFile = session.putAllAttributes(newFlowFile, attributes);
}
session.getProvenanceReporter().receive(newFlowFile, file.toURI().toString(), importMillis);

// Insert in the new flowfile the attributes that the user wants to inherit
String sAttsInherit = context.getProperty(ATTRIBUTES_INHERIT).getValue();
Map<String, String> attributesInherit = new HashMap<>();
if (!StringUtils.isBlank(sAttsInherit)) {
	List<String> keysAttsInherit = Arrays.asList(sAttsInherit.split(","));
	for (String key : keysAttsInherit) {
		String value = flowFile.getAttribute(key.trim());
		if (!StringUtils.isBlank(value)) {
			attributesInherit.put(key.trim(), value);
		}
	}
}
newFlowFile = session.putAllAttributes(newFlowFile, attributesInherit);

and each flowfile is transferred to its corresponding relationship. If an exception occurs, the new flowfile is discarded and the original one is transferred to the failure relationship, with a process.error attribute containing the error message.

	session.transfer(newFlowFile, SUCCESS);
	getLogger().debug("The flowfile has been transferred to Success.");

	session.transfer(flowFile, ORIGINAL);
	getLogger().debug("The original flowfile has been transferred to Original.");
} catch (IOException | DocumentException e) {
	flowFile = session.putAttribute(flowFile, "process.error", e.getMessage());
	session.transfer(flowFile, FAILURE);
	getLogger().error("The flowfile has been transferred to Failure with error: " + e.getMessage());
}

To be continued... In the next article, we will see how to start Apache NiFi and build the complete processor flow that meets the requirements of the use case.
