Merge "Use BatchedModifications message in place of ReadyTransaction message"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.Dispatchers;
17 import akka.dispatch.OnComplete;
18 import akka.japi.Creator;
19 import akka.pattern.Patterns;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Function;
23 import com.google.common.base.Optional;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import java.io.IOException;
29 import java.util.Collections;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
44 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
46 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
48 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
55 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
60 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
61 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
62 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
63 import org.opendaylight.controller.cluster.datastore.modification.Modification;
64 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
65 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
67 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
68 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
69 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
70 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
71 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
72 import org.opendaylight.controller.cluster.raft.RaftActorContext;
73 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
74 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
75 import org.opendaylight.controller.cluster.raft.Snapshot;
76 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
77 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
79 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
80 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
81 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
82 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
83 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
84 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
85 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
86 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
87 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
88 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
89 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
90 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
91 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
92 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
93 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
95 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
97 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
98 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
99 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
100 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
101 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
102 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
103 import scala.concurrent.Await;
104 import scala.concurrent.Future;
105 import scala.concurrent.duration.FiniteDuration;
106
107 public class ShardTest extends AbstractShardTest {
108
109     @Test
110     public void testRegisterChangeListener() throws Exception {
111         new ShardTestKit(getSystem()) {{
112             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
113                     newShardProps(),  "testRegisterChangeListener");
114
115             waitUntilLeader(shard);
116
117             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
118
119             MockDataChangeListener listener = new MockDataChangeListener(1);
120             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
121                     "testRegisterChangeListener-DataChangeListener");
122
123             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
124                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
125
126             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
127                     RegisterChangeListenerReply.class);
128             String replyPath = reply.getListenerRegistrationPath().toString();
129             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
130                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
131
132             YangInstanceIdentifier path = TestModel.TEST_PATH;
133             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
134
135             listener.waitForChangeEvents(path);
136
137             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
138             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
139         }};
140     }
141
142     @SuppressWarnings("serial")
143     @Test
144     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
145         // This test tests the timing window in which a change listener is registered before the
146         // shard becomes the leader. We verify that the listener is registered and notified of the
147         // existing data when the shard becomes the leader.
148         new ShardTestKit(getSystem()) {{
149             // For this test, we want to send the RegisterChangeListener message after the shard
150             // has recovered from persistence and before it becomes the leader. So we subclass
151             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
152             // we know that the shard has been initialized to a follower and has started the
153             // election process. The following 2 CountDownLatches are used to coordinate the
154             // ElectionTimeout with the sending of the RegisterChangeListener message.
155             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
156             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
157             Creator<Shard> creator = new Creator<Shard>() {
158                 boolean firstElectionTimeout = true;
159
160                 @Override
161                 public Shard create() throws Exception {
162                     // Use a non persistent provider because this test actually invokes persist on the journal
163                     // this will cause all other messages to not be queued properly after that.
164                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
165                     // it does do a persist)
166                     return new Shard(shardID, Collections.<String,String>emptyMap(),
167                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
168                         @Override
169                         public void onReceiveCommand(final Object message) throws Exception {
170                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
171                                 // Got the first ElectionTimeout. We don't forward it to the
172                                 // base Shard yet until we've sent the RegisterChangeListener
173                                 // message. So we signal the onFirstElectionTimeout latch to tell
174                                 // the main thread to send the RegisterChangeListener message and
175                                 // start a thread to wait on the onChangeListenerRegistered latch,
176                                 // which the main thread signals after it has sent the message.
177                                 // After the onChangeListenerRegistered is triggered, we send the
178                                 // original ElectionTimeout message to proceed with the election.
179                                 firstElectionTimeout = false;
180                                 final ActorRef self = getSelf();
181                                 new Thread() {
182                                     @Override
183                                     public void run() {
184                                         Uninterruptibles.awaitUninterruptibly(
185                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
186                                         self.tell(message, self);
187                                     }
188                                 }.start();
189
190                                 onFirstElectionTimeout.countDown();
191                             } else {
192                                 super.onReceiveCommand(message);
193                             }
194                         }
195                     };
196                 }
197             };
198
199             MockDataChangeListener listener = new MockDataChangeListener(1);
200             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
201                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
202
203             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
204                     Props.create(new DelegatingShardCreator(creator)),
205                     "testRegisterChangeListenerWhenNotLeaderInitially");
206
207             // Write initial data into the in-memory store.
208             YangInstanceIdentifier path = TestModel.TEST_PATH;
209             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
210
211             // Wait until the shard receives the first ElectionTimeout message.
212             assertEquals("Got first ElectionTimeout", true,
213                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
214
215             // Now send the RegisterChangeListener and wait for the reply.
216             shard.tell(new RegisterChangeListener(path, dclActor,
217                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
218
219             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
220                     RegisterChangeListenerReply.class);
221             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
222
223             // Sanity check - verify the shard is not the leader yet.
224             shard.tell(new FindLeader(), getRef());
225             FindLeaderReply findLeadeReply =
226                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
227             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
228
229             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
230             // with the election process.
231             onChangeListenerRegistered.countDown();
232
233             // Wait for the shard to become the leader and notify our listener with the existing
234             // data in the store.
235             listener.waitForChangeEvents(path);
236
237             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
238             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
239         }};
240     }
241
242     @Test
243     public void testCreateTransaction(){
244         new ShardTestKit(getSystem()) {{
245             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
246
247             waitUntilLeader(shard);
248
249             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
250
251             shard.tell(new CreateTransaction("txn-1",
252                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
253
254             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
255                     CreateTransactionReply.class);
256
257             String path = reply.getTransactionActorPath().toString();
258             assertTrue("Unexpected transaction path " + path,
259                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
260
261             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
262         }};
263     }
264
265     @Test
266     public void testCreateTransactionOnChain(){
267         new ShardTestKit(getSystem()) {{
268             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
269
270             waitUntilLeader(shard);
271
272             shard.tell(new CreateTransaction("txn-1",
273                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
274                     getRef());
275
276             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
277                     CreateTransactionReply.class);
278
279             String path = reply.getTransactionActorPath().toString();
280             assertTrue("Unexpected transaction path " + path,
281                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
282
283             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
284         }};
285     }
286
287     @SuppressWarnings("serial")
288     @Test
289     public void testPeerAddressResolved() throws Exception {
290         new ShardTestKit(getSystem()) {{
291             final CountDownLatch recoveryComplete = new CountDownLatch(1);
292             class TestShard extends Shard {
293                 TestShard() {
294                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
295                             newDatastoreContext(), SCHEMA_CONTEXT);
296                 }
297
298                 Map<String, String> getPeerAddresses() {
299                     return getRaftActorContext().getPeerAddresses();
300                 }
301
302                 @Override
303                 protected void onRecoveryComplete() {
304                     try {
305                         super.onRecoveryComplete();
306                     } finally {
307                         recoveryComplete.countDown();
308                     }
309                 }
310             }
311
312             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
313                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
314                         @Override
315                         public TestShard create() throws Exception {
316                             return new TestShard();
317                         }
318                     })), "testPeerAddressResolved");
319
320             //waitUntilLeader(shard);
321             assertEquals("Recovery complete", true,
322                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
323
324             String address = "akka://foobar";
325             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
326
327             assertEquals("getPeerAddresses", address,
328                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
329
330             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
331         }};
332     }
333
334     @Test
335     public void testApplySnapshot() throws Exception {
336         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
337                 "testApplySnapshot");
338
339         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
340         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
341
342         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
343
344         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
345         NormalizedNode<?,?> expected = readStore(store, root);
346
347         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
348                 SerializationUtils.serializeNormalizedNode(expected),
349                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
350
351         shard.underlyingActor().onReceiveCommand(applySnapshot);
352
353         NormalizedNode<?,?> actual = readStore(shard, root);
354
355         assertEquals("Root node", expected, actual);
356
357         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
358     }
359
360     @Test
361     public void testApplyState() throws Exception {
362
363         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
364
365         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
366
367         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
368                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
369
370         shard.underlyingActor().onReceiveCommand(applyState);
371
372         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
373         assertEquals("Applied state", node, actual);
374
375         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
376     }
377
378     @Test
379     public void testRecovery() throws Exception {
380
381         // Set up the InMemorySnapshotStore.
382
383         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
384         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
385
386         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
387
388         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
389
390         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
391                 SerializationUtils.serializeNormalizedNode(root),
392                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
393
394         // Set up the InMemoryJournal.
395
396         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
397                   new WriteModification(TestModel.OUTER_LIST_PATH,
398                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
399
400         int nListEntries = 16;
401         Set<Integer> listEntryKeys = new HashSet<>();
402
403         // Add some ModificationPayload entries
404         for(int i = 1; i <= nListEntries; i++) {
405             listEntryKeys.add(Integer.valueOf(i));
406             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
407                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
408             Modification mod = new MergeModification(path,
409                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
410             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
411                     newModificationPayload(mod)));
412         }
413
414         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
415                 new ApplyJournalEntries(nListEntries));
416
417         testRecovery(listEntryKeys);
418     }
419
420     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
421         MutableCompositeModification compMod = new MutableCompositeModification();
422         for(Modification mod: mods) {
423             compMod.addModification(mod);
424         }
425
426         return new ModificationPayload(compMod);
427     }
428
429     @SuppressWarnings({ "unchecked" })
430     @Test
431     public void testConcurrentThreePhaseCommits() throws Throwable {
432         new ShardTestKit(getSystem()) {{
433             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
434                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
435                     "testConcurrentThreePhaseCommits");
436
437             waitUntilLeader(shard);
438
439          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
440
441             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
442
443             String transactionID1 = "tx1";
444             MutableCompositeModification modification1 = new MutableCompositeModification();
445             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
446                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
447
448             String transactionID2 = "tx2";
449             MutableCompositeModification modification2 = new MutableCompositeModification();
450             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
451                     TestModel.OUTER_LIST_PATH,
452                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
453                     modification2);
454
455             String transactionID3 = "tx3";
456             MutableCompositeModification modification3 = new MutableCompositeModification();
457             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
458                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
459                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
460                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
461                     modification3);
462
463             long timeoutSec = 5;
464             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
465             final Timeout timeout = new Timeout(duration);
466
467             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
468             // by the ShardTransaction.
469
470             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
471                     cohort1, modification1, true), getRef());
472             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
473                     expectMsgClass(duration, ReadyTransactionReply.class));
474             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
475
476             // Send the CanCommitTransaction message for the first Tx.
477
478             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
479             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
480                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
481             assertEquals("Can commit", true, canCommitReply.getCanCommit());
482
483             // Send the ForwardedReadyTransaction for the next 2 Tx's.
484
485             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
486                     cohort2, modification2, true), getRef());
487             expectMsgClass(duration, ReadyTransactionReply.class);
488
489             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
490                     cohort3, modification3, true), getRef());
491             expectMsgClass(duration, ReadyTransactionReply.class);
492
493             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
494             // processed after the first Tx completes.
495
496             Future<Object> canCommitFuture1 = Patterns.ask(shard,
497                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
498
499             Future<Object> canCommitFuture2 = Patterns.ask(shard,
500                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
501
502             // Send the CommitTransaction message for the first Tx. After it completes, it should
503             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
504
505             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
506             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
507
508             // Wait for the next 2 Tx's to complete.
509
510             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
511             final CountDownLatch commitLatch = new CountDownLatch(2);
512
513             class OnFutureComplete extends OnComplete<Object> {
514                 private final Class<?> expRespType;
515
516                 OnFutureComplete(final Class<?> expRespType) {
517                     this.expRespType = expRespType;
518                 }
519
520                 @Override
521                 public void onComplete(final Throwable error, final Object resp) {
522                     if(error != null) {
523                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
524                     } else {
525                         try {
526                             assertEquals("Commit response type", expRespType, resp.getClass());
527                             onSuccess(resp);
528                         } catch (Exception e) {
529                             caughtEx.set(e);
530                         }
531                     }
532                 }
533
534                 void onSuccess(final Object resp) throws Exception {
535                 }
536             }
537
538             class OnCommitFutureComplete extends OnFutureComplete {
539                 OnCommitFutureComplete() {
540                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
541                 }
542
543                 @Override
544                 public void onComplete(final Throwable error, final Object resp) {
545                     super.onComplete(error, resp);
546                     commitLatch.countDown();
547                 }
548             }
549
550             class OnCanCommitFutureComplete extends OnFutureComplete {
551                 private final String transactionID;
552
553                 OnCanCommitFutureComplete(final String transactionID) {
554                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
555                     this.transactionID = transactionID;
556                 }
557
558                 @Override
559                 void onSuccess(final Object resp) throws Exception {
560                     CanCommitTransactionReply canCommitReply =
561                             CanCommitTransactionReply.fromSerializable(resp);
562                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
563
564                     Future<Object> commitFuture = Patterns.ask(shard,
565                             new CommitTransaction(transactionID).toSerializable(), timeout);
566                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
567                 }
568             }
569
570             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
571                     getSystem().dispatcher());
572
573             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
574                     getSystem().dispatcher());
575
576             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
577
578             if(caughtEx.get() != null) {
579                 throw caughtEx.get();
580             }
581
582             assertEquals("Commits complete", true, done);
583
584             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
585             inOrder.verify(cohort1).canCommit();
586             inOrder.verify(cohort1).preCommit();
587             inOrder.verify(cohort1).commit();
588             inOrder.verify(cohort2).canCommit();
589             inOrder.verify(cohort2).preCommit();
590             inOrder.verify(cohort2).commit();
591             inOrder.verify(cohort3).canCommit();
592             inOrder.verify(cohort3).preCommit();
593             inOrder.verify(cohort3).commit();
594
595             // Verify data in the data store.
596
597             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
598             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
599             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
600                     outerList.getValue() instanceof Iterable);
601             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
602             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
603                        entry instanceof MapEntryNode);
604             MapEntryNode mapEntry = (MapEntryNode)entry;
605             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
606                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
607             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
608             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
609
610             verifyLastApplied(shard, 2);
611
612             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
613         }};
614     }
615
616     private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
617             NormalizedNode<?, ?> data, boolean ready) {
618         return newBatchedModifications(transactionID, null, path, data, ready);
619     }
620
621     private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
622             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
623         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
624         batched.addModification(new WriteModification(path, data));
625         batched.setReady(ready);
626         return batched;
627     }
628
629     @SuppressWarnings("unchecked")
630     @Test
631     public void testMultipleBatchedModifications() throws Throwable {
632         new ShardTestKit(getSystem()) {{
633             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
634                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
635                     "testMultipleBatchedModifications");
636
637             waitUntilLeader(shard);
638
639             final String transactionID = "tx";
640             FiniteDuration duration = duration("5 seconds");
641
642             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
643             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
644                 @Override
645                 public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
646                     if(mockCohort.get() == null) {
647                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
648                     }
649
650                     return mockCohort.get();
651                 }
652             };
653
654             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
655
656             // Send a BatchedModifications to start a transaction.
657
658             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
659                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
660             expectMsgClass(duration, BatchedModificationsReply.class);
661
662             // Send a couple more BatchedModifications.
663
664             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
665                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
666             expectMsgClass(duration, BatchedModificationsReply.class);
667
668             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
669                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
670                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
671             expectMsgClass(duration, ReadyTransactionReply.class);
672
673             // Send the CanCommitTransaction message.
674
675             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
676             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
677                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
678             assertEquals("Can commit", true, canCommitReply.getCanCommit());
679
680             // Send the CanCommitTransaction message.
681
682             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
683             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
684
685             InOrder inOrder = inOrder(mockCohort.get());
686             inOrder.verify(mockCohort.get()).canCommit();
687             inOrder.verify(mockCohort.get()).preCommit();
688             inOrder.verify(mockCohort.get()).commit();
689
690             // Verify data in the data store.
691
692             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
693             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
694             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
695                     outerList.getValue() instanceof Iterable);
696             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
697             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
698                        entry instanceof MapEntryNode);
699             MapEntryNode mapEntry = (MapEntryNode)entry;
700             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
701                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
702             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
703             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
704
705             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
706         }};
707     }
708
709     @Test
710     public void testBatchedModificationsOnTransactionChain() throws Throwable {
711         new ShardTestKit(getSystem()) {{
712             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
713                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
714                     "testBatchedModificationsOnTransactionChain");
715
716             waitUntilLeader(shard);
717
718             String transactionChainID = "txChain";
719             String transactionID1 = "tx1";
720             String transactionID2 = "tx2";
721
722             FiniteDuration duration = duration("5 seconds");
723
724             // Send a BatchedModifications to start a chained write transaction and ready it.
725
726             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
727             YangInstanceIdentifier path = TestModel.TEST_PATH;
728             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
729                     containerNode, true), getRef());
730             expectMsgClass(duration, ReadyTransactionReply.class);
731
732             // Create a read Tx on the same chain.
733
734             shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
735                     transactionChainID).toSerializable(), getRef());
736
737             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
738
739             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
740             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
741             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
742
743             // Commit the write transaction.
744
745             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
746             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
747                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
748             assertEquals("Can commit", true, canCommitReply.getCanCommit());
749
750             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
751             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
752
753             // Verify data in the data store.
754
755             NormalizedNode<?, ?> actualNode = readStore(shard, path);
756             assertEquals("Stored node", containerNode, actualNode);
757
758             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
759         }};
760     }
761
762     @Test
763     public void testOnBatchedModificationsWhenNotLeader() {
764         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
765         new ShardTestKit(getSystem()) {{
766             Creator<Shard> creator = new Creator<Shard>() {
767                 @Override
768                 public Shard create() throws Exception {
769                     return new Shard(shardID, Collections.<String,String>emptyMap(),
770                             newDatastoreContext(), SCHEMA_CONTEXT) {
771                         @Override
772                         protected boolean isLeader() {
773                             return overrideLeaderCalls.get() ? false : super.isLeader();
774                         }
775
776                         @Override
777                         protected ActorSelection getLeader() {
778                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
779                                 super.getLeader();
780                         }
781                     };
782                 }
783             };
784
785             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
786                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
787
788             waitUntilLeader(shard);
789
790             overrideLeaderCalls.set(true);
791
792             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
793
794             shard.tell(batched, ActorRef.noSender());
795
796             expectMsgEquals(batched);
797
798             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
799         }};
800     }
801
802     @Test
803     public void testCommitWithPersistenceDisabled() throws Throwable {
804         dataStoreContextBuilder.persistent(false);
805         new ShardTestKit(getSystem()) {{
806             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
807                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
808                     "testCommitWithPersistenceDisabled");
809
810             waitUntilLeader(shard);
811
812             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
813
814             // Setup a simulated transactions with a mock cohort.
815
816             String transactionID = "tx";
817             MutableCompositeModification modification = new MutableCompositeModification();
818             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
819             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
820                     TestModel.TEST_PATH, containerNode, modification);
821
822             FiniteDuration duration = duration("5 seconds");
823
824             // Simulate the ForwardedReadyTransaction messages that would be sent
825             // by the ShardTransaction.
826
827             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
828                     cohort, modification, true), getRef());
829             expectMsgClass(duration, ReadyTransactionReply.class);
830
831             // Send the CanCommitTransaction message.
832
833             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
834             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
835                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
836             assertEquals("Can commit", true, canCommitReply.getCanCommit());
837
838             // Send the CanCommitTransaction message.
839
840             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
841             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
842
843             InOrder inOrder = inOrder(cohort);
844             inOrder.verify(cohort).canCommit();
845             inOrder.verify(cohort).preCommit();
846             inOrder.verify(cohort).commit();
847
848             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
849             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
850
851             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
852         }};
853     }
854
855     @Test
856     public void testCommitWhenTransactionHasNoModifications(){
857         // Note that persistence is enabled which would normally result in the entry getting written to the journal
858         // but here that need not happen
859         new ShardTestKit(getSystem()) {
860             {
861                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
862                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
863                         "testCommitWhenTransactionHasNoModifications");
864
865                 waitUntilLeader(shard);
866
867                 String transactionID = "tx1";
868                 MutableCompositeModification modification = new MutableCompositeModification();
869                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
870                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
871                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
872                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
873
874                 FiniteDuration duration = duration("5 seconds");
875
876                 // Simulate the ForwardedReadyTransaction messages that would be sent
877                 // by the ShardTransaction.
878
879                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
880                         cohort, modification, true), getRef());
881                 expectMsgClass(duration, ReadyTransactionReply.class);
882
883                 // Send the CanCommitTransaction message.
884
885                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
886                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
887                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
888                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
889
890                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
891                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
892
893                 InOrder inOrder = inOrder(cohort);
894                 inOrder.verify(cohort).canCommit();
895                 inOrder.verify(cohort).preCommit();
896                 inOrder.verify(cohort).commit();
897
898                 // Use MBean for verification
899                 // Committed transaction count should increase as usual
900                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
901
902                 // Commit index should not advance because this does not go into the journal
903                 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
904
905                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
906
907             }
908         };
909     }
910
911     @Test
912     public void testCommitWhenTransactionHasModifications(){
913         new ShardTestKit(getSystem()) {
914             {
915                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
916                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
917                         "testCommitWhenTransactionHasModifications");
918
919                 waitUntilLeader(shard);
920
921                 String transactionID = "tx1";
922                 MutableCompositeModification modification = new MutableCompositeModification();
923                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
924                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
925                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
926                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
927                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
928
929                 FiniteDuration duration = duration("5 seconds");
930
931                 // Simulate the ForwardedReadyTransaction messages that would be sent
932                 // by the ShardTransaction.
933
934                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
935                         cohort, modification, true), getRef());
936                 expectMsgClass(duration, ReadyTransactionReply.class);
937
938                 // Send the CanCommitTransaction message.
939
940                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
941                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
942                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
943                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
944
945                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
946                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
947
948                 InOrder inOrder = inOrder(cohort);
949                 inOrder.verify(cohort).canCommit();
950                 inOrder.verify(cohort).preCommit();
951                 inOrder.verify(cohort).commit();
952
953                 // Use MBean for verification
954                 // Committed transaction count should increase as usual
955                 assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
956
957                 // Commit index should advance as we do not have an empty modification
958                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
959
960                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
961
962             }
963         };
964     }
965
966     @Test
967     public void testCommitPhaseFailure() throws Throwable {
968         new ShardTestKit(getSystem()) {{
969             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
970                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
971                     "testCommitPhaseFailure");
972
973             waitUntilLeader(shard);
974
975          // Setup 2 simulated transactions with mock cohorts. The first one fails in the
976             // commit phase.
977
978             String transactionID1 = "tx1";
979             MutableCompositeModification modification1 = new MutableCompositeModification();
980             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
981             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
982             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
983             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
984
985             String transactionID2 = "tx2";
986             MutableCompositeModification modification2 = new MutableCompositeModification();
987             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
988             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
989
990             FiniteDuration duration = duration("5 seconds");
991             final Timeout timeout = new Timeout(duration);
992
993             // Simulate the ForwardedReadyTransaction messages that would be sent
994             // by the ShardTransaction.
995
996             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
997                     cohort1, modification1, true), getRef());
998             expectMsgClass(duration, ReadyTransactionReply.class);
999
1000             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1001                     cohort2, modification2, true), getRef());
1002             expectMsgClass(duration, ReadyTransactionReply.class);
1003
1004             // Send the CanCommitTransaction message for the first Tx.
1005
1006             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1007             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1008                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1009             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1010
1011             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1012             // processed after the first Tx completes.
1013
1014             Future<Object> canCommitFuture = Patterns.ask(shard,
1015                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1016
1017             // Send the CommitTransaction message for the first Tx. This should send back an error
1018             // and trigger the 2nd Tx to proceed.
1019
1020             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1021             expectMsgClass(duration, akka.actor.Status.Failure.class);
1022
1023             // Wait for the 2nd Tx to complete the canCommit phase.
1024
1025             final CountDownLatch latch = new CountDownLatch(1);
1026             canCommitFuture.onComplete(new OnComplete<Object>() {
1027                 @Override
1028                 public void onComplete(final Throwable t, final Object resp) {
1029                     latch.countDown();
1030                 }
1031             }, getSystem().dispatcher());
1032
1033             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1034
1035             InOrder inOrder = inOrder(cohort1, cohort2);
1036             inOrder.verify(cohort1).canCommit();
1037             inOrder.verify(cohort1).preCommit();
1038             inOrder.verify(cohort1).commit();
1039             inOrder.verify(cohort2).canCommit();
1040
1041             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1042         }};
1043     }
1044
1045     @Test
1046     public void testPreCommitPhaseFailure() throws Throwable {
1047         new ShardTestKit(getSystem()) {{
1048             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1049                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1050                     "testPreCommitPhaseFailure");
1051
1052             waitUntilLeader(shard);
1053
1054             String transactionID = "tx1";
1055             MutableCompositeModification modification = new MutableCompositeModification();
1056             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1057             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1058             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1059
1060             FiniteDuration duration = duration("5 seconds");
1061
1062             // Simulate the ForwardedReadyTransaction messages that would be sent
1063             // by the ShardTransaction.
1064
1065             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1066                     cohort, modification, true), getRef());
1067             expectMsgClass(duration, ReadyTransactionReply.class);
1068
1069             // Send the CanCommitTransaction message.
1070
1071             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1072             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1073                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1074             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1075
1076             // Send the CommitTransaction message. This should send back an error
1077             // for preCommit failure.
1078
1079             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1080             expectMsgClass(duration, akka.actor.Status.Failure.class);
1081
1082             InOrder inOrder = inOrder(cohort);
1083             inOrder.verify(cohort).canCommit();
1084             inOrder.verify(cohort).preCommit();
1085
1086             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1087         }};
1088     }
1089
1090     @Test
1091     public void testCanCommitPhaseFailure() throws Throwable {
1092         new ShardTestKit(getSystem()) {{
1093             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1094                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1095                     "testCanCommitPhaseFailure");
1096
1097             waitUntilLeader(shard);
1098
1099             final FiniteDuration duration = duration("5 seconds");
1100
1101             String transactionID = "tx1";
1102             MutableCompositeModification modification = new MutableCompositeModification();
1103             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1104             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1105
1106             // Simulate the ForwardedReadyTransaction messages that would be sent
1107             // by the ShardTransaction.
1108
1109             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1110                     cohort, modification, true), getRef());
1111             expectMsgClass(duration, ReadyTransactionReply.class);
1112
1113             // Send the CanCommitTransaction message.
1114
1115             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1116             expectMsgClass(duration, akka.actor.Status.Failure.class);
1117
1118             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1119         }};
1120     }
1121
1122     @Test
1123     public void testAbortBeforeFinishCommit() throws Throwable {
1124         new ShardTestKit(getSystem()) {{
1125             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1126                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1127                     "testAbortBeforeFinishCommit");
1128
1129             waitUntilLeader(shard);
1130
1131             final FiniteDuration duration = duration("5 seconds");
1132             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1133
1134             final String transactionID = "tx1";
1135             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1136                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1137                 @Override
1138                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1139                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1140
1141                     // Simulate an AbortTransaction message occurring during replication, after
1142                     // persisting and before finishing the commit to the in-memory store.
1143                     // We have no followers so due to optimizations in the RaftActor, it does not
1144                     // attempt replication and thus we can't send an AbortTransaction message b/c
1145                     // it would be processed too late after CommitTransaction completes. So we'll
1146                     // simulate an AbortTransaction message occurring during replication by calling
1147                     // the shard directly.
1148                     //
1149                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1150
1151                     return preCommitFuture;
1152                 }
1153             };
1154
1155             MutableCompositeModification modification = new MutableCompositeModification();
1156             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1157                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1158                     modification, preCommit);
1159
1160             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1161                     cohort, modification, true), getRef());
1162             expectMsgClass(duration, ReadyTransactionReply.class);
1163
1164             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1165             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1166                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1167             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1168
1169             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1170             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1171
1172             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1173
1174             // Since we're simulating an abort occurring during replication and before finish commit,
1175             // the data should still get written to the in-memory store since we've gotten past
1176             // canCommit and preCommit and persisted the data.
1177             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1178
1179             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1180         }};
1181     }
1182
1183     @Test
1184     public void testTransactionCommitTimeout() throws Throwable {
1185         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1186
1187         new ShardTestKit(getSystem()) {{
1188             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1189                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1190                     "testTransactionCommitTimeout");
1191
1192             waitUntilLeader(shard);
1193
1194             final FiniteDuration duration = duration("5 seconds");
1195
1196             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1197
1198             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1199             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1200                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1201
1202             // Create 1st Tx - will timeout
1203
1204             String transactionID1 = "tx1";
1205             MutableCompositeModification modification1 = new MutableCompositeModification();
1206             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1207                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1208                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1209                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1210                     modification1);
1211
1212             // Create 2nd Tx
1213
1214             String transactionID2 = "tx3";
1215             MutableCompositeModification modification2 = new MutableCompositeModification();
1216             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1217                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1218             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1219                     listNodePath,
1220                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1221                     modification2);
1222
1223             // Ready the Tx's
1224
1225             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1226                     cohort1, modification1, true), getRef());
1227             expectMsgClass(duration, ReadyTransactionReply.class);
1228
1229             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1230                     cohort2, modification2, true), getRef());
1231             expectMsgClass(duration, ReadyTransactionReply.class);
1232
1233             // canCommit 1st Tx. We don't send the commit so it should timeout.
1234
1235             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1236             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1237
1238             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1239
1240             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1241             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1242
1243             // Commit the 2nd Tx.
1244
1245             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1246             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1247
1248             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1249             assertNotNull(listNodePath + " not found", node);
1250
1251             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1252         }};
1253     }
1254
1255     @Test
1256     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1257         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1258
1259         new ShardTestKit(getSystem()) {{
1260             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1261                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1262                     "testTransactionCommitQueueCapacityExceeded");
1263
1264             waitUntilLeader(shard);
1265
1266             final FiniteDuration duration = duration("5 seconds");
1267
1268             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1269
1270             String transactionID1 = "tx1";
1271             MutableCompositeModification modification1 = new MutableCompositeModification();
1272             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1273                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1274
1275             String transactionID2 = "tx2";
1276             MutableCompositeModification modification2 = new MutableCompositeModification();
1277             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1278                     TestModel.OUTER_LIST_PATH,
1279                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1280                     modification2);
1281
1282             String transactionID3 = "tx3";
1283             MutableCompositeModification modification3 = new MutableCompositeModification();
1284             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1285                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1286
1287             // Ready the Tx's
1288
1289             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1290                     cohort1, modification1, true), getRef());
1291             expectMsgClass(duration, ReadyTransactionReply.class);
1292
1293             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1294                     cohort2, modification2, true), getRef());
1295             expectMsgClass(duration, ReadyTransactionReply.class);
1296
1297             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1298                     cohort3, modification3, true), getRef());
1299             expectMsgClass(duration, ReadyTransactionReply.class);
1300
1301             // canCommit 1st Tx.
1302
1303             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1304             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1305
1306             // canCommit the 2nd Tx - it should get queued.
1307
1308             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1309
1310             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1311
1312             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1313             expectMsgClass(duration, akka.actor.Status.Failure.class);
1314
1315             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1316         }};
1317     }
1318
1319     @Test
1320     public void testCanCommitBeforeReadyFailure() throws Throwable {
1321         new ShardTestKit(getSystem()) {{
1322             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1323                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1324                     "testCanCommitBeforeReadyFailure");
1325
1326             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1327             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1328
1329             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1330         }};
1331     }
1332
1333     @Test
1334     public void testAbortTransaction() throws Throwable {
1335         new ShardTestKit(getSystem()) {{
1336             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1337                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1338                     "testAbortTransaction");
1339
1340             waitUntilLeader(shard);
1341
1342             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1343
1344             String transactionID1 = "tx1";
1345             MutableCompositeModification modification1 = new MutableCompositeModification();
1346             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1347             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1348             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1349
1350             String transactionID2 = "tx2";
1351             MutableCompositeModification modification2 = new MutableCompositeModification();
1352             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1353             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1354
1355             FiniteDuration duration = duration("5 seconds");
1356             final Timeout timeout = new Timeout(duration);
1357
1358             // Simulate the ForwardedReadyTransaction messages that would be sent
1359             // by the ShardTransaction.
1360
1361             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1362                     cohort1, modification1, true), getRef());
1363             expectMsgClass(duration, ReadyTransactionReply.class);
1364
1365             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1366                     cohort2, modification2, true), getRef());
1367             expectMsgClass(duration, ReadyTransactionReply.class);
1368
1369             // Send the CanCommitTransaction message for the first Tx.
1370
1371             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1372             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1373                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1374             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1375
1376             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1377             // processed after the first Tx completes.
1378
1379             Future<Object> canCommitFuture = Patterns.ask(shard,
1380                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1381
1382             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1383             // Tx to proceed.
1384
1385             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1386             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1387
1388             // Wait for the 2nd Tx to complete the canCommit phase.
1389
1390             Await.ready(canCommitFuture, duration);
1391
1392             InOrder inOrder = inOrder(cohort1, cohort2);
1393             inOrder.verify(cohort1).canCommit();
1394             inOrder.verify(cohort2).canCommit();
1395
1396             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1397         }};
1398     }
1399
1400     @Test
1401     public void testCreateSnapshot() throws Exception {
1402         testCreateSnapshot(true, "testCreateSnapshot");
1403     }
1404
1405     @Test
1406     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1407         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1408     }
1409
1410     @SuppressWarnings("serial")
1411     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1412
1413         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1414         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1415             TestPersistentDataProvider(DataPersistenceProvider delegate) {
1416                 super(delegate);
1417             }
1418
1419             @Override
1420             public void saveSnapshot(Object o) {
1421                 savedSnapshot.set(o);
1422                 super.saveSnapshot(o);
1423             }
1424         }
1425
1426         dataStoreContextBuilder.persistent(persistent);
1427
1428         new ShardTestKit(getSystem()) {{
1429             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1430
1431             class TestShard extends Shard {
1432
1433                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
1434                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
1435                     super(name, peerAddresses, datastoreContext, schemaContext);
1436                     setPersistence(new TestPersistentDataProvider(super.persistence()));
1437                 }
1438
1439                 @Override
1440                 protected void commitSnapshot(final long sequenceNumber) {
1441                     super.commitSnapshot(sequenceNumber);
1442                     latch.get().countDown();
1443                 }
1444
1445                 @Override
1446                 public RaftActorContext getRaftActorContext() {
1447                     return super.getRaftActorContext();
1448                 }
1449             }
1450
1451             Creator<Shard> creator = new Creator<Shard>() {
1452                 @Override
1453                 public Shard create() throws Exception {
1454                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
1455                             newDatastoreContext(), SCHEMA_CONTEXT);
1456                 }
1457             };
1458
1459             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1460                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1461
1462             waitUntilLeader(shard);
1463
1464             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1465
1466             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1467
1468             // Trigger creation of a snapshot by ensuring
1469             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1470             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1471
1472             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1473
1474             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1475                     savedSnapshot.get() instanceof Snapshot);
1476
1477             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1478
1479             latch.set(new CountDownLatch(1));
1480             savedSnapshot.set(null);
1481
1482             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1483
1484             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1485
1486             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1487                     savedSnapshot.get() instanceof Snapshot);
1488
1489             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1490
1491             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1492         }
1493
1494         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1495
1496             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1497             assertEquals("Root node", expectedRoot, actual);
1498
1499         }};
1500     }
1501
1502     /**
1503      * This test simply verifies that the applySnapShot logic will work
1504      * @throws ReadFailedException
1505      */
1506     @Test
1507     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1508         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1509
1510         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1511
1512         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1513         putTransaction.write(TestModel.TEST_PATH,
1514             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1515         commitTransaction(putTransaction);
1516
1517
1518         NormalizedNode<?, ?> expected = readStore(store);
1519
1520         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1521
1522         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1523         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1524
1525         commitTransaction(writeTransaction);
1526
1527         NormalizedNode<?, ?> actual = readStore(store);
1528
1529         assertEquals(expected, actual);
1530     }
1531
1532     @Test
1533     public void testRecoveryApplicable(){
1534
1535         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1536                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1537
1538         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1539                 persistentContext, SCHEMA_CONTEXT);
1540
1541         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1542                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1543
1544         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1545                 nonPersistentContext, SCHEMA_CONTEXT);
1546
1547         new ShardTestKit(getSystem()) {{
1548             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1549                     persistentProps, "testPersistence1");
1550
1551             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1552
1553             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1554
1555             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1556                     nonPersistentProps, "testPersistence2");
1557
1558             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1559
1560             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1561
1562         }};
1563
1564     }
1565
1566     @Test
1567     public void testOnDatastoreContext() {
1568         new ShardTestKit(getSystem()) {{
1569             dataStoreContextBuilder.persistent(true);
1570
1571             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1572
1573             assertEquals("isRecoveryApplicable", true,
1574                     shard.underlyingActor().persistence().isRecoveryApplicable());
1575
1576             waitUntilLeader(shard);
1577
1578             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1579
1580             assertEquals("isRecoveryApplicable", false,
1581                     shard.underlyingActor().persistence().isRecoveryApplicable());
1582
1583             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1584
1585             assertEquals("isRecoveryApplicable", true,
1586                     shard.underlyingActor().persistence().isRecoveryApplicable());
1587
1588             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1589         }};
1590     }
1591
1592     @Test
1593     public void testRegisterRoleChangeListener() throws Exception {
1594         new ShardTestKit(getSystem()) {
1595             {
1596                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1597                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1598                         "testRegisterRoleChangeListener");
1599
1600                 waitUntilLeader(shard);
1601
1602                 TestActorRef<MessageCollectorActor> listener =
1603                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1604
1605                 shard.tell(new RegisterRoleChangeListener(), listener);
1606
1607                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1608                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1609                 // sleep.
1610                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1611
1612                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1613
1614                 assertEquals(1, allMatching.size());
1615             }
1616         };
1617     }
1618
1619     @Test
1620     public void testFollowerInitialSyncStatus() throws Exception {
1621         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1622                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1623                 "testFollowerInitialSyncStatus");
1624
1625         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1626
1627         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1628
1629         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1630
1631         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1632
1633         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1634     }
1635
1636     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1637         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1638         ListenableFuture<Void> future =
1639             commitCohort.preCommit();
1640         try {
1641             future.get();
1642             future = commitCohort.commit();
1643             future.get();
1644         } catch (InterruptedException | ExecutionException e) {
1645         }
1646     }
1647 }