Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC][core] GcsClient async binding, aka remove PythonGcsClient. #45289

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from

Conversation

rynewang
Copy link
Contributor

@rynewang rynewang commented May 13, 2024

This PR is a proof of concept that, we can do bindings for Async C++ APIs. Specifically, we can wrap a C++ ray::gcs::GcsClient Async APIs (by callbacks) into Python Async APIs (by async/await).

Why?

On Python gRPC, we now have a very complicated and inconsistent way. We have:

  • Python grpcio based Client.
  • Python `grpcio.aio`` based Client and Servers,
  • Python GcsClient -> C++ PythonGcsClient -> C++ grpc::Channel,
  • Python GcsAioClient -> thread pool executor -> Python GcsClient -> C++ PythonGcsClient -> C++ grpc::Channel,
  • Python Gcs.*Subscriber (sync)
  • Python GcsAio.*Subscriber (async)

All of them talking to the GCS with more or less similar but subtly different APIs. This introduces maintenance overhead, makes debugging harder, and makes it harder to add new features.

Beyond Python, all these APIs are also having slightly different semantics than the C++ GcsClient itself as used by core_worker C++ code. For example,

  1. in _raylet.pyx we liberally added many _auto_reconnect to APIs. This applies to Python GcsClient and GcsAioClient, but not to C++ GcsClient or the Python subscribers. If we tweaked retry counts to "happen to work", it only works for the python code but not core worker code.
  2. in PythonGcsClient::Connect we retry several times, each time recreating a GcsChannel. This is supposed to "make reconnection" faster by not waiting in the grpc-internal backoff. But this is not applied in C++ GcsClient or the Python subscribers. In fact, in C++ GcsClient, we don't manage the channel at all. We use the Ray-wide GcsRpcClient to manage it. Indeed, if we wanna "fast recreate" channels, we may want it to be consistenly applied to all clients.
  3. in Python GcsClient, we have a method get_all_node_info that forwards the RPC. However we also have a self._gcs_node_info_stub.GetAllNodeInfo call in node_head.py, because they want the full reply whereas the Python GcsClient method only returns partial data the original caller wanted.
  4. ...and more.

What's blocking us?

Async. Cython is not known to be good at binding async APIs. We have a few challenges:

  1. We need to invoke Python functions in C++ callbacks. This involves a C++ callback class to hold a Python object with all its implications. Today's Ray C++ code base is largely CPython-free.
    1. Cython reference counting. In experimenting I accidentally decreased a PyObject's refcount to -9.
    2. GIL. We need to properly hold and release the locks, requires careful coding and testing.
  2. Event loops. In C++ (asio) loop we received the callback, but the python awaiter is waiting in the Python event loop. We need to play with asyncio futures and event loops.
  3. (Still risky) Types. C++ callbacks receive C++ Protobuf messages, and Python callbacks receive Python Protobuf messages or Python dicts. We can:
    1. Serialize the C++ Protobuf to string, and deserialize it in Python. This is the approach I chose in this PR.
    2. Bind all C++ protobuf types' fields as .pxd methods (readonly is fine.)

What's in this PR?

A simple "MyGcsClient" that wraps the C++ GcsClient. It has only 1 method to asynchronously return a "next job id". See this:

import ray
from ray._raylet import JobID

ray.init()

x = ray._raylet.my_gcs_client()
import asyncio
async def f():
    wrapper = x.get_next_job_id()
    fut = wrapper.future
    bs = await fut
    job_id = JobID(bs)
    print(job_id)
asyncio.run(f())

What's next?

In P2 (not urgent), we need to evaluate if this is something worth proceeding. We need to answer: (with my current answers)

Q: In endgame, what's benefit?
A: Removal of all bindings in ##Why? section above, with a single API consistent with C++ GcsClient. With a notable exception: we probably don't want to bind async servers (needs more experiment, risky).

Q: User visible?
A: No. This is a refactor.

Q: Risk?
A: Types and perf costs. I think the async game is derisked.

Q: Effort, large or small?
A: Large. The binding itself is OK-ish, but there are so many callsites to change.

Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
@rynewang
Copy link
Contributor Author

rynewang commented May 23, 2024

Status Report

I made it to a point that it can replace all of the existing GcsAioClient, and the internal_kv_* part of the GcsClient, with some incompatibilities.

Architecture

Gcs[Aio]Client -> Python MyGcsClient -> cython binding to C++ gcs::GcsClient -> invokes RPCs on ASIO event loops

On completion,

  • (async) C++ Callback -> python_callbacks.h converter -> python callback to send the py obj to asyncio thread -> post processing (e.g. exception raising)
  • (sync) C++ return value -> post processing (e.g. exception raising)

Note on async: the C++ GcsClient requires a boost asio event loop (instrumented_io_context) to work. But Python API needs GcsClient even without a running Ray worker. So I implemented a singletonIoContext specifically for all GcsClients created from python side, if not retrieved from worker.

Status Quo

These async methods are supported:

def async_internal_kv_get(self, c_string key, namespace=None, timeout=None) -> Future[Optional[bytes]]:
def async_internal_kv_multi_get(self, keys: List[bytes], namespace=None, timeout=None) -> Future[Dict[bytes,bytes]]:
def async_internal_kv_put(self, c_string key, c_string value, c_bool overwrite=False,
def async_internal_kv_del(self, c_string key, c_bool del_by_prefix,
def async_internal_kv_keys(self, c_string prefix, namespace=None, timeout=None) -> Future[List[bytes]]:
def async_internal_kv_exists(self, c_string key, namespace=None, timeout=None) -> Future[bool]:
def async_check_alive(
def async_get_next_job_id(self) -> Future[JobID]:
def async_get_all_job_info(self) -> Future[Dict[str, gcs_pb2.JobTableData]]:

That's all what GcsAioClient needs.

These sync methods are supported:

# ctor
def from_core_worker() -> "MyGcsClient":
def standalone(gcs_address: str, cluster_id: str = None) -> "MyGcsClient":

# properties
def address(self) -> str:
def cluster_id(self) -> ray.ClusterID:

# methods
def internal_kv_get(self, c_string key, namespace=None, timeout=None) -> Optional[bytes]:
def internal_kv_multi_get(self, keys: List[bytes], namespace=None, timeout=None) -> Dict[bytes,bytes]:
def internal_kv_put(self, c_string key, c_string value, c_bool overwrite=False,
def internal_kv_del(self, c_string key, c_bool del_by_prefix,
def internal_kv_keys(self, c_string prefix, namespace=None, timeout=None) -> List[bytes]:
def internal_kv_exists(self, c_string key, namespace=None, timeout=None) -> bool:
def check_alive(self, node_ips: List[bytes], timeout: Optional[float] = None) -> List[bool]:

Demo

In this PR, I replaced GcsAioClient and (part of) GcsClient implementations to the new binding. I tweaked test_basic_2.py and test_gcs_utils.py to pass.

Incompatibilities

There are many nuanced API differences between the PythonGcsClient and the C++ GcsClient.
In this PR I randomly "fixed" some of the issues, but we may really wanna keep the interface
consistent with older versions so we must revisit:

  • timeout: PythonGcsClient returns RpcError, C++ GcsClient returns TimedOut -> GetTimeoutError
  • internal_kv_get not found: PythonGcsClient returns KeyError, C++ GcsClient returns NotFound
  • C++ GcsClient Del() had arg del_by_prefix but did not really support it (bug)
  • bad test infra in stop_gcs_server
  • maybe more

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants