摘要:上一篇帖子go微服务框架go-micro深度学习
Registry服务的注册和发现详细解释了go-micro是如何做服务注册和发现在,服务端注册server信息,client获取server的地址信息,就可以和服务建立连接,然后就可以进行通信了。

本文主要是教大家实现一个具有基本功能的rpc框架。

python如何通过protobuf实现rpc,pythonprotobufrpc

由于项目组现在用的rpc是基于google protobuf
rpc协议实现的,所以花了点时间了解下protobuf
rpc。rpc对于做分布式系统的人来说肯定不陌生,对于rpc不了解的童鞋可以自行google,这里只是做个简单的介绍。rpc的主要功能是让分布式系统的实现更为简单,为提供强大的远程调用而不损失本地调用语义的简洁性。为了实现这个目标,rpc框架需要提供一种透明调用机制让使用者不必显示区分本地调用还是远程调用。rpc架构涉及的组件如下:

图片 1

客户方像调用本地方法一样去调用远程接口方法,RPC
框架提供接口的代理实现,实际的调用将委托给代理RpcProxy
。代理封装调用信息并将调用转交给RpcInvoker
去实际执行。在客户端的RpcInvoker 通过连接器RpcConnector
去维持与服务端的通道RpcChannel,并使用RpcProtocol
执行协议编码(encode)并将编码后的请求消息通过通道发送给服务方。RPC
服务端接收器 RpcAcceptor 接收客户端的调用请求,同样使用RpcProtocol
执行协议解码(decode)。解码后的调用信息传递给RpcProcessor
去控制处理调用过程,最后再委托调用给RpcInvoker
去实际执行并返回调用结果。

protobuf
rpc在上面组件中主要扮演RpcProtocol的角色,使得我们省去了协议的设计,并且protobuf协议在编码和空间效率都是上非常高效的,这也是很多公司采用protobuf作为数据序列化和通信协议的原因。同时protobuf
rpc定义了一个抽象的rpc框架,如下图所示:

图片 2

RpcServiceStub和RpcService类是protobuf编译器根据proto定义生成的类,RpcService定义了服务端暴露给客户端的函数接口,具体实现需要用户自己继承这个类来实现。RpcServiceStub定义了服务端暴露函数的描述,并将客户端对RpcServiceStub中函数的调用统一转换到调用RpcChannel中的CallMethod方法,CallMethod通过RpcServiceStub传过来的函数描述符和函数参数对该次rpc调用进行encode,最终通过RpcConnecor发送给服务方。对方以客户端相反的过程最终调用RpcSerivice中定义的函数。事实上,protobuf
rpc的框架只是RpcChannel中定义了空的CallMethod,所以具体怎样进行encode和调用RpcConnector都要自己实现。RpcConnector在protobuf中没有定义,所以这个完成由用户自己实现,它的作用就是收发rpc消息包。在服务端,RpcChannel通过调用RpcService中的CallMethod来具体调用RpcService中暴露给客户端的函数。

介绍了这么多,对于怎么样用protobuf
rpc来实现一个rpc肯定还是一头雾水吧,下面就用protobuf
rpc来实现一个简单的python版rpc demo吧。

下面直接给出demo描述PRC的proto文件,至于proto文件的编写规则可以参考protobuf官网。

common.proto文件:

package game;

message RequestMessage
{
  required string message = 1;
}

message ResponseMessage
{
  required string message = 1;
}

game_service.proto文件:

package game;

import "common.proto";
option py_generic_services = true;

service GameService
{
  rpc connect_server(RequestMessage) returns(RequestMessage);
}

common.proto文件描述了RPC中收发的消息;game_service.proto描述了服务器导出的connect_server函数,该函数接受RequestMessage对象作为参数,并返回RequestMessage对象。在使用PRC协议时,必须加上option
py_generic_services  =
true;可选项,要不然编译器不会生成包含connect_server函数的GameService描述。

使用编译器protoc编译proto文件,具体命令为:
protoc.exe –python_out=. game_service.proto
编译后生成的文件为game_service_pb2.py,该文件主要是实现了GameService和GameService_Stub类。GameService_Stub类用于客户端调用者来调用GameService的服务。
前面已经说了,在客户端,RpcChannel只实现了一个空的CallMethod,所以需要继承RpcChannel重新这个函数来encode消息和发送消息。在服务端RpcChannel需要调用CallMethod来调用Service中的函数。具体实现如下:

