ray: Ray component: RLlib: Last sync command failed: Sync process failed warning
What happened + What you expected to happen
I updated Ray to 2.4.0. Now, the following message is consistently printed every 5 minutes during training with tune.Tuner:
```
2023-06-14 20:26:35,366 WARNING syncer.py:548 -- Last sync command failed: Sync process failed: GetFileInfo() yielded path 'C:/Users/user/ray_results/CustomEnv on LSTM/PPO_CustomEnv-v0_e87bc_00000_0_clip_param=0.1351,kl_coeff=0.0044,kl_target=0.0016,lambda=0.9857,lr=0.0000,vf_clip_param=9.7841,v_2023-06-14_20-21-33', which is outside base dir 'C:\Users\user\ray_results\CustomEnv on LSTM'
2023-06-14 20:31:37,678 WARNING syncer.py:548 -- Last sync command failed: Sync process failed: GetFileInfo() yielded path 'C:/Users/user/ray_results/CustomEnv on LSTM/PPO_CustomEnv-v0_e87bc_00000_0_clip_param=0.1351,kl_coeff=0.0044,kl_target=0.0016,lambda=0.9857,lr=0.0000,vf_clip_param=9.7841,v_2023-06-14_20-21-33', which is outside base dir 'C:\Users\user\ray_results\CustomEnv on LSTM'
2023-06-14 20:36:40,126 WARNING syncer.py:548 -- Last sync command failed: Sync process failed: GetFileInfo() yielded path 'C:/Users/user/ray_results/CustomEnv on LSTM/PPO_CustomEnv-v0_e87bc_00000_0_clip_param=0.1351,kl_coeff=0.0044,kl_target=0.0016,lambda=0.9857,lr=0.0000,vf_clip_param=9.7841,v_2023-06-14_20-21-33', which is outside base dir 'C:\Users\user\ray_results\CustomEnv on LSTM'
```
What could be the issue?
I've also noticed that the moment this warning is printed, RAM usage jumps to over 95%, even on a test CartPole environment, despite the TensorBoard results showing that the average stays below 86%.
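As a side note (this sketch is mine, not from the report): the two path spellings in the warning point at the same directory, and treating them as Windows paths makes them compare equal, which suggests the "outside base dir" check is comparing raw strings rather than normalized paths:

```python
# Sketch only: the forward-slash path yielded by GetFileInfo() and the
# backslash base dir from the warning refer to the same location on Windows.
from pathlib import PureWindowsPath

yielded = PureWindowsPath("C:/Users/user/ray_results/CustomEnv on LSTM")
base = PureWindowsPath(r"C:\Users\user\ray_results\CustomEnv on LSTM")

print(yielded == base)   # True: PureWindowsPath normalizes the separators
print(str(yielded))      # C:\Users\user\ray_results\CustomEnv on LSTM
```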
Versions / Dependencies
Ray 2.4.0, Python 3.10, torch 2.0.1, gymnasium 0.28.1
Reproduction script
```python
class configs():
    def __init__(self):
        self.args = 'PPO'  # trainable name, used later by tune.Tuner()
        self.trainerHPs = PPOLearnerHPs(params=HPRanges()).config
        self.algo = PPOConfig()
        self.trainer = 'PPO'
# Training
self.framework = 'torch'
self.preprocessor_pref = 'rllib'
self.observation_filter = 'NoFilter'
self.train_batch_size = HPRanges().train_batch_size
self.max_seq_len = 20
#Rollouts
self.max_iterations = 100
self.num_rollout_workers = 0
self.rollout_fragment_length = round(self.train_batch_size / 3)
self.batch_mode = 'truncate_episodes'
self.create_env_on_local_worker = True if self.num_rollout_workers == 0 else False
self.num_envs_per_worker = 1
self.remote_worker_envs = True if self.num_envs_per_worker > 1 else False
# Remote envs only make sense to use if num_envs > 1 (i.e. environment vectorization is enabled).
# Evaluation
self.evaluation_parallel_to_training = False
self.evaluation_num_workers = 0 if self.evaluation_parallel_to_training == False else 1
self.evaluation_duration_unit = "episodes"
self.evaluation_duration = 1 # I still don't know why the eval runs over the full size of the data set
self.evaluation_frequency = round(self.max_iterations / 1)
# Exploration
self.random_steps = 1000
# Set up env
self.render_mode= None
# Resources
# This config allows for curiosity exploration since it doesn't support parallelism
#self.num_cpus_per_worker= 1
#self.num_cpus_for_local_worker= 1
self.num_cpus_per_trainer_worker = 0 #self.num_envs_per_worker #+ 1 #if num rollout workers is high
self.num_trainer_workers= 0 #self.num_rollout_workers
self.num_gpus = 0
self.num_gpus_per_worker= 0
self.num_gpus_per_trainer_worker= 0
self._fake_gpus = False
self.custom_resources_per_worker= None
self.placement_strategy= "PACK"
# Tuner
self.num_samples = 2
self.max_concurrent_trials = 2
self.time_budget_s = None
# Logging
self.experiment_name = "CustomEnv on LSTM"
self.log_name = self.experiment_name
self.log_dir = "C:/.../TrainLogs/"
# Metrics
self.metrics = 'episode_reward_mean' # Find out how to make sure that
self.mode = 'max'
# Checkpoints
self.score = 'episode_reward_mean'
self.checkpoint_frequency = 100
self.num_to_keep =2
# Others
self.verbose =1
# Register Model in Ray Registry
ModelCatalog.register_custom_model(
model_name = "LSTM 1",
model_class= CustomNet2
)
self.model_3 ={
'custom_model': 'LSTM 1',
"max_seq_len": 10,
"_disable_action_flattening": True,
"_time_major": True
}
self.exploration_config = {
'type': 'StochasticSampling', # Default for PG algorithms
"random_timesteps": self.random_steps, # Int
'framework': self.framework,
}
self.centered_adam = False
self.optimizer_config = {
'type': "RAdam",
'lr': sample.uniform(5e-6, 1e-5),
#'beta1': 0.9,
#'beta2': 0.999, # Only used if centered=False.
#'eps': 1e-08, # Only used if centered=False.
#'centered': self.centered_adam,
#'amsgrad': False # Only used if centered=False.
}
train_env_config={
'render_mode': None,
'window_size': 100,
'rewards_window_size': 100,
'min_periods': 500,
'max_allowed_loss': 0.3,
'max_episode_steps': 1500,
}
eval_env_config={
'render_mode': None,
'window_size': 100,
'rewards_window_size': 100,
'min_periods': 500,
'max_allowed_loss': 0.3,
'max_episode_steps': 1500,
}
self.evaluation_config_ = self.algo.overrides( # type: ignore
explore= True,
env_config= eval_env_config
)
self.config = (
self.algo
.update_from_dict(config_dict=self.trainerHPs.to_dict())
.environment(
env = "CustomEnv-v0",
#env = 'CartPole-v1', # Test
env_config= train_env_config,
render_env=False,
clip_rewards= None,
auto_wrap_old_gym_envs=True
)
.framework(framework='torch')
.debugging(
log_level = "ERROR", # type: ignore
log_sys_usage = True)
.rollouts(
num_rollout_workers=self.num_rollout_workers,
create_env_on_local_worker= False if self.num_rollout_workers > 0 else True,
enable_connectors=True,
rollout_fragment_length=self.rollout_fragment_length,
batch_mode= self.batch_mode,
remote_worker_envs=self.remote_worker_envs,
remote_env_batch_wait_ms=0,
validate_workers_after_construction=True,
preprocessor_pref= self.preprocessor_pref,
observation_filter= self.observation_filter,
synchronize_filter=True,
compress_observations=False,
enable_tf1_exec_eagerly=False)
.fault_tolerance(
recreate_failed_workers=False,
max_num_worker_restarts=1,
delay_between_worker_restarts_s=20,
restart_failed_sub_environments=False,
num_consecutive_worker_failures_tolerance=100,
worker_health_probe_timeout_s=100,
worker_restore_timeout_s=180
)
.resources(
#num_cpus_per_worker= self.num_cpus_per_worker,
#num_gpus_per_worker= self.num_gpus_per_worker,
#num_cpus_for_local_worker= self.num_cpus_for_local_worker,
placement_strategy= self.placement_strategy
)
.exploration(
explore = True,
exploration_config=self.exploration_config
)
.checkpointing(
export_native_model_files = False,
checkpoint_trainable_policies_only = False) # Bool
.evaluation(
evaluation_interval = self.evaluation_frequency,
evaluation_duration = self.evaluation_duration,
evaluation_duration_unit = self.evaluation_duration_unit,
evaluation_sample_timeout_s = 180,
evaluation_parallel_to_training = self.evaluation_parallel_to_training,
#evaluation_config = self.evaluation_config_,
#off_policy_estimation_methods = {},
ope_split_batch_by_episode = True, # default
evaluation_num_workers = self.evaluation_num_workers,
# custom_evaluation_function = None,
always_attach_evaluation_results = True,
enable_async_evaluation = True if self.evaluation_num_workers > 1 else False # Turn on if eval workers is > 1
)
.training(
gamma=0.98,#HPRanges().gamma, # type: ignore
model=self.model_3,# type: ignore
train_batch_size=HPRanges().train_batch_size,# type: ignore
optimizer=self.optimizer_config,
lr=1e-5,#HPRanges().lr # type: ignore
grad_clip_by='norm',# type: ignore
grad_clip=0.3,
#_enable_learner_api=True,
#learner_class=None
))
self.config_dict = self.config.to_dict()
self.stopper = CombinedStopper(
MaximumIterationStopper(max_iter=self.max_iterations),
TrialPlateauStopper(
metric= self.metrics,
std= 0.04,
num_results= 10,
grace_period = 10000,
metric_threshold = 200,
mode = 'max'),
)
self.checkpointer = CheckpointConfig(
num_to_keep= self.num_to_keep,
checkpoint_score_attribute= self.score,
checkpoint_frequency= self.checkpoint_frequency,
checkpoint_at_end= True)
self.failure_check = FailureConfig(
max_failures= 0,
fail_fast= True)
self.sync_config = SyncConfig(
#upload_dir = None,
syncer = "auto",
sync_period = DEFAULT_SYNC_PERIOD,
sync_timeout = DEFAULT_SYNC_TIMEOUT,
sync_on_checkpoint = True)
hyper_dict ={
# distribution for resampling
'gamma' : lambda: np.random.uniform(0.8, 0.9997),
'lr' : lambda: np.random.uniform(5e-6, 0.003),
'vf_loss_coeff': lambda: np.random.uniform(1e-3, 1e-1),
'kl_coeff' : lambda: np.random.uniform(0.0005, 0.01),
'kl_target': lambda: np.random.uniform(0.0005, 0.003),
'lambda_' : lambda: np.random.uniform(0.90, 0.9999),
'clip_param': lambda: np.random.uniform(0.05, 0.15),
'grad_clip': lambda: np.random.uniform(1.0, 40.0),
}
self.pbt_scheduler = PopulationBasedTraining(
time_attr='training_iteration',
perturbation_interval = self.checkpoint_frequency,
burn_in_period = 0,
hyperparam_mutations = hyper_dict, #type:ignore
quantile_fraction = 0.20, # Paper default
resample_probability = 0.25,
perturbation_factors = (1.2, 0.8), # Paper default
#custom_explore_fn = None
)
def train(self):
''' Tune and Train'''
if ray.is_initialized(): #type: ignore
ray.shutdown() # type: ignore
ray.init( # type: ignore
#local_mode=True, # Deprecated and will be removed
include_dashboard=True,
ignore_reinit_error=False,
#num_gpus=1,
#num_cpus=6, # Intel Core i5-8350U 8 cores 6MB Cache 1.7 - 1.9 Ghz
)
# Register the environment - Called the registry after the init of Ray
registry.register_env("CustomEnv-v0", lambda env_config: env_creator(env_config))
#registry.register_env("Cartpole", Cartpole) # using an env creator as seen in SimpleCorridor Example
start = time.time()
analysis = tune.Tuner(
trainable= self.args,
param_space= self.config.to_dict(),
tune_config= tune.TuneConfig(
mode = self.mode,
metric= self.metrics,
scheduler= self.pbt_scheduler,
num_samples= self.num_samples,
max_concurrent_trials= self.max_concurrent_trials,
time_budget_s = self.time_budget_s,
reuse_actors = True,
#trial_name_creator = None,
#trial_dirname_creator = None,
chdir_to_trial_dir = True),
run_config= air.RunConfig(
name= self.experiment_name,
#local_dir = None,
stop= self.stopper,
failure_config= self.failure_check,
sync_config= self.sync_config,
checkpoint_config = self.checkpointer,
verbose = 1,
#log_to_file = False
))
results = analysis.fit()
if self.num_samples > 0:
print(f'Best Trial log directory: {results.get_best_result()}')
print(f'Best hyperparameters found were: {results.get_best_result().config}')
ray.shutdown()# type: ignore
taken = time.time() - start
print(f"Time taken: {taken:.2f} seconds.")
if self.num_samples > 0:
self.best_config = results.get_best_result().config
self.best_logdir_chkpt = results.get_best_result().checkpoint
return results
```
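Since the script above depends on several project-specific pieces (`PPOLearnerHPs`, `HPRanges`, `CustomNet2`, the `CustomEnv-v0` creator), here is a much smaller, self-contained sketch of the same Tuner + PPO setup on CartPole, with an experiment name containing spaces as in the warning. It is a hypothetical stand-in under Ray 2.4 APIs, not guaranteed to trigger the warning:

```python
# Minimal stand-in for the reproduction script (Ray 2.4 APIs assumed).
import ray
from ray import air, tune
from ray.rllib.algorithms.ppo import PPOConfig

ray.init()

config = (
    PPOConfig()
    .environment(env="CartPole-v1")
    .framework("torch")
    .rollouts(num_rollout_workers=0)
)

tuner = tune.Tuner(
    "PPO",
    param_space=config.to_dict(),
    run_config=air.RunConfig(
        name="CustomEnv on LSTM",        # experiment name with spaces, as in the warning
        stop={"training_iteration": 5},  # short run for illustration only
    ),
)
results = tuner.fit()
ray.shutdown()
```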
Issue Severity
Medium: It is a significant difficulty but I can work around it.
About this issue
- Original URL
- State: closed
- Created a year ago
- Reactions: 1
- Comments: 23 (1 by maintainers)
Hey @harryseely, have you tried this with PBT? I'm unable to use `storage_path="./"` as it fails all the time. It is a bug - somewhere in the Ray code the remote directory was set to the local directory, and Windows does not take it well when Ray tries to copy checkpoints onto itself. I posted on the discussion board earlier, so I'll try not to bore everyone by repeating what I said there.
For now I can confirm that I downgraded to 2.3 and it works.
Looking forward to the next release, as I could not get it to work with `storage_path="./"` either.
Thanks for raising this and following up. This is indeed a bug, and it should be fixed here: https://github.com/ray-project/ray/pull/38319
The fix will be included in Ray 2.7.
As a workaround, you can set the `storage_path` to a relative local directory, which will then not trigger the buggy code path.
I have the same problem in Ray 2.6.1 on Windows.
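For illustration, a minimal sketch of that suggested workaround, assuming Ray 2.6+ where `RunConfig(storage_path=...)` is available (earlier versions used `local_dir`); the directory name `my_local_results`, the experiment name, and the one-iteration stop are placeholders, not from the thread:

```python
# Hedged workaround sketch: point storage_path at a relative local directory
# so results are written (and "synced") under the working directory instead
# of the default C:\Users\<user>\ray_results location.
from ray import air, tune
from ray.rllib.algorithms.ppo import PPOConfig

config = (
    PPOConfig()
    .environment(env="CartPole-v1")
    .rollouts(num_rollout_workers=0)
)

tuner = tune.Tuner(
    "PPO",
    param_space=config.to_dict(),
    run_config=air.RunConfig(
        name="workaround_demo",           # hypothetical experiment name
        storage_path="my_local_results",  # relative local path, per the suggested workaround
        stop={"training_iteration": 1},
    ),
)
tuner.fit()
```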