Merge "CDS: Split out TransactionFutureCallback"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.Dispatchers;
17 import akka.dispatch.OnComplete;
18 import akka.japi.Creator;
19 import akka.pattern.Patterns;
20 import akka.persistence.SaveSnapshotSuccess;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.io.IOException;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.Test;
41 import org.mockito.InOrder;
42 import org.opendaylight.controller.cluster.DataPersistenceProvider;
43 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
44 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
61 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
62 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
63 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
64 import org.opendaylight.controller.cluster.datastore.modification.Modification;
65 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
66 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
67 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
68 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
69 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
70 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
71 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
72 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
73 import org.opendaylight.controller.cluster.raft.RaftActorContext;
74 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
75 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
76 import org.opendaylight.controller.cluster.raft.Snapshot;
77 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
80 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
81 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
82 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
83 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
84 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
85 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
86 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
87 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
88 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
89 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
90 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
91 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
92 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
93 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
95 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
97 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
98 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
99 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
100 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
101 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
102 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
103 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
104 import scala.concurrent.Await;
105 import scala.concurrent.Future;
106 import scala.concurrent.duration.FiniteDuration;
107
108 public class ShardTest extends AbstractShardTest {
109
110     @Test
111     public void testRegisterChangeListener() throws Exception {
112         new ShardTestKit(getSystem()) {{
113             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
114                     newShardProps(),  "testRegisterChangeListener");
115
116             waitUntilLeader(shard);
117
118             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
119
120             MockDataChangeListener listener = new MockDataChangeListener(1);
121             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
122                     "testRegisterChangeListener-DataChangeListener");
123
124             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
125                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
126
127             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
128                     RegisterChangeListenerReply.class);
129             String replyPath = reply.getListenerRegistrationPath().toString();
130             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
131                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
132
133             YangInstanceIdentifier path = TestModel.TEST_PATH;
134             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
135
136             listener.waitForChangeEvents(path);
137
138             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
139             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
140         }};
141     }
142
143     @SuppressWarnings("serial")
144     @Test
145     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
146         // This test tests the timing window in which a change listener is registered before the
147         // shard becomes the leader. We verify that the listener is registered and notified of the
148         // existing data when the shard becomes the leader.
149         new ShardTestKit(getSystem()) {{
150             // For this test, we want to send the RegisterChangeListener message after the shard
151             // has recovered from persistence and before it becomes the leader. So we subclass
152             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
153             // we know that the shard has been initialized to a follower and has started the
154             // election process. The following 2 CountDownLatches are used to coordinate the
155             // ElectionTimeout with the sending of the RegisterChangeListener message.
156             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
157             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
158             Creator<Shard> creator = new Creator<Shard>() {
159                 boolean firstElectionTimeout = true;
160
161                 @Override
162                 public Shard create() throws Exception {
163                     // Use a non persistent provider because this test actually invokes persist on the journal
164                     // this will cause all other messages to not be queued properly after that.
165                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
166                     // it does do a persist)
167                     return new Shard(shardID, Collections.<String,String>emptyMap(),
168                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
169                         @Override
170                         public void onReceiveCommand(final Object message) throws Exception {
171                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
172                                 // Got the first ElectionTimeout. We don't forward it to the
173                                 // base Shard yet until we've sent the RegisterChangeListener
174                                 // message. So we signal the onFirstElectionTimeout latch to tell
175                                 // the main thread to send the RegisterChangeListener message and
176                                 // start a thread to wait on the onChangeListenerRegistered latch,
177                                 // which the main thread signals after it has sent the message.
178                                 // After the onChangeListenerRegistered is triggered, we send the
179                                 // original ElectionTimeout message to proceed with the election.
180                                 firstElectionTimeout = false;
181                                 final ActorRef self = getSelf();
182                                 new Thread() {
183                                     @Override
184                                     public void run() {
185                                         Uninterruptibles.awaitUninterruptibly(
186                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
187                                         self.tell(message, self);
188                                     }
189                                 }.start();
190
191                                 onFirstElectionTimeout.countDown();
192                             } else {
193                                 super.onReceiveCommand(message);
194                             }
195                         }
196                     };
197                 }
198             };
199
200             MockDataChangeListener listener = new MockDataChangeListener(1);
201             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
202                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
203
204             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
205                     Props.create(new DelegatingShardCreator(creator)),
206                     "testRegisterChangeListenerWhenNotLeaderInitially");
207
208             // Write initial data into the in-memory store.
209             YangInstanceIdentifier path = TestModel.TEST_PATH;
210             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
211
212             // Wait until the shard receives the first ElectionTimeout message.
213             assertEquals("Got first ElectionTimeout", true,
214                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
215
216             // Now send the RegisterChangeListener and wait for the reply.
217             shard.tell(new RegisterChangeListener(path, dclActor,
218                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
219
220             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
221                     RegisterChangeListenerReply.class);
222             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
223
224             // Sanity check - verify the shard is not the leader yet.
225             shard.tell(new FindLeader(), getRef());
226             FindLeaderReply findLeadeReply =
227                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
228             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
229
230             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
231             // with the election process.
232             onChangeListenerRegistered.countDown();
233
234             // Wait for the shard to become the leader and notify our listener with the existing
235             // data in the store.
236             listener.waitForChangeEvents(path);
237
238             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
239             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
240         }};
241     }
242
243     @Test
244     public void testCreateTransaction(){
245         new ShardTestKit(getSystem()) {{
246             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
247
248             waitUntilLeader(shard);
249
250             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
251
252             shard.tell(new CreateTransaction("txn-1",
253                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
254
255             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
256                     CreateTransactionReply.class);
257
258             String path = reply.getTransactionActorPath().toString();
259             assertTrue("Unexpected transaction path " + path,
260                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
261
262             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
263         }};
264     }
265
266     @Test
267     public void testCreateTransactionOnChain(){
268         new ShardTestKit(getSystem()) {{
269             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
270
271             waitUntilLeader(shard);
272
273             shard.tell(new CreateTransaction("txn-1",
274                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
275                     getRef());
276
277             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
278                     CreateTransactionReply.class);
279
280             String path = reply.getTransactionActorPath().toString();
281             assertTrue("Unexpected transaction path " + path,
282                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
283
284             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
285         }};
286     }
287
288     @SuppressWarnings("serial")
289     @Test
290     public void testPeerAddressResolved() throws Exception {
291         new ShardTestKit(getSystem()) {{
292             final CountDownLatch recoveryComplete = new CountDownLatch(1);
293             class TestShard extends Shard {
294                 TestShard() {
295                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
296                             newDatastoreContext(), SCHEMA_CONTEXT);
297                 }
298
299                 Map<String, String> getPeerAddresses() {
300                     return getRaftActorContext().getPeerAddresses();
301                 }
302
303                 @Override
304                 protected void onRecoveryComplete() {
305                     try {
306                         super.onRecoveryComplete();
307                     } finally {
308                         recoveryComplete.countDown();
309                     }
310                 }
311             }
312
313             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
314                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
315                         @Override
316                         public TestShard create() throws Exception {
317                             return new TestShard();
318                         }
319                     })), "testPeerAddressResolved");
320
321             //waitUntilLeader(shard);
322             assertEquals("Recovery complete", true,
323                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
324
325             String address = "akka://foobar";
326             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
327
328             assertEquals("getPeerAddresses", address,
329                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
330
331             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
332         }};
333     }
334
335     @Test
336     public void testApplySnapshot() throws Exception {
337         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
338                 "testApplySnapshot");
339
340         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
341         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
342
343         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
344
345         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
346         NormalizedNode<?,?> expected = readStore(store, root);
347
348         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
349                 SerializationUtils.serializeNormalizedNode(expected),
350                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
351
352         shard.underlyingActor().onReceiveCommand(applySnapshot);
353
354         NormalizedNode<?,?> actual = readStore(shard, root);
355
356         assertEquals("Root node", expected, actual);
357
358         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
359     }
360
361     @Test
362     public void testApplyState() throws Exception {
363
364         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
365
366         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
367
368         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
369                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
370
371         shard.underlyingActor().onReceiveCommand(applyState);
372
373         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
374         assertEquals("Applied state", node, actual);
375
376         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
377     }
378
379     @Test
380     public void testRecovery() throws Exception {
381
382         // Set up the InMemorySnapshotStore.
383
384         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
385         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
386
387         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
388
389         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
390
391         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
392                 SerializationUtils.serializeNormalizedNode(root),
393                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
394
395         // Set up the InMemoryJournal.
396
397         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
398                   new WriteModification(TestModel.OUTER_LIST_PATH,
399                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
400
401         int nListEntries = 16;
402         Set<Integer> listEntryKeys = new HashSet<>();
403
404         // Add some ModificationPayload entries
405         for(int i = 1; i <= nListEntries; i++) {
406             listEntryKeys.add(Integer.valueOf(i));
407             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
408                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
409             Modification mod = new MergeModification(path,
410                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
411             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
412                     newModificationPayload(mod)));
413         }
414
415         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
416                 new ApplyJournalEntries(nListEntries));
417
418         testRecovery(listEntryKeys);
419     }
420
421     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
422         MutableCompositeModification compMod = new MutableCompositeModification();
423         for(Modification mod: mods) {
424             compMod.addModification(mod);
425         }
426
427         return new ModificationPayload(compMod);
428     }
429
430     @SuppressWarnings({ "unchecked" })
431     @Test
432     public void testConcurrentThreePhaseCommits() throws Throwable {
433         new ShardTestKit(getSystem()) {{
434             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
435                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
436                     "testConcurrentThreePhaseCommits");
437
438             waitUntilLeader(shard);
439
440          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
441
442             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
443
444             String transactionID1 = "tx1";
445             MutableCompositeModification modification1 = new MutableCompositeModification();
446             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
447                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
448
449             String transactionID2 = "tx2";
450             MutableCompositeModification modification2 = new MutableCompositeModification();
451             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
452                     TestModel.OUTER_LIST_PATH,
453                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
454                     modification2);
455
456             String transactionID3 = "tx3";
457             MutableCompositeModification modification3 = new MutableCompositeModification();
458             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
459                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
460                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
461                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
462                     modification3);
463
464             long timeoutSec = 5;
465             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
466             final Timeout timeout = new Timeout(duration);
467
468             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
469             // by the ShardTransaction.
470
471             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
472                     cohort1, modification1, true), getRef());
473             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
474                     expectMsgClass(duration, ReadyTransactionReply.class));
475             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
476
477             // Send the CanCommitTransaction message for the first Tx.
478
479             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
480             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
481                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
482             assertEquals("Can commit", true, canCommitReply.getCanCommit());
483
484             // Send the ForwardedReadyTransaction for the next 2 Tx's.
485
486             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
487                     cohort2, modification2, true), getRef());
488             expectMsgClass(duration, ReadyTransactionReply.class);
489
490             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
491                     cohort3, modification3, true), getRef());
492             expectMsgClass(duration, ReadyTransactionReply.class);
493
494             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
495             // processed after the first Tx completes.
496
497             Future<Object> canCommitFuture1 = Patterns.ask(shard,
498                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
499
500             Future<Object> canCommitFuture2 = Patterns.ask(shard,
501                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
502
503             // Send the CommitTransaction message for the first Tx. After it completes, it should
504             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
505
506             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
507             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
508
509             // Wait for the next 2 Tx's to complete.
510
511             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
512             final CountDownLatch commitLatch = new CountDownLatch(2);
513
514             class OnFutureComplete extends OnComplete<Object> {
515                 private final Class<?> expRespType;
516
517                 OnFutureComplete(final Class<?> expRespType) {
518                     this.expRespType = expRespType;
519                 }
520
521                 @Override
522                 public void onComplete(final Throwable error, final Object resp) {
523                     if(error != null) {
524                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
525                     } else {
526                         try {
527                             assertEquals("Commit response type", expRespType, resp.getClass());
528                             onSuccess(resp);
529                         } catch (Exception e) {
530                             caughtEx.set(e);
531                         }
532                     }
533                 }
534
535                 void onSuccess(final Object resp) throws Exception {
536                 }
537             }
538
539             class OnCommitFutureComplete extends OnFutureComplete {
540                 OnCommitFutureComplete() {
541                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
542                 }
543
544                 @Override
545                 public void onComplete(final Throwable error, final Object resp) {
546                     super.onComplete(error, resp);
547                     commitLatch.countDown();
548                 }
549             }
550
551             class OnCanCommitFutureComplete extends OnFutureComplete {
552                 private final String transactionID;
553
554                 OnCanCommitFutureComplete(final String transactionID) {
555                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
556                     this.transactionID = transactionID;
557                 }
558
559                 @Override
560                 void onSuccess(final Object resp) throws Exception {
561                     CanCommitTransactionReply canCommitReply =
562                             CanCommitTransactionReply.fromSerializable(resp);
563                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
564
565                     Future<Object> commitFuture = Patterns.ask(shard,
566                             new CommitTransaction(transactionID).toSerializable(), timeout);
567                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
568                 }
569             }
570
571             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
572                     getSystem().dispatcher());
573
574             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
575                     getSystem().dispatcher());
576
577             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
578
579             if(caughtEx.get() != null) {
580                 throw caughtEx.get();
581             }
582
583             assertEquals("Commits complete", true, done);
584
585             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
586             inOrder.verify(cohort1).canCommit();
587             inOrder.verify(cohort1).preCommit();
588             inOrder.verify(cohort1).commit();
589             inOrder.verify(cohort2).canCommit();
590             inOrder.verify(cohort2).preCommit();
591             inOrder.verify(cohort2).commit();
592             inOrder.verify(cohort3).canCommit();
593             inOrder.verify(cohort3).preCommit();
594             inOrder.verify(cohort3).commit();
595
596             // Verify data in the data store.
597
598             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
599             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
600             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
601                     outerList.getValue() instanceof Iterable);
602             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
603             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
604                        entry instanceof MapEntryNode);
605             MapEntryNode mapEntry = (MapEntryNode)entry;
606             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
607                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
608             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
609             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
610
611             verifyLastApplied(shard, 2);
612
613             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
614         }};
615     }
616
617     private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
618             NormalizedNode<?, ?> data, boolean ready) {
619         return newBatchedModifications(transactionID, null, path, data, ready);
620     }
621
622     private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
623             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
624         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
625         batched.addModification(new WriteModification(path, data));
626         batched.setReady(ready);
627         return batched;
628     }
629
630     @SuppressWarnings("unchecked")
631     @Test
632     public void testMultipleBatchedModifications() throws Throwable {
633         new ShardTestKit(getSystem()) {{
634             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
635                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
636                     "testMultipleBatchedModifications");
637
638             waitUntilLeader(shard);
639
640             final String transactionID = "tx";
641             FiniteDuration duration = duration("5 seconds");
642
643             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
644             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
645                 @Override
646                 public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
647                     if(mockCohort.get() == null) {
648                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
649                     }
650
651                     return mockCohort.get();
652                 }
653             };
654
655             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
656
657             // Send a BatchedModifications to start a transaction.
658
659             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
660                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
661             expectMsgClass(duration, BatchedModificationsReply.class);
662
663             // Send a couple more BatchedModifications.
664
665             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
666                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
667             expectMsgClass(duration, BatchedModificationsReply.class);
668
669             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
670                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
671                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
672             expectMsgClass(duration, ReadyTransactionReply.class);
673
674             // Send the CanCommitTransaction message.
675
676             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
677             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
678                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
679             assertEquals("Can commit", true, canCommitReply.getCanCommit());
680
681             // Send the CanCommitTransaction message.
682
683             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
684             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
685
686             InOrder inOrder = inOrder(mockCohort.get());
687             inOrder.verify(mockCohort.get()).canCommit();
688             inOrder.verify(mockCohort.get()).preCommit();
689             inOrder.verify(mockCohort.get()).commit();
690
691             // Verify data in the data store.
692
693             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
694             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
695             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
696                     outerList.getValue() instanceof Iterable);
697             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
698             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
699                        entry instanceof MapEntryNode);
700             MapEntryNode mapEntry = (MapEntryNode)entry;
701             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
702                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
703             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
704             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
705
706             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
707         }};
708     }
709
710     @Test
711     public void testBatchedModificationsOnTransactionChain() throws Throwable {
712         new ShardTestKit(getSystem()) {{
713             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
714                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
715                     "testBatchedModificationsOnTransactionChain");
716
717             waitUntilLeader(shard);
718
719             String transactionChainID = "txChain";
720             String transactionID1 = "tx1";
721             String transactionID2 = "tx2";
722
723             FiniteDuration duration = duration("5 seconds");
724
725             // Send a BatchedModifications to start a chained write transaction and ready it.
726
727             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
728             YangInstanceIdentifier path = TestModel.TEST_PATH;
729             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
730                     containerNode, true), getRef());
731             expectMsgClass(duration, ReadyTransactionReply.class);
732
733             // Create a read Tx on the same chain.
734
735             shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
736                     transactionChainID).toSerializable(), getRef());
737
738             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
739
740             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
741             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
742             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
743
744             // Commit the write transaction.
745
746             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
747             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
748                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
749             assertEquals("Can commit", true, canCommitReply.getCanCommit());
750
751             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
752             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
753
754             // Verify data in the data store.
755
756             NormalizedNode<?, ?> actualNode = readStore(shard, path);
757             assertEquals("Stored node", containerNode, actualNode);
758
759             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
760         }};
761     }
762
763     @Test
764     public void testOnBatchedModificationsWhenNotLeader() {
765         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
766         new ShardTestKit(getSystem()) {{
767             Creator<Shard> creator = new Creator<Shard>() {
768                 @Override
769                 public Shard create() throws Exception {
770                     return new Shard(shardID, Collections.<String,String>emptyMap(),
771                             newDatastoreContext(), SCHEMA_CONTEXT) {
772                         @Override
773                         protected boolean isLeader() {
774                             return overrideLeaderCalls.get() ? false : super.isLeader();
775                         }
776
777                         @Override
778                         protected ActorSelection getLeader() {
779                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
780                                 super.getLeader();
781                         }
782                     };
783                 }
784             };
785
786             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
787                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
788
789             waitUntilLeader(shard);
790
791             overrideLeaderCalls.set(true);
792
793             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
794
795             shard.tell(batched, ActorRef.noSender());
796
797             expectMsgEquals(batched);
798
799             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
800         }};
801     }
802
803     @Test
804     public void testCommitWithPersistenceDisabled() throws Throwable {
805         dataStoreContextBuilder.persistent(false);
806         new ShardTestKit(getSystem()) {{
807             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
808                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
809                     "testCommitWithPersistenceDisabled");
810
811             waitUntilLeader(shard);
812
813             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
814
815             // Setup a simulated transactions with a mock cohort.
816
817             String transactionID = "tx";
818             MutableCompositeModification modification = new MutableCompositeModification();
819             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
820             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
821                     TestModel.TEST_PATH, containerNode, modification);
822
823             FiniteDuration duration = duration("5 seconds");
824
825             // Simulate the ForwardedReadyTransaction messages that would be sent
826             // by the ShardTransaction.
827
828             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
829                     cohort, modification, true), getRef());
830             expectMsgClass(duration, ReadyTransactionReply.class);
831
832             // Send the CanCommitTransaction message.
833
834             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
835             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
836                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
837             assertEquals("Can commit", true, canCommitReply.getCanCommit());
838
839             // Send the CanCommitTransaction message.
840
841             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
842             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
843
844             InOrder inOrder = inOrder(cohort);
845             inOrder.verify(cohort).canCommit();
846             inOrder.verify(cohort).preCommit();
847             inOrder.verify(cohort).commit();
848
849             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
850             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
851
852             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
853         }};
854     }
855
856     @Test
857     public void testCommitWhenTransactionHasNoModifications(){
858         // Note that persistence is enabled which would normally result in the entry getting written to the journal
859         // but here that need not happen
860         new ShardTestKit(getSystem()) {
861             {
862                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
863                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
864                         "testCommitWhenTransactionHasNoModifications");
865
866                 waitUntilLeader(shard);
867
868                 String transactionID = "tx1";
869                 MutableCompositeModification modification = new MutableCompositeModification();
870                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
871                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
872                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
873                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
874
875                 FiniteDuration duration = duration("5 seconds");
876
877                 // Simulate the ForwardedReadyTransaction messages that would be sent
878                 // by the ShardTransaction.
879
880                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
881                         cohort, modification, true), getRef());
882                 expectMsgClass(duration, ReadyTransactionReply.class);
883
884                 // Send the CanCommitTransaction message.
885
886                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
887                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
888                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
889                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
890
891                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
892                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
893
894                 InOrder inOrder = inOrder(cohort);
895                 inOrder.verify(cohort).canCommit();
896                 inOrder.verify(cohort).preCommit();
897                 inOrder.verify(cohort).commit();
898
899                 // Use MBean for verification
900                 // Committed transaction count should increase as usual
901                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
902
903                 // Commit index should not advance because this does not go into the journal
904                 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
905
906                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
907
908             }
909         };
910     }
911
912     @Test
913     public void testCommitWhenTransactionHasModifications(){
914         new ShardTestKit(getSystem()) {
915             {
916                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
917                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
918                         "testCommitWhenTransactionHasModifications");
919
920                 waitUntilLeader(shard);
921
922                 String transactionID = "tx1";
923                 MutableCompositeModification modification = new MutableCompositeModification();
924                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
925                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
926                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
927                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
928                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
929
930                 FiniteDuration duration = duration("5 seconds");
931
932                 // Simulate the ForwardedReadyTransaction messages that would be sent
933                 // by the ShardTransaction.
934
935                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
936                         cohort, modification, true), getRef());
937                 expectMsgClass(duration, ReadyTransactionReply.class);
938
939                 // Send the CanCommitTransaction message.
940
941                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
942                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
943                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
944                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
945
946                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
947                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
948
949                 InOrder inOrder = inOrder(cohort);
950                 inOrder.verify(cohort).canCommit();
951                 inOrder.verify(cohort).preCommit();
952                 inOrder.verify(cohort).commit();
953
954                 // Use MBean for verification
955                 // Committed transaction count should increase as usual
956                 assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
957
958                 // Commit index should advance as we do not have an empty modification
959                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
960
961                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
962
963             }
964         };
965     }
966
967     @Test
968     public void testCommitPhaseFailure() throws Throwable {
969         new ShardTestKit(getSystem()) {{
970             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
971                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
972                     "testCommitPhaseFailure");
973
974             waitUntilLeader(shard);
975
976          // Setup 2 simulated transactions with mock cohorts. The first one fails in the
977             // commit phase.
978
979             String transactionID1 = "tx1";
980             MutableCompositeModification modification1 = new MutableCompositeModification();
981             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
982             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
983             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
984             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
985
986             String transactionID2 = "tx2";
987             MutableCompositeModification modification2 = new MutableCompositeModification();
988             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
989             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
990
991             FiniteDuration duration = duration("5 seconds");
992             final Timeout timeout = new Timeout(duration);
993
994             // Simulate the ForwardedReadyTransaction messages that would be sent
995             // by the ShardTransaction.
996
997             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
998                     cohort1, modification1, true), getRef());
999             expectMsgClass(duration, ReadyTransactionReply.class);
1000
1001             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1002                     cohort2, modification2, true), getRef());
1003             expectMsgClass(duration, ReadyTransactionReply.class);
1004
1005             // Send the CanCommitTransaction message for the first Tx.
1006
1007             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1008             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1009                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1010             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1011
1012             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1013             // processed after the first Tx completes.
1014
1015             Future<Object> canCommitFuture = Patterns.ask(shard,
1016                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1017
1018             // Send the CommitTransaction message for the first Tx. This should send back an error
1019             // and trigger the 2nd Tx to proceed.
1020
1021             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1022             expectMsgClass(duration, akka.actor.Status.Failure.class);
1023
1024             // Wait for the 2nd Tx to complete the canCommit phase.
1025
1026             final CountDownLatch latch = new CountDownLatch(1);
1027             canCommitFuture.onComplete(new OnComplete<Object>() {
1028                 @Override
1029                 public void onComplete(final Throwable t, final Object resp) {
1030                     latch.countDown();
1031                 }
1032             }, getSystem().dispatcher());
1033
1034             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1035
1036             InOrder inOrder = inOrder(cohort1, cohort2);
1037             inOrder.verify(cohort1).canCommit();
1038             inOrder.verify(cohort1).preCommit();
1039             inOrder.verify(cohort1).commit();
1040             inOrder.verify(cohort2).canCommit();
1041
1042             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1043         }};
1044     }
1045
1046     @Test
1047     public void testPreCommitPhaseFailure() throws Throwable {
1048         new ShardTestKit(getSystem()) {{
1049             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1050                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1051                     "testPreCommitPhaseFailure");
1052
1053             waitUntilLeader(shard);
1054
1055             String transactionID = "tx1";
1056             MutableCompositeModification modification = new MutableCompositeModification();
1057             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1058             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1059             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1060
1061             FiniteDuration duration = duration("5 seconds");
1062
1063             // Simulate the ForwardedReadyTransaction messages that would be sent
1064             // by the ShardTransaction.
1065
1066             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1067                     cohort, modification, true), getRef());
1068             expectMsgClass(duration, ReadyTransactionReply.class);
1069
1070             // Send the CanCommitTransaction message.
1071
1072             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1073             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1074                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1075             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1076
1077             // Send the CommitTransaction message. This should send back an error
1078             // for preCommit failure.
1079
1080             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1081             expectMsgClass(duration, akka.actor.Status.Failure.class);
1082
1083             InOrder inOrder = inOrder(cohort);
1084             inOrder.verify(cohort).canCommit();
1085             inOrder.verify(cohort).preCommit();
1086
1087             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1088         }};
1089     }
1090
1091     @Test
1092     public void testCanCommitPhaseFailure() throws Throwable {
1093         new ShardTestKit(getSystem()) {{
1094             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1095                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1096                     "testCanCommitPhaseFailure");
1097
1098             waitUntilLeader(shard);
1099
1100             final FiniteDuration duration = duration("5 seconds");
1101
1102             String transactionID = "tx1";
1103             MutableCompositeModification modification = new MutableCompositeModification();
1104             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1105             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1106
1107             // Simulate the ForwardedReadyTransaction messages that would be sent
1108             // by the ShardTransaction.
1109
1110             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1111                     cohort, modification, true), getRef());
1112             expectMsgClass(duration, ReadyTransactionReply.class);
1113
1114             // Send the CanCommitTransaction message.
1115
1116             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1117             expectMsgClass(duration, akka.actor.Status.Failure.class);
1118
1119             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1120         }};
1121     }
1122
1123     @Test
1124     public void testAbortBeforeFinishCommit() throws Throwable {
1125         new ShardTestKit(getSystem()) {{
1126             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1127                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1128                     "testAbortBeforeFinishCommit");
1129
1130             waitUntilLeader(shard);
1131
1132             final FiniteDuration duration = duration("5 seconds");
1133             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1134
1135             final String transactionID = "tx1";
1136             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1137                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1138                 @Override
1139                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1140                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1141
1142                     // Simulate an AbortTransaction message occurring during replication, after
1143                     // persisting and before finishing the commit to the in-memory store.
1144                     // We have no followers so due to optimizations in the RaftActor, it does not
1145                     // attempt replication and thus we can't send an AbortTransaction message b/c
1146                     // it would be processed too late after CommitTransaction completes. So we'll
1147                     // simulate an AbortTransaction message occurring during replication by calling
1148                     // the shard directly.
1149                     //
1150                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1151
1152                     return preCommitFuture;
1153                 }
1154             };
1155
1156             MutableCompositeModification modification = new MutableCompositeModification();
1157             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1158                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1159                     modification, preCommit);
1160
1161             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1162                     cohort, modification, true), getRef());
1163             expectMsgClass(duration, ReadyTransactionReply.class);
1164
1165             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1166             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1167                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1168             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1169
1170             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1171             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1172
1173             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1174
1175             // Since we're simulating an abort occurring during replication and before finish commit,
1176             // the data should still get written to the in-memory store since we've gotten past
1177             // canCommit and preCommit and persisted the data.
1178             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1179
1180             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1181         }};
1182     }
1183
1184     @Test
         // With the shard's transaction commit timeout lowered to 1 second, readies two
         // transactions, runs canCommit on the 1st without ever committing it, and expects
         // the queued 2nd transaction to still receive its canCommit reply and commit, with
         // its list entry readable from the shard's store afterwards.
