Fixed missing uses of ${artifactId} in urn
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.CheckedFuture;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.mockito.InOrder;
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
59 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
60 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
61 import org.opendaylight.controller.cluster.datastore.modification.Modification;
62 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
63 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
64 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
65 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
66 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
67 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
68 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
70 import org.opendaylight.controller.cluster.raft.Snapshot;
71 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
74 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
75 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
76 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
78 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
79 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
80 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
81 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
82 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
83 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
84 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
85 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
86 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
87 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
88 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
89 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
90 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
91 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
92 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
93 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
94 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
95 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
96 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
97 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
98 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
99 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
100 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
101 import scala.concurrent.Await;
102 import scala.concurrent.Future;
103 import scala.concurrent.duration.FiniteDuration;
104
105 public class ShardTest extends AbstractActorTest {
106
107     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
108
109     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
110
111     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
112             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
113
114     private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
115             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
116             shardHeartbeatIntervalInMillis(100).build();
117
118     @Before
119     public void setUp() {
120         InMemorySnapshotStore.clear();
121         InMemoryJournal.clear();
122     }
123
124     @After
125     public void tearDown() {
126         InMemorySnapshotStore.clear();
127         InMemoryJournal.clear();
128     }
129
130     private Props newShardProps() {
131         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
132                 dataStoreContext, SCHEMA_CONTEXT);
133     }
134
135     @Test
136     public void testRegisterChangeListener() throws Exception {
137         new ShardTestKit(getSystem()) {{
138             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
139                     newShardProps(),  "testRegisterChangeListener");
140
141             waitUntilLeader(shard);
142
143             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
144
145             MockDataChangeListener listener = new MockDataChangeListener(1);
146             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
147                     "testRegisterChangeListener-DataChangeListener");
148
149             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
150                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
151
152             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
153                     RegisterChangeListenerReply.class);
154             String replyPath = reply.getListenerRegistrationPath().toString();
155             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
156                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
157
158             YangInstanceIdentifier path = TestModel.TEST_PATH;
159             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
160
161             listener.waitForChangeEvents(path);
162
163             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
164             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
165         }};
166     }
167
168     @SuppressWarnings("serial")
169     @Test
170     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
171         // This test tests the timing window in which a change listener is registered before the
172         // shard becomes the leader. We verify that the listener is registered and notified of the
173         // existing data when the shard becomes the leader.
174         new ShardTestKit(getSystem()) {{
175             // For this test, we want to send the RegisterChangeListener message after the shard
176             // has recovered from persistence and before it becomes the leader. So we subclass
177             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
178             // we know that the shard has been initialized to a follower and has started the
179             // election process. The following 2 CountDownLatches are used to coordinate the
180             // ElectionTimeout with the sending of the RegisterChangeListener message.
181             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
182             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
183             Creator<Shard> creator = new Creator<Shard>() {
184                 boolean firstElectionTimeout = true;
185
186                 @Override
187                 public Shard create() throws Exception {
188                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
189                             dataStoreContext, SCHEMA_CONTEXT) {
190                         @Override
191                         public void onReceiveCommand(final Object message) throws Exception {
192                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
193                                 // Got the first ElectionTimeout. We don't forward it to the
194                                 // base Shard yet until we've sent the RegisterChangeListener
195                                 // message. So we signal the onFirstElectionTimeout latch to tell
196                                 // the main thread to send the RegisterChangeListener message and
197                                 // start a thread to wait on the onChangeListenerRegistered latch,
198                                 // which the main thread signals after it has sent the message.
199                                 // After the onChangeListenerRegistered is triggered, we send the
200                                 // original ElectionTimeout message to proceed with the election.
201                                 firstElectionTimeout = false;
202                                 final ActorRef self = getSelf();
203                                 new Thread() {
204                                     @Override
205                                     public void run() {
206                                         Uninterruptibles.awaitUninterruptibly(
207                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
208                                         self.tell(message, self);
209                                     }
210                                 }.start();
211
212                                 onFirstElectionTimeout.countDown();
213                             } else {
214                                 super.onReceiveCommand(message);
215                             }
216                         }
217                     };
218                 }
219             };
220
221             MockDataChangeListener listener = new MockDataChangeListener(1);
222             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
223                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
224
225             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
226                     Props.create(new DelegatingShardCreator(creator)),
227                     "testRegisterChangeListenerWhenNotLeaderInitially");
228
229             // Write initial data into the in-memory store.
230             YangInstanceIdentifier path = TestModel.TEST_PATH;
231             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
232
233             // Wait until the shard receives the first ElectionTimeout message.
234             assertEquals("Got first ElectionTimeout", true,
235                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
236
237             // Now send the RegisterChangeListener and wait for the reply.
238             shard.tell(new RegisterChangeListener(path, dclActor.path(),
239                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
240
241             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
242                     RegisterChangeListenerReply.class);
243             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
244
245             // Sanity check - verify the shard is not the leader yet.
246             shard.tell(new FindLeader(), getRef());
247             FindLeaderReply findLeadeReply =
248                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
249             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
250
251             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
252             // with the election process.
253             onChangeListenerRegistered.countDown();
254
255             // Wait for the shard to become the leader and notify our listener with the existing
256             // data in the store.
257             listener.waitForChangeEvents(path);
258
259             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
260             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
261         }};
262     }
263
264     @Test
265     public void testCreateTransaction(){
266         new ShardTestKit(getSystem()) {{
267             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
268
269             waitUntilLeader(shard);
270
271             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
272
273             shard.tell(new CreateTransaction("txn-1",
274                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
275
276             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
277                     CreateTransactionReply.class);
278
279             String path = reply.getTransactionActorPath().toString();
280             assertTrue("Unexpected transaction path " + path,
281                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
282
283             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
284         }};
285     }
286
287     @Test
288     public void testCreateTransactionOnChain(){
289         new ShardTestKit(getSystem()) {{
290             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
291
292             waitUntilLeader(shard);
293
294             shard.tell(new CreateTransaction("txn-1",
295                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
296                     getRef());
297
298             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
299                     CreateTransactionReply.class);
300
301             String path = reply.getTransactionActorPath().toString();
302             assertTrue("Unexpected transaction path " + path,
303                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
304
305             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
306         }};
307     }
308
309     @SuppressWarnings("serial")
310     @Test
311     public void testPeerAddressResolved() throws Exception {
312         new ShardTestKit(getSystem()) {{
313             final CountDownLatch recoveryComplete = new CountDownLatch(1);
314             class TestShard extends Shard {
315                 TestShard() {
316                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
317                             dataStoreContext, SCHEMA_CONTEXT);
318                 }
319
320                 Map<String, String> getPeerAddresses() {
321                     return getRaftActorContext().getPeerAddresses();
322                 }
323
324                 @Override
325                 protected void onRecoveryComplete() {
326                     try {
327                         super.onRecoveryComplete();
328                     } finally {
329                         recoveryComplete.countDown();
330                     }
331                 }
332             }
333
334             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
335                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
336                         @Override
337                         public TestShard create() throws Exception {
338                             return new TestShard();
339                         }
340                     })), "testPeerAddressResolved");
341
342             //waitUntilLeader(shard);
343             assertEquals("Recovery complete", true,
344                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
345
346             String address = "akka://foobar";
347             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
348
349             assertEquals("getPeerAddresses", address,
350                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
351
352             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
353         }};
354     }
355
356     @Test
357     public void testApplySnapshot() throws Exception {
358         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
359                 "testApplySnapshot");
360
361         NormalizedNodeToNodeCodec codec =
362             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
363
364         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
365
366         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
367         NormalizedNode<?,?> expected = readStore(shard, root);
368
369         NormalizedNodeMessages.Container encode = codec.encode(expected);
370
371         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
372                 encode.getNormalizedNode().toByteString().toByteArray(),
373                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
374
375         shard.underlyingActor().onReceiveCommand(applySnapshot);
376
377         NormalizedNode<?,?> actual = readStore(shard, root);
378
379         assertEquals(expected, actual);
380
381         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
382     }
383
384     @Test
385     public void testApplyState() throws Exception {
386
387         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
388
389         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
390
391         MutableCompositeModification compMod = new MutableCompositeModification();
392         compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
393         Payload payload = new CompositeModificationPayload(compMod.toSerializable());
394         ApplyState applyState = new ApplyState(null, "test",
395                 new ReplicatedLogImplEntry(1, 2, payload));
396
397         shard.underlyingActor().onReceiveCommand(applyState);
398
399         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
400         assertEquals("Applied state", node, actual);
401
402         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
403     }
404
405     @SuppressWarnings("serial")
406     @Test
407     public void testRecovery() throws Exception {
408
409         // Set up the InMemorySnapshotStore.
410
411         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
412         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
413
414         DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
415         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
416         DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
417         commitCohort.preCommit().get();
418         commitCohort.commit().get();
419
420         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
421         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
422
423         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
424                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
425                         root).
426                                 getNormalizedNode().toByteString().toByteArray(),
427                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
428
429         // Set up the InMemoryJournal.
430
431         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
432                   new WriteModification(TestModel.OUTER_LIST_PATH,
433                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
434                           SCHEMA_CONTEXT))));
435
436         int nListEntries = 16;
437         Set<Integer> listEntryKeys = new HashSet<>();
438         for(int i = 1; i <= nListEntries-5; i++) {
439             listEntryKeys.add(Integer.valueOf(i));
440             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
441                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
442             Modification mod = new MergeModification(path,
443                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
444                     SCHEMA_CONTEXT);
445             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
446                     newPayload(mod)));
447         }
448
449         // Add some of the new CompositeModificationByteStringPayload
450         for(int i = 11; i <= nListEntries; i++) {
451             listEntryKeys.add(Integer.valueOf(i));
452             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
453                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
454             Modification mod = new MergeModification(path,
455                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
456                     SCHEMA_CONTEXT);
457             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
458                     newByteStringPayload(mod)));
459         }
460
461
462         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
463                 new ApplyLogEntries(nListEntries));
464
465         // Create the actor and wait for recovery complete.
466
467         final CountDownLatch recoveryComplete = new CountDownLatch(1);
468
469         Creator<Shard> creator = new Creator<Shard>() {
470             @Override
471             public Shard create() throws Exception {
472                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
473                         dataStoreContext, SCHEMA_CONTEXT) {
474                     @Override
475                     protected void onRecoveryComplete() {
476                         try {
477                             super.onRecoveryComplete();
478                         } finally {
479                             recoveryComplete.countDown();
480                         }
481                     }
482                 };
483             }
484         };
485
486         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
487                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
488
489         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
490
491         // Verify data in the data store.
492
493         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
494         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
495         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
496                 outerList.getValue() instanceof Iterable);
497         for(Object entry: (Iterable<?>) outerList.getValue()) {
498             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
499                     entry instanceof MapEntryNode);
500             MapEntryNode mapEntry = (MapEntryNode)entry;
501             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
502                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
503             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
504             Object value = idLeaf.get().getValue();
505             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
506                     listEntryKeys.remove(value));
507         }
508
509         if(!listEntryKeys.isEmpty()) {
510             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
511                     listEntryKeys);
512         }
513
514         assertEquals("Last log index", nListEntries,
515                 shard.underlyingActor().getShardMBean().getLastLogIndex());
516         assertEquals("Commit index", nListEntries,
517                 shard.underlyingActor().getShardMBean().getCommitIndex());
518         assertEquals("Last applied", nListEntries,
519                 shard.underlyingActor().getShardMBean().getLastApplied());
520
521         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
522     }
523
524     private CompositeModificationPayload newPayload(final Modification... mods) {
525         MutableCompositeModification compMod = new MutableCompositeModification();
526         for(Modification mod: mods) {
527             compMod.addModification(mod);
528         }
529
530         return new CompositeModificationPayload(compMod.toSerializable());
531     }
532
533     private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
534         MutableCompositeModification compMod = new MutableCompositeModification();
535         for(Modification mod: mods) {
536             compMod.addModification(mod);
537         }
538
539         return new CompositeModificationByteStringPayload(compMod.toSerializable());
540     }
541
542
543     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
544             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
545             final MutableCompositeModification modification) {
546         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
547     }
548
549     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
550             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
551             final MutableCompositeModification modification,
552             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
553
554         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
555         tx.write(path, data);
556         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
557         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
558
559         doAnswer(new Answer<ListenableFuture<Boolean>>() {
560             @Override
561             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
562                 return realCohort.canCommit();
563             }
564         }).when(cohort).canCommit();
565
566         doAnswer(new Answer<ListenableFuture<Void>>() {
567             @Override
568             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
569                 if(preCommit != null) {
570                     return preCommit.apply(realCohort);
571                 } else {
572                     return realCohort.preCommit();
573                 }
574             }
575         }).when(cohort).preCommit();
576
577         doAnswer(new Answer<ListenableFuture<Void>>() {
578             @Override
579             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
580                 return realCohort.commit();
581             }
582         }).when(cohort).commit();
583
584         doAnswer(new Answer<ListenableFuture<Void>>() {
585             @Override
586             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
587                 return realCohort.abort();
588             }
589         }).when(cohort).abort();
590
591         modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
592
593         return cohort;
594     }
595
596     @SuppressWarnings({ "unchecked" })
597     @Test
598     public void testConcurrentThreePhaseCommits() throws Throwable {
599         new ShardTestKit(getSystem()) {{
600             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
601                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
602                     "testConcurrentThreePhaseCommits");
603
604             waitUntilLeader(shard);
605
606             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
607
608             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
609
610             String transactionID1 = "tx1";
611             MutableCompositeModification modification1 = new MutableCompositeModification();
612             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
613                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
614
615             String transactionID2 = "tx2";
616             MutableCompositeModification modification2 = new MutableCompositeModification();
617             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
618                     TestModel.OUTER_LIST_PATH,
619                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
620                     modification2);
621
622             String transactionID3 = "tx3";
623             MutableCompositeModification modification3 = new MutableCompositeModification();
624             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
625                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
626                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
627                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
628                     modification3);
629
630             long timeoutSec = 5;
631             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
632             final Timeout timeout = new Timeout(duration);
633
634             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
635             // by the ShardTransaction.
636
637             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
638                     cohort1, modification1, true), getRef());
639             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
640                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
641             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
642
643             // Send the CanCommitTransaction message for the first Tx.
644
645             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
646             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
647                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
648             assertEquals("Can commit", true, canCommitReply.getCanCommit());
649
650             // Send the ForwardedReadyTransaction for the next 2 Tx's.
651
652             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
653                     cohort2, modification2, true), getRef());
654             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
655
656             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
657                     cohort3, modification3, true), getRef());
658             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
659
660             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
661             // processed after the first Tx completes.
662
663             Future<Object> canCommitFuture1 = Patterns.ask(shard,
664                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
665
666             Future<Object> canCommitFuture2 = Patterns.ask(shard,
667                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
668
669             // Send the CommitTransaction message for the first Tx. After it completes, it should
670             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
671
672             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
673             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
674
675             // Wait for the next 2 Tx's to complete.
676
677             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
678             final CountDownLatch commitLatch = new CountDownLatch(2);
679
680             class OnFutureComplete extends OnComplete<Object> {
681                 private final Class<?> expRespType;
682
683                 OnFutureComplete(final Class<?> expRespType) {
684                     this.expRespType = expRespType;
685                 }
686
687                 @Override
688                 public void onComplete(final Throwable error, final Object resp) {
689                     if(error != null) {
690                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
691                     } else {
692                         try {
693                             assertEquals("Commit response type", expRespType, resp.getClass());
694                             onSuccess(resp);
695                         } catch (Exception e) {
696                             caughtEx.set(e);
697                         }
698                     }
699                 }
700
701                 void onSuccess(final Object resp) throws Exception {
702                 }
703             }
704
705             class OnCommitFutureComplete extends OnFutureComplete {
706                 OnCommitFutureComplete() {
707                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
708                 }
709
710                 @Override
711                 public void onComplete(final Throwable error, final Object resp) {
712                     super.onComplete(error, resp);
713                     commitLatch.countDown();
714                 }
715             }
716
717             class OnCanCommitFutureComplete extends OnFutureComplete {
718                 private final String transactionID;
719
720                 OnCanCommitFutureComplete(final String transactionID) {
721                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
722                     this.transactionID = transactionID;
723                 }
724
725                 @Override
726                 void onSuccess(final Object resp) throws Exception {
727                     CanCommitTransactionReply canCommitReply =
728                             CanCommitTransactionReply.fromSerializable(resp);
729                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
730
731                     Future<Object> commitFuture = Patterns.ask(shard,
732                             new CommitTransaction(transactionID).toSerializable(), timeout);
733                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
734                 }
735             }
736
737             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
738                     getSystem().dispatcher());
739
740             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
741                     getSystem().dispatcher());
742
743             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
744
745             if(caughtEx.get() != null) {
746                 throw caughtEx.get();
747             }
748
749             assertEquals("Commits complete", true, done);
750
751             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
752             inOrder.verify(cohort1).canCommit();
753             inOrder.verify(cohort1).preCommit();
754             inOrder.verify(cohort1).commit();
755             inOrder.verify(cohort2).canCommit();
756             inOrder.verify(cohort2).preCommit();
757             inOrder.verify(cohort2).commit();
758             inOrder.verify(cohort3).canCommit();
759             inOrder.verify(cohort3).preCommit();
760             inOrder.verify(cohort3).commit();
761
762             // Verify data in the data store.
763
764             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
765             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
766             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
767                     outerList.getValue() instanceof Iterable);
768             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
769             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
770                        entry instanceof MapEntryNode);
771             MapEntryNode mapEntry = (MapEntryNode)entry;
772             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
773                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
774             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
775             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
776
777             for(int i = 0; i < 20 * 5; i++) {
778                 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
779                 if(lastLogIndex == 2) {
780                     break;
781                 }
782                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
783             }
784
785             assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
786
787             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
788         }};
789     }
790
791     @Test
792     public void testCommitPhaseFailure() throws Throwable {
793         new ShardTestKit(getSystem()) {{
794             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
795                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
796                     "testCommitPhaseFailure");
797
798             waitUntilLeader(shard);
799
800             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
801             // commit phase.
802
803             String transactionID1 = "tx1";
804             MutableCompositeModification modification1 = new MutableCompositeModification();
805             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
806             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
807             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
808             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
809
810             String transactionID2 = "tx2";
811             MutableCompositeModification modification2 = new MutableCompositeModification();
812             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
813             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
814
815             FiniteDuration duration = duration("5 seconds");
816             final Timeout timeout = new Timeout(duration);
817
818             // Simulate the ForwardedReadyTransaction messages that would be sent
819             // by the ShardTransaction.
820
821             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
822                     cohort1, modification1, true), getRef());
823             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
824
825             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
826                     cohort2, modification2, true), getRef());
827             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
828
829             // Send the CanCommitTransaction message for the first Tx.
830
831             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
832             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
833                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
834             assertEquals("Can commit", true, canCommitReply.getCanCommit());
835
836             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
837             // processed after the first Tx completes.
838
839             Future<Object> canCommitFuture = Patterns.ask(shard,
840                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
841
842             // Send the CommitTransaction message for the first Tx. This should send back an error
843             // and trigger the 2nd Tx to proceed.
844
845             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
846             expectMsgClass(duration, akka.actor.Status.Failure.class);
847
848             // Wait for the 2nd Tx to complete the canCommit phase.
849
850             final CountDownLatch latch = new CountDownLatch(1);
851             canCommitFuture.onComplete(new OnComplete<Object>() {
852                 @Override
853                 public void onComplete(final Throwable t, final Object resp) {
854                     latch.countDown();
855                 }
856             }, getSystem().dispatcher());
857
858             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
859
860             InOrder inOrder = inOrder(cohort1, cohort2);
861             inOrder.verify(cohort1).canCommit();
862             inOrder.verify(cohort1).preCommit();
863             inOrder.verify(cohort1).commit();
864             inOrder.verify(cohort2).canCommit();
865
866             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
867         }};
868     }
869
870     @Test
871     public void testPreCommitPhaseFailure() throws Throwable {
872         new ShardTestKit(getSystem()) {{
873             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
874                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
875                     "testPreCommitPhaseFailure");
876
877             waitUntilLeader(shard);
878
879             String transactionID = "tx1";
880             MutableCompositeModification modification = new MutableCompositeModification();
881             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
882             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
883             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
884
885             FiniteDuration duration = duration("5 seconds");
886
887             // Simulate the ForwardedReadyTransaction messages that would be sent
888             // by the ShardTransaction.
889
890             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
891                     cohort, modification, true), getRef());
892             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
893
894             // Send the CanCommitTransaction message.
895
896             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
897             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
898                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
899             assertEquals("Can commit", true, canCommitReply.getCanCommit());
900
901             // Send the CommitTransaction message. This should send back an error
902             // for preCommit failure.
903
904             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
905             expectMsgClass(duration, akka.actor.Status.Failure.class);
906
907             InOrder inOrder = inOrder(cohort);
908             inOrder.verify(cohort).canCommit();
909             inOrder.verify(cohort).preCommit();
910
911             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
912         }};
913     }
914
915     @Test
916     public void testCanCommitPhaseFailure() throws Throwable {
917         new ShardTestKit(getSystem()) {{
918             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
919                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
920                     "testCanCommitPhaseFailure");
921
922             waitUntilLeader(shard);
923
924             final FiniteDuration duration = duration("5 seconds");
925
926             String transactionID = "tx1";
927             MutableCompositeModification modification = new MutableCompositeModification();
928             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
929             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
930
931             // Simulate the ForwardedReadyTransaction messages that would be sent
932             // by the ShardTransaction.
933
934             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
935                     cohort, modification, true), getRef());
936             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
937
938             // Send the CanCommitTransaction message.
939
940             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
941             expectMsgClass(duration, akka.actor.Status.Failure.class);
942
943             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
944         }};
945     }
946
947     @Test
948     public void testAbortBeforeFinishCommit() throws Throwable {
949         new ShardTestKit(getSystem()) {{
950             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
951                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
952                     "testAbortBeforeFinishCommit");
953
954             waitUntilLeader(shard);
955
956             final FiniteDuration duration = duration("5 seconds");
957             final Timeout timeout = new Timeout(duration);
958
959             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
960
961             final String transactionID = "tx1";
962             final CountDownLatch abortComplete = new CountDownLatch(1);
963             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
964                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
965                 @Override
966                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
967                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
968
969                     Future<Object> abortFuture = Patterns.ask(shard,
970                             new AbortTransaction(transactionID).toSerializable(), timeout);
971                     abortFuture.onComplete(new OnComplete<Object>() {
972                         @Override
973                         public void onComplete(final Throwable e, final Object resp) {
974                             abortComplete.countDown();
975                         }
976                     }, getSystem().dispatcher());
977
978                     return preCommitFuture;
979                 }
980             };
981
982             MutableCompositeModification modification = new MutableCompositeModification();
983             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
984                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
985                     modification, preCommit);
986
987             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
988                     cohort, modification, true), getRef());
989             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
990
991             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
992             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
993                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
994             assertEquals("Can commit", true, canCommitReply.getCanCommit());
995
996             Future<Object> commitFuture = Patterns.ask(shard,
997                     new CommitTransaction(transactionID).toSerializable(), timeout);
998
999             assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
1000
1001             Await.result(commitFuture, duration);
1002
1003             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1004             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1005
1006             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1007         }};
1008     }
1009
1010     @Test
1011     public void testTransactionCommitTimeout() throws Throwable {
1012         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
1013
1014         new ShardTestKit(getSystem()) {{
1015             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1016                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1017                     "testTransactionCommitTimeout");
1018
1019             waitUntilLeader(shard);
1020
1021             final FiniteDuration duration = duration("5 seconds");
1022
1023             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1024
1025             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1026             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1027                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1028
1029             // Create 1st Tx - will timeout
1030
1031             String transactionID1 = "tx1";
1032             MutableCompositeModification modification1 = new MutableCompositeModification();
1033             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1034                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1035                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1036                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1037                     modification1);
1038
1039             // Create 2nd Tx
1040
1041             String transactionID2 = "tx3";
1042             MutableCompositeModification modification2 = new MutableCompositeModification();
1043             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1044                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1045             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1046                     listNodePath,
1047                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1048                     modification2);
1049
1050             // Ready the Tx's
1051
1052             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1053                     cohort1, modification1, true), getRef());
1054             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1055
1056             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1057                     cohort2, modification2, true), getRef());
1058             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1059
1060             // canCommit 1st Tx. We don't send the commit so it should timeout.
1061
1062             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1063             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1064
1065             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1066
1067             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1068             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1069
1070             // Commit the 2nd Tx.
1071
1072             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1073             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1074
1075             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1076             assertNotNull(listNodePath + " not found", node);
1077
1078             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1079         }};
1080     }
1081
1082     @Test
1083     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1084         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1085
1086         new ShardTestKit(getSystem()) {{
1087             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1088                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1089                     "testTransactionCommitQueueCapacityExceeded");
1090
1091             waitUntilLeader(shard);
1092
1093             final FiniteDuration duration = duration("5 seconds");
1094
1095             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1096
1097             String transactionID1 = "tx1";
1098             MutableCompositeModification modification1 = new MutableCompositeModification();
1099             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1100                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1101
1102             String transactionID2 = "tx2";
1103             MutableCompositeModification modification2 = new MutableCompositeModification();
1104             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1105                     TestModel.OUTER_LIST_PATH,
1106                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1107                     modification2);
1108
1109             String transactionID3 = "tx3";
1110             MutableCompositeModification modification3 = new MutableCompositeModification();
1111             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1112                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1113
1114             // Ready the Tx's
1115
1116             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1117                     cohort1, modification1, true), getRef());
1118             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1119
1120             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1121                     cohort2, modification2, true), getRef());
1122             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1123
1124             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1125                     cohort3, modification3, true), getRef());
1126             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1127
1128             // canCommit 1st Tx.
1129
1130             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1131             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1132
1133             // canCommit the 2nd Tx - it should get queued.
1134
1135             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1136
1137             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1138
1139             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1140             expectMsgClass(duration, akka.actor.Status.Failure.class);
1141
1142             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1143         }};
1144     }
1145
1146     @Test
1147     public void testCanCommitBeforeReadyFailure() throws Throwable {
1148         new ShardTestKit(getSystem()) {{
1149             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1150                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1151                     "testCanCommitBeforeReadyFailure");
1152
1153             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1154             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1155
1156             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1157         }};
1158     }
1159
1160     @Test
1161     public void testAbortTransaction() throws Throwable {
1162         new ShardTestKit(getSystem()) {{
1163             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1164                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1165                     "testAbortTransaction");
1166
1167             waitUntilLeader(shard);
1168
1169             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1170
1171             String transactionID1 = "tx1";
1172             MutableCompositeModification modification1 = new MutableCompositeModification();
1173             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1174             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1175             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1176
1177             String transactionID2 = "tx2";
1178             MutableCompositeModification modification2 = new MutableCompositeModification();
1179             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1180             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1181
1182             FiniteDuration duration = duration("5 seconds");
1183             final Timeout timeout = new Timeout(duration);
1184
1185             // Simulate the ForwardedReadyTransaction messages that would be sent
1186             // by the ShardTransaction.
1187
1188             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1189                     cohort1, modification1, true), getRef());
1190             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1191
1192             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1193                     cohort2, modification2, true), getRef());
1194             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1195
1196             // Send the CanCommitTransaction message for the first Tx.
1197
1198             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1199             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1200                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1201             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1202
1203             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1204             // processed after the first Tx completes.
1205
1206             Future<Object> canCommitFuture = Patterns.ask(shard,
1207                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1208
1209             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1210             // Tx to proceed.
1211
1212             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1213             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1214
1215             // Wait for the 2nd Tx to complete the canCommit phase.
1216
1217             final CountDownLatch latch = new CountDownLatch(1);
1218             canCommitFuture.onComplete(new OnComplete<Object>() {
1219                 @Override
1220                 public void onComplete(final Throwable t, final Object resp) {
1221                     latch.countDown();
1222                 }
1223             }, getSystem().dispatcher());
1224
1225             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1226
1227             InOrder inOrder = inOrder(cohort1, cohort2);
1228             inOrder.verify(cohort1).canCommit();
1229             inOrder.verify(cohort2).canCommit();
1230
1231             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1232         }};
1233     }
1234
1235     @Test
1236     public void testCreateSnapshot() throws IOException, InterruptedException {
1237             testCreateSnapshot(true, "testCreateSnapshot");
1238     }
1239
1240     @Test
1241     public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1242         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1243     }
1244
1245     @SuppressWarnings("serial")
1246     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
1247         final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1248                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1249
1250         new ShardTestKit(getSystem()) {{
1251             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1252             Creator<Shard> creator = new Creator<Shard>() {
1253                 @Override
1254                 public Shard create() throws Exception {
1255                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1256                             dataStoreContext, SCHEMA_CONTEXT) {
1257                         @Override
1258                         protected void commitSnapshot(final long sequenceNumber) {
1259                             super.commitSnapshot(sequenceNumber);
1260                             latch.get().countDown();
1261                         }
1262                     };
1263                 }
1264             };
1265
1266             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1267                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1268
1269             waitUntilLeader(shard);
1270
1271             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1272
1273             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1274
1275             latch.set(new CountDownLatch(1));
1276             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1277
1278             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1279
1280             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1281         }};
1282     }
1283
1284     /**
1285      * This test simply verifies that the applySnapShot logic will work
1286      * @throws ReadFailedException
1287      */
1288     @Test
1289     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1290         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1291
1292         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1293
1294         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1295         putTransaction.write(TestModel.TEST_PATH,
1296             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1297         commitTransaction(putTransaction);
1298
1299
1300         NormalizedNode<?, ?> expected = readStore(store);
1301
1302         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1303
1304         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1305         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1306
1307         commitTransaction(writeTransaction);
1308
1309         NormalizedNode<?, ?> actual = readStore(store);
1310
1311         assertEquals(expected, actual);
1312     }
1313
1314     @Test
1315     public void testRecoveryApplicable(){
1316
1317         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1318                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1319
1320         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1321                 persistentContext, SCHEMA_CONTEXT);
1322
1323         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1324                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1325
1326         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1327                 nonPersistentContext, SCHEMA_CONTEXT);
1328
1329         new ShardTestKit(getSystem()) {{
1330             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1331                     persistentProps, "testPersistence1");
1332
1333             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1334
1335             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1336
1337             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1338                     nonPersistentProps, "testPersistence2");
1339
1340             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1341
1342             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1343
1344         }};
1345
1346     }
1347
1348
1349     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1350         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1351         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1352             transaction.read(YangInstanceIdentifier.builder().build());
1353
1354         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1355
1356         NormalizedNode<?, ?> normalizedNode = optional.get();
1357
1358         transaction.close();
1359
1360         return normalizedNode;
1361     }
1362
1363     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1364         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1365         ListenableFuture<Void> future =
1366             commitCohort.preCommit();
1367         try {
1368             future.get();
1369             future = commitCohort.commit();
1370             future.get();
1371         } catch (InterruptedException | ExecutionException e) {
1372         }
1373     }
1374
1375     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1376         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1377             @Override
1378             public void onDataChanged(
1379                 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1380
1381             }
1382         };
1383     }
1384
1385     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1386             throws ExecutionException, InterruptedException {
1387         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1388
1389         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1390             transaction.read(id);
1391
1392         Optional<NormalizedNode<?, ?>> optional = future.get();
1393         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1394
1395         transaction.close();
1396
1397         return node;
1398     }
1399
1400     private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
1401         throws ExecutionException, InterruptedException {
1402         DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1403
1404         transaction.write(id, node);
1405
1406         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1407         commitCohort.preCommit().get();
1408         commitCohort.commit().get();
1409     }
1410
1411     @SuppressWarnings("serial")
1412     private static final class DelegatingShardCreator implements Creator<Shard> {
1413         private final Creator<Shard> delegate;
1414
1415         DelegatingShardCreator(final Creator<Shard> delegate) {
1416             this.delegate = delegate;
1417         }
1418
1419         @Override
1420         public Shard create() throws Exception {
1421             return delegate.create();
1422         }
1423     }
1424 }