ray: Ray component: RLlib: Last sync command failed: Sync process failed warning

What happened + What you expected to happen

I updated Ray to 2.4.0. Now the following warning is printed consistently, roughly every 5 minutes, during training with tune.Tuner:

2023-06-14 20:26:35,366 WARNING syncer.py:548 -- Last sync command failed: Sync process failed: GetFileInfo() yielded path 'C:/Users/user/ray_results/CustomEnv on LSTM/PPO_CustomEnv-v0_e87bc_00000_0_clip_param=0.1351,kl_coeff=0.0044,kl_target=0.0016,lambda=0.9857,lr=0.0000,vf_clip_param=9.7841,v_2023-06-14_20-21-33', which is outside base dir 'C:\Users\user\ray_results\CustomEnv on LSTM'
2023-06-14 20:31:37,678 WARNING syncer.py:548 -- Last sync command failed: Sync process failed: GetFileInfo() yielded path 'C:/Users/user/ray_results/CustomEnv on LSTM/PPO_CustomEnv-v0_e87bc_00000_0_clip_param=0.1351,kl_coeff=0.0044,kl_target=0.0016,lambda=0.9857,lr=0.0000,vf_clip_param=9.7841,v_2023-06-14_20-21-33', which is outside base dir 'C:\Users\user\ray_results\CustomEnv on LSTM'
2023-06-14 20:36:40,126 WARNING syncer.py:548 -- Last sync command failed: Sync process failed: GetFileInfo() yielded path 'C:/Users/user/ray_results/CustomEnv on LSTM/PPO_CustomEnv-v0_e87bc_00000_0_clip_param=0.1351,kl_coeff=0.0044,kl_target=0.0016,lambda=0.9857,lr=0.0000,vf_clip_param=9.7841,v_2023-06-14_20-21-33', which is outside base dir 'C:\Users\user\ray_results\CustomEnv on LSTM'

What could be the issue?

I've also noticed that the moment this warning is printed, RAM usage climbs to over 95%, even on a test CartPole environment, despite the TensorBoard results showing that the average stays below 86% (see attached screenshot).
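
One detail that stands out in the warning is that the two paths differ only in their separators: the trial directory is reported with forward slashes, while the base dir uses backslashes. As an illustration only (this is not the actual Ray/pyarrow check, just a sketch of how a naive prefix comparison behaves on Windows-style paths), that mismatch alone is enough to make the trial directory look like it is outside its own base dir:

from pathlib import PureWindowsPath

# Paths copied verbatim from the warning above.
trial_dir = "C:/Users/user/ray_results/CustomEnv on LSTM/PPO_CustomEnv-v0_e87bc_00000_0_clip_param=0.1351,kl_coeff=0.0044,kl_target=0.0016,lambda=0.9857,lr=0.0000,vf_clip_param=9.7841,v_2023-06-14_20-21-33"
base_dir = r"C:\Users\user\ray_results\CustomEnv on LSTM"

# A plain string-prefix test fails purely because the separators differ.
print(trial_dir.startswith(base_dir))  # False

# Parsing both as Windows paths normalizes the separators and shows the
# trial dir really is inside the base dir (is_relative_to needs Python 3.9+).
print(PureWindowsPath(trial_dir).is_relative_to(PureWindowsPath(base_dir)))  # True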

Versions / Dependencies

Ray 2.4.0
Python 3.10
torch 2.0.1
gymnasium 0.28.1

Reproduction script

class configs():
def __init__(self):
    args = 'PPO'
    self.trainerHPs = PPOLearnerHPs(params=HPRanges()).config
    self.algo = PPOConfig()
    self.trainer = 'PPO'

    # Training
    self.framework = 'torch'
    self.preprocessor_pref = 'rllib'
    self.observation_filter = 'NoFilter'
    self.train_batch_size = HPRanges().train_batch_size
    self.max_seq_len = 20

    #Rollouts
    self.max_iterations = 100
    self.num_rollout_workers = 0
    self.rollout_fragment_length = round(self.train_batch_size / 3)
    self.batch_mode = 'truncate_episodes'
    self.create_env_on_local_worker = True if self.num_rollout_workers == 0 else False
    self.num_envs_per_worker = 1 
    self.remote_worker_envs = True if self.num_envs_per_worker > 1 else False 
    # Remote envs only make sense to use if num_envs > 1 (i.e. environment vectorization is enabled).

    # Evaluation
    self.evaluation_parallel_to_training = False
    self.evaluation_num_workers = 0 if self.evaluation_parallel_to_training == False else 1 
    self.evaluation_duration_unit = "episodes" 
    self.evaluation_duration = 1  # still don't know why the eval runs over the full dataset
    self.evaluation_frequency = round(self.max_iterations / 1) 

    # Exploration
    self.random_steps = 1000

    # Set up env
    self.render_mode= None

    # Resources
    # This config allows for curiosity exploration since it doesn't support parallelism
    #self.num_cpus_per_worker= 1
    #self.num_cpus_for_local_worker= 1
    self.num_cpus_per_trainer_worker = 0 #self.num_envs_per_worker #+ 1 #if num rollout workers is high
    self.num_trainer_workers= 0 #self.num_rollout_workers
    
    self.num_gpus = 0 
    self.num_gpus_per_worker= 0 
    self.num_gpus_per_trainer_worker= 0 
    self._fake_gpus = False 
    self.custom_resources_per_worker= None 
    self.placement_strategy= "PACK"

    # Tuner
    self.num_samples = 2
    self.max_concurrent_trials = 2
    self.time_budget_s = None
    
    # Logging
    self.experiment_name = "CustomEnv on LSTM"
    self.log_name = self.experiment_name
    self.log_dir = "C:/.../TrainLogs/"

    # Metrics
    self.metrics = 'episode_reward_mean' # Find out how to make sure that
    self.mode = 'max'

    # Checkpoints
    self.score = 'episode_reward_mean'
    self.checkpoint_frequency = 100
    self.num_to_keep =2

    # Others
    self.verbose =1

    # Register Model in Ray Registry
    ModelCatalog.register_custom_model(
        model_name = "LSTM 1",
        model_class= CustomNet2
    )

    self.model_3 ={
        'custom_model': 'LSTM 1',
        "max_seq_len": 10,
        "_disable_action_flattening": True,
        "_time_major": True
    }

 
    self.exploration_config = {
        'type': 'StochasticSampling', # Default for PG algorithms
        "random_timesteps": self.random_steps, # Int
        'framework': self.framework,
    }

    self.centered_adam = False
    self.optimizer_config = {
        'type': "RAdam",
        'lr': sample.uniform(5e-6, 1e-5),       
        #'beta1': 0.9, 
        #'beta2': 0.999, # Only used if centered=False.
        #'eps': 1e-08, # Only used if centered=False.
        #'centered': self.centered_adam, 
        #'amsgrad': False # Only used if centered=False.
    }

    train_env_config={
        'render_mode': None,
        'window_size': 100,
        'rewards_window_size': 100,
        'min_periods': 500,
        'max_allowed_loss': 0.3,
        'max_episode_steps': 1500,
    }
    eval_env_config={
        'render_mode': None,
        'window_size': 100,
        'rewards_window_size': 100, 
        'min_periods': 500,
        'max_allowed_loss': 0.3,
        'max_episode_steps': 1500,
    }
    self.evaluation_config_ = self.algo.overrides( # type: ignore
        explore= True,
        env_config= eval_env_config
        )

    self.config = (
        self.algo
        .update_from_dict(config_dict=self.trainerHPs.to_dict())
        .environment(
            env = "CustomEnv-v0", 
            #env = 'CartPole-v1', # Test
            env_config= train_env_config,
            render_env=False,
            clip_rewards= None,
            auto_wrap_old_gym_envs=True
        )
        .framework(framework='torch')
        .debugging(
            log_level = "ERROR", # type: ignore
            log_sys_usage = True)
        .rollouts(
            num_rollout_workers=self.num_rollout_workers,
            create_env_on_local_worker= False if self.num_rollout_workers > 0 else True, 
            enable_connectors=True,
            rollout_fragment_length=self.rollout_fragment_length,
            batch_mode= self.batch_mode,
            remote_worker_envs=self.remote_worker_envs,
            remote_env_batch_wait_ms=0,
            validate_workers_after_construction=True,
            preprocessor_pref= self.preprocessor_pref,
            observation_filter= self.observation_filter,
            synchronize_filter=True,
            compress_observations=False,
            enable_tf1_exec_eagerly=False)
        .fault_tolerance(
            recreate_failed_workers=False,
            max_num_worker_restarts=1,
            delay_between_worker_restarts_s=20,
            restart_failed_sub_environments=False,
            num_consecutive_worker_failures_tolerance=100,
            worker_health_probe_timeout_s=100,
            worker_restore_timeout_s=180
        )
        .resources(
            #num_cpus_per_worker= self.num_cpus_per_worker, 
            #num_gpus_per_worker= self.num_gpus_per_worker, 
            #num_cpus_for_local_worker= self.num_cpus_for_local_worker, 
            placement_strategy= self.placement_strategy
        )
        .exploration(
            explore = True,
            exploration_config=self.exploration_config
        )
        .checkpointing(
            export_native_model_files = False, 
            checkpoint_trainable_policies_only = False) # Bool
        .evaluation(
            evaluation_interval = self.evaluation_frequency,
            evaluation_duration = self.evaluation_duration,
            evaluation_duration_unit = self.evaluation_duration_unit,
            evaluation_sample_timeout_s = 180,
            evaluation_parallel_to_training = self.evaluation_parallel_to_training,
            #evaluation_config = self.evaluation_config_,
            #off_policy_estimation_methods = {},
            ope_split_batch_by_episode = True, # default
            evaluation_num_workers = self.evaluation_num_workers, 
            # custom_evaluation_function = None,
            always_attach_evaluation_results = True,
            enable_async_evaluation = True if self.evaluation_num_workers > 1 else False # Turn on if eval workers is > 1
        )
        .training(
            gamma=0.98,#HPRanges().gamma, # type: ignore
            model=self.model_3,# type: ignore
            train_batch_size=HPRanges().train_batch_size,# type: ignore
            optimizer=self.optimizer_config,
            lr=1e-5,#HPRanges().lr # type: ignore
            grad_clip_by='norm',# type: ignore
            grad_clip=0.3,
            #_enable_learner_api=True,
            #learner_class=None
        ))
    self.config_dict = self.config.to_dict()

    self.stopper = CombinedStopper(
        MaximumIterationStopper(max_iter=self.max_iterations),
        TrialPlateauStopper(
            metric= self.metrics,
            std= 0.04,
            num_results= 10,
            grace_period = 10000,
            metric_threshold = 200,
            mode = 'max'),
        )

    self.checkpointer = CheckpointConfig(
        num_to_keep= self.num_to_keep, 
        checkpoint_score_attribute= self.score,
        checkpoint_frequency= self.checkpoint_frequency, 
        checkpoint_at_end= True)

    self.failure_check = FailureConfig(
        max_failures= 0,
        fail_fast= True)

    self.sync_config = SyncConfig(
        #upload_dir = None, 
        syncer = "auto", 
        sync_period = DEFAULT_SYNC_PERIOD, 
        sync_timeout = DEFAULT_SYNC_TIMEOUT, 
        sync_on_checkpoint = True)

    hyper_dict ={
        # distribution for resampling
        'gamma' : lambda: np.random.uniform(0.8, 0.9997),
        'lr' : lambda: np.random.uniform(5e-6, 0.003),
        'vf_loss_coeff': lambda: np.random.uniform(1e-3, 1e-1),
        'kl_coeff' : lambda: np.random.uniform(0.0005, 0.01),
        'kl_target': lambda: np.random.uniform(0.0005, 0.003),
        'lambda_' :  lambda: np.random.uniform(0.90, 0.9999),
        'clip_param': lambda: np.random.uniform(0.05, 0.15),
        'grad_clip':  lambda: np.random.uniform(1.0, 40.0),
    }

    self.pbt_scheduler = PopulationBasedTraining(
        time_attr='training_iteration',
        perturbation_interval = self.checkpoint_frequency, 
        burn_in_period = 0, 
        hyperparam_mutations = hyper_dict, #type:ignore
        quantile_fraction = 0.20, # Paper default 
        resample_probability = 0.25, 
        perturbation_factors = (1.2, 0.8), # Paper default
        #custom_explore_fn = None
    )

def train(self):
    ''' Tune and Train'''
    if ray.is_initialized(): #type: ignore
        ray.shutdown() # type: ignore
    ray.init( # type: ignore
        #local_mode=True, # Deprecated and will be removed
        include_dashboard=True,
        ignore_reinit_error=False,
        #num_gpus=1,
        #num_cpus=6, # Intel Core i5-8350U 8 cores 6MB Cache 1.7 - 1.9 Ghz
        )
    
    # Register the environment - Called the registry after the init of Ray
    registry.register_env("CustomEnv-v0", lambda env_config: env_creator(env_config))
    #registry.register_env("Cartpole", Cartpole) # using an env creator as seen in SimpleCorridor Example
    start = time.time()
    analysis = tune.Tuner(
        trainable= self.trainer,
        param_space= self.config.to_dict(),
        tune_config= tune.TuneConfig(
            mode = self.mode,
            metric= self.metrics,  
            scheduler= self.pbt_scheduler, 
            num_samples=  self.num_samples, 
            max_concurrent_trials= self.max_concurrent_trials, 
            time_budget_s = self.time_budget_s, 
            reuse_actors = True, 
            #trial_name_creator = None,
            #trial_dirname_creator = None, 
            chdir_to_trial_dir = True),
        run_config= air.RunConfig(
            name= self.experiment_name, 
            #local_dir = None, 
            stop= self.stopper,
            failure_config= self.failure_check, 
            sync_config= self.sync_config, 
            checkpoint_config = self.checkpointer, 
            verbose = 1, 
            #log_to_file = False
            ))
    results = analysis.fit()

    if self.num_samples > 0:
        print(f'Best Trial log directory: {results.get_best_result().log_dir}')
        print(f'Best hyperparameters found were: {results.get_best_result().config}')
    
    ray.shutdown()# type: ignore

    taken = time.time() - start
    print(f"Time taken: {taken:.2f} seconds.")

    if self.num_samples > 0:
        self.best_config = results.get_best_result().config
    self.best_logdir_chkpt = results.get_best_result().checkpoint

    return results

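For context, the sync-relevant part of the script above boils down to roughly the following condensed sketch (an assumption-laden simplification, not the full reproduction: CartPole-v1 and default PPO settings stand in for the custom environment, model, and PBT setup):

import ray
from ray import air, tune
from ray.rllib.algorithms.ppo import PPOConfig
from ray.tune.syncer import SyncConfig

ray.init()

# Minimal PPO config; the original uses a custom env, LSTM model, and PBT.
config = PPOConfig().environment(env="CartPole-v1").framework("torch")

tuner = tune.Tuner(
    "PPO",
    param_space=config.to_dict(),
    tune_config=tune.TuneConfig(metric="episode_reward_mean", mode="max", num_samples=2),
    run_config=air.RunConfig(
        name="CustomEnv on LSTM",
        sync_config=SyncConfig(syncer="auto", sync_on_checkpoint=True),
        stop={"training_iteration": 5},
    ),
)
tuner.fit()
ray.shutdown()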

Issue Severity

Medium: It is a significant difficulty but I can work around it.

About this issue

  • State: closed
  • Created a year ago
  • Reactions: 1
  • Comments: 23 (1 by maintainers)

Most upvoted comments

storage_path="./" works for me, and I can perform tuning without saving any model checkpoints. However, if I do need to save checkpoints in the future this will not work. Hopefully this is fully fixed in the 2.7 release.

Thanks for raising this and following up. This is indeed a bug, and it should be fixed here: https://github.com/ray-project/ray/pull/38319. The fix will be included in Ray 2.7. As a workaround, you can set the storage_path to a relative local directory, which will then not trigger the buggy code path.

from ray import air, tune

tuner = tune.Tuner(
    train_fn,
    run_config=air.RunConfig(storage_path="./")
)
tuner.fit()

Hey @harryseely, have you tried this with PBT? I'm unable to use storage_path="./" as it fails all the time.

It is a bug: somewhere in the Ray code the remote directory gets set to the local directory, and Windows does not take it well when Ray tries to copy checkpoints onto themselves. I posted on the discussion board earlier, so I'll try not to bore everyone by repeating what I said here.
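
Along the same lines, if results only need to stay on the local machine, one possible way to sidestep the sync path entirely is to disable the syncer. This is a sketch, assuming the Ray 2.x Tune behavior where syncer=None turns syncing off; train_fn is the same placeholder used in the workaround example above:

from ray import air, tune
from ray.tune.syncer import SyncConfig

# Sketch: turn the sync process off entirely for purely local runs.
# Assumes syncer=None disables syncing in this Ray version.
tuner = tune.Tuner(
    train_fn,
    run_config=air.RunConfig(
        name="CustomEnv on LSTM",
        sync_config=SyncConfig(syncer=None),
    ),
)
tuner.fit()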

For now, I can confirm that downgrading to 2.3 works.

Looking forward to the next release, as I could not get it to work with storage_path="./" either.

I have the same problem in ray 2.6.1 on Windows.