Merge "Add LeaderStateChanged notification"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.Dispatchers;
17 import akka.dispatch.OnComplete;
18 import akka.japi.Creator;
19 import akka.japi.Procedure;
20 import akka.pattern.Patterns;
21 import akka.persistence.SnapshotSelectionCriteria;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.Test;
42 import org.mockito.InOrder;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
61 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
62 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
63 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
64 import org.opendaylight.controller.cluster.datastore.modification.Modification;
65 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
66 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
67 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
68 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
69 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
70 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
71 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
72 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
73 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
74 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
75 import org.opendaylight.controller.cluster.raft.Snapshot;
76 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
77 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
79 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
80 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
81 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
82 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
83 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
84 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
85 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
86 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
87 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
88 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
89 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
90 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
91 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
92 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
93 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
95 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
97 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
98 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
99 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
100 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
101 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
102 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
103 import scala.concurrent.Await;
104 import scala.concurrent.Future;
105 import scala.concurrent.duration.FiniteDuration;
106
107 public class ShardTest extends AbstractShardTest {
108
109     @Test
110     public void testRegisterChangeListener() throws Exception {
111         new ShardTestKit(getSystem()) {{
112             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
113                     newShardProps(),  "testRegisterChangeListener");
114
115             waitUntilLeader(shard);
116
117             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
118
119             MockDataChangeListener listener = new MockDataChangeListener(1);
120             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
121                     "testRegisterChangeListener-DataChangeListener");
122
123             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
124                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
125
126             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
127                     RegisterChangeListenerReply.class);
128             String replyPath = reply.getListenerRegistrationPath().toString();
129             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
130                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
131
132             YangInstanceIdentifier path = TestModel.TEST_PATH;
133             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
134
135             listener.waitForChangeEvents(path);
136
137             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
138             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
139         }};
140     }
141
142     @SuppressWarnings("serial")
143     @Test
144     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
145         // This test tests the timing window in which a change listener is registered before the
146         // shard becomes the leader. We verify that the listener is registered and notified of the
147         // existing data when the shard becomes the leader.
148         new ShardTestKit(getSystem()) {{
149             // For this test, we want to send the RegisterChangeListener message after the shard
150             // has recovered from persistence and before it becomes the leader. So we subclass
151             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
152             // we know that the shard has been initialized to a follower and has started the
153             // election process. The following 2 CountDownLatches are used to coordinate the
154             // ElectionTimeout with the sending of the RegisterChangeListener message.
155             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
156             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
157             Creator<Shard> creator = new Creator<Shard>() {
158                 boolean firstElectionTimeout = true;
159
160                 @Override
161                 public Shard create() throws Exception {
162                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
163                             newDatastoreContext(), SCHEMA_CONTEXT) {
164                         @Override
165                         public void onReceiveCommand(final Object message) throws Exception {
166                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
167                                 // Got the first ElectionTimeout. We don't forward it to the
168                                 // base Shard yet until we've sent the RegisterChangeListener
169                                 // message. So we signal the onFirstElectionTimeout latch to tell
170                                 // the main thread to send the RegisterChangeListener message and
171                                 // start a thread to wait on the onChangeListenerRegistered latch,
172                                 // which the main thread signals after it has sent the message.
173                                 // After the onChangeListenerRegistered is triggered, we send the
174                                 // original ElectionTimeout message to proceed with the election.
175                                 firstElectionTimeout = false;
176                                 final ActorRef self = getSelf();
177                                 new Thread() {
178                                     @Override
179                                     public void run() {
180                                         Uninterruptibles.awaitUninterruptibly(
181                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
182                                         self.tell(message, self);
183                                     }
184                                 }.start();
185
186                                 onFirstElectionTimeout.countDown();
187                             } else {
188                                 super.onReceiveCommand(message);
189                             }
190                         }
191                     };
192                 }
193             };
194
195             MockDataChangeListener listener = new MockDataChangeListener(1);
196             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
197                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
198
199             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
200                     Props.create(new DelegatingShardCreator(creator)),
201                     "testRegisterChangeListenerWhenNotLeaderInitially");
202
203             // Write initial data into the in-memory store.
204             YangInstanceIdentifier path = TestModel.TEST_PATH;
205             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
206
207             // Wait until the shard receives the first ElectionTimeout message.
208             assertEquals("Got first ElectionTimeout", true,
209                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
210
211             // Now send the RegisterChangeListener and wait for the reply.
212             shard.tell(new RegisterChangeListener(path, dclActor.path(),
213                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
214
215             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
216                     RegisterChangeListenerReply.class);
217             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
218
219             // Sanity check - verify the shard is not the leader yet.
220             shard.tell(new FindLeader(), getRef());
221             FindLeaderReply findLeadeReply =
222                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
223             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
224
225             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
226             // with the election process.
227             onChangeListenerRegistered.countDown();
228
229             // Wait for the shard to become the leader and notify our listener with the existing
230             // data in the store.
231             listener.waitForChangeEvents(path);
232
233             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
234             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
235         }};
236     }
237
238     @Test
239     public void testCreateTransaction(){
240         new ShardTestKit(getSystem()) {{
241             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
242
243             waitUntilLeader(shard);
244
245             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
246
247             shard.tell(new CreateTransaction("txn-1",
248                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
249
250             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
251                     CreateTransactionReply.class);
252
253             String path = reply.getTransactionActorPath().toString();
254             assertTrue("Unexpected transaction path " + path,
255                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
256
257             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
258         }};
259     }
260
261     @Test
262     public void testCreateTransactionOnChain(){
263         new ShardTestKit(getSystem()) {{
264             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
265
266             waitUntilLeader(shard);
267
268             shard.tell(new CreateTransaction("txn-1",
269                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
270                     getRef());
271
272             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
273                     CreateTransactionReply.class);
274
275             String path = reply.getTransactionActorPath().toString();
276             assertTrue("Unexpected transaction path " + path,
277                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
278
279             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
280         }};
281     }
282
283     @SuppressWarnings("serial")
284     @Test
285     public void testPeerAddressResolved() throws Exception {
286         new ShardTestKit(getSystem()) {{
287             final CountDownLatch recoveryComplete = new CountDownLatch(1);
288             class TestShard extends Shard {
289                 TestShard() {
290                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
291                             newDatastoreContext(), SCHEMA_CONTEXT);
292                 }
293
294                 Map<String, String> getPeerAddresses() {
295                     return getRaftActorContext().getPeerAddresses();
296                 }
297
298                 @Override
299                 protected void onRecoveryComplete() {
300                     try {
301                         super.onRecoveryComplete();
302                     } finally {
303                         recoveryComplete.countDown();
304                     }
305                 }
306             }
307
308             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
309                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
310                         @Override
311                         public TestShard create() throws Exception {
312                             return new TestShard();
313                         }
314                     })), "testPeerAddressResolved");
315
316             //waitUntilLeader(shard);
317             assertEquals("Recovery complete", true,
318                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
319
320             String address = "akka://foobar";
321             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
322
323             assertEquals("getPeerAddresses", address,
324                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
325
326             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
327         }};
328     }
329
330     @Test
331     public void testApplySnapshot() throws Exception {
332         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
333                 "testApplySnapshot");
334
335         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
336         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
337
338         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
339
340         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
341         NormalizedNode<?,?> expected = readStore(store, root);
342
343         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
344                 SerializationUtils.serializeNormalizedNode(expected),
345                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
346
347         shard.underlyingActor().onReceiveCommand(applySnapshot);
348
349         NormalizedNode<?,?> actual = readStore(shard, root);
350
351         assertEquals("Root node", expected, actual);
352
353         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
354     }
355
356     @Test
357     public void testApplyState() throws Exception {
358
359         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
360
361         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
362
363         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
364                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
365
366         shard.underlyingActor().onReceiveCommand(applyState);
367
368         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
369         assertEquals("Applied state", node, actual);
370
371         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
372     }
373
374     @Test
375     public void testRecovery() throws Exception {
376
377         // Set up the InMemorySnapshotStore.
378
379         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
380         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
381
382         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
383
384         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
385
386         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
387                 SerializationUtils.serializeNormalizedNode(root),
388                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
389
390         // Set up the InMemoryJournal.
391
392         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
393                   new WriteModification(TestModel.OUTER_LIST_PATH,
394                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
395
396         int nListEntries = 16;
397         Set<Integer> listEntryKeys = new HashSet<>();
398
399         // Add some ModificationPayload entries
400         for(int i = 1; i <= nListEntries; i++) {
401             listEntryKeys.add(Integer.valueOf(i));
402             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
403                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
404             Modification mod = new MergeModification(path,
405                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
406             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
407                     newModificationPayload(mod)));
408         }
409
410         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
411                 new ApplyJournalEntries(nListEntries));
412
413         testRecovery(listEntryKeys);
414     }
415
416     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
417         MutableCompositeModification compMod = new MutableCompositeModification();
418         for(Modification mod: mods) {
419             compMod.addModification(mod);
420         }
421
422         return new ModificationPayload(compMod);
423     }
424
425     @SuppressWarnings({ "unchecked" })
426     @Test
427     public void testConcurrentThreePhaseCommits() throws Throwable {
428         new ShardTestKit(getSystem()) {{
429             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
430                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
431                     "testConcurrentThreePhaseCommits");
432
433             waitUntilLeader(shard);
434
435             final String transactionID1 = "tx1";
436             final String transactionID2 = "tx2";
437             final String transactionID3 = "tx3";
438
439             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
440             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
441             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
442             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
443                 @Override
444                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
445                     if(transactionID.equals(transactionID1)) {
446                         mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
447                         return mockCohort1.get();
448                     } else if(transactionID.equals(transactionID2)) {
449                         mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
450                         return mockCohort2.get();
451                     } else {
452                         mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
453                         return mockCohort3.get();
454                     }
455                 }
456             };
457
458             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
459
460             long timeoutSec = 5;
461             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
462             final Timeout timeout = new Timeout(duration);
463
464             // Send a BatchedModifications message for the first transaction.
465
466             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
467                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
468             BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
469             assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
470             assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
471
472             // Send the CanCommitTransaction message for the first Tx.
473
474             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
475             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
476                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
477             assertEquals("Can commit", true, canCommitReply.getCanCommit());
478
479             // Send BatchedModifications for the next 2 Tx's.
480
481             shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
482                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
483             expectMsgClass(duration, BatchedModificationsReply.class);
484
485             shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
486                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
487                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
488             expectMsgClass(duration, BatchedModificationsReply.class);
489
490             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
491             // processed after the first Tx completes.
492
493             Future<Object> canCommitFuture1 = Patterns.ask(shard,
494                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
495
496             Future<Object> canCommitFuture2 = Patterns.ask(shard,
497                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
498
499             // Send the CommitTransaction message for the first Tx. After it completes, it should
500             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
501
502             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
503             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
504
505             // Wait for the next 2 Tx's to complete.
506
507             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
508             final CountDownLatch commitLatch = new CountDownLatch(2);
509
510             class OnFutureComplete extends OnComplete<Object> {
511                 private final Class<?> expRespType;
512
513                 OnFutureComplete(final Class<?> expRespType) {
514                     this.expRespType = expRespType;
515                 }
516
517                 @Override
518                 public void onComplete(final Throwable error, final Object resp) {
519                     if(error != null) {
520                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
521                     } else {
522                         try {
523                             assertEquals("Commit response type", expRespType, resp.getClass());
524                             onSuccess(resp);
525                         } catch (Exception e) {
526                             caughtEx.set(e);
527                         }
528                     }
529                 }
530
531                 void onSuccess(final Object resp) throws Exception {
532                 }
533             }
534
535             class OnCommitFutureComplete extends OnFutureComplete {
536                 OnCommitFutureComplete() {
537                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
538                 }
539
540                 @Override
541                 public void onComplete(final Throwable error, final Object resp) {
542                     super.onComplete(error, resp);
543                     commitLatch.countDown();
544                 }
545             }
546
547             class OnCanCommitFutureComplete extends OnFutureComplete {
548                 private final String transactionID;
549
550                 OnCanCommitFutureComplete(final String transactionID) {
551                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
552                     this.transactionID = transactionID;
553                 }
554
555                 @Override
556                 void onSuccess(final Object resp) throws Exception {
557                     CanCommitTransactionReply canCommitReply =
558                             CanCommitTransactionReply.fromSerializable(resp);
559                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
560
561                     Future<Object> commitFuture = Patterns.ask(shard,
562                             new CommitTransaction(transactionID).toSerializable(), timeout);
563                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
564                 }
565             }
566
567             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
568                     getSystem().dispatcher());
569
570             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
571                     getSystem().dispatcher());
572
573             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
574
575             if(caughtEx.get() != null) {
576                 throw caughtEx.get();
577             }
578
579             assertEquals("Commits complete", true, done);
580
581             InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
582             inOrder.verify(mockCohort1.get()).canCommit();
583             inOrder.verify(mockCohort1.get()).preCommit();
584             inOrder.verify(mockCohort1.get()).commit();
585             inOrder.verify(mockCohort2.get()).canCommit();
586             inOrder.verify(mockCohort2.get()).preCommit();
587             inOrder.verify(mockCohort2.get()).commit();
588             inOrder.verify(mockCohort3.get()).canCommit();
589             inOrder.verify(mockCohort3.get()).preCommit();
590             inOrder.verify(mockCohort3.get()).commit();
591
592             // Verify data in the data store.
593
594             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
595             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
596             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
597                     outerList.getValue() instanceof Iterable);
598             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
599             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
600                        entry instanceof MapEntryNode);
601             MapEntryNode mapEntry = (MapEntryNode)entry;
602             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
603                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
604             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
605             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
606
607             verifyLastApplied(shard, 2);
608
609             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
610         }};
611     }
612
613     private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
614             NormalizedNode<?, ?> data, boolean ready) {
            // Convenience overload: delegates to the five-argument overload with a null
            // transaction chain ID.
615         return newBatchedModifications(transactionID, null, path, data, ready);
616     }
617
618     private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
619             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
            // Builds a BatchedModifications message at CURRENT_VERSION for the given transaction and
            // chain IDs, containing a single WriteModification of 'data' at 'path', and sets its
            // ready flag from 'ready'.
