手撸RPC----解析请求、利用反射进行调用

手撸RPC----解析请求、利用反射进行调用

一、解析请求

基础的通讯的逻辑代码已经完成了,封装请求的代码如下,关于设计私有协议的内容我们在上一个课件中就讲述了,当然其中还包括对负载的序列化和压缩的代码逻辑:

将请求对象进行编码的代码如下,我们继承了MessageToByteEncoder类,而该类也是ChannelOutboundHandler的一个实现,用来处理出站逻辑:

/**

* 4B magic(魔数) --->yrpc.getBytes()

* 1B version(版本) ----> 1

* 2B header length 首部的长度

* 4B full length 报文总长度

* 1B serialize

* 1B compress

* 1B requestType

* 8B requestId

* body

* 出站时,第一个经过的处理器

* @author it楠老师

* @createTime 2023-07-02

*/

@Slf4j

public class YrpcResponseEncoder extends MessageToByteEncoder {

@Override

protected void encode(ChannelHandlerContext channelHandlerContext, YrpcResponse yrpcResponse, ByteBuf byteBuf) throws Exception {

// 4个字节的魔数值

byteBuf.writeBytes(MessageFormatConstant.MAGIC);

// 1个字节的版本号

byteBuf.writeByte(MessageFormatConstant.VERSION);

// 2个字节的头部的长度

byteBuf.writeShort(MessageFormatConstant.HEADER_LENGTH);

// 总长度不清楚,不知道body的长度 writeIndex(写指针)

byteBuf.writerIndex(byteBuf.writerIndex() + MessageFormatConstant.FULL_FIELD_LENGTH);

// 3个类型

byteBuf.writeByte(yrpcResponse.getCode());

byteBuf.writeByte(yrpcResponse.getSerializeType());

byteBuf.writeByte(yrpcResponse.getCompressType());

// 8字节的请求id

byteBuf.writeLong(yrpcResponse.getRequestId());

byteBuf.writeLong(yrpcResponse.getTimeStamp());

// 1、对响应做序列化

byte[] body = null;

if(yrpcResponse.getBody() != null) {

Serializer serializer = SerializerFactory

.getSerializer(yrpcResponse.getSerializeType()).getImpl();

body = serializer.serialize(yrpcResponse.getBody());

// 2、压缩

Compressor compressor = CompressorFactory.getCompressor(

yrpcResponse.getCompressType()

).getImpl();

body = compressor.compress(body);

}

if(body != null){

byteBuf.writeBytes(body);

}

int bodyLength = body == null ? 0 : body.length;

// 重新处理报文的总长度

// 先保存当前的写指针的位置

int writerIndex = byteBuf.writerIndex();

// 将写指针的位置移动到总长度的位置上

byteBuf.writerIndex(MessageFormatConstant.MAGIC.length

+ MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH

);

byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + bodyLength);

// 将写指针归位

byteBuf.writerIndex(writerIndex);

if(log.isDebugEnabled()){

log.debug("响应【{}】已经在服务端完成编码工作。",yrpcResponse.getRequestId());

}

}

}

经过了这个处理器,我们的请求对象就变成了报文,发送给服务提供方,提供方需要解析报文,解析过程如下,其中需要对负载进行解压缩和反序列化:

public class RpcResponseDecoder extends LengthFieldBasedFrameDecoder {

public RpcResponseDecoder() {

super(

// 找到当前报文的总长度,截取报文,截取出来的报文我们可以去进行解析

// 最大帧的长度,超过这个maxFrameLength值会直接丢弃

MessageFormatConstant.MAX_FRAME_LENGTH,

// 长度的字段的偏移量,

MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH,

// 长度的字段的长度

MessageFormatConstant.FULL_FIELD_LENGTH,

// todo 负载的适配长度

-(MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH

+ MessageFormatConstant.HEADER_FIELD_LENGTH + MessageFormatConstant.FULL_FIELD_LENGTH),

0);

}

@Override

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

Object decode = super.decode(ctx, in);

if(decode instanceof ByteBuf byteBuf){

return decodeFrame(byteBuf);

}

return null;

}

private Object decodeFrame(ByteBuf byteBuf) {

// 1、解析魔数

byte[] magic = new byte[MessageFormatConstant.MAGIC.length];

byteBuf.readBytes(magic);

// 检测魔数是否匹配

for (int i = 0; i < magic.length; i++) {

if(magic[i] != MessageFormatConstant.MAGIC[i]){

throw new RuntimeException("The request obtained is not legitimate。");

}

}

// 2、解析版本号

byte version = byteBuf.readByte();

if(version > MessageFormatConstant.VERSION){

throw new RuntimeException("获得的请求版本不被支持。");

}

// 3、解析头部的长度

short headLength = byteBuf.readShort();

// 4、解析总长度

int fullLength = byteBuf.readInt();

// 5、请求类型

byte responseCode = byteBuf.readByte();

// 6、序列化类型

byte serializeType = byteBuf.readByte();

// 7、压缩类型

byte compressType = byteBuf.readByte();

// 8、请求id

long requestId = byteBuf.readLong();

// 9、时间戳

long timeStamp = byteBuf.readLong();

// 我们需要封装

RpcResponse rpcResponse = new RpcResponse();

rpcResponse.setCode(responseCode);

rpcResponse.setCompressType(compressType);

rpcResponse.setSerializeType(serializeType);

rpcResponse.setRequestId(requestId);

rpcResponse.setTimeStamp(timeStamp);

// todo 心跳请求没有负载,此处可以判断并直接返回

// if( requestType == RequestType.HEART_BEAT.getId()){

// return yrpcRequest;

// }

int bodyLength = fullLength - headLength;

byte[] payload = new byte[bodyLength];

byteBuf.readBytes(payload);

if(payload.length > 0) {

// 有了字节数组之后就可以解压缩,反序列化

// 1、解压缩

Compressor compressor = CompressorFactory.getCompressor(compressType).getImpl();

payload = compressor.decompress(payload);

// 2、反序列化

Serializer serializer = SerializerFactory

.getSerializer(yrpcResponse.getSerializeType()).getImpl();

Object body = serializer.deserialize(payload, Object.class);

yrpcResponse.setBody(body);

}

if(log.isDebugEnabled()){

log.debug("响应【{}】已经在调用端完成解码工作。",rpcResponse.getRequestId());

}

return rpcResponse;

}

}

