afd354317caa26eea6c3ec607c92e319e8c08df3
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Status.Failure;
28 import akka.dispatch.Dispatchers;
29 import akka.dispatch.OnComplete;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.testkit.TestActorRef;
34 import akka.util.Timeout;
35 import com.google.common.base.Function;
36 import com.google.common.base.Optional;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableFuture;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashSet;
43 import java.util.Set;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.Test;
49 import org.mockito.InOrder;
50 import org.opendaylight.controller.cluster.DataPersistenceProvider;
51 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
52 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
53 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
54 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
63 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
73 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
74 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
75 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
76 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
77 import org.opendaylight.controller.cluster.datastore.modification.Modification;
78 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
79 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
80 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
81 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.raft.RaftActorContext;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
89 import org.opendaylight.controller.cluster.raft.Snapshot;
90 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
92 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
93 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
94 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
95 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
96 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
97 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
98 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
99 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
100 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
101 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
102 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
103 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
104 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
105 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
106 import org.opendaylight.yangtools.yang.common.QName;
107 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
109 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
111 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
123 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
124 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
125 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
126 import scala.concurrent.Await;
127 import scala.concurrent.Future;
128 import scala.concurrent.duration.FiniteDuration;
129
130 public class ShardTest extends AbstractShardTest {
    // QName of the "cars" node in the cars test model (namespace and revision as given below).
    // It is not referenced in the portion of the class visible here.
    private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");

    // Placeholder journal entry. The recovery tests store it at journal sequence number 0;
    // the text of the value itself explains why that slot has to be occupied.
    private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
134
135     @Test
136     public void testRegisterChangeListener() throws Exception {
137         new ShardTestKit(getSystem()) {{
138             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
139                     newShardProps(),  "testRegisterChangeListener");
140
141             waitUntilLeader(shard);
142
143             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
144
145             final MockDataChangeListener listener = new MockDataChangeListener(1);
146             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
147                     "testRegisterChangeListener-DataChangeListener");
148
149             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
150                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
151
152             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
153                     RegisterChangeListenerReply.class);
154             final String replyPath = reply.getListenerRegistrationPath().toString();
155             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
156                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
157
158             final YangInstanceIdentifier path = TestModel.TEST_PATH;
159             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
160
161             listener.waitForChangeEvents(path);
162
163             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
164             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
165         }};
166     }
167
168     @SuppressWarnings("serial")
169     @Test
170     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
171         // This test tests the timing window in which a change listener is registered before the
172         // shard becomes the leader. We verify that the listener is registered and notified of the
173         // existing data when the shard becomes the leader.
174         new ShardTestKit(getSystem()) {{
175             // For this test, we want to send the RegisterChangeListener message after the shard
176             // has recovered from persistence and before it becomes the leader. So we subclass
177             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
178             // we know that the shard has been initialized to a follower and has started the
179             // election process. The following 2 CountDownLatches are used to coordinate the
180             // ElectionTimeout with the sending of the RegisterChangeListener message.
181             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
182             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
183             final Creator<Shard> creator = new Creator<Shard>() {
184                 boolean firstElectionTimeout = true;
185
186                 @Override
187                 public Shard create() throws Exception {
188                     // Use a non persistent provider because this test actually invokes persist on the journal
189                     // this will cause all other messages to not be queued properly after that.
190                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
191                     // it does do a persist)
192                     return new Shard(newShardBuilder()) {
193                         @Override
194                         public void onReceiveCommand(final Object message) throws Exception {
195                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
196                                 // Got the first ElectionTimeout. We don't forward it to the
197                                 // base Shard yet until we've sent the RegisterChangeListener
198                                 // message. So we signal the onFirstElectionTimeout latch to tell
199                                 // the main thread to send the RegisterChangeListener message and
200                                 // start a thread to wait on the onChangeListenerRegistered latch,
201                                 // which the main thread signals after it has sent the message.
202                                 // After the onChangeListenerRegistered is triggered, we send the
203                                 // original ElectionTimeout message to proceed with the election.
204                                 firstElectionTimeout = false;
205                                 final ActorRef self = getSelf();
206                                 new Thread() {
207                                     @Override
208                                     public void run() {
209                                         Uninterruptibles.awaitUninterruptibly(
210                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
211                                         self.tell(message, self);
212                                     }
213                                 }.start();
214
215                                 onFirstElectionTimeout.countDown();
216                             } else {
217                                 super.onReceiveCommand(message);
218                             }
219                         }
220                     };
221                 }
222             };
223
224             final MockDataChangeListener listener = new MockDataChangeListener(1);
225             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
226                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
227
228             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
229                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
230                     "testRegisterChangeListenerWhenNotLeaderInitially");
231
232             // Write initial data into the in-memory store.
233             final YangInstanceIdentifier path = TestModel.TEST_PATH;
234             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
235
236             // Wait until the shard receives the first ElectionTimeout message.
237             assertEquals("Got first ElectionTimeout", true,
238                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
239
240             // Now send the RegisterChangeListener and wait for the reply.
241             shard.tell(new RegisterChangeListener(path, dclActor,
242                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
243
244             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
245                     RegisterChangeListenerReply.class);
246             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
247
248             // Sanity check - verify the shard is not the leader yet.
249             shard.tell(new FindLeader(), getRef());
250             final FindLeaderReply findLeadeReply =
251                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
252             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
253
254             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
255             // with the election process.
256             onChangeListenerRegistered.countDown();
257
258             // Wait for the shard to become the leader and notify our listener with the existing
259             // data in the store.
260             listener.waitForChangeEvents(path);
261
262             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
263             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
264         }};
265     }
266
267     @Test
268     public void testRegisterDataTreeChangeListener() throws Exception {
269         new ShardTestKit(getSystem()) {{
270             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
271                     newShardProps(), "testRegisterDataTreeChangeListener");
272
273             waitUntilLeader(shard);
274
275             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
276
277             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
278             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
279                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
280
281             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
282
283             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
284                     RegisterDataTreeChangeListenerReply.class);
285             final String replyPath = reply.getListenerRegistrationPath().toString();
286             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
287                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
288
289             final YangInstanceIdentifier path = TestModel.TEST_PATH;
290             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
291
292             listener.waitForChangeEvents();
293
294             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
295             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
296         }};
297     }
298
299     @SuppressWarnings("serial")
300     @Test
301     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
302         new ShardTestKit(getSystem()) {{
303             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
304             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
305             final Creator<Shard> creator = new Creator<Shard>() {
306                 boolean firstElectionTimeout = true;
307
308                 @Override
309                 public Shard create() throws Exception {
310                     return new Shard(Shard.builder().id(shardID).datastoreContext(
311                             dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
312                         @Override
313                         public void onReceiveCommand(final Object message) throws Exception {
314                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
315                                 firstElectionTimeout = false;
316                                 final ActorRef self = getSelf();
317                                 new Thread() {
318                                     @Override
319                                     public void run() {
320                                         Uninterruptibles.awaitUninterruptibly(
321                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
322                                         self.tell(message, self);
323                                     }
324                                 }.start();
325
326                                 onFirstElectionTimeout.countDown();
327                             } else {
328                                 super.onReceiveCommand(message);
329                             }
330                         }
331                     };
332                 }
333             };
334
335             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
336             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
337                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
338
339             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
340                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
341                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
342
343             final YangInstanceIdentifier path = TestModel.TEST_PATH;
344             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
345
346             assertEquals("Got first ElectionTimeout", true,
347                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
348
349             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
350             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
351                 RegisterDataTreeChangeListenerReply.class);
352             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
353
354             shard.tell(new FindLeader(), getRef());
355             final FindLeaderReply findLeadeReply =
356                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
357             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
358
359             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
360
361             onChangeListenerRegistered.countDown();
362
363             // TODO: investigate why we do not receive data chage events
364             listener.waitForChangeEvents();
365
366             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
367             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
368         }};
369     }
370
371     @Test
372     public void testCreateTransaction(){
373         new ShardTestKit(getSystem()) {{
374             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
375
376             waitUntilLeader(shard);
377
378             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
379
380             shard.tell(new CreateTransaction("txn-1",
381                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
382
383             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
384                     CreateTransactionReply.class);
385
386             final String path = reply.getTransactionActorPath().toString();
387             assertTrue("Unexpected transaction path " + path,
388                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
389
390             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
391         }};
392     }
393
394     @Test
395     public void testCreateTransactionOnChain(){
396         new ShardTestKit(getSystem()) {{
397             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
398
399             waitUntilLeader(shard);
400
401             shard.tell(new CreateTransaction("txn-1",
402                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
403                     getRef());
404
405             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
406                     CreateTransactionReply.class);
407
408             final String path = reply.getTransactionActorPath().toString();
409             assertTrue("Unexpected transaction path " + path,
410                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
411
412             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
413         }};
414     }
415
416     @SuppressWarnings("serial")
417     @Test
418     public void testPeerAddressResolved() throws Exception {
419         new ShardTestKit(getSystem()) {{
420             final CountDownLatch recoveryComplete = new CountDownLatch(1);
421             class TestShard extends Shard {
422                 TestShard() {
423                     super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
424                             peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
425                             schemaContext(SCHEMA_CONTEXT));
426                 }
427
428                 String getPeerAddress(String id) {
429                     return getRaftActorContext().getPeerAddress(id);
430                 }
431
432                 @Override
433                 protected void onRecoveryComplete() {
434                     try {
435                         super.onRecoveryComplete();
436                     } finally {
437                         recoveryComplete.countDown();
438                     }
439                 }
440             }
441
442             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
443                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
444                         @Override
445                         public TestShard create() throws Exception {
446                             return new TestShard();
447                         }
448                     })), "testPeerAddressResolved");
449
450             assertEquals("Recovery complete", true,
451                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
452
453             final String address = "akka://foobar";
454             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
455
456             assertEquals("getPeerAddress", address,
457                 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
458
459             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
460         }};
461     }
462
463     @Test
464     public void testApplySnapshot() throws Exception {
465
466         ShardTestKit testkit = new ShardTestKit(getSystem());
467
468         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
469                 "testApplySnapshot");
470
471         testkit.waitUntilLeader(shard);
472
473         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
474         store.setSchemaContext(SCHEMA_CONTEXT);
475
476         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
477                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
478                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
479                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
480
481         writeToStore(store, TestModel.TEST_PATH, container);
482
483         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
484         final NormalizedNode<?,?> expected = readStore(store, root);
485
486         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
487                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
488
489         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
490
491         final NormalizedNode<?,?> actual = readStore(shard, root);
492
493         assertEquals("Root node", expected, actual);
494
495         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
496     }
497
498     @Test
499     public void testApplyState() throws Exception {
500
501         ShardTestKit testkit = new ShardTestKit(getSystem());
502
503         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
504
505         testkit.waitUntilLeader(shard);
506
507         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
508
509         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
510                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
511
512         shard.underlyingActor().onReceiveCommand(applyState);
513
514         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
515         assertEquals("Applied state", node, actual);
516
517         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
518     }
519
520     @Test
521     public void testApplyStateWithCandidatePayload() throws Exception {
522
523         ShardTestKit testkit = new ShardTestKit(getSystem());
524
525         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
526
527         testkit.waitUntilLeader(shard);
528
529         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
530         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
531
532         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
533                 DataTreeCandidatePayload.create(candidate)));
534
535         shard.underlyingActor().onReceiveCommand(applyState);
536
537         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
538         assertEquals("Applied state", node, actual);
539
540         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
541     }
542
543     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
544         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
545         testStore.setSchemaContext(SCHEMA_CONTEXT);
546
547         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
548
549         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
550
551         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
552             SerializationUtils.serializeNormalizedNode(root),
553             Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
554         return testStore;
555     }
556
557     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
558         source.validate(mod);
559         final DataTreeCandidate candidate = source.prepare(mod);
560         source.commit(candidate);
561         return DataTreeCandidatePayload.create(candidate);
562     }
563
564     @Test
565     public void testDataTreeCandidateRecovery() throws Exception {
566         // Set up the InMemorySnapshotStore.
567         final DataTree source = setupInMemorySnapshotStore();
568
569         final DataTreeModification writeMod = source.takeSnapshot().newModification();
570         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
571         writeMod.ready();
572         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
573
574         // Set up the InMemoryJournal.
575         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
576
577         final int nListEntries = 16;
578         final Set<Integer> listEntryKeys = new HashSet<>();
579
580         // Add some ModificationPayload entries
581         for (int i = 1; i <= nListEntries; i++) {
582             listEntryKeys.add(Integer.valueOf(i));
583
584             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
585                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
586
587             final DataTreeModification mod = source.takeSnapshot().newModification();
588             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
589             mod.ready();
590             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
591                 payloadForModification(source, mod)));
592         }
593
594         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
595             new ApplyJournalEntries(nListEntries));
596
597         testRecovery(listEntryKeys);
598     }
599
600     @Test
601     public void testModicationRecovery() throws Exception {
602
603         // Set up the InMemorySnapshotStore.
604         setupInMemorySnapshotStore();
605
606         // Set up the InMemoryJournal.
607
608         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
609
610         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
611             new WriteModification(TestModel.OUTER_LIST_PATH,
612                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
613
614         final int nListEntries = 16;
615         final Set<Integer> listEntryKeys = new HashSet<>();
616
617         // Add some ModificationPayload entries
618         for(int i = 1; i <= nListEntries; i++) {
619             listEntryKeys.add(Integer.valueOf(i));
620             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
621                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
622             final Modification mod = new MergeModification(path,
623                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
624             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
625                     newModificationPayload(mod)));
626         }
627
628         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
629                 new ApplyJournalEntries(nListEntries));
630
631         testRecovery(listEntryKeys);
632     }
633
634     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
635         final MutableCompositeModification compMod = new MutableCompositeModification();
636         for(final Modification mod: mods) {
637             compMod.addModification(mod);
638         }
639
640         return new ModificationPayload(compMod);
641     }
642
643     @Test
644     public void testConcurrentThreePhaseCommits() throws Throwable {
645         new ShardTestKit(getSystem()) {{
646             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
647                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
648                     "testConcurrentThreePhaseCommits");
649
650             waitUntilLeader(shard);
651
652          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
653
654             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
655
656             final String transactionID1 = "tx1";
657             final MutableCompositeModification modification1 = new MutableCompositeModification();
658             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
659                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
660
661             final String transactionID2 = "tx2";
662             final MutableCompositeModification modification2 = new MutableCompositeModification();
663             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
664                     TestModel.OUTER_LIST_PATH,
665                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
666                     modification2);
667
668             final String transactionID3 = "tx3";
669             final MutableCompositeModification modification3 = new MutableCompositeModification();
670             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
671                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
672                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
673                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
674                     modification3);
675
676             final long timeoutSec = 5;
677             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
678             final Timeout timeout = new Timeout(duration);
679
680             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
681             // by the ShardTransaction.
682
683             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
684                     cohort1, modification1, true, false), getRef());
685             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
686                     expectMsgClass(duration, ReadyTransactionReply.class));
687             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
688
689             // Send the CanCommitTransaction message for the first Tx.
690
691             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
692             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
693                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
694             assertEquals("Can commit", true, canCommitReply.getCanCommit());
695
696             // Send the ForwardedReadyTransaction for the next 2 Tx's.
697
698             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
699                     cohort2, modification2, true, false), getRef());
700             expectMsgClass(duration, ReadyTransactionReply.class);
701
702             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
703                     cohort3, modification3, true, false), getRef());
704             expectMsgClass(duration, ReadyTransactionReply.class);
705
706             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
707             // processed after the first Tx completes.
708
709             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
710                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
711
712             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
713                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
714
715             // Send the CommitTransaction message for the first Tx. After it completes, it should
716             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
717
718             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
719             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
720
721             // Wait for the next 2 Tx's to complete.
722
723             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
724             final CountDownLatch commitLatch = new CountDownLatch(2);
725
726             class OnFutureComplete extends OnComplete<Object> {
727                 private final Class<?> expRespType;
728
729                 OnFutureComplete(final Class<?> expRespType) {
730                     this.expRespType = expRespType;
731                 }
732
733                 @Override
734                 public void onComplete(final Throwable error, final Object resp) {
735                     if(error != null) {
736                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
737                     } else {
738                         try {
739                             assertEquals("Commit response type", expRespType, resp.getClass());
740                             onSuccess(resp);
741                         } catch (final Exception e) {
742                             caughtEx.set(e);
743                         }
744                     }
745                 }
746
747                 void onSuccess(final Object resp) throws Exception {
748                 }
749             }
750
751             class OnCommitFutureComplete extends OnFutureComplete {
752                 OnCommitFutureComplete() {
753                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
754                 }
755
756                 @Override
757                 public void onComplete(final Throwable error, final Object resp) {
758                     super.onComplete(error, resp);
759                     commitLatch.countDown();
760                 }
761             }
762
763             class OnCanCommitFutureComplete extends OnFutureComplete {
764                 private final String transactionID;
765
766                 OnCanCommitFutureComplete(final String transactionID) {
767                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
768                     this.transactionID = transactionID;
769                 }
770
771                 @Override
772                 void onSuccess(final Object resp) throws Exception {
773                     final CanCommitTransactionReply canCommitReply =
774                             CanCommitTransactionReply.fromSerializable(resp);
775                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
776
777                     final Future<Object> commitFuture = Patterns.ask(shard,
778                             new CommitTransaction(transactionID).toSerializable(), timeout);
779                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
780                 }
781             }
782
783             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
784                     getSystem().dispatcher());
785
786             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
787                     getSystem().dispatcher());
788
789             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
790
791             if(caughtEx.get() != null) {
792                 throw caughtEx.get();
793             }
794
795             assertEquals("Commits complete", true, done);
796
797             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
798             inOrder.verify(cohort1).canCommit();
799             inOrder.verify(cohort1).preCommit();
800             inOrder.verify(cohort1).commit();
801             inOrder.verify(cohort2).canCommit();
802             inOrder.verify(cohort2).preCommit();
803             inOrder.verify(cohort2).commit();
804             inOrder.verify(cohort3).canCommit();
805             inOrder.verify(cohort3).preCommit();
806             inOrder.verify(cohort3).commit();
807
808             // Verify data in the data store.
809
810             verifyOuterListEntry(shard, 1);
811
812             verifyLastApplied(shard, 2);
813
814             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
815         }};
816     }
817
818     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
819             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
820         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
821     }
822
823     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
824             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
825             final int messagesSent) {
826         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
827         batched.addModification(new WriteModification(path, data));
828         batched.setReady(ready);
829         batched.setDoCommitOnReady(doCommitOnReady);
830         batched.setTotalMessagesSent(messagesSent);
831         return batched;
832     }
833
834     @Test
835     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
836         new ShardTestKit(getSystem()) {{
837             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
838                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
839                     "testBatchedModificationsWithNoCommitOnReady");
840
841             waitUntilLeader(shard);
842
843             final String transactionID = "tx";
844             final FiniteDuration duration = duration("5 seconds");
845
846             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
847             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
848                 @Override
849                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
850                     if(mockCohort.get() == null) {
851                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
852                     }
853
854                     return mockCohort.get();
855                 }
856             };
857
858             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
859
860             // Send a BatchedModifications to start a transaction.
861
862             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
863                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
864             expectMsgClass(duration, BatchedModificationsReply.class);
865
866             // Send a couple more BatchedModifications.
867
868             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
869                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
870             expectMsgClass(duration, BatchedModificationsReply.class);
871
872             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
873                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
874                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
875             expectMsgClass(duration, ReadyTransactionReply.class);
876
877             // Send the CanCommitTransaction message.
878
879             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
880             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
881                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
882             assertEquals("Can commit", true, canCommitReply.getCanCommit());
883
884             // Send the CanCommitTransaction message.
885
886             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
887             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
888
889             final InOrder inOrder = inOrder(mockCohort.get());
890             inOrder.verify(mockCohort.get()).canCommit();
891             inOrder.verify(mockCohort.get()).preCommit();
892             inOrder.verify(mockCohort.get()).commit();
893
894             // Verify data in the data store.
895
896             verifyOuterListEntry(shard, 1);
897
898             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
899         }};
900     }
901
902     @Test
903     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
904         new ShardTestKit(getSystem()) {{
905             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
906                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
907                     "testBatchedModificationsWithCommitOnReady");
908
909             waitUntilLeader(shard);
910
911             final String transactionID = "tx";
912             final FiniteDuration duration = duration("5 seconds");
913
914             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
915             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
916                 @Override
917                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
918                     if(mockCohort.get() == null) {
919                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
920                     }
921
922                     return mockCohort.get();
923                 }
924             };
925
926             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
927
928             // Send a BatchedModifications to start a transaction.
929
930             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
931                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
932             expectMsgClass(duration, BatchedModificationsReply.class);
933
934             // Send a couple more BatchedModifications.
935
936             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
937                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
938             expectMsgClass(duration, BatchedModificationsReply.class);
939
940             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
941                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
942                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
943
944             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
945
946             final InOrder inOrder = inOrder(mockCohort.get());
947             inOrder.verify(mockCohort.get()).canCommit();
948             inOrder.verify(mockCohort.get()).preCommit();
949             inOrder.verify(mockCohort.get()).commit();
950
951             // Verify data in the data store.
952
953             verifyOuterListEntry(shard, 1);
954
955             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
956         }};
957     }
958
959     @Test(expected=IllegalStateException.class)
960     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
961         new ShardTestKit(getSystem()) {{
962             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
963                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
964                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
965
966             waitUntilLeader(shard);
967
968             final String transactionID = "tx1";
969             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
970             batched.setReady(true);
971             batched.setTotalMessagesSent(2);
972
973             shard.tell(batched, getRef());
974
975             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
976
977             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
978
979             if(failure != null) {
980                 throw failure.cause();
981             }
982         }};
983     }
984
985     @Test
986     public void testBatchedModificationsWithOperationFailure() throws Throwable {
987         new ShardTestKit(getSystem()) {{
988             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
989                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
990                     "testBatchedModificationsWithOperationFailure");
991
992             waitUntilLeader(shard);
993
994             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
995             // write will not validate the children for performance reasons.
996
997             String transactionID = "tx1";
998
999             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1000                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
1001                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1002
1003             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
1004             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1005             shard.tell(batched, getRef());
1006             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1007
1008             Throwable cause = failure.cause();
1009
1010             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1011             batched.setReady(true);
1012             batched.setTotalMessagesSent(2);
1013
1014             shard.tell(batched, getRef());
1015
1016             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1017             assertEquals("Failure cause", cause, failure.cause());
1018
1019             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1020         }};
1021     }
1022
1023     @SuppressWarnings("unchecked")
1024     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1025         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1026         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1027         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1028                 outerList.getValue() instanceof Iterable);
1029         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1030         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1031                 entry instanceof MapEntryNode);
1032         final MapEntryNode mapEntry = (MapEntryNode)entry;
1033         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1034                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1035         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1036         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1037     }
1038
1039     @Test
1040     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1041         new ShardTestKit(getSystem()) {{
1042             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1043                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1044                     "testBatchedModificationsOnTransactionChain");
1045
1046             waitUntilLeader(shard);
1047
1048             final String transactionChainID = "txChain";
1049             final String transactionID1 = "tx1";
1050             final String transactionID2 = "tx2";
1051
1052             final FiniteDuration duration = duration("5 seconds");
1053
1054             // Send a BatchedModifications to start a chained write transaction and ready it.
1055
1056             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1057             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1058             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1059                     containerNode, true, false, 1), getRef());
1060             expectMsgClass(duration, ReadyTransactionReply.class);
1061
1062             // Create a read Tx on the same chain.
1063
1064             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1065                     transactionChainID).toSerializable(), getRef());
1066
1067             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1068
1069             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1070             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1071             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1072
1073             // Commit the write transaction.
1074
1075             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1076             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1077                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1078             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1079
1080             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1081             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1082
1083             // Verify data in the data store.
1084
1085             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1086             assertEquals("Stored node", containerNode, actualNode);
1087
1088             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1089         }};
1090     }
1091
1092     @Test
1093     public void testOnBatchedModificationsWhenNotLeader() {
1094         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1095         new ShardTestKit(getSystem()) {{
1096             final Creator<Shard> creator = new Creator<Shard>() {
1097                 private static final long serialVersionUID = 1L;
1098
1099                 @Override
1100                 public Shard create() throws Exception {
1101                     return new Shard(newShardBuilder()) {
1102                         @Override
1103                         protected boolean isLeader() {
1104                             return overrideLeaderCalls.get() ? false : super.isLeader();
1105                         }
1106
1107                         @Override
1108                         protected ActorSelection getLeader() {
1109                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1110                                 super.getLeader();
1111                         }
1112                     };
1113                 }
1114             };
1115
1116             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1117                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1118
1119             waitUntilLeader(shard);
1120
1121             overrideLeaderCalls.set(true);
1122
1123             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1124
1125             shard.tell(batched, ActorRef.noSender());
1126
1127             expectMsgEquals(batched);
1128
1129             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1130         }};
1131     }
1132
1133     @Test
1134     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1135         new ShardTestKit(getSystem()) {{
1136             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1137                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1138                     "testForwardedReadyTransactionWithImmediateCommit");
1139
1140             waitUntilLeader(shard);
1141
1142             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1143
1144             final String transactionID = "tx1";
1145             final MutableCompositeModification modification = new MutableCompositeModification();
1146             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1147             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1148                     TestModel.TEST_PATH, containerNode, modification);
1149
1150             final FiniteDuration duration = duration("5 seconds");
1151
1152             // Simulate the ForwardedReadyTransaction messages that would be sent
1153             // by the ShardTransaction.
1154
1155             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1156                     cohort, modification, true, true), getRef());
1157
1158             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1159
1160             final InOrder inOrder = inOrder(cohort);
1161             inOrder.verify(cohort).canCommit();
1162             inOrder.verify(cohort).preCommit();
1163             inOrder.verify(cohort).commit();
1164
1165             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1166             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1167
1168             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1169         }};
1170     }
1171
1172     @Test
         // Readies a local transaction (one write plus one merge) with the third ReadyLocalTransaction
         // argument set to true, expects a CommitTransactionReply as the only reply, and then checks
         // that the merged list node can be read back from the shard's store.