620         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
621         batched.addModification(new WriteModification(path, data));
622         batched.setReady(ready);
623         return batched;
624     }
625
626     @SuppressWarnings("unchecked")
627     @Test
628     public void testMultipleBatchedModifications() throws Throwable {
            // Sends three BatchedModifications messages for a single transaction (only the last one is
            // marked ready), then sends CanCommitTransaction and CommitTransaction. Verifies the cohort
            // call order and that the list entry written by the last batch can be read from the store.
629         new ShardTestKit(getSystem()) {{
630             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
631                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
632                     "testMultipleBatchedModifications");
633
634             waitUntilLeader(shard);
635
636             final String transactionID = "tx";
637             FiniteDuration duration = duration("5 seconds");
638
                // The decorator creates the mock cohort on its first invocation and returns that same
                // instance on every later call, so the calls on it can be verified below.
639             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
640             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
641                 @Override
642                 public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
643                     if(mockCohort.get() == null) {
644                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
645                     }
646
647                     return mockCohort.get();
648                 }
649             };
650
651             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
652
653             // Send a BatchedModifications to start a transaction.
654
655             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
656                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
657             expectMsgClass(duration, BatchedModificationsReply.class);
658
659             // Send a couple more BatchedModifications.
660
661             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
662                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
663             expectMsgClass(duration, BatchedModificationsReply.class);
664
                // This last batch is sent with ready == true.