二、使用反射完成方法调用

服务提供方通过解析请求报文,可以获得负载数据,并根据负载数据进行方法调用,负载内容如下:

public class RequestPayload implements Serializable {

// 1、接口的名字 -- com.ydlclass.HelloYrpc

private String interfaceName;

// 2、方法的名字 --sayHi

private String methodName;

// 3、参数列表,参数分为参数类型和具体的参数

// 参数类型用来确定重载方法,具体的参数用来执行方法调用

private Class[] parametersType; // -- {java.long.String}

private Object[] parametersValue; // -- "你好"

// 4、返回值的封装 -- {java.long.String}

private Class returnType;

}

其中包含接口的名字,方法名字,参数类型列表,和参数列表、返回值类型几个核心参数。当我们拥有了这些核心参数之后,我们应该思考,服务提供方应该如何使用这些数据进行方法调用,很明显,我们能直接想到的技术就是【反射】。

在这其中主要涉及两个过程:

提供方接受到请求之后整体的执行流程如下:

1、获取和调用方的连接(channel)。

2、从请求中获取请求所携带的负载。

3、根据负载进行方法调用。

4、写出结果(writeAndFlush)到调用方。

根据负载进行方法调用的过程,callTargetMethod(RequestPayload requestPayload)

1、从负载中获取核心参数。

2、从已经发布的服务中获取具体的实例。(在发布服务时,会将一个具体的实例进行缓存)

3、使用反射进行方法调用。

具体的代码实现如下:

public class MethodCallHandler extends SimpleChannelInboundHandler {

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {

// 1、 获得通道

Channel channel = channelHandlerContext.channel();

// 2、获取负载内容

RequestPayload requestPayload = yrpcRequest.getRequestPayload();

// 3、根据负载内容进行方法调用

try {

Object result = callTargetMethod(requestPayload);

if (log.isDebugEnabled()) {

log.debug("请求【{}】已经在服务端完成方法调用。", yrpcRequest.getRequestId());

}

// 4、封装响应

RrpcResponse rpcResponse = new RpcResponse();

rpcResponse.setRequestId(rpcRequest.getRequestId());

rpcResponse.setCompressType(rpcRequest.getCompressType());

rpcResponse.setSerializeType(rpcRequest.getSerializeType());

rpcResponse.setCode(RespCode.SUCCESS.getCode());

rpcResponse.setBody(result);

} catch (Exception e){

log.error("编号为【{}】的请求在调用过程中发生异常。",rpcRequest.getRequestId(),e);

rpcResponse.setCode(RespCode.FAIL.getCode());

}

// 5、写出响应

channel.writeAndFlush(rpcResponse);

}

private Object callTargetMethod(RequestPayload requestPayload) {

String interfaceName = requestPayload.getInterfaceName();

String methodName = requestPayload.getMethodName();

Class[] parametersType = requestPayload.getParametersType();

Object[] parametersValue = requestPayload.getParametersValue();

// 寻找到匹配的暴露出去的具体的实现

ServiceConfig serviceConfig = RpcBootstrap.SERVERS_LIST.get(interfaceName);

Object refImpl = serviceConfig.getRef();

// 通过反射调用 1、获取方法对象 2、执行invoke方法

Object returnValue;

try {

Class aClass = refImpl.getClass();

Method method = aClass.getMethod(methodName, parametersType);

returnValue = method.invoke(refImpl, parametersValue);

} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {

log.error("调用服务【{}】的方法【{}】时发生了异常。", interfaceName, methodName, e);

throw new RuntimeException(e);

}

return returnValue;

}

}

相关推荐

Windows 11驱动备份全攻略:轻松保存,快速恢复,告别系统崩溃烦恼!
苹果怎么插耳机 苹果7耳机孔在什么位置?
365bet足球比

苹果怎么插耳机 苹果7耳机孔在什么位置?

📅 06-30 👁️ 2369
dnf传说灵魂快速获得方法-传说灵魂最新获得途径
声音魅力培训网
det365APP

声音魅力培训网

📅 07-12 👁️ 440
是
det365APP

是"暗度陈仓"还是"暗渡陈仓"哪个正确?

📅 07-03 👁️ 4038
2025年湖南新高考赋分规则:含计算公式、等级赋分对照表