grpc-service-development

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

gRPC Service Development

gRPC服务开发

Overview

概述

Develop efficient gRPC services using Protocol Buffers for service definition, with support for unary calls, client streaming, server streaming, and bidirectional streaming patterns.
使用Protocol Buffers定义服务,开发高效的gRPC服务,支持一元调用、客户端流、服务端流和双向流模式。

When to Use

适用场景

  • Building microservices that require high performance
  • Defining service contracts with Protocol Buffers
  • Implementing real-time bidirectional communication
  • Creating internal service-to-service APIs
  • Optimizing bandwidth-constrained environments
  • Building polyglot service architectures
  • 构建需要高性能的微服务
  • 使用Protocol Buffers定义服务契约
  • 实现实时双向通信
  • 创建内部服务间API
  • 优化带宽受限的环境
  • 构建多语言服务架构

Instructions

操作步骤

1. Protocol Buffer Service Definition

1. Protocol Buffer服务定义

protobuf
syntax = "proto3";

package user.service;

message User {
  string id = 1;
  string email = 2;
  string first_name = 3;
  string last_name = 4;
  string role = 5;
  int64 created_at = 6;
  int64 updated_at = 7;
}

message CreateUserRequest {
  string email = 1;
  string first_name = 2;
  string last_name = 3;
  string role = 4;
}

message UpdateUserRequest {
  string id = 1;
  string email = 2;
  string first_name = 3;
  string last_name = 4;
}

message GetUserRequest {
  string id = 1;
}

message ListUsersRequest {
  int32 page = 1;
  int32 limit = 2;
}

message ListUsersResponse {
  repeated User users = 1;
  int32 total = 2;
  int32 page = 3;
}

message DeleteUserRequest {
  string id = 1;
}

message Empty {}

service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc UpdateUser(UpdateUserRequest) returns (User);
  rpc DeleteUser(DeleteUserRequest) returns (Empty);
  rpc StreamUsers(Empty) returns (stream User);
  rpc BulkCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
}

message Event {
  string type = 1;
  string user_id = 2;
  string data = 3;
  int64 timestamp = 4;
}

service EventService {
  rpc Subscribe(Empty) returns (stream Event);
  rpc PublishEvent(Event) returns (Empty);
}
protobuf
syntax = "proto3";

package user.service;

message User {
  string id = 1;
  string email = 2;
  string first_name = 3;
  string last_name = 4;
  string role = 5;
  int64 created_at = 6;
  int64 updated_at = 7;
}

message CreateUserRequest {
  string email = 1;
  string first_name = 2;
  string last_name = 3;
  string role = 4;
}

message UpdateUserRequest {
  string id = 1;
  string email = 2;
  string first_name = 3;
  string last_name = 4;
}

message GetUserRequest {
  string id = 1;
}

message ListUsersRequest {
  int32 page = 1;
  int32 limit = 2;
}

message ListUsersResponse {
  repeated User users = 1;
  int32 total = 2;
  int32 page = 3;
}

message DeleteUserRequest {
  string id = 1;
}

message Empty {}

service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc UpdateUser(UpdateUserRequest) returns (User);
  rpc DeleteUser(DeleteUserRequest) returns (Empty);
  rpc StreamUsers(Empty) returns (stream User);
  rpc BulkCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
}

message Event {
  string type = 1;
  string user_id = 2;
  string data = 3;
  int64 timestamp = 4;
}

service EventService {
  rpc Subscribe(Empty) returns (stream Event);
  rpc PublishEvent(Event) returns (Empty);
}

2. Node.js gRPC Server Implementation

2. Node.js gRPC服务器实现

javascript
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');

const packageDef = protoLoader.loadSync(
  path.join(__dirname, 'user.proto'),
  { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }
);

const userProto = grpc.loadPackageDefinition(packageDef).user.service;

const users = new Map();
let userIdCounter = 1;

