From a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4 Mon Sep 17 00:00:00 2001
From: Robert Varga
Date: Mon, 25 Jul 2016 20:59:37 +0200
Subject: [PATCH] BUG-5280: expand ShardDataTree to cover transaction mechanics

A chunk of ShardCommitCoordinator should actually be implemented by
ShardDataTree. This includes transaction queueing, commit timers,
interaction with user cohorts and persistence. This patch implements the
relevant operations in a message-agnostic, callback-driven way.

Fix: ShardDataTreeTest (missing ShardStat MBean)

Change-Id: I353bacce8245df85c5f4d6b4cc0ce5416f2f0337
Signed-off-by: Robert Varga
Signed-off-by: Vaclav Demcak
Signed-off-by: Tom Pantelis
---
 .../datastore/ChainedCommitCohort.java        |  39 +-
 .../cluster/datastore/CohortEntry.java        | 110 +-
 .../datastore/CompositeDataTreeCohort.java    |  25 +-
 .../DataTreeCohortActorRegistry.java          |  39 +-
 .../controller/cluster/datastore/Shard.java   | 286 ++---
 .../datastore/ShardCommitCoordinator.java     | 543 +++-------
 .../cluster/datastore/ShardDataTree.java      | 385 ++++++-
 .../datastore/ShardDataTreeCohort.java        |  34 +-
 .../datastore/ShardSnapshotCohort.java        |   8 -
 .../datastore/SimpleShardDataTreeCohort.java  | 226 ++--
 .../cluster/datastore/AbstractShardTest.java  | 283 +++--
 .../DataChangeListenerSupportTest.java        |   8 +-
 .../DataTreeChangeListenerSupportTest.java    |   5 +-
 .../DistributedDataStoreIntegrationTest.java  |  21 +-
 .../datastore/ShardDataTreeMocking.java       | 142 +++
 .../cluster/datastore/ShardDataTreeTest.java  | 106 +-
 .../ShardRecoveryCoordinatorTest.java         |   5 +-
 .../cluster/datastore/ShardTest.java          | 999 +++++++-----------
 .../ShardTransactionFailureTest.java          |   5 +-
 .../datastore/ShardTransactionTest.java       |  74 +-
 .../SimpleShardDataTreeCohortTest.java        | 234 ++--
 .../CandidateListChangeListenerTest.java      |  20 +-
 ...DistributedEntityOwnershipServiceTest.java |   5 +-
 .../EntityOwnerChangeListenerTest.java        |   8 +-
 .../EntityOwnershipShardTest.java             |   5 +-
 .../EntityOwnershipStatisticsTest.java        |  10 +-
 .../PruningDataTreeModificationTest.java      |  10 +-
 27 files changed, 1931 insertions(+), 1704 deletions(-)
 create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
index 9c2e91d7bb..401c15b542 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
@@ -8,9 +8,11 @@
 package org.opendaylight.controller.cluster.datastore;

 import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.slf4j.Logger;
@@ -29,33 +31,36 @@ final class
ChainedCommitCohort extends ShardDataTreeCohort { } @Override - public ListenableFuture commit() { - final ListenableFuture ret = delegate.commit(); - - Futures.addCallback(ret, new FutureCallback() { + public void commit(final FutureCallback callback) { + delegate.commit(new FutureCallback() { @Override - public void onSuccess(Void result) { + public void onSuccess(final UnsignedLong result) { chain.clearTransaction(transaction); LOG.debug("Committed transaction {}", transaction); + callback.onSuccess(result); } @Override - public void onFailure(Throwable t) { + public void onFailure(final Throwable t) { LOG.error("Transaction {} commit failed, cannot recover", transaction, t); + callback.onFailure(t); } }); + } - return ret; + @Override + public TransactionIdentifier getIdentifier() { + return delegate.getIdentifier(); } @Override - public ListenableFuture canCommit() { - return delegate.canCommit(); + public void canCommit(final FutureCallback callback) { + delegate.canCommit(callback); } @Override - public ListenableFuture preCommit() { - return delegate.preCommit(); + public void preCommit(final FutureCallback callback) { + delegate.preCommit(callback); } @Override @@ -72,4 +77,14 @@ final class ChainedCommitCohort extends ShardDataTreeCohort { DataTreeModification getDataTreeModification() { return delegate.getDataTreeModification(); } + + @Override + public boolean isFailed() { + return delegate.isFailed(); + } + + @Override + public State getState() { + return delegate.getState(); + } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java index 06d3ec9d67..767749af29 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java @@ -8,38 +8,22 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; -import akka.util.Timeout; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; +import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import scala.concurrent.duration.Duration; final class CohortEntry { - enum State { - PENDING, - CAN_COMMITTED, - PRE_COMMITTED, - COMMITTED, - ABORTED - } - - private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); - - private final Stopwatch lastAccessTimer = Stopwatch.createStarted(); private final ReadWriteShardDataTreeTransaction transaction; private final TransactionIdentifier transactionID; - private final CompositeDataTreeCohort userCohorts; private 
final short clientVersion; - private State state = State.PENDING; private RuntimeException lastBatchedModificationsException; private int totalBatchedModificationsReceived; private ShardDataTreeCohort cohort; @@ -47,36 +31,25 @@ final class CohortEntry { private ActorRef replySender; private Shard shard; - private CohortEntry(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction, - DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) { + private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) { this.transaction = Preconditions.checkNotNull(transaction); - this.transactionID = Preconditions.checkNotNull(transactionID); + this.transactionID = transaction.getId(); this.clientVersion = clientVersion; - this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT); } - private CohortEntry(TransactionIdentifier transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry, - SchemaContext schema, short clientVersion) { - this.transactionID = Preconditions.checkNotNull(transactionID); - this.cohort = cohort; + private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) { + this.cohort = Preconditions.checkNotNull(cohort); + this.transactionID = cohort.getIdentifier(); this.transaction = null; this.clientVersion = clientVersion; - this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT); - } - - static CohortEntry createOpen(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction, - DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) { - return new CohortEntry(transactionID, transaction, cohortRegistry, schema, clientVersion); } - static CohortEntry createReady(TransactionIdentifier transactionID, ShardDataTreeCohort cohort, - DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) { - return new CohortEntry(transactionID, cohort, cohortRegistry, schema, clientVersion); + static CohortEntry createOpen(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) { + return new CohortEntry(transaction, clientVersion); } - void updateLastAccessTime() { - lastAccessTimer.reset(); - lastAccessTimer.start(); + static CohortEntry createReady(final ShardDataTreeCohort cohort, final short clientVersion) { + return new CohortEntry(cohort, clientVersion); } TransactionIdentifier getTransactionID() { @@ -87,12 +60,8 @@ final class CohortEntry { return clientVersion; } - State getState() { - return state; - } - - DataTreeCandidate getCandidate() { - return cohort.getCandidate(); + boolean isFailed() { + return cohort != null && cohort.isFailed(); } DataTreeModification getDataTreeModification() { @@ -111,7 +80,7 @@ final class CohortEntry { return lastBatchedModificationsException; } - void applyModifications(Iterable modifications) { + void applyModifications(final Iterable modifications) { totalBatchedModificationsReceived++; if(lastBatchedModificationsException == null) { for (Modification modification : modifications) { @@ -125,43 +94,25 @@ final class CohortEntry { } } - boolean canCommit() throws InterruptedException, ExecutionException { - state = State.CAN_COMMITTED; - - // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry - // about possibly accessing our state on a different thread outside of our 
dispatcher. - // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why - // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously - // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker. - return cohort.canCommit().get(); + void canCommit(final FutureCallback callback) { + cohort.canCommit(callback); } - - - void preCommit() throws InterruptedException, ExecutionException, TimeoutException { - state = State.PRE_COMMITTED; - cohort.preCommit().get(); - userCohorts.canCommit(cohort.getCandidate()); - userCohorts.preCommit(); + void preCommit(final FutureCallback callback) { + cohort.preCommit(callback); } - void commit() throws InterruptedException, ExecutionException, TimeoutException { - state = State.COMMITTED; - cohort.commit().get(); - userCohorts.commit(); + void commit(final FutureCallback callback) { + cohort.commit(callback); } void abort() throws InterruptedException, ExecutionException, TimeoutException { - state = State.ABORTED; cohort.abort().get(); - userCohorts.abort(); } - void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) { + void ready(final CohortDecorator cohortDecorator) { Preconditions.checkState(cohort == null, "cohort was already set"); - setDoImmediateCommit(doImmediateCommit); - cohort = transaction.ready(); if(cohortDecorator != null) { @@ -170,19 +121,11 @@ final class CohortEntry { } } - boolean isReadyToCommit() { - return replySender != null; - } - - boolean isExpired(long expireTimeInMillis) { - return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis; - } - boolean isDoImmediateCommit() { return doImmediateCommit; } - void setDoImmediateCommit(boolean doImmediateCommit) { + void setDoImmediateCommit(final boolean doImmediateCommit) { this.doImmediateCommit = doImmediateCommit; } @@ -190,7 +133,7 @@ final class CohortEntry { return replySender; } - void setReplySender(ActorRef replySender) { + void setReplySender(final ActorRef replySender) { this.replySender = replySender; } @@ -198,15 +141,10 @@ final class CohortEntry { return shard; } - void setShard(Shard shard) { + void setShard(final Shard shard) { this.shard = shard; } - - boolean isAborted() { - return state == State.ABORTED; - } - @Override public String toString() { final StringBuilder builder = new StringBuilder(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java index d833962277..8115473a05 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java @@ -21,12 +21,13 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import java.util.Collection; import java.util.Iterator; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success; -import 
org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; @@ -85,7 +86,7 @@ class CompositeDataTreeCohort { protected static final Recover EXCEPTION_TO_MESSAGE = new Recover() { @Override - public Failure recover(Throwable error) throws Throwable { + public Failure recover(final Throwable error) throws Throwable { return new Failure(error); } }; @@ -98,21 +99,21 @@ class CompositeDataTreeCohort { private Iterable successfulFromPrevious; private State state = State.IDLE; - CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, TransactionIdentifier transactionID, - SchemaContext schema, Timeout timeout) { + CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID, + final SchemaContext schema, final Timeout timeout) { this.registry = Preconditions.checkNotNull(registry); this.txId = Preconditions.checkNotNull(transactionID); this.schema = Preconditions.checkNotNull(schema); this.timeout = Preconditions.checkNotNull(timeout); } - void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException { + void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException { Collection messages = registry.createCanCommitMessages(txId, tip, schema); // FIXME: Optimize empty collection list with pre-created futures, containing success. Future> canCommitsFuture = Futures.traverse(messages, new Function>() { @Override - public Future apply(CanCommit input) { + public Future apply(final CanCommit input) { return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE, ExecutionContexts.global()); } @@ -135,24 +136,26 @@ class CompositeDataTreeCohort { processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED); } - void abort() throws TimeoutException { + Optional>> abort() { if (successfulFromPrevious != null) { - sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)); + return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId))); } + + return Optional.empty(); } private Future> sendMesageToSuccessful(final Object message) { return Futures.traverse(successfulFromPrevious, new Function>() { @Override - public Future apply(DataTreeCohortActor.Success cohortResponse) throws Exception { + public Future apply(final DataTreeCohortActor.Success cohortResponse) throws Exception { return Patterns.ask(cohortResponse.getCohort(), message, timeout); } }, ExecutionContexts.global()); } - private void processResponses(Future> resultsFuture, State currentState, State afterState) + private void processResponses(final Future> resultsFuture, final State currentState, final State afterState) throws TimeoutException, ExecutionException { final Iterable results; try { @@ -179,7 +182,7 @@ class CompositeDataTreeCohort { changeStateFrom(currentState, afterState); } - void changeStateFrom(State expected, State followup) { + void changeStateFrom(final State expected, final State followup) { Preconditions.checkState(state == expected); state = followup; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java index 
28b3c707a9..fb3743d426 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Status; +import akka.util.Timeout; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; @@ -48,7 +49,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { private final Map> cohortToNode = new HashMap<>(); - void registerCohort(ActorRef sender, RegisterCohort cohort) { + void registerCohort(final ActorRef sender, final RegisterCohort cohort) { takeLock(); try { final ActorRef cohortRef = cohort.getCohort(); @@ -65,7 +66,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { sender.tell(new Status.Success(null), ActorRef.noSender()); } - void removeCommitCohort(ActorRef sender, RemoveCohort message) { + void removeCommitCohort(final ActorRef sender, final RemoveCohort message) { final ActorRef cohort = message.getCohort(); final RegistrationTreeNode node = cohortToNode.get(cohort); if (node != null) { @@ -76,14 +77,14 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { cohort.tell(PoisonPill.getInstance(), cohort); } - Collection createCanCommitMessages(TransactionIdentifier txId, - DataTreeCandidate candidate, SchemaContext schema) { + Collection createCanCommitMessages(final TransactionIdentifier txId, + final DataTreeCandidate candidate, final SchemaContext schema) { try (RegistrationTreeSnapshot cohorts = takeSnapshot()) { return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode()); } } - void process(ActorRef sender, CohortRegistryCommand message) { + void process(final ActorRef sender, final CohortRegistryCommand message) { if (message instanceof RegisterCohort) { registerCohort(sender, (RegisterCohort) message); } else if (message instanceof RemoveCohort) { @@ -95,7 +96,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { private final ActorRef cohort; - CohortRegistryCommand(ActorRef cohort) { + CohortRegistryCommand(final ActorRef cohort) { this.cohort = Preconditions.checkNotNull(cohort); } @@ -108,7 +109,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { private final DOMDataTreeIdentifier path; - RegisterCohort(DOMDataTreeIdentifier path, ActorRef cohort) { + RegisterCohort(final DOMDataTreeIdentifier path, final ActorRef cohort) { super(cohort); this.path = path; @@ -122,7 +123,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { static class RemoveCohort extends CohortRegistryCommand { - RemoveCohort(ActorRef cohort) { + RemoveCohort(final ActorRef cohort) { super(cohort); } @@ -136,14 +137,14 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { private final Collection messages = new ArrayList<>(); - CanCommitMessageBuilder(TransactionIdentifier txId, DataTreeCandidate candidate, SchemaContext schema) { + CanCommitMessageBuilder(final TransactionIdentifier txId, final DataTreeCandidate candidate, final SchemaContext schema) { this.txId = Preconditions.checkNotNull(txId); this.candidate = Preconditions.checkNotNull(candidate); this.schema = schema; } - private void lookupAndCreateCanCommits(List 
args, int offset, - RegistrationTreeNode node) { + private void lookupAndCreateCanCommits(final List args, final int offset, + final RegistrationTreeNode node) { if (args.size() != offset) { final PathArgument arg = args.get(offset); @@ -159,8 +160,8 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { } } - private void lookupAndCreateCanCommits(YangInstanceIdentifier path, RegistrationTreeNode regNode, - DataTreeCandidateNode candNode) { + private void lookupAndCreateCanCommits(final YangInstanceIdentifier path, final RegistrationTreeNode regNode, + final DataTreeCandidateNode candNode) { if (candNode.getModificationType() == ModificationType.UNMODIFIED) { LOG.debug("Skipping unmodified candidate {}", path); return; @@ -186,8 +187,8 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { } } - private void createCanCommits(Collection regs, YangInstanceIdentifier path, - DataTreeCandidateNode node) { + private void createCanCommits(final Collection regs, final YangInstanceIdentifier path, + final DataTreeCandidateNode node) { final DOMDataTreeCandidate candidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node); for (final ActorRef reg : regs) { final CanCommit message = new DataTreeCohortActor.CanCommit(txId, candidate, schema, reg); @@ -195,15 +196,19 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { } } - private static DOMDataTreeIdentifier treeIdentifier(YangInstanceIdentifier path) { + private static DOMDataTreeIdentifier treeIdentifier(final YangInstanceIdentifier path) { return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path); } - private Collection perform(RegistrationTreeNode rootNode) { + private Collection perform(final RegistrationTreeNode rootNode) { final List toLookup = candidate.getRootPath().getPathArguments(); lookupAndCreateCanCommits(toLookup, 0, rootNode); return messages; } } + CompositeDataTreeCohort createCohort(final SchemaContext schemaContext, final TransactionIdentifier txId, + final Timeout commitStepTimeout) { + return new CompositeDataTreeCohort(this, txId, schemaContext, commitStepTimeout); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index bd44f0b41d..3fe349f798 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -16,12 +16,11 @@ import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; +import com.google.common.base.Ticker; import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -39,7 +38,6 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import 
org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; @@ -52,7 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; @@ -67,9 +64,8 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; @@ -134,7 +130,7 @@ public class Shard extends RaftActor { private final ShardTransactionMessageRetrySupport messageRetrySupport; - protected Shard(AbstractBuilder builder) { + protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); @@ -146,9 +142,17 @@ public class Shard extends RaftActor { LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); - store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType(), - new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"), - new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"), name); + ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher = + new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"); + ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher = + new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"); + if(builder.getDataTree() != null) { + store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), + treeChangeListenerPublisher, dataChangeListenerPublisher, name); + } else { + store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), + treeChangeListenerPublisher, dataChangeListenerPublisher, name); + } shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(), datastoreContext.getDataStoreMXBeanType()); @@ -158,9 +162,7 @@ public 
class Shard extends RaftActor { getContext().become(new MeteringBehavior(this)); } - commitCoordinator = new ShardCommitCoordinator(store, - datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(), - datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, this.name); + commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name); setTransactionCommitTimeout(); @@ -185,7 +187,7 @@ public class Shard extends RaftActor { datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2; } - private Optional createRoleChangeNotifier(String shardId) { + private Optional createRoleChangeNotifier(final String shardId) { ActorRef shardRoleChangeNotifier = this.getContext().actorOf( RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); return Optional.of(shardRoleChangeNotifier); @@ -199,7 +201,7 @@ public class Shard extends RaftActor { messageRetrySupport.close(); - if(txCommitTimeoutCheckSchedule != null) { + if (txCommitTimeoutCheckSchedule != null) { txCommitTimeoutCheckSchedule.cancel(); } @@ -255,24 +257,25 @@ public class Shard extends RaftActor { setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress()); } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) { + store.checkForExpiredTransactions(transactionCommitTimeout); commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this); - } else if(message instanceof DatastoreContext) { + } else if (message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); - } else if(message instanceof RegisterRoleChangeListener){ + } else if (message instanceof RegisterRoleChangeListener){ roleChangeNotifier.get().forward(message, context()); } else if (message instanceof FollowerInitialSyncUpStatus) { shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone()); context().parent().tell(message, self()); - } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){ + } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)){ sender().tell(getShardMBean(), self()); - } else if(message instanceof GetShardDataTree) { + } else if (message instanceof GetShardDataTree) { sender().tell(store.getDataTree(), self()); - } else if(message instanceof ServerRemoved){ + } else if (message instanceof ServerRemoved){ context().parent().forward(message, context()); - } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { + } else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { messageRetrySupport.onTimerMessage(message); } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) { - commitCoordinator.processCohortRegistryCommand(getSender(), + store.processCohortRegistryCommand(getSender(), (DataTreeCohortActorRegistry.CohortRegistryCommand) message); } else { super.handleNonRaftCommand(message); @@ -285,7 +288,7 @@ public class Shard extends RaftActor { } public int getPendingTxCommitQueueSize() { - return commitCoordinator.getQueueSize(); + return store.getQueueSize(); } public int getCohortCacheSize() { @@ -298,61 +301,40 @@ public class Shard extends RaftActor { } @Override - protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) { + protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { return isLeader() ? 
new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion) : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } - protected void onDatastoreContext(DatastoreContext context) { + protected void onDatastoreContext(final DatastoreContext context) { datastoreContext = context; - commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity()); - setTransactionCommitTimeout(); - if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) { + if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) { setPersistence(true); - } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) { + } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) { setPersistence(false); } updateConfigParams(datastoreContext.getShardRaftConfig()); } - private static boolean isEmptyCommit(final DataTreeCandidate candidate) { - return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType()); + boolean canSkipPayload() { + // If we do not have any followers and we are not using persistence we can apply modification to the state + // immediately + return !hasFollowers() && !persistence().isRecoveryApplicable(); } - void continueCommit(final CohortEntry cohortEntry) { - final DataTreeCandidate candidate = cohortEntry.getCandidate(); - final TransactionIdentifier transactionId = cohortEntry.getTransactionID(); - - // If we do not have any followers and we are not using persistence - // or if cohortEntry has no modifications - // we can apply modification to the state immediately - if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { - applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate); - return; - } - - final Payload payload; - try { - payload = CommitTransactionPayload.create(transactionId, candidate); - } catch (IOException e) { - LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate, - e); - // TODO: do we need to do something smarter here? - throw Throwables.propagate(e); - } - - persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload); + // applyState() will be invoked once consensus is reached on the payload + void persistPayload(final TransactionIdentifier transactionId, final Payload payload) { + // We are faking the sender + persistData(self(), transactionId, payload); } private void handleCommitTransaction(final CommitTransaction commit) { if (isLeader()) { - if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) { - shardMBean.incrementFailedTransactionsCount(); - } + commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -365,86 +347,6 @@ public class Shard extends RaftActor { } } - private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID, - @Nonnull final CohortEntry cohortEntry) { - LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); - - try { - try { - cohortEntry.commit(); - } catch(ExecutionException e) { - // We may get a "store tree and candidate base differ" IllegalStateException from commit under - // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last - // resort. 
Eg, we're a follower and a tx payload is replicated but the leader goes down before - // applying it to the state. We then become the leader and a second tx is pre-committed and - // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign - // candidate via applyState prior to the second tx. Since the second tx has already been - // pre-committed, when it gets here to commit it will get an IllegalStateException. - - // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner - // solution will be forthcoming. - if(e.getCause() instanceof IllegalStateException) { - LOG.debug("{}: commit failed for transaction {} - retrying as foreign candidate", persistenceId(), - transactionID, e); - store.applyForeignCandidate(transactionID, cohortEntry.getCandidate()); - } else { - throw e; - } - } - - sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf()); - - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); - - } catch (Exception e) { - sender.tell(new akka.actor.Status.Failure(e), getSelf()); - - LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(), - transactionID, e); - shardMBean.incrementFailedTransactionsCount(); - } finally { - commitCoordinator.currentTransactionComplete(transactionID, true); - } - } - - private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) { - // With persistence enabled, this method is called via applyState by the leader strategy - // after the commit has been replicated to a majority of the followers. - - CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); - if (cohortEntry == null) { - // The transaction is no longer the current commit. This can happen if the transaction - // was aborted prior, most likely due to timeout in the front-end. We need to finish - // committing the transaction though since it was successfully persisted and replicated - // however we can't use the original cohort b/c it was already preCommitted and may - // conflict with the current commit or may have been aborted so we commit with a new - // transaction. - cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID); - if(cohortEntry != null) { - try { - store.applyForeignCandidate(transactionID, cohortEntry.getCandidate()); - } catch (DataValidationFailedException e) { - shardMBean.incrementFailedTransactionsCount(); - LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e); - } - - sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), - getSelf()); - } else { - // This really shouldn't happen - it likely means that persistence or replication - // took so long to complete such that the cohort entry was expired from the cache. 
- IllegalStateException ex = new IllegalStateException( - String.format("%s: Could not finish committing transaction %s - no CohortEntry found", - persistenceId(), transactionID)); - LOG.error(ex.getMessage()); - sender.tell(new akka.actor.Status.Failure(ex), getSelf()); - } - } else { - finishCommit(sender, transactionID, cohortEntry); - } - } - private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID()); @@ -462,7 +364,7 @@ public class Shard extends RaftActor { } } - protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) { + protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { try { commitCoordinator.handleBatchedModifications(batched, sender, this); } catch (Exception e) { @@ -472,7 +374,7 @@ public class Shard extends RaftActor { } } - private void handleBatchedModifications(BatchedModifications batched) { + private void handleBatchedModifications(final BatchedModifications batched) { // This message is sent to prepare the modifications transaction directly on the Shard as an // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last // BatchedModifications message, the caller sets the ready flag in the message indicating @@ -504,15 +406,15 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(), newModifications.size(), leader); - for(BatchedModifications bm: newModifications) { + for (BatchedModifications bm : newModifications) { leader.forward(bm, getContext()); } } } } - private boolean failIfIsolatedLeader(ActorRef sender) { - if(isIsolatedLeader()) { + private boolean failIfIsolatedLeader(final ActorRef sender) { + if (isIsolatedLeader()) { sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( "Shard %s was the leader but has lost contact with all of its followers. 
Either all" + " other follower nodes are down or this node is isolated by a network partition.", @@ -552,13 +454,12 @@ public class Shard extends RaftActor { } } - private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) { + private void handleForwardedReadyTransaction(final ForwardedReadyTransaction forwardedReady) { LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID()); boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { - commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this, - store.getSchemaContext()); + commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); } else { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { @@ -598,9 +499,9 @@ public class Shard extends RaftActor { store.closeTransactionChain(closeTransactionChain.getIdentifier()); } - private void createTransaction(CreateTransaction createTransaction) { + private void createTransaction(final CreateTransaction createTransaction) { try { - if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY && + if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY && failIfIsolatedLeader(getSender())) { return; } @@ -615,25 +516,12 @@ public class Shard extends RaftActor { } } - private ActorRef createTransaction(int transactionType, TransactionIdentifier transactionId) { + private ActorRef createTransaction(final int transactionType, final TransactionIdentifier transactionId) { LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId); return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType), transactionId); } - private void commitWithNewTransaction(final BatchedModifications modification) { - ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID()); - modification.apply(tx.getSnapshot()); - try { - snapshotCohort.syncCommitTransaction(tx); - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); - } catch (Exception e) { - shardMBean.incrementFailedTransactionsCount(); - LOG.error("{}: Failed to commit", persistenceId(), e); - } - } - private void updateSchemaContext(final UpdateSchemaContext message) { updateSchemaContext(message.getSchemaContext()); } @@ -669,7 +557,7 @@ public class Shard extends RaftActor { getContext().parent().tell(new ActorInitialized(), getSelf()); // Being paranoid here - this method should only be called once but just in case... - if(txCommitTimeoutCheckSchedule == null) { + if (txCommitTimeoutCheckSchedule == null) { // Schedule a message to be periodically sent to check if the current in-progress // transaction should be expired and aborted. 
FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS); @@ -685,13 +573,13 @@ public class Shard extends RaftActor { if (clientActor == null) { // No clientActor indicates a replica coming from the leader try { - store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue()); + store.applyStateFromLeader(identifier, (DataTreeCandidateSupplier)data); } catch (DataValidationFailedException | IOException e) { LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e); } } else { // Replication consensus reached, proceed to commit - finishCommit(clientActor, identifier); + store.payloadReplicationComplete(identifier, (DataTreeCandidateSupplier)data); } } else { LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data, @@ -699,25 +587,6 @@ public class Shard extends RaftActor { } } - private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) { - if(modification == null) { - LOG.error( - "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}", - persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null); - } else if(clientActor == null) { - // There's no clientActor to which to send a commit reply so we must be applying - // replicated state from the leader. - - // The only implementation we know of is BatchedModifications, which also carries a transaction - // identifier -- which we really need that. - Preconditions.checkArgument(modification instanceof BatchedModifications); - commitWithNewTransaction((BatchedModifications)modification); - } else { - // This must be the OK to commit after replication consensus. - finishCommit(clientActor, identifier); - } - } - @Override protected void onStateChanged() { boolean isLeader = isLeader(); @@ -727,7 +596,7 @@ public class Shard extends RaftActor { // If this actor is no longer the leader close all the transaction chains if (!isLeader) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug( "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader", persistenceId(), getId()); @@ -736,29 +605,29 @@ public class Shard extends RaftActor { store.closeAllTransactionChains(); } - if(hasLeader && !isIsolatedLeader()) { + if (hasLeader && !isIsolatedLeader()) { messageRetrySupport.retryMessages(); } } @Override - protected void onLeaderChanged(String oldLeader, String newLeader) { + protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); boolean hasLeader = hasLeader(); - if(hasLeader && !isLeader()) { + if (hasLeader && !isLeader()) { // Another leader was elected. If we were the previous leader and had pending transactions, convert // them to transaction messages and send to the new leader. 
ActorSelection leader = getLeader(); - if(leader != null) { - Collection messagesToForward = commitCoordinator.convertPendingTransactionsToMessages( - datastoreContext.getShardBatchedModificationCount()); + if (leader != null) { + Collection messagesToForward = commitCoordinator.convertPendingTransactionsToMessages( + datastoreContext.getShardBatchedModificationCount()); - if(!messagesToForward.isEmpty()) { + if (!messagesToForward.isEmpty()) { LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(), messagesToForward.size(), leader); - for(Object message: messagesToForward) { + for (Object message : messagesToForward) { leader.tell(message, self()); } } @@ -769,15 +638,15 @@ public class Shard extends RaftActor { } } - if(hasLeader && !isIsolatedLeader()) { + if (hasLeader && !isIsolatedLeader()) { messageRetrySupport.retryMessages(); } } @Override - protected void pauseLeader(Runnable operation) { + protected void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); - commitCoordinator.setRunOnPendingTransactionsComplete(operation); + store.setRunOnPendingTransactionsComplete(operation); } @Override @@ -815,9 +684,10 @@ public class Shard extends RaftActor { private DatastoreContext datastoreContext; private SchemaContext schemaContext; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; + private TipProducingDataTree dataTree; private volatile boolean sealed; - protected AbstractBuilder(Class shardClass) { + protected AbstractBuilder(final Class shardClass) { this.shardClass = shardClass; } @@ -830,36 +700,42 @@ public class Shard extends RaftActor { return (T) this; } - public T id(ShardIdentifier id) { + public T id(final ShardIdentifier id) { checkSealed(); this.id = id; return self(); } - public T peerAddresses(Map peerAddresses) { + public T peerAddresses(final Map peerAddresses) { checkSealed(); this.peerAddresses = peerAddresses; return self(); } - public T datastoreContext(DatastoreContext datastoreContext) { + public T datastoreContext(final DatastoreContext datastoreContext) { checkSealed(); this.datastoreContext = datastoreContext; return self(); } - public T schemaContext(SchemaContext schemaContext) { + public T schemaContext(final SchemaContext schemaContext) { checkSealed(); this.schemaContext = schemaContext; return self(); } - public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) { + public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) { checkSealed(); this.restoreFromSnapshot = restoreFromSnapshot; return self(); } + public T dataTree(final TipProducingDataTree dataTree) { + checkSealed(); + this.dataTree = dataTree; + return self(); + } + public ShardIdentifier getId() { return id; } @@ -880,6 +756,10 @@ public class Shard extends RaftActor { return restoreFromSnapshot; } + public TipProducingDataTree getDataTree() { + return dataTree; + } + public TreeType getTreeType() { switch (datastoreContext.getLogicalStoreType()) { case CONFIGURATION: @@ -910,4 +790,8 @@ public class Shard extends RaftActor { super(Shard.class); } } + + Ticker ticker() { + return Ticker.systemTicker(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index eb0c04dbbd..b3feadcfb9 100644 --- 
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -12,30 +12,33 @@ import akka.actor.Status.Failure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; -import java.util.List; import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ExecutionException; -import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage; import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.slf4j.Logger; /** @@ -45,89 +48,48 @@ import org.slf4j.Logger; */ final class ShardCommitCoordinator { - // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. + // Interface hook for unit tests to replace or decorate the ShardDataTreeCohorts. + @VisibleForTesting public interface CohortDecorator { ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual); } private final Map cohortCache = new HashMap<>(); - private CohortEntry currentCohortEntry; - private final ShardDataTree dataTree; - private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); - - // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls - // since this should only be accessed on the shard's dispatcher. 
- private final Queue queuedCohortEntries = new LinkedList<>(); - - private int queueCapacity; - private final Logger log; private final String name; - private final long cacheExpiryTimeoutInMillis; - - // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. + // This is a hook for unit tests to replace or decorate the ShardDataTreeCohorts. + @VisibleForTesting private CohortDecorator cohortDecorator; private ReadyTransactionReply readyTransactionReply; - private Runnable runOnPendingTransactionsComplete; - - ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, - String name) { - - this.queueCapacity = queueCapacity; + ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) { this.log = log; this.name = name; this.dataTree = Preconditions.checkNotNull(dataTree); - this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis; - } - - int getQueueSize() { - return queuedCohortEntries.size(); } int getCohortCacheSize() { return cohortCache.size(); } - void setQueueCapacity(int queueCapacity) { - this.queueCapacity = queueCapacity; + private String persistenceId() { + return dataTree.logContext(); } - private ReadyTransactionReply readyTransactionReply(Shard shard) { - if(readyTransactionReply == null) { - readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self())); + private ReadyTransactionReply readyTransactionReply(final ActorRef cohort) { + if (readyTransactionReply == null) { + readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(cohort)); } return readyTransactionReply; } - private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) { - if(queuedCohortEntries.size() < queueCapacity) { - queuedCohortEntries.offer(cohortEntry); - - log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(), - queuedCohortEntries.size()); - - return true; - } else { - cohortCache.remove(cohortEntry.getTransactionID()); - - final RuntimeException ex = new RuntimeException( - String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+ - " capacity %d has been reached.", - name, cohortEntry.getTransactionID(), queueCapacity)); - log.error(ex.getMessage()); - sender.tell(new Failure(ex), shard.self()); - return false; - } - } - /** * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit. 
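[Editorial aside, not part of the patch: the hunks around this point strip ShardCommitCoordinator of its internal commit queue, queue-capacity handling and expiry bookkeeping, leaving only the cohortCache keyed by transaction ID. Those responsibilities move behind ShardDataTree, whose new surface is only visible here through its call sites (for example store.checkForExpiredTransactions(...), store.getQueueSize(), store.processCohortRegistryCommand(...) and store.setRunOnPendingTransactionsComplete(...) in the Shard.java hunks). The class name and method bodies in the following sketch are hypothetical; the signatures are inferred from those call sites only, since ShardDataTree.java itself is not shown in this section.]

    import akka.actor.ActorRef;
    import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;

    // Hypothetical outline of the ShardDataTree responsibilities inferred from the
    // call sites in this patch; not the actual file contents.
    class ShardDataTreeSurfaceSketch {
        // Shard now forwards the periodic TX_COMMIT_TIMEOUT_CHECK_MESSAGE here.
        void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
            // expire and fail pending cohorts that exceeded the timeout
        }

        // Replaces ShardCommitCoordinator.getQueueSize() as the pending-commit count.
        int getQueueSize() {
            return 0;
        }

        // Cohort registry commands were previously routed to the coordinator.
        void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand command) {
            // register/remove user commit cohorts
        }

        // Replaces the coordinator's runOnPendingTransactionsComplete hook (used by pauseLeader).
        void setRunOnPendingTransactionsComplete(final Runnable operation) {
            // invoke operation once the pending transaction queue drains
        }
    }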
@@ -137,21 +99,16 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor * @param schema */ - void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard, - SchemaContext schema) { + void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender, + final Shard shard) { log.debug("{}: Readying transaction {}, client version {}", name, ready.getTransactionID(), ready.getTxnClientVersion()); final ShardDataTreeCohort cohort = ready.getTransaction().ready(); - final CohortEntry cohortEntry = CohortEntry.createReady(ready.getTransactionID(), cohort, cohortRegistry, - schema, ready.getTxnClientVersion()); + final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion()); cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); - if(!queueCohortEntry(cohortEntry, sender, shard)) { - return; - } - - if(ready.isDoImmediateCommit()) { + if (ready.isDoImmediateCommit()) { cohortEntry.setDoImmediateCommit(true); cohortEntry.setReplySender(sender); cohortEntry.setShard(shard); @@ -159,7 +116,7 @@ final class ShardCommitCoordinator { } else { // The caller does not want immediate commit - the 3-phase commit will be coordinated by the // front-end so send back a ReadyTransactionReply with our actor path. - sender.tell(readyTransactionReply(shard), shard.self()); + sender.tell(readyTransactionReply(shard.self()), shard.self()); } } @@ -172,52 +129,48 @@ final class ShardCommitCoordinator { * @param batched the BatchedModifications message to process * @param sender the sender of the message */ - void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) { + void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID()); - if(cohortEntry == null) { - cohortEntry = CohortEntry.createOpen(batched.getTransactionID(), - dataTree.newReadWriteTransaction(batched.getTransactionID()), - cohortRegistry, dataTree.getSchemaContext(), batched.getVersion()); + if (cohortEntry == null) { + cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionID()), + batched.getVersion()); cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); } - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("{}: Applying {} batched modifications for Tx {}", name, batched.getModifications().size(), batched.getTransactionID()); } cohortEntry.applyModifications(batched.getModifications()); - if(batched.isReady()) { - if(cohortEntry.getLastBatchedModificationsException() != null) { + if (batched.isReady()) { + if (cohortEntry.getLastBatchedModificationsException() != null) { cohortCache.remove(cohortEntry.getTransactionID()); throw cohortEntry.getLastBatchedModificationsException(); } - if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) { + if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) { cohortCache.remove(cohortEntry.getTransactionID()); throw new IllegalStateException(String.format( "The total number of batched messages received %d does not match the number sent %d", cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent())); } - if(!queueCohortEntry(cohortEntry, sender, shard)) { - return; - } - - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("{}: Readying Tx {}, client 
version {}", name, batched.getTransactionID(), batched.getVersion()); } - cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady()); + cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady()); + cohortEntry.ready(cohortDecorator); - if(batched.isDoCommitOnReady()) { + if (batched.isDoCommitOnReady()) { cohortEntry.setReplySender(sender); cohortEntry.setShard(shard); handleCanCommit(cohortEntry); } else { - sender.tell(readyTransactionReply(shard), shard.self()); + sender.tell(readyTransactionReply(shard.self()), shard.self()); } } else { sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self()); @@ -233,18 +186,13 @@ final class ShardCommitCoordinator { * @param sender the sender of the message * @param shard the transaction's shard actor */ - void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) { - final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(), - message.getTransactionID()); - final CohortEntry cohortEntry = CohortEntry.createReady(message.getTransactionID(), cohort, cohortRegistry, - dataTree.getSchemaContext(), DataStoreVersions.CURRENT_VERSION); + void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) { + final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionID(), + message.getModification()); + final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION); cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); - if(!queueCohortEntry(cohortEntry, sender, shard)) { - return; - } - log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID()); if (message.isDoCommitOnReady()) { @@ -252,14 +200,14 @@ final class ShardCommitCoordinator { cohortEntry.setShard(shard); handleCanCommit(cohortEntry); } else { - sender.tell(readyTransactionReply(shard), shard.self()); + sender.tell(readyTransactionReply(shard.self()), shard.self()); } } Collection createForwardedBatchedModifications(final BatchedModifications from, final int maxModificationsPerBatch) { - CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID()); - if(cohortEntry == null || cohortEntry.getTransaction() == null) { + CohortEntry cohortEntry = cohortCache.remove(from.getTransactionID()); + if (cohortEntry == null || cohortEntry.getTransaction() == null) { return Collections.singletonList(from); } @@ -269,7 +217,7 @@ final class ShardCommitCoordinator { cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() { @Override protected BatchedModifications getModifications() { - if(newModifications.isEmpty() || + if (newModifications.isEmpty() || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion())); } @@ -285,34 +233,30 @@ final class ShardCommitCoordinator { return newModifications; } - private void handleCanCommit(CohortEntry cohortEntry) { - cohortEntry.updateLastAccessTime(); - - if(currentCohortEntry != null) { - // There's already a Tx commit in progress so we can't process this entry yet - but it's in the - // queue and will get processed after all prior entries complete. 
+ private void handleCanCommit(final CohortEntry cohortEntry) { + cohortEntry.canCommit(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionID()); - if(log.isDebugEnabled()) { - log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now", - name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID()); + if (cohortEntry.isDoImmediateCommit()) { + doCommit(cohortEntry); + } else { + cohortEntry.getReplySender().tell( + CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable(), + cohortEntry.getShard().self()); + } } - return; - } + @Override + public void onFailure(final Throwable t) { + log.debug("{}: An exception occurred during canCommit for {}: {}", name, + cohortEntry.getTransactionID(), t); - // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make - // it the current entry and proceed with canCommit. - // Purposely checking reference equality here. - if(queuedCohortEntries.peek() == cohortEntry) { - currentCohortEntry = queuedCohortEntries.poll(); - doCanCommit(currentCohortEntry); - } else { - if(log.isDebugEnabled()) { - log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name, - queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???", - cohortEntry.getTransactionID()); + cohortCache.remove(cohortEntry.getTransactionID()); + cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self()); } - } + }); } /** @@ -322,15 +266,15 @@ final class ShardCommitCoordinator { * @param sender the actor to which to send the response * @param shard the transaction's shard actor */ - void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) { + void handleCanCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) { // Lookup the cohort entry that was cached previously (or should have been) by // transactionReady (via the ForwardedReadyTransaction message). final CohortEntry cohortEntry = cohortCache.get(transactionID); - if(cohortEntry == null) { - // Either canCommit was invoked before ready(shouldn't happen) or a long time passed - // between canCommit and ready and the entry was expired from the cache. + if (cohortEntry == null) { + // Either canCommit was invoked before ready (shouldn't happen) or a long time passed + // between canCommit and ready and the entry was expired from the cache or it was aborted. 
IllegalStateException ex = new IllegalStateException( - String.format("%s: No cohort entry found for transaction %s", name, transactionID)); + String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID)); log.error(ex.getMessage()); sender.tell(new Failure(ex), shard.self()); return; @@ -342,70 +286,54 @@ final class ShardCommitCoordinator { handleCanCommit(cohortEntry); } - private void doCanCommit(final CohortEntry cohortEntry) { - boolean canCommit = false; - try { - canCommit = cohortEntry.canCommit(); - - log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit); - - if(cohortEntry.isDoImmediateCommit()) { - if(canCommit) { - doCommit(cohortEntry); - } else { - cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException( - "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self()); - } - } else { - cohortEntry.getReplySender().tell( - canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() : - CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(), - cohortEntry.getShard().self()); - } - } catch (Exception e) { - log.debug("{}: An exception occurred during canCommit", name, e); - - Throwable failure = e; - if(e instanceof ExecutionException) { - failure = e.getCause(); - } - - cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); - } finally { - if(!canCommit) { - // Remove the entry from the cache now. - currentTransactionComplete(cohortEntry.getTransactionID(), true); - } - } - } - - private boolean doCommit(CohortEntry cohortEntry) { + private void doCommit(final CohortEntry cohortEntry) { log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID()); - boolean success = false; - // We perform the preCommit phase here atomically with the commit phase. This is an // optimization to eliminate the overhead of an extra preCommit message. We lose front-end // coordination of preCommit across shards in case of failure but preCommit should not // normally fail since we ensure only one concurrent 3-phase commit. 
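The comment above describes folding preCommit into the commit path. Stripped of actor and persistence concerns, the callback-driven chaining it relies on can be sketched as follows; StepCallback, MiniCohort and CommitDriver are hypothetical stand-ins for the FutureCallback-based cohort API introduced by this patch, not its actual classes:

    // Chain preCommit and commit in one place; any failure short-circuits to the caller.
    interface StepCallback<T> {
        void onSuccess(T result);
        void onFailure(Throwable cause);
    }

    interface MiniCohort {
        void preCommit(StepCallback<Object> callback); // Object stands in for DataTreeCandidate
        void commit(StepCallback<Long> callback);      // Long stands in for the journal index
    }

    final class CommitDriver {
        static void drive(final MiniCohort cohort, final StepCallback<Long> done) {
            cohort.preCommit(new StepCallback<Object>() {
                @Override
                public void onSuccess(final Object candidate) {
                    cohort.commit(done);
                }

                @Override
                public void onFailure(final Throwable cause) {
                    done.onFailure(cause);
                }
            });
        }
    }

Any failure in the first step is reported straight back to the caller, which is what lets the coordinator drop the cached entry and reply with a Failure in a single place.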
+ cohortEntry.preCommit(new FutureCallback() { + @Override + public void onSuccess(final DataTreeCandidate candidate) { + finishCommit(cohortEntry.getReplySender(), cohortEntry); + } - try { - cohortEntry.preCommit(); + @Override + public void onFailure(final Throwable t) { + log.error("{} An exception occurred while preCommitting transaction {}", name, + cohortEntry.getTransactionID(), t); + + cohortCache.remove(cohortEntry.getTransactionID()); + cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self()); + } + }); + } - cohortEntry.getShard().continueCommit(cohortEntry); + private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) { + log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); - cohortEntry.updateLastAccessTime(); + cohortEntry.commit(new FutureCallback() { + @Override + public void onSuccess(final UnsignedLong result) { + final TransactionIdentifier txId = cohortEntry.getTransactionID(); + log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result, + sender); - success = true; - } catch (Exception e) { - log.error("{} An exception occurred while preCommitting transaction {}", - name, cohortEntry.getTransactionID(), e); - cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self()); + cohortCache.remove(cohortEntry.getTransactionID()); + sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), + cohortEntry.getShard().self()); + } - currentTransactionComplete(cohortEntry.getTransactionID(), true); - } + @Override + public void onFailure(final Throwable t) { + log.error("{}, An exception occurred while committing transaction {}", persistenceId(), + cohortEntry.getTransactionID(), t); - return success; + cohortCache.remove(cohortEntry.getTransactionID()); + sender.tell(new Failure(t), cohortEntry.getShard().self()); + } + }); } /** @@ -414,39 +342,26 @@ final class ShardCommitCoordinator { * @param transactionID the ID of the transaction to commit * @param sender the actor to which to send the response * @param shard the transaction's shard actor - * @return true if the transaction was successfully prepared, false otherwise. */ - boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) { - // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to - // this transaction. - final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); - if(cohortEntry == null) { - // We're not the current Tx - the Tx was likely expired b/c it took too long in - // between the canCommit and commit messages. + void handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) { + final CohortEntry cohortEntry = cohortCache.get(transactionID); + if (cohortEntry == null) { + // Either a long time passed between canCommit and commit and the entry was expired from the cache + // or it was aborted. 
IllegalStateException ex = new IllegalStateException( - String.format("%s: Cannot commit transaction %s - it is not the current transaction", - name, transactionID)); + String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID)); log.error(ex.getMessage()); sender.tell(new Failure(ex), shard.self()); - return false; + return; } cohortEntry.setReplySender(sender); - return doCommit(cohortEntry); + doCommit(cohortEntry); } void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) { - CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); - if(cohortEntry != null) { - // We don't remove the cached cohort entry here (ie pass false) in case the Tx was - // aborted during replication in which case we may still commit locally if replication - // succeeds. - currentTransactionComplete(transactionID, false); - } else { - cohortEntry = getAndRemoveCohortEntry(transactionID); - } - - if(cohortEntry == null) { + CohortEntry cohortEntry = cohortCache.remove(transactionID); + if (cohortEntry == null) { return; } @@ -458,223 +373,107 @@ final class ShardCommitCoordinator { shard.getShardMBean().incrementAbortTransactionsCount(); - if(sender != null) { + if (sender != null) { sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); } } catch (Exception e) { log.error("{}: An exception happened during abort", name, e); - if(sender != null) { + if (sender != null) { sender.tell(new Failure(e), self); } } } void checkForExpiredTransactions(final long timeout, final Shard shard) { - CohortEntry cohortEntry = getCurrentCohortEntry(); - if(cohortEntry != null) { - if(cohortEntry.isExpired(timeout)) { - log.warn("{}: Current transaction {} has timed out after {} ms - aborting", - name, cohortEntry.getTransactionID(), timeout); - - handleAbort(cohortEntry.getTransactionID(), null, shard); + Iterator iter = cohortCache.values().iterator(); + while (iter.hasNext()) { + CohortEntry cohortEntry = iter.next(); + if(cohortEntry.isFailed()) { + iter.remove(); } } - - cleanupExpiredCohortEntries(); } void abortPendingTransactions(final String reason, final Shard shard) { - if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) { - return; - } - - List cohortEntries = getAndClearPendingCohortEntries(); + final Failure failure = new Failure(new RuntimeException(reason)); + Collection pending = dataTree.getAndClearPendingTransactions(); - log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size()); + log.debug("{}: Aborting {} pending queued transactions", name, pending.size()); - for(CohortEntry cohortEntry: cohortEntries) { - if(cohortEntry.getReplySender() != null) { - cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self()); + for (ShardDataTreeCohort cohort : pending) { + CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier()); + if (cohortEntry == null) { + continue; } - } - } - - private List getAndClearPendingCohortEntries() { - List cohortEntries = new ArrayList<>(); - - if(currentCohortEntry != null) { - cohortEntries.add(currentCohortEntry); - cohortCache.remove(currentCohortEntry.getTransactionID()); - currentCohortEntry = null; - } - for(CohortEntry cohortEntry: queuedCohortEntries) { - cohortEntries.add(cohortEntry); - cohortCache.remove(cohortEntry.getTransactionID()); + if (cohortEntry.getReplySender() != null) { + cohortEntry.getReplySender().tell(failure, shard.self()); + } } - queuedCohortEntries.clear(); 
- return cohortEntries; + cohortCache.clear(); } - Collection convertPendingTransactionsToMessages(final int maxModificationsPerBatch) { - if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) { - return Collections.emptyList(); - } - - Collection messages = new ArrayList<>(); - List cohortEntries = getAndClearPendingCohortEntries(); - for(CohortEntry cohortEntry: cohortEntries) { - if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) { + Collection convertPendingTransactionsToMessages(final int maxModificationsPerBatch) { + final Collection messages = new ArrayList<>(); + for (ShardDataTreeCohort cohort : dataTree.getAndClearPendingTransactions()) { + CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier()); + if (cohortEntry == null) { continue; } - final LinkedList newModifications = new LinkedList<>(); + final Deque newMessages = new ArrayDeque<>(); cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() { @Override protected BatchedModifications getModifications() { - if(newModifications.isEmpty() || - newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { - newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(), - cohortEntry.getClientVersion())); - } + final BatchedModifications lastBatch = newMessages.peekLast(); - return newModifications.getLast(); + if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) { + return lastBatch; + } + + // Allocate a new message + final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionID(), + cohortEntry.getClientVersion()); + newMessages.add(ret); + return ret; } }); - if(!newModifications.isEmpty()) { - BatchedModifications last = newModifications.getLast(); - last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit()); + final BatchedModifications last = newMessages.peekLast(); + if (last != null) { + final boolean immediate = cohortEntry.isDoImmediateCommit(); + last.setDoCommitOnReady(immediate); last.setReady(true); - last.setTotalMessagesSent(newModifications.size()); - messages.addAll(newModifications); - - if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) { - messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(), - cohortEntry.getClientVersion())); - } + last.setTotalMessagesSent(newMessages.size()); - if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) { - messages.add(new CommitTransaction(cohortEntry.getTransactionID(), - cohortEntry.getClientVersion())); - } - } - } + messages.addAll(newMessages); - return messages; - } - - /** - * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID - * matches the current entry. - * - * @param transactionID the ID of the transaction - * @return the current CohortEntry or null if the given transaction ID does not match the - * current entry. 
- */ - CohortEntry getCohortEntryIfCurrent(Identifier transactionID) { - if(isCurrentTransaction(transactionID)) { - return currentCohortEntry; - } - - return null; - } - - CohortEntry getCurrentCohortEntry() { - return currentCohortEntry; - } - - CohortEntry getAndRemoveCohortEntry(Identifier transactionID) { - return cohortCache.remove(transactionID); - } - - boolean isCurrentTransaction(Identifier transactionID) { - return currentCohortEntry != null && - currentCohortEntry.getTransactionID().equals(transactionID); - } - - /** - * This method is called when a transaction is complete, successful or not. If the given - * given transaction ID matches the current in-progress transaction, the next cohort entry, - * if any, is dequeued and processed. - * - * @param transactionID the ID of the completed transaction - * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from - * the cache. - */ - void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) { - if(removeCohortEntry) { - cohortCache.remove(transactionID); - } - - if(isCurrentTransaction(transactionID)) { - currentCohortEntry = null; - - log.debug("{}: currentTransactionComplete: {}", name, transactionID); - - maybeProcessNextCohortEntry(); - } - } - - private void maybeProcessNextCohortEntry() { - // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also - // clean out expired entries. - final Iterator iter = queuedCohortEntries.iterator(); - while(iter.hasNext()) { - final CohortEntry next = iter.next(); - if(next.isReadyToCommit()) { - if(currentCohortEntry == null) { - if(log.isDebugEnabled()) { - log.debug("{}: Next entry to canCommit {}", name, next); + if (!immediate) { + switch (cohort.getState()) { + case CAN_COMMIT_COMPLETE: + case CAN_COMMIT_PENDING: + messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(), + cohortEntry.getClientVersion())); + break; + case PRE_COMMIT_COMPLETE: + case PRE_COMMIT_PENDING: + messages.add(new CommitTransaction(cohortEntry.getTransactionID(), + cohortEntry.getClientVersion())); + break; + default: + break; } - - iter.remove(); - currentCohortEntry = next; - currentCohortEntry.updateLastAccessTime(); - doCanCommit(currentCohortEntry); } - - break; - } else if(next.isExpired(cacheExpiryTimeoutInMillis)) { - log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache", - name, next.getTransactionID(), cacheExpiryTimeoutInMillis); - } else if(!next.isAborted()) { - break; } - - iter.remove(); - cohortCache.remove(next.getTransactionID()); } - maybeRunOperationOnPendingTransactionsComplete(); - } - - void cleanupExpiredCohortEntries() { - maybeProcessNextCohortEntry(); - } - - void setRunOnPendingTransactionsComplete(Runnable operation) { - runOnPendingTransactionsComplete = operation; - maybeRunOperationOnPendingTransactionsComplete(); - } - - private void maybeRunOperationOnPendingTransactionsComplete() { - if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) { - log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete); - - runOnPendingTransactionsComplete.run(); - runOnPendingTransactionsComplete = null; - } + return messages; } @VisibleForTesting - void setCohortDecorator(CohortDecorator cohortDecorator) { + void setCohortDecorator(final CohortDecorator cohortDecorator) { this.cohortDecorator = cohortDecorator; } - - void 
processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) { - cohortRegistry.process(sender, message); - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 3296280390..89fa8fbc25 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -7,38 +7,61 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.primitives.UnsignedLong; +import java.io.IOException; import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; +import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import 
org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; /** * Internal shard state, similar to a DOMStore, but optimized for use in the actor system, @@ -49,31 +72,60 @@ import org.slf4j.LoggerFactory; */ @NotThreadSafe public class ShardDataTree extends ShardDataTreeTransactionParent { + private static final class CommitEntry { + final SimpleShardDataTreeCohort cohort; + long lastAccess; + + CommitEntry(final SimpleShardDataTreeCohort cohort, final long now) { + this.cohort = Preconditions.checkNotNull(cohort); + lastAccess = now; + } + } + + private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); private final Map transactionChains = new HashMap<>(); + private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); + private final Queue pendingTransactions = new ArrayDeque<>(); private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final TipProducingDataTree dataTree; private final String logContext; + private final Shard shard; + private Runnable runOnPendingTransactionsComplete; + private SchemaContext schemaContext; - public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType, + public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) { - dataTree = InMemoryDataTreeFactory.getInstance().create(treeType); + this.dataTree = dataTree; updateSchemaContext(schemaContext); + this.shard = Preconditions.checkNotNull(shard); this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher); this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher); this.logContext = Preconditions.checkNotNull(logContext); } - public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType) { - this(schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(), + public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, + final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, + final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) { + this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType), + treeChangeListenerPublisher, dataChangeListenerPublisher, logContext); + } + + @VisibleForTesting + public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { + this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), ""); } 
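ShardDataTree now owns transaction ordering: ready cohorts are appended to a FIFO queue of CommitEntry records stamped with the shard ticker, and only the head entry may advance through canCommit. A self-contained sketch of that queueing discipline under the assumption of a generic cohort type; PendingQueue and its method names are illustrative only:

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical, simplified model of the pending-commit queue: entries carry a
    // last-access timestamp (ticker nanoseconds) and only the head may start canCommit.
    final class PendingQueue<C> {
        static final class Entry<C> {
            final C cohort;
            long lastAccess;

            Entry(final C cohort, final long now) {
                this.cohort = cohort;
                this.lastAccess = now;
            }
        }

        private final Queue<Entry<C>> pending = new ArrayDeque<>();

        void enqueue(final C cohort, final long now) {
            pending.add(new Entry<>(cohort, now));
        }

        // True if the supplied cohort is at the head and may start canCommit;
        // otherwise it stays queued until its predecessors complete.
        boolean mayStartCanCommit(final C cohort, final long now) {
            final Entry<C> head = pending.peek();
            if (head == null || !head.cohort.equals(cohort)) {
                return false;
            }
            head.lastAccess = now;
            return true;
        }

        boolean isHeadExpired(final long now, final long timeoutNanos) {
            final Entry<C> head = pending.peek();
            return head != null && head.lastAccess + timeoutNanos < now;
        }
    }

The same lastAccess stamp later drives checkForExpiredTransactions, so a stalled head entry can be failed and the next queued transaction promoted.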
+ String logContext() { + return logContext; + } + public TipProducingDataTree getDataTree() { return dataTree; } @@ -92,6 +144,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException { + // FIXME: purge any outstanding transactions + final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); @@ -191,6 +245,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return new SimpleEntry<>(reg, readCurrentData()); } + int getQueueSize() { + return pendingTransactions.size(); + } + void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); @@ -214,7 +272,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); - return new SimpleShardDataTreeCohort(this, snapshot, transaction.getId()); + + return createReadyCohort(transaction.getId(), snapshot); } public Optional> readNode(final YangInstanceIdentifier path) { @@ -238,4 +297,320 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { dataTree.commit(candidate); return candidate; } + + public Collection getAndClearPendingTransactions() { + Collection ret = new ArrayList<>(pendingTransactions.size()); + for(CommitEntry entry: pendingTransactions) { + ret.add(entry.cohort); + } + + pendingTransactions.clear(); + return ret; + } + + private void processNextTransaction() { + while (!pendingTransactions.isEmpty()) { + final CommitEntry entry = pendingTransactions.peek(); + final SimpleShardDataTreeCohort cohort = entry.cohort; + final DataTreeModification modification = cohort.getDataTreeModification(); + + if(cohort.getState() != State.CAN_COMMIT_PENDING) { + break; + } + + LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); + Exception cause; + try { + dataTree.validate(modification); + LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); + cohort.successfulCanCommit(); + entry.lastAccess = shard.ticker().read(); + return; + } catch (ConflictingModificationAppliedException e) { + LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), + e.getPath()); + cause = new OptimisticLockFailedException("Optimistic lock failed.", e); + } catch (DataValidationFailedException e) { + LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(), + e.getPath(), e); + + // For debugging purposes, allow dumping of the modification. Coupled with the above + // precondition log, it should allow us to understand what went on. 
+ LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree); + cause = new TransactionCommitFailedException("Data did not pass validation.", e); + } catch (Exception e) { + LOG.warn("{}: Unexpected failure in validation phase", logContext, e); + cause = e; + } + + // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one + pendingTransactions.poll().cohort.failedCanCommit(cause); + } + + maybeRunOperationOnPendingTransactionsComplete(); + } + + void startCanCommit(final SimpleShardDataTreeCohort cohort) { + final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort; + if (!cohort.equals(current)) { + LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier()); + return; + } + + processNextTransaction(); + } + + private void failPreCommit(final Exception cause) { + shard.getShardMBean().incrementFailedTransactionsCount(); + pendingTransactions.poll().cohort.failedPreCommit(cause); + processNextTransaction(); + } + + void startPreCommit(final SimpleShardDataTreeCohort cohort) { + final CommitEntry entry = pendingTransactions.peek(); + Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort); + + final SimpleShardDataTreeCohort current = entry.cohort; + Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); + final DataTreeCandidateTip candidate; + try { + candidate = dataTree.prepare(cohort.getDataTreeModification()); + } catch (Exception e) { + failPreCommit(e); + return; + } + + try { + cohort.userPreCommit(candidate); + } catch (ExecutionException | TimeoutException e) { + failPreCommit(e); + return; + } + + entry.lastAccess = shard.ticker().read(); + cohort.successfulPreCommit(candidate); + } + + private void failCommit(final Exception cause) { + shard.getShardMBean().incrementFailedTransactionsCount(); + pendingTransactions.poll().cohort.failedCommit(cause); + processNextTransaction(); + } + + private void finishCommit(final SimpleShardDataTreeCohort cohort) { + final TransactionIdentifier txId = cohort.getIdentifier(); + final DataTreeCandidate candidate = cohort.getCandidate(); + + LOG.debug("{}: Resuming commit of transaction {}", logContext, txId); + + try { + try { + dataTree.commit(candidate); + } catch (IllegalStateException e) { + // We may get a "store tree and candidate base differ" IllegalStateException from commit under + // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last + // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before + // applying it to the state. We then become the leader and a second tx is pre-committed and + // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign + // candidate via applyState prior to the second tx. Since the second tx has already been + // pre-committed, when it gets here to commit it will get an IllegalStateException. + + // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner + // solution will be forthcoming. 
+ + LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e); + applyForeignCandidate(txId, candidate); + } + } catch (Exception e) { + failCommit(e); + return; + } + + shard.getShardMBean().incrementCommittedTransactionCount(); + shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); + + // FIXME: propagate journal index + + pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO); + + LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); + notifyListeners(candidate); + + processNextTransaction(); + } + + void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { + final CommitEntry entry = pendingTransactions.peek(); + Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort); + + final SimpleShardDataTreeCohort current = entry.cohort; + Verify.verify(cohort.equals(current), "Attempted to commit %s while %s is pending", cohort, current); + + if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) { + LOG.debug("{}: No replication required, proceeding to finish commit", logContext); + finishCommit(cohort); + return; + } + + final TransactionIdentifier txId = cohort.getIdentifier(); + final Payload payload; + try { + payload = CommitTransactionPayload.create(txId, candidate); + } catch (IOException e) { + LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e); + pendingTransactions.poll().cohort.failedCommit(e); + return; + } + + // Once completed, we will continue via payloadReplicationComplete + entry.lastAccess = shard.ticker().read(); + shard.persistPayload(txId, payload); + LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId); + } + + private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) { + final CommitEntry current = pendingTransactions.peek(); + if (current == null) { + LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); + return; + } + + if (!current.cohort.getIdentifier().equals(txId)) { + LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext, + current.cohort.getIdentifier(), txId); + return; + } + + finishCommit(current.cohort); + } + + void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) { + // For now we do not care about anything else but transactions + Verify.verify(identifier instanceof TransactionIdentifier); + payloadReplicationComplete((TransactionIdentifier)identifier, payload); + } + + void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { + cohortRegistry.process(sender, message); + } + + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, + final DataTreeModification modification) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, + cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); + pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read())); + return cohort; + } + + void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload) + throws DataValidationFailedException, IOException { + applyForeignCandidate(identifier, payload.getCandidate().getValue()); + } + + void checkForExpiredTransactions(final 
long transactionCommitTimeoutMillis) { + final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); + final long now = shard.ticker().read(); + final CommitEntry currentTx = pendingTransactions.peek(); + if (currentTx != null && currentTx.lastAccess + timeout < now) { + LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, + currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState()); + boolean processNext = true; + switch (currentTx.cohort.getState()) { + case CAN_COMMIT_PENDING: + pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException()); + break; + case CAN_COMMIT_COMPLETE: + pendingTransactions.poll().cohort.reportFailure(new TimeoutException()); + break; + case PRE_COMMIT_PENDING: + pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException()); + break; + case PRE_COMMIT_COMPLETE: + // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we + // are ready we should commit the transaction, not abort it. Our current software stack does + // not allow us to do that consistently, because we persist at the time of commit, hence + // we can end up in a state where we have pre-committed a transaction, then a leader failover + // occurred ... the new leader does not see the pre-committed transaction and does not have + // a running timer. To fix this we really need two persistence events. + // + // The first one, done at pre-commit time will hold the transaction payload. When consensus + // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not + // apply the state in this event. + // + // The second one, done at commit (or abort) time holds only the transaction identifier and + // signals to followers that the state should (or should not) be applied. + // + // In order to make the pre-commit timer working across failovers, though, we need + // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately + // restart the timer. 
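A compressed illustration of the expiry handling discussed above: when the head entry exceeds the commit timeout it is failed through the callback matching its current phase, except that an entry already replicating its payload is left alone. This is a simplified sketch with hypothetical names (Phase, Head, expire), not the exact switch in ShardDataTree:

    import java.util.concurrent.TimeoutException;

    final class TimeoutSketch {
        enum Phase { CAN_COMMIT_PENDING, CAN_COMMIT_COMPLETE, PRE_COMMIT_PENDING, PRE_COMMIT_COMPLETE, COMMIT_PENDING }

        interface Head {
            Phase phase();
            void fail(Exception cause);
        }

        // Returns true when the caller should drop the head entry and move on to the
        // next queued transaction, false when the head must be left in place.
        static boolean expire(final Head head) {
            final Exception cause = new TimeoutException();
            switch (head.phase()) {
                case CAN_COMMIT_PENDING:
                case CAN_COMMIT_COMPLETE:
                case PRE_COMMIT_PENDING:
                case PRE_COMMIT_COMPLETE:
                    head.fail(cause);
                    return true;
                case COMMIT_PENDING:
                default:
                    // Payload already submitted for replication: cannot be aborted here.
                    return false;
            }
        }
    }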
+ pendingTransactions.poll().cohort.reportFailure(new TimeoutException()); + break; + case COMMIT_PENDING: + LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = now; + processNext = false; + return; + case ABORTED: + case COMMITTED: + case FAILED: + case READY: + default: + pendingTransactions.poll(); + } + + if (processNext) { + processNextTransaction(); + } + } + } + + void startAbort(final SimpleShardDataTreeCohort cohort) { + final Iterator it = pendingTransactions.iterator(); + if (!it.hasNext()) { + LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier()); + return; + } + + // First entry is special, as it may already be committing + final CommitEntry first = it.next(); + if (cohort.equals(first.cohort)) { + if (cohort.getState() != State.COMMIT_PENDING) { + LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(), + cohort.getIdentifier()); + pendingTransactions.poll(); + processNextTransaction(); + } else { + LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier()); + } + + return; + } + + while (it.hasNext()) { + final CommitEntry e = it.next(); + if (cohort.equals(e.cohort)) { + LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier()); + it.remove(); + return; + } + } + + LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier()); + } + + void setRunOnPendingTransactionsComplete(final Runnable operation) { + runOnPendingTransactionsComplete = operation; + maybeRunOperationOnPendingTransactionsComplete(); + } + + private void maybeRunOperationOnPendingTransactionsComplete() { + if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) { + LOG.debug("{}: Pending transactions complete - running operation {}", logContext, + runOnPendingTransactionsComplete); + + runOnPendingTransactionsComplete.run(); + runOnPendingTransactionsComplete = null; + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index 47876123cf..0a3a6ae177 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -8,11 +8,29 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -public abstract class ShardDataTreeCohort { +public abstract class ShardDataTreeCohort implements Identifiable { + public enum State { + READY, + CAN_COMMIT_PENDING, + CAN_COMMIT_COMPLETE, + 
PRE_COMMIT_PENDING, + PRE_COMMIT_COMPLETE, + COMMIT_PENDING, + + ABORTED, + COMMITTED, + FAILED, + } + ShardDataTreeCohort() { // Prevent foreign instantiation } @@ -20,15 +38,23 @@ public abstract class ShardDataTreeCohort { // FIXME: This leaks internal state generated in preCommit, // should be result of canCommit abstract DataTreeCandidateTip getCandidate(); + abstract DataTreeModification getDataTreeModification(); // FIXME: Should return rebased DataTreeCandidateTip @VisibleForTesting - public abstract ListenableFuture canCommit(); + public abstract void canCommit(FutureCallback callback); + @VisibleForTesting - public abstract ListenableFuture preCommit(); + public abstract void preCommit(FutureCallback callback); + @VisibleForTesting public abstract ListenableFuture abort(); + @VisibleForTesting - public abstract ListenableFuture commit(); + public abstract void commit(FutureCallback callback); + + public abstract boolean isFailed(); + + public abstract State getState(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 7812d70917..8bef15bbba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -12,7 +12,6 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.Optional; -import java.util.concurrent.ExecutionException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; @@ -99,13 +98,6 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { } - void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction) - throws ExecutionException, InterruptedException { - ShardDataTreeCohort commitCohort = store.finishTransaction(transaction); - commitCohort.preCommit().get(); - commitCohort.commit().get(); - } - @Override public void applySnapshot(final byte[] snapshotBytes) { // Since this will be done only on Recovery or when this actor is a Follower diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java index 2842881510..bb016a28bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -7,34 +7,52 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.dispatch.ExecutionContexts; +import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import 
com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; -import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; -final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { +final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class); - private static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); private final DataTreeModification transaction; private final ShardDataTree dataTree; private final TransactionIdentifier transactionId; + private final CompositeDataTreeCohort userCohorts; + + private State state = State.READY; private DataTreeCandidateTip candidate; + private FutureCallback callback; + private Exception nextFailure; SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction, - final TransactionIdentifier transactionId) { + final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) { this.dataTree = Preconditions.checkNotNull(dataTree); this.transaction = Preconditions.checkNotNull(transaction); this.transactionId = Preconditions.checkNotNull(transactionId); + this.userCohorts = Preconditions.checkNotNull(userCohorts); + } + + @Override + public TransactionIdentifier getIdentifier() { + return transactionId; } @Override @@ -43,80 +61,162 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { } @Override - public ListenableFuture canCommit() { - DataTreeModification modification = getDataTreeModification(); - try { - dataTree.getDataTree().validate(modification); - LOG.trace("Transaction {} validated", transaction); - return TRUE_FUTURE; - } - catch (ConflictingModificationAppliedException e) { - LOG.warn("Store Tx {}: Conflicting modification for path {}.", transactionId, e.getPath()); - return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e)); - } catch (DataValidationFailedException e) { - LOG.warn("Store Tx {}: Data validation failed for path {}.", transactionId, e.getPath(), e); - - // For debugging purposes, allow dumping of the modification. Coupled with the above - // precondition log, it should allow us to understand what went on. 
- LOG.debug("Store Tx {}: modifications: {} tree: {}", transactionId, modification, dataTree.getDataTree()); - - return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e)); - } catch (Exception e) { - LOG.warn("Unexpected failure in validation phase", e); - return Futures.immediateFailedFuture(e); + public DataTreeModification getDataTreeModification() { + DataTreeModification dataTreeModification = transaction; + if (transaction instanceof PruningDataTreeModification){ + dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification(); } + return dataTreeModification; + } + + private void checkState(State expected) { + Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected); } @Override - public ListenableFuture preCommit() { - try { - candidate = dataTree.getDataTree().prepare(getDataTreeModification()); - /* - * FIXME: this is the place where we should be interacting with persistence, specifically by invoking - * persist on the candidate (which gives us a Future). - */ - LOG.trace("Transaction {} prepared candidate {}", transaction, candidate); - return VOID_FUTURE; - } catch (Exception e) { - if(LOG.isTraceEnabled()) { - LOG.trace("Transaction {} failed to prepare", transaction, e); - } else { - LOG.error("Transaction failed to prepare", e); - } - return Futures.immediateFailedFuture(e); + public void canCommit(final FutureCallback callback) { + if(state == State.CAN_COMMIT_PENDING) { + return; } + + checkState(State.READY); + this.callback = Preconditions.checkNotNull(callback); + state = State.CAN_COMMIT_PENDING; + dataTree.startCanCommit(this); } @Override - DataTreeModification getDataTreeModification() { - DataTreeModification dataTreeModification = transaction; - if(transaction instanceof PruningDataTreeModification){ - dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification(); + public void preCommit(final FutureCallback callback) { + checkState(State.CAN_COMMIT_COMPLETE); + this.callback = Preconditions.checkNotNull(callback); + state = State.PRE_COMMIT_PENDING; + + if (nextFailure == null) { + dataTree.startPreCommit(this); + } else { + failedPreCommit(nextFailure); } - return dataTreeModification; } @Override public ListenableFuture abort() { - // No-op, really - return VOID_FUTURE; + dataTree.startAbort(this); + state = State.ABORTED; + + final Optional>> maybeAborts = userCohorts.abort(); + if (!maybeAborts.isPresent()) { + return VOID_FUTURE; + } + + final Future> aborts = maybeAborts.get(); + if (aborts.isCompleted()) { + return VOID_FUTURE; + } + + final SettableFuture ret = SettableFuture.create(); + aborts.onComplete(new OnComplete>() { + @Override + public void onComplete(final Throwable failure, final Iterable objs) { + if (failure != null) { + ret.setException(failure); + } else { + ret.set(null); + } + } + }, ExecutionContexts.global()); + + return ret; } @Override - public ListenableFuture commit() { + public void commit(final FutureCallback callback) { + checkState(State.PRE_COMMIT_COMPLETE); + this.callback = Preconditions.checkNotNull(callback); + state = State.COMMIT_PENDING; + dataTree.startCommit(this, candidate); + } + + private FutureCallback switchState(final State newState) { + @SuppressWarnings("unchecked") + final FutureCallback ret = (FutureCallback) this.callback; + this.callback = null; + LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState); + 
this.state = newState; + return ret; + } + + void successfulCanCommit() { + switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null); + } + + void failedCanCommit(final Exception cause) { + switchState(State.FAILED).onFailure(cause); + } + + /** + * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that + * any failure to validate is propagated before we record the transaction. + * + * @param candidate {@link DataTreeCandidate} under consideration + * @throws ExecutionException + * @throws TimeoutException + */ + // FIXME: this should be asynchronous + void userPreCommit(final DataTreeCandidate candidate) throws ExecutionException, TimeoutException { + userCohorts.canCommit(candidate); + userCohorts.preCommit(); + } + + void successfulPreCommit(final DataTreeCandidateTip candidate) { + LOG.trace("Transaction {} prepared candidate {}", transaction, candidate); + this.candidate = Verify.verifyNotNull(candidate); + switchState(State.PRE_COMMIT_COMPLETE).onSuccess(candidate); + } + + void failedPreCommit(final Exception cause) { + if (LOG.isTraceEnabled()) { + LOG.trace("Transaction {} failed to prepare", transaction, cause); + } else { + LOG.error("Transaction {} failed to prepare", transactionId, cause); + } + + userCohorts.abort(); + switchState(State.FAILED).onFailure(cause); + } + + void successfulCommit(final UnsignedLong journalIndex) { try { - dataTree.getDataTree().commit(candidate); - } catch (Exception e) { - if(LOG.isTraceEnabled()) { - LOG.trace("Transaction {} failed to commit", transaction, e); - } else { - LOG.error("Transaction failed to commit", e); - } - return Futures.immediateFailedFuture(e); + userCohorts.commit(); + } catch (TimeoutException | ExecutionException e) { + // We are probably dead, depending on what the cohorts end up doing + LOG.error("User cohorts failed to commit", e); } - LOG.trace("Transaction {} committed, proceeding to notify", transaction); - dataTree.notifyListeners(candidate); - return VOID_FUTURE; + switchState(State.COMMITTED).onSuccess(journalIndex); + } + + void failedCommit(final Exception cause) { + if (LOG.isTraceEnabled()) { + LOG.trace("Transaction {} failed to commit", transaction, cause); + } else { + LOG.error("Transaction failed to commit", cause); + } + + userCohorts.abort(); + switchState(State.FAILED).onFailure(cause); + } + + @Override + public State getState() { + return state; + } + + void reportFailure(final Exception cause) { + this.nextFailure = Preconditions.checkNotNull(cause); + } + + @Override + public boolean isFailed() { + return state == State.FAILED || nextFailure != null; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index a42a628331..8469d02795 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -13,10 +13,15 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static 
org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCanCommit; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCommit; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulPreCommit; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -25,13 +30,15 @@ import akka.japi.Creator; import akka.pattern.Patterns; import akka.testkit.TestActorRef; import akka.util.Timeout; -import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.util.concurrent.Futures; +import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -41,8 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; @@ -73,6 +78,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; @@ -200,135 +206,77 @@ public abstract class AbstractShardTest extends AbstractActorTest{ Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied)); } - protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName, - final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode data, - final MutableCompositeModification modification) { - return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null); - } - - protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName, - final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode data, - final MutableCompositeModification modification, - final Function> preCommit) { - - final ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction(nextTransactionId()); - tx.getSnapshot().write(path, data); - final 
ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit); - - modification.addModification(new WriteModification(path, data)); - - return cohort; - } - - protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName, - final ShardDataTreeCohort actual) { - return createDelegatingMockCohort(cohortName, actual, null); - } + protected TipProducingDataTree createDelegatingMockDataTree() throws Exception { + TipProducingDataTree actual = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION); + final TipProducingDataTree mock = mock(TipProducingDataTree.class); - protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName, - final ShardDataTreeCohort actual, - final Function> preCommit) { - final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName); + doAnswer(invocation -> { + actual.validate(invocation.getArgumentAt(0, DataTreeModification.class)); + return null; + }).when(mock).validate(any(DataTreeModification.class)); - doAnswer(new Answer>() { - @Override - public ListenableFuture answer(final InvocationOnMock invocation) { - return actual.canCommit(); - } - }).when(cohort).canCommit(); + doAnswer(invocation -> { + return actual.prepare(invocation.getArgumentAt(0, DataTreeModification.class)); + }).when(mock).prepare(any(DataTreeModification.class)); - doAnswer(new Answer>() { - @Override - public ListenableFuture answer(final InvocationOnMock invocation) throws Throwable { - if(preCommit != null) { - return preCommit.apply(actual); - } else { - return actual.preCommit(); - } - } - }).when(cohort).preCommit(); + doAnswer(invocation -> { + actual.commit(invocation.getArgumentAt(0, DataTreeCandidate.class)); + return null; + }).when(mock).commit(any(DataTreeCandidate.class)); - doAnswer(new Answer>() { - @Override - public ListenableFuture answer(final InvocationOnMock invocation) throws Throwable { - return actual.commit(); - } - }).when(cohort).commit(); + doAnswer(invocation -> { + actual.setSchemaContext(invocation.getArgumentAt(0, SchemaContext.class)); + return null; + }).when(mock).setSchemaContext(any(SchemaContext.class)); - doAnswer(new Answer>() { - @Override - public ListenableFuture answer(final InvocationOnMock invocation) throws Throwable { - return actual.abort(); - } - }).when(cohort).abort(); + doAnswer(invocation -> { + return actual.takeSnapshot(); + }).when(mock).takeSnapshot(); - doAnswer(new Answer() { - @Override - public DataTreeCandidateTip answer(final InvocationOnMock invocation) { - return actual.getCandidate(); - } - }).when(cohort).getCandidate(); + doAnswer(invocation -> { + return actual.getRootPath(); + }).when(mock).getRootPath(); - return cohort; - } - - protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort, - TransactionIdentifier transactionID, MutableCompositeModification modification, - boolean doCommitOnReady) { - if(remoteReadWriteTransaction){ - return prepareForwardedReadyTransaction(cohort, transactionID, CURRENT_VERSION, - doCommitOnReady); - } else { - setupCohortDecorator(shard, cohort); - return prepareBatchedModifications(transactionID, modification, doCommitOnReady); - } + return mock; } protected ShardDataTreeCohort mockShardDataTreeCohort() { ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - 
doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); - doReturn(Futures.immediateFuture(null)).when(cohort).commit(); - doReturn(mockCandidate("candidate")).when(cohort).getCandidate(); + DataTreeCandidate candidate = mockCandidate("candidate"); + successfulCanCommit(cohort); + successfulPreCommit(cohort, candidate); + successfulCommit(cohort); + doReturn(candidate).when(cohort).getCandidate(); return cohort; } - static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) { - ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class); - doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class)); - doNothing().when(mockParent).abortTransaction(any(AbstractShardDataTreeTransaction.class)); - return mockParent; - } - - protected ForwardedReadyTransaction prepareForwardedReadyTransaction(ShardDataTreeCohort cohort, - TransactionIdentifier transactionID, short version, boolean doCommitOnReady) { - return new ForwardedReadyTransaction(transactionID, version, - new ReadWriteShardDataTreeTransaction(newShardDataTreeTransactionParent(cohort), transactionID, - mock(DataTreeModification.class)), doCommitOnReady); - } - - protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort, - TransactionIdentifier transactionID, MutableCompositeModification modification) { - return prepareReadyTransactionMessage(remoteReadWriteTransaction, shard, cohort, transactionID, modification, false); - } + protected Map setupCohortDecorator(final Shard shard, + final TransactionIdentifier... transactionIDs) { + final Map cohortMap = new HashMap<>(); + for(TransactionIdentifier id: transactionIDs) { + cohortMap.put(id, new CapturingShardDataTreeCohort()); + } - protected void setupCohortDecorator(Shard shard, final ShardDataTreeCohort cohort) { shard.getCommitCoordinator().setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() { @Override - public ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual) { + public ShardDataTreeCohort decorate(final Identifier transactionID, final ShardDataTreeCohort actual) { + CapturingShardDataTreeCohort cohort = cohortMap.get(transactionID); + cohort.setDelegate(actual); return cohort; } }); + + return cohortMap; } - protected BatchedModifications prepareBatchedModifications(TransactionIdentifier transactionID, - MutableCompositeModification modification) { + protected BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID, + final MutableCompositeModification modification) { return prepareBatchedModifications(transactionID, modification, false); } - private static BatchedModifications prepareBatchedModifications(TransactionIdentifier transactionID, - MutableCompositeModification modification, - boolean doCommitOnReady) { + protected static BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID, + final MutableCompositeModification modification, + final boolean doCommitOnReady) { final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION); batchedModifications.addModification(modification); batchedModifications.setReady(true); @@ -337,6 +285,21 @@ public abstract class AbstractShardTest extends AbstractActorTest{ return batchedModifications; } + protected static BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID, 
+ final YangInstanceIdentifier path, final NormalizedNode data, final boolean doCommitOnReady) { + final MutableCompositeModification modification = new MutableCompositeModification(); + modification.addModification(new WriteModification(path, data)); + return prepareBatchedModifications(transactionID, modification, doCommitOnReady); + } + + protected static ForwardedReadyTransaction prepareForwardedReadyTransaction(final TestActorRef shard, + final TransactionIdentifier transactionID, final YangInstanceIdentifier path, + final NormalizedNode data, final boolean doCommitOnReady) { + ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore(). + newReadWriteTransaction(transactionID); + rwTx.getSnapshot().write(path, data); + return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, doCommitOnReady); + } public static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) throws ExecutionException, InterruptedException { @@ -364,9 +327,9 @@ public abstract class AbstractShardTest extends AbstractActorTest{ transaction.getSnapshot().write(id, node); final ShardDataTreeCohort cohort = transaction.ready(); - cohort.canCommit().get(); - cohort.preCommit().get(); - cohort.commit(); + immediateCanCommit(cohort); + immediatePreCommit(cohort); + immediateCommit(cohort); } public void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id, @@ -375,9 +338,9 @@ public abstract class AbstractShardTest extends AbstractActorTest{ transaction.getSnapshot().merge(id, node); final ShardDataTreeCohort cohort = transaction.ready(); - cohort.canCommit().get(); - cohort.preCommit().get(); - cohort.commit(); + immediateCanCommit(cohort); + immediatePreCommit(cohort); + immediateCommit(cohort); } public static void writeToStore(final DataTree store, final YangInstanceIdentifier id, @@ -478,4 +441,94 @@ public abstract class AbstractShardTest extends AbstractActorTest{ return delegate.create(); } } + + public static class CapturingShardDataTreeCohort extends ShardDataTreeCohort { + private volatile ShardDataTreeCohort delegate; + private FutureCallback canCommit; + private FutureCallback preCommit; + private FutureCallback commit; + + public void setDelegate(ShardDataTreeCohort delegate) { + this.delegate = delegate; + } + + public FutureCallback getCanCommit() { + assertNotNull("canCommit was not invoked", canCommit); + return canCommit; + } + + public FutureCallback getPreCommit() { + assertNotNull("preCommit was not invoked", preCommit); + return preCommit; + } + + public FutureCallback getCommit() { + assertNotNull("commit was not invoked", commit); + return commit; + } + + @Override + public TransactionIdentifier getIdentifier() { + return delegate.getIdentifier(); + } + + @Override + DataTreeCandidateTip getCandidate() { + return delegate.getCandidate(); + } + + @Override + DataTreeModification getDataTreeModification() { + return delegate.getDataTreeModification(); + } + + @Override + public void canCommit(FutureCallback callback) { + canCommit = mockFutureCallback(callback); + delegate.canCommit(canCommit); + } + + @Override + public void preCommit(FutureCallback callback) { + preCommit = mockFutureCallback(callback); + delegate.preCommit(preCommit); + } + + @Override + public void commit(FutureCallback callback) { + commit = mockFutureCallback(callback); + delegate.commit(commit); + } + + @SuppressWarnings("unchecked") + private FutureCallback mockFutureCallback(final FutureCallback actual ) { + FutureCallback mock = 
mock(FutureCallback.class); + doAnswer(invocation -> { + actual.onFailure(invocation.getArgumentAt(0, Throwable.class)); + return null; + }).when(mock).onFailure(any(Throwable.class)); + + doAnswer(invocation -> { + actual.onSuccess((T) invocation.getArgumentAt(0, Throwable.class)); + return null; + }).when(mock).onSuccess((T) any(Object.class)); + + return mock; + } + + @Override + public ListenableFuture abort() { + return delegate.abort(); + } + + @Override + public boolean isFailed() { + return delegate.isFailed(); + } + + @Override + public State getState() { + return delegate.getState(); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java index ac6f8017b8..ffe3226042 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java @@ -47,7 +47,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { private DataChangeListenerSupport support; @Before - public void setup() { + public void setup() throws InterruptedException { shard = createShard(); support = new DataChangeListenerSupport(shard); } @@ -151,8 +151,8 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { listener.verifyCreatedData(0, innerEntryPath(2, "four")); } - private MockDataChangeListener registerChangeListener(YangInstanceIdentifier path, DataChangeScope scope, - int expectedEvents, boolean isLeader) { + private MockDataChangeListener registerChangeListener(final YangInstanceIdentifier path, final DataChangeScope scope, + final int expectedEvents, final boolean isLeader) { MockDataChangeListener listener = new MockDataChangeListener(expectedEvents); ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener)); @@ -162,6 +162,8 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { private Shard createShard() { TestActorRef actor = actorFactory.createTestActor(newShardProps()); + ShardTestKit.waitUntilLeader(actor); + return actor.underlyingActor(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java index 9baea72d8d..a11fc6bb1c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java @@ -113,8 +113,8 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest { listener2.verifyNoNotifiedData(innerEntryPath(2, "three"), innerEntryPath(2, "four")); } - private MockDataTreeChangeListener registerChangeListener(YangInstanceIdentifier path, - int expectedEvents, boolean isLeader) { + private MockDataTreeChangeListener registerChangeListener(final YangInstanceIdentifier path, + final int expectedEvents, final boolean isLeader) { 
MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents); ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener)); support.onMessage(new RegisterDataTreeChangeListener(path, dclActor, false), isLeader, true); @@ -123,6 +123,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest { private Shard createShard() { TestActorRef actor = actorFactory.createTestActor(newShardProps()); + ShardTestKit.waitUntilLeader(actor); return actor.underlyingActor(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 1b2657e25f..47cc35937f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -49,7 +49,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; -import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; @@ -79,9 +79,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; public class DistributedDataStoreIntegrationTest { @@ -1158,7 +1160,7 @@ public class DistributedDataStoreIntegrationTest { } @Test - public void testRestoreFromDatastoreSnapshot() throws Exception{ + public void testRestoreFromDatastoreSnapshot() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String name = "transactionIntegrationTest"; @@ -1166,20 +1168,21 @@ public class DistributedDataStoreIntegrationTest { CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); - ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL); + DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); + dataTree.setSchemaContext(SchemaContextHelper.full()); AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); - NormalizedNode root = 
AbstractShardTest.readStore(dataTree.getDataTree(), - YangInstanceIdentifier.EMPTY); + NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); - Snapshot carsSnapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(root).serialize(), + Snapshot carsSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(), Collections.emptyList(), 2, 1, 2, 1, 1, "member-1"); NormalizedNode peopleNode = PeopleModel.create(); - dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL); + dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); + dataTree.setSchemaContext(SchemaContextHelper.full()); AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); - root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.EMPTY); + root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); - Snapshot peopleSnapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(root).serialize(), + Snapshot peopleSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(), Collections.emptyList(), 2, 1, 2, 1, 1, "member-1"); restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java new file mode 100644 index 0000000000..dff416258a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; +import org.mockito.InOrder; +import org.mockito.invocation.InvocationOnMock; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +public final class ShardDataTreeMocking { + + private ShardDataTreeMocking() { + throw new UnsupportedOperationException(); + } + + @SuppressWarnings("unchecked") + private static FutureCallback mockCallback() { + return mock(FutureCallback.class); + } + + public static ShardDataTreeCohort immediateCanCommit(final ShardDataTreeCohort cohort) { + final FutureCallback callback = mockCallback(); + doNothing().when(callback).onSuccess(null); + cohort.canCommit(callback); + + verify(callback).onSuccess(null); + verifyNoMoreInteractions(callback); + return cohort; + } + + public static ShardDataTreeCohort immediatePreCommit(final ShardDataTreeCohort cohort) { + final FutureCallback callback = mockCallback(); + doNothing().when(callback).onSuccess(any(DataTreeCandidate.class)); + cohort.preCommit(callback); + + verify(callback).onSuccess(any(DataTreeCandidate.class)); + verifyNoMoreInteractions(callback); + return cohort; + } + + public static ShardDataTreeCohort immediateCommit(final ShardDataTreeCohort cohort) { + final FutureCallback callback = mockCallback(); + doNothing().when(callback).onSuccess(any(UnsignedLong.class)); + cohort.commit(callback); + + verify(callback, timeout(5000)).onSuccess(any(UnsignedLong.class)); + verifyNoMoreInteractions(callback); + return cohort; + } + + @SuppressWarnings("unchecked") + private static Object invokeSuccess(final InvocationOnMock invocation, final T value) { + invocation.getArgumentAt(0, FutureCallback.class).onSuccess(value); + return null; + } + + private static Object invokeFailure(final InvocationOnMock invocation) { + invocation.getArgumentAt(0, FutureCallback.class).onFailure(mock(Exception.class)); + return null; + } + + @SuppressWarnings("unchecked") + public static ShardDataTreeCohort failedCanCommit(final ShardDataTreeCohort mock) { + doAnswer(invocation -> { + return invokeFailure(invocation); + }).when(mock).canCommit(any(FutureCallback.class)); + return mock; + } + + @SuppressWarnings("unchecked") + public static ShardDataTreeCohort failedPreCommit(final ShardDataTreeCohort mock) { + doAnswer(invocation -> { + return invokeFailure(invocation); + }).when(mock).preCommit(any(FutureCallback.class)); + return mock; + } + + @SuppressWarnings("unchecked") + public static ShardDataTreeCohort failedCommit(final ShardDataTreeCohort mock) { + doAnswer(invocation -> { + return invokeFailure(invocation); + }).when(mock).commit(any(FutureCallback.class)); + return mock; + } + + @SuppressWarnings("unchecked") + public static ShardDataTreeCohort successfulCanCommit(final ShardDataTreeCohort mock) { + doAnswer(invocation -> { + return 
invokeSuccess(invocation, null); + }).when(mock).canCommit(any(FutureCallback.class)); + + return mock; + } + + public static ShardDataTreeCohort successfulPreCommit(final ShardDataTreeCohort mock) { + return successfulPreCommit(mock, mock(DataTreeCandidate.class)); + } + + @SuppressWarnings("unchecked") + public static ShardDataTreeCohort successfulPreCommit(final ShardDataTreeCohort mock, final DataTreeCandidate candidate) { + doAnswer(invocation -> { + return invokeSuccess(invocation, candidate); + }).when(mock).preCommit(any(FutureCallback.class)); + + return mock; + } + + public static ShardDataTreeCohort successfulCommit(final ShardDataTreeCohort mock) { + return successfulCommit(mock, UnsignedLong.ZERO); + } + + @SuppressWarnings("unchecked") + public static ShardDataTreeCohort successfulCommit(final ShardDataTreeCohort mock, final UnsignedLong index) { + doAnswer(invocation -> { + return invokeSuccess(invocation, index); + }).when(mock).commit(any(FutureCallback.class)); + + return mock; + } + + @SuppressWarnings("unchecked") + public static void assertSequencedCommit(final ShardDataTreeCohort mock) { + final InOrder inOrder = inOrder(mock); + inOrder.verify(mock).canCommit(any(FutureCallback.class)); + inOrder.verify(mock).preCommit(any(FutureCallback.class)); + inOrder.verify(mock).commit(any(FutureCallback.class)); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java index d35b2dbc34..5d27224398 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java @@ -10,13 +10,20 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.doReturn; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit; +import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit; import com.google.common.base.Optional; +import com.google.common.base.Ticker; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -30,31 +37,38 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class ShardDataTreeTest extends AbstractTest { - SchemaContext fullSchema; + private final Shard mockShard = Mockito.mock(Shard.class); + + + private SchemaContext fullSchema; @Before - public void setUp(){ + public void setUp() { + doReturn(true).when(mockShard).canSkipPayload(); + doReturn(Ticker.systemTicker()).when(mockShard).ticker(); + doReturn(Mockito.mock(ShardStats.class)).when(mockShard).getShardMBean(); 
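        // For orientation, a minimal usage sketch of the ShardDataTreeMocking helpers added above,
        // assuming a cohort obtained from ShardDataTree#finishTransaction as in modify() below:
        //
        //     final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
        //     immediateCanCommit(cohort);   // drives canCommit(FutureCallback<Void>) and verifies onSuccess
        //     immediatePreCommit(cohort);   // drives preCommit(FutureCallback<DataTreeCandidate>)
        //     immediateCommit(cohort);      // drives commit(FutureCallback<UnsignedLong>)
        //
        // Each helper passes a mocked FutureCallback into the cohort and verifies that onSuccess()
        // fired, so the three-phase commit completes inline within the test.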
+ fullSchema = SchemaContextHelper.full(); } @Test public void testWrite() throws ExecutionException, InterruptedException { - modify(new ShardDataTree(fullSchema, TreeType.OPERATIONAL), false, true, true); + modify(new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL), false, true, true); } @Test public void testMerge() throws ExecutionException, InterruptedException { - modify(new ShardDataTree(fullSchema, TreeType.OPERATIONAL), true, true, true); + modify(new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL), true, true, true); } - private void modify(ShardDataTree shardDataTree, boolean merge, boolean expectedCarsPresent, boolean expectedPeoplePresent) throws ExecutionException, InterruptedException { + private void modify(final ShardDataTree shardDataTree, final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) throws ExecutionException, InterruptedException { assertEquals(fullSchema, shardDataTree.getSchemaContext()); - ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId()); + final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId()); - DataTreeModification snapshot = transaction.getSnapshot(); + final DataTreeModification snapshot = transaction.getSnapshot(); assertNotNull(snapshot); @@ -66,21 +80,21 @@ public class ShardDataTreeTest extends AbstractTest { snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()); } - ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction); - - cohort.preCommit().get(); - cohort.commit().get(); + final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction); + immediateCanCommit(cohort); + immediatePreCommit(cohort); + immediateCommit(cohort); - ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId()); + final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId()); - DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot(); + final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot(); - Optional> optional = snapshot1.readNode(CarsModel.BASE_PATH); + final Optional> optional = snapshot1.readNode(CarsModel.BASE_PATH); assertEquals(expectedCarsPresent, optional.isPresent()); - Optional> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH); + final Optional> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH); assertEquals(expectedPeoplePresent, optional1.isPresent()); @@ -88,52 +102,52 @@ public class ShardDataTreeTest extends AbstractTest { @Test public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException { - ShardDataTree shardDataTree = new ShardDataTree(fullSchema, TreeType.OPERATIONAL); + final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL); - List candidates = new ArrayList<>(); + final List candidates = new ArrayList<>(); candidates.add(addCar(shardDataTree)); candidates.add(removeCar(shardDataTree)); - NormalizedNode expected = getCars(shardDataTree); + final NormalizedNode expected = getCars(shardDataTree); applyCandidates(shardDataTree, candidates); - NormalizedNode actual = getCars(shardDataTree); + final NormalizedNode actual = getCars(shardDataTree); assertEquals(expected, actual); } @Test public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException { - 
ShardDataTree shardDataTree = new ShardDataTree(fullSchema, TreeType.OPERATIONAL); + final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL); - List candidates = new ArrayList<>(); + final List candidates = new ArrayList<>(); candidates.add(addCar(shardDataTree)); candidates.add(removeCar(shardDataTree)); candidates.add(addCar(shardDataTree)); candidates.add(removeCar(shardDataTree)); - NormalizedNode expected = getCars(shardDataTree); + final NormalizedNode expected = getCars(shardDataTree); applyCandidates(shardDataTree, candidates); - NormalizedNode actual = getCars(shardDataTree); + final NormalizedNode actual = getCars(shardDataTree); assertEquals(expected, actual); } - private static NormalizedNode getCars(ShardDataTree shardDataTree) { - ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId()); - DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot(); + private static NormalizedNode getCars(final ShardDataTree shardDataTree) { + final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId()); + final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot(); - Optional> optional = snapshot1.readNode(CarsModel.BASE_PATH); + final Optional> optional = snapshot1.readNode(CarsModel.BASE_PATH); assertEquals(true, optional.isPresent()); return optional.get(); } - private static DataTreeCandidateTip addCar(ShardDataTree shardDataTree) throws ExecutionException, InterruptedException { + private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree) throws ExecutionException, InterruptedException { return doTransaction(shardDataTree, snapshot -> { snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); @@ -141,7 +155,7 @@ public class ShardDataTreeTest extends AbstractTest { }); } - private static DataTreeCandidateTip removeCar(ShardDataTree shardDataTree) throws ExecutionException, InterruptedException { + private static DataTreeCandidateTip removeCar(final ShardDataTree shardDataTree) throws ExecutionException, InterruptedException { return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima"))); } @@ -150,34 +164,34 @@ public class ShardDataTreeTest extends AbstractTest { void execute(DataTreeModification snapshot); } - private static DataTreeCandidateTip doTransaction(ShardDataTree shardDataTree, DataTreeOperation operation) + private static DataTreeCandidateTip doTransaction(final ShardDataTree shardDataTree, final DataTreeOperation operation) throws ExecutionException, InterruptedException { - ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId()); - DataTreeModification snapshot = transaction.getSnapshot(); + final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId()); + final DataTreeModification snapshot = transaction.getSnapshot(); operation.execute(snapshot); - ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction); + final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction); - cohort.canCommit().get(); - cohort.preCommit().get(); - DataTreeCandidateTip candidate = cohort.getCandidate(); - cohort.commit().get(); + immediateCanCommit(cohort); + immediatePreCommit(cohort); + final DataTreeCandidateTip 
candidate = cohort.getCandidate(); + immediateCommit(cohort); return candidate; } - private static DataTreeCandidateTip applyCandidates(ShardDataTree shardDataTree, List candidates) + private static DataTreeCandidateTip applyCandidates(final ShardDataTree shardDataTree, final List candidates) throws ExecutionException, InterruptedException { - ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId()); - DataTreeModification snapshot = transaction.getSnapshot(); - for(DataTreeCandidateTip candidateTip : candidates){ + final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId()); + final DataTreeModification snapshot = transaction.getSnapshot(); + for(final DataTreeCandidateTip candidateTip : candidates){ DataTreeCandidates.applyToModification(snapshot, candidateTip); } - ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction); + final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction); - cohort.canCommit().get(); - cohort.preCommit().get(); - DataTreeCandidateTip candidate = cohort.getCandidate(); - cohort.commit().get(); + immediateCanCommit(cohort); + immediatePreCommit(cohort); + final DataTreeCandidateTip candidate = cohort.getCandidate(); + immediateCommit(cohort); return candidate; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java index 1ceb2c7dad..acac104c79 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java @@ -14,6 +14,7 @@ import com.google.common.base.Optional; import java.io.IOException; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; @@ -42,7 +43,9 @@ public class ShardRecoveryCoordinatorTest extends AbstractTest { peopleSchemaContext = SchemaContextHelper.select(SchemaContextHelper.PEOPLE_YANG); carsSchemaContext = SchemaContextHelper.select(SchemaContextHelper.CARS_YANG); - peopleDataTree = new ShardDataTree(peopleSchemaContext, TreeType.OPERATIONAL); + final Shard mockShard = Mockito.mock(Shard.class); + + peopleDataTree = new ShardDataTree(mockShard, peopleSchemaContext, TreeType.OPERATIONAL); } @Deprecated diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 8f7dd8e5b7..d4dcc9cda2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -14,11 +14,10 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static 
org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.doReturn; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -31,14 +30,13 @@ import akka.pattern.Patterns; import akka.persistence.SaveSnapshotSuccess; import akka.testkit.TestActorRef; import akka.util.Timeout; -import com.google.common.base.Function; import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -64,6 +62,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; @@ -75,11 +74,10 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeCh import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; @@ -90,6 +88,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import 
org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; @@ -101,6 +100,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftStat import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -112,11 +112,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; @@ -411,7 +410,8 @@ public class ShardTest extends AbstractShardTest { @Test public void testApplySnapshot() throws Exception { - final TestActorRef shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot"); + final TestActorRef shard = actorFactory.createTestActor(newShardProps(). + withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot"); ShardTestKit.waitUntilLeader(shard); @@ -428,14 +428,24 @@ public class ShardTest extends AbstractShardTest { final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY; final NormalizedNode expected = readStore(store, root); - final Snapshot snapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(expected).serialize(), + final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(expected).serialize(), Collections.emptyList(), 1, 2, 3, 4); - shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState()); + shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender()); - final NormalizedNode actual = readStore(shard, root); + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 5) { + Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS); - assertEquals("Root node", expected, actual); + try { + assertEquals("Root node", expected, readStore(shard, root)); + return; + } catch(AssertionError e) { + // try again + } + } + + fail("Snapshot was not applied"); } @Test @@ -518,35 +528,22 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - // Setup 3 simulated transactions with mock cohorts backed by real cohorts. 
- - final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification1 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - final TransactionIdentifier transactionID2 = nextTransactionId(); - final MutableCompositeModification modification2 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), - modification2); - final TransactionIdentifier transactionID3 = nextTransactionId(); - final MutableCompositeModification modification3 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), - modification3); + + Map cohortMap = setupCohortDecorator( + shard.underlyingActor(), transactionID1, transactionID2, transactionID3); + final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1); + final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2); + final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3); final long timeoutSec = 5; final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); final Timeout timeout = new Timeout(duration); - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); + shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef()); final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.class)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); @@ -558,10 +555,15 @@ public class ShardTest extends AbstractShardTest { expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); + // Ready 2 more Tx's. + + shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef()); + shard.tell(prepareBatchedModifications(transactionID3, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the next 2 Tx's. 
These should get queued and @@ -655,16 +657,18 @@ public class ShardTest extends AbstractShardTest { assertEquals("Commits complete", true, done); - final InOrder inOrder = inOrder(cohort1, cohort2, cohort3); - inOrder.verify(cohort1).canCommit(); - inOrder.verify(cohort1).preCommit(); - inOrder.verify(cohort1).commit(); - inOrder.verify(cohort2).canCommit(); - inOrder.verify(cohort2).preCommit(); - inOrder.verify(cohort2).commit(); - inOrder.verify(cohort3).canCommit(); - inOrder.verify(cohort3).preCommit(); - inOrder.verify(cohort3).commit(); + final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(), + cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(), + cohort3.getPreCommit(), cohort3.getCommit()); + inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class)); + inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class)); + inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class)); + inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class)); + inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class)); + inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class)); + inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class)); + inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class)); + inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class)); // Verify data in the data store. @@ -686,17 +690,6 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier transactionID = nextTransactionId(); final FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); - final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> { - if(mockCohort.get() == null) { - mockCohort.set(createDelegatingMockCohort("cohort", actual)); - } - - return mockCohort.get(); - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - // Send a BatchedModifications to start a transaction. shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, @@ -721,16 +714,11 @@ public class ShardTest extends AbstractShardTest { expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - // Send the CanCommitTransaction message. + // Send the CommitTransaction message. shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration, CommitTransactionReply.class); - final InOrder inOrder = inOrder(mockCohort.get()); - inOrder.verify(mockCohort.get()).canCommit(); - inOrder.verify(mockCohort.get()).preCommit(); - inOrder.verify(mockCohort.get()).commit(); - // Verify data in the data store. verifyOuterListEntry(shard, 1); @@ -749,17 +737,6 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier transactionID = nextTransactionId(); final FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); - final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> { - if(mockCohort.get() == null) { - mockCohort.set(createDelegatingMockCohort("cohort", actual)); - } - - return mockCohort.get(); - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - // Send a BatchedModifications to start a transaction. 
shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, @@ -778,11 +755,6 @@ public class ShardTest extends AbstractShardTest { expectMsgClass(duration, CommitTransactionReply.class); - final InOrder inOrder = inOrder(mockCohort.get()); - inOrder.verify(mockCohort.get()).canCommit(); - inOrder.verify(mockCohort.get()).preCommit(); - inOrder.verify(mockCohort.get()).commit(); - // Verify data in the data store. verifyOuterListEntry(shard, 1); @@ -956,8 +928,8 @@ public class ShardTest extends AbstractShardTest { Failure failure = expectMsgClass(Failure.class); assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); - shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), txId, - DataStoreVersions.CURRENT_VERSION, true), getRef()); + shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); failure = expectMsgClass(Failure.class); assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); @@ -985,24 +957,16 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - final TransactionIdentifier transactionID = nextTransactionId(); - final MutableCompositeModification modification = new MutableCompositeModification(); final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, - TestModel.TEST_PATH, containerNode, modification); - - final FiniteDuration duration = duration("5 seconds"); - - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef()); - - expectMsgClass(duration, CommitTransactionReply.class); + if(readWrite) { + shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, + containerNode, true), getRef()); + } else { + shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef()); + } - final InOrder inOrder = inOrder(cohort); - inOrder.verify(cohort).canCommit(); - inOrder.verify(cohort).preCommit(); - inOrder.verify(cohort).commit(); + expectMsgClass(duration("5 seconds"), CommitTransactionReply.class); final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); @@ -1085,36 +1049,21 @@ public class ShardTest extends AbstractShardTest { @Test public void testReadWriteCommitWithPersistenceDisabled() throws Throwable { - testCommitWithPersistenceDisabled(true); - } - - @Test - public void testWriteOnlyCommitWithPersistenceDisabled() throws Throwable { - testCommitWithPersistenceDisabled(true); - } - - private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable { dataStoreContextBuilder.persistent(false); new ShardTestKit(getSystem()) {{ final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitWithPersistenceDisabled-" + readWrite); + "testCommitWithPersistenceDisabled"); waitUntilLeader(shard); - final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - // Setup a simulated transactions with a mock cohort. 
- final TransactionIdentifier transactionID = nextTransactionId(); - final MutableCompositeModification modification = new MutableCompositeModification(); - final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, - TestModel.TEST_PATH, containerNode, modification); - final FiniteDuration duration = duration("5 seconds"); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); + final TransactionIdentifier transactionID = nextTransactionId(); + final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1129,11 +1078,6 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration, CommitTransactionReply.class); - final InOrder inOrder = inOrder(cohort); - inOrder.verify(cohort).canCommit(); - inOrder.verify(cohort).preCommit(); - inOrder.verify(cohort).commit(); - final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); }}; @@ -1152,153 +1096,134 @@ public class ShardTest extends AbstractShardTest { private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){ // Note that persistence is enabled which would normally result in the entry getting written to the journal // but here that need not happen - new ShardTestKit(getSystem()) { - { - final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitWhenTransactionHasNoModifications-" + readWrite); + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = actorFactory.createTestActor( + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCommitWhenTransactionHasNoModifications-" + readWrite); - waitUntilLeader(shard); + waitUntilLeader(shard); - final TransactionIdentifier transactionID = nextTransactionId(); - final MutableCompositeModification modification = new MutableCompositeModification(); - final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); - doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate(); + final TransactionIdentifier transactionID = nextTransactionId(); - final FiniteDuration duration = duration("5 seconds"); + final FiniteDuration duration = duration("5 seconds"); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); - expectMsgClass(duration, ReadyTransactionReply.class); + if(readWrite) { + ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore(). 
+ newReadWriteTransaction(transactionID); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef()); + } else { + shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()), getRef()); + } - // Send the CanCommitTransaction message. + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); - final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.class)); - assertEquals("Can commit", true, canCommitReply.getCanCommit()); + // Send the CanCommitTransaction message. - shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.class); + shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.class)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); - final InOrder inOrder = inOrder(cohort); - inOrder.verify(cohort).canCommit(); - inOrder.verify(cohort).preCommit(); - inOrder.verify(cohort).commit(); + shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); - shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); - final ShardStats shardStats = expectMsgClass(duration, ShardStats.class); + shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); + final ShardStats shardStats = expectMsgClass(duration, ShardStats.class); - // Use MBean for verification - // Committed transaction count should increase as usual - assertEquals(1,shardStats.getCommittedTransactionsCount()); + // Use MBean for verification + // Committed transaction count should increase as usual + assertEquals(1,shardStats.getCommittedTransactionsCount()); - // Commit index should not advance because this does not go into the journal - assertEquals(-1, shardStats.getCommitIndex()); - } - }; + // Commit index should not advance because this does not go into the journal + assertEquals(-1, shardStats.getCommitIndex()); + }}; } @Test - public void testReadWriteCommitWhenTransactionHasModifications() { + public void testReadWriteCommitWhenTransactionHasModifications() throws Exception { testCommitWhenTransactionHasModifications(true); } @Test - public void testWriteOnlyCommitWhenTransactionHasModifications() { + public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception { testCommitWhenTransactionHasModifications(false); } - private void testCommitWhenTransactionHasModifications(final boolean readWrite){ - new ShardTestKit(getSystem()) { - { - final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitWhenTransactionHasModifications-" + readWrite); + private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception { + new ShardTestKit(getSystem()) {{ + final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final TestActorRef shard = actorFactory.createTestActor( + newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCommitWhenTransactionHasModifications-" + readWrite); - waitUntilLeader(shard); + 
waitUntilLeader(shard); - final TransactionIdentifier transactionID = nextTransactionId(); - final MutableCompositeModification modification = new MutableCompositeModification(); - modification.addModification(new DeleteModification(YangInstanceIdentifier.EMPTY)); - final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); - doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate(); + final FiniteDuration duration = duration("5 seconds"); + final TransactionIdentifier transactionID = nextTransactionId(); - final FiniteDuration duration = duration("5 seconds"); + if(readWrite) { + shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef()); + } else { + shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef()); + } - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); - expectMsgClass(duration, ReadyTransactionReply.class); + expectMsgClass(duration, ReadyTransactionReply.class); - // Send the CanCommitTransaction message. + // Send the CanCommitTransaction message. - shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); - final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.class)); - assertEquals("Can commit", true, canCommitReply.getCanCommit()); + shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.class)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); - shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.class); + shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); - final InOrder inOrder = inOrder(cohort); - inOrder.verify(cohort).canCommit(); - inOrder.verify(cohort).preCommit(); - inOrder.verify(cohort).commit(); + final InOrder inOrder = inOrder(dataTree); + inOrder.verify(dataTree).validate(any(DataTreeModification.class)); + inOrder.verify(dataTree).prepare(any(DataTreeModification.class)); + inOrder.verify(dataTree).commit(any(DataTreeCandidate.class)); - shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); - final ShardStats shardStats = expectMsgClass(duration, ShardStats.class); + shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); + final ShardStats shardStats = expectMsgClass(duration, ShardStats.class); - // Use MBean for verification - // Committed transaction count should increase as usual - assertEquals(1, shardStats.getCommittedTransactionsCount()); + // Use MBean for verification + // Committed transaction count should increase as usual + assertEquals(1, shardStats.getCommittedTransactionsCount()); - // Commit index should advance as we do not have an empty modification - assertEquals(0, shardStats.getCommitIndex()); - } 
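The InOrder checks on the mocked data tree above spell out how the shard's canCommit/preCommit/commit phases map onto the yangtools DataTree API. Stripped of actor plumbing, the sequence is roughly the following, where `modification` stands for a readied DataTreeModification (error handling omitted):

// canCommit: validate the staged modification against the current tree;
// throws DataValidationFailedException if it no longer applies.
dataTree.validate(modification);
// preCommit: compute the candidate that applying the modification would produce.
final DataTreeCandidate candidate = dataTree.prepare(modification);
// commit: apply the candidate to the in-memory tree.
dataTree.commit(candidate);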
- }; + // Commit index should advance as we do not have an empty modification + assertEquals(0, shardStats.getCommitIndex()); + }}; } @Test public void testCommitPhaseFailure() throws Throwable { - testCommitPhaseFailure(true); - testCommitPhaseFailure(false); - } - - private void testCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ + final TipProducingDataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitPhaseFailure-" + readWrite); + newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCommitPhaseFailure"); waitUntilLeader(shard); + final FiniteDuration duration = duration("5 seconds"); + final Timeout timeout = new Timeout(duration); + // Setup 2 simulated transactions with mock cohorts. The first one fails in the // commit phase. - final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification1 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); - doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); - doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit(); - doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate(); - - final TransactionIdentifier transactionID2 = nextTransactionId(); - final MutableCompositeModification modification2 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); - - final FiniteDuration duration = duration("5 seconds"); - final Timeout timeout = new Timeout(duration); + doThrow(new RuntimeException("mock commit failure")).when(dataTree).commit(any(DataTreeCandidate.class)); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); + final TransactionIdentifier transactionID1 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); + final TransactionIdentifier transactionID2 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. 
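createDelegatingMockDataTree() is a helper added alongside these tests; the pattern it relies on is a Mockito mock that forwards the commit-phase calls to a real in-memory tree, so that a single phase can later be overridden with doThrow(...), as done for commit() above. A sketch under that assumption (only the three phases are delegated here; a real helper would presumably also delegate setSchemaContext() and takeSnapshot()):

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

// Sketch only: delegates validate/prepare/commit to a real tree supplied by the caller.
static TipProducingDataTree createDelegatingMockDataTree(final TipProducingDataTree realTree) throws Exception {
    final TipProducingDataTree mockTree = mock(TipProducingDataTree.class);

    doAnswer(invocation -> {
        realTree.validate((DataTreeModification) invocation.getArguments()[0]);
        return null;
    }).when(mockTree).validate(any(DataTreeModification.class));

    doAnswer(invocation -> realTree.prepare((DataTreeModification) invocation.getArguments()[0]))
        .when(mockTree).prepare(any(DataTreeModification.class));

    doAnswer(invocation -> {
        realTree.commit((DataTreeCandidate) invocation.getArguments()[0]);
        return null;
    }).when(mockTree).commit(any(DataTreeCandidate.class));

    return mockTree;
}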
@@ -1332,46 +1257,37 @@ public class ShardTest extends AbstractShardTest { assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS)); - final InOrder inOrder = inOrder(cohort1, cohort2); - inOrder.verify(cohort1).canCommit(); - inOrder.verify(cohort1).preCommit(); - inOrder.verify(cohort1).commit(); - inOrder.verify(cohort2).canCommit(); + final InOrder inOrder = inOrder(dataTree); + inOrder.verify(dataTree).validate(any(DataTreeModification.class)); + inOrder.verify(dataTree).prepare(any(DataTreeModification.class)); + inOrder.verify(dataTree).commit(any(DataTreeCandidate.class)); + inOrder.verify(dataTree).validate(any(DataTreeModification.class)); }}; } @Test public void testPreCommitPhaseFailure() throws Throwable { - testPreCommitPhaseFailure(true); - testPreCommitPhaseFailure(false); - } - - private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ + final TipProducingDataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testPreCommitPhaseFailure-" + readWrite); + newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testPreCommitPhaseFailure"); waitUntilLeader(shard); - final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification1 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); - doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit(); - - final TransactionIdentifier transactionID2 = nextTransactionId(); - final MutableCompositeModification modification2 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); - final FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); + doThrow(new RuntimeException("mock preCommit failure")).when(dataTree).prepare(any(DataTreeModification.class)); + + final TransactionIdentifier transactionID1 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); + final TransactionIdentifier transactionID2 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. 
@@ -1405,35 +1321,31 @@ public class ShardTest extends AbstractShardTest { assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS)); - final InOrder inOrder = inOrder(cohort1, cohort2); - inOrder.verify(cohort1).canCommit(); - inOrder.verify(cohort1).preCommit(); - inOrder.verify(cohort2).canCommit(); + final InOrder inOrder = inOrder(dataTree); + inOrder.verify(dataTree).validate(any(DataTreeModification.class)); + inOrder.verify(dataTree).prepare(any(DataTreeModification.class)); + inOrder.verify(dataTree).validate(any(DataTreeModification.class)); }}; } @Test public void testCanCommitPhaseFailure() throws Throwable { - testCanCommitPhaseFailure(true); - testCanCommitPhaseFailure(false); - } - - private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ + final TipProducingDataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCanCommitPhaseFailure-" + readWrite); + newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCanCommitPhaseFailure"); waitUntilLeader(shard); final FiniteDuration duration = duration("5 seconds"); - final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification = new MutableCompositeModification(); - final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); - doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef()); + doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")). + doNothing().when(dataTree).validate(any(DataTreeModification.class)); + + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1443,12 +1355,9 @@ public class ShardTest extends AbstractShardTest { // Send another can commit to ensure the failed one got cleaned up. 
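The doThrow(...).doNothing() chain above is what lets a single mock serve both transactions: Mockito consumes one chained action per invocation and then repeats the last one, so the first validate() call (the first Tx's canCommit) throws while every later call succeeds. In isolation:

// First call throws, failing the first Tx's canCommit; subsequent calls are no-ops.
doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
    .doNothing()
    .when(dataTree).validate(any(DataTreeModification.class));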
- reset(cohort); - final TransactionIdentifier transactionID2 = nextTransactionId(); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef()); + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); @@ -1458,54 +1367,6 @@ public class ShardTest extends AbstractShardTest { }}; } - @Test - public void testCanCommitPhaseFalseResponse() throws Throwable { - testCanCommitPhaseFalseResponse(true); - testCanCommitPhaseFalseResponse(false); - } - - private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable { - new ShardTestKit(getSystem()) {{ - final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCanCommitPhaseFalseResponse-" + readWrite); - - waitUntilLeader(shard); - - final FiniteDuration duration = duration("5 seconds"); - - final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification = new MutableCompositeModification(); - final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); - doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); - - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef()); - expectMsgClass(duration, ReadyTransactionReply.class); - - // Send the CanCommitTransaction message. - - shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); - CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(CanCommitTransactionReply.class)); - assertEquals("getCanCommit", false, reply.getCanCommit()); - - // Send another can commit to ensure the failed one got cleaned up. 
- - reset(cohort); - - final TransactionIdentifier transactionID2 = nextTransactionId(); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef()); - expectMsgClass(duration, ReadyTransactionReply.class); - - shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); - reply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(CanCommitTransactionReply.class)); - assertEquals("getCanCommit", true, reply.getCanCommit()); - }}; - } - @Test public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable { testImmediateCommitWithCanCommitPhaseFailure(true); @@ -1514,137 +1375,81 @@ public class ShardTest extends AbstractShardTest { private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ + final TipProducingDataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite); waitUntilLeader(shard); + doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")). + doNothing().when(dataTree).validate(any(DataTreeModification.class)); + final FiniteDuration duration = duration("5 seconds"); final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification = new MutableCompositeModification(); - final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); - doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef()); + if(readWrite) { + shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + } else { + shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + } expectMsgClass(duration, akka.actor.Status.Failure.class); // Send another can commit to ensure the failed one got cleaned up. 
- reset(cohort); - final TransactionIdentifier transactionID2 = nextTransactionId(); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); - doReturn(Futures.immediateFuture(null)).when(cohort).commit(); - final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); - final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); - doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); - doReturn(candidateRoot).when(candidate).getRootNode(); - doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath(); - doReturn(candidate).when(cohort).getCandidate(); - - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); + if(readWrite) { + shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + } else { + shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + } expectMsgClass(duration, CommitTransactionReply.class); }}; } + @SuppressWarnings("serial") @Test - public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable { - testImmediateCommitWithCanCommitPhaseFalseResponse(true); - testImmediateCommitWithCanCommitPhaseFalseResponse(false); - } - - private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable { + public void testAbortWithCommitPending() throws Throwable { new ShardTestKit(getSystem()) {{ - final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite); - - waitUntilLeader(shard); - - final FiniteDuration duration = duration("5 seconds"); - - final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification = new MutableCompositeModification(); - final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); - doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); - - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef()); - - expectMsgClass(duration, akka.actor.Status.Failure.class); - - // Send another can commit to ensure the failed one got cleaned up. 
- - reset(cohort); - - final TransactionIdentifier transactionID2 = nextTransactionId(); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); - doReturn(Futures.immediateFuture(null)).when(cohort).commit(); - final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); - final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); - doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); - doReturn(candidateRoot).when(candidate).getRootNode(); - doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath(); - doReturn(candidate).when(cohort).getCandidate(); - - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); - - expectMsgClass(duration, CommitTransactionReply.class); - }}; - } + final Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new Shard(newShardBuilder()) { + @Override + void persistPayload(final TransactionIdentifier transactionId, final Payload payload) { + // Simulate an AbortTransaction message occurring during replication, after + // persisting and before finishing the commit to the in-memory store. - @Test - public void testAbortBeforeFinishCommit() throws Throwable { - testAbortBeforeFinishCommit(true); - testAbortBeforeFinishCommit(false); - } + doAbortTransaction(transactionId, null); + super.persistPayload(transactionId, payload); + } + }; + } + }; - private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable { - new ShardTestKit(getSystem()) {{ final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testAbortBeforeFinishCommit-" + readWrite); + Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), + "testAbortWithCommitPending"); waitUntilLeader(shard); final FiniteDuration duration = duration("5 seconds"); - final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); final TransactionIdentifier transactionID = nextTransactionId(); - final Function> preCommit = - cohort -> { - final ListenableFuture preCommitFuture = cohort.preCommit(); - - // Simulate an AbortTransaction message occurring during replication, after - // persisting and before finishing the commit to the in-memory store. - // We have no followers so due to optimizations in the RaftActor, it does not - // attempt replication and thus we can't send an AbortTransaction message b/c - // it would be processed too late after CommitTransaction completes. So we'll - // simulate an AbortTransaction message occurring during replication by calling - // the shard directly. 
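The removed comments above explain why the abort had to be injected synchronously. The replacement test, testAbortWithCommitPending earlier in this hunk, gets the same interleaving by overriding Shard.persistPayload() so the abort lands around the persistence step, before the in-memory commit finishes. The essential hook, condensed from that hunk:

@Override
void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
    // Simulate an AbortTransaction arriving during replication, then continue persisting.
    doAbortTransaction(transactionId, null);
    super.persistPayload(transactionId, payload);
}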
- // - shard.underlyingActor().doAbortTransaction(transactionID, null); - - return preCommitFuture; - }; - - final MutableCompositeModification modification = new MutableCompositeModification(); - final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), - modification, preCommit); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); + shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); - final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.class)); - assertEquals("Can commit", true, canCommitReply.getCanCommit()); + expectMsgClass(duration, CanCommitTransactionReply.class); shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration, CommitTransactionReply.class); @@ -1660,55 +1465,33 @@ public class ShardTest extends AbstractShardTest { @Test public void testTransactionCommitTimeout() throws Throwable { - testTransactionCommitTimeout(true); - testTransactionCommitTimeout(false); - } - - private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); - new ShardTestKit(getSystem()) {{ final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testTransactionCommitTimeout-" + readWrite); + "testTransactionCommitTimeout"); waitUntilLeader(shard); final FiniteDuration duration = duration("5 seconds"); - final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); writeToStore(shard, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - // Create 1st Tx - will timeout + // Ready 2 Tx's - the first will timeout final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification1 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), - modification1); - - // Create 2nd Tx + shard.tell(prepareBatchedModifications(transactionID1, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); final TransactionIdentifier transactionID2 = nextTransactionId(); - final MutableCompositeModification modification2 = new MutableCompositeModification(); final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - final ShardDataTreeCohort cohort2 = 
setupMockWriteTransaction("cohort3", dataStore, - listNodePath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), - modification2); - - // Ready the Tx's - - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); - expectMsgClass(duration, ReadyTransactionReply.class); - - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); + shard.tell(prepareBatchedModifications(transactionID2, listNodePath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1736,71 +1519,73 @@ public class ShardTest extends AbstractShardTest { }}; } - @Test - public void testTransactionCommitQueueCapacityExceeded() throws Throwable { - dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2); - - new ShardTestKit(getSystem()) {{ - final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testTransactionCommitQueueCapacityExceeded"); - - waitUntilLeader(shard); - - final FiniteDuration duration = duration("5 seconds"); - - final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - - final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification1 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - - final TransactionIdentifier transactionID2 = nextTransactionId(); - final MutableCompositeModification modification2 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), - modification2); - - final TransactionIdentifier transactionID3 = nextTransactionId(); - final MutableCompositeModification modification3 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); - - // Ready the Tx's - - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); - expectMsgClass(duration, ReadyTransactionReply.class); - - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); - expectMsgClass(duration, ReadyTransactionReply.class); - - // The 3rd Tx should exceed queue capacity and fail. - - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef()); - expectMsgClass(duration, akka.actor.Status.Failure.class); - - // canCommit 1st Tx. - - shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); - expectMsgClass(duration, CanCommitTransactionReply.class); - - // canCommit the 2nd Tx - it should get queued. 
- - shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); - - // canCommit the 3rd Tx - should exceed queue capacity and fail. - - shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef()); - expectMsgClass(duration, akka.actor.Status.Failure.class); - }}; - } +// @Test +// @Ignore +// public void testTransactionCommitQueueCapacityExceeded() throws Throwable { +// dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2); +// +// new ShardTestKit(getSystem()) {{ +// final TestActorRef shard = actorFactory.createTestActor( +// newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), +// "testTransactionCommitQueueCapacityExceeded"); +// +// waitUntilLeader(shard); +// +// final FiniteDuration duration = duration("5 seconds"); +// +// final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); +// +// final TransactionIdentifier transactionID1 = nextTransactionId(); +// final MutableCompositeModification modification1 = new MutableCompositeModification(); +// final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, +// TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1, +// modification1); +// +// final TransactionIdentifier transactionID2 = nextTransactionId(); +// final MutableCompositeModification modification2 = new MutableCompositeModification(); +// final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, +// TestModel.OUTER_LIST_PATH, +// ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2, +// modification2); +// +// final TransactionIdentifier transactionID3 = nextTransactionId(); +// final MutableCompositeModification modification3 = new MutableCompositeModification(); +// final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, +// TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3, +// modification3); +// +// // Ready the Tx's +// +// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); +// expectMsgClass(duration, ReadyTransactionReply.class); +// +// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); +// expectMsgClass(duration, ReadyTransactionReply.class); +// +// // The 3rd Tx should exceed queue capacity and fail. +// +// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef()); +// expectMsgClass(duration, akka.actor.Status.Failure.class); +// +// // canCommit 1st Tx. +// +// shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); +// expectMsgClass(duration, CanCommitTransactionReply.class); +// +// // canCommit the 2nd Tx - it should get queued. +// +// shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); +// +// // canCommit the 3rd Tx - should exceed queue capacity and fail. 
+// +// shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef()); +// expectMsgClass(duration, akka.actor.Status.Failure.class); +// }}; +// } @Test public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable { - dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1); - + dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), @@ -1810,30 +1595,19 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification1 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); final TransactionIdentifier transactionID2 = nextTransactionId(); - final MutableCompositeModification modification2 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); - - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); final TransactionIdentifier transactionID3 = nextTransactionId(); - final MutableCompositeModification modification3 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, - TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3); - - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef()); + shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // All Tx's are readied. We'll send canCommit for the last one but not the others. 
The others @@ -1846,8 +1620,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable { - dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1); - + dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), @@ -1860,14 +1633,11 @@ public class ShardTest extends AbstractShardTest { final ShardDataTree dataStore = shard.underlyingActor().getDataStore(); final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification1 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); + shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - // CanCommit the first one so it's the current in-progress CohortEntry. + // CanCommit the first Tx so it's the current in-progress Tx. shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration, CanCommitTransactionReply.class); @@ -1875,11 +1645,8 @@ public class ShardTest extends AbstractShardTest { // Ready the second Tx. final TransactionIdentifier transactionID2 = nextTransactionId(); - final MutableCompositeModification modification2 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); - - shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); + shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Ready the third Tx. @@ -1888,9 +1655,8 @@ public class ShardTest extends AbstractShardTest { final DataTreeModification modification3 = dataStore.newModification(); new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME)) .apply(modification3); - modification3.ready(); + modification3.ready(); final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true); - shard.tell(readyMessage, getRef()); // Commit the first Tx. 
After completing, the second should expire from the queue and the third @@ -1921,45 +1687,33 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testAbortCurrentTransaction() throws Throwable { - testAbortCurrentTransaction(true); - testAbortCurrentTransaction(false); - } - - private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable { + public void testAbortAfterCanCommit() throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testAbortCurrentTransaction-" + readWrite); + "testAbortAfterCanCommit"); waitUntilLeader(shard); - // Setup 2 simulated transactions with mock cohorts. The first one will be aborted. - - final TransactionIdentifier transactionID1 = nextTransactionId(); - final MutableCompositeModification modification1 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); - doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); - - final TransactionIdentifier transactionID2 = nextTransactionId(); - final MutableCompositeModification modification2 = new MutableCompositeModification(); - final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); - final FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); + // Ready 2 transactions - the first one will be aborted. + + final TransactionIdentifier transactionID1 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); + final TransactionIdentifier transactionID2 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); - final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); @@ -1977,78 +1731,101 @@ public class ShardTest extends AbstractShardTest { // Wait for the 2nd Tx to complete the canCommit phase. 
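canCommitFuture is created in unchanged context outside this hunk, presumably via Patterns.ask of the second CanCommitTransaction; the change below moves from Await.ready to Await.result so the reply itself can be asserted. The assumed ask/await shape, reusing the timeout declared earlier in the test:

// Assumed creation of the future (the actual line is outside this hunk).
final scala.concurrent.Future<Object> canCommitFuture = Patterns.ask(shard,
        new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
// Block for the reply and assert on it, as the new code does.
final CanCommitTransactionReply canCommitReply =
        (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
assertEquals("Can commit", true, canCommitReply.getCanCommit());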
- Await.ready(canCommitFuture, duration); - - final InOrder inOrder = inOrder(cohort1, cohort2); - inOrder.verify(cohort1).canCommit(); - inOrder.verify(cohort2).canCommit(); + canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); }}; } @Test - public void testAbortQueuedTransaction() throws Throwable { - testAbortQueuedTransaction(true); - testAbortQueuedTransaction(false); - } - - private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable { + public void testAbortAfterReady() throws Throwable { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ - final AtomicReference cleaupCheckLatch = new AtomicReference<>(); - @SuppressWarnings("serial") - final Creator creator = () -> new Shard(newShardBuilder()) { - @Override - public void handleCommand(final Object message) { - super.handleCommand(message); - if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) { - if(cleaupCheckLatch.get() != null) { - cleaupCheckLatch.get().countDown(); - } - } - } - }; - final TestActorRef shard = actorFactory.createTestActor( - Props.create(new DelegatingShardCreator(creator)).withDispatcher( - Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite); + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady"); waitUntilLeader(shard); - final TransactionIdentifier transactionID = nextTransactionId(); - final MutableCompositeModification modification = new MutableCompositeModification(); - final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort"); - doReturn(Futures.immediateFuture(null)).when(cohort).abort(); - final FiniteDuration duration = duration("5 seconds"); - // Ready the tx. + // Ready a tx. - shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); + final TransactionIdentifier transactionID1 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize()); - // Send the AbortTransaction message. - shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration, AbortTransactionReply.class); - verify(cohort).abort(); - - // Verify the tx cohort is removed from queue at the cleanup check interval. - - cleaupCheckLatch.set(new CountDownLatch(1)); - assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true, - cleaupCheckLatch.get().await(5, TimeUnit.SECONDS)); - assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize()); // Now send CanCommitTransaction - should fail. - shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); - + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause(); assertTrue("Failure type", failure instanceof IllegalStateException); + + // Ready and CanCommit another and verify success. 
+ + final TransactionIdentifier transactionID2 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.class); + }}; + } + + @Test + public void testAbortQueuedTransaction() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = actorFactory.createTestActor( + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + // Ready 3 tx's. + + final TransactionIdentifier transactionID1 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + final TransactionIdentifier transactionID2 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + final TransactionIdentifier transactionID3 = nextTransactionId(); + shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // Abort the second tx while it's queued. + + shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, AbortTransactionReply.class); + + // Commit the other 2. 
+ + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.class); + + shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); + + shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.class); + + shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); + + assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize()); }}; } @@ -2084,7 +1861,7 @@ public class ShardTest extends AbstractShardTest { new ShardTestKit(getSystem()) {{ class TestShard extends Shard { - protected TestShard(AbstractBuilder builder) { + protected TestShard(final AbstractBuilder builder) { super(builder); setPersistence(new TestPersistentDataProvider(super.persistence())); } @@ -2125,7 +1902,7 @@ public class ShardTest extends AbstractShardTest { awaitAndValidateSnapshot(expectedRoot); } - private void awaitAndValidateSnapshot(NormalizedNode expectedRoot) throws InterruptedException, IOException { + private void awaitAndValidateSnapshot(final NormalizedNode expectedRoot) throws InterruptedException, IOException { assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); assertTrue("Invalid saved snapshot " + savedSnapshot.get(), @@ -2371,8 +2148,6 @@ public class ShardTest extends AbstractShardTest { waitUntilNoLeader(shard); - final YangInstanceIdentifier path = TestModel.TEST_PATH; - shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), RegisterDataTreeChangeListenerReply.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index de832c0b60..dae71b96cc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -13,6 +13,7 @@ import akka.actor.Props; import akka.testkit.TestActorRef; import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; @@ -39,7 +40,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest { private static final TransactionType RW = TransactionType.READ_WRITE; private static final TransactionType WO = TransactionType.WRITE_ONLY; - private static final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL); + private static final Shard mockShard = Mockito.mock(Shard.class); + + private static final ShardDataTree store = new ShardDataTree(mockShard, testSchemaContext, TreeType.OPERATIONAL); private static final ShardIdentifier SHARD_IDENTIFIER = 
ShardIdentifier.create("inventory", MemberName.forName("member-1"), "operational"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 55060d155f..917c374c7b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -17,17 +17,16 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Status.Failure; import akka.actor.Terminated; +import akka.dispatch.Dispatchers; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import java.util.concurrent.TimeUnit; -import org.junit.Ignore; +import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -41,19 +40,17 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class ShardTransactionTest extends AbstractActorTest { - private static final SchemaContext testSchemaContext = TestModel.createTestContext(); private static final TransactionType RO = TransactionType.READ_ONLY; private static final TransactionType RW = TransactionType.READ_WRITE; private static final TransactionType WO = TransactionType.WRITE_ONLY; @@ -61,28 +58,24 @@ public class ShardTransactionTest extends AbstractActorTest { private static final ShardIdentifier SHARD_IDENTIFIER = ShardIdentifier.create("inventory", MEMBER_NAME, "config"); + private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build(); - private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build(); + private final TestActorFactory 
actorFactory = new TestActorFactory(getSystem()); - private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore"); + private TestActorRef shard; + private ShardDataTree store; - private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL); - - private ActorRef createShard() { - ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext). - schemaContext(TestModel.createTestContext()).props()); + @Before + public void setUp() { + shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext). + schemaContext(TestModel.createTestContext()).props().withDispatcher(Dispatchers.DefaultDispatcherId())); ShardTestKit.waitUntilLeader(shard); - return shard; - } - - private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction transaction, String name) { - return newTransactionActor(type, transaction, null, name); + store = shard.underlyingActor().getDataStore(); } - private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction transaction, ActorRef shard, String name) { - Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(), - datastoreContext, shardStats); - return getSystem().actorOf(props, name); + private ActorRef newTransactionActor(final TransactionType type, final AbstractShardDataTreeTransaction transaction, final String name) { + Props props = ShardTransaction.props(type, transaction, shard, datastoreContext, shard.underlyingActor().getShardMBean()); + return actorFactory.createActor(props, name); } private ReadOnlyShardDataTreeTransaction readOnlyTransaction() { @@ -96,11 +89,9 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); + testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO")); - testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO")); - - testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW")); + testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW")); } private void testOnReceiveReadData(final ActorRef transaction) { @@ -116,13 +107,11 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadDataWhenDataNotFound() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - testOnReceiveReadDataWhenDataNotFound(newTransactionActor( - RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO")); + RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO")); testOnReceiveReadDataWhenDataNotFound(newTransactionActor( - RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW")); + RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW")); } private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) { @@ -137,12 +126,10 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDataExistsPositive() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - - testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard, + testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO")); 
- testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard, + testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW")); } @@ -159,12 +146,10 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDataExistsNegative() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = createShard(); - - testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard, + testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO")); - testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard, + testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW")); } @@ -384,19 +369,6 @@ public class ShardTransactionTest extends AbstractActorTest { }}; } - // Unknown operations are being logged - @Ignore - @Test(expected=UnknownMessageException.class) - public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { - final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard, - datastoreContext, shardStats); - final TestActorRef transaction = TestActorRef.apply(props,getSystem()); - - transaction.receive(new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION), - ActorRef.noSender()); - } - @Test public void testShardTransactionInactivity() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java index 657d7b3e83..1830290d60 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java @@ -7,29 +7,31 @@ */ package org.opendaylight.controller.cluster.datastore; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.Collections; +import java.util.Optional; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import 
org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; +import scala.concurrent.Promise; /** * Unit tests for SimpleShardDataTreeCohort. @@ -37,106 +39,198 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree * @author Thomas Pantelis */ public class SimpleShardDataTreeCohortTest extends AbstractTest { - @Mock - private TipProducingDataTree mockDataTree; - @Mock private ShardDataTree mockShardDataTree; @Mock private DataTreeModification mockModification; + @Mock + private CompositeDataTreeCohort mockUserCohorts; + + @Mock + private FutureCallback mockPreCallback; + private SimpleShardDataTreeCohort cohort; @Before - public void setup() { + public void setup() throws Exception { MockitoAnnotations.initMocks(this); - doReturn(mockDataTree).when(mockShardDataTree).getDataTree(); + doNothing().when(mockUserCohorts).commit(); + doReturn(Optional.empty()).when(mockUserCohorts).abort(); - cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId()); + cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(), + mockUserCohorts); } @Test public void testCanCommitSuccess() throws Exception { - ListenableFuture future = cohort.canCommit(); - assertNotNull("Future is null", future); - assertEquals("Future", true, future.get(3, TimeUnit.SECONDS)); - verify(mockDataTree).validate(mockModification); + canCommitSuccess(); } - @Test(expected=OptimisticLockFailedException.class) - public void testCanCommitWithConflictingModEx() throws Throwable { - doThrow(new ConflictingModificationAppliedException(YangInstanceIdentifier.EMPTY, "mock")). - when(mockDataTree).validate(mockModification); - try { - cohort.canCommit().get(); - } catch (ExecutionException e) { - throw e.getCause(); - } + private void canCommitSuccess() { + doAnswer(invocation -> { + invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCanCommit(); + return null; + }).when(mockShardDataTree).startCanCommit(cohort); + + @SuppressWarnings("unchecked") + final FutureCallback callback = mock(FutureCallback.class); + cohort.canCommit(callback); + + verify(callback).onSuccess(null); + verifyNoMoreInteractions(callback); } - @Test(expected=TransactionCommitFailedException.class) - public void testCanCommitWithDataValidationEx() throws Throwable { - doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock")). 
- when(mockDataTree).validate(mockModification); - try { - cohort.canCommit().get(); - } catch (ExecutionException e) { - throw e.getCause(); - } + private void testValidatationPropagates(final Exception cause) throws DataValidationFailedException { + doAnswer(invocation -> { + invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).failedCanCommit(cause); + return null; + }).when(mockShardDataTree).startCanCommit(cohort); + + @SuppressWarnings("unchecked") + final FutureCallback callback = mock(FutureCallback.class); + cohort.canCommit(callback); + + verify(callback).onFailure(cause); + verifyNoMoreInteractions(callback); } - @Test(expected=IllegalArgumentException.class) - public void testCanCommitWithIllegalArgumentEx() throws Throwable { - doThrow(new IllegalArgumentException("mock")).when(mockDataTree).validate(mockModification); - try { - cohort.canCommit().get(); - } catch (ExecutionException e) { - throw e.getCause(); - } + @Test + public void testCanCommitWithConflictingModEx() throws DataValidationFailedException { + testValidatationPropagates(new ConflictingModificationAppliedException(YangInstanceIdentifier.EMPTY, "mock")); } @Test - public void testPreCommitAndCommitSuccess() throws Exception { - DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class); - doReturn(mockCandidate ).when(mockDataTree).prepare(mockModification); + public void testCanCommitWithDataValidationEx() throws DataValidationFailedException { + testValidatationPropagates(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock")); + } - ListenableFuture future = cohort.preCommit(); - assertNotNull("Future is null", future); - future.get(); - verify(mockDataTree).prepare(mockModification); + @Test + public void testCanCommitWithIllegalArgumentEx() throws DataValidationFailedException { + testValidatationPropagates(new IllegalArgumentException("mock")); + } + + private DataTreeCandidateTip preCommitSuccess() { + final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class); + doAnswer(invocation -> { + invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulPreCommit(mockCandidate); + return null; + }).when(mockShardDataTree).startPreCommit(cohort); + + @SuppressWarnings("unchecked") + final FutureCallback callback = mock(FutureCallback.class); + cohort.preCommit(callback); + + verify(callback).onSuccess(mockCandidate); + verifyNoMoreInteractions(callback); assertSame("getCandidate", mockCandidate, cohort.getCandidate()); - future = cohort.commit(); - assertNotNull("Future is null", future); - future.get(); - verify(mockDataTree).commit(mockCandidate); + return mockCandidate; + } + + @Test + public void testPreCommitAndCommitSuccess() throws Exception { + canCommitSuccess(); + final DataTreeCandidateTip candidate = preCommitSuccess(); + + doAnswer(invocation -> { + invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCommit(UnsignedLong.valueOf(0)); + return null; + }).when(mockShardDataTree).startCommit(cohort, candidate); + + @SuppressWarnings("unchecked") + final + FutureCallback mockCommitCallback = mock(FutureCallback.class); + cohort.commit(mockCommitCallback); + + verify(mockCommitCallback).onSuccess(any(UnsignedLong.class)); + verifyNoMoreInteractions(mockCommitCallback); + + verify(mockUserCohorts).commit(); } - @Test(expected=IllegalArgumentException.class) + @Test public void testPreCommitWithIllegalArgumentEx() throws Throwable { - doThrow(new 
IllegalArgumentException("mock")).when(mockDataTree).prepare(mockModification); - try { - cohort.preCommit().get(); - } catch (ExecutionException e) { - throw e.getCause(); - } + canCommitSuccess(); + + final Exception cause = new IllegalArgumentException("mock"); + doAnswer(invocation -> { + invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).failedPreCommit(cause); + return null; + }).when(mockShardDataTree).startPreCommit(cohort); + + @SuppressWarnings("unchecked") + final FutureCallback callback = mock(FutureCallback.class); + cohort.preCommit(callback); + + verify(callback).onFailure(cause); + verifyNoMoreInteractions(callback); + + verify(mockUserCohorts).abort(); } - @Test(expected=IllegalArgumentException.class) - public void testCommitWithIllegalArgumentEx() throws Throwable { - doThrow(new IllegalArgumentException("mock")).when(mockDataTree).commit(any(DataTreeCandidateTip.class)); - try { - cohort.commit().get(); - } catch (ExecutionException e) { - throw e.getCause(); - } + @Test + public void testPreCommitWithReportedFailure() throws Throwable { + canCommitSuccess(); + + final Exception cause = new IllegalArgumentException("mock"); + cohort.reportFailure(cause); + + @SuppressWarnings("unchecked") + final FutureCallback callback = mock(FutureCallback.class); + cohort.preCommit(callback); + + verify(callback).onFailure(cause); + verifyNoMoreInteractions(callback); + + verify(mockShardDataTree, never()).startPreCommit(cohort); + } + + @Test + public void testCommitWithIllegalArgumentEx() { + canCommitSuccess(); + final DataTreeCandidateTip candidate = preCommitSuccess(); + + final Exception cause = new IllegalArgumentException("mock"); + doAnswer(invocation -> { + invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).failedCommit(cause); + return null; + }).when(mockShardDataTree).startCommit(cohort, candidate); + + @SuppressWarnings("unchecked") + final FutureCallback callback = mock(FutureCallback.class); + cohort.commit(callback); + + verify(callback).onFailure(cause); + verifyNoMoreInteractions(callback); + + verify(mockUserCohorts).abort(); } @Test public void testAbort() throws Exception { + doNothing().when(mockShardDataTree).startAbort(cohort); + cohort.abort().get(); + + verify(mockShardDataTree).startAbort(cohort); + } + + @Test + public void testAbortWithCohorts() throws Exception { + doNothing().when(mockShardDataTree).startAbort(cohort); + + final Promise> cohortFuture = akka.dispatch.Futures.promise(); + doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort(); + + final ListenableFuture abortFuture = cohort.abort(); + + cohortFuture.success(Collections.emptyList()); + + abortFuture.get(); + verify(mockShardDataTree).startAbort(cohort); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java index 33fb4e3ddd..66bd0489a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java @@ -15,8 +15,12 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import 
akka.testkit.JavaTestKit; import com.google.common.collect.ImmutableSet; import java.util.concurrent.TimeUnit; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded; import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved; @@ -40,8 +44,16 @@ public class CandidateListChangeListenerTest extends AbstractActorTest { private static final YangInstanceIdentifier ENTITY_ID2 = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2")); - private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(), - TreeType.OPERATIONAL); + private ShardDataTree shardDataTree; + + @Mock + private Shard mockShard; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(), TreeType.OPERATIONAL); + } @Test public void testOnDataTreeChanged() throws Exception { @@ -95,11 +107,11 @@ public class CandidateListChangeListenerTest extends AbstractActorTest { ImmutableSet.copyOf(candidateRemoved.getRemainingCandidates())); } - private void writeNode(YangInstanceIdentifier path, NormalizedNode node) throws DataValidationFailedException { + private void writeNode(final YangInstanceIdentifier path, final NormalizedNode node) throws DataValidationFailedException { AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree); } - private void deleteNode(YangInstanceIdentifier path) throws DataValidationFailedException { + private void deleteNode(final YangInstanceIdentifier path) throws DataValidationFailedException { AbstractEntityOwnershipTest.deleteNode(path, shardDataTree); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java index 6d092cf869..52adc5af42 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java @@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; @@ -220,7 +221,9 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh DistributedEntityOwnershipService service = spy(DistributedEntityOwnershipService.start( dataStore.getActorContext(), 
EntityOwnerSelectionStrategyConfig.newBuilder().build())); - ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(), TreeType.OPERATIONAL); + final Shard mockShard = Mockito.mock(Shard.class); + ShardDataTree shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(), + TreeType.OPERATIONAL); when(service.getLocalEntityOwnershipShardDataTree()).thenReturn(shardDataTree.getDataTree()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java index 3c4fea0b59..17c6d11f68 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java @@ -19,7 +19,9 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; @@ -47,7 +49,9 @@ public class EntityOwnerChangeListenerTest { private static final DOMEntity ENTITY1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1); private static final DOMEntity ENTITY2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2); - private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(), + private final Shard mockShard = Mockito.mock(Shard.class); + + private final ShardDataTree shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(), TreeType.OPERATIONAL); private final EntityOwnershipListenerSupport mockListenerSupport = mock(EntityOwnershipListenerSupport.class); private EntityOwnerChangeListener listener; @@ -133,7 +137,7 @@ public class EntityOwnerChangeListenerTest { anyBoolean(), anyBoolean()); } - private void writeNode(YangInstanceIdentifier path, NormalizedNode node) throws DataValidationFailedException { + private void writeNode(final YangInstanceIdentifier path, final NormalizedNode node) throws DataValidationFailedException { AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index d3c3d1d48a..af347903f8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -592,7 +592,8 @@ public class 
EntityOwnershipShardTest extends AbstractEntityOwnershipTest { TestActorRef leader = actorFactory.createTestActor(newShardProps(leaderId, ImmutableMap.builder().put(localId.toString(), shard.path().toString()).build(), - LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString()); + leaderId.getMemberName().getName(), EntityOwnerSelectionStrategyConfig.newBuilder().build()) + .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString()); leader.tell(TimeoutNow.INSTANCE, leader); ShardTestKit.waitUntilLeader(leader); @@ -833,7 +834,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { TestEntityOwnershipShard(ShardIdentifier name, Map peerAddresses, DatastoreContext datastoreContext) { super(newBuilder().id(name).peerAddresses(peerAddresses).datastoreContext(datastoreContext). - schemaContext(SCHEMA_CONTEXT).localMemberName(MemberName.forName(LOCAL_MEMBER_NAME))); + schemaContext(SCHEMA_CONTEXT).localMemberName(name.getMemberName())); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipStatisticsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipStatisticsTest.java index a621ac65ab..320e830450 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipStatisticsTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipStatisticsTest.java @@ -16,7 +16,9 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import java.util.Map; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.yangtools.yang.common.QName; @@ -35,7 +37,9 @@ public class EntityOwnershipStatisticsTest extends AbstractActorTest { private static final YangInstanceIdentifier ENTITY_ID2 = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2")); - private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(), + private final Shard mockShard = Mockito.mock(Shard.class); + + private final ShardDataTree shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(), TreeType.OPERATIONAL); private EntityOwnershipStatistics ownershipStatistics; @@ -132,11 +136,11 @@ public class EntityOwnershipStatisticsTest extends AbstractActorTest { } - private static void assertStatistics(Map> statistics, String memberName, long val) { + private static void assertStatistics(final Map> statistics, final String memberName, final long val) { assertEquals(val, statistics.get(ENTITY_TYPE).get(memberName).longValue()); } - private void writeNode(YangInstanceIdentifier path, NormalizedNode node) throws DataValidationFailedException { + private void writeNode(final YangInstanceIdentifier path, final NormalizedNode node) throws DataValidationFailedException { AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree); 
} } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModificationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModificationTest.java index 7bf7d50086..c078c94637 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModificationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModificationTest.java @@ -30,7 +30,9 @@ import java.lang.reflect.Method; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; @@ -80,7 +82,7 @@ public class PruningDataTreeModificationTest { realModification = dataTree.takeSnapshot().newModification(); proxyModification = Reflection.newProxy(DataTreeModification.class, new InvocationHandler() { @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { try { method.invoke(mockModification, args); return method.invoke(realModification, args); @@ -201,8 +203,10 @@ public class PruningDataTreeModificationTest { } @Test - public void testWriteRootNodeWithInvalidChild() throws Exception{ - ShardDataTree shardDataTree = new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION); + public void testWriteRootNodeWithInvalidChild() throws Exception { + final Shard mockShard = Mockito.mock(Shard.class); + + ShardDataTree shardDataTree = new ShardDataTree(mockShard, SCHEMA_CONTEXT, TreeType.CONFIGURATION); NormalizedNode root = shardDataTree.readNode(YangInstanceIdentifier.EMPTY).get(); NormalizedNode normalizedNode = ImmutableContainerNodeBuilder.create().withNodeIdentifier( -- 2.36.6
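Note (illustrative, not part of the patch): the reworked SimpleShardDataTreeCohortTest above exercises the callback-driven cohort API, where canCommit completes through a FutureCallback that receives null on success, preCommit hands back the prepared DataTreeCandidate, and commit reports an UnsignedLong on success. The sketch below shows how a caller might chain the three phases against that API. It is a minimal sketch under assumptions: the CohortDriver class and its drive() method are hypothetical helpers, the generic parameters of the callbacks are inferred from the verify(...) calls in the tests, and the class is placed in the datastore package so the package-level ShardDataTreeCohort type is visible.

package org.opendaylight.controller.cluster.datastore;

import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;

final class CohortDriver {
    private CohortDriver() {
    }

    // Chains canCommit -> preCommit -> commit; any failure short-circuits to done.onFailure().
    static void drive(final ShardDataTreeCohort cohort, final FutureCallback<UnsignedLong> done) {
        cohort.canCommit(new FutureCallback<Void>() {
            @Override
            public void onSuccess(final Void result) {
                cohort.preCommit(new FutureCallback<DataTreeCandidate>() {
                    @Override
                    public void onSuccess(final DataTreeCandidate candidate) {
                        // On success the commit callback receives an UnsignedLong,
                        // matching verify(mockCommitCallback).onSuccess(any(UnsignedLong.class)) above.
                        cohort.commit(done);
                    }

                    @Override
                    public void onFailure(final Throwable cause) {
                        done.onFailure(cause);
                    }
                });
            }

            @Override
            public void onFailure(final Throwable cause) {
                done.onFailure(cause);
            }
        });
    }
}

With callbacks, the ShardDataTree decides when each phase actually runs: the doAnswer(...) stubs for startCanCommit, startPreCommit and startCommit in the tests above simulate exactly that handoff by invoking successfulCanCommit, successfulPreCommit and successfulCommit (or their failed* counterparts) on the cohort.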