Skip to content

Commit

Permalink
NIFI-7377 Cleaned up nifi-stateless logs.
Browse files Browse the repository at this point in the history
Refactored masking logic to CipherUtility and indicated masking with label and Base64 output.
Added JSON masking logic to nifi-stateless module.
Added argument masking functionality to Program.
Moved groovy unit tests to proper Maven directory structure.
Modified plain argument output to use filtering/masking methods in provided utility.
Refactored utility methods.
Updated unit tests.

This closes #4222.

Co-authored-by: Pierre Villard <pierre.villard.fr@gmail.com>

Signed-off-by: Andy LoPresto <alopresto@apache.org>
  • Loading branch information
alopresto committed Apr 28, 2020
1 parent 996688b commit 148537d
Show file tree
Hide file tree
Showing 13 changed files with 679 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -406,4 +406,40 @@ public static int getSaltLengthForAlgorithm(String algorithm) {
}
return saltLength;
}

/**
* Returns a securely-derived, deterministic value from the provided plaintext property
* value. This is because sensitive values should not be disclosed through the
* logs. However, the equality or difference of the sensitive value can be important, so it cannot be ignored completely.
*
* The specific derivation process is unimportant as long as it is a salted,
* cryptographically-secure hash function with an iteration cost sufficient for password
* storage in other applications.
*
* @param sensitivePropertyValue the plaintext property value
* @return a deterministic string value which represents this input but is safe to print in a log
*/
public static String getLoggableRepresentationOfSensitiveValue(String sensitivePropertyValue) {
// TODO: Use DI/IoC to inject this implementation in the constructor of the FingerprintFactory
// There is little initialization cost, so it doesn't make sense to cache this as a field
SecureHasher secureHasher = new Argon2SecureHasher();

// TODO: Extend {@link StringEncryptor} with secure hashing capability and inject?
return getLoggableRepresentationOfSensitiveValue(sensitivePropertyValue, secureHasher);
}