665             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
666                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
667                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
668             expectMsgClass(duration, BatchedModificationsReply.class);
669
670             // Send the CanCommitTransaction message.
671
672             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
673             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
674                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
675             assertEquals("Can commit", true, canCommitReply.getCanCommit());
676
677             // Send the CommitTransaction message.
678
679             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
680             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
681
                // The three commit phases must have been invoked on the cohort in this order.
682             InOrder inOrder = inOrder(mockCohort.get());
683             inOrder.verify(mockCohort.get()).canCommit();
684             inOrder.verify(mockCohort.get()).preCommit();
685             inOrder.verify(mockCohort.get()).commit();
686
687             // Verify data in the data store.
688
                // Only the first entry of the outer list is inspected; its ID leaf must equal 1.
689             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
690             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
691             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
692                     outerList.getValue() instanceof Iterable);
693             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
694             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
695                        entry instanceof MapEntryNode);
696             MapEntryNode mapEntry = (MapEntryNode)entry;
697             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
698                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
699             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
700             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
701
702             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
703         }};
704     }
705
706     @Test
707     public void testBatchedModificationsOnTransactionChain() throws Throwable {
            // Readies a write transaction on a transaction chain via BatchedModifications, then creates
            // a read-only transaction on the same chain and expects it to read the written node before
            // the write transaction has been committed. Finally commits the write and checks the store.
708         new ShardTestKit(getSystem()) {{
709             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
710                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
711                     "testBatchedModificationsOnTransactionChain");
712
713             waitUntilLeader(shard);
714
715             String transactionChainID = "txChain";
716             String transactionID1 = "tx1";
717             String transactionID2 = "tx2";
718
719             FiniteDuration duration = duration("5 seconds");
720
721             // Send a BatchedModifications to start a chained write transaction and ready it.
722
723             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
724             YangInstanceIdentifier path = TestModel.TEST_PATH;
725             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
726                     containerNode, true), getRef());
727             expectMsgClass(duration, BatchedModificationsReply.class);
728
729             // Create a read Tx on the same chain.
730
731             shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
732                     transactionChainID).toSerializable(), getRef());
733
734             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
735
                // Read through the transaction actor named in the reply; no commit message has been sent
                // for the write transaction at this point.
