
[core][accelerated DAGs] Assertion check fails when driver exits during teardown #45061

Closed
stephanie-wang opened this issue Apr 30, 2024 · 0 comments · Fixed by #45286
Assignees: jackhumphries
Labels: accelerated-dag · bug (Something that is supposed to be working; but isn't) · core (Issues that should be addressed in Ray Core) · P1 (Issue that should be fixed within a few weeks)

stephanie-wang commented Apr 30, 2024

What happened + What you expected to happen

  1. Create and run a DAG for several iterations.
  2. During a DAG execution, one actor calls sys.exit().
  3. The driver's monitor thread calls dag.teardown() (a rough sketch of this pattern appears after the stack trace below).
  4. The driver exits -> assertion check failure on destruction:
[2024-04-30 12:03:23,220 C 50426 50426] experimental_mutable_object_manager.cc:169:  Check failed: (_left_ == _right_)  -1 vs 0
*** StackTrace Information ***
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(+0xfec7ba) [0x717b675ce7ba] ray::operator<<()
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(+0xfee077) [0x717b675d0077] ray::SpdLogMessage::Flush()
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(_ZN3ray6RayLogD1Ev+0x37) [0x717b675d0517] ray::RayLog::~RayLog()
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(_ZN3ray12experimental20MutableObjectManager17DestroySemaphoresERKNS_8ObjectIDE+0x517) [0x717b66d864d7] ray::experimental::MutableObjectManager::DestroySemaphores()
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(+0x7a49dc) [0x717b66d869dc] ray::experimental::MutableObjectManager::~MutableObjectManager()
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(+0x62901a) [0x717b66c0b01a] std::_Sp_counted_base<>::_M_release()
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(_ZN3ray4core10CoreWorkerD1Ev+0x441) [0x717b66d42381] ray::core::CoreWorker::~CoreWorker()
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(+0x62901a) [0x717b66c0b01a] std::_Sp_counted_base<>::_M_release()
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(_ZN3ray4core21CoreWorkerProcessImpl14ShutdownDriverEv+0x17c) [0x717b66d8106c] ray::core::CoreWorkerProcessImpl::ShutdownDriver()
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(_ZN3ray4core17CoreWorkerProcess8ShutdownEv+0x2b) [0x717b66d8116b] ray::core::CoreWorkerProcess::Shutdown()
/home/ray/anaconda3/lib/python3.9/site-packages/ray/_raylet.so(+0x5b4d34) [0x717b66b96d34] __pyx_pw_3ray_7_raylet_10CoreWorker_3shutdown_driver()
python() [0x4fc658] method_vectorcall_NOARGS
python(_PyEval_EvalFrameDefault+0x686) [0x4e7bf6] _PyEval_EvalFrameDefault
python() [0x4e666a] _PyEval_EvalCode
python(_PyFunction_Vectorcall+0xd5) [0x4f79a5] _PyFunction_Vectorcall
python(_PyEval_EvalFrameDefault+0x3764) [0x4eacd4] _PyEval_EvalFrameDefault
python() [0x4e666a] _PyEval_EvalCode
python(_PyFunction_Vectorcall+0xd5) [0x4f79a5] _PyFunction_Vectorcall
python() [0x5de3b6] atexit_callfuncs
python() [0x5c0a3e] call_py_exitfuncs
python(Py_FinalizeEx+0x4d) [0x5c069d] Py_FinalizeEx
python(Py_RunMain+0x110) [0x5b3910] Py_RunMain
python(Py_BytesMain+0x39) [0x587199] Py_BytesMain
/usr/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf3) [0x717c1c566083] __libc_start_main
python() [0x58704e]
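As a rough illustration of step 3 above (and not Ray's actual internal monitor), the driver-side pattern is a watchdog thread that polls the actors and tears the compiled DAG down once one of them dies. The ping method and the polling loop below are hypothetical; only compiled_dag.teardown() and the actor handle correspond to the reproduction script further down.

# Illustrative sketch only -- not Ray's internal monitor thread.
# Assumes the actor exposes a hypothetical no-op `ping` method.
import threading
import time

import ray
from ray.exceptions import GetTimeoutError, RayActorError


def watch_and_teardown(actor, compiled_dag, poll_s=1.0):
    def _monitor():
        while True:
            try:
                # Any method call on a dead actor raises RayActorError.
                ray.get(actor.ping.remote(), timeout=poll_s)
            except RayActorError:
                # The actor died mid-run: tear the compiled DAG down so its
                # channels are released before the driver exits.
                compiled_dag.teardown()
                return
            except GetTimeoutError:
                # The actor is just slow; keep polling.
                pass
            time.sleep(poll_s)

    t = threading.Thread(target=_monitor, daemon=True)
    t.start()
    return t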

Versions / Dependencies

3.0dev

Reproduction script

# coding: utf-8
import sys

import torch

import ray
from ray.air._internal import torch_utils
from ray.dag import InputNode
from ray.experimental.channel.torch_tensor_type import TorchTensorType


# ~100 MB tensor (50M float16 elements).
SHAPE = (50_000_000,)
DTYPE = torch.float16

@ray.remote
class TorchTensorWorker:
    def __init__(self, fail_after_num_sends=-1):
        self.device = torch_utils.get_devices()[0]
        self.t = None

        self.num_sends = 0
        self.fail_after_num_sends = fail_after_num_sends
        self.received = torch.zeros(10, device=self.device)
        self.received_idx = 0

    def send(self, shape, dtype, value: int):
        # Simulate an unexpected actor death partway through the run.
        if self.fail_after_num_sends > 0 and self.num_sends >= self.fail_after_num_sends:
            sys.exit(1)
        self.num_sends += 1

        if self.t is None:
            self.t = torch.ones(shape, dtype=dtype, device=self.device)
        self.t[0] = value
        return self.t

    def recv(self, tensor):
        if self.received_idx >= len(self.received):
            self.received = torch.cat((self.received, torch.zeros(len(self.received), device=self.device)))
        self.received[self.received_idx] = tensor[0]
        self.received_idx += 1

        assert tensor.device == self.device
        return (tensor[0].item(), tensor.shape, tensor.dtype)

    def get_received(self):
        return self.received.to(device="cpu")


def exec_ray_dag(sender, receiver):
    # Test torch.Tensor sent between actors.
    with InputNode() as inp:
        dag = sender.send.bind(SHAPE, DTYPE, inp)
        dag = dag.with_type_hint(TorchTensorType(SHAPE, DTYPE))
        dag = receiver.recv.bind(dag)

    compiled_dag = dag.experimental_compile(buffer_size_bytes=1_000_000_000)

    try:
        for i in range(100):
            output_channel = compiled_dag.execute(i)
            # TODO(swang): Replace with fake ObjectRef.
            result = output_channel.begin_read()
            print(result)
            assert result == (i, SHAPE, DTYPE)
            output_channel.end_read()
    except Exception:
        # The sender exits partway through, so the execution error is
        # swallowed here and teardown still runs below.
        pass
    finally:
        compiled_dag.teardown()

        print(ray.get(receiver.get_received.remote()))


if __name__ == "__main__":
    sender = TorchTensorWorker.remote(fail_after_num_sends=10)
    receiver = TorchTensorWorker.remote()
    exec_ray_dag(sender, receiver)

Issue Severity

None

stephanie-wang added the bug, triage, core, accelerated-dag, and P1 labels and removed the triage label on Apr 30, 2024
jackhumphries self-assigned this on Apr 30, 2024
stephanie-wang pushed a commit that referenced this issue May 14, 2024
This fixes #45061.

The core worker and the raylet each have their own MutableObjectManager
instance, and when both a reader and a writer are on the same machine,
the reader and writer will each register the same object with separate
MutableObjectManager instances. Thus, when the second instance is
destructed, it will call sem_unlink() on the same two semaphores. As the
two semaphores have already been unlinked by the first instance, the
sem_unlink() calls in the second instance will both fail with ENOENT.

Prior to this PR, we checked that *all* calls to sem_unlink() return 0,
which is incorrect.

---------

Signed-off-by: Jack Humphries <[email protected]>
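
To make the fix concrete, here is a minimal Python analogy (not the actual C++ change in MutableObjectManager::DestroySemaphores): two owners of the same named resource both try to unlink it during cleanup, and the second attempt's ENOENT is tolerated instead of being required to succeed. The temp file and helper name are stand-ins for the named POSIX semaphores.

# Python analogy of the fix; the real code calls sem_unlink() in C++.
import errno
import os
import tempfile


def destroy_named_resource(path):
    """Unlink `path`, tolerating the case where another owner already did."""
    try:
        os.unlink(path)
    except OSError as e:
        # ENOENT means the other manager instance already unlinked the
        # resource -- expected when a reader and a writer on the same machine
        # each registered the same object. Any other error is still fatal.
        if e.errno != errno.ENOENT:
            raise


# Both the core worker's and the raylet's manager registered the same object,
# so both run this cleanup on destruction; only the first unlink succeeds.
fd, path = tempfile.mkstemp()
os.close(fd)
destroy_named_resource(path)  # first instance: unlink succeeds
destroy_named_resource(path)  # second instance: ENOENT, now tolerated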