/**
* Returns a securely-derived, deterministic value from the provided plaintext property
* value. This is because sensitive values should not be disclosed through the
* logs. However, the equality or difference of the sensitive value can be important, so it cannot be ignored completely.
*
* The specific derivation process is determined by the provided {@link SecureHasher} implementation.
*
* @param sensitivePropertyValue the plaintext property value
* @param secureHasher an instance of {@link SecureHasher} which will be used to mask the value
* @return a deterministic string value which represents this input but is safe to print in a log
*/
public static String getLoggableRepresentationOfSensitiveValue(String sensitivePropertyValue, SecureHasher secureHasher) {
return "[MASKED] (" + secureHasher.hashBase64(sensitivePropertyValue) + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@
*/
package org.apache.nifi.fingerprint;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Stream;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
Expand All @@ -25,8 +40,7 @@
import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.security.util.crypto.Argon2SecureHasher;
import org.apache.nifi.security.util.crypto.SecureHasher;
import org.apache.nifi.security.util.crypto.CipherUtility;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.LoggingXmlParserErrorHandler;
Expand All @@ -41,22 +55,6 @@
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Stream;

/**
* <p>Creates a fingerprint of a flow.xml. The order of elements or attributes in the flow.xml does not influence the fingerprint generation.
*
Expand Down Expand Up @@ -549,12 +547,7 @@ private StringBuilder addPropertyFingerprint(final StringBuilder builder, final
* @return a deterministic string value which represents this input but is safe to print in a log
*/
private String getLoggableRepresentationOfSensitiveValue(String encryptedPropertyValue) {
// TODO: Use DI/IoC to inject this implementation in the constructor of the FingerprintFactory
// There is little initialization cost, so it doesn't make sense to cache this as a field
SecureHasher secureHasher = new Argon2SecureHasher();

// TODO: Extend {@link StringEncryptor} with secure hashing capability and inject?
return secureHasher.hashHex(decrypt(encryptedPropertyValue));
return CipherUtility.getLoggableRepresentationOfSensitiveValue(decrypt(encryptedPropertyValue));
}

private StringBuilder addPortFingerprint(final StringBuilder builder, final Element portElem) throws FingerprintException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,8 @@ class FingerprintFactoryGroovyTest extends GroovyTestCase {

// Assert the fingerprint does not contain the password
assert !(fingerprint =~ "originalPlaintextPassword")
def maskedValue = (fingerprint =~ /\[MASKED\] \([\w\/\+=]+\)/)
assert maskedValue
logger.info("Masked value: ${maskedValue[0]}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public void testRemoteProcessGroupFingerprintWithProxy() throws Exception {
when(component.getVersionedComponentId()).thenReturn(Optional.empty());

// Assert fingerprints with expected one.
final String hashedProxyPassword = new Argon2SecureHasher().hashHex(proxyPassword);
final String hashedProxyPassword = "[MASKED] (" + new Argon2SecureHasher().hashBase64(proxyPassword) + ")";
final String expected = "id" +
"NO_VALUE" +
"http://node1:8080/nifi, http://node2:8080/nifi" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
<artifactId>nifi-nar-utils</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-expression-language</artifactId>
Expand All @@ -68,7 +73,7 @@
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down Expand Up @@ -113,6 +118,11 @@
<type>war</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-json</artifactId>
<version>${nifi.groovy.version}</version>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
Expand All @@ -45,20 +58,7 @@
import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
import org.apache.nifi.stateless.bootstrap.RunnableFlow;

import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.stateless.core.security.StatelessSecurityUtility;

public class StatelessFlow implements RunnableFlow {

Expand Down Expand Up @@ -319,6 +319,7 @@ private void findRemoteGroupRecursive(final VersionedProcessGroup group, final M



@Override
public boolean run(final Queue<InMemoryFlowFile> output) {
while (!this.stopRequested) {
for (final StatelessComponent pw : roots) {
Expand All @@ -332,6 +333,7 @@ public boolean run(final Queue<InMemoryFlowFile> output) {
return true;
}

@Override
public boolean runOnce(Queue<InMemoryFlowFile> output) {
for (final StatelessComponent pw : roots) {
final boolean successful = pw.runRecursive(output);
Expand All @@ -354,6 +356,7 @@ public static SSLContext getSSLContext(final JsonObject config) {
}

final JsonObject sslObject = config.get(SSL).getAsJsonObject();
// TODO: Only evaluates to true when all properties are present; some flows can have truststore properties and no keystore or vice-versa
if (sslObject.has(KEYSTORE) && sslObject.has(KEYSTORE_PASS) && sslObject.has(KEYSTORE_TYPE)
&& sslObject.has(TRUSTSTORE) && sslObject.has(TRUSTSTORE_PASS) && sslObject.has(TRUSTSTORE_TYPE)) {

Expand All @@ -377,38 +380,38 @@ public static SSLContext getSSLContext(final JsonObject config) {
return null;
}

public static StatelessFlow createAndEnqueueFromJSON(final JsonObject args, final ClassLoader systemClassLoader, final File narWorkingDir)
public static StatelessFlow createAndEnqueueFromJSON(final JsonObject jsonObject, final ClassLoader systemClassLoader, final File narWorkingDir)
throws InitializationException, IOException, ProcessorInstantiationException, NiFiRegistryException {
if (args == null) {
if (jsonObject == null) {
throw new IllegalArgumentException("Flow arguments can not be null");
}

System.out.println("Running flow from json: " + args.toString());
System.out.println("Running flow from json: " + StatelessSecurityUtility.getLoggableRepresentationOfJsonObject(jsonObject));

if (!args.has(REGISTRY) || !args.has(BUCKETID) || !args.has(FLOWID)) {
if (!jsonObject.has(REGISTRY) || !jsonObject.has(BUCKETID) || !jsonObject.has(FLOWID)) {
throw new IllegalArgumentException("The following parameters must be provided: " + REGISTRY + ", " + BUCKETID + ", " + FLOWID);
}

final String registryurl = args.getAsJsonPrimitive(REGISTRY).getAsString();
final String bucketID = args.getAsJsonPrimitive(BUCKETID).getAsString();
final String flowID = args.getAsJsonPrimitive(FLOWID).getAsString();
final String registryurl = jsonObject.getAsJsonPrimitive(REGISTRY).getAsString();
final String bucketID = jsonObject.getAsJsonPrimitive(BUCKETID).getAsString();
final String flowID = jsonObject.getAsJsonPrimitive(FLOWID).getAsString();

int flowVersion = -1;
if (args.has(FLOWVERSION)) {
flowVersion = args.getAsJsonPrimitive(FLOWVERSION).getAsInt();
if (jsonObject.has(FLOWVERSION)) {
flowVersion = jsonObject.getAsJsonPrimitive(FLOWVERSION).getAsInt();
}

boolean materializeContent = true;
if (args.has(MATERIALIZECONTENT)) {
materializeContent = args.getAsJsonPrimitive(MATERIALIZECONTENT).getAsBoolean();
if (jsonObject.has(MATERIALIZECONTENT)) {
materializeContent = jsonObject.getAsJsonPrimitive(MATERIALIZECONTENT).getAsBoolean();
}

final List<String> failurePorts = new ArrayList<>();
if (args.has(FAILUREPORTS)) {
args.getAsJsonArray(FAILUREPORTS).forEach(port ->failurePorts.add(port.getAsString()));
if (jsonObject.has(FAILUREPORTS)) {
jsonObject.getAsJsonArray(FAILUREPORTS).forEach(port ->failurePorts.add(port.getAsString()));
}

final SSLContext sslContext = getSSLContext(args);
final SSLContext sslContext = getSSLContext(jsonObject);
final VersionedFlowSnapshot snapshot = new RegistryUtil(registryurl, sslContext).getFlowByID(bucketID, flowID, flowVersion);

final Map<VariableDescriptor, String> inputVariables = new HashMap<>();
Expand All @@ -423,8 +426,8 @@ public static StatelessFlow createAndEnqueueFromJSON(final JsonObject args, fina

final Set<Parameter> parameters = new HashSet<>();
final Set<String> parameterNames = new HashSet<>();
if (args.has(PARAMETERS)) {
final JsonElement parametersElement = args.get(PARAMETERS);
if (jsonObject.has(PARAMETERS)) {
final JsonElement parametersElement = jsonObject.get(PARAMETERS);
final JsonObject parametersObject = parametersElement.getAsJsonObject();

for (final Map.Entry<String, JsonElement> entry : parametersObject.entrySet()) {
Expand Down Expand Up @@ -467,7 +470,7 @@ public static StatelessFlow createAndEnqueueFromJSON(final JsonObject args, fina
final ParameterContext parameterContext = new StatelessParameterContext(parameters);
final ExtensionManager extensionManager = ExtensionDiscovery.discover(narWorkingDir, systemClassLoader);
final StatelessFlow flow = new StatelessFlow(snapshot.getFlowContents(), extensionManager, () -> inputVariables, failurePorts, materializeContent, sslContext, parameterContext);
flow.enqueueFromJSON(args);
flow.enqueueFromJSON(jsonObject);
return flow;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@
package org.apache.nifi.stateless.core;

import com.google.gson.JsonObject;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.stream.io.StreamUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -40,6 +33,12 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
import org.apache.nifi.stream.io.StreamUtils;

public class StatelessFlowFile implements InMemoryFlowFile {

Expand Down Expand Up @@ -104,7 +103,7 @@ public StatelessFlowFile(boolean materializeContent) {
this.id = nextID.getAndIncrement();
this.entryDate = System.currentTimeMillis();
this.lastEnqueuedDate = entryDate;
attributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()) + ".statelessFlowFile");
attributes.put(CoreAttributes.FILENAME.key(), System.nanoTime() + ".statelessFlowFile");
attributes.put(CoreAttributes.PATH.key(), "target");
attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
}
Expand Down Expand Up @@ -241,6 +240,7 @@ public String toStringFull() {
try {
result.addProperty("content", new String(this.getDataArray(), StandardCharsets.UTF_8));
} catch (IOException e) {
// TODO: Hex encode binary content if it is not plaintext
result.addProperty("content", "Exception getting content: " + e.getMessage());
}

Expand Down
Loading

0 comments on commit 148537d

Please # to comment.