const userServiceImpl = {
  getUser: (call, callback) => {
    const user = users.get(call.request.id);
    if (!user) {
      return callback({ code: grpc.status.NOT_FOUND, details: 'User not found' });
    }
    callback(null, user);
  },

  listUsers: (call, callback) => {
    const page = call.request.page || 1;
    const limit = call.request.limit || 20;
    const offset = (page - 1) * limit;

    const userArray = Array.from(users.values());
    const paginatedUsers = userArray.slice(offset, offset + limit);

    callback(null, {
      users: paginatedUsers,
      total: userArray.length,
      page: page
    });
  },

  createUser: (call, callback) => {
    const id = String(userIdCounter++);
    const user = {
      id,
      email: call.request.email,
      first_name: call.request.first_name,
      last_name: call.request.last_name,
      role: call.request.role,
      created_at: Date.now(),
      updated_at: Date.now()
    };
    users.set(id, user);
    callback(null, user);
  },

  updateUser: (call, callback) => {
    const user = users.get(call.request.id);
    if (!user) {
      return callback({ code: grpc.status.NOT_FOUND, details: 'User not found' });
    }

    Object.assign(user, {
      email: call.request.email || user.email,
      first_name: call.request.first_name || user.first_name,
      last_name: call.request.last_name || user.last_name,
      updated_at: Date.now()
    });

    callback(null, user);
  },

  deleteUser: (call, callback) => {
    users.delete(call.request.id);
    callback(null, {});
  },

  streamUsers: (call) => {
    Array.from(users.values()).forEach(user => {
      call.write(user);
    });
    call.end();
  },

  bulkCreateUsers: (call, callback) => {
    const createdUsers = [];

    call.on('data', (request) => {
      const id = String(userIdCounter++);
      const user = {
        id,
        email: request.email,
        first_name: request.first_name,
        last_name: request.last_name,
        role: request.role,
        created_at: Date.now(),
        updated_at: Date.now()
      };
      users.set(id, user);
      createdUsers.push(user);
    });

    call.on('end', () => {
      callback(null, { users: createdUsers, total: createdUsers.length, page: 1 });
    });

    call.on('error', (err) => {
      callback(err);
    });
  }
};

const server = new grpc.Server();
server.addService(userProto.UserService.service, userServiceImpl);

server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
  console.log('gRPC server running on port 50051');
  server.start();
});
javascript
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');

const packageDef = protoLoader.loadSync(
  path.join(__dirname, 'user.proto'),
  { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }
);

const userProto = grpc.loadPackageDefinition(packageDef).user.service;

const users = new Map();
let userIdCounter = 1;

const userServiceImpl = {
  getUser: (call, callback) => {
    const user = users.get(call.request.id);
    if (!user) {
      return callback({ code: grpc.status.NOT_FOUND, details: 'User not found' });
    }
    callback(null, user);
  },

  listUsers: (call, callback) => {
    const page = call.request.page || 1;
    const limit = call.request.limit || 20;
    const offset = (page - 1) * limit;

    const userArray = Array.from(users.values());
    const paginatedUsers = userArray.slice(offset, offset + limit);

    callback(null, {
      users: paginatedUsers,
      total: userArray.length,
      page: page
    });
  },

  createUser: (call, callback) => {
    const id = String(userIdCounter++);
    const user = {
      id,
      email: call.request.email,
      first_name: call.request.first_name,
      last_name: call.request.last_name,
      role: call.request.role,
      created_at: Date.now(),
      updated_at: Date.now()
    };
    users.set(id, user);
    callback(null, user);
  },

  updateUser: (call, callback) => {
    const user = users.get(call.request.id);
    if (!user) {
      return callback({ code: grpc.status.NOT_FOUND, details: 'User not found' });
    }

    Object.assign(user, {
      email: call.request.email || user.email,
      first_name: call.request.first_name || user.first_name,
      last_name: call.request.last_name || user.last_name,
      updated_at: Date.now()
    });

    callback(null, user);
  },

  deleteUser: (call, callback) => {
    users.delete(call.request.id);
    callback(null, {});
  },

  streamUsers: (call) => {
    Array.from(users.values()).forEach(user => {
      call.write(user);
    });
    call.end();
  },

  bulkCreateUsers: (call, callback) => {
    const createdUsers = [];

    call.on('data', (request) => {
      const id = String(userIdCounter++);
      const user = {
        id,
        email: request.email,
        first_name: request.first_name,
        last_name: request.last_name,
        role: request.role,
        created_at: Date.now(),
        updated_at: Date.now()
      };
      users.set(id, user);
      createdUsers.push(user);
    });

    call.on('end', () => {
      callback(null, { users: createdUsers, total: createdUsers.length, page: 1 });
    });

    call.on('error', (err) => {
      callback(err);
    });
  }
};

