// KafkaSecurityConfiguration.java

package com.reallifedeveloper.common.infrastructure.messaging;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Map;

import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;

import lombok.experimental.UtilityClass;

/**
 * Utility class used to set the security-related Kafka configuration properties when creating consumers and producers.
 * <p>
 * In particular, it allows reading the truststore from a classpath resource using a {@code classpath:} prefix.
 *
 * @author RealLifeDeveloper
 */
@UtilityClass
public final class KafkaSecurityConfiguration {

    /**
     * The configuration property to use to set security protocol, e.g., {@code SASL_SSL}.
     */
    public static final String SECURITY_PROTOCOL_CONFIGURATION_KEY = "security.protocol";

    /**
     * The configuration property to use to set the location of the truststore, supports {@code classpath} prefixes.
     */
    public static final String SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;

    /**
     * The configuration property to use to set the password of the truststore.
     */
    public static final String SSL_TRUSTSTORE_PASSWORD_CONFIGURATION_KEY = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;

    /**
     * The configuration property to use to set the SASL mechanism, e.g., {@code PLAIN}.
     * <p>
     * The name of this constant is misspelled; it is retained so that existing callers keep compiling. New code should use
     * {@link #SASL_MECHANISM_CONFIGURATION_KEY}, which has the same value.
     */
    public static final String SASL_MECHNISM_CONFIGURATION_KEY = SaslConfigs.SASL_MECHANISM;

    /**
     * The configuration property to use to set the SASL mechanism, e.g., {@code PLAIN}.
     */
    public static final String SASL_MECHANISM_CONFIGURATION_KEY = SaslConfigs.SASL_MECHANISM;

    /**
     * The configuration property to use to set the JAAS config.
     */
    public static final String SASL_JAAS_CONFIG_CONFIGURATION_KEY = SaslConfigs.SASL_JAAS_CONFIG;

    /**
     * The prefix that marks a truststore location as a classpath resource rather than a file system path.
     */
    private static final String CLASSPATH_PREFIX = "classpath:";

    private static final DefaultResourceLoader RESOURCE_LOADER = new DefaultResourceLoader();

    /**
     * Sets the value of a configuration property.
     * <p>
     * A value can be set only once for a particular property; trying to reset a value causes an exception to be thrown.
     * <p>
     * There is special handling of the {@value #SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY} configuration property, with support for reading
     * a trust store from the classpath if the configuration value starts with {@code classpath:}.
     * <p>
     * In the case that the {@value #SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY} value starts with {@code classpath:}, the trust store is
     * read as a resource using the current thread's context classloader. The store is copied to a temporary file, and the path of the file
     * is used as the value to set. The temporary file is registered for deletion when the JVM exits.
     *
     * @param configurationKey        the configuration key to apply
     * @param configurationValue      the configuration value
     * @param configurationProperties the currently set configuration properties
     *
     * @throws IllegalStateException if trying to reset a value
     * @throws IOException           if reading the trust store from classpath fails; a {@link FileNotFoundException} is thrown if the
     *                               classpath resource does not exist
     */
    public static void applySecurityConfiguration(final String configurationKey, final String configurationValue,
            final Map<String, Object> configurationProperties) throws IOException {

        if (SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY.equals(configurationKey)) {
            applySslTruststoreLocationConfiguration(configurationValue, configurationProperties);
        } else {
            applyConfigurationIfNotAlreadySet(configurationKey, configurationValue, configurationProperties);
        }
    }

    /**
     * Sets the truststore location, first materializing the truststore as a temporary file if the given value refers to a classpath
     * resource.
     *
     * @param sslTruststoreConfigurationValue the truststore location, either a plain path or a {@code classpath:} resource location
     * @param configurationProperties         the currently set configuration properties
     *
     * @throws IllegalStateException if the truststore location has already been set
     * @throws IOException           if the classpath resource does not exist or cannot be copied to a temporary file
     */
    private static void applySslTruststoreLocationConfiguration(final String sslTruststoreConfigurationValue,
            final Map<String, Object> configurationProperties) throws IOException {
        if (!sslTruststoreConfigurationValue.startsWith(CLASSPATH_PREFIX)) {
            applyConfigurationIfNotAlreadySet(SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY, sslTruststoreConfigurationValue,
                    configurationProperties);
            return;
        }

        final Resource jksResource = RESOURCE_LOADER.getResource(sslTruststoreConfigurationValue);
        if (!jksResource.exists()) {
            throw new FileNotFoundException("Kafka trust store not found: " + sslTruststoreConfigurationValue);
        }

        // Reject a duplicate before touching the file system, so that a failed call does not leave an orphaned temporary file behind.
        failIfAlreadySet(SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY, sslTruststoreConfigurationValue, configurationProperties);

        final File tempFile = File.createTempFile("truststore", ".jks");
        // The path of the file is handed to the Kafka client, so it cannot be deleted here; clean it up when the JVM exits instead.
        tempFile.deleteOnExit();

        // Files.copy does not close the source stream, so it must be closed explicitly.
        try (InputStream truststoreStream = jksResource.getInputStream()) {
            Files.copy(truststoreStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }

        applyConfigurationIfNotAlreadySet(SSL_TRUSTSTORE_LOCATION_CONFIGURATION_KEY, tempFile.getCanonicalPath(),
                configurationProperties);
    }

    /**
     * Adds the given key/value pair to the configuration properties, refusing to overwrite an existing entry.
     *
     * @param configurationKey        the configuration key to apply
     * @param configurationValue      the configuration value
     * @param configurationProperties the currently set configuration properties
     *
     * @throws IllegalStateException if the properties already contain the key
     */
    private static void applyConfigurationIfNotAlreadySet(final String configurationKey, final String configurationValue,
            final Map<String, Object> configurationProperties) {
        failIfAlreadySet(configurationKey, configurationValue, configurationProperties);
        configurationProperties.put(configurationKey, configurationValue);
    }

    /**
     * Verifies that the given key is not yet present in the configuration properties.
     *
     * @param configurationKey        the configuration key about to be applied
     * @param configurationValue      the configuration value about to be applied, used only in the error message
     * @param configurationProperties the currently set configuration properties
     *
     * @throws IllegalStateException if the properties already contain the key
     */
    private static void failIfAlreadySet(final String configurationKey, final String configurationValue,
            final Map<String, Object> configurationProperties) {
        if (configurationProperties.containsKey(configurationKey)) {
            // NOTE(review): this message includes the value and all current properties, which may contain secrets such as the
            // truststore password or the JAAS config. The format is kept unchanged for compatibility; consider redacting.
            final String msg = String.format(
                    "Trying to reset the value of property that has already been set: configurationKey=%s, "
                            + "configurationValue=%s, configurationProperties=%s",
                    configurationKey, configurationValue, configurationProperties);
            throw new IllegalStateException(msg);
        }
    }

}