Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for kerberos authentication #33

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,27 @@ output {
}
```

pulsar with kerberos

```
output {
pulsar{
serviceUrl => "pulsar://localhost:6650"
topic => "persistent://public/default/%{topic_name}"
enable_batching => true
enable_token => false
auth_plugin_class_name => "org.apache.pulsar.client.impl.auth.AuthenticationSasl"
enable_kerberos => "true"
jaas_path => "/{PATH_TO}/pulsar-jaas.conf"
krb5_path => "/{PATH_TO}/krb5.conf"
sasl_jaas_client_section_name => "pulsar_consumer"
server_type => "broker" // ** here is the different **
producer_name => "%{producer_name}"
}
}
```



# Installation

Expand Down
16 changes: 10 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import java.nio.file.Files
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING
import Files
import static StandardCopyOption.REPLACE_EXISTING

def LOGSTASH_CORE_PATH="D:\\Program Files\\logstash\\"
apply plugin: 'java'
apply from: LOGSTASH_CORE_PATH + "/../rubyUtils.gradle"
apply from: LOGSTASH_CORE_PATH + "rubyUtils.gradle"

// ===========================================================================
// plugin info
Expand Down Expand Up @@ -48,18 +49,21 @@ shadowJar {
dependencies {
implementation 'org.apache.logging.log4j:log4j-core:2.17.2'
implementation 'org.apache.logging.log4j:log4j-api:2.17.2'
implementation 'org.apache.pulsar:pulsar-client:2.10.2'
implementation files(LOGSTASH_CORE_PATH + '/../logstash-core/build/libs/logstash-core-7.17.4.jar')
implementation 'org.apache.pulsar:pulsar-client:2.10.3'
implementation 'org.apache.pulsar:pulsar-client-auth-sasl:2.10.3'

testImplementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.12.3'
testImplementation 'junit:junit:4.12'
testImplementation 'org.jruby:jruby-complete:9.1.14.0'

implementation files(LOGSTASH_CORE_PATH + "logstash-core\\build\\libs\\jars\\logstash-core.jar")
}

clean {
delete "${projectDir}/Gemfile"
delete "${projectDir}/" + pluginInfo.pluginFullName() + ".gemspec"
delete "${projectDir}/" + "" + ".gemspec"
delete "${projectDir}/lib/"
delete "${projectDir}/vendor/"
new FileNameFinder().getFileNames(projectDir.toString(), pluginInfo.pluginFullName() + "-?.?.?.gem").each { filename ->
Expand All @@ -78,7 +82,7 @@ tasks.register("vendor"){
String projectGroupPath = project.group.replaceAll('\\.', '/')
File projectJarFile = file("${vendorPathPrefix}/${projectGroupPath}/${pluginInfo.pluginFullName()}/${project.version}/${pluginInfo.pluginFullName()}-${project.version}.jar")
projectJarFile.mkdirs()
Files.copy(file("$buildDir/libs/${project.name}-${project.version}.jar").toPath(), projectJarFile.toPath(), REPLACE_EXISTING)
Files.copy(file("$buildDir/libs/${project.name}-${project.version}.jar").toPath(), projectJarFile.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING)
validatePluginJar(projectJarFile, project.group)
}
}
Expand Down
59 changes: 54 additions & 5 deletions src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@
import co.elastic.logstash.api.PluginHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.apache.pulsar.shade.com.google.common.collect.Maps;

import java.io.ByteArrayOutputStream;
import java.util.Arrays;
Expand All @@ -29,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;


@LogstashPlugin(name = "pulsar")
public class Pulsar implements Output {

Expand Down Expand Up @@ -82,6 +81,19 @@ public class Pulsar implements Output {
private static final PluginConfigSpec<String> CONFIG_TLS_TRUST_STORE_PASSWORD =
PluginConfigSpec.stringSetting("tls_trust_store_password","");

private static final PluginConfigSpec<String> CONFIG_JAAS_PATH =
PluginConfigSpec.stringSetting("jaas_path","");

private static final PluginConfigSpec<String> CONFIG_KRB5_PATH =
PluginConfigSpec.stringSetting("krb5_path","");

private static final PluginConfigSpec<String> CONFIG_SASL_JAAS_CLIENT_SECTION_NAME =
PluginConfigSpec.stringSetting("sasl_jaas_client_section_name","");

private static final PluginConfigSpec<String> CONFIG_SERVER_TYPE =
PluginConfigSpec.stringSetting("server_type","");


private static final PluginConfigSpec<String> CONFIG_AUTH_PLUGIN_CLASS_NAME =
PluginConfigSpec.stringSetting("auth_plugin_class_name",authPluginClassName);

Expand All @@ -94,6 +106,9 @@ public class Pulsar implements Output {
private static final PluginConfigSpec<Boolean> CONFIG_ENABLE_TOKEN =
PluginConfigSpec.booleanSetting("enable_token",false);

private static final PluginConfigSpec<Boolean> CONFIG_ENABLE_KERBEROS =
PluginConfigSpec.booleanSetting("enable_kerberos",false);

private static final PluginConfigSpec<String> CONFIG_AUTH_PLUGIN_PARAMS_STRING =
PluginConfigSpec.stringSetting("auth_plugin_params_String","");

Expand Down Expand Up @@ -125,6 +140,9 @@ public class Pulsar implements Output {
//Token
private final boolean enableToken;

// Kerberos
private final boolean enableKerberos;

// TODO: batchingMaxPublishDelay milliseconds

// TODO: sendTimeoutMs milliseconds 30000
Expand All @@ -148,6 +166,7 @@ public Pulsar(final String id, final Configuration configuration, final Context

enableTls = configuration.get(CONFIG_ENABLE_TLS);
enableToken = configuration.get(CONFIG_ENABLE_TOKEN);
enableKerberos = configuration.get(CONFIG_ENABLE_KERBEROS);

try {
if(enableTls && enableToken){
Expand All @@ -159,6 +178,9 @@ public Pulsar(final String id, final Configuration configuration, final Context
} else if (enableToken) {
// pulsar Token
client = buildTokenPulsar(configuration);
} else if (enableKerberos) {
// pulsar Kerberos
client = buildKerberosPulsar(configuration);
} else {
client = buildNotTlsPulsar();
}
Expand All @@ -182,6 +204,26 @@ private PulsarClient buildTokenPulsar(Configuration configuration) throws Pulsar
.build();
}

private PulsarClient buildKerberosPulsar(Configuration configuration) throws PulsarClientException {
logger.info("Use buildKerberosPulsar get PulsarClient!");

System.setProperty("java.security.auth.login.config", configuration.get(CONFIG_JAAS_PATH));
System.setProperty("java.security.krb5.conf", configuration.get(CONFIG_KRB5_PATH));

Map<String, String> authParams = Maps.newHashMap();
authParams.put("saslJaasClientSectionName", configuration.get(CONFIG_SASL_JAAS_CLIENT_SECTION_NAME));
authParams.put("serverType", configuration.get(CONFIG_SERVER_TYPE)); // ** here is the different **

Authentication saslAuth = AuthenticationFactory
.create(configuration.get(CONFIG_AUTH_PLUGIN_CLASS_NAME), authParams);

return PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(saslAuth)
.build();
}


private PulsarClient buildTlsPulsar(Configuration configuration) throws PulsarClientException {
Boolean allowTlsInsecureConnection = configuration.get(CONFIG_ALLOW_TLS_INSECURE_CONNECTION);
Boolean enableTlsHostnameVerification = configuration.get(CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION);
Expand Down Expand Up @@ -323,7 +365,14 @@ public Collection<PluginConfigSpec<?>> configSchema() {

// Pulsar Token Config
CONFIG_ENABLE_TOKEN,
CONFIG_AUTH_PLUGIN_PARAMS_STRING
CONFIG_AUTH_PLUGIN_PARAMS_STRING,

// Pulsar Kerberos Config
CONFIG_SASL_JAAS_CLIENT_SECTION_NAME,
CONFIG_ENABLE_KERBEROS,
CONFIG_KRB5_PATH,
CONFIG_JAAS_PATH,
CONFIG_SERVER_TYPE
));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void init() {
configValues.put("producer_name", "jun_test");
configValues.put("topic", topic);
configValues.put("enable_batching", true);
configValues.put("enable_token", true);
configValues.put("enable_token", false);
configValues.put("auth_plugin_class_name", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
configValues.put("auth_plugin_params_String", "token:" + token);
String delimiter = "/";
Expand All @@ -37,6 +37,28 @@ public void init() {
configValues.put("codec", new Line(codecConf, null));
}


// Initialize configuration information
public void getKerberosConfiguration() {
configValues.put("serviceUrl", serviceUrl);
configValues.put("producer_name", "jun_test");
configValues.put("topic", topic);
configValues.put("enable_batching", true);

configValues.put("enable_token", false);
configValues.put("auth_plugin_class_name", "org.apache.pulsar.client.impl.auth.AuthenticationSasl");
configValues.put("enable_kerberos", true);
configValues.put("jaas_path", "/path/pulsar_jaas.conf");
configValues.put("krb5_path", "/path/krb5.conf");
configValues.put("sasl_jaas_client_section_name", "PulsarClient");
configValues.put("server_type", "broker");
String delimiter = "/";
Map<String, Object> codecMap = new HashMap<>();
codecMap.put("delimiter", delimiter);
Configuration codecConf = new ConfigurationImpl(codecMap);
configValues.put("codec", new Line(codecConf, null));
}

@Test
@Ignore()
public void testOutputWithPulsarToken() {
Expand Down