1173     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1174         new ShardTestKit(getSystem()) {{
1175             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1176                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1177                     "testReadyLocalTransactionWithImmediateCommit");
1178
1179             waitUntilLeader(shard);
1180
1181             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1182
                 // The modification is created directly on the shard's own data tree.
1183             final DataTreeModification modification = dataStore.newModification();
1184
1185             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1186             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1187             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1188             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1189
1190             final String txId = "tx1";
                 // The modification is readied before being wrapped in the message.
1191             modification.ready();
                 // NOTE(review): the trailing 'true' is presumably the commit-on-ready flag - confirm
                 // against ReadyLocalTransaction.
1192             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1193
1194             shard.tell(readyMessage, getRef());
1195
                 // No CanCommitTransaction/CommitTransaction messages are sent in this test; the commit
                 // reply is expected in response to the ready message alone.
1196             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1197
                 // Verify the merged data is what the store now returns for the list path.
1198             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1199             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1200
                 // Stop the shard actor created for this test.
1201             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1202         }};
1203     }
1204
1205     @Test
         // Readies a local transaction (one write plus one merge) with the third ReadyLocalTransaction
         // argument set to false, then drives it through explicit CanCommitTransaction and
         // CommitTransaction messages and checks that the merged list node can be read back.
1206     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1207         new ShardTestKit(getSystem()) {{
1208             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1209                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1210                     "testReadyLocalTransactionWithThreePhaseCommit");
1211
1212             waitUntilLeader(shard);
1213
1214             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1215
                 // The modification is created directly on the shard's own data tree.
1216             final DataTreeModification modification = dataStore.newModification();
1217
1218             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1219             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1220             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1221             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1222
1223             final String txId = "tx1";
                 // The modification is readied before being wrapped in the message.
1224                 modification.ready();
                 // NOTE(review): the trailing 'false' is presumably the commit-on-ready flag - confirm
                 // against ReadyLocalTransaction.
1225             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1226
1227             shard.tell(readyMessage, getRef());
1228
                 // Here the ready message is only acknowledged; the commit is requested separately below.
1229             expectMsgClass(ReadyTransactionReply.class);
1230
1231             // Send the CanCommitTransaction message.
1232
1233             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1234             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1235                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1236             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1237
1238             // Send the CommitTransaction message.
1239
1240             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1241             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1242
                 // Verify the merged data is what the store now returns for the list path.
1243             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1244             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1245
                 // Stop the shard actor created for this test.
1246             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1247         }};
1248     }
1249
1250     @Test
         // Runs a ready / canCommit / commit exchange against a shard whose datastore context was
         // built with persistent(false), then verifies the cohort phase order and that the written
         // container node can be read back from the store.
