Fix for Bug 2290.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.dispatch.Dispatchers;
7 import akka.dispatch.OnComplete;
8 import akka.japi.Creator;
9 import akka.pattern.Patterns;
10 import akka.testkit.TestActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Function;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import org.junit.After;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.InOrder;
23 import org.mockito.invocation.InvocationOnMock;
24 import org.mockito.stubbing.Answer;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
38 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
39 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
40 import org.opendaylight.controller.cluster.datastore.modification.Modification;
41 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
44 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
45 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
46 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
47 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
48 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
49 import org.opendaylight.controller.cluster.raft.Snapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
55 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
56 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
57 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
58 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
61 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
62 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
63 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
64 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
65 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
66 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
67 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
68 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
74 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
75 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
76 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
77 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
78 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
79 import scala.concurrent.Await;
80 import scala.concurrent.Future;
81 import scala.concurrent.duration.FiniteDuration;
82
83 import java.io.IOException;
84 import java.util.Collections;
85 import java.util.HashSet;
86 import java.util.Map;
87 import java.util.Set;
88 import java.util.concurrent.CountDownLatch;
89 import java.util.concurrent.ExecutionException;
90 import java.util.concurrent.TimeUnit;
91 import java.util.concurrent.atomic.AtomicInteger;
92 import java.util.concurrent.atomic.AtomicReference;
93
94 import static org.junit.Assert.assertEquals;
95 import static org.junit.Assert.assertFalse;
96 import static org.junit.Assert.assertNotNull;
97 import static org.junit.Assert.assertNull;
98 import static org.junit.Assert.assertTrue;
99 import static org.junit.Assert.fail;
100 import static org.mockito.Mockito.doAnswer;
101 import static org.mockito.Mockito.doReturn;
102 import static org.mockito.Mockito.inOrder;
103 import static org.mockito.Mockito.mock;
104
105
106 public class ShardTest extends AbstractActorTest {
107
108     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
109
110     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
111
112     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
113             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
114
115     private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
116             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
117             shardHeartbeatIntervalInMillis(100).build();
118
119     @Before
120     public void setUp() {
121         InMemorySnapshotStore.clear();
122         InMemoryJournal.clear();
123     }
124
125     @After
126     public void tearDown() {
127         InMemorySnapshotStore.clear();
128         InMemoryJournal.clear();
129     }
130
131     private Props newShardProps() {
132         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
133                 dataStoreContext, SCHEMA_CONTEXT);
134     }
135
136     @Test
137     public void testRegisterChangeListener() throws Exception {
138         new ShardTestKit(getSystem()) {{
139             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
140                     newShardProps(),  "testRegisterChangeListener");
141
142             waitUntilLeader(shard);
143
144             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
145
146             MockDataChangeListener listener = new MockDataChangeListener(1);
147             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
148                     "testRegisterChangeListener-DataChangeListener");
149
150             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
151                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
152
153             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
154                     RegisterChangeListenerReply.class);
155             String replyPath = reply.getListenerRegistrationPath().toString();
156             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
157                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
158
159             YangInstanceIdentifier path = TestModel.TEST_PATH;
160             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
161
162             listener.waitForChangeEvents(path);
163
164             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
166         }};
167     }
168
169     @SuppressWarnings("serial")
170     @Test
171     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
172         // This test tests the timing window in which a change listener is registered before the
173         // shard becomes the leader. We verify that the listener is registered and notified of the
174         // existing data when the shard becomes the leader.
175         new ShardTestKit(getSystem()) {{
176             // For this test, we want to send the RegisterChangeListener message after the shard
177             // has recovered from persistence and before it becomes the leader. So we subclass
178             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
179             // we know that the shard has been initialized to a follower and has started the
180             // election process. The following 2 CountDownLatches are used to coordinate the
181             // ElectionTimeout with the sending of the RegisterChangeListener message.
182             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
183             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
184             Creator<Shard> creator = new Creator<Shard>() {
185                 boolean firstElectionTimeout = true;
186
187                 @Override
188                 public Shard create() throws Exception {
189                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
190                             dataStoreContext, SCHEMA_CONTEXT) {
191                         @Override
192                         public void onReceiveCommand(final Object message) throws Exception {
193                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
194                                 // Got the first ElectionTimeout. We don't forward it to the
195                                 // base Shard yet until we've sent the RegisterChangeListener
196                                 // message. So we signal the onFirstElectionTimeout latch to tell
197                                 // the main thread to send the RegisterChangeListener message and
198                                 // start a thread to wait on the onChangeListenerRegistered latch,
199                                 // which the main thread signals after it has sent the message.
200                                 // After the onChangeListenerRegistered is triggered, we send the
201                                 // original ElectionTimeout message to proceed with the election.
202                                 firstElectionTimeout = false;
203                                 final ActorRef self = getSelf();
204                                 new Thread() {
205                                     @Override
206                                     public void run() {
207                                         Uninterruptibles.awaitUninterruptibly(
208                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
209                                         self.tell(message, self);
210                                     }
211                                 }.start();
212
213                                 onFirstElectionTimeout.countDown();
214                             } else {
215                                 super.onReceiveCommand(message);
216                             }
217                         }
218                     };
219                 }
220             };
221
222             MockDataChangeListener listener = new MockDataChangeListener(1);
223             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
224                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
225
226             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
227                     Props.create(new DelegatingShardCreator(creator)),
228                     "testRegisterChangeListenerWhenNotLeaderInitially");
229
230             // Write initial data into the in-memory store.
231             YangInstanceIdentifier path = TestModel.TEST_PATH;
232             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
233
234             // Wait until the shard receives the first ElectionTimeout message.
235             assertEquals("Got first ElectionTimeout", true,
236                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
237
238             // Now send the RegisterChangeListener and wait for the reply.
239             shard.tell(new RegisterChangeListener(path, dclActor.path(),
240                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
241
242             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
243                     RegisterChangeListenerReply.class);
244             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
245
246             // Sanity check - verify the shard is not the leader yet.
247             shard.tell(new FindLeader(), getRef());
248             FindLeaderReply findLeadeReply =
249                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
250             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
251
252             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
253             // with the election process.
254             onChangeListenerRegistered.countDown();
255
256             // Wait for the shard to become the leader and notify our listener with the existing
257             // data in the store.
258             listener.waitForChangeEvents(path);
259
260             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
261             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
262         }};
263     }
264
265     @Test
266     public void testCreateTransaction(){
267         new ShardTestKit(getSystem()) {{
268             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
269
270             waitUntilLeader(shard);
271
272             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
273
274             shard.tell(new CreateTransaction("txn-1",
275                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
276
277             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
278                     CreateTransactionReply.class);
279
280             String path = reply.getTransactionActorPath().toString();
281             assertTrue("Unexpected transaction path " + path,
282                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
283
284             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
285         }};
286     }
287
288     @Test
289     public void testCreateTransactionOnChain(){
290         new ShardTestKit(getSystem()) {{
291             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
292
293             waitUntilLeader(shard);
294
295             shard.tell(new CreateTransaction("txn-1",
296                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
297                     getRef());
298
299             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
300                     CreateTransactionReply.class);
301
302             String path = reply.getTransactionActorPath().toString();
303             assertTrue("Unexpected transaction path " + path,
304                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
305
306             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
307         }};
308     }
309
310     @Test
311     public void testPeerAddressResolved() throws Exception {
312         new ShardTestKit(getSystem()) {{
313             final CountDownLatch recoveryComplete = new CountDownLatch(1);
314             class TestShard extends Shard {
315                 TestShard() {
316                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
317                             dataStoreContext, SCHEMA_CONTEXT);
318                 }
319
320                 Map<String, String> getPeerAddresses() {
321                     return getRaftActorContext().getPeerAddresses();
322                 }
323
324                 @Override
325                 protected void onRecoveryComplete() {
326                     try {
327                         super.onRecoveryComplete();
328                     } finally {
329                         recoveryComplete.countDown();
330                     }
331                 }
332             }
333
334             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
335                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
336                         @Override
337                         public TestShard create() throws Exception {
338                             return new TestShard();
339                         }
340                     })), "testPeerAddressResolved");
341
342             //waitUntilLeader(shard);
343             assertEquals("Recovery complete", true,
344                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
345
346             String address = "akka://foobar";
347             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
348
349             assertEquals("getPeerAddresses", address,
350                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
351
352             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
353         }};
354     }
355
356     @Test
357     public void testApplySnapshot() throws Exception {
358         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
359                 "testApplySnapshot");
360
361         NormalizedNodeToNodeCodec codec =
362             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
363
364         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
365
366         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
367         NormalizedNode<?,?> expected = readStore(shard, root);
368
369         NormalizedNodeMessages.Container encode = codec.encode(expected);
370
371         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
372                 encode.getNormalizedNode().toByteString().toByteArray(),
373                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
374
375         shard.underlyingActor().onReceiveCommand(applySnapshot);
376
377         NormalizedNode<?,?> actual = readStore(shard, root);
378
379         assertEquals(expected, actual);
380
381         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
382     }
383
384     @Test
385     public void testApplyState() throws Exception {
386
387         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
388
389         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
390
391         MutableCompositeModification compMod = new MutableCompositeModification();
392         compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
393         Payload payload = new CompositeModificationPayload(compMod.toSerializable());
394         ApplyState applyState = new ApplyState(null, "test",
395                 new ReplicatedLogImplEntry(1, 2, payload));
396
397         shard.underlyingActor().onReceiveCommand(applyState);
398
399         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
400         assertEquals("Applied state", node, actual);
401
402         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
403     }
404
405     @SuppressWarnings("serial")
406     @Test
407     public void testRecovery() throws Exception {
408
409         // Set up the InMemorySnapshotStore.
410
411         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
412         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
413
414         DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
415         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
416         DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
417         commitCohort.preCommit().get();
418         commitCohort.commit().get();
419
420         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
421         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
422
423         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
424                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
425                         root).
426                                 getNormalizedNode().toByteString().toByteArray(),
427                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
428
429         // Set up the InMemoryJournal.
430
431         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
432                   new WriteModification(TestModel.OUTER_LIST_PATH,
433                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
434                           SCHEMA_CONTEXT))));
435
436         int nListEntries = 11;
437         Set<Integer> listEntryKeys = new HashSet<>();
438         for(int i = 1; i <= nListEntries; i++) {
439             listEntryKeys.add(Integer.valueOf(i));
440             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
441                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
442             Modification mod = new MergeModification(path,
443                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
444                     SCHEMA_CONTEXT);
445             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
446                     newPayload(mod)));
447         }
448
449         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
450                 new ApplyLogEntries(nListEntries));
451
452         // Create the actor and wait for recovery complete.
453
454         final CountDownLatch recoveryComplete = new CountDownLatch(1);
455
456         Creator<Shard> creator = new Creator<Shard>() {
457             @Override
458             public Shard create() throws Exception {
459                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
460                         dataStoreContext, SCHEMA_CONTEXT) {
461                     @Override
462                     protected void onRecoveryComplete() {
463                         try {
464                             super.onRecoveryComplete();
465                         } finally {
466                             recoveryComplete.countDown();
467                         }
468                     }
469                 };
470             }
471         };
472
473         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
474                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
475
476         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
477
478         // Verify data in the data store.
479
480         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
481         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
482         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
483                 outerList.getValue() instanceof Iterable);
484         for(Object entry: (Iterable<?>) outerList.getValue()) {
485             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
486                     entry instanceof MapEntryNode);
487             MapEntryNode mapEntry = (MapEntryNode)entry;
488             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
489                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
490             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
491             Object value = idLeaf.get().getValue();
492             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
493                     listEntryKeys.remove(value));
494         }
495
496         if(!listEntryKeys.isEmpty()) {
497             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
498                     listEntryKeys);
499         }
500
501         assertEquals("Last log index", nListEntries,
502                 shard.underlyingActor().getShardMBean().getLastLogIndex());
503         assertEquals("Commit index", nListEntries,
504                 shard.underlyingActor().getShardMBean().getCommitIndex());
505         assertEquals("Last applied", nListEntries,
506                 shard.underlyingActor().getShardMBean().getLastApplied());
507
508         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
509     }
510
511     private CompositeModificationPayload newPayload(Modification... mods) {
512         MutableCompositeModification compMod = new MutableCompositeModification();
513         for(Modification mod: mods) {
514             compMod.addModification(mod);
515         }
516
517         return new CompositeModificationPayload(compMod.toSerializable());
518     }
519
520     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
521             InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
522             MutableCompositeModification modification) {
523         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
524     }
525
526     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
527             InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
528             MutableCompositeModification modification,
529             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
530
531         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
532         tx.write(path, data);
533         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
534         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
535
536         doAnswer(new Answer<ListenableFuture<Boolean>>() {
537             @Override
538             public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
539                 return realCohort.canCommit();
540             }
541         }).when(cohort).canCommit();
542
543         doAnswer(new Answer<ListenableFuture<Void>>() {
544             @Override
545             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
546                 if(preCommit != null) {
547                     return preCommit.apply(realCohort);
548                 } else {
549                     return realCohort.preCommit();
550                 }
551             }
552         }).when(cohort).preCommit();
553
554         doAnswer(new Answer<ListenableFuture<Void>>() {
555             @Override
556             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
557                 return realCohort.commit();
558             }
559         }).when(cohort).commit();
560
561         doAnswer(new Answer<ListenableFuture<Void>>() {
562             @Override
563             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
564                 return realCohort.abort();
565             }
566         }).when(cohort).abort();
567
568         modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
569
570         return cohort;
571     }
572
573     @SuppressWarnings({ "unchecked" })
574     @Test
575     public void testConcurrentThreePhaseCommits() throws Throwable {
576         new ShardTestKit(getSystem()) {{
577             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
578                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
579                     "testConcurrentThreePhaseCommits");
580
581             waitUntilLeader(shard);
582
583             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
584
585             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
586
587             String transactionID1 = "tx1";
588             MutableCompositeModification modification1 = new MutableCompositeModification();
589             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
590                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
591
592             String transactionID2 = "tx2";
593             MutableCompositeModification modification2 = new MutableCompositeModification();
594             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
595                     TestModel.OUTER_LIST_PATH,
596                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
597                     modification2);
598
599             String transactionID3 = "tx3";
600             MutableCompositeModification modification3 = new MutableCompositeModification();
601             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
602                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
603                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
604                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
605                     modification3);
606
607             long timeoutSec = 5;
608             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
609             final Timeout timeout = new Timeout(duration);
610
611             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
612             // by the ShardTransaction.
613
614             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
615             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
616                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
617             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
618
619             // Send the CanCommitTransaction message for the first Tx.
620
621             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
622             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
623                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
624             assertEquals("Can commit", true, canCommitReply.getCanCommit());
625
626             // Send the ForwardedReadyTransaction for the next 2 Tx's.
627
628             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
629             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
630
631             shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
632             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
633
634             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
635             // processed after the first Tx completes.
636
637             Future<Object> canCommitFuture1 = Patterns.ask(shard,
638                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
639
640             Future<Object> canCommitFuture2 = Patterns.ask(shard,
641                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
642
643             // Send the CommitTransaction message for the first Tx. After it completes, it should
644             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
645
646             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
647             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
648
649             // Wait for the next 2 Tx's to complete.
650
651             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
652             final CountDownLatch commitLatch = new CountDownLatch(2);
653
654             class OnFutureComplete extends OnComplete<Object> {
655                 private final Class<?> expRespType;
656
657                 OnFutureComplete(Class<?> expRespType) {
658                     this.expRespType = expRespType;
659                 }
660
661                 @Override
662                 public void onComplete(Throwable error, Object resp) {
663                     if(error != null) {
664                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
665                     } else {
666                         try {
667                             assertEquals("Commit response type", expRespType, resp.getClass());
668                             onSuccess(resp);
669                         } catch (Exception e) {
670                             caughtEx.set(e);
671                         }
672                     }
673                 }
674
675                 void onSuccess(Object resp) throws Exception {
676                 }
677             }
678
679             class OnCommitFutureComplete extends OnFutureComplete {
680                 OnCommitFutureComplete() {
681                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
682                 }
683
684                 @Override
685                 public void onComplete(Throwable error, Object resp) {
686                     super.onComplete(error, resp);
687                     commitLatch.countDown();
688                 }
689             }
690
691             class OnCanCommitFutureComplete extends OnFutureComplete {
692                 private final String transactionID;
693
694                 OnCanCommitFutureComplete(String transactionID) {
695                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
696                     this.transactionID = transactionID;
697                 }
698
699                 @Override
700                 void onSuccess(Object resp) throws Exception {
701                     CanCommitTransactionReply canCommitReply =
702                             CanCommitTransactionReply.fromSerializable(resp);
703                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
704
705                     Future<Object> commitFuture = Patterns.ask(shard,
706                             new CommitTransaction(transactionID).toSerializable(), timeout);
707                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
708                 }
709             }
710
711             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
712                     getSystem().dispatcher());
713
714             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
715                     getSystem().dispatcher());
716
717             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
718
719             if(caughtEx.get() != null) {
720                 throw caughtEx.get();
721             }
722
723             assertEquals("Commits complete", true, done);
724
725             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
726             inOrder.verify(cohort1).canCommit();
727             inOrder.verify(cohort1).preCommit();
728             inOrder.verify(cohort1).commit();
729             inOrder.verify(cohort2).canCommit();
730             inOrder.verify(cohort2).preCommit();
731             inOrder.verify(cohort2).commit();
732             inOrder.verify(cohort3).canCommit();
733             inOrder.verify(cohort3).preCommit();
734             inOrder.verify(cohort3).commit();
735
736             // Verify data in the data store.
737
738             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
739             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
740             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
741                     outerList.getValue() instanceof Iterable);
742             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
743             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
744                        entry instanceof MapEntryNode);
745             MapEntryNode mapEntry = (MapEntryNode)entry;
746             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
747                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
748             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
749             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
750
751             for(int i = 0; i < 20 * 5; i++) {
752                 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
753                 if(lastLogIndex == 2) {
754                     break;
755                 }
756                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
757             }
758
759             assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
760
761             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
762         }};
763     }
764
765     @Test
766     public void testCommitPhaseFailure() throws Throwable {
767         new ShardTestKit(getSystem()) {{
768             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
769                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
770                     "testCommitPhaseFailure");
771
772             waitUntilLeader(shard);
773
774             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
775             // commit phase.
776
777             String transactionID1 = "tx1";
778             MutableCompositeModification modification1 = new MutableCompositeModification();
779             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
780             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
781             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
782             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
783
784             String transactionID2 = "tx2";
785             MutableCompositeModification modification2 = new MutableCompositeModification();
786             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
787             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
788
789             FiniteDuration duration = duration("5 seconds");
790             final Timeout timeout = new Timeout(duration);
791
792             // Simulate the ForwardedReadyTransaction messages that would be sent
793             // by the ShardTransaction.
794
795             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
796             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
797
798             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
799             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
800
801             // Send the CanCommitTransaction message for the first Tx.
802
803             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
804             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
805                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
806             assertEquals("Can commit", true, canCommitReply.getCanCommit());
807
808             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
809             // processed after the first Tx completes.
810
811             Future<Object> canCommitFuture = Patterns.ask(shard,
812                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
813
814             // Send the CommitTransaction message for the first Tx. This should send back an error
815             // and trigger the 2nd Tx to proceed.
816
817             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
818             expectMsgClass(duration, akka.actor.Status.Failure.class);
819
820             // Wait for the 2nd Tx to complete the canCommit phase.
821
822             final CountDownLatch latch = new CountDownLatch(1);
823             canCommitFuture.onComplete(new OnComplete<Object>() {
824                 @Override
825                 public void onComplete(Throwable t, Object resp) {
826                     latch.countDown();
827                 }
828             }, getSystem().dispatcher());
829
830             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
831
832             InOrder inOrder = inOrder(cohort1, cohort2);
833             inOrder.verify(cohort1).canCommit();
834             inOrder.verify(cohort1).preCommit();
835             inOrder.verify(cohort1).commit();
836             inOrder.verify(cohort2).canCommit();
837
838             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
839         }};
840     }
841
842     @Test
843     public void testPreCommitPhaseFailure() throws Throwable {
844         new ShardTestKit(getSystem()) {{
845             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
846                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
847                     "testPreCommitPhaseFailure");
848
849             waitUntilLeader(shard);
850
851             String transactionID = "tx1";
852             MutableCompositeModification modification = new MutableCompositeModification();
853             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
854             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
855             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
856
857             FiniteDuration duration = duration("5 seconds");
858
859             // Simulate the ForwardedReadyTransaction messages that would be sent
860             // by the ShardTransaction.
861
862             shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
863             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
864
865             // Send the CanCommitTransaction message.
866
867             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
868             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
869                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
870             assertEquals("Can commit", true, canCommitReply.getCanCommit());
871
872             // Send the CommitTransaction message. This should send back an error
873             // for preCommit failure.
874
875             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
876             expectMsgClass(duration, akka.actor.Status.Failure.class);
877
878             InOrder inOrder = inOrder(cohort);
879             inOrder.verify(cohort).canCommit();
880             inOrder.verify(cohort).preCommit();
881
882             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
883         }};
884     }
885
886     @Test
887     public void testCanCommitPhaseFailure() throws Throwable {
888         new ShardTestKit(getSystem()) {{
889             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
890                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
891                     "testCanCommitPhaseFailure");
892
893             waitUntilLeader(shard);
894
895             final FiniteDuration duration = duration("5 seconds");
896
897             String transactionID = "tx1";
898             MutableCompositeModification modification = new MutableCompositeModification();
899             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
900             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
901
902             // Simulate the ForwardedReadyTransaction messages that would be sent
903             // by the ShardTransaction.
904
905             shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
906             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
907
908             // Send the CanCommitTransaction message.
909
910             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
911             expectMsgClass(duration, akka.actor.Status.Failure.class);
912
913             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
914         }};
915     }
916
917     @Test
918     public void testAbortBeforeFinishCommit() throws Throwable {
919         new ShardTestKit(getSystem()) {{
920             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
921                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
922                     "testAbortBeforeFinishCommit");
923
924             waitUntilLeader(shard);
925
926             final FiniteDuration duration = duration("5 seconds");
927             final Timeout timeout = new Timeout(duration);
928
929             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
930
931             final String transactionID = "tx1";
932             final CountDownLatch abortComplete = new CountDownLatch(1);
933             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
934                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
935                 @Override
936                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
937                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
938
939                     Future<Object> abortFuture = Patterns.ask(shard,
940                             new AbortTransaction(transactionID).toSerializable(), timeout);
941                     abortFuture.onComplete(new OnComplete<Object>() {
942                         @Override
943                         public void onComplete(Throwable e, Object resp) {
944                             abortComplete.countDown();
945                         }
946                     }, getSystem().dispatcher());
947
948                     return preCommitFuture;
949                 }
950             };
951
952             MutableCompositeModification modification = new MutableCompositeModification();
953             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
954                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
955                     modification, preCommit);
956
957             shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
958             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
959
960             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
961             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
962                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
963             assertEquals("Can commit", true, canCommitReply.getCanCommit());
964
965             Future<Object> commitFuture = Patterns.ask(shard,
966                     new CommitTransaction(transactionID).toSerializable(), timeout);
967
968             assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
969
970             Await.result(commitFuture, duration);
971
972             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
973             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
974
975             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
976         }};
977     }
978
979     @Test
980     public void testTransactionCommitTimeout() throws Throwable {
981         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
982
983         new ShardTestKit(getSystem()) {{
984             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
985                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
986                     "testTransactionCommitTimeout");
987
988             waitUntilLeader(shard);
989
990             final FiniteDuration duration = duration("5 seconds");
991
992             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
993
994             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
995             writeToStore(shard, TestModel.OUTER_LIST_PATH,
996                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
997
998             // Create 1st Tx - will timeout
999
1000             String transactionID1 = "tx1";
1001             MutableCompositeModification modification1 = new MutableCompositeModification();
1002             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1003                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1004                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1005                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1006                     modification1);
1007
1008             // Create 2nd Tx
1009
1010             String transactionID2 = "tx3";
1011             MutableCompositeModification modification2 = new MutableCompositeModification();
1012             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1013                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1014             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1015                     listNodePath,
1016                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1017                     modification2);
1018
1019             // Ready the Tx's
1020
1021             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
1022             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1023
1024             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
1025             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1026
1027             // canCommit 1st Tx. We don't send the commit so it should timeout.
1028
1029             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1030             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1031
1032             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1033
1034             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1035             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1036
1037             // Commit the 2nd Tx.
1038
1039             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1040             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1041
1042             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1043             assertNotNull(listNodePath + " not found", node);
1044
1045             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1046         }};
1047     }
1048
1049     @Test
1050     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1051         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1052
1053         new ShardTestKit(getSystem()) {{
1054             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1055                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1056                     "testTransactionCommitQueueCapacityExceeded");
1057
1058             waitUntilLeader(shard);
1059
1060             final FiniteDuration duration = duration("5 seconds");
1061
1062             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1063
1064             String transactionID1 = "tx1";
1065             MutableCompositeModification modification1 = new MutableCompositeModification();
1066             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1067                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1068
1069             String transactionID2 = "tx2";
1070             MutableCompositeModification modification2 = new MutableCompositeModification();
1071             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1072                     TestModel.OUTER_LIST_PATH,
1073                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1074                     modification2);
1075
1076             String transactionID3 = "tx3";
1077             MutableCompositeModification modification3 = new MutableCompositeModification();
1078             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1079                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1080
1081             // Ready the Tx's
1082
1083             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
1084             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1085
1086             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
1087             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1088
1089             shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
1090             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1091
1092             // canCommit 1st Tx.
1093
1094             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1095             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1096
1097             // canCommit the 2nd Tx - it should get queued.
1098
1099             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1100
1101             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1102
1103             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1104             expectMsgClass(duration, akka.actor.Status.Failure.class);
1105
1106             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1107         }};
1108     }
1109
1110     @Test
1111     public void testCanCommitBeforeReadyFailure() throws Throwable {
1112         new ShardTestKit(getSystem()) {{
1113             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1114                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1115                     "testCanCommitBeforeReadyFailure");
1116
1117             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1118             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1119
1120             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1121         }};
1122     }
1123
1124     @Test
1125     public void testAbortTransaction() throws Throwable {
1126         new ShardTestKit(getSystem()) {{
1127             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1128                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1129                     "testAbortTransaction");
1130
1131             waitUntilLeader(shard);
1132
1133             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1134
1135             String transactionID1 = "tx1";
1136             MutableCompositeModification modification1 = new MutableCompositeModification();
1137             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1138             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1139             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1140
1141             String transactionID2 = "tx2";
1142             MutableCompositeModification modification2 = new MutableCompositeModification();
1143             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1144             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1145
1146             FiniteDuration duration = duration("5 seconds");
1147             final Timeout timeout = new Timeout(duration);
1148
1149             // Simulate the ForwardedReadyTransaction messages that would be sent
1150             // by the ShardTransaction.
1151
1152             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
1153             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1154
1155             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
1156             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1157
1158             // Send the CanCommitTransaction message for the first Tx.
1159
1160             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1161             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1162                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1163             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1164
1165             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1166             // processed after the first Tx completes.
1167
1168             Future<Object> canCommitFuture = Patterns.ask(shard,
1169                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1170
1171             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1172             // Tx to proceed.
1173
1174             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1175             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1176
1177             // Wait for the 2nd Tx to complete the canCommit phase.
1178
1179             final CountDownLatch latch = new CountDownLatch(1);
1180             canCommitFuture.onComplete(new OnComplete<Object>() {
1181                 @Override
1182                 public void onComplete(Throwable t, Object resp) {
1183                     latch.countDown();
1184                 }
1185             }, getSystem().dispatcher());
1186
1187             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1188
1189             InOrder inOrder = inOrder(cohort1, cohort2);
1190             inOrder.verify(cohort1).canCommit();
1191             inOrder.verify(cohort2).canCommit();
1192
1193             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1194         }};
1195     }
1196
1197     @Test
1198     public void testCreateSnapshot() throws IOException, InterruptedException {
1199             testCreateSnapshot(true, "testCreateSnapshot");
1200     }
1201
1202     @Test
1203     public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1204         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1205     }
1206
1207     public void testCreateSnapshot(boolean persistent, final String shardActorName) throws IOException, InterruptedException {
1208         final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1209                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1210
1211         new ShardTestKit(getSystem()) {{
1212             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1213             Creator<Shard> creator = new Creator<Shard>() {
1214                 @Override
1215                 public Shard create() throws Exception {
1216                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1217                             dataStoreContext, SCHEMA_CONTEXT) {
1218                         @Override
1219                         protected void commitSnapshot(long sequenceNumber) {
1220                             super.commitSnapshot(sequenceNumber);
1221                             latch.get().countDown();
1222                         }
1223                     };
1224                 }
1225             };
1226
1227             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1228                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1229
1230             waitUntilLeader(shard);
1231
1232             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1233
1234             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1235
1236             latch.set(new CountDownLatch(1));
1237             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1238
1239             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1240
1241             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1242         }};
1243     }
1244
1245     /**
1246      * This test simply verifies that the applySnapShot logic will work
1247      * @throws ReadFailedException
1248      */
1249     @Test
1250     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1251         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
1252             MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
1253
1254         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1255
1256         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1257         putTransaction.write(TestModel.TEST_PATH,
1258             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1259         commitTransaction(putTransaction);
1260
1261
1262         NormalizedNode expected = readStore(store);
1263
1264         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1265
1266         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1267         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1268
1269         commitTransaction(writeTransaction);
1270
1271         NormalizedNode actual = readStore(store);
1272
1273         assertEquals(expected, actual);
1274
1275     }
1276
1277     @Test
1278     public void testRecoveryApplicable(){
1279
1280         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1281                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1282
1283         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1284                 persistentContext, SCHEMA_CONTEXT);
1285
1286         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1287                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1288
1289         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1290                 nonPersistentContext, SCHEMA_CONTEXT);
1291
1292         new ShardTestKit(getSystem()) {{
1293             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1294                     persistentProps, "testPersistence1");
1295
1296             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1297
1298             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1299
1300             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1301                     nonPersistentProps, "testPersistence2");
1302
1303             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1304
1305             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1306
1307         }};
1308
1309     }
1310
1311
1312     private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
1313         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1314         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1315             transaction.read(YangInstanceIdentifier.builder().build());
1316
1317         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1318
1319         NormalizedNode<?, ?> normalizedNode = optional.get();
1320
1321         transaction.close();
1322
1323         return normalizedNode;
1324     }
1325
1326     private void commitTransaction(DOMStoreWriteTransaction transaction) {
1327         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1328         ListenableFuture<Void> future =
1329             commitCohort.preCommit();
1330         try {
1331             future.get();
1332             future = commitCohort.commit();
1333             future.get();
1334         } catch (InterruptedException | ExecutionException e) {
1335         }
1336     }
1337
1338     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1339         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1340             @Override
1341             public void onDataChanged(
1342                 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1343
1344             }
1345         };
1346     }
1347
1348     private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
1349             throws ExecutionException, InterruptedException {
1350         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1351
1352         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1353             transaction.read(id);
1354
1355         Optional<NormalizedNode<?, ?>> optional = future.get();
1356         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1357
1358         transaction.close();
1359
1360         return node;
1361     }
1362
1363     private void writeToStore(TestActorRef<Shard> shard, YangInstanceIdentifier id, NormalizedNode<?,?> node)
1364         throws ExecutionException, InterruptedException {
1365         DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1366
1367         transaction.write(id, node);
1368
1369         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1370         commitCohort.preCommit().get();
1371         commitCohort.commit().get();
1372     }
1373
1374     private static final class DelegatingShardCreator implements Creator<Shard> {
1375         private final Creator<Shard> delegate;
1376
1377         DelegatingShardCreator(Creator<Shard> delegate) {
1378             this.delegate = delegate;
1379         }
1380
1381         @Override
1382         public Shard create() throws Exception {
1383             return delegate.create();
1384         }
1385     }
1386 }