Bug 2265: Use new NormalizedNode streaming in messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.CheckedFuture;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.mockito.InOrder;
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
60 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
61 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
62 import org.opendaylight.controller.cluster.datastore.modification.Modification;
63 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
64 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
65 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
66 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
67 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
68 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
69 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
71 import org.opendaylight.controller.cluster.raft.Snapshot;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
74 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
75 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
76 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
78 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
79 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
80 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
81 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
82 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
83 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
84 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
85 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
86 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
87 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
88 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
89 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
90 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
91 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
92 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
93 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
95 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
97 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
98 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
99 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
100 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
101 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
102 import scala.concurrent.Await;
103 import scala.concurrent.Future;
104 import scala.concurrent.duration.FiniteDuration;
105
106 public class ShardTest extends AbstractActorTest {
107
108     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
109
110     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
111
112     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
113             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
114
115     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
116             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
117             shardHeartbeatIntervalInMillis(100);
118
119     @Before
120     public void setUp() {
121         Builder newBuilder = DatastoreContext.newBuilder();
122         InMemorySnapshotStore.clear();
123         InMemoryJournal.clear();
124     }
125
126     @After
127     public void tearDown() {
128         InMemorySnapshotStore.clear();
129         InMemoryJournal.clear();
130     }
131
132     private DatastoreContext newDatastoreContext() {
133         return dataStoreContextBuilder.build();
134     }
135
136     private Props newShardProps() {
137         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
138                 newDatastoreContext(), SCHEMA_CONTEXT);
139     }
140
141     @Test
142     public void testRegisterChangeListener() throws Exception {
143         new ShardTestKit(getSystem()) {{
144             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
145                     newShardProps(),  "testRegisterChangeListener");
146
147             waitUntilLeader(shard);
148
149             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
150
151             MockDataChangeListener listener = new MockDataChangeListener(1);
152             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
153                     "testRegisterChangeListener-DataChangeListener");
154
155             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
156                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
157
158             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
159                     RegisterChangeListenerReply.class);
160             String replyPath = reply.getListenerRegistrationPath().toString();
161             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
162                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
163
164             YangInstanceIdentifier path = TestModel.TEST_PATH;
165             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
166
167             listener.waitForChangeEvents(path);
168
169             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
170             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
171         }};
172     }
173
174     @SuppressWarnings("serial")
175     @Test
176     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
177         // This test tests the timing window in which a change listener is registered before the
178         // shard becomes the leader. We verify that the listener is registered and notified of the
179         // existing data when the shard becomes the leader.
180         new ShardTestKit(getSystem()) {{
181             // For this test, we want to send the RegisterChangeListener message after the shard
182             // has recovered from persistence and before it becomes the leader. So we subclass
183             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
184             // we know that the shard has been initialized to a follower and has started the
185             // election process. The following 2 CountDownLatches are used to coordinate the
186             // ElectionTimeout with the sending of the RegisterChangeListener message.
187             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
188             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
189             Creator<Shard> creator = new Creator<Shard>() {
190                 boolean firstElectionTimeout = true;
191
192                 @Override
193                 public Shard create() throws Exception {
194                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
195                             newDatastoreContext(), SCHEMA_CONTEXT) {
196                         @Override
197                         public void onReceiveCommand(final Object message) throws Exception {
198                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
199                                 // Got the first ElectionTimeout. We don't forward it to the
200                                 // base Shard yet until we've sent the RegisterChangeListener
201                                 // message. So we signal the onFirstElectionTimeout latch to tell
202                                 // the main thread to send the RegisterChangeListener message and
203                                 // start a thread to wait on the onChangeListenerRegistered latch,
204                                 // which the main thread signals after it has sent the message.
205                                 // After the onChangeListenerRegistered is triggered, we send the
206                                 // original ElectionTimeout message to proceed with the election.
207                                 firstElectionTimeout = false;
208                                 final ActorRef self = getSelf();
209                                 new Thread() {
210                                     @Override
211                                     public void run() {
212                                         Uninterruptibles.awaitUninterruptibly(
213                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
214                                         self.tell(message, self);
215                                     }
216                                 }.start();
217
218                                 onFirstElectionTimeout.countDown();
219                             } else {
220                                 super.onReceiveCommand(message);
221                             }
222                         }
223                     };
224                 }
225             };
226
227             MockDataChangeListener listener = new MockDataChangeListener(1);
228             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
229                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
230
231             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
232                     Props.create(new DelegatingShardCreator(creator)),
233                     "testRegisterChangeListenerWhenNotLeaderInitially");
234
235             // Write initial data into the in-memory store.
236             YangInstanceIdentifier path = TestModel.TEST_PATH;
237             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
238
239             // Wait until the shard receives the first ElectionTimeout message.
240             assertEquals("Got first ElectionTimeout", true,
241                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
242
243             // Now send the RegisterChangeListener and wait for the reply.
244             shard.tell(new RegisterChangeListener(path, dclActor.path(),
245                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
246
247             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
248                     RegisterChangeListenerReply.class);
249             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
250
251             // Sanity check - verify the shard is not the leader yet.
252             shard.tell(new FindLeader(), getRef());
253             FindLeaderReply findLeadeReply =
254                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
255             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
256
257             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
258             // with the election process.
259             onChangeListenerRegistered.countDown();
260
261             // Wait for the shard to become the leader and notify our listener with the existing
262             // data in the store.
263             listener.waitForChangeEvents(path);
264
265             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
266             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
267         }};
268     }
269
270     @Test
271     public void testCreateTransaction(){
272         new ShardTestKit(getSystem()) {{
273             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
274
275             waitUntilLeader(shard);
276
277             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
278
279             shard.tell(new CreateTransaction("txn-1",
280                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
281
282             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
283                     CreateTransactionReply.class);
284
285             String path = reply.getTransactionActorPath().toString();
286             assertTrue("Unexpected transaction path " + path,
287                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
288
289             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
290         }};
291     }
292
293     @Test
294     public void testCreateTransactionOnChain(){
295         new ShardTestKit(getSystem()) {{
296             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
297
298             waitUntilLeader(shard);
299
300             shard.tell(new CreateTransaction("txn-1",
301                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
302                     getRef());
303
304             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
305                     CreateTransactionReply.class);
306
307             String path = reply.getTransactionActorPath().toString();
308             assertTrue("Unexpected transaction path " + path,
309                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
310
311             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
312         }};
313     }
314
315     @SuppressWarnings("serial")
316     @Test
317     public void testPeerAddressResolved() throws Exception {
318         new ShardTestKit(getSystem()) {{
319             final CountDownLatch recoveryComplete = new CountDownLatch(1);
320             class TestShard extends Shard {
321                 TestShard() {
322                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
323                             newDatastoreContext(), SCHEMA_CONTEXT);
324                 }
325
326                 Map<String, String> getPeerAddresses() {
327                     return getRaftActorContext().getPeerAddresses();
328                 }
329
330                 @Override
331                 protected void onRecoveryComplete() {
332                     try {
333                         super.onRecoveryComplete();
334                     } finally {
335                         recoveryComplete.countDown();
336                     }
337                 }
338             }
339
340             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
341                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
342                         @Override
343                         public TestShard create() throws Exception {
344                             return new TestShard();
345                         }
346                     })), "testPeerAddressResolved");
347
348             //waitUntilLeader(shard);
349             assertEquals("Recovery complete", true,
350                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
351
352             String address = "akka://foobar";
353             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
354
355             assertEquals("getPeerAddresses", address,
356                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
357
358             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
359         }};
360     }
361
362     @Test
363     public void testApplySnapshot() throws Exception {
364         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
365                 "testApplySnapshot");
366
367         NormalizedNodeToNodeCodec codec =
368             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
369
370         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
371
372         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
373         NormalizedNode<?,?> expected = readStore(shard, root);
374
375         NormalizedNodeMessages.Container encode = codec.encode(expected);
376
377         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
378                 encode.getNormalizedNode().toByteString().toByteArray(),
379                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
380
381         shard.underlyingActor().onReceiveCommand(applySnapshot);
382
383         NormalizedNode<?,?> actual = readStore(shard, root);
384
385         assertEquals(expected, actual);
386
387         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
388     }
389
390     @Test
391     public void testApplyState() throws Exception {
392
393         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
394
395         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
396
397         MutableCompositeModification compMod = new MutableCompositeModification();
398         compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
399         Payload payload = new CompositeModificationPayload(compMod.toSerializable());
400         ApplyState applyState = new ApplyState(null, "test",
401                 new ReplicatedLogImplEntry(1, 2, payload));
402
403         shard.underlyingActor().onReceiveCommand(applyState);
404
405         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
406         assertEquals("Applied state", node, actual);
407
408         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
409     }
410
411     @SuppressWarnings("serial")
412     @Test
413     public void testRecovery() throws Exception {
414
415         // Set up the InMemorySnapshotStore.
416
417         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
418         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
419
420         DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
421         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
422         DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
423         commitCohort.preCommit().get();
424         commitCohort.commit().get();
425
426         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
427         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
428
429         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
430                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
431                         root).
432                                 getNormalizedNode().toByteString().toByteArray(),
433                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
434
435         // Set up the InMemoryJournal.
436
437         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
438                   new WriteModification(TestModel.OUTER_LIST_PATH,
439                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
440                           SCHEMA_CONTEXT))));
441
442         int nListEntries = 16;
443         Set<Integer> listEntryKeys = new HashSet<>();
444         for(int i = 1; i <= nListEntries-5; i++) {
445             listEntryKeys.add(Integer.valueOf(i));
446             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
447                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
448             Modification mod = new MergeModification(path,
449                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
450                     SCHEMA_CONTEXT);
451             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
452                     newPayload(mod)));
453         }
454
455         // Add some of the new CompositeModificationByteStringPayload
456         for(int i = 11; i <= nListEntries; i++) {
457             listEntryKeys.add(Integer.valueOf(i));
458             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
459                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
460             Modification mod = new MergeModification(path,
461                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
462                     SCHEMA_CONTEXT);
463             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
464                     newByteStringPayload(mod)));
465         }
466
467
468         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
469                 new ApplyLogEntries(nListEntries));
470
471         // Create the actor and wait for recovery complete.
472
473         final CountDownLatch recoveryComplete = new CountDownLatch(1);
474
475         Creator<Shard> creator = new Creator<Shard>() {
476             @Override
477             public Shard create() throws Exception {
478                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
479                         newDatastoreContext(), SCHEMA_CONTEXT) {
480                     @Override
481                     protected void onRecoveryComplete() {
482                         try {
483                             super.onRecoveryComplete();
484                         } finally {
485                             recoveryComplete.countDown();
486                         }
487                     }
488                 };
489             }
490         };
491
492         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
493                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
494
495         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
496
497         // Verify data in the data store.
498
499         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
500         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
501         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
502                 outerList.getValue() instanceof Iterable);
503         for(Object entry: (Iterable<?>) outerList.getValue()) {
504             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
505                     entry instanceof MapEntryNode);
506             MapEntryNode mapEntry = (MapEntryNode)entry;
507             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
508                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
509             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
510             Object value = idLeaf.get().getValue();
511             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
512                     listEntryKeys.remove(value));
513         }
514
515         if(!listEntryKeys.isEmpty()) {
516             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
517                     listEntryKeys);
518         }
519
520         assertEquals("Last log index", nListEntries,
521                 shard.underlyingActor().getShardMBean().getLastLogIndex());
522         assertEquals("Commit index", nListEntries,
523                 shard.underlyingActor().getShardMBean().getCommitIndex());
524         assertEquals("Last applied", nListEntries,
525                 shard.underlyingActor().getShardMBean().getLastApplied());
526
527         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
528     }
529
530     private CompositeModificationPayload newPayload(final Modification... mods) {
531         MutableCompositeModification compMod = new MutableCompositeModification();
532         for(Modification mod: mods) {
533             compMod.addModification(mod);
534         }
535
536         return new CompositeModificationPayload(compMod.toSerializable());
537     }
538
539     private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
540         MutableCompositeModification compMod = new MutableCompositeModification();
541         for(Modification mod: mods) {
542             compMod.addModification(mod);
543         }
544
545         return new CompositeModificationByteStringPayload(compMod.toSerializable());
546     }
547
548
549     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
550             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
551             final MutableCompositeModification modification) {
552         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
553     }
554
555     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
556             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
557             final MutableCompositeModification modification,
558             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
559
560         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
561         tx.write(path, data);
562         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
563         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
564
565         doAnswer(new Answer<ListenableFuture<Boolean>>() {
566             @Override
567             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
568                 return realCohort.canCommit();
569             }
570         }).when(cohort).canCommit();
571
572         doAnswer(new Answer<ListenableFuture<Void>>() {
573             @Override
574             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
575                 if(preCommit != null) {
576                     return preCommit.apply(realCohort);
577                 } else {
578                     return realCohort.preCommit();
579                 }
580             }
581         }).when(cohort).preCommit();
582
583         doAnswer(new Answer<ListenableFuture<Void>>() {
584             @Override
585             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
586                 return realCohort.commit();
587             }
588         }).when(cohort).commit();
589
590         doAnswer(new Answer<ListenableFuture<Void>>() {
591             @Override
592             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
593                 return realCohort.abort();
594             }
595         }).when(cohort).abort();
596
597         modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
598
599         return cohort;
600     }
601
602     @SuppressWarnings({ "unchecked" })
603     @Test
604     public void testConcurrentThreePhaseCommits() throws Throwable {
605         new ShardTestKit(getSystem()) {{
606             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
607                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
608                     "testConcurrentThreePhaseCommits");
609
610             waitUntilLeader(shard);
611
612             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
613
614             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
615
616             String transactionID1 = "tx1";
617             MutableCompositeModification modification1 = new MutableCompositeModification();
618             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
619                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
620
621             String transactionID2 = "tx2";
622             MutableCompositeModification modification2 = new MutableCompositeModification();
623             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
624                     TestModel.OUTER_LIST_PATH,
625                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
626                     modification2);
627
628             String transactionID3 = "tx3";
629             MutableCompositeModification modification3 = new MutableCompositeModification();
630             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
631                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
632                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
633                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
634                     modification3);
635
636             long timeoutSec = 5;
637             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
638             final Timeout timeout = new Timeout(duration);
639
640             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
641             // by the ShardTransaction.
642
643             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
644                     cohort1, modification1, true), getRef());
645             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
646                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
647             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
648
649             // Send the CanCommitTransaction message for the first Tx.
650
651             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
652             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
653                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
654             assertEquals("Can commit", true, canCommitReply.getCanCommit());
655
656             // Send the ForwardedReadyTransaction for the next 2 Tx's.
657
658             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
659                     cohort2, modification2, true), getRef());
660             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
661
662             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
663                     cohort3, modification3, true), getRef());
664             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
665
666             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
667             // processed after the first Tx completes.
668
669             Future<Object> canCommitFuture1 = Patterns.ask(shard,
670                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
671
672             Future<Object> canCommitFuture2 = Patterns.ask(shard,
673                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
674
675             // Send the CommitTransaction message for the first Tx. After it completes, it should
676             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
677
678             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
679             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
680
681             // Wait for the next 2 Tx's to complete.
682
683             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
684             final CountDownLatch commitLatch = new CountDownLatch(2);
685
686             class OnFutureComplete extends OnComplete<Object> {
687                 private final Class<?> expRespType;
688
689                 OnFutureComplete(final Class<?> expRespType) {
690                     this.expRespType = expRespType;
691                 }
692
693                 @Override
694                 public void onComplete(final Throwable error, final Object resp) {
695                     if(error != null) {
696                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
697                     } else {
698                         try {
699                             assertEquals("Commit response type", expRespType, resp.getClass());
700                             onSuccess(resp);
701                         } catch (Exception e) {
702                             caughtEx.set(e);
703                         }
704                     }
705                 }
706
707                 void onSuccess(final Object resp) throws Exception {
708                 }
709             }
710
711             class OnCommitFutureComplete extends OnFutureComplete {
712                 OnCommitFutureComplete() {
713                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
714                 }
715
716                 @Override
717                 public void onComplete(final Throwable error, final Object resp) {
718                     super.onComplete(error, resp);
719                     commitLatch.countDown();
720                 }
721             }
722
723             class OnCanCommitFutureComplete extends OnFutureComplete {
724                 private final String transactionID;
725
726                 OnCanCommitFutureComplete(final String transactionID) {
727                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
728                     this.transactionID = transactionID;
729                 }
730
731                 @Override
732                 void onSuccess(final Object resp) throws Exception {
733                     CanCommitTransactionReply canCommitReply =
734                             CanCommitTransactionReply.fromSerializable(resp);
735                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
736
737                     Future<Object> commitFuture = Patterns.ask(shard,
738                             new CommitTransaction(transactionID).toSerializable(), timeout);
739                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
740                 }
741             }
742
743             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
744                     getSystem().dispatcher());
745
746             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
747                     getSystem().dispatcher());
748
749             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
750
751             if(caughtEx.get() != null) {
752                 throw caughtEx.get();
753             }
754
755             assertEquals("Commits complete", true, done);
756
757             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
758             inOrder.verify(cohort1).canCommit();
759             inOrder.verify(cohort1).preCommit();
760             inOrder.verify(cohort1).commit();
761             inOrder.verify(cohort2).canCommit();
762             inOrder.verify(cohort2).preCommit();
763             inOrder.verify(cohort2).commit();
764             inOrder.verify(cohort3).canCommit();
765             inOrder.verify(cohort3).preCommit();
766             inOrder.verify(cohort3).commit();
767
768             // Verify data in the data store.
769
770             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
771             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
772             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
773                     outerList.getValue() instanceof Iterable);
774             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
775             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
776                        entry instanceof MapEntryNode);
777             MapEntryNode mapEntry = (MapEntryNode)entry;
778             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
779                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
780             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
781             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
782
783             verifyLastLogIndex(shard, 2);
784
785             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
786         }};
787     }
788
789     private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
790         for(int i = 0; i < 20 * 5; i++) {
791             long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
792             if(lastLogIndex == expectedValue) {
793                 break;
794             }
795             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
796         }
797
798         assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
799     }
800
801     @Test
802     public void testCommitWithPersistenceDisabled() throws Throwable {
803         dataStoreContextBuilder.persistent(false);
804         new ShardTestKit(getSystem()) {{
805             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
806                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
807                     "testCommitPhaseFailure");
808
809             waitUntilLeader(shard);
810
811             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
812
813             // Setup a simulated transactions with a mock cohort.
814
815             String transactionID = "tx";
816             MutableCompositeModification modification = new MutableCompositeModification();
817             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
818             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
819                     TestModel.TEST_PATH, containerNode, modification);
820
821             FiniteDuration duration = duration("5 seconds");
822
823             // Simulate the ForwardedReadyTransaction messages that would be sent
824             // by the ShardTransaction.
825
826             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
827                     cohort, modification, true), getRef());
828             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
829
830             // Send the CanCommitTransaction message.
831
832             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
833             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
834                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
835             assertEquals("Can commit", true, canCommitReply.getCanCommit());
836
837             // Send the CanCommitTransaction message.
838
839             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
840             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
841
842             InOrder inOrder = inOrder(cohort);
843             inOrder.verify(cohort).canCommit();
844             inOrder.verify(cohort).preCommit();
845             inOrder.verify(cohort).commit();
846
847             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
848             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
849
850             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
851         }};
852     }
853
854     @Test
855     public void testCommitPhaseFailure() throws Throwable {
856         new ShardTestKit(getSystem()) {{
857             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
858                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
859                     "testCommitPhaseFailure");
860
861             waitUntilLeader(shard);
862
863             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
864             // commit phase.
865
866             String transactionID1 = "tx1";
867             MutableCompositeModification modification1 = new MutableCompositeModification();
868             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
869             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
870             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
871             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
872
873             String transactionID2 = "tx2";
874             MutableCompositeModification modification2 = new MutableCompositeModification();
875             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
876             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
877
878             FiniteDuration duration = duration("5 seconds");
879             final Timeout timeout = new Timeout(duration);
880
881             // Simulate the ForwardedReadyTransaction messages that would be sent
882             // by the ShardTransaction.
883
884             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
885                     cohort1, modification1, true), getRef());
886             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
887
888             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
889                     cohort2, modification2, true), getRef());
890             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
891
892             // Send the CanCommitTransaction message for the first Tx.
893
894             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
895             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
896                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
897             assertEquals("Can commit", true, canCommitReply.getCanCommit());
898
899             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
900             // processed after the first Tx completes.
901
902             Future<Object> canCommitFuture = Patterns.ask(shard,
903                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
904
905             // Send the CommitTransaction message for the first Tx. This should send back an error
906             // and trigger the 2nd Tx to proceed.
907
908             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
909             expectMsgClass(duration, akka.actor.Status.Failure.class);
910
911             // Wait for the 2nd Tx to complete the canCommit phase.
912
913             final CountDownLatch latch = new CountDownLatch(1);
914             canCommitFuture.onComplete(new OnComplete<Object>() {
915                 @Override
916                 public void onComplete(final Throwable t, final Object resp) {
917                     latch.countDown();
918                 }
919             }, getSystem().dispatcher());
920
921             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
922
923             InOrder inOrder = inOrder(cohort1, cohort2);
924             inOrder.verify(cohort1).canCommit();
925             inOrder.verify(cohort1).preCommit();
926             inOrder.verify(cohort1).commit();
927             inOrder.verify(cohort2).canCommit();
928
929             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
930         }};
931     }
932
933     @Test
934     public void testPreCommitPhaseFailure() throws Throwable {
935         new ShardTestKit(getSystem()) {{
936             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
937                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
938                     "testPreCommitPhaseFailure");
939
940             waitUntilLeader(shard);
941
942             String transactionID = "tx1";
943             MutableCompositeModification modification = new MutableCompositeModification();
944             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
945             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
946             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
947
948             FiniteDuration duration = duration("5 seconds");
949
950             // Simulate the ForwardedReadyTransaction messages that would be sent
951             // by the ShardTransaction.
952
953             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
954                     cohort, modification, true), getRef());
955             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
956
957             // Send the CanCommitTransaction message.
958
959             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
960             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
961                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
962             assertEquals("Can commit", true, canCommitReply.getCanCommit());
963
964             // Send the CommitTransaction message. This should send back an error
965             // for preCommit failure.
966
967             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
968             expectMsgClass(duration, akka.actor.Status.Failure.class);
969
970             InOrder inOrder = inOrder(cohort);
971             inOrder.verify(cohort).canCommit();
972             inOrder.verify(cohort).preCommit();
973
974             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
975         }};
976     }
977
978     @Test
979     public void testCanCommitPhaseFailure() throws Throwable {
980         new ShardTestKit(getSystem()) {{
981             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
982                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
983                     "testCanCommitPhaseFailure");
984
985             waitUntilLeader(shard);
986
987             final FiniteDuration duration = duration("5 seconds");
988
989             String transactionID = "tx1";
990             MutableCompositeModification modification = new MutableCompositeModification();
991             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
992             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
993
994             // Simulate the ForwardedReadyTransaction messages that would be sent
995             // by the ShardTransaction.
996
997             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
998                     cohort, modification, true), getRef());
999             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1000
1001             // Send the CanCommitTransaction message.
1002
1003             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1004             expectMsgClass(duration, akka.actor.Status.Failure.class);
1005
1006             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1007         }};
1008     }
1009
1010     @Test
1011     public void testAbortBeforeFinishCommit() throws Throwable {
1012         new ShardTestKit(getSystem()) {{
1013             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1014                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1015                     "testAbortBeforeFinishCommit");
1016
1017             waitUntilLeader(shard);
1018
1019             final FiniteDuration duration = duration("5 seconds");
1020             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1021
1022             final String transactionID = "tx1";
1023             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1024                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1025                 @Override
1026                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1027                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1028
1029                     // Simulate an AbortTransaction message occurring during replication, after
1030                     // persisting and before finishing the commit to the in-memory store.
1031                     // We have no followers so due to optimizations in the RaftActor, it does not
1032                     // attempt replication and thus we can't send an AbortTransaction message b/c
1033                     // it would be processed too late after CommitTransaction completes. So we'll
1034                     // simulate an AbortTransaction message occurring during replication by calling
1035                     // the shard directly.
1036                     //
1037                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1038
1039                     return preCommitFuture;
1040                 }
1041             };
1042
1043             MutableCompositeModification modification = new MutableCompositeModification();
1044             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1045                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1046                     modification, preCommit);
1047
1048             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1049                     cohort, modification, true), getRef());
1050             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1051
1052             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1053             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1054                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1055             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1056
1057             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1058             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1059
1060             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1061
1062             // Since we're simulating an abort occurring during replication and before finish commit,
1063             // the data should still get written to the in-memory store since we've gotten past
1064             // canCommit and preCommit and persisted the data.
1065             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1066
1067             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1068         }};
1069     }
1070
1071     @Test
1072     public void testTransactionCommitTimeout() throws Throwable {
1073         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1074
1075         new ShardTestKit(getSystem()) {{
1076             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1077                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1078                     "testTransactionCommitTimeout");
1079
1080             waitUntilLeader(shard);
1081
1082             final FiniteDuration duration = duration("5 seconds");
1083
1084             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1085
1086             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1087             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1088                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1089
1090             // Create 1st Tx - will timeout
1091
1092             String transactionID1 = "tx1";
1093             MutableCompositeModification modification1 = new MutableCompositeModification();
1094             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1095                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1096                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1097                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1098                     modification1);
1099
1100             // Create 2nd Tx
1101
1102             String transactionID2 = "tx3";
1103             MutableCompositeModification modification2 = new MutableCompositeModification();
1104             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1105                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1106             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1107                     listNodePath,
1108                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1109                     modification2);
1110
1111             // Ready the Tx's
1112
1113             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1114                     cohort1, modification1, true), getRef());
1115             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1116
1117             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1118                     cohort2, modification2, true), getRef());
1119             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1120
1121             // canCommit 1st Tx. We don't send the commit so it should timeout.
1122
1123             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1124             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1125
1126             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1127
1128             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1129             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1130
1131             // Commit the 2nd Tx.
1132
1133             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1134             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1135
1136             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1137             assertNotNull(listNodePath + " not found", node);
1138
1139             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1140         }};
1141     }
1142
1143     @Test
1144     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1145         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1146
1147         new ShardTestKit(getSystem()) {{
1148             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1149                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1150                     "testTransactionCommitQueueCapacityExceeded");
1151
1152             waitUntilLeader(shard);
1153
1154             final FiniteDuration duration = duration("5 seconds");
1155
1156             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1157
1158             String transactionID1 = "tx1";
1159             MutableCompositeModification modification1 = new MutableCompositeModification();
1160             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1161                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1162
1163             String transactionID2 = "tx2";
1164             MutableCompositeModification modification2 = new MutableCompositeModification();
1165             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1166                     TestModel.OUTER_LIST_PATH,
1167                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1168                     modification2);
1169
1170             String transactionID3 = "tx3";
1171             MutableCompositeModification modification3 = new MutableCompositeModification();
1172             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1173                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1174
1175             // Ready the Tx's
1176
1177             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1178                     cohort1, modification1, true), getRef());
1179             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1180
1181             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1182                     cohort2, modification2, true), getRef());
1183             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1184
1185             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1186                     cohort3, modification3, true), getRef());
1187             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1188
1189             // canCommit 1st Tx.
1190
1191             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1192             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1193
1194             // canCommit the 2nd Tx - it should get queued.
1195
1196             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1197
1198             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1199
1200             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1201             expectMsgClass(duration, akka.actor.Status.Failure.class);
1202
1203             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1204         }};
1205     }
1206
1207     @Test
1208     public void testCanCommitBeforeReadyFailure() throws Throwable {
1209         new ShardTestKit(getSystem()) {{
1210             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1211                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1212                     "testCanCommitBeforeReadyFailure");
1213
1214             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1215             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1216
1217             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1218         }};
1219     }
1220
1221     @Test
1222     public void testAbortTransaction() throws Throwable {
1223         new ShardTestKit(getSystem()) {{
1224             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1225                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1226                     "testAbortTransaction");
1227
1228             waitUntilLeader(shard);
1229
1230             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1231
1232             String transactionID1 = "tx1";
1233             MutableCompositeModification modification1 = new MutableCompositeModification();
1234             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1235             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1236             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1237
1238             String transactionID2 = "tx2";
1239             MutableCompositeModification modification2 = new MutableCompositeModification();
1240             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1241             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1242
1243             FiniteDuration duration = duration("5 seconds");
1244             final Timeout timeout = new Timeout(duration);
1245
1246             // Simulate the ForwardedReadyTransaction messages that would be sent
1247             // by the ShardTransaction.
1248
1249             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1250                     cohort1, modification1, true), getRef());
1251             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1252
1253             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1254                     cohort2, modification2, true), getRef());
1255             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1256
1257             // Send the CanCommitTransaction message for the first Tx.
1258
1259             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1260             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1261                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1262             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1263
1264             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1265             // processed after the first Tx completes.
1266
1267             Future<Object> canCommitFuture = Patterns.ask(shard,
1268                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1269
1270             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1271             // Tx to proceed.
1272
1273             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1274             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1275
1276             // Wait for the 2nd Tx to complete the canCommit phase.
1277
1278             Await.ready(canCommitFuture, duration);
1279
1280             InOrder inOrder = inOrder(cohort1, cohort2);
1281             inOrder.verify(cohort1).canCommit();
1282             inOrder.verify(cohort2).canCommit();
1283
1284             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1285         }};
1286     }
1287
1288     @Test
1289     public void testCreateSnapshot() throws IOException, InterruptedException {
1290             testCreateSnapshot(true, "testCreateSnapshot");
1291     }
1292
1293     @Test
1294     public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1295         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1296     }
1297
1298     @SuppressWarnings("serial")
1299     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
1300         final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1301                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1302
1303         new ShardTestKit(getSystem()) {{
1304             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1305             Creator<Shard> creator = new Creator<Shard>() {
1306                 @Override
1307                 public Shard create() throws Exception {
1308                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1309                             newDatastoreContext(), SCHEMA_CONTEXT) {
1310                         @Override
1311                         protected void commitSnapshot(final long sequenceNumber) {
1312                             super.commitSnapshot(sequenceNumber);
1313                             latch.get().countDown();
1314                         }
1315                     };
1316                 }
1317             };
1318
1319             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1320                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1321
1322             waitUntilLeader(shard);
1323
1324             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1325
1326             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1327
1328             latch.set(new CountDownLatch(1));
1329             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1330
1331             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1332
1333             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1334         }};
1335     }
1336
1337     /**
1338      * This test simply verifies that the applySnapShot logic will work
1339      * @throws ReadFailedException
1340      */
1341     @Test
1342     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1343         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1344
1345         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1346
1347         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1348         putTransaction.write(TestModel.TEST_PATH,
1349             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1350         commitTransaction(putTransaction);
1351
1352
1353         NormalizedNode<?, ?> expected = readStore(store);
1354
1355         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1356
1357         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1358         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1359
1360         commitTransaction(writeTransaction);
1361
1362         NormalizedNode<?, ?> actual = readStore(store);
1363
1364         assertEquals(expected, actual);
1365     }
1366
1367     @Test
1368     public void testRecoveryApplicable(){
1369
1370         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1371                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1372
1373         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1374                 persistentContext, SCHEMA_CONTEXT);
1375
1376         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1377                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1378
1379         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1380                 nonPersistentContext, SCHEMA_CONTEXT);
1381
1382         new ShardTestKit(getSystem()) {{
1383             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1384                     persistentProps, "testPersistence1");
1385
1386             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1387
1388             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1389
1390             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1391                     nonPersistentProps, "testPersistence2");
1392
1393             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1394
1395             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1396
1397         }};
1398
1399     }
1400
1401
1402     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1403         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1404         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1405             transaction.read(YangInstanceIdentifier.builder().build());
1406
1407         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1408
1409         NormalizedNode<?, ?> normalizedNode = optional.get();
1410
1411         transaction.close();
1412
1413         return normalizedNode;
1414     }
1415
1416     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1417         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1418         ListenableFuture<Void> future =
1419             commitCohort.preCommit();
1420         try {
1421             future.get();
1422             future = commitCohort.commit();
1423             future.get();
1424         } catch (InterruptedException | ExecutionException e) {
1425         }
1426     }
1427
1428     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1429         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1430             @Override
1431             public void onDataChanged(
1432                 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1433
1434             }
1435         };
1436     }
1437
1438     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1439             throws ExecutionException, InterruptedException {
1440         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1441
1442         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1443             transaction.read(id);
1444
1445         Optional<NormalizedNode<?, ?>> optional = future.get();
1446         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1447
1448         transaction.close();
1449
1450         return node;
1451     }
1452
1453     private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
1454         throws ExecutionException, InterruptedException {
1455         DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1456
1457         transaction.write(id, node);
1458
1459         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1460         commitCohort.preCommit().get();
1461         commitCohort.commit().get();
1462     }
1463
1464     @SuppressWarnings("serial")
1465     private static final class DelegatingShardCreator implements Creator<Shard> {
1466         private final Creator<Shard> delegate;
1467
1468         DelegatingShardCreator(final Creator<Shard> delegate) {
1469             this.delegate = delegate;
1470         }
1471
1472         @Override
1473         public Shard create() throws Exception {
1474             return delegate.create();
1475         }
1476     }
1477 }