1251     public void testCommitWithPersistenceDisabled() throws Throwable {
             // Must be set before the shard is created below so that newShardProps() sees it.
             // NOTE(review): assumes newShardProps() reads dataStoreContextBuilder - confirm.
1252         dataStoreContextBuilder.persistent(false);
1253         new ShardTestKit(getSystem()) {{
1254             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1255                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1256                     "testCommitWithPersistenceDisabled");
1257
1258             waitUntilLeader(shard);
1259
1260             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1261
1262             // Set up a simulated transaction with a mock cohort.
1263
1264             final String transactionID = "tx";
1265             final MutableCompositeModification modification = new MutableCompositeModification();
1266             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1267             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1268                 TestModel.TEST_PATH, containerNode, modification);
1269
1270             final FiniteDuration duration = duration("5 seconds");
1271
1272             // Simulate the ForwardedReadyTransaction messages that would be sent
1273             // by the ShardTransaction.
1274
                 // NOTE(review): the final 'false' is presumably the immediate-commit flag; with it
                 // unset only a ReadyTransactionReply is expected here - confirm against
                 // ForwardedReadyTransaction.
1275             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1276                 cohort, modification, true, false), getRef());
1277             expectMsgClass(duration, ReadyTransactionReply.class);
1278
1279             // Send the CanCommitTransaction message.
1280
1281             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1282             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1283                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1284             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1285
1286             // Send the CommitTransaction message.
1287
1288             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1289             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1290
                 // The cohort must have been driven through all three phases in order.