1185     public void testTransactionCommitTimeout() throws Throwable {
1186         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1187
1188         new ShardTestKit(getSystem()) {{
1189             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1190                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1191                     "testTransactionCommitTimeout");
1192
1193             waitUntilLeader(shard);
1194
                 // Upper bound for every expectMsgClass wait below; longer than the 1 second
                 // commit timeout configured above.
1195             final FiniteDuration duration = duration("5 seconds");
1196
1197             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1198
                 // Seed the store with the test container and an empty outer list; both Tx's
                 // below write entries under the outer list path.
1199             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1200             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1201                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1202
1203             // Create 1st Tx - will timeout
1204
1205             String transactionID1 = "tx1";
1206             MutableCompositeModification modification1 = new MutableCompositeModification();
1207             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1208                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1209                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1210                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1211                     modification1);
1212
1213             // Create 2nd Tx
1214
                 // NOTE(review): the 2nd Tx is labelled "tx3"/"cohort3" rather than "tx2"/"cohort2".
                 // The labels appear to be used only as identifiers here - confirm before renaming.
1215             String transactionID2 = "tx3";
1216             MutableCompositeModification modification2 = new MutableCompositeModification();
1217             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1218                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1219             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1220                     listNodePath,
1221                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1222                     modification2);
1223
1224             // Ready the Tx's
1225
1226             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1227                     cohort1, modification1, true), getRef());
1228             expectMsgClass(duration, ReadyTransactionReply.class);
1229
1230             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1231                     cohort2, modification2, true), getRef());
1232             expectMsgClass(duration, ReadyTransactionReply.class);
1233
1234             // canCommit 1st Tx. We don't send the commit so it should timeout.
1235
1236             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1237             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1238
1239             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1240
1241             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1242             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1243
1244             // Commit the 2nd Tx.
1245
1246             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1247             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1248
                 // The list entry written by the 2nd Tx must now be readable from the shard.
