FreezeJ' Blog

python gRPC使用Demo

2022-08-09

参考:
https://blog.csdn.net/bocai_xiaodaidai/article/details/103958468
https://blog.csdn.net/Xw_Classmate/article/details/118643368

简介

gRPC 是由 Google 开发的一个高性能、通用的开源 RPC 框架,基于 HTTP/2 协议标准设计,主要面向移动应用开发,同时支持大多数流行的编程语言。

优点

  • 高效的二进制编码机制
  • 采用 HTTP/2 协议
  • 清晰的接口规范
  • 对流的支持

安装模块

pip3 install grpcio-tools==1.47.0

# 自动安装好了以下依赖模块
# Installing collected packages: protobuf, grpcio, grpcio-tools
# Successfully installed grpcio-1.48.0 grpcio-tools-1.47.0 protobuf-3.20.1

文件结构

├── client.py  # gRPC 客户端代码
├── data_pb2_grpc.py  # 由data.proto生成
├── data_pb2.py  # 由data.proto生成
├── data.proto  # protobuf数据格式
└── server.py  # gRPC 服务端代码

生成protobuf文件

syntax = "proto3";

// Service definition used for RPC transport.
// RemoteFunc is a unary call: one request_data in, one response_data out.
service GRPCTest {
  rpc RemoteFunc(request_data) returns (response_data);
}

// Request message sent by the client.
message request_data {
  string text = 1;  // Text payload supplied by the caller
}

// Response message returned by the service.
message response_data {
  int32 code = 1;  // Status code
  string result = 2;  // Result text
}
python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./data.proto

data_pb2_grpc.py:

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import data_pb2 as data__pb2


class GRPCTestStub(object):
    """Client-side stub for the GRPCTest service (used for RPC transport)."""

    def __init__(self, channel):
        """Bind the service's RPC methods to ``channel``.

        Args:
            channel: A grpc.Channel.
        """
        # Requests are serialized as request_data and replies are parsed
        # as response_data, mirroring the rpc signature in data.proto.
        encode_request = data__pb2.request_data.SerializeToString
        decode_response = data__pb2.response_data.FromString
        self.RemoteFunc = channel.unary_unary(
            '/GRPCTest/RemoteFunc',
            request_serializer=encode_request,
            response_deserializer=decode_response,
        )


class GRPCTestServicer(object):
    """Server-side base class for the GRPCTest service (used for RPC transport).

    Subclasses override ``RemoteFunc``; the default implementation only
    reports that the method is not implemented.
    """

    def RemoteFunc(self, request, context):
        """Reject the call as unimplemented.

        Args:
            request: The incoming request message (ignored here).
            context: RPC context on which the status code and details are set.

        Raises:
            NotImplementedError: Always.
        """
        reason = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(reason)
        raise NotImplementedError(reason)


def add_GRPCTestServicer_to_server(servicer, server):
    """Register ``servicer``'s RPC handlers for the GRPCTest service on ``server``.

    Args:
        servicer: Object providing a ``RemoteFunc(request, context)`` method.
        server: Server object exposing ``add_generic_rpc_handlers``.
    """
    # Wrap the servicer method with the (de)serializers of its proto messages.
    remote_func_handler = grpc.unary_unary_rpc_method_handler(
        servicer.RemoteFunc,
        request_deserializer=data__pb2.request_data.FromString,
        response_serializer=data__pb2.response_data.SerializeToString,
    )
    service_handler = grpc.method_handlers_generic_handler(
        'GRPCTest', {'RemoteFunc': remote_func_handler})
    server.add_generic_rpc_handlers((service_handler,))


 # This class is part of an EXPERIMENTAL API.
