Skip to content

make message trace compatible with RocketMQ main project #399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/consumer/ConsumeMessageConcurrentlyService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
consumeMessageContext.setClientId(pConsumer->getMQClientId());
pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
}
}
Expand Down Expand Up @@ -195,6 +196,9 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
consumeMessageContext.setMsgIndex(i);
consumeMessageContext.setStatus("RECONSUME_LATER");
consumeMessageContext.setSuccess(false);
std::map<std::string, std::string> props;
props.insert(std::make_pair("ConsumeContextType", "FAILED"));
consumeMessageContext.setProps(props);
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
continue;
}
Expand All @@ -212,6 +216,9 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
if (status == CONSUME_SUCCESS) {
consumeMessageContext.setStatus("CONSUME_SUCCESS");
consumeMessageContext.setSuccess(true);
std::map<std::string, std::string> props;
props.insert(std::make_pair("ConsumeContextType", "SUCCESS"));
consumeMessageContext.setProps(props);
} else {
status = RECONSUME_LATER;
consumeMessageContext.setStatus("RECONSUME_LATER");
Expand Down
14 changes: 12 additions & 2 deletions src/consumer/ConsumeMessageHookImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ void ConsumeMessageHookImpl::executeHookBefore(ConsumeMessageContext* context) {
bean.setStoreTime((*it).getStoreTimestamp());
bean.setBodyLength((*it).getStoreSize());
bean.setRetryTimes((*it).getReconsumeTimes());
bean.setClientHost(context->getClientId());
std::string regionId = (*it).getProperty(MQMessage::PROPERTY_MSG_REGION);
if (regionId.empty()) {
regionId = TraceContant::DEFAULT_REDION;
Expand All @@ -73,7 +74,7 @@ void ConsumeMessageHookImpl::executeHookBefore(ConsumeMessageContext* context) {
}
traceContext->setTimeStamp(UtilAll::currentTimeMillis());

std::string topic = TraceContant::TRACE_TOPIC + traceContext->getRegionId();
std::string topic = TraceContant::TRACE_TOPIC;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trace Topic is different for local and cloud in Java SDK :
private String getTraceTopicName(String regionId) {
AccessChannel accessChannel = AsyncTraceDispatcher.this.getAccessChannel();
if (AccessChannel.CLOUD == accessChannel) {
return TraceConstants.TRACE_TOPIC_PREFIX + regionId;
}

        return AsyncTraceDispatcher.this.getTraceTopicName();
    }


TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(traceContext);
MQMessage message(topic, ben.getTransData());
Expand Down Expand Up @@ -101,8 +102,17 @@ void ConsumeMessageHookImpl::executeHookAfter(ConsumeMessageContext* context) {
subAfterContext.setTraceBeanIndex(context->getMsgIndex());
TraceBean bean = subBeforeContext->getTraceBeans()[subAfterContext.getTraceBeanIndex()];
subAfterContext.setTraceBean(bean);
auto contextTypeIter = context->getProps().find("ConsumeContextType");
if(contextTypeIter != context->getProps().end()) {
string contextType = contextTypeIter->second;
if (contextType.find("SUCCESS") != string::npos) {
subAfterContext.setContextCode(0);
} else {
subAfterContext.setContextCode(3);
};
}

std::string topic = TraceContant::TRACE_TOPIC + subAfterContext.getRegionId();
std::string topic = TraceContant::TRACE_TOPIC;
TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(&subAfterContext);
MQMessage message(topic, ben.getTransData());
message.setKeys(ben.getTransKey());
Expand Down
5 changes: 5 additions & 0 deletions src/consumer/ConsumeMessageOrderlyService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
consumeMessageContext.setClientId(pConsumer->getMQClientId());
pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
}
}
Expand All @@ -214,6 +215,9 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
consumeMessageContext.setMsgIndex(0);
consumeMessageContext.setStatus("RECONSUME_LATER");
consumeMessageContext.setSuccess(false);
std::map<std::string, std::string> props;
props.insert(std::make_pair("ConsumeContextType", "FAILED"));
consumeMessageContext.setProps(props);
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
}
if (msgs[0].getReconsumeTimes() <= 15) {
Expand All @@ -237,6 +241,7 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
consumeMessageContext.setMsgIndex(0);
consumeMessageContext.setStatus("CONSUME_SUCCESS");
consumeMessageContext.setSuccess(true);
consumeMessageContext.getProps().insert(std::map<string, string>::value_type("ConsumeContextType", "SUCCESS"));
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
}
m_pConsumer->updateConsumeOffset(request->m_messageQueue, request->commit());
Expand Down
1 change: 1 addition & 0 deletions src/producer/DefaultMQProducerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ SendResult DefaultMQProducerImpl::sendKernelImpl(MQMessage& msg,
pSendMesgContext->setMessageQueue(mq);
pSendMesgContext->setMsgType(TRACE_NORMAL_MSG);
pSendMesgContext->setNameSpace(getNameSpace());
pSendMesgContext->setClientId(getMQClientId());
string tranMsg = msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED);
if (!tranMsg.empty() && tranMsg == "true") {
pSendMesgContext->setMsgType(TRACE_TRANS_HALF_MSG);
Expand Down
3 changes: 2 additions & 1 deletion src/producer/SendMessageHookImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ void SendMessageHookImpl::executeHookAfter(SendMessageContext* context) {
traceBean.setMsgId(context->getMessage()->getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
traceBean.setOffsetMsgId(context->getSendResult()->getOffsetMsgId());
traceBean.setStoreTime(traceContext->getTimeStamp() + (costTime / 2));
traceBean.setClientHost(context->getClientId());

traceContext->setTraceBean(traceBean);

topic = TraceContant::TRACE_TOPIC + traceContext->getRegionId();
topic = TraceContant::TRACE_TOPIC;
TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(traceContext.get());
// encode data
MQMessage message(topic, ben.getTransData());
Expand Down
16 changes: 16 additions & 0 deletions src/trace/ConsumeMessageContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ void ConsumeMessageContext::setTraceContext(TraceContext* mTraceContext) {
m_traceContext.reset(mTraceContext);
}

std::map<std::string, std::string> ConsumeMessageContext::getProps() {
return m_props;
}

void ConsumeMessageContext::setProps(std::map<std::string, std::string>& props) {
m_props = props;
}

std::string ConsumeMessageContext::getClientId() {
return m_clientId;
}

void ConsumeMessageContext::setClientId(const std::string& clientId) {
m_clientId = clientId;
}

std::string ConsumeMessageContext::getNameSpace() {
return m_nameSpace;
}
Expand Down
10 changes: 10 additions & 0 deletions src/trace/ConsumeMessageContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ class ConsumeMessageContext {

void setTraceContext(TraceContext* mTraceContext);

std::map<std::string, std::string> getProps();

void setProps(std::map<std::string, std::string>& props);

std::string getClientId();

void setClientId(const std::string& clientId);

std::string getNameSpace();

void setNameSpace(const std::string& mNameSpace);
Expand All @@ -80,6 +88,8 @@ class ConsumeMessageContext {
DefaultMQPushConsumerImpl* m_defaultMQPushConsumer;
// TraceContext* m_traceContext;
std::shared_ptr<TraceContext> m_traceContext;
std::map<std::string, std::string> m_props;
std::string m_clientId;
std::string m_nameSpace;
};
} // namespace rocketmq
Expand Down
8 changes: 8 additions & 0 deletions src/trace/SendMessageContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,12 @@ std::string SendMessageContext::getNameSpace() {
void SendMessageContext::setNameSpace(const std::string& mNameSpace) {
m_nameSpace = mNameSpace;
}

std::string SendMessageContext::getClientId() {
return m_clientId;
}

void SendMessageContext::setClientId(const std::string& clientId) {
m_clientId = clientId;
}
} // namespace rocketmq
5 changes: 5 additions & 0 deletions src/trace/SendMessageContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class SendMessageContext {

void setNameSpace(const std::string& mNameSpace);

std::string getClientId();

void setClientId(const std::string& clientId);

private:
std::string m_producerGroup;
MQMessage m_message;
Expand All @@ -90,6 +94,7 @@ class SendMessageContext {
SendResult m_sendResult;
TraceContext* m_traceContext;
std::string m_nameSpace;
std::string m_clientId;
};

} // namespace rocketmq
Expand Down
2 changes: 1 addition & 1 deletion src/trace/TraceContant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

namespace rocketmq {
std::string TraceContant::GROUP_NAME = "_INNER_TRACE_PRODUCER";
std::string TraceContant::TRACE_TOPIC = "rmq_sys_TRACE_DATA_";
std::string TraceContant::TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
std::string TraceContant::DEFAULT_REDION = "DEFAULT_REGION";
char TraceContant::CONTENT_SPLITOR = 1;
char TraceContant::FIELD_SPLITOR = 2;
Expand Down
8 changes: 8 additions & 0 deletions src/trace/TraceContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ void TraceContext::setTraceBeanIndex(int traceBeanIndex) {
m_traceBeanIndex = traceBeanIndex;
}

int TraceContext::getContextCode() const {
return m_contextCode;
}

void TraceContext::setContextCode(int contextCode) {
m_contextCode = contextCode;
}

const vector<TraceBean>& TraceContext::getTraceBeans() const {
return m_traceBeans;
}
Expand Down
5 changes: 5 additions & 0 deletions src/trace/TraceContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class TraceContext {

void setTraceBeanIndex(int traceBeanIndex);

int getContextCode() const;

void setContextCode(int contextCode);

const std::vector<TraceBean>& getTraceBeans() const;

void setTraceBean(const TraceBean& traceBean);
Expand All @@ -82,6 +86,7 @@ class TraceContext {
bool m_status;
std::string m_requestId;
int m_traceBeanIndex;
int m_contextCode = 0;
std::vector<TraceBean> m_traceBeans;
};
} // namespace rocketmq
Expand Down
11 changes: 8 additions & 3 deletions src/trace/TraceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ TraceTransferBean TraceUtil::CovertTraceContextToTransferBean(TraceContext* ctx)
ss << ctx->getCostTime() << TraceContant::CONTENT_SPLITOR;
ss << it->getMsgType() << TraceContant::CONTENT_SPLITOR;
ss << it->getOffsetMsgId() << TraceContant::CONTENT_SPLITOR;
ss << (ctx->getStatus() ? "true" : "false") << TraceContant::FIELD_SPLITOR;
ss << (ctx->getStatus() ? "true" : "false") << TraceContant::CONTENT_SPLITOR;
ss << it->getClientHost() << TraceContant::FIELD_SPLITOR;
} break;

case SubBefore: {
Expand All @@ -71,7 +72,8 @@ TraceTransferBean TraceUtil::CovertTraceContextToTransferBean(TraceContext* ctx)
if (!it->getKeys().empty()) {
defaultKey = it->getKeys();
}
ss << defaultKey << TraceContant::FIELD_SPLITOR;
ss << defaultKey << TraceContant::CONTENT_SPLITOR;
ss << it->getClientHost() << TraceContant::FIELD_SPLITOR;
}
} break;

Expand All @@ -87,7 +89,10 @@ TraceTransferBean TraceUtil::CovertTraceContextToTransferBean(TraceContext* ctx)
if (!it->getKeys().empty()) {
defaultKey = it->getKeys();
}
ss << defaultKey << TraceContant::FIELD_SPLITOR;
ss << defaultKey << TraceContant::CONTENT_SPLITOR;
ss << ctx->getContextCode() << TraceContant::CONTENT_SPLITOR;
ss << ctx->getTimeStamp() << TraceContant::CONTENT_SPLITOR;
ss << ctx->getGroupName() << TraceContant::FIELD_SPLITOR;
} break;

default:
Expand Down