const server = new grpc.Server();
server.addService(userProto.UserService.service, userServiceImpl);

server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
  console.log('gRPC server running on port 50051');
  server.start();
});

3. Python gRPC Server (grpcio)

3. Python gRPC服务器(grpcio)

python
import grpc
from concurrent import futures
import user_pb2
import user_pb2_grpc
from datetime import datetime

class UserServicer(user_pb2_grpc.UserServiceServicer):
    def __init__(self):
        self.users = {}
        self.user_counter = 1

    def GetUser(self, request, context):
        if request.id not in self.users:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details('User not found')
            return user_pb2.User()
        return self.users[request.id]

    def ListUsers(self, request, context):
        users_list = list(self.users.values())
        page = request.page or 1
        limit = request.limit or 20
        offset = (page - 1) * limit

        return user_pb2.ListUsersResponse(
            users=users_list[offset:offset + limit],
            total=len(users_list),
            page=page
        )

    def CreateUser(self, request, context):
        user_id = str(self.user_counter)
        self.user_counter += 1

        user = user_pb2.User(
            id=user_id,
            email=request.email,
            first_name=request.first_name,
            last_name=request.last_name,
            role=request.role,
            created_at=int(datetime.now().timestamp()),
            updated_at=int(datetime.now().timestamp())
        )
        self.users[user_id] = user
        return user

    def StreamUsers(self, request, context):
        for user in self.users.values():
            yield user

    def BulkCreateUsers(self, request_iterator, context):
        created_users = []
        for request in request_iterator:
            user = self.CreateUser(request, context)
            created_users.append(user)

        return user_pb2.ListUsersResponse(
            users=created_users,
            total=len(created_users),
            page=1
        )

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    user_pb2_grpc.add_UserServiceServicer_to_server(
        UserServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    print('gRPC server running on port 50051')
    server.wait_for_termination()

if __name__ == '__main__':
    serve()
python
import grpc
from concurrent import futures
import user_pb2
import user_pb2_grpc
from datetime import datetime

class UserServicer(user_pb2_grpc.UserServiceServicer):
    def __init__(self):
        self.users = {}
        self.user_counter = 1

    def GetUser(self, request, context):
        if request.id not in self.users:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details('User not found')
            return user_pb2.User()
        return self.users[request.id]

    def ListUsers(self, request, context):
        users_list = list(self.users.values())
        page = request.page or 1
        limit = request.limit or 20
        offset = (page - 1) * limit

        return user_pb2.ListUsersResponse(
            users=users_list[offset:offset + limit],
            total=len(users_list),
            page=page
        )

    def CreateUser(self, request, context):
        user_id = str(self.user_counter)
        self.user_counter += 1

        user = user_pb2.User(
            id=user_id,
            email=request.email,
            first_name=request.first_name,
            last_name=request.last_name,
            role=request.role,
            created_at=int(datetime.now().timestamp()),
            updated_at=int(datetime.now().timestamp())
        )
        self.users[user_id] = user
        return user

    def StreamUsers(self, request, context):
        for user in self.users.values():
            yield user

    def BulkCreateUsers(self, request_iterator, context):
        created_users = []
        for request in request_iterator:
            user = self.CreateUser(request, context)
            created_users.append(user)

        return user_pb2.ListUsersResponse(
            users=created_users,
            total=len(created_users),
            page=1
        )

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    user_pb2_grpc.add_UserServiceServicer_to_server(
        UserServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    print('gRPC server running on port 50051')
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

4. Client Implementation

4. 客户端实现

javascript
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');

const packageDef = protoLoader.loadSync(
  path.join(__dirname, 'user.proto')
);

const userProto = grpc.loadPackageDefinition(packageDef).user.service;
const client = new userProto.UserService('localhost:50051', grpc.credentials.createInsecure());

// Unary call
client.getUser({ id: '123' }, (err, user) => {
  if (err) console.error(err);
  console.log('User:', user);
});

// Server streaming
const stream = client.streamUsers({});
stream.on('data', (user) => {
  console.log('Received user:', user);
});
stream.on('end', () => {
  console.log('Stream ended');
});

// Client streaming
const writeStream = client.bulkCreateUsers((err, response) => {
  if (err) console.error(err);
  console.log('Created users:', response.users.length);
});

writeStream.write({ email: 'user1@example.com', first_name: 'John', last_name: 'Doe' });
writeStream.write({ email: 'user2@example.com', first_name: 'Jane', last_name: 'Smith' });
writeStream.end();
javascript
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');

const packageDef = protoLoader.loadSync(
  path.join(__dirname, 'user.proto')
);

const userProto = grpc.loadPackageDefinition(packageDef).user.service;
const client = new userProto.UserService('localhost:50051', grpc.credentials.createInsecure());

// 一元调用
client.getUser({ id: '123' }, (err, user) => {
  if (err) console.error(err);
  console.log('User:', user);
});

// 服务端流
const stream = client.streamUsers({});
stream.on('data', (user) => {
  console.log('Received user:', user);
});
stream.on('end', () => {
  console.log('Stream ended');
});

// 客户端流
const writeStream = client.bulkCreateUsers((err, response) => {
  if (err) console.error(err);
  console.log('Created users:', response.users.length);
});

writeStream.write({ email: 'user1@example.com', first_name: 'John', last_name: 'Doe' });
writeStream.write({ email: 'user2@example.com', first_name: 'Jane', last_name: 'Smith' });
writeStream.end();

Best Practices

最佳实践

✅ DO

✅ 建议

  • Use clear message and service naming
  • Implement proper error handling with gRPC status codes
  • Add metadata for logging and tracing
  • Version your protobuf definitions
  • Use streaming for large datasets
  • Implement timeouts and deadlines
  • Monitor gRPC metrics
  • 使用清晰的消息和服务命名
  • 结合gRPC状态码实现完善的错误处理
  • 添加元数据用于日志和追踪
  • 为protobuf定义添加版本标识
  • 对大型数据集使用流模式
  • 实现超时和截止时间
  • 监控gRPC指标

❌ DON'T

❌ 避免

  • Use gRPC for browser-based clients (use gRPC-Web)
  • Expose sensitive data in proto definitions
  • Create deeply nested messages
  • Ignore error status codes
  • Send uncompressed large payloads
  • Skip security with TLS in production
  • 为浏览器端客户端使用gRPC(请使用gRPC-Web)
  • 在proto定义中暴露敏感数据
  • 创建深度嵌套的消息
  • 忽略错误状态码
  • 发送未压缩的大型负载
  • 生产环境中跳过TLS安全配置

Deployment

部署

bash
undefined
bash
undefined

Generate protobuf code

生成protobuf代码

protoc --go_out=. --go-grpc_out=. *.proto
protoc --go_out=. --go-grpc_out=. *.proto

Compile Node.js gRPC server

编译Node.js gRPC服务器

npm install @grpc/grpc-js @grpc/proto-loader
npm install @grpc/grpc-js @grpc/proto-loader

Compile Python gRPC server

编译Python gRPC服务器

pip install grpcio grpcio-tools python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user.proto
undefined
pip install grpcio grpcio-tools python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user.proto
undefined