diff --git a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java index d54f2da49..62eb13bef 100644 --- a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java +++ b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java @@ -33,6 +33,7 @@ import com.alipay.sofa.rpc.core.exception.SofaRouteException; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException; +import com.alipay.sofa.rpc.core.exception.SofaTimeOutException; import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback; import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.core.response.SofaResponse; @@ -65,6 +66,7 @@ import static com.alipay.sofa.rpc.client.ProviderInfoAttrs.ATTR_TIMEOUT; import static com.alipay.sofa.rpc.common.RpcConfigs.getIntValue; +import static com.alipay.sofa.rpc.common.RpcConstants.CONFIG_KEY_DEADLINE_ENABLED; import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_INVOKE_TIMEOUT; /** @@ -609,6 +611,21 @@ protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport tran checkProviderVersion(providerInfo, request); // 根据服务端版本特殊处理 String invokeType = request.getInvokeType(); int timeout = resolveTimeout(request, consumerConfig, providerInfo); + + Long upStreamDeadlineTime = RpcInvokeContext.getContext().getDeadline(); + if (upStreamDeadlineTime != null) { + int remain = (int) (upStreamDeadlineTime - System.currentTimeMillis()); + if (remain > 0) { + timeout = Math.min(timeout, remain); + request.addRequestProp(RpcConstants.RPC_REQUEST_DEADLINE, remain); + } else { + throw new SofaTimeOutException("Deadline exceeded before sending request"); + } + } else if (Boolean.parseBoolean(consumerConfig.getParameter(CONFIG_KEY_DEADLINE_ENABLED))) { + // 如果启用了deadline机制,使用timeout值作为deadline进行透传 + request.addRequestProp(RpcConstants.RPC_REQUEST_DEADLINE, timeout); + } + request.setTimeout(timeout); SofaResponse response = null; // 同步调用 diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/common/RpcConstants.java b/core/api/src/main/java/com/alipay/sofa/rpc/common/RpcConstants.java index 13b261eba..b8e131cc1 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/common/RpcConstants.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/common/RpcConstants.java @@ -407,6 +407,11 @@ public class RpcConstants { */ public static final String CONFIG_KEY_TIMEOUT = "timeout"; + /** + * 配置key:deadlineEnabled + */ + public static final String CONFIG_KEY_DEADLINE_ENABLED = "deadlineEnabled"; + /** * 配置key:concurrents */ @@ -744,4 +749,9 @@ public class RpcConstants { public static final String INTERNAL_KEY_RPC_REQUEST_COMMAND = INTERNAL_KEY_PREFIX + "rpc_request_command"; + /** + * deadline time + */ + public static final String RPC_REQUEST_DEADLINE = "sofa_head_deadline"; + } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/ConsumerConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/ConsumerConfig.java index 2dfa5ef4c..4f3b15376 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/ConsumerConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/ConsumerConfig.java @@ -658,7 +658,7 @@ public String getRejectedExecutionPolicy() { /** * Sets rejected execution policy. * - * @param rejectedExecutionPolicy the rejected execution policy + * @param rejectedExecutionPolicy the rejected execution policy * @return the rejected execution policy */ public ConsumerConfig setRejectedExecutionPolicy(String rejectedExecutionPolicy) { diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java b/core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java index 40886f2bd..461dcb69c 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/context/RpcInvokeContext.java @@ -49,6 +49,10 @@ public class RpcInvokeContext { * 用户自定义超时时间,单次调用生效 */ protected Integer timeout; + /** + * 用户自定义的deadline时间 + */ + protected Long deadline; /** * 用户自定义对方地址,单次调用生效 */ @@ -186,6 +190,33 @@ public RpcInvokeContext setTimeout(Integer timeout) { return this; } + /** + * 得到deadline时间 + * + * @return deadline时间 + */ + public Long getDeadline() { + return deadline; + } + + /** + * 设置调用级别deadline时间 + * + * @param deadline deadline时间 + * @return 当前 + */ + public RpcInvokeContext setDeadline(Long deadline) { + this.deadline = deadline; + return this; + } + + /** + * 判断是否deadline超时,用户可以在业务执行时进行判断 + */ + public boolean isDeadlineTimeout() { + return deadline != null && System.currentTimeMillis() >= deadline; + } + /** * 设置一个调用上下文数据 * @@ -461,6 +492,7 @@ public String toString() { final StringBuilder sb = new StringBuilder(128); sb.append(super.toString()); sb.append("{timeout=").append(timeout); + sb.append(", deadline=").append(deadline); sb.append(", targetURL='").append(targetURL).append('\''); sb.append(", targetGroup='").append(targetGroup).append('\''); sb.append(", responseCallback=").append(responseCallback); diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java b/core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java index 9cf35d3f6..53db37211 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/filter/ProviderInvoker.java @@ -17,6 +17,7 @@ package com.alipay.sofa.rpc.filter; import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.context.RpcInternalContext; import com.alipay.sofa.rpc.context.RpcInvokeContext; @@ -33,6 +34,8 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import static com.alipay.sofa.rpc.common.RpcConstants.CONFIG_KEY_DEADLINE_ENABLED; + /** * 服务端调用业务实现类 * @@ -93,6 +96,15 @@ public SofaResponse invoke(SofaRequest request) throws SofaRpcException { long startTime = RpcRuntimeContext.now(); long bizStartTime = System.nanoTime(); RpcInvokeContext.getContext().put(RpcConstants.INTERNAL_KEY_PROVIDER_INVOKE_START_TIME_NANO, System.nanoTime()); + + // 在服务端配置中,检查是否启用deadline功能(默认启用,只有明确设置为"false"时才禁用) + String deadlineEnabled = providerConfig.getParameter(CONFIG_KEY_DEADLINE_ENABLED); + // 获取deadline时间 + Integer deadline = (Integer) request.getRequestProp(RpcConstants.RPC_REQUEST_DEADLINE); + if (!StringUtils.FALSE.equalsIgnoreCase(deadlineEnabled) && deadline != null) { + RpcInvokeContext.getContext().setDeadline(deadline + System.currentTimeMillis()); + } + try { // 反射 真正调用业务代码 Method method = request.getMethod(); diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/AbstractDeadlineChainTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/AbstractDeadlineChainTest.java new file mode 100644 index 000000000..1aaf327d0 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/AbstractDeadlineChainTest.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.deadline; + +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.MethodConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import com.alipay.sofa.rpc.core.exception.SofaTimeOutException; +import com.alipay.sofa.rpc.test.ActivelyDestroyTest; +import org.junit.Assert; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.alipay.sofa.rpc.common.RpcConstants.CONFIG_KEY_DEADLINE_ENABLED; + +/** + * Deadline 机制的调用链集成测试抽象基类 + * 调用链: Client -> ServiceA(3s) -> ServiceB(5s) -> ServiceC(5s) + */ +public abstract class AbstractDeadlineChainTest extends ActivelyDestroyTest { + private final AtomicBoolean isServiceCStarted = new AtomicBoolean(false); + + // 服务接口定义 + public interface ServiceA { + String processA(String message); + } + + public interface ServiceB { + String processB(String message); + } + + public interface ServiceC { + String processC(String message); + } + + // ServiceC 实现 - 最底层服务,模拟5秒处理时间 + public class ServiceCImpl implements ServiceC { + private volatile int processTime; + + public ServiceCImpl(int processTime) { + this.processTime = processTime; + } + + @Override + public String processC(String message) { + try { + isServiceCStarted.set(true); // 标记ServiceC已开始处理,移到开始时设置 + Thread.sleep(processTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("ServiceC interrupted", e); + } + return "ServiceC-" + message; + } + } + + // ServiceB 实现 - 中间层服务,模拟5秒处理时间并调用ServiceC + public class ServiceBImpl implements ServiceB { + private final int processTime; + private ServiceC serviceC; + + public ServiceBImpl(int processTime) { + this.processTime = processTime; + } + + public void setServiceC(ServiceC serviceC) { + this.serviceC = serviceC; + } + + @Override + public String processB(String message) { + try { + Thread.sleep(processTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("ServiceB interrupted", e); + } + // 调用下游服务C + String resultC = serviceC.processC(message); + return "ServiceB-" + resultC; + } + } + + // ServiceA 实现 - 上层服务,模拟3秒处理时间并调用ServiceB + public class ServiceAImpl implements ServiceA { + private final int processTime; + private ServiceB serviceB; + + public ServiceAImpl(int processTime) { + this.processTime = processTime; + } + + public void setServiceB(ServiceB serviceB) { + this.serviceB = serviceB; + } + + @Override + public String processA(String message) { + try { + Thread.sleep(processTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("ServiceA interrupted", e); + } + // 调用下游服务B + String resultB = serviceB.processB(message); + return "ServiceA-" + resultB; + } + } + + protected abstract String getProtocolType(); + + protected abstract int getBasePort(); + + protected void configureProvider(ProviderConfig providerConfig, ServerConfig serverConfig, + ApplicationConfig appConfig) { + } + + protected void configureConsumer(ConsumerConfig consumerConfig, String protocol, String url, + ApplicationConfig appConfig) { + } + + /** + * 执行Deadline调用链测试 + */ + protected void doTestDeadlineChain() throws InterruptedException { + int basePort = getBasePort(); + String protocol = getProtocolType(); + + // 声明需要清理的资源 + ProviderConfig providerC = null; + ProviderConfig providerB = null; + ProviderConfig providerA = null; + ConsumerConfig consumerConfigC = null; + ConsumerConfig consumerConfigB = null; + + try { + // 配置ServiceC + ServerConfig serverConfigC = new ServerConfig() + .setPort(basePort) + .setProtocol(protocol) + .setDaemon(true); + + providerC = new ProviderConfig() + .setInterfaceId(ServiceC.class.getName()) + .setRef(new ServiceCImpl(5000)) // 5秒处理时间 + .setServer(serverConfigC) + .setApplication(new ApplicationConfig().setAppName("serviceC")) + .setRegister(false); + + // 调用协议特定配置 + configureProvider(providerC, serverConfigC, new ApplicationConfig().setAppName("serviceC")); + providerC.export(); + + // 配置ServiceB + ServerConfig serverConfigB = new ServerConfig() + .setPort(basePort + 1) + .setProtocol(protocol) + .setDaemon(true); + + ServiceBImpl serviceBImpl = new ServiceBImpl(5000); // 5秒处理时间 + + // ServiceB调用ServiceC的客户端配置 + consumerConfigC = new ConsumerConfig() + .setInterfaceId(ServiceC.class.getName()) + .setTimeout(30000) + .setApplication(new ApplicationConfig().setAppName("serviceB")); + + // 调用协议特定配置 + String urlC = protocol + "://127.0.0.1:" + basePort; + configureConsumer(consumerConfigC, protocol, urlC, new ApplicationConfig().setAppName("serviceB")); + if (consumerConfigC.getDirectUrl() == null) { + consumerConfigC.setDirectUrl(urlC); + } + + ServiceC serviceCProxy = consumerConfigC.refer(); + serviceBImpl.setServiceC(serviceCProxy); + + providerB = new ProviderConfig() + .setInterfaceId(ServiceB.class.getName()) + .setRef(serviceBImpl) + .setServer(serverConfigB) + .setApplication(new ApplicationConfig().setAppName("serviceB")) + .setRegister(false); + + // 调用协议特定配置 + configureProvider(providerB, serverConfigB, new ApplicationConfig().setAppName("serviceB")); + providerB.export(); + + // 配置ServiceA + ServerConfig serverConfigA = new ServerConfig() + .setPort(basePort + 2) + .setProtocol(protocol) + .setDaemon(true); + + ServiceAImpl serviceAImpl = new ServiceAImpl(3000); // 3秒处理时间 + + // ServiceA调用ServiceB的客户端配置 + consumerConfigB = new ConsumerConfig() + .setInterfaceId(ServiceB.class.getName()) + .setTimeout(30000) + .setApplication(new ApplicationConfig().setAppName("serviceA")); + + // 调用协议特定配置 + String urlB = protocol + "://127.0.0.1:" + (basePort + 1); + configureConsumer(consumerConfigB, protocol, urlB, new ApplicationConfig().setAppName("serviceA")); + if (consumerConfigB.getDirectUrl() == null) { + consumerConfigB.setDirectUrl(urlB); + } + + ServiceB serviceBProxy = consumerConfigB.refer(); + serviceAImpl.setServiceB(serviceBProxy); + + providerA = new ProviderConfig() + .setInterfaceId(ServiceA.class.getName()) + .setRef(serviceAImpl) + .setServer(serverConfigA) + .setApplication(new ApplicationConfig().setAppName("serviceA")) + .setRegister(false); + + // 调用协议特定配置 + configureProvider(providerA, serverConfigA, new ApplicationConfig().setAppName("serviceA")); + providerA.export(); + + Thread.sleep(1000); // 等待服务启动 + + testDeadlineTimeout(protocol, basePort + 2); + + } finally { + // 清理资源 - 按创建的逆序清理 + if (providerA != null) { + try { + providerA.unExport(); + } catch (Exception e) { + // 记录异常但不抛出,确保其他资源能继续清理 + System.err.println("Failed to unexport providerA: " + e.getMessage()); + } + } + + if (providerB != null) { + try { + providerB.unExport(); + } catch (Exception e) { + System.err.println("Failed to unexport providerB: " + e.getMessage()); + } + } + + if (providerC != null) { + try { + providerC.unExport(); + } catch (Exception e) { + System.err.println("Failed to unexport providerC: " + e.getMessage()); + } + } + + // 清理消费者配置 + if (consumerConfigB != null) { + try { + consumerConfigB.unRefer(); + } catch (Exception e) { + System.err.println("Failed to unrefer consumerConfigB: " + e.getMessage()); + } + } + + if (consumerConfigC != null) { + try { + consumerConfigC.unRefer(); + } catch (Exception e) { + System.err.println("Failed to unrefer consumerConfigC: " + e.getMessage()); + } + } + } + } + + /** + * 测试deadline超时 + */ + private void testDeadlineTimeout(String protocol, int port) throws InterruptedException { + ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(ServiceA.class.getName()) + .setParameter(CONFIG_KEY_DEADLINE_ENABLED, "true") + .setTimeout(6000) + .setRepeatedReferLimit(-1) // 允许重复引用 + .setApplication(new ApplicationConfig().setAppName("client2")); + + // 调用协议特定配置 + String url = protocol + "://127.0.0.1:" + port; + configureConsumer(consumerConfig, protocol, url, new ApplicationConfig().setAppName("client2")); + if (consumerConfig.getDirectUrl() == null) { + consumerConfig.setDirectUrl(url); + } + + isServiceCStarted.set(false); + ServiceA serviceA = consumerConfig.refer(); + + boolean error = false; + try { + serviceA.processA("test-message"); + Assert.fail("Should throw timeout exception"); + } catch (Exception e) { + Assert.assertTrue(e instanceof SofaTimeOutException); + Thread.sleep(9000); + Assert.assertFalse(isServiceCStarted.get()); + error = true; + } + Assert.assertTrue(error); + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/BoltDeadlineChainTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/BoltDeadlineChainTest.java new file mode 100644 index 000000000..b92116715 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/BoltDeadlineChainTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.deadline; + +import com.alipay.sofa.rpc.common.RpcConstants; +import org.junit.Test; + +/** + * Bolt协议的Deadline调用链集成测试 + * + * @author GengZhang + */ +public class BoltDeadlineChainTest extends AbstractDeadlineChainTest { + + @Override + protected String getProtocolType() { + return RpcConstants.PROTOCOL_TYPE_BOLT; + } + + @Override + protected int getBasePort() { + return 22300; // Bolt协议使用22300-22302端口 + } + + @Test + public void testBoltDeadlineChain() throws InterruptedException { + doTestDeadlineChain(); + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/Http2DeadlineChainTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/Http2DeadlineChainTest.java new file mode 100644 index 000000000..61109da9c --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/Http2DeadlineChainTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.deadline; + +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import org.junit.Test; + +/** + * HTTP/2协议的Deadline调用链集成测试 + * HTTP/2协议支持标准的RPC调用链模式 + * + * @author GengZhang + */ +public class Http2DeadlineChainTest extends AbstractDeadlineChainTest { + + @Override + protected String getProtocolType() { + return RpcConstants.PROTOCOL_TYPE_H2C; + } + + @Override + protected int getBasePort() { + return 24300; // HTTP/2协议使用24300-24302端口 + } + + /** + * HTTP/2协议需要特殊的Consumer配置和URL前缀 + */ + @Override + protected void configureConsumer(ConsumerConfig consumerConfig, String protocol, String url, + ApplicationConfig appConfig) { + String h2cUrl = url.replace(protocol + "://", "h2c://"); + consumerConfig.setDirectUrl(h2cUrl) + .setProtocol(RpcConstants.PROTOCOL_TYPE_H2C); + } + + @Test + public void testHttp2DeadlineChain() throws InterruptedException { + doTestDeadlineChain(); + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/TripleDeadlineChainTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/TripleDeadlineChainTest.java new file mode 100644 index 000000000..bbff559ae --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/deadline/TripleDeadlineChainTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.deadline; + +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import org.junit.Test; + +/** + * Triple协议的Deadline调用链集成测试 + * 注意:Triple协议需要特殊的配置和URL前缀 + * + * @author GengZhang + */ +public class TripleDeadlineChainTest extends AbstractDeadlineChainTest { + + @Override + protected String getProtocolType() { + return RpcConstants.PROTOCOL_TYPE_TRIPLE; + } + + @Override + protected int getBasePort() { + return 25300; // Triple协议使用25300-25302端口 + } + + @Override + protected void configureProvider(ProviderConfig providerConfig, ServerConfig serverConfig, + ApplicationConfig appConfig) { + providerConfig.setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE); + } + + @Override + protected void configureConsumer(ConsumerConfig consumerConfig, String protocol, String url, + ApplicationConfig appConfig) { + String tripleUrl = url.replace(protocol + "://", "tri://"); + consumerConfig.setDirectUrl(tripleUrl) + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE); + } + + @Test + public void testTripleDeadlineChain() throws InterruptedException { + doTestDeadlineChain(); + } +}