1249             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1250             assertNotNull(listNodePath + " not found", node);
1251
1252             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1253         }};
1254     }
1255
1256     @Test
         // Sets the shard's transaction commit queue capacity to 1, readies three transactions
         // and issues canCommit for each in turn: the 1st gets a reply, the 2nd is sent without
         // awaiting a reply, and the 3rd is expected to be answered with a Status.Failure.
1257     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1258         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1259
1260         new ShardTestKit(getSystem()) {{
1261             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1262                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1263                     "testTransactionCommitQueueCapacityExceeded");
1264
1265             waitUntilLeader(shard);
1266
1267             final FiniteDuration duration = duration("5 seconds");
1268
1269             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1270
                 // Three independent write transactions; the 1st and 3rd target the same path.
1271             String transactionID1 = "tx1";
1272             MutableCompositeModification modification1 = new MutableCompositeModification();
1273             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1274                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1275
1276             String transactionID2 = "tx2";
1277             MutableCompositeModification modification2 = new MutableCompositeModification();
1278             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1279                     TestModel.OUTER_LIST_PATH,
1280                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1281                     modification2);
1282
1283             String transactionID3 = "tx3";
1284             MutableCompositeModification modification3 = new MutableCompositeModification();
1285             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1286                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1287
1288             // Ready the Tx's
1289
1290             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1291                     cohort1, modification1, true), getRef());
1292             expectMsgClass(duration, ReadyTransactionReply.class);
1293
1294             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1295                     cohort2, modification2, true), getRef());
1296             expectMsgClass(duration, ReadyTransactionReply.class);
1297
1298             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1299                     cohort3, modification3, true), getRef());
1300             expectMsgClass(duration, ReadyTransactionReply.class);
1301
1302             // canCommit 1st Tx.
1303
                 // The 1st Tx is never committed or aborted in this test, so it stays in flight
                 // while the next two canCommit requests arrive.
