Bug 3570: Persist snapshot on follower ApplySnapshot
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.dispatch.Dispatchers;
20 import akka.dispatch.OnComplete;
21 import akka.japi.Creator;
22 import akka.pattern.Patterns;
23 import akka.persistence.SaveSnapshotSuccess;
24 import akka.testkit.TestActorRef;
25 import akka.util.Timeout;
26 import com.google.common.base.Function;
27 import com.google.common.base.Optional;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import java.io.IOException;
32 import java.util.Collections;
33 import java.util.HashSet;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.Test;
41 import org.mockito.InOrder;
42 import org.opendaylight.controller.cluster.DataPersistenceProvider;
43 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
44 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
45 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
64 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
65 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
66 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
67 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
68 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
69 import org.opendaylight.controller.cluster.datastore.modification.Modification;
70 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
71 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
72 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
73 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
78 import org.opendaylight.controller.cluster.raft.RaftActorContext;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
80 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
81 import org.opendaylight.controller.cluster.raft.Snapshot;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
84 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
85 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
86 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
88 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
89 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
90 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
91 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
92 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
93 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
94 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
95 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
96 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
97 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
98 import org.opendaylight.yangtools.yang.common.QName;
99 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
101 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
102 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
116 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
117 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
118 import scala.concurrent.Await;
119 import scala.concurrent.Future;
120 import scala.concurrent.duration.FiniteDuration;
121
122 public class ShardTest extends AbstractShardTest {
123     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
124
125     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
126
127     final CountDownLatch recoveryComplete = new CountDownLatch(1);
128
129     protected Props newShardPropsWithRecoveryComplete() {
130
131         Creator<Shard> creator = new Creator<Shard>() {
132             @Override
133             public Shard create() throws Exception {
134                 return new Shard(shardID, Collections.<String,String>emptyMap(),
135                         newDatastoreContext(), SCHEMA_CONTEXT) {
136                     @Override
137                     protected void onRecoveryComplete() {
138                         try {
139                             super.onRecoveryComplete();
140                         } finally {
141                             recoveryComplete.countDown();
142                         }
143                     }
144                 };
145             }
146         };
147         return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
148     }
149
150     @Test
151     public void testRegisterChangeListener() throws Exception {
152         new ShardTestKit(getSystem()) {{
153             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
154                     newShardProps(),  "testRegisterChangeListener");
155
156             waitUntilLeader(shard);
157
158             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
159
160             MockDataChangeListener listener = new MockDataChangeListener(1);
161             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
162                     "testRegisterChangeListener-DataChangeListener");
163
164             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
165                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
166
167             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
168                     RegisterChangeListenerReply.class);
169             String replyPath = reply.getListenerRegistrationPath().toString();
170             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
171                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
172
173             YangInstanceIdentifier path = TestModel.TEST_PATH;
174             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
175
176             listener.waitForChangeEvents(path);
177
178             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
179             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
180         }};
181     }
182
183     @SuppressWarnings("serial")
184     @Test
185     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
186         // This test tests the timing window in which a change listener is registered before the
187         // shard becomes the leader. We verify that the listener is registered and notified of the
188         // existing data when the shard becomes the leader.
189         new ShardTestKit(getSystem()) {{
190             // For this test, we want to send the RegisterChangeListener message after the shard
191             // has recovered from persistence and before it becomes the leader. So we subclass
192             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
193             // we know that the shard has been initialized to a follower and has started the
194             // election process. The following 2 CountDownLatches are used to coordinate the
195             // ElectionTimeout with the sending of the RegisterChangeListener message.
196             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
197             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
198             Creator<Shard> creator = new Creator<Shard>() {
199                 boolean firstElectionTimeout = true;
200
201                 @Override
202                 public Shard create() throws Exception {
203                     // Use a non persistent provider because this test actually invokes persist on the journal
204                     // this will cause all other messages to not be queued properly after that.
205                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
206                     // it does do a persist)
207                     return new Shard(shardID, Collections.<String,String>emptyMap(),
208                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
209                         @Override
210                         public void onReceiveCommand(final Object message) throws Exception {
211                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
212                                 // Got the first ElectionTimeout. We don't forward it to the
213                                 // base Shard yet until we've sent the RegisterChangeListener
214                                 // message. So we signal the onFirstElectionTimeout latch to tell
215                                 // the main thread to send the RegisterChangeListener message and
216                                 // start a thread to wait on the onChangeListenerRegistered latch,
217                                 // which the main thread signals after it has sent the message.
218                                 // After the onChangeListenerRegistered is triggered, we send the
219                                 // original ElectionTimeout message to proceed with the election.
220                                 firstElectionTimeout = false;
221                                 final ActorRef self = getSelf();
222                                 new Thread() {
223                                     @Override
224                                     public void run() {
225                                         Uninterruptibles.awaitUninterruptibly(
226                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
227                                         self.tell(message, self);
228                                     }
229                                 }.start();
230
231                                 onFirstElectionTimeout.countDown();
232                             } else {
233                                 super.onReceiveCommand(message);
234                             }
235                         }
236                     };
237                 }
238             };
239
240             MockDataChangeListener listener = new MockDataChangeListener(1);
241             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
242                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
243
244             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
245                     Props.create(new DelegatingShardCreator(creator)),
246                     "testRegisterChangeListenerWhenNotLeaderInitially");
247
248             // Write initial data into the in-memory store.
249             YangInstanceIdentifier path = TestModel.TEST_PATH;
250             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
251
252             // Wait until the shard receives the first ElectionTimeout message.
253             assertEquals("Got first ElectionTimeout", true,
254                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
255
256             // Now send the RegisterChangeListener and wait for the reply.
257             shard.tell(new RegisterChangeListener(path, dclActor,
258                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
259
260             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
261                     RegisterChangeListenerReply.class);
262             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
263
264             // Sanity check - verify the shard is not the leader yet.
265             shard.tell(new FindLeader(), getRef());
266             FindLeaderReply findLeadeReply =
267                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
268             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
269
270             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
271             // with the election process.
272             onChangeListenerRegistered.countDown();
273
274             // Wait for the shard to become the leader and notify our listener with the existing
275             // data in the store.
276             listener.waitForChangeEvents(path);
277
278             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
279             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
280         }};
281     }
282
283     @Test
284     public void testRegisterDataTreeChangeListener() throws Exception {
285         new ShardTestKit(getSystem()) {{
286             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
287                     newShardProps(), "testRegisterDataTreeChangeListener");
288
289             waitUntilLeader(shard);
290
291             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
292
293             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
294             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
295                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
296
297             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
298
299             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
300                     RegisterDataTreeChangeListenerReply.class);
301             String replyPath = reply.getListenerRegistrationPath().toString();
302             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
303                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
304
305             YangInstanceIdentifier path = TestModel.TEST_PATH;
306             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
307
308             listener.waitForChangeEvents();
309
310             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
311             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
312         }};
313     }
314
315     @SuppressWarnings("serial")
316     @Test
317     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
318         new ShardTestKit(getSystem()) {{
319             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
320             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
321             Creator<Shard> creator = new Creator<Shard>() {
322                 boolean firstElectionTimeout = true;
323
324                 @Override
325                 public Shard create() throws Exception {
326                     return new Shard(shardID, Collections.<String,String>emptyMap(),
327                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
328                         @Override
329                         public void onReceiveCommand(final Object message) throws Exception {
330                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
331                                 firstElectionTimeout = false;
332                                 final ActorRef self = getSelf();
333                                 new Thread() {
334                                     @Override
335                                     public void run() {
336                                         Uninterruptibles.awaitUninterruptibly(
337                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
338                                         self.tell(message, self);
339                                     }
340                                 }.start();
341
342                                 onFirstElectionTimeout.countDown();
343                             } else {
344                                 super.onReceiveCommand(message);
345                             }
346                         }
347                     };
348                 }
349             };
350
351             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
352             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
353                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
354
355             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
356                     Props.create(new DelegatingShardCreator(creator)),
357                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
358
359             YangInstanceIdentifier path = TestModel.TEST_PATH;
360             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
361
362             assertEquals("Got first ElectionTimeout", true,
363                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
364
365             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
366             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
367                     RegisterDataTreeChangeListenerReply.class);
368             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
369
370             shard.tell(new FindLeader(), getRef());
371             FindLeaderReply findLeadeReply =
372                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
373             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
374
375             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
376
377             onChangeListenerRegistered.countDown();
378
379             // TODO: investigate why we do not receive data chage events
380             listener.waitForChangeEvents();
381
382             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
383             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
384         }};
385     }
386
387     @Test
388     public void testCreateTransaction(){
389         new ShardTestKit(getSystem()) {{
390             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
391
392             waitUntilLeader(shard);
393
394             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
395
396             shard.tell(new CreateTransaction("txn-1",
397                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
398
399             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
400                     CreateTransactionReply.class);
401
402             String path = reply.getTransactionActorPath().toString();
403             assertTrue("Unexpected transaction path " + path,
404                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
405
406             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
407         }};
408     }
409
410     @Test
411     public void testCreateTransactionOnChain(){
412         new ShardTestKit(getSystem()) {{
413             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
414
415             waitUntilLeader(shard);
416
417             shard.tell(new CreateTransaction("txn-1",
418                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
419                     getRef());
420
421             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
422                     CreateTransactionReply.class);
423
424             String path = reply.getTransactionActorPath().toString();
425             assertTrue("Unexpected transaction path " + path,
426                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
427
428             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
429         }};
430     }
431
432     @SuppressWarnings("serial")
433     @Test
434     public void testPeerAddressResolved() throws Exception {
435         new ShardTestKit(getSystem()) {{
436             final CountDownLatch recoveryComplete = new CountDownLatch(1);
437             class TestShard extends Shard {
438                 TestShard() {
439                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
440                             newDatastoreContext(), SCHEMA_CONTEXT);
441                 }
442
443                 Map<String, String> getPeerAddresses() {
444                     return getRaftActorContext().getPeerAddresses();
445                 }
446
447                 @Override
448                 protected void onRecoveryComplete() {
449                     try {
450                         super.onRecoveryComplete();
451                     } finally {
452                         recoveryComplete.countDown();
453                     }
454                 }
455             }
456
457             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
458                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
459                         @Override
460                         public TestShard create() throws Exception {
461                             return new TestShard();
462                         }
463                     })), "testPeerAddressResolved");
464
465             //waitUntilLeader(shard);
466             assertEquals("Recovery complete", true,
467                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
468
469             String address = "akka://foobar";
470             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
471
472             assertEquals("getPeerAddresses", address,
473                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
474
475             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
476         }};
477     }
478
479     @Test
480     public void testApplySnapshot() throws Exception {
481         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
482                 "testApplySnapshot");
483
484         DataTree store = InMemoryDataTreeFactory.getInstance().create();
485         store.setSchemaContext(SCHEMA_CONTEXT);
486
487         ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
488                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
489                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
490                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
491
492         writeToStore(store, TestModel.TEST_PATH, container);
493
494         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
495         NormalizedNode<?,?> expected = readStore(store, root);
496
497         Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
498                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
499
500         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
501
502         NormalizedNode<?,?> actual = readStore(shard, root);
503
504         assertEquals("Root node", expected, actual);
505
506         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
507     }
508
509     @Test
510     public void testApplyState() throws Exception {
511
512         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
513
514         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
515
516         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
517                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
518
519         shard.underlyingActor().onReceiveCommand(applyState);
520
521         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
522         assertEquals("Applied state", node, actual);
523
524         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
525     }
526
527     @Test
528     public void testApplyStateWithCandidatePayload() throws Exception {
529
530         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
531
532         recoveryComplete.await(5,  TimeUnit.SECONDS);
533
534         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
535         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
536
537         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
538                 DataTreeCandidatePayload.create(candidate)));
539
540         shard.underlyingActor().onReceiveCommand(applyState);
541
542         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
543         assertEquals("Applied state", node, actual);
544
545         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
546     }
547
548     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
549         DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
550         testStore.setSchemaContext(SCHEMA_CONTEXT);
551
552         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
553
554         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
555
556         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
557                 SerializationUtils.serializeNormalizedNode(root),
558                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
559         return testStore;
560     }
561
562     private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
563         source.validate(mod);
564         final DataTreeCandidate candidate = source.prepare(mod);
565         source.commit(candidate);
566         return DataTreeCandidatePayload.create(candidate);
567     }
568
569     @Test
570     public void testDataTreeCandidateRecovery() throws Exception {
571         // Set up the InMemorySnapshotStore.
572         final DataTree source = setupInMemorySnapshotStore();
573
574         final DataTreeModification writeMod = source.takeSnapshot().newModification();
575         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
576
577         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
578
579         // Set up the InMemoryJournal.
580         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
581
582         int nListEntries = 16;
583         Set<Integer> listEntryKeys = new HashSet<>();
584
585         // Add some ModificationPayload entries
586         for (int i = 1; i <= nListEntries; i++) {
587             listEntryKeys.add(Integer.valueOf(i));
588
589             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
590                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
591
592             final DataTreeModification mod = source.takeSnapshot().newModification();
593             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
594
595             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
596                 payloadForModification(source, mod)));
597         }
598
599         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
600                 new ApplyJournalEntries(nListEntries));
601
602         testRecovery(listEntryKeys);
603     }
604
605     @Test
606     public void testModicationRecovery() throws Exception {
607
608         // Set up the InMemorySnapshotStore.
609         setupInMemorySnapshotStore();
610
611         // Set up the InMemoryJournal.
612
613         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
614
615         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
616                   new WriteModification(TestModel.OUTER_LIST_PATH,
617                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
618
619         int nListEntries = 16;
620         Set<Integer> listEntryKeys = new HashSet<>();
621
622         // Add some ModificationPayload entries
623         for(int i = 1; i <= nListEntries; i++) {
624             listEntryKeys.add(Integer.valueOf(i));
625             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
626                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
627             Modification mod = new MergeModification(path,
628                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
629             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
630                     newModificationPayload(mod)));
631         }
632
633         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
634                 new ApplyJournalEntries(nListEntries));
635
636         testRecovery(listEntryKeys);
637     }
638
639     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
640         MutableCompositeModification compMod = new MutableCompositeModification();
641         for(Modification mod: mods) {
642             compMod.addModification(mod);
643         }
644
645         return new ModificationPayload(compMod);
646     }
647
648     @Test
649     public void testConcurrentThreePhaseCommits() throws Throwable {
650         new ShardTestKit(getSystem()) {{
651             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
652                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
653                     "testConcurrentThreePhaseCommits");
654
655             waitUntilLeader(shard);
656
657          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
658
659             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
660
661             String transactionID1 = "tx1";
662             MutableCompositeModification modification1 = new MutableCompositeModification();
663             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
664                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
665
666             String transactionID2 = "tx2";
667             MutableCompositeModification modification2 = new MutableCompositeModification();
668             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
669                     TestModel.OUTER_LIST_PATH,
670                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
671                     modification2);
672
673             String transactionID3 = "tx3";
674             MutableCompositeModification modification3 = new MutableCompositeModification();
675             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
676                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
677                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
678                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
679                     modification3);
680
681             long timeoutSec = 5;
682             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
683             final Timeout timeout = new Timeout(duration);
684
685             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
686             // by the ShardTransaction.
687
688             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
689                     cohort1, modification1, true, false), getRef());
690             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
691                     expectMsgClass(duration, ReadyTransactionReply.class));
692             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
693
694             // Send the CanCommitTransaction message for the first Tx.
695
696             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
697             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
698                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
699             assertEquals("Can commit", true, canCommitReply.getCanCommit());
700
701             // Send the ForwardedReadyTransaction for the next 2 Tx's.
702
703             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
704                     cohort2, modification2, true, false), getRef());
705             expectMsgClass(duration, ReadyTransactionReply.class);
706
707             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
708                     cohort3, modification3, true, false), getRef());
709             expectMsgClass(duration, ReadyTransactionReply.class);
710
711             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
712             // processed after the first Tx completes.
713
714             Future<Object> canCommitFuture1 = Patterns.ask(shard,
715                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
716
717             Future<Object> canCommitFuture2 = Patterns.ask(shard,
718                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
719
720             // Send the CommitTransaction message for the first Tx. After it completes, it should
721             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
722
723             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
724             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
725
726             // Wait for the next 2 Tx's to complete.
727
728             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
729             final CountDownLatch commitLatch = new CountDownLatch(2);
730
731             class OnFutureComplete extends OnComplete<Object> {
732                 private final Class<?> expRespType;
733
734                 OnFutureComplete(final Class<?> expRespType) {
735                     this.expRespType = expRespType;
736                 }
737
738                 @Override
739                 public void onComplete(final Throwable error, final Object resp) {
740                     if(error != null) {
741                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
742                     } else {
743                         try {
744                             assertEquals("Commit response type", expRespType, resp.getClass());
745                             onSuccess(resp);
746                         } catch (Exception e) {
747                             caughtEx.set(e);
748                         }
749                     }
750                 }
751
752                 void onSuccess(final Object resp) throws Exception {
753                 }
754             }
755
756             class OnCommitFutureComplete extends OnFutureComplete {
757                 OnCommitFutureComplete() {
758                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
759                 }
760
761                 @Override
762                 public void onComplete(final Throwable error, final Object resp) {
763                     super.onComplete(error, resp);
764                     commitLatch.countDown();
765                 }
766             }
767
768             class OnCanCommitFutureComplete extends OnFutureComplete {
769                 private final String transactionID;
770
771                 OnCanCommitFutureComplete(final String transactionID) {
772                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
773                     this.transactionID = transactionID;
774                 }
775
776                 @Override
777                 void onSuccess(final Object resp) throws Exception {
778                     CanCommitTransactionReply canCommitReply =
779                             CanCommitTransactionReply.fromSerializable(resp);
780                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
781
782                     Future<Object> commitFuture = Patterns.ask(shard,
783                             new CommitTransaction(transactionID).toSerializable(), timeout);
784                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
785                 }
786             }
787
788             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
789                     getSystem().dispatcher());
790
791             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
792                     getSystem().dispatcher());
793
794             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
795
796             if(caughtEx.get() != null) {
797                 throw caughtEx.get();
798             }
799
800             assertEquals("Commits complete", true, done);
801
802             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
803             inOrder.verify(cohort1).canCommit();
804             inOrder.verify(cohort1).preCommit();
805             inOrder.verify(cohort1).commit();
806             inOrder.verify(cohort2).canCommit();
807             inOrder.verify(cohort2).preCommit();
808             inOrder.verify(cohort2).commit();
809             inOrder.verify(cohort3).canCommit();
810             inOrder.verify(cohort3).preCommit();
811             inOrder.verify(cohort3).commit();
812
813             // Verify data in the data store.
814
815             verifyOuterListEntry(shard, 1);
816
817             verifyLastApplied(shard, 2);
818
819             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
820         }};
821     }
822
823     private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
824             NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady, int messagesSent) {
825         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
826     }
827
828     private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
829             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady,
830             int messagesSent) {
831         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
832         batched.addModification(new WriteModification(path, data));
833         batched.setReady(ready);
834         batched.setDoCommitOnReady(doCommitOnReady);
835         batched.setTotalMessagesSent(messagesSent);
836         return batched;
837     }
838
839     @Test
840     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
841         new ShardTestKit(getSystem()) {{
842             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
843                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
844                     "testBatchedModificationsWithNoCommitOnReady");
845
846             waitUntilLeader(shard);
847
848             final String transactionID = "tx";
849             FiniteDuration duration = duration("5 seconds");
850
851             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
852             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
853                 @Override
854                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
855                     if(mockCohort.get() == null) {
856                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
857                     }
858
859                     return mockCohort.get();
860                 }
861             };
862
863             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
864
865             // Send a BatchedModifications to start a transaction.
866
867             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
868                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
869             expectMsgClass(duration, BatchedModificationsReply.class);
870
871             // Send a couple more BatchedModifications.
872
873             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
874                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
875             expectMsgClass(duration, BatchedModificationsReply.class);
876
877             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
878                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
879                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
880             expectMsgClass(duration, ReadyTransactionReply.class);
881
882             // Send the CanCommitTransaction message.
883
884             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
885             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
886                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
887             assertEquals("Can commit", true, canCommitReply.getCanCommit());
888
889             // Send the CanCommitTransaction message.
890
891             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
892             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
893
894             InOrder inOrder = inOrder(mockCohort.get());
895             inOrder.verify(mockCohort.get()).canCommit();
896             inOrder.verify(mockCohort.get()).preCommit();
897             inOrder.verify(mockCohort.get()).commit();
898
899             // Verify data in the data store.
900
901             verifyOuterListEntry(shard, 1);
902
903             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
904         }};
905     }
906
907     @Test
908     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
909         new ShardTestKit(getSystem()) {{
910             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
911                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
912                     "testBatchedModificationsWithCommitOnReady");
913
914             waitUntilLeader(shard);
915
916             final String transactionID = "tx";
917             FiniteDuration duration = duration("5 seconds");
918
919             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
920             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
921                 @Override
922                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
923                     if(mockCohort.get() == null) {
924                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
925                     }
926
927                     return mockCohort.get();
928                 }
929             };
930
931             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
932
933             // Send a BatchedModifications to start a transaction.
934
935             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
936                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
937             expectMsgClass(duration, BatchedModificationsReply.class);
938
939             // Send a couple more BatchedModifications.
940
941             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
942                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
943             expectMsgClass(duration, BatchedModificationsReply.class);
944
945             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
946                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
947                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
948
949             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
950
951             InOrder inOrder = inOrder(mockCohort.get());
952             inOrder.verify(mockCohort.get()).canCommit();
953             inOrder.verify(mockCohort.get()).preCommit();
954             inOrder.verify(mockCohort.get()).commit();
955
956             // Verify data in the data store.
957
958             verifyOuterListEntry(shard, 1);
959
960             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
961         }};
962     }
963
964     @Test(expected=IllegalStateException.class)
965     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
966         new ShardTestKit(getSystem()) {{
967             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
968                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
969                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
970
971             waitUntilLeader(shard);
972
973             String transactionID = "tx1";
974             BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
975             batched.setReady(true);
976             batched.setTotalMessagesSent(2);
977
978             shard.tell(batched, getRef());
979
980             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
981
982             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
983
984             if(failure != null) {
985                 throw failure.cause();
986             }
987         }};
988     }
989
990     @SuppressWarnings("unchecked")
991     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
992         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
993         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
994         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
995                 outerList.getValue() instanceof Iterable);
996         Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
997         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
998                 entry instanceof MapEntryNode);
999         MapEntryNode mapEntry = (MapEntryNode)entry;
1000         Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1001                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1002         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1003         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1004     }
1005
1006     @Test
1007     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1008         new ShardTestKit(getSystem()) {{
1009             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1010                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1011                     "testBatchedModificationsOnTransactionChain");
1012
1013             waitUntilLeader(shard);
1014
1015             String transactionChainID = "txChain";
1016             String transactionID1 = "tx1";
1017             String transactionID2 = "tx2";
1018
1019             FiniteDuration duration = duration("5 seconds");
1020
1021             // Send a BatchedModifications to start a chained write transaction and ready it.
1022
1023             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1024             YangInstanceIdentifier path = TestModel.TEST_PATH;
1025             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1026                     containerNode, true, false, 1), getRef());
1027             expectMsgClass(duration, ReadyTransactionReply.class);
1028
1029             // Create a read Tx on the same chain.
1030
1031             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1032                     transactionChainID).toSerializable(), getRef());
1033
1034             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1035
1036             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1037             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1038             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1039
1040             // Commit the write transaction.
1041
1042             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1043             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1044                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1045             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1046
1047             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1048             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1049
1050             // Verify data in the data store.
1051
1052             NormalizedNode<?, ?> actualNode = readStore(shard, path);
1053             assertEquals("Stored node", containerNode, actualNode);
1054
1055             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1056         }};
1057     }
1058
1059     @Test
1060     public void testOnBatchedModificationsWhenNotLeader() {
1061         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1062         new ShardTestKit(getSystem()) {{
1063             Creator<Shard> creator = new Creator<Shard>() {
1064                 private static final long serialVersionUID = 1L;
1065
1066                 @Override
1067                 public Shard create() throws Exception {
1068                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1069                             newDatastoreContext(), SCHEMA_CONTEXT) {
1070                         @Override
1071                         protected boolean isLeader() {
1072                             return overrideLeaderCalls.get() ? false : super.isLeader();
1073                         }
1074
1075                         @Override
1076                         protected ActorSelection getLeader() {
1077                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1078                                 super.getLeader();
1079                         }
1080                     };
1081                 }
1082             };
1083
1084             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1085                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1086
1087             waitUntilLeader(shard);
1088
1089             overrideLeaderCalls.set(true);
1090
1091             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1092
1093             shard.tell(batched, ActorRef.noSender());
1094
1095             expectMsgEquals(batched);
1096
1097             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1098         }};
1099     }
1100
1101     @Test
1102     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1103         new ShardTestKit(getSystem()) {{
1104             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1105                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1106                     "testForwardedReadyTransactionWithImmediateCommit");
1107
1108             waitUntilLeader(shard);
1109
1110             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1111
1112             String transactionID = "tx1";
1113             MutableCompositeModification modification = new MutableCompositeModification();
1114             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1115             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1116                     TestModel.TEST_PATH, containerNode, modification);
1117
1118             FiniteDuration duration = duration("5 seconds");
1119
1120             // Simulate the ForwardedReadyTransaction messages that would be sent
1121             // by the ShardTransaction.
1122
1123             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1124                     cohort, modification, true, true), getRef());
1125
1126             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1127
1128             InOrder inOrder = inOrder(cohort);
1129             inOrder.verify(cohort).canCommit();
1130             inOrder.verify(cohort).preCommit();
1131             inOrder.verify(cohort).commit();
1132
1133             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1134             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1135
1136             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1137         }};
1138     }
1139
1140     @Test
1141     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1142         new ShardTestKit(getSystem()) {{
1143             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1144                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1145                     "testReadyLocalTransactionWithImmediateCommit");
1146
1147             waitUntilLeader(shard);
1148
1149             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1150
1151             DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1152
1153             ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1154             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1155             MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1156             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1157
1158             String txId = "tx1";
1159             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1160
1161             shard.tell(readyMessage, getRef());
1162
1163             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1164
1165             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1166             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1167
1168             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1169         }};
1170     }
1171
1172     @Test
1173     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1174         new ShardTestKit(getSystem()) {{
1175             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1176                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1177                     "testReadyLocalTransactionWithThreePhaseCommit");
1178
1179             waitUntilLeader(shard);
1180
1181             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1182
1183             DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1184
1185             ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1186             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1187             MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1188             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1189
1190             String txId = "tx1";
1191             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1192
1193             shard.tell(readyMessage, getRef());
1194
1195             expectMsgClass(ReadyTransactionReply.class);
1196
1197             // Send the CanCommitTransaction message.
1198
1199             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1200             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1201                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1202             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1203
1204             // Send the CanCommitTransaction message.
1205
1206             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1207             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1208
1209             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1210             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1211
1212             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1213         }};
1214     }
1215
1216     @Test
1217     public void testCommitWithPersistenceDisabled() throws Throwable {
1218         dataStoreContextBuilder.persistent(false);
1219         new ShardTestKit(getSystem()) {{
1220             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1221                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1222                     "testCommitWithPersistenceDisabled");
1223
1224             waitUntilLeader(shard);
1225
1226             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1227
1228             // Setup a simulated transactions with a mock cohort.
1229
1230             String transactionID = "tx";
1231             MutableCompositeModification modification = new MutableCompositeModification();
1232             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1233             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1234                     TestModel.TEST_PATH, containerNode, modification);
1235
1236             FiniteDuration duration = duration("5 seconds");
1237
1238             // Simulate the ForwardedReadyTransaction messages that would be sent
1239             // by the ShardTransaction.
1240
1241             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1242                     cohort, modification, true, false), getRef());
1243             expectMsgClass(duration, ReadyTransactionReply.class);
1244
1245             // Send the CanCommitTransaction message.
1246
1247             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1248             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1249                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1250             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1251
1252             // Send the CanCommitTransaction message.
1253
1254             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1255             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1256
1257             InOrder inOrder = inOrder(cohort);
1258             inOrder.verify(cohort).canCommit();
1259             inOrder.verify(cohort).preCommit();
1260             inOrder.verify(cohort).commit();
1261
1262             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1263             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1264
1265             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1266         }};
1267     }
1268
1269     private static DataTreeCandidateTip mockCandidate(final String name) {
1270         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1271         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1272         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1273         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1274         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1275         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1276         return mockCandidate;
1277     }
1278
1279     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1280         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1281         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1282         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1283         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1284         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1285         return mockCandidate;
1286     }
1287
1288     @Test
1289     public void testCommitWhenTransactionHasNoModifications(){
1290         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1291         // but here that need not happen
1292         new ShardTestKit(getSystem()) {
1293             {
1294                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1295                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1296                         "testCommitWhenTransactionHasNoModifications");
1297
1298                 waitUntilLeader(shard);
1299
1300                 String transactionID = "tx1";
1301                 MutableCompositeModification modification = new MutableCompositeModification();
1302                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1303                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1304                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1305                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1306                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1307
1308                 FiniteDuration duration = duration("5 seconds");
1309
1310                 // Simulate the ForwardedReadyTransaction messages that would be sent
1311                 // by the ShardTransaction.
1312
1313                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1314                         cohort, modification, true, false), getRef());
1315                 expectMsgClass(duration, ReadyTransactionReply.class);
1316
1317                 // Send the CanCommitTransaction message.
1318
1319                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1320                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1321                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1322                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1323
1324                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1325                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1326
1327                 InOrder inOrder = inOrder(cohort);
1328                 inOrder.verify(cohort).canCommit();
1329                 inOrder.verify(cohort).preCommit();
1330                 inOrder.verify(cohort).commit();
1331
1332                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1333                 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1334
1335                 // Use MBean for verification
1336                 // Committed transaction count should increase as usual
1337                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1338
1339                 // Commit index should not advance because this does not go into the journal
1340                 assertEquals(-1, shardStats.getCommitIndex());
1341
1342                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1343
1344             }
1345         };
1346     }
1347
1348     @Test
1349     public void testCommitWhenTransactionHasModifications(){
1350         new ShardTestKit(getSystem()) {
1351             {
1352                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1353                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1354                         "testCommitWhenTransactionHasModifications");
1355
1356                 waitUntilLeader(shard);
1357
1358                 String transactionID = "tx1";
1359                 MutableCompositeModification modification = new MutableCompositeModification();
1360                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1361                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1362                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1363                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1364                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1365                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1366
1367                 FiniteDuration duration = duration("5 seconds");
1368
1369                 // Simulate the ForwardedReadyTransaction messages that would be sent
1370                 // by the ShardTransaction.
1371
1372                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1373                         cohort, modification, true, false), getRef());
1374                 expectMsgClass(duration, ReadyTransactionReply.class);
1375
1376                 // Send the CanCommitTransaction message.
1377
1378                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1379                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1380                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1381                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1382
1383                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1384                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1385
1386                 InOrder inOrder = inOrder(cohort);
1387                 inOrder.verify(cohort).canCommit();
1388                 inOrder.verify(cohort).preCommit();
1389                 inOrder.verify(cohort).commit();
1390
1391                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1392                 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1393
1394                 // Use MBean for verification
1395                 // Committed transaction count should increase as usual
1396                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1397
1398                 // Commit index should advance as we do not have an empty modification
1399                 assertEquals(0, shardStats.getCommitIndex());
1400
1401                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1402
1403             }
1404         };
1405     }
1406
1407     @Test
1408     public void testCommitPhaseFailure() throws Throwable {
1409         new ShardTestKit(getSystem()) {{
1410             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1411                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1412                     "testCommitPhaseFailure");
1413
1414             waitUntilLeader(shard);
1415
1416             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1417             // commit phase.
1418
1419             String transactionID1 = "tx1";
1420             MutableCompositeModification modification1 = new MutableCompositeModification();
1421             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1422             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1423             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1424             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1425             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1426
1427             String transactionID2 = "tx2";
1428             MutableCompositeModification modification2 = new MutableCompositeModification();
1429             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1430             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1431
1432             FiniteDuration duration = duration("5 seconds");
1433             final Timeout timeout = new Timeout(duration);
1434
1435             // Simulate the ForwardedReadyTransaction messages that would be sent
1436             // by the ShardTransaction.
1437
1438             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1439                     cohort1, modification1, true, false), getRef());
1440             expectMsgClass(duration, ReadyTransactionReply.class);
1441
1442             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1443                     cohort2, modification2, true, false), getRef());
1444             expectMsgClass(duration, ReadyTransactionReply.class);
1445
1446             // Send the CanCommitTransaction message for the first Tx.
1447
1448             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1449             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1450                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1451             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1452
1453             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1454             // processed after the first Tx completes.
1455
1456             Future<Object> canCommitFuture = Patterns.ask(shard,
1457                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1458
1459             // Send the CommitTransaction message for the first Tx. This should send back an error
1460             // and trigger the 2nd Tx to proceed.
1461
1462             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1463             expectMsgClass(duration, akka.actor.Status.Failure.class);
1464
1465             // Wait for the 2nd Tx to complete the canCommit phase.
1466
1467             final CountDownLatch latch = new CountDownLatch(1);
1468             canCommitFuture.onComplete(new OnComplete<Object>() {
1469                 @Override
1470                 public void onComplete(final Throwable t, final Object resp) {
1471                     latch.countDown();
1472                 }
1473             }, getSystem().dispatcher());
1474
1475             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1476
1477             InOrder inOrder = inOrder(cohort1, cohort2);
1478             inOrder.verify(cohort1).canCommit();
1479             inOrder.verify(cohort1).preCommit();
1480             inOrder.verify(cohort1).commit();
1481             inOrder.verify(cohort2).canCommit();
1482
1483             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1484         }};
1485     }
1486
1487     @Test
1488     public void testPreCommitPhaseFailure() throws Throwable {
1489         new ShardTestKit(getSystem()) {{
1490             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1491                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1492                     "testPreCommitPhaseFailure");
1493
1494             waitUntilLeader(shard);
1495
1496             String transactionID1 = "tx1";
1497             MutableCompositeModification modification1 = new MutableCompositeModification();
1498             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1499             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1500             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1501
1502             String transactionID2 = "tx2";
1503             MutableCompositeModification modification2 = new MutableCompositeModification();
1504             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1505             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1506
1507             FiniteDuration duration = duration("5 seconds");
1508             final Timeout timeout = new Timeout(duration);
1509
1510             // Simulate the ForwardedReadyTransaction messages that would be sent
1511             // by the ShardTransaction.
1512
1513             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1514                     cohort1, modification1, true, false), getRef());
1515             expectMsgClass(duration, ReadyTransactionReply.class);
1516
1517             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1518                     cohort2, modification2, true, false), getRef());
1519             expectMsgClass(duration, ReadyTransactionReply.class);
1520
1521             // Send the CanCommitTransaction message for the first Tx.
1522
1523             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1524             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1525                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1526             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1527
1528             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1529             // processed after the first Tx completes.
1530
1531             Future<Object> canCommitFuture = Patterns.ask(shard,
1532                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1533
1534             // Send the CommitTransaction message for the first Tx. This should send back an error
1535             // and trigger the 2nd Tx to proceed.
1536
1537             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1538             expectMsgClass(duration, akka.actor.Status.Failure.class);
1539
1540             // Wait for the 2nd Tx to complete the canCommit phase.
1541
1542             final CountDownLatch latch = new CountDownLatch(1);
1543             canCommitFuture.onComplete(new OnComplete<Object>() {
1544                 @Override
1545                 public void onComplete(final Throwable t, final Object resp) {
1546                     latch.countDown();
1547                 }
1548             }, getSystem().dispatcher());
1549
1550             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1551
1552             InOrder inOrder = inOrder(cohort1, cohort2);
1553             inOrder.verify(cohort1).canCommit();
1554             inOrder.verify(cohort1).preCommit();
1555             inOrder.verify(cohort2).canCommit();
1556
1557             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1558         }};
1559     }
1560
1561     @Test
1562     public void testCanCommitPhaseFailure() throws Throwable {
1563         new ShardTestKit(getSystem()) {{
1564             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1565                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1566                     "testCanCommitPhaseFailure");
1567
1568             waitUntilLeader(shard);
1569
1570             final FiniteDuration duration = duration("5 seconds");
1571
1572             String transactionID1 = "tx1";
1573             MutableCompositeModification modification = new MutableCompositeModification();
1574             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1575             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1576
1577             // Simulate the ForwardedReadyTransaction messages that would be sent
1578             // by the ShardTransaction.
1579
1580             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1581                     cohort, modification, true, false), getRef());
1582             expectMsgClass(duration, ReadyTransactionReply.class);
1583
1584             // Send the CanCommitTransaction message.
1585
1586             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1587             expectMsgClass(duration, akka.actor.Status.Failure.class);
1588
1589             // Send another can commit to ensure the failed one got cleaned up.
1590
1591             reset(cohort);
1592
1593             String transactionID2 = "tx2";
1594             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1595
1596             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1597                     cohort, modification, true, false), getRef());
1598             expectMsgClass(duration, ReadyTransactionReply.class);
1599
1600             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1601             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1602                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1603             assertEquals("getCanCommit", true, reply.getCanCommit());
1604
1605             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1606         }};
1607     }
1608
1609     @Test
1610     public void testCanCommitPhaseFalseResponse() throws Throwable {
1611         new ShardTestKit(getSystem()) {{
1612             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1613                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1614                     "testCanCommitPhaseFalseResponse");
1615
1616             waitUntilLeader(shard);
1617
1618             final FiniteDuration duration = duration("5 seconds");
1619
1620             String transactionID1 = "tx1";
1621             MutableCompositeModification modification = new MutableCompositeModification();
1622             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1623             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1624
1625             // Simulate the ForwardedReadyTransaction messages that would be sent
1626             // by the ShardTransaction.
1627
1628             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1629                     cohort, modification, true, false), getRef());
1630             expectMsgClass(duration, ReadyTransactionReply.class);
1631
1632             // Send the CanCommitTransaction message.
1633
1634             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1635             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1636                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1637             assertEquals("getCanCommit", false, reply.getCanCommit());
1638
1639             // Send another can commit to ensure the failed one got cleaned up.
1640
1641             reset(cohort);
1642
1643             String transactionID2 = "tx2";
1644             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1645
1646             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1647                     cohort, modification, true, false), getRef());
1648             expectMsgClass(duration, ReadyTransactionReply.class);
1649
1650             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1651             reply = CanCommitTransactionReply.fromSerializable(
1652                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1653             assertEquals("getCanCommit", true, reply.getCanCommit());
1654
1655             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1656         }};
1657     }
1658
1659     @Test
1660     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1661         new ShardTestKit(getSystem()) {{
1662             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1663                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1664                     "testImmediateCommitWithCanCommitPhaseFailure");
1665
1666             waitUntilLeader(shard);
1667
1668             final FiniteDuration duration = duration("5 seconds");
1669
1670             String transactionID1 = "tx1";
1671             MutableCompositeModification modification = new MutableCompositeModification();
1672             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1673             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1674
1675             // Simulate the ForwardedReadyTransaction messages that would be sent
1676             // by the ShardTransaction.
1677
1678             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1679                     cohort, modification, true, true), getRef());
1680
1681             expectMsgClass(duration, akka.actor.Status.Failure.class);
1682
1683             // Send another can commit to ensure the failed one got cleaned up.
1684
1685             reset(cohort);
1686
1687             String transactionID2 = "tx2";
1688             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1689             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1690             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1691             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1692             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1693             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1694             doReturn(candidateRoot).when(candidate).getRootNode();
1695             doReturn(candidate).when(cohort).getCandidate();
1696
1697             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1698                     cohort, modification, true, true), getRef());
1699
1700             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1701
1702             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1703         }};
1704     }
1705
1706     @Test
1707     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1708         new ShardTestKit(getSystem()) {{
1709             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1710                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1711                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1712
1713             waitUntilLeader(shard);
1714
1715             final FiniteDuration duration = duration("5 seconds");
1716
1717             String transactionID = "tx1";
1718             MutableCompositeModification modification = new MutableCompositeModification();
1719             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1720             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1721
1722             // Simulate the ForwardedReadyTransaction messages that would be sent
1723             // by the ShardTransaction.
1724
1725             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1726                     cohort, modification, true, true), getRef());
1727
1728             expectMsgClass(duration, akka.actor.Status.Failure.class);
1729
1730             // Send another can commit to ensure the failed one got cleaned up.
1731
1732             reset(cohort);
1733
1734             String transactionID2 = "tx2";
1735             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1736             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1737             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1738             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1739             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1740             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1741             doReturn(candidateRoot).when(candidate).getRootNode();
1742             doReturn(candidate).when(cohort).getCandidate();
1743
1744             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1745                     cohort, modification, true, true), getRef());
1746
1747             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1748
1749             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1750         }};
1751     }
1752
1753     @Test
1754     public void testAbortBeforeFinishCommit() throws Throwable {
1755         new ShardTestKit(getSystem()) {{
1756             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1757                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1758                     "testAbortBeforeFinishCommit");
1759
1760             waitUntilLeader(shard);
1761
1762             final FiniteDuration duration = duration("5 seconds");
1763             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1764
1765             final String transactionID = "tx1";
1766             Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1767                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1768                 @Override
1769                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1770                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1771
1772                     // Simulate an AbortTransaction message occurring during replication, after
1773                     // persisting and before finishing the commit to the in-memory store.
1774                     // We have no followers so due to optimizations in the RaftActor, it does not
1775                     // attempt replication and thus we can't send an AbortTransaction message b/c
1776                     // it would be processed too late after CommitTransaction completes. So we'll
1777                     // simulate an AbortTransaction message occurring during replication by calling
1778                     // the shard directly.
1779                     //
1780                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1781
1782                     return preCommitFuture;
1783                 }
1784             };
1785
1786             MutableCompositeModification modification = new MutableCompositeModification();
1787             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1788                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1789                     modification, preCommit);
1790
1791             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1792                     cohort, modification, true, false), getRef());
1793             expectMsgClass(duration, ReadyTransactionReply.class);
1794
1795             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1796             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1797                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1798             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1799
1800             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1801             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1802
1803             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1804
1805             // Since we're simulating an abort occurring during replication and before finish commit,
1806             // the data should still get written to the in-memory store since we've gotten past
1807             // canCommit and preCommit and persisted the data.
1808             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1809
1810             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1811         }};
1812     }
1813
1814     @Test
1815     public void testTransactionCommitTimeout() throws Throwable {
1816         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1817
1818         new ShardTestKit(getSystem()) {{
1819             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1820                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1821                     "testTransactionCommitTimeout");
1822
1823             waitUntilLeader(shard);
1824
1825             final FiniteDuration duration = duration("5 seconds");
1826
1827             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1828
1829             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1830             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1831                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1832
1833             // Create 1st Tx - will timeout
1834
1835             String transactionID1 = "tx1";
1836             MutableCompositeModification modification1 = new MutableCompositeModification();
1837             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1838                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1839                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1840                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1841                     modification1);
1842
1843             // Create 2nd Tx
1844
1845             String transactionID2 = "tx3";
1846             MutableCompositeModification modification2 = new MutableCompositeModification();
1847             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1848                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1849             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1850                     listNodePath,
1851                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1852                     modification2);
1853
1854             // Ready the Tx's
1855
1856             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1857                     cohort1, modification1, true, false), getRef());
1858             expectMsgClass(duration, ReadyTransactionReply.class);
1859
1860             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1861                     cohort2, modification2, true, false), getRef());
1862             expectMsgClass(duration, ReadyTransactionReply.class);
1863
1864             // canCommit 1st Tx. We don't send the commit so it should timeout.
1865
1866             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1867             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1868
1869             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1870
1871             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1872             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1873
1874             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1875
1876             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1877             expectMsgClass(duration, akka.actor.Status.Failure.class);
1878
1879             // Commit the 2nd Tx.
1880
1881             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1882             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1883
1884             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1885             assertNotNull(listNodePath + " not found", node);
1886
1887             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1888         }};
1889     }
1890
1891     @Test
1892     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1893         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1894
1895         new ShardTestKit(getSystem()) {{
1896             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1897                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1898                     "testTransactionCommitQueueCapacityExceeded");
1899
1900             waitUntilLeader(shard);
1901
1902             final FiniteDuration duration = duration("5 seconds");
1903
1904             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1905
1906             String transactionID1 = "tx1";
1907             MutableCompositeModification modification1 = new MutableCompositeModification();
1908             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1909                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1910
1911             String transactionID2 = "tx2";
1912             MutableCompositeModification modification2 = new MutableCompositeModification();
1913             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1914                     TestModel.OUTER_LIST_PATH,
1915                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1916                     modification2);
1917
1918             String transactionID3 = "tx3";
1919             MutableCompositeModification modification3 = new MutableCompositeModification();
1920             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1921                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1922
1923             // Ready the Tx's
1924
1925             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1926                     cohort1, modification1, true, false), getRef());
1927             expectMsgClass(duration, ReadyTransactionReply.class);
1928
1929             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1930                     cohort2, modification2, true, false), getRef());
1931             expectMsgClass(duration, ReadyTransactionReply.class);
1932
1933             // The 3rd Tx should exceed queue capacity and fail.
1934
1935             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1936                     cohort3, modification3, true, false), getRef());
1937             expectMsgClass(duration, akka.actor.Status.Failure.class);
1938
1939             // canCommit 1st Tx.
1940
1941             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1942             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1943
1944             // canCommit the 2nd Tx - it should get queued.
1945
1946             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1947
1948             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1949
1950             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1951             expectMsgClass(duration, akka.actor.Status.Failure.class);
1952
1953             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1954         }};
1955     }
1956
1957     @Test
1958     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1959         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1960
1961         new ShardTestKit(getSystem()) {{
1962             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1963                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1964                     "testTransactionCommitWithPriorExpiredCohortEntries");
1965
1966             waitUntilLeader(shard);
1967
1968             final FiniteDuration duration = duration("5 seconds");
1969
1970             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1971
1972             String transactionID1 = "tx1";
1973             MutableCompositeModification modification1 = new MutableCompositeModification();
1974             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1975                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1976
1977             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1978                     cohort1, modification1, true, false), getRef());
1979             expectMsgClass(duration, ReadyTransactionReply.class);
1980
1981             String transactionID2 = "tx2";
1982             MutableCompositeModification modification2 = new MutableCompositeModification();
1983             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1984                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1985
1986             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1987                     cohort2, modification2, true, false), getRef());
1988             expectMsgClass(duration, ReadyTransactionReply.class);
1989
1990             String transactionID3 = "tx3";
1991             MutableCompositeModification modification3 = new MutableCompositeModification();
1992             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1993                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1994
1995             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1996                     cohort3, modification3, true, false), getRef());
1997             expectMsgClass(duration, ReadyTransactionReply.class);
1998
1999             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2000             // should expire from the queue and the last one should be processed.
2001
2002             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2003             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2004
2005             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2006         }};
2007     }
2008
2009     @Test
2010     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2011         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2012
2013         new ShardTestKit(getSystem()) {{
2014             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2015                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2016                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2017
2018             waitUntilLeader(shard);
2019
2020             final FiniteDuration duration = duration("5 seconds");
2021
2022             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2023
2024             String transactionID1 = "tx1";
2025             MutableCompositeModification modification1 = new MutableCompositeModification();
2026             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2027                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2028
2029             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2030                     cohort1, modification1, true, false), getRef());
2031             expectMsgClass(duration, ReadyTransactionReply.class);
2032
2033             // CanCommit the first one so it's the current in-progress CohortEntry.
2034
2035             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2036             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2037
2038             // Ready the second Tx.
2039
2040             String transactionID2 = "tx2";
2041             MutableCompositeModification modification2 = new MutableCompositeModification();
2042             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2043                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2044
2045             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2046                     cohort2, modification2, true, false), getRef());
2047             expectMsgClass(duration, ReadyTransactionReply.class);
2048
2049             // Ready the third Tx.
2050
2051             String transactionID3 = "tx3";
2052             DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2053             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2054                     .apply(modification3);
2055             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2056
2057             shard.tell(readyMessage, getRef());
2058
2059             // Commit the first Tx. After completing, the second should expire from the queue and the third
2060             // Tx committed.
2061
2062             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2063             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2064
2065             // Expect commit reply from the third Tx.
2066
2067             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2068
2069             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2070             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2071
2072             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2073         }};
2074     }
2075
2076     @Test
2077     public void testCanCommitBeforeReadyFailure() throws Throwable {
2078         new ShardTestKit(getSystem()) {{
2079             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2080                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2081                     "testCanCommitBeforeReadyFailure");
2082
2083             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2084             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2085
2086             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2087         }};
2088     }
2089
2090     @Test
2091     public void testAbortTransaction() throws Throwable {
2092         new ShardTestKit(getSystem()) {{
2093             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2094                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2095                     "testAbortTransaction");
2096
2097             waitUntilLeader(shard);
2098
2099             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2100
2101             String transactionID1 = "tx1";
2102             MutableCompositeModification modification1 = new MutableCompositeModification();
2103             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2104             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2105             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2106
2107             String transactionID2 = "tx2";
2108             MutableCompositeModification modification2 = new MutableCompositeModification();
2109             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2110             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2111
2112             FiniteDuration duration = duration("5 seconds");
2113             final Timeout timeout = new Timeout(duration);
2114
2115             // Simulate the ForwardedReadyTransaction messages that would be sent
2116             // by the ShardTransaction.
2117
2118             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2119                     cohort1, modification1, true, false), getRef());
2120             expectMsgClass(duration, ReadyTransactionReply.class);
2121
2122             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2123                     cohort2, modification2, true, false), getRef());
2124             expectMsgClass(duration, ReadyTransactionReply.class);
2125
2126             // Send the CanCommitTransaction message for the first Tx.
2127
2128             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2129             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2130                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2131             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2132
2133             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2134             // processed after the first Tx completes.
2135
2136             Future<Object> canCommitFuture = Patterns.ask(shard,
2137                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2138
2139             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2140             // Tx to proceed.
2141
2142             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2143             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2144
2145             // Wait for the 2nd Tx to complete the canCommit phase.
2146
2147             Await.ready(canCommitFuture, duration);
2148
2149             InOrder inOrder = inOrder(cohort1, cohort2);
2150             inOrder.verify(cohort1).canCommit();
2151             inOrder.verify(cohort2).canCommit();
2152
2153             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2154         }};
2155     }
2156
2157     @Test
2158     public void testCreateSnapshot() throws Exception {
2159         testCreateSnapshot(true, "testCreateSnapshot");
2160     }
2161
2162     @Test
2163     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2164         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2165     }
2166
2167     @SuppressWarnings("serial")
2168     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2169
2170         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2171
2172         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2173         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2174             TestPersistentDataProvider(DataPersistenceProvider delegate) {
2175                 super(delegate);
2176             }
2177
2178             @Override
2179             public void saveSnapshot(Object o) {
2180                 savedSnapshot.set(o);
2181                 super.saveSnapshot(o);
2182             }
2183         }
2184
2185         dataStoreContextBuilder.persistent(persistent);
2186
2187         new ShardTestKit(getSystem()) {{
2188             class TestShard extends Shard {
2189
2190                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
2191                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
2192                     super(name, peerAddresses, datastoreContext, schemaContext);
2193                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2194                 }
2195
2196                 @Override
2197                 public void handleCommand(Object message) {
2198                     super.handleCommand(message);
2199
2200                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2201                         latch.get().countDown();
2202                     }
2203                 }
2204
2205                 @Override
2206                 public RaftActorContext getRaftActorContext() {
2207                     return super.getRaftActorContext();
2208                 }
2209             }
2210
2211             Creator<Shard> creator = new Creator<Shard>() {
2212                 @Override
2213                 public Shard create() throws Exception {
2214                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2215                             newDatastoreContext(), SCHEMA_CONTEXT);
2216                 }
2217             };
2218
2219             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2220                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2221
2222             waitUntilLeader(shard);
2223
2224             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2225
2226             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2227
2228             // Trigger creation of a snapshot by ensuring
2229             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2230             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2231
2232             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2233
2234             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2235                     savedSnapshot.get() instanceof Snapshot);
2236
2237             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2238
2239             latch.set(new CountDownLatch(1));
2240             savedSnapshot.set(null);
2241
2242             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2243
2244             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2245
2246             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2247                     savedSnapshot.get() instanceof Snapshot);
2248
2249             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2250
2251             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2252         }
2253
2254         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
2255
2256             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2257             assertEquals("Root node", expectedRoot, actual);
2258
2259         }};
2260     }
2261
2262     /**
2263      * This test simply verifies that the applySnapShot logic will work
2264      * @throws ReadFailedException
2265      * @throws DataValidationFailedException
2266      */
2267     @Test
2268     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2269         DataTree store = InMemoryDataTreeFactory.getInstance().create();
2270         store.setSchemaContext(SCHEMA_CONTEXT);
2271
2272         DataTreeModification putTransaction = store.takeSnapshot().newModification();
2273         putTransaction.write(TestModel.TEST_PATH,
2274             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2275         commitTransaction(store, putTransaction);
2276
2277
2278         NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2279
2280         DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2281
2282         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2283         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2284
2285         commitTransaction(store, writeTransaction);
2286
2287         NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2288
2289         assertEquals(expected, actual);
2290     }
2291
2292     @Test
2293     public void testRecoveryApplicable(){
2294
2295         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2296                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2297
2298         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2299                 persistentContext, SCHEMA_CONTEXT);
2300
2301         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2302                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2303
2304         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2305                 nonPersistentContext, SCHEMA_CONTEXT);
2306
2307         new ShardTestKit(getSystem()) {{
2308             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2309                     persistentProps, "testPersistence1");
2310
2311             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2312
2313             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2314
2315             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2316                     nonPersistentProps, "testPersistence2");
2317
2318             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2319
2320             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2321
2322         }};
2323
2324     }
2325
2326     @Test
2327     public void testOnDatastoreContext() {
2328         new ShardTestKit(getSystem()) {{
2329             dataStoreContextBuilder.persistent(true);
2330
2331             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2332
2333             assertEquals("isRecoveryApplicable", true,
2334                     shard.underlyingActor().persistence().isRecoveryApplicable());
2335
2336             waitUntilLeader(shard);
2337
2338             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2339
2340             assertEquals("isRecoveryApplicable", false,
2341                     shard.underlyingActor().persistence().isRecoveryApplicable());
2342
2343             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2344
2345             assertEquals("isRecoveryApplicable", true,
2346                     shard.underlyingActor().persistence().isRecoveryApplicable());
2347
2348             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2349         }};
2350     }
2351
2352     @Test
2353     public void testRegisterRoleChangeListener() throws Exception {
2354         new ShardTestKit(getSystem()) {
2355             {
2356                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2357                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2358                         "testRegisterRoleChangeListener");
2359
2360                 waitUntilLeader(shard);
2361
2362                 TestActorRef<MessageCollectorActor> listener =
2363                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2364
2365                 shard.tell(new RegisterRoleChangeListener(), listener);
2366
2367                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2368
2369                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2370                         ShardLeaderStateChanged.class);
2371                 assertEquals("getLocalShardDataTree present", true,
2372                         leaderStateChanged.getLocalShardDataTree().isPresent());
2373                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2374                         leaderStateChanged.getLocalShardDataTree().get());
2375
2376                 MessageCollectorActor.clearMessages(listener);
2377
2378                 // Force a leader change
2379
2380                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2381
2382                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2383                         ShardLeaderStateChanged.class);
2384                 assertEquals("getLocalShardDataTree present", false,
2385                         leaderStateChanged.getLocalShardDataTree().isPresent());
2386
2387                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2388             }
2389         };
2390     }
2391
2392     @Test
2393     public void testFollowerInitialSyncStatus() throws Exception {
2394         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2395                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2396                 "testFollowerInitialSyncStatus");
2397
2398         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2399
2400         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2401
2402         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2403
2404         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2405
2406         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2407     }
2408
2409     private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2410         modification.ready();
2411         store.validate(modification);
2412         store.commit(store.prepare(modification));
2413     }
2414 }