Examining distributed training of Keras Models

Posted in: Google Cloud Platform, Technical Track

Creating deep learning models using Keras is pretty straightforward, which is why Keras is often used for prototyping and creating proof-of-concept products.

But when it comes to training bigger models or using very big datasets, we need to split either the dataset or the model and distribute the training and/or inference across multiple devices, and possibly across multiple machines. The standalone Keras (the “Keras.io” repository) supported this only partially. One option is to use a Python generator to feed data during training, but that can take a lot of time: the whole data-ingestion process may be bottlenecked by limited I/O throughput, as data is read from disk or possibly from a bucket in a cloud environment.

What is distributed training?

Previously, after creating a Keras model, you needed to convert it to an Estimator (tf.keras.estimator.model_to_estimator) and adapt the data-feeding mechanism to the one required by the Estimator API, which is different from the way you feed data to a Keras model. With the adoption of Keras as the official high-level API for TensorFlow, it became tightly integrated into the whole TensorFlow framework. This includes the ability to train a Keras model on multiple GPUs, on TPUs, on multiple machines (each containing more GPUs), and even on TPU pods. Together with the TF Dataset API, this lets us use Keras with large datasets that would not fit in memory, using distributed training without converting the model to the Estimator API. The Dataset API also makes training on TPUs efficient: it enables the TPU to ingest data directly from Google Cloud Storage, instead of the data first being transferred to the machine where the training was initiated (where the Python script is running). The Estimator API is still supported, and is still required in some cases. For example, if you use TensorFlow Extended (TFX), your model needs to be an Estimator.

To distribute the training of a Keras model, we can use one of the distribution strategies described below. Bear in mind that, at the time of writing, not all of these strategies are officially supported, and some may only be available in TensorFlow nightly builds. For my testing, I used TF 2.0 alpha.

  • Mirrored Strategy
  • MultiWorker Mirrored Strategy
  • Parameter Server Strategy
  • TPU Strategy

Mirrored Strategy

As the name suggests, this strategy mirrors the Keras model onto multiple GPUs on a single machine. Training and inference are sped up by splitting each input batch evenly across the devices. For example, if you currently train on 1 GPU with a batch size of 128 and the GPU is used efficiently, then to train the same model on 4 GPUs you can increase the batch size to 128 x 4 = 512. This way each GPU still receives 128 examples per step and stays optimally loaded. After increasing the batch size, you might also want to adjust the learning rate accordingly.
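The batch-size arithmetic above can be written as a small helper. This is a minimal sketch in plain Python; in TensorFlow the replica count would typically come from `mirrored_strategy.num_replicas_in_sync`. The learning-rate adjustment assumes the linear scaling rule (scale the learning rate by the same factor as the batch size), which is a common heuristic, not something the strategy does for you. The function name `scale_for_replicas` is hypothetical.

```python
def scale_for_replicas(per_replica_batch, num_replicas, base_lr):
    """Return (global_batch, scaled_lr) for synchronous data-parallel training.

    Assumption: the learning rate is scaled linearly with the global batch
    size, a common heuristic that may still need tuning (e.g. warm-up).
    """
    if per_replica_batch <= 0 or num_replicas <= 0:
        raise ValueError("batch size and replica count must be positive")
    global_batch = per_replica_batch * num_replicas  # examples per step, all GPUs
    scaled_lr = base_lr * num_replicas               # linear scaling heuristic
    return global_batch, scaled_lr


# 128 examples per GPU on 4 GPUs, starting from a single-GPU learning rate of 0.001
global_batch, lr = scale_for_replicas(128, 4, 0.001)
print(global_batch, lr)  # 512 0.004
```

The global batch is what you pass to `dataset.batch()`; the strategy then splits it so each replica sees `per_replica_batch` examples.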

Training is done synchronously over the split data. As the model is cloned on each GPU, each replica has its own variables (the weights and biases of the neural network), but on every step these variables are updated with the same values: the gradients are aggregated and shared between the GPUs by an all-reduce algorithm, which we can choose. By default, it is NVIDIA NCCL.
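To make the synchronous update concrete, here is a toy simulation in plain Python (no TensorFlow involved). Each "replica" holds its own copy of a single weight and computes a gradient on its shard of the batch; the gradients are then averaged (the role the all-reduce plays), and every replica applies the identical update, so the copies never diverge. The linear model, the data and all function names are hypothetical. In TensorFlow itself, the all-reduce implementation is selected with the `cross_device_ops` argument of `tf.distribute.MirroredStrategy`, for example `tf.distribute.HierarchicalCopyAllReduce()` instead of `tf.distribute.NcclAllReduce()`.

```python
def shard(batch, num_replicas):
    """Split a batch evenly across replicas (assumes it divides evenly)."""
    size = len(batch) // num_replicas
    return [batch[i * size:(i + 1) * size] for i in range(num_replicas)]


def local_gradient(w, examples):
    """Gradient of the mean squared error for the toy model y_hat = w * x."""
    return sum(2 * (w * x - y) * x for x, y in examples) / len(examples)


def all_reduce_mean(values):
    """Stand-in for an all-reduce: every replica receives the same mean."""
    mean = sum(values) / len(values)
    return [mean] * len(values)


def sync_step(replica_weights, batch, lr):
    shards = shard(batch, len(replica_weights))
    grads = [local_gradient(w, s) for w, s in zip(replica_weights, shards)]
    reduced = all_reduce_mean(grads)
    # Every replica applies the same aggregated gradient, so copies stay identical
    return [w - lr * g for w, g in zip(replica_weights, reduced)]


# Data generated from y = 3 * x; four replicas start from the same weight
batch = [(float(x), 3.0 * x) for x in range(1, 9)]
weights = [0.0] * 4
for _ in range(50):
    weights = sync_step(weights, batch, lr=0.01)
print(weights)  # four identical values, all close to 3.0
```

Because the shards are equal in size, the mean of the per-shard gradients equals the gradient over the whole batch, so this matches training on one device with the full batch.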


```python
import tensorflow as tf

# train_files, _parse_and_norm, BUFFER_SIZE, BATCH_SIZE and NUM_EPOCHS
# are assumed to be defined elsewhere in the script

# Create the distribution strategy
mirrored_strategy = tf.distribute.MirroredStrategy()

# Build and compile the model inside the strategy scope
with mirrored_strategy.scope():
    model = tf.keras.Sequential()
    # model.add( … )  # further layers, elided in the original
    model.add(tf.keras.layers.Dense(1, input_shape=(1,)))
    model.compile(loss='mse', optimizer='adam')

# Build a high-throughput data-ingestion pipeline with the Dataset API
train_dataset = tf.data.TFRecordDataset(train_files, compression_type='GZIP')
train_dataset = train_dataset.map(_parse_and_norm, num_parallel_calls=16)
train_dataset = train_dataset.shuffle(buffer_size=BUFFER_SIZE)
train_dataset = train_dataset.batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(buffer_size=int(BUFFER_SIZE / BATCH_SIZE))

# Train the model; Keras iterates over the dataset once per epoch,
# so the dataset itself does not need to be repeated
model.fit(train_dataset, epochs=NUM_EPOCHS)
```

When distributing the training over multiple devices or machines, the throughput of the training process increases, so it is important to make sure the data is fed to the training process as fast as it can consume it. Otherwise, the newly provisioned GPUs/workers might be underutilized and the training speed might not scale as expected. This matters because in production environments the training data is often larger than what fits in memory, so it has to be read from disk while it is being fed to the training process.

That is why the example above includes the whole Dataset API input pipeline. The most important part is dataset.prefetch(), which reads and processes data in the background while the previously fetched data is being used for the current training step. In the dataset.map() transformation, num_parallel_calls lets us use multiple CPUs/threads. Both help increase the throughput of the data-ingestion process and make sure it is not the bottleneck. If disk I/O cannot keep up, we can also use the dataset.interleave() transformation to read the training files in parallel.
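The effect of `num_parallel_calls` and `prefetch()` can be illustrated outside TensorFlow. The sketch below is plain Python with hypothetical names (`parse`, `prefetching_pipeline`): a thread pool applies the per-record transformation in parallel while preserving input order, similar in spirit to `map(..., num_parallel_calls=...)`, and a background thread keeps a bounded queue filled so the consumer (standing in for the training step) rarely waits. It is an analogy for the tf.data behaviour, not how tf.data is implemented.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


def parse(record):
    """Hypothetical per-record transformation (stands in for _parse_and_norm)."""
    return record * 2


def prefetching_pipeline(records, map_fn, num_parallel_calls=4, buffer_size=8):
    """Yield map_fn(record) for each record, computed ahead of the consumer."""
    buffer = queue.Queue(maxsize=buffer_size)  # bounded, like prefetch(buffer_size)

    def producer():
        try:
            with ThreadPoolExecutor(max_workers=num_parallel_calls) as pool:
                # pool.map runs map_fn concurrently but yields results in input order
                for item in pool.map(map_fn, records):
                    buffer.put(("item", item))  # blocks while the buffer is full
        except Exception as exc:
            buffer.put(("error", exc))  # hand errors to the consumer instead of hanging
        finally:
            buffer.put(("done", None))

    threading.Thread(target=producer, daemon=True).start()
    while True:
        kind, value = buffer.get()
        if kind == "done":
            return
        if kind == "error":
            raise value
        yield value


# The "training loop" consumes records that were transformed in the background
results = list(prefetching_pipeline(range(10), parse))
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

The bounded queue is the key design choice: it lets preprocessing run ahead of training without reading the whole dataset into memory.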

Basically, the idea is to make sure the GPUs are the bottleneck, as they are the most expensive resource in the stack. We want to utilize them fully and reduce their idle time as much as possible.

For NVIDIA GPUs, we can monitor utilization with nvidia-smi (example output from a machine with 2 x NVIDIA Tesla V100 GPUs):

```
$ nvidia-smi
Thu May 9 10:19:06 2019
| NVIDIA-SMI 410.72 Driver Version: 410.72 CUDA Version: 10.0 |
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
| 0 Tesla V100-SXM2… Off | 00000000:00:04.0 Off | 0 |
| N/A 49C P0 280W / 300W | 15820MiB / 16130MiB | 98% Default |
| 1 Tesla V100-SXM2… Off | 00000000:00:05.0 Off | 0 |
| N/A 49C P0 295W / 300W | 15820MiB / 16130MiB | 100% Default |

| Processes: GPU Memory |
| GPU PID Type Process name Usage |
| 0 2665 C python 15809MiB |
| 1 2665 C python 15809MiB |
```
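Instead of reading the full table, nvidia-smi can also print selected fields as CSV, e.g. `nvidia-smi --query-gpu=index,utilization.gpu,memory.used --format=csv,noheader,nounits`, which is easier to log or check from a script. The helper below is a hypothetical sketch that parses that kind of output and flags GPUs running below a chosen utilization threshold (80% here is an arbitrary assumption). The sample string is typed in by hand to mirror the two V100s above, not captured from a real run.

```python
def parse_gpu_utilization(csv_text):
    """Parse 'index, utilization.gpu, memory.used' rows into dicts."""
    gpus = []
    for line in csv_text.strip().splitlines():
        index, util, mem = (field.strip() for field in line.split(","))
        gpus.append({"index": int(index), "util_pct": int(util), "mem_mib": int(mem)})
    return gpus


def underutilized(gpus, threshold_pct=80):
    """Return the indices of GPUs whose utilization is below the threshold."""
    return [g["index"] for g in gpus if g["util_pct"] < threshold_pct]


# Sample output typed in by hand to mirror the table above (not a real capture)
sample = """0, 98, 15820
1, 100, 15820"""
gpus = parse_gpu_utilization(sample)
print(underutilized(gpus))  # [] -> both GPUs are busy
```

On a real machine the text would come from running the command, for example via `subprocess.run([...], capture_output=True, text=True).stdout`, sampled periodically during training.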

MultiWorker Mirrored Strategy

This strategy is similar to MirroredStrategy, but spreads the training not just across the GPUs of a single machine, but across multiple machines. The model is again cloned on all the GPUs of all the machines, and the same updates are applied to all replicas synchronously on each step. As these values need to be synchronized over a network, the best communication algorithm depends on the type of GPU and the network interconnect. You can let it be chosen automatically at run-time like this:

```python
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(tf.distribute.experimental.CollectiveCommunication.AUTO)
```

Also, we would need to set the TF_CONFIG environment variable with the specification of the nodes being used for training. Other than that, you can use the same code as above – so any Keras model training can be easily adjusted to be scaled on multiple machines using this strategy.
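TF_CONFIG is a JSON string with a `cluster` section, identical on every machine, and a `task` section that identifies the current machine. The following is a minimal sketch for two workers, assuming the hypothetical host names `worker0.example.com` and `worker1.example.com` on port 12345. It only uses the standard library, and the variable has to be set before the strategy is created.

```python
import json
import os

# Hypothetical host:port pairs; this list must be identical on every machine
CLUSTER = {"worker": ["worker0.example.com:12345", "worker1.example.com:12345"]}


def make_tf_config(task_index):
    """Build the TF_CONFIG value for the worker at position task_index."""
    return json.dumps({
        "cluster": CLUSTER,
        "task": {"type": "worker", "index": task_index},
    })


# On the first machine (index 0); the second machine would use index 1
os.environ["TF_CONFIG"] = make_tf_config(0)
print(os.environ["TF_CONFIG"])
```

Each machine then runs the same training script; only the task index differs. Worker 0 usually takes on extra duties such as writing checkpoints.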

Parameter Server Strategy

This strategy is the same as the one already used with TF Estimators for distributed training. It can be used both for training on multiple GPUs on one machine and across multiple machines. This is the default strategy in Google’s Cloud ML Engine. Training is asynchronous: the workers train independently on all devices/machines, while the parameters are shared between them through so-called parameter servers. Asynchronous here means that variables are not mirrored on every device; instead, the computation is replicated across all GPUs of all the workers and executed independently, while the parameters are held on the parameter servers. Whenever a worker computes gradients, it updates the variables on the parameter server without waiting for the other workers.

Because training is asynchronous, workers do not wait for each other to synchronize variables on every step, so network latency in a cluster does not add the delays that synchronous updates would incur. This makes the approach more scalable when training over a greater number of machines.
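The difference from the synchronous strategies can be sketched with a toy in-process parameter server in plain Python. Worker threads each pull the current weight, compute a gradient on their own shard of data, and push an update immediately, without any barrier between workers. Because a worker may compute its gradient from a weight that another worker has already changed (a "stale" read), the sequence of updates differs from a synchronous run, yet on this simple problem it still converges. The class, functions and data are hypothetical and are not the TensorFlow API.

```python
import threading


class ParameterServer:
    """Holds the shared weight; the lock makes each individual push atomic."""

    def __init__(self, value=0.0):
        self._value = value
        self._lock = threading.Lock()
        self.updates = 0

    def pull(self):
        with self._lock:
            return self._value

    def push(self, gradient, lr):
        with self._lock:
            self._value -= lr * gradient
            self.updates += 1


def worker(ps, examples, steps, lr):
    for _ in range(steps):
        w = ps.pull()  # may already be stale by the time the gradient is pushed
        grad = sum(2 * (w * x - y) * x for x, y in examples) / len(examples)
        ps.push(grad, lr)  # no barrier: other workers are never waited for


# Each worker gets its own shard of data generated from y = 3 * x
shards = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (4.0, 12.0)]]
ps = ParameterServer()
threads = [threading.Thread(target=worker, args=(ps, s, 200, 0.01)) for s in shards]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(ps.pull(), ps.updates)  # weight close to 3.0 after 400 updates
```

Only each push is atomic; the pull, compute, push sequence is not, which is exactly where the asynchrony (and the staleness) comes from.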

In order to use this on multiple machines, you need to specify worker machines which do the training, and parameter servers which are used just for asynchronous sharing of the parameters. You also need to set TF_CONFIG with appropriate machine configuration.
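Compared with the multi-worker case, the cluster section now also lists the parameter servers under a `ps` job, and every machine, including each parameter server, gets the same cluster section plus its own task entry. The sketch below generates the TF_CONFIG value for every machine of a hypothetical cluster with two workers and one parameter server (host names and port 2222 are assumptions). Depending on the TensorFlow version and on whether Keras or Estimators are used, a separate `chief` entry may also be expected, so treat this layout as an assumption and check the documentation for your version.

```python
import json

# Hypothetical cluster: two workers that compute gradients, one parameter server
CLUSTER = {
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
    "ps": ["ps0.example.com:2222"],
}


def tf_configs_for_cluster(cluster):
    """Return {host:port: TF_CONFIG string} for every machine in the cluster."""
    configs = {}
    for task_type, addresses in cluster.items():
        for index, address in enumerate(addresses):
            configs[address] = json.dumps({
                "cluster": cluster,
                "task": {"type": task_type, "index": index},
            })
    return configs


for address, config in tf_configs_for_cluster(CLUSTER).items():
    print(address, "->", json.loads(config)["task"])
```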

TPU Strategy

The TPU strategy enables the use of Google’s TPUs (or TPU pods) for training instead of CPUs or GPUs. Like MirroredStrategy, it uses a synchronous approach.

You might wonder whether synchronous training impedes performance when training on distributed TPUs in a TPU pod but, according to Google, TPU pods provide very low-latency communication channels between the TPUs, as their interconnect design is based on the communication channels used in supercomputer clusters.

Here is an example, based on Google’s documentation, of how to use TPUs in Colab or Google Cloud:

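```python
resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
tf.config.experimental_connect_to_cluster(resolver)
tf.tpu.experimental.initialize_tpu_system(resolver)  # also wipes the TPU memory
tpu_strategy = tf.distribute.experimental.TPUStrategy(resolver)
```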

The TPUClusterResolver instance helps locate the TPUs. In Colab, you don’t need to pass any arguments to it. To use it with Cloud TPUs, you need to specify the name of your TPU resource in the tpu argument. We also need to initialize the TPU system explicitly at the start of the program. This is required before TPUs can be used for computation, and should ideally be done at the very beginning, because it also wipes out the TPU memory, so all state is lost.


Interested in working with Gorjan? Schedule a tech call.
