Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public PublisherHandler addHandler(
new PublisherHandler(handlerId, replies, handlerMetrics, this, registerTransferQueue(handlerId));
handlers.put(handlerId, newHandler);
metrics.currentPublisherCount().set(handlers.size());
LOGGER.log(TRACE, "Added new handler {0}", handlerId);
return newHandler;
}

Expand Down Expand Up @@ -374,13 +375,21 @@ public void closeBlock(final BlockProof blockEndProof, final long handlerId) {
metrics.blocksClosedIncomplete.increment();
} else {
metrics.blocksClosedComplete.increment();
// @todo(1413) Also log completed blocks metric and any other relevant
// @todo(1416) Also log completed blocks metric and any other relevant
// actions. Also check if we have incomplete blocks lower than the
// block that completed, and possibly enter the resend process to
// have handlers go back and get the block that was too slow resent
// from a different publisher (don't forget to keep/track last
// completed block, and retain data in queue(s) for
// completed-but-not-forwarded blocks).

// @todo(1414)
// @todo(1415) Remove this log when the related tickets are done.
LOGGER.log(
DEBUG,
"Completed blocks: {0}, Incompleted blocks: {1}",
metrics.blocksClosedComplete.get(),
metrics.blocksClosedIncomplete.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.INFO;
import static java.lang.System.Logger.Level.TRACE;
import static java.lang.System.Logger.Level.WARNING;
import static org.hiero.block.node.spi.BlockNodePlugin.METRICS_CATEGORY;
import static org.hiero.block.node.spi.BlockNodePlugin.UNKNOWN_BLOCK_NUMBER;
Expand Down Expand Up @@ -209,12 +210,14 @@ private void handleBlockItemsRequest(
try {
header = BlockHeader.PROTOBUF.parse(headerBytes);
} catch (final ParseException e) {
LOGGER.log(DEBUG, "Failed to parse BlockHeader due to {0}", e);
// if we have reached this block, this means that the
// request is invalid
sendEndAndResetState(Code.INVALID_REQUEST);
return;
}
} else {
LOGGER.log(DEBUG, "Handler {0} received a BlockHeader with null bytes", handlerId);
// this should never happen
sendEndAndResetState(Code.ERROR);
return;
Expand All @@ -224,6 +227,11 @@ private void handleBlockItemsRequest(
// update the current streaming block number
currentStreamingBlockNumber.set(blockNumber);
} else {
LOGGER.log(
DEBUG,
"Handler {0} received a BlockHeader while already streaming block {1}",
handlerId,
blockNumber);
// If we have entered here, we have an invalid request, the
// block number is not reset which means that the block
// from the request prior to this one has not been streamed in
Expand All @@ -240,7 +248,7 @@ private void handleBlockItemsRequest(
// header batch to arrive and the "skip" response to be sent back,
// due to network latency and processing time.
// @todo(1416) add metrics
// @todo(1413) add logs
LOGGER.log(DEBUG, "Handler {0} dropping batch because first block item is not BlockHeader", handlerId);
return;
}
// now we need to query the manager with the block number currently
Expand Down Expand Up @@ -370,6 +378,8 @@ public void closeCommunication() {
* verified and persisted.
*/
public void sendAcknowledgement(final long newLastAcknowledgedBlockNumber) {
LOGGER.log(
TRACE, "Handler {0} sending acknowledgement for block {1}", handlerId, newLastAcknowledgedBlockNumber);
// We only ever need to acknowledge once for a given block number, even
// if there are several blocks "behind" that acknowledgement.
// The publishers expect that acknowledgement for block N implicitly
Expand Down Expand Up @@ -450,6 +460,7 @@ private boolean sendResponse(final PublishStreamResponse response) {
* @return the id of this handler
*/
public long handleFailedVerification(final long blockNumber) {
LOGGER.log(DEBUG, "Handler {0} handling failed verification for block {1}", handlerId, blockNumber);
if (unacknowledgedStreamedBlocks.remove(blockNumber)) {
// If the block number that failed verification was sent by this
// handler, we need to send an EndOfStream with BAD_BLOCK_PROOF code.
Expand Down Expand Up @@ -536,6 +547,7 @@ private BatchHandleResult handleAccept(
* Handle the SKIP action for a block.
*/
private BatchHandleResult handleSkip(final long blockNumber) {
LOGGER.log(DEBUG, "Handler {0} is sending SKIP for block {1}", handlerId, blockNumber);
// If the action is SKIP, we need to send a skip response
// to the publisher and not propagate the items.
final SkipBlock skipBlock =
Expand All @@ -554,6 +566,8 @@ private BatchHandleResult handleSkip(final long blockNumber) {
* Handle the RESEND action for a block.
*/
private BatchHandleResult handleResend() {
final long blockToResend = publisherManager.getLatestBlockNumber() + 1L;
LOGGER.log(DEBUG, "Handler {0} is sending RESEND({1})", handlerId, blockToResend);
// If the action is RESEND, we need to send a resend
// response to the publisher and not propagate the items.
final ResendBlock resendBlock = ResendBlock.newBuilder()
Expand All @@ -573,6 +587,7 @@ private BatchHandleResult handleResend() {
* Handle the END_BEHIND action for a block.
*/
private BatchHandleResult handleEndBehind() {
LOGGER.log(DEBUG, "Handler {0} is sending BEHIND({1}).", handlerId, publisherManager.getLatestBlockNumber());
// If the action is END_BEHIND, we need to send an end of stream
// response to the publisher and not propagate the items.
sendEndOfStream(Code.BEHIND);
Expand All @@ -583,6 +598,11 @@ private BatchHandleResult handleEndBehind() {
* Handle the END_DUPLICATE action for a block.
*/
private BatchHandleResult handleEndDuplicate() {
LOGGER.log(
DEBUG,
"Handler {0} is sending DUPLICATE_BLOCK({1}).",
handlerId,
publisherManager.getLatestBlockNumber());
// If the action is END_DUPLICATE, we need to send an end of stream
// response to the publisher and not propagate the items.
sendEndOfStream(Code.DUPLICATE_BLOCK);
Expand All @@ -593,6 +613,7 @@ private BatchHandleResult handleEndDuplicate() {
* Handle the END_ERROR action for a block.
*/
private BatchHandleResult handleEndError() {
LOGGER.log(DEBUG, "Handler {0} is sending ERROR", handlerId);
// If the action is END_ERROR, we need to send an end of stream
// response to the publisher and not propagate the items.
sendEndOfStream(Code.ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
*
*/
public final class StreamPublisherPlugin implements BlockNodePlugin, BlockStreamPublishServiceInterface {
// @todo(1413) add proper logging usage to this class.
/** The logger for this class. */
private final System.Logger LOGGER = System.getLogger(getClass().getName());

Expand Down
Loading