Distributed training with Kubernetes #17
This is an interesting topic. I would imagine that a simple distribution pattern would be to run multiple instances of the current code on multiple machines, each simulating their own set of environments. The network parameters would need to be shared using parameter servers.
@danijar So basically you want to do data-parallel replicated training with TensorFlow. I think that would work out of the box with the TfJob CRD. The CRD will take care of provisioning a set of workers and parameter servers. The workers can run any code (Docker image), so they could use tensorflow/agents to launch a bunch of environments as subprocesses. You could also move the environments into one or more sidecars and communicate via the filesystem or sockets. This might be beneficial if
Would it be beneficial to run the simulations on different machines from the workers? It sounds like the pattern is many simulations to one worker; i.e. you have N workers and each worker uses B simulations to generate a batch of size B. If tensorflow/agents adds an RPC mode of communication, then I think K8s should be able to handle distributing both TensorFlow workers and the simulations across multiple machines. Using StatefulSets you could start N * B simulations, each with a stable DNS name. Each worker could then use B of these simulations.
I think in many scenarios it makes sense to simulate and train on the same machine, and just scale the number of those machines. That's mainly because it seems to reduce communication overhead significantly by only sharing parameters, not all the collected data. I'd be super curious to know if that works out of the box as you mentioned it might! In other scenarios we cannot simulate the environments on the same machine, say because the data is generated by some specialized service or on a real robot. In that case, we could implement a Gym environment that under the hood communicates with the data service via RPCs. But instead of communicating for every frame, we might want to send a policy to the data collector and get back full trajectories, which would require some more changes.
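For illustration, here is a minimal sketch of what such a proxy environment could look like. The `simulator_client` object and its `reset`/`step` RPC methods are hypothetical placeholders for whatever transport the data service or robot would expose; this is not part of the Agents codebase.

```python
import gym
import numpy as np


class RemoteEnv(gym.Env):
  """Gym environment that proxies reset/step calls to a remote simulator.

  `simulator_client` is a hypothetical RPC client; any transport (gRPC,
  sockets, a robot controller) could sit behind the same interface.
  """

  def __init__(self, simulator_client, observation_space, action_space):
    self._client = simulator_client
    self.observation_space = observation_space
    self.action_space = action_space

  def reset(self):
    # One round trip per episode start.
    return np.asarray(self._client.reset())

  def step(self, action):
    # One round trip per frame; sending the policy to the collector and
    # receiving whole trajectories back would amortize this cost.
    observ, reward, done, info = self._client.step(np.asarray(action).tolist())
    return np.asarray(observ), reward, done, info
```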
So would I. Is there a simple example we could try to parallelize across multiple machines using TfJob?
Maybe something like this: https://gist.github.com/danijar/1dde5eb528b61560734c16d4fd0d93a1 We will probably have to modify
Sure!
Still working on it! Hit some issues with my kube deployment when trying to re-deploy with accelerators. I'll push a version here in a minute to share for discussion.
So here's the work in progress; let me know if you guys have thoughts about this model for how model code is built into images and run. It doesn't have to be so prescriptive, but this is a workable model for now.
OK, so here's a cleaner version of that PR: kubeflow/training-operator#159. Next up is extending the example with some form of parallelization, initially with the environments still running in the same containers where replicas of the model are being trained. @danijar @jlewi, do you guys have thoughts on including the example in tf/k8s vs. tf/agents? I don't want to go out of scope on k8s, and this issue was originally posted here.
Cool, I looked at the PR and it looks quite nice. The examples directory of the k8s repo is probably a good place for this code. I'm planning on adding some tutorials for Agents and would be happy to include a page about this and a link to the example. Did you do a full distributed training run, and do you have some graphs to look at?
Thank you. I also made the example a bit simpler per @jlewi's suggestion, see cwbeitel/k8s@790dfb1. That sounds good about the tutorial; happy to help user-test those. You should be able to point TensorBoard at the logs from one of the runs (let me know if you can't access it, it's marked "make public"): tensorboard --logdir gs://dev01-181118-181500-k8s/jobs/tensorflow-20171117102413/20171117T182424-pybullet_ant Rendering is running, so I'll upload a GIF to the readme when that's done and I have a chance. Do you have a suggestion of which graphs to include in the readme, or did you just mean for browsing? Haven't made it distributed yet; that's next up!
Thanks. TensorBoard is getting a 401 Unauthorized return code. Yep, I checked the code and saw that it's single-instance training for now. Let me know if you run into problems when working on the distributed version; maybe I can help. For the readme, the graph with the mean score is probably most interesting, or a screenshot of all the scores on TensorBoard.
OK, gs://agents-k8s-share/jobs/... should work going forward, including gs://agents-k8s-share/jobs/tensorflow-20171117102413/20171117T182424-pybullet_ant

Figures:

Thanks, happy to have help. I'm getting up to speed on this class of algorithm and am away this week. Maybe at some point we can write up in pseudocode what exactly we want to parallelize. Here's pseudocode for the KL and clipped versions of PPO:

## PPO-1 (KL)

```
for iteration do
  Run policy for T timesteps over N trajectories
  Estimate advantage function at all timesteps
  Do SGD on above objective for some number of epochs
  If KL too high, increase beta
  If KL too low, decrease beta
end for
```

## PPO-2 (Clipped objective)

```
for iteration do
  Run policy for T timesteps or N trajectories
  Estimate advantage function at all timesteps
  Do SGD on L^CLIP(theta) objective for some number of epochs
end for
```

It seems like we basically want to do distributed SGD in the region of some policy parameterization, where individual agents accumulate their own experience trajectories, perform a bit of private SGD, then share their results with a central policy vector? Or were you thinking more that there would be a single node for policy optimization but distributed accumulation of experience (i.e. many parallel actors but a single critic)?
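For reference, a minimal sketch of the clipped surrogate objective L^CLIP(theta) used by PPO-2, in plain NumPy and independent of the Agents code. The log-probabilities and advantages are assumed to be precomputed per timestep; names are illustrative only.

```python
import numpy as np


def ppo_clip_objective(new_log_prob, old_log_prob, advantages, epsilon=0.2):
  """L^CLIP(theta): mean over timesteps of the elementwise minimum of the
  unclipped and clipped probability-ratio terms."""
  ratio = np.exp(new_log_prob - old_log_prob)  # pi_theta / pi_theta_old
  unclipped = ratio * advantages
  clipped = np.clip(ratio, 1.0 - epsilon, 1.0 + epsilon) * advantages
  return np.mean(np.minimum(unclipped, clipped))
```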
I think we will have multiple machines that each run a batch of environments and an agent. The agents just need to synchronize their gradients. Since most of the time is spent collecting data, this will hopefully add only a small overhead.
@danijar This sounds like standard between-graph replication training. Do we need to do anything special?
Agree, this sounds like a standard instance of between-graph replication, and the remaining task is to put the policy parameters on the parameter servers. Just looking for these and understanding the structure/model of Agents and the PPO algorithm. If I understand correctly, in this example our policy is a parameterization of
Yes, the model parameters are created within the
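For what it's worth, the usual between-graph replication pattern pins variables to the parameter servers with a device setter. A minimal sketch using the TF 1.x API follows; the cluster addresses and task index are placeholders, not the actual TfJob-provided values.

```python
import tensorflow as tf

# Placeholder cluster definition; in a TfJob deployment these addresses
# typically come from the TF_CONFIG environment variable.
cluster = tf.train.ClusterSpec({
    'ps': ['ps-0:2222'],
    'worker': ['worker-0:2222', 'worker-1:2222'],
})
task_index = 0  # This worker's index.

with tf.device(tf.train.replica_device_setter(
    worker_device='/job:worker/task:%d' % task_index,
    cluster=cluster)):
  # Variables created here (e.g. the policy network parameters) are placed
  # round-robin on the parameter servers; other ops stay on this worker.
  global_step = tf.train.get_or_create_global_step()
```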
Sounds good!
So, just to update: having some issues with variable initialization; current version here. Currently non-chief nodes hang, repeating:

```
INFO:tensorflow:Waiting for model to be ready. Ready_for_local_init_op:
Variables not initialized: ppo_temporary/episodes/Variable,
ppo_temporary/episodes/Variable_1, ppo_temporary/episodes/Variable_2,
ppo_temporary/episodes/Variable_3, ppo_temporary/episodes/Variable_4,
ppo_temporary/episodes/Variable_5, ppo_temporary/last_action,
ppo_temporary/last_mean, ppo_temporary/last_logstd, ready: None
```

Presumably this is because these variables, which are private to an individual worker, are not initialized by the chief. It seems we should be initializing these variables in the local_init_op rather than having the ready_for_local_init_op block until they are initialized. And there should be more variables on this list: environment simulation variables should be private to workers as well. I'm guessing there's something simple to be done to either signal workers to initialize their local variables or mark these variables so that the ready_for_local_init_op won't block on them (I'm thinking they're currently global variables?).
Hmm, I thought that the
Yeah, me too. They don't appear to be on the param servers.
I'm wondering if I'm mixing things up a bit here. Probably I shouldn't be putting anything on the master, and should use it only for initialization of variables on the workers and PSs.
It really depends on how the code is set up. Running ops on the master ("chief") is quite common. Typically worker 0 is the chief and also runs computations.
@cwbeitel Are you still working on fixing this?
Took a break from it (this has been a bit frustrating), but I still mean to fix what I can. As I was explaining in my email, there is currently an error that occurs at the 10-minute mark when Saver() is called, caused by it expecting a Session instead of a MonitoredTrainingSession:

```
Traceback (most recent call last):
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/app/trainer/task.py", line 299, in <module>
    tf.app.run()
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/platform/app.py", line 48, in run
    _sys.exit(main(_sys.argv[:1] + flags_passthrough))
  File "/app/trainer/task.py", line 282, in main
    for score in train(agents_config):
  File "trainer/train.py", line 214, in train
    loop._store_checkpoint(sess, saver, global_step)
  File "/app/agents/agents/tools/loop.py", line 233, in _store_checkpoint
    saver.save(sess, filename, global_step)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/saver.py", line 1565, in save
    raise TypeError("'sess' must be a Session; %s" % sess)
TypeError: 'sess' must be a Session; <tensorflow.python.training.monitored_session.MonitoredSession object at 0x7ff9586de710>
```

This causes jobs to crash and then restart, logging to the same directory as the previous run, which causes the following error:

```
INFO:tensorflow:Waiting for model to be ready. Ready_for_local_init_op:
Variables not initialized: ppo_temporary/episodes/Variable,
ppo_temporary/episodes/Variable_1, ppo_temporary/episodes/Variable_2,
ppo_temporary/episodes/Variable_3, ppo_temporary/episodes/Variable_4,
ppo_temporary/episodes/Variable_5, ppo_temporary/last_action,
ppo_temporary/last_mean, ppo_temporary/last_logstd, ready: None
```

One approach here would be to disable Agents' checkpoint-writing behavior and rely on that of MonitoredTrainingSession, in favor of its various other benefits, including the convenience of initializing ops related to distributed training and a convenient mechanism for introducing SessionRunHooks, e.g. for triggering render jobs.

I believe this issue is distinct from the problem of local variables not being initialized in the context of distributed training, which we discussed at length (and which was solved by adding non-shared variables to the local collection). I also believe it is distinct from the "op not fetchable" error that occurs when using SyncReplicasOptimizer, since that error occurs immediately, whereas the above only occurs once the checkpoint saver is triggered (after ~10 minutes, or maybe it has a step-number periodicity).

The issue with SyncReplicasOptimizer should be solvable. Sounds like punting that to an expert hasn't worked. Going forward, I think the approach should be to look through the SyncReplicasOptimizer code to see where it's adding new ops to the graph, since it seems like the problem is that it is creating an op inside one of the tf.scan and/or tf.cond contexts. My initial attempt at initializing it outside of those and passing the initialized SyncReplicasOptimizer in via the Agents config did not fix the problem, but my guess is that something like this would.

Another important observation is that since introducing MonitoredTrainingSession, the mean_score is highly variable. This is with an unmodified version of the Agents codebase. Also, when not using MonitoredTrainingSession, when a run does reach a successful conclusion it then (always) exits with a system error and causes jobs to restart, which then crash-loop from then onwards.
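For reference, a minimal sketch of how SyncReplicasOptimizer is normally wired up once at graph-construction time, outside any tf.scan/tf.cond context. The optimizer, stand-in loss, and worker counts here are placeholders, not the Agents code.

```python
import tensorflow as tf

num_workers = 2          # Placeholder.
is_chief = True          # Typically task_index == 0.

# A stand-in loss so the example builds; the real loss comes from the model.
loss = tf.get_variable('dummy_loss', [], initializer=tf.ones_initializer())

optimizer = tf.train.AdamOptimizer(1e-4)
# Wrap the optimizer once, at graph-construction time, outside any
# tf.scan/tf.cond context.
optimizer = tf.train.SyncReplicasOptimizer(
    optimizer,
    replicas_to_aggregate=num_workers,
    total_num_replicas=num_workers)

train_op = optimizer.minimize(
    loss, global_step=tf.train.get_or_create_global_step())

# This hook handles the chief's queue/token logic during training.
sync_hook = optimizer.make_session_run_hook(is_chief)
```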
When using MonitoredTrainingSession, you should disable its logging and checkpointing logic. I was using it in the beginning, but it was too restrictive, so I switched to a standard session. I believe you can get the underlying session from a MonitoredTrainingSession. Instead of SyncReplicasOptimizer, you could also try to manually combine the gradients across workers, but I'm unsure how much effort that would be.
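A minimal sketch of creating a MonitoredTrainingSession with its automatic checkpointing and summary writing disabled (TF 1.x API; the master address, chief flag, and hooks are placeholders):

```python
import tensorflow as tf

# Disable the built-in checkpoint and summary saving so that checkpointing
# can be controlled explicitly (e.g. by the Agents loop or a custom hook).
with tf.train.MonitoredTrainingSession(
    master='',                 # e.g. server.target in a distributed job.
    is_chief=True,
    checkpoint_dir=None,       # No automatic CheckpointSaverHook.
    save_checkpoint_secs=None,
    save_summaries_steps=None,
    save_summaries_secs=None,
    hooks=[]) as sess:
  pass  # Training sess.run(...) calls would go here.
```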
Thanks for the suggestion, I'll give that a try. Yeah, that's been in the back of my mind as well, that we could go that route. I'd like to do some more investigating with SRO before doing that myself. But it would be interesting! Certainly SRO provides a clear model of how to do this kind of synchronization. Also, it would be good to see what the training quality actually looks like with asynchronous distributed training before deeming it worth investing in synchronous training. My bad on that: during development I added both the use of MTS and the pooling of gradients among multiple workers, and assumed at the time that the variability in mean_score was due to the syncing of stale gradients. But now it's clear that this variability is the result of some other change made in switching over to MTS, since it can be seen when running the current MTS setup on a single worker.
Opening this issue to start a discussion about whether it would be worth investing in making it easy to run tensorflow/agents on K8s.
For some inspiration you can look at the TfJob CRD.
Some questions:
* Is data fetched from all simulations simultaneously?
* Does each simulation need to be individually addressable?