Skip to content

Apache NiFi: Validando transferencias de ficheros (Caso de uso real)

Share on twitter
Share on linkedin
Share on email
Share on whatsapp
Apache NiFi: Validando transferencias de ficheros

¿Qué es Apache NiFi?

Apache NiFi es un motor de ejecución de flujos de enrutamiento y transformación de datos. Es decir, recoge datos de una fuente, los trata a través de distintos procesadores y los escribe en otra fuente distinta.

Para una primera toma de contacto con la herramienta, se puede consultar este artículo ¿Tus datos se cambian de casa? Apache NiFi te ayuda con la mudanza.

Otros conceptos

Los procesadores son los bloques básicos para crear un flujo de datos, todos consumen o generan flowfiles y cada uno tiene una funcionalidad diferente.

Un FlowFile es un registro de datos, que consiste en un puntero a su contenido (carga útil) más los atributos para respaldar el contenido.

Descripción del caso de uso sobre Apache NiFi

Se deben trasladar los ficheros de un directorio de entrada (FILE o FTP) a un directorio de salida (FILE o FTP). En el proceso, debe ejecutarse un antivirus para detectar ficheros corruptos, eliminarse los metadatos para WORD, EXCEL y PDF y validarse que el fichero tenga nombre correcto y extensión permitida.

En caso de no ser correcto, se escribirá un fichero ${nombreFichero}_NOK.txt en la ruta de salida, se dejará el fichero en una ruta alternativa y se enviará un email informando del error producido.

Toda la configuración estará disponible en un fichero XML.

Desarrollo de los procesadores Java

Si buscamos en la documentación de NiFi, entre todos los procesadores que se encuentran disponibles en sus librerías estándar, nos percatamos de que no existe ninguno que nos permita eliminar los metadatos de un fichero.

Teniendo descargado el código Java (disponible en GitHub), creamos una nueva clase, RemoveMetadata.java, en el paquete org.apache.nifi.processors.standard, que extienda de AbstractProcessor.

Agregamos las anotaciones @Tags, para las etiquetas de búsqueda asociadas, @CapabilityDescription, para la descripción del procesador, @InputRequirement, para indicar si solo se ejecuta cuando llegue un nuevo flowfile a sus relaciones de entrada, @ReadsAttributes, para los atributos que utiliza leídos del flowfile entrante, @WritesAttributes, para los atributos nuevos que agrega al o a los flowfiles salientes.


@Tags({"remove", "extract", "metadata", "file"})
@CapabilityDescription("Process that generates a new flowfile by removing the metadata from the original flowfile if it is WORD or EXCEL. You can configure the absolute path of the file and the attributes of the original flowfile that you want to keep in the generated flowfile.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ReadsAttributes({
        @ReadsAttribute(attribute = "filename", description = "Filename of the original flowfile"),
        @ReadsAttribute(attribute = "absolute.path", description = "Absolute path of the original flowfile")})
@WritesAttributes({
        @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on disk"),
        @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on disk. For example, "
                + "if the <Input Directory> property is set to /tmp, files picked up from /tmp will have the path attribute set to ./. If "
                + "the <Recurse Subdirectories> property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will "
                + "be set to abc/1/2/3"),
        @WritesAttribute(attribute = "file.creationTime", description = "The date and time that the file was created. May not work on all file systems"),
        @WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the file was last modified. May not work on all "
                + "file systems"),
        @WritesAttribute(attribute = "file.lastAccessTime", description = "The date and time that the file was last accessed. May not work on all "
                + "file systems"),
        @WritesAttribute(attribute = "file.owner", description = "The owner of the file. May not work on all file systems"),
        @WritesAttribute(attribute = "file.group", description = "The group owner of the file. May not work on all file systems"),
        @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the file. May not work on all file systems"),
        @WritesAttribute(attribute = "num.file", description = "Sequential index of file number"),
        @WritesAttribute(attribute = "absolute.path", description = "The full/absolute path from where a file was picked up. The current 'path' "
                + "attribute is still populated, but may be a relative path"),
        @WritesAttribute(attribute = "process.error", description = "Error message when extracting metadata")})
