Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add an example of a bidirectional gRPC call via tonic #404

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ members = [
"template_tinytemplate",
"template_yarte",
"todo",
"tonic-bidirectional",
"udp-echo",
"unix-socket",
"web-cors/backend",
Expand Down
17 changes: 17 additions & 0 deletions tonic-bidirectional/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "tonic-bidirectional"
version = "0.1.0"
authors = ["Lunar"]
edition = "2018"

[dependencies]
tonic = "0.4"
actix = "0.10"
actix-rt = "1.1"
env_logger = "0.8"
log = "0.4"
futures = "0.3"
prost = "0.7"

[build-dependencies]
tonic-build = { version = "0.4" }
8 changes: 8 additions & 0 deletions tonic-bidirectional/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.compile(
&["proto/echo.proto"],
&["proto"],
)?;
Ok(())
}
15 changes: 15 additions & 0 deletions tonic-bidirectional/proto/echo.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

package echo;

service Echo {
rpc Echo(stream EchoRequest) returns (stream EchoReply) {}
}

message EchoRequest {
string payload = 1;
}

message EchoReply {
string payload = 1;
}
42 changes: 42 additions & 0 deletions tonic-bidirectional/src/echo_rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use actix::prelude::*;
use futures::channel::mpsc::Sender;
use log::info;

use crate::EchoReceived;
use crate::grpc_api::{
EchoRequest,
EchoReply,
};

pub struct EchoRpc {
addr: actix::prelude::Recipient<EchoReceived>,
tx: Sender<EchoRequest>,
}

impl Actor for EchoRpc {
type Context = Context<Self>;
}

impl Handler<EchoRequest> for EchoRpc {
type Result = ();

fn handle(&mut self, msg: EchoRequest, ctx: &mut Context<Self>) {
if let Err(_) = self.tx.try_send(msg) {
info!("Sending echo request failed. Stopping.");
ctx.stop();
}
}
}

impl StreamHandler<Result<EchoReply, tonic::Status>> for EchoRpc {
fn handle(&mut self, msg: Result<EchoReply, tonic::Status>, _: &mut Context<Self>) {
match msg {
Ok(msg) => {
self.addr.send(EchoReceived { payload: msg.payload });
}
Err(status) => {
info!("Stream error: {}", status.message());
}
}
}
}
96 changes: 96 additions & 0 deletions tonic-bidirectional/src/echo_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use actix::prelude::*;
use log::{info, error};

use crate::EchoReceived;
use crate::echo_rpc::EchoRpc;
use crate::grpc_api::echo_client::EchoClient;

#[derive(Message)]
#[rtype(result = "Result<Addr<EchoRpc>, RunningEchoFailed>")]
pub struct RunEcho {
pub addr: actix::prelude::Recipient<EchoReceived>,
}

#[derive(Debug)]
pub struct NotConnectedError;

#[derive(Debug)]
pub struct RunningEchoFailed;

pub struct EchoService {
endpoint: String,
client: Option<EchoClient<tonic::transport::Channel>>,
}

impl EchoService {
pub fn new(endpoint: String) -> EchoService {
EchoService {
endpoint,
client: None,
}
}
}

// EchoService should be responsible for connecting on startup and it'll have a RunEcho message

impl Actor for EchoService {
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Context<Self>) {
EchoClient::connect(self.endpoint.clone())
.into_actor(self)
.then(|res, act, ctx| {
match res {
Ok(echo_client) => {
act.client = Some(echo_client);
}
Err(err) => {
error!("Unable to connect to echo server {:?}", err);
ctx.stop();
}
}
fut::ready(())
})
.wait(ctx)
}
}

impl Handler<RunEcho> for EchoService {
type Result = ResponseActFuture<Self, Result<Addr<EchoRpc>, RunningEchoFailed>>;

fn handle(&mut self, msg: RunEcho, _ctx: &mut Context<Self>) -> Self::Result {
if let Some(mut client) = &mut self.client {
const OUTBOUND_CHANNEL_BUFFER: usize = 10;
let (tx, rx) = futures::channel::mpsc::channel(OUTBOUND_CHANNEL_BUFFER);

info!("Sending echo RPC!");
Box::pin(
client.echo(tonic::Request::new(rx))
.into_actor(self)
.map(|res, _act, _ctx| {
match res {
Ok(inbound) => {
Ok(EchoRpc::create(|ctx| {
ctx.add_stream(inbound.into_inner());
EchoRpc {
addr: msg.addr,
tx
}
}))
}
Err(_) => {
// XXX: This is not really useful
Err(RunningEchoFailed)
}
}
})
)
} else {
// XXX: do something smart about retrying. maybe ctx.stop()?
error!("Not connected to the echo server");
Box::pin(fut::err(RunningEchoFailed))
}
}
}

impl Supervised for EchoService {}
13 changes: 13 additions & 0 deletions tonic-bidirectional/src/grpc_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use actix::prelude::{
Message as ActixMessage,
};

tonic::include_proto!("echo");

impl ActixMessage for EchoRequest {
type Result = ();
}

impl ActixMessage for EchoReply {
type Result = ();
}
97 changes: 97 additions & 0 deletions tonic-bidirectional/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
mod echo_service;
mod echo_rpc;
mod grpc_api;

use actix::prelude::*;
use log::{error, info};

use crate::echo_service::{EchoService, RunEcho};
use crate::echo_rpc::EchoRpc;

#[derive(Message)]
#[rtype(result = "()")]
pub struct SendEcho {
pub payload: String,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct EchoReceived {
pub payload: String,
}

struct EchoSender {
service: Addr<EchoService>,
echo_rpc: Option<Addr<EchoRpc>>,
}

impl EchoSender {
fn new(service: Addr<EchoService>) -> EchoSender {
EchoSender {
service,
echo_rpc: None,
}
}
}

impl Actor for EchoSender {
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Context<Self>) {
self.service.send(RunEcho { addr: ctx.address().recipient() })
.into_actor(self)
.map(|res, act, ctx| {
match res {
Ok(Ok(echo_rpc)) => {
act.echo_rpc = Some(echo_rpc);
}
_ => {
error!("Unable to start echo RPC");
ctx.stop();
}
}
})
.wait(ctx)
}
}

impl Handler<SendEcho> for EchoSender {
type Result = ();

fn handle(&mut self, msg: SendEcho, ctx: &mut Context<Self>) {
info!("Sending echo: {}", msg.payload);
match &self.echo_rpc {
Some(echo_rpc) => {
echo_rpc.do_send(grpc_api::EchoRequest { payload: msg.payload });
}
None => {
// Maybe we could do something smart like trying to (re)connect here.
error!("Not connected!");
ctx.stop();
}
}
}
}

impl Handler<EchoReceived> for EchoSender {
type Result = ();

fn handle(&mut self, msg: EchoReceived, _: &mut Context<Self>) {
info!("EchoSender has just received: {}", msg.payload)
}
}

const ENDPOINT: &str = "http://127.0.0.1:50051";

#[actix_rt::main]
async fn main() {
env_logger::init();

let service = EchoService::new(ENDPOINT.to_string()).start();
let sender = EchoSender::new(service).start();
sender.do_send(SendEcho { payload: "Alpha".to_string() });
sender.do_send(SendEcho { payload: "Beta".to_string() });
sender.do_send(SendEcho { payload: "Gamma".to_string() });

actix_rt::Arbiter::local_join().await;
}