Server streaming in gRPC

Duration: 14 mins

Learn how to implement server-streaming in gRPC using Go.

Instructor

Chris Shepherd

Transcript

Hey folks, in this video, we’re exploring server streaming by building a simple RPC. I’ve already set up a project with all the boilerplate code we’ve covered in previous videos.

Today, we’ll build a server-streaming RPC that streams the current time from the server to the client at specified intervals. First, we’ll create our protobuf. Let’s define an RPC called ServerStreamTime. It’ll take a StreamServerTimeRequest and return a StreamServerTimeResponse.

To make it a server-streaming RPC, we add the keyword stream in front of the response type. Now, let’s create our message types for the request and response. Since this is a server-streaming RPC, it takes a single request and returns a stream of responses, one for each message the server sends.

For this ServerStreamTime RPC, we’ll pass an int32 called interval_seconds from the client to the server. This tells the server how often to stream data back. In the response, we’ll return a timestamp using the Google Protobuf Timestamp type, called current_time. We just need to add the import for that.
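For reference, the definition described so far might look roughly like the sketch below. The RPC, message, and field names come from the walkthrough; the package name, go_package path, service name (TimeService), and field numbers are assumptions made up for illustration, not taken from the project.

```proto
syntax = "proto3";

// Hypothetical package name; the real project may use a different one.
package timeservice.v1;

// Needed for google.protobuf.Timestamp.
import "google/protobuf/timestamp.proto";

// Hypothetical Go import path for the generated code.
option go_package = "example.com/timeservice/gen;timeservicepb";

// Hypothetical service name.
service TimeService {
  // "stream" before the response type makes this a server-streaming RPC.
  rpc ServerStreamTime(StreamServerTimeRequest) returns (stream StreamServerTimeResponse);
}

message StreamServerTimeRequest {
  // How often, in seconds, the server should send the current time.
  int32 interval_seconds = 1;
}

message StreamServerTimeResponse {
  // Server time at the moment the message was sent.
  google.protobuf.Timestamp current_time = 1;
}
```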

Great! With our RPC defined, let’s generate the protobuf code using our make protogen command, which runs protoc. This generates our gRPC file and message definitions.

Now, let’s go to our service and implement the interface, just like before. A server-streaming RPC handler takes the request as its first parameter and a stream object as its second. We’ll rename the second parameter to stream for clarity.

Inside here, we’ll initialize a ticker based on our interval, then loop through and listen on it. In that loop, we’ll get the current time, build our response, and send it to the client. We also need to check if the context is cancelled—if it is, we’ll shut down the stream.

Let’s start by checking the request. If interval_seconds is zero, we’ll return an invalid argument error: "Interval must be set." A streaming handler’s only return value is an error, because responses go out through the stream, so we return just the error here.

Now, let’s initialize our ticker. We’ll create a time.Duration from interval_seconds, multiply it by time.Second, and use time.NewTicker. We’ll defer ticker.Stop() to close it gracefully.
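As a rough sketch of the validation and ticker setup just described, using only the standard library: tickerInterval and errIntervalNotSet are hypothetical names, and in the real handler the error would be a gRPC status error built with status.Error(codes.InvalidArgument, ...). This version also rejects negative values, which the video doesn’t mention, because time.NewTicker panics on a non-positive duration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errIntervalNotSet is a hypothetical sentinel error. In the real handler this
// would be a gRPC status error with the InvalidArgument code.
var errIntervalNotSet = errors.New("interval must be set")

// tickerInterval converts the request's interval_seconds into a time.Duration.
// Zero and negative values are rejected (the negative check is an assumption
// added here), since time.NewTicker panics on a non-positive duration.
func tickerInterval(intervalSeconds int32) (time.Duration, error) {
	if intervalSeconds <= 0 {
		return 0, errIntervalNotSet
	}
	return time.Duration(intervalSeconds) * time.Second, nil
}

func main() {
	interval, err := tickerInterval(2)
	if err != nil {
		fmt.Println("invalid request:", err)
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker's resources when we are done

	fmt.Println("ticker interval:", interval)
}
```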

Next, we’ll loop and use a select statement to handle two channels: one from the context to check if it’s done—if so, we return nil to close the stream—and another to listen to the ticker channel. Every time the interval passes, we’ll create our response and send it.

First, we get the current time and build a StreamServerTimeResponse with current_time. We use timestamppb.New() from the Go protobuf well-known types package (google.golang.org/protobuf/types/known/timestamppb) to convert it to a Protobuf Timestamp. Then, we call stream.Send() to stream it to the client. If this fails, we return the error to close the stream.
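The shape of that loop can be tried out without any generated code. In the sketch below, timeStream is a hypothetical stand-in for the generated server stream type (the real one would send StreamServerTimeResponse messages rather than time.Time values), and fakeStream simulates a client that disconnects after a few messages. Treat it as an illustration of the select pattern, not the project’s actual handler.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// timeStream is a hypothetical stand-in for the generated server stream type.
type timeStream interface {
	Context() context.Context
	Send(t time.Time) error
}

// streamTime sends the current time on every tick until the stream's context
// is done or a Send fails.
func streamTime(stream timeStream, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-stream.Context().Done():
			return nil // the call was cancelled: end the stream cleanly
		case <-ticker.C:
			if err := stream.Send(time.Now()); err != nil {
				return err // a failed send ends the stream with an error
			}
		}
	}
}

// fakeStream cancels its own context after limit sends, simulating a client
// that goes away.
type fakeStream struct {
	ctx    context.Context
	cancel context.CancelFunc
	sent   int
	limit  int
}

func (f *fakeStream) Context() context.Context { return f.ctx }

func (f *fakeStream) Send(t time.Time) error {
	f.sent++
	if f.sent >= f.limit {
		f.cancel()
	}
	return nil
}

// runDemo streams with a short interval and reports how many values were sent.
func runDemo(limit int) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	s := &fakeStream{ctx: ctx, cancel: cancel, limit: limit}
	err := streamTime(s, 10*time.Millisecond)
	return s.sent, err
}

func main() {
	sent, err := runDemo(3)
	fmt.Println("sent:", sent, "err:", err)
}
```

Returning nil when the context is done matches the behavior described above; some services prefer to return the context’s error instead so the cancellation is visible in logs and metrics.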

That’s the server implementation! Now, let’s tackle the client side. There was an error in the server’s main file—the proto package wasn’t imported, but that’s fixed now.

On the client, we’ll initialize a gRPC connection and create the client. Then, we’ll initialize the stream and loop through the responses from the server. Once the server closes the stream, we’ll exit gracefully. For now, we’ll just log each response.

Let’s start with the connection. We’ll create a context and use grpc.Dial to connect to localhost with insecure credentials and WithBlock() to catch errors early. If there’s an error, we’ll call log.Fatal().

With the connection set, we’ll create our client and initialize the stream. Calling ServerStreamTime is like a unary RPC since it sends a single request, but it returns a stream instead of a single response. We’ll pass an interval_seconds of 2, and if there’s an error, we’ll log it.

Now, we’ll loop through the stream. The Recv() function returns a response or an error. If the error isn’t nil, we’ll log it. Otherwise, we’ll print "Received time from server:" and use current_time.AsTime() to convert the Timestamp to a time.Time object.

We also need to handle stream closure. Recv() blocks until a response arrives or the stream closes. When the server closes the stream, Recv() returns io.EOF, so we check for io.EOF before the general error check, log "Server stream closed," and break out of the loop.
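Here’s a small standard-library sketch of that receive loop. timeReceiver is a hypothetical stand-in for the generated client stream (the real Recv() returns a StreamServerTimeResponse, which you’d convert with AsTime()), and sliceStream just replays a few values before reporting io.EOF the way a closed stream would.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// timeReceiver is a hypothetical stand-in for the generated client stream type.
type timeReceiver interface {
	Recv() (time.Time, error)
}

// receiveAll reads until the server closes the stream (io.EOF) or another
// error occurs, and returns how many messages were received.
func receiveAll(stream timeReceiver) (int, error) {
	received := 0
	for {
		t, err := stream.Recv() // blocks until a message arrives or the stream ends
		if errors.Is(err, io.EOF) {
			fmt.Println("Server stream closed")
			return received, nil
		}
		if err != nil {
			return received, err
		}
		received++
		fmt.Println("Received time from server:", t)
	}
}

// sliceStream replays a fixed list of times, then reports io.EOF like a
// stream the server has closed.
type sliceStream struct {
	times []time.Time
}

func (s *sliceStream) Recv() (time.Time, error) {
	if len(s.times) == 0 {
		return time.Time{}, io.EOF
	}
	t := s.times[0]
	s.times = s.times[1:]
	return t, nil
}

// runClientDemo feeds n messages through receiveAll and returns the count.
func runClientDemo(n int) (int, error) {
	now := time.Now()
	s := &sliceStream{}
	for i := 0; i < n; i++ {
		s.times = append(s.times, now.Add(time.Duration(i)*2*time.Second))
	}
	return receiveAll(s)
}

func main() {
	n, err := runClientDemo(3)
	fmt.Println("received:", n, "err:", err)
}
```

The io.EOF check comes first on purpose: it signals a normal end of stream, so it shouldn’t be logged as a failure alongside real errors.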

That’s all for the client and server implementation! Let’s run it. We’ll start the server, then the client, and we’ll see the server streaming the time every two seconds.

That’s it! That’s how we create server-streaming RPCs using Go. In the next video, we’ll explore client-streaming RPCs. See you there!