Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.alipay.sofa.rpc.core.exception.SofaRouteException;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import com.alipay.sofa.rpc.core.exception.SofaTimeOutException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
Expand Down Expand Up @@ -65,6 +66,7 @@

import static com.alipay.sofa.rpc.client.ProviderInfoAttrs.ATTR_TIMEOUT;
import static com.alipay.sofa.rpc.common.RpcConfigs.getIntValue;
import static com.alipay.sofa.rpc.common.RpcConstants.CONFIG_KEY_DEADLINE_ENABLED;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_INVOKE_TIMEOUT;

/**
Expand Down Expand Up @@ -609,6 +611,21 @@ protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport tran
checkProviderVersion(providerInfo, request); // 根据服务端版本特殊处理
String invokeType = request.getInvokeType();
int timeout = resolveTimeout(request, consumerConfig, providerInfo);

Long upStreamDeadlineTime = RpcInvokeContext.getContext().getDeadline();
if (upStreamDeadlineTime != null) {
int remain = (int) (upStreamDeadlineTime - System.currentTimeMillis());
if (remain > 0) {
timeout = Math.min(timeout, remain);
request.addRequestProp(RpcConstants.RPC_REQUEST_DEADLINE, remain);
} else {
throw new SofaTimeOutException("Deadline exceeded before sending request");
}
} else if (Boolean.parseBoolean(consumerConfig.getParameter(CONFIG_KEY_DEADLINE_ENABLED))) {
// 如果启用了deadline机制,使用timeout值作为deadline进行透传
request.addRequestProp(RpcConstants.RPC_REQUEST_DEADLINE, timeout);
}

request.setTimeout(timeout);
SofaResponse response = null;
// 同步调用
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,11 @@ public class RpcConstants {
*/
public static final String CONFIG_KEY_TIMEOUT = "timeout";

/**
* 配置key:deadlineEnabled
*/
public static final String CONFIG_KEY_DEADLINE_ENABLED = "deadlineEnabled";

/**
* 配置key:concurrents
*/
Expand Down Expand Up @@ -744,4 +749,9 @@ public class RpcConstants {
public static final String INTERNAL_KEY_RPC_REQUEST_COMMAND = INTERNAL_KEY_PREFIX +
"rpc_request_command";

/**
* deadline time
*/
public static final String RPC_REQUEST_DEADLINE = "sofa_head_deadline";

}
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ public String getRejectedExecutionPolicy() {
/**
* Sets rejected execution policy.
*
* @param rejectedExecutionPolicy the rejected execution policy
* @param rejectedExecutionPolicy the rejected execution policy
* @return the rejected execution policy
*/
public ConsumerConfig<T> setRejectedExecutionPolicy(String rejectedExecutionPolicy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public class RpcInvokeContext {
* 用户自定义超时时间,单次调用生效
*/
protected Integer timeout;
/**
* 用户自定义的deadline时间
*/
protected Long deadline;
/**
* 用户自定义对方地址,单次调用生效
*/
Expand Down Expand Up @@ -186,6 +190,33 @@ public RpcInvokeContext setTimeout(Integer timeout) {
return this;
}

/**
* 得到deadline时间
*
* @return deadline时间
*/
public Long getDeadline() {
return deadline;
}

/**
* 设置调用级别deadline时间
*
* @param deadline deadline时间
* @return 当前
*/
public RpcInvokeContext setDeadline(Long deadline) {
this.deadline = deadline;
return this;
}

/**
* 判断是否deadline超时,用户可以在业务执行时进行判断
*/
public boolean isDeadlineTimeout() {
return deadline != null && System.currentTimeMillis() >= deadline;
}

/**
* 设置一个调用上下文数据
*
Expand Down Expand Up @@ -461,6 +492,7 @@ public String toString() {
final StringBuilder sb = new StringBuilder(128);
sb.append(super.toString());
sb.append("{timeout=").append(timeout);
sb.append(", deadline=").append(deadline);
sb.append(", targetURL='").append(targetURL).append('\'');
sb.append(", targetGroup='").append(targetGroup).append('\'');
sb.append(", responseCallback=").append(responseCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alipay.sofa.rpc.filter;

import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
Expand All @@ -33,6 +34,8 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import static com.alipay.sofa.rpc.common.RpcConstants.CONFIG_KEY_DEADLINE_ENABLED;

/**
* 服务端调用业务实现类
*
Expand Down Expand Up @@ -93,6 +96,15 @@ public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
long startTime = RpcRuntimeContext.now();
long bizStartTime = System.nanoTime();
RpcInvokeContext.getContext().put(RpcConstants.INTERNAL_KEY_PROVIDER_INVOKE_START_TIME_NANO, System.nanoTime());

// 在服务端配置中,检查是否启用deadline功能(默认启用,只有明确设置为"false"时才禁用)
String deadlineEnabled = providerConfig.getParameter(CONFIG_KEY_DEADLINE_ENABLED);
// 获取deadline时间
Integer deadline = (Integer) request.getRequestProp(RpcConstants.RPC_REQUEST_DEADLINE);
if (!StringUtils.FALSE.equalsIgnoreCase(deadlineEnabled) && deadline != null) {
RpcInvokeContext.getContext().setDeadline(deadline + System.currentTimeMillis());
}

try {
// 反射 真正调用业务代码
Method method = request.getMethod();
Expand Down
Loading
Loading