Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support running xlearning on security hadoop cluster #58

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.iml
4 changes: 4 additions & 0 deletions conf/xlearning-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,9 @@
<name>xlearning.history.webapp.https.port</name>
<value>19885</value>
</property>
<property>
<name>ipc.client.fallback-to-simple-auth-allowed</name>
<value>true</value>
</property>
</configuration>

Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public ApplicationContainerListener(ApplicationContext applicationContext, Confi
@Override
public void start() {
LOG.info("Starting application containers handler server");
RPC.Builder builder = new RPC.Builder(getConfig());
Configuration conf = SecurityUtil.disableSecureRpc(getConfig());
RPC.Builder builder = new RPC.Builder(conf);
builder.setProtocol(ApplicationContainerProtocol.class);
builder.setInstance(this);
builder.setBindAddress("0.0.0.0");
Expand Down
34 changes: 23 additions & 11 deletions src/main/java/net/qihoo/xlearning/AM/ApplicationMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.ApplicationConstants;
Expand All @@ -34,7 +35,9 @@
import java.lang.reflect.Method;
import java.math.RoundingMode;
import java.net.*;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.text.DecimalFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -900,6 +903,7 @@ private Map<String, String> buildContainerEnv(String role) {
String.valueOf(containerListener.getServerPort()));
containerEnv.put("PATH", System.getenv("PATH") + ":" + System.getenv(XLearningConstants.Environment.USER_PATH.toString()));
containerEnv.put("LD_LIBRARY_PATH", System.getenv("LD_LIBRARY_PATH") + ":" + System.getenv(XLearningConstants.Environment.USER_LD_LIBRARY_PATH.toString()));
SecurityUtil.setupUserEnv(containerEnv);

LOG.debug("env:" + containerEnv.toString());
Set<String> envStr = containerEnv.keySet();
Expand Down Expand Up @@ -961,8 +965,9 @@ private void launchContainer(Map<String, LocalResource> containerLocalResource,
+ container.getId());

containerEnv.put(XLearningConstants.Environment.XLEARNING_TF_INDEX.toString(), String.valueOf(index));
ByteBuffer tokenBuffer = SecurityUtil.copyUserToken();
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
containerLocalResource, containerEnv, containerLaunchcommands, null, null, null);
containerLocalResource, containerEnv, containerLaunchcommands, null, tokenBuffer, null);

try {
nmAsync.startContainerAsync(container, ctx);
Expand Down Expand Up @@ -1871,17 +1876,24 @@ public String getTfEvaluatorId() {
* @param args Command line args
*/
public static void main(String[] args) {
ApplicationMaster appMaster;
try {
appMaster = new ApplicationMaster();
appMaster.init();
if (appMaster.run()) {
LOG.info("Application completed successfully.");
System.exit(0);
} else {
LOG.info("Application failed.");
System.exit(1);
}
UserGroupInformation ugi = SecurityUtil.setupUserGroupInformation();
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
ApplicationMaster appMaster;
appMaster = new ApplicationMaster();
appMaster.init();
if (appMaster.run()) {
LOG.info("Application completed successfully.");
System.exit(0);
} else {
LOG.info("Application failed.");
System.exit(1);
}
return null;
}
});
} catch (Exception e) {
LOG.fatal("Error running ApplicationMaster", e);
System.exit(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import net.qihoo.xlearning.api.ApplicationContext;
import net.qihoo.xlearning.api.ApplicationMessageProtocol;
import net.qihoo.xlearning.common.Message;
import net.qihoo.xlearning.common.SecurityUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -35,7 +36,8 @@ public ApplicationMessageService(ApplicationContext applicationContext, Configur
@Override
public void start() {
LOG.info("Starting application message server");
RPC.Builder builder = new RPC.Builder(getConfig());
Configuration conf = SecurityUtil.disableSecureRpc(getConfig());
RPC.Builder builder = new RPC.Builder(conf);
builder.setProtocol(ApplicationMessageProtocol.class);
builder.setInstance(this);
builder.setBindAddress("0.0.0.0");
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/net/qihoo/xlearning/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import net.qihoo.xlearning.api.XLearningConstants;
import net.qihoo.xlearning.common.LogType;
import net.qihoo.xlearning.common.Message;
import net.qihoo.xlearning.common.SecurityUtil;
import net.qihoo.xlearning.common.exceptions.RequestOverLimitException;
import net.qihoo.xlearning.conf.XLearningConfiguration;
import net.qihoo.xlearning.util.Utilities;
Expand Down Expand Up @@ -33,6 +34,7 @@
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -642,6 +644,7 @@ private boolean submitAndMonitor() throws IOException, YarnException {
Utilities.addPathToEnvironment(appMasterEnv, envKey, appMasterUserEnv.get(envKey));
}
}
SecurityUtil.setupUserEnv(appMasterEnv);

LOG.info("Building application master launch command");
List<String> appMasterArgs = new ArrayList<>(20);
Expand All @@ -667,8 +670,11 @@ private boolean submitAndMonitor() throws IOException, YarnException {
capability.setMemory(conf.getInt(XLearningConfiguration.XLEARNING_AM_MEMORY, XLearningConfiguration.DEFAULT_XLEARNING_AM_MEMORY));
capability.setVirtualCores(conf.getInt(XLearningConfiguration.XLEARNING_AM_CORES, XLearningConfiguration.DEFAULT_XLEARNING_AM_CORES));
applicationContext.setResource(capability);

ByteBuffer tokenBuffer = SecurityUtil.getDelegationTokens(conf, yarnClient);

ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, appMasterEnv, appMasterLaunchcommands, null, null, null);
localResources, appMasterEnv, appMasterLaunchcommands, null, tokenBuffer, null);

applicationContext.setAMContainerSpec(amContainer);

Expand Down
125 changes: 125 additions & 0 deletions src/main/java/net/qihoo/xlearning/common/SecurityUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package net.qihoo.xlearning.common;

import net.qihoo.xlearning.api.XLearningConstants;
import net.qihoo.xlearning.conf.XLearningConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;

public class SecurityUtil {
private static final Log LOG = LogFactory.getLog(SecurityUtil.class);
private static final String XLEARNING_USER = "XLEARNING_USER";

public static Configuration disableSecureRpc(Configuration conf) {
conf = new Configuration(conf);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
SaslRpcServer.AuthMethod.SIMPLE.toString());
conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
return conf;
}

public static ByteBuffer getDelegationTokens(YarnConfiguration conf, YarnClient yarnClient)
throws IOException, YarnException {
if (!UserGroupInformation.isSecurityEnabled()) {
return null;
}
// Set up security tokens for launching our ApplicationMaster container.
Credentials credentials = new Credentials();
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null || tokenRenewer.length() == 0) {
throw new IOException(
"Can't get Master kerberos principal for the RM to use as renewer");
}
FileSystem fs = FileSystem.get(conf);
// getting tokens for the default file-system.
final Token<?> tokens[] =
fs.addDelegationTokens(tokenRenewer, credentials);
if (tokens != null) {
for (org.apache.hadoop.security.token.Token<?> token : tokens) {
LOG.info("Got dt for " + fs.getUri() + "; " + token);
}
}
InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
// getting yarn resource manager token
Token<TokenIdentifier> token = ConverterUtils.convertFromYarn(
yarnClient.getRMDelegationToken(new Text(tokenRenewer)),
rmAddress);
LOG.info("Added RM delegation token: " + token);
credentials.addToken(token.getService(), token);

DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}

public static ByteBuffer copyUserToken() throws IOException {
if (!UserGroupInformation.isSecurityEnabled()) {
return null;
}
LOG.info("Setup container token for security hadoop cluster");
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
// Now remove the AM->RM token so that containers cannot access it.
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
while (iter.hasNext()) {
Token<?> token = iter.next();
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
iter.remove();
}
}
return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}

