diff --git a/.gitignore b/.gitignore index 013ac4aa0..fb4ed321b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea +.cache/ cmake-build-debug/ bin build diff --git a/include/CProducer.h b/include/CProducer.h index 296b13f71..fb18d2848 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -64,6 +64,7 @@ ROCKETMQCLIENT_API int SetProducerSendMsgTimeout(CProducer* producer, int timeou ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer* producer, int level); ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size); ROCKETMQCLIENT_API int SetProducerMessageTrace(CProducer* consumer, CTraceModel openTrace); +ROCKETMQCLIENT_API int SetProducerRetryAnotherBrokerWhenNotStoreOK(CProducer* producer, int retry); ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result); ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result); @@ -106,4 +107,4 @@ ROCKETMQCLIENT_API int SendMessageTransaction(CProducer* producer, #ifdef __cplusplus } #endif -#endif //__C_PRODUCER_H__ \ No newline at end of file +#endif //__C_PRODUCER_H__ diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h index 9eab3a810..7a336dbf6 100644 --- a/include/DefaultMQProducer.h +++ b/include/DefaultMQProducer.h @@ -98,6 +98,9 @@ class ROCKETMQCLIENT_API DefaultMQProducer { int getMaxMessageSize() const; void setMaxMessageSize(int maxMessageSize); + bool getRetryAnotherBrokerWhenNotStoreOK() const; + void setRetryAnotherBrokerWhenNotStoreOK(bool retry); + int getRetryTimes() const; void setRetryTimes(int times); diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index 7834f3369..cfb2eaa52 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -218,6 +218,7 @@ typedef struct __DefaultProducer__ { int producerType; char* version; } DefaultProducer; + CProducer* CreateProducer(const char* groupId) { if (groupId == NULL) { return NULL; @@ -267,6 +268,7 @@ CProducer* CreateTransactionProducer(const char* groupId, CLocalTransactionCheck defaultMQProducer->version[MAX_SDK_VERSION_LENGTH - 1] = 0; return (CProducer*)defaultMQProducer; } + int DestroyProducer(CProducer* pProducer) { if (pProducer == NULL) { return NULL_POINTER; @@ -290,6 +292,7 @@ int DestroyProducer(CProducer* pProducer) { delete reinterpret_cast(pProducer); return OK; } + int StartProducer(CProducer* producer) { if (producer == NULL) { return NULL_POINTER; @@ -307,6 +310,7 @@ int StartProducer(CProducer* producer) { } return OK; } + int ShutdownProducer(CProducer* producer) { if (producer == NULL) { return NULL_POINTER; @@ -324,6 +328,7 @@ int ShutdownProducer(CProducer* producer) { } return OK; } + const char* ShowProducerVersion(CProducer* producer) { if (producer == NULL) { return DEFAULT_SDK_VERSION; @@ -332,6 +337,7 @@ const char* ShowProducerVersion(CProducer* producer) { return defaultMQProducer->version; } + int SetProducerNameServerAddress(CProducer* producer, const char* namesrv) { if (producer == NULL) { return NULL_POINTER; @@ -577,6 +583,7 @@ int SendMessageOrderly(CProducer* producer, } return OK; } + int SendMessageOrderlyByShardingKey(CProducer* producer, CMessage* msg, const char* shardingKey, CSendResult* result) { if (producer == NULL || msg == NULL || shardingKey == NULL || result == NULL) { return NULL_POINTER; @@ -629,6 +636,7 @@ int SendMessageTransaction(CProducer* producer, } return OK; } + int SetProducerGroupName(CProducer* producer, const char* groupName) { if (producer == NULL) { return NULL_POINTER; @@ -663,6 +671,7 @@ int SetProducerInstanceName(CProducer* producer, const char* instanceName) { } return OK; } + int SetProducerSessionCredentials(CProducer* producer, const char* accessKey, const char* secretKey, @@ -683,6 +692,7 @@ int SetProducerSessionCredentials(CProducer* producer, } return OK; } + int SetProducerLogPath(CProducer* producer, const char* logPath) { if (producer == NULL) { return NULL_POINTER; @@ -798,6 +808,7 @@ int SetProducerMaxMessageSize(CProducer* producer, int size) { } return OK; } + int SetProducerMessageTrace(CProducer* producer, CTraceModel openTrace) { if (producer == NULL) { return NULL_POINTER; @@ -816,6 +827,25 @@ int SetProducerMessageTrace(CProducer* producer, CTraceModel openTrace) { } return OK; } + +int SetProducerRetryAnotherBrokerWhenNotStoreOK(CProducer* producer, int retry) { + if (producer == NULL) { + return NULL_POINTER; + } + + DefaultProducer* defaultMQProducer = (DefaultProducer*)producer; + + try { + if (CAPI_C_PRODUCER_TYPE_TRANSACTION != defaultMQProducer->producerType) { + defaultMQProducer->innerProducer->setRetryAnotherBrokerWhenNotStoreOK(retry != 0); + } + } catch (exception& e) { + MQClientErrorContainer::setErr(string(e.what())); + return PRODUCER_START_FAILED; + } + return OK; +} + #ifdef __cplusplus }; #endif diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp index 5b770305a..5ffe15a9f 100644 --- a/src/producer/DefaultMQProducer.cpp +++ b/src/producer/DefaultMQProducer.cpp @@ -135,6 +135,14 @@ int DefaultMQProducer::getMaxMessageSize() const { return impl->getMaxMessageSize(); } +bool DefaultMQProducer::getRetryAnotherBrokerWhenNotStoreOK() const { + return impl->getRetryAnotherBrokerWhenNotStoreOK(); +} + +void DefaultMQProducer::setRetryAnotherBrokerWhenNotStoreOK(bool retry) { + impl->setRetryAnotherBrokerWhenNotStoreOK(retry); +} + void DefaultMQProducer::setRetryTimes4Async(int times) { impl->setRetryTimes4Async(times); } @@ -254,4 +262,4 @@ const std::string& DefaultMQProducer::getSslPropertyFile() const { return impl->getSslPropertyFile(); } -} // namespace rocketmq \ No newline at end of file +} // namespace rocketmq diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp index cfa1f3787..3fd0bf485 100644 --- a/src/producer/DefaultMQProducerImpl.cpp +++ b/src/producer/DefaultMQProducerImpl.cpp @@ -41,7 +41,7 @@ DefaultMQProducerImpl::DefaultMQProducerImpl(const string& groupname) : m_sendMsgTimeout(3000), m_compressMsgBodyOverHowmuch(4 * 1024), m_maxMessageSize(1024 * 128), - // m_retryAnotherBrokerWhenNotStoreOK(false), + m_retryAnotherBrokerWhenNotStoreOK(false), m_compressLevel(5), m_retryTimes(5), m_retryTimes4Async(1), @@ -410,7 +410,7 @@ SendResult DefaultMQProducerImpl::sendDefaultImpl(MQMessage& msg, case ComMode_ONEWAY: return sendResult; case ComMode_SYNC: - if (sendResult.getSendStatus() != SEND_OK) { + if (sendResult.getSendStatus() != SEND_OK && m_retryAnotherBrokerWhenNotStoreOk) { if (bActiveMQ) { topicPublishInfo->updateNonServiceMessageQueue(mq, getSendMsgTimeout()); } @@ -620,6 +620,14 @@ bool DefaultMQProducerImpl::tryToCompressMessage(MQMessage& msg) { return false; } +bool DefaultMQProducerImpl::getRetryAnotherBrokerWhenNotStoreOK() const { + return m_retryAnotherBrokerWhenNotStoreOK; +} + +void DefaultMQProducerImpl::setRetryAnotherBrokerWhenNotStoreOK(bool retry) { + m_retryAnotherBrokerWhenNotStoreOK = retry; +} + int DefaultMQProducerImpl::getRetryTimes() const { return m_retryTimes; } diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h index d2eb1879d..6760dc4ad 100644 --- a/src/producer/DefaultMQProducerImpl.h +++ b/src/producer/DefaultMQProducerImpl.h @@ -84,6 +84,9 @@ class DefaultMQProducerImpl : public MQProducer { void setRetryTimes4Async(int times); void submitSendTraceRequest(const MQMessage& msg, SendCallback* pSendCallback); + bool getRetryAnotherBrokerWhenNotStoreOK() const; + void setRetryAnotherBrokerWhenNotStoreOK(bool retry); + protected: SendResult sendAutoRetrySelectImpl(MQMessage& msg, MessageQueueSelector* pSelector, @@ -122,7 +125,7 @@ class DefaultMQProducerImpl : public MQProducer { int m_sendMsgTimeout; int m_compressMsgBodyOverHowmuch; int m_maxMessageSize; //