Merge "Introduce forwarding DOMData classes"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.CheckedFuture;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.mockito.InOrder;
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
59 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
60 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
61 import org.opendaylight.controller.cluster.datastore.modification.Modification;
62 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
63 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
64 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
65 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
66 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
67 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
68 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
70 import org.opendaylight.controller.cluster.raft.Snapshot;
71 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
74 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
75 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
76 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
78 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
79 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
80 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
81 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
82 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
83 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
84 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
85 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
86 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
87 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
88 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
89 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
90 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
91 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
92 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
93 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
94 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
95 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
96 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
97 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
98 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
99 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
100 import scala.concurrent.Await;
101 import scala.concurrent.Future;
102 import scala.concurrent.duration.FiniteDuration;
103
104
105 public class ShardTest extends AbstractActorTest {
106
107     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
108
109     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
110
111     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
112             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
113
114     private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
115             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
116             shardHeartbeatIntervalInMillis(100).build();
117
118     @Before
119     public void setUp() {
120         InMemorySnapshotStore.clear();
121         InMemoryJournal.clear();
122     }
123
124     @After
125     public void tearDown() {
126         InMemorySnapshotStore.clear();
127         InMemoryJournal.clear();
128     }
129
130     private Props newShardProps() {
131         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
132                 dataStoreContext, SCHEMA_CONTEXT);
133     }
134
135     @Test
136     public void testRegisterChangeListener() throws Exception {
137         new ShardTestKit(getSystem()) {{
138             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
139                     newShardProps(),  "testRegisterChangeListener");
140
141             waitUntilLeader(shard);
142
143             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
144
145             MockDataChangeListener listener = new MockDataChangeListener(1);
146             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
147                     "testRegisterChangeListener-DataChangeListener");
148
149             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
150                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
151
152             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
153                     RegisterChangeListenerReply.class);
154             String replyPath = reply.getListenerRegistrationPath().toString();
155             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
156                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
157
158             YangInstanceIdentifier path = TestModel.TEST_PATH;
159             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
160
161             listener.waitForChangeEvents(path);
162
163             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
164             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
165         }};
166     }
167
168     @SuppressWarnings("serial")
169     @Test
170     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
171         // This test tests the timing window in which a change listener is registered before the
172         // shard becomes the leader. We verify that the listener is registered and notified of the
173         // existing data when the shard becomes the leader.
174         new ShardTestKit(getSystem()) {{
175             // For this test, we want to send the RegisterChangeListener message after the shard
176             // has recovered from persistence and before it becomes the leader. So we subclass
177             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
178             // we know that the shard has been initialized to a follower and has started the
179             // election process. The following 2 CountDownLatches are used to coordinate the
180             // ElectionTimeout with the sending of the RegisterChangeListener message.
181             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
182             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
183             Creator<Shard> creator = new Creator<Shard>() {
184                 boolean firstElectionTimeout = true;
185
186                 @Override
187                 public Shard create() throws Exception {
188                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
189                             dataStoreContext, SCHEMA_CONTEXT) {
190                         @Override
191                         public void onReceiveCommand(final Object message) throws Exception {
192                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
193                                 // Got the first ElectionTimeout. We don't forward it to the
194                                 // base Shard yet until we've sent the RegisterChangeListener
195                                 // message. So we signal the onFirstElectionTimeout latch to tell
196                                 // the main thread to send the RegisterChangeListener message and
197                                 // start a thread to wait on the onChangeListenerRegistered latch,
198                                 // which the main thread signals after it has sent the message.
199                                 // After the onChangeListenerRegistered is triggered, we send the
200                                 // original ElectionTimeout message to proceed with the election.
201                                 firstElectionTimeout = false;
202                                 final ActorRef self = getSelf();
203                                 new Thread() {
204                                     @Override
205                                     public void run() {
206                                         Uninterruptibles.awaitUninterruptibly(
207                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
208                                         self.tell(message, self);
209                                     }
210                                 }.start();
211
212                                 onFirstElectionTimeout.countDown();
213                             } else {
214                                 super.onReceiveCommand(message);
215                             }
216                         }
217                     };
218                 }
219             };
220
221             MockDataChangeListener listener = new MockDataChangeListener(1);
222             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
223                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
224
225             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
226                     Props.create(new DelegatingShardCreator(creator)),
227                     "testRegisterChangeListenerWhenNotLeaderInitially");
228
229             // Write initial data into the in-memory store.
230             YangInstanceIdentifier path = TestModel.TEST_PATH;
231             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
232
233             // Wait until the shard receives the first ElectionTimeout message.
234             assertEquals("Got first ElectionTimeout", true,
235                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
236
237             // Now send the RegisterChangeListener and wait for the reply.
238             shard.tell(new RegisterChangeListener(path, dclActor.path(),
239                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
240
241             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
242                     RegisterChangeListenerReply.class);
243             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
244
245             // Sanity check - verify the shard is not the leader yet.
246             shard.tell(new FindLeader(), getRef());
247             FindLeaderReply findLeadeReply =
248                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
249             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
250
251             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
252             // with the election process.
253             onChangeListenerRegistered.countDown();
254
255             // Wait for the shard to become the leader and notify our listener with the existing
256             // data in the store.
257             listener.waitForChangeEvents(path);
258
259             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
260             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
261         }};
262     }
263
264     @Test
265     public void testCreateTransaction(){
266         new ShardTestKit(getSystem()) {{
267             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
268
269             waitUntilLeader(shard);
270
271             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
272
273             shard.tell(new CreateTransaction("txn-1",
274                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
275
276             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
277                     CreateTransactionReply.class);
278
279             String path = reply.getTransactionActorPath().toString();
280             assertTrue("Unexpected transaction path " + path,
281                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
282
283             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
284         }};
285     }
286
287     @Test
288     public void testCreateTransactionOnChain(){
289         new ShardTestKit(getSystem()) {{
290             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
291
292             waitUntilLeader(shard);
293
294             shard.tell(new CreateTransaction("txn-1",
295                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
296                     getRef());
297
298             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
299                     CreateTransactionReply.class);
300
301             String path = reply.getTransactionActorPath().toString();
302             assertTrue("Unexpected transaction path " + path,
303                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
304
305             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
306         }};
307     }
308
309     @SuppressWarnings("serial")
310     @Test
311     public void testPeerAddressResolved() throws Exception {
312         new ShardTestKit(getSystem()) {{
313             final CountDownLatch recoveryComplete = new CountDownLatch(1);
314             class TestShard extends Shard {
315                 TestShard() {
316                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
317                             dataStoreContext, SCHEMA_CONTEXT);
318                 }
319
320                 Map<String, String> getPeerAddresses() {
321                     return getRaftActorContext().getPeerAddresses();
322                 }
323
324                 @Override
325                 protected void onRecoveryComplete() {
326                     try {
327                         super.onRecoveryComplete();
328                     } finally {
329                         recoveryComplete.countDown();
330                     }
331                 }
332             }
333
334             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
335                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
336                         @Override
337                         public TestShard create() throws Exception {
338                             return new TestShard();
339                         }
340                     })), "testPeerAddressResolved");
341
342             //waitUntilLeader(shard);
343             assertEquals("Recovery complete", true,
344                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
345
346             String address = "akka://foobar";
347             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
348
349             assertEquals("getPeerAddresses", address,
350                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
351
352             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
353         }};
354     }
355
356     @Test
357     public void testApplySnapshot() throws Exception {
358         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
359                 "testApplySnapshot");
360
361         NormalizedNodeToNodeCodec codec =
362             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
363
364         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
365
366         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
367         NormalizedNode<?,?> expected = readStore(shard, root);
368
369         NormalizedNodeMessages.Container encode = codec.encode(expected);
370
371         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
372                 encode.getNormalizedNode().toByteString().toByteArray(),
373                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
374
375         shard.underlyingActor().onReceiveCommand(applySnapshot);
376
377         NormalizedNode<?,?> actual = readStore(shard, root);
378
379         assertEquals(expected, actual);
380
381         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
382     }
383
384     @Test
385     public void testApplyState() throws Exception {
386
387         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
388
389         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
390
391         MutableCompositeModification compMod = new MutableCompositeModification();
392         compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
393         Payload payload = new CompositeModificationPayload(compMod.toSerializable());
394         ApplyState applyState = new ApplyState(null, "test",
395                 new ReplicatedLogImplEntry(1, 2, payload));
396
397         shard.underlyingActor().onReceiveCommand(applyState);
398
399         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
400         assertEquals("Applied state", node, actual);
401
402         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
403     }
404
405     @SuppressWarnings("serial")
406     @Test
407     public void testRecovery() throws Exception {
408
409         // Set up the InMemorySnapshotStore.
410
411         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
412         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
413
414         DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
415         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
416         DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
417         commitCohort.preCommit().get();
418         commitCohort.commit().get();
419
420         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
421         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
422
423         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
424                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
425                         root).
426                                 getNormalizedNode().toByteString().toByteArray(),
427                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
428
429         // Set up the InMemoryJournal.
430
431         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
432                   new WriteModification(TestModel.OUTER_LIST_PATH,
433                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
434                           SCHEMA_CONTEXT))));
435
436         int nListEntries = 11;
437         Set<Integer> listEntryKeys = new HashSet<>();
438         for(int i = 1; i <= nListEntries; i++) {
439             listEntryKeys.add(Integer.valueOf(i));
440             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
441                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
442             Modification mod = new MergeModification(path,
443                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
444                     SCHEMA_CONTEXT);
445             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
446                     newPayload(mod)));
447         }
448
449         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
450                 new ApplyLogEntries(nListEntries));
451
452         // Create the actor and wait for recovery complete.
453
454         final CountDownLatch recoveryComplete = new CountDownLatch(1);
455
456         Creator<Shard> creator = new Creator<Shard>() {
457             @Override
458             public Shard create() throws Exception {
459                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
460                         dataStoreContext, SCHEMA_CONTEXT) {
461                     @Override
462                     protected void onRecoveryComplete() {
463                         try {
464                             super.onRecoveryComplete();
465                         } finally {
466                             recoveryComplete.countDown();
467                         }
468                     }
469                 };
470             }
471         };
472
473         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
474                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
475
476         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
477
478         // Verify data in the data store.
479
480         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
481         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
482         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
483                 outerList.getValue() instanceof Iterable);
484         for(Object entry: (Iterable<?>) outerList.getValue()) {
485             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
486                     entry instanceof MapEntryNode);
487             MapEntryNode mapEntry = (MapEntryNode)entry;
488             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
489                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
490             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
491             Object value = idLeaf.get().getValue();
492             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
493                     listEntryKeys.remove(value));
494         }
495
496         if(!listEntryKeys.isEmpty()) {
497             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
498                     listEntryKeys);
499         }
500
501         assertEquals("Last log index", nListEntries,
502                 shard.underlyingActor().getShardMBean().getLastLogIndex());
503         assertEquals("Commit index", nListEntries,
504                 shard.underlyingActor().getShardMBean().getCommitIndex());
505         assertEquals("Last applied", nListEntries,
506                 shard.underlyingActor().getShardMBean().getLastApplied());
507
508         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
509     }
510
511     private CompositeModificationPayload newPayload(final Modification... mods) {
512         MutableCompositeModification compMod = new MutableCompositeModification();
513         for(Modification mod: mods) {
514             compMod.addModification(mod);
515         }
516
517         return new CompositeModificationPayload(compMod.toSerializable());
518     }
519
520     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
521             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
522             final MutableCompositeModification modification) {
523         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
524     }
525
526     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
527             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
528             final MutableCompositeModification modification,
529             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
530
531         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
532         tx.write(path, data);
533         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
534         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
535
536         doAnswer(new Answer<ListenableFuture<Boolean>>() {
537             @Override
538             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
539                 return realCohort.canCommit();
540             }
541         }).when(cohort).canCommit();
542
543         doAnswer(new Answer<ListenableFuture<Void>>() {
544             @Override
545             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
546                 if(preCommit != null) {
547                     return preCommit.apply(realCohort);
548                 } else {
549                     return realCohort.preCommit();
550                 }
551             }
552         }).when(cohort).preCommit();
553
554         doAnswer(new Answer<ListenableFuture<Void>>() {
555             @Override
556             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
557                 return realCohort.commit();
558             }
559         }).when(cohort).commit();
560
561         doAnswer(new Answer<ListenableFuture<Void>>() {
562             @Override
563             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
564                 return realCohort.abort();
565             }
566         }).when(cohort).abort();
567
568         modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
569
570         return cohort;
571     }
572
573     @SuppressWarnings({ "unchecked" })
574     @Test
575     public void testConcurrentThreePhaseCommits() throws Throwable {
576         new ShardTestKit(getSystem()) {{
577             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
578                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
579                     "testConcurrentThreePhaseCommits");
580
581             waitUntilLeader(shard);
582
583             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
584
585             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
586
587             String transactionID1 = "tx1";
588             MutableCompositeModification modification1 = new MutableCompositeModification();
589             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
590                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
591
592             String transactionID2 = "tx2";
593             MutableCompositeModification modification2 = new MutableCompositeModification();
594             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
595                     TestModel.OUTER_LIST_PATH,
596                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
597                     modification2);
598
599             String transactionID3 = "tx3";
600             MutableCompositeModification modification3 = new MutableCompositeModification();
601             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
602                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
603                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
604                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
605                     modification3);
606
607             long timeoutSec = 5;
608             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
609             final Timeout timeout = new Timeout(duration);
610
611             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
612             // by the ShardTransaction.
613
614             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
615                     cohort1, modification1, true), getRef());
616             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
617                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
618             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
619
620             // Send the CanCommitTransaction message for the first Tx.
621
622             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
623             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
624                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
625             assertEquals("Can commit", true, canCommitReply.getCanCommit());
626
627             // Send the ForwardedReadyTransaction for the next 2 Tx's.
628
629             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
630                     cohort2, modification2, true), getRef());
631             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
632
633             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
634                     cohort3, modification3, true), getRef());
635             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
636
637             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
638             // processed after the first Tx completes.
639
640             Future<Object> canCommitFuture1 = Patterns.ask(shard,
641                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
642
643             Future<Object> canCommitFuture2 = Patterns.ask(shard,
644                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
645
646             // Send the CommitTransaction message for the first Tx. After it completes, it should
647             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
648
649             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
650             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
651
652             // Wait for the next 2 Tx's to complete.
653
654             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
655             final CountDownLatch commitLatch = new CountDownLatch(2);
656
657             class OnFutureComplete extends OnComplete<Object> {
658                 private final Class<?> expRespType;
659
660                 OnFutureComplete(final Class<?> expRespType) {
661                     this.expRespType = expRespType;
662                 }
663
664                 @Override
665                 public void onComplete(final Throwable error, final Object resp) {
666                     if(error != null) {
667                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
668                     } else {
669                         try {
670                             assertEquals("Commit response type", expRespType, resp.getClass());
671                             onSuccess(resp);
672                         } catch (Exception e) {
673                             caughtEx.set(e);
674                         }
675                     }
676                 }
677
678                 void onSuccess(final Object resp) throws Exception {
679                 }
680             }
681
682             class OnCommitFutureComplete extends OnFutureComplete {
683                 OnCommitFutureComplete() {
684                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
685                 }
686
687                 @Override
688                 public void onComplete(final Throwable error, final Object resp) {
689                     super.onComplete(error, resp);
690                     commitLatch.countDown();
691                 }
692             }
693
694             class OnCanCommitFutureComplete extends OnFutureComplete {
695                 private final String transactionID;
696
697                 OnCanCommitFutureComplete(final String transactionID) {
698                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
699                     this.transactionID = transactionID;
700                 }
701
702                 @Override
703                 void onSuccess(final Object resp) throws Exception {
704                     CanCommitTransactionReply canCommitReply =
705                             CanCommitTransactionReply.fromSerializable(resp);
706                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
707
708                     Future<Object> commitFuture = Patterns.ask(shard,
709                             new CommitTransaction(transactionID).toSerializable(), timeout);
710                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
711                 }
712             }
713
714             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
715                     getSystem().dispatcher());
716
717             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
718                     getSystem().dispatcher());
719
720             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
721
722             if(caughtEx.get() != null) {
723                 throw caughtEx.get();
724             }
725
726             assertEquals("Commits complete", true, done);
727
728             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
729             inOrder.verify(cohort1).canCommit();
730             inOrder.verify(cohort1).preCommit();
731             inOrder.verify(cohort1).commit();
732             inOrder.verify(cohort2).canCommit();
733             inOrder.verify(cohort2).preCommit();
734             inOrder.verify(cohort2).commit();
735             inOrder.verify(cohort3).canCommit();
736             inOrder.verify(cohort3).preCommit();
737             inOrder.verify(cohort3).commit();
738
739             // Verify data in the data store.
740
741             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
742             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
743             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
744                     outerList.getValue() instanceof Iterable);
745             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
746             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
747                        entry instanceof MapEntryNode);
748             MapEntryNode mapEntry = (MapEntryNode)entry;
749             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
750                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
751             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
752             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
753
754             for(int i = 0; i < 20 * 5; i++) {
755                 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
756                 if(lastLogIndex == 2) {
757                     break;
758                 }
759                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
760             }
761
762             assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
763
764             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
765         }};
766     }
767
768     @Test
769     public void testCommitPhaseFailure() throws Throwable {
770         new ShardTestKit(getSystem()) {{
771             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
772                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
773                     "testCommitPhaseFailure");
774
775             waitUntilLeader(shard);
776
777             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
778             // commit phase.
779
780             String transactionID1 = "tx1";
781             MutableCompositeModification modification1 = new MutableCompositeModification();
782             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
783             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
784             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
785             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
786
787             String transactionID2 = "tx2";
788             MutableCompositeModification modification2 = new MutableCompositeModification();
789             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
790             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
791
792             FiniteDuration duration = duration("5 seconds");
793             final Timeout timeout = new Timeout(duration);
794
795             // Simulate the ForwardedReadyTransaction messages that would be sent
796             // by the ShardTransaction.
797
798             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
799                     cohort1, modification1, true), getRef());
800             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
801
802             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
803                     cohort2, modification2, true), getRef());
804             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
805
806             // Send the CanCommitTransaction message for the first Tx.
807
808             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
809             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
810                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
811             assertEquals("Can commit", true, canCommitReply.getCanCommit());
812
813             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
814             // processed after the first Tx completes.
815
816             Future<Object> canCommitFuture = Patterns.ask(shard,
817                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
818
819             // Send the CommitTransaction message for the first Tx. This should send back an error
820             // and trigger the 2nd Tx to proceed.
821
822             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
823             expectMsgClass(duration, akka.actor.Status.Failure.class);
824
825             // Wait for the 2nd Tx to complete the canCommit phase.
826
827             final CountDownLatch latch = new CountDownLatch(1);
828             canCommitFuture.onComplete(new OnComplete<Object>() {
829                 @Override
830                 public void onComplete(final Throwable t, final Object resp) {
831                     latch.countDown();
832                 }
833             }, getSystem().dispatcher());
834
835             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
836
837             InOrder inOrder = inOrder(cohort1, cohort2);
838             inOrder.verify(cohort1).canCommit();
839             inOrder.verify(cohort1).preCommit();
840             inOrder.verify(cohort1).commit();
841             inOrder.verify(cohort2).canCommit();
842
843             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
844         }};
845     }
846
847     @Test
848     public void testPreCommitPhaseFailure() throws Throwable {
849         new ShardTestKit(getSystem()) {{
850             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
851                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
852                     "testPreCommitPhaseFailure");
853
854             waitUntilLeader(shard);
855
856             String transactionID = "tx1";
857             MutableCompositeModification modification = new MutableCompositeModification();
858             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
859             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
860             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
861
862             FiniteDuration duration = duration("5 seconds");
863
864             // Simulate the ForwardedReadyTransaction messages that would be sent
865             // by the ShardTransaction.
866
867             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
868                     cohort, modification, true), getRef());
869             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
870
871             // Send the CanCommitTransaction message.
872
873             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
874             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
875                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
876             assertEquals("Can commit", true, canCommitReply.getCanCommit());
877
878             // Send the CommitTransaction message. This should send back an error
879             // for preCommit failure.
880
881             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
882             expectMsgClass(duration, akka.actor.Status.Failure.class);
883
884             InOrder inOrder = inOrder(cohort);
885             inOrder.verify(cohort).canCommit();
886             inOrder.verify(cohort).preCommit();
887
888             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
889         }};
890     }
891
892     @Test
893     public void testCanCommitPhaseFailure() throws Throwable {
894         new ShardTestKit(getSystem()) {{
895             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
896                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
897                     "testCanCommitPhaseFailure");
898
899             waitUntilLeader(shard);
900
901             final FiniteDuration duration = duration("5 seconds");
902
903             String transactionID = "tx1";
904             MutableCompositeModification modification = new MutableCompositeModification();
905             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
906             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
907
908             // Simulate the ForwardedReadyTransaction messages that would be sent
909             // by the ShardTransaction.
910
911             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
912                     cohort, modification, true), getRef());
913             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
914
915             // Send the CanCommitTransaction message.
916
917             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
918             expectMsgClass(duration, akka.actor.Status.Failure.class);
919
920             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
921         }};
922     }
923
924     @Test
925     public void testAbortBeforeFinishCommit() throws Throwable {
926         new ShardTestKit(getSystem()) {{
927             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
928                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
929                     "testAbortBeforeFinishCommit");
930
931             waitUntilLeader(shard);
932
933             final FiniteDuration duration = duration("5 seconds");
934             final Timeout timeout = new Timeout(duration);
935
936             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
937
938             final String transactionID = "tx1";
939             final CountDownLatch abortComplete = new CountDownLatch(1);
940             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
941                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
942                 @Override
943                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
944                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
945
946                     Future<Object> abortFuture = Patterns.ask(shard,
947                             new AbortTransaction(transactionID).toSerializable(), timeout);
948                     abortFuture.onComplete(new OnComplete<Object>() {
949                         @Override
950                         public void onComplete(final Throwable e, final Object resp) {
951                             abortComplete.countDown();
952                         }
953                     }, getSystem().dispatcher());
954
955                     return preCommitFuture;
956                 }
957             };
958
959             MutableCompositeModification modification = new MutableCompositeModification();
960             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
961                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
962                     modification, preCommit);
963
964             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
965                     cohort, modification, true), getRef());
966             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
967
968             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
969             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
970                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
971             assertEquals("Can commit", true, canCommitReply.getCanCommit());
972
973             Future<Object> commitFuture = Patterns.ask(shard,
974                     new CommitTransaction(transactionID).toSerializable(), timeout);
975
976             assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
977
978             Await.result(commitFuture, duration);
979
980             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
981             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
982
983             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
984         }};
985     }
986
987     @Test
988     public void testTransactionCommitTimeout() throws Throwable {
989         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
990
991         new ShardTestKit(getSystem()) {{
992             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
993                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
994                     "testTransactionCommitTimeout");
995
996             waitUntilLeader(shard);
997
998             final FiniteDuration duration = duration("5 seconds");
999
1000             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1001
1002             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1003             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1004                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1005
1006             // Create 1st Tx - will timeout
1007
1008             String transactionID1 = "tx1";
1009             MutableCompositeModification modification1 = new MutableCompositeModification();
1010             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1011                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1012                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1013                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1014                     modification1);
1015
1016             // Create 2nd Tx
1017
1018             String transactionID2 = "tx3";
1019             MutableCompositeModification modification2 = new MutableCompositeModification();
1020             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1021                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1022             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1023                     listNodePath,
1024                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1025                     modification2);
1026
1027             // Ready the Tx's
1028
1029             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1030                     cohort1, modification1, true), getRef());
1031             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1032
1033             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1034                     cohort2, modification2, true), getRef());
1035             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1036
1037             // canCommit 1st Tx. We don't send the commit so it should timeout.
1038
1039             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1040             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1041
1042             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1043
1044             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1045             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1046
1047             // Commit the 2nd Tx.
1048
1049             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1050             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1051
1052             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1053             assertNotNull(listNodePath + " not found", node);
1054
1055             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1056         }};
1057     }
1058
1059     @Test
1060     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1061         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1062
1063         new ShardTestKit(getSystem()) {{
1064             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1065                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1066                     "testTransactionCommitQueueCapacityExceeded");
1067
1068             waitUntilLeader(shard);
1069
1070             final FiniteDuration duration = duration("5 seconds");
1071
1072             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1073
1074             String transactionID1 = "tx1";
1075             MutableCompositeModification modification1 = new MutableCompositeModification();
1076             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1077                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1078
1079             String transactionID2 = "tx2";
1080             MutableCompositeModification modification2 = new MutableCompositeModification();
1081             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1082                     TestModel.OUTER_LIST_PATH,
1083                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1084                     modification2);
1085
1086             String transactionID3 = "tx3";
1087             MutableCompositeModification modification3 = new MutableCompositeModification();
1088             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1089                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1090
1091             // Ready the Tx's
1092
1093             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1094                     cohort1, modification1, true), getRef());
1095             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1096
1097             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1098                     cohort2, modification2, true), getRef());
1099             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1100
1101             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1102                     cohort3, modification3, true), getRef());
1103             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1104
1105             // canCommit 1st Tx.
1106
1107             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1108             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1109
1110             // canCommit the 2nd Tx - it should get queued.
1111
1112             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1113
1114             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1115
1116             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1117             expectMsgClass(duration, akka.actor.Status.Failure.class);
1118
1119             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1120         }};
1121     }
1122
1123     @Test
1124     public void testCanCommitBeforeReadyFailure() throws Throwable {
1125         new ShardTestKit(getSystem()) {{
1126             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1127                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1128                     "testCanCommitBeforeReadyFailure");
1129
1130             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1131             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1132
1133             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1134         }};
1135     }
1136
1137     @Test
1138     public void testAbortTransaction() throws Throwable {
1139         new ShardTestKit(getSystem()) {{
1140             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1141                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1142                     "testAbortTransaction");
1143
1144             waitUntilLeader(shard);
1145
1146             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1147
1148             String transactionID1 = "tx1";
1149             MutableCompositeModification modification1 = new MutableCompositeModification();
1150             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1151             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1152             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1153
1154             String transactionID2 = "tx2";
1155             MutableCompositeModification modification2 = new MutableCompositeModification();
1156             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1157             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1158
1159             FiniteDuration duration = duration("5 seconds");
1160             final Timeout timeout = new Timeout(duration);
1161
1162             // Simulate the ForwardedReadyTransaction messages that would be sent
1163             // by the ShardTransaction.
1164
1165             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1166                     cohort1, modification1, true), getRef());
1167             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1168
1169             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1170                     cohort2, modification2, true), getRef());
1171             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1172
1173             // Send the CanCommitTransaction message for the first Tx.
1174
1175             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1176             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1177                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1178             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1179
1180             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1181             // processed after the first Tx completes.
1182
1183             Future<Object> canCommitFuture = Patterns.ask(shard,
1184                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1185
1186             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1187             // Tx to proceed.
1188
1189             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1190             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1191
1192             // Wait for the 2nd Tx to complete the canCommit phase.
1193
1194             final CountDownLatch latch = new CountDownLatch(1);
1195             canCommitFuture.onComplete(new OnComplete<Object>() {
1196                 @Override
1197                 public void onComplete(final Throwable t, final Object resp) {
1198                     latch.countDown();
1199                 }
1200             }, getSystem().dispatcher());
1201
1202             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1203
1204             InOrder inOrder = inOrder(cohort1, cohort2);
1205             inOrder.verify(cohort1).canCommit();
1206             inOrder.verify(cohort2).canCommit();
1207
1208             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1209         }};
1210     }
1211
1212     @Test
1213     public void testCreateSnapshot() throws IOException, InterruptedException {
1214             testCreateSnapshot(true, "testCreateSnapshot");
1215     }
1216
1217     @Test
1218     public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1219         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1220     }
1221
1222     @SuppressWarnings("serial")
1223     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
1224         final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1225                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1226
1227         new ShardTestKit(getSystem()) {{
1228             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1229             Creator<Shard> creator = new Creator<Shard>() {
1230                 @Override
1231                 public Shard create() throws Exception {
1232                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1233                             dataStoreContext, SCHEMA_CONTEXT) {
1234                         @Override
1235                         protected void commitSnapshot(final long sequenceNumber) {
1236                             super.commitSnapshot(sequenceNumber);
1237                             latch.get().countDown();
1238                         }
1239                     };
1240                 }
1241             };
1242
1243             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1244                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1245
1246             waitUntilLeader(shard);
1247
1248             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1249
1250             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1251
1252             latch.set(new CountDownLatch(1));
1253             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1254
1255             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1256
1257             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1258         }};
1259     }
1260
1261     /**
1262      * This test simply verifies that the applySnapShot logic will work
1263      * @throws ReadFailedException
1264      */
1265     @Test
1266     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1267         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
1268             MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
1269
1270         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1271
1272         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1273         putTransaction.write(TestModel.TEST_PATH,
1274             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1275         commitTransaction(putTransaction);
1276
1277
1278         NormalizedNode<?, ?> expected = readStore(store);
1279
1280         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1281
1282         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1283         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1284
1285         commitTransaction(writeTransaction);
1286
1287         NormalizedNode<?, ?> actual = readStore(store);
1288
1289         assertEquals(expected, actual);
1290
1291     }
1292
1293     @Test
1294     public void testRecoveryApplicable(){
1295
1296         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1297                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1298
1299         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1300                 persistentContext, SCHEMA_CONTEXT);
1301
1302         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1303                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1304
1305         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1306                 nonPersistentContext, SCHEMA_CONTEXT);
1307
1308         new ShardTestKit(getSystem()) {{
1309             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1310                     persistentProps, "testPersistence1");
1311
1312             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1313
1314             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1315
1316             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1317                     nonPersistentProps, "testPersistence2");
1318
1319             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1320
1321             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1322
1323         }};
1324
1325     }
1326
1327
1328     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1329         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1330         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1331             transaction.read(YangInstanceIdentifier.builder().build());
1332
1333         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1334
1335         NormalizedNode<?, ?> normalizedNode = optional.get();
1336
1337         transaction.close();
1338
1339         return normalizedNode;
1340     }
1341
1342     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1343         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1344         ListenableFuture<Void> future =
1345             commitCohort.preCommit();
1346         try {
1347             future.get();
1348             future = commitCohort.commit();
1349             future.get();
1350         } catch (InterruptedException | ExecutionException e) {
1351         }
1352     }
1353
1354     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1355         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1356             @Override
1357             public void onDataChanged(
1358                 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1359
1360             }
1361         };
1362     }
1363
1364     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1365             throws ExecutionException, InterruptedException {
1366         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1367
1368         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1369             transaction.read(id);
1370
1371         Optional<NormalizedNode<?, ?>> optional = future.get();
1372         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1373
1374         transaction.close();
1375
1376         return node;
1377     }
1378
1379     private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
1380         throws ExecutionException, InterruptedException {
1381         DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1382
1383         transaction.write(id, node);
1384
1385         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1386         commitCohort.preCommit().get();
1387         commitCohort.commit().get();
1388     }
1389
1390     @SuppressWarnings("serial")
1391     private static final class DelegatingShardCreator implements Creator<Shard> {
1392         private final Creator<Shard> delegate;
1393
1394         DelegatingShardCreator(final Creator<Shard> delegate) {
1395             this.delegate = delegate;
1396         }
1397
1398         @Override
1399         public Shard create() throws Exception {
1400             return delegate.create();
1401         }
1402     }
1403 }