-
-
Notifications
You must be signed in to change notification settings - Fork 674
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(masking): Various performance and readability fixes
- Loading branch information
Showing
8 changed files
with
354 additions
and
321 deletions.
There are no files selected for viewing
118 changes: 84 additions & 34 deletions
118
src/main/java/org/akhq/utils/JsonMaskByDefaultMasker.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,73 +1,123 @@ | ||
package org.akhq.utils; | ||
|
||
import com.google.gson.JsonElement; | ||
import com.google.gson.JsonObject; | ||
import com.google.gson.JsonParser; | ||
import com.google.gson.*; | ||
import io.micronaut.context.annotation.Requires; | ||
import jakarta.inject.Singleton; | ||
import lombok.SneakyThrows; | ||
import org.akhq.configs.DataMasking; | ||
import org.akhq.configs.JsonMaskingFilter; | ||
import org.akhq.models.Record; | ||
|
||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
@Singleton | ||
@Requires(property = "akhq.security.data-masking.mode", value = "json_mask_by_default") | ||
public class JsonMaskByDefaultMasker implements Masker { | ||
|
||
private final List<JsonMaskingFilter> jsonMaskingFilters; | ||
private final Map<String, List<String>> topicToKeysMap; | ||
private final String jsonMaskReplacement; | ||
private static final String NON_JSON_MESSAGE = "This record is unable to be masked as it is not a structured object. This record is unavailable to view due to safety measures from json_mask_by_default to not leak sensitive data."; | ||
private static final String ERROR_MESSAGE = "An exception occurred during an attempt to mask this record. This record is unavailable to view due to safety measures from json_mask_by_default to not leak sensitive data. Please contact akhq administrator."; | ||
|
||
public JsonMaskByDefaultMasker(DataMasking dataMasking) { | ||
this.jsonMaskingFilters = dataMasking.getJsonFilters(); | ||
this.jsonMaskReplacement = dataMasking.getJsonMaskReplacement(); | ||
this.topicToKeysMap = buildTopicKeysMap(dataMasking); | ||
} | ||
|
||
private Map<String, List<String>> buildTopicKeysMap(DataMasking dataMasking) { | ||
return dataMasking.getJsonFilters().stream() | ||
.collect(Collectors.toMap( | ||
JsonMaskingFilter::getTopic, | ||
JsonMaskingFilter::getKeys, | ||
(a, b) -> a, | ||
HashMap::new | ||
)); | ||
} | ||
|
||
public Record maskRecord(Record record) { | ||
if (!isJson(record)) { | ||
return createNonJsonRecord(record); | ||
} | ||
|
||
try { | ||
if(isJson(record)) { | ||
return jsonMaskingFilters | ||
.stream() | ||
.filter(jsonMaskingFilter -> record.getTopic().getName().equalsIgnoreCase(jsonMaskingFilter.getTopic())) | ||
.findFirst() | ||
.map(filter -> applyMasking(record, filter.getKeys())) | ||
.orElseGet(() -> applyMasking(record, List.of())); | ||
} else { | ||
record.setValue("This record is unable to be masked as it is not a structured object. This record is unavailable to view due to safety measures from json_mask_by_default to not leak sensitive data. Please contact akhq administrator."); | ||
} | ||
List<String> unmaskedKeys = getUnmaskedKeysForTopic(record.getTopic().getName()); | ||
return applyMasking(record, unmaskedKeys); | ||
} catch (Exception e) { | ||
LOG.error("Error masking record at topic {}, partition {}, offset {} due to {}", record.getTopic(), record.getPartition(), record.getOffset(), e.getMessage()); | ||
record.setValue("An exception occurred during an attempt to mask this record. This record is unavailable to view due to safety measures from json_mask_by_default to not leak sensitive data. Please contact akhq administrator."); | ||
logMaskingError(record, e); | ||
return createErrorRecord(record); | ||
} | ||
} | ||
|
||
private List<String> getUnmaskedKeysForTopic(String topic) { | ||
return topicToKeysMap.getOrDefault(topic.toLowerCase(), Collections.emptyList()); | ||
} | ||
|
||
private Record createNonJsonRecord(Record record) { | ||
record.setValue(NON_JSON_MESSAGE); | ||
return record; | ||
} | ||
|
||
private Record createErrorRecord(Record record) { | ||
record.setValue(ERROR_MESSAGE); | ||
return record; | ||
} | ||
|
||
private void logMaskingError(Record record, Exception e) { | ||
LOG.error("Error masking record at topic {}, partition {}, offset {} due to {}", | ||
record.getTopic(), record.getPartition(), record.getOffset(), e.getMessage()); | ||
} | ||
|
||
@SneakyThrows | ||
private Record applyMasking(Record record, List<String> keys) { | ||
JsonObject jsonElement = JsonParser.parseString(record.getValue()).getAsJsonObject(); | ||
maskAllExcept(jsonElement, keys); | ||
record.setValue(jsonElement.toString()); | ||
private Record applyMasking(Record record, List<String> unmaskedKeys) { | ||
JsonObject root = JsonParser.parseString(record.getValue()).getAsJsonObject(); | ||
maskJson(root, "", unmaskedKeys); | ||
record.setValue(root.toString()); | ||
return record; | ||
} | ||
|
||
private void maskAllExcept(JsonObject jsonElement, List<String> keys) { | ||
maskAllExcept("", jsonElement, keys); | ||
private void maskJson(JsonElement element, String path, List<String> unmaskedKeys) { | ||
if (element.isJsonObject()) { | ||
maskJsonObject(element.getAsJsonObject(), path, unmaskedKeys); | ||
} else if (element.isJsonArray()) { | ||
maskJsonArray(element.getAsJsonArray(), path, unmaskedKeys); | ||
} | ||
} | ||
|
||
private void maskJsonObject(JsonObject obj, String path, List<String> unmaskedKeys) { | ||
for (Map.Entry<String, JsonElement> entry : obj.entrySet()) { | ||
String newPath = path + entry.getKey(); | ||
JsonElement value = entry.getValue(); | ||
|
||
if (shouldMaskPrimitive(value, newPath, unmaskedKeys)) { | ||
entry.setValue(new JsonPrimitive(jsonMaskReplacement)); | ||
} else if (isNestedStructure(value)) { | ||
maskJson(value, newPath + ".", unmaskedKeys); | ||
} | ||
} | ||
} | ||
|
||
private void maskAllExcept(String currentKey, JsonObject node, List<String> keys) { | ||
if (node.isJsonObject()) { | ||
JsonObject objectNode = node.getAsJsonObject(); | ||
for(Map.Entry<String, JsonElement> entry : objectNode.entrySet()) { | ||
if(entry.getValue().isJsonObject()) { | ||
maskAllExcept(currentKey + entry.getKey() + ".", entry.getValue().getAsJsonObject(), keys); | ||
} else { | ||
if(!keys.contains(currentKey + entry.getKey())) { | ||
objectNode.addProperty(entry.getKey(), jsonMaskReplacement); | ||
} | ||
} | ||
private void maskJsonArray(JsonArray array, String path, List<String> unmaskedKeys) { | ||
boolean shouldMask = !unmaskedKeys.contains(path.substring(0, path.length() - 1)); | ||
|
||
for (int i = 0; i < array.size(); i++) { | ||
JsonElement arrayElement = array.get(i); | ||
if (arrayElement.isJsonPrimitive() && shouldMask) { | ||
array.set(i, new JsonPrimitive(jsonMaskReplacement)); | ||
} else if (isNestedStructure(arrayElement)) { | ||
maskJson(arrayElement, path, unmaskedKeys); | ||
} | ||
} | ||
} | ||
|
||
private boolean shouldMaskPrimitive(JsonElement value, String path, List<String> unmaskedKeys) { | ||
return value.isJsonPrimitive() && !unmaskedKeys.contains(path); | ||
} | ||
|
||
private boolean isNestedStructure(JsonElement value) { | ||
return value.isJsonObject() || value.isJsonArray(); | ||
} | ||
} |
115 changes: 86 additions & 29 deletions
115
src/main/java/org/akhq/utils/JsonShowByDefaultMasker.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,65 +1,122 @@ | ||
package org.akhq.utils; | ||
|
||
import com.google.gson.JsonElement; | ||
import com.google.gson.JsonObject; | ||
import com.google.gson.JsonParser; | ||
import com.google.gson.*; | ||
import io.micronaut.context.annotation.Requires; | ||
import jakarta.inject.Singleton; | ||
import lombok.SneakyThrows; | ||
import org.akhq.configs.DataMasking; | ||
import org.akhq.configs.JsonMaskingFilter; | ||
import org.akhq.models.Record; | ||
|
||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
@Singleton | ||
@Requires(property = "akhq.security.data-masking.mode", value = "json_show_by_default") | ||
public class JsonShowByDefaultMasker implements Masker { | ||
|
||
private final List<JsonMaskingFilter> jsonMaskingFilters; | ||
private final Map<String, List<String>> topicToKeysMap; | ||
private final String jsonMaskReplacement; | ||
private static final String ERROR_MESSAGE = "Error masking record"; | ||
|
||
public JsonShowByDefaultMasker(DataMasking dataMasking) { | ||
this.jsonMaskingFilters = dataMasking.getJsonFilters(); | ||
this.jsonMaskReplacement = dataMasking.getJsonMaskReplacement(); | ||
this.topicToKeysMap = buildTopicKeysMap(dataMasking); | ||
} | ||
|
||
private Map<String, List<String>> buildTopicKeysMap(DataMasking dataMasking) { | ||
return dataMasking.getJsonFilters().stream() | ||
.collect(Collectors.toMap( | ||
JsonMaskingFilter::getTopic, | ||
JsonMaskingFilter::getKeys, | ||
(a, b) -> a, | ||
HashMap::new | ||
)); | ||
} | ||
|
||
public Record maskRecord(Record record) { | ||
try { | ||
if(isJson(record)) { | ||
return jsonMaskingFilters | ||
.stream() | ||
.filter(jsonMaskingFilter -> record.getTopic().getName().equalsIgnoreCase(jsonMaskingFilter.getTopic())) | ||
.findFirst() | ||
.map(filter -> applyMasking(record, filter.getKeys())) | ||
.orElse(record); | ||
if (!isJson(record)) { | ||
return record; | ||
} | ||
return maskJsonRecord(record); | ||
} catch (Exception e) { | ||
LOG.error("Error masking record", e); | ||
LOG.error(ERROR_MESSAGE, e); | ||
return record; | ||
} | ||
return record; | ||
} | ||
|
||
private Record maskJsonRecord(Record record) { | ||
String topic = record.getTopic().getName().toLowerCase(); | ||
List<String> maskedKeys = topicToKeysMap.get(topic); | ||
return maskedKeys != null ? applyMasking(record, maskedKeys) : record; | ||
} | ||
|
||
@SneakyThrows | ||
private Record applyMasking(Record record, List<String> keys) { | ||
JsonObject jsonElement = JsonParser.parseString(record.getValue()).getAsJsonObject(); | ||
for(String key : keys) { | ||
maskField(jsonElement, key.split("\\."), 0); | ||
} | ||
record.setValue(jsonElement.toString()); | ||
private Record applyMasking(Record record, List<String> maskedKeys) { | ||
JsonObject root = JsonParser.parseString(record.getValue()).getAsJsonObject(); | ||
String[][] pathArrays = preProcessPaths(maskedKeys); | ||
maskPaths(root, pathArrays); | ||
record.setValue(root.toString()); | ||
return record; | ||
} | ||
|
||
private void maskField(JsonObject node, String[] keys, int index) { | ||
if (index == keys.length - 1) { | ||
if (node.has(keys[index])) { | ||
node.addProperty(keys[index], jsonMaskReplacement); | ||
} | ||
private String[][] preProcessPaths(List<String> maskedKeys) { | ||
return maskedKeys.stream() | ||
.map(key -> key.split("\\.")) | ||
.toArray(String[][]::new); | ||
} | ||
|
||
private void maskPaths(JsonObject root, String[][] pathArrays) { | ||
for (String[] path : pathArrays) { | ||
maskJson(root, path, 0); | ||
} | ||
} | ||
|
||
private void maskJson(JsonElement element, String[] path, int index) { | ||
if (index == path.length) return; | ||
|
||
String currentKey = path[index]; | ||
if (element.isJsonObject()) { | ||
handleJsonObject(element.getAsJsonObject(), path, index, currentKey); | ||
} else if (element.isJsonArray()) { | ||
handleJsonArray(element.getAsJsonArray(), path, index); | ||
} | ||
} | ||
|
||
private void handleJsonObject(JsonObject obj, String[] path, int index, String currentKey) { | ||
if (!obj.has(currentKey)) return; | ||
|
||
if (index == path.length - 1) { | ||
maskTargetElement(obj, currentKey); | ||
} else { | ||
JsonElement childNode = node.get(keys[index]); | ||
if (childNode != null && childNode.isJsonObject()) { | ||
maskField(childNode.getAsJsonObject(), keys, index + 1); | ||
maskJson(obj.get(currentKey), path, index + 1); | ||
} | ||
} | ||
|
||
private void handleJsonArray(JsonArray array, String[] path, int index) { | ||
for (int i = 0; i < array.size(); i++) { | ||
JsonElement arrayElement = array.get(i); | ||
if (arrayElement.isJsonObject()) { | ||
maskJson(arrayElement, path, index); | ||
} | ||
} | ||
} | ||
} | ||
|
||
private void maskTargetElement(JsonObject obj, String currentKey) { | ||
JsonElement target = obj.get(currentKey); | ||
if (target.isJsonArray()) { | ||
maskArrayElement(target.getAsJsonArray()); | ||
} else { | ||
obj.addProperty(currentKey, jsonMaskReplacement); | ||
} | ||
} | ||
|
||
private void maskArrayElement(JsonArray array) { | ||
for (int i = 0; i < array.size(); i++) { | ||
array.set(i, new JsonPrimitive(jsonMaskReplacement)); | ||
} | ||
} | ||
} |
Oops, something went wrong.