1291             final InOrder inOrder = inOrder(cohort);
1292             inOrder.verify(cohort).canCommit();
1293             inOrder.verify(cohort).preCommit();
1294             inOrder.verify(cohort).commit();
1295
                 // Verify the written data is what the store now returns for the test path.
1296             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1297             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1298
                 // Stop the shard actor created for this test.
1299             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1300         }};
1301     }
1302
1303     private static DataTreeCandidateTip mockCandidate(final String name) {
1304         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1305         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1306         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1307         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1308         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1309         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1310         return mockCandidate;
1311     }
1312
1313     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1314         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1315         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1316         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1317         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1318         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1319         return mockCandidate;
1320     }
1321
1322     @Test
         // Commits a transaction whose mocked candidate root reports ModificationType.UNMODIFIED and
         // checks through ShardStats that the committed-transaction count becomes 1 while the commit
         // index stays at -1.
1323     public void testCommitWhenTransactionHasNoModifications(){
1324         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1325         // but here that need not happen
1326         new ShardTestKit(getSystem()) {
1327             {
1328                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1329                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1330                         "testCommitWhenTransactionHasNoModifications");
1331
1332                 waitUntilLeader(shard);
1333
1334                 final String transactionID = "tx1";
1335                 final MutableCompositeModification modification = new MutableCompositeModification();
                     // Every cohort phase succeeds immediately; the candidate is the unmodified mock.
1336                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1337                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1338                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1339                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1340                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1341
1342                 final FiniteDuration duration = duration("5 seconds");
1343
1344                 // Simulate the ForwardedReadyTransaction messages that would be sent
1345                 // by the ShardTransaction.
1346
1347                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1348                         cohort, modification, true, false), getRef());
1349                 expectMsgClass(duration, ReadyTransactionReply.class);
1350
1351                 // Send the CanCommitTransaction message.
1352
1353                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1354                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1355                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1356                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1357
                     // Send the CommitTransaction message.
1358                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1359                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1360
                     // The cohort must have been driven through all three phases in order.
1361                 final InOrder inOrder = inOrder(cohort);
1362                 inOrder.verify(cohort).canCommit();
1363                 inOrder.verify(cohort).preCommit();
1364                 inOrder.verify(cohort).commit();
1365
                     // Ask the shard for its stats bean so the counters can be inspected.
1366                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1367                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1368
1369                 // Use MBean for verification
1370                 // Committed transaction count should increase as usual
1371                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1372
1373                 // Commit index should not advance because this does not go into the journal
1374                 assertEquals(-1, shardStats.getCommitIndex());
1375
                     // Stop the shard actor created for this test.
1376                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1377
1378             }
1379         };
1380     }
1381
1382     @Test
         // Commits a transaction whose mocked candidate root reports ModificationType.WRITE and checks
         // through ShardStats that the committed-transaction count becomes 1 and the commit index
         // becomes 0.
1383     public void testCommitWhenTransactionHasModifications(){
1384         new ShardTestKit(getSystem()) {
1385             {
1386                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1387                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1388                         "testCommitWhenTransactionHasModifications");
1389
1390                 waitUntilLeader(shard);
1391
1392                 final String transactionID = "tx1";
1393                 final MutableCompositeModification modification = new MutableCompositeModification();
                     // Unlike the no-modifications test, the composite modification carries one delete.
1394                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
                     // Every cohort phase succeeds immediately; the candidate is the WRITE mock.
1395                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1396                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1397                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1398                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1399                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1400
1401                 final FiniteDuration duration = duration("5 seconds");
1402
1403                 // Simulate the ForwardedReadyTransaction messages that would be sent
1404                 // by the ShardTransaction.
1405
1406                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1407                         cohort, modification, true, false), getRef());
1408                 expectMsgClass(duration, ReadyTransactionReply.class);
1409
1410                 // Send the CanCommitTransaction message.
1411
1412                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1413                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1414                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1415                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1416
                     // Send the CommitTransaction message.
1417                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1418                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1419
                     // The cohort must have been driven through all three phases in order.
1420                 final InOrder inOrder = inOrder(cohort);
1421                 inOrder.verify(cohort).canCommit();
1422                 inOrder.verify(cohort).preCommit();
1423                 inOrder.verify(cohort).commit();
1424
                     // Ask the shard for its stats bean so the counters can be inspected.