1304             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1305             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1306
1307             // canCommit the 2nd Tx - it should get queued.
1308
                 // No reply is awaited for the 2nd Tx, so the next message the test kit expects
                 // is the failure for the 3rd Tx.
1309             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1310
1311             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1312
1313             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1314             expectMsgClass(duration, akka.actor.Status.Failure.class);
1315
1316             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1317         }};
1318     }
1319
1320     @Test
         // Sends a CanCommitTransaction for transaction "tx" without any preceding
         // ForwardedReadyTransaction and expects the shard to answer with a Status.Failure
         // within 5 seconds.
1321     public void testCanCommitBeforeReadyFailure() throws Throwable {
1322         new ShardTestKit(getSystem()) {{
1323             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1324                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1325                     "testCanCommitBeforeReadyFailure");
1326
                 // NOTE(review): unlike the neighbouring tests, this one does not call
                 // waitUntilLeader(shard) before sending the message.
1327             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1328             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1329
1330             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1331         }};
1332     }
1333
1334     @Test
         // Readies two transactions backed by mock cohorts, runs canCommit on the 1st, asks
         // canCommit for the 2nd, then aborts the 1st and waits for the 2nd's canCommit future
         // to complete. Finally verifies that cohort1.canCommit() was invoked before
         // cohort2.canCommit().
