Merge "Refactor ShardTest"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import akka.actor.Props;
15 import akka.dispatch.Dispatchers;
16 import akka.dispatch.OnComplete;
17 import akka.japi.Creator;
18 import akka.japi.Procedure;
19 import akka.pattern.Patterns;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.io.IOException;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
43 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
52 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
54 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
55 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
56 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
57 import org.opendaylight.controller.cluster.datastore.modification.Modification;
58 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
59 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
60 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
61 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
62 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
63 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
64 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
65 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
66 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
67 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
68 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
70 import org.opendaylight.controller.cluster.raft.Snapshot;
71 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
74 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
75 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
76 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
78 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
79 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
80 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
81 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
82 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
83 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
84 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
85 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
86 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
87 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
88 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
89 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
90 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
91 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
92 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
93 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
94 import scala.concurrent.Await;
95 import scala.concurrent.Future;
96 import scala.concurrent.duration.FiniteDuration;
97
98 public class ShardTest extends AbstractShardTest {
99     @Test
100     public void testRegisterChangeListener() throws Exception {
101         new ShardTestKit(getSystem()) {{
102             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
103                     newShardProps(),  "testRegisterChangeListener");
104
105             waitUntilLeader(shard);
106
107             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
108
109             MockDataChangeListener listener = new MockDataChangeListener(1);
110             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
111                     "testRegisterChangeListener-DataChangeListener");
112
113             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
114                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
115
116             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
117                     RegisterChangeListenerReply.class);
118             String replyPath = reply.getListenerRegistrationPath().toString();
119             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
120                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
121
122             YangInstanceIdentifier path = TestModel.TEST_PATH;
123             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
124
125             listener.waitForChangeEvents(path);
126
127             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
128             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
129         }};
130     }
131
132     @SuppressWarnings("serial")
133     @Test
134     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
135         // This test tests the timing window in which a change listener is registered before the
136         // shard becomes the leader. We verify that the listener is registered and notified of the
137         // existing data when the shard becomes the leader.
138         new ShardTestKit(getSystem()) {{
139             // For this test, we want to send the RegisterChangeListener message after the shard
140             // has recovered from persistence and before it becomes the leader. So we subclass
141             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
142             // we know that the shard has been initialized to a follower and has started the
143             // election process. The following 2 CountDownLatches are used to coordinate the
144             // ElectionTimeout with the sending of the RegisterChangeListener message.
145             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
146             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
147             Creator<Shard> creator = new Creator<Shard>() {
148                 boolean firstElectionTimeout = true;
149
150                 @Override
151                 public Shard create() throws Exception {
152                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
153                             newDatastoreContext(), SCHEMA_CONTEXT) {
154                         @Override
155                         public void onReceiveCommand(final Object message) throws Exception {
156                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
157                                 // Got the first ElectionTimeout. We don't forward it to the
158                                 // base Shard yet until we've sent the RegisterChangeListener
159                                 // message. So we signal the onFirstElectionTimeout latch to tell
160                                 // the main thread to send the RegisterChangeListener message and
161                                 // start a thread to wait on the onChangeListenerRegistered latch,
162                                 // which the main thread signals after it has sent the message.
163                                 // After the onChangeListenerRegistered is triggered, we send the
164                                 // original ElectionTimeout message to proceed with the election.
165                                 firstElectionTimeout = false;
166                                 final ActorRef self = getSelf();
167                                 new Thread() {
168                                     @Override
169                                     public void run() {
170                                         Uninterruptibles.awaitUninterruptibly(
171                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
172                                         self.tell(message, self);
173                                     }
174                                 }.start();
175
176                                 onFirstElectionTimeout.countDown();
177                             } else {
178                                 super.onReceiveCommand(message);
179                             }
180                         }
181                     };
182                 }
183             };
184
185             MockDataChangeListener listener = new MockDataChangeListener(1);
186             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
187                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
188
189             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
190                     Props.create(new DelegatingShardCreator(creator)),
191                     "testRegisterChangeListenerWhenNotLeaderInitially");
192
193             // Write initial data into the in-memory store.
194             YangInstanceIdentifier path = TestModel.TEST_PATH;
195             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
196
197             // Wait until the shard receives the first ElectionTimeout message.
198             assertEquals("Got first ElectionTimeout", true,
199                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
200
201             // Now send the RegisterChangeListener and wait for the reply.
202             shard.tell(new RegisterChangeListener(path, dclActor.path(),
203                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
204
205             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
206                     RegisterChangeListenerReply.class);
207             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
208
209             // Sanity check - verify the shard is not the leader yet.
210             shard.tell(new FindLeader(), getRef());
211             FindLeaderReply findLeadeReply =
212                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
213             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
214
215             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
216             // with the election process.
217             onChangeListenerRegistered.countDown();
218
219             // Wait for the shard to become the leader and notify our listener with the existing
220             // data in the store.
221             listener.waitForChangeEvents(path);
222
223             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
224             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
225         }};
226     }
227
228     @Test
229     public void testCreateTransaction(){
230         new ShardTestKit(getSystem()) {{
231             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
232
233             waitUntilLeader(shard);
234
235             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
236
237             shard.tell(new CreateTransaction("txn-1",
238                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
239
240             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
241                     CreateTransactionReply.class);
242
243             String path = reply.getTransactionActorPath().toString();
244             assertTrue("Unexpected transaction path " + path,
245                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
246
247             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
248         }};
249     }
250
251     @Test
252     public void testCreateTransactionOnChain(){
253         new ShardTestKit(getSystem()) {{
254             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
255
256             waitUntilLeader(shard);
257
258             shard.tell(new CreateTransaction("txn-1",
259                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
260                     getRef());
261
262             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
263                     CreateTransactionReply.class);
264
265             String path = reply.getTransactionActorPath().toString();
266             assertTrue("Unexpected transaction path " + path,
267                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
268
269             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
270         }};
271     }
272
273     @SuppressWarnings("serial")
274     @Test
275     public void testPeerAddressResolved() throws Exception {
276         new ShardTestKit(getSystem()) {{
277             final CountDownLatch recoveryComplete = new CountDownLatch(1);
278             class TestShard extends Shard {
279                 TestShard() {
280                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
281                             newDatastoreContext(), SCHEMA_CONTEXT);
282                 }
283
284                 Map<String, String> getPeerAddresses() {
285                     return getRaftActorContext().getPeerAddresses();
286                 }
287
288                 @Override
289                 protected void onRecoveryComplete() {
290                     try {
291                         super.onRecoveryComplete();
292                     } finally {
293                         recoveryComplete.countDown();
294                     }
295                 }
296             }
297
298             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
299                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
300                         @Override
301                         public TestShard create() throws Exception {
302                             return new TestShard();
303                         }
304                     })), "testPeerAddressResolved");
305
306             //waitUntilLeader(shard);
307             assertEquals("Recovery complete", true,
308                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
309
310             String address = "akka://foobar";
311             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
312
313             assertEquals("getPeerAddresses", address,
314                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
315
316             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
317         }};
318     }
319
320     @Test
321     public void testApplySnapshot() throws Exception {
322         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
323                 "testApplySnapshot");
324
325         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
326         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
327
328         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
329
330         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
331         NormalizedNode<?,?> expected = readStore(store, root);
332
333         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
334                 SerializationUtils.serializeNormalizedNode(expected),
335                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
336
337         shard.underlyingActor().onReceiveCommand(applySnapshot);
338
339         NormalizedNode<?,?> actual = readStore(shard, root);
340
341         assertEquals("Root node", expected, actual);
342
343         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
344     }
345
346     @Test
347     public void testApplyState() throws Exception {
348
349         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
350
351         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
352
353         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
354                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
355
356         shard.underlyingActor().onReceiveCommand(applyState);
357
358         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
359         assertEquals("Applied state", node, actual);
360
361         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
362     }
363
364     @Test
365     public void testRecovery() throws Exception {
366
367         // Set up the InMemorySnapshotStore.
368
369         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
370         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
371
372         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
373
374         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
375
376         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
377                 SerializationUtils.serializeNormalizedNode(root),
378                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
379
380         // Set up the InMemoryJournal.
381
382         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
383                   new WriteModification(TestModel.OUTER_LIST_PATH,
384                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
385
386         int nListEntries = 16;
387         Set<Integer> listEntryKeys = new HashSet<>();
388
389         // Add some ModificationPayload entries
390         for(int i = 1; i <= nListEntries; i++) {
391             listEntryKeys.add(Integer.valueOf(i));
392             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
393                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
394             Modification mod = new MergeModification(path,
395                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
396             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
397                     newModificationPayload(mod)));
398         }
399
400         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
401                 new ApplyJournalEntries(nListEntries));
402
403         testRecovery(listEntryKeys);
404     }
405
406     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
407         MutableCompositeModification compMod = new MutableCompositeModification();
408         for(Modification mod: mods) {
409             compMod.addModification(mod);
410         }
411
412         return new ModificationPayload(compMod);
413     }
414
415     @SuppressWarnings({ "unchecked" })
416     @Test
417     public void testConcurrentThreePhaseCommits() throws Throwable {
418         new ShardTestKit(getSystem()) {{
419             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
420                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
421                     "testConcurrentThreePhaseCommits");
422
423             waitUntilLeader(shard);
424
425             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
426
427             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
428
429             String transactionID1 = "tx1";
430             MutableCompositeModification modification1 = new MutableCompositeModification();
431             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
432                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
433
434             String transactionID2 = "tx2";
435             MutableCompositeModification modification2 = new MutableCompositeModification();
436             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
437                     TestModel.OUTER_LIST_PATH,
438                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
439                     modification2);
440
441             String transactionID3 = "tx3";
442             MutableCompositeModification modification3 = new MutableCompositeModification();
443             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
444                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
445                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
446                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
447                     modification3);
448
449             long timeoutSec = 5;
450             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
451             final Timeout timeout = new Timeout(duration);
452
453             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
454             // by the ShardTransaction.
455
456             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
457                     cohort1, modification1, true), getRef());
458             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
459                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
460             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
461
462             // Send the CanCommitTransaction message for the first Tx.
463
464             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
465             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
466                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
467             assertEquals("Can commit", true, canCommitReply.getCanCommit());
468
469             // Send the ForwardedReadyTransaction for the next 2 Tx's.
470
471             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
472                     cohort2, modification2, true), getRef());
473             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
474
475             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
476                     cohort3, modification3, true), getRef());
477             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
478
479             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
480             // processed after the first Tx completes.
481
482             Future<Object> canCommitFuture1 = Patterns.ask(shard,
483                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
484
485             Future<Object> canCommitFuture2 = Patterns.ask(shard,
486                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
487
488             // Send the CommitTransaction message for the first Tx. After it completes, it should
489             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
490
491             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
492             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
493
494             // Wait for the next 2 Tx's to complete.
495
496             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
497             final CountDownLatch commitLatch = new CountDownLatch(2);
498
499             class OnFutureComplete extends OnComplete<Object> {
500                 private final Class<?> expRespType;
501
502                 OnFutureComplete(final Class<?> expRespType) {
503                     this.expRespType = expRespType;
504                 }
505
506                 @Override
507                 public void onComplete(final Throwable error, final Object resp) {
508                     if(error != null) {
509                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
510                     } else {
511                         try {
512                             assertEquals("Commit response type", expRespType, resp.getClass());
513                             onSuccess(resp);
514                         } catch (Exception e) {
515                             caughtEx.set(e);
516                         }
517                     }
518                 }
519
520                 void onSuccess(final Object resp) throws Exception {
521                 }
522             }
523
524             class OnCommitFutureComplete extends OnFutureComplete {
525                 OnCommitFutureComplete() {
526                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
527                 }
528
529                 @Override
530                 public void onComplete(final Throwable error, final Object resp) {
531                     super.onComplete(error, resp);
532                     commitLatch.countDown();
533                 }
534             }
535
536             class OnCanCommitFutureComplete extends OnFutureComplete {
537                 private final String transactionID;
538
539                 OnCanCommitFutureComplete(final String transactionID) {
540                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
541                     this.transactionID = transactionID;
542                 }
543
544                 @Override
545                 void onSuccess(final Object resp) throws Exception {
546                     CanCommitTransactionReply canCommitReply =
547                             CanCommitTransactionReply.fromSerializable(resp);
548                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
549
550                     Future<Object> commitFuture = Patterns.ask(shard,
551                             new CommitTransaction(transactionID).toSerializable(), timeout);
552                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
553                 }
554             }
555
556             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
557                     getSystem().dispatcher());
558
559             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
560                     getSystem().dispatcher());
561
562             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
563
564             if(caughtEx.get() != null) {
565                 throw caughtEx.get();
566             }
567
568             assertEquals("Commits complete", true, done);
569
570             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
571             inOrder.verify(cohort1).canCommit();
572             inOrder.verify(cohort1).preCommit();
573             inOrder.verify(cohort1).commit();
574             inOrder.verify(cohort2).canCommit();
575             inOrder.verify(cohort2).preCommit();
576             inOrder.verify(cohort2).commit();
577             inOrder.verify(cohort3).canCommit();
578             inOrder.verify(cohort3).preCommit();
579             inOrder.verify(cohort3).commit();
580
581             // Verify data in the data store.
582
583             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
584             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
585             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
586                     outerList.getValue() instanceof Iterable);
587             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
588             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
589                        entry instanceof MapEntryNode);
590             MapEntryNode mapEntry = (MapEntryNode)entry;
591             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
592                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
593             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
594             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
595
596             verifyLastLogIndex(shard, 2);
597
598             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
599         }};
600     }
601
602     @Test
603     public void testCommitWithPersistenceDisabled() throws Throwable {
604         dataStoreContextBuilder.persistent(false);
605         new ShardTestKit(getSystem()) {{
606             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
607                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
608                     "testCommitPhaseFailure");
609
610             waitUntilLeader(shard);
611
612             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
613
614             // Setup a simulated transactions with a mock cohort.
615
616             String transactionID = "tx";
617             MutableCompositeModification modification = new MutableCompositeModification();
618             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
619             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
620                     TestModel.TEST_PATH, containerNode, modification);
621
622             FiniteDuration duration = duration("5 seconds");
623
624             // Simulate the ForwardedReadyTransaction messages that would be sent
625             // by the ShardTransaction.
626
627             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
628                     cohort, modification, true), getRef());
629             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
630
631             // Send the CanCommitTransaction message.
632
633             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
634             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
635                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
636             assertEquals("Can commit", true, canCommitReply.getCanCommit());
637
638             // Send the CanCommitTransaction message.
639
640             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
641             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
642
643             InOrder inOrder = inOrder(cohort);
644             inOrder.verify(cohort).canCommit();
645             inOrder.verify(cohort).preCommit();
646             inOrder.verify(cohort).commit();
647
648             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
649             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
650
651             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
652         }};
653     }
654
655     @Test
656     public void testCommitPhaseFailure() throws Throwable {
657         new ShardTestKit(getSystem()) {{
658             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
659                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
660                     "testCommitPhaseFailure");
661
662             waitUntilLeader(shard);
663
664             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
665             // commit phase.
666
667             String transactionID1 = "tx1";
668             MutableCompositeModification modification1 = new MutableCompositeModification();
669             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
670             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
671             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
672             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
673
674             String transactionID2 = "tx2";
675             MutableCompositeModification modification2 = new MutableCompositeModification();
676             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
677             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
678
679             FiniteDuration duration = duration("5 seconds");
680             final Timeout timeout = new Timeout(duration);
681
682             // Simulate the ForwardedReadyTransaction messages that would be sent
683             // by the ShardTransaction.
684
685             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
686                     cohort1, modification1, true), getRef());
687             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
688
689             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
690                     cohort2, modification2, true), getRef());
691             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
692
693             // Send the CanCommitTransaction message for the first Tx.
694
695             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
696             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
697                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
698             assertEquals("Can commit", true, canCommitReply.getCanCommit());
699
700             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
701             // processed after the first Tx completes.
702
703             Future<Object> canCommitFuture = Patterns.ask(shard,
704                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
705
706             // Send the CommitTransaction message for the first Tx. This should send back an error
707             // and trigger the 2nd Tx to proceed.
708
709             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
710             expectMsgClass(duration, akka.actor.Status.Failure.class);
711
712             // Wait for the 2nd Tx to complete the canCommit phase.
713
714             final CountDownLatch latch = new CountDownLatch(1);
715             canCommitFuture.onComplete(new OnComplete<Object>() {
716                 @Override
717                 public void onComplete(final Throwable t, final Object resp) {
718                     latch.countDown();
719                 }
720             }, getSystem().dispatcher());
721
722             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
723
724             InOrder inOrder = inOrder(cohort1, cohort2);
725             inOrder.verify(cohort1).canCommit();
726             inOrder.verify(cohort1).preCommit();
727             inOrder.verify(cohort1).commit();
728             inOrder.verify(cohort2).canCommit();
729
730             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
731         }};
732     }
733
734     @Test
735     public void testPreCommitPhaseFailure() throws Throwable {
736         new ShardTestKit(getSystem()) {{
737             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
738                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
739                     "testPreCommitPhaseFailure");
740
741             waitUntilLeader(shard);
742
743             String transactionID = "tx1";
744             MutableCompositeModification modification = new MutableCompositeModification();
745             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
746             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
747             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
748
749             FiniteDuration duration = duration("5 seconds");
750
751             // Simulate the ForwardedReadyTransaction messages that would be sent
752             // by the ShardTransaction.
753
754             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
755                     cohort, modification, true), getRef());
756             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
757
758             // Send the CanCommitTransaction message.
759
760             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
761             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
762                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
763             assertEquals("Can commit", true, canCommitReply.getCanCommit());
764
765             // Send the CommitTransaction message. This should send back an error
766             // for preCommit failure.
767
768             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
769             expectMsgClass(duration, akka.actor.Status.Failure.class);
770
771             InOrder inOrder = inOrder(cohort);
772             inOrder.verify(cohort).canCommit();
773             inOrder.verify(cohort).preCommit();
774
775             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
776         }};
777     }
778
779     @Test
780     public void testCanCommitPhaseFailure() throws Throwable {
781         new ShardTestKit(getSystem()) {{
782             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
783                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
784                     "testCanCommitPhaseFailure");
785
786             waitUntilLeader(shard);
787
788             final FiniteDuration duration = duration("5 seconds");
789
790             String transactionID = "tx1";
791             MutableCompositeModification modification = new MutableCompositeModification();
792             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
793             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
794
795             // Simulate the ForwardedReadyTransaction messages that would be sent
796             // by the ShardTransaction.
797
798             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
799                     cohort, modification, true), getRef());
800             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
801
802             // Send the CanCommitTransaction message.
803
804             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
805             expectMsgClass(duration, akka.actor.Status.Failure.class);
806
807             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
808         }};
809     }
810
811     @Test
812     public void testAbortBeforeFinishCommit() throws Throwable {
813         new ShardTestKit(getSystem()) {{
814             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
815                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
816                     "testAbortBeforeFinishCommit");
817
818             waitUntilLeader(shard);
819
820             final FiniteDuration duration = duration("5 seconds");
821             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
822
823             final String transactionID = "tx1";
824             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
825                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
826                 @Override
827                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
828                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
829
830                     // Simulate an AbortTransaction message occurring during replication, after
831                     // persisting and before finishing the commit to the in-memory store.
832                     // We have no followers so due to optimizations in the RaftActor, it does not
833                     // attempt replication and thus we can't send an AbortTransaction message b/c
834                     // it would be processed too late after CommitTransaction completes. So we'll
835                     // simulate an AbortTransaction message occurring during replication by calling
836                     // the shard directly.
837                     //
838                     shard.underlyingActor().doAbortTransaction(transactionID, null);
839
840                     return preCommitFuture;
841                 }
842             };
843
844             MutableCompositeModification modification = new MutableCompositeModification();
845             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
846                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
847                     modification, preCommit);
848
849             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
850                     cohort, modification, true), getRef());
851             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
852
853             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
854             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
855                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
856             assertEquals("Can commit", true, canCommitReply.getCanCommit());
857
858             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
859             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
860
861             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
862
863             // Since we're simulating an abort occurring during replication and before finish commit,
864             // the data should still get written to the in-memory store since we've gotten past
865             // canCommit and preCommit and persisted the data.
866             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
867
868             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
869         }};
870     }
871
872     @Test
873     public void testTransactionCommitTimeout() throws Throwable {
874         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
875
876         new ShardTestKit(getSystem()) {{
877             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
878                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
879                     "testTransactionCommitTimeout");
880
881             waitUntilLeader(shard);
882
883             final FiniteDuration duration = duration("5 seconds");
884
885             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
886
887             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
888             writeToStore(shard, TestModel.OUTER_LIST_PATH,
889                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
890
891             // Create 1st Tx - will timeout
892
893             String transactionID1 = "tx1";
894             MutableCompositeModification modification1 = new MutableCompositeModification();
895             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
896                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
897                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
898                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
899                     modification1);
900
901             // Create 2nd Tx
902
903             String transactionID2 = "tx3";
904             MutableCompositeModification modification2 = new MutableCompositeModification();
905             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
906                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
907             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
908                     listNodePath,
909                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
910                     modification2);
911
912             // Ready the Tx's
913
914             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
915                     cohort1, modification1, true), getRef());
916             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
917
918             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
919                     cohort2, modification2, true), getRef());
920             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
921
922             // canCommit 1st Tx. We don't send the commit so it should timeout.
923
924             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
925             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
926
927             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
928
929             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
930             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
931
932             // Commit the 2nd Tx.
933
934             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
935             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
936
937             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
938             assertNotNull(listNodePath + " not found", node);
939
940             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
941         }};
942     }
943
944     @Test
945     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
946         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
947
948         new ShardTestKit(getSystem()) {{
949             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
950                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
951                     "testTransactionCommitQueueCapacityExceeded");
952
953             waitUntilLeader(shard);
954
955             final FiniteDuration duration = duration("5 seconds");
956
957             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
958
959             String transactionID1 = "tx1";
960             MutableCompositeModification modification1 = new MutableCompositeModification();
961             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
962                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
963
964             String transactionID2 = "tx2";
965             MutableCompositeModification modification2 = new MutableCompositeModification();
966             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
967                     TestModel.OUTER_LIST_PATH,
968                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
969                     modification2);
970
971             String transactionID3 = "tx3";
972             MutableCompositeModification modification3 = new MutableCompositeModification();
973             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
974                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
975
976             // Ready the Tx's
977
978             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
979                     cohort1, modification1, true), getRef());
980             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
981
982             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
983                     cohort2, modification2, true), getRef());
984             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
985
986             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
987                     cohort3, modification3, true), getRef());
988             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
989
990             // canCommit 1st Tx.
991
992             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
993             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
994
995             // canCommit the 2nd Tx - it should get queued.
996
997             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
998
999             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1000
1001             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1002             expectMsgClass(duration, akka.actor.Status.Failure.class);
1003
1004             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1005         }};
1006     }
1007
1008     @Test
1009     public void testCanCommitBeforeReadyFailure() throws Throwable {
1010         new ShardTestKit(getSystem()) {{
1011             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1012                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1013                     "testCanCommitBeforeReadyFailure");
1014
1015             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1016             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1017
1018             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1019         }};
1020     }
1021
1022     @Test
1023     public void testAbortTransaction() throws Throwable {
1024         new ShardTestKit(getSystem()) {{
1025             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1026                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1027                     "testAbortTransaction");
1028
1029             waitUntilLeader(shard);
1030
1031             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1032
1033             String transactionID1 = "tx1";
1034             MutableCompositeModification modification1 = new MutableCompositeModification();
1035             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1036             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1037             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1038
1039             String transactionID2 = "tx2";
1040             MutableCompositeModification modification2 = new MutableCompositeModification();
1041             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1042             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1043
1044             FiniteDuration duration = duration("5 seconds");
1045             final Timeout timeout = new Timeout(duration);
1046
1047             // Simulate the ForwardedReadyTransaction messages that would be sent
1048             // by the ShardTransaction.
1049
1050             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1051                     cohort1, modification1, true), getRef());
1052             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1053
1054             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1055                     cohort2, modification2, true), getRef());
1056             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1057
1058             // Send the CanCommitTransaction message for the first Tx.
1059
1060             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1061             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1062                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1063             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1064
1065             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1066             // processed after the first Tx completes.
1067
1068             Future<Object> canCommitFuture = Patterns.ask(shard,
1069                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1070
1071             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1072             // Tx to proceed.
1073
1074             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1075             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1076
1077             // Wait for the 2nd Tx to complete the canCommit phase.
1078
1079             Await.ready(canCommitFuture, duration);
1080
1081             InOrder inOrder = inOrder(cohort1, cohort2);
1082             inOrder.verify(cohort1).canCommit();
1083             inOrder.verify(cohort2).canCommit();
1084
1085             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1086         }};
1087     }
1088
1089     @Test
1090     public void testCreateSnapshot() throws Exception {
1091         testCreateSnapshot(true, "testCreateSnapshot");
1092     }
1093
1094     @Test
1095     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1096         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1097     }
1098
1099     @SuppressWarnings("serial")
1100     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1101
1102         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1103         class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1104             DataPersistenceProvider delegate;
1105
1106             DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1107                 this.delegate = delegate;
1108             }
1109
1110             @Override
1111             public boolean isRecoveryApplicable() {
1112                 return delegate.isRecoveryApplicable();
1113             }
1114
1115             @Override
1116             public <T> void persist(T o, Procedure<T> procedure) {
1117                 delegate.persist(o, procedure);
1118             }
1119
1120             @Override
1121             public void saveSnapshot(Object o) {
1122                 savedSnapshot.set(o);
1123                 delegate.saveSnapshot(o);
1124             }
1125
1126             @Override
1127             public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1128                 delegate.deleteSnapshots(criteria);
1129             }
1130
1131             @Override
1132             public void deleteMessages(long sequenceNumber) {
1133                 delegate.deleteMessages(sequenceNumber);
1134             }
1135         }
1136
1137         dataStoreContextBuilder.persistent(persistent);
1138
1139         new ShardTestKit(getSystem()) {{
1140             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1141             Creator<Shard> creator = new Creator<Shard>() {
1142                 @Override
1143                 public Shard create() throws Exception {
1144                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1145                             newDatastoreContext(), SCHEMA_CONTEXT) {
1146
1147                         DelegatingPersistentDataProvider delegating;
1148
1149                         @Override
1150                         protected DataPersistenceProvider persistence() {
1151                             if(delegating == null) {
1152                                 delegating = new DelegatingPersistentDataProvider(super.persistence());
1153                             }
1154
1155                             return delegating;
1156                         }
1157
1158                         @Override
1159                         protected void commitSnapshot(final long sequenceNumber) {
1160                             super.commitSnapshot(sequenceNumber);
1161                             latch.get().countDown();
1162                         }
1163                     };
1164                 }
1165             };
1166
1167             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1168                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1169
1170             waitUntilLeader(shard);
1171
1172             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1173
1174             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1175
1176             CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1177             shard.tell(capture, getRef());
1178
1179             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1180
1181             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1182                     savedSnapshot.get() instanceof Snapshot);
1183
1184             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1185
1186             latch.set(new CountDownLatch(1));
1187             savedSnapshot.set(null);
1188
1189             shard.tell(capture, getRef());
1190
1191             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1192
1193             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1194                     savedSnapshot.get() instanceof Snapshot);
1195
1196             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1197
1198             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1199         }
1200
1201         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1202
1203             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1204             assertEquals("Root node", expectedRoot, actual);
1205
1206         }};
1207     }
1208
1209     /**
1210      * This test simply verifies that the applySnapShot logic will work
1211      * @throws ReadFailedException
1212      */
1213     @Test
1214     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1215         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1216
1217         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1218
1219         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1220         putTransaction.write(TestModel.TEST_PATH,
1221             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1222         commitTransaction(putTransaction);
1223
1224
1225         NormalizedNode<?, ?> expected = readStore(store);
1226
1227         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1228
1229         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1230         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1231
1232         commitTransaction(writeTransaction);
1233
1234         NormalizedNode<?, ?> actual = readStore(store);
1235
1236         assertEquals(expected, actual);
1237     }
1238
1239     @Test
1240     public void testRecoveryApplicable(){
1241
1242         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1243                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1244
1245         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1246                 persistentContext, SCHEMA_CONTEXT);
1247
1248         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1249                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1250
1251         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1252                 nonPersistentContext, SCHEMA_CONTEXT);
1253
1254         new ShardTestKit(getSystem()) {{
1255             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1256                     persistentProps, "testPersistence1");
1257
1258             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1259
1260             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1261
1262             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1263                     nonPersistentProps, "testPersistence2");
1264
1265             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1266
1267             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1268
1269         }};
1270     }
1271
1272     @Test
1273     public void testOnDatastoreContext() {
1274         new ShardTestKit(getSystem()) {{
1275             dataStoreContextBuilder.persistent(true);
1276
1277             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1278
1279             assertEquals("isRecoveryApplicable", true,
1280                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1281
1282             waitUntilLeader(shard);
1283
1284             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1285
1286             assertEquals("isRecoveryApplicable", false,
1287                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1288
1289             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1290
1291             assertEquals("isRecoveryApplicable", true,
1292                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1293
1294             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1295         }};
1296     }
1297
1298     @Test
1299     public void testRegisterRoleChangeListener() throws Exception {
1300         new ShardTestKit(getSystem()) {
1301             {
1302                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1303                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1304                         "testRegisterRoleChangeListener");
1305
1306                 waitUntilLeader(shard);
1307
1308                 TestActorRef<MessageCollectorActor> listener =
1309                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1310
1311                 shard.tell(new RegisterRoleChangeListener(), listener);
1312
1313                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1314                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1315                 // sleep.
1316                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1317
1318                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1319
1320                 assertEquals(1, allMatching.size());
1321             }
1322         };
1323     }
1324
1325     @Test
1326     public void testFollowerInitialSyncStatus() throws Exception {
1327         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1328                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1329                 "testFollowerInitialSyncStatus");
1330
1331         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1332
1333         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1334
1335         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1336
1337         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1338
1339         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1340     }
1341
1342     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1343         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1344         ListenableFuture<Void> future =
1345             commitCohort.preCommit();
1346         try {
1347             future.get();
1348             future = commitCohort.commit();
1349             future.get();
1350         } catch (InterruptedException | ExecutionException e) {
1351         }
1352     }
1353 }