Skip to content

Commit

Permalink
OP-23081: Added authentication support for AWS MSK
Browse files Browse the repository at this point in the history
  • Loading branch information
Utkarsh Shukla committed Feb 4, 2025
1 parent 678a202 commit 0040f88
Showing 1 changed file with 55 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,61 +28,66 @@ public String getUserActivityQueueEndPoint() {

private String getSecurityString() {
String securityString = "";
if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "SCRAM-SHA-256")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
+ kafkaProperties.getSasl().getUsername()
+ "\" password=\""
+ kafkaProperties.getSasl().getPassword()
+ "\";";
if (kafkaProperties.getSecurity() == null) {
return "";
}

if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "PLAIN")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\""
+ kafkaProperties.getSasl().getUsername()
+ "\\\" password=\\\""
+ kafkaProperties.getSasl().getPassword()
+ "\\\";";
}
securityString =
securityString + "&securityProtocol=" + kafkaProperties.getSecurity().getProtocol();

if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_SSL")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "AWS_MSK_IAM")) {
// The below logic only works for specific use cases.
if (kafkaProperties.getSasl() != null) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=software.amazon.msk.auth.iam.IAMLoginModule required;"
+ "&additionalProperties.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler";
}
securityString + "&saslMechanism=" + kafkaProperties.getSasl().getMechanism();

if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_SSL")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "OAUTHBEARER")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
+ "&additionalProperties.sasl.login.callback.handler.class=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler"
+ "&additionalProperties.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler";
System.setProperty("aws.accessKeyId", kafkaProperties.getAccessKeyId());
System.setProperty("aws.secretKey", kafkaProperties.getSecretKey());
if (Objects.equals(kafkaProperties.getSasl().getMechanism(), "SCRAM-SHA-512")) {
securityString =
securityString
+ "&saslJaasConfig=org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
+ kafkaProperties.getSasl().getUsername()
+ "\" password=\""
+ kafkaProperties.getSasl().getPassword()
+ "\";"
+ "&sslTruststoreLocation="
+ kafkaProperties.getSsl().getTruststoreLocation();
}

if (Objects.equals(kafkaProperties.getSasl().getMechanism(), "SCRAM-SHA-256")) {
securityString =
securityString
+ "&saslJaasConfig=org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
+ kafkaProperties.getSasl().getUsername()
+ "\" password=\""
+ kafkaProperties.getSasl().getPassword()
+ "\";";
}

if (Objects.equals(kafkaProperties.getSasl().getMechanism(), "PLAIN")) {
securityString =
securityString
+ "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\""
+ kafkaProperties.getSasl().getUsername()
+ "\\\" password=\\\""
+ kafkaProperties.getSasl().getPassword()
+ "\\\";";
}

if (Objects.equals(kafkaProperties.getSasl().getMechanism(), "AWS_MSK_IAM")) {
securityString =
securityString
+ "&saslJaasConfig=software.amazon.msk.auth.iam.IAMLoginModule required;"
+ "&additionalProperties.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler";
}

if (Objects.equals(kafkaProperties.getSasl().getMechanism(), "OAUTHBEARER")) {
securityString =
securityString
+ "&saslJaasConfig=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
+ "&additionalProperties.sasl.login.callback.handler.class=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler"
+ "&additionalProperties.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler";
System.setProperty("aws.accessKeyId", kafkaProperties.getAccessKeyId());
System.setProperty("aws.secretKey", kafkaProperties.getSecretKey());
}
}
return securityString;
}
Expand Down

0 comments on commit 0040f88

Please sign in to comment.