Learn how to implement client-side streaming in gRPC using Go.
Hey folks, today we're going to look at client-streaming RPCs. These are RPCs where a client sends a sequence of messages on a single stream, and once the client has finished sending, it waits for the server to read them and reply with a single response. To demonstrate this, we're going to implement a simple log-streaming RPC.
The client will stream a number of logs to the server, the server will log them as they're received, and then it will return a response saying how many entries were logged. We'll start out by writing our protobuf. We'll create an RPC called LogStream that takes a LogStreamRequest and returns a LogStreamResponse. Let's define our messages as well: the LogStreamRequest will have a google.protobuf.Timestamp to signify the time of the log.
Next we can add a log level, so we'll define an enum for this. Because protobuf enums start at zero, and zero is also the default for a field that was never set, it's usually good to make the first value in the enum something like unspecified or invalid. That makes it easy to differentiate between an enum that has not been set and one that has a real value. If we started with, let's say, debug as the first value, it would be hard to tell whether the value had not been set or had been set to the first element.
Then we'll add the common log levels, and I think we'll do fatal as well. Then we add a level field here, and finally the message we want to log. Cool, so that's the request. For the response we'll just have an integer called entries_logged. Great. Because this is a client-streaming RPC, we just need to add the stream keyword in front of the request type here, and that is what signifies that it's client-streaming. Now that we've got that, let's go and generate our code. That's run now, and if we check the generated code, our server interface should have a LogStream RPC as well.
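Put together, the protobuf described so far might look roughly like the sketch below. The package name, `go_package` path, service name, field numbers, and the WARNING and ERROR levels are assumptions for illustration, and `int32` is just one reasonable choice for the response integer.

```protobuf
syntax = "proto3";

package logging.v1; // hypothetical package name

import "google/protobuf/timestamp.proto";

option go_package = "example.com/logging/v1;loggingv1"; // hypothetical path

// LogService is a hypothetical service name.
service LogService {
  // The stream keyword on the request side makes this a client-streaming RPC.
  rpc LogStream(stream LogStreamRequest) returns (LogStreamResponse);
}

enum LogLevel {
  // Zero is the default, so reserve it for "not set".
  LOG_LEVEL_UNSPECIFIED = 0;
  LOG_LEVEL_DEBUG = 1;
  LOG_LEVEL_INFO = 2;
  LOG_LEVEL_WARNING = 3; // assumed level
  LOG_LEVEL_ERROR = 4;   // assumed level
  LOG_LEVEL_FATAL = 5;
}

message LogStreamRequest {
  google.protobuf.Timestamp timestamp = 1;
  LogLevel level = 2;
  string message = 3;
}

message LogStreamResponse {
  int32 entries_logged = 1;
}
```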
Now, in the server implementation we had from the previous video, we just need to add our LogStream function and implement it there. I'll copy the signature from the generated interface and rename the argument to stream. Server handlers for client-streaming RPCs have just one argument, and that is the stream. For our implementation, the first thing we need to do is initialize a count and then loop through all the received messages. Inside the loop we log the message and increment the count, and before that we check whether the stream is closed.
Great, so we'll start out by initializing a count to zero, and then we'll create a for loop. Inside it we first receive our message by calling the stream.Recv function, which returns either a request or an error. Now we can check whether the error is not nil; if so, something went wrong between client and server, so we just return the error. But before that check we should also test whether the error equals io.EOF. That means the client has closed the stream, so at this point we can just return, and that ends the RPC.
If there's no error and the client hasn't closed the stream, we have our log entry and can log the message. We'll use log.Printf and say "received log", then log the timestamp, the level, and the message. You can do logEntry.GetTimestamp().AsTime(), then logEntry.GetLevel().String() since we want that as a string, and then logEntry.GetMessage(). Once that's done we increment the count. That should be the main implementation for our client-streaming RPC.
One thing I realized: I did make a mistake. When the stream is closed we don't want to just return nil here; what we actually want is to return the response from the server. We do this by returning the result of calling the SendAndClose function, which takes the protobuf response, so the LogStreamResponse, and here we send our count back. Because protobuf integers are sized (int32 or int64), we have to cast from a regular int. Cool, that should be the implementation done on the server side, so let's look at the client.
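The receive loop can be sketched in isolation as follows. To keep it runnable with only the standard library, the request, response, and stream types here are hand-written stand-ins (assumptions) for what protoc would generate, and `fakeStream` is a hypothetical in-memory stream. With real generated code, the handler would take the generated server stream type instead.

```go
package main

import (
	"errors"
	"io"
	"log"
	"time"
)

// Stand-ins for the generated protobuf types (assumed shapes).
type LogStreamRequest struct {
	Timestamp time.Time
	Level     string
	Message   string
}

type LogStreamResponse struct {
	EntriesLogged int32
}

// logStreamServer mirrors the two methods the handler uses on the
// server-side stream of a client-streaming RPC.
type logStreamServer interface {
	Recv() (*LogStreamRequest, error)
	SendAndClose(*LogStreamResponse) error
}

// LogStream logs every entry until the client closes the stream,
// then replies once with the number of entries logged.
func LogStream(stream logStreamServer) error {
	count := 0
	for {
		entry, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			// Client finished sending: send the single response.
			return stream.SendAndClose(&LogStreamResponse{EntriesLogged: int32(count)})
		}
		if err != nil {
			return err
		}
		log.Printf("received log [%s] %s: %s",
			entry.Timestamp.Format(time.RFC3339), entry.Level, entry.Message)
		count++
	}
}

// fakeStream is a hypothetical in-memory stream used to exercise the loop.
type fakeStream struct {
	pending []*LogStreamRequest
	resp    *LogStreamResponse
}

func (f *fakeStream) Recv() (*LogStreamRequest, error) {
	if len(f.pending) == 0 {
		return nil, io.EOF // simulates the client closing the stream
	}
	next := f.pending[0]
	f.pending = f.pending[1:]
	return next, nil
}

func (f *fakeStream) SendAndClose(r *LogStreamResponse) error {
	f.resp = r
	return nil
}

func main() {
	s := &fakeStream{pending: []*LogStreamRequest{
		{Timestamp: time.Now(), Level: "INFO", Message: "first"},
		{Timestamp: time.Now(), Level: "INFO", Message: "second"},
	}}
	if err := LogStream(s); err != nil {
		log.Fatal(err)
	}
	log.Printf("entries logged: %d", s.resp.EntriesLogged)
}
```

Checking for io.EOF before the generic error check matters, because io.EOF is itself a non-nil error and would otherwise be returned as a failure.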
For this we're going to create a client-streaming directory with a main.go file in it, to keep it separate from our other RPCs. In here we want to first initialize our gRPC connection, then create our client, then initialize the client stream, then send some log messages, and finally close the stream and log the response from the server. Let's start by initializing the connection. We'll create a context first, and then we'll create the connection using the grpc.Dial function like we've been using in other videos, passing in WithTransportCredentials and WithBlock. Then we'll create our client.
Then we'll initialize our stream by calling the LogStream method on the client and passing our context. That creates our stream and also returns an error, so we check the error first. This returns an error if the client can't make a connection with the server for whatever reason. The error could also potentially come from middleware on the server, or from our own implementation. Now we can send some log messages. Let's iterate and create five log messages, and we'll start by initializing our message.
We'll send the current time as the timestamp, info as the log level, and our message can be "hello log" with the loop integer added as well. To actually send the message on the stream we call the stream.Send function, which takes the protobuf request as a parameter, so we make this a pointer. We can check the error for that as well; here we'll do the same and call log.Fatal. And just so we can visualize this better, we'll add a sleep so we can see the logging happening.
Once we've sent all our messages to the server, we need to wait for the server to respond after it has read all the messages. To do this we call stream.CloseAndRecv. This is a blocking function: it blocks until the server has read all the messages and replied with its response, so it returns when the server's SendAndClose function is called. Here it's the same thing again, we can just call log.Fatal on an error. We could check the error for a certain status code if we wanted custom error handling logic, but for the purpose of this it's okay. Finally we'll print the number of logs sent, which is the entries_logged we get in the response. Cool, that should be all the implementation we need on the client side, so let's try running it and see what we get.
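The client-side flow (send a few messages, then close and wait for the single reply) can be sketched like this. As a simplification it leaves out the connection setup and uses hand-written stand-ins (assumptions) for the generated types; `countingStream` is a hypothetical in-memory stream so the sketch runs with only the standard library.

```go
package main

import (
	"fmt"
	"log"
	"time"
)

// Stand-ins for the generated protobuf types (assumed shapes).
type LogStreamRequest struct {
	Timestamp time.Time
	Level     string
	Message   string
}

type LogStreamResponse struct {
	EntriesLogged int32
}

// logStreamClient mirrors the two methods used on the client-side
// stream of a client-streaming RPC.
type logStreamClient interface {
	Send(*LogStreamRequest) error
	CloseAndRecv() (*LogStreamResponse, error)
}

// sendLogs sends n log messages, pausing between them, then closes
// the stream and blocks until the server's single response arrives.
func sendLogs(stream logStreamClient, n int, pause time.Duration) (*LogStreamResponse, error) {
	for i := 0; i < n; i++ {
		req := &LogStreamRequest{
			Timestamp: time.Now(),
			Level:     "INFO",
			Message:   fmt.Sprintf("hello log %d", i),
		}
		if err := stream.Send(req); err != nil {
			return nil, err
		}
		time.Sleep(pause) // only here so the logging is easy to watch
	}
	return stream.CloseAndRecv()
}

// countingStream is a hypothetical in-memory stream that counts what it
// receives and reports that count when the stream is closed.
type countingStream struct {
	received int32
}

func (c *countingStream) Send(*LogStreamRequest) error {
	c.received++
	return nil
}

func (c *countingStream) CloseAndRecv() (*LogStreamResponse, error) {
	return &LogStreamResponse{EntriesLogged: c.received}, nil
}

func main() {
	resp, err := sendLogs(&countingStream{}, 5, 10*time.Millisecond)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("number of logs sent: %d", resp.EntriesLogged)
}
```

Returning the error from `sendLogs` instead of calling log.Fatal inside it is a small design choice that leaves room for the status-code handling mentioned above.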
We'll start out by running our server. The server is running, so now let's run our client. You can see the server logging each message as it's received, and once it has received all the messages from the client and responded, the client logs the response here as well. So that's everything; that's how easy it is to implement a client-streaming RPC using gRPC in Go. In the next video we're going to look at bi-directional streaming, so I'll see you there.