Merge "BUG 2486 : Optimizations for a single node cluster deployment"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.CheckedFuture;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Ignore;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.invocation.InvocationOnMock;
46 import org.mockito.stubbing.Answer;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
60 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
61 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
62 import org.opendaylight.controller.cluster.datastore.modification.Modification;
63 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
64 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
65 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
66 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
67 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
68 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
69 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
71 import org.opendaylight.controller.cluster.raft.Snapshot;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
74 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
75 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
76 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
78 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
79 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
80 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
81 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
82 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
83 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
84 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
85 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
86 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
87 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
88 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
89 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
90 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
91 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
92 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
93 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
95 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
97 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
98 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
99 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
100 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
101 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
102 import scala.concurrent.Await;
103 import scala.concurrent.Future;
104 import scala.concurrent.duration.FiniteDuration;
105
106 public class ShardTest extends AbstractActorTest {
107
108     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
109
110     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
111
112     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
113             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
114
115     private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
116             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
117             shardHeartbeatIntervalInMillis(100).build();
118
119     @Before
120     public void setUp() {
121         InMemorySnapshotStore.clear();
122         InMemoryJournal.clear();
123     }
124
125     @After
126     public void tearDown() {
127         InMemorySnapshotStore.clear();
128         InMemoryJournal.clear();
129     }
130
131     private Props newShardProps() {
132         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
133                 dataStoreContext, SCHEMA_CONTEXT);
134     }
135
136     @Test
137     public void testRegisterChangeListener() throws Exception {
138         new ShardTestKit(getSystem()) {{
139             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
140                     newShardProps(),  "testRegisterChangeListener");
141
142             waitUntilLeader(shard);
143
144             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
145
146             MockDataChangeListener listener = new MockDataChangeListener(1);
147             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
148                     "testRegisterChangeListener-DataChangeListener");
149
150             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
151                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
152
153             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
154                     RegisterChangeListenerReply.class);
155             String replyPath = reply.getListenerRegistrationPath().toString();
156             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
157                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
158
159             YangInstanceIdentifier path = TestModel.TEST_PATH;
160             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
161
162             listener.waitForChangeEvents(path);
163
164             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
166         }};
167     }
168
169     @SuppressWarnings("serial")
170     @Test
171     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
172         // This test tests the timing window in which a change listener is registered before the
173         // shard becomes the leader. We verify that the listener is registered and notified of the
174         // existing data when the shard becomes the leader.
175         new ShardTestKit(getSystem()) {{
176             // For this test, we want to send the RegisterChangeListener message after the shard
177             // has recovered from persistence and before it becomes the leader. So we subclass
178             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
179             // we know that the shard has been initialized to a follower and has started the
180             // election process. The following 2 CountDownLatches are used to coordinate the
181             // ElectionTimeout with the sending of the RegisterChangeListener message.
182             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
183             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
184             Creator<Shard> creator = new Creator<Shard>() {
185                 boolean firstElectionTimeout = true;
186
187                 @Override
188                 public Shard create() throws Exception {
189                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
190                             dataStoreContext, SCHEMA_CONTEXT) {
191                         @Override
192                         public void onReceiveCommand(final Object message) throws Exception {
193                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
194                                 // Got the first ElectionTimeout. We don't forward it to the
195                                 // base Shard yet until we've sent the RegisterChangeListener
196                                 // message. So we signal the onFirstElectionTimeout latch to tell
197                                 // the main thread to send the RegisterChangeListener message and
198                                 // start a thread to wait on the onChangeListenerRegistered latch,
199                                 // which the main thread signals after it has sent the message.
200                                 // After the onChangeListenerRegistered is triggered, we send the
201                                 // original ElectionTimeout message to proceed with the election.
202                                 firstElectionTimeout = false;
203                                 final ActorRef self = getSelf();
204                                 new Thread() {
205                                     @Override
206                                     public void run() {
207                                         Uninterruptibles.awaitUninterruptibly(
208                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
209                                         self.tell(message, self);
210                                     }
211                                 }.start();
212
213                                 onFirstElectionTimeout.countDown();
214                             } else {
215                                 super.onReceiveCommand(message);
216                             }
217                         }
218                     };
219                 }
220             };
221
222             MockDataChangeListener listener = new MockDataChangeListener(1);
223             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
224                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
225
226             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
227                     Props.create(new DelegatingShardCreator(creator)),
228                     "testRegisterChangeListenerWhenNotLeaderInitially");
229
230             // Write initial data into the in-memory store.
231             YangInstanceIdentifier path = TestModel.TEST_PATH;
232             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
233
234             // Wait until the shard receives the first ElectionTimeout message.
235             assertEquals("Got first ElectionTimeout", true,
236                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
237
238             // Now send the RegisterChangeListener and wait for the reply.
239             shard.tell(new RegisterChangeListener(path, dclActor.path(),
240                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
241
242             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
243                     RegisterChangeListenerReply.class);
244             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
245
246             // Sanity check - verify the shard is not the leader yet.
247             shard.tell(new FindLeader(), getRef());
248             FindLeaderReply findLeadeReply =
249                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
250             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
251
252             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
253             // with the election process.
254             onChangeListenerRegistered.countDown();
255
256             // Wait for the shard to become the leader and notify our listener with the existing
257             // data in the store.
258             listener.waitForChangeEvents(path);
259
260             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
261             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
262         }};
263     }
264
265     @Test
266     public void testCreateTransaction(){
267         new ShardTestKit(getSystem()) {{
268             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
269
270             waitUntilLeader(shard);
271
272             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
273
274             shard.tell(new CreateTransaction("txn-1",
275                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
276
277             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
278                     CreateTransactionReply.class);
279
280             String path = reply.getTransactionActorPath().toString();
281             assertTrue("Unexpected transaction path " + path,
282                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
283
284             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
285         }};
286     }
287
288     @Test
289     public void testCreateTransactionOnChain(){
290         new ShardTestKit(getSystem()) {{
291             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
292
293             waitUntilLeader(shard);
294
295             shard.tell(new CreateTransaction("txn-1",
296                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
297                     getRef());
298
299             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
300                     CreateTransactionReply.class);
301
302             String path = reply.getTransactionActorPath().toString();
303             assertTrue("Unexpected transaction path " + path,
304                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
305
306             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
307         }};
308     }
309
310     @SuppressWarnings("serial")
311     @Test
312     public void testPeerAddressResolved() throws Exception {
313         new ShardTestKit(getSystem()) {{
314             final CountDownLatch recoveryComplete = new CountDownLatch(1);
315             class TestShard extends Shard {
316                 TestShard() {
317                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
318                             dataStoreContext, SCHEMA_CONTEXT);
319                 }
320
321                 Map<String, String> getPeerAddresses() {
322                     return getRaftActorContext().getPeerAddresses();
323                 }
324
325                 @Override
326                 protected void onRecoveryComplete() {
327                     try {
328                         super.onRecoveryComplete();
329                     } finally {
330                         recoveryComplete.countDown();
331                     }
332                 }
333             }
334
335             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
336                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
337                         @Override
338                         public TestShard create() throws Exception {
339                             return new TestShard();
340                         }
341                     })), "testPeerAddressResolved");
342
343             //waitUntilLeader(shard);
344             assertEquals("Recovery complete", true,
345                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
346
347             String address = "akka://foobar";
348             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
349
350             assertEquals("getPeerAddresses", address,
351                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
352
353             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
354         }};
355     }
356
357     @Test
358     public void testApplySnapshot() throws Exception {
359         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
360                 "testApplySnapshot");
361
362         NormalizedNodeToNodeCodec codec =
363             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
364
365         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
366
367         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
368         NormalizedNode<?,?> expected = readStore(shard, root);
369
370         NormalizedNodeMessages.Container encode = codec.encode(expected);
371
372         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
373                 encode.getNormalizedNode().toByteString().toByteArray(),
374                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
375
376         shard.underlyingActor().onReceiveCommand(applySnapshot);
377
378         NormalizedNode<?,?> actual = readStore(shard, root);
379
380         assertEquals(expected, actual);
381
382         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
383     }
384
385     @Test
386     public void testApplyState() throws Exception {
387
388         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
389
390         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
391
392         MutableCompositeModification compMod = new MutableCompositeModification();
393         compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
394         Payload payload = new CompositeModificationPayload(compMod.toSerializable());
395         ApplyState applyState = new ApplyState(null, "test",
396                 new ReplicatedLogImplEntry(1, 2, payload));
397
398         shard.underlyingActor().onReceiveCommand(applyState);
399
400         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
401         assertEquals("Applied state", node, actual);
402
403         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
404     }
405
406     @SuppressWarnings("serial")
407     @Test
408     public void testRecovery() throws Exception {
409
410         // Set up the InMemorySnapshotStore.
411
412         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
413         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
414
415         DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
416         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
417         DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
418         commitCohort.preCommit().get();
419         commitCohort.commit().get();
420
421         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
422         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
423
424         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
425                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
426                         root).
427                                 getNormalizedNode().toByteString().toByteArray(),
428                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
429
430         // Set up the InMemoryJournal.
431
432         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
433                   new WriteModification(TestModel.OUTER_LIST_PATH,
434                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
435                           SCHEMA_CONTEXT))));
436
437         int nListEntries = 16;
438         Set<Integer> listEntryKeys = new HashSet<>();
439         for(int i = 1; i <= nListEntries-5; i++) {
440             listEntryKeys.add(Integer.valueOf(i));
441             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
442                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
443             Modification mod = new MergeModification(path,
444                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
445                     SCHEMA_CONTEXT);
446             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
447                     newPayload(mod)));
448         }
449
450         // Add some of the new CompositeModificationByteStringPayload
451         for(int i = 11; i <= nListEntries; i++) {
452             listEntryKeys.add(Integer.valueOf(i));
453             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
454                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
455             Modification mod = new MergeModification(path,
456                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
457                     SCHEMA_CONTEXT);
458             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
459                     newByteStringPayload(mod)));
460         }
461
462
463         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
464                 new ApplyLogEntries(nListEntries));
465
466         // Create the actor and wait for recovery complete.
467
468         final CountDownLatch recoveryComplete = new CountDownLatch(1);
469
470         Creator<Shard> creator = new Creator<Shard>() {
471             @Override
472             public Shard create() throws Exception {
473                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
474                         dataStoreContext, SCHEMA_CONTEXT) {
475                     @Override
476                     protected void onRecoveryComplete() {
477                         try {
478                             super.onRecoveryComplete();
479                         } finally {
480                             recoveryComplete.countDown();
481                         }
482                     }
483                 };
484             }
485         };
486
487         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
488                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
489
490         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
491
492         // Verify data in the data store.
493
494         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
495         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
496         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
497                 outerList.getValue() instanceof Iterable);
498         for(Object entry: (Iterable<?>) outerList.getValue()) {
499             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
500                     entry instanceof MapEntryNode);
501             MapEntryNode mapEntry = (MapEntryNode)entry;
502             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
503                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
504             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
505             Object value = idLeaf.get().getValue();
506             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
507                     listEntryKeys.remove(value));
508         }
509
510         if(!listEntryKeys.isEmpty()) {
511             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
512                     listEntryKeys);
513         }
514
515         assertEquals("Last log index", nListEntries,
516                 shard.underlyingActor().getShardMBean().getLastLogIndex());
517         assertEquals("Commit index", nListEntries,
518                 shard.underlyingActor().getShardMBean().getCommitIndex());
519         assertEquals("Last applied", nListEntries,
520                 shard.underlyingActor().getShardMBean().getLastApplied());
521
522         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
523     }
524
525     private CompositeModificationPayload newPayload(final Modification... mods) {
526         MutableCompositeModification compMod = new MutableCompositeModification();
527         for(Modification mod: mods) {
528             compMod.addModification(mod);
529         }
530
531         return new CompositeModificationPayload(compMod.toSerializable());
532     }
533
534     private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
535         MutableCompositeModification compMod = new MutableCompositeModification();
536         for(Modification mod: mods) {
537             compMod.addModification(mod);
538         }
539
540         return new CompositeModificationByteStringPayload(compMod.toSerializable());
541     }
542
543
544     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
545             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
546             final MutableCompositeModification modification) {
547         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
548     }
549
550     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
551             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
552             final MutableCompositeModification modification,
553             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
554
555         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
556         tx.write(path, data);
557         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
558         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
559
560         doAnswer(new Answer<ListenableFuture<Boolean>>() {
561             @Override
562             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
563                 return realCohort.canCommit();
564             }
565         }).when(cohort).canCommit();
566
567         doAnswer(new Answer<ListenableFuture<Void>>() {
568             @Override
569             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
570                 if(preCommit != null) {
571                     return preCommit.apply(realCohort);
572                 } else {
573                     return realCohort.preCommit();
574                 }
575             }
576         }).when(cohort).preCommit();
577
578         doAnswer(new Answer<ListenableFuture<Void>>() {
579             @Override
580             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
581                 return realCohort.commit();
582             }
583         }).when(cohort).commit();
584
585         doAnswer(new Answer<ListenableFuture<Void>>() {
586             @Override
587             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
588                 return realCohort.abort();
589             }
590         }).when(cohort).abort();
591
592         modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
593
594         return cohort;
595     }
596
597     @SuppressWarnings({ "unchecked" })
598     @Test
599     public void testConcurrentThreePhaseCommits() throws Throwable {
600         new ShardTestKit(getSystem()) {{
601             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
602                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
603                     "testConcurrentThreePhaseCommits");
604
605             waitUntilLeader(shard);
606
607             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
608
609             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
610
611             String transactionID1 = "tx1";
612             MutableCompositeModification modification1 = new MutableCompositeModification();
613             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
614                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
615
616             String transactionID2 = "tx2";
617             MutableCompositeModification modification2 = new MutableCompositeModification();
618             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
619                     TestModel.OUTER_LIST_PATH,
620                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
621                     modification2);
622
623             String transactionID3 = "tx3";
624             MutableCompositeModification modification3 = new MutableCompositeModification();
625             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
626                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
627                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
628                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
629                     modification3);
630
631             long timeoutSec = 5;
632             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
633             final Timeout timeout = new Timeout(duration);
634
635             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
636             // by the ShardTransaction.
637
638             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
639                     cohort1, modification1, true), getRef());
640             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
641                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
642             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
643
644             // Send the CanCommitTransaction message for the first Tx.
645
646             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
647             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
648                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
649             assertEquals("Can commit", true, canCommitReply.getCanCommit());
650
651             // Send the ForwardedReadyTransaction for the next 2 Tx's.
652
653             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
654                     cohort2, modification2, true), getRef());
655             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
656
657             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
658                     cohort3, modification3, true), getRef());
659             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
660
661             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
662             // processed after the first Tx completes.
663
664             Future<Object> canCommitFuture1 = Patterns.ask(shard,
665                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
666
667             Future<Object> canCommitFuture2 = Patterns.ask(shard,
668                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
669
670             // Send the CommitTransaction message for the first Tx. After it completes, it should
671             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
672
673             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
674             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
675
676             // Wait for the next 2 Tx's to complete.
677
678             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
679             final CountDownLatch commitLatch = new CountDownLatch(2);
680
681             class OnFutureComplete extends OnComplete<Object> {
682                 private final Class<?> expRespType;
683
684                 OnFutureComplete(final Class<?> expRespType) {
685                     this.expRespType = expRespType;
686                 }
687
688                 @Override
689                 public void onComplete(final Throwable error, final Object resp) {
690                     if(error != null) {
691                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
692                     } else {
693                         try {
694                             assertEquals("Commit response type", expRespType, resp.getClass());
695                             onSuccess(resp);
696                         } catch (Exception e) {
697                             caughtEx.set(e);
698                         }
699                     }
700                 }
701
702                 void onSuccess(final Object resp) throws Exception {
703                 }
704             }
705
706             class OnCommitFutureComplete extends OnFutureComplete {
707                 OnCommitFutureComplete() {
708                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
709                 }
710
711                 @Override
712                 public void onComplete(final Throwable error, final Object resp) {
713                     super.onComplete(error, resp);
714                     commitLatch.countDown();
715                 }
716             }
717
718             class OnCanCommitFutureComplete extends OnFutureComplete {
719                 private final String transactionID;
720
721                 OnCanCommitFutureComplete(final String transactionID) {
722                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
723                     this.transactionID = transactionID;
724                 }
725
726                 @Override
727                 void onSuccess(final Object resp) throws Exception {
728                     CanCommitTransactionReply canCommitReply =
729                             CanCommitTransactionReply.fromSerializable(resp);
730                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
731
732                     Future<Object> commitFuture = Patterns.ask(shard,
733                             new CommitTransaction(transactionID).toSerializable(), timeout);
734                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
735                 }
736             }
737
738             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
739                     getSystem().dispatcher());
740
741             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
742                     getSystem().dispatcher());
743
744             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
745
746             if(caughtEx.get() != null) {
747                 throw caughtEx.get();
748             }
749
750             assertEquals("Commits complete", true, done);
751
752             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
753             inOrder.verify(cohort1).canCommit();
754             inOrder.verify(cohort1).preCommit();
755             inOrder.verify(cohort1).commit();
756             inOrder.verify(cohort2).canCommit();
757             inOrder.verify(cohort2).preCommit();
758             inOrder.verify(cohort2).commit();
759             inOrder.verify(cohort3).canCommit();
760             inOrder.verify(cohort3).preCommit();
761             inOrder.verify(cohort3).commit();
762
763             // Verify data in the data store.
764
765             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
766             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
767             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
768                     outerList.getValue() instanceof Iterable);
769             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
770             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
771                        entry instanceof MapEntryNode);
772             MapEntryNode mapEntry = (MapEntryNode)entry;
773             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
774                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
775             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
776             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
777
778             for(int i = 0; i < 20 * 5; i++) {
779                 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
780                 if(lastLogIndex == 2) {
781                     break;
782                 }
783                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
784             }
785
786             assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
787
788             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
789         }};
790     }
791
792     @Test
793     public void testCommitPhaseFailure() throws Throwable {
794         new ShardTestKit(getSystem()) {{
795             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
796                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
797                     "testCommitPhaseFailure");
798
799             waitUntilLeader(shard);
800
801             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
802             // commit phase.
803
804             String transactionID1 = "tx1";
805             MutableCompositeModification modification1 = new MutableCompositeModification();
806             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
807             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
808             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
809             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
810
811             String transactionID2 = "tx2";
812             MutableCompositeModification modification2 = new MutableCompositeModification();
813             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
814             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
815
816             FiniteDuration duration = duration("5 seconds");
817             final Timeout timeout = new Timeout(duration);
818
819             // Simulate the ForwardedReadyTransaction messages that would be sent
820             // by the ShardTransaction.
821
822             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
823                     cohort1, modification1, true), getRef());
824             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
825
826             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
827                     cohort2, modification2, true), getRef());
828             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
829
830             // Send the CanCommitTransaction message for the first Tx.
831
832             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
833             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
834                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
835             assertEquals("Can commit", true, canCommitReply.getCanCommit());
836
837             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
838             // processed after the first Tx completes.
839
840             Future<Object> canCommitFuture = Patterns.ask(shard,
841                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
842
843             // Send the CommitTransaction message for the first Tx. This should send back an error
844             // and trigger the 2nd Tx to proceed.
845
846             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
847             expectMsgClass(duration, akka.actor.Status.Failure.class);
848
849             // Wait for the 2nd Tx to complete the canCommit phase.
850
851             final CountDownLatch latch = new CountDownLatch(1);
852             canCommitFuture.onComplete(new OnComplete<Object>() {
853                 @Override
854                 public void onComplete(final Throwable t, final Object resp) {
855                     latch.countDown();
856                 }
857             }, getSystem().dispatcher());
858
859             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
860
861             InOrder inOrder = inOrder(cohort1, cohort2);
862             inOrder.verify(cohort1).canCommit();
863             inOrder.verify(cohort1).preCommit();
864             inOrder.verify(cohort1).commit();
865             inOrder.verify(cohort2).canCommit();
866
867             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
868         }};
869     }
870
871     @Test
872     public void testPreCommitPhaseFailure() throws Throwable {
873         new ShardTestKit(getSystem()) {{
874             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
875                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
876                     "testPreCommitPhaseFailure");
877
878             waitUntilLeader(shard);
879
880             String transactionID = "tx1";
881             MutableCompositeModification modification = new MutableCompositeModification();
882             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
883             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
884             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
885
886             FiniteDuration duration = duration("5 seconds");
887
888             // Simulate the ForwardedReadyTransaction messages that would be sent
889             // by the ShardTransaction.
890
891             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
892                     cohort, modification, true), getRef());
893             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
894
895             // Send the CanCommitTransaction message.
896
897             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
898             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
899                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
900             assertEquals("Can commit", true, canCommitReply.getCanCommit());
901
902             // Send the CommitTransaction message. This should send back an error
903             // for preCommit failure.
904
905             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
906             expectMsgClass(duration, akka.actor.Status.Failure.class);
907
908             InOrder inOrder = inOrder(cohort);
909             inOrder.verify(cohort).canCommit();
910             inOrder.verify(cohort).preCommit();
911
912             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
913         }};
914     }
915
916     @Test
917     public void testCanCommitPhaseFailure() throws Throwable {
918         new ShardTestKit(getSystem()) {{
919             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
920                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
921                     "testCanCommitPhaseFailure");
922
923             waitUntilLeader(shard);
924
925             final FiniteDuration duration = duration("5 seconds");
926
927             String transactionID = "tx1";
928             MutableCompositeModification modification = new MutableCompositeModification();
929             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
930             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
931
932             // Simulate the ForwardedReadyTransaction messages that would be sent
933             // by the ShardTransaction.
934
935             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
936                     cohort, modification, true), getRef());
937             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
938
939             // Send the CanCommitTransaction message.
940
941             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
942             expectMsgClass(duration, akka.actor.Status.Failure.class);
943
944             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
945         }};
946     }
947
948     @Test
949     @Ignore("This test will work only if replication is turned on. Needs modification due to optimizations added to Shard/RaftActor.")
950     public void testAbortBeforeFinishCommit() throws Throwable {
951         new ShardTestKit(getSystem()) {{
952             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
953                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
954                     "testAbortBeforeFinishCommit");
955
956             waitUntilLeader(shard);
957
958             final FiniteDuration duration = duration("5 seconds");
959             final Timeout timeout = new Timeout(duration);
960
961             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
962
963             final String transactionID = "tx1";
964             final CountDownLatch abortComplete = new CountDownLatch(1);
965             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
966                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
967                 @Override
968                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
969                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
970
971                     Future<Object> abortFuture = Patterns.ask(shard,
972                             new AbortTransaction(transactionID).toSerializable(), timeout);
973                     abortFuture.onComplete(new OnComplete<Object>() {
974                         @Override
975                         public void onComplete(final Throwable e, final Object resp) {
976                             abortComplete.countDown();
977                         }
978                     }, getSystem().dispatcher());
979
980                     return preCommitFuture;
981                 }
982             };
983
984             MutableCompositeModification modification = new MutableCompositeModification();
985             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
986                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
987                     modification, preCommit);
988
989             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
990                     cohort, modification, true), getRef());
991             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
992
993             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
994             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
995                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
996             assertEquals("Can commit", true, canCommitReply.getCanCommit());
997
998             Future<Object> commitFuture = Patterns.ask(shard,
999                     new CommitTransaction(transactionID).toSerializable(), timeout);
1000
1001             assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
1002
1003             Await.result(commitFuture, duration);
1004
1005             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1006             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1007
1008             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1009         }};
1010     }
1011
1012     @Test
1013     public void testTransactionCommitTimeout() throws Throwable {
1014         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
1015
1016         new ShardTestKit(getSystem()) {{
1017             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1018                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1019                     "testTransactionCommitTimeout");
1020
1021             waitUntilLeader(shard);
1022
1023             final FiniteDuration duration = duration("5 seconds");
1024
1025             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1026
1027             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1028             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1029                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1030
1031             // Create 1st Tx - will timeout
1032
1033             String transactionID1 = "tx1";
1034             MutableCompositeModification modification1 = new MutableCompositeModification();
1035             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1036                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1037                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1038                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1039                     modification1);
1040
1041             // Create 2nd Tx
1042
1043             String transactionID2 = "tx3";
1044             MutableCompositeModification modification2 = new MutableCompositeModification();
1045             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1046                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1047             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1048                     listNodePath,
1049                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1050                     modification2);
1051
1052             // Ready the Tx's
1053
1054             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1055                     cohort1, modification1, true), getRef());
1056             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1057
1058             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1059                     cohort2, modification2, true), getRef());
1060             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1061
1062             // canCommit 1st Tx. We don't send the commit so it should timeout.
1063
1064             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1065             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1066
1067             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1068
1069             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1070             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1071
1072             // Commit the 2nd Tx.
1073
1074             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1075             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1076
1077             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1078             assertNotNull(listNodePath + " not found", node);
1079
1080             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1081         }};
1082     }
1083
1084     @Test
1085     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1086         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1087
1088         new ShardTestKit(getSystem()) {{
1089             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1090                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1091                     "testTransactionCommitQueueCapacityExceeded");
1092
1093             waitUntilLeader(shard);
1094
1095             final FiniteDuration duration = duration("5 seconds");
1096
1097             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1098
1099             String transactionID1 = "tx1";
1100             MutableCompositeModification modification1 = new MutableCompositeModification();
1101             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1102                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1103
1104             String transactionID2 = "tx2";
1105             MutableCompositeModification modification2 = new MutableCompositeModification();
1106             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1107                     TestModel.OUTER_LIST_PATH,
1108                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1109                     modification2);
1110
1111             String transactionID3 = "tx3";
1112             MutableCompositeModification modification3 = new MutableCompositeModification();
1113             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1114                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1115
1116             // Ready the Tx's
1117
1118             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1119                     cohort1, modification1, true), getRef());
1120             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1121
1122             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1123                     cohort2, modification2, true), getRef());
1124             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1125
1126             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1127                     cohort3, modification3, true), getRef());
1128             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1129
1130             // canCommit 1st Tx.
1131
1132             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1133             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1134
1135             // canCommit the 2nd Tx - it should get queued.
1136
1137             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1138
1139             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1140
1141             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1142             expectMsgClass(duration, akka.actor.Status.Failure.class);
1143
1144             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1145         }};
1146     }
1147
1148     @Test
1149     public void testCanCommitBeforeReadyFailure() throws Throwable {
1150         new ShardTestKit(getSystem()) {{
1151             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1152                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1153                     "testCanCommitBeforeReadyFailure");
1154
1155             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1156             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1157
1158             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1159         }};
1160     }
1161
1162     @Test
1163     public void testAbortTransaction() throws Throwable {
1164         new ShardTestKit(getSystem()) {{
1165             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1166                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1167                     "testAbortTransaction");
1168
1169             waitUntilLeader(shard);
1170
1171             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1172
1173             String transactionID1 = "tx1";
1174             MutableCompositeModification modification1 = new MutableCompositeModification();
1175             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1176             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1177             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1178
1179             String transactionID2 = "tx2";
1180             MutableCompositeModification modification2 = new MutableCompositeModification();
1181             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1182             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1183
1184             FiniteDuration duration = duration("5 seconds");
1185             final Timeout timeout = new Timeout(duration);
1186
1187             // Simulate the ForwardedReadyTransaction messages that would be sent
1188             // by the ShardTransaction.
1189
1190             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1191                     cohort1, modification1, true), getRef());
1192             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1193
1194             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1195                     cohort2, modification2, true), getRef());
1196             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1197
1198             // Send the CanCommitTransaction message for the first Tx.
1199
1200             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1201             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1202                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1203             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1204
1205             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1206             // processed after the first Tx completes.
1207
1208             Future<Object> canCommitFuture = Patterns.ask(shard,
1209                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1210
1211             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1212             // Tx to proceed.
1213
1214             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1215             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1216
1217             // Wait for the 2nd Tx to complete the canCommit phase.
1218
1219             final CountDownLatch latch = new CountDownLatch(1);
1220             canCommitFuture.onComplete(new OnComplete<Object>() {
1221                 @Override
1222                 public void onComplete(final Throwable t, final Object resp) {
1223                     latch.countDown();
1224                 }
1225             }, getSystem().dispatcher());
1226
1227             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1228
1229             InOrder inOrder = inOrder(cohort1, cohort2);
1230             inOrder.verify(cohort1).canCommit();
1231             inOrder.verify(cohort2).canCommit();
1232
1233             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1234         }};
1235     }
1236
1237     @Test
1238     public void testCreateSnapshot() throws IOException, InterruptedException {
1239             testCreateSnapshot(true, "testCreateSnapshot");
1240     }
1241
1242     @Test
1243     public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1244         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1245     }
1246
1247     @SuppressWarnings("serial")
1248     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
1249         final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1250                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1251
1252         new ShardTestKit(getSystem()) {{
1253             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1254             Creator<Shard> creator = new Creator<Shard>() {
1255                 @Override
1256                 public Shard create() throws Exception {
1257                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1258                             dataStoreContext, SCHEMA_CONTEXT) {
1259                         @Override
1260                         protected void commitSnapshot(final long sequenceNumber) {
1261                             super.commitSnapshot(sequenceNumber);
1262                             latch.get().countDown();
1263                         }
1264                     };
1265                 }
1266             };
1267
1268             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1269                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1270
1271             waitUntilLeader(shard);
1272
1273             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1274
1275             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1276
1277             latch.set(new CountDownLatch(1));
1278             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1279
1280             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1281
1282             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1283         }};
1284     }
1285
1286     /**
1287      * This test simply verifies that the applySnapShot logic will work
1288      * @throws ReadFailedException
1289      */
1290     @Test
1291     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1292         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1293
1294         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1295
1296         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1297         putTransaction.write(TestModel.TEST_PATH,
1298             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1299         commitTransaction(putTransaction);
1300
1301
1302         NormalizedNode<?, ?> expected = readStore(store);
1303
1304         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1305
1306         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1307         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1308
1309         commitTransaction(writeTransaction);
1310
1311         NormalizedNode<?, ?> actual = readStore(store);
1312
1313         assertEquals(expected, actual);
1314     }
1315
1316     @Test
1317     public void testRecoveryApplicable(){
1318
1319         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1320                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1321
1322         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1323                 persistentContext, SCHEMA_CONTEXT);
1324
1325         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1326                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1327
1328         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1329                 nonPersistentContext, SCHEMA_CONTEXT);
1330
1331         new ShardTestKit(getSystem()) {{
1332             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1333                     persistentProps, "testPersistence1");
1334
1335             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1336
1337             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1338
1339             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1340                     nonPersistentProps, "testPersistence2");
1341
1342             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1343
1344             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1345
1346         }};
1347
1348     }
1349
1350
1351     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1352         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1353         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1354             transaction.read(YangInstanceIdentifier.builder().build());
1355
1356         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1357
1358         NormalizedNode<?, ?> normalizedNode = optional.get();
1359
1360         transaction.close();
1361
1362         return normalizedNode;
1363     }
1364
1365     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1366         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1367         ListenableFuture<Void> future =
1368             commitCohort.preCommit();
1369         try {
1370             future.get();
1371             future = commitCohort.commit();
1372             future.get();
1373         } catch (InterruptedException | ExecutionException e) {
1374         }
1375     }
1376
1377     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1378         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1379             @Override
1380             public void onDataChanged(
1381                 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1382
1383             }
1384         };
1385     }
1386
1387     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1388             throws ExecutionException, InterruptedException {
1389         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1390
1391         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1392             transaction.read(id);
1393
1394         Optional<NormalizedNode<?, ?>> optional = future.get();
1395         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1396
1397         transaction.close();
1398
1399         return node;
1400     }
1401
1402     private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
1403         throws ExecutionException, InterruptedException {
1404         DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1405
1406         transaction.write(id, node);
1407
1408         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1409         commitCohort.preCommit().get();
1410         commitCohort.commit().get();
1411     }
1412
1413     @SuppressWarnings("serial")
1414     private static final class DelegatingShardCreator implements Creator<Shard> {
1415         private final Creator<Shard> delegate;
1416
1417         DelegatingShardCreator(final Creator<Shard> delegate) {
1418             this.delegate = delegate;
1419         }
1420
1421         @Override
1422         public Shard create() throws Exception {
1423             return delegate.create();
1424         }
1425     }
1426 }