1335     public void testAbortTransaction() throws Throwable {
1336         new ShardTestKit(getSystem()) {{
1337             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1338                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1339                     "testAbortTransaction");
1340
1341             waitUntilLeader(shard);
1342
1343             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1344
                 // cohort1 is stubbed for canCommit (true) and abort; cohort2 only for canCommit.
1345             String transactionID1 = "tx1";
1346             MutableCompositeModification modification1 = new MutableCompositeModification();
1347             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1348             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1349             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1350
1351             String transactionID2 = "tx2";
1352             MutableCompositeModification modification2 = new MutableCompositeModification();
1353             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1354             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1355
1356             FiniteDuration duration = duration("5 seconds");
1357             final Timeout timeout = new Timeout(duration);
1358
1359             // Simulate the ForwardedReadyTransaction messages that would be sent
1360             // by the ShardTransaction.
1361
1362             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1363                     cohort1, modification1, true), getRef());
1364             expectMsgClass(duration, ReadyTransactionReply.class);
1365
1366             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1367                     cohort2, modification2, true), getRef());
1368             expectMsgClass(duration, ReadyTransactionReply.class);
1369
1370             // Send the CanCommitTransaction message for the first Tx.
1371
1372             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1373             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1374                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1375             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1376
1377             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1378             // processed after the first Tx completes.
1379
                 // Patterns.ask is used so the reply completes canCommitFuture instead of being
                 // delivered to the test kit's own ref, which keeps the expectMsgClass for the
                 // abort reply below unaffected.