736             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
737             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
738             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
739
740             // Commit the write transaction.
741
742             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
743             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
744                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
745             assertEquals("Can commit", true, canCommitReply.getCanCommit());
746
747             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
748             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
749
750             // Verify data in the data store.
751
752             NormalizedNode<?, ?> actualNode = readStore(shard, path);
753             assertEquals("Stored node", containerNode, actualNode);
754
755             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
756         }};
757     }
758
759     @Test
760     public void testOnBatchedModificationsWhenNotLeader() {
            // Sends a BatchedModifications message to a shard that reports it is not the leader and
            // expects an equal message to arrive at the test kit's actor, which the overridden
            // getLeader() names as the leader.
761         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
762         new ShardTestKit(getSystem()) {{
                // While overrideLeaderCalls is false the Shard subclass defers to the real
                // implementation; once it is true, isLeader() returns false and getLeader() returns a
                // selection of the test kit's own actor.
763             Creator<Shard> creator = new Creator<Shard>() {
764                 @Override
765                 public Shard create() throws Exception {
766                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
767                             newDatastoreContext(), SCHEMA_CONTEXT) {
768                         @Override
769                         protected boolean isLeader() {
770                             return overrideLeaderCalls.get() ? false : super.isLeader();
771                         }
772
773                         @Override
774                         protected ActorSelection getLeader() {
775                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
776                                 super.getLeader();
777                         }
778                     };
779                 }
780             };
781
782             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
783                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
784
785             waitUntilLeader(shard);
786
                // Switch on the overrides only after waitUntilLeader has returned.
787             overrideLeaderCalls.set(true);
788
789             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
790
791             shard.tell(batched, ActorRef.noSender());
792
                // The test kit must receive a message equal to the one sent to the shard.
793             expectMsgEquals(batched);
794
795             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
796         }};
797     }
798
799     @Test
800     public void testCommitWithPersistenceDisabled() throws Throwable {
            // Runs a ready/canCommit/commit cycle after setting persistent(false) on the datastore
            // context builder, and checks the written node can be read back from the shard.
            // NOTE(review): presumably newShardProps() picks up dataStoreContextBuilder - confirm.
801         dataStoreContextBuilder.persistent(false);
802         new ShardTestKit(getSystem()) {{
803             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
804                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
805                     "testCommitWithPersistenceDisabled");
806
807             waitUntilLeader(shard);
808
809             String transactionID = "tx";
810             FiniteDuration duration = duration("5 seconds");
811
812             // Send a BatchedModifications to start a transaction.
813
814             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
815             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
816             expectMsgClass(duration, BatchedModificationsReply.class);
817
818             // Send the CanCommitTransaction message.
819
820             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
821             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
822                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
823             assertEquals("Can commit", true, canCommitReply.getCanCommit());
824
825             // Send the CommitTransaction message.
826
827             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
828             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
829
                // The container written above must be readable from the shard's store.
830             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
831             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
832
833             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
834         }};
835     }
836
837     @Test
838     public void testCommitWhenTransactionHasNoModifications(){
            // Commits a transaction whose MutableCompositeModification is empty and checks, via the
            // shard MBean, that the committed-transaction count becomes 1 while the commit index
            // remains -1.
839         // Note that persistence is enabled which would normally result in the entry getting written to the journal
840         // but here that need not happen
841         new ShardTestKit(getSystem()) {
842             {
843                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
844                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
845                         "testCommitWhenTransactionHasNoModifications");
846
847                 waitUntilLeader(shard);
848
849                 String transactionID = "tx1";
                    // No modification is added, so the composite modification stays empty.
850                 MutableCompositeModification modification = new MutableCompositeModification();
                    // Mock cohort whose three phases all complete immediately with Boolean.TRUE.
851                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
852                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
853                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
854                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
855
856                 FiniteDuration duration = duration("5 seconds");
857
858                 // Simulate the ForwardedReadyTransaction messages that would be sent
859                 // by the ShardTransaction.
860
861                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
862                         cohort, modification, true), getRef());
863                 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
864
865                 // Send the CanCommitTransaction message.
866
867                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
868                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
869                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
870                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
871
                    // Send the CommitTransaction message and wait for its reply.
