实践dubbo之rpc

Dubbo 2.7.13

consumer

ReferenceAnnotationBeanPostProcessor

1
2
3
4
5
6
7
InvokerInvocationHandler
MockClusterInvoker
ProtocolFilterWrapper
AsyncToSyncInvoker
DubboInvoker
HeaderExchangeChannel
NettyChannel

发送请求

同步、异步或Future之stack,异步不需要等待应答,直接应答null(客户端通过RpcContext.getContext().getCompletableFuture()获得结果)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
send:162, NettyChannel (org.apache.dubbo.remoting.transport.netty4)
send:178, AbstractClient (org.apache.dubbo.remoting.transport)
send:53, AbstractPeer (org.apache.dubbo.remoting.transport)
request:135, HeaderExchangeChannel (org.apache.dubbo.remoting.exchange.support.header)
request:95, HeaderExchangeClient (org.apache.dubbo.remoting.exchange.support.header)
request:91, ReferenceCountExchangeClient (org.apache.dubbo.rpc.protocol.dubbo)
doInvoke:100, DubboInvoker (org.apache.dubbo.rpc.protocol.dubbo)
invoke:162, AbstractInvoker (org.apache.dubbo.rpc.protocol)
// 开始发送请求
invoke:52, AsyncToSyncInvoker (org.apache.dubbo.rpc.protocol)
invoke:78, ListenerInvokerWrapper (org.apache.dubbo.rpc.listener)
invoke:89, MonitorFilter (org.apache.dubbo.monitor.support)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:51, FutureFilter (org.apache.dubbo.rpc.protocol.dubbo.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:55, ConsumerContextFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:56, InvokerWrapper (org.apache.dubbo.rpc.protocol)
doInvoke:82, FailoverClusterInvoker (org.apache.dubbo.rpc.cluster.support)
invoke:259, AbstractClusterInvoker (org.apache.dubbo.rpc.cluster.support)
intercept:47, ClusterInterceptor (org.apache.dubbo.rpc.cluster.interceptor)
invoke:92, AbstractCluster$InterceptorInvokerNode (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:82, MockClusterInvoker (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:74, InvokerInvocationHandler (org.apache.dubbo.rpc.proxy)
hello:-1, proxy0 (org.apache.dubbo.common.bytecode)
lambda$runner$0:28, DubboRegistryZooKeeperConsumerBootstrap (wang.ray.dubbo.spring.boot.starter.demo.consumer)
run:-1, 1863980798 (wang.ray.dubbo.spring.boot.starter.demo.consumer.DubboRegistryZooKeeperConsumerBootstrap$$Lambda$292)
callRunner:770, SpringApplication (org.springframework.boot)
callRunners:760, SpringApplication (org.springframework.boot)
run:318, SpringApplication (org.springframework.boot)
run:1213, SpringApplication (org.springframework.boot)
run:1202, SpringApplication (org.springframework.boot)
main:21, DubboRegistryZooKeeperConsumerBootstrap (wang.ray.dubbo.spring.boot.starter.demo.consumer)

oneway之stack,不需要等待应答,直接构造空的应答

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
send:162, NettyChannel (org.apache.dubbo.remoting.transport.netty4)
send:178, AbstractClient (org.apache.dubbo.remoting.transport)
doInvoke:95, DubboInvoker (org.apache.dubbo.rpc.protocol.dubbo)
invoke:162, AbstractInvoker (org.apache.dubbo.rpc.protocol)
// 开始发送请求
invoke:52, AsyncToSyncInvoker (org.apache.dubbo.rpc.protocol)
invoke:78, ListenerInvokerWrapper (org.apache.dubbo.rpc.listener)
invoke:89, MonitorFilter (org.apache.dubbo.monitor.support)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:51, FutureFilter (org.apache.dubbo.rpc.protocol.dubbo.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:55, ConsumerContextFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:56, InvokerWrapper (org.apache.dubbo.rpc.protocol)
doInvoke:82, FailoverClusterInvoker (org.apache.dubbo.rpc.cluster.support)
invoke:259, AbstractClusterInvoker (org.apache.dubbo.rpc.cluster.support)
intercept:47, ClusterInterceptor (org.apache.dubbo.rpc.cluster.interceptor)
invoke:92, AbstractCluster$InterceptorInvokerNode (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:82, MockClusterInvoker (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:74, InvokerInvocationHandler (org.apache.dubbo.rpc.proxy)
hello:-1, proxy0 (org.apache.dubbo.common.bytecode)
lambda$runner$0:28, DubboRegistryZooKeeperConsumerBootstrap (wang.ray.dubbo.spring.boot.starter.demo.consumer)
run:-1, 1863980798 (wang.ray.dubbo.spring.boot.starter.demo.consumer.DubboRegistryZooKeeperConsumerBootstrap$$Lambda$292)
callRunner:770, SpringApplication (org.springframework.boot)
callRunners:760, SpringApplication (org.springframework.boot)
run:318, SpringApplication (org.springframework.boot)
run:1213, SpringApplication (org.springframework.boot)
run:1202, SpringApplication (org.springframework.boot)
main:21, DubboRegistryZooKeeperConsumerBootstrap (wang.ray.dubbo.spring.boot.starter.demo.consumer)

接收应答

同步之stack,请求线程从queue里面获取ChannelEventRunnable并执行,然后获得返回结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// return responseFuture.get(timeout, unit);

// this.complete(res.getResult());
doReceived:208, DefaultFuture (org.apache.dubbo.remoting.exchange.support)
received:175, DefaultFuture (org.apache.dubbo.remoting.exchange.support)
received:163, DefaultFuture (org.apache.dubbo.remoting.exchange.support)
handleResponse:60, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:181, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:51, DecodeHandler (org.apache.dubbo.remoting.transport)
run:57, ChannelEventRunnable (org.apache.dubbo.remoting.transport.dispatcher)

waitAndDrain:93, ThreadlessExecutor (org.apache.dubbo.common.threadpool)
get:179, AsyncRpcResult (org.apache.dubbo.rpc)
// 开始处理应答,执行ChannelEventRunnable
invoke:61, AsyncToSyncInvoker (org.apache.dubbo.rpc.protocol)
invoke:78, ListenerInvokerWrapper (org.apache.dubbo.rpc.listener)
invoke:89, MonitorFilter (org.apache.dubbo.monitor.support)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:51, FutureFilter (org.apache.dubbo.rpc.protocol.dubbo.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:55, ConsumerContextFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:56, InvokerWrapper (org.apache.dubbo.rpc.protocol)
doInvoke:82, FailoverClusterInvoker (org.apache.dubbo.rpc.cluster.support)
invoke:259, AbstractClusterInvoker (org.apache.dubbo.rpc.cluster.support)
intercept:47, ClusterInterceptor (org.apache.dubbo.rpc.cluster.interceptor)
invoke:92, AbstractCluster$InterceptorInvokerNode (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:82, MockClusterInvoker (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:74, InvokerInvocationHandler (org.apache.dubbo.rpc.proxy)
hello:-1, proxy0 (org.apache.dubbo.common.bytecode)
lambda$runner$0:28, DubboRegistryZooKeeperConsumerBootstrap (wang.ray.dubbo.spring.boot.starter.demo.consumer)
run:-1, 1465263219 (wang.ray.dubbo.spring.boot.starter.demo.consumer.DubboRegistryZooKeeperConsumerBootstrap$$Lambda$292)
callRunner:770, SpringApplication (org.springframework.boot)
callRunners:760, SpringApplication (org.springframework.boot)
run:318, SpringApplication (org.springframework.boot)
run:1213, SpringApplication (org.springframework.boot)
run:1202, SpringApplication (org.springframework.boot)
main:21, DubboRegistryZooKeeperConsumerBootstrap (wang.ray.dubbo.spring.boot.starter.demo.consumer)

Netty接收(NettyClientWorker-1-1)应答(同步),构建ChannelEventRunnable,提交给线程池(ThreadlessExecutor,其实是请求线程)处理,其实是放到queue,即结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// queue.add(runnable);
execute:134, ThreadlessExecutor (org.apache.dubbo.common.threadpool)
<init>:46, ChannelEventRunnable (org.apache.dubbo.remoting.transport.dispatcher)
<init>:38, ChannelEventRunnable (org.apache.dubbo.remoting.transport.dispatcher)
// 新建
received:62, AllChannelHandler (org.apache.dubbo.remoting.transport.dispatcher.all)
received:90, HeartbeatHandler (org.apache.dubbo.remoting.exchange.support.header)
received:43, MultiMessageHandler (org.apache.dubbo.remoting.transport)
received:147, AbstractPeer (org.apache.dubbo.remoting.transport)
channelRead:83, NettyClientHandler (org.apache.dubbo.remoting.transport.netty4)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
channelRead:286, IdleStateHandler (io.netty.handler.timeout)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:310, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:284, ByteToMessageDecoder (io.netty.handler.codec)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
channelRead:1434, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:965, DefaultChannelPipeline (io.netty.channel)
read:163, AbstractNioByteChannel$NioByteUnsafe (io.netty.channel.nio)
processSelectedKey:647, NioEventLoop (io.netty.channel.nio)
processSelectedKeysOptimized:582, NioEventLoop (io.netty.channel.nio)
processSelectedKeys:499, NioEventLoop (io.netty.channel.nio)
run:461, NioEventLoop (io.netty.channel.nio)
run:884, SingleThreadEventExecutor$5 (io.netty.util.concurrent)
run:30, FastThreadLocalRunnable (io.netty.util.concurrent)
run:748, Thread (java.lang)

Netty接收(NettyClientWorker-1-1)应答(异步或Future),构建ChannelEventRunnable,提交给Dubbo线程池(DubboClientHandler-192.168.0.134:20880-thread-*)中处理,即结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// return responseFuture.get(timeout, unit);

// this.complete(res.getResult());
doReceived:208, DefaultFuture (org.apache.dubbo.remoting.exchange.support)
received:175, DefaultFuture (org.apache.dubbo.remoting.exchange.support)
received:163, DefaultFuture (org.apache.dubbo.remoting.exchange.support)
handleResponse:60, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:181, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:51, DecodeHandler (org.apache.dubbo.remoting.transport)
run:57, ChannelEventRunnable (org.apache.dubbo.remoting.transport.dispatcher)

execute:1343, ThreadPoolExecutor (java.util.concurrent)
received:62, AllChannelHandler (org.apache.dubbo.remoting.transport.dispatcher.all)
received:90, HeartbeatHandler (org.apache.dubbo.remoting.exchange.support.header)
received:43, MultiMessageHandler (org.apache.dubbo.remoting.transport)
received:147, AbstractPeer (org.apache.dubbo.remoting.transport)
channelRead:83, NettyClientHandler (org.apache.dubbo.remoting.transport.netty4)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
channelRead:286, IdleStateHandler (io.netty.handler.timeout)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:310, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:284, ByteToMessageDecoder (io.netty.handler.codec)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
channelRead:1434, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:965, DefaultChannelPipeline (io.netty.channel)
read:163, AbstractNioByteChannel$NioByteUnsafe (io.netty.channel.nio)
processSelectedKey:647, NioEventLoop (io.netty.channel.nio)
processSelectedKeysOptimized:582, NioEventLoop (io.netty.channel.nio)
processSelectedKeys:499, NioEventLoop (io.netty.channel.nio)
run:461, NioEventLoop (io.netty.channel.nio)
run:884, SingleThreadEventExecutor$5 (io.netty.util.concurrent)
run:30, FastThreadLocalRunnable (io.netty.util.concurrent)
run:748, Thread (java.lang)

AsyncToSyncInvoker.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult = invoker.invoke(invocation);

        try {
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                /**
                 * NOTICE!
                 * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
                 * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
                 */
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof TimeoutException) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else if (t instanceof RemotingException) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        } catch (Throwable e) {
            throw new RpcException(e.getMessage(), e);
        }
        return asyncResult;
    }

AbstractInvoker.java

1
2
3
4
5
6
7
8
    protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
        ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
        if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
            return new ThreadlessExecutor(sharedExecutor);
        } else {
            return sharedExecutor;
        }
    }

DubboInvoker.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

ThreadlessExecutor.java

1

RpcUtils.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static InvokeMode getInvokeMode(URL url, Invocation inv) {
        if (isReturnTypeFuture(inv)) {
            // future应答模式(接口返回值类型为CompletableFuture或者异步泛化调用($invokeAsync))
            return InvokeMode.FUTURE;
        } else if (isAsync(url, inv)) {
            // 异步模式(async=true)
            return InvokeMode.ASYNC;
        } else {
            // 同步模式
            return InvokeMode.SYNC;
        }
    }

// return=false代表oneway,不需要等待应答
public static boolean isOneway(URL url, Invocation inv) {
        boolean isOneway;
        if (Boolean.FALSE.toString().equals(inv.getAttachment(RETURN_KEY))) {
            isOneway = true;
        } else {
            isOneway = !url.getMethodParameter(getMethodName(inv), RETURN_KEY, true);
        }
        return isOneway;
    }

DefaultFuture.java

1
2
3
4
5
6
// key:请求ID(Request/Response),value:Channel
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();

// key:请求ID(Request/Response),value:DefaultFuture
// 异步转同步(AsyncToSyncInvoker)时基于这个匹配请求和响应
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

AsyncRpcResult.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// oneway则直接设置空的应答
public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) {
        CompletableFuture<AppResponse> future = new CompletableFuture<>();
        AppResponse result = new AppResponse();
        if (t != null) {
            result.setException(t);
        } else {
            result.setValue(value);
        }
        future.complete(result);
        return new AsyncRpcResult(future, invocation);
    }

@Override
    public Object recreate() throws Throwable {
        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
        // future模式返回CompleteFuture
        if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
            return RpcContext.getContext().getFuture();
        }
		// 其他模式返回直接结果,Async模式返回null
        return getAppResponse().recreate();
    }

public Result getAppResponse() {
        try {
            if (responseFuture.isDone()) {
                return responseFuture.get();
            }
        } catch (Exception e) {
            // This should not happen in normal request process;
            logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
            throw new RpcException(e);
        }

        return createDefaultValue(invocation);
    }

private static Result createDefaultValue(Invocation invocation) {
        ConsumerMethodModel method = (ConsumerMethodModel) invocation.get(Constants.METHOD_MODEL);
        return method != null ? new AppResponse(defaultReturn(method.getReturnClass())) : new AppResponse();
    }

AppResponse.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public Object recreate() throws Throwable {
        if (exception != null) {
            // fix issue#619
            try {
                // get Throwable class
                Class clazz = exception.getClass();
                while (!clazz.getName().equals(Throwable.class.getName())) {
                    clazz = clazz.getSuperclass();
                }
                // get stackTrace value
                Field stackTraceField = clazz.getDeclaredField("stackTrace");
                stackTraceField.setAccessible(true);
                Object stackTrace = stackTraceField.get(exception);
                if (stackTrace == null) {
                    exception.setStackTrace(new StackTraceElement[0]);
                }
            } catch (Exception e) {
                // ignore
            }
            throw exception;
        }
        return result;
    }

NettyClientHandler.java

创建DubboClientHandler线程池

用于应答处理,接管IO线程后续的任务。

默认线程池是CachedThreadPool,可以通过dubbo.consumer.threadpool自定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
getExecutor:53, CachedThreadPool (org.apache.dubbo.common.threadpool.support.cached)
getExecutor:-1, ThreadPool$Adaptive (org.apache.dubbo.common.threadpool)
createExecutor:163, DefaultExecutorRepository (org.apache.dubbo.common.threadpool.manager)
lambda$createExecutorIfAbsent$1:80, DefaultExecutorRepository (org.apache.dubbo.common.threadpool.manager)
apply:-1, 568462037 (org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository$$Lambda$286)
computeIfAbsent:1660, ConcurrentHashMap (java.util.concurrent)
createExecutorIfAbsent:80, DefaultExecutorRepository (org.apache.dubbo.common.threadpool.manager)
initExecutor:94, AbstractClient (org.apache.dubbo.remoting.transport)
<init>:59, AbstractClient (org.apache.dubbo.remoting.transport)
<init>:82, NettyClient (org.apache.dubbo.remoting.transport.netty4)
connect:40, NettyTransporter (org.apache.dubbo.remoting.transport.netty4)
connect:-1, Transporter$Adaptive (org.apache.dubbo.remoting)
connect:75, Transporters (org.apache.dubbo.remoting)
connect:39, HeaderExchanger (org.apache.dubbo.remoting.exchange.support.header)
connect:109, Exchangers (org.apache.dubbo.remoting.exchange)
initClient:592, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
buildReferenceCountExchangeClient:560, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
buildReferenceCountExchangeClientList:547, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
getSharedClient:472, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
getClients:427, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
protocolBindingRefer:404, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
refer:104, AbstractProtocol (org.apache.dubbo.rpc.protocol)
refer:72, ProtocolListenerWrapper (org.apache.dubbo.rpc.protocol)
refer:75, QosProtocolWrapper (org.apache.dubbo.qos.protocol)
refer:161, ProtocolFilterWrapper (org.apache.dubbo.rpc.protocol)
refer:-1, Protocol$Adaptive (org.apache.dubbo.rpc)
toInvokers:455, RegistryDirectory (org.apache.dubbo.registry.integration)
refreshInvoker:312, RegistryDirectory (org.apache.dubbo.registry.integration)
refreshOverrideAndInvoker:271, RegistryDirectory (org.apache.dubbo.registry.integration)
notify:254, RegistryDirectory (org.apache.dubbo.registry.integration)
notify:426, AbstractRegistry (org.apache.dubbo.registry.support)
doNotify:373, FailbackRegistry (org.apache.dubbo.registry.support)
notify:364, FailbackRegistry (org.apache.dubbo.registry.support)
doSubscribe:180, ZookeeperRegistry (org.apache.dubbo.registry.zookeeper)
subscribe:299, FailbackRegistry (org.apache.dubbo.registry.support)
subscribe:105, ListenerRegistryWrapper (org.apache.dubbo.registry)
subscribe:185, RegistryDirectory (org.apache.dubbo.registry.integration)
doRefer:429, RegistryProtocol (org.apache.dubbo.registry.integration)
refer:410, RegistryProtocol (org.apache.dubbo.registry.integration)
refer:70, ProtocolListenerWrapper (org.apache.dubbo.rpc.protocol)
refer:73, QosProtocolWrapper (org.apache.dubbo.qos.protocol)
refer:159, ProtocolFilterWrapper (org.apache.dubbo.rpc.protocol)
refer:-1, Protocol$Adaptive (org.apache.dubbo.rpc)
createProxy:318, ReferenceConfig (org.apache.dubbo.config)
init:258, ReferenceConfig (org.apache.dubbo.config)
get:158, ReferenceConfig (org.apache.dubbo.config)
getOrCreateProxy:274, ReferenceAnnotationBeanPostProcessor (org.apache.dubbo.config.spring.beans.factory.annotation)
doGetInjectedBean:143, ReferenceAnnotationBeanPostProcessor (org.apache.dubbo.config.spring.beans.factory.annotation)
getInjectedObject:359, AbstractAnnotationBeanPostProcessor (com.alibaba.spring.beans.factory.annotation)
inject:539, AbstractAnnotationBeanPostProcessor$AnnotatedFieldElement (com.alibaba.spring.beans.factory.annotation)
inject:90, InjectionMetadata (org.springframework.beans.factory.annotation)
postProcessPropertyValues:142, AbstractAnnotationBeanPostProcessor (com.alibaba.spring.beans.factory.annotation)
populateBean:1416, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
doCreateBean:592, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
createBean:515, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
lambda$doGetBean$0:320, AbstractBeanFactory (org.springframework.beans.factory.support)
getObject:-1, 1651667865 (org.springframework.beans.factory.support.AbstractBeanFactory$$Lambda$105)
getSingleton:222, DefaultSingletonBeanRegistry (org.springframework.beans.factory.support)
doGetBean:318, AbstractBeanFactory (org.springframework.beans.factory.support)
getBean:199, AbstractBeanFactory (org.springframework.beans.factory.support)
preInstantiateSingletons:845, DefaultListableBeanFactory (org.springframework.beans.factory.support)
finishBeanFactoryInitialization:877, AbstractApplicationContext (org.springframework.context.support)
refresh:549, AbstractApplicationContext (org.springframework.context.support)
refresh:742, SpringApplication (org.springframework.boot)
refreshContext:389, SpringApplication (org.springframework.boot)
run:311, SpringApplication (org.springframework.boot)
run:1213, SpringApplication (org.springframework.boot)
run:1202, SpringApplication (org.springframework.boot)
main:28, DubboRegistryZooKeeperConsumerBootstrap (wang.ray.dubbo.spring.boot.starter.demo.consumer)

provider

Netty接收请求

由Netty的IO线程池(NettyServerWorker-3-2)处理,构建ChannelEventRunnable,提交由Dubbo线程池执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
execute:1343, ThreadPoolExecutor (java.util.concurrent)
received:62, AllChannelHandler (org.apache.dubbo.remoting.transport.dispatcher.all)
received:90, HeartbeatHandler (org.apache.dubbo.remoting.exchange.support.header)
received:43, MultiMessageHandler (org.apache.dubbo.remoting.transport)
received:147, AbstractPeer (org.apache.dubbo.remoting.transport)
channelRead:98, NettyServerHandler (org.apache.dubbo.remoting.transport.netty4)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
channelRead:286, IdleStateHandler (io.netty.handler.timeout)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:310, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:284, ByteToMessageDecoder (io.netty.handler.codec)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
channelRead:1434, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:965, DefaultChannelPipeline (io.netty.channel)
read:163, AbstractNioByteChannel$NioByteUnsafe (io.netty.channel.nio)
processSelectedKey:647, NioEventLoop (io.netty.channel.nio)
processSelectedKeysOptimized:582, NioEventLoop (io.netty.channel.nio)
processSelectedKeys:499, NioEventLoop (io.netty.channel.nio)
run:461, NioEventLoop (io.netty.channel.nio)
run:884, SingleThreadEventExecutor$5 (io.netty.util.concurrent)
run:30, FastThreadLocalRunnable (io.netty.util.concurrent)
run:748, Thread (java.lang)

Dubbo线程池业务处理

DubboServerHandler-192.168.1.134:20880-thread-2

处理请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
hello:13, SampleRemoteServiceImpl (wang.ray.dubbo.spring.boot.starter.demo.provider.service)
invokeMethod:-1, Wrapper1 (org.apache.dubbo.common.bytecode)
doInvoke:47, JavassistProxyFactory$1 (org.apache.dubbo.rpc.proxy.javassist)
invoke:84, AbstractProxyInvoker (org.apache.dubbo.rpc.proxy)
invoke:56, DelegateProviderMetaDataInvoker (org.apache.dubbo.config.invoker)
invoke:56, InvokerWrapper (org.apache.dubbo.rpc.protocol)
invoke:52, ExceptionFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:89, MonitorFilter (org.apache.dubbo.monitor.support)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:44, TimeoutFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:77, TraceFilter (org.apache.dubbo.rpc.protocol.dubbo.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:118, ContextFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:152, GenericFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:38, ClassLoaderFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:41, EchoFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
reply:145, DubboProtocol$1 (org.apache.dubbo.rpc.protocol.dubbo)

handleRequest:100, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:175, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:51, DecodeHandler (org.apache.dubbo.remoting.transport)
run:57, ChannelEventRunnable (org.apache.dubbo.remoting.transport.dispatcher)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)

应答响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
send:157, NettyChannel (org.apache.dubbo.remoting.transport.netty4)
send:98, HeaderExchangeChannel (org.apache.dubbo.remoting.exchange.support.header)
send:87, HeaderExchangeChannel (org.apache.dubbo.remoting.exchange.support.header)
lambda$handleRequest$0:110, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
accept:-1, 1103383837 (org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler$$Lambda$361)
uniWhenComplete:760, CompletableFuture (java.util.concurrent)
uniWhenCompleteStage:778, CompletableFuture (java.util.concurrent)
whenComplete:2140, CompletableFuture (java.util.concurrent)
whenComplete:110, CompletableFuture (java.util.concurrent)

handleRequest:101, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:175, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:51, DecodeHandler (org.apache.dubbo.remoting.transport)
run:57, ChannelEventRunnable (org.apache.dubbo.remoting.transport.dispatcher)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)

创建DubboServerHandler线程池

默认线程池是FixedThreadPool,可以通过dubbo.protocol.threadpool自定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
getExecutor:47, FixedThreadPool (org.apache.dubbo.common.threadpool.support.fixed)
getExecutor:-1, ThreadPool$Adaptive (org.apache.dubbo.common.threadpool)
createExecutor:163, DefaultExecutorRepository (org.apache.dubbo.common.threadpool.manager)
lambda$createExecutorIfAbsent$1:80, DefaultExecutorRepository (org.apache.dubbo.common.threadpool.manager)
apply:-1, 1004390006 (org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository$$Lambda$327)
computeIfAbsent:1660, ConcurrentHashMap (java.util.concurrent)
createExecutorIfAbsent:80, DefaultExecutorRepository (org.apache.dubbo.common.threadpool.manager)
<init>:79, AbstractServer (org.apache.dubbo.remoting.transport)
<init>:77, NettyServer (org.apache.dubbo.remoting.transport.netty4)
bind:35, NettyTransporter (org.apache.dubbo.remoting.transport.netty4)
bind:-1, Transporter$Adaptive (org.apache.dubbo.remoting)
bind:56, Transporters (org.apache.dubbo.remoting)
bind:44, HeaderExchanger (org.apache.dubbo.remoting.exchange.support.header)
bind:70, Exchangers (org.apache.dubbo.remoting.exchange)
createServer:346, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
openServer:320, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
export:303, DubboProtocol (org.apache.dubbo.rpc.protocol.dubbo)
export:62, ProtocolListenerWrapper (org.apache.dubbo.rpc.protocol)
export:153, ProtocolFilterWrapper (org.apache.dubbo.rpc.protocol)
export:66, QosProtocolWrapper (org.apache.dubbo.qos.protocol)
export:-1, Protocol$Adaptive (org.apache.dubbo.rpc)
lambda$doLocalExport$2:244, RegistryProtocol (org.apache.dubbo.registry.integration)
apply:-1, 69160933 (org.apache.dubbo.registry.integration.RegistryProtocol$$Lambda$325)
computeIfAbsent:1660, ConcurrentHashMap (java.util.concurrent)
doLocalExport:242, RegistryProtocol (org.apache.dubbo.registry.integration)
export:199, RegistryProtocol (org.apache.dubbo.registry.integration)
export:60, ProtocolListenerWrapper (org.apache.dubbo.rpc.protocol)
export:151, ProtocolFilterWrapper (org.apache.dubbo.rpc.protocol)
export:64, QosProtocolWrapper (org.apache.dubbo.qos.protocol)
export:-1, Protocol$Adaptive (org.apache.dubbo.rpc)
doExportUrlsFor1Protocol:492, ServiceConfig (org.apache.dubbo.config)
doExportUrls:325, ServiceConfig (org.apache.dubbo.config)
doExport:300, ServiceConfig (org.apache.dubbo.config)
export:206, ServiceConfig (org.apache.dubbo.config)
lambda$exportServices$15:917, DubboBootstrap (org.apache.dubbo.config.bootstrap)
accept:-1, 607526140 (org.apache.dubbo.config.bootstrap.DubboBootstrap$$Lambda$313)
forEach:981, HashMap$Values (java.util)
exportServices:905, DubboBootstrap (org.apache.dubbo.config.bootstrap)
start:745, DubboBootstrap (org.apache.dubbo.config.bootstrap)
onContextRefreshedEvent:59, DubboBootstrapApplicationListener (org.apache.dubbo.config.spring.context)
onApplicationContextEvent:52, DubboBootstrapApplicationListener (org.apache.dubbo.config.spring.context)
onApplicationEvent:40, OneTimeExecutionApplicationContextEventListener (org.apache.dubbo.config.spring.context)
doInvokeListener:172, SimpleApplicationEventMulticaster (org.springframework.context.event)
invokeListener:165, SimpleApplicationEventMulticaster (org.springframework.context.event)
multicastEvent:139, SimpleApplicationEventMulticaster (org.springframework.context.event)
publishEvent:402, AbstractApplicationContext (org.springframework.context.support)
publishEvent:359, AbstractApplicationContext (org.springframework.context.support)
finishRefresh:896, AbstractApplicationContext (org.springframework.context.support)
refresh:552, AbstractApplicationContext (org.springframework.context.support)
refresh:742, SpringApplication (org.springframework.boot)
refreshContext:389, SpringApplication (org.springframework.boot)
run:311, SpringApplication (org.springframework.boot)
run:139, SpringApplicationBuilder (org.springframework.boot.builder)
main:19, DubboRegistryZooKeeperProviderBootstrap (wang.ray.dubbo.spring.boot.starter.demo.provider)

ServiceAnnotationBeanPostProcessor