Skip to main content

RPC Variants

Maglev RPC supports streaming and unary data, both in the requesting and response directions. This can be extraordinarily useful in many cases. The API for each is a little different though.

Variants#

There are 4 possible RPC variants in proto3:

service GreeterService {
// Unary
// A single requests is sent, and you get a single response back.
rpc Unary(HelloRequest) returns (HelloResponse);
// Streaming from host
// A single request is sent, but you get a stream of data back.
rpc StreamFromHost(HelloRequest) returns (stream HelloResponse);
// Streaming to host
// A stream of data is sent to the host, and at some point the host will send
// back one message. The timing on the response is application specific
// however, the host might reply immediately, after some delay, or once the
// stream is closed.
rpc StreamToHost(stream HelloRequest) returns (HelloResponse);
// Bi-directional streaming
// A stream is sent to the host, and a stream is returned from the host. Both
// streams are independent in timing and are application specific. For example
// the response stream might return an item for each put into the outbound
// stream, or it might be totally unrelated to the outbound stream.
rpc StreamingBoth(stream HelloRequest) returns (stream HelloResponse);
}

Clients#

Synthesized client code is used a little different depending on unary vs streaming for an RPC. Given the above proto definition we can create a client and invoke the various RPC types like so:

const invokeRpcs = async () => {
const client = GreeterService.createClient(nodeRef);
// Unary, send one message, get one back.
const responsePromise = await client.unary({ yourName: 'Foo' });
// Calling an RPC that streams from a host returns a ChannelReceiver which
// can be asynchronously iterated.
const channelReceiver = client.streamFromHost({ yourName: 'Foo' });
for await (const item of channelReceiver) {
// The result object has either an `err` field, or a `msg` field depending
// on the item type. TypeScript will recognize that msg is defined if err is
// not defined too, which is very helpful.
if (item.err) {
throw err;
}
console.log(item.msg);
}
// Calling an RPC that streams to a host is a little different, you need to
// provide the call with the receiving side of a channel. So we start by
// making channel and destructuring it into sender and receiver.
const [sender, receiver] = channel<HelloRequest>();
// Then we can invoke the RPC which returns a Promise<HelloResponse>. In this
// case we've chosen to ignore the response though. We use the `void` keyword
// to signal that we understand we are ignoring a promise.
void client.StreamToHost(receiver);
// Then you can send data into the channel and finally close it when done.
sender.send({ yourName: 'Foo' });
sender.send({ yourName: 'Bar' });
sender.close();
// Streaming data both directions is just the last two cases combined. In this
// case we'll just send an item every second, forever. Note that you can
// safely enqueue messages into a stream BEFORE passing it to an RPC, they are
// simply buffered until the RPC can transmit them.
const [sender, receiver] = channel<HelloRequest>();
setInterval(() => sender.send({ yourName: 'Beetlejuice' }), 1_000);
// Call the RPC and iterate the resulting stream.
for await (const item of channelReceiver) {
// Shouldn't have said it 3 times...
if (item.err) {
throw new Error('RUUUUUUUN');
}
console.log(item.msg);
}
};
// Client RPC invocation requires awaiting promises (or using `.then()`) which
// is why it's wrapped in an async function.
void invokeRpcs();

Hosting an RPC#

At the other end of the RPC is the 'host', ie the node responding to the RPC call. It's important to keep that fact in mind, as the request and response are logically flipped. Unlike the client API, the host API offers several variations on implementing the same RPC type. Depending on what your use case is, one might be better than another.

The handler object can be either a plain old JS object (which is much easier to deal with in TypeScript because of type inference) or a class instance that implements the <SERVICE-NAME>Handler interface. We will use the latter in these examples.

GreeterService.registerHandler(maglev, {
unary: async function (req, ctx) {
return { response: `Go away ${req.yourName}` };
},
// Other RPC implementations...
streamingBoth: async function* (req, ctx) {
... SNIP ...
},
// All are optional though, you only need to implement the ones you want.
// Calling a non-implemented RPC results in an error on the client side.
});

As mentioned before, RPC handlers for each type come in a few variants.

Unary#

Unary RPC handlers need to either return a value directly, or a Promise<T>.

GreeterService.registerHandler(maglev, {
// Non async, result must be immediately returned.
unary: function (req, ctx) {
return { response: `Go away ${req.yourName}` };
},
});
GreeterService.registerHandler(maglev, {
// Async, which returns a Promise<HelloResponse>. You can use `await` in the
// handler.
unary: async function (req, ctx) {
await sleep(1_000);
return { response: `Go away ${req.yourName}` };
},
});

Streaming From Host#

Streaming from the host means the handler is returning a stream of data to the client. This can be achieved in one of 3 ways.

By returning a ChannelReceiver<T>, which is often useful if the data is being "generated" elsewhere in code. The ChannelReceiver<T> can be returned directly, or as a Promise<ChannelReceiver<T>>.

GreeterService.registerHandler(maglev, {
streamFromHost: function (req, ctx) {
const [sender, receiver] = channel<HelloResponse>();
// Send some junk into the channel.
setInterval(() => sender.send({ response: 'Hi hi' }), 1_000);
// And return the channel receiver.
return receiver;
},
});
GreeterService.registerHandler(maglev, {
streamFromHost: async function (req, ctx) {
return await getSomeChannelReceiverThatTakesAWhile();
},
});

If the data is being generated directly though, it's often more useful to use the ES6 async generator API.

GreeterService.registerHandler(maglev, {
// Note the "*" at the end of the function. This turns it into an ES6
// generator.
streamFromHost: async function* (req, ctx) {
for (let i = 0; i < 3; i++) {
await sleep(1_000);
// Each `yield` will result in an item being sent back to the client.
yield { response: `Number ${i}` };
}
// Returning from the generator signifies the end of the stream.
},
});

Streaming To Host#

Streaming to the host is a little less interesting, the req parameter is just a ChannelReceiver<T>.

GreeterService.registerHandler(maglev, {
// Note the "*" at the end of the function. This turns it into an ES6
// generator.
streamToHost: async function (req, ctx) {
for await (const item of req) {
if (item.err) {
return;
}
// Do something with item.msg
}
},
});

Bi-directional Streaming#

Bi-directional streaming has the same variants as streaming from the host, ie. you can either return a ChannelReceiver<T> or use an ES6 async generator. For example...

GreeterService.registerHandler(maglev, {
streamingBoth: async function* (req, ctx) {
let count = 0;
for await (const item of req) {
if (item.err) {
return;
}
count++;
if (count === 3) {
throw new Error('Look behind you...');
}
yield { response: `Say it ${count - 3} more times, I dare you...` };
}
},
});