diff --git a/docs/source/feature/odl_sample.md b/docs/source/feature/odl_sample.md index 0fe6cdda8..1b125f06b 100644 --- a/docs/source/feature/odl_sample.md +++ b/docs/source/feature/odl_sample.md @@ -119,16 +119,17 @@ select aa as request_id, bb as user_id, cc as item_id, 'expose' as event_type, ee as event_time, ff as scene from user_expose_log; + end; ``` - project: datahub project - topic: datahub topic - subId: datahub订阅id, 每个flink sql任务需要单独创建订阅id, 否则会有冲突 - - scene: extra fields, 可选字段 + - scene: 附带信息, 可选字段 1. 样本Events聚合(OnlineSampleAggr): - - 上传资源包: [rec-realtime-0.8-SNAPSHOT.jar](http://easyrec.oss-cn-beijing.aliyuncs.com/deploy/rec-realtime-0.8-SNAPSHOT.jar) + - 上传资源包: [rec-realtime-1.0-SNAPSHOT.jar](http://easyrec.oss-cn-beijing.aliyuncs.com/deploy/rec-realtime-1.0-SNAPSHOT.jar) ![image.png](../../images/odl_events_aggr.png) - 入口参数: com.aliyun.rec.feature.job.OnlineSampleEventsAggr @@ -143,7 +144,7 @@ datahub.inputTopic: user_behavior_log datahub.sinkTopic: odl_sample_aggr datahub.projectName: odl_sample_preprocess - datahub.startInSecs: '1655571600' + datahub.startInSecs: -900 input.userid: user_id input.itemid: item_id @@ -152,9 +153,11 @@ input.event-duration: play_time input.event-ts: ts input.expose-event: expose - input.event-extra: 'scene' - input.wait-positive-secs: '900' - state.max-event-num: '100' + input.event-extra: scene + input.wait-positive-secs: 900 + input.subId: 167XXXXXXX + state.ttl: 86400 + debug-mode: false ``` - datahub参数配置 @@ -165,11 +168,12 @@ - inputTopic: 读取的datahub topic - sinkTopic: 写入的datahub topic - startInSecs: 开始读取的位点,单位是seconds - - input: datahub schema配置 + - input: datahub输入topic schema配置 - userid: userid字段名 - itemid: itemid字段名 - request-id: request_id字段名 - - event-duration: event持续时间 + - event-duration: event持续时间, 可选配置 + - 如不需要构造时长label,可以不设置 - event-type: event类型字段 - event-ts: event发生时间字段(seconds) - expose-event: 曝光事件类型 @@ -177,8 +181,13 @@ - 其它事件延迟会补充下发 - event-extra: 其它event相关字段,多个字段以","分割 - wait-positive-secs: 等待正样本的时间, 单位是seconds + - 等待时间越长label越准确, 但是消耗的内存也越大, 样本下发延迟也越大 - state: - - max-event-num: 只存最新的n个event, 默认n是Integer.MAX_VALUE + - ttl: event在flink中保存的时间, 一般设成1天比较准确, 如果内存不够,可以视情况设小 + - state的存储格式是MapState\, key是event_type, value是event信息 + - 内部实现有两个辅助state: + - timer state: 记录timer是否触发 + - event number state: 记录该request_id, item_id下面发生的event数目 - debug-mode: 默认是false, 为true时, 打印详细的event信息和timer注册以及触发的信息 - datahub topic schema: - inputTopic: user_behavior_log @@ -194,8 +203,8 @@ - events数据格式: ```json [ - {"duration":6493,"eventTime":1659667790,"eventType":"play","properties":{"scene":"main"}}, - {"duration":6259,"eventTime":1659667796,"eventType":"play","properties":{"scene":"main"}} + {"duration":6493,"eventTime":1659667790,"eventType":"play","scene":"main"}, + {"duration":6259,"eventTime":1659667796,"eventType":"play","scene":"main"} ] ``` @@ -222,119 +231,103 @@ ); ``` -1. 
样本join全埋点特征 - - ```sql - create temporary table odl_sample_with_lbl( - `request_id` STRING, - `user_id` STRING, - `item_id` STRING, - `ln_play_time` DOUBLE, - `is_valid_play` BIGINT, - `min_ts` BIGINT, - `max_ts` BIGINT, - `ts` AS TO_TIMESTAMP( - FROM_UNIXTIME(if (min_ts is not null and min_ts < UNIX_TIMESTAMP(), - min_ts, UNIX_TIMESTAMP()), 'yyyy-MM-dd HH:mm:ss')), - WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND - ) WITH ( - 'connector' = 'datahub', - 'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/', - 'project' = 'easy_rec_proj', - 'topic' = 'odl_sample_with_lbl', - 'subId' = '165519436817538OG0', - 'accessId' = 'LTAIxxx', - 'accessKey' = 'xxxxxxxxx', - 'startTime' = '2022-07-02 14:30:00' - ); - - create temporary table odl_callback_log( - `request_id` STRING, - `request_time` BIGINT, - `module` STRING, - `user_id` STRING, - `item_id` STRING, - `scene` STRING, - `generate_features` STRING, - `ts` AS - TO_TIMESTAMP(FROM_UNIXTIME(if(request_time is not null and request_time < UNIX_TIMESTAMP(), - request_time, UNIX_TIMESTAMP()), 'yyyy-MM-dd HH:mm:ss')), - WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND - ) WITH ( - 'connector' = 'datahub', - 'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/', - 'project' = 'easy_rec_proj', - 'topic' = 'odl_callback_log', - 'subId' = '16567769418786B4JH', - 'accessId' = 'LTAIxxx', - 'accessKey' = 'xxxxxx' - 'startTime' = '2022-07-02 14:30:00' - ); - - - create temporary view sample_view as - select a.request_id, a.user_id, a.item_id, a.ln_play_time, a.is_valid_play, feature, b.request_time - from odl_sample_with_lbl a - inner join ( - select * from ( - select request_id, item_id, request_time, generate_features as feature, ts, - row_number() over(partition by request_id, item_id order by proctime() asc) as rn - from odl_callback_log - where `module` = 'item' and (generate_features is not null and generate_features <> '') - ) where rn = 1 - ) b - on a.request_id = b.request_id and a.item_id = b.item_id - where a.ts between b.ts - INTERVAL '30' SECONDS and b.ts + INTERVAL '30' MINUTE; - ``` - - - create temporary table注意事项: - - ts作为watermark需要限制小于当前时间, 防止因为异常的timestamp导致watermark混乱 - - temporary table可以只列举需要的字段,不必枚举所有字段 - - datahub connector更多参数请参考[文档](https://help.aliyun.com/document_detail/177534.html) - - kafka connector参考[文档](https://help.aliyun.com/document_detail/177144.html) - - odl_callback_log需要做去重, 防止因为重复调用造成样本重复 - - flink配置开启ttl(millisecond), 控制state大小: - ```sql - table.exec.state.ttl: '2400000' - ``` - - ttl(miliseconds)的设置考虑两个因素: - - odl_sample_with_lbl相对请求时间request_time的延迟 - - ttl \< 相对延迟, 就会有样本丢失 - - 统计相对延迟: - - 将odl_sample_with_lbl / odl_callback_log落到MaxCompute - - 按request_id join 计算ts的差异 - - ttl越大state越大, 保存checkpoint时间越长, 性能下降 - - 存储引擎开启gemini kv分离(generate_features字段值很大): +1. 
label join 特征 + + - 上传资源包: [rec-realtime-1.0-SNAPSHOT.jar](http://easyrec.oss-cn-beijing.aliyuncs.com/deploy/rec-realtime-1.0-SNAPSHOT.jar) + ![image.png](../../images/odl_events_aggr.png) + + - 入口参数: com.aliyun.rec.feature.job.SampleFeatureJoinJob + + - flink配置: + ```sql - state.backend.gemini.kv.separate.mode: GLOBAL_ENABLE - state.backend.gemini.kv.separate.value.size.threshold: '500' + datahub.endpoint: 'http://dh-cn-hangzhou-int-vpc.aliyuncs.com/' + datahub.accessId: LTAIxxxxxxxxx + datahub.accessKey: Jdqtxxxxxxxx + datahub.projectName: xxx_proj + callback.table_name: pairec_callback_log + callback.request_id: request_id + callback.user_id: user_id + callback.item_id: item_id + callback.user_fea: user_features + callback.generate_fea: generate_features + callback.request_time: request_time + callback.subId: 1671540xxxxxxxxxxx + sample.table_name: odl_sample_with_lbl + sample.request_id: request_id + sample.user_id: user_id + sample.item_id: style_id + sample.event_ts: min_ts + sample.labels: 'ln_play_time:double,is_valid_play:bigint' + sample.subId: 16715xxxxxxxxxxxxx + output.table_name: odl_sample_with_feature_and_lbl + datahub.startInSecs: '-600' + timer.max_wait_ts: '300' + state.ttl: '2400' + state.user_fea_ttl: '7200' + eas.end_point: 13010xxxxxxxxxxx.vpc.cn-beijing.pai-eas.aliyuncs.com + eas.model_name: rank_callback_public + eas.token: YTcwZGU1ZTxxxxxxx + taskmanager.memory.network.max: 64mb ``` -1. 实时样本写入Datahub / Kafka - - ```sql - create temporary table odl_sample_with_fea_and_lbl( - `request_id` string, - `user_id` string, - `item_id` string, - `ln_play_time` double, - `is_valid_play` bigint, - `feature` string, - `request_time` bigint - ) WITH ( - 'connector' = 'datahub', - 'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/', - 'project' = 'odl_sample', - 'topic' = 'odl_sample_with_fea_and_lbl', - 'subId' = '1656xxxxxx', - 'accessId' = 'LTAIxxxxxxx', - 'accessKey' = 'Q82Mxxxxxxxx' - ); - insert into odl_sample_with_fea_and_lbl - select * from sample_view; - ``` - - - subId: datahub subscription id + - datahub参数配置 + - accessId: 鉴权id + - accessKey: 鉴权secret + - projectName: 项目名称 + - endpoint: 使用带vpc的endpoint + - startInSecs: 开始读取的位点,单位是seconds + - sinkTopic: 输出表配置 + - callback topic配置: 样本特征回流埋点配置 + - table_name: 样本特征回流topic + - request_id: 请求id, string类型 + - user_id: 用户id, string类型 + - item_id: 商品id, string类型 + - user_fea: 用户特征 + - generate_fea: fg之后生成的特征 + - request_time: 请求时间, bigint类型, 用于设置watermark + - subId: 订阅Id, 注意不要和其它的任务重复 + - sample topic配置 + - table_name: 样本topic + - request_id: 请求id, string类型 + - user_id: 用户id, string类型 + - item_id: 商品id, string类型 + - event_ts: 样本时间, 用于设置watermark + - labels: 样本label + - 格式为label_name:label_type,多个label之间用","分割 + - 也可以放一些其它要写入join的表的列, 如comments等 + - subId: datahub订阅id + - output topic配置: + - table_name: 输出topic + - request_id: 请求id, 可选配置, 默认和sample.request_id一致 + - user_id: 用户id, 可选配置, 默认和sample.user_id一致 + - item_id: 商品id, 可选配置, 默认和sample.item_id一致 + - features: 样本特征, 可选配置, 默认: "features" + - subId: datahub订阅id + - timer.max_wait_ts: sample到了之后, 等待特征到达的时间 + - 等待时间越长,鲁棒性越好,但是消耗的内存越多 + - state.ttl: 单位seconds + - generate_feature在state中的保留时间 + - 一般要大于timer.max_wait_ts, 否则join成功率比较低 + - 越大join的成功率越高, 但是消耗的内存也越多 + - state.user_fea_ttl: 单位seconds + - 如果内存不够, 同时又配置了callback_eas, 可以将user_fea存储的久一点 + - 当generate_feature过期之后, 仍然可以通过user_fea请求eas获得完整的样本特征 + - callback_eas: 针对延迟的样本, 是否需要重新请求eas获得特征, 默认是true + - eas: 如果callback_eas是true, 需要设置eas相关参数 + - end_point: 请求url + - model_name: 模型名称 + - token: 鉴权token + - debug-mode: 可选配置, 
默认是false + - 设置成true打印调试信息, 排查join问题 + - 任务稳定后, 建议关闭, 避免影响性能 + - taskmanager.memory.network.max: flink系统配置, 减少network的内存消耗 + + - 备选方案: 也可以使用flink sql实现label和feature的join + + - [参考文档](./odl_sample_join_sql.md) + - 优点:更灵活 + - 缺点: 性能不高, 内存消耗大 ### 数据诊断 diff --git a/docs/source/feature/odl_sample_join_sql.md b/docs/source/feature/odl_sample_join_sql.md new file mode 100644 index 000000000..64fd1bf4f --- /dev/null +++ b/docs/source/feature/odl_sample_join_sql.md @@ -0,0 +1,115 @@ +# sample join + +## label join 特征 + +```sql + create temporary table odl_sample_with_lbl( + `request_id` STRING, + `user_id` STRING, + `item_id` STRING, + `ln_play_time` DOUBLE, + `is_valid_play` BIGINT, + `min_ts` BIGINT, + `max_ts` BIGINT, + `ts` AS TO_TIMESTAMP( + FROM_UNIXTIME(if (min_ts is not null and min_ts < UNIX_TIMESTAMP(), + min_ts, UNIX_TIMESTAMP()), 'yyyy-MM-dd HH:mm:ss')), + WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND + ) WITH ( + 'connector' = 'datahub', + 'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/', + 'project' = 'easy_rec_proj', + 'topic' = 'odl_sample_with_lbl', + 'subId' = '165519436817538OG0', + 'accessId' = 'LTAIxxx', + 'accessKey' = 'xxxxxxxxx', + 'startTime' = '2022-07-02 14:30:00' + ); + + create temporary table odl_callback_log( + `request_id` STRING, + `request_time` BIGINT, + `module` STRING, + `user_id` STRING, + `item_id` STRING, + `scene` STRING, + `generate_features` STRING, + `ts` AS + TO_TIMESTAMP(FROM_UNIXTIME(if(request_time is not null and request_time < UNIX_TIMESTAMP(), + request_time, UNIX_TIMESTAMP()), 'yyyy-MM-dd HH:mm:ss')), + WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND + ) WITH ( + 'connector' = 'datahub', + 'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/', + 'project' = 'easy_rec_proj', + 'topic' = 'odl_callback_log', + 'subId' = '16567769418786B4JH', + 'accessId' = 'LTAIxxx', + 'accessKey' = 'xxxxxx' + 'startTime' = '2022-07-02 14:30:00' + ); + + + create temporary view sample_view as + select a.request_id, a.user_id, a.item_id, a.ln_play_time, a.is_valid_play, feature, b.request_time + from odl_sample_with_lbl a + inner join ( + select * from ( + select request_id, item_id, request_time, generate_features as feature, ts, + row_number() over(partition by request_id, item_id order by proctime() asc) as rn + from odl_callback_log + where `module` = 'item' and (generate_features is not null and generate_features <> '') + ) where rn = 1 + ) b + on a.request_id = b.request_id and a.item_id = b.item_id + where a.ts between b.ts - INTERVAL '30' SECONDS and b.ts + INTERVAL '30' MINUTE; +``` + +- create temporary table注意事项: + - ts作为watermark需要限制小于当前时间, 防止因为异常的timestamp导致watermark混乱 + - temporary table可以只列举需要的字段,不必枚举所有字段 + - datahub connector更多参数请参考[文档](https://help.aliyun.com/document_detail/177534.html) + - kafka connector参考[文档](https://help.aliyun.com/document_detail/177144.html) +- odl_callback_log需要做去重, 防止因为重复调用造成样本重复 +- flink配置开启ttl(millisecond), 控制state大小: + ```sql + table.exec.state.ttl: '2400000' + ``` + - ttl(miliseconds)的设置考虑两个因素: + - odl_sample_with_lbl相对请求时间request_time的延迟 + - ttl \< 相对延迟, 就会有样本丢失 + - 统计相对延迟: + - 将odl_sample_with_lbl / odl_callback_log落到MaxCompute + - 按request_id join 计算ts的差异 + - ttl越大state越大, 保存checkpoint时间越长, 性能下降 +- 存储引擎开启gemini kv分离(generate_features字段值很大): + ```sql + state.backend.gemini.kv.separate.mode: GLOBAL_ENABLE + state.backend.gemini.kv.separate.value.size.threshold: '500' + ``` + +## 实时样本写入Datahub / Kafka + +```sql + create temporary table odl_sample_with_fea_and_lbl( + `request_id` string, + `user_id` 
string, + `item_id` string, + `ln_play_time` double, + `is_valid_play` bigint, + `feature` string, + `request_time` bigint + ) WITH ( + 'connector' = 'datahub', + 'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/', + 'project' = 'odl_sample', + 'topic' = 'odl_sample_with_fea_and_lbl', + 'subId' = '1656xxxxxx', + 'accessId' = 'LTAIxxxxxxx', + 'accessKey' = 'Q82Mxxxxxxxx' + ); + insert into odl_sample_with_fea_and_lbl + select * from sample_view; +``` + +- subId: datahub subscription id diff --git a/easy_rec/python/builders/exporter_builder.py b/easy_rec/python/builders/exporter_builder.py new file mode 100644 index 000000000..f4b08b661 --- /dev/null +++ b/easy_rec/python/builders/exporter_builder.py @@ -0,0 +1,67 @@ +# -*- encoding:utf-8 -*- +# Copyright (c) Alibaba, Inc. and its affiliates. + +# when version of tensorflow > 1.8 strip_default_attrs set true will cause +# saved_model inference core, such as: +# [libprotobuf FATAL external/protobuf_archive/src/google/protobuf/map.h:1058] +# CHECK failed: it != end(): key not found: new_axis_mask +# so temporarily modify strip_default_attrs of _SavedModelExporter in +# tf.estimator.exporter to false by default + +import logging + +from easy_rec.python.compat import exporter +from easy_rec.python.utils import config_util + + +def build(exporter_type, export_config, export_input_fn): + exporter_types = [ + x.strip() for x in exporter_type.split(',') if x.strip() != '' + ] + exporters = [] + for tmp_type in exporter_types: + if tmp_type == 'final': + exporters.append( + exporter.FinalExporter( + name='final', serving_input_receiver_fn=export_input_fn)) + elif tmp_type == 'latest': + exporters.append( + exporter.LatestExporter( + name='latest', + serving_input_receiver_fn=export_input_fn, + exports_to_keep=export_config.exports_to_keep)) + elif tmp_type == 'large': + extra_params = config_util.parse_oss_params(export_config.oss_params) + exporters.append( + exporter.LargeExporter( + name='large', + serving_input_receiver_fn=export_input_fn, + extra_params=extra_params, + exports_to_keep=export_config.exports_to_keep)) + elif tmp_type == 'best': + logging.info( + 'will use BestExporter, metric is %s, the bigger the better: %d' % + (export_config.best_exporter_metric, export_config.metric_bigger)) + + def _metric_cmp_fn(best_eval_result, current_eval_result): + logging.info('metric: best = %s current = %s' % + (str(best_eval_result), str(current_eval_result))) + if export_config.metric_bigger: + return (best_eval_result[export_config.best_exporter_metric] < + current_eval_result[export_config.best_exporter_metric]) + else: + return (best_eval_result[export_config.best_exporter_metric] > + current_eval_result[export_config.best_exporter_metric]) + + exporters.append( + exporter.BestExporter( + name='best', + serving_input_receiver_fn=export_input_fn, + compare_fn=_metric_cmp_fn, + exports_to_keep=export_config.exports_to_keep)) + elif tmp_type == 'none': + continue + else: + raise ValueError('Unknown exporter type %s' % tmp_type) + + return exporters diff --git a/easy_rec/python/compat/exporter.py b/easy_rec/python/compat/exporter.py index d8e3ed418..4b946a4d2 100644 --- a/easy_rec/python/compat/exporter.py +++ b/easy_rec/python/compat/exporter.py @@ -19,6 +19,7 @@ from __future__ import division from __future__ import print_function +import logging import os from tensorflow.python.estimator import gc @@ -31,7 +32,9 @@ from tensorflow.python.platform import tf_logging from tensorflow.python.summary import summary_iterator +from 
easy_rec.python.utils import config_util
 from easy_rec.python.utils import io_util
+from easy_rec.python.utils.export_big_model import export_big_model_to_oss
 
 
 def _loss_smaller(best_eval_result, current_eval_result):
@@ -471,3 +474,159 @@ def _export_version_parser(path):
       except errors_impl.NotFoundError as e:
         tf_logging.warn('Can not delete %s recursively: %s', p.path, e)
   # pylint: enable=protected-access
+
+
+class LargeExporter(Exporter):
+  """This class regularly exports the serving graph of large models to OSS.
+
+  In addition to exporting, this class also garbage collects stale exports.
+  """
+
+  def __init__(self,
+               name,
+               serving_input_receiver_fn,
+               extra_params={},
+               assets_extra=None,
+               exports_to_keep=5):
+    """Create an `Exporter` to use with `tf.estimator.EvalSpec`.
+
+    Args:
+      name: unique name of this `Exporter` that is going to be used in the
+        export path.
+      serving_input_receiver_fn: a function that takes no arguments and returns
+        a `ServingInputReceiver`.
+      extra_params: extra parameters passed to export_big_model_to_oss, such as
+        oss_path and verbose.
+      assets_extra: An optional dict specifying how to populate the assets.extra
+        directory within the exported SavedModel. Each key should give the
+        destination path (including the filename) relative to the assets.extra
+        directory. The corresponding value gives the full path of the source
+        file to be copied. For example, the simple case of copying a single
+        file without renaming it is specified as
+        `{'my_asset_file.txt': '/path/to/my_asset_file.txt'}`.
+      exports_to_keep: Number of exports to keep. Older exports will be
+        garbage-collected. Defaults to 5. Set to `None` to disable garbage
+        collection.
+
+    Raises:
+      ValueError: if any argument is invalid.
+    """
+    self._name = name
+    self._serving_input_fn = serving_input_receiver_fn
+    self._assets_extra = assets_extra
+    self._exports_to_keep = exports_to_keep
+    self._extra_params = extra_params
+    self._embedding_version = 0
+    self._verbose = extra_params.get('verbose', False)
+    if exports_to_keep is not None and exports_to_keep <= 0:
+      raise ValueError(
+          '`exports_to_keep`, if provided, must be a positive number')
+
+  @property
+  def name(self):
+    return self._name
+
+  def export(self, estimator, export_path, checkpoint_path, eval_result,
+             is_the_final_export):
+    pipeline_config_path = os.path.join(estimator.model_dir, 'pipeline.config')
+    pipeline_config = config_util.get_configs_from_pipeline_file(
+        pipeline_config_path)
+    extra_params = dict(self._extra_params)
+    # Alternate the embedding version on every export to avoid conflicts, e.g.
+    # the trainer overwriting an embedding while the online server is still
+    # reading an old graph that points to the (incomplete) overwritten embedding.
+    # Only two versions of embeddings are kept to reduce disk space consumption.
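+    # select oss_path/<0 or 1> for this export and flip the version flag for the next one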
+ extra_params['oss_path'] = os.path.join(extra_params['oss_path'], + str(self._embedding_version)) + self._embedding_version = 1 - self._embedding_version + if pipeline_config.train_config.HasField('incr_save_config'): + incr_save_config = pipeline_config.train_config.incr_save_config + extra_params['incr_update'] = {} + incr_save_type = incr_save_config.WhichOneof('incr_update') + logging.info('incr_save_type=%s' % incr_save_type) + if incr_save_type: + extra_params['incr_update'][incr_save_type] = getattr( + incr_save_config, incr_save_type) + else: + incr_save_config = None + export_result = export_big_model_to_oss(export_path, pipeline_config, + extra_params, + self._serving_input_fn, estimator, + checkpoint_path, None, + self._verbose) + # clear old incr_save updates to reduce burden for file listing + # at server side + if incr_save_config is not None and incr_save_config.HasField('fs'): + fs = incr_save_config.fs + if fs.relative: + incr_save_dir = os.path.join(estimator.model_dir, fs.incr_save_dir) + else: + incr_save_dir = fs.incr_save_dir + global_step = int(checkpoint_path.split('-')[-1]) + limit_step = global_step - 1000 + if limit_step <= 0: + limit_step = 0 + if limit_step > 0: + dense_updates = gfile.Glob(os.path.join(incr_save_dir, 'dense_update*')) + keep_ct, drop_ct = 0, 0 + for k in dense_updates: + if not k.endswith('.done'): + update_step = int(k.split('_')[-1]) + if update_step < limit_step: + gfile.Remove(k + '.done') + gfile.Remove(k) + logging.info('clear old update: %s' % k) + drop_ct += 1 + else: + keep_ct += 1 + logging.info( + '[global_step=%d][limit_step=%d] drop %d and keep %d dense_updates' + % (global_step, limit_step, drop_ct, keep_ct)) + sparse_updates = gfile.Glob( + os.path.join(incr_save_dir, 'sparse_update*')) + keep_ct, drop_ct = 0, 0 + for k in sparse_updates: + if not k.endswith('.done'): + update_step = int(k.split('_')[-1]) + if update_step < limit_step: + gfile.Remove(k + '.done') + gfile.Remove(k) + logging.info('clear old update: %s' % k) + drop_ct += 1 + else: + keep_ct += 1 + logging.info( + '[global_step=%d][limit_step=%d] drop %d and keep %d sparse_updates' + % (global_step, limit_step, drop_ct, keep_ct)) + self._garbage_collect_exports(export_path) + return export_result + + def _garbage_collect_exports(self, export_dir_base): + """Deletes older exports, retaining only a given number of the most recent. + + Export subdirectories are assumed to be named with monotonically increasing + integers; the most recent are taken to be those with the largest values. + + Args: + export_dir_base: the base directory under which each export is in a + versioned subdirectory. + """ + if self._exports_to_keep is None: + return + + def _export_version_parser(path): + # create a simple parser that pulls the export_version from the directory. 
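+      # only entries named like 10-digit export versions (unix timestamps) are kept; others are skipped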
+ filename = os.path.basename(path.path) + if not (len(filename) == 10 and filename.isdigit()): + return None + return path._replace(export_version=int(filename)) + + # pylint: disable=protected-access + keep_filter = gc._largest_export_versions(self._exports_to_keep) + delete_filter = gc._negation(keep_filter) + for p in delete_filter( + gc._get_paths(export_dir_base, parser=_export_version_parser)): + try: + gfile.DeleteRecursively(io_util.fix_oss_dir(p.path)) + except errors_impl.NotFoundError as e: + logging.warning('Can not delete %s recursively: %s' % (p.path, e)) diff --git a/easy_rec/python/inference/predictor.py b/easy_rec/python/inference/predictor.py index 9de110802..a4b1549a0 100644 --- a/easy_rec/python/inference/predictor.py +++ b/easy_rec/python/inference/predictor.py @@ -363,7 +363,7 @@ def input_names(self): Returns: a list, which conaining the name of input nodes available in model """ - return list(self._inputs_map.keys()) + return self._predictor_impl.input_names @property def output_names(self): diff --git a/easy_rec/python/input/datahub_input.py b/easy_rec/python/input/datahub_input.py index 786d48d83..78f4c7785 100644 --- a/easy_rec/python/input/datahub_input.py +++ b/easy_rec/python/input/datahub_input.py @@ -1,11 +1,16 @@ # -*- encoding:utf-8 -*- # Copyright (c) Alibaba, Inc. and its affiliates. +import datetime import json import logging +import time import traceback +import numpy as np import tensorflow as tf from tensorflow.python.framework import dtypes +from tensorflow.python.ops import script_ops +from tensorflow.python.ops import string_ops from tensorflow.python.platform import gfile from easy_rec.python.input.input import Input @@ -19,9 +24,10 @@ try: from datahub import DataHub - from datahub.exceptions import DatahubException + # from datahub.exceptions import DatahubException from datahub.models import RecordType from datahub.models import CursorType + from datahub.models.shard import ShardState import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) logging.getLogger('datahub.account').setLevel(logging.INFO) @@ -31,6 +37,12 @@ % traceback.format_exc()) DataHub = None +if tf.__version__ >= '2.0': + ignore_errors = tf.data.experimental.ignore_errors() + tf = tf.compat.v1 +else: + ignore_errors = tf.contrib.data.ignore_errors() + class DataHubInput(Input): """DataHubInput is used for online train.""" @@ -60,7 +72,9 @@ def __init__(self, akId = akId.encode('utf-8') akSecret = akSecret.encode('utf-8') endpoint = endpoint.encode('utf-8') - self._datahub = DataHub(akId, akSecret, endpoint) + self._max_conn_num = 8 + self._datahub = DataHub( + akId, akSecret, endpoint, pool_maxsize=self._max_conn_num) else: self._datahub = None except Exception as ex: @@ -70,12 +84,30 @@ def __init__(self, if datahub_config: shard_result = self._datahub.list_shard(self._datahub_config.project, self._datahub_config.topic) - shards = shard_result.shards + shards = [x for x in shard_result.shards if x.state == ShardState.ACTIVE] self._all_shards = shards - self._shards = [ - shards[i] for i in range(len(shards)) if (i % task_num) == task_index - ] - logging.info('all shards: %s' % str(self._shards)) + + if self._data_config.chief_redundant and self._task_num > 1: + if task_index == 0: + self._shards = [shards[0]] + else: + task_num -= 1 + task_index -= 1 + self._shards = [ + shards[i] + for i in range(len(shards)) + if (i % task_num) == task_index + ] + else: + self._shards = [ + shards[i] + for i in range(len(shards)) + if (i % task_num) == task_index 
+ ] + + logging.info('all_shards[len=%d]: %s task_shards[len=%d]: %s' % + (len(self._all_shards), str( + self._all_shards), len(self._shards), str(self._shards))) offset_type = datahub_config.WhichOneof('offset') if offset_type == 'offset_time': @@ -116,11 +148,22 @@ def __init__(self, for x in self._label_fields: assert x in self._dh_field_names, 'label_field[%s] is not in datahub' % x + self._dh_lbl_ids = [ + self._dh_field_names.index(x) for x in self._label_fields + ] + if self._data_config.HasField('sample_weight'): x = self._data_config.sample_weight assert x in self._dh_field_names, 'sample_weight[%s] is not in datahub' % x - self._read_cnt = 32 + self._read_cnt = 512 + self._log_every_cnts = self._data_config.batch_size * 16 + self._max_retry = 8 + + # record shard read cnt + self._shard_read_cnt = {} + self._shard_cursor_seq = {} + self._last_log_cnt = {} if len(self._dh_fea_ids) > 1: self._filter_fea_func = lambda record: ''.join( @@ -143,10 +186,10 @@ def _dump_offsets(): } return json.dumps(all_offsets) - field_dict[Input.DATA_OFFSET] = tf.py_func(_dump_offsets, [], dtypes.string) + field_dict[Input.DATA_OFFSET] = script_ops.py_func(_dump_offsets, [], + dtypes.string) - for x in self._label_fields: - dh_id = self._dh_field_names.index(x) + for x, dh_id in zip(self._label_fields, self._dh_lbl_ids): field_dict[x] = fields[dh_id] feature_inputs = self.get_feature_input_fields() @@ -164,7 +207,7 @@ def _dump_offsets(): for fea_id in range(1, len(feature_fields)): feature = feature + self._data_config.separator + feature_fields[fea_id] - feature = tf.string_split( + feature = string_ops.string_split( feature, self._data_config.separator, skip_empty=False) fields = tf.reshape(feature.values, [-1, feature_num]) @@ -218,10 +261,64 @@ def _dump_record(self, record): feas.append(self._dh_field_names[fid] + ':' + str(record.values[fid])) return ';'.join(feas) - def _datahub_generator(self): - logging.info('start epoch[%d]' % self._num_epoch) + def _datahub_generator_for_redundant_chief(self): + logging.info('start chief redundant epoch[%d]' % self._num_epoch) self._num_epoch += 1 + self._datahub.wait_shards_ready(self._datahub_config.project, + self._datahub_config.topic) + topic_result = self._datahub.get_topic(self._datahub_config.project, + self._datahub_config.topic) + if topic_result.record_type != RecordType.TUPLE: + logging.error('datahub topic type(%s) illegal' % + str(topic_result.record_type)) + record_schema = topic_result.record_schema + shard = self._shards[0] + shard_id = shard.shard_id + + if shard_id not in self._offset_dict: + cursor_result = self._datahub.get_cursor(self._datahub_config.project, + self._datahub_config.topic, + shard_id, CursorType.OLDEST) + cursor = cursor_result.cursor + else: + cursor = self._offset_dict[shard_id] + all_records = [] + while len(all_records) <= self._batch_size * 8: + try: + get_result = self._datahub.get_tuple_records( + self._datahub_config.project, self._datahub_config.topic, shard_id, + record_schema, cursor, self._read_cnt) + for row_id, record in enumerate(get_result.records): + if self._is_data_empty(record): + logging.warning('skip empty data record: %s' % + self._dump_record(record)) + continue + all_records.append(tuple(record.values)) + except Exception as ex: + logging.warning( + 'get_tuple_records exception: shard_id=%s cursor=%s read_cnt=%d exception:%s traceback:%s' + % + (shard_id, cursor, self._read_cnt, str(ex), traceback.format_exc())) + sid = 0 + while True: + if sid >= len(all_records): + sid = 0 + yield 
all_records[sid] + sid += 1 + + def _datahub_generator(self, part_id, part_num): + avg_num = len(self._shards) / part_num + res_num = len(self._shards) % part_num + start_id = avg_num * part_id + min(part_id, res_num) + end_id = avg_num * (part_id + 1) + min(part_id + 1, res_num) + + thread_shards = self._shards[start_id:end_id] + + logging.info( + 'start generator[part_id=%d][part_num=%d][shard_num=%d][thread_shard_num=%d:%d]' + % (part_id, part_num, len(self._shards), start_id, end_id)) + try: self._datahub.wait_shards_ready(self._datahub_config.project, self._datahub_config.topic) @@ -232,43 +329,93 @@ def _datahub_generator(self): str(topic_result.record_type)) record_schema = topic_result.record_schema - tid = 0 - while True: - shard_id = self._shards[tid].shard_id - tid += 1 - if tid >= len(self._shards): - tid = 0 - - if shard_id not in self._offset_dict: - cursor_result = self._datahub.get_cursor(self._datahub_config.project, - self._datahub_config.topic, - shard_id, CursorType.OLDEST) - cursor = cursor_result.cursor - else: - cursor = self._offset_dict[shard_id] - - get_result = self._datahub.get_tuple_records( - self._datahub_config.project, self._datahub_config.topic, shard_id, - record_schema, cursor, self._read_cnt) - count = get_result.record_count - if count == 0: - continue - for row_id, record in enumerate(get_result.records): - if self._is_data_empty(record): - logging.warning('skip empty data record: %s' % - self._dump_record(record)) + try: + iter_id = 0 + while True: + shard_id = thread_shards[iter_id].shard_id + iter_id += 1 + if iter_id >= len(thread_shards): + iter_id = 0 + if shard_id not in self._offset_dict: + cursor_result = self._datahub.get_cursor( + self._datahub_config.project, self._datahub_config.topic, + shard_id, CursorType.OLDEST) + cursor = cursor_result.cursor + else: + cursor = self._offset_dict[shard_id] + + max_retry = self._max_retry + get_result = None + while max_retry > 0: + try: + get_result = self._datahub.get_tuple_records( + self._datahub_config.project, self._datahub_config.topic, + shard_id, record_schema, cursor, self._read_cnt) + break + except Exception as ex: + logging.warning( + 'get_tuple_records exception: shard_id=%s cursor=%s read_cnt=%d exception:%s traceback:%s' + % (shard_id, cursor, self._read_cnt, str(ex), + traceback.format_exc())) + max_retry -= 1 + if get_result is None: + logging.error('failed to get_tuple_records after max_retry=%d' % + self._max_retry) + raise RuntimeError( + 'failed to get_tuple_records after max_retry=%d' % + self._max_retry) + count = get_result.record_count + if count == 0: + # avoid too frequent access to datahub server + time.sleep(0.1) continue - if self._filter_fea_func is not None: - if self._filter_fea_func(record): - logging.warning('filter data record: %s' % + self._shard_cursor_seq[shard_id] = get_result.start_seq + (count - 1) + for row_id, record in enumerate(get_result.records): + if self._is_data_empty(record): + logging.warning('skip empty data record: %s' % self._dump_record(record)) continue - yield tuple(list(record.values)) - if shard_id not in self._offset_dict or get_result.next_cursor > self._offset_dict[ - shard_id]: - self._offset_dict[shard_id] = get_result.next_cursor - except DatahubException as ex: - logging.error('DatahubException: %s' % str(ex)) + yield tuple(record.values) + if shard_id not in self._offset_dict or get_result.next_cursor > self._offset_dict[ + shard_id]: + self._offset_dict[shard_id] = get_result.next_cursor + self._update_counter(shard_id, count) + 
except Exception as ex: + logging.error('fetch_sample thread[shard_id=%s] fail: %s %s' % + (shard_id, str(ex), traceback.format_exc())) + except Exception as ex: + logging.error('_datahub_generator exception: %s %s' % + (str(ex), traceback.format_exc())) + + def _update_counter(self, shard_id, count): + if count == 0: + return + if shard_id not in self._shard_read_cnt: + self._shard_read_cnt[shard_id] = count + self._log_shard(shard_id) + else: + self._shard_read_cnt[shard_id] += count + tmp_cnt = self._last_log_cnt.get(shard_id, 0) + if self._shard_read_cnt[shard_id] - tmp_cnt > self._log_every_cnts: + self._log_shard(shard_id) + self._last_log_cnt[shard_id] = self._shard_read_cnt[shard_id] + + def _log_shard(self, shard_id): + if shard_id not in self._shard_cursor_seq: + return + tmp_seq = self._shard_cursor_seq[shard_id] + if tmp_seq < 0: + return + cursor_result = self._datahub.get_cursor( + self._datahub_config.project, + self._datahub_config.topic, + shard_id, + CursorType.SEQUENCE, + param=tmp_seq) + ts = cursor_result.record_time / 1000.0 + ts_s = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + logging.info('shard[%s]: cursor=%s sequence=%d ts=%.3f datetime=%s' % + (shard_id, cursor_result.cursor, tmp_seq, ts, ts_s)) def _build(self, mode, params): if mode == tf.estimator.ModeKeys.TRAIN: @@ -286,10 +433,22 @@ def _build(self, mode, params): ] list_shapes = tuple(list_shapes) # read datahub - dataset = tf.data.Dataset.from_generator( - self._datahub_generator, - output_types=list_types, - output_shapes=list_shapes) + if self._data_config.chief_redundant and self._task_num > 1 and self._task_index == 0: + dataset = tf.data.Dataset.from_generator( + self._datahub_generator_for_redundant_chief, + output_types=list_types, + output_shapes=list_shapes) + else: + split_num = min(self._max_conn_num, len(self._shards)) + dataset = tf.data.Dataset.from_tensor_slices( + np.arange(split_num)).interleave( + lambda x: tf.data.Dataset.from_generator( + self._datahub_generator, + output_types=list_types, + output_shapes=list_shapes, + args=[x, split_num]), + cycle_length=len(self._shards), + num_parallel_calls=len(self._shards)) if mode == tf.estimator.ModeKeys.TRAIN: if self._data_config.shuffle: dataset = dataset.shuffle( @@ -307,6 +466,8 @@ def _build(self, mode, params): dataset = dataset.map( map_func=self._preprocess, num_parallel_calls=self._data_config.num_parallel_calls) + if self._data_config.ignore_error: + dataset = dataset.apply(ignore_errors) dataset = dataset.prefetch(buffer_size=self._prefetch_size) if mode != tf.estimator.ModeKeys.PREDICT: dataset = dataset.map(lambda x: diff --git a/easy_rec/python/input/input.py b/easy_rec/python/input/input.py index 3e95dc5bb..cef87ef21 100644 --- a/easy_rec/python/input/input.py +++ b/easy_rec/python/input/input.py @@ -805,6 +805,9 @@ def _preprocess(self, field_dict): if self._mode != tf.estimator.ModeKeys.PREDICT: parsed_dict[constant.SAMPLE_WEIGHT] = field_dict[ self._data_config.sample_weight] + + if Input.DATA_OFFSET in field_dict: + parsed_dict[Input.DATA_OFFSET] = field_dict[Input.DATA_OFFSET] return {'feature': parsed_dict, 'label': label_dict} def _lookup_preprocess(self, fc, field_dict): diff --git a/easy_rec/python/main.py b/easy_rec/python/main.py index 2400e7e9d..925e74965 100644 --- a/easy_rec/python/main.py +++ b/easy_rec/python/main.py @@ -15,8 +15,8 @@ from tensorflow.core.protobuf import saved_model_pb2 import easy_rec +from easy_rec.python.builders import exporter_builder from easy_rec.python.builders 
import strategy_builder -from easy_rec.python.compat import exporter from easy_rec.python.input.input import Input from easy_rec.python.model.easy_rec_estimator import EasyRecEstimator from easy_rec.python.model.easy_rec_model import EasyRecModel @@ -47,16 +47,6 @@ load_class.auto_import() -# when version of tensorflow > 1.8 strip_default_attrs set true will cause -# saved_model inference core, such as: -# [libprotobuf FATAL external/protobuf_archive/src/google/protobuf/map.h:1058] -# CHECK failed: it != end(): key not found: new_axis_mask -# so temporarily modify strip_default_attrs of _SavedModelExporter in -# tf.estimator.exporter to false by default -FinalExporter = exporter.FinalExporter -LatestExporter = exporter.LatestExporter -BestExporter = exporter.BestExporter - def _get_input_fn(data_config, feature_configs, @@ -120,7 +110,7 @@ def _create_estimator(pipeline_config, distribution=None, params={}): run_config = tf.estimator.RunConfig( model_dir=pipeline_config.model_dir, log_step_count_steps=None, # train_config.log_step_count_steps, - save_summary_steps=train_config.save_summary_steps, + save_summary_steps=0, # train_config.save_summary_steps, save_checkpoints_steps=save_checkpoints_steps, save_checkpoints_secs=save_checkpoints_secs, keep_checkpoint_max=train_config.keep_checkpoint_max, @@ -156,44 +146,8 @@ def _create_eval_export_spec(pipeline_config, eval_data, check_mode=False): export_config, check_mode=check_mode, **input_fn_kwargs) - if export_config.exporter_type == 'final': - exporters = [ - FinalExporter(name='final', serving_input_receiver_fn=export_input_fn) - ] - elif export_config.exporter_type == 'latest': - exporters = [ - LatestExporter( - name='latest', - serving_input_receiver_fn=export_input_fn, - exports_to_keep=export_config.exports_to_keep) - ] - elif export_config.exporter_type == 'best': - logging.info( - 'will use BestExporter, metric is %s, the bigger the better: %d' % - (export_config.best_exporter_metric, export_config.metric_bigger)) - - def _metric_cmp_fn(best_eval_result, current_eval_result): - logging.info('metric: best = %s current = %s' % - (str(best_eval_result), str(current_eval_result))) - if export_config.metric_bigger: - return (best_eval_result[export_config.best_exporter_metric] < - current_eval_result[export_config.best_exporter_metric]) - else: - return (best_eval_result[export_config.best_exporter_metric] > - current_eval_result[export_config.best_exporter_metric]) - - exporters = [ - BestExporter( - name='best', - serving_input_receiver_fn=export_input_fn, - compare_fn=_metric_cmp_fn, - exports_to_keep=export_config.exports_to_keep) - ] - elif export_config.exporter_type == 'none': - exporters = [] - else: - raise ValueError('Unknown exporter type %s' % export_config.exporter_type) - + exporters = exporter_builder.build(export_config.exporter_type, export_config, + export_input_fn) # set throttle_secs to a small number, so that we can control evaluation # interval steps by checkpoint saving steps eval_input_fn = _get_input_fn(data_config, feature_configs, eval_data, @@ -733,13 +687,16 @@ def export(export_dir, if incr_save_type: extra_params['incr_update'][incr_save_type] = getattr( incr_save_config, incr_save_type) + tf_config = os.environ.get('TF_CONFIG', None) return export_big_model_to_oss(export_dir, pipeline_config, extra_params, serving_input_fn, estimator, ckpt_path, - verbose) + tf_config, verbose) if 'redis_url' in extra_params: + tf_config = os.environ.get('TF_CONFIG', None) return export_big_model(export_dir, 
pipeline_config, extra_params, - serving_input_fn, estimator, ckpt_path, verbose) + serving_input_fn, estimator, ckpt_path, tf_config, + verbose) final_export_dir = estimator.export_savedmodel( export_dir_base=export_dir, diff --git a/easy_rec/python/model/easy_rec_estimator.py b/easy_rec/python/model/easy_rec_estimator.py index 968cd6bd2..e4c62e9ed 100644 --- a/easy_rec/python/model/easy_rec_estimator.py +++ b/easy_rec/python/model/easy_rec_estimator.py @@ -155,15 +155,19 @@ def _train_model_fn(self, features, labels, run_config): if Input.DATA_OFFSET in features: task_index, task_num = estimator_utils.get_task_index_and_num() + if self._pipeline_config.data_config.chief_redundant and task_num > 1: + task_index -= 1 + task_num -= 1 data_offset_var = tf.get_variable( name=Input.DATA_OFFSET, dtype=tf.string, shape=[task_num], collections=[tf.GraphKeys.GLOBAL_VARIABLES, Input.DATA_OFFSET], trainable=False) - update_offset = tf.assign(data_offset_var[task_index], - features[Input.DATA_OFFSET]) - ops.add_to_collection(tf.GraphKeys.UPDATE_OPS, update_offset) + if task_index >= 0: + update_offset = tf.assign(data_offset_var[task_index], + features[Input.DATA_OFFSET]) + ops.add_to_collection(tf.GraphKeys.UPDATE_OPS, update_offset) else: data_offset_var = None @@ -174,6 +178,7 @@ def _train_model_fn(self, features, labels, run_config): global_vars = {x.name: x for x in tf.global_variables()} for x in update_ops: if isinstance(x, ops.Operation) and x.inputs[0].name in global_vars: + logging.info('add dense update %s' % x.inputs[0].name) ops.add_to_collection(constant.DENSE_UPDATE_VARIABLES, global_vars[x.inputs[0].name]) update_op = tf.group(*update_ops, name='update_barrier') @@ -398,24 +403,31 @@ def _train_model_fn(self, features, labels, run_config): ready_for_local_init_op=tf.report_uninitialized_variables( var_list=initialize_var_list)) # saver hook - saver_hook = estimator_utils.CheckpointSaverHook( - checkpoint_dir=self.model_dir, - save_secs=self._config.save_checkpoints_secs, - save_steps=self._config.save_checkpoints_steps, - scaffold=scaffold, - write_graph=self.train_config.write_graph, - data_offset_var=data_offset_var, - increment_save_config=self.incr_save_config) chief_hooks = [] if estimator_utils.is_chief(): + saver_hook = estimator_utils.CheckpointSaverHook( + checkpoint_dir=self.model_dir, + save_secs=self._config.save_checkpoints_secs, + save_steps=self._config.save_checkpoints_steps, + scaffold=scaffold, + write_graph=self.train_config.write_graph, + data_offset_var=data_offset_var, + increment_save_config=self.incr_save_config) hooks.append(saver_hook) # profiling hook - if self.train_config.is_profiling and estimator_utils.is_chief(): + if self.train_config.is_profiling and estimator_utils.is_first_worker(): profile_hook = tf.train.ProfilerHook( save_steps=log_step_count_steps, output_dir=self.model_dir) hooks.append(profile_hook) + if estimator_utils.is_first_worker(): + hooks.append( + basic_session_run_hooks.SummarySaverHook( + save_steps=self.train_config.save_summary_steps, + output_dir=self.model_dir, + scaffold=scaffold)) + return tf.estimator.EstimatorSpec( mode=tf.estimator.ModeKeys.TRAIN, loss=loss, @@ -557,6 +569,10 @@ def _export_model_fn(self, features, labels, run_config, params): assert var_name in all_vars, 'dense_train_var[%s] is not found' % var_name ops.add_to_collection(constant.DENSE_UPDATE_VARIABLES, all_vars[var_name]) + logging.info('dense train vars num=%d' % len(all_vars)) + elif self.train_config.HasField('incr_save_config'): + 
logging.warning('dense_train_var_path does not exist: %s' % + dense_train_var_path) # add more asset files if len(export_config.asset_files) > 0: diff --git a/easy_rec/python/ops/1.12/incr_record.so b/easy_rec/python/ops/1.12/incr_record.so index 91ead3a4e..821391e7b 100755 Binary files a/easy_rec/python/ops/1.12/incr_record.so and b/easy_rec/python/ops/1.12/incr_record.so differ diff --git a/easy_rec/python/ops/1.12_pai/incr_record.so b/easy_rec/python/ops/1.12_pai/incr_record.so index ab607d79a..cb13bf6b5 100755 Binary files a/easy_rec/python/ops/1.12_pai/incr_record.so and b/easy_rec/python/ops/1.12_pai/incr_record.so differ diff --git a/easy_rec/python/ops/1.15/incr_record.so b/easy_rec/python/ops/1.15/incr_record.so index 139a0aa1d..a548b9c9c 100755 Binary files a/easy_rec/python/ops/1.15/incr_record.so and b/easy_rec/python/ops/1.15/incr_record.so differ diff --git a/easy_rec/python/ops/1.15/libembed_op.so b/easy_rec/python/ops/1.15/libembed_op.so index 970ae21ae..f10928d82 100755 Binary files a/easy_rec/python/ops/1.15/libembed_op.so and b/easy_rec/python/ops/1.15/libembed_op.so differ diff --git a/easy_rec/python/protos/export.proto b/easy_rec/python/protos/export.proto index 85724524f..608685ea6 100644 --- a/easy_rec/python/protos/export.proto +++ b/easy_rec/python/protos/export.proto @@ -56,4 +56,6 @@ message ExportConfig { // export asset files repeated string asset_files = 15; + + optional string oss_params = 16; } diff --git a/easy_rec/python/test/dh_local_run.py b/easy_rec/python/test/dh_local_run.py index be514d382..1319cdbef 100644 --- a/easy_rec/python/test/dh_local_run.py +++ b/easy_rec/python/test/dh_local_run.py @@ -6,17 +6,16 @@ import tensorflow as tf -from easy_rec.python.test.odps_command import OdpsCommand -from easy_rec.python.test.odps_test_prepare import prepare +from easy_rec.python.test.odps_test_prepare import change_files from easy_rec.python.test.odps_test_util import OdpsOSSConfig -from easy_rec.python.test.odps_test_util import delete_oss_path -from easy_rec.python.test.odps_test_util import get_oss_bucket +from easy_rec.python.utils import config_util from easy_rec.python.utils import test_utils logging.basicConfig( level=logging.INFO, format='[%(asctime)s][%(levelname)s] %(message)s') -odps_oss_config = OdpsOSSConfig(script_path='./samples/dh_script') +DATAHUB_TEST_SCRIPT_PATH = './samples/dh_script' +odps_oss_config = OdpsOSSConfig(script_path=DATAHUB_TEST_SCRIPT_PATH) class TestPipelineOnEmr(tf.test.TestCase): @@ -32,14 +31,37 @@ def tearDown(self): if self._success: shutil.rmtree(self._test_dir) + def _load_config_for_test(self, config_path, total_steps=50): + pipeline_config = config_util.get_configs_from_pipeline_file(config_path) + pipeline_config.train_config.train_distribute = 0 + pipeline_config.train_config.sync_replicas = False + + pipeline_config.datahub_train_input.akId = odps_oss_config.dh_id + pipeline_config.datahub_train_input.akSecret = odps_oss_config.dh_key + pipeline_config.datahub_train_input.endpoint = odps_oss_config.dh_endpoint + pipeline_config.datahub_train_input.project = odps_oss_config.dh_project + pipeline_config.datahub_train_input.topic = odps_oss_config.dh_topic + + pipeline_config.datahub_eval_input.akId = odps_oss_config.dh_id + pipeline_config.datahub_eval_input.akSecret = odps_oss_config.dh_key + pipeline_config.datahub_eval_input.endpoint = odps_oss_config.dh_endpoint + pipeline_config.datahub_eval_input.project = odps_oss_config.dh_project + pipeline_config.datahub_eval_input.topic = odps_oss_config.dh_topic + 
return pipeline_config + def test_datahub_train_eval(self): - end = ['deep_fm/drop_table.sql'] - odps_cmd = OdpsCommand(odps_oss_config) + config_path = 'samples/dh_script/configs/deepfm.config' + pipeline_config = self._load_config_for_test(config_path) + test_utils.test_single_train_eval( + pipeline_config, self._test_dir, total_steps=10) + self.assertTrue(self._success) - self._success = test_utils.test_datahub_train_eval( - '%s/configs/deepfm.config' % odps_oss_config.temp_dir, odps_oss_config, - self._test_dir) - odps_cmd.run_list(end) + def test_distributed_datahub_train_eval(self): + config_path = 'samples/dh_script/configs/deepfm.config' + pipeline_config = self._load_config_for_test(config_path) + pipeline_config.data_config.chief_redundant = True + test_utils.test_distributed_train_eval( + pipeline_config, self._test_dir, total_steps=10) self.assertTrue(self._success) @@ -47,58 +69,25 @@ def test_datahub_train_eval(self): parser = argparse.ArgumentParser() parser.add_argument( '--odps_config', type=str, default=None, help='odps config path') - parser.add_argument( - '--oss_config', type=str, default=None, help='ossutilconfig path') - parser.add_argument( - '--bucket_name', type=str, default=None, help='test oss bucket name') - parser.add_argument('--arn', type=str, default=None, help='oss rolearn') - parser.add_argument( - '--odpscmd', type=str, default='odpscmd', help='odpscmd path') - parser.add_argument( - '--algo_project', type=str, default=None, help='algo project name') - parser.add_argument( - '--algo_res_project', - type=str, - default=None, - help='algo resource project name') - parser.add_argument( - '--algo_version', type=str, default=None, help='algo version') args, unknown_args = parser.parse_known_args() sys.argv = [sys.argv[0]] for unk_arg in unknown_args: sys.argv.append(unk_arg) - if args.odps_config: - odps_oss_config.load_odps_config(args.odps_config) - os.environ['ODPS_CONFIG_FILE_PATH'] = args.odps_config - if args.oss_config: - odps_oss_config.load_oss_config(args.oss_config) - if args.odpscmd: - odps_oss_config.odpscmd_path = args.odpscmd - if args.algo_project: - odps_oss_config.algo_project = args.algo_project - if args.algo_res_project: - odps_oss_config.algo_res_project = args.algo_res_project - if args.algo_version: - odps_oss_config.algo_version = args.algo_version - if args.arn: - odps_oss_config.arn = args.arn - if args.bucket_name: - odps_oss_config.bucket_name = args.bucket_name - prepare(odps_oss_config) - start = [ - 'deep_fm/create_external_deepfm_table.sql', - 'deep_fm/create_inner_deepfm_table.sql' - ] - end = ['deep_fm/drop_table.sql'] - odps_cmd = OdpsCommand(odps_oss_config) - odps_cmd.run_list(start) - odps_oss_config.init_dh_and_odps() + assert args.odps_config is not None and args.odps_config != '' + odps_oss_config.load_odps_config(args.odps_config) + os.environ['ODPS_CONFIG_FILE_PATH'] = args.odps_config + + shutil.copytree(DATAHUB_TEST_SCRIPT_PATH, odps_oss_config.temp_dir) + logging.info('temp_dir=%s' % odps_oss_config.temp_dir) + for root, dirs, files in os.walk(odps_oss_config.temp_dir): + for file in files: + file_path = os.path.join(root, file) + change_files(odps_oss_config, file_path) + + odps_oss_config.init_datahub() tf.test.main() - # delete oss path - bucket = get_oss_bucket(odps_oss_config.oss_key, odps_oss_config.oss_secret, - odps_oss_config.endpoint, odps_oss_config.bucket_name) - delete_oss_path(bucket, odps_oss_config.exp_dir, odps_oss_config.bucket_name) + # delete tmp shutil.rmtree(odps_oss_config.temp_dir) diff 
--git a/easy_rec/python/test/local_incr_test.py b/easy_rec/python/test/local_incr_test.py index 152308257..1e509812a 100644 --- a/easy_rec/python/test/local_incr_test.py +++ b/easy_rec/python/test/local_incr_test.py @@ -7,10 +7,12 @@ import unittest import numpy as np +import six import tensorflow as tf from tensorflow.python.platform import gfile from easy_rec.python.inference.predictor import Predictor +from easy_rec.python.utils import config_util from easy_rec.python.utils import numpy_utils from easy_rec.python.utils import test_utils @@ -27,6 +29,11 @@ def setUp(self): if not gfile.IsDirectory(self._log_dir): gfile.MakeDirs(self._log_dir) + def tearDown(self): + test_utils.set_gpu_id(None) + if self._success: + test_utils.clean_up(self._test_dir) + @unittest.skipIf( 'oss_path' not in os.environ or 'oss_endpoint' not in os.environ and 'oss_ak' not in os.environ or @@ -35,6 +42,31 @@ def test_incr_save(self): self._test_incr_save( 'samples/model_config/taobao_fg_incr_save_local.config') + @unittest.skipIf( + 'oss_path' not in os.environ or + 'oss_endpoint' not in os.environ and 'oss_ak' not in os.environ or + 'oss_sk' not in os.environ, 'Only execute when kafka is available') + def test_incr_save_v2(self): + self._test_incr_save( + 'ali_rec_sln_acc_rnk_v2_woid_woorder_seqopt_odl.config', + fg_path='20221225_woid_woorder/fg.json', + test_data_path='odl_sample_feature_test.txt', + targets='logits_is_click,logits_is_go,probs_is_click,probs_is_go', + total_steps=500) + + @unittest.skipIf( + 'oss_path' not in os.environ or + 'oss_endpoint' not in os.environ and 'oss_ak' not in os.environ or + 'oss_sk' not in os.environ, 'Only execute when kafka is available') + def test_incr_save_v3(self): + self._test_incr_save( + 'ali_rec_sln_acc_rnk_v2_woid_woorder_seqopt_odl_v2.config', + fg_path='20221225_woid_woorder/fg.json', + test_data_path='odl_sample_feature_test.txt', + targets='logits_is_click,logits_is_go,probs_is_click,probs_is_go', + total_steps=50, + auto_export=True) + @unittest.skipIf( 'oss_path' not in os.environ or 'oss_endpoint' not in os.environ and 'oss_ak' not in os.environ or @@ -51,37 +83,58 @@ def test_incr_save_share_ev(self): self._test_incr_save( 'samples/model_config/taobao_fg_incr_save_share_ev_local.config') - def _test_incr_save(self, config_path): + def _test_incr_save(self, + config_path, + fg_path='./samples/rtp_fg/fg.json', + test_data_path='data/test/rtp/taobao_test_feature.txt', + targets='logits,probs', + total_steps=100, + auto_export=False): self._success = False + edit_config_json = { + 'train_config.incr_save_config.fs.mount_path': + os.path.join(self._test_dir, 'train/incr_save/') + } + if auto_export: + edit_config_json['export_config.oss_params'] =\ + '--oss_path=%s --oss_ak=%s --oss_sk=%s --oss_endpoint=%s' % ( + os.environ['oss_path'], os.environ['oss_ak'], os.environ['oss_sk'], + os.environ['oss_endpoint']) + edit_config_json['export_config.exporter_type'] = 'large,final' success = test_utils.test_distributed_train_eval( config_path, self._test_dir, - total_steps=100, - edit_config_json={ - 'train_config.incr_save_config.fs.mount_path': - os.path.join(self._test_dir, 'train/incr_save/') - }) + total_steps=total_steps, + edit_config_json=edit_config_json) self.assertTrue(success) - export_cmd = """ - python -m easy_rec.python.export --pipeline_config_path %s/pipeline.config - --export_dir %s/export/sep/ --oss_path=%s --oss_ak=%s --oss_sk=%s --oss_endpoint=%s - --asset_files ./samples/rtp_fg/fg.json - --checkpoint_path %s/train/model.ckpt-0 - """ % 
(self._test_dir, self._test_dir, os.environ['oss_path'], - os.environ['oss_ak'], os.environ['oss_sk'], - os.environ['oss_endpoint'], self._test_dir) - proc = test_utils.run_cmd(export_cmd, - '%s/log_export_sep.txt' % self._test_dir) - proc.wait() - self.assertTrue(proc.returncode == 0) - files = gfile.Glob(os.path.join(self._test_dir, 'export/sep/[1-9][0-9]*')) - export_sep_dir = files[0] + if not auto_export: + export_cmd = """ + python -m easy_rec.python.export --pipeline_config_path %s/pipeline.config + --export_dir %s/export/sep/ --oss_path=%s --oss_ak=%s --oss_sk=%s + --oss_endpoint=%s --asset_files %s + --checkpoint_path %s/train/model.ckpt-0 + """ % (self._test_dir, self._test_dir, os.environ['oss_path'], + os.environ['oss_ak'], os.environ['oss_sk'], + os.environ['oss_endpoint'], fg_path, self._test_dir) + proc = test_utils.run_cmd(export_cmd, + '%s/log_export_sep.txt' % self._test_dir) + proc.wait() + self.assertTrue(proc.returncode == 0) + files = gfile.Glob(os.path.join(self._test_dir, 'export/sep/[1-9][0-9]*')) + export_sep_dir = files[0] + else: + files = gfile.Glob( + os.path.join(self._test_dir, 'train/export/large/[1-9][0-9]*')) + files.sort() + export_sep_dir = files[-1] predict_cmd = """ python processor/test.py --saved_model_dir %s - --input_path data/test/rtp/taobao_test_feature.txt + --input_path %s --output_path %s/processor.out --test_dir %s - """ % (export_sep_dir, self._test_dir, self._test_dir) + --targets %s + """ % (export_sep_dir, test_data_path, self._test_dir, self._test_dir, + targets) envs = dict(os.environ) envs['PYTHONPATH'] = 'processor/' proc = test_utils.run_cmd( @@ -95,12 +148,17 @@ def _test_incr_save(self, config_path): line_str = line_str.strip() processor_out.append(json.loads(line_str)) + config = config_util.get_configs_from_pipeline_file(config_path) predictor = Predictor(os.path.join(self._test_dir, 'train/export/final/')) - with open('data/test/rtp/taobao_test_feature.txt', 'r') as fin: + + rtp_separator = config.data_config.rtp_separator + if six.PY2: + rtp_separator = rtp_separator.encode('utf-8') + with open(test_data_path, 'r') as fin: inputs = [] for line_str in fin: line_str = line_str.strip() - line_tok = line_str.split(';')[-1] + line_tok = line_str.split(rtp_separator)[-1] line_tok = line_tok.split(chr(2)) inputs.append(line_tok) output_res = predictor.predict(inputs, batch_size=1024) @@ -110,11 +168,13 @@ def _test_incr_save(self, config_path): fout.write( json.dumps(output_res[i], cls=numpy_utils.NumpyEncoder) + '\n') - for i in range(len(output_res)): - val0 = output_res[i]['probs'] - val1 = processor_out[i]['probs'] - diff = np.abs(val0 - val1) - assert diff < 1e-4, 'too much difference[%.6f] >= 1e-4' % diff + for out_key in targets.split(','): + for i in range(len(output_res)): + val0 = output_res[i][out_key] + val1 = processor_out[i][out_key] + diff = np.abs(val0 - val1) + assert diff < 1e-4, 'outputs[%s] row[%d] too much difference[%.6f] >= 1e-4'\ + % (out_key, i, diff) self._success = True diff --git a/easy_rec/python/test/odps_test_util.py b/easy_rec/python/test/odps_test_util.py index 77087f9b4..d9d175b61 100644 --- a/easy_rec/python/test/odps_test_util.py +++ b/easy_rec/python/test/odps_test_util.py @@ -71,7 +71,6 @@ def __init__(self, script_path='./samples/odps_script'): self.odps_endpoint = '' self.dh = None - self.odps = None # default to algo_public self.algo_project = None @@ -115,11 +114,11 @@ def load_odps_config(self, config_path): def clean_topic(self, dh_project): if not dh_project: - logging.error('project is 
empty .') - topic_names = self.dh.list_topic(dh_project).topic_names - for topic_name in topic_names: - self.clean_subscription(topic_name) - self.dh.delete_topic(dh_project, topic_name) + return + topic_names = self.dh.list_topic(dh_project).topic_names + for topic_name in topic_names: + self.clean_subscription(topic_name) + self.dh.delete_topic(dh_project, topic_name) def clean_project(self): project_names = self.dh.list_project().project_names @@ -149,15 +148,9 @@ def get_input_type(self, input_type): return DhDict.get(input_type) - def init_dh_and_odps(self): + def init_datahub(self): self.dh = DataHub(self.dh_id, self.dh_key, self.dh_endpoint) - self.odps = ODPS(self.dh_id, self.dh_key, self.project_name, - self.odps_endpoint) - self.odpsTable = 'deepfm_train_%s' % self.time_stamp self.clean_project() - read_odps = DataFrame(self.odps.get_table(self.odpsTable)) - col_name = read_odps.schema.names - col_type = [self.get_input_type(str(i)) for i in read_odps.schema.types] try: self.dh.create_project(self.dh_project, comment='EasyRecTest') logging.info('create project success!') @@ -165,22 +158,27 @@ def init_dh_and_odps(self): logging.warning('project %s already exist!' % self.dh_project) except Exception: logging.error(traceback.format_exc()) - record_schema = RecordSchema.from_lists(col_name, col_type) + col_names = ['label', 'features'] + col_types = ['INT32', 'STRING'] + col_types = [self.get_input_type(x) for x in col_types] + record_schema = RecordSchema.from_lists(col_names, col_types) try: + if self.dh_topic in self.dh.list_topic(self.dh_project).topic_names: + self.dh.delete_topic(self.dh_project, self.dh_topic) # project_name, topic_name, shard_count, life_cycle, record_schema, comment self.dh.create_tuple_topic( - self.dh_project, - self.dh_topic, - 7, - 3, - record_schema, + project_name=self.dh_project, + topic_name=self.dh_topic, + shard_count=2, + life_cycle=1, + record_schema=record_schema, comment='EasyRecTest') - logging.info('create tuple topic %s success!' % self.dh_topic) + logging.info('create tuple topic %s succeed' % self.dh_topic) except ResourceExistException: - logging.info('topic %s already exist!' 
% self.dh_topic) + logging.error('create topic %s failed: %s' % + (self.dh_topic, traceback.format_exc())) except Exception as ex: - logging.error('exception:%s' % str(ex)) - logging.error(traceback.format_exc()) + logging.error('exception: %s %s' % (str(ex), traceback.format_exc())) try: self.dh.wait_shards_ready(self.dh_project, self.dh_topic) logging.info('datahub[%s,%s] shards all ready' % @@ -189,14 +187,22 @@ def init_dh_and_odps(self): if topic_result.record_type != RecordType.TUPLE: logging.error('invalid topic type: %s' % str(topic_result.record_type)) record_schema = topic_result.record_schema - t = self.odps.get_table(self.odpsTable) - with t.open_reader() as reader: + + with open('data/test/dwd_avazu_ctr_deepmodel_10w.csv', 'r') as fin: record_list = [] - for data in reader: - record = TupleRecord(values=data.values, schema=record_schema) + total_cnt = 0 + for line_str in fin: + line_str = line_str.strip() + sep_pos = line_str.find(',') + label = int(line_str[:sep_pos]) + feature = line_str[(sep_pos + 1):] + record = TupleRecord(values=[label, feature], schema=record_schema) record_list.append(record) - for i in range(10): - self.dh.put_records(self.dh_project, self.dh_topic, record_list) + if len(record_list) >= 8192: + self.dh.put_records(self.dh_project, self.dh_topic, record_list) + total_cnt += len(record_list) + if total_cnt > 8192 * 64: + break except Exception as ex: logging.error('exception: %s' % str(ex)) logging.error(traceback.format_exc()) diff --git a/easy_rec/python/tools/predict_and_chk.py b/easy_rec/python/tools/predict_and_chk.py index bc7353f76..e8a4adfae 100644 --- a/easy_rec/python/tools/predict_and_chk.py +++ b/easy_rec/python/tools/predict_and_chk.py @@ -88,7 +88,7 @@ if 'features' in predictor.input_names: feature = args.separator.join(feature) batch_input.append(feature) - output = predictor.predict(batch_input) + output = predictor.predict(batch_input, batch_size=1024) if args.save_path: fout = open(args.save_path, 'w') @@ -100,12 +100,14 @@ logging.info('compare result path: ' + args.cmp_res_path) logging.info('compare key: ' + args.cmp_key) logging.info('tolerance: ' + str(args.tol)) + all_cmp_keys = args.cmp_key.split(',') with open(args.cmp_res_path, 'r') as fin: for line_id, line_str in enumerate(fin): line_str = line_str.strip() line_pred = json.loads(line_str) - assert np.abs( - line_pred[args.cmp_key] - - output[line_id][args.cmp_key]) < args.tol, 'line[%d]: %.8f' % ( - line_id, - np.abs(line_pred[args.cmp_key] - output[line_id][args.cmp_key])) + for cmp_key in all_cmp_keys: + assert np.abs( + line_pred[cmp_key] - output[line_id][cmp_key] + ) < args.tol, 'line[%d]: %.8f %.8f %.8f' % ( + line_id, np.abs(line_pred[cmp_key] - output[line_id][cmp_key]), + line_pred[cmp_key], output[line_id][cmp_key]) diff --git a/easy_rec/python/utils/config_util.py b/easy_rec/python/utils/config_util.py index f64455b14..cbf3258c0 100644 --- a/easy_rec/python/utils/config_util.py +++ b/easy_rec/python/utils/config_util.py @@ -5,6 +5,7 @@ Such as Hyper parameter tuning or automatic feature expanding. 
""" +import argparse import datetime import json import logging @@ -563,3 +564,72 @@ def process_neg_sampler_data_path(pipeline_config): if hasattr(sampler_config, 'hard_neg_edge_input_path'): sampler_config.hard_neg_edge_input_path = process_data_path( sampler_config.hard_neg_edge_input_path, hive_util) + + +def parse_oss_params(extra_params): + if not isinstance(extra_params, list): + extra_params = [x.strip() for x in extra_params.split(' ')] + extra_params = [x for x in extra_params if x != ''] + + parser = argparse.ArgumentParser() + parser.add_argument( + '--oss_path', + type=str, + default=None, + help='write embed objects to oss folder, oss://bucket/folder') + parser.add_argument( + '--oss_endpoint', type=str, default=None, help='oss endpoint') + parser.add_argument('--oss_ak', type=str, default=None, help='oss ak') + parser.add_argument('--oss_sk', type=str, default=None, help='oss sk') + parser.add_argument( + '--oss_threads', + type=int, + default=10, + help='# threads access oss at the same time') + parser.add_argument( + '--oss_timeout', + type=int, + default=10, + help='connect to oss, time_out in seconds') + parser.add_argument( + '--oss_expire', type=int, default=24, help='oss expire time in hours') + parser.add_argument( + '--oss_write_kv', + type=int, + default=1, + help='whether to write embedding to oss') + parser.add_argument( + '--oss_embedding_version', + type=str, + default='', + help='oss embedding version') + parser.add_argument( + '--asset_files', type=str, default='', help='more files to add to asset') + parser.add_argument( + '--verbose', + action='store_true', + default=False, + help='print more debug information') + args = parser.parse_args(extra_params) + + extra_params = {} + if args.oss_path: + extra_params['oss_path'] = args.oss_path + if args.oss_endpoint: + extra_params['oss_endpoint'] = args.oss_endpoint + if args.oss_ak: + extra_params['oss_ak'] = args.oss_ak + if args.oss_sk: + extra_params['oss_sk'] = args.oss_sk + if args.oss_timeout > 0: + extra_params['oss_timeout'] = args.oss_timeout + if args.oss_expire > 0: + extra_params['oss_expire'] = args.oss_expire + if args.oss_threads > 0: + extra_params['oss_threads'] = args.oss_threads + if args.oss_write_kv: + extra_params['oss_write_kv'] = True if args.oss_write_kv == 1 else False + if args.oss_embedding_version: + extra_params['oss_embedding_version'] = args.oss_embedding_version + extra_params['verbose'] = args.verbose + return extra_params diff --git a/easy_rec/python/utils/constant.py b/easy_rec/python/utils/constant.py index 8caecaba8..8e286bc44 100644 --- a/easy_rec/python/utils/constant.py +++ b/easy_rec/python/utils/constant.py @@ -6,3 +6,5 @@ DENSE_UPDATE_VARIABLES = 'DENSE_UPDATE_VARIABLES' SPARSE_UPDATE_VARIABLES = 'SPARSE_UPDATE_VARIABLES' + +EMBED_NAME_TO_IDS_FILE = 'embed_name_to_ids.txt' diff --git a/easy_rec/python/utils/embedding_utils.py b/easy_rec/python/utils/embedding_utils.py index 799376310..7761cfeab 100644 --- a/easy_rec/python/utils/embedding_utils.py +++ b/easy_rec/python/utils/embedding_utils.py @@ -2,6 +2,8 @@ # Copyright (c) Alibaba, Inc. and its affiliates. import tensorflow as tf from tensorflow.python.framework import ops +from tensorflow.python.ops.variables import global_variables +from tensorflow.python.platform import gfile from easy_rec.python.utils import constant from easy_rec.python.utils import proto_util @@ -17,22 +19,53 @@ def get_norm_name_to_ids(): normalized names to ids mapping. 
""" norm_name_to_ids = {} - for x in ops.get_collection(constant.SPARSE_UPDATE_VARIABLES): - norm_name, part_id = proto_util.get_norm_embed_name(x[0].name) - norm_name_to_ids[norm_name] = 1 - + for x in global_variables(): + if 'EmbeddingVariable' in str(type(x)): + norm_name, _ = proto_util.get_norm_embed_name(x.name) + elif '/embedding_weights:' in x.name or '/embedding_weights/part_' in x.name: + norm_name, _ = proto_util.get_norm_embed_name(x.name) + norm_name_to_ids[norm_name] = 1 for tid, t in enumerate(norm_name_to_ids.keys()): norm_name_to_ids[t] = str(tid) return norm_name_to_ids -def get_sparse_name_to_ids(): +def save_norm_name_to_ids(save_path, norm_name_to_ids): + """Save normalize name to ids mapping. + + Args: + save_path: save path. + norm_name_to_ids: dict, map normalized name to ids. + """ + with gfile.GFile(save_path, 'w') as fout: + for k in norm_name_to_ids: + fout.write('%s\t%s\n' % (k, norm_name_to_ids[k])) + + +def load_norm_name_to_ids(save_path): + """Load normalize name to ids mapping from file. + + Args: + save_path: file path. + + Return: + dict, map normalized name to ids. + """ + norm_name_to_ids = {} + with gfile.GFile(save_path, 'r') as fin: + for line_str in fin: + line_str = line_str.strip() + k, v = line_str.split('\t') + norm_name_to_ids[k] = int(v) + return norm_name_to_ids + + +def get_sparse_name_to_ids(norm_name_to_ids): """Get embedding variable(including kv variables) name to ids mapping. Return: - variable names to ids mappping. + dict, normalized variable names to ids mappping. """ - norm_name_to_ids = get_norm_name_to_ids() name_to_ids = {} for x in ops.get_collection(constant.SPARSE_UPDATE_VARIABLES): norm_name, _ = proto_util.get_norm_embed_name(x[0].name) @@ -41,6 +74,11 @@ def get_sparse_name_to_ids(): def get_dense_name_to_ids(): + """Get dense variable(embedding excluded) name to ids mapping. + + Return: + dict, dense variable names to ids mapping. 
+ """ dense_train_vars = ops.get_collection(constant.DENSE_UPDATE_VARIABLES) norm_name_to_ids = {} for tid, x in enumerate(dense_train_vars): diff --git a/easy_rec/python/utils/estimator_utils.py b/easy_rec/python/utils/estimator_utils.py index ee764777e..df3bab134 100644 --- a/easy_rec/python/utils/estimator_utils.py +++ b/easy_rec/python/utils/estimator_utils.py @@ -348,7 +348,13 @@ def __init__(self, logging.info('KAFKA_MAX_MSG_SIZE: %d' % self._kafka_max_msg_size) self._dense_name_to_ids = embedding_utils.get_dense_name_to_ids() - self._sparse_name_to_ids = embedding_utils.get_sparse_name_to_ids() + norm_name_to_ids = embedding_utils.get_norm_name_to_ids() + self._sparse_name_to_ids = embedding_utils.get_sparse_name_to_ids( + norm_name_to_ids) + embed_name_to_id_file = os.path.join(checkpoint_dir, + constant.EMBED_NAME_TO_IDS_FILE) + embedding_utils.save_norm_name_to_ids(embed_name_to_id_file, + norm_name_to_ids) with gfile.GFile( os.path.join(checkpoint_dir, constant.DENSE_UPDATE_VARIABLES), @@ -549,6 +555,8 @@ def _send_sparse(self, global_step, session): if 'EmbeddingVariable' not in str(type(tmp_var)): if tmp_var._save_slice_info is not None: tmp_key += tmp_var._save_slice_info.var_offset[0] + if tmp_key.dtype in [np.int32, np.uint32]: + tmp_key = np.array(tmp_key, dtype=np.int64) bytes_buf += tmp_key.tobytes() bytes_buf += tmp_val.tobytes() if self._kafka_producer is not None: @@ -598,6 +606,15 @@ def after_run(self, run_context, run_values): self._sparse_timer.update_last_triggered_step(global_step) self._send_sparse(global_step, run_context.session) + def _clean_old_offset_files(self): + if self._data_offset_var is not None: + for offset_file in gfile.Glob( + os.path.join(self._checkpoint_dir, 'model.ckpt-*.offset')): + index_file = offset_file[:-6] + 'index' + if not gfile.Exists(index_file): + gfile.Remove(offset_file) + logging.info('clean old offset file: %s' % offset_file) + def _save(self, session, step): """Saves the latest checkpoint, returns should_stop.""" logging.info('Saving checkpoints for %d into %s.', step, self._save_path) @@ -612,6 +629,14 @@ def _save(self, session, step): if x: data_offset_json.update(json.loads(x)) save_dir, _ = os.path.split(self._save_path) + # reuse existing offsets + if len(data_offset_json) == 0: + ckpt_path = latest_checkpoint(save_dir) + if ckpt_path is not None: + ckpt_path += '.offset' + if gfile.Exists(ckpt_path): + with gfile.GFile(ckpt_path, 'r') as fin: + data_offset_json = json.load(fin) save_offset_path = os.path.join(save_dir, 'model.ckpt-%d.offset' % step) with gfile.GFile(save_offset_path, 'w') as fout: json.dump(data_offset_json, fout) @@ -627,6 +652,8 @@ def _save(self, session, step): status=tf.SessionLog.CHECKPOINT, checkpoint_path=self._save_path), step) + self._clean_old_offset_files() + should_stop = False for l in self._listeners: # noqa: E741 if l.after_save(session, step): @@ -895,7 +922,7 @@ def latest_checkpoint(model_dir): model_path: xx/model.ckpt-2000 """ try: - ckpt_metas = gfile.Glob(os.path.join(model_dir, 'model.ckpt-*.meta')) + ckpt_metas = gfile.Glob(os.path.join(model_dir, 'model.ckpt-*.index')) if len(ckpt_metas) == 0: return None @@ -954,6 +981,17 @@ def is_chief(): return True +def is_first_worker(): + if 'TF_CONFIG' in os.environ: + tf_config = json.loads(os.environ['TF_CONFIG']) + if 'worker' in tf_config['cluster']: + return tf_config['task']['type'] == 'worker' and tf_config['task'][ + 'index'] == 0 + else: + return tf_config['task']['type'] in ['chief', 'master'] + return True + + def 
is_master(): if 'TF_CONFIG' in os.environ: tf_config = json.loads(os.environ['TF_CONFIG']) diff --git a/easy_rec/python/utils/export_big_model.py b/easy_rec/python/utils/export_big_model.py index 6d074581a..b9a403221 100644 --- a/easy_rec/python/utils/export_big_model.py +++ b/easy_rec/python/utils/export_big_model.py @@ -12,10 +12,7 @@ from tensorflow.core.protobuf import config_pb2 from tensorflow.python.framework import ops from tensorflow.python.ops.variables import global_variables -from tensorflow.python.platform.gfile import DeleteRecursively -from tensorflow.python.platform.gfile import Exists -from tensorflow.python.platform.gfile import GFile -from tensorflow.python.platform.gfile import Remove +from tensorflow.python.platform import gfile from tensorflow.python.saved_model import signature_constants from tensorflow.python.training.device_setter import replica_device_setter from tensorflow.python.training.monitored_session import ChiefSessionCreator @@ -24,6 +21,7 @@ import easy_rec from easy_rec.python.utils import constant +from easy_rec.python.utils import embedding_utils from easy_rec.python.utils import estimator_utils from easy_rec.python.utils import io_util from easy_rec.python.utils import proto_util @@ -41,7 +39,8 @@ def export_big_model(export_dir, pipeline_config, redis_params, - serving_input_fn, estimator, checkpoint_path, verbose): + serving_input_fn, estimator, checkpoint_path, tf_config, + verbose): for key in redis_params: logging.info('%s: %s' % (key, redis_params[key])) @@ -70,7 +69,7 @@ def export_big_model(export_dir, pipeline_config, redis_params, server = None cluster = None - if 'TF_CONFIG' in os.environ: + if tf_config is not None: # change chief to master tf_config = estimator_utils.chief_to_master() if tf_config['task']['type'] == 'ps': @@ -252,7 +251,7 @@ def export_big_model(export_dir, pipeline_config, redis_params, graph = tf.get_default_graph() embed_name_to_id_file = os.path.join(export_dir, 'embed_name_to_ids.txt') - with GFile(embed_name_to_id_file, 'w') as fout: + with gfile.GFile(embed_name_to_id_file, 'w') as fout: for tmp_norm_name in norm_name_to_ids: fout.write('%s\t%s\n' % (tmp_norm_name, norm_name_to_ids[tmp_norm_name])) ops.add_to_collection( @@ -263,9 +262,9 @@ def export_big_model(export_dir, pipeline_config, redis_params, export_dir = os.path.join(export_dir, meta_graph_def.meta_info_def.meta_graph_version) export_dir = io_util.fix_oss_dir(export_dir) - if Exists(export_dir): + if gfile.Exists(export_dir): logging.info('will delete old dir: %s' % export_dir) - DeleteRecursively(export_dir) + gfile.DeleteRecursively(export_dir) builder = tf.saved_model.builder.SavedModelBuilder(export_dir) tensor_info_inputs = {} @@ -303,17 +302,18 @@ def export_big_model(export_dir, pipeline_config, redis_params, builder.save() # remove temporary files - Remove(embed_name_to_id_file) + gfile.Remove(embed_name_to_id_file) return def export_big_model_to_oss(export_dir, pipeline_config, oss_params, serving_input_fn, estimator, checkpoint_path, - verbose): + tf_config, verbose): for key in oss_params: - logging.info('%s: %s' % (key, oss_params[key])) + logging.info('oss_params: %s=%s' % (key, oss_params[key])) write_kv_lib_path = os.path.join(easy_rec.ops_dir, 'libembed_op.so') + logging.info('write_kv_lib_path=%s' % write_kv_lib_path) kv_module = tf.load_op_library(write_kv_lib_path) if not checkpoint_path: @@ -323,7 +323,7 @@ def export_big_model_to_oss(export_dir, pipeline_config, oss_params, server = None cluster = None - if 'TF_CONFIG' in 
os.environ: + if tf_config is not None: # change chief to master tf_config = estimator_utils.chief_to_master() if tf_config['task']['type'] == 'ps': @@ -338,291 +338,298 @@ def export_big_model_to_oss(export_dir, pipeline_config, oss_params, server_target = server.target logging.info('server_target = %s' % server_target) - serving_input = serving_input_fn() - features = serving_input.features - inputs = serving_input.receiver_tensors - - if cluster: - logging.info('cluster = ' + str(cluster)) - with tf.device( - replica_device_setter( - worker_device='/job:master/task:0', cluster=cluster)): - outputs = estimator._export_model_fn(features, None, None, - estimator.params).predictions - - meta_graph_def = export_meta_graph() - meta_graph_def.meta_info_def.meta_graph_version = str(int(time.time())) - oss_embedding_version = oss_params.get('oss_embedding_version', '') - if not oss_embedding_version: - meta_graph_def.meta_info_def.meta_graph_version =\ - str(int(time.time())) - else: - meta_graph_def.meta_info_def.meta_graph_version = oss_embedding_version - - logging.info('meta_graph_version = %s' % - meta_graph_def.meta_info_def.meta_graph_version) - - embed_var_parts = {} - embed_norm_name = {} - embed_spos = {} - # pai embedding variable - embedding_vars = {} - norm_name_to_ids = {} - for x in global_variables(): - tf.logging.info('global var: %s %s %s' % (x.name, str(type(x)), x.device)) - if 'EmbeddingVariable' in str(type(x)): - norm_name, part_id = proto_util.get_norm_embed_name(x.name) - norm_name_to_ids[norm_name] = 1 - tmp_export = x.export() - if x.device not in embedding_vars: - embedding_vars[x.device] = [(norm_name, tmp_export.keys, - tmp_export.values, part_id)] - else: - embedding_vars[x.device].append( - (norm_name, tmp_export.keys, tmp_export.values, part_id)) - elif '/embedding_weights:' in x.name or '/embedding_weights/part_' in x.name: - norm_name, part_id = proto_util.get_norm_embed_name(x.name) - norm_name_to_ids[norm_name] = 1 - embed_norm_name[x] = norm_name - if norm_name not in embed_var_parts: - embed_var_parts[norm_name] = {part_id: x} - else: - embed_var_parts[norm_name][part_id] = x - - for tid, t in enumerate(norm_name_to_ids.keys()): - norm_name_to_ids[t] = str(tid) - - for x in embed_norm_name: - embed_norm_name[x] = norm_name_to_ids[embed_norm_name[x]] - - total_num = 0 - for norm_name in embed_var_parts: - parts = embed_var_parts[norm_name] - spos = 0 - part_ids = list(parts.keys()) - part_ids.sort() - total_num += len(part_ids) - for part_id in part_ids: - embed_spos[parts[part_id]] = spos - spos += parts[part_id].get_shape()[0] - - oss_path = oss_params.get('oss_path', '') - oss_endpoint = oss_params.get('oss_endpoint', '') - oss_ak = oss_params.get('oss_ak', '') - oss_sk = oss_params.get('oss_sk', '') - logging.info('will export to oss: %s %s %s %s', oss_path, oss_endpoint, - oss_ak, oss_sk) - - if oss_params.get('oss_write_kv', ''): - # group embed by devices - per_device_vars = {} - for x in embed_norm_name: - if x.device not in per_device_vars: - per_device_vars[x.device] = [x] - else: - per_device_vars[x.device].append(x) - - all_write_res = [] - for tmp_dev in per_device_vars: - tmp_vars = per_device_vars[tmp_dev] - with tf.device(tmp_dev): - tmp_names = [embed_norm_name[v] for v in tmp_vars] - tmp_spos = [np.array(embed_spos[v], dtype=np.int64) for v in tmp_vars] - write_kv_res = kv_module.oss_write_kv( - tmp_names, - tmp_vars, - tmp_spos, - osspath=oss_path, - endpoint=oss_endpoint, - ak=oss_ak, - sk=oss_sk, - threads=oss_params.get('oss_threads', 
5), - timeout=5, - expire=5, - verbose=verbose) - all_write_res.append(write_kv_res) - - for tmp_dev in embedding_vars: - with tf.device(tmp_dev): - tmp_vs = embedding_vars[tmp_dev] - tmp_sparse_names = [norm_name_to_ids[x[0]] for x in tmp_vs] - tmp_sparse_keys = [x[1] for x in tmp_vs] - tmp_sparse_vals = [x[2] for x in tmp_vs] - tmp_part_ids = [x[3] for x in tmp_vs] - write_sparse_kv_res = kv_module.oss_write_sparse_kv( - tmp_sparse_names, - tmp_sparse_vals, - tmp_sparse_keys, - tmp_part_ids, - osspath=oss_path, - endpoint=oss_endpoint, - ak=oss_ak, - sk=oss_sk, - version=meta_graph_def.meta_info_def.meta_graph_version, - threads=oss_params.get('oss_threads', 5), - verbose=verbose) - all_write_res.append(write_sparse_kv_res) - - session_config = ConfigProto( - allow_soft_placement=True, log_device_placement=False) - chief_sess_creator = ChiefSessionCreator( - master=server.target if server else '', - checkpoint_filename_with_path=checkpoint_path, - config=session_config) - with tf.train.MonitoredSession( - session_creator=chief_sess_creator, - hooks=None, - stop_grace_period_secs=120) as sess: - dump_flags = sess.run(all_write_res) - logging.info('write embedding to oss succeed: %s' % str(dump_flags)) - else: - logging.info('will skip write embedding to oss because ' - 'oss_write_kv is set to 0.') - - # delete embedding_weights collections so that it could be re imported - tmp_drop = [] - for k in meta_graph_def.collection_def: - v = meta_graph_def.collection_def[k] - if len( - v.node_list.value) > 0 and 'embedding_weights' in v.node_list.value[0]: - tmp_drop.append(k) - for k in tmp_drop: - meta_graph_def.collection_def.pop(k) - - meta_graph_editor = MetaGraphEditor( - os.path.join(easy_rec.ops_dir, 'libembed_op.so'), - None, - oss_path=oss_path, - oss_endpoint=oss_endpoint, - oss_ak=oss_ak, - oss_sk=oss_sk, - oss_timeout=oss_params.get('oss_timeout', 1500), - meta_graph_def=meta_graph_def, - norm_name_to_ids=norm_name_to_ids, - incr_update_params=oss_params.get('incr_update', None), - debug_dir=export_dir if verbose else '') - meta_graph_editor.edit_graph_for_oss() - tf.reset_default_graph() - - saver = tf.train.import_meta_graph(meta_graph_editor._meta_graph_def) - graph = tf.get_default_graph() - - embed_name_to_id_file = os.path.join(export_dir, 'embed_name_to_ids.txt') - with GFile(embed_name_to_id_file, 'w') as fout: - for tmp_norm_name in norm_name_to_ids: - fout.write('%s\t%s\n' % (tmp_norm_name, norm_name_to_ids[tmp_norm_name])) - ops.add_to_collection( - ops.GraphKeys.ASSET_FILEPATHS, - tf.constant( - embed_name_to_id_file, dtype=tf.string, name='embed_name_to_ids.txt')) + with ops.Graph().as_default(): + serving_input = serving_input_fn() + features = serving_input.features + inputs = serving_input.receiver_tensors + + if cluster: + logging.info('cluster = ' + str(cluster)) + with tf.device( + replica_device_setter( + worker_device='/job:master/task:0', cluster=cluster)): + outputs = estimator._export_model_fn(features, None, None, + estimator.params).predictions + + meta_graph_def = export_meta_graph() + meta_graph_def.meta_info_def.meta_graph_version = str(int(time.time())) + oss_embedding_version = oss_params.get('oss_embedding_version', '') + if not oss_embedding_version: + meta_graph_def.meta_info_def.meta_graph_version =\ + str(int(time.time())) + else: + meta_graph_def.meta_info_def.meta_graph_version = oss_embedding_version + + logging.info('meta_graph_version = %s' % + meta_graph_def.meta_info_def.meta_graph_version) + + if 'incr_update' in oss_params: + 
embed_name_to_id_file = os.path.join( + os.path.dirname(checkpoint_path), constant.EMBED_NAME_TO_IDS_FILE) + norm_name_to_ids = embedding_utils.load_norm_name_to_ids( + embed_name_to_id_file) + else: + norm_name_to_ids = embedding_utils.get_norm_name_to_ids() + embed_name_to_id_file = os.path.join(export_dir, + constant.EMBED_NAME_TO_IDS_FILE) + embedding_utils.save_norm_name_to_ids(embed_name_to_id_file, + norm_name_to_ids) + + embed_var_parts = {} + embed_norm_name = {} + embed_spos = {} + # pai embedding variable + embedding_vars = {} + for x in global_variables(): + logging.info('global var: %s %s[device=%s]' % + (x.name, str(type(x)), x.device)) + if 'EmbeddingVariable' in str(type(x)): + norm_name, part_id = proto_util.get_norm_embed_name(x.name) + tmp_export = x.export() + if x.device not in embedding_vars: + embedding_vars[x.device] = [(norm_name, tmp_export.keys, + tmp_export.values, part_id)] + else: + embedding_vars[x.device].append( + (norm_name, tmp_export.keys, tmp_export.values, part_id)) + elif '/embedding_weights:' in x.name or '/embedding_weights/part_' in x.name: + norm_name, part_id = proto_util.get_norm_embed_name(x.name) + embed_norm_name[x] = norm_name_to_ids[norm_name] + if norm_name not in embed_var_parts: + embed_var_parts[norm_name] = {part_id: x} + else: + embed_var_parts[norm_name][part_id] = x + + total_num = 0 + for norm_name in embed_var_parts: + parts = embed_var_parts[norm_name] + spos = 0 + part_ids = list(parts.keys()) + part_ids.sort() + total_num += len(part_ids) + for part_id in part_ids: + embed_spos[parts[part_id]] = spos + spos += parts[part_id].get_shape()[0] + + oss_path = oss_params.get('oss_path', '') + oss_endpoint = oss_params.get('oss_endpoint', '') + oss_ak = oss_params.get('oss_ak', '') + oss_sk = oss_params.get('oss_sk', '') + logging.info('will export to oss: %s %s %s %s', oss_path, oss_endpoint, + oss_ak, oss_sk) + + if oss_params.get('oss_write_kv', ''): + # group embed by devices + per_device_vars = {} + for x in embed_norm_name: + if x.device not in per_device_vars: + per_device_vars[x.device] = [x] + else: + per_device_vars[x.device].append(x) + + all_write_res = [] + for tmp_dev in per_device_vars: + tmp_vars = per_device_vars[tmp_dev] + with tf.device(tmp_dev): + tmp_names = [str(embed_norm_name[v]) for v in tmp_vars] + tmp_spos = [np.array(embed_spos[v], dtype=np.int64) for v in tmp_vars] + write_kv_res = kv_module.oss_write_kv( + tmp_names, + tmp_vars, + tmp_spos, + osspath=oss_path, + endpoint=oss_endpoint, + ak=oss_ak, + sk=oss_sk, + threads=oss_params.get('oss_threads', 5), + timeout=5, + expire=5, + verbose=verbose) + all_write_res.append(write_kv_res) + + for tmp_dev in embedding_vars: + with tf.device(tmp_dev): + tmp_vs = embedding_vars[tmp_dev] + tmp_sparse_names = [str(norm_name_to_ids[x[0]]) for x in tmp_vs] + tmp_sparse_keys = [x[1] for x in tmp_vs] + tmp_sparse_vals = [x[2] for x in tmp_vs] + tmp_part_ids = [x[3] for x in tmp_vs] + write_sparse_kv_res = kv_module.oss_write_sparse_kv( + tmp_sparse_names, + tmp_sparse_vals, + tmp_sparse_keys, + tmp_part_ids, + osspath=oss_path, + endpoint=oss_endpoint, + ak=oss_ak, + sk=oss_sk, + version=meta_graph_def.meta_info_def.meta_graph_version, + threads=oss_params.get('oss_threads', 5), + verbose=verbose) + all_write_res.append(write_sparse_kv_res) + + session_config = ConfigProto( + allow_soft_placement=True, log_device_placement=False) + chief_sess_creator = ChiefSessionCreator( + master=server.target if server else '', + checkpoint_filename_with_path=checkpoint_path, + 
config=session_config) + with tf.train.MonitoredSession( + session_creator=chief_sess_creator, + hooks=None, + stop_grace_period_secs=120) as sess: + dump_flags = sess.run(all_write_res) + logging.info('write embedding to oss succeed: %s' % str(dump_flags)) + else: + logging.info('will skip write embedding to oss because ' + 'oss_write_kv is set to 0.') + + # delete embedding_weights collections so that it could be re imported + tmp_drop = [] + for k in meta_graph_def.collection_def: + v = meta_graph_def.collection_def[k] + if len(v.node_list.value + ) > 0 and 'embedding_weights' in v.node_list.value[0]: + tmp_drop.append(k) + for k in tmp_drop: + meta_graph_def.collection_def.pop(k) + + with ops.Graph().as_default(): + meta_graph_editor = MetaGraphEditor( + os.path.join(easy_rec.ops_dir, 'libembed_op.so'), + None, + oss_path=oss_path, + oss_endpoint=oss_endpoint, + oss_ak=oss_ak, + oss_sk=oss_sk, + oss_timeout=oss_params.get('oss_timeout', 1500), + meta_graph_def=meta_graph_def, + norm_name_to_ids=norm_name_to_ids, + incr_update_params=oss_params.get('incr_update', None), + debug_dir=export_dir if verbose else '') + meta_graph_editor.edit_graph_for_oss() + + with ops.Graph().as_default(): + saver = tf.train.import_meta_graph(meta_graph_editor._meta_graph_def) + graph = tf.get_default_graph() - if 'incr_update' in oss_params: - dense_train_vars_path = os.path.join( - os.path.dirname(checkpoint_path), constant.DENSE_UPDATE_VARIABLES) ops.add_to_collection( ops.GraphKeys.ASSET_FILEPATHS, tf.constant( - dense_train_vars_path, + embed_name_to_id_file, dtype=tf.string, - name=constant.DENSE_UPDATE_VARIABLES)) - - asset_file = 'incr_update.txt' - asset_file_path = os.path.join(export_dir, asset_file) - with GFile(asset_file_path, 'w') as fout: - incr_update = oss_params['incr_update'] - incr_update_json = {} - if 'kafka' in incr_update: - incr_update_json['storage'] = 'kafka' - incr_update_json['kafka'] = json.loads( - json_format.MessageToJson( - incr_update['kafka'], preserving_proto_field_name=True)) - elif 'datahub' in incr_update: - incr_update_json['storage'] = 'datahub' - incr_update_json['datahub'] = json.loads( - json_format.MessageToJson( - incr_update['datahub'], preserving_proto_field_name=True)) - elif 'fs' in incr_update: - incr_update_json['storage'] = 'fs' - incr_update_json['fs'] = {'incr_save_dir': incr_update['fs'].mount_path} - json.dump(incr_update_json, fout, indent=2) - - ops.add_to_collection( - ops.GraphKeys.ASSET_FILEPATHS, - tf.constant(asset_file_path, dtype=tf.string, name=asset_file)) - - export_dir = os.path.join(export_dir, - meta_graph_def.meta_info_def.meta_graph_version) - export_dir = io_util.fix_oss_dir(export_dir) - if Exists(export_dir): - logging.info('will delete old dir: %s' % export_dir) - DeleteRecursively(export_dir) - - builder = tf.saved_model.builder.SavedModelBuilder(export_dir) - tensor_info_inputs = {} - for tmp_key in inputs: - tmp = graph.get_tensor_by_name(inputs[tmp_key].name) - tensor_info_inputs[tmp_key] = \ - tf.saved_model.utils.build_tensor_info(tmp) - - tensor_info_outputs = {} - for tmp_key in outputs: - tmp = graph.get_tensor_by_name(outputs[tmp_key].name) - tensor_info_outputs[tmp_key] = \ - tf.saved_model.utils.build_tensor_info(tmp) - signature = ( - tf.saved_model.signature_def_utils.build_signature_def( - inputs=tensor_info_inputs, - outputs=tensor_info_outputs, - method_name=signature_constants.PREDICT_METHOD_NAME)) - - if 'incr_update' in oss_params: - incr_update_inputs = meta_graph_editor.sparse_update_inputs - 
incr_update_outputs = meta_graph_editor.sparse_update_outputs - incr_update_inputs.update(meta_graph_editor.dense_update_inputs) - incr_update_outputs.update(meta_graph_editor.dense_update_outputs) - tensor_info_incr_update_inputs = {} - tensor_info_incr_update_outputs = {} - for tmp_key in incr_update_inputs: - tmp = graph.get_tensor_by_name(incr_update_inputs[tmp_key].name) - tensor_info_incr_update_inputs[tmp_key] = \ + name=constant.EMBED_NAME_TO_IDS_FILE)) + + if 'incr_update' in oss_params: + dense_train_vars_path = os.path.join( + os.path.dirname(checkpoint_path), constant.DENSE_UPDATE_VARIABLES) + ops.add_to_collection( + ops.GraphKeys.ASSET_FILEPATHS, + tf.constant( + dense_train_vars_path, + dtype=tf.string, + name=constant.DENSE_UPDATE_VARIABLES)) + + global_step = int(checkpoint_path.split('-')[-1]) + asset_file = 'incr_update.txt' + asset_file_path = os.path.join(estimator.model_dir, asset_file) + with gfile.GFile(asset_file_path, 'w') as fout: + incr_update = oss_params['incr_update'] + incr_update_json = {} + if 'kafka' in incr_update: + incr_update_json['storage'] = 'kafka' + incr_update_json['kafka'] = json.loads( + json_format.MessageToJson( + incr_update['kafka'], preserving_proto_field_name=True)) + elif 'datahub' in incr_update: + incr_update_json['storage'] = 'datahub' + incr_update_json['datahub'] = json.loads( + json_format.MessageToJson( + incr_update['datahub'], preserving_proto_field_name=True)) + elif 'fs' in incr_update: + incr_update_json['storage'] = 'fs' + incr_update_json['fs'] = { + 'incr_save_dir': incr_update['fs'].mount_path + } + # there is no need to update dense from old steps + incr_update_json['dense_step'] = global_step + incr_update_json['sparse_step'] = max(global_step - 100, 0) + json.dump(incr_update_json, fout, indent=2) + + ops.add_to_collection( + ops.GraphKeys.ASSET_FILEPATHS, + tf.constant(asset_file_path, dtype=tf.string, name=asset_file)) + + export_dir = os.path.join(export_dir, + meta_graph_def.meta_info_def.meta_graph_version) + export_dir = io_util.fix_oss_dir(export_dir) + if gfile.Exists(export_dir): + logging.info('will delete old dir: %s' % export_dir) + gfile.DeleteRecursively(export_dir) + + builder = tf.saved_model.builder.SavedModelBuilder(export_dir) + tensor_info_inputs = {} + for tmp_key in inputs: + tmp = graph.get_tensor_by_name(inputs[tmp_key].name) + tensor_info_inputs[tmp_key] = \ tf.saved_model.utils.build_tensor_info(tmp) - for tmp_key in incr_update_outputs: - tmp = graph.get_tensor_by_name(incr_update_outputs[tmp_key].name) - tensor_info_incr_update_outputs[tmp_key] = \ + + tensor_info_outputs = {} + for tmp_key in outputs: + tmp = graph.get_tensor_by_name(outputs[tmp_key].name) + tensor_info_outputs[tmp_key] = \ tf.saved_model.utils.build_tensor_info(tmp) - incr_update_signature = ( + signature = ( tf.saved_model.signature_def_utils.build_signature_def( - inputs=tensor_info_incr_update_inputs, - outputs=tensor_info_incr_update_outputs, + inputs=tensor_info_inputs, + outputs=tensor_info_outputs, method_name=signature_constants.PREDICT_METHOD_NAME)) - else: - incr_update_signature = None - session_config = ConfigProto( - allow_soft_placement=True, log_device_placement=True) - - saver = tf.train.Saver() - with tf.Session(target=server.target if server else '') as sess: - saver.restore(sess, checkpoint_path) - main_op = tf.group([ - Scaffold.default_local_init_op(), - ops.get_collection(EMBEDDING_INITIALIZERS) - ]) - incr_update_sig_map = { - signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature - } - if 
incr_update_signature is not None: - incr_update_sig_map[INCR_UPDATE_SIGNATURE_KEY] = incr_update_signature - builder.add_meta_graph_and_variables( - sess, [tf.saved_model.tag_constants.SERVING], - signature_def_map=incr_update_sig_map, - assets_collection=ops.get_collection(ops.GraphKeys.ASSET_FILEPATHS), - saver=saver, - main_op=main_op, - strip_default_attrs=True, - clear_devices=True) - builder.save() + if 'incr_update' in oss_params: + incr_update_inputs = meta_graph_editor.sparse_update_inputs + incr_update_outputs = meta_graph_editor.sparse_update_outputs + incr_update_inputs.update(meta_graph_editor.dense_update_inputs) + incr_update_outputs.update(meta_graph_editor.dense_update_outputs) + tensor_info_incr_update_inputs = {} + tensor_info_incr_update_outputs = {} + for tmp_key in incr_update_inputs: + tmp = graph.get_tensor_by_name(incr_update_inputs[tmp_key].name) + tensor_info_incr_update_inputs[tmp_key] = \ + tf.saved_model.utils.build_tensor_info(tmp) + for tmp_key in incr_update_outputs: + tmp = graph.get_tensor_by_name(incr_update_outputs[tmp_key].name) + tensor_info_incr_update_outputs[tmp_key] = \ + tf.saved_model.utils.build_tensor_info(tmp) + incr_update_signature = ( + tf.saved_model.signature_def_utils.build_signature_def( + inputs=tensor_info_incr_update_inputs, + outputs=tensor_info_incr_update_outputs, + method_name=signature_constants.PREDICT_METHOD_NAME)) + else: + incr_update_signature = None - # remove temporary files - Remove(embed_name_to_id_file) - return + session_config = ConfigProto( + allow_soft_placement=True, log_device_placement=True) + + saver = tf.train.Saver() + with tf.Session(target=server.target if server else '') as sess: + saver.restore(sess, checkpoint_path) + main_op = tf.group([ + Scaffold.default_local_init_op(), + ops.get_collection(EMBEDDING_INITIALIZERS) + ]) + incr_update_sig_map = { + signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature + } + if incr_update_signature is not None: + incr_update_sig_map[INCR_UPDATE_SIGNATURE_KEY] = incr_update_signature + builder.add_meta_graph_and_variables( + sess, [tf.saved_model.tag_constants.SERVING], + signature_def_map=incr_update_sig_map, + assets_collection=ops.get_collection(ops.GraphKeys.ASSET_FILEPATHS), + saver=saver, + main_op=main_op, + strip_default_attrs=True, + clear_devices=True) + save_path = builder.save() + return save_path diff --git a/easy_rec/python/utils/meta_graph_editor.py b/easy_rec/python/utils/meta_graph_editor.py index 34362fcd1..afe75d087 100644 --- a/easy_rec/python/utils/meta_graph_editor.py +++ b/easy_rec/python/utils/meta_graph_editor.py @@ -46,7 +46,6 @@ def __init__(self, meta_graph_def = loader.get_meta_graph_def_from_tags(tags) else: assert meta_graph_def, 'either saved_model_dir or meta_graph_def must be set' - tf.reset_default_graph() tf.train.import_meta_graph(meta_graph_def) self._meta_graph_version = meta_graph_def.meta_info_def.meta_graph_version self._signature_def = meta_graph_def.signature_def[ @@ -452,6 +451,10 @@ def add_oss_lookup_op(self, lookup_input_indices, lookup_input_values, uniq_embed_is_kvs[embed_id] = embed_is_kv uniq_embed_dims[embed_id] = embed_dim + for embed_id, embed_dim in zip(uniq_embed_ids, uniq_embed_dims): + assert embed_dim > 0, 'invalid embed_id=%s embed_dim=%d' % (embed_id, + embed_dim) + lookup_init_op = self._lookup_op.oss_init( osspath=self._oss_path, endpoint=self._oss_endpoint, @@ -480,6 +483,7 @@ def add_oss_lookup_op(self, lookup_input_indices, lookup_input_values, # dense variables are updated one by one 
dense_name_to_ids = embedding_utils.get_dense_name_to_ids() + logging.info('dense_name_to_ids len=%d' % len(dense_name_to_ids)) for x in ops.get_collection(constant.DENSE_UPDATE_VARIABLES): dense_var_id = dense_name_to_ids[x.op.name] dense_input_name = 'incr_update/dense/%d/input' % dense_var_id diff --git a/easy_rec/python/utils/test_utils.py b/easy_rec/python/utils/test_utils.py index 4924a4442..2817ff15b 100644 --- a/easy_rec/python/utils/test_utils.py +++ b/easy_rec/python/utils/test_utils.py @@ -152,8 +152,12 @@ def _replace_data_for_test(data_path): def _load_config_for_test(pipeline_config_path, test_dir, total_steps=50): - pipeline_config = config_util.get_configs_from_pipeline_file( - pipeline_config_path) + if not isinstance(pipeline_config_path, EasyRecConfig): + pipeline_config = config_util.get_configs_from_pipeline_file( + pipeline_config_path) + else: + pipeline_config = pipeline_config_path + train_config = pipeline_config.train_config eval_config = pipeline_config.eval_config data_config = pipeline_config.data_config @@ -175,62 +179,6 @@ def _load_config_for_distribute_eval(pipeline_config_path, test_dir): return pipeline_config -def test_datahub_train_eval(pipeline_config_path, - odps_oss_config, - test_dir, - process_pipeline_func=None, - total_steps=50, - post_check_func=None): - gpus = get_available_gpus() - if len(gpus) > 0: - set_gpu_id(gpus[0]) - else: - set_gpu_id(None) - - if not isinstance(pipeline_config_path, EasyRecConfig): - logging.info('testing pipeline config %s' % pipeline_config_path) - if 'TF_CONFIG' in os.environ: - del os.environ['TF_CONFIG'] - - if isinstance(pipeline_config_path, EasyRecConfig): - pipeline_config = pipeline_config_path - else: - pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, - total_steps) - - pipeline_config.train_config.train_distribute = 0 - pipeline_config.train_config.num_gpus_per_worker = 1 - pipeline_config.train_config.sync_replicas = False - - pipeline_config.datahub_train_input.akId = odps_oss_config.dh_id - pipeline_config.datahub_train_input.akSecret = odps_oss_config.dh_key - pipeline_config.datahub_train_input.region = odps_oss_config.dh_endpoint - pipeline_config.datahub_train_input.project = odps_oss_config.dh_project - pipeline_config.datahub_train_input.topic = odps_oss_config.dh_topic - - pipeline_config.datahub_eval_input.akId = odps_oss_config.dh_id - pipeline_config.datahub_eval_input.akSecret = odps_oss_config.dh_key - pipeline_config.datahub_eval_input.region = odps_oss_config.dh_endpoint - pipeline_config.datahub_eval_input.project = odps_oss_config.dh_project - pipeline_config.datahub_eval_input.topic = odps_oss_config.dh_topic - - if process_pipeline_func is not None: - assert callable(process_pipeline_func) - pipeline_config = process_pipeline_func(pipeline_config) - config_util.save_pipeline_config(pipeline_config, test_dir) - test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config') - train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s' % \ - test_pipeline_config_path - proc = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'master')) - proc_wait(proc, timeout=TEST_TIME_OUT) - if proc.returncode != 0: - logging.error('train %s failed' % test_pipeline_config_path) - return False - if post_check_func: - return post_check_func(pipeline_config) - return True - - def _Load_config_for_test_eval(pipeline_config_path): pipeline_config = config_util.get_configs_from_pipeline_file( pipeline_config_path) @@ -257,11 +205,8 @@ def 
test_single_train_eval(pipeline_config_path, if 'TF_CONFIG' in os.environ: del os.environ['TF_CONFIG'] - if isinstance(pipeline_config_path, EasyRecConfig): - pipeline_config = pipeline_config_path - else: - pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, - total_steps) + pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, + total_steps) pipeline_config.train_config.train_distribute = 0 pipeline_config.train_config.num_gpus_per_worker = 1 @@ -386,7 +331,9 @@ def test_hdfs_train_eval(pipeline_config_path, set_gpu_id(gpus[0]) else: set_gpu_id(None) - logging.info('testing pipeline config %s' % pipeline_config_path) + if not isinstance(pipeline_config_path, EasyRecConfig): + logging.info('testing pipeline config %s' % pipeline_config_path) + logging.info('train_yaml_path %s' % train_yaml_path) if 'TF_CONFIG' in os.environ: del os.environ['TF_CONFIG'] @@ -633,7 +580,9 @@ def test_distributed_train_eval(pipeline_config_path, total_steps=50, num_evaluator=0, edit_config_json=None): - logging.info('testing pipeline config %s' % pipeline_config_path) + if not isinstance(pipeline_config_path, EasyRecConfig): + logging.info('testing pipeline config %s' % pipeline_config_path) + pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, total_steps) if edit_config_json is not None: diff --git a/pai_jobs/run.py b/pai_jobs/run.py index 933a6b3e1..5383f2072 100644 --- a/pai_jobs/run.py +++ b/pai_jobs/run.py @@ -220,7 +220,7 @@ def _wait_ckpt(ckpt_path, max_wait_ts): break else: while time.time() - start_ts < max_wait_ts: - if gfile.Exists(ckpt_path + '.index'): + if not gfile.Exists(ckpt_path + '.index'): logging.info('wait for checkpoint[%s]' % ckpt_path) time.sleep(30) else: diff --git a/processor/test.py b/processor/test.py index 0423e7996..c026bf6bf 100644 --- a/processor/test.py +++ b/processor/test.py @@ -10,6 +10,7 @@ import time import numpy as np +import six from google.protobuf import text_format from easy_rec.python.protos import dataset_pb2 @@ -19,7 +20,9 @@ logging.basicConfig( level=logging.INFO, format='[%(asctime)s][%(levelname)s] %(message)s') -PROCESSOR_VERSION = 'LaRec-0.9.5d-b1b1604-TF-2.5.0-Linux' +# PROCESSOR_VERSION = 'LaRec-0.9.5d-b1b1604-TF-2.5.0-Linux' +# PROCESSOR_VERSION = 'LaRec-1.0.0-66214a6-TF-2.5.0-Linux' +PROCESSOR_VERSION = 'LaRec-1.0.0-62a4cd8-TF-2.5.0-Linux' PROCESSOR_FILE = PROCESSOR_VERSION + '.tar.gz' PROCESSOR_URL = 'http://easyrec.oss-cn-beijing.aliyuncs.com/processor/' + PROCESSOR_FILE PROCESSOR_ENTRY_LIB = 'processor/' + PROCESSOR_VERSION + '/larec/libtf_predictor.so' @@ -29,23 +32,31 @@ def build_array_proto(array_proto, data, dtype): array_proto.array_shape.dim.append(len(data)) if dtype == dataset_pb2.DatasetConfig.STRING: - array_proto.string_val.extend([x.encode('utf-8') for x in data]) + if six.PY2: + array_proto.string_val.extend([x for x in data]) + else: + array_proto.string_val.extend([x.encode('utf-8') for x in data]) array_proto.dtype = tf_predict_pb2.DT_STRING + return len(array_proto.string_val) elif dtype == dataset_pb2.DatasetConfig.FLOAT: array_proto.float_val.extend([float(x) for x in data]) array_proto.dtype = tf_predict_pb2.DT_FLOAT + return len(array_proto.float_val) elif dtype == dataset_pb2.DatasetConfig.DOUBLE: array_proto.double_val.extend([float(x) for x in data]) array_proto.dtype = tf_predict_pb2.DT_DOUBLE + return len(array_proto.double_val) elif dtype == dataset_pb2.DatasetConfig.INT32: array_proto.int_val.extend([int(x) for x in data]) array_proto.dtype = 
tf_predict_pb2.DT_INT32 + return len(array_proto.int_val) elif dtype == dataset_pb2.DatasetConfig.INT64: array_proto.int64_val.extend([np.int64(x) for x in data]) array_proto.dtype = tf_predict_pb2.DT_INT64 + return len(array_proto.int64_val) else: assert False, 'invalid datatype[%s]' % str(dtype) - return array_proto + return len(data) if __name__ == '__main__': @@ -63,6 +74,8 @@ def build_array_proto(array_proto, data, dtype): '--saved_model_dir', type=str, default=None, help='saved model directory') parser.add_argument( '--test_dir', type=str, default=None, help='test directory') + parser.add_argument( + '--targets', type=str, default='logits,probs', help='save targets') args = parser.parse_args() if not os.path.exists(PROCESSOR_ENTRY_LIB): @@ -96,19 +109,24 @@ def build_array_proto(array_proto, data, dtype): for x in data_config.input_fields if x.input_name not in data_config.label_fields] + rtp_separator = data_config.rtp_separator + if six.PY2: + rtp_separator = rtp_separator.encode('utf-8') with open(args.input_path, 'r') as fin: for line_str in fin: line_str = line_str.strip() - line_toks = line_str.split(data_config.rtp_separator)[-1].split(chr(2)) + line_toks = line_str.split(rtp_separator)[-1].split(chr(2)) for i, tok in enumerate(line_toks): input_fields[i].append(tok) req = tf_predict_pb2.PredictRequest() req.signature_name = 'serving_default' + + offset = len(data_config.label_fields) for i in range(len(input_fields)): - build_array_proto(req.inputs[data_config.input_fields[i + 1].input_name], - input_fields[i], - data_config.input_fields[i + 1].input_type) + build_array_proto( + req.inputs[data_config.input_fields[i + offset].input_name], + input_fields[i], data_config.input_fields[i + offset].input_type) tf_predictor = ctypes.cdll.LoadLibrary(PROCESSOR_ENTRY_LIB) tf_predictor.saved_model_init.restype = ctypes.c_void_p @@ -156,11 +174,18 @@ def build_array_proto(array_proto, data, dtype): res = tf_predict_pb2.PredictResponse() res.ParseFromString(res_bytes) + out_keys = args.targets.split(',') + for k in out_keys: + assert k in res.outputs, 'invalid target: %s, all outputs:%s' % \ + (k, ','.join(res.outputs.keys())) + out_vals = [res.outputs[k].float_val for k in out_keys] + ncol = len(out_keys) + nrow = len(out_vals[0]) + logging.info('num_outputs = %d num_samples = %d' % (ncol, nrow)) with open(args.output_path, 'w') as fout: - logits = res.outputs['logits'].float_val - probs = res.outputs['probs'].float_val - for logit, prob in zip(logits, probs): - fout.write(json.dumps({'logits': logit, 'probs': prob}) + '\n') + for i in range(nrow): + one_res = {out_keys[j]: out_vals[j][i] for j in range(ncol)} + fout.write(json.dumps(one_res) + '\n') # free memory tf_predictor.saved_model_release(ctypes.c_void_p(handle)) diff --git a/samples/dh_script/configs/deepfm.config b/samples/dh_script/configs/deepfm.config index 29ed5e8cc..ee357ec5b 100644 --- a/samples/dh_script/configs/deepfm.config +++ b/samples/dh_script/configs/deepfm.config @@ -14,7 +14,8 @@ train_config { } use_moving_average: false } - log_step_count_steps: 200 + log_step_count_steps: 1 + num_steps: 5 sync_replicas: true } @@ -139,6 +140,8 @@ data_config { auto_expand_input_fields: true label_fields: 'label' + separator: ',' + feature_fields: 'features' batch_size: 1024 num_epochs: 10000 prefetch_size: 32 diff --git a/samples/dh_script/deep_fm/create_inner_deepfm_table.sql b/samples/dh_script/deep_fm/create_inner_deepfm_table.sql deleted file mode 100644 index 1b7955013..000000000 --- 
a/samples/dh_script/deep_fm/create_inner_deepfm_table.sql +++ /dev/null @@ -1,59 +0,0 @@ -drop TABLE IF EXISTS deepfm_train_{TIME_STAMP}; -create table deepfm_train_{TIME_STAMP}( - label BIGINT - ,`hour` string - ,c1 STRING - ,banner_pos STRING - ,site_id STRING - ,site_domain STRING - ,site_category STRING - ,app_id STRING - ,app_domain STRING - ,app_category STRING - ,device_id STRING - ,device_ip STRING - ,device_model STRING - ,device_type STRING - ,device_conn_type STRING - ,c14 STRING - ,c15 STRING - ,c16 STRING - ,c17 STRING - ,c18 STRING - ,c19 STRING - ,c20 STRING - ,c21 STRING -) -; - -tunnel upload {TEST_DATA_DIR}/train_{TIME_STAMP} deepfm_train_{TIME_STAMP}; - -drop TABLE IF EXISTS deepfm_test_{TIME_STAMP}; -create table deepfm_test_{TIME_STAMP}( - label BIGINT - ,`hour` string - ,c1 STRING - ,banner_pos STRING - ,site_id STRING - ,site_domain STRING - ,site_category STRING - ,app_id STRING - ,app_domain STRING - ,app_category STRING - ,device_id STRING - ,device_ip STRING - ,device_model STRING - ,device_type STRING - ,device_conn_type STRING - ,c14 STRING - ,c15 STRING - ,c16 STRING - ,c17 STRING - ,c18 STRING - ,c19 STRING - ,c20 STRING - ,c21 STRING -) -; - -tunnel upload {TEST_DATA_DIR}/test_{TIME_STAMP} deepfm_test_{TIME_STAMP}; diff --git a/samples/dh_script/deep_fm/drop_table.sql b/samples/dh_script/deep_fm/drop_table.sql deleted file mode 100644 index 77e256560..000000000 --- a/samples/dh_script/deep_fm/drop_table.sql +++ /dev/null @@ -1,5 +0,0 @@ -drop TABLE IF EXISTS external_deepfm_train_{TIME_STAMP}; -drop TABLE IF EXISTS external_deepfm_test_{TIME_STAMP}; -drop TABLE IF EXISTS deepfm_train_{TIME_STAMP}; -drop TABLE IF EXISTS deepfm_test_{TIME_STAMP}; -drop TABLE IF EXISTS deepfm_output_v1_{TIME_STAMP};