public static void setupUserEnv(Map<String, String> env) {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
try {
env.put(XLEARNING_USER, UserGroupInformation.getCurrentUser().getShortUserName());
} catch (IOException e) {
LOG.warn("Failed to setup env: " + XLEARNING_USER, e);
}
}

public static UserGroupInformation setupUserGroupInformation()
throws IOException {
XLearningConfiguration conf = new XLearningConfiguration();
conf.addResource(new Path(XLearningConstants.XLEARNING_JOB_CONFIGURATION));
UserGroupInformation.setConfiguration(conf);
if (!UserGroupInformation.isSecurityEnabled()) {
return UserGroupInformation.getCurrentUser();
}
String user = System.getenv(XLEARNING_USER);
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
for (Token token : UserGroupInformation.getCurrentUser().getTokens()) {
ugi.addToken(token);
}
LOG.info("UserGroupInformation: " + ugi);
return ugi;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import net.qihoo.xlearning.AM.ApplicationMaster;
import net.qihoo.xlearning.api.ApplicationContainerProtocol;
import net.qihoo.xlearning.api.XLearningConstants;
import net.qihoo.xlearning.common.InputInfo;
import net.qihoo.xlearning.common.OutputInfo;
import net.qihoo.xlearning.common.SecurityUtil;
import net.qihoo.xlearning.common.XLearningContainerStatus;
import net.qihoo.xlearning.common.TextMultiOutputFormat;
import net.qihoo.xlearning.conf.XLearningConfiguration;
Expand All @@ -19,7 +21,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
Expand All @@ -32,6 +36,7 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.*;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -977,16 +982,23 @@ private void reportSucceededAndExit() {
}

public static void main(String[] args) {
XLearningContainer container = new XLearningContainer();
final XLearningContainer container = new XLearningContainer();
try {
container.init();
if (container.run()) {
LOG.info("XLearningContainer " + container.getContainerId().toString() + " finish successfully");
container.reportSucceededAndExit();
} else {
LOG.error("XLearningContainer run failed!");
container.reportFailedAndExit();
}
UserGroupInformation ugi = SecurityUtil.setupUserGroupInformation();
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
container.init();
if (container.run()) {
LOG.info("XLearningContainer " + container.getContainerId().toString() + " finish successfully");
container.reportSucceededAndExit();
} else {
LOG.error("XLearningContainer run failed!");
container.reportFailedAndExit();
}
return null;
}
});
} catch (Exception e) {
LOG.error("Some errors has occurred during container running!", e);
container.reportFailedAndExit();
Expand Down