Thursday 2 December 2021

Streaming with gRPC on .NET

 I just couldn’t stop myself to dig more into gRPC on .NET, I found it really interesting and fun in learning. In my last article here gRPC on .NET, I have demonstrated about “How to create gRPC API in .NET 6”. I feel that one is good article to start with for gRPC learning.

Here I’m going to demonstrate the different types of service methods which gRPC supports for API and these are:

1. Unary RPCs : where the client sends a single request to a server and gets a single response, as demonstrate in my previous article. i.e.

rpc GetWeatherForecastForDate (google.protobuf.Timestamp) returns (WeatherForecastReply);

2. Server streaming RPCs: where the client sends a request to the server and gets a stream to read a sequence of messages back. The client reads from the returned stream until there are no more messages. gRPC guarantees message ordering within an individual RPC call. i.e.

rpc GetWeatherForecastStream (google.protobuf.Empty) returns (stream WeatherForecast);

3. Client streaming RPCs: where the client writes a sequence of messages and sends them to the server, again using a provided stream. Once the client has finished writing the messages, it waits for the server to read them and return its response. Again gRPC guarantees message ordering within an individual RPC call. i.e.

rpc GetWeatherForecastDuplexStream (stream StreamMessage) returns (WeatherForecastReply);

4. Bidirectional streaming RPCs: where both sides send a sequence of messages using a read-write stream. The two streams operate independently, so clients and servers can read and write in whatever order they like: for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes. The order of messages in each stream is preserved.

rpc GetWeatherForecastDuplexStream (stream StreamMessage) returns (stream WeatherForecast);

The service methods which I shown here, I’m going to demonstrate how we can write them and consume them. In my last article I used the weather forecast example hence enhancing the same example here.

In all above example I added a new message type “StreamMessage” so lets include that first in your weatherForcaste.proto file and do the same (copy-paste the proto file) for client project as well.

syntax = "proto3";import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
option csharp_namespace = "GrpcServiceDemo";package weatherForcast;// The weather forecast service definition.
service WeatherForcast {
// Get weather forecast: Unary RPCs
rpc GetWeatherForecast (google.protobuf.Empty) returns (WeatherForecastReply);
// Get weather forecast: Unary RPCs
rpc GetWeatherForecastForDate (google.protobuf.Timestamp) returns (WeatherForecastReply);
// Get weather forecast: Server Streaming RPCs
rpc GetWeatherForecastStream (google.protobuf.Empty) returns (stream WeatherForecast);
// Get weather forecast: Client Streaming RPCs
rpc GetWeatherForecastClientStream (stream StreamMessage) returns (WeatherForecastReply);
// Get weather forecast: Bidirectional streaming RPCs
rpc GetWeatherForecastDuplexStream (stream StreamMessage) returns (stream WeatherForecast);
}// The response message containing the weather information.
message WeatherForecastReply {
repeated WeatherForecast Result = 1;
}
message WeatherForecast {
google.protobuf.Timestamp Date = 1;

int32 TemperatureC = 2;

int32 TemperatureF = 3;

string Summary = 4;
}
message StreamMessage{
int32 index = 1;
}

Since we need a new method to handle other services logic so we need to modify the interface IWeatherForcaseService.cs and WeatherForecastService.cs implementation as below.

IWeatherForcaseService.cs

using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
namespace GrpcServiceDemo.Services
{
public interface IWeatherForecastService
{
Task<WeatherForecastReply> GetWeatherForecast(ServerCallContext context);
Task<WeatherForecastReply> GetWeatherForecastForDate(Timestamp date, ServerCallContext context);WeatherForecast GetWeatherForecast(int index);
}
}

WeatherForecastService.cs

using Grpc.Core;
using Google.Protobuf.WellKnownTypes;
namespace GrpcServiceDemo.Services
{
public class WeatherForecastService : IWeatherForecastService
{
private static readonly string[] Summaries = new[]
{
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
};
private readonly ILogger<WeatherForecastService> _logger;public WeatherForecastService(ILogger<WeatherForecastService> logger)
{
_logger = logger;
}
public Task<WeatherForecastReply> GetWeatherForecast(ServerCallContext context)
{
return Task.FromResult<WeatherForecastReply>(GetWeather());
}
public Task<WeatherForecastReply> GetWeatherForecastForDate(Timestamp date, ServerCallContext context)
{
return Task.FromResult<WeatherForecastReply>(GetWeather(date));
}
public WeatherForecast GetWeatherForecast(int index)
{
return GetWeather(index);
}
private WeatherForecastReply GetWeather()
{
var result = new WeatherForecastReply();
for (var index = 1; index <= 5; index++)
{
result.Result.Add(
GetWeather(index)
);
}
return result;
}
private static WeatherForecast GetWeather(int index)
{
return new WeatherForecast
{
Date = Timestamp.FromDateTime(DateTime.UtcNow.AddDays(index)),
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)],
TemperatureF = (int)(32 + (Random.Shared.Next(-20, 55) / 0.5556))
};
}
private WeatherForecastReply GetWeather(Timestamp date)
{
var result = new WeatherForecastReply();
result.Result.Add(
new WeatherForecast
{
Date = date,
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)],
TemperatureF = (int)(32 + (Random.Shared.Next(-20, 55) / 0.5556))
}
);
return result;
}
}
}

Now lets write the gRPC service implementation for newly defined services as below:

Server Streaming RPCs
rpc GetWeatherForecastStream (google.protobuf.Empty) returns (stream WeatherForecast);

public override async Task GetWeatherForecastStream(Empty request, IServerStreamWriter<WeatherForecast> responseStream, ServerCallContext context)
{
var i = 0;
while(!context.CancellationToken.IsCancellationRequested && i <50)
{
await Task.Delay(1000);
await responseStream.WriteAsync(_weatherForecastService.GetWeatherForecast(i));
i++;
}
}

