Conversion Guide
When converting a single-node TensorFlow application to a distributed TensorFlowOnSpark application, we recommend the following development process:
1. Develop and test your TensorFlow application as a single-node application on small-scale data. This will allow you to iterate faster and troubleshoot any TensorFlow-specific issues without introducing the complexities of Spark and distributed TensorFlow.
2. Convert your single-node TensorFlow application into a distributed TensorFlow application. At this point, you will add TensorFlow-specific code to allow your application to work in a distributed manner. If you use the higher-level `tf.keras` or `tf.estimator` APIs, this will be much simpler than using the low-level APIs. You should still be able to test your code on a single machine by just running multiple processes/ports.
3. Convert your distributed TensorFlow application to TensorFlowOnSpark. If your application is now "distributed", this conversion step should be fairly simple. If you have a single-node Spark Standalone installation, you can continue working on a single machine. Otherwise, use a small "cluster" of 1-3 executors while you sort out any Spark-specific issues (e.g. `spark.executorEnv.LD_LIBRARY_PATH` and `spark.executorEnv.CLASSPATH`). At this point, you will need to choose which `InputMode` to use. In general, you should prefer `InputMode.TENSORFLOW`, unless you need to feed data directly from a Spark RDD or DataFrame (without persisting to disk first).
4. Gradually scale up your cluster. Once your application is working at a small scale, you can gradually increase the size of your cluster. You may need to adjust TensorFlow hyper-parameters as you scale out, e.g. batch sizes, learning rates, etc. Additionally, you may need to adjust various Spark settings, e.g. number of executors, memory limits, etc.
We have included several converted sample TensorFlow applications in this repository to help illustrate the conversion steps. For step 3, we'll highlight some of the main points below.
First, replace your application's `main()` function with a `main_fun(argv, ctx)` function, as shown below. The `argv` parameter will contain a full copy of the arguments supplied on the PySpark command line, while the `ctx` parameter will contain node metadata, like `job_name` and `task_id`. Also, make sure that the `import tensorflow as tf` occurs within this function, since it will be executed/imported on the executors. If there are any functions used by the main function, ensure that they are defined or imported inside the `main_fun` block. If you see any Spark pickle errors, try separating the Spark and TensorFlow code into separate files and then use `--py-files` to add the TensorFlow code to the executors.
```python
# def main():
def main_fun(argv, ctx):
  import tensorflow as tf
  ...
```
Next, `tf.app.run()` executes the TensorFlow `main` function. Replace it with the following code to set up PySpark and launch TensorFlow on the executors. Note that we're using `argparse` here mostly because the `tf.app.FLAGS` mechanism is currently not an officially supported TensorFlow API.
```python
if __name__ == '__main__':
  # tf.app.run()
  from pyspark.context import SparkContext
  from pyspark.conf import SparkConf
  from tensorflowonspark import TFCluster
  import argparse

  sc = SparkContext(conf=SparkConf().setAppName("your_app_name"))
  executors = sc._conf.get("spark.executor.instances")
  num_executors = int(executors) if executors is not None else 1

  parser = argparse.ArgumentParser()
  parser.add_argument("--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, default=num_executors)
  parser.add_argument("--num_ps", help="number of parameter servers", type=int, default=1)
  parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
  args = parser.parse_args()

  cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW)
  cluster.shutdown()
```
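`InputMode.TENSORFLOW` reads data directly within the TensorFlow processes. If you instead need to feed data from a Spark RDD or DataFrame, the launcher uses `InputMode.SPARK` and streams the RDD into the cluster. Here is a minimal sketch, assuming a hypothetical `dataRDD` of training records and an `--epochs` argument added to the parser above:

```python
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
cluster.train(dataRDD, args.epochs)  # feed Spark partitions into the TensorFlow job
cluster.shutdown()
```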
Note: if your application is using `tf.app.FLAGS`, you can use the following code instead:
```python
def main_fun(argv, ctx):
  import sys
  import tensorflow as tf
  sys.argv = argv               # restore the original command-line args
  FLAGS = tf.app.flags.FLAGS    # so existing tf.app.flags definitions still work
  ...

if __name__ == '__main__':
  ...
  args, rem = parser.parse_known_args()
  cluster = TFCluster.run(sc, main_fun, rem, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW)
```
For `tf.estimator` applications, use `tf.estimator.train_and_evaluate` instead of separate `train` and `evaluate` calls. The `train_and_evaluate` API is a utility function that "provides consistent behavior for both local (non-distributed) and distributed configurations". This is required for distributed estimators.
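For example, here is a minimal sketch of that replacement, assuming hypothetical `model_fn`, `train_input_fn`, and `eval_input_fn` functions from your application:

```python
estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=model_dir)

# estimator.train(input_fn=train_input_fn, max_steps=args.steps)   # replaced
# estimator.evaluate(input_fn=eval_input_fn)                       # replaced
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=args.steps)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
```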
For `tf.keras` applications, use `tf.keras.estimator.model_to_estimator` with `tf.estimator.train_and_evaluate`, or use a `DistributionStrategy`. Support for distributed training in the Keras API has been evolving over time, so there are multiple techniques. The `model_to_estimator` API is the oldest and most stable; it essentially converts the Keras model into an estimator, and you can proceed from there. Alternatively, the `DistributionStrategy` API is the newest method and will be the preferred method going forward, but it is still evolving.

Note: the TensorFlow team recommends using the high-level Keras (and Estimator) APIs going forward.
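For example, here is a minimal sketch of the `model_to_estimator` approach, assuming a hypothetical compiled Keras model and the same input functions as above:

```python
model = tf.keras.Sequential([
    tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10, activation='softmax'),
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

# Convert the compiled Keras model to an estimator, then train as before.
estimator = tf.keras.estimator.model_to_estimator(keras_model=model, model_dir=model_dir)
tf.estimator.train_and_evaluate(estimator,
                                tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=args.steps),
                                tf.estimator.EvalSpec(input_fn=eval_input_fn))
```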
For applications using the low-level distributed TensorFlow APIs, there should be code that:
- extracts the addresses for the `ps` and `worker` nodes from the command line args
- creates a cluster spec
- starts the TensorFlow server

These can all be replaced as follows.
```python
from tensorflowonspark import TFNode   # import inside main_fun (runs on the executors)

# ps_hosts = FLAGS.ps_hosts.split(',')
# worker_hosts = FLAGS.worker_hosts.split(',')
# cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts, 'worker': worker_hosts})
# server = tf.train.Server({'ps': ps_hosts, 'worker': worker_hosts},
#                          job_name=FLAGS.job_name, task_index=FLAGS.task_id)
cluster_spec, server = TFNode.start_cluster_server(ctx)

job_name = ctx.job_name
task_index = ctx.task_index

if job_name == "ps":
  server.join()
elif job_name == "worker":
  # Assigns ops to the local worker by default.
  with tf.device(tf.train.replica_device_setter(
      worker_device="/job:worker/task:%d" % task_index,
      cluster=cluster_spec)):
```
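The model graph is then constructed inside this device scope, so that variables are placed on the `ps` nodes while ops run on the local worker. A hypothetical sketch, assuming input placeholders `x` and `y`:

```python
      # (Hypothetical model; x and y are assumed input placeholders.)
      global_step = tf.train.get_or_create_global_step()
      logits = tf.layers.dense(x, 10)
      loss = tf.losses.sparse_softmax_cross_entropy(labels=y, logits=logits)
      train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)
```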
If your code uses `tf.Session`, you will need to use `tf.train.MonitoredTrainingSession` instead, e.g.

```python
with tf.train.MonitoredTrainingSession(master=server.target,
                                       is_chief=(task_index == 0),
                                       scaffold=tf.train.Scaffold(init_op=init_op, summary_op=summary_op, saver=saver),
                                       checkpoint_dir=model_dir,
                                       hooks=[tf.train.StopAtStepHook(last_step=args.steps)]) as sess:
```
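Inside the monitored session, the training loop then runs until the `StopAtStepHook` fires. A minimal sketch, assuming the `train_op` and `global_step` from your graph:

```python
  while not sess.should_stop():
      # run one training step; the hook stops the session at args.steps
      _, step = sess.run([train_op, global_step])
```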
Finally, if using TensorBoard, ensure that the summaries are saved to the local disk of the "chief" worker with the following naming convention. The TensorBoard server on the "chief" worker will look in this specific directory, so do not change the path.

```python
summary_writer = tf.summary.FileWriter("tensorboard_%d" % ctx.worker_num, graph=tf.get_default_graph())
```
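Summaries computed during training can then be written through this writer. A hypothetical sketch, assuming the `summary_op`, `sess`, and `step` from above:

```python
summary_str = sess.run(summary_op)
summary_writer.add_summary(summary_str, global_step=step)
summary_writer.flush()
```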