1380             Future<Object> canCommitFuture = Patterns.ask(shard,
1381                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1382
1383             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1384             // Tx to proceed.
1385
1386             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1387             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1388
1389             // Wait for the 2nd Tx to complete the canCommit phase.
1390
                 // NOTE(review): only completion of the future is awaited; the reply itself is
                 // never examined.
1391             Await.ready(canCommitFuture, duration);
1392
                 // Only the relative order of the two canCommit() calls is verified; the abort()
                 // stub on cohort1 is not verified.
1393             InOrder inOrder = inOrder(cohort1, cohort2);
1394             inOrder.verify(cohort1).canCommit();
1395             inOrder.verify(cohort2).canCommit();
1396
1397             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1398         }};
1399     }
1400
1401     @Test
         // Runs the shared snapshot scenario with persistence enabled, using the shard actor
         // name "testCreateSnapshot".
1402     public void testCreateSnapshot() throws Exception {
1403         testCreateSnapshot(true, "testCreateSnapshot");
1404     }
1405
1406     @Test
         // Runs the shared snapshot scenario with persistence disabled, using the shard actor
         // name "testCreateSnapshotWithNonPersistentData".
1407     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1408         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1409     }
1410
1411     @SuppressWarnings("serial")
         // Shared body of the two snapshot tests. Creates a TestShard with the given persistence
         // setting and actor name, writes a container to the store, then twice triggers a
         // snapshot capture through the Raft snapshot manager. After each capture it checks that
         // the object handed to saveSnapshot is a Snapshot whose deserialized state equals the
         // root node read from the store beforehand.
         //
         // persistent     - value passed to dataStoreContextBuilder.persistent(...)
         // shardActorName - name under which the shard TestActorRef is created