public class RemoveMetadata extends AbstractProcessor {

Añadimos las propiedades y relaciones que vaya a tener el procesador.

private static final PropertyDescriptor ABSOLUTE_FILE_PATH = new PropertyDescriptor
		.Builder().name("ABSOLUTE_FILE_PATH")
		.displayName("Absolute File Path")
		.description("Path where file it's.")
		.required(false)
		.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
		.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
		.build();

public static final PropertyDescriptor ATTRIBUTES_INHERIT = new PropertyDescriptor.Builder()
		.name("Attributes To Inherit")
		.description("All attributes (separated by comma ',') that must be maintained in the new flowfile")
		.required(false)
		.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
		.build();

private static final Relationship SUCCESS = new Relationship.Builder()
		.name("success")
		.description("Success removing file metadata.")
		.build();

private static final Relationship FAILURE = new Relationship.Builder()
		.name("failure")
		.description("Failure removing file metadata.")
		.build();

private static final Relationship ORIGINAL = new Relationship.Builder()
		.name("original")
		.description("Original file with metadata.")
		.build();

private List<PropertyDescriptor> descriptors;

private Set<Relationship> relationships;

@Override
protected void init(final ProcessorInitializationContext context) {
	final List<PropertyDescriptor> listDescriptors = new ArrayList();
	listDescriptors.add(ABSOLUTE_FILE_PATH);
	listDescriptors.add(ATTRIBUTES_INHERIT);
	this.descriptors = Collections.unmodifiableList(listDescriptors);

	final Set<Relationship> setRelationships = new HashSet();
	setRelationships.add(SUCCESS);
	setRelationships.add(FAILURE);
	setRelationships.add(ORIGINAL);
	this.relationships = Collections.unmodifiableSet(setRelationships);
}

@Override
public Set<Relationship> getRelationships() {
	return this.relationships;
}

@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
	return descriptors;
}

Y se codifica la lógica del procesador en el método onTrigger. En este caso, se comprueba que exista en disco el fichero representado por el flowfile,

// Check that the absolute path has been given by the user
String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
String absFilePath = context.getProperty(ABSOLUTE_FILE_PATH).evaluateAttributeExpressions(flowFile).getValue();
if (absFilePath == null) {
    absFilePath = flowFile.getAttribute(CoreAttributes.ABSOLUTE_PATH.key()) + filename;
}
getLogger().debug("Removing metadata to: " + absFilePath);

File file = new File(absFilePath);
getLogger().debug("File path: " + file.getPath());

se lee,

