BUG-5280: centralize ShardSnapshot operations
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.Props;
26 import akka.actor.Status.Failure;
27 import akka.dispatch.Dispatchers;
28 import akka.dispatch.OnComplete;
29 import akka.japi.Creator;
30 import akka.pattern.Patterns;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Function;
35 import com.google.common.base.Stopwatch;
36 import com.google.common.util.concurrent.Futures;
37 import com.google.common.util.concurrent.ListenableFuture;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import java.io.IOException;
40 import java.util.Collections;
41 import java.util.HashSet;
42 import java.util.Set;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.Test;
48 import org.mockito.InOrder;
49 import org.opendaylight.controller.cluster.DataPersistenceProvider;
50 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
51 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
55 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
56 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
57 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
58 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
60 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
62 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
65 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
67 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
71 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
74 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
75 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
76 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
77 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
78 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
79 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
80 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
81 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
82 import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
83 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
84 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
85 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
87 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
88 import org.opendaylight.controller.cluster.raft.RaftActorContext;
89 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
90 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
91 import org.opendaylight.controller.cluster.raft.Snapshot;
92 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
93 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
94 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
95 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
96 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
97 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
98 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
99 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
100 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
101 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
102 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
103 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
104 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
105 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
106 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
107 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
108 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
120 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
121 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
122 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
123 import scala.concurrent.Await;
124 import scala.concurrent.Future;
125 import scala.concurrent.duration.FiniteDuration;
126
127 public class ShardTest extends AbstractShardTest {
128     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
129
130     @Test
131     public void testRegisterChangeListener() throws Exception {
132         new ShardTestKit(getSystem()) {{
133             final TestActorRef<Shard> shard = actorFactory.createTestActor(
134                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterChangeListener");
135
136             waitUntilLeader(shard);
137
138             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
139
140             final MockDataChangeListener listener = new MockDataChangeListener(1);
141             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
142                     "testRegisterChangeListener-DataChangeListener");
143
144             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
145                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
146
147             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
148                     RegisterChangeListenerReply.class);
149             final String replyPath = reply.getListenerRegistrationPath().toString();
150             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
151                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
152
153             final YangInstanceIdentifier path = TestModel.TEST_PATH;
154             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
155
156             listener.waitForChangeEvents(path);
157         }};
158     }
159
160     @SuppressWarnings("serial")
161     @Test
162     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
163         // This test tests the timing window in which a change listener is registered before the
164         // shard becomes the leader. We verify that the listener is registered and notified of the
165         // existing data when the shard becomes the leader.
166         new ShardTestKit(getSystem()) {{
167             // For this test, we want to send the RegisterChangeListener message after the shard
168             // has recovered from persistence and before it becomes the leader. So we subclass
169             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
170             // we know that the shard has been initialized to a follower and has started the
171             // election process. The following 2 CountDownLatches are used to coordinate the
172             // ElectionTimeout with the sending of the RegisterChangeListener message.
173             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
174             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
175             final Creator<Shard> creator = new Creator<Shard>() {
176                 boolean firstElectionTimeout = true;
177
178                 @Override
179                 public Shard create() throws Exception {
180                     // Use a non persistent provider because this test actually invokes persist on the journal
181                     // this will cause all other messages to not be queued properly after that.
182                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
183                     // it does do a persist)
184                     return new Shard(newShardBuilder()) {
185                         @Override
186                         public void handleCommand(final Object message) {
187                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
188                                 // Got the first ElectionTimeout. We don't forward it to the
189                                 // base Shard yet until we've sent the RegisterChangeListener
190                                 // message. So we signal the onFirstElectionTimeout latch to tell
191                                 // the main thread to send the RegisterChangeListener message and
192                                 // start a thread to wait on the onChangeListenerRegistered latch,
193                                 // which the main thread signals after it has sent the message.
194                                 // After the onChangeListenerRegistered is triggered, we send the
195                                 // original ElectionTimeout message to proceed with the election.
196                                 firstElectionTimeout = false;
197                                 final ActorRef self = getSelf();
198                                 new Thread() {
199                                     @Override
200                                     public void run() {
201                                         Uninterruptibles.awaitUninterruptibly(
202                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
203                                         self.tell(message, self);
204                                     }
205                                 }.start();
206
207                                 onFirstElectionTimeout.countDown();
208                             } else {
209                                 super.handleCommand(message);
210                             }
211                         }
212                     };
213                 }
214             };
215
216             setupInMemorySnapshotStore();
217
218             final MockDataChangeListener listener = new MockDataChangeListener(1);
219             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
220                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
221
222             final TestActorRef<Shard> shard = actorFactory.createTestActor(
223                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
224                     "testRegisterChangeListenerWhenNotLeaderInitially");
225
226             final YangInstanceIdentifier path = TestModel.TEST_PATH;
227
228             // Wait until the shard receives the first ElectionTimeout message.
229             assertEquals("Got first ElectionTimeout", true,
230                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
231
232             // Now send the RegisterChangeListener and wait for the reply.
233             shard.tell(new RegisterChangeListener(path, dclActor,
234                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
235
236             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
237                     RegisterChangeListenerReply.class);
238             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
239
240             // Sanity check - verify the shard is not the leader yet.
241             shard.tell(FindLeader.INSTANCE, getRef());
242             final FindLeaderReply findLeadeReply =
243                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
244             assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
245
246             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
247             // with the election process.
248             onChangeListenerRegistered.countDown();
249
250             // Wait for the shard to become the leader and notify our listener with the existing
251             // data in the store.
252             listener.waitForChangeEvents(path);
253         }};
254     }
255
256     @Test
257     public void testRegisterDataTreeChangeListener() throws Exception {
258         new ShardTestKit(getSystem()) {{
259             final TestActorRef<Shard> shard = actorFactory.createTestActor(
260                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterDataTreeChangeListener");
261
262             waitUntilLeader(shard);
263
264             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
265
266             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
267             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
268                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
269
270             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
271
272             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
273                     RegisterDataTreeChangeListenerReply.class);
274             final String replyPath = reply.getListenerRegistrationPath().toString();
275             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
276                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
277
278             final YangInstanceIdentifier path = TestModel.TEST_PATH;
279             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
280
281             listener.waitForChangeEvents();
282         }};
283     }
284
285     @SuppressWarnings("serial")
286     @Test
287     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
288         new ShardTestKit(getSystem()) {{
289             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
290             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
291             final Creator<Shard> creator = new Creator<Shard>() {
292                 boolean firstElectionTimeout = true;
293
294                 @Override
295                 public Shard create() throws Exception {
296                     return new Shard(newShardBuilder()) {
297                         @Override
298                         public void handleCommand(final Object message) {
299                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
300                                 firstElectionTimeout = false;
301                                 final ActorRef self = getSelf();
302                                 new Thread() {
303                                     @Override
304                                     public void run() {
305                                         Uninterruptibles.awaitUninterruptibly(
306                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
307                                         self.tell(message, self);
308                                     }
309                                 }.start();
310
311                                 onFirstElectionTimeout.countDown();
312                             } else {
313                                 super.handleCommand(message);
314                             }
315                         }
316                     };
317                 }
318             };
319
320             setupInMemorySnapshotStore();
321
322             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
323             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
324                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
325
326             final TestActorRef<Shard> shard = actorFactory.createTestActor(
327                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
328                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
329
330             final YangInstanceIdentifier path = TestModel.TEST_PATH;
331
332             assertEquals("Got first ElectionTimeout", true,
333                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
334
335             shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
336             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
337                 RegisterDataTreeChangeListenerReply.class);
338             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
339
340             shard.tell(FindLeader.INSTANCE, getRef());
341             final FindLeaderReply findLeadeReply =
342                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
343             assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
344
345
346             onChangeListenerRegistered.countDown();
347
348             // TODO: investigate why we do not receive data chage events
349             listener.waitForChangeEvents();
350         }};
351     }
352
353     @Test
354     public void testCreateTransaction(){
355         new ShardTestKit(getSystem()) {{
356             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
357
358             waitUntilLeader(shard);
359
360             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
361
362             shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
363                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
364
365             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
366                     CreateTransactionReply.class);
367
368             final String path = reply.getTransactionPath().toString();
369             assertTrue("Unexpected transaction path " + path,
370                     path.startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:"));
371         }};
372     }
373
374     @Test
375     public void testCreateTransactionOnChain(){
376         new ShardTestKit(getSystem()) {{
377             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
378
379             waitUntilLeader(shard);
380
381             shard.tell(new CreateTransaction(nextTransactionId(),TransactionType.READ_ONLY.ordinal(),
382                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
383
384             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
385                     CreateTransactionReply.class);
386
387             final String path = reply.getTransactionPath().toString();
388             assertTrue("Unexpected transaction path " + path,
389                     path.startsWith("akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:"));
390         }};
391     }
392
393     @Test
394     public void testPeerAddressResolved() throws Exception {
395         new ShardTestKit(getSystem()) {{
396             ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config");
397             final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder().
398                     peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null)).props().
399                         withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
400
401             final String address = "akka://foobar";
402             shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
403
404             shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
405             OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
406             assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
407         }};
408     }
409
410     @Test
411     public void testApplySnapshot() throws Exception {
412
413         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
414
415         ShardTestKit.waitUntilLeader(shard);
416
417         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
418         store.setSchemaContext(SCHEMA_CONTEXT);
419
420         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
421                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
422                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
423                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
424
425         writeToStore(store, TestModel.TEST_PATH, container);
426
427         final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
428         final NormalizedNode<?,?> expected = readStore(store, root);
429
430         final Snapshot snapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(expected).serialize(),
431                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
432
433         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
434
435         final NormalizedNode<?,?> actual = readStore(shard, root);
436
437         assertEquals("Root node", expected, actual);
438     }
439
440     @Test
441     public void testApplyState() throws Exception {
442         final TestActorRef<Shard> shard = actorFactory.createTestActor(
443                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
444
445         ShardTestKit.waitUntilLeader(shard);
446
447         final DataTree source = setupInMemorySnapshotStore();
448         final DataTreeModification writeMod = source.takeSnapshot().newModification();
449         ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
450         writeMod.write(TestModel.TEST_PATH, node);
451         writeMod.ready();
452
453         final TransactionIdentifier tx = nextTransactionId();
454         final ApplyState applyState = new ApplyState(null, tx,
455             new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod, tx)));
456
457         shard.tell(applyState, shard);
458
459         Stopwatch sw = Stopwatch.createStarted();
460         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
461             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
462
463             final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
464             if(actual != null) {
465                 assertEquals("Applied state", node, actual);
466                 return;
467             }
468         }
469
470         fail("State was not applied");
471     }
472
473     @Test
474     public void testDataTreeCandidateRecovery() throws Exception {
475         // Set up the InMemorySnapshotStore.
476         final DataTree source = setupInMemorySnapshotStore();
477
478         final DataTreeModification writeMod = source.takeSnapshot().newModification();
479         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
480         writeMod.ready();
481         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
482
483         // Set up the InMemoryJournal.
484         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1,
485             payloadForModification(source, writeMod, nextTransactionId())));
486
487         final int nListEntries = 16;
488         final Set<Integer> listEntryKeys = new HashSet<>();
489
490         // Add some ModificationPayload entries
491         for (int i = 1; i <= nListEntries; i++) {
492             listEntryKeys.add(Integer.valueOf(i));
493
494             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
495                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
496
497             final DataTreeModification mod = source.takeSnapshot().newModification();
498             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
499             mod.ready();
500
501             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
502                 payloadForModification(source, mod, nextTransactionId())));
503         }
504
505         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
506             new ApplyJournalEntries(nListEntries));
507
508         testRecovery(listEntryKeys);
509     }
510
511     @Test
512     public void testConcurrentThreePhaseCommits() throws Throwable {
513         new ShardTestKit(getSystem()) {{
514             final TestActorRef<Shard> shard = actorFactory.createTestActor(
515                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
516                     "testConcurrentThreePhaseCommits");
517
518             waitUntilLeader(shard);
519
520             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
521
522             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
523
524             final TransactionIdentifier transactionID1 = nextTransactionId();
525             final MutableCompositeModification modification1 = new MutableCompositeModification();
526             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
527                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
528
529             final TransactionIdentifier transactionID2 = nextTransactionId();
530             final MutableCompositeModification modification2 = new MutableCompositeModification();
531             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
532                     TestModel.OUTER_LIST_PATH,
533                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
534                     modification2);
535
536             final TransactionIdentifier transactionID3 = nextTransactionId();
537             final MutableCompositeModification modification3 = new MutableCompositeModification();
538             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
539                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
540                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
541                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
542                     modification3);
543
544             final long timeoutSec = 5;
545             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
546             final Timeout timeout = new Timeout(duration);
547
548             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
549             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
550                     expectMsgClass(duration, ReadyTransactionReply.class));
551             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
552
553             // Send the CanCommitTransaction message for the first Tx.
554
555             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
556             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
557                     expectMsgClass(duration, CanCommitTransactionReply.class));
558             assertEquals("Can commit", true, canCommitReply.getCanCommit());
559
560             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
561             expectMsgClass(duration, ReadyTransactionReply.class);
562
563             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
564             expectMsgClass(duration, ReadyTransactionReply.class);
565
566             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
567             // processed after the first Tx completes.
568
569             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
570                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
571
572             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
573                     new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
574
575             // Send the CommitTransaction message for the first Tx. After it completes, it should
576             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
577
578             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
579             expectMsgClass(duration, CommitTransactionReply.class);
580
581             // Wait for the next 2 Tx's to complete.
582
583             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
584             final CountDownLatch commitLatch = new CountDownLatch(2);
585
586             class OnFutureComplete extends OnComplete<Object> {
587                 private final Class<?> expRespType;
588
589                 OnFutureComplete(final Class<?> expRespType) {
590                     this.expRespType = expRespType;
591                 }
592
593                 @Override
594                 public void onComplete(final Throwable error, final Object resp) {
595                     if(error != null) {
596                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
597                     } else {
598                         try {
599                             assertEquals("Commit response type", expRespType, resp.getClass());
600                             onSuccess(resp);
601                         } catch (final Exception e) {
602                             caughtEx.set(e);
603                         }
604                     }
605                 }
606
607                 void onSuccess(final Object resp) throws Exception {
608                 }
609             }
610
611             class OnCommitFutureComplete extends OnFutureComplete {
612                 OnCommitFutureComplete() {
613                     super(CommitTransactionReply.class);
614                 }
615
616                 @Override
617                 public void onComplete(final Throwable error, final Object resp) {
618                     super.onComplete(error, resp);
619                     commitLatch.countDown();
620                 }
621             }
622
623             class OnCanCommitFutureComplete extends OnFutureComplete {
624                 private final TransactionIdentifier transactionID;
625
626                 OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
627                     super(CanCommitTransactionReply.class);
628                     this.transactionID = transactionID;
629                 }
630
631                 @Override
632                 void onSuccess(final Object resp) throws Exception {
633                     final CanCommitTransactionReply canCommitReply =
634                             CanCommitTransactionReply.fromSerializable(resp);
635                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
636
637                     final Future<Object> commitFuture = Patterns.ask(shard,
638                             new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
639                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
640                 }
641             }
642
643             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
644                     getSystem().dispatcher());
645
646             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
647                     getSystem().dispatcher());
648
649             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
650
651             if(caughtEx.get() != null) {
652                 throw caughtEx.get();
653             }
654
655             assertEquals("Commits complete", true, done);
656
657             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
658             inOrder.verify(cohort1).canCommit();
659             inOrder.verify(cohort1).preCommit();
660             inOrder.verify(cohort1).commit();
661             inOrder.verify(cohort2).canCommit();
662             inOrder.verify(cohort2).preCommit();
663             inOrder.verify(cohort2).commit();
664             inOrder.verify(cohort3).canCommit();
665             inOrder.verify(cohort3).preCommit();
666             inOrder.verify(cohort3).commit();
667
668             // Verify data in the data store.
669
670             verifyOuterListEntry(shard, 1);
671
672             verifyLastApplied(shard, 2);
673         }};
674     }
675
676     @Test
677     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
678         new ShardTestKit(getSystem()) {{
679             final TestActorRef<Shard> shard = actorFactory.createTestActor(
680                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
681                     "testBatchedModificationsWithNoCommitOnReady");
682
683             waitUntilLeader(shard);
684
685             final TransactionIdentifier transactionID = nextTransactionId();
686             final FiniteDuration duration = duration("5 seconds");
687
688             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
689             final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
690                 if(mockCohort.get() == null) {
691                     mockCohort.set(createDelegatingMockCohort("cohort", actual));
692                 }
693
694                 return mockCohort.get();
695             };
696
697             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
698
699             // Send a BatchedModifications to start a transaction.
700
701             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
702                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
703             expectMsgClass(duration, BatchedModificationsReply.class);
704
705             // Send a couple more BatchedModifications.
706
707             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
708                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
709             expectMsgClass(duration, BatchedModificationsReply.class);
710
711             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
712                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
713                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
714             expectMsgClass(duration, ReadyTransactionReply.class);
715
716             // Send the CanCommitTransaction message.
717
718             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
719             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
720                     expectMsgClass(duration, CanCommitTransactionReply.class));
721             assertEquals("Can commit", true, canCommitReply.getCanCommit());
722
723             // Send the CanCommitTransaction message.
724
725             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
726             expectMsgClass(duration, CommitTransactionReply.class);
727
728             final InOrder inOrder = inOrder(mockCohort.get());
729             inOrder.verify(mockCohort.get()).canCommit();
730             inOrder.verify(mockCohort.get()).preCommit();
731             inOrder.verify(mockCohort.get()).commit();
732
733             // Verify data in the data store.
734
735             verifyOuterListEntry(shard, 1);
736         }};
737     }
738
739     @Test
740     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
741         new ShardTestKit(getSystem()) {{
742             final TestActorRef<Shard> shard = actorFactory.createTestActor(
743                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
744                     "testBatchedModificationsWithCommitOnReady");
745
746             waitUntilLeader(shard);
747
748             final TransactionIdentifier transactionID = nextTransactionId();
749             final FiniteDuration duration = duration("5 seconds");
750
751             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
752             final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
753                 if(mockCohort.get() == null) {
754                     mockCohort.set(createDelegatingMockCohort("cohort", actual));
755                 }
756
757                 return mockCohort.get();
758             };
759
760             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
761
762             // Send a BatchedModifications to start a transaction.
763
764             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
765                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
766             expectMsgClass(duration, BatchedModificationsReply.class);
767
768             // Send a couple more BatchedModifications.
769
770             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
771                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
772             expectMsgClass(duration, BatchedModificationsReply.class);
773
774             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
775                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
776                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
777
778             expectMsgClass(duration, CommitTransactionReply.class);
779
780             final InOrder inOrder = inOrder(mockCohort.get());
781             inOrder.verify(mockCohort.get()).canCommit();
782             inOrder.verify(mockCohort.get()).preCommit();
783             inOrder.verify(mockCohort.get()).commit();
784
785             // Verify data in the data store.
786
787             verifyOuterListEntry(shard, 1);
788         }};
789     }
790
791     @Test(expected=IllegalStateException.class)
792     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
793         new ShardTestKit(getSystem()) {{
794             final TestActorRef<Shard> shard = actorFactory.createTestActor(
795                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
796                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
797
798             waitUntilLeader(shard);
799
800             final TransactionIdentifier transactionID = nextTransactionId();
801             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
802             batched.setReady(true);
803             batched.setTotalMessagesSent(2);
804
805             shard.tell(batched, getRef());
806
807             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
808
809             if(failure != null) {
810                 throw failure.cause();
811             }
812         }};
813     }
814
815     @Test
816     public void testBatchedModificationsWithOperationFailure() throws Throwable {
817         new ShardTestKit(getSystem()) {{
818             final TestActorRef<Shard> shard = actorFactory.createTestActor(
819                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
820                     "testBatchedModificationsWithOperationFailure");
821
822             waitUntilLeader(shard);
823
824             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
825             // write will not validate the children for performance reasons.
826
827             TransactionIdentifier transactionID = nextTransactionId();
828
829             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
830                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
831                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
832
833             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
834             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
835             shard.tell(batched, getRef());
836             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
837
838             Throwable cause = failure.cause();
839
840             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
841             batched.setReady(true);
842             batched.setTotalMessagesSent(2);
843
844             shard.tell(batched, getRef());
845
846             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
847             assertEquals("Failure cause", cause, failure.cause());
848         }};
849     }
850
851     @Test
852     public void testBatchedModificationsOnTransactionChain() throws Throwable {
853         new ShardTestKit(getSystem()) {{
854             final TestActorRef<Shard> shard = actorFactory.createTestActor(
855                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
856                     "testBatchedModificationsOnTransactionChain");
857
858             waitUntilLeader(shard);
859
860             final LocalHistoryIdentifier historyId = nextHistoryId();
861             final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
862             final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
863
864             final FiniteDuration duration = duration("5 seconds");
865
866             // Send a BatchedModifications to start a chained write transaction and ready it.
867
868             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
869             final YangInstanceIdentifier path = TestModel.TEST_PATH;
870             shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), getRef());
871             expectMsgClass(duration, ReadyTransactionReply.class);
872
873             // Create a read Tx on the same chain.
874
875             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
876                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
877
878             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
879
880             getSystem().actorSelection(createReply.getTransactionPath()).tell(
881                     new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
882             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
883             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
884
885             // Commit the write transaction.
886
887             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
888             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
889                     expectMsgClass(duration, CanCommitTransactionReply.class));
890             assertEquals("Can commit", true, canCommitReply.getCanCommit());
891
892             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
893             expectMsgClass(duration, CommitTransactionReply.class);
894
895             // Verify data in the data store.
896
897             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
898             assertEquals("Stored node", containerNode, actualNode);
899         }};
900     }
901
902     @Test
903     public void testOnBatchedModificationsWhenNotLeader() {
904         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
905         new ShardTestKit(getSystem()) {{
906             final Creator<Shard> creator = new Creator<Shard>() {
907                 private static final long serialVersionUID = 1L;
908
909                 @Override
910                 public Shard create() throws Exception {
911                     return new Shard(newShardBuilder()) {
912                         @Override
913                         protected boolean isLeader() {
914                             return overrideLeaderCalls.get() ? false : super.isLeader();
915                         }
916
917                         @Override
918                         protected ActorSelection getLeader() {
919                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
920                                 super.getLeader();
921                         }
922                     };
923                 }
924             };
925
926             final TestActorRef<Shard> shard = actorFactory.createTestActor(
927                     Props.create(new DelegatingShardCreator(creator)).
928                         withDispatcher(Dispatchers.DefaultDispatcherId()), "testOnBatchedModificationsWhenNotLeader");
929
930             waitUntilLeader(shard);
931
932             overrideLeaderCalls.set(true);
933
934             final BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
935
936             shard.tell(batched, ActorRef.noSender());
937
938             expectMsgEquals(batched);
939         }};
940     }
941
942     @Test
943     public void testTransactionMessagesWithNoLeader() {
944         new ShardTestKit(getSystem()) {{
945             dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
946                 shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
947             final TestActorRef<Shard> shard = actorFactory.createTestActor(
948                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
949                     "testTransactionMessagesWithNoLeader");
950
951             waitUntilNoLeader(shard);
952
953             final TransactionIdentifier txId = nextTransactionId();
954             shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), getRef());
955             Failure failure = expectMsgClass(Failure.class);
956             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
957
958             shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), txId,
959                     DataStoreVersions.CURRENT_VERSION, true), getRef());
960             failure = expectMsgClass(Failure.class);
961             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
962
963             shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
964             failure = expectMsgClass(Failure.class);
965             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
966         }};
967     }
968
969     @Test
970     public void testReadyWithReadWriteImmediateCommit() throws Exception{
971         testReadyWithImmediateCommit(true);
972     }
973
974     @Test
975     public void testReadyWithWriteOnlyImmediateCommit() throws Exception{
976         testReadyWithImmediateCommit(false);
977     }
978
979     private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
980         new ShardTestKit(getSystem()) {{
981             final TestActorRef<Shard> shard = actorFactory.createTestActor(
982                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
983                     "testReadyWithImmediateCommit-" + readWrite);
984
985             waitUntilLeader(shard);
986
987             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
988
989             final TransactionIdentifier transactionID = nextTransactionId();
990             final MutableCompositeModification modification = new MutableCompositeModification();
991             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
992             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
993                     TestModel.TEST_PATH, containerNode, modification);
994
995             final FiniteDuration duration = duration("5 seconds");
996
997             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
998
999             expectMsgClass(duration, CommitTransactionReply.class);
1000
1001             final InOrder inOrder = inOrder(cohort);
1002             inOrder.verify(cohort).canCommit();
1003             inOrder.verify(cohort).preCommit();
1004             inOrder.verify(cohort).commit();
1005
1006             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1007             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1008         }};
1009     }
1010
1011     @Test
1012     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1013         new ShardTestKit(getSystem()) {{
1014             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1015                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1016                     "testReadyLocalTransactionWithImmediateCommit");
1017
1018             waitUntilLeader(shard);
1019
1020             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1021
1022             final DataTreeModification modification = dataStore.newModification();
1023
1024             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1025             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1026             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1027             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1028
1029             final TransactionIdentifier txId = nextTransactionId();
1030             modification.ready();
1031             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1032
1033             shard.tell(readyMessage, getRef());
1034
1035             expectMsgClass(CommitTransactionReply.class);
1036
1037             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1038             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1039         }};
1040     }
1041
1042     @Test
1043     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1044         new ShardTestKit(getSystem()) {{
1045             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1046                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1047                     "testReadyLocalTransactionWithThreePhaseCommit");
1048
1049             waitUntilLeader(shard);
1050
1051             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1052
1053             final DataTreeModification modification = dataStore.newModification();
1054
1055             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1056             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1057             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1058             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1059
1060             final TransactionIdentifier txId = nextTransactionId();
1061             modification.ready();
1062             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1063
1064             shard.tell(readyMessage, getRef());
1065
1066             expectMsgClass(ReadyTransactionReply.class);
1067
1068             // Send the CanCommitTransaction message.
1069
1070             shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1071             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1072                     expectMsgClass(CanCommitTransactionReply.class));
1073             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1074
1075             // Send the CanCommitTransaction message.
1076
1077             shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1078             expectMsgClass(CommitTransactionReply.class);
1079
1080             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1081             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1082         }};
1083     }
1084
1085     @Test
1086     public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
1087         testCommitWithPersistenceDisabled(true);
1088     }
1089
1090     @Test
1091     public void testWriteOnlyCommitWithPersistenceDisabled() throws Throwable {
1092         testCommitWithPersistenceDisabled(true);
1093     }
1094
1095     private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
1096         dataStoreContextBuilder.persistent(false);
1097         new ShardTestKit(getSystem()) {{
1098             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1099                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1100                     "testCommitWithPersistenceDisabled-" + readWrite);
1101
1102             waitUntilLeader(shard);
1103
1104             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1105
1106             // Setup a simulated transactions with a mock cohort.
1107
1108             final TransactionIdentifier transactionID = nextTransactionId();
1109             final MutableCompositeModification modification = new MutableCompositeModification();
1110             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1111             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1112                 TestModel.TEST_PATH, containerNode, modification);
1113
1114             final FiniteDuration duration = duration("5 seconds");
1115
1116             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1117             expectMsgClass(duration, ReadyTransactionReply.class);
1118
1119             // Send the CanCommitTransaction message.
1120
1121             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1122             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1123                     expectMsgClass(duration, CanCommitTransactionReply.class));
1124             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1125
1126             // Send the CanCommitTransaction message.
1127
1128             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1129             expectMsgClass(duration, CommitTransactionReply.class);
1130
1131             final InOrder inOrder = inOrder(cohort);
1132             inOrder.verify(cohort).canCommit();
1133             inOrder.verify(cohort).preCommit();
1134             inOrder.verify(cohort).commit();
1135
1136             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1137             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1138         }};
1139     }
1140
1141     @Test
1142     public void testReadWriteCommitWhenTransactionHasNoModifications() {
1143         testCommitWhenTransactionHasNoModifications(true);
1144     }
1145
1146     @Test
1147     public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
1148         testCommitWhenTransactionHasNoModifications(false);
1149     }
1150
1151     private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1152         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1153         // but here that need not happen
1154         new ShardTestKit(getSystem()) {
1155             {
1156                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1157                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1158                         "testCommitWhenTransactionHasNoModifications-" + readWrite);
1159
1160                 waitUntilLeader(shard);
1161
1162                 final TransactionIdentifier transactionID = nextTransactionId();
1163                 final MutableCompositeModification modification = new MutableCompositeModification();
1164                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1165                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1166                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1167                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1168                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1169
1170                 final FiniteDuration duration = duration("5 seconds");
1171
1172                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1173                 expectMsgClass(duration, ReadyTransactionReply.class);
1174
1175                 // Send the CanCommitTransaction message.
1176
1177                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1178                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1179                         expectMsgClass(duration, CanCommitTransactionReply.class));
1180                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1181
1182                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1183                 expectMsgClass(duration, CommitTransactionReply.class);
1184
1185                 final InOrder inOrder = inOrder(cohort);
1186                 inOrder.verify(cohort).canCommit();
1187                 inOrder.verify(cohort).preCommit();
1188                 inOrder.verify(cohort).commit();
1189
1190                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1191                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1192
1193                 // Use MBean for verification
1194                 // Committed transaction count should increase as usual
1195                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1196
1197                 // Commit index should not advance because this does not go into the journal
1198                 assertEquals(-1, shardStats.getCommitIndex());
1199             }
1200         };
1201     }
1202
1203     @Test
1204     public void testReadWriteCommitWhenTransactionHasModifications() {
1205         testCommitWhenTransactionHasModifications(true);
1206     }
1207
1208     @Test
1209     public void testWriteOnlyCommitWhenTransactionHasModifications() {
1210         testCommitWhenTransactionHasModifications(false);
1211     }
1212
1213     private void testCommitWhenTransactionHasModifications(final boolean readWrite){
1214         new ShardTestKit(getSystem()) {
1215             {
1216                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1217                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1218                         "testCommitWhenTransactionHasModifications-" + readWrite);
1219
1220                 waitUntilLeader(shard);
1221
1222                 final TransactionIdentifier transactionID = nextTransactionId();
1223                 final MutableCompositeModification modification = new MutableCompositeModification();
1224                 modification.addModification(new DeleteModification(YangInstanceIdentifier.EMPTY));
1225                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1226                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1227                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1228                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1229                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1230
1231                 final FiniteDuration duration = duration("5 seconds");
1232
1233                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1234                 expectMsgClass(duration, ReadyTransactionReply.class);
1235
1236                 // Send the CanCommitTransaction message.
1237
1238                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1239                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1240                         expectMsgClass(duration, CanCommitTransactionReply.class));
1241                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1242
1243                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1244                 expectMsgClass(duration, CommitTransactionReply.class);
1245
1246                 final InOrder inOrder = inOrder(cohort);
1247                 inOrder.verify(cohort).canCommit();
1248                 inOrder.verify(cohort).preCommit();
1249                 inOrder.verify(cohort).commit();
1250
1251                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1252                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1253
1254                 // Use MBean for verification
1255                 // Committed transaction count should increase as usual
1256                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1257
1258                 // Commit index should advance as we do not have an empty modification
1259                 assertEquals(0, shardStats.getCommitIndex());
1260             }
1261         };
1262     }
1263
1264     @Test
1265     public void testCommitPhaseFailure() throws Throwable {
1266         testCommitPhaseFailure(true);
1267         testCommitPhaseFailure(false);
1268     }
1269
1270     private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
1271         new ShardTestKit(getSystem()) {{
1272             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1273                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1274                     "testCommitPhaseFailure-" + readWrite);
1275
1276             waitUntilLeader(shard);
1277
1278             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1279             // commit phase.
1280
1281             final TransactionIdentifier transactionID1 = nextTransactionId();
1282             final MutableCompositeModification modification1 = new MutableCompositeModification();
1283             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1284             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1285             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1286             doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
1287             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1288
1289             final TransactionIdentifier transactionID2 = nextTransactionId();
1290             final MutableCompositeModification modification2 = new MutableCompositeModification();
1291             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1292             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1293
1294             final FiniteDuration duration = duration("5 seconds");
1295             final Timeout timeout = new Timeout(duration);
1296
1297             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1298             expectMsgClass(duration, ReadyTransactionReply.class);
1299
1300             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1301             expectMsgClass(duration, ReadyTransactionReply.class);
1302
1303             // Send the CanCommitTransaction message for the first Tx.
1304
1305             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1306             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1307                     expectMsgClass(duration, CanCommitTransactionReply.class));
1308             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1309
1310             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1311             // processed after the first Tx completes.
1312
1313             final Future<Object> canCommitFuture = Patterns.ask(shard,
1314                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1315
1316             // Send the CommitTransaction message for the first Tx. This should send back an error
1317             // and trigger the 2nd Tx to proceed.
1318
1319             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1320             expectMsgClass(duration, akka.actor.Status.Failure.class);
1321
1322             // Wait for the 2nd Tx to complete the canCommit phase.
1323
1324             final CountDownLatch latch = new CountDownLatch(1);
1325             canCommitFuture.onComplete(new OnComplete<Object>() {
1326                 @Override
1327                 public void onComplete(final Throwable t, final Object resp) {
1328                     latch.countDown();
1329                 }
1330             }, getSystem().dispatcher());
1331
1332             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1333
1334             final InOrder inOrder = inOrder(cohort1, cohort2);
1335             inOrder.verify(cohort1).canCommit();
1336             inOrder.verify(cohort1).preCommit();
1337             inOrder.verify(cohort1).commit();
1338             inOrder.verify(cohort2).canCommit();
1339         }};
1340     }
1341
1342     @Test
1343     public void testPreCommitPhaseFailure() throws Throwable {
1344         testPreCommitPhaseFailure(true);
1345         testPreCommitPhaseFailure(false);
1346     }
1347
1348     private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
1349         new ShardTestKit(getSystem()) {{
1350             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1351                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1352                     "testPreCommitPhaseFailure-" + readWrite);
1353
1354             waitUntilLeader(shard);
1355
1356             final TransactionIdentifier transactionID1 = nextTransactionId();
1357             final MutableCompositeModification modification1 = new MutableCompositeModification();
1358             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1359             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1360             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1361
1362             final TransactionIdentifier transactionID2 = nextTransactionId();
1363             final MutableCompositeModification modification2 = new MutableCompositeModification();
1364             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1365             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1366
1367             final FiniteDuration duration = duration("5 seconds");
1368             final Timeout timeout = new Timeout(duration);
1369
1370             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1371             expectMsgClass(duration, ReadyTransactionReply.class);
1372
1373             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1374             expectMsgClass(duration, ReadyTransactionReply.class);
1375
1376             // Send the CanCommitTransaction message for the first Tx.
1377
1378             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1379             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1380                 expectMsgClass(duration, CanCommitTransactionReply.class));
1381             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1382
1383             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1384             // processed after the first Tx completes.
1385
1386             final Future<Object> canCommitFuture = Patterns.ask(shard,
1387                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1388
1389             // Send the CommitTransaction message for the first Tx. This should send back an error
1390             // and trigger the 2nd Tx to proceed.
1391
1392             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1393             expectMsgClass(duration, akka.actor.Status.Failure.class);
1394
1395             // Wait for the 2nd Tx to complete the canCommit phase.
1396
1397             final CountDownLatch latch = new CountDownLatch(1);
1398             canCommitFuture.onComplete(new OnComplete<Object>() {
1399                 @Override
1400                 public void onComplete(final Throwable t, final Object resp) {
1401                     latch.countDown();
1402                 }
1403             }, getSystem().dispatcher());
1404
1405             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1406
1407             final InOrder inOrder = inOrder(cohort1, cohort2);
1408             inOrder.verify(cohort1).canCommit();
1409             inOrder.verify(cohort1).preCommit();
1410             inOrder.verify(cohort2).canCommit();
1411         }};
1412     }
1413
1414     @Test
1415     public void testCanCommitPhaseFailure() throws Throwable {
1416         testCanCommitPhaseFailure(true);
1417         testCanCommitPhaseFailure(false);
1418     }
1419
1420     private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1421         new ShardTestKit(getSystem()) {{
1422             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1423                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1424                     "testCanCommitPhaseFailure-" + readWrite);
1425
1426             waitUntilLeader(shard);
1427
1428             final FiniteDuration duration = duration("5 seconds");
1429
1430             final TransactionIdentifier transactionID1 = nextTransactionId();
1431             final MutableCompositeModification modification = new MutableCompositeModification();
1432             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1433             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1434
1435             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1436             expectMsgClass(duration, ReadyTransactionReply.class);
1437
1438             // Send the CanCommitTransaction message.
1439
1440             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1441             expectMsgClass(duration, akka.actor.Status.Failure.class);
1442
1443             // Send another can commit to ensure the failed one got cleaned up.
1444
1445             reset(cohort);
1446
1447             final TransactionIdentifier transactionID2 = nextTransactionId();
1448             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1449
1450             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1451             expectMsgClass(duration, ReadyTransactionReply.class);
1452
1453             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1454             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1455                     expectMsgClass(CanCommitTransactionReply.class));
1456             assertEquals("getCanCommit", true, reply.getCanCommit());
1457         }};
1458     }
1459
1460     @Test
1461     public void testCanCommitPhaseFalseResponse() throws Throwable {
1462         testCanCommitPhaseFalseResponse(true);
1463         testCanCommitPhaseFalseResponse(false);
1464     }
1465
1466     private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1467         new ShardTestKit(getSystem()) {{
1468             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1469                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1470                     "testCanCommitPhaseFalseResponse-" + readWrite);
1471
1472             waitUntilLeader(shard);
1473
1474             final FiniteDuration duration = duration("5 seconds");
1475
1476             final TransactionIdentifier transactionID1 = nextTransactionId();
1477             final MutableCompositeModification modification = new MutableCompositeModification();
1478             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1479             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1480
1481             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1482             expectMsgClass(duration, ReadyTransactionReply.class);
1483
1484             // Send the CanCommitTransaction message.
1485
1486             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1487             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1488                     expectMsgClass(CanCommitTransactionReply.class));
1489             assertEquals("getCanCommit", false, reply.getCanCommit());
1490
1491             // Send another can commit to ensure the failed one got cleaned up.
1492
1493             reset(cohort);
1494
1495             final TransactionIdentifier transactionID2 = nextTransactionId();
1496             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1497
1498             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1499             expectMsgClass(duration, ReadyTransactionReply.class);
1500
1501             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1502             reply = CanCommitTransactionReply.fromSerializable(
1503                     expectMsgClass(CanCommitTransactionReply.class));
1504             assertEquals("getCanCommit", true, reply.getCanCommit());
1505         }};
1506     }
1507
1508     @Test
1509     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1510         testImmediateCommitWithCanCommitPhaseFailure(true);
1511         testImmediateCommitWithCanCommitPhaseFailure(false);
1512     }
1513
1514     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1515         new ShardTestKit(getSystem()) {{
1516             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1517                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1518                     "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1519
1520             waitUntilLeader(shard);
1521
1522             final FiniteDuration duration = duration("5 seconds");
1523
1524             final TransactionIdentifier transactionID1 = nextTransactionId();
1525             final MutableCompositeModification modification = new MutableCompositeModification();
1526             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1527             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1528
1529             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
1530
1531             expectMsgClass(duration, akka.actor.Status.Failure.class);
1532
1533             // Send another can commit to ensure the failed one got cleaned up.
1534
1535             reset(cohort);
1536
1537             final TransactionIdentifier transactionID2 = nextTransactionId();
1538             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1539             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1540             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1541             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1542             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1543             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1544             doReturn(candidateRoot).when(candidate).getRootNode();
1545             doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
1546             doReturn(candidate).when(cohort).getCandidate();
1547
1548             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1549
1550             expectMsgClass(duration, CommitTransactionReply.class);
1551         }};
1552     }
1553
1554     @Test
1555     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1556         testImmediateCommitWithCanCommitPhaseFalseResponse(true);
1557         testImmediateCommitWithCanCommitPhaseFalseResponse(false);
1558     }
1559
1560     private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1561         new ShardTestKit(getSystem()) {{
1562             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1563                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1564                     "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
1565
1566             waitUntilLeader(shard);
1567
1568             final FiniteDuration duration = duration("5 seconds");
1569
1570             final TransactionIdentifier transactionID1 = nextTransactionId();
1571             final MutableCompositeModification modification = new MutableCompositeModification();
1572             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1573             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1574
1575             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
1576
1577             expectMsgClass(duration, akka.actor.Status.Failure.class);
1578
1579             // Send another can commit to ensure the failed one got cleaned up.
1580
1581             reset(cohort);
1582
1583             final TransactionIdentifier transactionID2 = nextTransactionId();
1584             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1585             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1586             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1587             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1588             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1589             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1590             doReturn(candidateRoot).when(candidate).getRootNode();
1591             doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
1592             doReturn(candidate).when(cohort).getCandidate();
1593
1594             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1595
1596             expectMsgClass(duration, CommitTransactionReply.class);
1597         }};
1598     }
1599
1600     @Test
1601     public void testAbortBeforeFinishCommit() throws Throwable {
1602         testAbortBeforeFinishCommit(true);
1603         testAbortBeforeFinishCommit(false);
1604     }
1605
1606     private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
1607         new ShardTestKit(getSystem()) {{
1608             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1609                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1610                     "testAbortBeforeFinishCommit-" + readWrite);
1611
1612             waitUntilLeader(shard);
1613
1614             final FiniteDuration duration = duration("5 seconds");
1615             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1616
1617             final TransactionIdentifier transactionID = nextTransactionId();
1618             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1619                           cohort -> {
1620                 final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1621
1622                 // Simulate an AbortTransaction message occurring during replication, after
1623                 // persisting and before finishing the commit to the in-memory store.
1624                 // We have no followers so due to optimizations in the RaftActor, it does not
1625                 // attempt replication and thus we can't send an AbortTransaction message b/c
1626                 // it would be processed too late after CommitTransaction completes. So we'll
1627                 // simulate an AbortTransaction message occurring during replication by calling
1628                 // the shard directly.
1629                 //
1630                 shard.underlyingActor().doAbortTransaction(transactionID, null);
1631
1632                 return preCommitFuture;
1633             };
1634
1635             final MutableCompositeModification modification = new MutableCompositeModification();
1636             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1637                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1638                     modification, preCommit);
1639
1640             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1641             expectMsgClass(duration, ReadyTransactionReply.class);
1642
1643             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1644             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1645                 expectMsgClass(duration, CanCommitTransactionReply.class));
1646             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1647
1648             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1649             expectMsgClass(duration, CommitTransactionReply.class);
1650
1651             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1652
1653             // Since we're simulating an abort occurring during replication and before finish commit,
1654             // the data should still get written to the in-memory store since we've gotten past
1655             // canCommit and preCommit and persisted the data.
1656             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1657         }};
1658     }
1659
1660     @Test
1661     public void testTransactionCommitTimeout() throws Throwable {
1662         testTransactionCommitTimeout(true);
1663         testTransactionCommitTimeout(false);
1664     }
1665
1666     private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
1667         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1668
1669         new ShardTestKit(getSystem()) {{
1670             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1671                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1672                     "testTransactionCommitTimeout-" + readWrite);
1673
1674             waitUntilLeader(shard);
1675
1676             final FiniteDuration duration = duration("5 seconds");
1677
1678             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1679
1680             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1681             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1682                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1683
1684             // Create 1st Tx - will timeout
1685
1686             final TransactionIdentifier transactionID1 = nextTransactionId();
1687             final MutableCompositeModification modification1 = new MutableCompositeModification();
1688             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1689                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1690                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1691                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1692                     modification1);
1693
1694             // Create 2nd Tx
1695
1696             final TransactionIdentifier transactionID2 = nextTransactionId();
1697             final MutableCompositeModification modification2 = new MutableCompositeModification();
1698             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1699                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1700             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1701                     listNodePath,
1702                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1703                     modification2);
1704
1705             // Ready the Tx's
1706
1707             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1708             expectMsgClass(duration, ReadyTransactionReply.class);
1709
1710             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1711             expectMsgClass(duration, ReadyTransactionReply.class);
1712
1713             // canCommit 1st Tx. We don't send the commit so it should timeout.
1714
1715             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1716             expectMsgClass(duration, CanCommitTransactionReply.class);
1717
1718             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1719
1720             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1721             expectMsgClass(duration, CanCommitTransactionReply.class);
1722
1723             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1724
1725             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1726             expectMsgClass(duration, akka.actor.Status.Failure.class);
1727
1728             // Commit the 2nd Tx.
1729
1730             shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1731             expectMsgClass(duration, CommitTransactionReply.class);
1732
1733             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1734             assertNotNull(listNodePath + " not found", node);
1735         }};
1736     }
1737
1738     @Test
1739     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1740         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1741
1742         new ShardTestKit(getSystem()) {{
1743             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1744                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1745                     "testTransactionCommitQueueCapacityExceeded");
1746
1747             waitUntilLeader(shard);
1748
1749             final FiniteDuration duration = duration("5 seconds");
1750
1751             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1752
1753             final TransactionIdentifier transactionID1 = nextTransactionId();
1754             final MutableCompositeModification modification1 = new MutableCompositeModification();
1755             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1756                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1757
1758             final TransactionIdentifier transactionID2 = nextTransactionId();
1759             final MutableCompositeModification modification2 = new MutableCompositeModification();
1760             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1761                     TestModel.OUTER_LIST_PATH,
1762                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1763                     modification2);
1764
1765             final TransactionIdentifier transactionID3 = nextTransactionId();
1766             final MutableCompositeModification modification3 = new MutableCompositeModification();
1767             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1768                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1769
1770             // Ready the Tx's
1771
1772             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1773             expectMsgClass(duration, ReadyTransactionReply.class);
1774
1775             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1776             expectMsgClass(duration, ReadyTransactionReply.class);
1777
1778             // The 3rd Tx should exceed queue capacity and fail.
1779
1780             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1781             expectMsgClass(duration, akka.actor.Status.Failure.class);
1782
1783             // canCommit 1st Tx.
1784
1785             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1786             expectMsgClass(duration, CanCommitTransactionReply.class);
1787
1788             // canCommit the 2nd Tx - it should get queued.
1789
1790             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1791
1792             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1793
1794             shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1795             expectMsgClass(duration, akka.actor.Status.Failure.class);
1796         }};
1797     }
1798
1799     @Test
1800     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1801         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1802
1803         new ShardTestKit(getSystem()) {{
1804             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1805                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1806                     "testTransactionCommitWithPriorExpiredCohortEntries");
1807
1808             waitUntilLeader(shard);
1809
1810             final FiniteDuration duration = duration("5 seconds");
1811
1812             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1813
1814             final TransactionIdentifier transactionID1 = nextTransactionId();
1815             final MutableCompositeModification modification1 = new MutableCompositeModification();
1816             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1817                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1818
1819             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1820             expectMsgClass(duration, ReadyTransactionReply.class);
1821
1822             final TransactionIdentifier transactionID2 = nextTransactionId();
1823             final MutableCompositeModification modification2 = new MutableCompositeModification();
1824             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1825                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1826
1827             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1828             expectMsgClass(duration, ReadyTransactionReply.class);
1829
1830             final TransactionIdentifier transactionID3 = nextTransactionId();
1831             final MutableCompositeModification modification3 = new MutableCompositeModification();
1832             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1833                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1834
1835             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1836             expectMsgClass(duration, ReadyTransactionReply.class);
1837
1838             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1839             // should expire from the queue and the last one should be processed.
1840
1841             shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1842             expectMsgClass(duration, CanCommitTransactionReply.class);
1843         }};
1844     }
1845
1846     @Test
1847     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
1848         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1849
1850         new ShardTestKit(getSystem()) {{
1851             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1852                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1853                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
1854
1855             waitUntilLeader(shard);
1856
1857             final FiniteDuration duration = duration("5 seconds");
1858
1859             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1860
1861             final TransactionIdentifier transactionID1 = nextTransactionId();
1862             final MutableCompositeModification modification1 = new MutableCompositeModification();
1863             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1864                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1865
1866             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1867             expectMsgClass(duration, ReadyTransactionReply.class);
1868
1869             // CanCommit the first one so it's the current in-progress CohortEntry.
1870
1871             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1872             expectMsgClass(duration, CanCommitTransactionReply.class);
1873
1874             // Ready the second Tx.
1875
1876             final TransactionIdentifier transactionID2 = nextTransactionId();
1877             final MutableCompositeModification modification2 = new MutableCompositeModification();
1878             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1879                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1880
1881             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1882             expectMsgClass(duration, ReadyTransactionReply.class);
1883
1884             // Ready the third Tx.
1885
1886             final TransactionIdentifier transactionID3 = nextTransactionId();
1887             final DataTreeModification modification3 = dataStore.newModification();
1888             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1889                     .apply(modification3);
1890                 modification3.ready();
1891             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
1892
1893             shard.tell(readyMessage, getRef());
1894
1895             // Commit the first Tx. After completing, the second should expire from the queue and the third
1896             // Tx committed.
1897
1898             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1899             expectMsgClass(duration, CommitTransactionReply.class);
1900
1901             // Expect commit reply from the third Tx.
1902
1903             expectMsgClass(duration, CommitTransactionReply.class);
1904
1905             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1906             assertNotNull(TestModel.TEST2_PATH + " not found", node);
1907         }};
1908     }
1909
1910     @Test
1911     public void testCanCommitBeforeReadyFailure() throws Throwable {
1912         new ShardTestKit(getSystem()) {{
1913             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1914                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1915                     "testCanCommitBeforeReadyFailure");
1916
1917             shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), getRef());
1918             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1919         }};
1920     }
1921
1922     @Test
1923     public void testAbortCurrentTransaction() throws Throwable {
1924         testAbortCurrentTransaction(true);
1925         testAbortCurrentTransaction(false);
1926     }
1927
1928     private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
1929         new ShardTestKit(getSystem()) {{
1930             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1931                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1932                     "testAbortCurrentTransaction-" + readWrite);
1933
1934             waitUntilLeader(shard);
1935
1936             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1937
1938             final TransactionIdentifier transactionID1 = nextTransactionId();
1939             final MutableCompositeModification modification1 = new MutableCompositeModification();
1940             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1941             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1942             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1943
1944             final TransactionIdentifier transactionID2 = nextTransactionId();
1945             final MutableCompositeModification modification2 = new MutableCompositeModification();
1946             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1947             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1948
1949             final FiniteDuration duration = duration("5 seconds");
1950             final Timeout timeout = new Timeout(duration);
1951
1952             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1953             expectMsgClass(duration, ReadyTransactionReply.class);
1954
1955             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1956             expectMsgClass(duration, ReadyTransactionReply.class);
1957
1958             // Send the CanCommitTransaction message for the first Tx.
1959
1960             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1961             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1962                     expectMsgClass(duration, CanCommitTransactionReply.class));
1963             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1964
1965             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1966             // processed after the first Tx completes.
1967
1968             final Future<Object> canCommitFuture = Patterns.ask(shard,
1969                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1970
1971             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1972             // Tx to proceed.
1973
1974             shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1975             expectMsgClass(duration, AbortTransactionReply.class);
1976
1977             // Wait for the 2nd Tx to complete the canCommit phase.
1978
1979             Await.ready(canCommitFuture, duration);
1980
1981             final InOrder inOrder = inOrder(cohort1, cohort2);
1982             inOrder.verify(cohort1).canCommit();
1983             inOrder.verify(cohort2).canCommit();
1984         }};
1985     }
1986
1987     @Test
1988     public void testAbortQueuedTransaction() throws Throwable {
1989         testAbortQueuedTransaction(true);
1990         testAbortQueuedTransaction(false);
1991     }
1992
1993     private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
1994         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1995         new ShardTestKit(getSystem()) {{
1996             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
1997             @SuppressWarnings("serial")
1998             final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1999                 @Override
2000                 public void handleCommand(final Object message) {
2001                     super.handleCommand(message);
2002                     if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
2003                         if(cleaupCheckLatch.get() != null) {
2004                             cleaupCheckLatch.get().countDown();
2005                         }
2006                     }
2007                 }
2008             };
2009
2010             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2011                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2012                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
2013
2014             waitUntilLeader(shard);
2015
2016             final TransactionIdentifier transactionID = nextTransactionId();
2017             final MutableCompositeModification modification = new MutableCompositeModification();
2018             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2019             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2020
2021             final FiniteDuration duration = duration("5 seconds");
2022
2023             // Ready the tx.
2024
2025             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
2026             expectMsgClass(duration, ReadyTransactionReply.class);
2027
2028             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2029
2030             // Send the AbortTransaction message.
2031
2032             shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
2033             expectMsgClass(duration, AbortTransactionReply.class);
2034
2035             verify(cohort).abort();
2036
2037             // Verify the tx cohort is removed from queue at the cleanup check interval.
2038
2039             cleaupCheckLatch.set(new CountDownLatch(1));
2040             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2041                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2042
2043             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2044
2045             // Now send CanCommitTransaction - should fail.
2046
2047             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
2048
2049             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2050             assertTrue("Failure type", failure instanceof IllegalStateException);
2051         }};
2052     }
2053
2054     @Test
2055     public void testCreateSnapshot() throws Exception {
2056         testCreateSnapshot(true, "testCreateSnapshot");
2057     }
2058
2059     @Test
2060     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2061         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2062     }
2063
2064     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2065
2066         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2067
2068         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2069         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2070             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2071                 super(delegate);
2072             }
2073
2074             @Override
2075             public void saveSnapshot(final Object o) {
2076                 savedSnapshot.set(o);
2077                 super.saveSnapshot(o);
2078             }
2079         }
2080
2081         dataStoreContextBuilder.persistent(persistent);
2082
2083         new ShardTestKit(getSystem()) {{
2084             class TestShard extends Shard {
2085
2086                 protected TestShard(AbstractBuilder<?, ?> builder) {
2087                     super(builder);
2088                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2089                 }
2090
2091                 @Override
2092                 public void handleCommand(final Object message) {
2093                     super.handleCommand(message);
2094
2095                     // XXX:  commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
2096                     if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
2097                         latch.get().countDown();
2098                     }
2099                 }
2100
2101                 @Override
2102                 public RaftActorContext getRaftActorContext() {
2103                     return super.getRaftActorContext();
2104                 }
2105             }
2106
2107             final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
2108
2109             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2110                     Props.create(new DelegatingShardCreator(creator)).
2111                         withDispatcher(Dispatchers.DefaultDispatcherId()), shardActorName);
2112
2113             waitUntilLeader(shard);
2114             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2115
2116             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY);
2117
2118             // Trigger creation of a snapshot by ensuring
2119             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2120             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2121             awaitAndValidateSnapshot(expectedRoot);
2122
2123             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2124             awaitAndValidateSnapshot(expectedRoot);
2125         }
2126
2127         private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
2128             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2129
2130             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2131                     savedSnapshot.get() instanceof Snapshot);
2132
2133             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2134
2135             latch.set(new CountDownLatch(1));
2136             savedSnapshot.set(null);
2137         }
2138
2139         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) throws IOException {
2140             final NormalizedNode<?, ?> actual = ShardDataTreeSnapshot.deserialize(snapshot.getState()).getRootNode().get();
2141             assertEquals("Root node", expectedRoot, actual);
2142         }};
2143     }
2144
2145     /**
2146      * This test simply verifies that the applySnapShot logic will work
2147      * @throws ReadFailedException
2148      * @throws DataValidationFailedException
2149      */
2150     @Test
2151     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2152         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2153         store.setSchemaContext(SCHEMA_CONTEXT);
2154
2155         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2156         putTransaction.write(TestModel.TEST_PATH,
2157             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2158         commitTransaction(store, putTransaction);
2159
2160
2161         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.EMPTY);
2162
2163         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2164
2165         writeTransaction.delete(YangInstanceIdentifier.EMPTY);
2166         writeTransaction.write(YangInstanceIdentifier.EMPTY, expected);
2167
2168         commitTransaction(store, writeTransaction);
2169
2170         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.EMPTY);
2171
2172         assertEquals(expected, actual);
2173     }
2174
2175     @Test
2176     public void testRecoveryApplicable(){
2177
2178         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2179                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2180
2181         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2182                 schemaContext(SCHEMA_CONTEXT).props();
2183
2184         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2185                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2186
2187         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2188                 schemaContext(SCHEMA_CONTEXT).props();
2189
2190         new ShardTestKit(getSystem()) {{
2191             final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
2192
2193             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2194
2195             final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
2196
2197             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2198         }};
2199     }
2200
2201     @Test
2202     public void testOnDatastoreContext() {
2203         new ShardTestKit(getSystem()) {{
2204             dataStoreContextBuilder.persistent(true);
2205
2206             final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
2207
2208             assertEquals("isRecoveryApplicable", true,
2209                     shard.underlyingActor().persistence().isRecoveryApplicable());
2210
2211             waitUntilLeader(shard);
2212
2213             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2214
2215             assertEquals("isRecoveryApplicable", false,
2216                 shard.underlyingActor().persistence().isRecoveryApplicable());
2217
2218             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2219
2220             assertEquals("isRecoveryApplicable", true,
2221                 shard.underlyingActor().persistence().isRecoveryApplicable());
2222         }};
2223     }
2224
2225     @Test
2226     public void testRegisterRoleChangeListener() throws Exception {
2227         new ShardTestKit(getSystem()) {
2228             {
2229                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2230                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2231                         "testRegisterRoleChangeListener");
2232
2233                 waitUntilLeader(shard);
2234
2235                 final TestActorRef<MessageCollectorActor> listener =
2236                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2237
2238                 shard.tell(new RegisterRoleChangeListener(), listener);
2239
2240                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2241
2242                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2243                     ShardLeaderStateChanged.class);
2244                 assertEquals("getLocalShardDataTree present", true,
2245                         leaderStateChanged.getLocalShardDataTree().isPresent());
2246                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2247                     leaderStateChanged.getLocalShardDataTree().get());
2248
2249                 MessageCollectorActor.clearMessages(listener);
2250
2251                 // Force a leader change
2252
2253                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2254
2255                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2256                         ShardLeaderStateChanged.class);
2257                 assertEquals("getLocalShardDataTree present", false,
2258                         leaderStateChanged.getLocalShardDataTree().isPresent());
2259             }
2260         };
2261     }
2262
2263     @Test
2264     public void testFollowerInitialSyncStatus() throws Exception {
2265         final TestActorRef<Shard> shard = actorFactory.createTestActor(
2266                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2267                 "testFollowerInitialSyncStatus");
2268
2269         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2270
2271         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2272
2273         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2274
2275         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2276     }
2277
2278     @Test
2279     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2280         new ShardTestKit(getSystem()) {{
2281             String testName = "testClusteredDataChangeListenerDelayedRegistration";
2282             dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
2283                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2284
2285             final MockDataChangeListener listener = new MockDataChangeListener(1);
2286             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2287                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2288
2289             setupInMemorySnapshotStore();
2290
2291             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2292                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2293                     actorFactory.generateActorId(testName + "-shard"));
2294
2295             waitUntilNoLeader(shard);
2296
2297             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2298
2299             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2300             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2301                 RegisterChangeListenerReply.class);
2302             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2303
2304             shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
2305                     customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2306
2307             listener.waitForChangeEvents();
2308         }};
2309     }
2310
2311     @Test
2312     public void testClusteredDataChangeListenerRegistration() throws Exception {
2313         new ShardTestKit(getSystem()) {{
2314             String testName = "testClusteredDataChangeListenerRegistration";
2315             final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2316                     MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2317
2318             final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2319                     MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2320
2321             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2322                     Shard.builder().id(followerShardID).
2323                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2324                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2325                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2326                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2327
2328             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2329                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2330                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2331                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2332                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2333
2334             leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
2335             String leaderPath = waitUntilLeader(followerShard);
2336             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2337
2338             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2339             final MockDataChangeListener listener = new MockDataChangeListener(1);
2340             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2341                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2342
2343             followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2344             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2345                 RegisterChangeListenerReply.class);
2346             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2347
2348             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2349
2350             listener.waitForChangeEvents();
2351         }};
2352     }
2353
2354     @Test
2355     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2356         new ShardTestKit(getSystem()) {{
2357             String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2358             dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
2359                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2360
2361             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2362             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2363                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2364
2365             setupInMemorySnapshotStore();
2366
2367             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2368                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2369                     actorFactory.generateActorId(testName + "-shard"));
2370
2371             waitUntilNoLeader(shard);
2372
2373             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2374
2375             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2376             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2377                     RegisterDataTreeChangeListenerReply.class);
2378             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2379
2380             shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
2381                     customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2382
2383             listener.waitForChangeEvents();
2384         }};
2385     }
2386
2387     @Test
2388     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2389         new ShardTestKit(getSystem()) {{
2390             String testName = "testClusteredDataTreeChangeListenerRegistration";
2391             final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2392                     MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2393
2394             final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2395                     MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2396
2397             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2398                     Shard.builder().id(followerShardID).
2399                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2400                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2401                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2402                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2403
2404             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2405                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2406                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2407                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2408                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2409
2410             leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
2411             String leaderPath = waitUntilLeader(followerShard);
2412             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2413
2414             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2415             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2416             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2417                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2418
2419             followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2420             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2421                     RegisterDataTreeChangeListenerReply.class);
2422             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2423
2424             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2425
2426             listener.waitForChangeEvents();
2427         }};
2428     }
2429
2430     @Test
2431     public void testServerRemoved() throws Exception {
2432         final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
2433
2434         final ActorRef shard = parent.underlyingActor().context().actorOf(
2435                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2436                 "testServerRemoved");
2437
2438         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2439
2440         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2441     }
2442 }