872                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
873                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
874
875                 InOrder inOrder = inOrder(cohort);
876                 inOrder.verify(cohort).canCommit();
877                 inOrder.verify(cohort).preCommit();
878                 inOrder.verify(cohort).commit();
879
880                 // Use MBean for verification
881                 // Committed transaction count should increase as usual
882                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
883
884                 // Commit index should not advance because this does not go into the journal
885                 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
886
887                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
888
889             }
890         };
891     }
892
893     @Test
894     public void testCommitWhenTransactionHasModifications(){
            // Counterpart of the no-modifications test: the composite modification contains one
            // DeleteModification, and the test expects both the committed-transaction count (1) and
            // the commit index (0) reported by the shard MBean to reflect the commit.
895         new ShardTestKit(getSystem()) {
896             {
897                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
898                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
899                         "testCommitWhenTransactionHasModifications");
900
901                 waitUntilLeader(shard);
902
903                 String transactionID = "tx1";
904                 MutableCompositeModification modification = new MutableCompositeModification();
                    // A single delete makes the composite modification non-empty.
905                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
                    // Mock cohort whose three phases all complete immediately with Boolean.TRUE.
906                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
907                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
908                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
909                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
910
911                 FiniteDuration duration = duration("5 seconds");
912
913                 // Simulate the ForwardedReadyTransaction messages that would be sent
914                 // by the ShardTransaction.
915
916                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
917                         cohort, modification, true), getRef());
918                 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
919
920                 // Send the CanCommitTransaction message.
921
922                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
923                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
924                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
925                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
926
                    // Send the CommitTransaction message and wait for its reply.
927                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
928                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
929
930                 InOrder inOrder = inOrder(cohort);
931                 inOrder.verify(cohort).canCommit();
932                 inOrder.verify(cohort).preCommit();
933                 inOrder.verify(cohort).commit();
934
935                 // Use MBean for verification
936                 // Committed transaction count should increase as usual
937                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
938
939                 // Commit index should advance as we do not have an empty modification
940                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
941
942                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
943
944             }
945         };
946     }
947
948     @Test
949     public void testCommitPhaseFailure() throws Throwable {
            // Readies two transactions whose cohorts are replaced by mocks. The first cohort's commit()
            // returns a failed future, so the test expects a Status.Failure reply to its
            // CommitTransaction, after which the second transaction's canCommit request must complete.
950         new ShardTestKit(getSystem()) {{
951             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
952                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
953                     "testCommitPhaseFailure");
954
955             waitUntilLeader(shard);
956
957             // Setup 2 mock cohorts. The first one fails in the commit phase.
958
959             final String transactionID1 = "tx1";
960             final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
961             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
962             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
963             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
964
                // Only canCommit() is stubbed on the second cohort.
965             final String transactionID2 = "tx2";
966             final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
967             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
968
                // Substitute the mocks for the real cohorts: cohort1 for "tx1", cohort2 for any other ID.
969             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
970                 @Override
971                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
972                         DOMStoreThreePhaseCommitCohort actual) {
973                     return transactionID1.equals(transactionID) ? cohort1 : cohort2;
974                 }
975             };
976
977             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
978
979             FiniteDuration duration = duration("5 seconds");
980             final Timeout timeout = new Timeout(duration);
981
982             // Send BatchedModifications to start and ready each transaction.
983
984             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
985                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
986             expectMsgClass(duration, BatchedModificationsReply.class);
987
988             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
989                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
990             expectMsgClass(duration, BatchedModificationsReply.class);
991
992             // Send the CanCommitTransaction message for the first Tx.
993
994             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
995             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
996                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
997             assertEquals("Can commit", true, canCommitReply.getCanCommit());
998
999             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1000             // processed after the first Tx completes.
1001
1002             Future<Object> canCommitFuture = Patterns.ask(shard,
1003                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1004
1005             // Send the CommitTransaction message for the first Tx. This should send back an error
1006             // and trigger the 2nd Tx to proceed.
1007
1008             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1009             expectMsgClass(duration, akka.actor.Status.Failure.class);
1010
1011             // Wait for the 2nd Tx to complete the canCommit phase.
1012
                 // The latch is released on completion whether the future succeeded or failed; the
                 // callback does not inspect 't' or 'resp'.
1013             final CountDownLatch latch = new CountDownLatch(1);
1014             canCommitFuture.onComplete(new OnComplete<Object>() {
1015                 @Override
1016                 public void onComplete(final Throwable t, final Object resp) {
1017                     latch.countDown();
1018                 }
1019             }, getSystem().dispatcher());
1020
1021             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1022
                 // cohort2.canCommit() must have been called only after all three phases of cohort1.
1023             InOrder inOrder = inOrder(cohort1, cohort2);
1024             inOrder.verify(cohort1).canCommit();
1025             inOrder.verify(cohort1).preCommit();
1026             inOrder.verify(cohort1).commit();
1027             inOrder.verify(cohort2).canCommit();
1028
1029             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1030         }};
1031     }
1032
1033     @Test
1034     public void testPreCommitPhaseFailure() throws Throwable {
             // Uses a mock cohort whose preCommit() returns a failed future. canCommit is expected to
             // succeed, and the CommitTransaction message is expected to be answered with a
             // Status.Failure.
1035         new ShardTestKit(getSystem()) {{
1036             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1037                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1038                     "testPreCommitPhaseFailure");
1039
1040             waitUntilLeader(shard);
1041
1042             String transactionID = "tx1";
1043             final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1044             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1045             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1046
                 // Every transaction gets the same mock cohort, regardless of its ID.
1047             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
1048                 @Override
1049                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
1050                         DOMStoreThreePhaseCommitCohort actual) {
1051                     return cohort;
1052                 }
1053             };
1054
1055             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
1056
1057             FiniteDuration duration = duration("5 seconds");
1058
1059             // Send BatchedModifications to start and ready a transaction.
1060
1061             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
1062                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1063             expectMsgClass(duration, BatchedModificationsReply.class);
1064
1065             // Send the CanCommitTransaction message.
1066
1067             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1068             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1069                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1070             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1071
1072             // Send the CommitTransaction message. This should send back an error
1073             // for preCommit failure.
1074
1075             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1076             expectMsgClass(duration, akka.actor.Status.Failure.class);
1077
                 // Only canCommit() followed by preCommit() is verified; commit() is not checked.
