Input pipeline #44

Open
Pytq opened this issue Sep 4, 2018 · 1 comment


Pytq commented Sep 4, 2018

Hello,

I profiled the training, and it seems that GPU operations and CPU operations are not executed in parallel, as they should be with the line "dataset.prefetch(4)" in train.py.
Here is a screenshot of what I mean:
https://imgur.com/a/tHHQ3OK
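
For reference, this is the kind of overlap I would expect prefetch to give (a minimal runnable sketch with toy data, not the actual train.py pipeline):

    import numpy as np
    import tensorflow as tf

    # Toy data standing in for the real position planes.
    xs = np.random.rand(8192, 112, 64).astype(np.float32)

    dataset = tf.data.Dataset.from_tensor_slices(xs)
    dataset = dataset.batch(1024)
    # prefetch(4) should keep up to 4 batches ready on the CPU side
    # while the GPU is busy with the current training step.
    dataset = dataset.prefetch(4)
    next_batch = dataset.make_one_shot_iterator().get_next()

    with tf.Session() as sess:
        for _ in range(8):
            batch = sess.run(next_batch)  # input fill should overlap compute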

So I tried something simple: I converted the dataset into a TFRecordDataset and read from that instead of the current pipeline. The resulting pipeline was 1) much faster and 2) executed in parallel; here is the new profile:
https://imgur.com/a/Vlh2VmG

On a K80 it roughly doubled (EDIT: after removing the profiler it is x4.7) the pos/s on a 6x64 network.

Here is some code to transform into a TFRecordDataset:

import time

import numpy as np
import tensorflow as tf


def _bytes_feature(value):
    # Serialize a numpy array as raw bytes into a tf.train.Feature.
    value = tf.compat.as_bytes(value.tostring())
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def write_dataset(dataset, train_iterator, test_iterator, train_ratio):
    session = tf.Session()
    # Feedable iterator: switch between the train and test iterators
    # at run time through a string-handle placeholder.
    handle = tf.placeholder(tf.string, shape=[])
    iterator = tf.data.Iterator.from_string_handle(
        handle, dataset.output_types, dataset.output_shapes)
    next_batch = iterator.get_next()
    handles = {'train': session.run(train_iterator.string_handle()),
               'test': session.run(test_iterator.string_handle())}
    x = next_batch[0]   # planes, shape [None, 112, 8*8]
    y_ = next_batch[1]  # policy, shape [None, 1858]
    z_ = next_batch[2]  # value, shape [None, 1]
    # Build the flatten op once here; creating it inside the loop below
    # would add a new node to the graph on every iteration.
    x_flat = tf.reshape(x, [-1, 112 * 8 * 8])

    filenames = {'train': 'train_bytes2', 'test': 'test_bytes2'}

    options = tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.GZIP)
    writers = {key: tf.python_io.TFRecordWriter(filenames[key], options=options)
               for key in filenames}
    # Roughly one batch in train_every goes to the test file.
    train_every = int(train_ratio / (1 - train_ratio)) + 1
    for i in range(200):  # write 200 batches in total
        t = time.time()
        key = 'train'
        if not i % train_every:
            key = 'test'
        datas = session.run([x_flat, y_, z_],
                            feed_dict={handle: handles[key]})
        assert datas[0].shape[0] == datas[1].shape[0] == datas[2].shape[0]
        batch_size = datas[0].shape[0]
        for k in range(batch_size):
            x_raw = np.array(datas[0][k])
            _y_raw = np.array(datas[1][k])
            _z_raw = np.array(datas[2][k])

            example_bytes = tf.train.Example(
                features=tf.train.Features(
                    feature={
                        'x': _bytes_feature(x_raw),
                        '_y': _bytes_feature(_y_raw),
                        '_z': _bytes_feature(_z_raw)
                    }))
            writers[key].write(example_bytes.SerializeToString())
        elapsed = time.time() - t
        print(key, elapsed / batch_size, batch_size / elapsed)

    for key in writers:
        writers[key].close()
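
For completeness, here is a hypothetical driver for write_dataset; the toy_dataset helper and the dataset variables are illustrative stand-ins for whatever train.py actually builds (only the tensor shapes match the real data):

    import numpy as np
    import tensorflow as tf

    def toy_dataset(n):
        # Random tensors with the same shapes as the real planes/policy/value.
        xs = np.random.rand(n, 112, 64).astype(np.float32)
        ys = np.random.rand(n, 1858).astype(np.float32)
        zs = np.random.rand(n, 1).astype(np.float32)
        return tf.data.Dataset.from_tensor_slices((xs, ys, zs)).batch(32).repeat()

    train_dataset = toy_dataset(1024)
    test_dataset = toy_dataset(256)
    train_iterator = train_dataset.make_one_shot_iterator()
    test_iterator = test_dataset.make_one_shot_iterator()
    # With train_ratio=0.9, roughly one batch in ten goes to the test file.
    write_dataset(train_dataset, train_iterator, test_iterator, train_ratio=0.9)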

And here is the code to read it:

    def extract(example):
        # Parse one serialized tf.train.Example back into (x, _y, _z).
        features = {
            'x': tf.FixedLenFeature((), tf.string),
            '_y': tf.FixedLenFeature((), tf.string),
            '_z': tf.FixedLenFeature((), tf.string)
        }
        parsed_example = tf.parse_single_example(example, features)
        # The features were written as raw float32 bytes, so decode them
        # and restore the static shapes (112 * 64 == 112 * 8 * 8).
        x = tf.decode_raw(parsed_example['x'], tf.float32)
        _y = tf.decode_raw(parsed_example['_y'], tf.float32)
        _z = tf.decode_raw(parsed_example['_z'], tf.float32)
        x.set_shape([112 * 64])
        _y.set_shape([1858])
        _z.set_shape([1])
        x = tf.reshape(x, [112, 64])
        return x, _y, _z

    filenames = {'train': 'train_bytes2', 'test': 'test_bytes2'}

    dataset = tf.data.TFRecordDataset(filenames=[filenames['train']],
                                      compression_type='GZIP')
    dataset = dataset.map(extract)
    dataset = dataset.batch(total_batch_size)
    dataset = dataset.prefetch(4)
    train_iterator = dataset.make_one_shot_iterator()

    dataset = tf.data.TFRecordDataset(filenames=[filenames['test']],
                                      compression_type='GZIP')
    dataset = dataset.map(extract)
    dataset = dataset.batch(total_batch_size)
    dataset = dataset.prefetch(4)
    test_iterator = dataset.make_one_shot_iterator()
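
The two iterators can then be plugged into the same string-handle setup as before, for example (a sketch, reusing the variables defined above):

    # Consume the train/test iterators through one feedable handle,
    # mirroring the string_handle setup in write_dataset.
    handle = tf.placeholder(tf.string, shape=[])
    iterator = tf.data.Iterator.from_string_handle(
        handle, dataset.output_types, dataset.output_shapes)
    x, _y, _z = iterator.get_next()

    with tf.Session() as session:
        handles = {'train': session.run(train_iterator.string_handle()),
                   'test': session.run(test_iterator.string_handle())}
        # Switch between splits by feeding a different handle.
        batch = session.run([x, _y, _z], feed_dict={handle: handles['train']})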

The reason I am interested in this is that I would like to train very small networks to try different architectures, but for small networks the input pipeline becomes the bottleneck.

I think pre-reading the data and writing it out in TFRecord format is worth it. Or is there a simpler solution?
Do you have any thoughts on that? I have no idea how the current pipeline works.

Thanks

EDIT: Actually, removing the profiler made the gain even bigger.
With current pipeline: (773.638 pos/s)
With TFRecordDataset: (3661.37 pos/s)
This is still with a K80, a 6x64 network, batch_size 1024 and no batch split.


Error323 commented Sep 5, 2018

Interesting, a few things:

  1. I'm getting ~5000 pos/s on a single GTX 1080Ti with our current architecture (6 CPU cores with HT)
  2. The TFRecord will probably blow up in memory as it's fed into the shuffle buffer, compared to our binary format
  3. Using the standard TensorFlow functions is a good way to achieve better parallelism (see the sketch after this list)
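
Something like this is what I have in mind (just a sketch, reusing the extract() function and the filename from the code above):

    # Standard tf.data knobs for parallelism.
    dataset = tf.data.TFRecordDataset(['train_bytes2'], compression_type='GZIP')
    dataset = dataset.map(extract, num_parallel_calls=4)  # parse on several cores
    dataset = dataset.batch(1024)
    dataset = dataset.prefetch(4)  # overlap the input pipeline with the GPU step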

I'm not CPU bottlenecked, but it's very likely that our implementation isn't parallel with respect to CPU / GPU usage. As you said on Discord, you have 2 CPUs; this is likely the issue.

I'm implementing a multi-GPU version that makes better use of the standard TensorFlow API, which should help with this. Thank you for reporting!
