2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.doThrow;
19 import static org.mockito.Mockito.inOrder;
20 import static org.mockito.Mockito.mock;
21 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.Props;
26 import akka.actor.Status.Failure;
27 import akka.dispatch.Dispatchers;
28 import akka.dispatch.OnComplete;
29 import akka.japi.Creator;
30 import akka.pattern.Patterns;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Stopwatch;
35 import com.google.common.base.Throwables;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import java.util.Collections;
38 import java.util.HashSet;
40 import java.util.Optional;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicReference;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
50 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
63 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
64 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
67 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
68 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
71 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
72 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
75 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
78 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
79 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
80 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
81 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
83 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
84 import org.opendaylight.controller.cluster.raft.RaftActorContext;
85 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
86 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
87 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
88 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
89 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
91 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
92 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
94 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
95 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
96 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
97 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
98 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
99 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
100 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
101 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
102 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
103 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
104 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
105 import org.opendaylight.yangtools.concepts.Identifier;
106 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
107 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
115 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
116 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
117 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
118 import scala.concurrent.Await;
119 import scala.concurrent.Future;
120 import scala.concurrent.duration.FiniteDuration;
122 public class ShardTest extends AbstractShardTest {
123 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
124 + "InMemorySnapshotStore and journal recovery seq number will start from 1";
127 public void testRegisterDataTreeChangeListener() throws Exception {
128 new ShardTestKit(getSystem()) {
130 final TestActorRef<Shard> shard = actorFactory.createTestActor(
131 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
132 "testRegisterDataTreeChangeListener");
134 waitUntilLeader(shard);
136 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
138 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
139 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
140 TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
142 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
144 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
145 RegisterDataTreeNotificationListenerReply.class);
146 final String replyPath = reply.getListenerRegistrationPath().toString();
147 assertTrue("Incorrect reply path: " + replyPath,
148 replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
150 final YangInstanceIdentifier path = TestModel.TEST_PATH;
151 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
153 listener.waitForChangeEvents();
158 @SuppressWarnings("serial")
160 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
161 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
162 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
163 final Creator<Shard> creator = new Creator<Shard>() {
164 boolean firstElectionTimeout = true;
167 public Shard create() {
168 return new Shard(newShardBuilder()) {
170 public void handleCommand(final Object message) {
171 if (message instanceof ElectionTimeout && firstElectionTimeout) {
172 firstElectionTimeout = false;
173 final ActorRef self = getSelf();
175 Uninterruptibles.awaitUninterruptibly(
176 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
177 self.tell(message, self);
180 onFirstElectionTimeout.countDown();
182 super.handleCommand(message);
189 setupInMemorySnapshotStore();
191 final YangInstanceIdentifier path = TestModel.TEST_PATH;
192 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
193 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
194 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
196 final TestActorRef<Shard> shard = actorFactory.createTestActor(
197 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
198 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
200 new ShardTestKit(getSystem()) {
202 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
204 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
205 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
206 RegisterDataTreeNotificationListenerReply.class);
207 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
209 shard.tell(FindLeader.INSTANCE, getRef());
210 final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
211 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
213 onChangeListenerRegistered.countDown();
215 // TODO: investigate why we do not receive data chage events
216 listener.waitForChangeEvents();
222 public void testCreateTransaction() {
223 new ShardTestKit(getSystem()) {
225 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
227 waitUntilLeader(shard);
229 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
231 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
232 DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
234 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
235 CreateTransactionReply.class);
237 final String path = reply.getTransactionPath().toString();
238 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
239 "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
240 shardID.getShardName(), shardID.getMemberName().getName())));
246 public void testCreateTransactionOnChain() {
247 new ShardTestKit(getSystem()) {
249 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
251 waitUntilLeader(shard);
253 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
254 DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
256 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
257 CreateTransactionReply.class);
259 final String path = reply.getTransactionPath().toString();
260 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
261 "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
262 shardID.getShardName(), shardID.getMemberName().getName())));
268 public void testPeerAddressResolved() {
269 new ShardTestKit(getSystem()) {
271 final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
273 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
274 .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
275 .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
277 final String address = "akka://foobar";
278 shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
280 shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
281 final OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
282 assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
288 public void testApplySnapshot() throws Exception {
290 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
291 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
293 ShardTestKit.waitUntilLeader(shard);
295 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
298 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
299 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
300 .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
301 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
303 writeToStore(store, TestModel.TEST_PATH, container);
305 final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
306 final NormalizedNode<?,?> expected = readStore(store, root);
308 final Snapshot snapshot = Snapshot.create(
309 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
310 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4, -1, null, null);
312 shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
314 final Stopwatch sw = Stopwatch.createStarted();
315 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
316 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
319 assertEquals("Root node", expected, readStore(shard, root));
321 } catch (final AssertionError e) {
326 fail("Snapshot was not applied");
330 public void testApplyState() throws Exception {
331 final TestActorRef<Shard> shard = actorFactory.createTestActor(
332 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
334 ShardTestKit.waitUntilLeader(shard);
336 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
339 final DataTreeModification writeMod = store.takeSnapshot().newModification();
340 final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
341 writeMod.write(TestModel.TEST_PATH, node);
344 final TransactionIdentifier tx = nextTransactionId();
345 shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
347 final Stopwatch sw = Stopwatch.createStarted();
348 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
349 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
351 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
352 if (actual != null) {
353 assertEquals("Applied state", node, actual);
358 fail("State was not applied");
362 public void testDataTreeCandidateRecovery() throws Exception {
363 // Set up the InMemorySnapshotStore.
364 final DataTree source = setupInMemorySnapshotStore();
366 final DataTreeModification writeMod = source.takeSnapshot().newModification();
367 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
369 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
371 // Set up the InMemoryJournal.
372 InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
373 payloadForModification(source, writeMod, nextTransactionId())));
375 final int nListEntries = 16;
376 final Set<Integer> listEntryKeys = new HashSet<>();
378 // Add some ModificationPayload entries
379 for (int i = 1; i <= nListEntries; i++) {
380 listEntryKeys.add(Integer.valueOf(i));
382 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
383 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
385 final DataTreeModification mod = source.takeSnapshot().newModification();
386 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
389 InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
390 payloadForModification(source, mod, nextTransactionId())));
393 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
394 new ApplyJournalEntries(nListEntries));
396 testRecovery(listEntryKeys);
400 @SuppressWarnings("checkstyle:IllegalCatch")
401 public void testConcurrentThreePhaseCommits() throws Exception {
402 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
403 final CountDownLatch commitLatch = new CountDownLatch(2);
405 final long timeoutSec = 5;
406 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
407 final Timeout timeout = new Timeout(duration);
409 final TestActorRef<Shard> shard = actorFactory.createTestActor(
410 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
411 "testConcurrentThreePhaseCommits");
413 class OnFutureComplete extends OnComplete<Object> {
414 private final Class<?> expRespType;
416 OnFutureComplete(final Class<?> expRespType) {
417 this.expRespType = expRespType;
421 public void onComplete(final Throwable error, final Object resp) {
423 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
426 assertEquals("Commit response type", expRespType, resp.getClass());
428 } catch (final Exception e) {
434 void onSuccess(final Object resp) {
438 class OnCommitFutureComplete extends OnFutureComplete {
439 OnCommitFutureComplete() {
440 super(CommitTransactionReply.class);
444 public void onComplete(final Throwable error, final Object resp) {
445 super.onComplete(error, resp);
446 commitLatch.countDown();
450 class OnCanCommitFutureComplete extends OnFutureComplete {
451 private final TransactionIdentifier transactionID;
453 OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
454 super(CanCommitTransactionReply.class);
455 this.transactionID = transactionID;
459 void onSuccess(final Object resp) {
460 final CanCommitTransactionReply canCommitReply =
461 CanCommitTransactionReply.fromSerializable(resp);
462 assertEquals("Can commit", true, canCommitReply.getCanCommit());
464 final Future<Object> commitFuture = Patterns.ask(shard,
465 new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
466 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
470 new ShardTestKit(getSystem()) {
472 waitUntilLeader(shard);
474 final TransactionIdentifier transactionID1 = nextTransactionId();
475 final TransactionIdentifier transactionID2 = nextTransactionId();
476 final TransactionIdentifier transactionID3 = nextTransactionId();
478 final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
479 shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
480 final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
481 final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
482 final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
484 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
485 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
486 final ReadyTransactionReply readyReply = ReadyTransactionReply
487 .fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class));
488 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
489 // Send the CanCommitTransaction message for the first Tx.
491 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
492 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
493 .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
494 assertEquals("Can commit", true, canCommitReply.getCanCommit());
496 // Ready 2 more Tx's.
498 shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
499 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
500 expectMsgClass(duration, ReadyTransactionReply.class);
503 prepareBatchedModifications(transactionID3,
504 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
505 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
506 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
508 expectMsgClass(duration, ReadyTransactionReply.class);
510 // Send the CanCommitTransaction message for the next 2 Tx's.
511 // These should get queued and
512 // processed after the first Tx completes.
514 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
515 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
517 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
518 new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
520 // Send the CommitTransaction message for the first Tx. After it
521 // completes, it should
522 // trigger the 2nd Tx to proceed which should in turn then
525 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
526 expectMsgClass(duration, CommitTransactionReply.class);
528 // Wait for the next 2 Tx's to complete.
530 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
532 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
534 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
536 final Throwable t = caughtEx.get();
538 Throwables.propagateIfPossible(t, Exception.class);
539 throw new RuntimeException(t);
542 assertEquals("Commits complete", true, done);
544 // final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
545 // cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
546 // cohort3.getPreCommit(), cohort3.getCommit());
547 // inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
548 // inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
549 // inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
550 // inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
551 // inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
552 // inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
553 // inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
554 // inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
555 // inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
557 // Verify data in the data store.
559 verifyOuterListEntry(shard, 1);
561 verifyLastApplied(shard, 5);
567 public void testBatchedModificationsWithNoCommitOnReady() {
568 new ShardTestKit(getSystem()) {
570 final TestActorRef<Shard> shard = actorFactory.createTestActor(
571 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
572 "testBatchedModificationsWithNoCommitOnReady");
574 waitUntilLeader(shard);
576 final TransactionIdentifier transactionID = nextTransactionId();
577 final FiniteDuration duration = duration("5 seconds");
579 // Send a BatchedModifications to start a transaction.
581 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
582 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
583 expectMsgClass(duration, BatchedModificationsReply.class);
585 // Send a couple more BatchedModifications.
588 newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
589 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
591 expectMsgClass(duration, BatchedModificationsReply.class);
593 shard.tell(newBatchedModifications(transactionID,
594 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
595 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
596 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
598 expectMsgClass(duration, ReadyTransactionReply.class);
600 // Send the CanCommitTransaction message.
602 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
603 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
604 .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
605 assertEquals("Can commit", true, canCommitReply.getCanCommit());
607 // Send the CommitTransaction message.
609 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
610 expectMsgClass(duration, CommitTransactionReply.class);
612 // Verify data in the data store.
614 verifyOuterListEntry(shard, 1);
620 public void testBatchedModificationsWithCommitOnReady() {
621 new ShardTestKit(getSystem()) {
623 final TestActorRef<Shard> shard = actorFactory.createTestActor(
624 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
625 "testBatchedModificationsWithCommitOnReady");
627 waitUntilLeader(shard);
629 final TransactionIdentifier transactionID = nextTransactionId();
630 final FiniteDuration duration = duration("5 seconds");
632 // Send a BatchedModifications to start a transaction.
634 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
635 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
636 expectMsgClass(duration, BatchedModificationsReply.class);
638 // Send a couple more BatchedModifications.
640 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
641 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
643 expectMsgClass(duration, BatchedModificationsReply.class);
645 shard.tell(newBatchedModifications(transactionID,
646 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
647 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
648 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
651 expectMsgClass(duration, CommitTransactionReply.class);
653 // Verify data in the data store.
655 verifyOuterListEntry(shard, 1);
660 @Test(expected = IllegalStateException.class)
661 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
662 new ShardTestKit(getSystem()) {
664 final TestActorRef<Shard> shard = actorFactory.createTestActor(
665 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
666 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
668 waitUntilLeader(shard);
670 final TransactionIdentifier transactionID = nextTransactionId();
671 final BatchedModifications batched = new BatchedModifications(transactionID,
672 DataStoreVersions.CURRENT_VERSION);
674 batched.setTotalMessagesSent(2);
676 shard.tell(batched, getRef());
678 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
680 if (failure != null) {
681 Throwables.propagateIfPossible(failure.cause(), Exception.class);
682 throw new RuntimeException(failure.cause());
689 public void testBatchedModificationsWithOperationFailure() {
690 new ShardTestKit(getSystem()) {
692 final TestActorRef<Shard> shard = actorFactory.createTestActor(
693 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
694 "testBatchedModificationsWithOperationFailure");
696 waitUntilLeader(shard);
698 // Test merge with invalid data. An exception should occur when
699 // the merge is applied. Note that
700 // write will not validate the children for performance reasons.
702 final TransactionIdentifier transactionID = nextTransactionId();
704 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
705 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
706 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
708 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
709 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
710 shard.tell(batched, getRef());
711 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
713 final Throwable cause = failure.cause();
715 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
717 batched.setTotalMessagesSent(2);
719 shard.tell(batched, getRef());
721 failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
722 assertEquals("Failure cause", cause, failure.cause());
728 public void testBatchedModificationsOnTransactionChain() {
729 new ShardTestKit(getSystem()) {
731 final TestActorRef<Shard> shard = actorFactory.createTestActor(
732 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
733 "testBatchedModificationsOnTransactionChain");
735 waitUntilLeader(shard);
737 final LocalHistoryIdentifier historyId = nextHistoryId();
738 final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
739 final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
741 final FiniteDuration duration = duration("5 seconds");
743 // Send a BatchedModifications to start a chained write
744 // transaction and ready it.
746 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
747 final YangInstanceIdentifier path = TestModel.TEST_PATH;
748 shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), getRef());
749 expectMsgClass(duration, ReadyTransactionReply.class);
751 // Create a read Tx on the same chain.
753 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
754 DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
756 final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"),
757 CreateTransactionReply.class);
759 getSystem().actorSelection(createReply.getTransactionPath())
760 .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
761 final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
762 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
764 // Commit the write transaction.
766 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
767 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
768 .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
769 assertEquals("Can commit", true, canCommitReply.getCanCommit());
771 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
772 expectMsgClass(duration, CommitTransactionReply.class);
774 // Verify data in the data store.
776 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
777 assertEquals("Stored node", containerNode, actualNode);
783 public void testOnBatchedModificationsWhenNotLeader() {
784 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
785 new ShardTestKit(getSystem()) {
787 final Creator<Shard> creator = new Creator<Shard>() {
788 private static final long serialVersionUID = 1L;
791 public Shard create() {
792 return new Shard(newShardBuilder()) {
794 protected boolean isLeader() {
795 return overrideLeaderCalls.get() ? false : super.isLeader();
799 public ActorSelection getLeader() {
800 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path())
807 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
808 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
809 "testOnBatchedModificationsWhenNotLeader");
811 waitUntilLeader(shard);
813 overrideLeaderCalls.set(true);
815 final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
816 DataStoreVersions.CURRENT_VERSION);
818 shard.tell(batched, ActorRef.noSender());
820 expectMsgEquals(batched);
826 public void testTransactionMessagesWithNoLeader() {
827 new ShardTestKit(getSystem()) {
829 dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
830 .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
831 final TestActorRef<Shard> shard = actorFactory.createTestActor(
832 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
833 "testTransactionMessagesWithNoLeader");
835 waitUntilNoLeader(shard);
837 final TransactionIdentifier txId = nextTransactionId();
838 shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), getRef());
839 Failure failure = expectMsgClass(Failure.class);
840 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
842 shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
843 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
844 failure = expectMsgClass(Failure.class);
845 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
847 shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
849 failure = expectMsgClass(Failure.class);
850 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
856 public void testReadyWithReadWriteImmediateCommit() {
857 testReadyWithImmediateCommit(true);
861 public void testReadyWithWriteOnlyImmediateCommit() {
862 testReadyWithImmediateCommit(false);
865 private void testReadyWithImmediateCommit(final boolean readWrite) {
866 new ShardTestKit(getSystem()) {
868 final TestActorRef<Shard> shard = actorFactory.createTestActor(
869 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
870 "testReadyWithImmediateCommit-" + readWrite);
872 waitUntilLeader(shard);
874 final TransactionIdentifier transactionID = nextTransactionId();
875 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
877 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
878 containerNode, true), getRef());
880 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
884 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
886 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
887 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
893 public void testReadyLocalTransactionWithImmediateCommit() {
894 new ShardTestKit(getSystem()) {
896 final TestActorRef<Shard> shard = actorFactory.createTestActor(
897 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
898 "testReadyLocalTransactionWithImmediateCommit");
900 waitUntilLeader(shard);
902 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
904 final DataTreeModification modification = dataStore.newModification();
906 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
907 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
908 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
909 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
911 final TransactionIdentifier txId = nextTransactionId();
912 modification.ready();
913 final ReadyLocalTransaction readyMessage =
914 new ReadyLocalTransaction(txId, modification, true, Optional.empty());
916 shard.tell(readyMessage, getRef());
918 expectMsgClass(CommitTransactionReply.class);
920 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
921 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
927 public void testReadyLocalTransactionWithThreePhaseCommit() {
928 new ShardTestKit(getSystem()) {
930 final TestActorRef<Shard> shard = actorFactory.createTestActor(
931 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
932 "testReadyLocalTransactionWithThreePhaseCommit");
934 waitUntilLeader(shard);
936 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
938 final DataTreeModification modification = dataStore.newModification();
940 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
941 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
942 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
943 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
945 final TransactionIdentifier txId = nextTransactionId();
946 modification.ready();
947 final ReadyLocalTransaction readyMessage =
948 new ReadyLocalTransaction(txId, modification, false, Optional.empty());
950 shard.tell(readyMessage, getRef());
952 expectMsgClass(ReadyTransactionReply.class);
954 // Send the CanCommitTransaction message.
956 shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
957 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
958 .fromSerializable(expectMsgClass(CanCommitTransactionReply.class));
959 assertEquals("Can commit", true, canCommitReply.getCanCommit());
961 // Send the CanCommitTransaction message.
963 shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
964 expectMsgClass(CommitTransactionReply.class);
966 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
967 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
973 public void testReadWriteCommitWithPersistenceDisabled() {
974 dataStoreContextBuilder.persistent(false);
975 new ShardTestKit(getSystem()) {
977 final TestActorRef<Shard> shard = actorFactory.createTestActor(
978 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
979 "testCommitWithPersistenceDisabled");
981 waitUntilLeader(shard);
983 // Setup a simulated transactions with a mock cohort.
985 final FiniteDuration duration = duration("5 seconds");
987 final TransactionIdentifier transactionID = nextTransactionId();
988 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
989 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
991 expectMsgClass(duration, ReadyTransactionReply.class);
993 // Send the CanCommitTransaction message.
995 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
996 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
997 .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
998 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1000 // Send the CanCommitTransaction message.
1002 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1003 expectMsgClass(duration, CommitTransactionReply.class);
1005 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1006 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1012 public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
1013 testCommitWhenTransactionHasModifications(true);
1017 public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
1018 testCommitWhenTransactionHasModifications(false);
1021 private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
1022 new ShardTestKit(getSystem()) {
1024 final DataTree dataTree = createDelegatingMockDataTree();
1025 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1026 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1027 "testCommitWhenTransactionHasModifications-" + readWrite);
1029 waitUntilLeader(shard);
1031 final FiniteDuration duration = duration("5 seconds");
1032 final TransactionIdentifier transactionID = nextTransactionId();
1035 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
1036 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1038 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1039 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1042 expectMsgClass(duration, ReadyTransactionReply.class);
1044 // Send the CanCommitTransaction message.
1046 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1047 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1048 .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1049 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1051 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1052 expectMsgClass(duration, CommitTransactionReply.class);
1054 final InOrder inOrder = inOrder(dataTree);
1055 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1056 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1057 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1059 // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
1061 Thread.sleep(HEARTBEAT_MILLIS * 2);
1063 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1064 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1066 // Use MBean for verification
1067 // Committed transaction count should increase as usual
1068 assertEquals(1, shardStats.getCommittedTransactionsCount());
1070 // Commit index should advance as we do not have an empty
1072 assertEquals(1, shardStats.getCommitIndex());
1078 public void testCommitPhaseFailure() throws Exception {
1079 new ShardTestKit(getSystem()) {
1081 final DataTree dataTree = createDelegatingMockDataTree();
1082 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1083 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1084 "testCommitPhaseFailure");
1086 waitUntilLeader(shard);
1088 final FiniteDuration duration = duration("5 seconds");
1089 final Timeout timeout = new Timeout(duration);
1091 // Setup 2 simulated transactions with mock cohorts. The first
1095 doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1096 .commit(any(DataTreeCandidate.class));
1098 final TransactionIdentifier transactionID1 = nextTransactionId();
1099 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1100 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1101 expectMsgClass(duration, ReadyTransactionReply.class);
1103 final TransactionIdentifier transactionID2 = nextTransactionId();
1104 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1105 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1106 expectMsgClass(duration, ReadyTransactionReply.class);
1108 // Send the CanCommitTransaction message for the first Tx.
1110 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1111 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1112 .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1113 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1115 // Send the CanCommitTransaction message for the 2nd Tx. This
1116 // should get queued and
1117 // processed after the first Tx completes.
1119 final Future<Object> canCommitFuture = Patterns.ask(shard,
1120 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1122 // Send the CommitTransaction message for the first Tx. This
1123 // should send back an error
1124 // and trigger the 2nd Tx to proceed.
1126 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1127 expectMsgClass(duration, akka.actor.Status.Failure.class);
1129 // Wait for the 2nd Tx to complete the canCommit phase.
1131 final CountDownLatch latch = new CountDownLatch(1);
1132 canCommitFuture.onComplete(new OnComplete<Object>() {
1134 public void onComplete(final Throwable failure, final Object resp) {
1137 }, getSystem().dispatcher());
1139 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1141 final InOrder inOrder = inOrder(dataTree);
1142 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1143 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1145 // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1146 // validate performs wrapping and we capture that mock
1147 // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1149 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1155 public void testPreCommitPhaseFailure() throws Exception {
1156 new ShardTestKit(getSystem()) {
1158 final DataTree dataTree = createDelegatingMockDataTree();
1159 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1160 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1161 "testPreCommitPhaseFailure");
1163 waitUntilLeader(shard);
1165 final FiniteDuration duration = duration("5 seconds");
1166 final Timeout timeout = new Timeout(duration);
1168 doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1169 .prepare(any(DataTreeModification.class));
1171 final TransactionIdentifier transactionID1 = nextTransactionId();
1172 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1173 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1174 expectMsgClass(duration, ReadyTransactionReply.class);
1176 final TransactionIdentifier transactionID2 = nextTransactionId();
1177 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1178 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1179 expectMsgClass(duration, ReadyTransactionReply.class);
1181 // Send the CanCommitTransaction message for the first Tx.
1183 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1184 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1185 .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1186 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1188 // Send the CanCommitTransaction message for the 2nd Tx. This
1189 // should get queued and
1190 // processed after the first Tx completes.
1192 final Future<Object> canCommitFuture = Patterns.ask(shard,
1193 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1195 // Send the CommitTransaction message for the first Tx. This
1196 // should send back an error
1197 // and trigger the 2nd Tx to proceed.
1199 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1200 expectMsgClass(duration, akka.actor.Status.Failure.class);
1202 // Wait for the 2nd Tx to complete the canCommit phase.
1204 final CountDownLatch latch = new CountDownLatch(1);
1205 canCommitFuture.onComplete(new OnComplete<Object>() {
1207 public void onComplete(final Throwable failure, final Object resp) {
1210 }, getSystem().dispatcher());
1212 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1214 final InOrder inOrder = inOrder(dataTree);
1215 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1216 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1217 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1223 public void testCanCommitPhaseFailure() throws Exception {
1224 new ShardTestKit(getSystem()) {
1226 final DataTree dataTree = createDelegatingMockDataTree();
1227 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1228 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1229 "testCanCommitPhaseFailure");
1231 waitUntilLeader(shard);
1233 final FiniteDuration duration = duration("5 seconds");
1234 final TransactionIdentifier transactionID1 = nextTransactionId();
1236 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1237 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1239 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1240 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1241 expectMsgClass(duration, ReadyTransactionReply.class);
1243 // Send the CanCommitTransaction message.
1245 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1246 expectMsgClass(duration, akka.actor.Status.Failure.class);
1248 // Send another can commit to ensure the failed one got cleaned
1251 final TransactionIdentifier transactionID2 = nextTransactionId();
1252 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1253 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1254 expectMsgClass(duration, ReadyTransactionReply.class);
1256 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1257 final CanCommitTransactionReply reply = CanCommitTransactionReply
1258 .fromSerializable(expectMsgClass(CanCommitTransactionReply.class));
1259 assertEquals("getCanCommit", true, reply.getCanCommit());
1265 public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1266 testImmediateCommitWithCanCommitPhaseFailure(true);
1267 testImmediateCommitWithCanCommitPhaseFailure(false);
1270 private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1271 new ShardTestKit(getSystem()) {
1273 final DataTree dataTree = createDelegatingMockDataTree();
1274 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1275 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1276 "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1278 waitUntilLeader(shard);
1280 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1281 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1283 final FiniteDuration duration = duration("5 seconds");
1285 final TransactionIdentifier transactionID1 = nextTransactionId();
1288 shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1289 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1291 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1292 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1295 expectMsgClass(duration, akka.actor.Status.Failure.class);
1297 // Send another can commit to ensure the failed one got cleaned
1300 final TransactionIdentifier transactionID2 = nextTransactionId();
1302 shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1303 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1305 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1306 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1309 expectMsgClass(duration, CommitTransactionReply.class);
1315 public void testAbortWithCommitPending() {
1316 new ShardTestKit(getSystem()) {
1318 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1320 void persistPayload(final Identifier id, final Payload payload,
1321 final boolean batchHint) {
1322 // Simulate an AbortTransaction message occurring during
1323 // replication, after
1324 // persisting and before finishing the commit to the
1327 doAbortTransaction(id, null);
1328 super.persistPayload(id, payload, batchHint);
1332 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1333 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1334 "testAbortWithCommitPending");
1336 waitUntilLeader(shard);
1338 final FiniteDuration duration = duration("5 seconds");
1340 final TransactionIdentifier transactionID = nextTransactionId();
1342 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1343 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1344 expectMsgClass(duration, ReadyTransactionReply.class);
1346 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1347 expectMsgClass(duration, CanCommitTransactionReply.class);
1349 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1350 expectMsgClass(duration, CommitTransactionReply.class);
1352 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1354 // Since we're simulating an abort occurring during replication
1355 // and before finish commit,
1356 // the data should still get written to the in-memory store
1357 // since we've gotten past
1358 // canCommit and preCommit and persisted the data.
1359 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1365 public void testTransactionCommitTimeout() throws Exception {
1366 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1367 new ShardTestKit(getSystem()) {
1369 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1370 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1371 "testTransactionCommitTimeout");
1373 waitUntilLeader(shard);
1375 final FiniteDuration duration = duration("5 seconds");
1377 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1378 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1379 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1381 // Ready 2 Tx's - the first will timeout
1383 final TransactionIdentifier transactionID1 = nextTransactionId();
1385 prepareBatchedModifications(transactionID1,
1386 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1387 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1388 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1390 expectMsgClass(duration, ReadyTransactionReply.class);
1392 final TransactionIdentifier transactionID2 = nextTransactionId();
1393 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1394 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1396 prepareBatchedModifications(transactionID2, listNodePath,
1397 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false),
1399 expectMsgClass(duration, ReadyTransactionReply.class);
1401 // canCommit 1st Tx. We don't send the commit so it should
1404 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1405 expectMsgClass(duration, CanCommitTransactionReply.class);
1407 // canCommit the 2nd Tx - it should complete after the 1st Tx
1410 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1411 expectMsgClass(duration, CanCommitTransactionReply.class);
1413 // Try to commit the 1st Tx - should fail as it's not the
1416 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1417 expectMsgClass(duration, akka.actor.Status.Failure.class);
1419 // Commit the 2nd Tx.
1421 shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1422 expectMsgClass(duration, CommitTransactionReply.class);
1424 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1425 assertNotNull(listNodePath + " not found", node);
1432 // public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1433 // dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1435 // new ShardTestKit(getSystem()) {{
1436 // final TestActorRef<Shard> shard = actorFactory.createTestActor(
1437 // newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1438 // "testTransactionCommitQueueCapacityExceeded");
1440 // waitUntilLeader(shard);
1442 // final FiniteDuration duration = duration("5 seconds");
1444 // final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1446 // final TransactionIdentifier transactionID1 = nextTransactionId();
1447 // final MutableCompositeModification modification1 = new MutableCompositeModification();
1448 // final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1449 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1452 // final TransactionIdentifier transactionID2 = nextTransactionId();
1453 // final MutableCompositeModification modification2 = new MutableCompositeModification();
1454 // final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1455 // TestModel.OUTER_LIST_PATH,
1456 // ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1459 // final TransactionIdentifier transactionID3 = nextTransactionId();
1460 // final MutableCompositeModification modification3 = new MutableCompositeModification();
1461 // final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1462 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1465 // // Ready the Tx's
1467 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1468 // modification1), getRef());
1469 // expectMsgClass(duration, ReadyTransactionReply.class);
1471 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1472 // modification2), getRef());
1473 // expectMsgClass(duration, ReadyTransactionReply.class);
1475 // // The 3rd Tx should exceed queue capacity and fail.
1477 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1478 // modification3), getRef());
1479 // expectMsgClass(duration, akka.actor.Status.Failure.class);
1481 // // canCommit 1st Tx.
1483 // shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1484 // expectMsgClass(duration, CanCommitTransactionReply.class);
1486 // // canCommit the 2nd Tx - it should get queued.
1488 // shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1490 // // canCommit the 3rd Tx - should exceed queue capacity and fail.
1492 // shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1493 // expectMsgClass(duration, akka.actor.Status.Failure.class);
1498 public void testTransactionCommitWithPriorExpiredCohortEntries() {
1499 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1500 new ShardTestKit(getSystem()) {
1502 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1503 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1504 "testTransactionCommitWithPriorExpiredCohortEntries");
1506 waitUntilLeader(shard);
1508 final FiniteDuration duration = duration("5 seconds");
1510 final TransactionIdentifier transactionID1 = nextTransactionId();
1511 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1512 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1513 expectMsgClass(duration, ReadyTransactionReply.class);
1515 final TransactionIdentifier transactionID2 = nextTransactionId();
1516 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1517 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1518 expectMsgClass(duration, ReadyTransactionReply.class);
1520 final TransactionIdentifier transactionID3 = nextTransactionId();
1521 shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1522 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1523 expectMsgClass(duration, ReadyTransactionReply.class);
1525 // All Tx's are readied. We'll send canCommit for the last one
1526 // but not the others. The others
1527 // should expire from the queue and the last one should be
1530 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1531 expectMsgClass(duration, CanCommitTransactionReply.class);
1537 public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
1538 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1539 new ShardTestKit(getSystem()) {
1541 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1542 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1543 "testTransactionCommitWithSubsequentExpiredCohortEntry");
1545 waitUntilLeader(shard);
1547 final FiniteDuration duration = duration("5 seconds");
1549 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1551 final TransactionIdentifier transactionID1 = nextTransactionId();
1552 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1553 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1554 expectMsgClass(duration, ReadyTransactionReply.class);
1556 // CanCommit the first Tx so it's the current in-progress Tx.
1558 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1559 expectMsgClass(duration, CanCommitTransactionReply.class);
1561 // Ready the second Tx.
1563 final TransactionIdentifier transactionID2 = nextTransactionId();
1564 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1565 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1566 expectMsgClass(duration, ReadyTransactionReply.class);
1568 // Ready the third Tx.
1570 final TransactionIdentifier transactionID3 = nextTransactionId();
1571 final DataTreeModification modification3 = dataStore.newModification();
1572 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1573 .apply(modification3);
1574 modification3.ready();
1575 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1576 true, Optional.empty());
1577 shard.tell(readyMessage, getRef());
1579 // Commit the first Tx. After completing, the second should
1580 // expire from the queue and the third
1583 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1584 expectMsgClass(duration, CommitTransactionReply.class);
1586 // Expect commit reply from the third Tx.
1588 expectMsgClass(duration, CommitTransactionReply.class);
1590 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1591 assertNotNull(TestModel.TEST2_PATH + " not found", node);
1597 public void testCanCommitBeforeReadyFailure() {
1598 new ShardTestKit(getSystem()) {
1600 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1601 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1602 "testCanCommitBeforeReadyFailure");
1604 shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), getRef());
1605 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1611 public void testAbortAfterCanCommit() throws Exception {
1612 new ShardTestKit(getSystem()) {
1614 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1615 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1617 waitUntilLeader(shard);
1619 final FiniteDuration duration = duration("5 seconds");
1620 final Timeout timeout = new Timeout(duration);
1622 // Ready 2 transactions - the first one will be aborted.
1624 final TransactionIdentifier transactionID1 = nextTransactionId();
1625 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1626 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1627 expectMsgClass(duration, ReadyTransactionReply.class);
1629 final TransactionIdentifier transactionID2 = nextTransactionId();
1630 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1631 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1632 expectMsgClass(duration, ReadyTransactionReply.class);
1634 // Send the CanCommitTransaction message for the first Tx.
1636 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1637 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1638 .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1639 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1641 // Send the CanCommitTransaction message for the 2nd Tx. This
1642 // should get queued and
1643 // processed after the first Tx completes.
1645 final Future<Object> canCommitFuture = Patterns.ask(shard,
1646 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1648 // Send the AbortTransaction message for the first Tx. This
1649 // should trigger the 2nd
1652 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1653 expectMsgClass(duration, AbortTransactionReply.class);
1655 // Wait for the 2nd Tx to complete the canCommit phase.
1657 canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
1658 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1664 public void testAbortAfterReady() {
1665 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1666 new ShardTestKit(getSystem()) {
1668 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1669 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1671 waitUntilLeader(shard);
1673 final FiniteDuration duration = duration("5 seconds");
1677 final TransactionIdentifier transactionID1 = nextTransactionId();
1678 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1679 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1680 expectMsgClass(duration, ReadyTransactionReply.class);
1682 // Send the AbortTransaction message.
1684 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1685 expectMsgClass(duration, AbortTransactionReply.class);
1687 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1689 // Now send CanCommitTransaction - should fail.
1691 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1692 final Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1693 assertTrue("Failure type", failure instanceof IllegalStateException);
1695 // Ready and CanCommit another and verify success.
1697 final TransactionIdentifier transactionID2 = nextTransactionId();
1698 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1699 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1700 expectMsgClass(duration, ReadyTransactionReply.class);
1702 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1703 expectMsgClass(duration, CanCommitTransactionReply.class);
1709 public void testAbortQueuedTransaction() {
1710 new ShardTestKit(getSystem()) {
1712 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1713 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1715 waitUntilLeader(shard);
1717 final FiniteDuration duration = duration("5 seconds");
1721 final TransactionIdentifier transactionID1 = nextTransactionId();
1722 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1723 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1724 expectMsgClass(duration, ReadyTransactionReply.class);
1726 final TransactionIdentifier transactionID2 = nextTransactionId();
1727 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1728 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1729 expectMsgClass(duration, ReadyTransactionReply.class);
1731 final TransactionIdentifier transactionID3 = nextTransactionId();
1733 newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1734 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1),
1736 expectMsgClass(duration, ReadyTransactionReply.class);
1738 // Abort the second tx while it's queued.
1740 shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1741 expectMsgClass(duration, AbortTransactionReply.class);
1743 // Commit the other 2.
1745 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1746 expectMsgClass(duration, CanCommitTransactionReply.class);
1748 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1749 expectMsgClass(duration, CommitTransactionReply.class);
1751 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1752 expectMsgClass(duration, CanCommitTransactionReply.class);
1754 shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1755 expectMsgClass(duration, CommitTransactionReply.class);
1757 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1763 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1764 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1768 public void testCreateSnapshot() throws Exception {
1769 testCreateSnapshot(true, "testCreateSnapshot");
1772 private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
1773 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1775 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1776 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1777 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1782 public void saveSnapshot(final Object obj) {
1783 savedSnapshot.set(obj);
1784 super.saveSnapshot(obj);
1788 dataStoreContextBuilder.persistent(persistent);
1790 class TestShard extends Shard {
1792 protected TestShard(final AbstractBuilder<?, ?> builder) {
1794 setPersistence(new TestPersistentDataProvider(super.persistence()));
1798 public void handleCommand(final Object message) {
1799 super.handleCommand(message);
1801 // XXX: commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1802 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1803 latch.get().countDown();
1808 public RaftActorContext getRaftActorContext() {
1809 return super.getRaftActorContext();
1813 new ShardTestKit(getSystem()) {
1815 final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1817 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1818 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1821 waitUntilLeader(shard);
1822 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1824 final NormalizedNode<?, ?> expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY);
1826 // Trigger creation of a snapshot by ensuring
1827 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1828 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1829 awaitAndValidateSnapshot(expectedRoot);
1831 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1832 awaitAndValidateSnapshot(expectedRoot);
1835 private void awaitAndValidateSnapshot(final NormalizedNode<?, ?> expectedRoot)
1836 throws InterruptedException {
1837 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1839 assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1841 verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
1843 latch.set(new CountDownLatch(1));
1844 savedSnapshot.set(null);
1847 private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot) {
1848 final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot()
1849 .getRootNode().get();
1850 assertEquals("Root node", expectedRoot, actual);
1856 * This test simply verifies that the applySnapShot logic will work.
1859 public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
1860 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1863 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1864 putTransaction.write(TestModel.TEST_PATH,
1865 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1866 commitTransaction(store, putTransaction);
1869 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.EMPTY);
1871 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1873 writeTransaction.delete(YangInstanceIdentifier.EMPTY);
1874 writeTransaction.write(YangInstanceIdentifier.EMPTY, expected);
1876 commitTransaction(store, writeTransaction);
1878 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.EMPTY);
1880 assertEquals(expected, actual);
1884 public void testRecoveryApplicable() {
1886 final DatastoreContext persistentContext = DatastoreContext.newBuilder()
1887 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1889 final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
1890 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1892 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
1893 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1895 final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
1896 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1898 new ShardTestKit(getSystem()) {
1900 final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1902 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1904 final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1906 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1912 public void testOnDatastoreContext() {
1913 new ShardTestKit(getSystem()) {
1915 dataStoreContextBuilder.persistent(true);
1917 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(),
1918 "testOnDatastoreContext");
1920 assertEquals("isRecoveryApplicable", true,
1921 shard.underlyingActor().persistence().isRecoveryApplicable());
1923 waitUntilLeader(shard);
1925 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1927 assertEquals("isRecoveryApplicable", false,
1928 shard.underlyingActor().persistence().isRecoveryApplicable());
1930 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1932 assertEquals("isRecoveryApplicable", true,
1933 shard.underlyingActor().persistence().isRecoveryApplicable());
1939 public void testRegisterRoleChangeListener() {
1940 new ShardTestKit(getSystem()) {
1942 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1943 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1944 "testRegisterRoleChangeListener");
1946 waitUntilLeader(shard);
1948 final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
1950 shard.tell(new RegisterRoleChangeListener(), listener);
1952 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
1954 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1955 ShardLeaderStateChanged.class);
1956 assertEquals("getLocalShardDataTree present", true,
1957 leaderStateChanged.getLocalShardDataTree().isPresent());
1958 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
1959 leaderStateChanged.getLocalShardDataTree().get());
1961 MessageCollectorActor.clearMessages(listener);
1963 // Force a leader change
1965 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
1967 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1968 ShardLeaderStateChanged.class);
1969 assertEquals("getLocalShardDataTree present", false,
1970 leaderStateChanged.getLocalShardDataTree().isPresent());
1976 public void testFollowerInitialSyncStatus() {
1977 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1978 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1979 "testFollowerInitialSyncStatus");
1981 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
1982 "member-1-shard-inventory-operational"));
1984 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1986 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
1987 "member-1-shard-inventory-operational"));
1989 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1993 public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
1994 new ShardTestKit(getSystem()) {
1996 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
1997 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1998 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2000 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2001 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
2002 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2004 setupInMemorySnapshotStore();
2006 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2007 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2008 actorFactory.generateActorId(testName + "-shard"));
2010 waitUntilNoLeader(shard);
2012 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2013 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2014 RegisterDataTreeNotificationListenerReply.class);
2015 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2017 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
2018 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2020 listener.waitForChangeEvents();
2026 public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
2027 new ShardTestKit(getSystem()) {
2029 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
2030 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
2031 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2033 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
2034 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
2035 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2037 setupInMemorySnapshotStore();
2039 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2040 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2041 actorFactory.generateActorId(testName + "-shard"));
2043 waitUntilNoLeader(shard);
2045 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2046 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2047 RegisterDataTreeNotificationListenerReply.class);
2048 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2050 final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
2051 regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
2052 expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
2054 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
2055 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2057 listener.expectNoMoreChanges("Received unexpected change after close");
2063 public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2064 new ShardTestKit(getSystem()) {
2066 final String testName = "testClusteredDataTreeChangeListenerRegistration";
2067 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2068 MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2070 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2071 MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2073 final TestActorRef<Shard> followerShard = actorFactory
2074 .createTestActor(Shard.builder().id(followerShardID)
2075 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
2076 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2077 "akka://test/user/" + leaderShardID.toString()))
2078 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
2079 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2081 final TestActorRef<Shard> leaderShard = actorFactory
2082 .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
2083 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
2084 "akka://test/user/" + followerShardID.toString()))
2085 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
2086 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2088 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
2089 final String leaderPath = waitUntilLeader(followerShard);
2090 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2092 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2093 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2094 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
2095 actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2097 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2098 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2099 RegisterDataTreeNotificationListenerReply.class);
2100 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2102 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2104 listener.waitForChangeEvents();
2110 public void testServerRemoved() {
2111 final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
2112 .withDispatcher(Dispatchers.DefaultDispatcherId()));
2114 final ActorRef shard = parent.underlyingActor().context().actorOf(
2115 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2116 "testServerRemoved");
2118 shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2120 MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);