BUG-650: remove executor abstraction
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.CheckedFuture;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.mockito.InOrder;
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
59 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
60 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
61 import org.opendaylight.controller.cluster.datastore.modification.Modification;
62 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
63 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
64 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
65 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
66 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
67 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
68 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
70 import org.opendaylight.controller.cluster.raft.Snapshot;
71 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
74 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
75 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
76 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
78 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
79 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
80 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
81 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
82 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
83 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
84 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
85 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
86 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
87 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
88 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
89 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
90 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
91 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
92 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
93 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
94 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
95 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
96 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
97 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
98 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
99 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
100 import scala.concurrent.Await;
101 import scala.concurrent.Future;
102 import scala.concurrent.duration.FiniteDuration;
103
104 public class ShardTest extends AbstractActorTest {
105
106     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
107
108     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
109
110     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
111             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
112
113     private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
114             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
115             shardHeartbeatIntervalInMillis(100).build();
116
117     @Before
118     public void setUp() {
119         InMemorySnapshotStore.clear();
120         InMemoryJournal.clear();
121     }
122
123     @After
124     public void tearDown() {
125         InMemorySnapshotStore.clear();
126         InMemoryJournal.clear();
127     }
128
129     private Props newShardProps() {
130         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
131                 dataStoreContext, SCHEMA_CONTEXT);
132     }
133
134     @Test
135     public void testRegisterChangeListener() throws Exception {
136         new ShardTestKit(getSystem()) {{
137             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
138                     newShardProps(),  "testRegisterChangeListener");
139
140             waitUntilLeader(shard);
141
142             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
143
144             MockDataChangeListener listener = new MockDataChangeListener(1);
145             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
146                     "testRegisterChangeListener-DataChangeListener");
147
148             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
149                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
150
151             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
152                     RegisterChangeListenerReply.class);
153             String replyPath = reply.getListenerRegistrationPath().toString();
154             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
155                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
156
157             YangInstanceIdentifier path = TestModel.TEST_PATH;
158             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
159
160             listener.waitForChangeEvents(path);
161
162             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
163             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
164         }};
165     }
166
167     @SuppressWarnings("serial")
168     @Test
169     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
170         // This test tests the timing window in which a change listener is registered before the
171         // shard becomes the leader. We verify that the listener is registered and notified of the
172         // existing data when the shard becomes the leader.
173         new ShardTestKit(getSystem()) {{
174             // For this test, we want to send the RegisterChangeListener message after the shard
175             // has recovered from persistence and before it becomes the leader. So we subclass
176             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
177             // we know that the shard has been initialized to a follower and has started the
178             // election process. The following 2 CountDownLatches are used to coordinate the
179             // ElectionTimeout with the sending of the RegisterChangeListener message.
180             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
181             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
182             Creator<Shard> creator = new Creator<Shard>() {
183                 boolean firstElectionTimeout = true;
184
185                 @Override
186                 public Shard create() throws Exception {
187                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
188                             dataStoreContext, SCHEMA_CONTEXT) {
189                         @Override
190                         public void onReceiveCommand(final Object message) throws Exception {
191                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
192                                 // Got the first ElectionTimeout. We don't forward it to the
193                                 // base Shard yet until we've sent the RegisterChangeListener
194                                 // message. So we signal the onFirstElectionTimeout latch to tell
195                                 // the main thread to send the RegisterChangeListener message and
196                                 // start a thread to wait on the onChangeListenerRegistered latch,
197                                 // which the main thread signals after it has sent the message.
198                                 // After the onChangeListenerRegistered is triggered, we send the
199                                 // original ElectionTimeout message to proceed with the election.
200                                 firstElectionTimeout = false;
201                                 final ActorRef self = getSelf();
202                                 new Thread() {
203                                     @Override
204                                     public void run() {
205                                         Uninterruptibles.awaitUninterruptibly(
206                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
207                                         self.tell(message, self);
208                                     }
209                                 }.start();
210
211                                 onFirstElectionTimeout.countDown();
212                             } else {
213                                 super.onReceiveCommand(message);
214                             }
215                         }
216                     };
217                 }
218             };
219
220             MockDataChangeListener listener = new MockDataChangeListener(1);
221             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
222                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
223
224             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
225                     Props.create(new DelegatingShardCreator(creator)),
226                     "testRegisterChangeListenerWhenNotLeaderInitially");
227
228             // Write initial data into the in-memory store.
229             YangInstanceIdentifier path = TestModel.TEST_PATH;
230             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
231
232             // Wait until the shard receives the first ElectionTimeout message.
233             assertEquals("Got first ElectionTimeout", true,
234                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
235
236             // Now send the RegisterChangeListener and wait for the reply.
237             shard.tell(new RegisterChangeListener(path, dclActor.path(),
238                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
239
240             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
241                     RegisterChangeListenerReply.class);
242             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
243
244             // Sanity check - verify the shard is not the leader yet.
245             shard.tell(new FindLeader(), getRef());
246             FindLeaderReply findLeadeReply =
247                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
248             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
249
250             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
251             // with the election process.
252             onChangeListenerRegistered.countDown();
253
254             // Wait for the shard to become the leader and notify our listener with the existing
255             // data in the store.
256             listener.waitForChangeEvents(path);
257
258             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
259             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
260         }};
261     }
262
263     @Test
264     public void testCreateTransaction(){
265         new ShardTestKit(getSystem()) {{
266             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
267
268             waitUntilLeader(shard);
269
270             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
271
272             shard.tell(new CreateTransaction("txn-1",
273                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
274
275             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
276                     CreateTransactionReply.class);
277
278             String path = reply.getTransactionActorPath().toString();
279             assertTrue("Unexpected transaction path " + path,
280                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
281
282             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
283         }};
284     }
285
286     @Test
287     public void testCreateTransactionOnChain(){
288         new ShardTestKit(getSystem()) {{
289             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
290
291             waitUntilLeader(shard);
292
293             shard.tell(new CreateTransaction("txn-1",
294                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
295                     getRef());
296
297             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
298                     CreateTransactionReply.class);
299
300             String path = reply.getTransactionActorPath().toString();
301             assertTrue("Unexpected transaction path " + path,
302                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
303
304             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
305         }};
306     }
307
308     @SuppressWarnings("serial")
309     @Test
310     public void testPeerAddressResolved() throws Exception {
311         new ShardTestKit(getSystem()) {{
312             final CountDownLatch recoveryComplete = new CountDownLatch(1);
313             class TestShard extends Shard {
314                 TestShard() {
315                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
316                             dataStoreContext, SCHEMA_CONTEXT);
317                 }
318
319                 Map<String, String> getPeerAddresses() {
320                     return getRaftActorContext().getPeerAddresses();
321                 }
322
323                 @Override
324                 protected void onRecoveryComplete() {
325                     try {
326                         super.onRecoveryComplete();
327                     } finally {
328                         recoveryComplete.countDown();
329                     }
330                 }
331             }
332
333             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
334                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
335                         @Override
336                         public TestShard create() throws Exception {
337                             return new TestShard();
338                         }
339                     })), "testPeerAddressResolved");
340
341             //waitUntilLeader(shard);
342             assertEquals("Recovery complete", true,
343                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
344
345             String address = "akka://foobar";
346             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
347
348             assertEquals("getPeerAddresses", address,
349                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
350
351             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
352         }};
353     }
354
355     @Test
356     public void testApplySnapshot() throws Exception {
357         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
358                 "testApplySnapshot");
359
360         NormalizedNodeToNodeCodec codec =
361             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
362
363         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
364
365         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
366         NormalizedNode<?,?> expected = readStore(shard, root);
367
368         NormalizedNodeMessages.Container encode = codec.encode(expected);
369
370         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
371                 encode.getNormalizedNode().toByteString().toByteArray(),
372                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
373
374         shard.underlyingActor().onReceiveCommand(applySnapshot);
375
376         NormalizedNode<?,?> actual = readStore(shard, root);
377
378         assertEquals(expected, actual);
379
380         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
381     }
382
383     @Test
384     public void testApplyState() throws Exception {
385
386         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
387
388         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
389
390         MutableCompositeModification compMod = new MutableCompositeModification();
391         compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
392         Payload payload = new CompositeModificationPayload(compMod.toSerializable());
393         ApplyState applyState = new ApplyState(null, "test",
394                 new ReplicatedLogImplEntry(1, 2, payload));
395
396         shard.underlyingActor().onReceiveCommand(applyState);
397
398         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
399         assertEquals("Applied state", node, actual);
400
401         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
402     }
403
404     @SuppressWarnings("serial")
405     @Test
406     public void testRecovery() throws Exception {
407
408         // Set up the InMemorySnapshotStore.
409
410         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
411         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
412
413         DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
414         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
415         DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
416         commitCohort.preCommit().get();
417         commitCohort.commit().get();
418
419         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
420         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
421
422         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
423                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
424                         root).
425                                 getNormalizedNode().toByteString().toByteArray(),
426                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
427
428         // Set up the InMemoryJournal.
429
430         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
431                   new WriteModification(TestModel.OUTER_LIST_PATH,
432                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
433                           SCHEMA_CONTEXT))));
434
435         int nListEntries = 11;
436         Set<Integer> listEntryKeys = new HashSet<>();
437         for(int i = 1; i <= nListEntries; i++) {
438             listEntryKeys.add(Integer.valueOf(i));
439             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
440                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
441             Modification mod = new MergeModification(path,
442                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
443                     SCHEMA_CONTEXT);
444             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
445                     newPayload(mod)));
446         }
447
448         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
449                 new ApplyLogEntries(nListEntries));
450
451         // Create the actor and wait for recovery complete.
452
453         final CountDownLatch recoveryComplete = new CountDownLatch(1);
454
455         Creator<Shard> creator = new Creator<Shard>() {
456             @Override
457             public Shard create() throws Exception {
458                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
459                         dataStoreContext, SCHEMA_CONTEXT) {
460                     @Override
461                     protected void onRecoveryComplete() {
462                         try {
463                             super.onRecoveryComplete();
464                         } finally {
465                             recoveryComplete.countDown();
466                         }
467                     }
468                 };
469             }
470         };
471
472         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
473                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
474
475         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
476
477         // Verify data in the data store.
478
479         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
480         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
481         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
482                 outerList.getValue() instanceof Iterable);
483         for(Object entry: (Iterable<?>) outerList.getValue()) {
484             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
485                     entry instanceof MapEntryNode);
486             MapEntryNode mapEntry = (MapEntryNode)entry;
487             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
488                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
489             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
490             Object value = idLeaf.get().getValue();
491             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
492                     listEntryKeys.remove(value));
493         }
494
495         if(!listEntryKeys.isEmpty()) {
496             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
497                     listEntryKeys);
498         }
499
500         assertEquals("Last log index", nListEntries,
501                 shard.underlyingActor().getShardMBean().getLastLogIndex());
502         assertEquals("Commit index", nListEntries,
503                 shard.underlyingActor().getShardMBean().getCommitIndex());
504         assertEquals("Last applied", nListEntries,
505                 shard.underlyingActor().getShardMBean().getLastApplied());
506
507         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
508     }
509
510     private CompositeModificationPayload newPayload(final Modification... mods) {
511         MutableCompositeModification compMod = new MutableCompositeModification();
512         for(Modification mod: mods) {
513             compMod.addModification(mod);
514         }
515
516         return new CompositeModificationPayload(compMod.toSerializable());
517     }
518
519     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
520             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
521             final MutableCompositeModification modification) {
522         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
523     }
524
525     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
526             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
527             final MutableCompositeModification modification,
528             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
529
530         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
531         tx.write(path, data);
532         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
533         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
534
535         doAnswer(new Answer<ListenableFuture<Boolean>>() {
536             @Override
537             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
538                 return realCohort.canCommit();
539             }
540         }).when(cohort).canCommit();
541
542         doAnswer(new Answer<ListenableFuture<Void>>() {
543             @Override
544             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
545                 if(preCommit != null) {
546                     return preCommit.apply(realCohort);
547                 } else {
548                     return realCohort.preCommit();
549                 }
550             }
551         }).when(cohort).preCommit();
552
553         doAnswer(new Answer<ListenableFuture<Void>>() {
554             @Override
555             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
556                 return realCohort.commit();
557             }
558         }).when(cohort).commit();
559
560         doAnswer(new Answer<ListenableFuture<Void>>() {
561             @Override
562             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
563                 return realCohort.abort();
564             }
565         }).when(cohort).abort();
566
567         modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
568
569         return cohort;
570     }
571
572     @SuppressWarnings({ "unchecked" })
573     @Test
574     public void testConcurrentThreePhaseCommits() throws Throwable {
575         new ShardTestKit(getSystem()) {{
576             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
577                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
578                     "testConcurrentThreePhaseCommits");
579
580             waitUntilLeader(shard);
581
582             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
583
584             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
585
586             String transactionID1 = "tx1";
587             MutableCompositeModification modification1 = new MutableCompositeModification();
588             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
589                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
590
591             String transactionID2 = "tx2";
592             MutableCompositeModification modification2 = new MutableCompositeModification();
593             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
594                     TestModel.OUTER_LIST_PATH,
595                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
596                     modification2);
597
598             String transactionID3 = "tx3";
599             MutableCompositeModification modification3 = new MutableCompositeModification();
600             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
601                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
602                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
603                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
604                     modification3);
605
606             long timeoutSec = 5;
607             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
608             final Timeout timeout = new Timeout(duration);
609
610             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
611             // by the ShardTransaction.
612
613             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
614                     cohort1, modification1, true), getRef());
615             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
616                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
617             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
618
619             // Send the CanCommitTransaction message for the first Tx.
620
621             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
622             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
623                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
624             assertEquals("Can commit", true, canCommitReply.getCanCommit());
625
626             // Send the ForwardedReadyTransaction for the next 2 Tx's.
627
628             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
629                     cohort2, modification2, true), getRef());
630             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
631
632             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
633                     cohort3, modification3, true), getRef());
634             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
635
636             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
637             // processed after the first Tx completes.
638
639             Future<Object> canCommitFuture1 = Patterns.ask(shard,
640                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
641
642             Future<Object> canCommitFuture2 = Patterns.ask(shard,
643                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
644
645             // Send the CommitTransaction message for the first Tx. After it completes, it should
646             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
647
648             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
649             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
650
651             // Wait for the next 2 Tx's to complete.
652
653             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
654             final CountDownLatch commitLatch = new CountDownLatch(2);
655
656             class OnFutureComplete extends OnComplete<Object> {
657                 private final Class<?> expRespType;
658
659                 OnFutureComplete(final Class<?> expRespType) {
660                     this.expRespType = expRespType;
661                 }
662
663                 @Override
664                 public void onComplete(final Throwable error, final Object resp) {
665                     if(error != null) {
666                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
667                     } else {
668                         try {
669                             assertEquals("Commit response type", expRespType, resp.getClass());
670                             onSuccess(resp);
671                         } catch (Exception e) {
672                             caughtEx.set(e);
673                         }
674                     }
675                 }
676
677                 void onSuccess(final Object resp) throws Exception {
678                 }
679             }
680
681             class OnCommitFutureComplete extends OnFutureComplete {
682                 OnCommitFutureComplete() {
683                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
684                 }
685
686                 @Override
687                 public void onComplete(final Throwable error, final Object resp) {
688                     super.onComplete(error, resp);
689                     commitLatch.countDown();
690                 }
691             }
692
693             class OnCanCommitFutureComplete extends OnFutureComplete {
694                 private final String transactionID;
695
696                 OnCanCommitFutureComplete(final String transactionID) {
697                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
698                     this.transactionID = transactionID;
699                 }
700
701                 @Override
702                 void onSuccess(final Object resp) throws Exception {
703                     CanCommitTransactionReply canCommitReply =
704                             CanCommitTransactionReply.fromSerializable(resp);
705                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
706
707                     Future<Object> commitFuture = Patterns.ask(shard,
708                             new CommitTransaction(transactionID).toSerializable(), timeout);
709                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
710                 }
711             }
712
713             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
714                     getSystem().dispatcher());
715
716             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
717                     getSystem().dispatcher());
718
719             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
720
721             if(caughtEx.get() != null) {
722                 throw caughtEx.get();
723             }
724
725             assertEquals("Commits complete", true, done);
726
727             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
728             inOrder.verify(cohort1).canCommit();
729             inOrder.verify(cohort1).preCommit();
730             inOrder.verify(cohort1).commit();
731             inOrder.verify(cohort2).canCommit();
732             inOrder.verify(cohort2).preCommit();
733             inOrder.verify(cohort2).commit();
734             inOrder.verify(cohort3).canCommit();
735             inOrder.verify(cohort3).preCommit();
736             inOrder.verify(cohort3).commit();
737
738             // Verify data in the data store.
739
740             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
741             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
742             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
743                     outerList.getValue() instanceof Iterable);
744             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
745             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
746                        entry instanceof MapEntryNode);
747             MapEntryNode mapEntry = (MapEntryNode)entry;
748             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
749                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
750             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
751             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
752
753             for(int i = 0; i < 20 * 5; i++) {
754                 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
755                 if(lastLogIndex == 2) {
756                     break;
757                 }
758                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
759             }
760
761             assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
762
763             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
764         }};
765     }
766
767     @Test
768     public void testCommitPhaseFailure() throws Throwable {
769         new ShardTestKit(getSystem()) {{
770             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
771                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
772                     "testCommitPhaseFailure");
773
774             waitUntilLeader(shard);
775
776             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
777             // commit phase.
778
779             String transactionID1 = "tx1";
780             MutableCompositeModification modification1 = new MutableCompositeModification();
781             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
782             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
783             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
784             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
785
786             String transactionID2 = "tx2";
787             MutableCompositeModification modification2 = new MutableCompositeModification();
788             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
789             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
790
791             FiniteDuration duration = duration("5 seconds");
792             final Timeout timeout = new Timeout(duration);
793
794             // Simulate the ForwardedReadyTransaction messages that would be sent
795             // by the ShardTransaction.
796
797             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
798                     cohort1, modification1, true), getRef());
799             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
800
801             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
802                     cohort2, modification2, true), getRef());
803             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
804
805             // Send the CanCommitTransaction message for the first Tx.
806
807             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
808             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
809                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
810             assertEquals("Can commit", true, canCommitReply.getCanCommit());
811
812             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
813             // processed after the first Tx completes.
814
815             Future<Object> canCommitFuture = Patterns.ask(shard,
816                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
817
818             // Send the CommitTransaction message for the first Tx. This should send back an error
819             // and trigger the 2nd Tx to proceed.
820
821             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
822             expectMsgClass(duration, akka.actor.Status.Failure.class);
823
824             // Wait for the 2nd Tx to complete the canCommit phase.
825
826             final CountDownLatch latch = new CountDownLatch(1);
827             canCommitFuture.onComplete(new OnComplete<Object>() {
828                 @Override
829                 public void onComplete(final Throwable t, final Object resp) {
830                     latch.countDown();
831                 }
832             }, getSystem().dispatcher());
833
834             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
835
836             InOrder inOrder = inOrder(cohort1, cohort2);
837             inOrder.verify(cohort1).canCommit();
838             inOrder.verify(cohort1).preCommit();
839             inOrder.verify(cohort1).commit();
840             inOrder.verify(cohort2).canCommit();
841
842             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
843         }};
844     }
845
846     @Test
847     public void testPreCommitPhaseFailure() throws Throwable {
848         new ShardTestKit(getSystem()) {{
849             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
850                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
851                     "testPreCommitPhaseFailure");
852
853             waitUntilLeader(shard);
854
855             String transactionID = "tx1";
856             MutableCompositeModification modification = new MutableCompositeModification();
857             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
858             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
859             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
860
861             FiniteDuration duration = duration("5 seconds");
862
863             // Simulate the ForwardedReadyTransaction messages that would be sent
864             // by the ShardTransaction.
865
866             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
867                     cohort, modification, true), getRef());
868             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
869
870             // Send the CanCommitTransaction message.
871
872             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
873             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
874                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
875             assertEquals("Can commit", true, canCommitReply.getCanCommit());
876
877             // Send the CommitTransaction message. This should send back an error
878             // for preCommit failure.
879
880             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
881             expectMsgClass(duration, akka.actor.Status.Failure.class);
882
883             InOrder inOrder = inOrder(cohort);
884             inOrder.verify(cohort).canCommit();
885             inOrder.verify(cohort).preCommit();
886
887             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
888         }};
889     }
890
891     @Test
892     public void testCanCommitPhaseFailure() throws Throwable {
893         new ShardTestKit(getSystem()) {{
894             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
895                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
896                     "testCanCommitPhaseFailure");
897
898             waitUntilLeader(shard);
899
900             final FiniteDuration duration = duration("5 seconds");
901
902             String transactionID = "tx1";
903             MutableCompositeModification modification = new MutableCompositeModification();
904             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
905             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
906
907             // Simulate the ForwardedReadyTransaction messages that would be sent
908             // by the ShardTransaction.
909
910             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
911                     cohort, modification, true), getRef());
912             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
913
914             // Send the CanCommitTransaction message.
915
916             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
917             expectMsgClass(duration, akka.actor.Status.Failure.class);
918
919             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
920         }};
921     }
922
923     @Test
924     public void testAbortBeforeFinishCommit() throws Throwable {
925         new ShardTestKit(getSystem()) {{
926             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
927                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
928                     "testAbortBeforeFinishCommit");
929
930             waitUntilLeader(shard);
931
932             final FiniteDuration duration = duration("5 seconds");
933             final Timeout timeout = new Timeout(duration);
934
935             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
936
937             final String transactionID = "tx1";
938             final CountDownLatch abortComplete = new CountDownLatch(1);
939             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
940                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
941                 @Override
942                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
943                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
944
945                     Future<Object> abortFuture = Patterns.ask(shard,
946                             new AbortTransaction(transactionID).toSerializable(), timeout);
947                     abortFuture.onComplete(new OnComplete<Object>() {
948                         @Override
949                         public void onComplete(final Throwable e, final Object resp) {
950                             abortComplete.countDown();
951                         }
952                     }, getSystem().dispatcher());
953
954                     return preCommitFuture;
955                 }
956             };
957
958             MutableCompositeModification modification = new MutableCompositeModification();
959             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
960                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
961                     modification, preCommit);
962
963             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
964                     cohort, modification, true), getRef());
965             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
966
967             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
968             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
969                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
970             assertEquals("Can commit", true, canCommitReply.getCanCommit());
971
972             Future<Object> commitFuture = Patterns.ask(shard,
973                     new CommitTransaction(transactionID).toSerializable(), timeout);
974
975             assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
976
977             Await.result(commitFuture, duration);
978
979             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
980             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
981
982             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
983         }};
984     }
985
986     @Test
987     public void testTransactionCommitTimeout() throws Throwable {
988         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
989
990         new ShardTestKit(getSystem()) {{
991             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
992                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
993                     "testTransactionCommitTimeout");
994
995             waitUntilLeader(shard);
996
997             final FiniteDuration duration = duration("5 seconds");
998
999             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1000
1001             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1002             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1003                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1004
1005             // Create 1st Tx - will timeout
1006
1007             String transactionID1 = "tx1";
1008             MutableCompositeModification modification1 = new MutableCompositeModification();
1009             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1010                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1011                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1012                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1013                     modification1);
1014
1015             // Create 2nd Tx
1016
1017             String transactionID2 = "tx3";
1018             MutableCompositeModification modification2 = new MutableCompositeModification();
1019             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1020                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1021             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1022                     listNodePath,
1023                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1024                     modification2);
1025
1026             // Ready the Tx's
1027
1028             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1029                     cohort1, modification1, true), getRef());
1030             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1031
1032             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1033                     cohort2, modification2, true), getRef());
1034             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1035
1036             // canCommit 1st Tx. We don't send the commit so it should timeout.
1037
1038             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1039             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1040
1041             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1042
1043             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1044             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1045
1046             // Commit the 2nd Tx.
1047
1048             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1049             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1050
1051             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1052             assertNotNull(listNodePath + " not found", node);
1053
1054             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1055         }};
1056     }
1057
1058     @Test
1059     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1060         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1061
1062         new ShardTestKit(getSystem()) {{
1063             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1064                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1065                     "testTransactionCommitQueueCapacityExceeded");
1066
1067             waitUntilLeader(shard);
1068
1069             final FiniteDuration duration = duration("5 seconds");
1070
1071             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1072
1073             String transactionID1 = "tx1";
1074             MutableCompositeModification modification1 = new MutableCompositeModification();
1075             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1076                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1077
1078             String transactionID2 = "tx2";
1079             MutableCompositeModification modification2 = new MutableCompositeModification();
1080             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1081                     TestModel.OUTER_LIST_PATH,
1082                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1083                     modification2);
1084
1085             String transactionID3 = "tx3";
1086             MutableCompositeModification modification3 = new MutableCompositeModification();
1087             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1088                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1089
1090             // Ready the Tx's
1091
1092             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1093                     cohort1, modification1, true), getRef());
1094             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1095
1096             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1097                     cohort2, modification2, true), getRef());
1098             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1099
1100             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1101                     cohort3, modification3, true), getRef());
1102             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1103
1104             // canCommit 1st Tx.
1105
1106             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1107             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1108
1109             // canCommit the 2nd Tx - it should get queued.
1110
1111             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1112
1113             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1114
1115             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1116             expectMsgClass(duration, akka.actor.Status.Failure.class);
1117
1118             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1119         }};
1120     }
1121
1122     @Test
1123     public void testCanCommitBeforeReadyFailure() throws Throwable {
1124         new ShardTestKit(getSystem()) {{
1125             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1126                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1127                     "testCanCommitBeforeReadyFailure");
1128
1129             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1130             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1131
1132             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1133         }};
1134     }
1135
1136     @Test
1137     public void testAbortTransaction() throws Throwable {
1138         new ShardTestKit(getSystem()) {{
1139             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1140                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1141                     "testAbortTransaction");
1142
1143             waitUntilLeader(shard);
1144
1145             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1146
1147             String transactionID1 = "tx1";
1148             MutableCompositeModification modification1 = new MutableCompositeModification();
1149             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1150             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1151             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1152
1153             String transactionID2 = "tx2";
1154             MutableCompositeModification modification2 = new MutableCompositeModification();
1155             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1156             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1157
1158             FiniteDuration duration = duration("5 seconds");
1159             final Timeout timeout = new Timeout(duration);
1160
1161             // Simulate the ForwardedReadyTransaction messages that would be sent
1162             // by the ShardTransaction.
1163
1164             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1165                     cohort1, modification1, true), getRef());
1166             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1167
1168             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1169                     cohort2, modification2, true), getRef());
1170             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1171
1172             // Send the CanCommitTransaction message for the first Tx.
1173
1174             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1175             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1176                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1177             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1178
1179             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1180             // processed after the first Tx completes.
1181
1182             Future<Object> canCommitFuture = Patterns.ask(shard,
1183                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1184
1185             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1186             // Tx to proceed.
1187
1188             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1189             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1190
1191             // Wait for the 2nd Tx to complete the canCommit phase.
1192
1193             final CountDownLatch latch = new CountDownLatch(1);
1194             canCommitFuture.onComplete(new OnComplete<Object>() {
1195                 @Override
1196                 public void onComplete(final Throwable t, final Object resp) {
1197                     latch.countDown();
1198                 }
1199             }, getSystem().dispatcher());
1200
1201             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1202
1203             InOrder inOrder = inOrder(cohort1, cohort2);
1204             inOrder.verify(cohort1).canCommit();
1205             inOrder.verify(cohort2).canCommit();
1206
1207             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1208         }};
1209     }
1210
1211     @Test
1212     public void testCreateSnapshot() throws IOException, InterruptedException {
1213             testCreateSnapshot(true, "testCreateSnapshot");
1214     }
1215
1216     @Test
1217     public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1218         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1219     }
1220
1221     @SuppressWarnings("serial")
1222     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
1223         final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1224                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1225
1226         new ShardTestKit(getSystem()) {{
1227             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1228             Creator<Shard> creator = new Creator<Shard>() {
1229                 @Override
1230                 public Shard create() throws Exception {
1231                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1232                             dataStoreContext, SCHEMA_CONTEXT) {
1233                         @Override
1234                         protected void commitSnapshot(final long sequenceNumber) {
1235                             super.commitSnapshot(sequenceNumber);
1236                             latch.get().countDown();
1237                         }
1238                     };
1239                 }
1240             };
1241
1242             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1243                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1244
1245             waitUntilLeader(shard);
1246
1247             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1248
1249             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1250
1251             latch.set(new CountDownLatch(1));
1252             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1253
1254             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1255
1256             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1257         }};
1258     }
1259
1260     /**
1261      * This test simply verifies that the applySnapShot logic will work
1262      * @throws ReadFailedException
1263      */
1264     @Test
1265     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1266         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1267
1268         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1269
1270         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1271         putTransaction.write(TestModel.TEST_PATH,
1272             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1273         commitTransaction(putTransaction);
1274
1275
1276         NormalizedNode<?, ?> expected = readStore(store);
1277
1278         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1279
1280         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1281         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1282
1283         commitTransaction(writeTransaction);
1284
1285         NormalizedNode<?, ?> actual = readStore(store);
1286
1287         assertEquals(expected, actual);
1288     }
1289
1290     @Test
1291     public void testRecoveryApplicable(){
1292
1293         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1294                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1295
1296         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1297                 persistentContext, SCHEMA_CONTEXT);
1298
1299         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1300                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1301
1302         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1303                 nonPersistentContext, SCHEMA_CONTEXT);
1304
1305         new ShardTestKit(getSystem()) {{
1306             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1307                     persistentProps, "testPersistence1");
1308
1309             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1310
1311             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1312
1313             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1314                     nonPersistentProps, "testPersistence2");
1315
1316             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1317
1318             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1319
1320         }};
1321
1322     }
1323
1324
1325     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1326         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1327         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1328             transaction.read(YangInstanceIdentifier.builder().build());
1329
1330         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1331
1332         NormalizedNode<?, ?> normalizedNode = optional.get();
1333
1334         transaction.close();
1335
1336         return normalizedNode;
1337     }
1338
1339     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1340         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1341         ListenableFuture<Void> future =
1342             commitCohort.preCommit();
1343         try {
1344             future.get();
1345             future = commitCohort.commit();
1346             future.get();
1347         } catch (InterruptedException | ExecutionException e) {
1348         }
1349     }
1350
1351     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1352         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1353             @Override
1354             public void onDataChanged(
1355                 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1356
1357             }
1358         };
1359     }
1360
1361     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1362             throws ExecutionException, InterruptedException {
1363         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1364
1365         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1366             transaction.read(id);
1367
1368         Optional<NormalizedNode<?, ?>> optional = future.get();
1369         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1370
1371         transaction.close();
1372
1373         return node;
1374     }
1375
1376     private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
1377         throws ExecutionException, InterruptedException {
1378         DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1379
1380         transaction.write(id, node);
1381
1382         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1383         commitCohort.preCommit().get();
1384         commitCohort.commit().get();
1385     }
1386
1387     @SuppressWarnings("serial")
1388     private static final class DelegatingShardCreator implements Creator<Shard> {
1389         private final Creator<Shard> delegate;
1390
1391         DelegatingShardCreator(final Creator<Shard> delegate) {
1392             this.delegate = delegate;
1393         }
1394
1395         @Override
1396         public Shard create() throws Exception {
1397             return delegate.create();
1398         }
1399     }
1400 }