From: Tony Tkacik
Date: Thu, 14 Jan 2016 17:42:33 +0000 (+0100)
Subject: Bug 1435: CDS: Added support for custom commit cohort.
X-Git-Tag: release/boron~202
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1d3c54640b9fff649fe8d0f57e20d56f8f936cc1
Bug 1435: CDS: Added support for custom commit cohort.
Implemented support for user provided commit
cohorts, which implements DOMDataTreeCommitCohort
interface contract.
Messages are only in-JVM so cohort needs to be colocated
with replica.
Change-Id: I04e592c0232383e70fa8944b966b1aa341730a98
Signed-off-by: Tony Tkacik
---
diff --git a/features/mdsal/src/main/features/features.xml b/features/mdsal/src/main/features/features.xml
index ca5593d993..88a18f7543 100644
--- a/features/mdsal/src/main/features/features.xml
+++ b/features/mdsal/src/main/features/features.xml
@@ -32,7 +32,7 @@
odl-yangtools-common
- odl-mdsal-binding-runtime
+ odl-mdsal-binding-dom-adapter
odl-mdsal-models
odl-mdsal-common
odl-config-startup
diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml
index d545cf49ec..f2719bccc8 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml
+++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml
@@ -129,6 +129,10 @@
sal-dom-broker-config
+
+ org.opendaylight.mdsal
+ mdsal-dom-api
+
org.opendaylight.controller
sal-core-spi
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java
index e8b19b8346..02362b9371 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ConcurrentDOMDataBroker.java
@@ -29,6 +29,10 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.util.DurationStatisticsTracker;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.slf4j.Logger;
@@ -42,7 +46,7 @@ import org.slf4j.LoggerFactory;
* @author Thomas Pantelis
*/
@Beta
-public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
+public class ConcurrentDOMDataBroker extends AbstractDOMBroker implements DOMDataTreeCommitCohortRegistry {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
private static final String CAN_COMMIT = "CAN_COMMIT";
private static final String PRE_COMMIT = "PRE_COMMIT";
@@ -325,4 +329,25 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
}
}
}
+
+ @Override
+ public DOMDataTreeCommitCohortRegistration registerCommitCohort(
+ DOMDataTreeIdentifier path, T cohort) {
+ DOMStore store = getTxFactories().get(toLegacy(path.getDatastoreType()));
+ if (store instanceof DOMDataTreeCommitCohortRegistry) {
+ return ((DOMDataTreeCommitCohortRegistry) store).registerCommitCohort(path, cohort);
+ }
+ throw new UnsupportedOperationException("Commit cohort is not supported for " + path);
+ }
+
+ private LogicalDatastoreType toLegacy(org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType) {
+ switch (datastoreType) {
+ case CONFIGURATION:
+ return LogicalDatastoreType.CONFIGURATION;
+ case OPERATIONAL:
+ return LogicalDatastoreType.OPERATIONAL;
+ default:
+ throw new IllegalArgumentException("Unsupported data store type: " + datastoreType);
+ }
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java
new file mode 100644
index 0000000000..07ff936b2f
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Status;
+import akka.actor.Status.Failure;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.dispatch.Recover;
+import akka.japi.Function;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+/**
+ *
+ * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
+ *
+ * It tracks current operation and list of cohorts which successfuly finished previous phase in
+ * case, if abort is necessary to invoke it only on cohort steps which are still active.
+ *
+ */
+class CompositeDataTreeCohort {
+
+ private enum State {
+ /**
+ * Cohorts are idle, no messages were sent.
+ */
+ IDLE,
+ /**
+ * CanCommit message was sent to all participating cohorts.
+ */
+ CAN_COMMIT_SENT,
+ /**
+ * Successful canCommit responses were received from every participating cohort.
+ */
+ CAN_COMMIT_SUCCESSFUL,
+ /**
+ * PreCommit message was sent to all participating cohorts.
+ */
+ PRE_COMMIT_SENT,
+ /**
+ * Successful preCommit responses were received from every participating cohort.
+ */
+ PRE_COMMIT_SUCCESSFUL,
+ /**
+ * Commit message was send to all participating cohorts.
+ */
+ COMMIT_SENT,
+ /**
+ * Successful commit responses were received from all participating cohorts.
+ */
+ COMMITED,
+ /**
+ * Some of cohorts responsed back with unsuccessful message.
+ *
+ */
+ FAILED,
+ /**
+ *
+ * Abort message was send to all cohorts which responded with success previously.
+ *
+ */
+ ABORTED
+ }
+
+ protected static final Recover
*/
public class Shard extends RaftActor {
+
@VisibleForTesting
static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
@Override
@@ -269,6 +270,9 @@ public class Shard extends RaftActor {
context().parent().forward(message, context());
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+ commitCoordinator.processCohortRegistryCommand(getSender(),
+ (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else {
super.handleNonRaftCommand(message);
}
@@ -334,9 +338,9 @@ public class Shard extends RaftActor {
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
- if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
- shardMBean.incrementFailedTransactionsCount();
- }
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+ shardMBean.incrementFailedTransactionsCount();
+ }
} else {
ActorSelection leader = getLeader();
if (leader == null) {
@@ -354,7 +358,7 @@ public class Shard extends RaftActor {
try {
try {
- cohortEntry.commit();
+ cohortEntry.commit();
} catch(ExecutionException e) {
// We may get a "store tree and candidate base differ" IllegalStateException from commit under
// certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
@@ -432,7 +436,7 @@ public class Shard extends RaftActor {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
- commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
@@ -447,7 +451,7 @@ public class Shard extends RaftActor {
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
- commitCoordinator.handleBatchedModifications(batched, sender, this);
+ commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext());
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
@@ -516,7 +520,7 @@ public class Shard extends RaftActor {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
try {
- commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext());
} catch (Exception e) {
LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
@@ -540,7 +544,8 @@ public class Shard extends RaftActor {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
- commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+ store.getSchemaContext());
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
index 739321b068..45fa7727e4 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
+import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
@@ -24,6 +25,8 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -39,7 +42,9 @@ import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModifi
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
+import scala.concurrent.duration.Duration;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@ -59,6 +64,8 @@ class ShardCommitCoordinator {
private final ShardDataTree dataTree;
+ private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
+
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
private final Queue queuedCohortEntries = new LinkedList<>();
@@ -78,8 +85,11 @@ class ShardCommitCoordinator {
private Runnable runOnPendingTransactionsComplete;
- ShardCommitCoordinator(ShardDataTree dataTree,
- long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
+
+ private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+
+ ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
+ String name) {
this.queueCapacity = queueCapacity;
this.log = log;
@@ -119,7 +129,7 @@ class ShardCommitCoordinator {
} else {
cohortCache.remove(cohortEntry.getTransactionID());
- RuntimeException ex = new RuntimeException(
+ final RuntimeException ex = new RuntimeException(
String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
" capacity %d has been reached.",
name, cohortEntry.getTransactionID(), queueCapacity));
@@ -136,13 +146,15 @@ class ShardCommitCoordinator {
* @param ready the ForwardedReadyTransaction message to process
* @param sender the sender of the message
* @param shard the transaction's shard actor
+ * @param schema
*/
- void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
+ void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
+ SchemaContext schema) {
log.debug("{}: Readying transaction {}, client version {}", name,
ready.getTransactionID(), ready.getTxnClientVersion());
- ShardDataTreeCohort cohort = ready.getTransaction().ready();
- CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
+ final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+ final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
cohortCache.put(ready.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
@@ -171,12 +183,12 @@ class ShardCommitCoordinator {
* @param sender the sender of the message
* @param shard the transaction's shard actor
*/
- void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
+ void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
- dataTree.newReadWriteTransaction(batched.getTransactionID(),
- batched.getTransactionChainID()), batched.getVersion());
+ dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
+ cohortRegistry, schema, batched.getVersion());
cohortCache.put(batched.getTransactionID(), cohortEntry);
}
@@ -225,16 +237,18 @@ class ShardCommitCoordinator {
/**
* This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
- * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
+ * been prepared beforehand by the sender and we just need to drive them through into the
+ * dataTree.
*
* @param message the ReadyLocalTransaction message to process
* @param sender the sender of the message
* @param shard the transaction's shard actor
*/
- void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
+ void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard,
+ SchemaContext schema) {
final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
message.getTransactionID());
- final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+ final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
DataStoreVersions.CURRENT_VERSION);
cohortCache.put(message.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
@@ -503,6 +517,7 @@ class ShardCommitCoordinator {
private List getAndClearPendingCohortEntries() {
List cohortEntries = new ArrayList<>();
+
if(currentCohortEntry != null) {
cohortEntries.add(currentCohortEntry);
cohortCache.remove(currentCohortEntry.getTransactionID());
@@ -538,7 +553,7 @@ class ShardCommitCoordinator {
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
cohortEntry.getClientVersion(), ""));
- }
+ }
return newModifications.getLast();
}
@@ -621,9 +636,9 @@ class ShardCommitCoordinator {
private void maybeProcessNextCohortEntry() {
// Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
// clean out expired entries.
- Iterator iter = queuedCohortEntries.iterator();
+ final Iterator iter = queuedCohortEntries.iterator();
while(iter.hasNext()) {
- CohortEntry next = iter.next();
+ final CohortEntry next = iter.next();
if(next.isReadyToCommit()) {
if(currentCohortEntry == null) {
if(log.isDebugEnabled()) {
@@ -674,6 +689,10 @@ class ShardCommitCoordinator {
this.cohortDecorator = cohortDecorator;
}
+ void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
+ cohortRegistry.process(sender, message);
+ }
+
static class CohortEntry {
enum State {
PENDING,
@@ -681,7 +700,7 @@ class ShardCommitCoordinator {
PRE_COMMITTED,
COMMITTED,
ABORTED
- }
+ }
private final String transactionID;
private ShardDataTreeCohort cohort;
@@ -694,18 +713,23 @@ class ShardCommitCoordinator {
private int totalBatchedModificationsReceived;
private State state = State.PENDING;
private final short clientVersion;
+ private final CompositeDataTreeCohort userCohorts;
- CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
+ CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
+ DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
this.transactionID = transactionID;
this.clientVersion = clientVersion;
+ this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
- CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
+ CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
+ SchemaContext schema, short clientVersion) {
this.transactionID = transactionID;
this.cohort = cohort;
this.transaction = null;
this.clientVersion = clientVersion;
+ this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
void updateLastAccessTime() {
@@ -770,19 +794,25 @@ class ShardCommitCoordinator {
return cohort.canCommit().get();
}
- void preCommit() throws InterruptedException, ExecutionException {
+
+
+ void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
state = State.PRE_COMMITTED;
cohort.preCommit().get();
+ userCohorts.canCommit(cohort.getCandidate());
+ userCohorts.preCommit();
}
- void commit() throws InterruptedException, ExecutionException {
+ void commit() throws InterruptedException, ExecutionException, TimeoutException {
state = State.COMMITTED;
cohort.commit().get();
+ userCohorts.commit();
}
- void abort() throws InterruptedException, ExecutionException {
+ void abort() throws InterruptedException, ExecutionException, TimeoutException {
state = State.ABORTED;
cohort.abort().get();
+ userCohorts.abort();
}
void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
@@ -837,7 +867,7 @@ class ShardCommitCoordinator {
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
.append(doImmediateCommit).append("]");
return builder.toString();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index ad5aab3453..618ea90e26 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@ -214,6 +214,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return dataTree.takeSnapshot().newModification();
}
+ // FIXME: This should be removed, it violates encapsulation
public DataTreeCandidate commit(DataTreeModification modification) throws DataValidationFailedException {
modification.ready();
dataTree.validate(modification);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
index f7cdd4e8dc..47876123cf 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
@@ -17,9 +17,12 @@ public abstract class ShardDataTreeCohort {
// Prevent foreign instantiation
}
+ // FIXME: This leaks internal state generated in preCommit,
+ // should be result of canCommit
abstract DataTreeCandidateTip getCandidate();
abstract DataTreeModification getDataTreeModification();
+ // FIXME: Should return rebased DataTreeCandidateTip
@VisibleForTesting
public abstract ListenableFuture canCommit();
@VisibleForTesting
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java
new file mode 100644
index 0000000000..84521e5ce6
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.cluster.Cluster;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.common.api.DataValidationFailedException;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.PostCanCommitStep;
+import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
+
+public class DataTreeCohortIntegrationTest {
+
+ private static final DataValidationFailedException FAILED_CAN_COMMIT =
+ new DataValidationFailedException(YangInstanceIdentifier.class, TestModel.TEST_PATH, "Test failure.");
+ private static final CheckedFuture FAILED_CAN_COMMIT_FUTURE =
+ Futures.immediateFailedCheckedFuture(FAILED_CAN_COMMIT);
+
+ private static final DOMDataTreeIdentifier TEST_ID =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+ private static final Timeout TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+
+ private static ActorSystem system;
+
+ private final DatastoreContext.Builder datastoreContextBuilder =
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ final Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+ Cluster.get(system).join(member1Address);
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ protected ActorSystem getSystem() {
+ return system;
+ }
+
+ @Test
+ public void registerNoopCohortTest() throws Exception {
+ final DOMDataTreeCommitCohort cohort = mock(DOMDataTreeCommitCohort.class);
+ Mockito.doReturn(PostCanCommitStep.NOOP_SUCCESS_FUTURE).when(cohort).canCommit(any(Object.class),
+ any(DOMDataTreeCandidate.class), any(SchemaContext.class));
+ ArgumentCaptor candidateCapt = ArgumentCaptor.forClass(DOMDataTreeCandidate.class);
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+ {
+ final DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1");
+ final ObjectRegistration cohortReg = dataStore.registerCommitCohort(TEST_ID, cohort);
+ Thread.sleep(1000); // Registration is asynchronous
+ assertNotNull(cohortReg);
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ Mockito.verify(cohort).canCommit(any(Object.class), candidateCapt.capture(), any(SchemaContext.class));
+ DOMDataTreeCandidate candidate = candidateCapt.getValue();
+ assertNotNull(candidate);
+ assertEquals(TEST_ID, candidate.getRootPath());
+ testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+ Mockito.verify(cohort, Mockito.times(2)).canCommit(any(Object.class), any(DOMDataTreeCandidate.class),
+ any(SchemaContext.class));
+ cohortReg.close();
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ Mockito.verifyNoMoreInteractions(cohort);
+ cleanup(dataStore);
+ }
+ };
+ }
+
+ @Test
+ public void failCanCommitTest() throws Exception {
+ final DOMDataTreeCommitCohort failedCohort = mock(DOMDataTreeCommitCohort.class);
+
+ Mockito.doReturn(FAILED_CAN_COMMIT_FUTURE).when(failedCohort).canCommit(any(Object.class),
+ any(DOMDataTreeCandidate.class), any(SchemaContext.class));
+
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+ {
+ final DistributedDataStore dataStore =
+ setupDistributedDataStore("transactionIntegrationTest", "test-1");
+ dataStore.registerCommitCohort(TEST_ID, failedCohort);
+ Thread.sleep(1000); // Registration is asynchronous
+
+ DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready();
+ try {
+ // FIXME: Weird thing is that invoking canCommit on front-end invokes also
+ // preCommit on backend.
+ dsCohort.canCommit().get();
+ fail("Exception should be raised.");
+ } catch (Exception e) {
+ assertSame(FAILED_CAN_COMMIT, Throwables.getRootCause(e));
+ }
+ cleanup(dataStore);
+ }
+ };
+ }
+
+ /**
+ *
+ * FIXME: Weird thing is that invoking canCommit on front-end invokes also preCommit on backend
+ * so we can not test abort after can commit.
+ *
+ */
+ @Test
+ @Ignore
+ public void canCommitSuccessExternallyAborted() throws Exception {
+ final DOMDataTreeCommitCohort cohortToAbort = mock(DOMDataTreeCommitCohort.class);
+ final PostCanCommitStep stepToAbort = mock(PostCanCommitStep.class);
+ Mockito.doReturn(Futures.immediateCheckedFuture(stepToAbort)).when(cohortToAbort).canCommit(any(Object.class),
+ any(DOMDataTreeCandidate.class), any(SchemaContext.class));
+ Mockito.doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort();
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+ {
+ final DistributedDataStore dataStore =
+ setupDistributedDataStore("transactionIntegrationTest", "test-1");
+ dataStore.registerCommitCohort(TEST_ID, cohortToAbort);
+ Thread.sleep(1000); // Registration is asynchronous
+
+ DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready();
+
+ dsCohort.canCommit().get();
+ dsCohort.abort().get();
+ Mockito.verify(stepToAbort, Mockito.times(1)).abort();
+ cleanup(dataStore);
+ }
+ };
+ }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
index 3e2313dc27..2846397160 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
@@ -19,6 +19,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
@@ -1552,6 +1553,7 @@ public class ShardTest extends AbstractShardTest {
final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
doReturn(candidate).when(cohort).getCandidate();
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
@@ -1597,6 +1599,7 @@ public class ShardTest extends AbstractShardTest {
final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
doReturn(candidate).when(cohort).getCandidate();
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());