1425                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1426                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1427
1428                 // Use MBean for verification
1429                 // Committed transaction count should increase as usual
1430                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1431
1432                 // Commit index should advance as we do not have an empty modification
1433                 assertEquals(0, shardStats.getCommitIndex());
1434
                     // Stop the shard actor created for this test.
1435                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1436
1437             }
1438         };
1439     }
1440
1441     @Test
         // Readies two transactions, lets the first one fail in its commit phase (its cohort's commit()
         // returns a failed future) and checks that the sender gets a Status.Failure and that the
         // second transaction's canCommit is invoked only after the first cohort's three phases.
1442     public void testCommitPhaseFailure() throws Throwable {
1443         new ShardTestKit(getSystem()) {{
1444             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1445                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1446                     "testCommitPhaseFailure");
1447
1448             waitUntilLeader(shard);
1449
1450             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1451             // commit phase.
1452
1453             final String transactionID1 = "tx1";
1454             final MutableCompositeModification modification1 = new MutableCompositeModification();
1455             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1456             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1457             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1458             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1459             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1460
                 // Only canCommit is stubbed for the second cohort; the test never commits it.
1461             final String transactionID2 = "tx2";
1462             final MutableCompositeModification modification2 = new MutableCompositeModification();
1463             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1464             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1465
1466             final FiniteDuration duration = duration("5 seconds");
1467             final Timeout timeout = new Timeout(duration);
1468
1469             // Simulate the ForwardedReadyTransaction messages that would be sent
1470             // by the ShardTransaction.
1471
1472             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1473                     cohort1, modification1, true, false), getRef());
1474             expectMsgClass(duration, ReadyTransactionReply.class);
1475
1476             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1477                     cohort2, modification2, true, false), getRef());
1478             expectMsgClass(duration, ReadyTransactionReply.class);
1479
1480             // Send the CanCommitTransaction message for the first Tx.
1481
1482             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1483             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1484                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1485             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1486
1487             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1488             // processed after the first Tx completes.
1489
                 // Uses ask() rather than tell() so the reply arrives on a future instead of the
                 // test kit's own message queue.
1490             final Future<Object> canCommitFuture = Patterns.ask(shard,
1491                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1492
1493             // Send the CommitTransaction message for the first Tx. This should send back an error
1494             // and trigger the 2nd Tx to proceed.
1495
1496             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1497             expectMsgClass(duration, akka.actor.Status.Failure.class);
1498
1499             // Wait for the 2nd Tx to complete the canCommit phase.
1500
                 // The latch is released on completion of the future regardless of success or failure;
                 // the response itself is not inspected.
1501             final CountDownLatch latch = new CountDownLatch(1);
1502             canCommitFuture.onComplete(new OnComplete<Object>() {
1503                 @Override
1504                 public void onComplete(final Throwable t, final Object resp) {
1505                     latch.countDown();
1506                 }
1507             }, getSystem().dispatcher());
1508
1509             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1510
                 // cohort1 must have gone through all three phases before cohort2's canCommit.
1511             final InOrder inOrder = inOrder(cohort1, cohort2);
1512             inOrder.verify(cohort1).canCommit();
1513             inOrder.verify(cohort1).preCommit();
1514             inOrder.verify(cohort1).commit();
1515             inOrder.verify(cohort2).canCommit();
1516
                 // Stop the shard actor created for this test.
1517             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1518         }};
1519     }
1520
1521     @Test
         // Readies two transactions, lets the first one fail in its preCommit phase (its cohort's
         // preCommit() returns a failed future) and checks that the CommitTransaction request is
         // answered with a Status.Failure and that the second transaction's canCommit then runs.
1522     public void testPreCommitPhaseFailure() throws Throwable {
1523         new ShardTestKit(getSystem()) {{
1524             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1525                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1526                     "testPreCommitPhaseFailure");
1527
1528             waitUntilLeader(shard);
1529
                 // First cohort: canCommit succeeds, preCommit fails. commit() is left unstubbed.
1530             final String transactionID1 = "tx1";
1531             final MutableCompositeModification modification1 = new MutableCompositeModification();
1532             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1533             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1534             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1535
                 // Second cohort: only canCommit is stubbed; the test never commits it.
1536             final String transactionID2 = "tx2";
1537             final MutableCompositeModification modification2 = new MutableCompositeModification();
1538             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1539             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1540
1541             final FiniteDuration duration = duration("5 seconds");
1542             final Timeout timeout = new Timeout(duration);
1543
1544             // Simulate the ForwardedReadyTransaction messages that would be sent
1545             // by the ShardTransaction.
1546
1547             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1548                     cohort1, modification1, true, false), getRef());
1549             expectMsgClass(duration, ReadyTransactionReply.class);
1550
1551             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1552                     cohort2, modification2, true, false), getRef());
1553             expectMsgClass(duration, ReadyTransactionReply.class);
1554
1555             // Send the CanCommitTransaction message for the first Tx.
1556
1557             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1558             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1559                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1560             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1561
1562             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1563             // processed after the first Tx completes.
1564
                 // Uses ask() rather than tell() so the reply arrives on a future instead of the
                 // test kit's own message queue.
1565             final Future<Object> canCommitFuture = Patterns.ask(shard,
1566                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1567
1568             // Send the CommitTransaction message for the first Tx. This should send back an error
1569             // and trigger the 2nd Tx to proceed.
1570
1571             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1572             expectMsgClass(duration, akka.actor.Status.Failure.class);
1573
1574             // Wait for the 2nd Tx to complete the canCommit phase.
1575
                 // The latch is released on completion of the future regardless of success or failure;
                 // the response itself is not inspected.
1576             final CountDownLatch latch = new CountDownLatch(1);
1577             canCommitFuture.onComplete(new OnComplete<Object>() {
1578                 @Override
1579                 public void onComplete(final Throwable t, final Object resp) {
1580                     latch.countDown();
1581                 }
1582             }, getSystem().dispatcher());
1583
1584             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1585
                 // Only canCommit and preCommit are verified for cohort1; cohort1.commit() is not
                 // part of the verified sequence.
1586             final InOrder inOrder = inOrder(cohort1, cohort2);
1587             inOrder.verify(cohort1).canCommit();
1588             inOrder.verify(cohort1).preCommit();
1589             inOrder.verify(cohort2).canCommit();
1590
                 // Stop the shard actor created for this test.
1591             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1592         }};
1593     }
1594
1595     @Test
         // Makes the cohort's canCommit() return a failed future, expects a Status.Failure reply to
         // CanCommitTransaction, then re-stubs the same mock to succeed and checks that a second
         // transaction gets a positive CanCommitTransactionReply.
1596     public void testCanCommitPhaseFailure() throws Throwable {
1597         new ShardTestKit(getSystem()) {{
1598             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1599                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1600                     "testCanCommitPhaseFailure");
1601
1602             waitUntilLeader(shard);
1603
1604             final FiniteDuration duration = duration("5 seconds");
1605
1606             final String transactionID1 = "tx1";
1607             final MutableCompositeModification modification = new MutableCompositeModification();
1608             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1609             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1610
1611             // Simulate the ForwardedReadyTransaction messages that would be sent
1612             // by the ShardTransaction.
1613
1614             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1615                     cohort, modification, true, false), getRef());
1616             expectMsgClass(duration, ReadyTransactionReply.class);
1617
1618             // Send the CanCommitTransaction message.
1619
1620             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1621             expectMsgClass(duration, akka.actor.Status.Failure.class);
1622
1623             // Send another can commit to ensure the failed one got cleaned up.
1624
                 // reset() discards the failing stub so the same mock can be reused for tx2.
1625             reset(cohort);
1626
1627             final String transactionID2 = "tx2";
1628             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1629
1630             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1631                     cohort, modification, true, false), getRef());
1632             expectMsgClass(duration, ReadyTransactionReply.class);
1633
                 // Note: no explicit duration is passed to this expectMsgClass call, unlike the ones above.
1634             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1635             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1636                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1637             assertEquals("getCanCommit", true, reply.getCanCommit());
1638
                 // Stop the shard actor created for this test.
1639             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1640         }};
1641     }
1642
1643     @Test
         // Makes the cohort's canCommit() complete with FALSE, expects a CanCommitTransactionReply
         // whose getCanCommit() is false, then re-stubs the same mock to return TRUE and checks that
         // a second transaction gets a positive reply.
1644     public void testCanCommitPhaseFalseResponse() throws Throwable {
1645         new ShardTestKit(getSystem()) {{
1646             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1647                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1648                     "testCanCommitPhaseFalseResponse");
1649
1650             waitUntilLeader(shard);
1651
1652             final FiniteDuration duration = duration("5 seconds");
1653
1654             final String transactionID1 = "tx1";
1655             final MutableCompositeModification modification = new MutableCompositeModification();
1656             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1657             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1658
1659             // Simulate the ForwardedReadyTransaction messages that would be sent
1660             // by the ShardTransaction.
1661
1662             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1663                     cohort, modification, true, false), getRef());
1664             expectMsgClass(duration, ReadyTransactionReply.class);
1665
1666             // Send the CanCommitTransaction message.
1667
                 // A FALSE canCommit result is reported as a normal reply carrying false, not as a failure.
