Bug 2268: Use streaming for Modification payload
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.CheckedFuture;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.mockito.InOrder;
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
60 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
61 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
62 import org.opendaylight.controller.cluster.datastore.modification.Modification;
63 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
64 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
65 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
66 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
67 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
68 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
69 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
70 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
71 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
72 import org.opendaylight.controller.cluster.raft.Snapshot;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
74 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
75 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
76 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
77 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
78 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
79 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
80 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
81 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
82 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
83 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
84 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
85 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
86 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
87 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
88 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
89 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
90 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
91 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
92 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
93 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
95 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
97 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
98 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
99 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
100 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
101 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
102 import scala.concurrent.Await;
103 import scala.concurrent.Future;
104 import scala.concurrent.duration.FiniteDuration;
105
106 public class ShardTest extends AbstractActorTest {
107
108     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
109
110     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
111
112     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
113             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
114
115     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
116             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
117             shardHeartbeatIntervalInMillis(100);
118
119     @Before
120     public void setUp() {
121         Builder newBuilder = DatastoreContext.newBuilder();
122         InMemorySnapshotStore.clear();
123         InMemoryJournal.clear();
124     }
125
126     @After
127     public void tearDown() {
128         InMemorySnapshotStore.clear();
129         InMemoryJournal.clear();
130     }
131
132     private DatastoreContext newDatastoreContext() {
133         return dataStoreContextBuilder.build();
134     }
135
136     private Props newShardProps() {
137         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
138                 newDatastoreContext(), SCHEMA_CONTEXT);
139     }
140
141     @Test
142     public void testRegisterChangeListener() throws Exception {
143         new ShardTestKit(getSystem()) {{
144             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
145                     newShardProps(),  "testRegisterChangeListener");
146
147             waitUntilLeader(shard);
148
149             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
150
151             MockDataChangeListener listener = new MockDataChangeListener(1);
152             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
153                     "testRegisterChangeListener-DataChangeListener");
154
155             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
156                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
157
158             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
159                     RegisterChangeListenerReply.class);
160             String replyPath = reply.getListenerRegistrationPath().toString();
161             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
162                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
163
164             YangInstanceIdentifier path = TestModel.TEST_PATH;
165             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
166
167             listener.waitForChangeEvents(path);
168
169             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
170             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
171         }};
172     }
173
174     @SuppressWarnings("serial")
175     @Test
176     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
177         // This test tests the timing window in which a change listener is registered before the
178         // shard becomes the leader. We verify that the listener is registered and notified of the
179         // existing data when the shard becomes the leader.
180         new ShardTestKit(getSystem()) {{
181             // For this test, we want to send the RegisterChangeListener message after the shard
182             // has recovered from persistence and before it becomes the leader. So we subclass
183             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
184             // we know that the shard has been initialized to a follower and has started the
185             // election process. The following 2 CountDownLatches are used to coordinate the
186             // ElectionTimeout with the sending of the RegisterChangeListener message.
187             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
188             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
189             Creator<Shard> creator = new Creator<Shard>() {
190                 boolean firstElectionTimeout = true;
191
192                 @Override
193                 public Shard create() throws Exception {
194                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
195                             newDatastoreContext(), SCHEMA_CONTEXT) {
196                         @Override
197                         public void onReceiveCommand(final Object message) throws Exception {
198                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
199                                 // Got the first ElectionTimeout. We don't forward it to the
200                                 // base Shard yet until we've sent the RegisterChangeListener
201                                 // message. So we signal the onFirstElectionTimeout latch to tell
202                                 // the main thread to send the RegisterChangeListener message and
203                                 // start a thread to wait on the onChangeListenerRegistered latch,
204                                 // which the main thread signals after it has sent the message.
205                                 // After the onChangeListenerRegistered is triggered, we send the
206                                 // original ElectionTimeout message to proceed with the election.
207                                 firstElectionTimeout = false;
208                                 final ActorRef self = getSelf();
209                                 new Thread() {
210                                     @Override
211                                     public void run() {
212                                         Uninterruptibles.awaitUninterruptibly(
213                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
214                                         self.tell(message, self);
215                                     }
216                                 }.start();
217
218                                 onFirstElectionTimeout.countDown();
219                             } else {
220                                 super.onReceiveCommand(message);
221                             }
222                         }
223                     };
224                 }
225             };
226
227             MockDataChangeListener listener = new MockDataChangeListener(1);
228             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
229                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
230
231             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
232                     Props.create(new DelegatingShardCreator(creator)),
233                     "testRegisterChangeListenerWhenNotLeaderInitially");
234
235             // Write initial data into the in-memory store.
236             YangInstanceIdentifier path = TestModel.TEST_PATH;
237             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
238
239             // Wait until the shard receives the first ElectionTimeout message.
240             assertEquals("Got first ElectionTimeout", true,
241                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
242
243             // Now send the RegisterChangeListener and wait for the reply.
244             shard.tell(new RegisterChangeListener(path, dclActor.path(),
245                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
246
247             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
248                     RegisterChangeListenerReply.class);
249             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
250
251             // Sanity check - verify the shard is not the leader yet.
252             shard.tell(new FindLeader(), getRef());
253             FindLeaderReply findLeadeReply =
254                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
255             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
256
257             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
258             // with the election process.
259             onChangeListenerRegistered.countDown();
260
261             // Wait for the shard to become the leader and notify our listener with the existing
262             // data in the store.
263             listener.waitForChangeEvents(path);
264
265             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
266             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
267         }};
268     }
269
270     @Test
271     public void testCreateTransaction(){
272         new ShardTestKit(getSystem()) {{
273             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
274
275             waitUntilLeader(shard);
276
277             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
278
279             shard.tell(new CreateTransaction("txn-1",
280                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
281
282             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
283                     CreateTransactionReply.class);
284
285             String path = reply.getTransactionActorPath().toString();
286             assertTrue("Unexpected transaction path " + path,
287                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
288
289             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
290         }};
291     }
292
293     @Test
294     public void testCreateTransactionOnChain(){
295         new ShardTestKit(getSystem()) {{
296             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
297
298             waitUntilLeader(shard);
299
300             shard.tell(new CreateTransaction("txn-1",
301                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
302                     getRef());
303
304             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
305                     CreateTransactionReply.class);
306
307             String path = reply.getTransactionActorPath().toString();
308             assertTrue("Unexpected transaction path " + path,
309                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
310
311             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
312         }};
313     }
314
315     @SuppressWarnings("serial")
316     @Test
317     public void testPeerAddressResolved() throws Exception {
318         new ShardTestKit(getSystem()) {{
319             final CountDownLatch recoveryComplete = new CountDownLatch(1);
320             class TestShard extends Shard {
321                 TestShard() {
322                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
323                             newDatastoreContext(), SCHEMA_CONTEXT);
324                 }
325
326                 Map<String, String> getPeerAddresses() {
327                     return getRaftActorContext().getPeerAddresses();
328                 }
329
330                 @Override
331                 protected void onRecoveryComplete() {
332                     try {
333                         super.onRecoveryComplete();
334                     } finally {
335                         recoveryComplete.countDown();
336                     }
337                 }
338             }
339
340             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
341                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
342                         @Override
343                         public TestShard create() throws Exception {
344                             return new TestShard();
345                         }
346                     })), "testPeerAddressResolved");
347
348             //waitUntilLeader(shard);
349             assertEquals("Recovery complete", true,
350                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
351
352             String address = "akka://foobar";
353             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
354
355             assertEquals("getPeerAddresses", address,
356                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
357
358             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
359         }};
360     }
361
362     @Test
363     public void testApplySnapshot() throws Exception {
364         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
365                 "testApplySnapshot");
366
367         NormalizedNodeToNodeCodec codec =
368             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
369
370         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
371
372         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
373         NormalizedNode<?,?> expected = readStore(shard, root);
374
375         NormalizedNodeMessages.Container encode = codec.encode(expected);
376
377         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
378                 encode.getNormalizedNode().toByteString().toByteArray(),
379                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
380
381         shard.underlyingActor().onReceiveCommand(applySnapshot);
382
383         NormalizedNode<?,?> actual = readStore(shard, root);
384
385         assertEquals(expected, actual);
386
387         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
388     }
389
390     @Test
391     public void testApplyState() throws Exception {
392
393         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
394
395         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
396
397         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
398                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
399
400         shard.underlyingActor().onReceiveCommand(applyState);
401
402         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
403         assertEquals("Applied state", node, actual);
404
405         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
406     }
407
408     @Test
409     public void testApplyStateLegacy() throws Exception {
410
411         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
412
413         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
414
415         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
416                 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
417
418         shard.underlyingActor().onReceiveCommand(applyState);
419
420         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
421         assertEquals("Applied state", node, actual);
422
423         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
424     }
425
426     @SuppressWarnings("serial")
427     @Test
428     public void testRecovery() throws Exception {
429
430         // Set up the InMemorySnapshotStore.
431
432         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
433         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
434
435         DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
436         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
437         DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
438         commitCohort.preCommit().get();
439         commitCohort.commit().get();
440
441         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
442         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
443
444         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
445                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
446                         root).
447                                 getNormalizedNode().toByteString().toByteArray(),
448                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
449
450         // Set up the InMemoryJournal.
451
452         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
453                   new WriteModification(TestModel.OUTER_LIST_PATH,
454                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
455
456         int nListEntries = 16;
457         Set<Integer> listEntryKeys = new HashSet<>();
458         int i = 1;
459
460         // Add some of the legacy CompositeModificationPayload
461         for(; i <= 2; i++) {
462             listEntryKeys.add(Integer.valueOf(i));
463             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
464                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
465             Modification mod = new MergeModification(path,
466                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
467             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
468                     newLegacyPayload(mod)));
469         }
470
471         // Add some of the legacy CompositeModificationByteStringPayload
472         for(; i <= 5; i++) {
473             listEntryKeys.add(Integer.valueOf(i));
474             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
475                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
476             Modification mod = new MergeModification(path,
477                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
478             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
479                     newLegacyByteStringPayload(mod)));
480         }
481
482         // Add some of the ModificationPayload
483         for(; i <= nListEntries; i++) {
484             listEntryKeys.add(Integer.valueOf(i));
485             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
486                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
487             Modification mod = new MergeModification(path,
488                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
489             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
490                     newModificationPayload(mod)));
491         }
492
493         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
494                 new ApplyLogEntries(nListEntries));
495
496         // Create the actor and wait for recovery complete.
497
498         final CountDownLatch recoveryComplete = new CountDownLatch(1);
499
500         Creator<Shard> creator = new Creator<Shard>() {
501             @Override
502             public Shard create() throws Exception {
503                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
504                         newDatastoreContext(), SCHEMA_CONTEXT) {
505                     @Override
506                     protected void onRecoveryComplete() {
507                         try {
508                             super.onRecoveryComplete();
509                         } finally {
510                             recoveryComplete.countDown();
511                         }
512                     }
513                 };
514             }
515         };
516
517         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
518                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
519
520         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
521
522         // Verify data in the data store.
523
524         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
525         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
526         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
527                 outerList.getValue() instanceof Iterable);
528         for(Object entry: (Iterable<?>) outerList.getValue()) {
529             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
530                     entry instanceof MapEntryNode);
531             MapEntryNode mapEntry = (MapEntryNode)entry;
532             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
533                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
534             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
535             Object value = idLeaf.get().getValue();
536             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
537                     listEntryKeys.remove(value));
538         }
539
540         if(!listEntryKeys.isEmpty()) {
541             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
542                     listEntryKeys);
543         }
544
545         assertEquals("Last log index", nListEntries,
546                 shard.underlyingActor().getShardMBean().getLastLogIndex());
547         assertEquals("Commit index", nListEntries,
548                 shard.underlyingActor().getShardMBean().getCommitIndex());
549         assertEquals("Last applied", nListEntries,
550                 shard.underlyingActor().getShardMBean().getLastApplied());
551
552         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
553     }
554
555     private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
556         MutableCompositeModification compMod = new MutableCompositeModification();
557         for(Modification mod: mods) {
558             compMod.addModification(mod);
559         }
560
561         return new CompositeModificationPayload(compMod.toSerializable());
562     }
563
564     private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
565         MutableCompositeModification compMod = new MutableCompositeModification();
566         for(Modification mod: mods) {
567             compMod.addModification(mod);
568         }
569
570         return new CompositeModificationByteStringPayload(compMod.toSerializable());
571     }
572
573     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
574         MutableCompositeModification compMod = new MutableCompositeModification();
575         for(Modification mod: mods) {
576             compMod.addModification(mod);
577         }
578
579         return new ModificationPayload(compMod);
580     }
581
582     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
583             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
584             final MutableCompositeModification modification) {
585         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
586     }
587
588     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
589             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
590             final MutableCompositeModification modification,
591             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
592
593         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
594         tx.write(path, data);
595         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
596         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
597
598         doAnswer(new Answer<ListenableFuture<Boolean>>() {
599             @Override
600             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
601                 return realCohort.canCommit();
602             }
603         }).when(cohort).canCommit();
604
605         doAnswer(new Answer<ListenableFuture<Void>>() {
606             @Override
607             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
608                 if(preCommit != null) {
609                     return preCommit.apply(realCohort);
610                 } else {
611                     return realCohort.preCommit();
612                 }
613             }
614         }).when(cohort).preCommit();
615
616         doAnswer(new Answer<ListenableFuture<Void>>() {
617             @Override
618             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
619                 return realCohort.commit();
620             }
621         }).when(cohort).commit();
622
623         doAnswer(new Answer<ListenableFuture<Void>>() {
624             @Override
625             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
626                 return realCohort.abort();
627             }
628         }).when(cohort).abort();
629
630         modification.addModification(new WriteModification(path, data));
631
632         return cohort;
633     }
634
635     @SuppressWarnings({ "unchecked" })
636     @Test
637     public void testConcurrentThreePhaseCommits() throws Throwable {
638         new ShardTestKit(getSystem()) {{
639             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
640                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
641                     "testConcurrentThreePhaseCommits");
642
643             waitUntilLeader(shard);
644
645             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
646
647             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
648
649             String transactionID1 = "tx1";
650             MutableCompositeModification modification1 = new MutableCompositeModification();
651             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
652                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
653
654             String transactionID2 = "tx2";
655             MutableCompositeModification modification2 = new MutableCompositeModification();
656             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
657                     TestModel.OUTER_LIST_PATH,
658                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
659                     modification2);
660
661             String transactionID3 = "tx3";
662             MutableCompositeModification modification3 = new MutableCompositeModification();
663             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
664                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
665                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
666                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
667                     modification3);
668
669             long timeoutSec = 5;
670             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
671             final Timeout timeout = new Timeout(duration);
672
673             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
674             // by the ShardTransaction.
675
676             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
677                     cohort1, modification1, true), getRef());
678             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
679                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
680             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
681
682             // Send the CanCommitTransaction message for the first Tx.
683
684             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
685             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
686                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
687             assertEquals("Can commit", true, canCommitReply.getCanCommit());
688
689             // Send the ForwardedReadyTransaction for the next 2 Tx's.
690
691             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
692                     cohort2, modification2, true), getRef());
693             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
694
695             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
696                     cohort3, modification3, true), getRef());
697             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
698
699             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
700             // processed after the first Tx completes.
701
702             Future<Object> canCommitFuture1 = Patterns.ask(shard,
703                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
704
705             Future<Object> canCommitFuture2 = Patterns.ask(shard,
706                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
707
708             // Send the CommitTransaction message for the first Tx. After it completes, it should
709             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
710
711             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
712             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
713
714             // Wait for the next 2 Tx's to complete.
715
716             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
717             final CountDownLatch commitLatch = new CountDownLatch(2);
718
719             class OnFutureComplete extends OnComplete<Object> {
720                 private final Class<?> expRespType;
721
722                 OnFutureComplete(final Class<?> expRespType) {
723                     this.expRespType = expRespType;
724                 }
725
726                 @Override
727                 public void onComplete(final Throwable error, final Object resp) {
728                     if(error != null) {
729                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
730                     } else {
731                         try {
732                             assertEquals("Commit response type", expRespType, resp.getClass());
733                             onSuccess(resp);
734                         } catch (Exception e) {
735                             caughtEx.set(e);
736                         }
737                     }
738                 }
739
740                 void onSuccess(final Object resp) throws Exception {
741                 }
742             }
743
744             class OnCommitFutureComplete extends OnFutureComplete {
745                 OnCommitFutureComplete() {
746                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
747                 }
748
749                 @Override
750                 public void onComplete(final Throwable error, final Object resp) {
751                     super.onComplete(error, resp);
752                     commitLatch.countDown();
753                 }
754             }
755
756             class OnCanCommitFutureComplete extends OnFutureComplete {
757                 private final String transactionID;
758
759                 OnCanCommitFutureComplete(final String transactionID) {
760                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
761                     this.transactionID = transactionID;
762                 }
763
764                 @Override
765                 void onSuccess(final Object resp) throws Exception {
766                     CanCommitTransactionReply canCommitReply =
767                             CanCommitTransactionReply.fromSerializable(resp);
768                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
769
770                     Future<Object> commitFuture = Patterns.ask(shard,
771                             new CommitTransaction(transactionID).toSerializable(), timeout);
772                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
773                 }
774             }
775
776             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
777                     getSystem().dispatcher());
778
779             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
780                     getSystem().dispatcher());
781
782             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
783
784             if(caughtEx.get() != null) {
785                 throw caughtEx.get();
786             }
787
788             assertEquals("Commits complete", true, done);
789
790             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
791             inOrder.verify(cohort1).canCommit();
792             inOrder.verify(cohort1).preCommit();
793             inOrder.verify(cohort1).commit();
794             inOrder.verify(cohort2).canCommit();
795             inOrder.verify(cohort2).preCommit();
796             inOrder.verify(cohort2).commit();
797             inOrder.verify(cohort3).canCommit();
798             inOrder.verify(cohort3).preCommit();
799             inOrder.verify(cohort3).commit();
800
801             // Verify data in the data store.
802
803             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
804             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
805             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
806                     outerList.getValue() instanceof Iterable);
807             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
808             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
809                        entry instanceof MapEntryNode);
810             MapEntryNode mapEntry = (MapEntryNode)entry;
811             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
812                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
813             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
814             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
815
816             verifyLastLogIndex(shard, 2);
817
818             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
819         }};
820     }
821
822     private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
823         for(int i = 0; i < 20 * 5; i++) {
824             long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
825             if(lastLogIndex == expectedValue) {
826                 break;
827             }
828             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
829         }
830
831         assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
832     }
833
834     @Test
835     public void testCommitWithPersistenceDisabled() throws Throwable {
836         dataStoreContextBuilder.persistent(false);
837         new ShardTestKit(getSystem()) {{
838             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
839                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
840                     "testCommitPhaseFailure");
841
842             waitUntilLeader(shard);
843
844             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
845
846             // Setup a simulated transactions with a mock cohort.
847
848             String transactionID = "tx";
849             MutableCompositeModification modification = new MutableCompositeModification();
850             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
851             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
852                     TestModel.TEST_PATH, containerNode, modification);
853
854             FiniteDuration duration = duration("5 seconds");
855
856             // Simulate the ForwardedReadyTransaction messages that would be sent
857             // by the ShardTransaction.
858
859             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
860                     cohort, modification, true), getRef());
861             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
862
863             // Send the CanCommitTransaction message.
864
865             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
866             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
867                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
868             assertEquals("Can commit", true, canCommitReply.getCanCommit());
869
870             // Send the CanCommitTransaction message.
871
872             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
873             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
874
875             InOrder inOrder = inOrder(cohort);
876             inOrder.verify(cohort).canCommit();
877             inOrder.verify(cohort).preCommit();
878             inOrder.verify(cohort).commit();
879
880             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
881             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
882
883             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
884         }};
885     }
886
887     @Test
888     public void testCommitPhaseFailure() throws Throwable {
889         new ShardTestKit(getSystem()) {{
890             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
891                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
892                     "testCommitPhaseFailure");
893
894             waitUntilLeader(shard);
895
896             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
897             // commit phase.
898
899             String transactionID1 = "tx1";
900             MutableCompositeModification modification1 = new MutableCompositeModification();
901             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
902             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
903             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
904             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
905
906             String transactionID2 = "tx2";
907             MutableCompositeModification modification2 = new MutableCompositeModification();
908             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
909             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
910
911             FiniteDuration duration = duration("5 seconds");
912             final Timeout timeout = new Timeout(duration);
913
914             // Simulate the ForwardedReadyTransaction messages that would be sent
915             // by the ShardTransaction.
916
917             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
918                     cohort1, modification1, true), getRef());
919             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
920
921             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
922                     cohort2, modification2, true), getRef());
923             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
924
925             // Send the CanCommitTransaction message for the first Tx.
926
927             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
928             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
929                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
930             assertEquals("Can commit", true, canCommitReply.getCanCommit());
931
932             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
933             // processed after the first Tx completes.
934
935             Future<Object> canCommitFuture = Patterns.ask(shard,
936                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
937
938             // Send the CommitTransaction message for the first Tx. This should send back an error
939             // and trigger the 2nd Tx to proceed.
940
941             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
942             expectMsgClass(duration, akka.actor.Status.Failure.class);
943
944             // Wait for the 2nd Tx to complete the canCommit phase.
945
946             final CountDownLatch latch = new CountDownLatch(1);
947             canCommitFuture.onComplete(new OnComplete<Object>() {
948                 @Override
949                 public void onComplete(final Throwable t, final Object resp) {
950                     latch.countDown();
951                 }
952             }, getSystem().dispatcher());
953
954             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
955
956             InOrder inOrder = inOrder(cohort1, cohort2);
957             inOrder.verify(cohort1).canCommit();
958             inOrder.verify(cohort1).preCommit();
959             inOrder.verify(cohort1).commit();
960             inOrder.verify(cohort2).canCommit();
961
962             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
963         }};
964     }
965
966     @Test
967     public void testPreCommitPhaseFailure() throws Throwable {
968         new ShardTestKit(getSystem()) {{
969             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
970                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
971                     "testPreCommitPhaseFailure");
972
973             waitUntilLeader(shard);
974
975             String transactionID = "tx1";
976             MutableCompositeModification modification = new MutableCompositeModification();
977             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
978             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
979             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
980
981             FiniteDuration duration = duration("5 seconds");
982
983             // Simulate the ForwardedReadyTransaction messages that would be sent
984             // by the ShardTransaction.
985
986             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
987                     cohort, modification, true), getRef());
988             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
989
990             // Send the CanCommitTransaction message.
991
992             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
993             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
994                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
995             assertEquals("Can commit", true, canCommitReply.getCanCommit());
996
997             // Send the CommitTransaction message. This should send back an error
998             // for preCommit failure.
999
1000             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1001             expectMsgClass(duration, akka.actor.Status.Failure.class);
1002
1003             InOrder inOrder = inOrder(cohort);
1004             inOrder.verify(cohort).canCommit();
1005             inOrder.verify(cohort).preCommit();
1006
1007             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1008         }};
1009     }
1010
1011     @Test
1012     public void testCanCommitPhaseFailure() throws Throwable {
1013         new ShardTestKit(getSystem()) {{
1014             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1015                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1016                     "testCanCommitPhaseFailure");
1017
1018             waitUntilLeader(shard);
1019
1020             final FiniteDuration duration = duration("5 seconds");
1021
1022             String transactionID = "tx1";
1023             MutableCompositeModification modification = new MutableCompositeModification();
1024             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1025             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1026
1027             // Simulate the ForwardedReadyTransaction messages that would be sent
1028             // by the ShardTransaction.
1029
1030             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1031                     cohort, modification, true), getRef());
1032             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1033
1034             // Send the CanCommitTransaction message.
1035
1036             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1037             expectMsgClass(duration, akka.actor.Status.Failure.class);
1038
1039             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1040         }};
1041     }
1042
1043     @Test
1044     public void testAbortBeforeFinishCommit() throws Throwable {
1045         new ShardTestKit(getSystem()) {{
1046             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1047                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1048                     "testAbortBeforeFinishCommit");
1049
1050             waitUntilLeader(shard);
1051
1052             final FiniteDuration duration = duration("5 seconds");
1053             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1054
1055             final String transactionID = "tx1";
1056             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1057                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1058                 @Override
1059                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1060                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1061
1062                     // Simulate an AbortTransaction message occurring during replication, after
1063                     // persisting and before finishing the commit to the in-memory store.
1064                     // We have no followers so due to optimizations in the RaftActor, it does not
1065                     // attempt replication and thus we can't send an AbortTransaction message b/c
1066                     // it would be processed too late after CommitTransaction completes. So we'll
1067                     // simulate an AbortTransaction message occurring during replication by calling
1068                     // the shard directly.
1069                     //
1070                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1071
1072                     return preCommitFuture;
1073                 }
1074             };
1075
1076             MutableCompositeModification modification = new MutableCompositeModification();
1077             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1078                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1079                     modification, preCommit);
1080
1081             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1082                     cohort, modification, true), getRef());
1083             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1084
1085             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1086             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1087                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1088             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1089
1090             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1091             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1092
1093             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1094
1095             // Since we're simulating an abort occurring during replication and before finish commit,
1096             // the data should still get written to the in-memory store since we've gotten past
1097             // canCommit and preCommit and persisted the data.
1098             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1099
1100             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1101         }};
1102     }
1103
1104     @Test
1105     public void testTransactionCommitTimeout() throws Throwable {
1106         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1107
1108         new ShardTestKit(getSystem()) {{
1109             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1110                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1111                     "testTransactionCommitTimeout");
1112
1113             waitUntilLeader(shard);
1114
1115             final FiniteDuration duration = duration("5 seconds");
1116
1117             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1118
1119             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1120             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1121                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1122
1123             // Create 1st Tx - will timeout
1124
1125             String transactionID1 = "tx1";
1126             MutableCompositeModification modification1 = new MutableCompositeModification();
1127             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1128                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1129                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1130                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1131                     modification1);
1132
1133             // Create 2nd Tx
1134
1135             String transactionID2 = "tx3";
1136             MutableCompositeModification modification2 = new MutableCompositeModification();
1137             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1138                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1139             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1140                     listNodePath,
1141                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1142                     modification2);
1143
1144             // Ready the Tx's
1145
1146             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1147                     cohort1, modification1, true), getRef());
1148             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1149
1150             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1151                     cohort2, modification2, true), getRef());
1152             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1153
1154             // canCommit 1st Tx. We don't send the commit so it should timeout.
1155
1156             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1157             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1158
1159             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1160
1161             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1162             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1163
1164             // Commit the 2nd Tx.
1165
1166             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1167             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1168
1169             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1170             assertNotNull(listNodePath + " not found", node);
1171
1172             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1173         }};
1174     }
1175
1176     @Test
1177     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1178         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1179
1180         new ShardTestKit(getSystem()) {{
1181             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1182                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1183                     "testTransactionCommitQueueCapacityExceeded");
1184
1185             waitUntilLeader(shard);
1186
1187             final FiniteDuration duration = duration("5 seconds");
1188
1189             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1190
1191             String transactionID1 = "tx1";
1192             MutableCompositeModification modification1 = new MutableCompositeModification();
1193             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1194                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1195
1196             String transactionID2 = "tx2";
1197             MutableCompositeModification modification2 = new MutableCompositeModification();
1198             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1199                     TestModel.OUTER_LIST_PATH,
1200                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1201                     modification2);
1202
1203             String transactionID3 = "tx3";
1204             MutableCompositeModification modification3 = new MutableCompositeModification();
1205             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1206                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1207
1208             // Ready the Tx's
1209
1210             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1211                     cohort1, modification1, true), getRef());
1212             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1213
1214             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1215                     cohort2, modification2, true), getRef());
1216             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1217
1218             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1219                     cohort3, modification3, true), getRef());
1220             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1221
1222             // canCommit 1st Tx.
1223
1224             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1225             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1226
1227             // canCommit the 2nd Tx - it should get queued.
1228
1229             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1230
1231             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1232
1233             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1234             expectMsgClass(duration, akka.actor.Status.Failure.class);
1235
1236             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1237         }};
1238     }
1239
1240     @Test
1241     public void testCanCommitBeforeReadyFailure() throws Throwable {
1242         new ShardTestKit(getSystem()) {{
1243             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1244                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1245                     "testCanCommitBeforeReadyFailure");
1246
1247             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1248             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1249
1250             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1251         }};
1252     }
1253
1254     @Test
1255     public void testAbortTransaction() throws Throwable {
1256         new ShardTestKit(getSystem()) {{
1257             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1258                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1259                     "testAbortTransaction");
1260
1261             waitUntilLeader(shard);
1262
1263             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1264
1265             String transactionID1 = "tx1";
1266             MutableCompositeModification modification1 = new MutableCompositeModification();
1267             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1268             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1269             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1270
1271             String transactionID2 = "tx2";
1272             MutableCompositeModification modification2 = new MutableCompositeModification();
1273             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1274             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1275
1276             FiniteDuration duration = duration("5 seconds");
1277             final Timeout timeout = new Timeout(duration);
1278
1279             // Simulate the ForwardedReadyTransaction messages that would be sent
1280             // by the ShardTransaction.
1281
1282             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1283                     cohort1, modification1, true), getRef());
1284             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1285
1286             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1287                     cohort2, modification2, true), getRef());
1288             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1289
1290             // Send the CanCommitTransaction message for the first Tx.
1291
1292             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1293             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1294                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1295             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1296
1297             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1298             // processed after the first Tx completes.
1299
1300             Future<Object> canCommitFuture = Patterns.ask(shard,
1301                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1302
1303             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1304             // Tx to proceed.
1305
1306             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1307             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1308
1309             // Wait for the 2nd Tx to complete the canCommit phase.
1310
1311             Await.ready(canCommitFuture, duration);
1312
1313             InOrder inOrder = inOrder(cohort1, cohort2);
1314             inOrder.verify(cohort1).canCommit();
1315             inOrder.verify(cohort2).canCommit();
1316
1317             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1318         }};
1319     }
1320
1321     @Test
1322     public void testCreateSnapshot() throws IOException, InterruptedException {
1323             testCreateSnapshot(true, "testCreateSnapshot");
1324     }
1325
1326     @Test
1327     public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1328         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1329     }
1330
1331     @SuppressWarnings("serial")
1332     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
1333         final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1334                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1335
1336         new ShardTestKit(getSystem()) {{
1337             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1338             Creator<Shard> creator = new Creator<Shard>() {
1339                 @Override
1340                 public Shard create() throws Exception {
1341                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1342                             newDatastoreContext(), SCHEMA_CONTEXT) {
1343                         @Override
1344                         protected void commitSnapshot(final long sequenceNumber) {
1345                             super.commitSnapshot(sequenceNumber);
1346                             latch.get().countDown();
1347                         }
1348                     };
1349                 }
1350             };
1351
1352             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1353                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1354
1355             waitUntilLeader(shard);
1356
1357             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1358
1359             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1360
1361             latch.set(new CountDownLatch(1));
1362             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1363
1364             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1365
1366             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1367         }};
1368     }
1369
1370     /**
1371      * This test simply verifies that the applySnapShot logic will work
1372      * @throws ReadFailedException
1373      */
1374     @Test
1375     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1376         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1377
1378         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1379
1380         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1381         putTransaction.write(TestModel.TEST_PATH,
1382             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1383         commitTransaction(putTransaction);
1384
1385
1386         NormalizedNode<?, ?> expected = readStore(store);
1387
1388         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1389
1390         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1391         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1392
1393         commitTransaction(writeTransaction);
1394
1395         NormalizedNode<?, ?> actual = readStore(store);
1396
1397         assertEquals(expected, actual);
1398     }
1399
1400     @Test
1401     public void testRecoveryApplicable(){
1402
1403         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1404                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1405
1406         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1407                 persistentContext, SCHEMA_CONTEXT);
1408
1409         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1410                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1411
1412         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1413                 nonPersistentContext, SCHEMA_CONTEXT);
1414
1415         new ShardTestKit(getSystem()) {{
1416             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1417                     persistentProps, "testPersistence1");
1418
1419             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1420
1421             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1422
1423             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1424                     nonPersistentProps, "testPersistence2");
1425
1426             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1427
1428             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1429
1430         }};
1431
1432     }
1433
1434
1435     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1436         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1437         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1438             transaction.read(YangInstanceIdentifier.builder().build());
1439
1440         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1441
1442         NormalizedNode<?, ?> normalizedNode = optional.get();
1443
1444         transaction.close();
1445
1446         return normalizedNode;
1447     }
1448
1449     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1450         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1451         ListenableFuture<Void> future =
1452             commitCohort.preCommit();
1453         try {
1454             future.get();
1455             future = commitCohort.commit();
1456             future.get();
1457         } catch (InterruptedException | ExecutionException e) {
1458         }
1459     }
1460
1461     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1462         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1463             @Override
1464             public void onDataChanged(
1465                 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1466
1467             }
1468         };
1469     }
1470
1471     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1472             throws ExecutionException, InterruptedException {
1473         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1474
1475         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1476             transaction.read(id);
1477
1478         Optional<NormalizedNode<?, ?>> optional = future.get();
1479         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1480
1481         transaction.close();
1482
1483         return node;
1484     }
1485
1486     private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
1487         throws ExecutionException, InterruptedException {
1488         DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1489
1490         transaction.write(id, node);
1491
1492         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1493         commitCohort.preCommit().get();
1494         commitCohort.commit().get();
1495     }
1496
1497     @SuppressWarnings("serial")
1498     private static final class DelegatingShardCreator implements Creator<Shard> {
1499         private final Creator<Shard> delegate;
1500
1501         DelegatingShardCreator(final Creator<Shard> delegate) {
1502             this.delegate = delegate;
1503         }
1504
1505         @Override
1506         public Shard create() throws Exception {
1507             return delegate.create();
1508         }
1509     }
1510 }