.netcore grpc双向流方法详解

news/2024/7/22 4:15:35 标签: .net core, rpc

一、双向流处理概述

  1. 简单来讲客户端可以向服务端发送消息流,服务端也可以向客户端传输响应流,即客户端和服务端可以互相通讯
  2. 客户端无需发送消息即可开始双向流式处理调用 。 客户端可选择使用 RequestStream.WriteAsync 发送消息。 使用 ResponseStream.MoveNext() 或 ResponseStream.ReadAllAsync() 可访问从服务流式处理的消息。ResponseStream 没有更多消息时,双向流式处理调用完成。

二、案例简介

  1. 客户端发送请求流通过equestStream.WriteAsync传入到服务端
  2. 服务端响应到客户端的流通过ResponseStream.WriteAsync写入到客户端
  3. 服务端使用System.Threading.Channels保证线程安全交互

三、服务端配置(注意:grpc相关配置参考我之前的文章)

  1. 配置.proto文件
// 1.提供公共的实体proto文件
// 2.服务引用对应的proto文件
// 3.定义三个客户流方法

//定义messages.proto文件令需要注意项目文件中的特性GrpcServices=None;

syntax = "proto3";

option csharp_namespace = "GrpcProject";

package grpc.serviceing;


// 消息推送/接收实体
message ExampleMessage
{
	string msg = 1;
}


// 双向流文件twowaystream.proto

syntax = "proto3";

import "Protos/messages.proto";

option csharp_namespace = "GrpcProject";

package grpc.serviceing;

service BothWaysRpc{
	// 双向流
	rpc StreamingBothWays(stream ExampleMessage) returns (stream ExampleMessage);
}
  1. 1 服务接口实现
    /// <summary>
    /// 双向流服务
    /// </summary>
    public class BothWaysService : BothWaysRpc.BothWaysRpcBase
    {
        /// <summary>
        /// 自动重置事件
        /// </summary>
        private readonly ManualResetEventSlim _event;
        public BothWaysService()
        {
            _event = new ManualResetEventSlim(false);
        }

        public override async Task StreamingBothWays(IAsyncStreamReader<ExampleMessage> requestStream,
                                               IServerStreamWriter<ExampleMessage> responseStream,
                                               ServerCallContext context)
        {

            // 创建线程安全的有限容量通道
            var channel = Channel.CreateBounded<ExampleMessage>(new BoundedChannelOptions(capacity: 5));

            var task = Task.Run(async () =>
            {
                await foreach (var message in requestStream.ReadAllAsync())
                {
                    // 读取消息 写入通道
                    if (!string.IsNullOrWhiteSpace(message.Msg))
                    {
                        await Console.Out.WriteLineAsync($"记录客户端传入消息:{message.Msg}");
                        // todo 消息处理
                        await channel.Writer.WriteAsync(message, context.CancellationToken);
                    }
                }
            }, context.CancellationToken);


            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                // 打印通道接收的消息
                await Console.Out.WriteLineAsync($"通道传入消息:{message.Msg}");

                // 写入响应流
                ExampleMessage exampleMessage = new ExampleMessage() { Msg = $"我已经接收到消息:{message.Msg}" };

                await responseStream.WriteAsync(exampleMessage);

                if (message.Msg.ToLower() == "exit")
                {
                    break;
                }
            }

            // 完结写入通道
            channel.Writer.Complete();
            await task;
        }
    }
  1. 2 Program注入
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);
            builder.Services.AddGrpc();
            var app = builder.Build();
            // 一元方法
            //app.MapGrpcService<DollarService>();
            // 客户端流
            //app.MapGrpcService<ClientStreamService>();
            // 服务端流
            //app.MapGrpcService<ServerStreamService>();
            // 双向流
            app.MapGrpcService<BothWaysService>();
            app.Run();
        }
    }

四、客户端配置

  1. 引用proto文件,配置为客户端类型
  2. 根据编译生成的函数进行传参调用
  3. 创建WPF测试客户端

button按钮触发grpc

 

    /// <summary>
    /// BothWaysClient.xaml 的交互逻辑
    /// </summary>
    public partial class BothWaysClient : Window
    {
        public BothWaysClient()
        {
            InitializeComponent();
        }

        private async void Excute_Click(object sender, RoutedEventArgs e)
        {
            Action<string> action = str => { txtValue.Text += $"{str}\r\n"; };

            await WpfClient.Show(action);


            txtValue.Text += "\r\n\r\n";
        }
    }