1668             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1669             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1670                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1671             assertEquals("getCanCommit", false, reply.getCanCommit());
1672
1673             // Send another can commit to ensure the failed one got cleaned up.
1674
                 // reset() discards the FALSE stub so the same mock can be reused for tx2.
1675             reset(cohort);
1676
1677             final String transactionID2 = "tx2";
1678             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1679
1680             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1681                 cohort, modification, true, false), getRef());
1682             expectMsgClass(duration, ReadyTransactionReply.class);
1683
1684             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1685             reply = CanCommitTransactionReply.fromSerializable(
1686                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1687             assertEquals("getCanCommit", true, reply.getCanCommit());
1688
                 // Stop the shard actor created for this test.
1689             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1690         }};
1691     }
1692
1693     @Test
         // Sends a ForwardedReadyTransaction with both trailing flags true while the cohort's
         // canCommit() returns a failed future and expects a Status.Failure in reply; then re-stubs
         // the mock so every phase succeeds and expects a CommitTransactionReply for a second
         // transaction sent the same way.
1694     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1695         new ShardTestKit(getSystem()) {{
1696             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1697                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1698                     "testImmediateCommitWithCanCommitPhaseFailure");
1699
1700             waitUntilLeader(shard);
1701
1702             final FiniteDuration duration = duration("5 seconds");
1703
1704             final String transactionID1 = "tx1";
1705             final MutableCompositeModification modification = new MutableCompositeModification();
1706             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1707             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1708
1709             // Simulate the ForwardedReadyTransaction messages that would be sent
1710             // by the ShardTransaction.
1711
                 // NOTE(review): the final 'true' is presumably the immediate-commit flag - confirm
                 // against ForwardedReadyTransaction. No ReadyTransactionReply is expected here.
1712             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1713                     cohort, modification, true, true), getRef());
1714
1715             expectMsgClass(duration, akka.actor.Status.Failure.class);
1716
1717             // Send another can commit to ensure the failed one got cleaned up.
1718
                 // reset() discards the failing stub so the same mock can be reused for tx2.
1719             reset(cohort);
1720
1721             final String transactionID2 = "tx2";
1722             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1723             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1724             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
                 // Candidate whose root node reports UNMODIFIED; built inline rather than through
                 // mockUnmodifiedCandidate, so getRootPath() is not stubbed here.
1725             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1726             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1727             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1728             doReturn(candidateRoot).when(candidate).getRootNode();
1729             doReturn(candidate).when(cohort).getCandidate();
1730
1731             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1732                     cohort, modification, true, true), getRef());
1733
1734             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1735
                 // Stop the shard actor created for this test.
1736             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1737         }};
1738     }
1739
1740     @Test
         // Sends a ForwardedReadyTransaction with both trailing flags true while the cohort's
         // canCommit() completes with FALSE and expects a Status.Failure in reply; then re-stubs the
         // mock so every phase succeeds and expects a CommitTransactionReply for a second transaction
         // sent the same way.
1741     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1742         new ShardTestKit(getSystem()) {{
1743             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1744                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1745                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1746
1747             waitUntilLeader(shard);
1748
1749             final FiniteDuration duration = duration("5 seconds");
1750
1751             final String transactionID = "tx1";
1752             final MutableCompositeModification modification = new MutableCompositeModification();
1753             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1754             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1755
1756             // Simulate the ForwardedReadyTransaction messages that would be sent
1757             // by the ShardTransaction.
1758
                 // NOTE(review): the final 'true' is presumably the immediate-commit flag - confirm
                 // against ForwardedReadyTransaction. No ReadyTransactionReply is expected here.
1759             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1760                     cohort, modification, true, true), getRef());
1761
                 // In this mode a FALSE canCommit result is expected to surface as a failure message.
1762             expectMsgClass(duration, akka.actor.Status.Failure.class);
1763
1764             // Send another can commit to ensure the failed one got cleaned up.
1765
                 // reset() discards the FALSE stub so the same mock can be reused for tx2.
1766             reset(cohort);
1767
1768             final String transactionID2 = "tx2";
1769             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1770             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1771             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
                 // Candidate whose root node reports UNMODIFIED; built inline rather than through
                 // mockUnmodifiedCandidate, so getRootPath() is not stubbed here.
1772             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1773             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1774             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1775             doReturn(candidateRoot).when(candidate).getRootNode();
1776             doReturn(candidate).when(cohort).getCandidate();
1777
1778             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1779                     cohort, modification, true, true), getRef());
1780
1781             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1782
                 // Stop the shard actor created for this test.
