Use Meituan HDFS to read and write training data and models #1088

Merged · 4 commits · Mar 20, 2024
9 changes: 8 additions & 1 deletion deploy/scripts/hdfs_common.sh
@@ -12,9 +12,16 @@ then
export HADOOP_CONF_DIR=$HADOOP_HOME/conf
source "$HADOOP_HOME/conf/hadoop-env.sh" &> /dev/null
else
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
source "$HADOOP_HOME/etc/hadoop/hadoop-env.sh" &> /dev/null
fi

if [[ "$USING_MT_HADOOP" == "True" ]]; then
echo "mt kerberos auth"
python3 /app/deploy/scripts/mt_auth_util.py ${HADOOP_USER_NAME}
export KRB5CCNAME=/tmp/krb5cc_0
echo $KRB5CCNAME
fi
export LD_LIBRARY_PATH=${HADOOP_HOME}/lib/native:${HADOOP_HOME}/lib/native/nfs:${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}
export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$($HADOOP_HOME/bin/hadoop classpath --glob)
else
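The new block above delegates Kerberos login to /app/deploy/scripts/mt_auth_util.py, which is not included in this pull request. As a rough orientation only, here is a minimal Python sketch of what such a helper could look like, assuming it simply wraps kinit with a keytab; the keytab path, principal format, and the /tmp/krb5cc_0 cache location are assumptions, not the script's actual contents.

#!/usr/bin/env python3
# Hypothetical sketch of a Kerberos helper such as mt_auth_util.py
# (the real script is not shown in this diff). Assumes the container
# holds a keytab for the Hadoop user and that `kinit` is on PATH.
import os
import subprocess
import sys


def kerberos_login(user, keytab="/etc/krb5.keytab", cache="/tmp/krb5cc_0"):
    """Obtain a ticket-granting ticket for `user` and store it in `cache`."""
    env = dict(os.environ, KRB5CCNAME=cache)
    # kinit -kt <keytab> <principal> writes the TGT into KRB5CCNAME.
    subprocess.run(["kinit", "-kt", keytab, user], check=True, env=env)
    # klist is only for visibility in the pod logs.
    subprocess.run(["klist"], check=False, env=env)


if __name__ == "__main__":
    kerberos_login(sys.argv[1])

Whatever the real helper does, the contract hdfs_common.sh relies on is that a valid ticket ends up in the cache named by KRB5CCNAME before the Hadoop classpath is used.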
6 changes: 4 additions & 2 deletions deploy/scripts/sgx/run_trainer_master_sgx.sh
@@ -17,6 +17,7 @@
set -ex
source ~/.env
export CUDA_VISIBLE_DEVICES=
unset HTTPS_PROXY https_proxy http_proxy ftp_proxy
cp /app/sgx/gramine/CI-Examples/tensorflow_io.py ./
source /app/deploy/scripts/hdfs_common.sh || true
source /app/deploy/scripts/pre_start_hook.sh || true
@@ -39,6 +40,7 @@ local_data_source=$(normalize_env_to_args "--local-data-source" $LOCAL_DATA_SOURCE)
local_data_path=$(normalize_env_to_args "--local-data-path" $LOCAL_DATA_PATH)
local_start_date=$(normalize_env_to_args "--local-start-date" $LOCAL_START_DATE)
local_end_date=$(normalize_env_to_args "--local-end-date" $LOCAL_END_DATE)
using_mt_hadoop=$(normalize_env_to_args "--using_mt_hadoop" $USING_MT_HADOOP)

if [ -n "$CHECKPOINT_PATH" ]; then
checkpoint_path="--checkpoint-path=$CHECKPOINT_PATH"
@@ -91,7 +93,6 @@ cd ${ROLE}
cp /app/sgx/gramine/CI-Examples/tensorflow_io.py ./
source /app/deploy/scripts/sgx/enclave_env.sh
cp /app/sgx/token/* ./
unset HTTPS_PROXY https_proxy http_proxy ftp_proxy

make_custom_env 4
source /root/start_aesm_service.sh
@@ -126,4 +127,5 @@ taskset -c $START_CPU_SN-$END_CPU_SN stdbuf -o0 gramine-sgx python main.py --mas
$summary_save_steps $summary_save_secs \
$local_data_source $local_data_path $local_start_date \
$local_end_date $epoch_num $start_date $end_date \
$shuffle $shuffle_in_day $extra_params $export_model
$shuffle $shuffle_in_day $extra_params $export_model \
$using_mt_hadoop
15 changes: 13 additions & 2 deletions deploy/scripts/sgx/run_trainer_worker_sgx.sh
@@ -26,6 +26,7 @@ fi

echo $LISTEN_PORT > /pod-data/listen_port

unset HTTPS_PROXY https_proxy http_proxy ftp_proxy
cp /app/sgx/gramine/CI-Examples/tensorflow_io.py ./
source /app/deploy/scripts/hdfs_common.sh || true
source /app/deploy/scripts/pre_start_hook.sh || true
@@ -43,14 +44,22 @@ cd ${ROLE}
cp /app/sgx/gramine/CI-Examples/tensorflow_io.py ./
source /app/deploy/scripts/sgx/enclave_env.sh
cp /app/sgx/token/* ./
unset HTTPS_PROXY https_proxy http_proxy ftp_proxy

mode=$(normalize_env_to_args "--mode" "$MODE")
sparse_estimator=$(normalize_env_to_args "--sparse-estimator" "$SPARSE_ESTIMATOR")
batch_size=$(normalize_env_to_args "--batch-size" "$BATCH_SIZE")
learning_rate=$(normalize_env_to_args "--learning-rate" "$LEARNING_RATE")
extra_params=$(normalize_env_to_args "--extra-params" "$EXTRA_PARAMS")

using_embedding_protection=$(normalize_env_to_args "--using_embedding_protection" $USING_EMBEDDING_PROTECTION)
using_marvell_protection=$(normalize_env_to_args "--using_marvell_protection" $USING_MARVELL_PROTECTION)
discorloss_weight=$(normalize_env_to_args "--discorloss_weight" $DISCORLOSS_WEIGHT)
sumkl_threshold=$(normalize_env_to_args "--sumkl_threshold" $SUMKL_THRESHOLD)
using_emb_attack=$(normalize_env_to_args "--using_emb_attack" $USING_EMB_ATTACK)
using_norm_attack=$(normalize_env_to_args "--using_norm_attack" $USING_NORM_ATTACK)
using_mt_hadoop=$(normalize_env_to_args "--using_mt_hadoop" $USING_MT_HADOOP)


if [ -n "$CLUSTER_SPEC" ]; then
# get master address from clusterSpec["master"]
MASTER_HOST=`python -c "
@@ -106,4 +115,6 @@ taskset -c $START_CPU_SN-$END_CPU_SN stdbuf -o0 gramine-sgx python main.py --wor
--peer-addr="$PEER_ADDR" \
--worker-rank="$INDEX" \
$server_port $mode $batch_size \
$sparse_estimator $learning_rate
$sparse_estimator $learning_rate \
$using_embedding_protection $using_marvell_protection $discorloss_weight $sumkl_threshold $using_emb_attack $using_norm_attack \
$using_mt_hadoop
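Both the master and worker launchers now turn USING_MT_HADOOP into a --using_mt_hadoop argument via normalize_env_to_args (which appears to emit the flag only when the variable is non-empty) and append it to the main.py invocation. The flag itself is parsed inside fedlearner's trainer code, which is not part of this diff; the following is only a hypothetical sketch of how such a string-valued flag might be declared and checked with argparse.

# Hypothetical sketch: how a --using_mt_hadoop flag might be declared and
# consumed. The real registration happens inside fedlearner's trainer
# argument parser, which is not shown in this pull request.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--using_mt_hadoop', type=str, default='False',
                    help='Whether to authenticate against Meituan Hadoop '
                         'before reading or writing HDFS paths.')
args = parser.parse_args(['--using_mt_hadoop', 'True'])

if args.using_mt_hadoop.lower() == 'true':
    # e.g. trigger the Kerberos login sketched earlier before creating
    # any HDFS readers or writers.
    print('Meituan Hadoop mode enabled')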
186 changes: 186 additions & 0 deletions example/wide_n_deep/leader_mask_loss.py
@@ -0,0 +1,186 @@
# Copyright 2020 The FedLearner Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# coding: utf-8
# pylint: disable=no-else-return, inconsistent-return-statements

import tensorflow.compat.v1 as tf
import fedlearner.trainer as flt

ROLE = 'leader'

parser = flt.trainer_worker.create_argument_parser()
parser.add_argument('--batch-size', type=int, default=32,
                    help='Training batch size.')
args = parser.parse_args()


def input_fn(bridge, trainer_master=None):
    dataset = flt.data.DataBlockLoader(
        args.batch_size, ROLE, bridge, trainer_master).make_dataset()

    def parse_fn(example):
        feature_map = {"x_{0}".format(i): tf.VarLenFeature(
            tf.int64) for i in range(512)}
        feature_map["example_id"] = tf.FixedLenFeature([], tf.string)
        feature_map["y"] = tf.FixedLenFeature([], tf.int64)
        feature_map["loss_mask"] = tf.FixedLenFeature([], tf.int64)
        features = tf.parse_example(example, features=feature_map)
        return features, dict(y=features.pop('y'))

    dataset = dataset.map(map_func=parse_fn,
                          num_parallel_calls=tf.data.experimental.AUTOTUNE)

    return dataset


def serving_input_receiver_fn():
    feature_map = {"x_{0}".format(i): tf.VarLenFeature(
        tf.int64) for i in range(512)}
    feature_map["example_id"] = tf.FixedLenFeature([], tf.string)

    record_batch = tf.placeholder(dtype=tf.string, name='examples')
    features = tf.parse_example(record_batch, features=feature_map)
    features['act1_f'] = tf.placeholder(dtype=tf.float32, name='act1_f')
    receiver_tensors = {
        'examples': record_batch,
        'act1_f': features['act1_f']
    }
    return tf.estimator.export.ServingInputReceiver(
        features, receiver_tensors)


def model_fn(model, features, labels, mode):
    """Model builder for the wide & deep leader model."""
    global_step = tf.train.get_or_create_global_step()

    # New feature that decides whether the loss should be zeroed out.
    loss_mask_feature = features.get('loss_mask', None)

    x = dict()
    for i in range(512):
        x_name = "x_{}".format(i)
        x[x_name] = features[x_name]

    num_slot = 512
    fid_size, embed_size = 101, 16
    embeddings = [
        tf.get_variable(
            'slot_emb{0}'.format(i), shape=[fid_size, embed_size],
            dtype=tf.float32,
            initializer=tf.random_uniform_initializer(-0.01, 0.01))
        for i in range(num_slot)]
    embed_output = tf.concat(
        [
            tf.nn.embedding_lookup_sparse(
                embeddings[i], x['x_{}'.format(i)], sp_weights=None,
                combiner='mean')
            for i in range(512)],
        axis=1)

    output_size = num_slot * embed_size
    fc1_size, fc2_size = 256, 64
    w1l = tf.get_variable(
        'w1l', shape=[output_size, fc1_size], dtype=tf.float32,
        initializer=tf.random_uniform_initializer(-0.01, 0.01))
    b1l = tf.get_variable(
        'b1l', shape=[fc1_size], dtype=tf.float32,
        initializer=tf.zeros_initializer())
    w2 = tf.get_variable(
        'w2', shape=[fc1_size, fc2_size], dtype=tf.float32,
        initializer=tf.random_uniform_initializer(-0.01, 0.01))
    b2 = tf.get_variable(
        'b2', shape=[fc2_size], dtype=tf.float32,
        initializer=tf.zeros_initializer())
    w3 = tf.get_variable(
        'w3', shape=[fc2_size*2, 2], dtype=tf.float32,
        initializer=tf.random_uniform_initializer(-0.01, 0.01))

    act1_l = tf.nn.relu(tf.nn.bias_add(tf.matmul(embed_output, w1l), b1l))
    act2_l = tf.nn.bias_add(tf.matmul(act1_l, w2), b2)

    if mode == tf.estimator.ModeKeys.TRAIN:
        act1_f = model.recv('act1_f', tf.float32, require_grad=True)
    elif mode == tf.estimator.ModeKeys.EVAL:
        act1_f = model.recv('act1_f', tf.float32, require_grad=False)
    else:
        act1_f = features['act1_f']

    output = tf.concat([act2_l, act1_f], axis=1)
    logits = tf.matmul(output, w3)

    if mode == tf.estimator.ModeKeys.PREDICT:
        return model.make_spec(mode, predictions=logits)

    y = labels['y']
    loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=y, logits=logits)
    # Print the original per-example loss.
    loss_print_op = tf.print("Original loss:", loss)

    # If loss_mask_feature is present and equals 1, the loss is set to 0.
    if loss_mask_feature is not None:
        # Reshape loss_mask_feature to match loss for element-wise multiplication.
        loss_mask = tf.cast(tf.reshape(loss_mask_feature, [-1]), tf.float32)
        # Print the loss mask.
        loss_mask_print_op = tf.print("Loss mask:", loss_mask)

        # Apply the mask: where loss_mask is 0 the loss is kept,
        # where it is 1 the corresponding loss becomes 0.
        loss = loss * (1 - loss_mask)
    else:
        loss_mask_print_op = tf.no_op()

    # Print the loss after applying the mask.
    loss_after_mask_print_op = tf.print("Loss after applying mask:", loss)
    # Make sure all prints run before the mean loss is computed.
    with tf.control_dependencies([loss_print_op, loss_mask_print_op,
                                  loss_after_mask_print_op]):
        loss = tf.math.reduce_mean(loss)

    if mode == tf.estimator.ModeKeys.EVAL:
        auc_pair = tf.metrics.auc(y, logits[:, 1])
        return model.make_spec(
            mode, loss=loss, eval_metric_ops={'auc': auc_pair})

    # mode == tf.estimator.ModeKeys.TRAIN:
    logging_hook = tf.train.LoggingTensorHook(
        {"loss": loss}, every_n_iter=10)
    metric_hook = flt.GlobalStepMetricTensorHook(tensor_dict={"loss": loss},
                                                 every_steps=10)
    optimizer = tf.train.GradientDescentOptimizer(0.1)
    train_op = model.minimize(optimizer, loss, global_step=global_step)
    return model.make_spec(mode, loss=loss, train_op=train_op,
                           training_hooks=[logging_hook, metric_hook])

class ExportModelHook(flt.trainer_worker.ExportModelHook):
    def after_save(self, sess, model, export_dir, inputs, outputs):
        print("**************export model hook**************")
        print("sess :", sess)
        print("model: ", model)
        print("export_dir: ", export_dir)
        print("inputs: ", inputs)
        print("outputs: ", outputs)
        print("*********************************************")


if __name__ == '__main__':
    flt.trainer_worker.train(
        ROLE, args, input_fn,
        model_fn, serving_input_receiver_fn,
        export_model_hook=ExportModelHook())
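For readers skimming the diff, the core of the new loss_mask logic in model_fn is the single line loss = loss * (1 - loss_mask): examples whose mask is 1 contribute zero loss, examples whose mask is 0 are unchanged. A standalone numpy illustration (not part of the example itself):

# Standalone numpy illustration of loss = loss * (1 - loss_mask),
# mirroring the masking step in model_fn above.
import numpy as np

per_example_loss = np.array([0.7, 1.2, 0.3, 2.0], dtype=np.float32)
loss_mask = np.array([0, 1, 0, 1], dtype=np.float32)  # 1 means "ignore this example"

masked = per_example_loss * (1.0 - loss_mask)
print(masked)         # [0.7 0.  0.3 0. ]
print(masked.mean())  # 0.25

Note that reduce_mean still divides by the full batch size, so masked examples dilute the average rather than being excluded from the denominator.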
2 changes: 2 additions & 0 deletions example/wide_n_deep/make_data.py
@@ -42,6 +42,8 @@
            Feature(bytes_list=BytesList(value=[str(idx).encode()]))
        features_l['y'] = \
            Feature(int64_list=Int64List(value=[random.randint(0, 1)]))
        features_l['loss_mask'] = \
            Feature(int64_list=Int64List(value=[random.randint(0, 1)]))
        for k in range(512):
            features_l['x_{0}'.format(k)] = \
                Feature(int64_list=Int64List(value=[random.randint(0, 100)]))
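make_data.py now emits a random 0/1 loss_mask next to the label, so roughly half of the generated examples will have their loss zeroed by the leader model above. A small self-contained round-trip of that field, using only the tf.train.Example layout shown in the diff (the x_* and example_id features are omitted for brevity):

# Minimal check of the loss_mask field written by make_data.py,
# using only the tf.train.Example protobuf layout shown above.
import random
import tensorflow.compat.v1 as tf

features = {
    'y': tf.train.Feature(
        int64_list=tf.train.Int64List(value=[random.randint(0, 1)])),
    'loss_mask': tf.train.Feature(
        int64_list=tf.train.Int64List(value=[random.randint(0, 1)])),
}
example = tf.train.Example(features=tf.train.Features(feature=features))
serialized = example.SerializeToString()

parsed = tf.train.Example.FromString(serialized)
print(parsed.features.feature['loss_mask'].int64_list.value[0])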
12 changes: 9 additions & 3 deletions fedlearner-sgx-dev.dockerfile
@@ -123,7 +123,7 @@ ENV FEDLEARNER_PATH=/fedlearner

RUN apt-get install -y libmysqlclient-dev

# Build gRPC
# Build gRPC
COPY sgx/grpc/common ${GRPC_PATH}
COPY sgx/grpc/v1.38.1 ${GRPC_PATH}

@@ -144,8 +144,14 @@ RUN cd ${TF_BUILD_PATH} \
# Build and install fedlearner
COPY . ${FEDLEARNER_PATH}

# For meituan hadoop
RUN if [ -f ${FEDLEARNER_PATH}/docker/hadoop-mt-2.7.0.tar.gz ]; then mkdir -p /opt/meituan/ && tar -xzf ${FEDLEARNER_PATH}/docker/hadoop-mt-2.7.0.tar.gz -C /opt/meituan/; fi

# For meituan hadoop auth
RUN apt-get install -y libkrb5-dev openjdk-8-jdk

RUN pip3 install --upgrade pip \
&& pip3 install -r ${FEDLEARNER_PATH}/requirements.txt
&& pip3 install -r ${FEDLEARNER_PATH}/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/

RUN cd ${FEDLEARNER_PATH} \
&& make protobuf \
@@ -188,7 +194,7 @@ WORKDIR ${WORK_SPACE_PATH}

EXPOSE 6006 50051 50052

RUN bash -x /app/deploy/scripts/sgx/get_token.sh
RUN bash -x /fedlearner/deploy/scripts/sgx/get_token.sh

RUN chmod +x /root/entrypoint.sh
# ENTRYPOINT ["/root/entrypoint.sh"]