private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private boolean writeOnlyTransactionOptimizationsEnabled = true;
private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
+ private boolean transactionDebugContextEnabled = false;
public static Set<String> getGlobalDatastoreTypes() {
return globalDatastoreTypes;
this.shardBatchedModificationCount = other.shardBatchedModificationCount;
this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
+ this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
return shardCommitQueueExpiryTimeoutInMillis;
}
+ public boolean isTransactionDebugContextEnabled() {
+ return transactionDebugContextEnabled;
+ }
+
public static class Builder {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
return this;
}
+ public Builder transactionDebugContextEnabled(boolean value) {
+ datastoreContext.transactionDebugContextEnabled = value;
+ return this;
+ }
+
public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
return this;
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * An AbstractThreePhaseCommitCohort implementation used for debugging. If a failure occurs, the transaction
+ * call site is printed.
+ *
+ * @author Thomas Pantelis
+ */
+class DebugThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort<Object> {
+ private static final Logger LOG = LoggerFactory.getLogger(DebugThreePhaseCommitCohort.class);
+
+ private final AbstractThreePhaseCommitCohort<?> delegate;
+ private final Throwable debugContext;
+ private final TransactionIdentifier transactionId;
+ private Logger log = LOG;
+
+ DebugThreePhaseCommitCohort(TransactionIdentifier transactionId, AbstractThreePhaseCommitCohort<?> delegate,
+ Throwable debugContext) {
+ this.delegate = Preconditions.checkNotNull(delegate);
+ this.debugContext = Preconditions.checkNotNull(debugContext);
+ this.transactionId = Preconditions.checkNotNull(transactionId);
+ }
+
+ private <V> ListenableFuture<V> addFutureCallback(ListenableFuture<V> future) {
+ Futures.addCallback(future, new FutureCallback<V>() {
+ @Override
+ public void onSuccess(V result) {
+ // no-op
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.warn("Transaction {} failed with error \"{}\" - was allocated in the following context",
+ transactionId, t, debugContext);
+ }
+ });
+
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return addFutureCallback(delegate.canCommit());
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return addFutureCallback(delegate.preCommit());
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ return addFutureCallback(delegate.commit());
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return delegate.abort();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ List<Future<Object>> getCohortFutures() {
+ return ((AbstractThreePhaseCommitCohort)delegate).getCohortFutures();
+ }
+
+ @VisibleForTesting
+ void setLogger(Logger log) {
+ this.log = log;
+ }
+}
@VisibleForTesting
public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
- super(txContextFactory.nextIdentifier(), false);
+ super(txContextFactory.nextIdentifier(), txContextFactory.getActorContext().getDatastoreContext()
+ .isTransactionDebugContextEnabled());
this.txContextFactory = txContextFactory;
this.type = Preconditions.checkNotNull(type);
}
txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
- return ret;
+
+ final Throwable debugContext = getDebugContext();
+ return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext);
}
private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
long getTransactionCreationInitialRateLimit();
+ boolean getTransactionContextDebugEnabled();
+
int getMaxShardDataChangeExecutorPoolSize();
int getMaxShardDataChangeExecutorQueueSize();
return context.getTransactionCreationInitialRateLimit();
}
+ @Override
+ public boolean getTransactionContextDebugEnabled() {
+ return context.isTransactionDebugContextEnabled();
+ }
+
@Override
public int getMaxShardDataChangeExecutorPoolSize() {
return context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
.shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
.shardCommitQueueExpiryTimeoutInSeconds(
props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
+ .transactionDebugContextEnabled(props.getTransactionDebugContextEnabled())
.build();
return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
.shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
.shardCommitQueueExpiryTimeoutInSeconds(
props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
+ .transactionDebugContextEnabled(props.getTransactionDebugContextEnabled())
.build();
return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
description "Enable or disable data persistence";
}
- leaf shard-isolated-leader-check-interval-in-millis {
+ leaf shard-isolated-leader-check-interval-in-millis {
default 5000;
type heartbeat-interval-type;
description "The interval at which the leader of the shard will check if its majority
followers are active and term itself as isolated";
- }
+ }
- leaf transaction-creation-initial-rate-limit {
+ leaf transaction-creation-initial-rate-limit {
default 100;
type non-zero-uint32-type;
description "The initial number of transactions per second that are allowed before the data store
should begin applying back pressure. This number is only used as an initial guidance,
subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit";
- }
+ }
+
+ leaf transaction-debug-context-enabled {
+ default false;
+ type boolean;
+ description "Enable or disable transaction context debug. This will log the call site trace for
+ transactions that fail";
+ }
}
// Augments the 'configuration' choice node under modules/module.
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.slf4j.Logger;
+import scala.concurrent.Future;
+
+/**
+ * Unit tests for DebugThreePhaseCommitCohort.
+ *
+ * @author Thomas Pantelis
+ */
+public class DebugThreePhaseCommitCohortTest {
+
+ @Test
+ public void test() {
+ AbstractThreePhaseCommitCohort<?> mockDelegate = mock(AbstractThreePhaseCommitCohort.class);
+ Exception failure = new Exception("mock failure");
+ ListenableFuture<Object> expFailedFuture = Futures.immediateFailedFuture(failure);
+ doReturn(expFailedFuture).when(mockDelegate).canCommit();
+ doReturn(expFailedFuture).when(mockDelegate).preCommit();
+ doReturn(expFailedFuture).when(mockDelegate).commit();
+
+ ListenableFuture<Object> expAbortFuture = Futures.immediateFuture(null);
+ doReturn(expAbortFuture).when(mockDelegate).abort();
+
+ List<Future<Object>> expCohortFutures = new ArrayList<>();
+ doReturn(expCohortFutures).when(mockDelegate).getCohortFutures();
+
+ TransactionIdentifier transactionId = TransactionIdentifier.create("1", 1, "");
+ Throwable debugContext = new RuntimeException("mock");
+ DebugThreePhaseCommitCohort cohort = new DebugThreePhaseCommitCohort(transactionId , mockDelegate , debugContext );
+
+ Logger mockLogger = mock(Logger.class);
+ cohort.setLogger(mockLogger);
+
+ assertSame("canCommit", expFailedFuture, cohort.canCommit());
+ verify(mockLogger).warn(anyString(), same(transactionId), same(failure), same(debugContext));
+
+ reset(mockLogger);
+ assertSame("preCommit", expFailedFuture, cohort.preCommit());
+ verify(mockLogger).warn(anyString(), same(transactionId), same(failure), same(debugContext));
+
+ reset(mockLogger);
+ assertSame("commit", expFailedFuture, cohort.commit());
+ verify(mockLogger).warn(anyString(), same(transactionId), same(failure), same(debugContext));
+
+ assertSame("abort", expAbortFuture, cohort.abort());
+
+ assertSame("getCohortFutures", expCohortFutures, cohort.getCohortFutures());
+
+ reset(mockLogger);
+ ListenableFuture<Boolean> expSuccessFuture = Futures.immediateFuture(Boolean.TRUE);
+ doReturn(expSuccessFuture).when(mockDelegate).canCommit();
+
+ assertSame("canCommit", expSuccessFuture, cohort.canCommit());
+ verify(mockLogger, never()).warn(anyString(), any(TransactionIdentifier.class), any(Throwable.class),
+ any(Throwable.class));
+ }
+}
verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
}
+ @Test
+ public void testReadyWithDebugContextEnabled() throws Exception {
+ dataStoreContextBuilder.transactionDebugContextEnabled(true);
+
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+ expectBatchedModificationsReady(actorRef, true);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+ transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof DebugThreePhaseCommitCohort);
+
+ verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
+ }
+
private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());