class GRPCTest(object):
    """Channel-less convenience API for the GRPCTest service (used for RPC transport)."""

    @staticmethod
    def RemoteFunc(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        """Invoke ``/GRPCTest/RemoteFunc`` on ``target`` in a single call.

        Args:
            request: The request_data message to send.
            target: Address of the server to call.
            options, channel_credentials, call_credentials, insecure,
            compression, wait_for_ready, timeout, metadata: Forwarded
                unchanged to ``grpc.experimental.unary_unary``.

        Returns:
            Whatever ``grpc.experimental.unary_unary`` returns for the call.
        """
        method_path = '/GRPCTest/RemoteFunc'
        encode_request = data__pb2.request_data.SerializeToString
        decode_response = data__pb2.response_data.FromString
        # Arguments are positional, so their order matters: ``insecure`` is
        # passed before ``call_credentials`` even though this method declares
        # them the other way round.
        return grpc.experimental.unary_unary(
            request, target, method_path,
            encode_request, decode_response,
            options, channel_credentials,
            insecure, call_credentials,
            compression, wait_for_ready, timeout, metadata)

data_pb2.py:

# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: data.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

# Symbol database in which the generated message classes are registered below.
_sym_db = _symbol_database.Default()


# Serialized FileDescriptorProto for data.proto.
# The bytes literal must stay on ONE physical line. Hard-wrapping it breaks
# the module: a wrap after a backslash turns ``\t`` into a line continuation
# followed by a plain ``t`` (corrupting the field-type byte), and a wrap
# anywhere else leaves an unterminated string literal (SyntaxError).
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ndata.proto\"\x1c\n\x0crequest_data\x12\x0c\n\x04text\x18\x01 \x01(\t\"-\n\rresponse_data\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x05\x12\x0e\n\x06result\x18\x02 \x01(\t27\n\x08GRPCTest\x12+\n\nRemoteFunc\x12\r.request_data\x1a\x0e.response_datab\x06proto3')



# Look up the descriptors of the two messages declared in data.proto.
_REQUEST_DATA = DESCRIPTOR.message_types_by_name['request_data']
_RESPONSE_DATA = DESCRIPTOR.message_types_by_name['response_data']
# Build the concrete message class for ``request_data`` from its descriptor
# and register it with the symbol database.
request_data = _reflection.GeneratedProtocolMessageType('request_data', (_message.Message,), {
  'DESCRIPTOR' : _REQUEST_DATA,
  '__module__' : 'data_pb2'
  # @@protoc_insertion_point(class_scope:request_data)
  })
_sym_db.RegisterMessage(request_data)

# Same for ``response_data``.
response_data = _reflection.GeneratedProtocolMessageType('response_data', (_message.Message,), {
  'DESCRIPTOR' : _RESPONSE_DATA,
  '__module__' : 'data_pb2'
  # @@protoc_insertion_point(class_scope:response_data)
  })
_sym_db.RegisterMessage(response_data)

# Descriptor of the GRPCTest service.
_GRPCTEST = DESCRIPTOR.services_by_name['GRPCTest']
# The assignments below run only when ``_descriptor._USE_C_DESCRIPTORS`` is
# False. They record where each definition starts and ends (byte offsets)
# inside the serialized file descriptor passed to AddSerializedFile.
if _descriptor._USE_C_DESCRIPTORS == False:

  DESCRIPTOR._options = None
  _REQUEST_DATA._serialized_start=14
  _REQUEST_DATA._serialized_end=42
  _RESPONSE_DATA._serialized_start=44
  _RESPONSE_DATA._serialized_end=89
  _GRPCTEST._serialized_start=91
  _GRPCTEST._serialized_end=146
# @@protoc_insertion_point(module_scope)

服务端代码

server.py:

import grpc
import time
from concurrent import futures
import data_pb2, data_pb2_grpc

# Number of seconds in one day.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# Host and port of the server's listen address. The port is kept as a string
# because the address is built by string concatenation.
_HOST = 'localhost'
_PORT = '8080'

# Subclass the generated servicer base class and override its RPC method.
class GRPCTestServicer(data_pb2_grpc.GRPCTestServicer):
    """GRPCTest implementation that upper-cases the text it receives."""

    def RemoteFunc(self, request, context):
        """Handle one RemoteFunc call.

        Args:
            request: Incoming request message; only its ``text`` field is read.
            context: RPC context supplied by the caller (not used here).

        Returns:
            A ``response_data`` with ``code=0`` and the upper-cased text, or
            with ``code=1`` and an error message when ``text`` is empty.
        """
        payload = request.text
        # Guard clause: an empty string is reported through the response's
        # own status code rather than by raising.
        if not payload:
            return data_pb2.response_data(result='传入参数为空字符!', code=1)
        return data_pb2.response_data(result=payload.upper(), code=0)


def serve():
    """Start the gRPC server on ``_HOST:_PORT`` and block until interrupted.

    Raises:
        RuntimeError: If the listen address could not be bound.
    """
    # ``max_workers`` is the size of the thread pool that runs RPC handlers
    # (concurrent.futures); it is not a limit on the number of connections.
    grpcServer = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    # Register our servicer subclass, which supplies the actual handler.
    data_pb2_grpc.add_GRPCTestServicer_to_server(GRPCTestServicer(), grpcServer)
    address = _HOST + ':' + _PORT
    # add_insecure_port returns the bound port; older grpcio releases signal
    # a bind failure by returning 0 instead of raising, which would leave a
    # "started" server that listens on nothing.
    bound_port = grpcServer.add_insecure_port(address)
    if bound_port == 0:
        raise RuntimeError('Failed to bind gRPC server to ' + address)
    grpcServer.start()
    print('启动RPC Server')
    try:
        # start() does not block, so wait here until the server terminates.
        grpcServer.wait_for_termination()
    except KeyboardInterrupt:
        grpcServer.stop(0)  # Ctrl-C: shut down immediately, no grace period


if __name__ == '__main__':
    serve()

客户端代码

client.py:

import grpc
import data_pb2, data_pb2_grpc

# Host and port of the server to connect to. The port is kept as a string
# because the target address is built by string concatenation.
_HOST = 'localhost'
_PORT = '8080'

def run():
    """Prompt for a string, send it to the server and print the reply."""
    text = input('输入字符串,字母转为大写:')
    # Open a channel to the server (this connects; it does not listen).
    # Using it as a context manager closes the channel once the call is
    # done instead of leaving it open until interpreter exit.
    with grpc.insecure_channel(_HOST + ':' + _PORT) as conn:
        # The stub binds the service's RPC methods to this channel.
        client = data_pb2_grpc.GRPCTestStub(channel=conn)
        # The reply is an instance of the response_data message from data.proto.
        response = client.RemoteFunc(data_pb2.request_data(text=text))
    if response.code == 0:
        print("返回结果:" + response.result)
    else:
        print("操作错误:" + response.result)

if __name__ == '__main__':
    run()

运行结果

# 启动服务端:
$ python3 server.py                                                                                                               
启动RPC Server

# 客户端请求:
$ python3 client.py                                                                                                               
输入字符串,字母转为大写:test123
返回结果:TEST123

$ python3 client.py                                                                                                               
输入字符串,字母转为大写:
操作错误:传入参数为空字符!
Tags: Linux