try (FileInputStream inputStream = new FileInputStream(absFilePath)) {

se extrae su extensión para comprobar que sea DOC, DOCX, XLS,XLSX o PDF

//Extract the extension and check if it is DOC,DOCX, XLS or XLSX
String extension = filename.substring(filename.indexOf('.'));
switch (extension) {
	case ".doc":
		try (HWPFDocument doc = new HWPFDocument(inputStream)) {
			removeMetadata(doc.getSummaryInformation(), doc.getDocumentSummaryInformation());
			outputStream = new FileOutputStream(file);
			doc.write(outputStream);
			outputStream.close();
			getLogger().debug("The metadata of a .doc file has been deleted.");
		}
		break;
	case ".docx":
		try (XWPFDocument doc = new XWPFDocument(inputStream)) {
			props = doc.getProperties();
			removeMetadata(props);
			outputStream = new FileOutputStream(file);
			doc.write(outputStream);
			outputStream.close();
			getLogger().debug("The metadata of a .docx file has been deleted.");
		}
		break;
	case ".xls":
		try (HSSFWorkbook xls = new HSSFWorkbook(inputStream)) {
			removeMetadata(xls.getSummaryInformation(), xls.getDocumentSummaryInformation());
			outputStream = new FileOutputStream(file);
			xls.write(outputStream);
			outputStream.close();
			getLogger().debug("The metadata of a .xls file has been deleted.");
		}
		break;
	case ".xlsx":
		try (XSSFWorkbook xlsx = new XSSFWorkbook(inputStream)) {
			props = xlsx.getProperties();
			removeMetadata(props);
			outputStream = new FileOutputStream(file);
			xlsx.write(outputStream);
			outputStream.close();
			getLogger().debug("The metadata of a .xlsx file has been deleted.");
		}
		break;
	case ".pdf":
		removeMetadata(absFilePath);;
		getLogger().debug("The metadata of a .pdf file has been deleted.");
		break;
	default:
		break;
}

y se le eliminan los metadatos.

/**
 * Title, Subject, Creator, Category, Keywords, Description, Last modified User, Status, Manager, Company and HyperlinkBase
 * metadata are removed from DOCX or XLSX files.
 *
 * @param props File properties
 */
private void removeMetadata(POIXMLProperties props) {
	POIXMLProperties.CoreProperties coreProps = props.getCoreProperties();
	POIXMLProperties.ExtendedProperties extProps = props.getExtendedProperties();

	coreProps.setTitle("");
	coreProps.setSubjectProperty("");
	coreProps.setCreator("");
	coreProps.setCategory("");
	coreProps.setKeywords("");
	coreProps.setDescription("");
	coreProps.setLastModifiedByUser("");
	coreProps.setContentStatus("");

	extProps.getUnderlyingProperties().setManager("");
	extProps.getUnderlyingProperties().setCompany("");
	extProps.getUnderlyingProperties().setHyperlinkBase("");
}

/**
 * Title, Subject, Creator, Category, Keywords, Description, Last modified User, Status, Manager, Company and HyperlinkBase
 * metadata are removed from DOC or XLS files.
 *
 * @param summary File properties
 * @param docSummary Other file properties
 */
private void removeMetadata(SummaryInformation summary, DocumentSummaryInformation docSummary) {
	summary.setTitle("");
	summary.setSubject("");
	summary.setAuthor("");
	summary.setKeywords("");
	summary.setComments("");
	summary.setLastAuthor("");

	docSummary.setContentStatus("");
	docSummary.setManager("");
	docSummary.setCompany("");
	docSummary.setCategory("");
	if (docSummary.getSectionCount() >= 2) {
		CustomProperties customProperties = new CustomProperties();
		docSummary.setCustomProperties(customProperties); //HyperlinkBase is removed
	}
}

/**
 * Remove metadata from pdf files. The pdf file is extracted into a byte array and
 * the Title, Author, Subject, Keywords, Creator, Producer, CreationDate, ModDate and Trapped
 * fields are removed.The xmp metadata is also removed.
 *
 * @param src absolute path where the file to be processed is located
 * @throws IOException error not being able process the PDF file
 * @throws DocumentException error not being able to read or edit the fiels of the PDF file
 */
private void removeMetadata(String src) throws IOException, DocumentException{
	// read existing pdf document
	Path pdfPath = Paths.get(src);
	byte[] pdf = Files.readAllBytes(pdfPath);
	PdfReader reader = new PdfReader(pdf);
	PdfStamper stamper = new PdfStamper(reader, new FileOutputStream(src));

	// get and edit meta-data
	HashMap<String, String> hMap = reader.getInfo();
	hMap.put("Title", null);
	hMap.put("Author", null);
	hMap.put("Subject", null);
	hMap.put("Keywords", null);
	hMap.put("Creator", null);
	hMap.put("Producer", null);
	hMap.put("CreationDate", null);
	hMap.put("ModDate", null);
	hMap.put("Trapped", null);

	// add updated meta-data to pdf
	stamper.setMoreInfo(hMap);

	// update xmp meta-data
	ByteArrayOutputStream baos = new ByteArrayOutputStream();
	XmpWriter xmp = new XmpWriter(baos, hMap);
	xmp.close();
	stamper.setXmpMetadata(baos.toByteArray());

	stamper.close();
	reader.close();
}

Por último, se crea un nuevo flowfile que representa a la versión actual del fichero,

// Create a new flowfile for the file without metadata
file = new File(absFilePath);
final long importStart = System.nanoTime();
FlowFile newFlowFile = session.create();
newFlowFile = session.importFrom(file.toPath(), true, newFlowFile);
final long importNanos = System.nanoTime() - importStart;
final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
newFlowFile = session.putAttribute(newFlowFile, CoreAttributes.FILENAME.key(), file.getName());
newFlowFile = session.putAttribute(newFlowFile, CoreAttributes.PATH.key(), file.getPath());
newFlowFile = session.putAttribute(newFlowFile, CoreAttributes.ABSOLUTE_PATH.key(), file.getAbsolutePath());
Map<String, String> attributes = getAttributesFromFile(file.toPath());
if (attributes.size() > 0) {
	newFlowFile = session.putAllAttributes(newFlowFile, attributes);
}
session.getProvenanceReporter().receive(newFlowFile, file.toURI().toString(), importMillis);

// Insert in the new flowfile the attributes that the user wants to inherit
String sAttsInherit = context.getProperty(ATTRIBUTES_INHERIT).getValue();
Map<String, String> attributesInherit = new HashMap<>();
if (!StringUtils.isBlank(sAttsInherit)) {
	List<String> keysAttsInherit = Arrays.asList(sAttsInherit.split(","));
	for (String key : keysAttsInherit) {
		String value = flowFile.getAttribute(key.trim());
		if (!StringUtils.isBlank(value)) {
			attributesInherit.put(key.trim(), value);
		}
	}
}
newFlowFile = session.putAllAttributes(newFlowFile, attributesInherit);

y se transfiere cada flowfile a su relación correspondiente. En caso de que salte una excepción, el nuevo se elimina y el original se transfiere a la relación failure agregando como atributo process.error con el mensaje de error.

	session.transfer(newFlowFile, SUCCESS);
	getLogger().debug("The flowfile has been transferred to Success.");

	session.transfer(flowFile, ORIGINAL);
	getLogger().debug("The original flowfile has been transferred to Original.");
} catch (IOException e) {
	session.transfer(flowFile, FAILURE);
	getLogger().error("The flowfile has been transferred to Failure with error: " + e.getMessage());
}

Continuará…En el próximo artículo veremos como arrancar Apache NiFi y crear todo el flujo de procesadores para que cumpla con los requisitos del caso de uso.

Comparte el artículo

Share on twitter
Twitter
Share on linkedin
LinkedIn
Share on email
Email
Share on whatsapp
WhatsApp

Una nueva generación de servicios tecnológicos y productos para nuestros clientes