From 731e7284cf0895fdb1b89427f91762e80e67c2ff Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 12 Jun 2018 07:54:30 -0400 Subject: [PATCH] Fix shard deadlock in 3 nodes JIRA: CONTROLLER-1836 Change-Id: I10a9cb43bcdb35f66abebb054f37c05e7fda54e7 Signed-off-by: Tom Pantelis --- .../datastore/AbstractFrontendHistory.java | 4 +- .../datastore/ChainedCommitCohort.java | 7 + .../cluster/datastore/CohortEntry.java | 10 +- .../cluster/datastore/DataStoreVersions.java | 3 +- .../FrontendReadWriteTransaction.java | 5 +- .../datastore/LocalFrontendHistory.java | 7 +- .../LocalThreePhaseCommitCohort.java | 13 +- .../datastore/LocalTransactionContext.java | 7 +- .../datastore/NoOpTransactionContext.java | 5 +- .../ReadWriteShardDataTreeTransaction.java | 6 +- .../datastore/RemoteTransactionContext.java | 17 +- .../controller/cluster/datastore/Shard.java | 12 +- .../datastore/ShardCommitCoordinator.java | 13 +- .../cluster/datastore/ShardDataTree.java | 126 +++- .../datastore/ShardDataTreeCohort.java | 4 + .../ShardDataTreeTransactionChain.java | 12 +- .../ShardDataTreeTransactionParent.java | 8 +- .../datastore/ShardWriteTransaction.java | 8 +- .../datastore/SimpleShardDataTreeCohort.java | 43 +- .../datastore/StandaloneFrontendHistory.java | 7 +- .../ThreePhaseCommitCohortProxy.java | 2 +- .../cluster/datastore/TransactionContext.java | 4 +- .../datastore/TransactionContextWrapper.java | 8 +- .../cluster/datastore/TransactionProxy.java | 53 +- ...EntityOwnershipShardCommitCoordinator.java | 6 +- .../messages/BatchedModifications.java | 52 +- .../messages/ForwardedReadyTransaction.java | 23 +- .../messages/ReadyLocalTransaction.java | 19 +- .../ReadyLocalTransactionSerializer.java | 2 +- .../cluster/datastore/AbstractShardTest.java | 26 +- .../AbstractTransactionProxyTest.java | 12 +- ...butedDataStoreRemotingIntegrationTest.java | 11 +- .../FrontendReadWriteTransactionTest.java | 14 +- .../LocalTransactionContextTest.java | 10 +- .../ShardCommitCoordinationTest.java | 563 ++++++++++++++++++ .../cluster/datastore/ShardDataTreeTest.java | 8 +- .../cluster/datastore/ShardTest.java | 16 +- .../datastore/ShardTransactionTest.java | 8 +- .../SimpleShardDataTreeCohortTest.java | 2 +- .../datastore/TransactionProxyTest.java | 54 +- .../messages/BatchedModificationsTest.java | 44 +- .../ReadyLocalTransactionSerializerTest.java | 11 +- .../md/cluster/datastore/model/TestModel.java | 8 + .../src/test/resources/application.conf | 15 + 44 files changed, 1117 insertions(+), 171 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinationTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index 89dd59a5f5..42c0541662 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -16,6 +16,7 @@ import com.google.common.primitives.UnsignedLong; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.SortedSet; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; @@ -208,7 +209,8 @@ abstract class AbstractFrontendHistory implements Identifiable> participatingShardNames); @Override public String toString() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java index 19c0627463..2cb4dee374 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java @@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; @@ -87,4 +89,9 @@ final class ChainedCommitCohort extends ShardDataTreeCohort { public State getState() { return delegate.getState(); } + + @Override + Optional> getParticipatingShardNames() { + return delegate.getParticipatingShardNames(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java index a66a49685f..3d5238ee77 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java @@ -11,6 +11,8 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -109,10 +111,10 @@ final class CohortEntry { cohort.abort(callback); } - void ready(final CohortDecorator cohortDecorator) { + void ready(final Optional> participatingShardNames, final CohortDecorator cohortDecorator) { Preconditions.checkState(cohort == null, "cohort was already set"); - cohort = transaction.ready(); + cohort = transaction.ready(participatingShardNames); if (cohortDecorator != null) { // Call the hook for unit tests. @@ -120,6 +122,10 @@ final class CohortEntry { } } + Optional> getParticipatingShardNames() { + return cohort != null ? cohort.getParticipatingShardNames() : Optional.empty(); + } + boolean isDoImmediateCommit() { return doImmediateCommit; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java index 3a24260a88..0b4115a6fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java @@ -18,5 +18,6 @@ public interface DataStoreVersions { short HELIUM_2_VERSION = 2; short LITHIUM_VERSION = 3; short BORON_VERSION = 5; - short CURRENT_VERSION = BORON_VERSION; + short FLUORINE_VERSION = 9; + short CURRENT_VERSION = FLUORINE_VERSION; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index 5af7c7954e..d5fcc5e741 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -513,7 +513,8 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { if (optFailure.isPresent()) { state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get())); } else { - state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification)); + state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification, + java.util.Optional.empty())); } if (request.isCoordinated()) { @@ -611,7 +612,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } applyModifications(modifications); - state = new Ready(checkOpen().ready()); + state = new Ready(checkOpen().ready(java.util.Optional.empty())); LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java index d7c30121bf..73d2c9f805 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java @@ -14,6 +14,8 @@ import com.google.common.collect.TreeRangeSet; import com.google.common.primitives.UnsignedLong; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -75,7 +77,8 @@ final class LocalFrontendHistory extends AbstractFrontendHistory { } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) { - return chain.createReadyCohort(id, mod); + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod, + final Optional> participatingShardNames) { + return chain.createReadyCohort(id, mod, participatingShardNames); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java index 23207f01ce..8d0068172a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java @@ -12,6 +12,8 @@ import akka.dispatch.Futures; import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; @@ -59,18 +61,19 @@ class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort { this.modification = null; } - private Future initiateCommit(final boolean immediate) { + private Future initiateCommit(final boolean immediate, + final Optional> participatingShardNames) { if (operationError != null) { return Futures.failed(operationError); } final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier(), - modification, immediate); + modification, immediate, participatingShardNames); return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout()); } - Future initiateCoordinatedCommit() { - final Future messageFuture = initiateCommit(false); + Future initiateCoordinatedCommit(Optional> participatingShardNames) { + final Future messageFuture = initiateCommit(false, participatingShardNames); final Future ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext, transaction.getIdentifier()); ret.onComplete(new OnComplete() { @@ -90,7 +93,7 @@ class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort { } Future initiateDirectCommit() { - final Future messageFuture = initiateCommit(true); + final Future messageFuture = initiateCommit(true, Optional.empty()); messageFuture.onComplete(new OnComplete() { @Override public void onComplete(final Throwable failure, final Object message) throws Throwable { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java index 9cf5312c1e..f4ade4ff2a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java @@ -13,6 +13,8 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; @@ -78,9 +80,10 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { } @Override - public Future readyTransaction(final Boolean havePermit) { + public Future readyTransaction(final Boolean havePermit, + final Optional> participatingShardNames) { final LocalThreePhaseCommitCohort cohort = ready(); - return cohort.initiateCoordinatedCommit(); + return cohort.initiateCoordinatedCommit(participatingShardNames); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java index d14a936ac3..03ed5ad0ea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import com.google.common.util.concurrent.SettableFuture; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; @@ -41,7 +43,8 @@ final class NoOpTransactionContext extends AbstractTransactionContext { } @Override - public Future readyTransaction(final Boolean havePermit) { + public Future readyTransaction(final Boolean havePermit, + final Optional> participatingShardNamess) { LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure); return akka.dispatch.Futures.failed(failure); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java index fe5dba8a38..f28d0d08b3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -18,8 +20,8 @@ public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTr super(parent, id, modification); } - ShardDataTreeCohort ready() { + ShardDataTreeCohort ready(Optional> participatingShardNames) { Preconditions.checkState(close(), "Transaction is already closed"); - return getParent().finishTransaction(this); + return getParent().finishTransaction(this, participatingShardNames); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index 27969b3e8e..ce4bda74cf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -13,6 +13,8 @@ import akka.dispatch.Futures; import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SettableFuture; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -80,11 +82,12 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Send the remaining batched modifications, if any, with the ready flag set. bumpPermits(havePermit); - return sendBatchedModifications(true, true); + return sendBatchedModifications(true, true, Optional.empty()); } @Override - public Future readyTransaction(final Boolean havePermit) { + public Future readyTransaction(final Boolean havePermit, + final Optional> participatingShardNames) { logModificationCount(); LOG.debug("Tx {} readyTransaction called", getIdentifier()); @@ -92,7 +95,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Send the remaining batched modifications, if any, with the ready flag set. bumpPermits(havePermit); - Future lastModificationsFuture = sendBatchedModifications(true, false); + Future lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames); return transformReadyReply(lastModificationsFuture); } @@ -133,10 +136,11 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } protected Future sendBatchedModifications() { - return sendBatchedModifications(false, false); + return sendBatchedModifications(false, false, Optional.empty()); } - protected Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) { + protected Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady, + final Optional> participatingShardNames) { Future sent = null; if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) { if (batchedModifications == null) { @@ -146,7 +150,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext { LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(), batchedModifications.getModifications().size(), ready); - batchedModifications.setReady(ready); batchedModifications.setDoCommitOnReady(doCommitOnReady); batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent); @@ -155,6 +158,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext { batchPermits = 0; if (ready) { + batchedModifications.setReady(participatingShardNames); + batchedModifications.setDoCommitOnReady(doCommitOnReady); batchedModifications = null; } else { batchedModifications = newBatchedModifications(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 47a25d7ea3..c3cd8ef80f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -161,6 +161,8 @@ public class Shard extends RaftActor { /// The name of this shard private final String name; + private final String shardName; + private final ShardStats shardMBean; private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean; @@ -201,6 +203,7 @@ public class Shard extends RaftActor { Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); this.name = builder.getId().toString(); + this.shardName = builder.getId().getShardName(); this.datastoreContext = builder.getDatastoreContext(); this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); this.frontendMetadata = new FrontendMetadata(name); @@ -586,6 +589,10 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + String getShardName() { + return shardName; + } + @Override protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { @@ -754,7 +761,8 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader); ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(), - forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit()); + forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(), + forwardedReady.getParticipatingShardNames()); readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(readyLocal, getContext()); } @@ -923,6 +931,8 @@ public class Shard extends RaftActor { messagesToForward.size(), leader); for (Object message : messagesToForward) { + LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message); + leader.tell(message, self()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 080d3eec23..0dd50d4790 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -103,7 +103,7 @@ final class ShardCommitCoordinator { log.debug("{}: Readying transaction {}, client version {}", name, ready.getTransactionId(), ready.getTxnClientVersion()); - final ShardDataTreeCohort cohort = ready.getTransaction().ready(); + final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion()); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); @@ -162,7 +162,7 @@ final class ShardCommitCoordinator { } cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady()); - cohortEntry.ready(cohortDecorator); + cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator); if (batched.isDoCommitOnReady()) { cohortEntry.setReplySender(sender); @@ -187,7 +187,8 @@ final class ShardCommitCoordinator { */ void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) { final TransactionIdentifier txId = message.getTransactionId(); - final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification()); + final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(), + message.getParticipatingShardNames()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); @@ -227,7 +228,9 @@ final class ShardCommitCoordinator { BatchedModifications last = newModifications.getLast(); last.setDoCommitOnReady(from.isDoCommitOnReady()); - last.setReady(from.isReady()); + if (from.isReady()) { + last.setReady(from.getParticipatingShardNames()); + } last.setTotalMessagesSent(newModifications.size()); return newModifications; } @@ -454,7 +457,7 @@ final class ShardCommitCoordinator { if (last != null) { final boolean immediate = cohortEntry.isDoImmediateCommit(); last.setDoCommitOnReady(immediate); - last.setReady(true); + last.setReady(cohortEntry.getParticipatingShardNames()); last.setTotalMessagesSent(newMessages.size()); messages.addAll(newMessages); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 6d9f78aeab..5f1e67e988 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -27,11 +27,14 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; +import java.util.SortedSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -100,6 +103,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.cohort = Preconditions.checkNotNull(cohort); lastAccess = now; } + + @Override + public String toString() { + return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]"; + } } private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); @@ -115,7 +123,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Map transactionChains = new HashMap<>(); private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); - private final Queue pendingTransactions = new ArrayDeque<>(); + private final Deque pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); private final Queue pendingFinishCommits = new ArrayDeque<>(); @@ -643,11 +651,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction, + final java.util.Optional> participatingShardNames) { final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); - return createReadyCohort(transaction.getIdentifier(), snapshot); + return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames); } void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { @@ -786,13 +795,108 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } if (!cohort.equals(head.cohort)) { - LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier()); - return; + // The tx isn't at the head of the queue so we can't start canCommit at this point. Here we check if this + // tx should be moved ahead of other tx's in the READY state in the pendingTransactions queue. If this tx + // has other participating shards, it could deadlock with other tx's accessing the same shards + // depending on the order the tx's are readied on each shard + // (see https://jira.opendaylight.org/browse/CONTROLLER-1836). Therefore, if the preceding participating + // shard names for a preceding pending tx, call it A, in the queue matches that of this tx, then this tx + // is allowed to be moved ahead of tx A in the queue so it is processed first to avoid potential deadlock + // if tx A is behind this tx in the pendingTransactions queue for a preceding shard. In other words, since + // canCommmit for this tx was requested before tx A, honor that request. If this tx is moved to the head of + // the queue as a result, then proceed with canCommit. + + Collection precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames()); + if (precedingShardNames.isEmpty()) { + LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier()); + return; + } + + LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}", + logContext, cohort.getIdentifier(), precedingShardNames); + final Iterator iter = pendingTransactions.iterator(); + int index = -1; + int moveToIndex = -1; + while (iter.hasNext()) { + final CommitEntry entry = iter.next(); + ++index; + + if (cohort.equals(entry.cohort)) { + if (moveToIndex < 0) { + LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit", + logContext, cohort.getIdentifier()); + return; + } + + LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue", + logContext, cohort.getIdentifier(), moveToIndex); + iter.remove(); + insertEntry(pendingTransactions, entry, moveToIndex); + + if (!cohort.equals(pendingTransactions.peek().cohort)) { + LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit", + logContext, cohort.getIdentifier()); + return; + } + + LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit", + logContext, cohort.getIdentifier()); + break; + } + + if (entry.cohort.getState() != State.READY) { + LOG.debug("{}: Skipping pending transaction {} in state {}", + logContext, entry.cohort.getIdentifier(), entry.cohort.getState()); + continue; + } + + final Collection pendingPrecedingShardNames = extractPrecedingShardNames( + entry.cohort.getParticipatingShardNames()); + + if (precedingShardNames.equals(pendingPrecedingShardNames)) { + if (moveToIndex < 0) { + LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}", + logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index); + moveToIndex = index; + } else { + LOG.debug( + "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}", + logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex); + } + } else { + LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping", + logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier()); + } + } } processNextPendingTransaction(); } + private void insertEntry(Deque queue, CommitEntry entry, int atIndex) { + if (atIndex == 0) { + queue.addFirst(entry); + return; + } + + LOG.trace("Inserting into Deque at index {}", atIndex); + + Deque tempStack = new ArrayDeque<>(atIndex); + for (int i = 0; i < atIndex; i++) { + tempStack.push(queue.poll()); + } + + queue.addFirst(entry); + + tempStack.forEach(queue::addFirst); + } + + private Collection extractPrecedingShardNames( + java.util.Optional> participatingShardNames) { + return participatingShardNames.map((Function, Collection>) + set -> set.headSet(shard.getShardName())).orElse(Collections.emptyList()); + } + private void failPreCommit(final Throwable cause) { shard.getShardMBean().incrementFailedTransactionsCount(); pendingTransactions.poll().cohort.failedPreCommit(cause); @@ -950,22 +1054,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final java.util.Optional> participatingShardNames) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable), - COMMIT_STEP_TIMEOUT)); + COMMIT_STEP_TIMEOUT), participatingShardNames); pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics // the newReadWriteTransaction() - ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final java.util.Optional> participatingShardNames) { if (txId.getHistoryId().getHistoryId() == 0) { - return createReadyCohort(txId, mod); + return createReadyCohort(txId, mod, participatingShardNames); } - return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod); + return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames); } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index d989cbf50c..581768c0ed 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -12,6 +12,8 @@ import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -43,6 +45,8 @@ public abstract class ShardDataTreeCohort implements Identifiable> getParticipatingShardNames(); + // FIXME: Should return rebased DataTreeCandidateTip @VisibleForTesting public abstract void canCommit(FutureCallback callback); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java index f2e9af3656..a774d647d3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import java.util.Optional; +import java.util.SortedSet; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -88,13 +90,14 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent } @Override - ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction, + final Optional> participatingShardNames) { Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction); // dataTree is finalizing ready the transaction, we just record it for the next // transaction in chain - final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction); + final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction, participatingShardNames); openTransaction = null; previousTx = transaction; LOG.debug("Committing transaction {}", transaction); @@ -125,7 +128,8 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { - return dataTree.createReadyCohort(txId, mod); + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final Optional> participatingShardNames) { + return dataTree.createReadyCohort(txId, mod, participatingShardNames); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java index 23fa0c286a..0db6f083ac 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -16,9 +18,11 @@ abstract class ShardDataTreeTransactionParent { abstract void abortTransaction(AbstractShardDataTreeTransaction transaction, Runnable callback); - abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction); + abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction, + Optional> participatingShardNames); - abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier txId, DataTreeModification mod); + abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier txId, DataTreeModification mod, + Optional> participatingShardNames); abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier txId, DataTreeModification mod, Exception failure); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index dc77b290c9..b3f4b0b0d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -77,7 +77,7 @@ public class ShardWriteTransaction extends ShardTransaction { totalBatchedModificationsReceived, batched.getTotalMessagesSent())); } - readyTransaction(false, batched.isDoCommitOnReady(), batched.getVersion()); + readyTransaction(batched); } else { getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); } @@ -109,13 +109,13 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit, short clientTxVersion) { + private void readyTransaction(BatchedModifications batched) { TransactionIdentifier transactionID = getTransactionId(); LOG.debug("readyTransaction : {}", transactionID); - getShardActor().forward(new ForwardedReadyTransaction(transactionID, clientTxVersion, - transaction, doImmediateCommit), getContext()); + getShardActor().forward(new ForwardedReadyTransaction(transactionID, batched.getVersion(), + transaction, batched.isDoCommitOnReady(), batched.getParticipatingShardNames()), getContext()); // The shard will handle the commit from here so we're no longer needed - self-destruct. getSelf().tell(PoisonPill.getInstance(), getSelf()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java index 3104cb4972..b5b49c2396 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -7,13 +7,17 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; import java.util.Optional; +import java.util.SortedSet; import java.util.concurrent.CompletionStage; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; @@ -28,6 +32,8 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { private final ShardDataTree dataTree; private final TransactionIdentifier transactionId; private final CompositeDataTreeCohort userCohorts; + @Nullable + private final SortedSet participatingShardNames; private State state = State.READY; private DataTreeCandidateTip candidate; @@ -35,20 +41,23 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { private Exception nextFailure; SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction, - final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) { - this.dataTree = Preconditions.checkNotNull(dataTree); - this.transaction = Preconditions.checkNotNull(transaction); - this.transactionId = Preconditions.checkNotNull(transactionId); - this.userCohorts = Preconditions.checkNotNull(userCohorts); + final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts, + final Optional> participatingShardNames) { + this.dataTree = requireNonNull(dataTree); + this.transaction = requireNonNull(transaction); + this.transactionId = requireNonNull(transactionId); + this.userCohorts = requireNonNull(userCohorts); + this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null); } SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction, final TransactionIdentifier transactionId, final Exception nextFailure) { - this.dataTree = Preconditions.checkNotNull(dataTree); - this.transaction = Preconditions.checkNotNull(transaction); - this.transactionId = Preconditions.checkNotNull(transactionId); + this.dataTree = requireNonNull(dataTree); + this.transaction = requireNonNull(transaction); + this.transactionId = requireNonNull(transactionId); this.userCohorts = null; - this.nextFailure = Preconditions.checkNotNull(nextFailure); + this.participatingShardNames = null; + this.nextFailure = requireNonNull(nextFailure); } @Override @@ -66,8 +75,14 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { return transaction; } + @Override + Optional> getParticipatingShardNames() { + return Optional.ofNullable(participatingShardNames); + } + private void checkState(final State expected) { - Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected); + Preconditions.checkState(state == expected, "State %s does not match expected state %s for %s", + state, expected, getIdentifier()); } @Override @@ -77,7 +92,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { } checkState(State.READY); - this.callback = Preconditions.checkNotNull(newCallback); + this.callback = requireNonNull(newCallback); state = State.CAN_COMMIT_PENDING; if (nextFailure == null) { @@ -90,7 +105,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { @Override public void preCommit(final FutureCallback newCallback) { checkState(State.CAN_COMMIT_COMPLETE); - this.callback = Preconditions.checkNotNull(newCallback); + this.callback = requireNonNull(newCallback); state = State.PRE_COMMIT_PENDING; if (nextFailure == null) { @@ -128,7 +143,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { @Override public void commit(final FutureCallback newCallback) { checkState(State.PRE_COMMIT_COMPLETE); - this.callback = Preconditions.checkNotNull(newCallback); + this.callback = requireNonNull(newCallback); state = State.COMMIT_PENDING; if (nextFailure == null) { @@ -257,7 +272,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { void reportFailure(final Exception cause) { if (nextFailure == null) { - this.nextFailure = Preconditions.checkNotNull(cause); + this.nextFailure = requireNonNull(cause); } else { LOG.debug("Transaction {} already has a set failure, not updating it", transactionId, cause); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java index 7fd53c7b7c..3d4e373f33 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java @@ -14,6 +14,8 @@ import com.google.common.collect.TreeRangeSet; import com.google.common.primitives.UnsignedLong; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestException; @@ -79,7 +81,8 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory { } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) { - return tree.createReadyCohort(id, mod); + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod, + final Optional> participatingShardNames) { + return tree.createReadyCohort(id, mod, participatingShardNames); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index fdcd30602b..90dcec238a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -216,7 +216,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< for (CohortInfo cohort : cohorts) { Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion()); - LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort); + LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor()); futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message, actorContext.getTransactionCommitOperationTimeout())); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index 543834c2cb..d9a53ab291 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import com.google.common.util.concurrent.SettableFuture; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import scala.concurrent.Future; @@ -20,7 +22,7 @@ import scala.concurrent.Future; interface TransactionContext { void closeTransaction(); - Future readyTransaction(Boolean havePermit); + Future readyTransaction(Boolean havePermit, Optional> participatingShardNames); void executeModification(AbstractModification modification, Boolean havePermit); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index d11ee3e0bd..38f55f300f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -15,6 +15,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map.Entry; +import java.util.Optional; +import java.util.SortedSet; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -194,18 +196,18 @@ class TransactionContextWrapper { } } - Future readyTransaction() { + Future readyTransaction(Optional> participatingShardNames) { // avoid the creation of a promise and a TransactionOperation final TransactionContext localContext = transactionContext; if (localContext != null) { - return localContext.readyTransaction(null); + return localContext.readyTransaction(null, participatingShardNames); } final Promise promise = Futures.promise(); enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) { - promise.completeWith(newTransactionContext.readyTransaction(havePermit)); + promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames)); } }); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 5a1cb6740d..b04dd29a58 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -25,7 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.DataExists; @@ -61,28 +63,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction txContextWrappers = new TreeMap<>(); private final AbstractTransactionContextFactory txContextFactory; private final TransactionType type; @@ -251,7 +231,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createMultiCommitCohort( - final Set> txContextWrapperEntries) { - - final List cohorts = new ArrayList<>(txContextWrapperEntries.size()); + private AbstractThreePhaseCommitCohort createMultiCommitCohort() { - synchronized (GLOBAL_TX_READY_LOCK) { - for (Entry e : txContextWrapperEntries) { - LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); + final List cohorts = new ArrayList<>(txContextWrappers.size()); + final java.util.Optional> shardNames = + java.util.Optional.of(new TreeSet<>(txContextWrappers.keySet())); + for (Entry e : txContextWrappers.entrySet()) { + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); - final TransactionContextWrapper wrapper = e.getValue(); + final TransactionContextWrapper wrapper = e.getValue(); - // The remote tx version is obtained the via TransactionContext which may not be available yet so - // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the - // TransactionContext is available. - Supplier txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion(); + // The remote tx version is obtained the via TransactionContext which may not be available yet so + // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the + // TransactionContext is available. + Supplier txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion(); - cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier)); - } + cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(shardNames), + txVersionSupplier)); } return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts, getIdentifier()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java index 6058e7e997..4e344cd688 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -237,7 +237,9 @@ class EntityOwnershipShardCommitCoordinator { BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(), toPrune.getVersion()); prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady()); - prunedModifications.setReady(toPrune.isReady()); + if (toPrune.isReady()) { + prunedModifications.setReady(toPrune.getParticipatingShardNames()); + } prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent()); for (Modification mod: toPrune.getModifications()) { if (canForwardModificationToNewLeader(mod)) { @@ -275,7 +277,7 @@ class EntityOwnershipShardCommitCoordinator { BatchedModifications modifications = new BatchedModifications( new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION); modifications.setDoCommitOnReady(true); - modifications.setReady(true); + modifications.setReady(); modifications.setTotalMessagesSent(1); return modifications; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java index 3773beee57..b38cd873df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java @@ -7,11 +7,18 @@ */ package org.opendaylight.controller.cluster.datastore.messages; +import static java.util.Objects.requireNonNull; + import com.google.common.base.Preconditions; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeSet; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; /** @@ -26,21 +33,33 @@ public class BatchedModifications extends MutableCompositeModification { private boolean doCommitOnReady; private int totalMessagesSent; private TransactionIdentifier transactionId; + @Nullable + private SortedSet participatingShardNames; public BatchedModifications() { } public BatchedModifications(TransactionIdentifier transactionId, short version) { super(version); - this.transactionId = Preconditions.checkNotNull(transactionId, "transactionID can't be null"); + this.transactionId = requireNonNull(transactionId, "transactionID can't be null"); } public boolean isReady() { return ready; } - public void setReady(boolean ready) { - this.ready = ready; + public void setReady(Optional> possibleParticipatingShardNames) { + this.ready = true; + this.participatingShardNames = requireNonNull(possibleParticipatingShardNames).orElse(null); + Preconditions.checkArgument(this.participatingShardNames == null || this.participatingShardNames.size() > 1); + } + + public void setReady() { + setReady(Optional.empty()); + } + + public Optional> getParticipatingShardNames() { + return Optional.ofNullable(participatingShardNames); } public boolean isDoCommitOnReady() { @@ -63,7 +82,6 @@ public class BatchedModifications extends MutableCompositeModification { return transactionId; } - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); @@ -71,6 +89,18 @@ public class BatchedModifications extends MutableCompositeModification { ready = in.readBoolean(); totalMessagesSent = in.readInt(); doCommitOnReady = in.readBoolean(); + + if (getVersion() >= DataStoreVersions.FLUORINE_VERSION) { + final int count = in.readInt(); + if (count != 0) { + SortedSet shardNames = new TreeSet<>(); + for (int i = 0; i < count; i++) { + shardNames.add((String) in.readObject()); + } + + participatingShardNames = shardNames; + } + } } @Override @@ -80,12 +110,24 @@ public class BatchedModifications extends MutableCompositeModification { out.writeBoolean(ready); out.writeInt(totalMessagesSent); out.writeBoolean(doCommitOnReady); + + if (getVersion() >= DataStoreVersions.FLUORINE_VERSION) { + if (participatingShardNames != null) { + out.writeInt(participatingShardNames.size()); + for (String shardName: participatingShardNames) { + out.writeObject(shardName); + } + } else { + out.writeInt(0); + } + } } @Override public String toString() { return "BatchedModifications [transactionId=" + transactionId - + ", ready=" + ready + + ", ready=" + isReady() + + ", participatingShardNames=" + participatingShardNames + ", totalMessagesSent=" + totalMessagesSent + ", modifications size=" + getModifications().size() + "]"; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index 9cd5e66d06..529b7e2153 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -7,7 +7,11 @@ */ package org.opendaylight.controller.cluster.datastore.messages; -import com.google.common.base.Preconditions; +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import java.util.SortedSet; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.ReadWriteShardDataTreeTransaction; @@ -21,13 +25,17 @@ public class ForwardedReadyTransaction { private final ReadWriteShardDataTreeTransaction transaction; private final boolean doImmediateCommit; private final short txnClientVersion; + @Nullable + private final SortedSet participatingShardNames; public ForwardedReadyTransaction(TransactionIdentifier transactionId, short txnClientVersion, - ReadWriteShardDataTreeTransaction transaction, boolean doImmediateCommit) { - this.transactionId = Preconditions.checkNotNull(transactionId); - this.transaction = Preconditions.checkNotNull(transaction); + ReadWriteShardDataTreeTransaction transaction, boolean doImmediateCommit, + Optional> participatingShardNames) { + this.transactionId = requireNonNull(transactionId); + this.transaction = requireNonNull(transaction); this.txnClientVersion = txnClientVersion; this.doImmediateCommit = doImmediateCommit; + this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null); } public TransactionIdentifier getTransactionId() { @@ -46,9 +54,14 @@ public class ForwardedReadyTransaction { return doImmediateCommit; } + public Optional> getParticipatingShardNames() { + return Optional.ofNullable(participatingShardNames); + } + @Override public String toString() { - return "ForwardedReadyTransaction [transactionId=" + transactionId + ", doImmediateCommit=" + doImmediateCommit + return "ForwardedReadyTransaction [transactionId=" + transactionId + ", transaction=" + transaction + + ", doImmediateCommit=" + doImmediateCommit + ", participatingShardNames=" + participatingShardNames + ", txnClientVersion=" + txnClientVersion + "]"; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java index 2664fc17bc..bff6ea8620 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java @@ -7,7 +7,11 @@ */ package org.opendaylight.controller.cluster.datastore.messages; -import com.google.common.base.Preconditions; +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import java.util.SortedSet; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -23,15 +27,18 @@ public final class ReadyLocalTransaction { private final DataTreeModification modification; private final TransactionIdentifier transactionId; private final boolean doCommitOnReady; + @Nullable + private final SortedSet participatingShardNames; // The version of the remote system used only when needing to convert to BatchedModifications. private short remoteVersion = DataStoreVersions.CURRENT_VERSION; public ReadyLocalTransaction(final TransactionIdentifier transactionId, final DataTreeModification modification, - final boolean doCommitOnReady) { - this.transactionId = Preconditions.checkNotNull(transactionId); - this.modification = Preconditions.checkNotNull(modification); + final boolean doCommitOnReady, Optional> participatingShardNames) { + this.transactionId = requireNonNull(transactionId); + this.modification = requireNonNull(modification); this.doCommitOnReady = doCommitOnReady; + this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null); } public TransactionIdentifier getTransactionId() { @@ -53,4 +60,8 @@ public final class ReadyLocalTransaction { public void setRemoteVersion(short remoteVersion) { this.remoteVersion = remoteVersion; } + + public Optional> getParticipatingShardNames() { + return Optional.ofNullable(participatingShardNames); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java index 60a85b9a0b..53731fa491 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java @@ -48,7 +48,7 @@ public final class ReadyLocalTransactionSerializer extends JSerializer { readyLocal.getRemoteVersion()); batched.setDoCommitOnReady(readyLocal.isDoCommitOnReady()); batched.setTotalMessagesSent(1); - batched.setReady(true); + batched.setReady(readyLocal.getParticipatingShardNames()); readyLocal.getModification().applyToCursor(new BatchedCursor(batched)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index f2e0e0a598..b289b6b160 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.SortedSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -265,7 +266,7 @@ public abstract class AbstractShardTest extends AbstractActorTest { final boolean doCommitOnReady) { final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION); batchedModifications.addModification(modification); - batchedModifications.setReady(true); + batchedModifications.setReady(); batchedModifications.setDoCommitOnReady(doCommitOnReady); batchedModifications.setTotalMessagesSent(1); return batchedModifications; @@ -284,7 +285,7 @@ public abstract class AbstractShardTest extends AbstractActorTest { ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore() .newReadWriteTransaction(transactionID); rwTx.getSnapshot().write(path, data); - return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, doCommitOnReady); + return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, doCommitOnReady, Optional.empty()); } public static NormalizedNode readStore(final TestActorRef shard, @@ -330,7 +331,7 @@ public abstract class AbstractShardTest extends AbstractActorTest { final NormalizedNode node) throws DataValidationFailedException { final BatchedModifications batched = new BatchedModifications(nextTransactionId(), CURRENT_VERSION); batched.addModification(new MergeModification(id, node)); - batched.setReady(true); + batched.setReady(); batched.setDoCommitOnReady(true); batched.setTotalMessagesSent(1); @@ -366,12 +367,24 @@ public abstract class AbstractShardTest extends AbstractActorTest { final boolean doCommitOnReady, final int messagesSent) { final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION); batched.addModification(new WriteModification(path, data)); - batched.setReady(ready); + if (ready) { + batched.setReady(); + } batched.setDoCommitOnReady(doCommitOnReady); batched.setTotalMessagesSent(messagesSent); return batched; } + static BatchedModifications newReadyBatchedModifications(final TransactionIdentifier transactionID, + final YangInstanceIdentifier path, final NormalizedNode data, + final SortedSet participatingShardNames) { + final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION); + batched.addModification(new WriteModification(path, data)); + batched.setReady(Optional.of(participatingShardNames)); + batched.setTotalMessagesSent(1); + return batched; + } + @SuppressWarnings("unchecked") static void verifyOuterListEntry(final TestActorRef shard, final Object expIDValue) throws Exception { final NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); @@ -519,5 +532,10 @@ public abstract class AbstractShardTest extends AbstractActorTest { public State getState() { return delegate.getState(); } + + @Override + Optional> getParticipatingShardNames() { + return delegate.getParticipatingShardNames(); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index 0d35a17598..a998fbffcb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -98,10 +98,10 @@ public abstract class AbstractTransactionProxyTest extends AbstractTest { private final Configuration configuration = new MockConfiguration() { Map strategyMap = ImmutableMap.builder().put( - "junk", new ShardStrategy() { + TestModel.JUNK_QNAME.getLocalName(), new ShardStrategy() { @Override public String findShard(final YangInstanceIdentifier path) { - return "junk"; + return TestModel.JUNK_QNAME.getLocalName(); } @Override @@ -109,10 +109,10 @@ public abstract class AbstractTransactionProxyTest extends AbstractTest { return YangInstanceIdentifier.EMPTY; } }).put( - "cars", new ShardStrategy() { + CarsModel.BASE_QNAME.getLocalName(), new ShardStrategy() { @Override public String findShard(final YangInstanceIdentifier path) { - return "cars"; + return CarsModel.BASE_QNAME.getLocalName(); } @Override @@ -129,9 +129,9 @@ public abstract class AbstractTransactionProxyTest extends AbstractTest { @Override public String getModuleNameFromNameSpace(final String nameSpace) { if (TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) { - return "junk"; + return TestModel.JUNK_QNAME.getLocalName(); } else if (CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)) { - return "cars"; + return CarsModel.BASE_QNAME.getLocalName(); } return null; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index aaab6b19e1..4f0f9297c9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -653,7 +653,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); modification.ready(); - ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true); + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, + java.util.Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -672,7 +673,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); modification.ready(); - readyLocal = new ReadyLocalTransaction(tx2 , modification, false); + readyLocal = new ReadyLocalTransaction(tx2 , modification, false, java.util.Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -721,7 +722,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true); + Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true, + java.util.Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -741,7 +743,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false); + Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false, + java.util.Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransactionTest.java index c3c62067d2..847b24b92e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransactionTest.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -17,6 +18,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import akka.actor.ActorRef; +import java.util.Optional; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; @@ -59,7 +61,7 @@ public class FrontendReadWriteTransactionTest { shardTransaction = new ReadWriteShardDataTreeTransaction(mockParent, TX_ID, mockModification); openTx = FrontendReadWriteTransaction.createOpen(mockHistory, shardTransaction); - when(mockParent.finishTransaction(same(shardTransaction))).thenReturn(mockCohort); + when(mockParent.finishTransaction(same(shardTransaction), eq(Optional.empty()))).thenReturn(mockCohort); } private TransactionSuccess handleRequest(final TransactionRequest request) throws RequestException { @@ -87,7 +89,7 @@ public class FrontendReadWriteTransactionTest { final TransactionRequest readyReq = b.build(); assertNotNull(handleRequest(readyReq)); - verify(mockParent).finishTransaction(same(shardTransaction)); + verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty())); assertNotNull(handleRequest(readyReq)); verifyNoMoreInteractions(mockParent); @@ -101,7 +103,7 @@ public class FrontendReadWriteTransactionTest { final TransactionRequest readyReq = b.build(); assertNull(handleRequest(readyReq)); - verify(mockParent).finishTransaction(same(shardTransaction)); + verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty())); assertNull(handleRequest(readyReq)); verifyNoMoreInteractions(mockParent); @@ -115,7 +117,7 @@ public class FrontendReadWriteTransactionTest { final TransactionRequest readyReq = b.build(); assertNull(handleRequest(readyReq)); - verify(mockParent).finishTransaction(same(shardTransaction)); + verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty())); assertNull(handleRequest(readyReq)); verifyNoMoreInteractions(mockParent); @@ -129,7 +131,7 @@ public class FrontendReadWriteTransactionTest { final TransactionRequest readyReq = b.build(); assertNotNull(handleRequest(readyReq)); - verify(mockParent).finishTransaction(same(shardTransaction)); + verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty())); handleRequest(new ReadTransactionRequest(TX_ID, 0, mock(ActorRef.class), YangInstanceIdentifier.EMPTY, true)); } @@ -142,7 +144,7 @@ public class FrontendReadWriteTransactionTest { final TransactionRequest readyReq = b.build(); assertNotNull(handleRequest(readyReq)); - verify(mockParent).finishTransaction(same(shardTransaction)); + verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty())); b.setSequence(1); b.addModification(mock(TransactionModification.class)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java index 0df870cba3..1fb2f8eea5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java @@ -112,10 +112,11 @@ public class LocalTransactionContextTest { @Test public void testReady() { final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class); - doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(); + doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit( + java.util.Optional.empty()); doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null); - Future future = localTransactionContext.readyTransaction(null); + Future future = localTransactionContext.readyTransaction(null, java.util.Optional.empty()); assertTrue(future.isCompleted()); verify(mockReadySupport).onTransactionReady(readWriteTransaction, null); @@ -171,9 +172,10 @@ public class LocalTransactionContextTest { private void doReadyWithExpectedError(final RuntimeException expError) { LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class); - doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(); + doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit( + java.util.Optional.empty()); doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError); - localTransactionContext.readyTransaction(null); + localTransactionContext.readyTransaction(null, java.util.Optional.empty()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinationTest.java new file mode 100644 index 0000000000..5239ff785c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinationTest.java @@ -0,0 +1,563 @@ +/* + * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertNotNull; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.ID_QNAME; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.NAME_QNAME; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_QNAME; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_QNAME; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerMapPath; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerMapNode; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode; + +import akka.dispatch.Dispatchers; +import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; +import com.google.common.collect.ImmutableSortedSet; +import java.util.SortedSet; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +/** + * Unit tests for various 3PC coordination scenarios. + * + * @author Thomas Pantelis + */ +public class ShardCommitCoordinationTest extends AbstractShardTest { + private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTest.class); + + /** + * Test 2 tx's accessing the same shards. + *
+     *   tx1 -> shard A, shard B
+     *   tx2 -> shard A, shard B
+     * 
+ * The tx's are readied such the pendingTransactions queue are as follows: + *
+     *   Queue for shard A -> tx1, tx2
+     *   Queue for shard B -> tx2, tx1
+     * 
+ * This is a potential deadlock scenario (ABBA) which should be avoided by allowing tx1 to proceed on shard B + * even though it isn't at the head of the queues. + */ + @Test + public void testTwoTransactionsWithSameTwoParticipatingShards() throws Exception { + final String testName = "testTwoTransactionsWithSameTwoParticipatingShards"; + LOG.info("{} starting", testName); + + final TestKit kit1 = new TestKit(getSystem()); + final TestKit kit2 = new TestKit(getSystem()); + + final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config"); + final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config"); + + final TestActorRef shardA = actorFactory.createTestActor( + newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardTestKit.waitUntilLeader(shardA); + + final TestActorRef shardB = actorFactory.createTestActor( + newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardTestKit.waitUntilLeader(shardB); + + final TransactionIdentifier txId1 = nextTransactionId(); + final TransactionIdentifier txId2 = nextTransactionId(); + + SortedSet participatingShardNames = ImmutableSortedSet.of(shardAId.getShardName(), + shardBId.getShardName()); + + // Ready [tx1, tx2] on shard A. + + shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH, + ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef()); + kit1.expectMsgClass(ReadyTransactionReply.class); + + shardA.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1), + participatingShardNames), kit2.getRef()); + kit2.expectMsgClass(ReadyTransactionReply.class); + + // Ready [tx2, tx1] on shard B. + + shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1), + participatingShardNames), kit2.getRef()); + kit2.expectMsgClass(ReadyTransactionReply.class); + + shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH, + ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef()); + kit1.expectMsgClass(ReadyTransactionReply.class); + + // Send tx2 CanCommit to A - tx1 is at the head of the queue so tx2 should not proceed as A is the first shard + // in the participating shard list. + + shardA.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + + // Send tx1 CanCommit to A - it's at the head of the queue so should proceed. + + shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CanCommitTransactionReply.class); + + // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating + // shard list [A] matches that of tx2 [A] so tx1 should be de-queued and allowed to proceed. + + shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CanCommitTransactionReply.class); + + // Send tx2 CanCommit to B - tx1 should now be at the head of he queue. + + shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + + // Finish commit of tx1. + + shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CommitTransactionReply.class); + + shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CommitTransactionReply.class); + + // Finish commit of tx2. + + kit2.expectMsgClass(CanCommitTransactionReply.class); + kit2.expectMsgClass(CanCommitTransactionReply.class); + + shardA.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CommitTransactionReply.class); + + shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CommitTransactionReply.class); + + // Verify data in the data store. + + verifyOuterListEntry(shardA, 1); + verifyOuterListEntry(shardB, 1); + + LOG.info("{} ending", testName); + } + + /** + * Test multiple tx's accessing a mix of same and differing shards. + *
+     *   tx1 -> shard X, shard B
+     *   tx2 -> shard X, shard B
+     *   tx3 -> shard A, shard B
+     *   tx4 -> shard A, shard B
+     *   tx5 -> shard A, shard B
+     * 
+ * The tx's are readied such the pendingTransactions queue are as follows: + *
+     *   Queue for shard A -> tx3, tx4, tx5
+     *   Queue for shard B -> tx1, tx2, tx5, tx4, tx3
+     * 
+ * Note: shard X means any other shard which isn't relevant for the test. + * This is a potential deadlock scenario (ABBA) which should be avoided by moving tx3 ahead of tx5 on shard B when + * CanCommit is requested. + */ + @Test + public void testMultipleTransactionsWithMixedParticipatingShards() throws Exception { + final String testName = "testMultipleTransactionsWithMixedParticipatingShards"; + LOG.info("{} starting", testName); + + final TestKit kit1 = new TestKit(getSystem()); + final TestKit kit2 = new TestKit(getSystem()); + final TestKit kit3 = new TestKit(getSystem()); + final TestKit kit4 = new TestKit(getSystem()); + final TestKit kit5 = new TestKit(getSystem()); + + final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config"); + final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config"); + + final TestActorRef shardA = actorFactory.createTestActor( + newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardTestKit.waitUntilLeader(shardA); + + final TestActorRef shardB = actorFactory.createTestActor( + newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardTestKit.waitUntilLeader(shardB); + + final TransactionIdentifier txId1 = nextTransactionId(); + final TransactionIdentifier txId2 = nextTransactionId(); + final TransactionIdentifier txId3 = nextTransactionId(); + final TransactionIdentifier txId4 = nextTransactionId(); + final TransactionIdentifier txId5 = nextTransactionId(); + + final SortedSet participatingShardNames1 = ImmutableSortedSet.of(shardAId.getShardName(), + shardBId.getShardName()); + final SortedSet participatingShardNames2 = ImmutableSortedSet.of("shardX", shardBId.getShardName()); + + // Ready [tx3, tx4, tx5] on shard A. + + shardA.tell(newReadyBatchedModifications(txId3, TEST_PATH, + ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit3.getRef()); + kit3.expectMsgClass(ReadyTransactionReply.class); + + shardA.tell(newReadyBatchedModifications(txId4, OUTER_LIST_PATH, outerMapNode(), + participatingShardNames1), kit4.getRef()); + kit4.expectMsgClass(ReadyTransactionReply.class); + + shardA.tell(newReadyBatchedModifications(txId5, outerEntryPath(1), + ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit5.getRef()); + kit5.expectMsgClass(ReadyTransactionReply.class); + + // Ready [tx1, tx2, tx5, tx4, tx3] on shard B. + + shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH, + ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit1.getRef()); + kit1.expectMsgClass(ReadyTransactionReply.class); + + shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerMapNode(), + participatingShardNames2), kit2.getRef()); + kit2.expectMsgClass(ReadyTransactionReply.class); + + shardB.tell(newReadyBatchedModifications(txId5, innerEntryPath(1, "one"), + ImmutableNodes.mapEntry(INNER_LIST_QNAME, NAME_QNAME, "one"), participatingShardNames1), kit5.getRef()); + kit5.expectMsgClass(ReadyTransactionReply.class); + + shardB.tell(newReadyBatchedModifications(txId4, innerMapPath(1), innerNode(), + participatingShardNames1), kit4.getRef()); + kit4.expectMsgClass(ReadyTransactionReply.class); + + shardB.tell(newReadyBatchedModifications(txId3, outerEntryPath(1), + ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit3.getRef()); + kit3.expectMsgClass(ReadyTransactionReply.class); + + // Send tx3 CanCommit to A - it's at the head of the queue so should proceed. + + shardA.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef()); + kit3.expectMsgClass(CanCommitTransactionReply.class); + + // Send tx1 CanCommit to B - it's at the head of the queue so should proceed. + + shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CanCommitTransactionReply.class); + + // Send tx3 CanCommit to B - tx1 is at the head of the queue but the preceding shards in tx3's participating + // shard list [A] matches that of tx5 so tx3 should be moved ahead of tx5 in the queue. + + shardB.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef()); + kit3.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + + // Send tx4 CanCommit to B - tx4's participating shard list [A] matches that of tx3 and tx5 - so tx4 should + // be moved ahead of tx5 in the queue but not tx3 since should be in the CAN_COMMIT_PENDING state. + + shardB.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef()); + kit4.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + + // Send tx5 CanCommit to B - it's position in the queue should remain the same. + + shardB.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef()); + kit5.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + + // Finish commit of tx1. + + shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CommitTransactionReply.class); + + // Finish commit of tx2. + + shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CanCommitTransactionReply.class); + + shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CommitTransactionReply.class); + + // Finish commit of tx3. + + // From shard B + kit3.expectMsgClass(CanCommitTransactionReply.class); + + shardA.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef()); + kit3.expectMsgClass(CommitTransactionReply.class); + + shardB.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef()); + kit3.expectMsgClass(CommitTransactionReply.class); + + // Finish commit of tx4. + + // From shard B + kit4.expectMsgClass(CanCommitTransactionReply.class); + + shardA.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef()); + kit4.expectMsgClass(CanCommitTransactionReply.class); + shardA.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef()); + kit4.expectMsgClass(CommitTransactionReply.class); + + shardB.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef()); + kit4.expectMsgClass(CommitTransactionReply.class); + + // Finish commit of tx5. + + // From shard B + kit5.expectMsgClass(CanCommitTransactionReply.class); + + shardA.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef()); + kit5.expectMsgClass(CanCommitTransactionReply.class); + shardA.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef()); + kit5.expectMsgClass(CommitTransactionReply.class); + + shardB.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef()); + kit5.expectMsgClass(CommitTransactionReply.class); + + verifyOuterListEntry(shardA, 1); + verifyInnerListEntry(shardB, 1, "one"); + + LOG.info("{} ending", testName); + } + + /** + * Test 2 tx's accessing 2 shards, the second in common. + *
+     *   tx1 -> shard A, shard C
+     *   tx2 -> shard B, shard C
+     * 
+ * The tx's are readied such the pendingTransactions queue are as follows: + *
+     *   Queue for shard A -> tx1
+     *   Queue for shard B -> tx2
+     *   Queue for shard C -> tx2, tx1
+     * 
+ * When the tx's re committed verify the ready order is preserved. + */ + @Test + public void testTwoTransactionsWithOneCommonParticipatingShard1() throws Exception { + final String testName = "testTwoTransactionsWithOneCommonParticipatingShard1"; + LOG.info("{} starting", testName); + + final TestKit kit1 = new TestKit(getSystem()); + final TestKit kit2 = new TestKit(getSystem()); + + final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config"); + final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config"); + final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config"); + + final TestActorRef shardA = actorFactory.createTestActor( + newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardTestKit.waitUntilLeader(shardA); + + final TestActorRef shardB = actorFactory.createTestActor( + newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardTestKit.waitUntilLeader(shardB); + + final TestActorRef shardC = actorFactory.createTestActor( + newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardTestKit.waitUntilLeader(shardC); + + final TransactionIdentifier txId1 = nextTransactionId(); + final TransactionIdentifier txId2 = nextTransactionId(); + + SortedSet participatingShardNames1 = + ImmutableSortedSet.of(shardAId.getShardName(), shardCId.getShardName()); + SortedSet participatingShardNames2 = + ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName()); + + // Ready [tx1] on shard A. + + shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH, + ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef()); + kit1.expectMsgClass(ReadyTransactionReply.class); + + // Ready [tx2] on shard B. + + shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH, + ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef()); + kit2.expectMsgClass(ReadyTransactionReply.class); + + // Ready [tx2, tx1] on shard C. + + shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH, + ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef()); + kit2.expectMsgClass(ReadyTransactionReply.class); + + shardC.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1), + participatingShardNames1), kit1.getRef()); + kit1.expectMsgClass(ReadyTransactionReply.class); + + // Send tx1 CanCommit to A - should succeed. + + shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CanCommitTransactionReply.class); + + // Send tx2 CanCommit to B - should succeed. + + shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CanCommitTransactionReply.class); + + // Send tx1 CanCommit to C - tx2 is at the head of the queue but the preceding shards in tx1's participating + // shard list [A] do not match that of tx2 [B] so tx1 should not be allowed to proceed. + + shardC.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + + // Send tx2 CanCommit to C - it's at the head of the queue so should proceed. + + shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CanCommitTransactionReply.class); + + // Finish commit of tx2. + + shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CommitTransactionReply.class); + + shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CommitTransactionReply.class); + + // Finish commit of tx1. + + kit1.expectMsgClass(CanCommitTransactionReply.class); + shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CommitTransactionReply.class); + + shardC.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CommitTransactionReply.class); + + // Verify data in the data store. + + verifyOuterListEntry(shardC, 1); + + LOG.info("{} ending", testName); + } + + /** + * Test 2 tx's accessing 2 shards, the first for one and the second for the other in common. + *
+     *   tx1 -> shard A, shard B
+     *   tx2 -> shard B, shard C
+     * 
+ * The tx's are readied such the pendingTransactions queue are as follows: + *
+     *   Queue for shard A -> tx1
+     *   Queue for shard B -> tx2, tx1
+     *   Queue for shard C -> tx2
+     * 
+ * When the tx's re committed verify the ready order is preserved. + */ + @Test + public void testTwoTransactionsWithOneCommonParticipatingShard2() throws Exception { + final String testName = "testTwoTransactionsWithOneCommonParticipatingShard2"; + LOG.info("{} starting", testName); + + final TestKit kit1 = new TestKit(getSystem()); + final TestKit kit2 = new TestKit(getSystem()); + + final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config"); + final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config"); + final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config"); + + final TestActorRef shardA = actorFactory.createTestActor( + newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardTestKit.waitUntilLeader(shardA); + + final TestActorRef shardB = actorFactory.createTestActor( + newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardTestKit.waitUntilLeader(shardB); + + final TestActorRef shardC = actorFactory.createTestActor( + newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + ShardTestKit.waitUntilLeader(shardC); + + final TransactionIdentifier txId1 = nextTransactionId(); + final TransactionIdentifier txId2 = nextTransactionId(); + + SortedSet participatingShardNames1 = + ImmutableSortedSet.of(shardAId.getShardName(), shardBId.getShardName()); + SortedSet participatingShardNames2 = + ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName()); + + // Ready [tx1] on shard A. + + shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH, + ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef()); + kit1.expectMsgClass(ReadyTransactionReply.class); + + // Ready [tx2, tx1] on shard B. + + shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH, + ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef()); + kit2.expectMsgClass(ReadyTransactionReply.class); + + shardB.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1), + participatingShardNames1), kit1.getRef()); + kit1.expectMsgClass(ReadyTransactionReply.class); + + // Ready [tx2] on shard C. + + shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH, + ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef()); + kit2.expectMsgClass(ReadyTransactionReply.class); + + // Send tx1 CanCommit to A - should succeed. + + shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CanCommitTransactionReply.class); + + // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating + // shard list [A] do not match that of tx2 [] so tx1 should not be allowed to proceed. + + shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + + // Send tx2 CanCommit to B - it's at the head of the queue so should proceed. + + shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CanCommitTransactionReply.class); + + // Finish commit of tx2. + + shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CanCommitTransactionReply.class); + + shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CommitTransactionReply.class); + + shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef()); + kit2.expectMsgClass(CommitTransactionReply.class); + + // Finish commit of tx1. + + kit1.expectMsgClass(CanCommitTransactionReply.class); + shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CommitTransactionReply.class); + + shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef()); + kit1.expectMsgClass(CommitTransactionReply.class); + + // Verify data in the data store. + + verifyOuterListEntry(shardB, 1); + + LOG.info("{} ending", testName); + } + + static void verifyInnerListEntry(TestActorRef shard, int outerID, String innerID) + throws Exception { + final YangInstanceIdentifier path = innerEntryPath(outerID, innerID); + final NormalizedNode innerListEntry = readStore(shard, path); + assertNotNull(path + " not found", innerListEntry); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java index 6e060f027f..eb829f116f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java @@ -113,7 +113,7 @@ public class ShardDataTreeTest extends AbstractTest { snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()); } - final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction); + final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty()); immediateCanCommit(cohort); immediatePreCommit(cohort); @@ -482,7 +482,7 @@ public class ShardDataTreeTest extends AbstractTest { shardDataTree.newReadWriteTransaction(nextTransactionId()); final DataTreeModification snapshot = transaction.getSnapshot(); operation.execute(snapshot); - return shardDataTree.finishTransaction(transaction); + return shardDataTree.finishTransaction(transaction, Optional.empty()); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -541,7 +541,7 @@ public class ShardDataTreeTest extends AbstractTest { shardDataTree.newReadWriteTransaction(nextTransactionId()); final DataTreeModification snapshot = transaction.getSnapshot(); operation.execute(snapshot); - final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction); + final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty()); immediateCanCommit(cohort); immediatePreCommit(cohort); @@ -559,7 +559,7 @@ public class ShardDataTreeTest extends AbstractTest { for (final DataTreeCandidate candidateTip : candidates) { DataTreeCandidates.applyToModification(snapshot, candidateTip); } - final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction); + final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty()); immediateCanCommit(cohort); immediatePreCommit(cohort); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index cca0bffaa8..e8f7e32310 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -671,7 +672,7 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier transactionID = nextTransactionId(); final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION); - batched.setReady(true); + batched.setReady(); batched.setTotalMessagesSent(2); shard.tell(batched, getRef()); @@ -714,7 +715,7 @@ public class ShardTest extends AbstractShardTest { final Throwable cause = failure.cause(); batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION); - batched.setReady(true); + batched.setReady(); batched.setTotalMessagesSent(2); shard.tell(batched, getRef()); @@ -845,7 +846,8 @@ public class ShardTest extends AbstractShardTest { failure = expectMsgClass(Failure.class); assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); - shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef()); + shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()), + getRef()); failure = expectMsgClass(Failure.class); assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); } @@ -910,7 +912,8 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier txId = nextTransactionId(); modification.ready(); - final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true); + final ReadyLocalTransaction readyMessage = + new ReadyLocalTransaction(txId, modification, true, Optional.empty()); shard.tell(readyMessage, getRef()); @@ -943,7 +946,8 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier txId = nextTransactionId(); modification.ready(); - final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false); + final ReadyLocalTransaction readyMessage = + new ReadyLocalTransaction(txId, modification, false, Optional.empty()); shard.tell(readyMessage, getRef()); @@ -1571,7 +1575,7 @@ public class ShardTest extends AbstractShardTest { .apply(modification3); modification3.ready(); final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, - true); + true, Optional.empty()); shard.tell(readyMessage, getRef()); // Commit the first Tx. After completing, the second should diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index f3acf29e7a..0c894a38fb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -244,7 +244,7 @@ public class ShardTransactionTest extends AbstractActorTest { assertEquals("getNumBatched", 1, reply.getNumBatched()); batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION); - batched.setReady(true); + batched.setReady(); batched.setTotalMessagesSent(2); transaction.tell(batched, getRef()); @@ -272,7 +272,7 @@ public class ShardTransactionTest extends AbstractActorTest { BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION); batched.addModification(new WriteModification(writePath, writeData)); - batched.setReady(true); + batched.setReady(); batched.setDoCommitOnReady(true); batched.setTotalMessagesSent(1); @@ -311,7 +311,7 @@ public class ShardTransactionTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION); - batched.setReady(true); + batched.setReady(); batched.setTotalMessagesSent(2); transaction.tell(batched, getRef()); @@ -339,7 +339,7 @@ public class ShardTransactionTest extends AbstractActorTest { BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION); - batched.setReady(true); + batched.setReady(); batched.setTotalMessagesSent(2); transaction.tell(batched, getRef()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java index 4550894fed..be19a3d0da 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java @@ -60,7 +60,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest { doReturn(Optional.empty()).when(mockUserCohorts).abort(); cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(), - mockUserCohorts); + mockUserCohorts, Optional.empty()); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 5fbf87827c..4fa4fcd1dd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -31,6 +31,7 @@ import akka.dispatch.Futures; import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; @@ -38,12 +39,14 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collection; import java.util.List; +import java.util.SortedSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.MemberName; @@ -57,6 +60,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; @@ -524,22 +528,59 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { public void testReadyWithMultipleShardWrites() throws Exception { ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); - ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk"); + ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, + TestModel.JUNK_QNAME.getLocalName()); expectBatchedModificationsReady(actorRef1); expectBatchedModificationsReady(actorRef2); + ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class)); + + doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext) + .actorSelection(actorRef3.path().toString()); + + doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext) + .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName())); + + expectReadyLocalTransaction(actorRef3, false); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME)); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof ThreePhaseCommitCohortProxy); verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1), - actorSelection(actorRef2)); + actorSelection(actorRef2), actorSelection(actorRef3)); + + SortedSet expShardNames = + ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD, + TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName()); + + ArgumentCaptor batchedMods = ArgumentCaptor.forClass(BatchedModifications.class); + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class)); + assertEquals("Participating shards present", true, + batchedMods.getValue().getParticipatingShardNames().isPresent()); + assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get()); + + batchedMods = ArgumentCaptor.forClass(BatchedModifications.class); + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class)); + assertEquals("Participating shards present", true, + batchedMods.getValue().getParticipatingShardNames().isPresent()); + assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get()); + + ArgumentCaptor readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class); + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class)); + assertEquals("Participating shards present", true, + readyLocalTx.getValue().getParticipatingShardNames().isPresent()); + assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get()); } @Test @@ -657,6 +698,12 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof SingleCommitCohortProxy); verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); + + ArgumentCaptor readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class); + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class)); + assertEquals("Participating shards present", false, + readyLocalTx.getValue().getParticipatingShardNames().isPresent()); } @Test @@ -725,7 +772,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); - ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk"); + ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, + TestModel.JUNK_QNAME.getLocalName()); doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java index 1d4cc3dc71..b01c07213e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java @@ -9,7 +9,10 @@ package org.opendaylight.controller.cluster.datastore.messages; import static org.junit.Assert.assertEquals; +import com.google.common.collect.ImmutableSortedSet; import java.io.Serializable; +import java.util.Optional; +import java.util.SortedSet; import org.apache.commons.lang.SerializationUtils; import org.junit.Test; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -49,7 +52,9 @@ public class BatchedModificationsTest extends AbstractTest { batched.addModification(new WriteModification(writePath, writeData)); batched.addModification(new MergeModification(mergePath, mergeData)); batched.addModification(new DeleteModification(deletePath)); - batched.setReady(true); + assertEquals("isReady", false, batched.isReady()); + batched.setReady(); + assertEquals("isReady", true, batched.isReady()); batched.setTotalMessagesSent(5); BatchedModifications clone = (BatchedModifications) SerializationUtils.clone( @@ -58,6 +63,8 @@ public class BatchedModificationsTest extends AbstractTest { assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion()); assertEquals("getTransactionID", tx1, clone.getTransactionId()); assertEquals("isReady", true, clone.isReady()); + assertEquals("isDoCommitOnReady", false, clone.isDoCommitOnReady()); + assertEquals("participatingShardNames present", false, clone.getParticipatingShardNames().isPresent()); assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent()); assertEquals("getModifications size", 3, clone.getModifications().size()); @@ -76,18 +83,49 @@ public class BatchedModificationsTest extends AbstractTest { assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion()); assertEquals("getPath", deletePath, delete.getPath()); - // Test with different params. + // Test with participating shard names. + final TransactionIdentifier tx2 = nextTransactionId(); batched = new BatchedModifications(tx2, (short)10000); + final SortedSet shardNames = ImmutableSortedSet.of("one", "two"); + batched.setReady(Optional.of(shardNames)); + batched.setDoCommitOnReady(true); + assertEquals("isReady", true, batched.isReady()); clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable()); assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion()); assertEquals("getTransactionID", tx2, clone.getTransactionId()); - assertEquals("isReady", false, clone.isReady()); + assertEquals("isReady", true, clone.isReady()); + assertEquals("isDoCommitOnReady", true, clone.isDoCommitOnReady()); + assertEquals("participatingShardNames present", true, clone.getParticipatingShardNames().isPresent()); + assertEquals("participatingShardNames", shardNames, clone.getParticipatingShardNames().get()); + assertEquals("getModifications size", 0, clone.getModifications().size()); + + // Test not ready. + batched = new BatchedModifications(tx2, DataStoreVersions.CURRENT_VERSION); + + clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable()); + + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion()); + assertEquals("getTransactionID", tx2, clone.getTransactionId()); + assertEquals("isReady", false, clone.isReady()); assertEquals("getModifications size", 0, clone.getModifications().size()); + // Test pre-Flourine + + batched = new BatchedModifications(tx2, DataStoreVersions.BORON_VERSION); + batched.addModification(new WriteModification(writePath, writeData)); + batched.setReady(Optional.of(ImmutableSortedSet.of("one", "two"))); + + clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable()); + + assertEquals("getVersion", DataStoreVersions.BORON_VERSION, clone.getVersion()); + assertEquals("getTransactionID", tx2, clone.getTransactionId()); + assertEquals("isReady", true, clone.isReady()); + assertEquals("participatingShardNames present", false, clone.getParticipatingShardNames().isPresent()); + assertEquals("getModifications size", 1, clone.getModifications().size()); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializerTest.java index 16e1ab3fcb..257ce4f683 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializerTest.java @@ -12,8 +12,11 @@ import static org.junit.Assert.assertNotNull; import akka.actor.ExtendedActorSystem; import akka.testkit.javadsl.TestKit; +import com.google.common.collect.ImmutableSortedSet; import java.io.NotSerializableException; import java.util.List; +import java.util.Optional; +import java.util.SortedSet; import org.junit.Test; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.AbstractTest; @@ -48,8 +51,10 @@ public class ReadyLocalTransactionSerializerTest extends AbstractTest { MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification); + final SortedSet shardNames = ImmutableSortedSet.of("one", "two"); TransactionIdentifier txId = nextTransactionId(); - ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true); + ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true, + Optional.of(shardNames)); final ExtendedActorSystem system = (ExtendedActorSystem) ExtendedActorSystem.create("test"); final Object deserialized; @@ -66,6 +71,10 @@ public class ReadyLocalTransactionSerializerTest extends AbstractTest { BatchedModifications batched = (BatchedModifications)deserialized; assertEquals("getTransactionID", txId, batched.getTransactionId()); assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, batched.getVersion()); + assertEquals("isReady", true, batched.isReady()); + assertEquals("isDoCommitOnReady", true, batched.isDoCommitOnReady()); + assertEquals("participatingShardNames present", true, batched.getParticipatingShardNames().isPresent()); + assertEquals("participatingShardNames", shardNames, batched.getParticipatingShardNames().get()); List batchedMods = batched.getModifications(); assertEquals("getModifications size", 2, batchedMods.size()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index 836d7f9cfa..1f7c3fd67f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -60,6 +60,10 @@ public final class TestModel { return YangParserTestUtils.parseYangResource(DATASTORE_TEST_YANG); } + public static DataContainerChild outerMapNode() { + return ImmutableNodes.mapNodeBuilder(OUTER_LIST_QNAME).build(); + } + public static DataContainerChild outerNode(final int... ids) { CollectionNodeBuilder outer = ImmutableNodes.mapNodeBuilder(OUTER_LIST_QNAME); for (int id: ids) { @@ -115,4 +119,8 @@ public final class TestModel { public static YangInstanceIdentifier innerEntryPath(final int id, final String name) { return OUTER_LIST_PATH.node(outerEntryKey(id)).node(INNER_LIST_QNAME).node(innerEntryKey(name)); } + + public static YangInstanceIdentifier innerMapPath(final int id) { + return OUTER_LIST_PATH.node(outerEntryKey(id)).node(INNER_LIST_QNAME); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index 5b084753b6..abefdf0ce9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -5,6 +5,7 @@ akka { loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"] actor { + warn-about-java-serializer-usage = false } } @@ -63,6 +64,8 @@ test-config { serialization-bindings { "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal } + + warn-about-java-serializer-usage = false } remote { log-remote-lifecycle-events = off @@ -127,6 +130,8 @@ Member1 { serialization-bindings { "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal } + + warn-about-java-serializer-usage = false } remote { log-remote-lifecycle-events = off @@ -188,6 +193,8 @@ Member2 { serialization-bindings { "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal } + + warn-about-java-serializer-usage = false } remote { log-remote-lifecycle-events = off @@ -251,6 +258,8 @@ Member3 { serialization-bindings { "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal } + + warn-about-java-serializer-usage = false } remote { log-remote-lifecycle-events = off @@ -314,6 +323,8 @@ Member4 { serialization-bindings { "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal } + + warn-about-java-serializer-usage = false } remote { log-remote-lifecycle-events = off @@ -377,6 +388,8 @@ Member5 { serialization-bindings { "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal } + + warn-about-java-serializer-usage = false } remote { log-remote-lifecycle-events = off @@ -440,6 +453,8 @@ Member256 { serialization-bindings { "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal } + + warn-about-java-serializer-usage = false } remote { log-remote-lifecycle-events = off -- 2.36.6