1 | package com.reallifedeveloper.common.infrastructure.messaging; | |
2 | ||
3 | import java.io.File; | |
4 | import java.io.FileNotFoundException; | |
5 | import java.io.IOException; | |
6 | import java.nio.file.Files; | |
7 | import java.nio.file.StandardCopyOption; | |
8 | import java.util.Map; | |
9 | ||
10 | import org.apache.kafka.common.config.SaslConfigs; | |
11 | import org.apache.kafka.common.config.SslConfigs; | |
12 | import org.springframework.core.io.DefaultResourceLoader; | |
13 | import org.springframework.core.io.Resource; | |
14 | ||
15 | import lombok.experimental.UtilityClass; | |
16 | ||
17 | /** | |
18 | * Utility class used to set the security-related Kafka configuration properties when creating consumers and producers. | |
19 | * <p> | |
20 | * In particular, it allows reading the truststore from a classpath resource using a {@code classpath:} prefix. | |
21 | * | |
22 | * @author RealLifeDeveloper | |
23 | */ | |
24 | @UtilityClass | |
25 | public final class KafkaSecurityConfiguration { | |
26 | ||
27 | /** | |
28 | * The configuration property to use to set security protocol, e.g., {@code SASL_SSL}. | |
29 | */ | |
30 | public static final String SECURITY_PROTOCOL_CONFIGURATION_KEY = "security.protocol"; | |
31 | ||
32 | /** | |
33 | * The configuration property to use to set the location of the truststore, supports {@code classpath} prefixes. | |
34 | */ | |
35 | public static final String SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; | |
36 | ||
37 | /** | |
38 | * The configuration property to use to set the password of the truststore. | |
39 | */ | |
40 | public static final String SSL_TRUSTSTORE_PASSWORD_CONFIGURATION_KEY = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; | |
41 | ||
42 | /** | |
43 | * The configuration property to use to set the SASL mechanism, e.g., {@code PLAIN}. | |
44 | */ | |
45 | public static final String SASL_MECHNISM_CONFIGURATION_KEY = SaslConfigs.SASL_MECHANISM; | |
46 | ||
47 | /** | |
48 | * The configuration property to use to set the JAAS config. | |
49 | */ | |
50 | public static final String SASL_JAAS_CONFIG_CONFIGURATION_KEY = SaslConfigs.SASL_JAAS_CONFIG; | |
51 | ||
52 | private static final DefaultResourceLoader RESOURCE_LOADER = new DefaultResourceLoader(); | |
53 | ||
54 | /** | |
55 | * Sets the value of a configuration property. | |
56 | * <p> | |
57 | * A value can be set only once for a particular property; trying to reset a value causes an exception to be thrown. | |
58 | * <p> | |
59 | * There is special handling of the {@value #SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY} configuration property, with support for reading | |
60 | * a trust store from the classpath if the configuration value starts with {@code classpath:}. | |
61 | * <p> | |
62 | * In the case that the {@value #SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY} value starts with {@code classpath:}, the trust store is | |
63 | * read as a resource using the current thread's context classloader. The store is copied to a temporary file, and the path of the file | |
64 | * is used as the value to set. | |
65 | * | |
66 | * @param configurationKey the configuration key to apply | |
67 | * @param configurationValue the configuration value | |
68 | * @param configurationProperties the currently set configuration properties | |
69 | * | |
70 | * @throws IllegalStateException if trying to reset a value | |
71 | * @throws IOException if reading the trust store from classpath fails | |
72 | */ | |
73 | public static void applySecurityConfiguration(final String configurationKey, final String configurationValue, | |
74 | final Map<String, Object> configurationProperties) throws IOException { | |
75 | ||
76 |
1
1. applySecurityConfiguration : negated conditional → KILLED |
if (SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY.equals(configurationKey)) { |
77 |
1
1. applySecurityConfiguration : removed call to com/reallifedeveloper/common/infrastructure/messaging/KafkaSecurityConfiguration::applySslTruststoreLocationConfiguration → KILLED |
applySslTruststoreLocationConfiguration(configurationValue, configurationProperties); |
78 | } else { | |
79 |
1
1. applySecurityConfiguration : removed call to com/reallifedeveloper/common/infrastructure/messaging/KafkaSecurityConfiguration::applyConfigurationIfNotAlreadySet → KILLED |
applyConfigurationIfNotAlreadySet(configurationKey, configurationValue, configurationProperties); |
80 | } | |
81 | } | |
82 | ||
83 | private static void applySslTruststoreLocationConfiguration(final String sslTruststoreConfigurationValue, | |
84 | final Map<String, Object> configurationProperties) throws IOException { | |
85 |
1
1. applySslTruststoreLocationConfiguration : negated conditional → KILLED |
if (sslTruststoreConfigurationValue.startsWith("classpath:")) { |
86 | final Resource jksResource = RESOURCE_LOADER.getResource(sslTruststoreConfigurationValue); | |
87 | ||
88 |
1
1. applySslTruststoreLocationConfiguration : negated conditional → KILLED |
if (!jksResource.exists()) { |
89 | throw new FileNotFoundException("Kafka trust store not found: " + sslTruststoreConfigurationValue); | |
90 | } | |
91 | ||
92 | final File tempFile = File.createTempFile("truststore", ".jks"); | |
93 | Files.copy(jksResource.getInputStream(), tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); | |
94 |
1
1. applySslTruststoreLocationConfiguration : removed call to com/reallifedeveloper/common/infrastructure/messaging/KafkaSecurityConfiguration::applyConfigurationIfNotAlreadySet → KILLED |
applyConfigurationIfNotAlreadySet(SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY, tempFile.getCanonicalPath(), |
95 | configurationProperties); | |
96 | } else { | |
97 |
1
1. applySslTruststoreLocationConfiguration : removed call to com/reallifedeveloper/common/infrastructure/messaging/KafkaSecurityConfiguration::applyConfigurationIfNotAlreadySet → KILLED |
applyConfigurationIfNotAlreadySet(SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY, sslTruststoreConfigurationValue, |
98 | configurationProperties); | |
99 | } | |
100 | } | |
101 | ||
102 | private static void applyConfigurationIfNotAlreadySet(String configurationKey, String configurationValue, | |
103 | Map<String, Object> configurationProperties) { | |
104 |
1
1. applyConfigurationIfNotAlreadySet : negated conditional → KILLED |
if (configurationProperties.containsKey(configurationKey)) { |
105 | String msg = String.format( | |
106 | "Trying to reset the value of property that has already been set: configurationKey=%s, " | |
107 | + "configurationValue=%s, configurationProperties=%s", | |
108 | configurationKey, configurationValue, configurationProperties); | |
109 | throw new IllegalStateException(msg); | |
110 | } | |
111 | configurationProperties.put(configurationKey, configurationValue); | |
112 | } | |
113 | ||
114 | } | |
Mutations | ||
76 |
1.1 |
|
77 |
1.1 |
|
79 |
1.1 |
|
85 |
1.1 |
|
88 |
1.1 |
|
94 |
1.1 |
|
97 |
1.1 |
|
104 |
1.1 |