1412     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1413
                 // Held in an AtomicReference so a fresh latch can be installed before the 2nd capture.
1414         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1415
1416         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
             // Persistence provider that records the object passed to saveSnapshot before
             // delegating to the wrapped provider.
1417         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1418             TestPersistentDataProvider(DataPersistenceProvider delegate) {
1419                 super(delegate);
1420             }
1421
1422             @Override
1423             public void saveSnapshot(Object o) {
1424                 savedSnapshot.set(o);
1425                 super.saveSnapshot(o);
1426             }
1427         }
1428
1429         dataStoreContextBuilder.persistent(persistent);
1430
1431         new ShardTestKit(getSystem()) {{
                 // Shard subclass that installs the recording persistence provider and signals
                 // the latch once a snapshot-related message has been handled.
1432             class TestShard extends Shard {
1433
1434                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
1435                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
1436                     super(name, peerAddresses, datastoreContext, schemaContext);
                         // Wrap the shard's own persistence provider with the recording one.
1437                     setPersistence(new TestPersistentDataProvider(super.persistence()));
1438                 }
1439
1440                 @Override
1441                 public void handleCommand(Object message) {
1442                     super.handleCommand(message);
1443
                         // Release the waiting test thread only after the shard has handled
                         // either SaveSnapshotSuccess or the "commit_snapshot" message.
                         // Presumably the latter is what arrives when persistence is disabled -
                         // TODO confirm.
1444                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
1445                         latch.get().countDown();
1446                     }
1447                 }
1448
                     // Pass-through override; presumably re-declared so the test can call it
                     // through the TestShard type - TODO confirm the superclass visibility.
1449                 @Override
1450                 public RaftActorContext getRaftActorContext() {
1451                     return super.getRaftActorContext();
1452                 }
1453             }
1454
1455             Creator<Shard> creator = new Creator<Shard>() {
1456                 @Override
1457                 public Shard create() throws Exception {
1458                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
1459                             newDatastoreContext(), SCHEMA_CONTEXT);
1460                 }
1461             };
1462
1463             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1464                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1465
1466             waitUntilLeader(shard);
1467
1468             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1469
                 // Read the whole tree (empty path) as the reference for both snapshot checks.
1470             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1471
1472             // Trigger creation of a snapshot by asking the snapshot manager to capture directly.
1473             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1474             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1475
1476             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1477
1478             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1479                     savedSnapshot.get() instanceof Snapshot);
1480
1481             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1482
                 // Reset the latch and the recorded snapshot, then capture a 2nd time and repeat
                 // the same checks.
