modin: TEST: Flaky test: TimeoutError from Ray not starting up on Windows

Here is a sample failure from modin/pandas/test/dataframe/test_udf.py, run with the Ray engine on Windows. Near the end of the trace we get "Exception: The current node has not been updated within 30 seconds, this could happen because of some of the Ray processes failed to startup." This might be another reason to keep one Ray cluster running across multiple tests instead of starting a new one in each test, as @devin-petersohn suggested here.
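One way to share a cluster across tests would be a session-scoped pytest fixture that starts Ray once and shuts it down after the last test. This is only a sketch of that idea; the fixture name, its placement in conftest.py, and the num_cpus value are assumptions, not the project's actual setup:

```python
# conftest.py (hypothetical) -- start one Ray cluster for the whole
# test session instead of letting each test call ray.init() itself.
import pytest
import ray


@pytest.fixture(scope="session", autouse=True)
def shared_ray_cluster():
    # Runs once, before the first test in the session.
    ray.init(num_cpus=4)
    yield
    # Runs once, after the last test in the session.
    ray.shutdown()
```

With autouse=True every test in the session transparently reuses the same cluster, so the 30-second node-startup timeout can only be hit once per run rather than once per test.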

Stack trace:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
modin\pandas\test\utils.py:870: in create_test_dfs
    post_fn, [pd.DataFrame(*args, **kwargs), pandas.DataFrame(*args, **kwargs)]
modin\logging\logger_metaclass.py:68: in log_wrap
    return method(*args, **kwargs)
modin\pandas\dataframe.py:114: in __init__
    Engine.subscribe(_update_engine)
modin\config\pubsub.py:213: in subscribe
    callback(cls)
modin\pandas\__init__.py:124: in _update_engine
    initialize_ray()
modin\core\execution\ray\common\utils.py:210: in initialize_ray
    ray.init(**ray_init_kwargs)
C:\Miniconda\envs\modin\lib\site-packages\ray\_private\client_mode_hook.py:105: in wrapper
    return func(*args, **kwargs)
C:\Miniconda\envs\modin\lib\site-packages\ray\worker.py:1022: in init
    _global_node = ray.node.Node(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <ray.node.Node object at 0x00000298FA890FA0>
ray_params = <ray._private.parameter.RayParams object at 0x00000298F96996D0>
head = True, shutdown_at_exit = False, spawn_reaper = True, connect_only = False
    def __init__(
        self,
        ray_params,
        head=False,
        shutdown_at_exit=True,
        spawn_reaper=True,
        connect_only=False,
    ):
        """Start a node.
        Args:
            ray_params (ray.params.RayParams): The parameters to use to
                configure the node.
            head (bool): True if this is the head node, which means it will
                start additional processes like the Redis servers, monitor
                processes, and web UI.
            shutdown_at_exit (bool): If true, spawned processes will be cleaned
                up if this process exits normally.
            spawn_reaper (bool): If true, spawns a process that will clean up
                other spawned processes if this process dies unexpectedly.
            connect_only (bool): If true, connect to the node without starting
                new processes.
        """
        if shutdown_at_exit:
            if connect_only:
                raise ValueError(
                    "'shutdown_at_exit' and 'connect_only' cannot both be true."
                )
            self._register_shutdown_hooks()
        self.head = head
        self.kernel_fate_share = bool(
            spawn_reaper and ray._private.utils.detect_fate_sharing_support()
        )
        self.all_processes = {}
        self.removal_lock = threading.Lock()
        # Set up external Redis when `RAY_REDIS_ADDRESS` is specified.
        redis_address_env = os.environ.get("RAY_REDIS_ADDRESS")
        if ray_params.external_addresses is None and redis_address_env is not None:
            external_redis = redis_address_env.split(",")
            # Reuse primary Redis as Redis shard when there's only one
            # instance provided.
            if len(external_redis) == 1:
                external_redis.append(external_redis[0])
            [primary_redis_ip, port] = external_redis[0].split(":")
            ray._private.services.wait_for_redis_to_start(
                primary_redis_ip, port, ***
            )
            ray_params.external_addresses = external_redis
            ray_params.num_redis_shards = len(external_redis) - 1
        # Try to get node IP address with the parameters.
        if ray_params.node_ip_address:
            node_ip_address = ray_params.node_ip_address
        elif ray_params.redis_address:
            node_ip_address = ray.util.get_node_ip_address(ray_params.redis_address)
        else:
            node_ip_address = ray.util.get_node_ip_address()
        self._node_ip_address = node_ip_address
        if ray_params.raylet_ip_address:
            raylet_ip_address = ray_params.raylet_ip_address
        else:
            raylet_ip_address = node_ip_address
        if raylet_ip_address != node_ip_address and (not connect_only or head):
            raise ValueError(
                "The raylet IP address should only be different than the node "
                "IP address when connecting to an existing raylet; i.e., when "
                "head=False and connect_only=True."
            )
        if (
            ray_params._system_config
            and len(ray_params._system_config) > 0
            and (not head and not connect_only)
        ):
            raise ValueError(
                "System config parameters can only be set on the head node."
            )
        self._raylet_ip_address = raylet_ip_address
        ray_params.update_if_absent(
            include_log_monitor=True,
            resources={},
            temp_dir=ray._private.utils.get_ray_temp_dir(),
            worker_path=os.path.join(
                os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py"
            ),
            setup_worker_path=os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                f"workers/{ray_constants.SETUP_WORKER_FILENAME}",
            ),
        )
        self._resource_spec = None
        self._localhost = socket.gethostbyname("localhost")
        self._ray_params = ray_params
        self._config = ray_params._system_config or {}
        # Configure log rotation parameters.
        self.max_bytes = int(
            os.getenv("RAY_ROTATION_MAX_BYTES", ray_constants.LOGGING_ROTATE_BYTES)
        )
        self.backup_count = int(
            os.getenv(
                "RAY_ROTATION_BACKUP_COUNT", ray_constants.LOGGING_ROTATE_BACKUP_COUNT
            )
        )
        assert self.max_bytes >= 0
        assert self.backup_count >= 0
        self._redis_address = ray_params.redis_address
        if head:
            ray_params.update_if_absent(num_redis_shards=1)
        self._gcs_address = ray_params.gcs_address
        self._gcs_client = None
        if not self.head:
            self.validate_ip_port(self.address)
            self.get_gcs_client()
        # Register the temp dir.
        if head:
            # date including microsecond
            date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
            self.session_name = f"session_{date_str}_{os.getpid()}"
        else:
            session_name = ray._private.utils.internal_kv_get_with_retry(
                self.get_gcs_client(),
                "session_name",
                ray_constants.KV_NAMESPACE_SESSION,
                num_retries=NUM_REDIS_GET_RETRIES,
            )
            self.session_name = ray._private.utils.decode(session_name)
            # setup gcs client
            self.get_gcs_client()
        # Initialize webui url
        if head:
            self._webui_url = None
        else:
            self._webui_url = ray._private.services.get_webui_url_from_internal_kv()
        self._init_temp()
        # Validate and initialize the persistent storage API.
        storage._init_storage(ray_params.storage, is_head=head)
        # If it is a head node, try validating if
        # external storage is configurable.
        if head:
            self.validate_external_storage()
        if connect_only:
            # Get socket names from the configuration.
            self._plasma_store_socket_name = ray_params.plasma_store_socket_name
            self._raylet_socket_name = ray_params.raylet_socket_name
            # If user does not provide the socket name, get it from Redis.
            if (
                self._plasma_store_socket_name is None
                or self._raylet_socket_name is None
                or self._ray_params.node_manager_port is None
            ):
                # Get the address info of the processes to connect to
                # from Redis or GCS.
                node_info = ray._private.services.get_node_to_connect_for_driver(
                    self.redis_address,
                    self.gcs_address,
                    self._raylet_ip_address,
                    redis_password=self.redis_password,
                )
                self._plasma_store_socket_name = node_info.object_store_socket_name
                self._raylet_socket_name = node_info.raylet_socket_name
                self._ray_params.node_manager_port = node_info.node_manager_port
        else:
            # If the user specified a socket name, use it.
            self._plasma_store_socket_name = self._prepare_socket_file(
                self._ray_params.plasma_store_socket_name, default_prefix="plasma_store"
            )
            self._raylet_socket_name = self._prepare_socket_file(
                self._ray_params.raylet_socket_name, default_prefix="raylet"
            )
        self.metrics_agent_port = self._get_cached_port(
            "metrics_agent_port", default_port=ray_params.metrics_agent_port
        )
        self._metrics_export_port = self._get_cached_port(
            "metrics_export_port", default_port=ray_params.metrics_export_port
        )
        ray_params.update_if_absent(
            metrics_agent_port=self.metrics_agent_port,
            metrics_export_port=self._metrics_export_port,
        )
        # Pick a GCS server port.
        if head:
            gcs_server_port = os.getenv(ray_constants.GCS_PORT_ENVIRONMENT_VARIABLE)
            if gcs_server_port:
                ray_params.update_if_absent(gcs_server_port=gcs_server_port)
            if ray_params.gcs_server_port is None or ray_params.gcs_server_port == 0:
                ray_params.gcs_server_port = self._get_cached_port("gcs_server_port")
        if not connect_only and spawn_reaper and not self.kernel_fate_share:
            self.start_reaper_process()
        if not connect_only:
            self._ray_params.update_pre_selected_port()
        # Start processes.
        if head:
            self.start_head_processes()
            # Make sure GCS is up.
            self.get_gcs_client().internal_kv_put(
                b"session_name",
                self.session_name.encode(),
                True,
                ray_constants.KV_NAMESPACE_SESSION,
            )
            self.get_gcs_client().internal_kv_put(
                b"session_dir",
                self._session_dir.encode(),
                True,
                ray_constants.KV_NAMESPACE_SESSION,
            )
            self.get_gcs_client().internal_kv_put(
                b"temp_dir",
                self._temp_dir.encode(),
                True,
                ray_constants.KV_NAMESPACE_SESSION,
            )
            # Add tracing_startup_hook to redis / internal kv manually
            # since internal kv is not yet initialized.
            if ray_params.tracing_startup_hook:
                self.get_gcs_client().internal_kv_put(
                    b"tracing_startup_hook",
                    ray_params.tracing_startup_hook.encode(),
                    True,
                    ray_constants.KV_NAMESPACE_TRACING,
                )
        if not connect_only:
            self.start_ray_processes()
            # we should update the address info after the node has been started
            try:
                ray._private.services.wait_for_node(
                    self.redis_address,
                    self.gcs_address,
                    self._plasma_store_socket_name,
                    self.redis_password,
                )
            except TimeoutError:
>               raise Exception(
                    "The current node has not been updated within 30 "
                    "seconds, this could happen because of some of "
                    "the Ray processes failed to startup."
                )
E               Exception: The current node has not been updated within 30 seconds, this could happen because of some of the Ray processes failed to startup.
C:\Miniconda\envs\modin\lib\site-packages\ray\node.py:311: Exception
---------- coverage: platform win32, python 3.8.13-final-0 -----------
Coverage XML written to file coverage.xml
=========================== short test summary info ===========================
FAILED modin/pandas/test/dataframe/test_udf.py::test_agg_dict - Exception: Th...
===== 1 failed, 225 passed, 8 xfailed, 504 warnings in 111.10s (0:01:51) ======

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 25

Most upvoted comments

@pyrito I just got another one of these failures on test-windows (not test-compat-win, the one you modified). Should we switch to 1 pytest thread on test-windows, too? https://github.com/modin-project/modin/actions/runs/3141086969/jobs/5103182790

I wonder if this would improve if one tried any of the following: a) remove parallel test runs by dropping the -n 2 option of pytest; b) limit the number of CPUs used by each instance by setting MODIN_CPUS=1; c) increase the number of Ray workers beyond "1 for each core".

Right now, if you run a single Ray server for multiple test instances, they compete with each other for the available workers, so the tests face contention instead of being sped up by parallelism.
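Option (b) can be illustrated with a short snippet. MODIN_CPUS is the environment variable Modin reads to decide how many CPUs its engine may claim; the value "1" and the placement are illustrative:

```python
# Cap the CPUs each Modin/Ray instance claims, so parallel pytest
# workers (e.g. pytest -n 2) do not oversubscribe the machine.
import os

# Modin reads this when it initializes its engine, so it must be set
# before the first Modin DataFrame is created in the process.
os.environ["MODIN_CPUS"] = "1"
```

Equivalently, the variable can be exported in the CI job's environment before pytest starts, which avoids ordering concerns inside the test code.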

Since adding a short sleep after shutdown, I have not seen the same errors. Maybe worth a try.
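That workaround could be wrapped in a small helper, sketched here in a generic form. The helper name, the delay value, and taking the shutdown function as a parameter are all illustrative; in the tests it would wrap ray.shutdown:

```python
import time


def shutdown_with_grace(shutdown_fn, delay_seconds=0.5):
    """Call an engine's shutdown function, then pause briefly so its
    worker processes and sockets can be fully released before the next
    startup attempt (e.g. the next ray.init())."""
    shutdown_fn()
    time.sleep(delay_seconds)


# In the Modin tests this would be used as:
#     shutdown_with_grace(ray.shutdown)
```

The delay papers over the race between process teardown and the next cluster start; it does not fix the underlying startup timeout, so it is a mitigation rather than a cure.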

Here is a discussion of another case where CreateFileMapping returns a 1450 error. The chain of calls comes from fake_mmap in object_manager/plasma, and there seem to be more options at play on non-Windows platforms.