Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,49 @@

package org.apache.hadoop.ozone.recon.metrics;

import com.google.common.base.CaseFormat;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.ozone.OzoneConsts;

/**
* Metrics for Recon SCM targeted sync execution.
*/
@InterfaceAudience.Private
@Metrics(about = "Recon SCM Container Sync Metrics", context = OzoneConsts.OZONE)
public final class ReconScmContainerSyncMetrics {
public final class ReconScmContainerSyncMetrics implements MetricsSource {

private static final String SOURCE_NAME =
ReconScmContainerSyncMetrics.class.getSimpleName();

private static final HddsProtos.LifeCycleState[] SYNC_STATES = {
HddsProtos.LifeCycleState.OPEN,
HddsProtos.LifeCycleState.QUASI_CLOSED,
HddsProtos.LifeCycleState.CLOSED,
HddsProtos.LifeCycleState.DELETED
};

private static final MetricsInfo TARGETED_SYNC_STATUS = Interns.info(
"targetedSyncStatus",
"Targeted sync status: 0=idle, 1=in progress, 2=success, 3=failure");

private static final MetricsInfo LAST_TARGETED_SYNC_DURATION_MS = Interns.info(
"lastTargetedSyncDurationMs",
"Time taken by the last targeted sync in milliseconds");

/**
* No targeted sync has run yet, or the latest scheduler cycle did not run one.
*/
Expand All @@ -53,14 +77,22 @@ public final class ReconScmContainerSyncMetrics {
*/
public static final int TARGETED_SYNC_STATUS_FAILURE = 3;

@Metric(about = "Targeted sync status: 0=idle, 1=in progress, "
+ "2=success, 3=failure")
private MutableGaugeInt targetedSyncStatus;

@Metric(about = "Time taken by the last targeted sync in milliseconds")
private MutableGaugeLong lastTargetedSyncDurationMs;
private final AtomicInteger targetedSyncStatus = new AtomicInteger();
private final AtomicLong lastTargetedSyncDurationMs = new AtomicLong();
private final Map<HddsProtos.LifeCycleState, AtomicLong>
lastContainerSyncDurationMs;
private final Map<HddsProtos.LifeCycleState, AtomicLong>
lastContainerCountDrift;
private final Map<HddsProtos.LifeCycleState, MetricsInfo>
containerSyncDurationMetricInfo;
private final Map<HddsProtos.LifeCycleState, MetricsInfo>
containerCountDriftMetricInfo;

/**
 * Builds the per-state gauge holders and the metric descriptors used to
 * publish them. Private: instances are obtained through {@code create()}.
 */
private ReconScmContainerSyncMetrics() {
  // Value holders, one AtomicLong per reconciled lifecycle state.
  this.lastContainerSyncDurationMs = initStateGaugeValues();
  this.lastContainerCountDrift = initStateGaugeValues();
  // Metric name/description pairs matching the holders above.
  this.containerSyncDurationMetricInfo = initSyncDurationMetricInfo();
  this.containerCountDriftMetricInfo = initCountDriftMetricInfo();
}

public static ReconScmContainerSyncMetrics create() {
Expand All @@ -83,11 +115,105 @@ public void setLastTargetedSyncDurationMs(long durationMs) {
lastTargetedSyncDurationMs.set(durationMs);
}

/**
* Records how long the most recent sync pass for the given container state
* took. The value is stored only when {@code state} has an entry in the
* duration gauge map; for any other state the call has no effect.
*
* @param state container lifecycle state the sync pass covered
* @param durationMs elapsed time of the pass, in milliseconds
*/
public void setLastContainerSyncDurationMs(
HddsProtos.LifeCycleState state, long durationMs) {
setStateGauge(lastContainerSyncDurationMs, state, durationMs);
}

/**
* Records the most recently observed container count drift for the given
* state (described by the metric as SCM count minus Recon count). The value
* is stored only when {@code state} has an entry in the drift gauge map; for
* any other state the call has no effect.
*
* @param state container lifecycle state the drift was measured for
* @param drift observed drift; may be negative
*/
public void setLastContainerCountDrift(
HddsProtos.LifeCycleState state, long drift) {
setStateGauge(lastContainerCountDrift, state, drift);
}

/**
 * Returns the most recently recorded targeted sync status code
 * (0=idle, 1=in progress, 2=success, 3=failure).
 *
 * @return current value of the targeted sync status gauge
 */
public int getTargetedSyncStatus() {
  // The status is held in an AtomicInteger, which is read with get();
  // the former MutableGaugeInt-style value() accessor no longer applies.
  return targetedSyncStatus.get();
}

/**
 * Returns the duration of the last targeted sync.
 *
 * @return last recorded targeted sync duration, in milliseconds
 */
public long getLastTargetedSyncDurationMs() {
  // The duration is held in an AtomicLong, which is read with get();
  // the former MutableGaugeLong-style value() accessor no longer applies.
  return lastTargetedSyncDurationMs.get();
}

/**
* Returns the last recorded sync pass duration for the given container state.
*
* @param state container lifecycle state to look up
* @return last recorded duration in milliseconds, or {@code 0} when
* {@code state} has no entry in the duration gauge map
*/
public long getLastContainerSyncDurationMs(
HddsProtos.LifeCycleState state) {
return getStateGauge(lastContainerSyncDurationMs, state);
}

/**
* Returns the last recorded container count drift for the given state.
*
* @param state container lifecycle state to look up
* @return last recorded drift, or {@code 0} when {@code state} has no entry
* in the drift gauge map
*/
public long getLastContainerCountDrift(
HddsProtos.LifeCycleState state) {
return getStateGauge(lastContainerCountDrift, state);
}

/**
 * Publishes a snapshot of all gauges into one record named after this
 * source: the two targeted-sync gauges first, then a duration gauge and a
 * drift gauge for each state in {@code SYNC_STATES}.
 *
 * @param collector collector the record is added to
 * @param all not consulted; every gauge is always emitted
 */
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
  final MetricsRecordBuilder record = collector.addRecord(SOURCE_NAME);

  final int status = getTargetedSyncStatus();
  record.addGauge(TARGETED_SYNC_STATUS, status);

  final long targetedDurationMs = getLastTargetedSyncDurationMs();
  record.addGauge(LAST_TARGETED_SYNC_DURATION_MS, targetedDurationMs);

  for (int i = 0; i < SYNC_STATES.length; i++) {
    final HddsProtos.LifeCycleState current = SYNC_STATES[i];

    final MetricsInfo durationInfo =
        containerSyncDurationMetricInfo.get(current);
    final long durationMs = getLastContainerSyncDurationMs(current);
    record.addGauge(durationInfo, durationMs);

    final MetricsInfo driftInfo = containerCountDriftMetricInfo.get(current);
    final long drift = getLastContainerCountDrift(current);
    record.addGauge(driftInfo, drift);
  }
}

/**
 * Creates one zero-initialised value holder per state in
 * {@code SYNC_STATES}.
 *
 * @return read-only map from each reconciled state to its own holder
 */
private static Map<HddsProtos.LifeCycleState, AtomicLong>
    initStateGaugeValues() {
  final EnumMap<HddsProtos.LifeCycleState, AtomicLong> holders =
      new EnumMap<>(HddsProtos.LifeCycleState.class);
  for (int i = 0; i < SYNC_STATES.length; i++) {
    holders.put(SYNC_STATES[i], new AtomicLong());
  }
  // The key set is fixed; only the AtomicLong values themselves change.
  return Collections.unmodifiableMap(holders);
}

/**
 * Builds the metric descriptor (name and description) of the sync-duration
 * gauge for every state in {@code SYNC_STATES}.
 *
 * @return read-only map from each reconciled state to its descriptor
 */
private static Map<HddsProtos.LifeCycleState, MetricsInfo>
    initSyncDurationMetricInfo() {
  final EnumMap<HddsProtos.LifeCycleState, MetricsInfo> infoByState =
      new EnumMap<>(HddsProtos.LifeCycleState.class);
  for (int i = 0; i < SYNC_STATES.length; i++) {
    final HddsProtos.LifeCycleState current = SYNC_STATES[i];
    final String stateName = metricStateName(current);
    final String metricName = "last" + stateName + "ContainerSyncDurationMs";
    final String description = "Time taken by the last " + stateName
        + " container sync pass in milliseconds";
    infoByState.put(current, Interns.info(metricName, description));
  }
  return Collections.unmodifiableMap(infoByState);
}

/**
 * Builds the metric descriptor (name and description) of the count-drift
 * gauge for every state in {@code SYNC_STATES}.
 *
 * @return read-only map from each reconciled state to its descriptor
 */
private static Map<HddsProtos.LifeCycleState, MetricsInfo>
    initCountDriftMetricInfo() {
  final EnumMap<HddsProtos.LifeCycleState, MetricsInfo> infoByState =
      new EnumMap<>(HddsProtos.LifeCycleState.class);
  for (int i = 0; i < SYNC_STATES.length; i++) {
    final HddsProtos.LifeCycleState current = SYNC_STATES[i];
    final String stateName = metricStateName(current);
    final String metricName = "last" + stateName + "ContainerCountDrift";
    final String description = "Last pre-sync observed " + stateName
        + " container count drift, computed as SCM count minus Recon count";
    infoByState.put(current, Interns.info(metricName, description));
  }
  return Collections.unmodifiableMap(infoByState);
}

/**
 * Converts a lifecycle state's enum constant name from UPPER_UNDERSCORE to
 * UpperCamel form for use inside metric names (for example
 * {@code QUASI_CLOSED} becomes {@code QuasiClosed}).
 *
 * @param state lifecycle state whose name is converted
 * @return the UpperCamel form of {@code state.name()}
 */
private static String metricStateName(HddsProtos.LifeCycleState state) {
  final String constantName = state.name();
  return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, constantName);
}

/**
 * Stores {@code value} in the holder registered for {@code state}.
 * States without a holder are ignored.
 *
 * @param gauges per-state value holders
 * @param state state whose holder is updated
 * @param value new gauge value
 */
private static void setStateGauge(
    Map<HddsProtos.LifeCycleState, AtomicLong> gauges,
    HddsProtos.LifeCycleState state,
    long value) {
  final AtomicLong holder = gauges.get(state);
  if (holder == null) {
    // No gauge exists for this state; drop the update.
    return;
  }
  holder.set(value);
}

/**
 * Reads the current value of the holder registered for {@code state}.
 *
 * @param gauges per-state value holders
 * @param state state whose holder is read
 * @return the holder's value, or {@code 0} when no holder exists
 */
private static long getStateGauge(
    Map<HddsProtos.LifeCycleState, AtomicLong> gauges,
    HddsProtos.LifeCycleState state) {
  final AtomicLong holder = gauges.get(state);
  if (holder == null) {
    return 0L;
  }
  return holder.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
containerSyncHelper = new ReconStorageContainerSyncHelper(
scmServiceProvider,
ozoneConfiguration,
containerManager
containerManager,
containerSyncMetrics
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.recon.metrics.ReconScmContainerSyncMetrics;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -140,13 +142,22 @@ class ReconStorageContainerSyncHelper {
private final StorageContainerServiceProvider scmServiceProvider;
private final OzoneConfiguration ozoneConfiguration;
private final ReconContainerManager containerManager;
private final ReconScmContainerSyncMetrics containerSyncMetrics;

/**
* Creates a helper that records no sync metrics. Delegates to the four-argument
* constructor with a {@code null} metrics instance; the metric update helpers
* in this class return early when the metrics instance is {@code null}.
*
* @param scmServiceProvider provider used to query SCM
* @param ozoneConfiguration configuration read by the sync passes
* @param containerManager Recon-side container manager
*/
ReconStorageContainerSyncHelper(StorageContainerServiceProvider scmServiceProvider,
OzoneConfiguration ozoneConfiguration,
ReconContainerManager containerManager) {
this(scmServiceProvider, ozoneConfiguration, containerManager, null);
}

/**
* Creates a helper that optionally records per-state sync metrics.
*
* @param scmServiceProvider provider used to query SCM
* @param ozoneConfiguration configuration read by the sync passes
* @param containerManager Recon-side container manager
* @param containerSyncMetrics metrics sink for sync duration and count drift;
* may be {@code null}, in which case no metrics are updated
*/
ReconStorageContainerSyncHelper(StorageContainerServiceProvider scmServiceProvider,
OzoneConfiguration ozoneConfiguration,
ReconContainerManager containerManager,
ReconScmContainerSyncMetrics containerSyncMetrics) {
this.scmServiceProvider = scmServiceProvider;
this.ozoneConfiguration = ozoneConfiguration;
this.containerManager = containerManager;
this.containerSyncMetrics = containerSyncMetrics;
}

/**
Expand All @@ -167,8 +178,10 @@ public boolean syncWithSCMContainerInfo() {
*/
private boolean syncContainersForState(HddsProtos.LifeCycleState scmState,
boolean incrementalOpen) {
long startTime = Time.monotonicNow();
try {
long total = scmServiceProvider.getContainerCount(scmState);
updateContainerCountDrift(scmState, total);
if (total == 0) {
LOG.debug("{} sync: no containers found in SCM.", scmState);
return true;
Expand Down Expand Up @@ -222,6 +235,8 @@ private boolean syncContainersForState(HddsProtos.LifeCycleState scmState,
} catch (Exception e) {
LOG.error("{} sync: unexpected error.", scmState, e);
return false;
} finally {
updateContainerSyncDuration(scmState, Time.monotonicNow() - startTime);
}
}

Expand Down Expand Up @@ -359,7 +374,9 @@ private int rebuildContainerFromScm(ContainerID containerID,
* @return {@code true} if all RPC calls completed without error
*/
private boolean syncDeletedContainers() {
long startTime = Time.monotonicNow();
try {
updateDeletedContainerCountDrift();
int configuredBatch = ozoneConfiguration.getInt(
OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE,
OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT);
Expand Down Expand Up @@ -390,6 +407,39 @@ private boolean syncDeletedContainers() {
} catch (Exception e) {
LOG.error("DELETED sync: unexpected error.", e);
return false;
} finally {
updateContainerSyncDuration(HddsProtos.LifeCycleState.DELETED,
Time.monotonicNow() - startTime);
}
}

/**
 * Best-effort update of the DELETED count-drift gauge: fetches the DELETED
 * container count from SCM and records its drift. Does nothing when no
 * metrics instance is configured. Any failure is logged at WARN and
 * swallowed so that it does not propagate to the caller.
 */
private void updateDeletedContainerCountDrift() {
  if (containerSyncMetrics != null) {
    final HddsProtos.LifeCycleState deleted =
        HddsProtos.LifeCycleState.DELETED;
    try {
      final long scmDeletedCount =
          scmServiceProvider.getContainerCount(deleted);
      updateContainerCountDrift(deleted, scmDeletedCount);
    } catch (Exception e) {
      LOG.warn("DELETED sync: unable to update pre-sync count drift metric.", e);
    }
  }
}

/**
 * Records the count drift for {@code state} as the SCM count minus the
 * count reported by Recon's container manager. Does nothing when no metrics
 * instance is configured.
 *
 * @param state lifecycle state being reconciled
 * @param scmCount number of containers SCM reports in that state
 */
private void updateContainerCountDrift(HddsProtos.LifeCycleState state,
    long scmCount) {
  if (containerSyncMetrics != null) {
    final long countInRecon = containerManager.getContainerStateCount(state);
    final long drift = scmCount - countInRecon;
    containerSyncMetrics.setLastContainerCountDrift(state, drift);
  }
}

/**
 * Records the duration of the sync pass for {@code state}. Does nothing
 * when no metrics instance is configured.
 *
 * @param state lifecycle state the pass covered
 * @param durationMs elapsed time of the pass, in milliseconds
 */
private void updateContainerSyncDuration(HddsProtos.LifeCycleState state,
    long durationMs) {
  if (containerSyncMetrics == null) {
    return;
  }
  containerSyncMetrics.setLastContainerSyncDurationMs(state, durationMs);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.recon.metrics;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED;
import static org.apache.hadoop.metrics2.lib.Interns.info;
import static org.apache.ozone.test.MetricsAsserts.eqName;
import static org.apache.ozone.test.MetricsAsserts.getLongGauge;
import static org.apache.ozone.test.MetricsAsserts.getMetrics;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
* Tests for Recon SCM container sync metrics.
*/
class TestReconScmContainerSyncMetrics {

private ReconScmContainerSyncMetrics metrics;

@BeforeEach
void setUp() {
metrics = ReconScmContainerSyncMetrics.create();
}

@AfterEach
void tearDown() {
metrics.unRegister();
}

@Test
void testStateMetricsAreEmittedForReconciledStatesOnly() {
metrics.setLastContainerSyncDurationMs(OPEN, 10L);
metrics.setLastContainerSyncDurationMs(QUASI_CLOSED, 20L);
metrics.setLastContainerSyncDurationMs(CLOSED, 30L);
metrics.setLastContainerSyncDurationMs(DELETED, 40L);
metrics.setLastContainerCountDrift(OPEN, 2L);
metrics.setLastContainerCountDrift(QUASI_CLOSED, 0L);
metrics.setLastContainerCountDrift(CLOSED, -3L);
metrics.setLastContainerCountDrift(DELETED, 4L);
metrics.setLastContainerCountDrift(CLOSING, 100L);
metrics.setLastContainerSyncDurationMs(DELETING, 200L);

MetricsRecordBuilder builder = getMetrics(metrics);

assertEquals(10L, getLongGauge("lastOpenContainerSyncDurationMs", builder));
assertEquals(20L,
getLongGauge("lastQuasiClosedContainerSyncDurationMs", builder));
assertEquals(30L, getLongGauge("lastClosedContainerSyncDurationMs", builder));
assertEquals(40L, getLongGauge("lastDeletedContainerSyncDurationMs", builder));
assertEquals(2L, getLongGauge("lastOpenContainerCountDrift", builder));
assertEquals(0L,
getLongGauge("lastQuasiClosedContainerCountDrift", builder));
assertEquals(-3L, getLongGauge("lastClosedContainerCountDrift", builder));
assertEquals(4L, getLongGauge("lastDeletedContainerCountDrift", builder));

verify(builder, never()).addGauge(
eqName(info("lastClosingContainerSyncDurationMs", "")), eq(100L));
verify(builder, never()).addGauge(
eqName(info("lastDeletingContainerSyncDurationMs", "")), eq(200L));
verify(builder, never()).addGauge(
eqName(info("lastClosingContainerCountDrift", "")), eq(100L));
verify(builder, never()).addGauge(
eqName(info("lastDeletingContainerCountDrift", "")), eq(200L));
}
}
Loading