1483             latch.set(new CountDownLatch(1));
1484             savedSnapshot.set(null);
1485
1486             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1487
1488             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1489
1490             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1491                     savedSnapshot.get() instanceof Snapshot);
1492
1493             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1494
1495             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1496         }
1497
             // Helper declared on the anonymous ShardTestKit subclass, after its initializer
             // block: deserializes the snapshot's state and compares it with the expected root.
1498         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1499
1500             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1501             assertEquals("Root node", expectedRoot, actual);
1502
1503         }};
1504     }
1505
1506     /**
1507      * This test simply verifies that the applySnapShot logic will work: it commits a
          * container to a stand-alone InMemoryDOMDataStore, reads the store back, then in a
          * single write-only transaction deletes the empty (root) path and writes the previously
          * read node to it again, and expects a subsequent read to equal the first one.
          *
1508      * @throws ReadFailedException declared for the store reads; presumably propagated by
          *         the readStore helper, which is not visible in this section
1509      */
1510     @Test
1511     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1512         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1513
1514         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1515
             // Populate the store with the test container.
1516         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1517         putTransaction.write(TestModel.TEST_PATH,
1518             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1519         commitTransaction(putTransaction);
1520
1521
1522         NormalizedNode<?, ?> expected = readStore(store);
1523
1524         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1525
             // Delete and re-write the same path within one transaction.
1526         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1527         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1528
1529         commitTransaction(writeTransaction);
1530
1531         NormalizedNode<?, ?> actual = readStore(store);
1532
1533         assertEquals(expected, actual);
1534     }
1535
1536     @Test
         // Creates one shard from a persistent DatastoreContext and one from a non-persistent
         // one (the two contexts are otherwise built with the same settings) and checks that
         // persistence().isRecoveryApplicable() reports true and false respectively.
1537     public void testRecoveryApplicable(){
1538
1539         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1540                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1541
1542         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1543                 persistentContext, SCHEMA_CONTEXT);
1544
1545         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1546                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1547
1548         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1549                 nonPersistentContext, SCHEMA_CONTEXT);
1550
1551         new ShardTestKit(getSystem()) {{
1552             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1553                     persistentProps, "testPersistence1");
1554
1555             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1556
                 // Stop the 1st shard before creating the 2nd; both are built with the same shardID.
1557             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1558
1559             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1560                     nonPersistentProps, "testPersistence2");
1561
1562             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1563
1564             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1565
1566         }};
1567
1568     }
1569
1570     @Test
         // Starts a shard with persistence enabled, then sends it freshly built DatastoreContext
         // messages with persistence switched off and back on, checking after each that
         // persistence().isRecoveryApplicable() follows the persistent flag.
1571     public void testOnDatastoreContext() {
1572         new ShardTestKit(getSystem()) {{
1573             dataStoreContextBuilder.persistent(true);
1574
1575             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1576
1577             assertEquals("isRecoveryApplicable", true,
1578                     shard.underlyingActor().persistence().isRecoveryApplicable());
1579
1580             waitUntilLeader(shard);
1581
                 // NOTE(review): the assertions below run immediately after tell(), so they rely
                 // on the message being processed synchronously - presumably via TestActorRef's
                 // calling-thread dispatch, since no dispatcher is set on the props here; confirm.
1582             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1583
1584             assertEquals("isRecoveryApplicable", false,
1585                     shard.underlyingActor().persistence().isRecoveryApplicable());
1586
1587             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1588
1589             assertEquals("isRecoveryApplicable", true,
1590                     shard.underlyingActor().persistence().isRecoveryApplicable());
1591
1592             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1593         }};
1594     }
1595
1596     @Test
1597     public void testRegisterRoleChangeListener() throws Exception {
1598         new ShardTestKit(getSystem()) {
1599             {
1600                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1601                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1602                         "testRegisterRoleChangeListener");
1603
1604                 waitUntilLeader(shard);
1605
1606                 TestActorRef<MessageCollectorActor> listener =
1607                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1608
1609                 shard.tell(new RegisterRoleChangeListener(), listener);
1610
1611                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1612                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1613                 // sleep.
1614                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1615
1616                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1617
1618                 assertEquals(1, allMatching.size());
1619             }
1620         };
1621     }
1622
1623     @Test
         // Feeds FollowerInitialSyncUpStatus messages directly to the shard actor's
         // onReceiveCommand and checks that the shard MBean's follower-initial-sync status
         // matches the flag carried by the most recent message (false, then true).
1624     public void testFollowerInitialSyncStatus() throws Exception {
1625         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1626                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1627                 "testFollowerInitialSyncStatus");
1628
             // The handler is invoked on the underlying actor directly rather than via tell().
1629         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1630
1631         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1632
1633         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1634
1635         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1636
1637         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1638     }
1639
1640     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
             // Readies the given write transaction and drives its cohort through preCommit and
             // commit, blocking on each returned future. canCommit is not invoked.
             //
             // Throws IllegalStateException if the calling thread is interrupted while waiting
             // or if either phase fails, so a failed commit can no longer go unnoticed by the
             // calling test.
1641         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1642         ListenableFuture<Void> future =
1643             commitCohort.preCommit();
1644         try {
1645             future.get();
1646             future = commitCohort.commit();
1647             future.get();
1648         } catch (InterruptedException e) {
                 // Restore the interrupt flag before surfacing the failure.
                 Thread.currentThread().interrupt();
                 throw new IllegalStateException("Interrupted while committing transaction", e);
             } catch (ExecutionException e) {
                 throw new IllegalStateException("Failed to commit transaction", e);
1649         }
1650     }
1651 }