import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.persistence.SaveSnapshotSuccess;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
-import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@Test
public void testApplySnapshot() throws Exception {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
ShardTestKit.waitUntilLeader(shard);
final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
final NormalizedNode<?,?> expected = readStore(store, root);
- final Snapshot snapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(expected).serialize(),
+ final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(expected).serialize(),
Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
- shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
+ shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
- final NormalizedNode<?,?> actual = readStore(shard, root);
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
- assertEquals("Root node", expected, actual);
+ try {
+ assertEquals("Root node", expected, readStore(shard, root));
+ return;
+ } catch(AssertionError e) {
+ // try again
+ }
+ }
+
+ fail("Snapshot was not applied");
}
@Test
waitUntilLeader(shard);
- // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
-
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
-
final TransactionIdentifier transactionID3 = nextTransactionId();
- final MutableCompositeModification modification3 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification3);
+
+ Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
+ shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
+ final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
+ final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
+ final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
final long timeoutSec = 5;
final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(duration);
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.class));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ // Ready 2 more Tx's.
+
+ shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
+ shard.tell(prepareBatchedModifications(transactionID3, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
assertEquals("Commits complete", true, done);
- final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort1).commit();
- inOrder.verify(cohort2).canCommit();
- inOrder.verify(cohort2).preCommit();
- inOrder.verify(cohort2).commit();
- inOrder.verify(cohort3).canCommit();
- inOrder.verify(cohort3).preCommit();
- inOrder.verify(cohort3).commit();
+ final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
+ cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
+ cohort3.getPreCommit(), cohort3.getCommit());
+ inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
+ inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+ inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
+ inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
+ inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+ inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
+ inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
+ inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+ inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
// Verify data in the data store.
final TransactionIdentifier transactionID = nextTransactionId();
final FiniteDuration duration = duration("5 seconds");
- final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
- final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
- if(mockCohort.get() == null) {
- mockCohort.set(createDelegatingMockCohort("cohort", actual));
- }
-
- return mockCohort.get();
- };
-
- shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
// Send a BatchedModifications to start a transaction.
shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- // Send the CanCommitTransaction message.
+ // Send the CommitTransaction message.
shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, CommitTransactionReply.class);
- final InOrder inOrder = inOrder(mockCohort.get());
- inOrder.verify(mockCohort.get()).canCommit();
- inOrder.verify(mockCohort.get()).preCommit();
- inOrder.verify(mockCohort.get()).commit();
-
// Verify data in the data store.
verifyOuterListEntry(shard, 1);
final TransactionIdentifier transactionID = nextTransactionId();
final FiniteDuration duration = duration("5 seconds");
- final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
- final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
- if(mockCohort.get() == null) {
- mockCohort.set(createDelegatingMockCohort("cohort", actual));
- }
-
- return mockCohort.get();
- };
-
- shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
// Send a BatchedModifications to start a transaction.
shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
expectMsgClass(duration, CommitTransactionReply.class);
- final InOrder inOrder = inOrder(mockCohort.get());
- inOrder.verify(mockCohort.get()).canCommit();
- inOrder.verify(mockCohort.get()).preCommit();
- inOrder.verify(mockCohort.get()).commit();
-
// Verify data in the data store.
verifyOuterListEntry(shard, 1);
Failure failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
- shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), txId,
- DataStoreVersions.CURRENT_VERSION, true), getRef());
+ shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
waitUntilLeader(shard);
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
final TransactionIdentifier transactionID = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
-
- final FiniteDuration duration = duration("5 seconds");
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
-
- expectMsgClass(duration, CommitTransactionReply.class);
+ if(readWrite) {
+ shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
+ containerNode, true), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
+ }
- final InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
+ expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
@Test
public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
- testCommitWithPersistenceDisabled(true);
- }
-
- @Test
- public void testWriteOnlyCommitWithPersistenceDisabled() throws Throwable {
- testCommitWithPersistenceDisabled(true);
- }
-
- private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.persistent(false);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWithPersistenceDisabled-" + readWrite);
+ "testCommitWithPersistenceDisabled");
waitUntilLeader(shard);
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
// Setup a simulated transactions with a mock cohort.
- final TransactionIdentifier transactionID = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
-
final FiniteDuration duration = duration("5 seconds");
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ final TransactionIdentifier transactionID = nextTransactionId();
+ final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, CommitTransactionReply.class);
- final InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
-
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
}};
private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
// Note that persistence is enabled which would normally result in the entry getting written to the journal
// but here that need not happen
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWhenTransactionHasNoModifications-" + readWrite);
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasNoModifications-" + readWrite);
- waitUntilLeader(shard);
+ waitUntilLeader(shard);
- final TransactionIdentifier transactionID = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
- doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
+ final TransactionIdentifier transactionID = nextTransactionId();
- final FiniteDuration duration = duration("5 seconds");
+ final FiniteDuration duration = duration("5 seconds");
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
+ if(readWrite) {
+ ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
+ newReadWriteTransaction(transactionID);
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()), getRef());
+ }
- // Send the CanCommitTransaction message.
+ expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ // Send the CanCommitTransaction message.
- shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.class);
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
- final InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
- shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
- final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
- // Use MBean for verification
- // Committed transaction count should increase as usual
- assertEquals(1,shardStats.getCommittedTransactionsCount());
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1,shardStats.getCommittedTransactionsCount());
- // Commit index should not advance because this does not go into the journal
- assertEquals(-1, shardStats.getCommitIndex());
- }
- };
+ // Commit index should not advance because this does not go into the journal
+ assertEquals(-1, shardStats.getCommitIndex());
+ }};
}
@Test
- public void testReadWriteCommitWhenTransactionHasModifications() {
+ public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
testCommitWhenTransactionHasModifications(true);
}
@Test
- public void testWriteOnlyCommitWhenTransactionHasModifications() {
+ public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
testCommitWhenTransactionHasModifications(false);
}
- private void testCommitWhenTransactionHasModifications(final boolean readWrite){
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWhenTransactionHasModifications-" + readWrite);
+ private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
+ new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasModifications-" + readWrite);
- waitUntilLeader(shard);
+ waitUntilLeader(shard);
- final TransactionIdentifier transactionID = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- modification.addModification(new DeleteModification(YangInstanceIdentifier.EMPTY));
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
- doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
+ final FiniteDuration duration = duration("5 seconds");
+ final TransactionIdentifier transactionID = nextTransactionId();
- final FiniteDuration duration = duration("5 seconds");
+ if(readWrite) {
+ shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ }
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
+ expectMsgClass(duration, ReadyTransactionReply.class);
- // Send the CanCommitTransaction message.
+ // Send the CanCommitTransaction message.
- shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
- shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.class);
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
- final InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
+ final InOrder inOrder = inOrder(dataTree);
+ inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+ inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+ inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
- shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
- final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
- // Use MBean for verification
- // Committed transaction count should increase as usual
- assertEquals(1, shardStats.getCommittedTransactionsCount());
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1, shardStats.getCommittedTransactionsCount());
- // Commit index should advance as we do not have an empty modification
- assertEquals(0, shardStats.getCommitIndex());
- }
- };
+ // Commit index should advance as we do not have an empty modification
+ assertEquals(0, shardStats.getCommitIndex());
+ }};
}
@Test
public void testCommitPhaseFailure() throws Throwable {
- testCommitPhaseFailure(true);
- testCommitPhaseFailure(false);
- }
-
- private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitPhaseFailure-" + readWrite);
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitPhaseFailure");
waitUntilLeader(shard);
+ final FiniteDuration duration = duration("5 seconds");
+ final Timeout timeout = new Timeout(duration);
+
// Setup 2 simulated transactions with mock cohorts. The first one fails in the
// commit phase.
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
- doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
- doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
- final FiniteDuration duration = duration("5 seconds");
- final Timeout timeout = new Timeout(duration);
+ doThrow(new RuntimeException("mock commit failure")).when(dataTree).commit(any(DataTreeCandidate.class));
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
- final InOrder inOrder = inOrder(cohort1, cohort2);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort1).commit();
- inOrder.verify(cohort2).canCommit();
+ final InOrder inOrder = inOrder(dataTree);
+ inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+ inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+ inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
+ inOrder.verify(dataTree).validate(any(DataTreeModification.class));
}};
}
@Test
public void testPreCommitPhaseFailure() throws Throwable {
- testPreCommitPhaseFailure(true);
- testPreCommitPhaseFailure(false);
- }
-
- private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testPreCommitPhaseFailure-" + readWrite);
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testPreCommitPhaseFailure");
waitUntilLeader(shard);
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
final FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ doThrow(new RuntimeException("mock preCommit failure")).when(dataTree).prepare(any(DataTreeModification.class));
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
- final InOrder inOrder = inOrder(cohort1, cohort2);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort2).canCommit();
+ final InOrder inOrder = inOrder(dataTree);
+ inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+ inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+ inOrder.verify(dataTree).validate(any(DataTreeModification.class));
}};
}
@Test
public void testCanCommitPhaseFailure() throws Throwable {
- testCanCommitPhaseFailure(true);
- testCanCommitPhaseFailure(false);
- }
-
- private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCanCommitPhaseFailure-" + readWrite);
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitPhaseFailure");
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
-
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
+ doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")).
+ doNothing().when(dataTree).validate(any(DataTreeModification.class));
+
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
// Send another can commit to ensure the failed one got cleaned up.
- reset(cohort);
-
final TransactionIdentifier transactionID2 = nextTransactionId();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
}};
}
- @Test
- public void testCanCommitPhaseFalseResponse() throws Throwable {
- testCanCommitPhaseFalseResponse(true);
- testCanCommitPhaseFalseResponse(false);
- }
-
- private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCanCommitPhaseFalseResponse-" + readWrite);
-
- waitUntilLeader(shard);
-
- final FiniteDuration duration = duration("5 seconds");
-
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- // Send the CanCommitTransaction message.
-
- shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
- CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(CanCommitTransactionReply.class));
- assertEquals("getCanCommit", false, reply.getCanCommit());
-
- // Send another can commit to ensure the failed one got cleaned up.
-
- reset(cohort);
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
- reply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(CanCommitTransactionReply.class));
- assertEquals("getCanCommit", true, reply.getCanCommit());
- }};
- }
-
@Test
public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
testImmediateCommitWithCanCommitPhaseFailure(true);
private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
waitUntilLeader(shard);
+ doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")).
+ doNothing().when(dataTree).validate(any(DataTreeModification.class));
+
final FiniteDuration duration = duration("5 seconds");
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
+ if(readWrite) {
+ shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ }
expectMsgClass(duration, akka.actor.Status.Failure.class);
// Send another can commit to ensure the failed one got cleaned up.
- reset(cohort);
-
final TransactionIdentifier transactionID2 = nextTransactionId();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).commit();
- final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
- final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
- doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
- doReturn(candidateRoot).when(candidate).getRootNode();
- doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
- doReturn(candidate).when(cohort).getCandidate();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
+ if(readWrite) {
+ shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ }
expectMsgClass(duration, CommitTransactionReply.class);
}};
}
+ @SuppressWarnings("serial")
@Test
- public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
- testImmediateCommitWithCanCommitPhaseFalseResponse(true);
- testImmediateCommitWithCanCommitPhaseFalseResponse(false);
- }
-
- private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
+ public void testAbortWithCommitPending() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
-
- waitUntilLeader(shard);
-
- final FiniteDuration duration = duration("5 seconds");
-
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
-
- expectMsgClass(duration, akka.actor.Status.Failure.class);
-
- // Send another can commit to ensure the failed one got cleaned up.
-
- reset(cohort);
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).commit();
- final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
- final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
- doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
- doReturn(candidateRoot).when(candidate).getRootNode();
- doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
- doReturn(candidate).when(cohort).getCandidate();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
-
- expectMsgClass(duration, CommitTransactionReply.class);
- }};
- }
+ final Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(newShardBuilder()) {
+ @Override
+ void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
+ // Simulate an AbortTransaction message occurring during replication, after
+ // persisting and before finishing the commit to the in-memory store.
- @Test
- public void testAbortBeforeFinishCommit() throws Throwable {
- testAbortBeforeFinishCommit(true);
- testAbortBeforeFinishCommit(false);
- }
+ doAbortTransaction(transactionId, null);
+ super.persistPayload(transactionId, payload);
+ }
+ };
+ }
+ };
- private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
- new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortBeforeFinishCommit-" + readWrite);
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testAbortWithCommitPending");
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
final TransactionIdentifier transactionID = nextTransactionId();
- final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
- cohort -> {
- final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
-
- // Simulate an AbortTransaction message occurring during replication, after
- // persisting and before finishing the commit to the in-memory store.
- // We have no followers so due to optimizations in the RaftActor, it does not
- // attempt replication and thus we can't send an AbortTransaction message b/c
- // it would be processed too late after CommitTransaction completes. So we'll
- // simulate an AbortTransaction message occurring during replication by calling
- // the shard directly.
- //
- shard.underlyingActor().doAbortTransaction(transactionID, null);
-
- return preCommitFuture;
- };
-
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
- modification, preCommit);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, CommitTransactionReply.class);
@Test
public void testTransactionCommitTimeout() throws Throwable {
- testTransactionCommitTimeout(true);
- testTransactionCommitTimeout(false);
- }
-
- private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
-
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testTransactionCommitTimeout-" + readWrite);
+ "testTransactionCommitTimeout");
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
writeToStore(shard, TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
- // Create 1st Tx - will timeout
+ // Ready 2 Tx's - the first will timeout
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification1);
-
- // Create 2nd Tx
+ shard.tell(prepareBatchedModifications(transactionID1, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
- final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
- listNodePath,
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
- modification2);
-
- // Ready the Tx's
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+ shard.tell(prepareBatchedModifications(transactionID2, listNodePath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// canCommit 1st Tx. We don't send the commit so it should timeout.
}};
}
- @Test
- public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
- dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
-
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testTransactionCommitQueueCapacityExceeded");
-
- waitUntilLeader(shard);
-
- final FiniteDuration duration = duration("5 seconds");
-
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
-
- final TransactionIdentifier transactionID3 = nextTransactionId();
- final MutableCompositeModification modification3 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
-
- // Ready the Tx's
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- // The 3rd Tx should exceed queue capacity and fail.
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
- expectMsgClass(duration, akka.actor.Status.Failure.class);
-
- // canCommit 1st Tx.
-
- shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, CanCommitTransactionReply.class);
-
- // canCommit the 2nd Tx - it should get queued.
-
- shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
-
- // canCommit the 3rd Tx - should exceed queue capacity and fail.
-
- shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, akka.actor.Status.Failure.class);
- }};
- }
+// @Test
+// @Ignore
+// public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
+// dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
+//
+// new ShardTestKit(getSystem()) {{
+// final TestActorRef<Shard> shard = actorFactory.createTestActor(
+// newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+// "testTransactionCommitQueueCapacityExceeded");
+//
+// waitUntilLeader(shard);
+//
+// final FiniteDuration duration = duration("5 seconds");
+//
+// final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+//
+// final TransactionIdentifier transactionID1 = nextTransactionId();
+// final MutableCompositeModification modification1 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+// TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
+// modification1);
+//
+// final TransactionIdentifier transactionID2 = nextTransactionId();
+// final MutableCompositeModification modification2 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+// TestModel.OUTER_LIST_PATH,
+// ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
+// modification2);
+//
+// final TransactionIdentifier transactionID3 = nextTransactionId();
+// final MutableCompositeModification modification3 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+// TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
+// modification3);
+//
+// // Ready the Tx's
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+// expectMsgClass(duration, ReadyTransactionReply.class);
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+// expectMsgClass(duration, ReadyTransactionReply.class);
+//
+// // The 3rd Tx should exceed queue capacity and fail.
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
+// expectMsgClass(duration, akka.actor.Status.Failure.class);
+//
+// // canCommit 1st Tx.
+//
+// shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+// expectMsgClass(duration, CanCommitTransactionReply.class);
+//
+// // canCommit the 2nd Tx - it should get queued.
+//
+// shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+//
+// // canCommit the 3rd Tx - should exceed queue capacity and fail.
+//
+// shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+// expectMsgClass(duration, akka.actor.Status.Failure.class);
+// }};
+// }
@Test
public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
- dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
-
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
final FiniteDuration duration = duration("5 seconds");
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final TransactionIdentifier transactionID3 = nextTransactionId();
- final MutableCompositeModification modification3 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
+ shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// All Tx's are readied. We'll send canCommit for the last one but not the others. The others
@Test
public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
- dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
-
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- // CanCommit the first one so it's the current in-progress CohortEntry.
+ // CanCommit the first Tx so it's the current in-progress Tx.
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, CanCommitTransactionReply.class);
// Ready the second Tx.
final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Ready the third Tx.
final DataTreeModification modification3 = dataStore.newModification();
new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
.apply(modification3);
- modification3.ready();
+ modification3.ready();
final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
-
shard.tell(readyMessage, getRef());
// Commit the first Tx. After completing, the second should expire from the queue and the third
}
@Test
- public void testAbortCurrentTransaction() throws Throwable {
- testAbortCurrentTransaction(true);
- testAbortCurrentTransaction(false);
- }
-
- private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
+ public void testAbortAfterCanCommit() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortCurrentTransaction-" + readWrite);
+ "testAbortAfterCanCommit");
waitUntilLeader(shard);
- // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
-
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
final FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ // Ready 2 transactions - the first one will be aborted.
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Wait for the 2nd Tx to complete the canCommit phase.
- Await.ready(canCommitFuture, duration);
-
- final InOrder inOrder = inOrder(cohort1, cohort2);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort2).canCommit();
+ canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
}};
}
@Test
- public void testAbortQueuedTransaction() throws Throwable {
- testAbortQueuedTransaction(true);
- testAbortQueuedTransaction(false);
- }
-
- private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
+ public void testAbortAfterReady() throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
- final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
- @SuppressWarnings("serial")
- final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
- @Override
- public void handleCommand(final Object message) {
- super.handleCommand(message);
- if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
- if(cleaupCheckLatch.get() != null) {
- cleaupCheckLatch.get().countDown();
- }
- }
- }
- };
-
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- Props.create(new DelegatingShardCreator(creator)).withDispatcher(
- Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
waitUntilLeader(shard);
- final TransactionIdentifier transactionID = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
- doReturn(Futures.immediateFuture(null)).when(cohort).abort();
-
final FiniteDuration duration = duration("5 seconds");
- // Ready the tx.
+ // Ready a tx.
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
-
// Send the AbortTransaction message.
- shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, AbortTransactionReply.class);
- verify(cohort).abort();
-
- // Verify the tx cohort is removed from queue at the cleanup check interval.
-
- cleaupCheckLatch.set(new CountDownLatch(1));
- assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
- cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
-
assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
// Now send CanCommitTransaction - should fail.
- shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
assertTrue("Failure type", failure instanceof IllegalStateException);
+
+ // Ready and CanCommit another and verify success.
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+ }};
+ }
+
+ @Test
+ public void testAbortQueuedTransaction() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Ready 3 tx's.
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID3 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Abort the second tx while it's queued.
+
+ shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.class);
+
+ // Commit the other 2.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
}};
}
new ShardTestKit(getSystem()) {{
class TestShard extends Shard {
- protected TestShard(AbstractBuilder<?, ?> builder) {
+ protected TestShard(final AbstractBuilder<?, ?> builder) {
super(builder);
setPersistence(new TestPersistentDataProvider(super.persistence()));
}
awaitAndValidateSnapshot(expectedRoot);
}
- private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
+ private void awaitAndValidateSnapshot(final NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
waitUntilNoLeader(shard);
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterDataTreeChangeListenerReply.class);