diff --git a/dxm/src/main/java/io/pcp/parfait/dxm/MetricNameValidator.java b/dxm/src/main/java/io/pcp/parfait/dxm/MetricNameValidator.java index 791a6127..a91a83b0 100644 --- a/dxm/src/main/java/io/pcp/parfait/dxm/MetricNameValidator.java +++ b/dxm/src/main/java/io/pcp/parfait/dxm/MetricNameValidator.java @@ -16,12 +16,16 @@ package io.pcp.parfait.dxm; +import java.util.HashSet; +import java.util.Set; + import static io.pcp.parfait.dxm.PcpMmvWriter.PCP_CHARSET; class MetricNameValidator { private final int nameLimit; private final int domainLimit; + private final Set validNames = new HashSet<>(); MetricNameValidator(int nameLimit, int domainLimit) { this.nameLimit = nameLimit; @@ -29,8 +33,11 @@ class MetricNameValidator { } void validateNameConstraints(MetricName metricName) { - validateName(metricName); - validateInstance(metricName); + if (!validNames.contains(metricName)) { + validateName(metricName); + validateInstance(metricName); + validNames.add(metricName); + } } private void validateInstance(MetricName metricName) { diff --git a/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java b/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java index dbbd2a8f..6f527d5f 100644 --- a/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java +++ b/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java @@ -167,6 +167,7 @@ public void putBytes(ByteBuffer buffer, String value) { private volatile State state = State.STOPPED; private final Monitor stateMonitor = new Monitor(); private final Monitor.Guard isStarted = stateMonitor.newGuard(() -> state == State.STARTED); + private final Monitor.Guard isStopped = stateMonitor.newGuard(() -> state == State.STOPPED); private volatile Duration maxWaitStart = Duration.ofSeconds(10); private volatile boolean usePerMetricLock = true; private final Map perMetricByteBuffers = newConcurrentMap(); @@ -323,12 +324,23 @@ public final void updateMetric(MetricName name, Object value) { // implementation here is a little complicated to avoid taking a lock on the happy paths. if (state == State.STARTED) { doUpdateMetric(name, value); - } else if (state == State.STARTING) { - if (stateMonitor.enterWhenUninterruptibly(isStarted, maxWaitStart)) { - // Leave the monitor immediately because we only care about being notified about the state change + } else if (stateMonitor.enterIf(isStopped)) { + // In this case, the writer has not been started yet, but it's possible the monitorable has already been + // added back to the writer. If it has, we need to update the initial value so that it gets written + // correctly when the writer is started. If it's not present, then we don't need to do anything because the + // monitorable will be re-added in the future with the correct value. + try { + PcpValueInfo info = metricData.get(name); + if (info != null) { + info.setInitialValue(value); + } + } finally { stateMonitor.leave(); - doUpdateMetric(name, value); } + } else if (stateMonitor.enterWhenUninterruptibly(isStarted, maxWaitStart)) { + // Leave the monitor immediately because we only care about being notified about the state change + stateMonitor.leave(); + doUpdateMetric(name, value); } } diff --git a/dxm/src/main/java/io/pcp/parfait/dxm/PcpValueInfo.java b/dxm/src/main/java/io/pcp/parfait/dxm/PcpValueInfo.java index c4d8d3c8..0ca17b68 100644 --- a/dxm/src/main/java/io/pcp/parfait/dxm/PcpValueInfo.java +++ b/dxm/src/main/java/io/pcp/parfait/dxm/PcpValueInfo.java @@ -32,7 +32,7 @@ public final class PcpValueInfo implements PcpOffset,MmvWritable { private static final int VALUE_LENGTH = 32; private final MetricName metricName; - private final Object initialValue; + private volatile Object initialValue; private final PcpMetricInfo metricInfo; private final Instance instance; private final PcpString largeValue; @@ -78,6 +78,10 @@ private Object getInitialValue() { return initialValue; } + public void setInitialValue(Object initialValue) { + this.initialValue = initialValue; + } + private int getInstanceOffset() { return instance == null ? 0 : instance.getOffset(); } diff --git a/dxm/src/test/java/io/pcp/parfait/dxm/PcpMmvWriterIntegrationTest.java b/dxm/src/test/java/io/pcp/parfait/dxm/PcpMmvWriterIntegrationTest.java index d7472562..4b0e5f3d 100644 --- a/dxm/src/test/java/io/pcp/parfait/dxm/PcpMmvWriterIntegrationTest.java +++ b/dxm/src/test/java/io/pcp/parfait/dxm/PcpMmvWriterIntegrationTest.java @@ -203,6 +203,29 @@ public void putBytes(ByteBuffer buffer, Number value) { assertMetric("mmv." + order.get(0), is("10.000")); } + @Test + public void metricUpdatesWhileResettingWriterShouldNotBeLostWhenRecordedBeforeWriterStarted() throws Exception { + pcpMmvWriterV1.reset(); + pcpMmvWriterV1.addMetric(MetricName.parse("value1"), Semantics.COUNTER, ONE, 1); + + pcpMmvWriterV1.start(); + + waitForReload(); + + assertMetric("mmv.value1", is("1.000")); + + pcpMmvWriterV1.reset(); + + pcpMmvWriterV1.addMetric(MetricName.parse("value1"), Semantics.COUNTER, ONE, 1); + pcpMmvWriterV1.updateMetric(MetricName.parse("value1"), 10); + + pcpMmvWriterV1.start(); + + waitForReload(); + + assertMetric("mmv.value1", is("10.000")); + } + private void assertMetric(String metricName, Matcher expectedValue) throws Exception { String actual = pcpClient.getMetric(metricName); assertThat(actual, expectedValue); diff --git a/parfait-pcp/src/main/java/io/pcp/parfait/pcp/CachingMetricNameMapper.java b/parfait-pcp/src/main/java/io/pcp/parfait/pcp/CachingMetricNameMapper.java new file mode 100644 index 00000000..4ea0a0c8 --- /dev/null +++ b/parfait-pcp/src/main/java/io/pcp/parfait/pcp/CachingMetricNameMapper.java @@ -0,0 +1,37 @@ +/* + * Copyright 2009-2017 Aconex + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.pcp.parfait.pcp; + +import io.pcp.parfait.dxm.MetricName; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class CachingMetricNameMapper implements MetricNameMapper { + + private final Map cache = new ConcurrentHashMap<>(); + private final MetricNameMapper innerMapper; + + public CachingMetricNameMapper(MetricNameMapper mapper) { + this.innerMapper = mapper; + } + + @Override + public MetricName map(String name) { + return cache.computeIfAbsent(name, innerMapper::map); + } +} diff --git a/parfait-pcp/src/main/java/io/pcp/parfait/pcp/PcpMonitorBridge.java b/parfait-pcp/src/main/java/io/pcp/parfait/pcp/PcpMonitorBridge.java index e840b6a5..5e36aa3c 100644 --- a/parfait-pcp/src/main/java/io/pcp/parfait/pcp/PcpMonitorBridge.java +++ b/parfait-pcp/src/main/java/io/pcp/parfait/pcp/PcpMonitorBridge.java @@ -67,12 +67,13 @@ public class PcpMonitorBridge implements MonitoringView { private final TextSource shortTextSource; private final TextSource longTextSource; - private volatile PcpWriter pcpWriter; + private final PcpWriter pcpWriter; private volatile boolean started; public PcpMonitorBridge(PcpWriter writer) { - this(writer, MetricNameMapper.PASSTHROUGH_MAPPER, DEFAULT_SHORT_TEXT_SOURCE, + this(writer, new CachingMetricNameMapper(MetricNameMapper.PASSTHROUGH_MAPPER), + DEFAULT_SHORT_TEXT_SOURCE, DEFAULT_LONG_TEXT_SOURCE); }