| 1 | package com.reallifedeveloper.common.infrastructure.messaging; | |
| 2 | ||
| 3 | import java.io.File; | |
| 4 | import java.io.FileNotFoundException; | |
| 5 | import java.io.IOException; | |
| 6 | import java.nio.file.Files; | |
| 7 | import java.nio.file.StandardCopyOption; | |
| 8 | import java.util.Map; | |
| 9 | ||
| 10 | import org.apache.kafka.common.config.SaslConfigs; | |
| 11 | import org.apache.kafka.common.config.SslConfigs; | |
| 12 | import org.springframework.core.io.DefaultResourceLoader; | |
| 13 | import org.springframework.core.io.Resource; | |
| 14 | ||
| 15 | import lombok.experimental.UtilityClass; | |
| 16 | ||
| 17 | /** | |
| 18 | * Utility class used to set the security-related Kafka configuration properties when creating consumers and producers. | |
| 19 | * <p> | |
| 20 | * In particular, it allows reading the truststore from a classpath resource using a {@code classpath:} prefix. | |
| 21 | * | |
| 22 | * @author RealLifeDeveloper | |
| 23 | */ | |
| 24 | @UtilityClass | |
| 25 | public final class KafkaSecurityConfiguration { | |
| 26 | ||
| 27 | /** | |
| 28 | * The configuration property to use to set security protocol, e.g., {@code SASL_SSL}. | |
| 29 | */ | |
| 30 | public static final String SECURITY_PROTOCOL_CONFIGURATION_KEY = "security.protocol"; | |
| 31 | ||
| 32 | /** | |
| 33 | * The configuration property to use to set the location of the truststore, supports {@code classpath} prefixes. | |
| 34 | */ | |
| 35 | public static final String SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; | |
| 36 | ||
| 37 | /** | |
| 38 | * The configuration property to use to set the password of the truststore. | |
| 39 | */ | |
| 40 | public static final String SSL_TRUSTSTORE_PASSWORD_CONFIGURATION_KEY = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; | |
| 41 | ||
| 42 | /** | |
| 43 | * The configuration property to use to set the SASL mechanism, e.g., {@code PLAIN}. | |
| 44 | */ | |
| 45 | public static final String SASL_MECHNISM_CONFIGURATION_KEY = SaslConfigs.SASL_MECHANISM; | |
| 46 | ||
| 47 | /** | |
| 48 | * The configuration property to use to set the JAAS config. | |
| 49 | */ | |
| 50 | public static final String SASL_JAAS_CONFIG_CONFIGURATION_KEY = SaslConfigs.SASL_JAAS_CONFIG; | |
| 51 | ||
| 52 | private static final DefaultResourceLoader RESOURCE_LOADER = new DefaultResourceLoader(); | |
| 53 | ||
| 54 | /** | |
| 55 | * Sets the value of a configuration property. | |
| 56 | * <p> | |
| 57 | * A value can be set only once for a particular property; trying to reset a value causes an exception to be thrown. | |
| 58 | * <p> | |
| 59 | * There is special handling of the {@value #SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY} configuration property, with support for reading | |
| 60 | * a trust store from the classpath if the configuration value starts with {@code classpath:}. | |
| 61 | * <p> | |
| 62 | * In the case that the {@value #SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY} value starts with {@code classpath:}, the trust store is | |
| 63 | * read as a resource using the current thread's context classloader. The store is copied to a temporary file, and the path of the file | |
| 64 | * is used as the value to set. | |
| 65 | * | |
| 66 | * @param configurationKey the configuration key to apply | |
| 67 | * @param configurationValue the configuration value | |
| 68 | * @param configurationProperties the currently set configuration properties | |
| 69 | * | |
| 70 | * @throws IllegalStateException if trying to reset a value | |
| 71 | * @throws IOException if reading the trust store from classpath fails | |
| 72 | */ | |
| 73 | public static void applySecurityConfiguration(final String configurationKey, final String configurationValue, | |
| 74 | final Map<String, Object> configurationProperties) throws IOException { | |
| 75 | ||
| 76 |
1
1. applySecurityConfiguration : negated conditional → KILLED |
if (SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY.equals(configurationKey)) { |
| 77 |
1
1. applySecurityConfiguration : removed call to com/reallifedeveloper/common/infrastructure/messaging/KafkaSecurityConfiguration::applySslTruststoreLocationConfiguration → KILLED |
applySslTruststoreLocationConfiguration(configurationValue, configurationProperties); |
| 78 | } else { | |
| 79 |
1
1. applySecurityConfiguration : removed call to com/reallifedeveloper/common/infrastructure/messaging/KafkaSecurityConfiguration::applyConfigurationIfNotAlreadySet → KILLED |
applyConfigurationIfNotAlreadySet(configurationKey, configurationValue, configurationProperties); |
| 80 | } | |
| 81 | } | |
| 82 | ||
| 83 | private static void applySslTruststoreLocationConfiguration(final String sslTruststoreConfigurationValue, | |
| 84 | final Map<String, Object> configurationProperties) throws IOException { | |
| 85 |
1
1. applySslTruststoreLocationConfiguration : negated conditional → KILLED |
if (sslTruststoreConfigurationValue.startsWith("classpath:")) { |
| 86 | final Resource jksResource = RESOURCE_LOADER.getResource(sslTruststoreConfigurationValue); | |
| 87 | ||
| 88 |
1
1. applySslTruststoreLocationConfiguration : negated conditional → KILLED |
if (!jksResource.exists()) { |
| 89 | throw new FileNotFoundException("Kafka trust store not found: " + sslTruststoreConfigurationValue); | |
| 90 | } | |
| 91 | ||
| 92 | final File tempFile = File.createTempFile("truststore", ".jks"); | |
| 93 | Files.copy(jksResource.getInputStream(), tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); | |
| 94 |
1
1. applySslTruststoreLocationConfiguration : removed call to com/reallifedeveloper/common/infrastructure/messaging/KafkaSecurityConfiguration::applyConfigurationIfNotAlreadySet → KILLED |
applyConfigurationIfNotAlreadySet(SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY, tempFile.getCanonicalPath(), |
| 95 | configurationProperties); | |
| 96 | } else { | |
| 97 |
1
1. applySslTruststoreLocationConfiguration : removed call to com/reallifedeveloper/common/infrastructure/messaging/KafkaSecurityConfiguration::applyConfigurationIfNotAlreadySet → KILLED |
applyConfigurationIfNotAlreadySet(SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY, sslTruststoreConfigurationValue, |
| 98 | configurationProperties); | |
| 99 | } | |
| 100 | } | |
| 101 | ||
| 102 | private static void applyConfigurationIfNotAlreadySet(String configurationKey, String configurationValue, | |
| 103 | Map<String, Object> configurationProperties) { | |
| 104 |
1
1. applyConfigurationIfNotAlreadySet : negated conditional → KILLED |
if (configurationProperties.containsKey(configurationKey)) { |
| 105 | String msg = String.format( | |
| 106 | "Trying to reset the value of property that has already been set: configurationKey=%s, " | |
| 107 | + "configurationValue=%s, configurationProperties=%s", | |
| 108 | configurationKey, configurationValue, configurationProperties); | |
| 109 | throw new IllegalStateException(msg); | |
| 110 | } | |
| 111 | configurationProperties.put(configurationKey, configurationValue); | |
| 112 | } | |
| 113 | ||
| 114 | } | |
Mutations | ||
| 76 |
1.1 |
|
| 77 |
1.1 |
|
| 79 |
1.1 |
|
| 85 |
1.1 |
|
| 88 |
1.1 |
|
| 94 |
1.1 |
|
| 97 |
1.1 |
|
| 104 |
1.1 |