class MyRpcChannel(service.RpcChannel):
  def __init__(self, rpc_service, conn):
    super(MyRpcChannel, self).__init__()
    self.logger = LogManager.get_logger("MyRpcChannel")

  def CallMethod(self, method_descriptor, rpc_controller, request, response_class, done):
    """"protol buffer rpc 需要的函数,用来发送rpc调用"""
    self.logger.info('CallMethod')
    cmd_index = method_descriptor.index
    assert(cmd_index < 65535)
    data = request.SerializeToString()
    total_len = len(data) + 2
    self.conn.send_data(''.join([pack('<I', total_len), pack('<H', cmd_index), data]))

  def from_request(self):
    """"从网络解析出一个完整的请求之后调的函数"""
    index_data = self.rpc_request.data[0:2]    
    cmd_index = unpack('<H', index_data)[0]  
    rpc_service = self.rpc_service
    s_descriptor = rpc_service.GetDescriptor()
    method = s_descriptor.methods[cmd_index]  
    try:
      request = rpc_service.GetRequestClass(method)()
      serialized = self.rpc_request.data[2:]    
      request.ParseFromString(serialized)  
      rpc_service.CallMethod(method, self.controller, request, None)
    except:
      self.logger.error("Call rpc method failed!")
      self.logger.log_last_except()
    return True

最后就是继承GameService,并实现connect_server函数了。

class GameService(game_service_pb2.GameService):
  def __init__(self):
    self.logger = LogManager.get_logger("GameService")

  def connect_server(self, rpc_controller, request, callback):
    self.logger.info('%s', request.message)

 至于用于网络收发消息的RpcConnector,可以使用python的asyncore库实现,具体实现在这就不讨论了。

从上面的实现来看,protobuf
rpc的实现主要包括编写proto文件并编译生成对应的service_pb2文件,继承RpcChannel并实现CallMethod和调用Service的CallMethod,继承Service来实现暴露给客户端的函数。

以上就是本文的全部内容,希望对大家的学习有所帮助。

go-micro
支持很多通信协议:http、tcp、grpc等,支持的编码方式也很多有json、protobuf、bytes、jsonrpc等。也可以根据自己的需要实现通信协议和编码方式。go-micro
默认的通信协议是http,默认的编码方式是protobuf,我就以默认的方式来分解他的具体实现。

技术选型

很多RPC框架的实现都是基于protobuf的,包括我们公司服务端引擎实现的rpc框架也是基于protobuf。从前做过一些简单的测试,对相同数据进行序列化,protobuf对数据的压缩率明显高于对比的msgpack、bson和json;相同的对于序列化时间上的比较,msgpack则最快。但是protobuf最让我受不了的就是需要定义固定的协议,并且使用对应的工具生成协议代码,然后才能使用,用起来需要一定的学习成本。
因此我在想,是否可以基于msgpack实现一个不需要预先定义通信协议的rpc框架呢?答案当然是肯定的。

您可能感兴趣的文章:

  • python使用xmlrpc实例讲解
  • Python中实现远程调用(RPC、RMI)简单例子
  • Python XML RPC服务器端和客户端实例
  • 使用rpclib进行Python网络编程时的注释问题
  • python基于xmlrpc实现二进制文件传输的方法
  • python使用xmlrpclib模块实现对百度google的ping功能

由于项目组现在用的rpc是基于google protobuf
rpc协议实现的,所以花了点时间了解下protobuf rpc。rpc对…

图片 3

设计原型

class rpc_service {
    void handle_data();
    void register_rpc();
    void remote_call();
};

目前rpc_service这个类只是个原型,我们提供三个接口:

  • handle_data: 用于处理原生数据,将原生数据序列化为msgpack
  • register_rpc: 用于Server端注册rpc函数
  • remote_call: Stub端调用远程rpc函数

服务的启动

msgpack都提供了什么

