Merge "BUG-1567 Expose sources for submodules in netconf"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.Dispatchers;
17 import akka.dispatch.OnComplete;
18 import akka.japi.Creator;
19 import akka.japi.Procedure;
20 import akka.pattern.Patterns;
21 import akka.persistence.SnapshotSelectionCriteria;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.Test;
42 import org.mockito.InOrder;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
61 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
62 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
63 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
64 import org.opendaylight.controller.cluster.datastore.modification.Modification;
65 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
66 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
67 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
68 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
69 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
70 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
71 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
72 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
73 import org.opendaylight.controller.cluster.raft.RaftActorContext;
74 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
75 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
76 import org.opendaylight.controller.cluster.raft.Snapshot;
77 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
80 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
81 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
82 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
83 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
84 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
85 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
86 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
87 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
88 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
89 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
90 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
91 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
92 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
93 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
95 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
97 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
98 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
99 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
100 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
101 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
102 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
103 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
104 import scala.concurrent.Await;
105 import scala.concurrent.Future;
106 import scala.concurrent.duration.FiniteDuration;
107
108 public class ShardTest extends AbstractShardTest {
109
110     @Test
111     public void testRegisterChangeListener() throws Exception {
112         new ShardTestKit(getSystem()) {{
113             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
114                     newShardProps(),  "testRegisterChangeListener");
115
116             waitUntilLeader(shard);
117
118             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
119
120             MockDataChangeListener listener = new MockDataChangeListener(1);
121             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
122                     "testRegisterChangeListener-DataChangeListener");
123
124             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
125                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
126
127             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
128                     RegisterChangeListenerReply.class);
129             String replyPath = reply.getListenerRegistrationPath().toString();
130             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
131                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
132
133             YangInstanceIdentifier path = TestModel.TEST_PATH;
134             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
135
136             listener.waitForChangeEvents(path);
137
138             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
139             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
140         }};
141     }
142
143     @SuppressWarnings("serial")
144     @Test
145     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
146         // This test tests the timing window in which a change listener is registered before the
147         // shard becomes the leader. We verify that the listener is registered and notified of the
148         // existing data when the shard becomes the leader.
149         new ShardTestKit(getSystem()) {{
150             // For this test, we want to send the RegisterChangeListener message after the shard
151             // has recovered from persistence and before it becomes the leader. So we subclass
152             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
153             // we know that the shard has been initialized to a follower and has started the
154             // election process. The following 2 CountDownLatches are used to coordinate the
155             // ElectionTimeout with the sending of the RegisterChangeListener message.
156             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
157             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
158             Creator<Shard> creator = new Creator<Shard>() {
159                 boolean firstElectionTimeout = true;
160
161                 @Override
162                 public Shard create() throws Exception {
163                     // Use a non persistent provider because this test actually invokes persist on the journal
164                     // this will cause all other messages to not be queued properly after that.
165                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
166                     // it does do a persist)
167                     return new Shard(shardID, Collections.<String,String>emptyMap(),
168                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
169                         @Override
170                         public void onReceiveCommand(final Object message) throws Exception {
171                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
172                                 // Got the first ElectionTimeout. We don't forward it to the
173                                 // base Shard yet until we've sent the RegisterChangeListener
174                                 // message. So we signal the onFirstElectionTimeout latch to tell
175                                 // the main thread to send the RegisterChangeListener message and
176                                 // start a thread to wait on the onChangeListenerRegistered latch,
177                                 // which the main thread signals after it has sent the message.
178                                 // After the onChangeListenerRegistered is triggered, we send the
179                                 // original ElectionTimeout message to proceed with the election.
180                                 firstElectionTimeout = false;
181                                 final ActorRef self = getSelf();
182                                 new Thread() {
183                                     @Override
184                                     public void run() {
185                                         Uninterruptibles.awaitUninterruptibly(
186                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
187                                         self.tell(message, self);
188                                     }
189                                 }.start();
190
191                                 onFirstElectionTimeout.countDown();
192                             } else {
193                                 super.onReceiveCommand(message);
194                             }
195                         }
196                     };
197                 }
198             };
199
200             MockDataChangeListener listener = new MockDataChangeListener(1);
201             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
202                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
203
204             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
205                     Props.create(new DelegatingShardCreator(creator)),
206                     "testRegisterChangeListenerWhenNotLeaderInitially");
207
208             // Write initial data into the in-memory store.
209             YangInstanceIdentifier path = TestModel.TEST_PATH;
210             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
211
212             // Wait until the shard receives the first ElectionTimeout message.
213             assertEquals("Got first ElectionTimeout", true,
214                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
215
216             // Now send the RegisterChangeListener and wait for the reply.
217             shard.tell(new RegisterChangeListener(path, dclActor,
218                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
219
220             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
221                     RegisterChangeListenerReply.class);
222             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
223
224             // Sanity check - verify the shard is not the leader yet.
225             shard.tell(new FindLeader(), getRef());
226             FindLeaderReply findLeadeReply =
227                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
228             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
229
230             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
231             // with the election process.
232             onChangeListenerRegistered.countDown();
233
234             // Wait for the shard to become the leader and notify our listener with the existing
235             // data in the store.
236             listener.waitForChangeEvents(path);
237
238             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
239             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
240         }};
241     }
242
243     @Test
244     public void testCreateTransaction(){
245         new ShardTestKit(getSystem()) {{
246             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
247
248             waitUntilLeader(shard);
249
250             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
251
252             shard.tell(new CreateTransaction("txn-1",
253                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
254
255             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
256                     CreateTransactionReply.class);
257
258             String path = reply.getTransactionActorPath().toString();
259             assertTrue("Unexpected transaction path " + path,
260                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
261
262             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
263         }};
264     }
265
266     @Test
267     public void testCreateTransactionOnChain(){
268         new ShardTestKit(getSystem()) {{
269             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
270
271             waitUntilLeader(shard);
272
273             shard.tell(new CreateTransaction("txn-1",
274                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
275                     getRef());
276
277             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
278                     CreateTransactionReply.class);
279
280             String path = reply.getTransactionActorPath().toString();
281             assertTrue("Unexpected transaction path " + path,
282                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
283
284             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
285         }};
286     }
287
288     @SuppressWarnings("serial")
289     @Test
290     public void testPeerAddressResolved() throws Exception {
291         new ShardTestKit(getSystem()) {{
292             final CountDownLatch recoveryComplete = new CountDownLatch(1);
293             class TestShard extends Shard {
294                 TestShard() {
295                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
296                             newDatastoreContext(), SCHEMA_CONTEXT);
297                 }
298
299                 Map<String, String> getPeerAddresses() {
300                     return getRaftActorContext().getPeerAddresses();
301                 }
302
303                 @Override
304                 protected void onRecoveryComplete() {
305                     try {
306                         super.onRecoveryComplete();
307                     } finally {
308                         recoveryComplete.countDown();
309                     }
310                 }
311             }
312
313             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
314                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
315                         @Override
316                         public TestShard create() throws Exception {
317                             return new TestShard();
318                         }
319                     })), "testPeerAddressResolved");
320
321             //waitUntilLeader(shard);
322             assertEquals("Recovery complete", true,
323                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
324
325             String address = "akka://foobar";
326             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
327
328             assertEquals("getPeerAddresses", address,
329                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
330
331             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
332         }};
333     }
334
335     @Test
336     public void testApplySnapshot() throws Exception {
337         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
338                 "testApplySnapshot");
339
340         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
341         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
342
343         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
344
345         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
346         NormalizedNode<?,?> expected = readStore(store, root);
347
348         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
349                 SerializationUtils.serializeNormalizedNode(expected),
350                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
351
352         shard.underlyingActor().onReceiveCommand(applySnapshot);
353
354         NormalizedNode<?,?> actual = readStore(shard, root);
355
356         assertEquals("Root node", expected, actual);
357
358         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
359     }
360
361     @Test
362     public void testApplyState() throws Exception {
363
364         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
365
366         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
367
368         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
369                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
370
371         shard.underlyingActor().onReceiveCommand(applyState);
372
373         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
374         assertEquals("Applied state", node, actual);
375
376         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
377     }
378
379     @Test
380     public void testRecovery() throws Exception {
381
382         // Set up the InMemorySnapshotStore.
383
384         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
385         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
386
387         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
388
389         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
390
391         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
392                 SerializationUtils.serializeNormalizedNode(root),
393                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
394
395         // Set up the InMemoryJournal.
396
397         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
398                   new WriteModification(TestModel.OUTER_LIST_PATH,
399                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
400
401         int nListEntries = 16;
402         Set<Integer> listEntryKeys = new HashSet<>();
403
404         // Add some ModificationPayload entries
405         for(int i = 1; i <= nListEntries; i++) {
406             listEntryKeys.add(Integer.valueOf(i));
407             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
408                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
409             Modification mod = new MergeModification(path,
410                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
411             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
412                     newModificationPayload(mod)));
413         }
414
415         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
416                 new ApplyJournalEntries(nListEntries));
417
418         testRecovery(listEntryKeys);
419     }
420
421     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
422         MutableCompositeModification compMod = new MutableCompositeModification();
423         for(Modification mod: mods) {
424             compMod.addModification(mod);
425         }
426
427         return new ModificationPayload(compMod);
428     }
429
430     @SuppressWarnings({ "unchecked" })
431     @Test
432     public void testConcurrentThreePhaseCommits() throws Throwable {
433         new ShardTestKit(getSystem()) {{
434             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
435                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
436                     "testConcurrentThreePhaseCommits");
437
438             waitUntilLeader(shard);
439
440             final String transactionID1 = "tx1";
441             final String transactionID2 = "tx2";
442             final String transactionID3 = "tx3";
443
444             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
445             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
446             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
447             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
448                 @Override
449                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
450                     if(transactionID.equals(transactionID1)) {
451                         mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
452                         return mockCohort1.get();
453                     } else if(transactionID.equals(transactionID2)) {
454                         mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
455                         return mockCohort2.get();
456                     } else {
457                         mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
458                         return mockCohort3.get();
459                     }
460                 }
461             };
462
463             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
464
465             long timeoutSec = 5;
466             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
467             final Timeout timeout = new Timeout(duration);
468
469             // Send a BatchedModifications message for the first transaction.
470
471             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
472                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
473             BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
474             assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
475             assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
476
477             // Send the CanCommitTransaction message for the first Tx.
478
479             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
480             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
481                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
482             assertEquals("Can commit", true, canCommitReply.getCanCommit());
483
484             // Send BatchedModifications for the next 2 Tx's.
485
486             shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
487                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
488             expectMsgClass(duration, BatchedModificationsReply.class);
489
490             shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
491                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
492                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
493             expectMsgClass(duration, BatchedModificationsReply.class);
494
495             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
496             // processed after the first Tx completes.
497
498             Future<Object> canCommitFuture1 = Patterns.ask(shard,
499                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
500
501             Future<Object> canCommitFuture2 = Patterns.ask(shard,
502                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
503
504             // Send the CommitTransaction message for the first Tx. After it completes, it should
505             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
506
507             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
508             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
509
510             // Wait for the next 2 Tx's to complete.
511
512             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
513             final CountDownLatch commitLatch = new CountDownLatch(2);
514
515             class OnFutureComplete extends OnComplete<Object> {
516                 private final Class<?> expRespType;
517
518                 OnFutureComplete(final Class<?> expRespType) {
519                     this.expRespType = expRespType;
520                 }
521
522                 @Override
523                 public void onComplete(final Throwable error, final Object resp) {
524                     if(error != null) {
525                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
526                     } else {
527                         try {
528                             assertEquals("Commit response type", expRespType, resp.getClass());
529                             onSuccess(resp);
530                         } catch (Exception e) {
531                             caughtEx.set(e);
532                         }
533                     }
534                 }
535
536                 void onSuccess(final Object resp) throws Exception {
537                 }
538             }
539
540             class OnCommitFutureComplete extends OnFutureComplete {
541                 OnCommitFutureComplete() {
542                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
543                 }
544
545                 @Override
546                 public void onComplete(final Throwable error, final Object resp) {
547                     super.onComplete(error, resp);
548                     commitLatch.countDown();
549                 }
550             }
551
552             class OnCanCommitFutureComplete extends OnFutureComplete {
553                 private final String transactionID;
554
555                 OnCanCommitFutureComplete(final String transactionID) {
556                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
557                     this.transactionID = transactionID;
558                 }
559
560                 @Override
561                 void onSuccess(final Object resp) throws Exception {
562                     CanCommitTransactionReply canCommitReply =
563                             CanCommitTransactionReply.fromSerializable(resp);
564                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
565
566                     Future<Object> commitFuture = Patterns.ask(shard,
567                             new CommitTransaction(transactionID).toSerializable(), timeout);
568                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
569                 }
570             }
571
572             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
573                     getSystem().dispatcher());
574
575             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
576                     getSystem().dispatcher());
577
578             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
579
580             if(caughtEx.get() != null) {
581                 throw caughtEx.get();
582             }
583
584             assertEquals("Commits complete", true, done);
585
586             InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
587             inOrder.verify(mockCohort1.get()).canCommit();
588             inOrder.verify(mockCohort1.get()).preCommit();
589             inOrder.verify(mockCohort1.get()).commit();
590             inOrder.verify(mockCohort2.get()).canCommit();
591             inOrder.verify(mockCohort2.get()).preCommit();
592             inOrder.verify(mockCohort2.get()).commit();
593             inOrder.verify(mockCohort3.get()).canCommit();
594             inOrder.verify(mockCohort3.get()).preCommit();
595             inOrder.verify(mockCohort3.get()).commit();
596
597             // Verify data in the data store.
598
599             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
600             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
601             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
602                     outerList.getValue() instanceof Iterable);
603             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
604             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
605                        entry instanceof MapEntryNode);
606             MapEntryNode mapEntry = (MapEntryNode)entry;
607             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
608                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
609             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
610             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
611
612             verifyLastApplied(shard, 2);
613
614             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
615         }};
616     }
617
618     private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
619             NormalizedNode<?, ?> data, boolean ready) {
620         return newBatchedModifications(transactionID, null, path, data, ready);
621     }
622
623     private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
624             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
625         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
626         batched.addModification(new WriteModification(path, data));
627         batched.setReady(ready);
628         return batched;
629     }
630
631     @SuppressWarnings("unchecked")
632     @Test
633     public void testMultipleBatchedModifications() throws Throwable {
634         new ShardTestKit(getSystem()) {{
635             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
636                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
637                     "testMultipleBatchedModifications");
638
639             waitUntilLeader(shard);
640
641             final String transactionID = "tx";
642             FiniteDuration duration = duration("5 seconds");
643
644             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
645             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
646                 @Override
647                 public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
648                     if(mockCohort.get() == null) {
649                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
650                     }
651
652                     return mockCohort.get();
653                 }
654             };
655
656             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
657
658             // Send a BatchedModifications to start a transaction.
659
660             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
661                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
662             expectMsgClass(duration, BatchedModificationsReply.class);
663
664             // Send a couple more BatchedModifications.
665
666             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
667                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
668             expectMsgClass(duration, BatchedModificationsReply.class);
669
670             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
671                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
672                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
673             expectMsgClass(duration, BatchedModificationsReply.class);
674
675             // Send the CanCommitTransaction message.
676
677             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
678             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
679                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
680             assertEquals("Can commit", true, canCommitReply.getCanCommit());
681
682             // Send the CanCommitTransaction message.
683
684             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
685             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
686
687             InOrder inOrder = inOrder(mockCohort.get());
688             inOrder.verify(mockCohort.get()).canCommit();
689             inOrder.verify(mockCohort.get()).preCommit();
690             inOrder.verify(mockCohort.get()).commit();
691
692             // Verify data in the data store.
693
694             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
695             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
696             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
697                     outerList.getValue() instanceof Iterable);
698             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
699             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
700                        entry instanceof MapEntryNode);
701             MapEntryNode mapEntry = (MapEntryNode)entry;
702             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
703                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
704             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
705             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
706
707             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
708         }};
709     }
710
711     @Test
712     public void testBatchedModificationsOnTransactionChain() throws Throwable {
713         new ShardTestKit(getSystem()) {{
714             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
715                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
716                     "testBatchedModificationsOnTransactionChain");
717
718             waitUntilLeader(shard);
719
720             String transactionChainID = "txChain";
721             String transactionID1 = "tx1";
722             String transactionID2 = "tx2";
723
724             FiniteDuration duration = duration("5 seconds");
725
726             // Send a BatchedModifications to start a chained write transaction and ready it.
727
728             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
729             YangInstanceIdentifier path = TestModel.TEST_PATH;
730             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
731                     containerNode, true), getRef());
732             expectMsgClass(duration, BatchedModificationsReply.class);
733
734             // Create a read Tx on the same chain.
735
736             shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
737                     transactionChainID).toSerializable(), getRef());
738
739             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
740
741             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
742             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
743             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
744
745             // Commit the write transaction.
746
747             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
748             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
749                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
750             assertEquals("Can commit", true, canCommitReply.getCanCommit());
751
752             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
753             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
754
755             // Verify data in the data store.
756
757             NormalizedNode<?, ?> actualNode = readStore(shard, path);
758             assertEquals("Stored node", containerNode, actualNode);
759
760             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
761         }};
762     }
763
764     @Test
765     public void testOnBatchedModificationsWhenNotLeader() {
766         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
767         new ShardTestKit(getSystem()) {{
768             Creator<Shard> creator = new Creator<Shard>() {
769                 @Override
770                 public Shard create() throws Exception {
771                     return new Shard(shardID, Collections.<String,String>emptyMap(),
772                             newDatastoreContext(), SCHEMA_CONTEXT) {
773                         @Override
774                         protected boolean isLeader() {
775                             return overrideLeaderCalls.get() ? false : super.isLeader();
776                         }
777
778                         @Override
779                         protected ActorSelection getLeader() {
780                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
781                                 super.getLeader();
782                         }
783                     };
784                 }
785             };
786
787             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
788                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
789
790             waitUntilLeader(shard);
791
792             overrideLeaderCalls.set(true);
793
794             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
795
796             shard.tell(batched, ActorRef.noSender());
797
798             expectMsgEquals(batched);
799
800             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
801         }};
802     }
803
804     @Test
805     public void testCommitWithPersistenceDisabled() throws Throwable {
806         dataStoreContextBuilder.persistent(false);
807         new ShardTestKit(getSystem()) {{
808             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
809                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
810                     "testCommitWithPersistenceDisabled");
811
812             waitUntilLeader(shard);
813
814             String transactionID = "tx";
815             FiniteDuration duration = duration("5 seconds");
816
817             // Send a BatchedModifications to start a transaction.
818
819             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
820             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
821             expectMsgClass(duration, BatchedModificationsReply.class);
822
823             // Send the CanCommitTransaction message.
824
825             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
826             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
827                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
828             assertEquals("Can commit", true, canCommitReply.getCanCommit());
829
830             // Send the CanCommitTransaction message.
831
832             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
833             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
834
835             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
836             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
837
838             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
839         }};
840     }
841
842     @Test
843     public void testCommitWhenTransactionHasNoModifications(){
844         // Note that persistence is enabled which would normally result in the entry getting written to the journal
845         // but here that need not happen
846         new ShardTestKit(getSystem()) {
847             {
848                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
849                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
850                         "testCommitWhenTransactionHasNoModifications");
851
852                 waitUntilLeader(shard);
853
854                 String transactionID = "tx1";
855                 MutableCompositeModification modification = new MutableCompositeModification();
856                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
857                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
858                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
859                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
860
861                 FiniteDuration duration = duration("5 seconds");
862
863                 // Simulate the ForwardedReadyTransaction messages that would be sent
864                 // by the ShardTransaction.
865
866                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
867                         cohort, modification, true), getRef());
868                 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
869
870                 // Send the CanCommitTransaction message.
871
872                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
873                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
874                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
875                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
876
877                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
878                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
879
880                 InOrder inOrder = inOrder(cohort);
881                 inOrder.verify(cohort).canCommit();
882                 inOrder.verify(cohort).preCommit();
883                 inOrder.verify(cohort).commit();
884
885                 // Use MBean for verification
886                 // Committed transaction count should increase as usual
887                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
888
889                 // Commit index should not advance because this does not go into the journal
890                 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
891
892                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
893
894             }
895         };
896     }
897
898     @Test
899     public void testCommitWhenTransactionHasModifications(){
900         new ShardTestKit(getSystem()) {
901             {
902                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
903                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
904                         "testCommitWhenTransactionHasModifications");
905
906                 waitUntilLeader(shard);
907
908                 String transactionID = "tx1";
909                 MutableCompositeModification modification = new MutableCompositeModification();
910                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
911                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
912                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
913                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
914                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
915
916                 FiniteDuration duration = duration("5 seconds");
917
918                 // Simulate the ForwardedReadyTransaction messages that would be sent
919                 // by the ShardTransaction.
920
921                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
922                         cohort, modification, true), getRef());
923                 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
924
925                 // Send the CanCommitTransaction message.
926
927                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
928                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
929                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
930                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
931
932                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
933                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
934
935                 InOrder inOrder = inOrder(cohort);
936                 inOrder.verify(cohort).canCommit();
937                 inOrder.verify(cohort).preCommit();
938                 inOrder.verify(cohort).commit();
939
940                 // Use MBean for verification
941                 // Committed transaction count should increase as usual
942                 assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
943
944                 // Commit index should advance as we do not have an empty modification
945                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
946
947                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
948
949             }
950         };
951     }
952
953     @Test
954     public void testCommitPhaseFailure() throws Throwable {
955         new ShardTestKit(getSystem()) {{
956             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
957                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
958                     "testCommitPhaseFailure");
959
960             waitUntilLeader(shard);
961
962             // Setup 2 mock cohorts. The first one fails in the commit phase.
963
964             final String transactionID1 = "tx1";
965             final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
966             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
967             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
968             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
969
970             final String transactionID2 = "tx2";
971             final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
972             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
973
974             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
975                 @Override
976                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
977                         DOMStoreThreePhaseCommitCohort actual) {
978                     return transactionID1.equals(transactionID) ? cohort1 : cohort2;
979                 }
980             };
981
982             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
983
984             FiniteDuration duration = duration("5 seconds");
985             final Timeout timeout = new Timeout(duration);
986
987             // Send BatchedModifications to start and ready each transaction.
988
989             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
990                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
991             expectMsgClass(duration, BatchedModificationsReply.class);
992
993             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
994                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
995             expectMsgClass(duration, BatchedModificationsReply.class);
996
997             // Send the CanCommitTransaction message for the first Tx.
998
999             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1000             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1001                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1002             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1003
1004             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1005             // processed after the first Tx completes.
1006
1007             Future<Object> canCommitFuture = Patterns.ask(shard,
1008                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1009
1010             // Send the CommitTransaction message for the first Tx. This should send back an error
1011             // and trigger the 2nd Tx to proceed.
1012
1013             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1014             expectMsgClass(duration, akka.actor.Status.Failure.class);
1015
1016             // Wait for the 2nd Tx to complete the canCommit phase.
1017
1018             final CountDownLatch latch = new CountDownLatch(1);
1019             canCommitFuture.onComplete(new OnComplete<Object>() {
1020                 @Override
1021                 public void onComplete(final Throwable t, final Object resp) {
1022                     latch.countDown();
1023                 }
1024             }, getSystem().dispatcher());
1025
1026             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1027
1028             InOrder inOrder = inOrder(cohort1, cohort2);
1029             inOrder.verify(cohort1).canCommit();
1030             inOrder.verify(cohort1).preCommit();
1031             inOrder.verify(cohort1).commit();
1032             inOrder.verify(cohort2).canCommit();
1033
1034             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1035         }};
1036     }
1037
1038     @Test
1039     public void testPreCommitPhaseFailure() throws Throwable {
1040         new ShardTestKit(getSystem()) {{
1041             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1042                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1043                     "testPreCommitPhaseFailure");
1044
1045             waitUntilLeader(shard);
1046
1047             String transactionID = "tx1";
1048             final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1049             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1050             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1051
1052             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
1053                 @Override
1054                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
1055                         DOMStoreThreePhaseCommitCohort actual) {
1056                     return cohort;
1057                 }
1058             };
1059
1060             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
1061
1062             FiniteDuration duration = duration("5 seconds");
1063
1064             // Send BatchedModifications to start and ready a transaction.
1065
1066             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
1067                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1068             expectMsgClass(duration, BatchedModificationsReply.class);
1069
1070             // Send the CanCommitTransaction message.
1071
1072             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1073             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1074                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1075             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1076
1077             // Send the CommitTransaction message. This should send back an error
1078             // for preCommit failure.
1079
1080             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1081             expectMsgClass(duration, akka.actor.Status.Failure.class);
1082
1083             InOrder inOrder = inOrder(cohort);
1084             inOrder.verify(cohort).canCommit();
1085             inOrder.verify(cohort).preCommit();
1086
1087             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1088         }};
1089     }
1090
1091     @Test
1092     public void testCanCommitPhaseFailure() throws Throwable {
1093         new ShardTestKit(getSystem()) {{
1094             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1095                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1096                     "testCanCommitPhaseFailure");
1097
1098             waitUntilLeader(shard);
1099
1100             final FiniteDuration duration = duration("5 seconds");
1101
1102             String transactionID = "tx1";
1103             final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1104             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1105
1106             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
1107                 @Override
1108                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
1109                         DOMStoreThreePhaseCommitCohort actual) {
1110                     return cohort;
1111                 }
1112             };
1113
1114             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
1115
1116             // Send BatchedModifications to start and ready a transaction.
1117
1118             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
1119                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1120             expectMsgClass(duration, BatchedModificationsReply.class);
1121
1122             // Send the CanCommitTransaction message.
1123
1124             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1125             expectMsgClass(duration, akka.actor.Status.Failure.class);
1126
1127             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1128         }};
1129     }
1130
1131     @Test
1132     public void testAbortBeforeFinishCommit() throws Throwable {
1133         new ShardTestKit(getSystem()) {{
1134             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1135                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1136                     "testAbortBeforeFinishCommit");
1137
1138             waitUntilLeader(shard);
1139
1140             final FiniteDuration duration = duration("5 seconds");
1141             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1142
1143             final String transactionID = "tx1";
1144             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1145                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1146                 @Override
1147                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1148                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1149
1150                     // Simulate an AbortTransaction message occurring during replication, after
1151                     // persisting and before finishing the commit to the in-memory store.
1152                     // We have no followers so due to optimizations in the RaftActor, it does not
1153                     // attempt replication and thus we can't send an AbortTransaction message b/c
1154                     // it would be processed too late after CommitTransaction completes. So we'll
1155                     // simulate an AbortTransaction message occurring during replication by calling
1156                     // the shard directly.
1157                     //
1158                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1159
1160                     return preCommitFuture;
1161                 }
1162             };
1163
1164             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
1165                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1166             expectMsgClass(duration, BatchedModificationsReply.class);
1167
1168             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1169             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1170                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1171             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1172
1173             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1174             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1175
1176             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1177
1178             // Since we're simulating an abort occurring during replication and before finish commit,
1179             // the data should still get written to the in-memory store since we've gotten past
1180             // canCommit and preCommit and persisted the data.
1181             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1182
1183             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1184         }};
1185     }
1186
1187     @Test
1188     public void testTransactionCommitTimeout() throws Throwable {
1189         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1190
1191         new ShardTestKit(getSystem()) {{
1192             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1193                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1194                     "testTransactionCommitTimeout");
1195
1196             waitUntilLeader(shard);
1197
1198             final FiniteDuration duration = duration("5 seconds");
1199
1200             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1201             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1202                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1203
1204             // Create and ready the 1st Tx - will timeout
1205
1206             String transactionID1 = "tx1";
1207             shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
1208                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1209                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
1210             expectMsgClass(duration, BatchedModificationsReply.class);
1211
1212             // Create and ready the 2nd Tx
1213
1214             String transactionID2 = "tx2";
1215             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1216                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1217             shard.tell(newBatchedModifications(transactionID2, listNodePath,
1218                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
1219             expectMsgClass(duration, BatchedModificationsReply.class);
1220
1221             // canCommit 1st Tx. We don't send the commit so it should timeout.
1222
1223             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1224             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1225
1226             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1227
1228             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1229             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1230
1231             // Commit the 2nd Tx.
1232
1233             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1234             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1235
1236             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1237             assertNotNull(listNodePath + " not found", node);
1238
1239             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1240         }};
1241     }
1242
1243     @Test
1244     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1245         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1246
1247         new ShardTestKit(getSystem()) {{
1248             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1249                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1250                     "testTransactionCommitQueueCapacityExceeded");
1251
1252             waitUntilLeader(shard);
1253
1254             final FiniteDuration duration = duration("5 seconds");
1255
1256             String transactionID1 = "tx1";
1257             String transactionID2 = "tx2";
1258             String transactionID3 = "tx3";
1259
1260             // Send a BatchedModifications to start transactions and ready them.
1261
1262             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1263                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1264             expectMsgClass(duration, BatchedModificationsReply.class);
1265
1266             shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
1267                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
1268             expectMsgClass(duration, BatchedModificationsReply.class);
1269
1270             shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1271                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1272             expectMsgClass(duration, BatchedModificationsReply.class);
1273
1274             // canCommit 1st Tx.
1275
1276             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1277             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1278
1279             // canCommit the 2nd Tx - it should get queued.
1280
1281             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1282
1283             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1284
1285             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1286             expectMsgClass(duration, akka.actor.Status.Failure.class);
1287
1288             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1289         }};
1290     }
1291
1292     @Test
1293     public void testCanCommitBeforeReadyFailure() throws Throwable {
1294         new ShardTestKit(getSystem()) {{
1295             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1296                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1297                     "testCanCommitBeforeReadyFailure");
1298
1299             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1300             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1301
1302             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1303         }};
1304     }
1305
1306     @Test
1307     public void testAbortTransaction() throws Throwable {
1308         new ShardTestKit(getSystem()) {{
1309             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1310                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1311                     "testAbortTransaction");
1312
1313             waitUntilLeader(shard);
1314
1315             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1316
1317             final String transactionID1 = "tx1";
1318             final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1319             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1320             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1321
1322             final String transactionID2 = "tx2";
1323             final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1324             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1325
1326             FiniteDuration duration = duration("5 seconds");
1327             final Timeout timeout = new Timeout(duration);
1328
1329             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
1330                 @Override
1331                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
1332                         DOMStoreThreePhaseCommitCohort actual) {
1333                     return transactionID1.equals(transactionID) ? cohort1 : cohort2;
1334                 }
1335             };
1336
1337             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
1338
1339             // Send BatchedModifications to start and ready each transaction.
1340
1341             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1342                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1343             expectMsgClass(duration, BatchedModificationsReply.class);
1344
1345             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1346                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1347             expectMsgClass(duration, BatchedModificationsReply.class);
1348
1349             // Send the CanCommitTransaction message for the first Tx.
1350
1351             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1352             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1353                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1354             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1355
1356             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1357             // processed after the first Tx completes.
1358
1359             Future<Object> canCommitFuture = Patterns.ask(shard,
1360                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1361
1362             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1363             // Tx to proceed.
1364
1365             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1366             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1367
1368             // Wait for the 2nd Tx to complete the canCommit phase.
1369
1370             Await.ready(canCommitFuture, duration);
1371
1372             InOrder inOrder = inOrder(cohort1, cohort2);
1373             inOrder.verify(cohort1).canCommit();
1374             inOrder.verify(cohort2).canCommit();
1375
1376             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1377         }};
1378     }
1379
1380     @Test
1381     public void testCreateSnapshot() throws Exception {
1382         testCreateSnapshot(true, "testCreateSnapshot");
1383     }
1384
1385     @Test
1386     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1387         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1388     }
1389
1390     @SuppressWarnings("serial")
1391     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1392
1393         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1394         class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1395             DataPersistenceProvider delegate;
1396
1397             DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1398                 this.delegate = delegate;
1399             }
1400
1401             @Override
1402             public boolean isRecoveryApplicable() {
1403                 return delegate.isRecoveryApplicable();
1404             }
1405
1406             @Override
1407             public <T> void persist(T o, Procedure<T> procedure) {
1408                 delegate.persist(o, procedure);
1409             }
1410
1411             @Override
1412             public void saveSnapshot(Object o) {
1413                 savedSnapshot.set(o);
1414                 delegate.saveSnapshot(o);
1415             }
1416
1417             @Override
1418             public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1419                 delegate.deleteSnapshots(criteria);
1420             }
1421
1422             @Override
1423             public void deleteMessages(long sequenceNumber) {
1424                 delegate.deleteMessages(sequenceNumber);
1425             }
1426         }
1427
1428         dataStoreContextBuilder.persistent(persistent);
1429
1430
1431
1432         new ShardTestKit(getSystem()) {{
1433             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1434
1435             class TestShard extends Shard {
1436
1437                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
1438                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
1439                     super(name, peerAddresses, datastoreContext, schemaContext);
1440                 }
1441
1442                 DelegatingPersistentDataProvider delegating;
1443
1444                 protected DataPersistenceProvider persistence() {
1445                     if(delegating == null) {
1446                         delegating = new DelegatingPersistentDataProvider(super.persistence());
1447                     }
1448                     return delegating;
1449                 }
1450
1451                 @Override
1452                 protected void commitSnapshot(final long sequenceNumber) {
1453                     super.commitSnapshot(sequenceNumber);
1454                     latch.get().countDown();
1455                 }
1456
1457                 @Override
1458                 public RaftActorContext getRaftActorContext() {
1459                     return super.getRaftActorContext();
1460                 }
1461             }
1462
1463             Creator<Shard> creator = new Creator<Shard>() {
1464                 @Override
1465                 public Shard create() throws Exception {
1466                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
1467                             newDatastoreContext(), SCHEMA_CONTEXT);
1468                 }
1469             };
1470
1471             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1472                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1473
1474             waitUntilLeader(shard);
1475
1476             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1477
1478             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1479
1480             // Trigger creation of a snapshot by ensuring
1481             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1482             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1483
1484             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1485
1486             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1487                     savedSnapshot.get() instanceof Snapshot);
1488
1489             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1490
1491             latch.set(new CountDownLatch(1));
1492             savedSnapshot.set(null);
1493
1494             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1495
1496             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1497
1498             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1499                     savedSnapshot.get() instanceof Snapshot);
1500
1501             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1502
1503             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1504         }
1505
1506         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1507
1508             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1509             assertEquals("Root node", expectedRoot, actual);
1510
1511         }};
1512     }
1513
1514     /**
1515      * This test simply verifies that the applySnapShot logic will work
1516      * @throws ReadFailedException
1517      */
1518     @Test
1519     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1520         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1521
1522         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1523
1524         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1525         putTransaction.write(TestModel.TEST_PATH,
1526             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1527         commitTransaction(putTransaction);
1528
1529
1530         NormalizedNode<?, ?> expected = readStore(store);
1531
1532         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1533
1534         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1535         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1536
1537         commitTransaction(writeTransaction);
1538
1539         NormalizedNode<?, ?> actual = readStore(store);
1540
1541         assertEquals(expected, actual);
1542     }
1543
1544     @Test
1545     public void testRecoveryApplicable(){
1546
1547         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1548                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1549
1550         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1551                 persistentContext, SCHEMA_CONTEXT);
1552
1553         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1554                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1555
1556         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1557                 nonPersistentContext, SCHEMA_CONTEXT);
1558
1559         new ShardTestKit(getSystem()) {{
1560             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1561                     persistentProps, "testPersistence1");
1562
1563             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1564
1565             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1566
1567             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1568                     nonPersistentProps, "testPersistence2");
1569
1570             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1571
1572             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1573
1574         }};
1575
1576     }
1577
1578     @Test
1579     public void testOnDatastoreContext() {
1580         new ShardTestKit(getSystem()) {{
1581             dataStoreContextBuilder.persistent(true);
1582
1583             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1584
1585             assertEquals("isRecoveryApplicable", true,
1586                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1587
1588             waitUntilLeader(shard);
1589
1590             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1591
1592             assertEquals("isRecoveryApplicable", false,
1593                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1594
1595             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1596
1597             assertEquals("isRecoveryApplicable", true,
1598                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1599
1600             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1601         }};
1602     }
1603
1604     @Test
1605     public void testRegisterRoleChangeListener() throws Exception {
1606         new ShardTestKit(getSystem()) {
1607             {
1608                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1609                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1610                         "testRegisterRoleChangeListener");
1611
1612                 waitUntilLeader(shard);
1613
1614                 TestActorRef<MessageCollectorActor> listener =
1615                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1616
1617                 shard.tell(new RegisterRoleChangeListener(), listener);
1618
1619                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1620                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1621                 // sleep.
1622                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1623
1624                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1625
1626                 assertEquals(1, allMatching.size());
1627             }
1628         };
1629     }
1630
1631     @Test
1632     public void testFollowerInitialSyncStatus() throws Exception {
1633         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1634                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1635                 "testFollowerInitialSyncStatus");
1636
1637         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1638
1639         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1640
1641         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1642
1643         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1644
1645         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1646     }
1647
1648     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1649         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1650         ListenableFuture<Void> future =
1651             commitCohort.preCommit();
1652         try {
1653             future.get();
1654             future = commitCohort.commit();
1655             future.get();
1656         } catch (InterruptedException | ExecutionException e) {
1657         }
1658     }
1659 }