网站友情链接如何做,开源网站 做镜像 如何做,拉新推广怎么做,用阿里云搭建WordPress1. 全景图#xff1a;从宏观到微观在钻入代码之前#xff0c;我们需要先在脑海中建立一张全景图。一次同步的 RPC 调用#xff0c;大致可以分为三个阶段#xff1a;消费端#xff08;Consumer#xff09;#xff1a; 动态代理 - 负载均衡 - 封装请求 - 编码…1. 全景图从宏观到微观在钻入代码之前我们需要先在脑海中建立一张全景图。一次同步的 RPC 调用大致可以分为三个阶段消费端Consumer动态代理 - 负载均衡 - 封装请求 - 编码发送 - 同步阻塞等待。服务端Provider解码接收 - 线程池派发 - 过滤器链 - 反射调用 - 封装响应 - 编码发送。消费端Consumer接收响应 - 唤醒等待线程 - 提取结果。本文将略过配置加载和服务发现细节通过核心链路代码将上述过程串联起来。2. 第一阶段消费端发起请求Consumer当我们代码中执行 demoService.sayHello(world) 时实际上是在调用 Dubbo 生成的代理对象。2.1 动态代理入口Dubbo 默认使用 Javassist 生成代理。所有的方法调用都会被转发到 InvokerInvocationHandler。源码位置org.apache.dubbo.rpc.proxy.InvokerInvocationHandlerpublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // ... 省略 Object 类方法的处理 // 将参数封装为 RpcInvocation RpcInvocation invocation new RpcInvocation(method, serviceModel, args); invocation.setTargetServiceUniqueName(invoker.getUrl().getServiceKey()); // invoker 是一层层包装的这里开始进入链式调用 return invoker.invoke(invocation).recreate(); }2.2 集群容错与负载均衡 (Cluster)这里的 invoker 对象通常是 MockClusterInvoker 包装下的 FailoverClusterInvoker默认故障转移策略。源码位置org.apache.dubbo.rpc.cluster.support.FailoverClusterInvokerpublic Result doInvoke(Invocation invocation, final ListInvokerT invokers, LoadBalance loadbalance) { // 1. 获取重试次数默认 retries2 (共调3次) int len getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) 1; for (int i 0; i len; i) { // 2. 负载均衡选择一个 Invoker (例如 DubboInvoker) InvokerT invoker select(loadbalance, invocation, copyInvokers, invoked); try { // 3. 执行调用 Result result invoker.invoke(invocation); return result; } catch (RpcException e) { // 发生异常循环继续即“重试” } } }2.3 过滤器链 (Filter Chain)在选定具体的 DubboInvoker 之前请求会经过一系列 Filter如 ConsumerContextFilter、MonitorFilter。这是通过 ProtocolFilterWrapper 构建的责任链模式。2.4 协议层发送 (Protocol)请求最终到达 DubboInvoker这里是 Dubbo 协议的核心。源码位置org.apache.dubbo.rpc.protocol.dubbo.DubboInvokerprotected Result doInvoke(final Invocation invocation) throws Throwable { // 获取 ExchangeClient (封装了 Netty Client) ExchangeClient currentClient clients[index.getAndIncrement() % clients.length]; // 区分单向调用、异步调用、同步调用 boolean isAsync RpcUtils.isAsync(getUrl(), invocation); boolean isOneway RpcUtils.isOneway(getUrl(), invocation); if (isOneway) { // 单向调用只发不回 currentClient.send(inv, getUrl().getMethodParameter(methodName, SENT_KEY, false)); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else if (isAsync) { // 异步调用 ResponseFuture future currentClient.request(inv, timeout); // ... 返回 Future } else { // 【核心重点】同步调用 (默认) RpcContext.getContext().setFuture(null); // 发送请求获得 DefaultFuture return (Result) currentClient.request(inv, timeout).get(); } }2.5 交换层与同步等待 (Exchange)currentClient.request 会调用 HeaderExchangeChannel.request。这是实现“同步转异步”的关键。源码位置org.apache.dubbo.remoting.exchange.support.DefaultFuture// 发送请求 public ResponseFuture request(Object request, int timeout) throws RemotingException { // 1. 创建请求对象自动生成全局唯一 Request ID Request req new Request(); req.setData(request); // 2. 创建 DefaultFuture映射关系Request ID - Future DefaultFuture future newDefaultFuture(channel, req, timeout); // 3. 通过 Netty 发送数据 channel.send(req); return future; }紧接着DubboInvoker 调用了 future.get()线程在此阻塞。// DefaultFuture.java public Object get(int timeout) throws RemotingException { // 使用 Condition.await 进行阻塞等待服务端响应唤醒 if (!done) { long start System.currentTimeMillis(); lock.lock(); try { while (!done) { done.await(timeout, TimeUnit.MILLISECONDS); // 超时检查逻辑... } } finally { lock.unlock(); } } return returnFromResponse(); }3. 第二阶段服务端处理请求Provider网络报文经过 TCP 传输到达服务端Netty 接收到字节流。3.1 线程派发 (Thread Model)Netty 的 IO 线程Worker Group负责解码解码后的消息会经过 AllChannelHandler默认策略将请求派发到 Dubbo 的业务线程池中去执行避免阻塞 IO 线程。源码位置org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandlerpublic void received(Channel channel, Object message) throws RemotingException { // 获取业务线程池 ExecutorService executor getExecutorService(); try { // 将请求包装成 ChannelEventRunnable 丢给线程池执行 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { // 线程池满的拒绝策略报错 } }3.2 交换层处理 (Exchange)业务线程拿到请求后层层传递到达 HeaderExchangeHandler.received。它区分这是请求Request还是响应Response。源码位置org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandlerpublic void received(Channel channel, Object message) { if (message instanceof Request) { handleRequest(channel, (Request) message); } else if (message instanceof Response) { handleResponse(channel, (Response) message); } } void handleRequest(Channel channel, Request req) { Response res new Response(req.getId()); // 继续调用后续 Handler (DubboProtocol) Object result handler.reply(channel, req.getData()); res.setResult(result); // 发送响应回客户端 channel.send(res); }3.3 协议层与反射调用 (Protocol)handler.reply 最终会调用到 DubboProtocol.requestHandler。在这里根据 ServiceKey 找到服务端暴露的 Exporter。源码位置org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol// 匿名内部类 requestHandler public CompletableFutureObject reply(ExchangeChannel channel, Object message) { Invocation inv (Invocation) message; // 查找 Exporter Invoker? invoker getInvoker(channel, inv); // 执行调用链 (Filter - 实现类) return invoker.invoke(inv); }最终JavassistProxyFactory 生成的 Wrapper 类会直接通过方法名调用你写的 ServiceImpl 代码。4. 第三阶段响应返回与唤醒Consumer服务端 channel.send(res) 将结果发回给消费端。4.1 响应接收消费端的 Netty IO 线程收到响应报文同样经过解码最终到达 HeaderExchangeHandler.received。这次走的是 handleResponse 分支。static void handleResponse(Channel channel, Response response) { // 核心调用 DefaultFuture.received DefaultFuture.received(channel, response); }4.2 唤醒线程Dubbo 怎么知道这个响应对应哪个请求Request ID。源码位置org.apache.dubbo.remoting.exchange.support.DefaultFuturepublic static void received(Channel channel, Response response) { // 1. 根据 Response 中的 ID 从 Map 中移除并获取对应的 Future DefaultFuture future FUTURES.remove(response.getId()); if (future ! null) { // 2. 触发唤醒逻辑 future.doReceived(response); } } private void doReceived(Response res) { lock.lock(); try { response res; done condition.signal(); // 唤醒之前阻塞在 get() 方法的线程 } finally { lock.unlock(); } }4.3 结果返回被唤醒的消费端线程从 get() 方法中苏醒拿到 response.getResult()经过动态代理层层返回最终你的 demoService.sayHello 拿到了返回值。5. 架构师总结回顾整个流程Dubbo 2.7.8 的核心设计精髓在于分层架构每一层Proxy, Cluster, Protocol, Exchange, Transport职责清晰互不干扰。异步转同步利用 DefaultFuture 和 Request ID 机制在 Netty 异步通讯的基础上实现了对上层业务的同步阻塞假象降低了开发复杂度。线程模型IO 线程与业务线程分离AllDispatcher保证了高并发下 Netty IO 的吞吐量防止业务逻辑阻塞网络读写。面试与调优建议超时问题往往发生在 DefaultFuture.get() 等不到 signal或者服务端线程池满导致无法及时处理 Request。线程池满重点关注 Provider 端的 AllChannelHandler 派发逻辑。序列化发生在 Netty 的 Codec 阶段是 CPU 密集型操作。欢迎关注、一起交流、一起进步