Merge "Use SnapshotManager"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.Dispatchers;
17 import akka.dispatch.OnComplete;
18 import akka.japi.Creator;
19 import akka.japi.Procedure;
20 import akka.pattern.Patterns;
21 import akka.persistence.SnapshotSelectionCriteria;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.Test;
42 import org.mockito.InOrder;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
61 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
62 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
63 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
64 import org.opendaylight.controller.cluster.datastore.modification.Modification;
65 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
66 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
67 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
68 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
69 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
70 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
71 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
72 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
73 import org.opendaylight.controller.cluster.raft.RaftActorContext;
74 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
75 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
76 import org.opendaylight.controller.cluster.raft.Snapshot;
77 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
80 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
81 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
82 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
83 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
84 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
85 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
86 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
87 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
88 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
89 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
90 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
91 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
92 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
93 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
95 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
97 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
98 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
99 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
100 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
101 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
102 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
103 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
104 import scala.concurrent.Await;
105 import scala.concurrent.Future;
106 import scala.concurrent.duration.FiniteDuration;
107
108 public class ShardTest extends AbstractShardTest {
109
110     @Test
111     public void testRegisterChangeListener() throws Exception {
112         new ShardTestKit(getSystem()) {{
113             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
114                     newShardProps(),  "testRegisterChangeListener");
115
116             waitUntilLeader(shard);
117
118             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
119
120             MockDataChangeListener listener = new MockDataChangeListener(1);
121             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
122                     "testRegisterChangeListener-DataChangeListener");
123
124             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
125                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
126
127             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
128                     RegisterChangeListenerReply.class);
129             String replyPath = reply.getListenerRegistrationPath().toString();
130             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
131                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
132
133             YangInstanceIdentifier path = TestModel.TEST_PATH;
134             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
135
136             listener.waitForChangeEvents(path);
137
138             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
139             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
140         }};
141     }
142
143     @SuppressWarnings("serial")
144     @Test
145     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
146         // This test tests the timing window in which a change listener is registered before the
147         // shard becomes the leader. We verify that the listener is registered and notified of the
148         // existing data when the shard becomes the leader.
149         new ShardTestKit(getSystem()) {{
150             // For this test, we want to send the RegisterChangeListener message after the shard
151             // has recovered from persistence and before it becomes the leader. So we subclass
152             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
153             // we know that the shard has been initialized to a follower and has started the
154             // election process. The following 2 CountDownLatches are used to coordinate the
155             // ElectionTimeout with the sending of the RegisterChangeListener message.
156             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
157             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
158             Creator<Shard> creator = new Creator<Shard>() {
159                 boolean firstElectionTimeout = true;
160
161                 @Override
162                 public Shard create() throws Exception {
163                     return new Shard(shardID, Collections.<String,String>emptyMap(),
164                             newDatastoreContext(), SCHEMA_CONTEXT) {
165                         @Override
166                         public void onReceiveCommand(final Object message) throws Exception {
167                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
168                                 // Got the first ElectionTimeout. We don't forward it to the
169                                 // base Shard yet until we've sent the RegisterChangeListener
170                                 // message. So we signal the onFirstElectionTimeout latch to tell
171                                 // the main thread to send the RegisterChangeListener message and
172                                 // start a thread to wait on the onChangeListenerRegistered latch,
173                                 // which the main thread signals after it has sent the message.
174                                 // After the onChangeListenerRegistered is triggered, we send the
175                                 // original ElectionTimeout message to proceed with the election.
176                                 firstElectionTimeout = false;
177                                 final ActorRef self = getSelf();
178                                 new Thread() {
179                                     @Override
180                                     public void run() {
181                                         Uninterruptibles.awaitUninterruptibly(
182                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
183                                         self.tell(message, self);
184                                     }
185                                 }.start();
186
187                                 onFirstElectionTimeout.countDown();
188                             } else {
189                                 super.onReceiveCommand(message);
190                             }
191                         }
192                     };
193                 }
194             };
195
196             MockDataChangeListener listener = new MockDataChangeListener(1);
197             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
198                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
199
200             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
201                     Props.create(new DelegatingShardCreator(creator)),
202                     "testRegisterChangeListenerWhenNotLeaderInitially");
203
204             // Write initial data into the in-memory store.
205             YangInstanceIdentifier path = TestModel.TEST_PATH;
206             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
207
208             // Wait until the shard receives the first ElectionTimeout message.
209             assertEquals("Got first ElectionTimeout", true,
210                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
211
212             // Now send the RegisterChangeListener and wait for the reply.
213             shard.tell(new RegisterChangeListener(path, dclActor,
214                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
215
216             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
217                     RegisterChangeListenerReply.class);
218             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
219
220             // Sanity check - verify the shard is not the leader yet.
221             shard.tell(new FindLeader(), getRef());
222             FindLeaderReply findLeadeReply =
223                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
224             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
225
226             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
227             // with the election process.
228             onChangeListenerRegistered.countDown();
229
230             // Wait for the shard to become the leader and notify our listener with the existing
231             // data in the store.
232             listener.waitForChangeEvents(path);
233
234             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
235             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
236         }};
237     }
238
239     @Test
240     public void testCreateTransaction(){
241         new ShardTestKit(getSystem()) {{
242             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
243
244             waitUntilLeader(shard);
245
246             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
247
248             shard.tell(new CreateTransaction("txn-1",
249                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
250
251             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
252                     CreateTransactionReply.class);
253
254             String path = reply.getTransactionActorPath().toString();
255             assertTrue("Unexpected transaction path " + path,
256                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
257
258             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
259         }};
260     }
261
262     @Test
263     public void testCreateTransactionOnChain(){
264         new ShardTestKit(getSystem()) {{
265             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
266
267             waitUntilLeader(shard);
268
269             shard.tell(new CreateTransaction("txn-1",
270                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
271                     getRef());
272
273             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
274                     CreateTransactionReply.class);
275
276             String path = reply.getTransactionActorPath().toString();
277             assertTrue("Unexpected transaction path " + path,
278                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
279
280             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
281         }};
282     }
283
284     @SuppressWarnings("serial")
285     @Test
286     public void testPeerAddressResolved() throws Exception {
287         new ShardTestKit(getSystem()) {{
288             final CountDownLatch recoveryComplete = new CountDownLatch(1);
289             class TestShard extends Shard {
290                 TestShard() {
291                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
292                             newDatastoreContext(), SCHEMA_CONTEXT);
293                 }
294
295                 Map<String, String> getPeerAddresses() {
296                     return getRaftActorContext().getPeerAddresses();
297                 }
298
299                 @Override
300                 protected void onRecoveryComplete() {
301                     try {
302                         super.onRecoveryComplete();
303                     } finally {
304                         recoveryComplete.countDown();
305                     }
306                 }
307             }
308
309             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
310                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
311                         @Override
312                         public TestShard create() throws Exception {
313                             return new TestShard();
314                         }
315                     })), "testPeerAddressResolved");
316
317             //waitUntilLeader(shard);
318             assertEquals("Recovery complete", true,
319                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
320
321             String address = "akka://foobar";
322             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
323
324             assertEquals("getPeerAddresses", address,
325                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
326
327             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
328         }};
329     }
330
331     @Test
332     public void testApplySnapshot() throws Exception {
333         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
334                 "testApplySnapshot");
335
336         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
337         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
338
339         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
340
341         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
342         NormalizedNode<?,?> expected = readStore(store, root);
343
344         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
345                 SerializationUtils.serializeNormalizedNode(expected),
346                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
347
348         shard.underlyingActor().onReceiveCommand(applySnapshot);
349
350         NormalizedNode<?,?> actual = readStore(shard, root);
351
352         assertEquals("Root node", expected, actual);
353
354         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
355     }
356
357     @Test
358     public void testApplyState() throws Exception {
359
360         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
361
362         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
363
364         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
365                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
366
367         shard.underlyingActor().onReceiveCommand(applyState);
368
369         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
370         assertEquals("Applied state", node, actual);
371
372         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
373     }
374
375     @Test
376     public void testRecovery() throws Exception {
377
378         // Set up the InMemorySnapshotStore.
379
380         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
381         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
382
383         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
384
385         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
386
387         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
388                 SerializationUtils.serializeNormalizedNode(root),
389                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
390
391         // Set up the InMemoryJournal.
392
393         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
394                   new WriteModification(TestModel.OUTER_LIST_PATH,
395                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
396
397         int nListEntries = 16;
398         Set<Integer> listEntryKeys = new HashSet<>();
399
400         // Add some ModificationPayload entries
401         for(int i = 1; i <= nListEntries; i++) {
402             listEntryKeys.add(Integer.valueOf(i));
403             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
404                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
405             Modification mod = new MergeModification(path,
406                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
407             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
408                     newModificationPayload(mod)));
409         }
410
411         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
412                 new ApplyJournalEntries(nListEntries));
413
414         testRecovery(listEntryKeys);
415     }
416
417     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
418         MutableCompositeModification compMod = new MutableCompositeModification();
419         for(Modification mod: mods) {
420             compMod.addModification(mod);
421         }
422
423         return new ModificationPayload(compMod);
424     }
425
426     @SuppressWarnings({ "unchecked" })
427     @Test
428     public void testConcurrentThreePhaseCommits() throws Throwable {
429         new ShardTestKit(getSystem()) {{
430             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
431                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
432                     "testConcurrentThreePhaseCommits");
433
434             waitUntilLeader(shard);
435
436             final String transactionID1 = "tx1";
437             final String transactionID2 = "tx2";
438             final String transactionID3 = "tx3";
439
440             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
441             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
442             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
443             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
444                 @Override
445                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
446                     if(transactionID.equals(transactionID1)) {
447                         mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
448                         return mockCohort1.get();
449                     } else if(transactionID.equals(transactionID2)) {
450                         mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
451                         return mockCohort2.get();
452                     } else {
453                         mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
454                         return mockCohort3.get();
455                     }
456                 }
457             };
458
459             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
460
461             long timeoutSec = 5;
462             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
463             final Timeout timeout = new Timeout(duration);
464
465             // Send a BatchedModifications message for the first transaction.
466
467             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
468                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
469             BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
470             assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
471             assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
472
473             // Send the CanCommitTransaction message for the first Tx.
474
475             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
476             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
477                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
478             assertEquals("Can commit", true, canCommitReply.getCanCommit());
479
480             // Send BatchedModifications for the next 2 Tx's.
481
482             shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
483                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
484             expectMsgClass(duration, BatchedModificationsReply.class);
485
486             shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
487                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
488                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
489             expectMsgClass(duration, BatchedModificationsReply.class);
490
491             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
492             // processed after the first Tx completes.
493
494             Future<Object> canCommitFuture1 = Patterns.ask(shard,
495                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
496
497             Future<Object> canCommitFuture2 = Patterns.ask(shard,
498                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
499
500             // Send the CommitTransaction message for the first Tx. After it completes, it should
501             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
502
503             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
504             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
505
506             // Wait for the next 2 Tx's to complete.
507
508             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
509             final CountDownLatch commitLatch = new CountDownLatch(2);
510
511             class OnFutureComplete extends OnComplete<Object> {
512                 private final Class<?> expRespType;
513
514                 OnFutureComplete(final Class<?> expRespType) {
515                     this.expRespType = expRespType;
516                 }
517
518                 @Override
519                 public void onComplete(final Throwable error, final Object resp) {
520                     if(error != null) {
521                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
522                     } else {
523                         try {
524                             assertEquals("Commit response type", expRespType, resp.getClass());
525                             onSuccess(resp);
526                         } catch (Exception e) {
527                             caughtEx.set(e);
528                         }
529                     }
530                 }
531
532                 void onSuccess(final Object resp) throws Exception {
533                 }
534             }
535
536             class OnCommitFutureComplete extends OnFutureComplete {
537                 OnCommitFutureComplete() {
538                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
539                 }
540
541                 @Override
542                 public void onComplete(final Throwable error, final Object resp) {
543                     super.onComplete(error, resp);
544                     commitLatch.countDown();
545                 }
546             }
547
548             class OnCanCommitFutureComplete extends OnFutureComplete {
549                 private final String transactionID;
550
551                 OnCanCommitFutureComplete(final String transactionID) {
552                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
553                     this.transactionID = transactionID;
554                 }
555
556                 @Override
557                 void onSuccess(final Object resp) throws Exception {
558                     CanCommitTransactionReply canCommitReply =
559                             CanCommitTransactionReply.fromSerializable(resp);
560                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
561
562                     Future<Object> commitFuture = Patterns.ask(shard,
563                             new CommitTransaction(transactionID).toSerializable(), timeout);
564                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
565                 }
566             }
567
568             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
569                     getSystem().dispatcher());
570
571             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
572                     getSystem().dispatcher());
573
574             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
575
576             if(caughtEx.get() != null) {
577                 throw caughtEx.get();
578             }
579
580             assertEquals("Commits complete", true, done);
581
582             InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
583             inOrder.verify(mockCohort1.get()).canCommit();
584             inOrder.verify(mockCohort1.get()).preCommit();
585             inOrder.verify(mockCohort1.get()).commit();
586             inOrder.verify(mockCohort2.get()).canCommit();
587             inOrder.verify(mockCohort2.get()).preCommit();
588             inOrder.verify(mockCohort2.get()).commit();
589             inOrder.verify(mockCohort3.get()).canCommit();
590             inOrder.verify(mockCohort3.get()).preCommit();
591             inOrder.verify(mockCohort3.get()).commit();
592
593             // Verify data in the data store.
594
595             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
596             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
597             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
598                     outerList.getValue() instanceof Iterable);
599             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
600             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
601                        entry instanceof MapEntryNode);
602             MapEntryNode mapEntry = (MapEntryNode)entry;
603             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
604                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
605             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
606             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
607
608             verifyLastApplied(shard, 2);
609
610             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
611         }};
612     }
613
614     private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
615             NormalizedNode<?, ?> data, boolean ready) {
616         return newBatchedModifications(transactionID, null, path, data, ready);
617     }
618
619     private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
620             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
621         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
622         batched.addModification(new WriteModification(path, data));
623         batched.setReady(ready);
624         return batched;
625     }
626
627     @SuppressWarnings("unchecked")
628     @Test
629     public void testMultipleBatchedModifications() throws Throwable {
630         new ShardTestKit(getSystem()) {{
631             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
632                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
633                     "testMultipleBatchedModifications");
634
635             waitUntilLeader(shard);
636
637             final String transactionID = "tx";
638             FiniteDuration duration = duration("5 seconds");
639
640             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
641             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
642                 @Override
643                 public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
644                     if(mockCohort.get() == null) {
645                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
646                     }
647
648                     return mockCohort.get();
649                 }
650             };
651
652             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
653
654             // Send a BatchedModifications to start a transaction.
655
656             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
657                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
658             expectMsgClass(duration, BatchedModificationsReply.class);
659
660             // Send a couple more BatchedModifications.
661
662             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
663                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
664             expectMsgClass(duration, BatchedModificationsReply.class);
665
666             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
667                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
668                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
669             expectMsgClass(duration, BatchedModificationsReply.class);
670
671             // Send the CanCommitTransaction message.
672
673             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
674             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
675                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
676             assertEquals("Can commit", true, canCommitReply.getCanCommit());
677
678             // Send the CanCommitTransaction message.
679
680             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
681             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
682
683             InOrder inOrder = inOrder(mockCohort.get());
684             inOrder.verify(mockCohort.get()).canCommit();
685             inOrder.verify(mockCohort.get()).preCommit();
686             inOrder.verify(mockCohort.get()).commit();
687
688             // Verify data in the data store.
689
690             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
691             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
692             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
693                     outerList.getValue() instanceof Iterable);
694             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
695             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
696                        entry instanceof MapEntryNode);
697             MapEntryNode mapEntry = (MapEntryNode)entry;
698             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
699                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
700             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
701             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
702
703             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
704         }};
705     }
706
707     @Test
708     public void testBatchedModificationsOnTransactionChain() throws Throwable {
709         new ShardTestKit(getSystem()) {{
710             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
711                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
712                     "testBatchedModificationsOnTransactionChain");
713
714             waitUntilLeader(shard);
715
716             String transactionChainID = "txChain";
717             String transactionID1 = "tx1";
718             String transactionID2 = "tx2";
719
720             FiniteDuration duration = duration("5 seconds");
721
722             // Send a BatchedModifications to start a chained write transaction and ready it.
723
724             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
725             YangInstanceIdentifier path = TestModel.TEST_PATH;
726             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
727                     containerNode, true), getRef());
728             expectMsgClass(duration, BatchedModificationsReply.class);
729
730             // Create a read Tx on the same chain.
731
732             shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
733                     transactionChainID).toSerializable(), getRef());
734
735             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
736
737             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
738             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
739             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
740
741             // Commit the write transaction.
742
743             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
744             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
745                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
746             assertEquals("Can commit", true, canCommitReply.getCanCommit());
747
748             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
749             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
750
751             // Verify data in the data store.
752
753             NormalizedNode<?, ?> actualNode = readStore(shard, path);
754             assertEquals("Stored node", containerNode, actualNode);
755
756             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
757         }};
758     }
759
760     @Test
761     public void testOnBatchedModificationsWhenNotLeader() {
762         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
763         new ShardTestKit(getSystem()) {{
764             Creator<Shard> creator = new Creator<Shard>() {
765                 @Override
766                 public Shard create() throws Exception {
767                     return new Shard(shardID, Collections.<String,String>emptyMap(),
768                             newDatastoreContext(), SCHEMA_CONTEXT) {
769                         @Override
770                         protected boolean isLeader() {
771                             return overrideLeaderCalls.get() ? false : super.isLeader();
772                         }
773
774                         @Override
775                         protected ActorSelection getLeader() {
776                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
777                                 super.getLeader();
778                         }
779                     };
780                 }
781             };
782
783             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
784                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
785
786             waitUntilLeader(shard);
787
788             overrideLeaderCalls.set(true);
789
790             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
791
792             shard.tell(batched, ActorRef.noSender());
793
794             expectMsgEquals(batched);
795
796             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
797         }};
798     }
799
800     @Test
801     public void testCommitWithPersistenceDisabled() throws Throwable {
802         dataStoreContextBuilder.persistent(false);
803         new ShardTestKit(getSystem()) {{
804             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
805                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
806                     "testCommitWithPersistenceDisabled");
807
808             waitUntilLeader(shard);
809
810             String transactionID = "tx";
811             FiniteDuration duration = duration("5 seconds");
812
813             // Send a BatchedModifications to start a transaction.
814
815             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
816             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
817             expectMsgClass(duration, BatchedModificationsReply.class);
818
819             // Send the CanCommitTransaction message.
820
821             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
822             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
823                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
824             assertEquals("Can commit", true, canCommitReply.getCanCommit());
825
826             // Send the CanCommitTransaction message.
827
828             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
829             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
830
831             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
832             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
833
834             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
835         }};
836     }
837
838     @Test
839     public void testCommitWhenTransactionHasNoModifications(){
840         // Note that persistence is enabled which would normally result in the entry getting written to the journal
841         // but here that need not happen
842         new ShardTestKit(getSystem()) {
843             {
844                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
845                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
846                         "testCommitWhenTransactionHasNoModifications");
847
848                 waitUntilLeader(shard);
849
850                 String transactionID = "tx1";
851                 MutableCompositeModification modification = new MutableCompositeModification();
852                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
853                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
854                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
855                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
856
857                 FiniteDuration duration = duration("5 seconds");
858
859                 // Simulate the ForwardedReadyTransaction messages that would be sent
860                 // by the ShardTransaction.
861
862                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
863                         cohort, modification, true), getRef());
864                 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
865
866                 // Send the CanCommitTransaction message.
867
868                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
869                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
870                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
871                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
872
873                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
874                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
875
876                 InOrder inOrder = inOrder(cohort);
877                 inOrder.verify(cohort).canCommit();
878                 inOrder.verify(cohort).preCommit();
879                 inOrder.verify(cohort).commit();
880
881                 // Use MBean for verification
882                 // Committed transaction count should increase as usual
883                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
884
885                 // Commit index should not advance because this does not go into the journal
886                 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
887
888                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
889
890             }
891         };
892     }
893
894     @Test
895     public void testCommitWhenTransactionHasModifications(){
896         new ShardTestKit(getSystem()) {
897             {
898                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
899                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
900                         "testCommitWhenTransactionHasModifications");
901
902                 waitUntilLeader(shard);
903
904                 String transactionID = "tx1";
905                 MutableCompositeModification modification = new MutableCompositeModification();
906                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
907                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
908                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
909                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
910                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
911
912                 FiniteDuration duration = duration("5 seconds");
913
914                 // Simulate the ForwardedReadyTransaction messages that would be sent
915                 // by the ShardTransaction.
916
917                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
918                         cohort, modification, true), getRef());
919                 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
920
921                 // Send the CanCommitTransaction message.
922
923                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
924                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
925                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
926                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
927
928                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
929                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
930
931                 InOrder inOrder = inOrder(cohort);
932                 inOrder.verify(cohort).canCommit();
933                 inOrder.verify(cohort).preCommit();
934                 inOrder.verify(cohort).commit();
935
936                 // Use MBean for verification
937                 // Committed transaction count should increase as usual
938                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
939
940                 // Commit index should advance as we do not have an empty modification
941                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
942
943                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
944
945             }
946         };
947     }
948
949     @Test
950     public void testCommitPhaseFailure() throws Throwable {
951         new ShardTestKit(getSystem()) {{
952             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
953                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
954                     "testCommitPhaseFailure");
955
956             waitUntilLeader(shard);
957
958             // Setup 2 mock cohorts. The first one fails in the commit phase.
959
960             final String transactionID1 = "tx1";
961             final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
962             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
963             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
964             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
965
966             final String transactionID2 = "tx2";
967             final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
968             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
969
970             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
971                 @Override
972                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
973                         DOMStoreThreePhaseCommitCohort actual) {
974                     return transactionID1.equals(transactionID) ? cohort1 : cohort2;
975                 }
976             };
977
978             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
979
980             FiniteDuration duration = duration("5 seconds");
981             final Timeout timeout = new Timeout(duration);
982
983             // Send BatchedModifications to start and ready each transaction.
984
985             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
986                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
987             expectMsgClass(duration, BatchedModificationsReply.class);
988
989             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
990                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
991             expectMsgClass(duration, BatchedModificationsReply.class);
992
993             // Send the CanCommitTransaction message for the first Tx.
994
995             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
996             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
997                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
998             assertEquals("Can commit", true, canCommitReply.getCanCommit());
999
1000             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1001             // processed after the first Tx completes.
1002
1003             Future<Object> canCommitFuture = Patterns.ask(shard,
1004                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1005
1006             // Send the CommitTransaction message for the first Tx. This should send back an error
1007             // and trigger the 2nd Tx to proceed.
1008
1009             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1010             expectMsgClass(duration, akka.actor.Status.Failure.class);
1011
1012             // Wait for the 2nd Tx to complete the canCommit phase.
1013
1014             final CountDownLatch latch = new CountDownLatch(1);
1015             canCommitFuture.onComplete(new OnComplete<Object>() {
1016                 @Override
1017                 public void onComplete(final Throwable t, final Object resp) {
1018                     latch.countDown();
1019                 }
1020             }, getSystem().dispatcher());
1021
1022             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1023
1024             InOrder inOrder = inOrder(cohort1, cohort2);
1025             inOrder.verify(cohort1).canCommit();
1026             inOrder.verify(cohort1).preCommit();
1027             inOrder.verify(cohort1).commit();
1028             inOrder.verify(cohort2).canCommit();
1029
1030             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1031         }};
1032     }
1033
1034     @Test
1035     public void testPreCommitPhaseFailure() throws Throwable {
1036         new ShardTestKit(getSystem()) {{
1037             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1038                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1039                     "testPreCommitPhaseFailure");
1040
1041             waitUntilLeader(shard);
1042
1043             String transactionID = "tx1";
1044             final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1045             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1046             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1047
1048             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
1049                 @Override
1050                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
1051                         DOMStoreThreePhaseCommitCohort actual) {
1052                     return cohort;
1053                 }
1054             };
1055
1056             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
1057
1058             FiniteDuration duration = duration("5 seconds");
1059
1060             // Send BatchedModifications to start and ready a transaction.
1061
1062             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
1063                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1064             expectMsgClass(duration, BatchedModificationsReply.class);
1065
1066             // Send the CanCommitTransaction message.
1067
1068             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1069             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1070                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1071             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1072
1073             // Send the CommitTransaction message. This should send back an error
1074             // for preCommit failure.
1075
1076             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1077             expectMsgClass(duration, akka.actor.Status.Failure.class);
1078
1079             InOrder inOrder = inOrder(cohort);
1080             inOrder.verify(cohort).canCommit();
1081             inOrder.verify(cohort).preCommit();
1082
1083             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1084         }};
1085     }
1086
1087     @Test
1088     public void testCanCommitPhaseFailure() throws Throwable {
1089         new ShardTestKit(getSystem()) {{
1090             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1091                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1092                     "testCanCommitPhaseFailure");
1093
1094             waitUntilLeader(shard);
1095
1096             final FiniteDuration duration = duration("5 seconds");
1097
1098             String transactionID = "tx1";
1099             final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1100             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1101
1102             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
1103                 @Override
1104                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
1105                         DOMStoreThreePhaseCommitCohort actual) {
1106                     return cohort;
1107                 }
1108             };
1109
1110             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
1111
1112             // Send BatchedModifications to start and ready a transaction.
1113
1114             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
1115                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1116             expectMsgClass(duration, BatchedModificationsReply.class);
1117
1118             // Send the CanCommitTransaction message.
1119
1120             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1121             expectMsgClass(duration, akka.actor.Status.Failure.class);
1122
1123             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1124         }};
1125     }
1126
1127     @Test
1128     public void testAbortBeforeFinishCommit() throws Throwable {
1129         new ShardTestKit(getSystem()) {{
1130             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1131                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1132                     "testAbortBeforeFinishCommit");
1133
1134             waitUntilLeader(shard);
1135
1136             final FiniteDuration duration = duration("5 seconds");
1137             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1138
1139             final String transactionID = "tx1";
1140             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1141                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1142                 @Override
1143                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1144                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1145
1146                     // Simulate an AbortTransaction message occurring during replication, after
1147                     // persisting and before finishing the commit to the in-memory store.
1148                     // We have no followers so due to optimizations in the RaftActor, it does not
1149                     // attempt replication and thus we can't send an AbortTransaction message b/c
1150                     // it would be processed too late after CommitTransaction completes. So we'll
1151                     // simulate an AbortTransaction message occurring during replication by calling
1152                     // the shard directly.
1153                     //
1154                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1155
1156                     return preCommitFuture;
1157                 }
1158             };
1159
1160             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
1161                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1162             expectMsgClass(duration, BatchedModificationsReply.class);
1163
1164             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1165             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1166                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1167             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1168
1169             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1170             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1171
1172             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1173
1174             // Since we're simulating an abort occurring during replication and before finish commit,
1175             // the data should still get written to the in-memory store since we've gotten past
1176             // canCommit and preCommit and persisted the data.
1177             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1178
1179             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1180         }};
1181     }
1182
1183     @Test
1184     public void testTransactionCommitTimeout() throws Throwable {
1185         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1186
1187         new ShardTestKit(getSystem()) {{
1188             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1189                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1190                     "testTransactionCommitTimeout");
1191
1192             waitUntilLeader(shard);
1193
1194             final FiniteDuration duration = duration("5 seconds");
1195
1196             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1197             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1198                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1199
1200             // Create and ready the 1st Tx - will timeout
1201
1202             String transactionID1 = "tx1";
1203             shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
1204                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1205                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
1206             expectMsgClass(duration, BatchedModificationsReply.class);
1207
1208             // Create and ready the 2nd Tx
1209
1210             String transactionID2 = "tx2";
1211             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1212                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1213             shard.tell(newBatchedModifications(transactionID2, listNodePath,
1214                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
1215             expectMsgClass(duration, BatchedModificationsReply.class);
1216
1217             // canCommit 1st Tx. We don't send the commit so it should timeout.
1218
1219             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1220             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1221
1222             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1223
1224             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1225             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1226
1227             // Commit the 2nd Tx.
1228
1229             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1230             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1231
1232             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1233             assertNotNull(listNodePath + " not found", node);
1234
1235             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1236         }};
1237     }
1238
1239     @Test
1240     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1241         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1242
1243         new ShardTestKit(getSystem()) {{
1244             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1245                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1246                     "testTransactionCommitQueueCapacityExceeded");
1247
1248             waitUntilLeader(shard);
1249
1250             final FiniteDuration duration = duration("5 seconds");
1251
1252             String transactionID1 = "tx1";
1253             String transactionID2 = "tx2";
1254             String transactionID3 = "tx3";
1255
1256             // Send a BatchedModifications to start transactions and ready them.
1257
1258             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1259                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1260             expectMsgClass(duration, BatchedModificationsReply.class);
1261
1262             shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
1263                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
1264             expectMsgClass(duration, BatchedModificationsReply.class);
1265
1266             shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1267                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1268             expectMsgClass(duration, BatchedModificationsReply.class);
1269
1270             // canCommit 1st Tx.
1271
1272             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1273             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1274
1275             // canCommit the 2nd Tx - it should get queued.
1276
1277             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1278
1279             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1280
1281             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1282             expectMsgClass(duration, akka.actor.Status.Failure.class);
1283
1284             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1285         }};
1286     }
1287
1288     @Test
1289     public void testCanCommitBeforeReadyFailure() throws Throwable {
1290         new ShardTestKit(getSystem()) {{
1291             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1292                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1293                     "testCanCommitBeforeReadyFailure");
1294
1295             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1296             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1297
1298             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1299         }};
1300     }
1301
1302     @Test
1303     public void testAbortTransaction() throws Throwable {
1304         new ShardTestKit(getSystem()) {{
1305             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1306                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1307                     "testAbortTransaction");
1308
1309             waitUntilLeader(shard);
1310
1311             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1312
1313             final String transactionID1 = "tx1";
1314             final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1315             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1316             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1317
1318             final String transactionID2 = "tx2";
1319             final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1320             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1321
1322             FiniteDuration duration = duration("5 seconds");
1323             final Timeout timeout = new Timeout(duration);
1324
1325             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
1326                 @Override
1327                 public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
1328                         DOMStoreThreePhaseCommitCohort actual) {
1329                     return transactionID1.equals(transactionID) ? cohort1 : cohort2;
1330                 }
1331             };
1332
1333             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
1334
1335             // Send BatchedModifications to start and ready each transaction.
1336
1337             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1338                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1339             expectMsgClass(duration, BatchedModificationsReply.class);
1340
1341             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1342                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1343             expectMsgClass(duration, BatchedModificationsReply.class);
1344
1345             // Send the CanCommitTransaction message for the first Tx.
1346
1347             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1348             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1349                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1350             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1351
1352             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1353             // processed after the first Tx completes.
1354
1355             Future<Object> canCommitFuture = Patterns.ask(shard,
1356                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1357
1358             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1359             // Tx to proceed.
1360
1361             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1362             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1363
1364             // Wait for the 2nd Tx to complete the canCommit phase.
1365
1366             Await.ready(canCommitFuture, duration);
1367
1368             InOrder inOrder = inOrder(cohort1, cohort2);
1369             inOrder.verify(cohort1).canCommit();
1370             inOrder.verify(cohort2).canCommit();
1371
1372             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1373         }};
1374     }
1375
1376     @Test
1377     public void testCreateSnapshot() throws Exception {
1378         testCreateSnapshot(true, "testCreateSnapshot");
1379     }
1380
1381     @Test
1382     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1383         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1384     }
1385
1386     @SuppressWarnings("serial")
1387     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1388
1389         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1390         class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1391             DataPersistenceProvider delegate;
1392
1393             DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1394                 this.delegate = delegate;
1395             }
1396
1397             @Override
1398             public boolean isRecoveryApplicable() {
1399                 return delegate.isRecoveryApplicable();
1400             }
1401
1402             @Override
1403             public <T> void persist(T o, Procedure<T> procedure) {
1404                 delegate.persist(o, procedure);
1405             }
1406
1407             @Override
1408             public void saveSnapshot(Object o) {
1409                 savedSnapshot.set(o);
1410                 delegate.saveSnapshot(o);
1411             }
1412
1413             @Override
1414             public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1415                 delegate.deleteSnapshots(criteria);
1416             }
1417
1418             @Override
1419             public void deleteMessages(long sequenceNumber) {
1420                 delegate.deleteMessages(sequenceNumber);
1421             }
1422         }
1423
1424         dataStoreContextBuilder.persistent(persistent);
1425
1426
1427
1428         new ShardTestKit(getSystem()) {{
1429             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1430
1431             class TestShard extends Shard {
1432
1433                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
1434                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
1435                     super(name, peerAddresses, datastoreContext, schemaContext);
1436                 }
1437
1438                 DelegatingPersistentDataProvider delegating;
1439
1440                 protected DataPersistenceProvider persistence() {
1441                     if(delegating == null) {
1442                         delegating = new DelegatingPersistentDataProvider(super.persistence());
1443                     }
1444                     return delegating;
1445                 }
1446
1447                 @Override
1448                 protected void commitSnapshot(final long sequenceNumber) {
1449                     super.commitSnapshot(sequenceNumber);
1450                     latch.get().countDown();
1451                 }
1452
1453                 @Override
1454                 public RaftActorContext getRaftActorContext() {
1455                     return super.getRaftActorContext();
1456                 }
1457             }
1458
1459             Creator<Shard> creator = new Creator<Shard>() {
1460                 @Override
1461                 public Shard create() throws Exception {
1462                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
1463                             newDatastoreContext(), SCHEMA_CONTEXT);
1464                 }
1465             };
1466
1467             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1468                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1469
1470             waitUntilLeader(shard);
1471
1472             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1473
1474             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1475
1476             // Trigger creation of a snapshot by ensuring
1477             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1478             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1479
1480             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1481
1482             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1483                     savedSnapshot.get() instanceof Snapshot);
1484
1485             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1486
1487             latch.set(new CountDownLatch(1));
1488             savedSnapshot.set(null);
1489
1490             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1491
1492             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1493
1494             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1495                     savedSnapshot.get() instanceof Snapshot);
1496
1497             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1498
1499             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1500         }
1501
1502         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1503
1504             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1505             assertEquals("Root node", expectedRoot, actual);
1506
1507         }};
1508     }
1509
1510     /**
1511      * This test simply verifies that the applySnapShot logic will work
1512      * @throws ReadFailedException
1513      */
1514     @Test
1515     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1516         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1517
1518         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1519
1520         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1521         putTransaction.write(TestModel.TEST_PATH,
1522             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1523         commitTransaction(putTransaction);
1524
1525
1526         NormalizedNode<?, ?> expected = readStore(store);
1527
1528         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1529
1530         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1531         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1532
1533         commitTransaction(writeTransaction);
1534
1535         NormalizedNode<?, ?> actual = readStore(store);
1536
1537         assertEquals(expected, actual);
1538     }
1539
1540     @Test
1541     public void testRecoveryApplicable(){
1542
1543         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1544                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1545
1546         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1547                 persistentContext, SCHEMA_CONTEXT);
1548
1549         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1550                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1551
1552         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1553                 nonPersistentContext, SCHEMA_CONTEXT);
1554
1555         new ShardTestKit(getSystem()) {{
1556             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1557                     persistentProps, "testPersistence1");
1558
1559             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1560
1561             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1562
1563             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1564                     nonPersistentProps, "testPersistence2");
1565
1566             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1567
1568             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1569
1570         }};
1571
1572     }
1573
1574     @Test
1575     public void testOnDatastoreContext() {
1576         new ShardTestKit(getSystem()) {{
1577             dataStoreContextBuilder.persistent(true);
1578
1579             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1580
1581             assertEquals("isRecoveryApplicable", true,
1582                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1583
1584             waitUntilLeader(shard);
1585
1586             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1587
1588             assertEquals("isRecoveryApplicable", false,
1589                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1590
1591             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1592
1593             assertEquals("isRecoveryApplicable", true,
1594                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1595
1596             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1597         }};
1598     }
1599
1600     @Test
1601     public void testRegisterRoleChangeListener() throws Exception {
1602         new ShardTestKit(getSystem()) {
1603             {
1604                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1605                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1606                         "testRegisterRoleChangeListener");
1607
1608                 waitUntilLeader(shard);
1609
1610                 TestActorRef<MessageCollectorActor> listener =
1611                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1612
1613                 shard.tell(new RegisterRoleChangeListener(), listener);
1614
1615                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1616                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1617                 // sleep.
1618                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1619
1620                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1621
1622                 assertEquals(1, allMatching.size());
1623             }
1624         };
1625     }
1626
1627     @Test
1628     public void testFollowerInitialSyncStatus() throws Exception {
1629         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1630                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1631                 "testFollowerInitialSyncStatus");
1632
1633         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1634
1635         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1636
1637         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1638
1639         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1640
1641         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1642     }
1643
1644     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1645         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1646         ListenableFuture<Void> future =
1647             commitCohort.preCommit();
1648         try {
1649             future.get();
1650             future = commitCohort.commit();
1651             future.get();
1652         } catch (InterruptedException | ExecutionException e) {
1653         }
1654     }
1655 }