1078             InOrder inOrder = inOrder(cohort);
1079             inOrder.verify(cohort).canCommit();
1080             inOrder.verify(cohort).preCommit();
1081
1082             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1083         }};
1084     }
1085
1086     @Test
1087     public void testCanCommitPhaseFailure() throws Throwable {
             // Uses a mock cohort whose canCommit() returns a failed future and expects the
             // CanCommitTransaction message to be answered with a Status.Failure.
1088         new ShardTestKit(getSystem()) {{
1089             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1090                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1091                     "testCanCommitPhaseFailure");
1092
1093             waitUntilLeader(shard);
1094
1095             final FiniteDuration duration = duration("5 seconds");
1096
1097             String transactionID = "tx1";
1098             final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1099             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1100
                 // Every transaction gets the same mock cohort, regardless of its ID.
1101             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
1102                 @Override
1103                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
1104                         DOMStoreThreePhaseCommitCohort actual) {
1105                     return cohort;
1106                 }
1107             };
1108
1109             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
1110
1111             // Send BatchedModifications to start and ready a transaction.
1112
1113             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
1114                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1115             expectMsgClass(duration, BatchedModificationsReply.class);
1116
1117             // Send the CanCommitTransaction message.
1118
1119             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1120             expectMsgClass(duration, akka.actor.Status.Failure.class);
1121
1122             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1123         }};
1124     }
1125
1126     @Test
1127     public void testAbortBeforeFinishCommit() throws Throwable {
             // Readies, can-commits and commits one transaction, then checks the written node is
             // present in the store.
             // NOTE(review): the locals 'dataStore' and 'preCommit' below are declared but never
             // referenced again in this method, so the doAbortTransaction call inside 'preCommit' is
             // not reached from here and the abort described in the comments is not exercised.
             // Confirm whether 'preCommit' was meant to be installed on the cohort, e.g. via a
             // CohortDecorator.
1128         new ShardTestKit(getSystem()) {{
1129             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1130                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1131                     "testAbortBeforeFinishCommit");
1132
1133             waitUntilLeader(shard);
1134
1135             final FiniteDuration duration = duration("5 seconds");
1136             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1137
1138             final String transactionID = "tx1";
                 // Function that runs the cohort's preCommit, calls doAbortTransaction on the shard
                 // directly, and then returns the preCommit future.
1139             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1140                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1141                 @Override
1142                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1143                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1144
1145                     // Simulate an AbortTransaction message occurring during replication, after
1146                     // persisting and before finishing the commit to the in-memory store.
1147                     // We have no followers so due to optimizations in the RaftActor, it does not
1148                     // attempt replication and thus we can't send an AbortTransaction message b/c
1149                     // it would be processed too late after CommitTransaction completes. So we'll
1150                     // simulate an AbortTransaction message occurring during replication by calling
1151                     // the shard directly.
1152                     //
1153                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1154
1155                     return preCommitFuture;
1156                 }
1157             };
1158
                 // Ready the transaction with a single write of the test container.