这里我使用了msgpack
c++(https://github.com/msgpack/msgpack-c)。我们可以利用msgpack将参数进行打包和解包。

// 将参数打包成字符串
std::tuple<int, bool, std::string> t(1, false, "hi");
std::stringstream buffer;
msgpack::pack(buffer, t);

buffer.seekg(0);
std::string s(buffer.str());

// 对字符串进行解包
std::tuple<int, bool, std::string> dest;
msgpack::object_handle oh = msgpack::unpack(str.data(), str.size());
oh.get().convert(t);

实现一个简单rpc,我们只需要msgpack上面介绍的功能就可以了。

go-micro在启动的时候会选择默认通信协议http和protobuf编码方式,但他是如何路由到具体方法的?在go-micro服务端启动的时候我们需要注册Handler,也就是我们具体实现结构体
,如例子中注册方法时,我们调用的RegisterSayHandler方法

实现remote_call函数

先从最简单的函数开始实现。

template<typename ... Args>
void remote_call(const std::string& method, Args&& ... args);

思路很简单,我们将method字符串和args打包后的msgpack字符串都存入一个字符串流中。这里的字符串流我们定义为rpc_stream(后续将详细讲解rpc_stream)的实现,这里先简单介绍一下:

class rpc_stream : private asio::noncopyable {
public:
    rpc_stream();
public:
    std::istream& read_stream() { return stream_; }
    std::ostream& write_stream() { return stream_; }
    streambuf_ex& buf() { return buf_; }
    void reset();

    char* c_str();

private:
    streambuf_ex buf_;
    std::iostream stream_;
};

typedef std::shared_ptr<rpc_stream> rpc_stream_ptr;

rpc_stream是我写的服务端引擎的一个副产品,基于asio::streambuf的字节流处理类。接口也比较简单易懂:

  • read/write_stream: 读出/输入流
  • buf: 数据的缓冲区,提供size接口获得缓冲区大小
  • c_str: 将字节流中的数据转化为字符串

到目前为止,就可以动手开始写remote_call函数了。

class rpc_service {
public:
    rpc_service(): stream_ptr_(new hex_engine::rpc::rpc_stream()) {}
    template<typename ... Args>
    void remote_call(const std::string& method, Args&& ... args) {
        // 1.构造tuple
        auto t = std::make_tuple(std::forward<Args>(args)...);
        // 2.总长度
        uint32_t total_sz = 0;
        stream_ptr_->write_stream().write((char*)&total_sz, sizeof(total_sz));
        // 3.函数名称
        uint8_t method_sz = method.size();
        stream_ptr_->write_stream().write((char*)&method_sz, sizeof(method_sz));
        stream_ptr_->write_stream().write(method.c_str(), method_sz);
        // 4.写入参数
        msgpack::pack(stream_ptr_->write_stream(), t);
        total_sz = stream_ptr_->buf().size() - sizeof(total_sz);
        // 5.重新写入总长度
        stream_ptr_->write_stream().seekp(0);
        stream_ptr_->write_stream().write((char*)&total_sz, sizeof(total_sz));
    }

    hex_engine::rpc::rpc_stream_ptr stream_ptr() { return stream_ptr_; }

private:
    hex_engine::rpc::rpc_stream_ptr stream_ptr_;
};

这样数据便存在rpc_stream的字节流中了,之后可以通过c_str接口将字节流的数据导出,通过网络传输到远端。

// 注册
Handlerrpcapi.RegisterSayHandler(service.Server(),new(handler.Say))

接下来

后面一章将讲解Server端Rpc的实现

这个方法内部的体实现主要是利用了反射的力量,注册的对象是实现了rpc接口的方法,如我们的Say实现了SayHandler。go-micro默认的router会利用反射把Say对象的信息完全提取出来,解析出结构体内的方法及方法的参数,保存到一个map内->
map[结构体名称][方法信息集合]

具体的实现在rpc_router.go里router的Handle方法,组织完成后map的是下图这样,保存了很多反射信息,用以将来调用。

下面是这个方法的主要代码,删除了一些,很希望大家读一下rpc_router.go里面的代码,prepareMethod方法是具体利用反射提取信息的方法。

func (router *router) Handle(h Handler) error {

router.mu.Lock()

defer router.mu.Unlock()

// …. rcvr := h.Handler()

s :=new

s.typ = reflect.TypeOf

s.rcvr = reflect.ValueOf

// check name

// ….s.name = h.Name()

s.method = make(map[string]*methodType)

// Install the methodsform :=0; m < s.typ.NumMethod(); m++ {

method := s.typ.Method

// prepareMethod会把所有解析的信息返回来ifmt := prepareMethod; mt != nil
{

s.method[method.Name] = mt

}

}

// …..

发表评论

电子邮件地址不会被公开。 必填项已用*标注