grpc客户端接口调用

        /// <summary>
        /// 双向流
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public static async Task Show(Action<string> action)
        {
            var messages = new List<string>()
            {
                "test",
                "one",
                "two",
                "three",
                "false",
                "four",
                "Oooo",
                "dddd",
                "vvvfff",
                "exit"
            };


            Random rnd = new Random(20);


            var channel = GrpcChannel.ForAddress("https://localhost:7188");

            var client = new GrpcProject.BothWaysRpc.BothWaysRpcClient(channel);

            var bothWays = client.StreamingBothWays();

            var requestTask = Task.Run(async () =>
              {
                  while (true)
                  {
                      var index = rnd.Next(messages.Count);
                      var msg = messages[index];
                      await bothWays.RequestStream.WriteAsync(new ExampleMessage { Msg = msg });
                      if (msg == "exit")
                      {
                          break;
                      }
                  }
              });
            await foreach (var item in bothWays.ResponseStream.ReadAllAsync())
            {
                action(item.Msg);
                if (item.Msg == "我已经接收到消息:exit")
                {
                    break;
                }
            }

            await requestTask;
        }

五、执行结果

服务端:

 客户端:

 六、源码地址

链接:https://pan.baidu.com/s/1uCirfbexPJ7C-AujBVtkCQ 
提取码:sd4y

七、后续进阶简介

  1. 接下来会讲解客户端工厂,优化客户端请求地址使用依赖注入提取各个服务
  2. proto文件各个字段详细介绍
  3. token认证
  4. 截止时间(中止请求)和请求取消
  5. AOP切面策略
  6. 重试策略(policy)
  7. 负载均衡策略(grpc本身提供的策略及nginx代理)
  8. 日志记录
  9. 健康检查
  10. 后续有更多特色功能会持续补充

http://www.niftyadmin.cn/n/4938101.html

相关文章

微信记录---推荐系统---23/8/14 小总结

推荐系统---23/8/14 小总结 1. ACM推荐系统专题研讨会2.图神经网络推荐系统3.表1 模型效果对标:MovieLens 1M4.爬虫技术5.TF-IDF算法6.图 2 海量学术大数据推荐系统技术架构7.图 4 CADAL 平台推荐系统框架设计8.企业推荐系统发展概述MLR(Mixed Logistic Regression)DIEN(Deep…

海底两万里思维导图怎么绘制?用这几个绘制步骤就可以了

海底两万里思维导图怎么绘制&#xff1f;思维导图可以帮助我们更好地理解和组织信息。它是一种图形化的工具&#xff0c;可以帮助我们将大量的信息整理成有序、易于理解的结构。通过绘制思维导图&#xff0c;我们可以更好地把握《海底两万里》这本书的主题和章节&#xff0c;更…

每日一题——圆圈中最后剩下的数字(约瑟夫环问题)

圆圈中最后剩下的数字&#xff08;约瑟夫环问题&#xff09; 题目链接 约瑟夫环 这是一道典型的约瑟夫环问题&#xff0c;而约瑟夫问题的一般形式是这样的&#xff1a; 约瑟夫问题是个有名的问题&#xff1a;N个人围成一圈&#xff0c;从第一个开始报数&#xff0c;第M个将被…

mqttfx连上OneNET生成token时的一大坑,报用户名或密码错误

整个流程如下连接&#xff1a; MQTT.fx和MQTTX 链接ONENET物联网开发平台避坑细节干货。 其中在生成token时&#xff0c;搞了半天在连接后都会报用户名密码错误 最后发现是格式问题&#xff0c;输入所有字符后一定要双击看是否可以全选中&#xff0c;可以全选中说明字符的格式…

lodash之cloneDeep()源码阅读笔记

lodash之cloneDeep()源码阅读笔记 基本上都在写业务代码&#xff0c;没有机会写库&#xff0c;还是想了解一下lodash的库源码是怎么样的&#xff0c;平时用的最多的就是cloneDeep()方法了&#xff0c;终于有空详细看看其中的源码。 本文基于lodash5.0.0版本的源码进行阅读。 /…

C语言学习笔记---数据的存储详解

C语言程序设计笔记---015 C语言数据的存储1、数据类型的意义1.1、unsigned与signed数据类型例程11.2、补码与原码相互转换例程2 2、大小端的介绍2.1、大小端的例程12.2、大小端的例程2 --- 判断当前编译器环境属于大端或小端 3、综合练习题探究数据的存储3.1、练习题13.2、练习…

cout还是printf?C++教程 - How to C++系列专栏第4篇

关于专栏 这个专栏是优质的C教程专栏&#xff0c;如果你还没看过第一篇&#xff0c;点击这里去第0篇 本专栏一致使用操作系统&#xff1a;macOS Ventura&#xff0c;代码编辑器&#xff1a;CLion&#xff0c;C编译器&#xff1a;Clang 感谢一路相伴的朋友们&#xff0c;感谢…

Hadoop安装完全分布式搭建

1、安装Hadoop 上传Hadoop的指定路径/root/softwares 解压安装 cd /root/softwares && tar -zxvf hadoop-2.7.3.tar.gz -C /usr/local配置环境变量 vim /etc/profile # Hadoop Environment export HADOOP_HOME/usr/local/hadoop-2.7.3 export PATH$PATH:$HADOOP_HOM…