diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp index 7640772ea..0dae494aa 100644 --- a/src/common/AsyncCallbackWrap.cpp +++ b/src/common/AsyncCallbackWrap.cpp @@ -167,15 +167,19 @@ void PullCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool b } MQException exception(err, -1, __FILE__, __LINE__); LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque()); - if (pCallback && bProducePullRequest) + if (pCallback && bProducePullRequest) { pCallback->onException(exception); + deleteAndZero(pCallback); + } } else { try { if (m_pArg.pPullWrapper) { unique_ptr pullResult(m_pClientAPI->processPullResponse(pResponse.get())); PullResult result = m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData); - if (pCallback) + if (pCallback) { pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest); + deleteAndZero(pCallback); + } } else { LOG_ERROR("pPullWrapper had been destroyed with consumer"); } @@ -183,7 +187,10 @@ void PullCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool b LOG_ERROR("%s", e.what()); MQException exception("pullResult error", -1, __FILE__, __LINE__); if (pCallback && bProducePullRequest) + { pCallback->onException(exception); + deleteAndZero(pCallback); + } } } } diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp index 4a79595e5..3b769c395 100644 --- a/src/consumer/DefaultMQPushConsumerImpl.cpp +++ b/src/consumer/DefaultMQPushConsumerImpl.cpp @@ -892,7 +892,7 @@ void DefaultMQPushConsumerImpl::pullMessageAsync(boost::weak_ptr pu } try { request->setLastPullTimestamp(UtilAll::currentTimeMillis()); - AsyncPullCallback* pullCallback = getAsyncPullCallBack(request, messageQueue); + AsyncPullCallback* pullCallback = new AsyncPullCallback(this, request); if (pullCallback == NULL) { LOG_WARN("Can not get pull callback for:%s, Maybe this pull request has been released.", request->m_messageQueue.toString().c_str());