ray: [streaming] Cannot run streaming example and tests

What is the problem?

Ray version and other system information (Python version, TensorFlow version, OS):
  • Ray: ray-1.1.0.dev0 (master branch 35f7d84dbe0c4c2bfe8117727eeda5256fe2c3a2)
  • Python: 3.8.5 with conda
  • Java: 11.0.8 with Alibaba Dragonwell
  • OS: Ubuntu 20.04 with WSL2
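
The same details can be collected with a short standard-library script. This is a sketch, not part of Ray: `collect_system_info` is a hypothetical helper. The Ray probe is wrapped in a try/except, and `ray.__commit__` is read with a fallback in case a build lacks it. Java is included because the streaming context in this report appears to be created through a Java-side gateway (an inference from the traceback, not something the log states).

```python
import platform
import shutil
import subprocess


def collect_system_info():
    """Gather the version details requested by the issue template.

    Hypothetical helper: every probe degrades gracefully, so it also runs
    on machines without Ray or Java installed.
    """
    info = {
        "python": platform.python_version(),
        "os": platform.platform(),
    }
    try:
        import ray  # optional: only present in a Ray environment
        info["ray"] = ray.__version__
        # Source builds usually record a commit hash; fall back if absent.
        info["ray_commit"] = getattr(ray, "__commit__", "unknown")
    except ImportError:
        info["ray"] = "not installed"
    java = shutil.which("java")
    if java is None:
        # Assumption: Ray streaming needs a JVM here (see the gateway in the traceback).
        info["java"] = "not found on PATH"
    else:
        # `java -version` writes its banner to stderr, not stdout.
        proc = subprocess.run([java, "-version"], capture_output=True, text=True)
        lines = proc.stderr.splitlines()
        info["java"] = lines[0] if lines else "unknown"
    return info


if __name__ == "__main__":
    for key, value in collect_system_info().items():
        print(f"{key}: {value}")
```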

Reproduction (REQUIRED)

I tried to use the Ray streaming API and wrote an example following test_word_count.py:

import ray
from ray.streaming import StreamingContext
from ray.streaming.config import Config
import time
import logging


logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    # Make sure no Ray instance from a previous run is still connected
    ray.shutdown()
    # The cross-language (Python/Java) feature needs code loaded from local
    ray.init(_load_code_from_local=True)

    # Build a Ray streaming context that uses the native channel
    ctx = StreamingContext.Builder() \
        .option(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) \
        .build()
    ctx.set_parallelism(1)  # Each operator will be executed by one actor
    ctx.from_values("a", "b", "c") \
        .set_parallelism(1) \
        .flat_map(lambda x: [x, x]) \
        .map(lambda x: (x, 1)) \
        .key_by(lambda x: x[0]) \
        .reduce(lambda old_value, new_value:
                (old_value[0], old_value[1] + new_value[1])) \
        .sink(lambda x: print("result", x))
    ctx.submit("word_count")
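
As a reference point for what this job should print, the pipeline can be simulated in plain Python without Ray. This is only a model: `simulate_word_count` is hypothetical, and it assumes `reduce` is a running keyed reduce that emits the updated aggregate for every incoming record. With one actor per operator the real sink would presumably see the same sequence, but that is not verified here.

```python
from itertools import chain


def simulate_word_count(values):
    """Pure-Python model of the streaming pipeline above (no Ray, one process).

    Assumes a running keyed reduce: each incoming record updates the state
    for its key and the updated value is emitted immediately.
    """
    # flat_map(lambda x: [x, x]): every word is emitted twice
    flat = chain.from_iterable([x, x] for x in values)
    # map(lambda x: (x, 1))
    pairs = ((x, 1) for x in flat)
    state = {}    # key_by(lambda x: x[0]) groups records by the word
    emitted = []  # the records .sink(...) would receive, in order
    for record in pairs:
        key = record[0]
        if key in state:
            old_value = state[key]
            # reduce(lambda old_value, new_value:
            #        (old_value[0], old_value[1] + new_value[1]))
            state[key] = (old_value[0], old_value[1] + record[1])
        else:
            state[key] = record  # the first record for a key passes through
        emitted.append(state[key])
    return emitted


if __name__ == "__main__":
    for result in simulate_word_count(["a", "b", "c"]):
        print("result", result)
```

Run as a script, the simulation prints `result ('a', 1)`, `result ('a', 2)`, and so on up to `result ('c', 2)`.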

and got the error below (home path replaced with ***):

2020-12-14 20:19:34,087 INFO services.py:1208 -- View the Ray dashboard at http://127.0.0.1:8265
(pid=raylet) [2020-12-14 20:19:35,480 C 24239 24239] worker_pool.cc:917:  Check failed: state != states_by_lang_.end() Required Language isn't supported.
(pid=raylet) [2020-12-14 20:19:35,480 E 24239 24239] logging.cc:414: *** Aborted at 1607948375 (unix time) try "date -d @1607948375" if you are using GNU date ***
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414: PC: @                0x0 (unknown)
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414: *** SIGABRT (@0x3e800005eaf) received by PID 24239 (TID 0x7f42d449a800) from PID 24239; stack trace: ***
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x562c88db7537 google::(anonymous namespace)::FailureSignalHandler()
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x7f42d46cc3c0 (unknown)
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x7f42d44e518b gsignal
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x7f42d44c4859 abort
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x562c8887ca53 _ZN3ray6RayLogD2Ev.cold
(pid=raylet) [2020-12-14 20:19:35,481 E 24239 24239] logging.cc:414:     @     0x562c88902e9f ray::raylet::WorkerPool::GetStateForLanguage()
(pid=raylet) [2020-12-14 20:19:35,482 E 24239 24239] logging.cc:414:     @     0x562c88909b77 ray::raylet::WorkerPool::PopWorker()
(pid=raylet) [2020-12-14 20:19:35,482 E 24239 24239] logging.cc:414:     @     0x562c8896179d ray::raylet::NodeManager::DispatchTasks()
(pid=raylet) [2020-12-14 20:19:35,483 E 24239 24239] logging.cc:414:     @     0x562c88963053 ray::raylet::NodeManager::EnqueuePlaceableTask()
(pid=raylet) [2020-12-14 20:19:35,483 E 24239 24239] logging.cc:414:     @     0x562c8896c1c7 ray::raylet::NodeManager::ScheduleTasks()
(pid=raylet) [2020-12-14 20:19:35,484 E 24239 24239] logging.cc:414:     @     0x562c88977fce ray::raylet::NodeManager::SubmitTask()
(pid=raylet) [2020-12-14 20:19:35,484 E 24239 24239] logging.cc:414:     @     0x562c889793b7 ray::raylet::NodeManager::HandleRequestWorkerLease()
(pid=raylet) [2020-12-14 20:19:35,485 E 24239 24239] logging.cc:414:     @     0x562c888d2a76 _ZN5boost4asio6detail18completion_handlerIZN3ray3rpc14ServerCallImplINS4_25NodeManagerServiceHandlerENS4_25RequestWorkerLeaseRequestENS4_23RequestWorkerLeaseReplyEE13HandleRequestEvEUlvE_E11do_completeEPvPNS1_19scheduler_operationERKNS_6system10error_codeEm
(pid=raylet) [2020-12-14 20:19:35,485 E 24239 24239] logging.cc:414:     @     0x562c890efb41 boost::asio::detail::scheduler::do_run_one()
(pid=raylet) [2020-12-14 20:19:35,485 E 24239 24239] logging.cc:414:     @     0x562c890f1281 boost::asio::detail::scheduler::run()
(pid=raylet) [2020-12-14 20:19:35,485 E 24239 24239] logging.cc:414:     @     0x562c890f38fb boost::asio::io_context::run()
(pid=raylet) [2020-12-14 20:19:35,486 E 24239 24239] logging.cc:414:     @     0x562c88898482 main
(pid=raylet) [2020-12-14 20:19:35,486 E 24239 24239] logging.cc:414:     @     0x7f42d44c60b3 __libc_start_main
(pid=raylet) [2020-12-14 20:19:35,486 E 24239 24239] logging.cc:414:     @     0x562c888adf1e _start
Error in sys.excepthook:
Traceback (most recent call last):
  File "***/ray/python/ray/worker.py", line 875, in custom_excepthook
    ray.state.state.add_worker(worker_id, worker_type, worker_info)
  File "***/ray/python/ray/state.py", line 733, in add_worker
    return self.global_state_accessor.add_worker_info(
AttributeError: 'NoneType' object has no attribute 'add_worker_info'

Original exception was:
Traceback (most recent call last):
  File "./test2.py", line 16, in <module>
2020-12-14 20:20:05,461 WARNING worker.py:1050 -- The node with node id 447728161d99bd4896a1a2c719adcc3930806be4 has been marked dead because the detector has missed too many heartbeats from it. This can happen when a raylet crashes unexpectedly or has lagging heartbeats.
    ctx = StreamingContext.Builder() \
  File "***/ray/python/ray/streaming/context.py", line 48, in build
    ctx = StreamingContext()
  File "***/ray/python/ray/streaming/context.py", line 54, in __init__
    self._j_ctx = self._gateway_client.create_streaming_context()
  File "***/ray/python/ray/streaming/runtime/gateway_client.py", line 21, in create_streaming_context
    return deserialize(ray.get(call))
  File "***/ray/python/ray/worker.py", line 1401, in get
    raise value
ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task.
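
A plausible reading of the raylet crash: `WorkerPool::GetStateForLanguage()` looks up the requested language in `states_by_lang_`, and the check fails when the raylet has no worker state for that language. Because the Python traceback shows the context being created through a gateway client (`self._j_ctx = self._gateway_client.create_streaming_context()`), the lease request that crashed the raylet was presumably for a Java actor; that is an inference, not something the log states. The toy model below is a hypothetical Python illustration of that failure mode, not Ray's code: `Language`, `ToyWorkerPool` and its methods are invented for the sketch, and where the real check aborts the whole raylet (the SIGABRT above), the toy raises an exception.

```python
from enum import Enum


class Language(Enum):
    PYTHON = "PYTHON"
    JAVA = "JAVA"


class ToyWorkerPool:
    """Hypothetical model of a per-language worker pool; not Ray's implementation."""

    def __init__(self, worker_commands):
        # Only languages with a non-empty start command get a state entry.
        # Assumption: a raylet started without a Java worker command ends up
        # with no JAVA entry, which is what the crash suggests.
        self.states_by_lang = {
            lang: {"command": cmd, "idle": []}
            for lang, cmd in worker_commands.items()
            if cmd
        }

    def get_state_for_language(self, language):
        state = self.states_by_lang.get(language)
        if state is None:
            # The real raylet aborts at this point;
            # the toy raises instead so the caller can observe the failure.
            raise RuntimeError("Required Language isn't supported.")
        return state

    def pop_worker(self, language):
        state = self.get_state_for_language(language)
        if state["idle"]:
            return state["idle"].pop()  # reuse an idle worker
        # Pretend to start a new worker with the configured command.
        return f"new {language.value} worker via {state['command']!r}"


if __name__ == "__main__":
    pool = ToyWorkerPool({Language.PYTHON: ["python", "worker.py"], Language.JAVA: []})
    print(pool.pop_worker(Language.PYTHON))
    try:
        pool.pop_worker(Language.JAVA)  # e.g. a lease request for a Java actor
    except RuntimeError as exc:
        print("raylet would abort:", exc)
```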

At first I used ray-1.0.0rc2, built from source, and got this error. While searching for a solution I found issue #11602, "StreamingContext Builder error from Ray Streaming Example", so I also tried ray-1.0.1.post1 and the master branch. Both give the same error.

There is another problem: I cannot run the streaming tests successfully by following readme.rst. The Java test HybridStreamTest.java does not pass; it fails at lines 48-49: Preconditions.checkArgument(EnvUtil.executeCommand(ImmutableList.of("ray", "stop"), 5)); The streaming pytest suite on the master branch also fails:
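
For context, that precondition runs `ray stop` and requires `EnvUtil.executeCommand(...)` to return true, with `5` presumably a timeout in seconds. The Python sketch below mirrors the check under the assumption that success means the command exits with status 0 before the timeout; `execute_command` is a hypothetical helper, not Ray's `EnvUtil`. It returns False when the executable cannot be launched, which is one plausible cause when the tests run in a shell where the conda environment that provides `ray` is not active.

```python
import subprocess
import sys


def execute_command(command, timeout_seconds):
    """Hypothetical Python analogue of the Java test's EnvUtil.executeCommand.

    Assumption: returns True only if the command exits with status 0
    within the timeout.
    """
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout_seconds,
        )
    except OSError:
        # Executable missing from PATH or not runnable
        # (e.g. `ray` from a conda env that is not activated).
        return False
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child process before re-raising.
        return False
    return completed.returncode == 0


if __name__ == "__main__":
    # Harmless self-check with the current interpreter instead of `ray stop`.
    print(execute_command([sys.executable, "-c", "pass"], 5))  # True
    print(execute_command([sys.executable, "-c", "raise SystemExit(1)"], 5))  # False
```

In the Java test's terms, the failing precondition corresponds to `execute_command(["ray", "stop"], 5)`.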

===================================================================================== test session starts =====================================================================================
platform linux -- Python 3.8.5, pytest-6.2.0, py-1.10.0, pluggy-0.13.1
rootdir: ***/ray/streaming/python/tests
collected 11 items / 2 errors / 9 selected                                                                                                                                                    

=========================================================================================== ERRORS ============================================================================================
__________________________________________________________________________ ERROR collecting test_direct_transfer.py ___________________________________________________________________________
ImportError while importing test module '***/ray/streaming/python/tests/test_direct_transfer.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/home/tiger/anaconda3/envs/py38-ray-master/lib/python3.8/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
test_direct_transfer.py:6: in <module>
    import ray.streaming._streaming as _streaming
E   ImportError: ***/ray/python/ray/streaming/_streaming.so: undefined symbol: aligned_free
______________________________________________________________________________ ERROR collecting test_operator.py ______________________________________________________________________________
ImportError while importing test module '***/ray/streaming/python/tests/test_operator.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/home/tiger/anaconda3/envs/py38-ray-master/lib/python3.8/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
test_operator.py:2: in <module>
    from ray.streaming import operator
../../../python/ray/streaming/operator.py:8: in <module>
    from ray.streaming.collector import Collector
../../../python/ray/streaming/collector.py:11: in <module>
    from ray.streaming.runtime.transfer import ChannelID, DataWriter
../../../python/ray/streaming/runtime/transfer.py:9: in <module>
    import ray.streaming._streaming as _streaming
E   ImportError: ***/ray/python/ray/streaming/_streaming.so: undefined symbol: aligned_free
=================================================================================== short test summary info ===================================================================================
ERROR test_direct_transfer.py
ERROR test_operator.py
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 2 errors during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
====================================================================================== 2 errors in 0.27s ======================================================================================
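
`undefined symbol: aligned_free` means the dynamic loader could not resolve that symbol when Python loaded `_streaming.so`. This usually points to a build or link mismatch between the extension and the libraries it was built against; that is an interpretation, not a confirmed diagnosis, and where `aligned_free` is expected to come from is not established here. The ctypes sketch below reproduces the load failure outside pytest and checks whether a library exports a given symbol. `try_load` and `exports_symbol` are hypothetical helpers.

```python
import ctypes
from typing import Optional, Tuple


def try_load(path: Optional[str]) -> Tuple[bool, str]:
    """dlopen a shared library and report the loader error, if any.

    ctypes resolves symbols at load time (RTLD_NOW), so a missing symbol
    surfaces as an OSError such as "...: undefined symbol: aligned_free".
    Passing None loads the running process itself (POSIX only).
    """
    try:
        ctypes.CDLL(path)
    except OSError as exc:
        return False, str(exc)
    return True, ""


def exports_symbol(path: Optional[str], symbol: str) -> bool:
    """Return True if the library at `path` (None = this process) exposes `symbol`."""
    try:
        lib = ctypes.CDLL(path)
    except OSError:
        return False
    # Attribute lookup on a CDLL raises AttributeError for unknown symbols.
    return hasattr(lib, symbol)
```

Usage, with a placeholder path: `try_load("path/to/ray/python/ray/streaming/_streaming.so")` should return `(False, ...)` with the same `undefined symbol` message if the build is still broken.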

With ray-1.0.1.post1 and ray-1.0.0rc2, the pytest run fails in the same way on both versions:

=================================================================================== short test summary info ===================================================================================
FAILED test_failover.py::test_word_count - ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task.
FAILED test_hybrid_stream.py::test_hybrid_stream - ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task.
FAILED test_stream.py::test_data_stream - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ra...
FAILED test_stream.py::test_key_data_stream - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling...
FAILED test_stream.py::test_stream_config - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling '...
FAILED test_union_stream.py::test_union_stream - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by call...
FAILED test_word_count.py::test_word_count - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling ...
FAILED test_word_count.py::test_simple_word_count - RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by c...
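
Reading the list above, the `RuntimeError` failures look like a knock-on effect (an interpretation, not confirmed): if an earlier test dies with `RayActorError` before it reaches `ray.shutdown()`, the next test's `ray.init()` finds a Ray instance still connected, which is also why the message suggests `ignore_reinit_error=True`. Below is a sketch of a context manager that starts each test from a clean session. `fresh_ray_session` is a hypothetical helper that relies only on `ray.is_initialized()`, `ray.shutdown()` and `ray.init()`; `ray_module` is injectable only so the helper can be exercised without Ray installed.

```python
import contextlib
import importlib


@contextlib.contextmanager
def fresh_ray_session(ray_module=None, **init_kwargs):
    """Start a clean Ray session and always shut it down afterwards.

    A leftover session from a test that crashed before ray.shutdown() is
    assumed to be what makes the next ray.init() raise
    "Maybe you called ray.init twice by accident?".
    """
    if ray_module is None:
        ray_module = importlib.import_module("ray")
    if ray_module.is_initialized():
        ray_module.shutdown()  # clear whatever a previous test left behind
    ray_module.init(**init_kwargs)
    try:
        yield ray_module
    finally:
        # Runs even if the test body raises (e.g. RayActorError).
        ray_module.shutdown()
```

In a test, wrapping the body in `with fresh_ray_session(_load_code_from_local=True):` keeps a failure in one test from cascading into the next.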

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 22 (17 by maintainers)

Most upvoted comments

I’m also seeing the same issues when trying to leverage cross-language calls from Python to Java as documented at: https://docs.ray.io/en/master/cross-language.html. If I make a simple local code change to force the “ValueError: Cross language feature needs --load-code-from-local to be set” check to pass after setting the code search path via job config in ray.init(…), then I run into the “worker_pool.cc:917: Check failed: state != states_by_lang_.end() Required Language isn’t supported.” error initially reported.