Skip to content

Fix another race condition that causes lost metrics #137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions dxm/src/main/java/io/pcp/parfait/dxm/MetricNameValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,28 @@

package io.pcp.parfait.dxm;

import java.util.HashSet;
import java.util.Set;

import static io.pcp.parfait.dxm.PcpMmvWriter.PCP_CHARSET;

class MetricNameValidator {

private final int nameLimit;
private final int domainLimit;
private final Set<MetricName> validNames = new HashSet<>();

MetricNameValidator(int nameLimit, int domainLimit) {
this.nameLimit = nameLimit;
this.domainLimit = domainLimit;
}

void validateNameConstraints(MetricName metricName) {
validateName(metricName);
validateInstance(metricName);
if (!validNames.contains(metricName)) {
validateName(metricName);
validateInstance(metricName);
validNames.add(metricName);
}
}

private void validateInstance(MetricName metricName) {
Expand Down
20 changes: 16 additions & 4 deletions dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public void putBytes(ByteBuffer buffer, String value) {
private volatile State state = State.STOPPED;
private final Monitor stateMonitor = new Monitor();
private final Monitor.Guard isStarted = stateMonitor.newGuard(() -> state == State.STARTED);
private final Monitor.Guard isStopped = stateMonitor.newGuard(() -> state == State.STOPPED);
private volatile Duration maxWaitStart = Duration.ofSeconds(10);
private volatile boolean usePerMetricLock = true;
private final Map<PcpValueInfo,ByteBuffer> perMetricByteBuffers = newConcurrentMap();
Expand Down Expand Up @@ -323,12 +324,23 @@ public final void updateMetric(MetricName name, Object value) {
// implementation here is a little complicated to avoid taking a lock on the happy paths.
if (state == State.STARTED) {
doUpdateMetric(name, value);
} else if (state == State.STARTING) {
if (stateMonitor.enterWhenUninterruptibly(isStarted, maxWaitStart)) {
// Leave the monitor immediately because we only care about being notified about the state change
} else if (stateMonitor.enterIf(isStopped)) {
// In this case, the writer has not been started yet, but it's possible the monitorable has already been
// added back to the writer. If it has, we need to update the initial value so that it gets written
// correctly when the writer is started. If it's not present, then we don't need to do anything because the
// monitorable will be re-added in the future with the correct value.
try {
PcpValueInfo info = metricData.get(name);
if (info != null) {
info.setInitialValue(value);
}
} finally {
stateMonitor.leave();
doUpdateMetric(name, value);
}
} else if (stateMonitor.enterWhenUninterruptibly(isStarted, maxWaitStart)) {
// Leave the monitor immediately because we only care about being notified about the state change
stateMonitor.leave();
doUpdateMetric(name, value);
}
}

Expand Down
6 changes: 5 additions & 1 deletion dxm/src/main/java/io/pcp/parfait/dxm/PcpValueInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public final class PcpValueInfo implements PcpOffset,MmvWritable {
private static final int VALUE_LENGTH = 32;

private final MetricName metricName;
private final Object initialValue;
private volatile Object initialValue;
private final PcpMetricInfo metricInfo;
private final Instance instance;
private final PcpString largeValue;
Expand Down Expand Up @@ -78,6 +78,10 @@ private Object getInitialValue() {
return initialValue;
}

public void setInitialValue(Object initialValue) {
this.initialValue = initialValue;
}

private int getInstanceOffset() {
return instance == null ? 0 : instance.getOffset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,29 @@ public void putBytes(ByteBuffer buffer, Number value) {
assertMetric("mmv." + order.get(0), is("10.000"));
}

@Test
public void metricUpdatesWhileResettingWriterShouldNotBeLostWhenRecordedBeforeWriterStarted() throws Exception {
pcpMmvWriterV1.reset();
pcpMmvWriterV1.addMetric(MetricName.parse("value1"), Semantics.COUNTER, ONE, 1);

pcpMmvWriterV1.start();

waitForReload();

assertMetric("mmv.value1", is("1.000"));

pcpMmvWriterV1.reset();

pcpMmvWriterV1.addMetric(MetricName.parse("value1"), Semantics.COUNTER, ONE, 1);
pcpMmvWriterV1.updateMetric(MetricName.parse("value1"), 10);

pcpMmvWriterV1.start();

waitForReload();

assertMetric("mmv.value1", is("10.000"));
}

private void assertMetric(String metricName, Matcher<String> expectedValue) throws Exception {
String actual = pcpClient.getMetric(metricName);
assertThat(actual, expectedValue);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2009-2017 Aconex
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.pcp.parfait.pcp;

import io.pcp.parfait.dxm.MetricName;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CachingMetricNameMapper implements MetricNameMapper {

private final Map<String, MetricName> cache = new ConcurrentHashMap<>();
private final MetricNameMapper innerMapper;

public CachingMetricNameMapper(MetricNameMapper mapper) {
this.innerMapper = mapper;
}

@Override
public MetricName map(String name) {
return cache.computeIfAbsent(name, innerMapper::map);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ public class PcpMonitorBridge implements MonitoringView {
private final TextSource shortTextSource;
private final TextSource longTextSource;

private volatile PcpWriter pcpWriter;
private final PcpWriter pcpWriter;
private volatile boolean started;


public PcpMonitorBridge(PcpWriter writer) {
this(writer, MetricNameMapper.PASSTHROUGH_MAPPER, DEFAULT_SHORT_TEXT_SOURCE,
this(writer, new CachingMetricNameMapper(MetricNameMapper.PASSTHROUGH_MAPPER),
DEFAULT_SHORT_TEXT_SOURCE,
DEFAULT_LONG_TEXT_SOURCE);
}

Expand Down
Loading