1159             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
1160                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1161             expectMsgClass(duration, BatchedModificationsReply.class);
1162
1163             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1164             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1165                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1166             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1167
1168             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1169             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1170
1171             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1172
1173             // Since we're simulating an abort occurring during replication and before finish commit,
1174             // the data should still get written to the in-memory store since we've gotten past
1175             // canCommit and preCommit and persisted the data.
1176             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1177
1178             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1179         }};
1180     }
1181
1182     @Test
1183     public void testTransactionCommitTimeout() throws Throwable {
1184         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1185
1186         new ShardTestKit(getSystem()) {{
1187             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1188                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1189                     "testTransactionCommitTimeout");
1190
1191             waitUntilLeader(shard);
1192
1193             final FiniteDuration duration = duration("5 seconds");
1194
1195             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1196             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1197                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1198
1199             // Create and ready the 1st Tx - will timeout
1200
1201             String transactionID1 = "tx1";
1202             shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
1203                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1204                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
1205             expectMsgClass(duration, BatchedModificationsReply.class);
1206
1207             // Create and ready the 2nd Tx
1208
1209             String transactionID2 = "tx2";
1210             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1211                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1212             shard.tell(newBatchedModifications(transactionID2, listNodePath,
1213                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
1214             expectMsgClass(duration, BatchedModificationsReply.class);
1215
1216             // canCommit 1st Tx. We don't send the commit so it should timeout.
1217
1218             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1219             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1220
1221             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1222
1223             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1224             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1225
1226             // Commit the 2nd Tx.
1227
1228             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1229             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1230
1231             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1232             assertNotNull(listNodePath + " not found", node);
1233
1234             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1235         }};
1236     }
1237
1238     @Test
1239     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1240         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1241
1242         new ShardTestKit(getSystem()) {{
1243             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1244                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1245                     "testTransactionCommitQueueCapacityExceeded");
1246
1247             waitUntilLeader(shard);
1248
1249             final FiniteDuration duration = duration("5 seconds");
1250
1251             String transactionID1 = "tx1";
1252             String transactionID2 = "tx2";
1253             String transactionID3 = "tx3";
1254
1255             // Send a BatchedModifications to start transactions and ready them.
1256
1257             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1258                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1259             expectMsgClass(duration, BatchedModificationsReply.class);
1260
1261             shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
1262                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
1263             expectMsgClass(duration, BatchedModificationsReply.class);
1264
1265             shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1266                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1267             expectMsgClass(duration, BatchedModificationsReply.class);
1268
1269             // canCommit 1st Tx.
1270
1271             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1272             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1273
1274             // canCommit the 2nd Tx - it should get queued.
1275
1276             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1277
1278             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1279
1280             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1281             expectMsgClass(duration, akka.actor.Status.Failure.class);
1282
1283             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1284         }};
1285     }
1286
1287     @Test
1288     public void testCanCommitBeforeReadyFailure() throws Throwable {
1289         new ShardTestKit(getSystem()) {{
1290             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1291                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1292                     "testCanCommitBeforeReadyFailure");
1293
1294             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1295             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1296
1297             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1298         }};
1299     }
1300
1301     @Test
1302     public void testAbortTransaction() throws Throwable {
1303         new ShardTestKit(getSystem()) {{
1304             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1305                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1306                     "testAbortTransaction");
1307
1308             waitUntilLeader(shard);
1309
1310             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1311
1312             final String transactionID1 = "tx1";
1313             final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1314             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1315             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1316
1317             final String transactionID2 = "tx2";
1318             final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1319             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1320
1321             FiniteDuration duration = duration("5 seconds");
1322             final Timeout timeout = new Timeout(duration);
1323
1324             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
1325                 @Override
1326                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
1327                         DOMStoreThreePhaseCommitCohort actual) {
1328                     return transactionID1.equals(transactionID) ? cohort1 : cohort2;
1329                 }
1330             };
1331
1332             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
1333
1334             // Send BatchedModifications to start and ready each transaction.
1335
1336             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1337                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1338             expectMsgClass(duration, BatchedModificationsReply.class);
1339
1340             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1341                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1342             expectMsgClass(duration, BatchedModificationsReply.class);
1343
1344             // Send the CanCommitTransaction message for the first Tx.
1345
1346             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1347             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1348                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1349             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1350
1351             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1352             // processed after the first Tx completes.
1353
1354             Future<Object> canCommitFuture = Patterns.ask(shard,
1355                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1356
1357             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1358             // Tx to proceed.
1359
1360             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1361             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1362
1363             // Wait for the 2nd Tx to complete the canCommit phase.
1364
1365             Await.ready(canCommitFuture, duration);
1366
1367             InOrder inOrder = inOrder(cohort1, cohort2);
1368             inOrder.verify(cohort1).canCommit();
1369             inOrder.verify(cohort2).canCommit();
1370
1371             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1372         }};
1373     }
1374
1375     @Test
1376     public void testCreateSnapshot() throws Exception {
1377         testCreateSnapshot(true, "testCreateSnapshot");
1378     }
1379
1380     @Test
1381     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1382         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1383     }
1384
1385     @SuppressWarnings("serial")
1386     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1387
1388         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1389         class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1390             DataPersistenceProvider delegate;
1391
1392             DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1393                 this.delegate = delegate;
1394             }
1395
1396             @Override
1397             public boolean isRecoveryApplicable() {
1398                 return delegate.isRecoveryApplicable();
1399             }
1400
1401             @Override
1402             public <T> void persist(T o, Procedure<T> procedure) {
1403                 delegate.persist(o, procedure);
1404             }
1405
1406             @Override
1407             public void saveSnapshot(Object o) {
1408                 savedSnapshot.set(o);
1409                 delegate.saveSnapshot(o);
1410             }
1411
1412             @Override
1413             public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1414                 delegate.deleteSnapshots(criteria);
1415             }
1416
1417             @Override
1418             public void deleteMessages(long sequenceNumber) {
1419                 delegate.deleteMessages(sequenceNumber);
1420             }
1421         }
1422
1423         dataStoreContextBuilder.persistent(persistent);
1424
1425         new ShardTestKit(getSystem()) {{
1426             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1427             Creator<Shard> creator = new Creator<Shard>() {
1428                 @Override
1429                 public Shard create() throws Exception {
1430                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1431                             newDatastoreContext(), SCHEMA_CONTEXT) {
1432
1433                         DelegatingPersistentDataProvider delegating;
1434
1435                         @Override
1436                         protected DataPersistenceProvider persistence() {
1437                             if(delegating == null) {
1438                                 delegating = new DelegatingPersistentDataProvider(super.persistence());
1439                             }
1440
1441                             return delegating;
1442                         }
1443
1444                         @Override
1445                         protected void commitSnapshot(final long sequenceNumber) {
1446                             super.commitSnapshot(sequenceNumber);
1447                             latch.get().countDown();
1448                         }
1449                     };
1450                 }
1451             };
1452
1453             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1454                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1455
1456             waitUntilLeader(shard);
1457
1458             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1459
1460             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1461
1462             CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1463             shard.tell(capture, getRef());
1464
1465             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1466
1467             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1468                     savedSnapshot.get() instanceof Snapshot);
1469
1470             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1471
1472             latch.set(new CountDownLatch(1));
1473             savedSnapshot.set(null);
1474
1475             shard.tell(capture, getRef());
1476
1477             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1478
1479             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1480                     savedSnapshot.get() instanceof Snapshot);
1481
1482             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1483
1484             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1485         }
1486
1487         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1488
1489             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1490             assertEquals("Root node", expectedRoot, actual);
1491
1492         }};
1493     }
1494
1495     /**
1496      * This test simply verifies that the applySnapShot logic will work
1497      * @throws ReadFailedException
1498      */
1499     @Test
1500     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1501         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1502
1503         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1504
1505         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1506         putTransaction.write(TestModel.TEST_PATH,
1507             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1508         commitTransaction(putTransaction);
1509
1510
1511         NormalizedNode<?, ?> expected = readStore(store);
1512
1513         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1514
1515         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1516         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1517
1518         commitTransaction(writeTransaction);
1519
1520         NormalizedNode<?, ?> actual = readStore(store);
1521
1522         assertEquals(expected, actual);
1523     }
1524
1525     @Test
1526     public void testRecoveryApplicable(){
1527
1528         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1529                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1530
1531         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1532                 persistentContext, SCHEMA_CONTEXT);
1533
1534         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1535                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1536
1537         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1538                 nonPersistentContext, SCHEMA_CONTEXT);
1539
1540         new ShardTestKit(getSystem()) {{
1541             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1542                     persistentProps, "testPersistence1");
1543
1544             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1545
1546             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1547
1548             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1549                     nonPersistentProps, "testPersistence2");
1550
1551             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1552
1553             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1554
1555         }};
1556
1557     }
1558
1559     @Test
1560     public void testOnDatastoreContext() {
1561         new ShardTestKit(getSystem()) {{
1562             dataStoreContextBuilder.persistent(true);
1563
1564             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1565
1566             assertEquals("isRecoveryApplicable", true,
1567                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1568
1569             waitUntilLeader(shard);
1570
1571             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1572
1573             assertEquals("isRecoveryApplicable", false,
1574                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1575
1576             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1577
1578             assertEquals("isRecoveryApplicable", true,
1579                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1580
1581             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1582         }};
1583     }
1584
1585     @Test
1586     public void testRegisterRoleChangeListener() throws Exception {
1587         new ShardTestKit(getSystem()) {
1588             {
1589                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1590                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1591                         "testRegisterRoleChangeListener");
1592
1593                 waitUntilLeader(shard);
1594
1595                 TestActorRef<MessageCollectorActor> listener =
1596                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1597
1598                 shard.tell(new RegisterRoleChangeListener(), listener);
1599
1600                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1601                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1602                 // sleep.
1603                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1604
1605                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1606
1607                 assertEquals(1, allMatching.size());
1608             }
1609         };
1610     }
1611
1612     @Test
1613     public void testFollowerInitialSyncStatus() throws Exception {
1614         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1615                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1616                 "testFollowerInitialSyncStatus");
1617
1618         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1619
1620         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1621
1622         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1623
1624         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1625
1626         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1627     }
1628
1629     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1630         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1631         ListenableFuture<Void> future =
1632             commitCohort.preCommit();
1633         try {
1634             future.get();
1635             future = commitCohort.commit();
1636             future.get();
1637         } catch (InterruptedException | ExecutionException e) {
1638         }
1639     }
1640 }