In above code, server keeps writing to responseStream in every 1 seconds until either cancellation requested from the client or ‘i’ index value is less than 50. responseStream is of type Grpc.Core.IServerStreamWriter<T> used for sending response back to the client, it is a writable stream of the response.

Client Streaming RPCs
rpc GetWeatherForecastClientStream (stream StreamMessage) returns (WeatherForecastReply);

public override async Task<WeatherForecastReply> GetWeatherForecastClientStream(IAsyncStreamReader<StreamMessage> requestStream, ServerCallContext context)
{
var response = new WeatherForecastReply();
while (await requestStream.MoveNext() && !context.CancellationToken.IsCancellationRequested)
{
var i = requestStream.Current.Index;
response.Result.Add(_weatherForecastService.GetWeatherForecast(i));
}
return await Task.FromResult<WeatherForecastReply>(response);
}

In above code, server patiently reads input from requestStream until there is something to read(until requestStream.MoveNext() returns true) or cancellation requested and keeps preparing the result for response. Once all read is completed and notified to the server from client via call “RequestStream.CompleteAsync”, server finally sends back the response. requestStream is of type IAsyncStreamReader<T> which allows to read stream of message from request.

Bidirectional streaming RPCs
rpc GetWeatherForecastDuplexStream (stream StreamMessage) returns (stream WeatherForecast);

This is the combination of Server and client streaming and in this case server and client both are engaged to pass stream of message to each other like a chat box. Here is the code from service side.

public override async Task GetWeatherForecastDuplexStream(IAsyncStreamReader<StreamMessage> requestStream, IServerStreamWriter<WeatherForecast> responseStream, ServerCallContext context)
{
while (await requestStream.MoveNext() && !context.CancellationToken.IsCancellationRequested)
{
var i = requestStream.Current.Index;
await Task.Delay(1000);
await responseStream.WriteAsync(_weatherForecastService.GetWeatherForecast(i));
}
}

Here server patiently reads the stream messages from requestStream until there is message available and the same time keep releasing the response (responseStream.WriteAsync) as when it receives message from requestStream.

Server is ready so Now lets consume the gRPC methods from client. Lets add the code in Program.cs in client project which we created as part of previous article.

Client code for Server Streaming RPCs

try
{
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10));
using var streamingCall = weatherClient.GetWeatherForecastStream(new Empty(), cancellationToken: cancellationToken.Token);
await foreach (var weatherData in streamingCall.ResponseStream.ReadAllAsync(cancellationToken: cancellationToken.Token))
{
Console.WriteLine(weatherData);
}

Console.WriteLine("Stream completed.");
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Stream cancelled.");
}

In above code, we read all the messages from ResponseStream (streamingCall.ResponseStream.ReadAllAsync) until there is a message to read. Now if run the server and client, you will see the output as “getting weather data one by one every 1 second interval (as server sends in every 1 second interval).

Client code for Client Streaming RPCs

try
{
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10));
using AsyncClientStreamingCall<StreamMessage, WeatherForecastReply> clientStreamingCall = weatherClient.GetWeatherForecastClientStream(cancellationToken: cancellationToken.Token);
var i = 0;
while (true)
{
if (i >= 10)
{
await clientStreamingCall.RequestStream.CompleteAsync();
Console.WriteLine("Client Streaming completed.");
break;
}
else
{
//write to stream
await clientStreamingCall.RequestStream.WriteAsync(new StreamMessage { Index = i });
i++;
}
}
var response = await clientStreamingCall;
Console.WriteLine(response.Result);
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Stream cancelled.");
}

In above code, client keeps sending message one by one to the server via await clientStreamingCall.RequestStream.WriteAsync() call. Finally when it done sending the messages to the server, reads the result response from server with the code var response = await clientStreamingCall; and print.

Here if you run the server and client, you will see the output as:

Client code for Bidirectional streaming RPCs

try
{
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10));
using AsyncDuplexStreamingCall<StreamMessage, WeatherForecast> duplexStreamingCall = weatherClient.GetWeatherForecastDuplexStream(cancellationToken: cancellationToken.Token);
var i = 0;
Task task = Task.WhenAll(new[]
{
Task.Run(async () =>{
while (true)
{
if (i >= 10)
{
await duplexStreamingCall.RequestStream.CompleteAsync();
Console.WriteLine("Client Streaming completed.");
break;
}
else
{
//write to stream
await duplexStreamingCall.RequestStream.WriteAsync(new StreamMessage { Index = i });
i++;
}
}
}),
Task.Run(async () =>{
//read from stream
while (!cancellationToken.IsCancellationRequested && await duplexStreamingCall.ResponseStream.MoveNext())
{
Console.WriteLine(duplexStreamingCall.ResponseStream.Current);
}
})
});
try
{
task.Wait(cancellationToken.Token);
}
catch (OperationCanceledException e)
{
await duplexStreamingCall.RequestStream.CompleteAsync();
Thread.Sleep(6000);
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Stream cancelled.");
}

In above, we have two tasks running parallelly, one for streaming the messages “await duplexStreamingCall.RequestStream.WriteAsync(new StreamMessage { Index = i });” and the other tasks keeps reading messages form response stream sent from the server side “duplexStreamingCall.ResponseStream.Current” until there is a messge available (await duplexStreamingCall.ResponseStream.MoveNext() is true).
If you run the server and client, output will be as:

You can download the whole demonstration code from here:
https://github.com/binodmahto/FunProjects/tree/main/grpcAPIsDemo

Thank you reading. Don’t forget to clap if you like and leave comments for suggestion.

No comments:

Post a Comment