TEST: Flaky test: TimeoutError from Ray not starting up on Windows
Here is a sample failure from modin/pandas/test/dataframe/test_udf.py when running on the Ray engine on Windows. Near the end of the error we get: `Exception: The current node has not been updated within 30 seconds, this could happen because of some of the Ray processes failed to startup.` This might be another reason to have a Ray cluster running across multiple tests instead of starting a new one in each test, as @devin-petersohn suggested here.
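For reference, a shared cluster could be set up once per pytest session with a session-scoped fixture. This is only a minimal sketch of the idea (the fixture name and `num_cpus` value are hypothetical, not Modin's actual test setup):

```python
# conftest.py -- hypothetical sketch: start Ray once per pytest session
# instead of paying the startup cost (and flakiness) in every test.
import pytest
import ray

@pytest.fixture(scope="session", autouse=True)
def shared_ray_cluster():
    ray.init(num_cpus=4, include_dashboard=False)  # one cluster for all tests
    yield
    ray.shutdown()  # torn down once, after the whole session
```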
Stack trace:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
modin\pandas\test\utils.py:870: in create_test_dfs
post_fn, [pd.DataFrame(*args, **kwargs), pandas.DataFrame(*args, **kwargs)]
modin\logging\logger_metaclass.py:68: in log_wrap
return method(*args, **kwargs)
modin\pandas\dataframe.py:114: in __init__
Engine.subscribe(_update_engine)
modin\config\pubsub.py:213: in subscribe
callback(cls)
modin\pandas\__init__.py:124: in _update_engine
initialize_ray()
modin\core\execution\ray\common\utils.py:210: in initialize_ray
ray.init(**ray_init_kwargs)
C:\Miniconda\envs\modin\lib\site-packages\ray\_private\client_mode_hook.py:105: in wrapper
return func(*args, **kwargs)
C:\Miniconda\envs\modin\lib\site-packages\ray\worker.py:1022: in init
_global_node = ray.node.Node(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <ray.node.Node object at 0x00000298FA890FA0>
ray_params = <ray._private.parameter.RayParams object at 0x00000298F96996D0>
head = True, shutdown_at_exit = False, spawn_reaper = True, connect_only = False
def __init__(
self,
ray_params,
head=False,
shutdown_at_exit=True,
spawn_reaper=True,
connect_only=False,
):
"""Start a node.
Args:
ray_params (ray.params.RayParams): The parameters to use to
configure the node.
head (bool): True if this is the head node, which means it will
start additional processes like the Redis servers, monitor
processes, and web UI.
shutdown_at_exit (bool): If true, spawned processes will be cleaned
up if this process exits normally.
spawn_reaper (bool): If true, spawns a process that will clean up
other spawned processes if this process dies unexpectedly.
connect_only (bool): If true, connect to the node without starting
new processes.
"""
if shutdown_at_exit:
if connect_only:
raise ValueError(
"'shutdown_at_exit' and 'connect_only' cannot both be true."
)
self._register_shutdown_hooks()
self.head = head
self.kernel_fate_share = bool(
spawn_reaper and ray._private.utils.detect_fate_sharing_support()
)
self.all_processes = {}
self.removal_lock = threading.Lock()
# Set up external Redis when `RAY_REDIS_ADDRESS` is specified.
redis_address_env = os.environ.get("RAY_REDIS_ADDRESS")
if ray_params.external_addresses is None and redis_address_env is not None:
external_redis = redis_address_env.split(",")
# Reuse primary Redis as Redis shard when there's only one
# instance provided.
if len(external_redis) == 1:
external_redis.append(external_redis[0])
[primary_redis_ip, port] = external_redis[0].split(":")
ray._private.services.wait_for_redis_to_start(
primary_redis_ip, port, ***
)
ray_params.external_addresses = external_redis
ray_params.num_redis_shards = len(external_redis) - 1
# Try to get node IP address with the parameters.
if ray_params.node_ip_address:
node_ip_address = ray_params.node_ip_address
elif ray_params.redis_address:
node_ip_address = ray.util.get_node_ip_address(ray_params.redis_address)
else:
node_ip_address = ray.util.get_node_ip_address()
self._node_ip_address = node_ip_address
if ray_params.raylet_ip_address:
raylet_ip_address = ray_params.raylet_ip_address
else:
raylet_ip_address = node_ip_address
if raylet_ip_address != node_ip_address and (not connect_only or head):
raise ValueError(
"The raylet IP address should only be different than the node "
"IP address when connecting to an existing raylet; i.e., when "
"head=False and connect_only=True."
)
if (
ray_params._system_config
and len(ray_params._system_config) > 0
and (not head and not connect_only)
):
raise ValueError(
"System config parameters can only be set on the head node."
)
self._raylet_ip_address = raylet_ip_address
ray_params.update_if_absent(
include_log_monitor=True,
resources={},
temp_dir=ray._private.utils.get_ray_temp_dir(),
worker_path=os.path.join(
os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py"
),
setup_worker_path=os.path.join(
os.path.dirname(os.path.abspath(__file__)),
f"workers/{ray_constants.SETUP_WORKER_FILENAME}",
),
)
self._resource_spec = None
self._localhost = socket.gethostbyname("localhost")
self._ray_params = ray_params
self._config = ray_params._system_config or {}
# Configure log rotation parameters.
self.max_bytes = int(
os.getenv("RAY_ROTATION_MAX_BYTES", ray_constants.LOGGING_ROTATE_BYTES)
)
self.backup_count = int(
os.getenv(
"RAY_ROTATION_BACKUP_COUNT", ray_constants.LOGGING_ROTATE_BACKUP_COUNT
)
)
assert self.max_bytes >= 0
assert self.backup_count >= 0
self._redis_address = ray_params.redis_address
if head:
ray_params.update_if_absent(num_redis_shards=1)
self._gcs_address = ray_params.gcs_address
self._gcs_client = None
if not self.head:
self.validate_ip_port(self.address)
self.get_gcs_client()
# Register the temp dir.
if head:
# date including microsecond
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S_%f")
self.session_name = f"session_{date_str}_{os.getpid()}"
else:
session_name = ray._private.utils.internal_kv_get_with_retry(
self.get_gcs_client(),
"session_name",
ray_constants.KV_NAMESPACE_SESSION,
num_retries=NUM_REDIS_GET_RETRIES,
)
self.session_name = ray._private.utils.decode(session_name)
# setup gcs client
self.get_gcs_client()
# Initialize webui url
if head:
self._webui_url = None
else:
self._webui_url = ray._private.services.get_webui_url_from_internal_kv()
self._init_temp()
# Validate and initialize the persistent storage API.
storage._init_storage(ray_params.storage, is_head=head)
# If it is a head node, try validating if
# external storage is configurable.
if head:
self.validate_external_storage()
if connect_only:
# Get socket names from the configuration.
self._plasma_store_socket_name = ray_params.plasma_store_socket_name
self._raylet_socket_name = ray_params.raylet_socket_name
# If user does not provide the socket name, get it from Redis.
if (
self._plasma_store_socket_name is None
or self._raylet_socket_name is None
or self._ray_params.node_manager_port is None
):
# Get the address info of the processes to connect to
# from Redis or GCS.
node_info = ray._private.services.get_node_to_connect_for_driver(
self.redis_address,
self.gcs_address,
self._raylet_ip_address,
redis_password=self.redis_password,
)
self._plasma_store_socket_name = node_info.object_store_socket_name
self._raylet_socket_name = node_info.raylet_socket_name
self._ray_params.node_manager_port = node_info.node_manager_port
else:
# If the user specified a socket name, use it.
self._plasma_store_socket_name = self._prepare_socket_file(
self._ray_params.plasma_store_socket_name, default_prefix="plasma_store"
)
self._raylet_socket_name = self._prepare_socket_file(
self._ray_params.raylet_socket_name, default_prefix="raylet"
)
self.metrics_agent_port = self._get_cached_port(
"metrics_agent_port", default_port=ray_params.metrics_agent_port
)
self._metrics_export_port = self._get_cached_port(
"metrics_export_port", default_port=ray_params.metrics_export_port
)
ray_params.update_if_absent(
metrics_agent_port=self.metrics_agent_port,
metrics_export_port=self._metrics_export_port,
)
# Pick a GCS server port.
if head:
gcs_server_port = os.getenv(ray_constants.GCS_PORT_ENVIRONMENT_VARIABLE)
if gcs_server_port:
ray_params.update_if_absent(gcs_server_port=gcs_server_port)
if ray_params.gcs_server_port is None or ray_params.gcs_server_port == 0:
ray_params.gcs_server_port = self._get_cached_port("gcs_server_port")
if not connect_only and spawn_reaper and not self.kernel_fate_share:
self.start_reaper_process()
if not connect_only:
self._ray_params.update_pre_selected_port()
# Start processes.
if head:
self.start_head_processes()
# Make sure GCS is up.
self.get_gcs_client().internal_kv_put(
b"session_name",
self.session_name.encode(),
True,
ray_constants.KV_NAMESPACE_SESSION,
)
self.get_gcs_client().internal_kv_put(
b"session_dir",
self._session_dir.encode(),
True,
ray_constants.KV_NAMESPACE_SESSION,
)
self.get_gcs_client().internal_kv_put(
b"temp_dir",
self._temp_dir.encode(),
True,
ray_constants.KV_NAMESPACE_SESSION,
)
# Add tracing_startup_hook to redis / internal kv manually
# since internal kv is not yet initialized.
if ray_params.tracing_startup_hook:
self.get_gcs_client().internal_kv_put(
b"tracing_startup_hook",
ray_params.tracing_startup_hook.encode(),
True,
ray_constants.KV_NAMESPACE_TRACING,
)
if not connect_only:
self.start_ray_processes()
# we should update the address info after the node has been started
try:
ray._private.services.wait_for_node(
self.redis_address,
self.gcs_address,
self._plasma_store_socket_name,
self.redis_password,
)
except TimeoutError:
> raise Exception(
"The current node has not been updated within 30 "
"seconds, this could happen because of some of "
"the Ray processes failed to startup."
)
E Exception: The current node has not been updated within 30 seconds, this could happen because of some of the Ray processes failed to startup.
C:\Miniconda\envs\modin\lib\site-packages\ray\node.py:311: Exception
---------- coverage: platform win32, python 3.8.13-final-0 -----------
Coverage XML written to file coverage.xml
=========================== short test summary info ===========================
FAILED modin/pandas/test/dataframe/test_udf.py::test_agg_dict - Exception: Th...
===== 1 failed, 225 passed, 8 xfailed, 504 warnings in 111.10s (0:01:51) ======
About this issue
- State: closed
- Created 2 years ago
- Comments: 25
Commits related to this issue
- TEST-#4562: Use local Ray cluster in CI to resolve flaky test-compat-win Signed-off-by: Karthik Velayutham <vkarthik@ponder.io> — committed to pyrito/modin by deleted user 2 years ago
- TEST-#4562: Use local Ray cluster in CI to resolve flaky `test-compat-win` (#5007) Signed-off-by: Karthik Velayutham <vkarthik@ponder.io> — committed to modin-project/modin by deleted user 2 years ago
- TEST-#4562: Keep trying to start test-windows ray cluster until we succeed. Signed-off-by: mvashishtha <mahesh@ponder.io> — committed to mvashishtha/modin by deleted user 2 years ago
- TEST-#4562: In windows CI, try to start ray a few times (#5101) Signed-off-by: mvashishtha <mahesh@ponder.io> — committed to modin-project/modin by mvashishtha 2 years ago
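The retry fix in the last two commits amounts to attempting Ray startup several times before giving up. A hypothetical sketch of such a retry loop (the flags, attempt count, and delay are illustrative, not the exact CI step):

```python
# Hypothetical sketch: keep trying to start a local Ray head node,
# mirroring the "try to start ray a few times" approach from the CI fix.
import subprocess
import time

def start_ray_head(max_attempts=5, delay_s=5.0):
    for attempt in range(1, max_attempts + 1):
        result = subprocess.run(["ray", "start", "--head"],
                                capture_output=True, text=True)
        if result.returncode == 0:
            return  # cluster is up; tests can connect to it
        print(f"Attempt {attempt}/{max_attempts} failed: {result.stderr.strip()}")
        subprocess.run(["ray", "stop", "--force"])  # clean up partial state
        time.sleep(delay_s)
    raise RuntimeError("Ray head node failed to start after all retries")
```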
@pyrito I just got another one of these failures on test-windows (not test-compat-win, the one you modified). Should we switch to 1 pytest thread on test-windows, too? https://github.com/modin-project/modin/actions/runs/3141086969/jobs/5103182790
I wonder if that would improve if one tried any of the following:
a) remove parallel test runs by dropping the `-n 2` option of pytest;
b) limit the number of CPUs used by each instance by setting `MODIN_CPUS=1`;
c) increase the number of Ray workers beyond "1 for each core".

Right now, if you run a single Ray server for multiple instances, they start to compete with each other for the available workers, and tests face contention instead of being sped up by parallelism.
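Option (b) can be set either through the environment variable or through Modin's config API; a small sketch, assuming the standard `modin.config` interface:

```python
# Hypothetical sketch of option (b): cap Modin at one CPU per test process
# so two parallel pytest workers don't oversubscribe the runner.
import modin.config as cfg

cfg.CpuCount.put(1)  # same effect as exporting MODIN_CPUS=1 beforehand
```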
Since adding a short `sleep` after `shutdown` I have not seen the same errors. Maybe worth a try.

Here is a discussion of another case where `CreateFileMapping` returns a 1450 error. The chain of calls comes from `fake_mmap` in `object_manager/plasma`, and there seem to be more options at play on non-Windows.
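A sketch of what the sleep-after-shutdown workaround could look like in a test teardown (the one-second pause is an arbitrary illustrative value):

```python
# Hypothetical sketch: pause briefly after ray.shutdown() so Windows can
# release sockets and plasma memory mappings before the next ray.init().
import time
import ray

def teardown_ray(pause_s=1.0):
    if ray.is_initialized():
        ray.shutdown()
        time.sleep(pause_s)
```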