1783             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1784         }};
1785     }
1786
1787     @Test
1788     public void testAbortBeforeFinishCommit() throws Throwable {
1789         new ShardTestKit(getSystem()) {{
1790             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1791                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1792                     "testAbortBeforeFinishCommit");
1793
1794             waitUntilLeader(shard);
1795
1796             final FiniteDuration duration = duration("5 seconds");
1797             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1798
1799             final String transactionID = "tx1";
1800             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1801                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1802                 @Override
1803                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1804                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1805
1806                     // Simulate an AbortTransaction message occurring during replication, after
1807                     // persisting and before finishing the commit to the in-memory store.
1808                     // We have no followers so due to optimizations in the RaftActor, it does not
1809                     // attempt replication and thus we can't send an AbortTransaction message b/c
1810                     // it would be processed too late after CommitTransaction completes. So we'll
1811                     // simulate an AbortTransaction message occurring during replication by calling
1812                     // the shard directly.
1813                     //
1814                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1815
1816                     return preCommitFuture;
1817                 }
1818             };
1819
1820             final MutableCompositeModification modification = new MutableCompositeModification();
1821             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1822                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1823                     modification, preCommit);
1824
1825             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1826                 cohort, modification, true, false), getRef());
1827             expectMsgClass(duration, ReadyTransactionReply.class);
1828
1829             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1830             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1831                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1832             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1833
1834             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1835             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1836
1837             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1838
1839             // Since we're simulating an abort occurring during replication and before finish commit,
1840             // the data should still get written to the in-memory store since we've gotten past
1841             // canCommit and preCommit and persisted the data.
1842             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1843
1844             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1845         }};
1846     }
1847
1848     @Test
1849     public void testTransactionCommitTimeout() throws Throwable {
1850         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1851
1852         new ShardTestKit(getSystem()) {{
1853             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1854                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1855                     "testTransactionCommitTimeout");
1856
1857             waitUntilLeader(shard);
1858
1859             final FiniteDuration duration = duration("5 seconds");
1860
1861             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1862
1863             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1864             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1865                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1866
1867             // Create 1st Tx - will timeout
1868
1869             final String transactionID1 = "tx1";
1870             final MutableCompositeModification modification1 = new MutableCompositeModification();
1871             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1872                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1873                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1874                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1875                     modification1);
1876
1877             // Create 2nd Tx
1878
1879             final String transactionID2 = "tx3";
1880             final MutableCompositeModification modification2 = new MutableCompositeModification();
1881             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1882                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1883             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1884                     listNodePath,
1885                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1886                     modification2);
1887
1888             // Ready the Tx's
1889
1890             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1891                     cohort1, modification1, true, false), getRef());
1892             expectMsgClass(duration, ReadyTransactionReply.class);
1893
1894             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1895                     cohort2, modification2, true, false), getRef());
1896             expectMsgClass(duration, ReadyTransactionReply.class);
1897
1898             // canCommit 1st Tx. We don't send the commit so it should timeout.
1899
1900             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1901             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1902
1903             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1904
1905             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1906             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1907
1908             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1909
1910             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1911             expectMsgClass(duration, akka.actor.Status.Failure.class);
1912
1913             // Commit the 2nd Tx.
1914
1915             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1916             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1917
1918             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1919             assertNotNull(listNodePath + " not found", node);
1920
1921             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1922         }};
1923     }
1924
1925     @Test
1926     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1927         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1928
1929         new ShardTestKit(getSystem()) {{
1930             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1931                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1932                     "testTransactionCommitQueueCapacityExceeded");
1933
1934             waitUntilLeader(shard);
1935
1936             final FiniteDuration duration = duration("5 seconds");
1937
1938             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1939
1940             final String transactionID1 = "tx1";
1941             final MutableCompositeModification modification1 = new MutableCompositeModification();
1942             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1943                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1944
1945             final String transactionID2 = "tx2";
1946             final MutableCompositeModification modification2 = new MutableCompositeModification();
1947             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1948                     TestModel.OUTER_LIST_PATH,
1949                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1950                     modification2);
1951
1952             final String transactionID3 = "tx3";
1953             final MutableCompositeModification modification3 = new MutableCompositeModification();
1954             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1955                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1956
1957             // Ready the Tx's
1958
1959             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1960                     cohort1, modification1, true, false), getRef());
1961             expectMsgClass(duration, ReadyTransactionReply.class);
1962
1963             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1964                     cohort2, modification2, true, false), getRef());
1965             expectMsgClass(duration, ReadyTransactionReply.class);
1966
1967             // The 3rd Tx should exceed queue capacity and fail.
1968
1969             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1970                     cohort3, modification3, true, false), getRef());
1971             expectMsgClass(duration, akka.actor.Status.Failure.class);
1972
1973             // canCommit 1st Tx.
1974
1975             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1976             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1977
1978             // canCommit the 2nd Tx - it should get queued.
1979
1980             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1981
1982             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1983
1984             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1985             expectMsgClass(duration, akka.actor.Status.Failure.class);
1986
1987             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1988         }};
1989     }
1990
1991     @Test
1992     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1993         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1994
1995         new ShardTestKit(getSystem()) {{
1996             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1997                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1998                     "testTransactionCommitWithPriorExpiredCohortEntries");
1999
2000             waitUntilLeader(shard);
2001
2002             final FiniteDuration duration = duration("5 seconds");
2003
2004             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2005
2006             final String transactionID1 = "tx1";
2007             final MutableCompositeModification modification1 = new MutableCompositeModification();
2008             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2009                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2010
2011             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2012                 cohort1, modification1, true, false), getRef());
2013             expectMsgClass(duration, ReadyTransactionReply.class);
2014
2015             final String transactionID2 = "tx2";
2016             final MutableCompositeModification modification2 = new MutableCompositeModification();
2017             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2018                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2019
2020             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2021                     cohort2, modification2, true, false), getRef());
2022             expectMsgClass(duration, ReadyTransactionReply.class);
2023
2024             final String transactionID3 = "tx3";
2025             final MutableCompositeModification modification3 = new MutableCompositeModification();
2026             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2027                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2028
2029             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2030                 cohort3, modification3, true, false), getRef());
2031             expectMsgClass(duration, ReadyTransactionReply.class);
2032
2033             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2034             // should expire from the queue and the last one should be processed.
2035
2036             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2037             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2038
2039             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2040         }};
2041     }
2042
2043     @Test
2044     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2045         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2046
2047         new ShardTestKit(getSystem()) {{
2048             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2049                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2050                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2051
2052             waitUntilLeader(shard);
2053
2054             final FiniteDuration duration = duration("5 seconds");
2055
2056             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2057
2058             final String transactionID1 = "tx1";
2059             final MutableCompositeModification modification1 = new MutableCompositeModification();
2060             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2061                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2062
2063             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2064                     cohort1, modification1, true, false), getRef());
2065             expectMsgClass(duration, ReadyTransactionReply.class);
2066
2067             // CanCommit the first one so it's the current in-progress CohortEntry.
2068
2069             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2070             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2071
2072             // Ready the second Tx.
2073
2074             final String transactionID2 = "tx2";
2075             final MutableCompositeModification modification2 = new MutableCompositeModification();
2076             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2077                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2078
2079             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2080                     cohort2, modification2, true, false), getRef());
2081             expectMsgClass(duration, ReadyTransactionReply.class);
2082
2083             // Ready the third Tx.
2084
2085             final String transactionID3 = "tx3";
2086             final DataTreeModification modification3 = dataStore.newModification();
2087             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2088                     .apply(modification3);
2089                 modification3.ready();
2090             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2091
2092             shard.tell(readyMessage, getRef());
2093
2094             // Commit the first Tx. After completing, the second should expire from the queue and the third
2095             // Tx committed.
2096
2097             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2098             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2099
2100             // Expect commit reply from the third Tx.
2101
2102             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2103
2104             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2105             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2106
2107             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2108         }};
2109     }
2110
2111     @Test
2112     public void testCanCommitBeforeReadyFailure() throws Throwable {
2113         new ShardTestKit(getSystem()) {{
2114             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2115                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2116                     "testCanCommitBeforeReadyFailure");
2117
2118             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2119             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2120
2121             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2122         }};
2123     }
2124
2125     @Test
2126     public void testAbortCurrentTransaction() throws Throwable {
2127         new ShardTestKit(getSystem()) {{
2128             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2129                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2130                     "testAbortCurrentTransaction");
2131
2132             waitUntilLeader(shard);
2133
2134             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2135
2136             final String transactionID1 = "tx1";
2137             final MutableCompositeModification modification1 = new MutableCompositeModification();
2138             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2139             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2140             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2141
2142             final String transactionID2 = "tx2";
2143             final MutableCompositeModification modification2 = new MutableCompositeModification();
2144             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2145             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2146
2147             final FiniteDuration duration = duration("5 seconds");
2148             final Timeout timeout = new Timeout(duration);
2149
2150             // Simulate the ForwardedReadyTransaction messages that would be sent
2151             // by the ShardTransaction.
2152
2153             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2154                     cohort1, modification1, true, false), getRef());
2155             expectMsgClass(duration, ReadyTransactionReply.class);
2156
2157             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2158                     cohort2, modification2, true, false), getRef());
2159             expectMsgClass(duration, ReadyTransactionReply.class);
2160
2161             // Send the CanCommitTransaction message for the first Tx.
2162
2163             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2164             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2165                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2166             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2167
2168             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2169             // processed after the first Tx completes.
2170
2171             final Future<Object> canCommitFuture = Patterns.ask(shard,
2172                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2173
2174             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2175             // Tx to proceed.
2176
2177             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2178             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2179
2180             // Wait for the 2nd Tx to complete the canCommit phase.
2181
2182             Await.ready(canCommitFuture, duration);
2183
2184             final InOrder inOrder = inOrder(cohort1, cohort2);
2185             inOrder.verify(cohort1).canCommit();
2186             inOrder.verify(cohort2).canCommit();
2187
2188             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2189         }};
2190     }
2191
2192     @Test
2193     public void testAbortQueuedTransaction() throws Throwable {
2194         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2195         new ShardTestKit(getSystem()) {{
2196             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2197             @SuppressWarnings("serial")
2198             final Creator<Shard> creator = new Creator<Shard>() {
2199                 @Override
2200                 public Shard create() throws Exception {
2201                     return new Shard(newShardBuilder()) {
2202                         @Override
2203                         public void onReceiveCommand(final Object message) throws Exception {
2204                             super.onReceiveCommand(message);
2205                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2206                                 if(cleaupCheckLatch.get() != null) {
2207                                     cleaupCheckLatch.get().countDown();
2208                                 }
2209                             }
2210                         }
2211                     };
2212                 }
2213             };
2214
2215             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2216                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2217                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2218
2219             waitUntilLeader(shard);
2220
2221             final String transactionID = "tx1";
2222
2223             final MutableCompositeModification modification = new MutableCompositeModification();
2224             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2225             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2226
2227             final FiniteDuration duration = duration("5 seconds");
2228
2229             // Ready the tx.
2230
2231             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2232                     cohort, modification, true, false), getRef());
2233             expectMsgClass(duration, ReadyTransactionReply.class);
2234
2235             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2236
2237             // Send the AbortTransaction message.
2238
2239             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2240             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2241
2242             verify(cohort).abort();
2243
2244             // Verify the tx cohort is removed from queue at the cleanup check interval.
2245
2246             cleaupCheckLatch.set(new CountDownLatch(1));
2247             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2248                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2249
2250             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2251
2252             // Now send CanCommitTransaction - should fail.
2253
2254             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2255
2256             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2257             assertTrue("Failure type", failure instanceof IllegalStateException);
2258
2259             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2260         }};
2261     }
2262
2263     @Test
2264     public void testCreateSnapshot() throws Exception {
2265         testCreateSnapshot(true, "testCreateSnapshot");
2266     }
2267
2268     @Test
2269     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2270         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2271     }
2272
2273     @SuppressWarnings("serial")
2274     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2275
2276         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2277
2278         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2279         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2280             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2281                 super(delegate);
2282             }
2283
2284             @Override
2285             public void saveSnapshot(final Object o) {
2286                 savedSnapshot.set(o);
2287                 super.saveSnapshot(o);
2288             }
2289         }
2290
2291         dataStoreContextBuilder.persistent(persistent);
2292
2293         new ShardTestKit(getSystem()) {{
2294             class TestShard extends Shard {
2295
2296                 protected TestShard(AbstractBuilder<?, ?> builder) {
2297                     super(builder);
2298                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2299                 }
2300
2301                 @Override
2302                 public void handleCommand(final Object message) {
2303                     super.handleCommand(message);
2304
2305                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2306                         latch.get().countDown();
2307                     }
2308                 }
2309
2310                 @Override
2311                 public RaftActorContext getRaftActorContext() {
2312                     return super.getRaftActorContext();
2313                 }
2314             }
2315
2316             final Creator<Shard> creator = new Creator<Shard>() {
2317                 @Override
2318                 public Shard create() throws Exception {
2319                     return new TestShard(newShardBuilder());
2320                 }
2321             };
2322
2323             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2324                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2325
2326             waitUntilLeader(shard);
2327             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2328
2329             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2330
2331             // Trigger creation of a snapshot by ensuring
2332             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2333             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2334             awaitAndValidateSnapshot(expectedRoot);
2335
2336             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2337             awaitAndValidateSnapshot(expectedRoot);
2338
2339             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2340         }
2341
2342             private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
2343                                               ) throws InterruptedException {
2344                 System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
2345                 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2346
2347                 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2348                         savedSnapshot.get() instanceof Snapshot);
2349
2350                 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2351
2352                 latch.set(new CountDownLatch(1));
2353                 savedSnapshot.set(null);
2354             }
2355
2356             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2357
2358                 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2359                 assertEquals("Root node", expectedRoot, actual);
2360
2361            }
2362         };
2363     }
2364
2365     /**
2366      * This test simply verifies that the applySnapShot logic will work
2367      * @throws ReadFailedException
2368      * @throws DataValidationFailedException
2369      */
2370     @Test
2371     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2372         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2373         store.setSchemaContext(SCHEMA_CONTEXT);
2374
2375         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2376         putTransaction.write(TestModel.TEST_PATH,
2377             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2378         commitTransaction(store, putTransaction);
2379
2380
2381         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2382
2383         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2384
2385         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2386         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2387
2388         commitTransaction(store, writeTransaction);
2389
2390         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2391
2392         assertEquals(expected, actual);
2393     }
2394
2395     @Test
2396     public void testRecoveryApplicable(){
2397
2398         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2399                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2400
2401         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2402                 schemaContext(SCHEMA_CONTEXT).props();
2403
2404         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2405                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2406
2407         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2408                 schemaContext(SCHEMA_CONTEXT).props();
2409
2410         new ShardTestKit(getSystem()) {{
2411             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2412                     persistentProps, "testPersistence1");
2413
2414             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2415
2416             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2417
2418             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2419                     nonPersistentProps, "testPersistence2");
2420
2421             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2422
2423             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2424
2425         }};
2426
2427     }
2428
2429     @Test
2430     public void testOnDatastoreContext() {
2431         new ShardTestKit(getSystem()) {{
2432             dataStoreContextBuilder.persistent(true);
2433
2434             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2435
2436             assertEquals("isRecoveryApplicable", true,
2437                     shard.underlyingActor().persistence().isRecoveryApplicable());
2438
2439             waitUntilLeader(shard);
2440
2441             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2442
2443             assertEquals("isRecoveryApplicable", false,
2444                 shard.underlyingActor().persistence().isRecoveryApplicable());
2445
2446             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2447
2448             assertEquals("isRecoveryApplicable", true,
2449                 shard.underlyingActor().persistence().isRecoveryApplicable());
2450
2451             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2452         }};
2453     }
2454
2455     @Test
2456     public void testRegisterRoleChangeListener() throws Exception {
2457         new ShardTestKit(getSystem()) {
2458             {
2459                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2460                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2461                         "testRegisterRoleChangeListener");
2462
2463                 waitUntilLeader(shard);
2464
2465                 final TestActorRef<MessageCollectorActor> listener =
2466                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2467
2468                 shard.tell(new RegisterRoleChangeListener(), listener);
2469
2470                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2471
2472                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2473                     ShardLeaderStateChanged.class);
2474                 assertEquals("getLocalShardDataTree present", true,
2475                         leaderStateChanged.getLocalShardDataTree().isPresent());
2476                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2477                     leaderStateChanged.getLocalShardDataTree().get());
2478
2479                 MessageCollectorActor.clearMessages(listener);
2480
2481                 // Force a leader change
2482
2483                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2484
2485                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2486                         ShardLeaderStateChanged.class);
2487                 assertEquals("getLocalShardDataTree present", false,
2488                         leaderStateChanged.getLocalShardDataTree().isPresent());
2489
2490                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2491             }
2492         };
2493     }
2494
2495     @Test
2496     public void testFollowerInitialSyncStatus() throws Exception {
2497         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2498                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2499                 "testFollowerInitialSyncStatus");
2500
2501         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2502
2503         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2504
2505         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2506
2507         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2508
2509         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2510     }
2511
2512     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2513         modification.ready();
2514         store.validate(modification);
2515         store.commit(store.prepare(modification));
2516     }
2517
2518     @Test
2519     public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
2520         new ShardTestKit(getSystem()) {{
2521             dataStoreContextBuilder.persistent(false);
2522             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
2523             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
2524             final Creator<Shard> creator = new Creator<Shard>() {
2525                 private static final long serialVersionUID = 1L;
2526                 boolean firstElectionTimeout = true;
2527
2528                 @Override
2529                 public Shard create() throws Exception {
2530                     return new Shard(newShardBuilder()) {
2531                         @Override
2532                         public void onReceiveCommand(final Object message) throws Exception {
2533                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
2534                                 firstElectionTimeout = false;
2535                                 final ActorRef self = getSelf();
2536                                 new Thread() {
2537                                     @Override
2538                                     public void run() {
2539                                         Uninterruptibles.awaitUninterruptibly(
2540                                             onChangeListenerRegistered, 5, TimeUnit.SECONDS);
2541                                         self.tell(message, self);
2542                                     }
2543                                 }.start();
2544
2545                                 onFirstElectionTimeout.countDown();
2546                             } else {
2547                                 super.onReceiveCommand(message);
2548                             }
2549                         }
2550                     };
2551                 }
2552             };
2553
2554             final MockDataChangeListener listener = new MockDataChangeListener(1);
2555             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2556                 "testDataChangeListenerOnFollower-DataChangeListener");
2557
2558             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2559                 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
2560                     withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
2561
2562             assertEquals("Got first ElectionTimeout", true,
2563                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
2564
2565             shard.tell(new FindLeader(), getRef());
2566             final FindLeaderReply findLeadeReply =
2567                 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2568             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
2569
2570             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2571
2572             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2573             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2574                 RegisterChangeListenerReply.class);
2575             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2576
2577             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2578
2579             onChangeListenerRegistered.countDown();
2580
2581             listener.waitForChangeEvents();
2582
2583             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2584             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2585         }};
2586     }
2587
2588     @Test
2589     public void testClusteredDataChangeListernerRegistration() throws Exception {
2590         dataStoreContextBuilder.persistent(false).build();
2591         new ShardTestKit(getSystem()) {{
2592             final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
2593                 .shardName("inventory").type("config").build();
2594
2595             final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
2596                 .shardName("inventory").type("config").build();
2597             final Creator<Shard> followerShardCreator = new Creator<Shard>() {
2598                 private static final long serialVersionUID = 1L;
2599
2600                 @Override
2601                 public Shard create() throws Exception {
2602                     return new Shard(Shard.builder().id(member1ShardID).datastoreContext(newDatastoreContext()).
2603                             peerAddresses(Collections.singletonMap(member2ShardID.toString(),
2604                                     "akka://test/user/" + member2ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {
2605                         @Override
2606                         public void onReceiveCommand(final Object message) throws Exception {
2607
2608                             if(!(message instanceof ElectionTimeout)) {
2609                                 super.onReceiveCommand(message);
2610                             }
2611                         }
2612                     };
2613                 }
2614             };
2615
2616             final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
2617                 private static final long serialVersionUID = 1L;
2618
2619                 @Override
2620                 public Shard create() throws Exception {
2621                     return new Shard(Shard.builder().id(member2ShardID).datastoreContext(newDatastoreContext()).
2622                             peerAddresses(Collections.singletonMap(member1ShardID.toString(),
2623                                     "akka://test/user/" + member1ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {};
2624                 }
2625             };
2626
2627
2628             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2629                 Props.create(new DelegatingShardCreator(followerShardCreator)),
2630                 member1ShardID.toString());
2631
2632             final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
2633                 Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
2634                 member2ShardID.toString());
2635             // Sleep to let election happen
2636             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
2637
2638             shard.tell(new FindLeader(), getRef());
2639             final FindLeaderReply findLeaderReply =
2640                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2641             assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
2642
2643             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2644             final MockDataChangeListener listener = new MockDataChangeListener(1);
2645             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2646                 "testDataChangeListenerOnFollower-DataChangeListener");
2647
2648             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2649             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2650                 RegisterChangeListenerReply.class);
2651             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2652
2653             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2654
2655             listener.waitForChangeEvents();
2656
2657             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2658             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2659         }};
2660     }
2661 }