Fix license header violations in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Status.Failure;
28 import akka.dispatch.Dispatchers;
29 import akka.dispatch.OnComplete;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.testkit.TestActorRef;
34 import akka.util.Timeout;
35 import com.google.common.base.Function;
36 import com.google.common.base.Optional;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableFuture;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashSet;
43 import java.util.Map;
44 import java.util.Set;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicReference;
49 import org.junit.Test;
50 import org.mockito.InOrder;
51 import org.opendaylight.controller.cluster.DataPersistenceProvider;
52 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
62 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
76 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
77 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
78 import org.opendaylight.controller.cluster.datastore.modification.Modification;
79 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
80 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
81 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
84 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
87 import org.opendaylight.controller.cluster.raft.RaftActorContext;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
89 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
90 import org.opendaylight.controller.cluster.raft.Snapshot;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
92 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
93 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
94 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
95 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
96 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
97 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
98 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
99 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
100 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
101 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
102 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
103 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
104 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
105 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
106 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
107 import org.opendaylight.yangtools.yang.common.QName;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
110 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
123 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
124 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
125 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
126 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
127 import scala.concurrent.Await;
128 import scala.concurrent.Future;
129 import scala.concurrent.duration.FiniteDuration;
130
131 public class ShardTest extends AbstractShardTest {
132     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
133
134     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
135
136     @Test
137     public void testRegisterChangeListener() throws Exception {
138         new ShardTestKit(getSystem()) {{
139             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
140                     newShardProps(),  "testRegisterChangeListener");
141
142             waitUntilLeader(shard);
143
144             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
145
146             final MockDataChangeListener listener = new MockDataChangeListener(1);
147             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
148                     "testRegisterChangeListener-DataChangeListener");
149
150             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
151                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
152
153             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
154                     RegisterChangeListenerReply.class);
155             final String replyPath = reply.getListenerRegistrationPath().toString();
156             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
157                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
158
159             final YangInstanceIdentifier path = TestModel.TEST_PATH;
160             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
161
162             listener.waitForChangeEvents(path);
163
164             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
166         }};
167     }
168
169     @SuppressWarnings("serial")
170     @Test
171     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
172         // This test tests the timing window in which a change listener is registered before the
173         // shard becomes the leader. We verify that the listener is registered and notified of the
174         // existing data when the shard becomes the leader.
175         new ShardTestKit(getSystem()) {{
176             // For this test, we want to send the RegisterChangeListener message after the shard
177             // has recovered from persistence and before it becomes the leader. So we subclass
178             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
179             // we know that the shard has been initialized to a follower and has started the
180             // election process. The following 2 CountDownLatches are used to coordinate the
181             // ElectionTimeout with the sending of the RegisterChangeListener message.
182             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
183             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
184             final Creator<Shard> creator = new Creator<Shard>() {
185                 boolean firstElectionTimeout = true;
186
187                 @Override
188                 public Shard create() throws Exception {
189                     // Use a non persistent provider because this test actually invokes persist on the journal
190                     // this will cause all other messages to not be queued properly after that.
191                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
192                     // it does do a persist)
193                     return new Shard(shardID, Collections.<String,String>emptyMap(),
194                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
195                         @Override
196                         public void onReceiveCommand(final Object message) throws Exception {
197                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
198                                 // Got the first ElectionTimeout. We don't forward it to the
199                                 // base Shard yet until we've sent the RegisterChangeListener
200                                 // message. So we signal the onFirstElectionTimeout latch to tell
201                                 // the main thread to send the RegisterChangeListener message and
202                                 // start a thread to wait on the onChangeListenerRegistered latch,
203                                 // which the main thread signals after it has sent the message.
204                                 // After the onChangeListenerRegistered is triggered, we send the
205                                 // original ElectionTimeout message to proceed with the election.
206                                 firstElectionTimeout = false;
207                                 final ActorRef self = getSelf();
208                                 new Thread() {
209                                     @Override
210                                     public void run() {
211                                         Uninterruptibles.awaitUninterruptibly(
212                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
213                                         self.tell(message, self);
214                                     }
215                                 }.start();
216
217                                 onFirstElectionTimeout.countDown();
218                             } else {
219                                 super.onReceiveCommand(message);
220                             }
221                         }
222                     };
223                 }
224             };
225
226             final MockDataChangeListener listener = new MockDataChangeListener(1);
227             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
228                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
229
230             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
231                     Props.create(new DelegatingShardCreator(creator)),
232                     "testRegisterChangeListenerWhenNotLeaderInitially");
233
234             // Write initial data into the in-memory store.
235             final YangInstanceIdentifier path = TestModel.TEST_PATH;
236             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
237
238             // Wait until the shard receives the first ElectionTimeout message.
239             assertEquals("Got first ElectionTimeout", true,
240                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
241
242             // Now send the RegisterChangeListener and wait for the reply.
243             shard.tell(new RegisterChangeListener(path, dclActor,
244                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
245
246             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
247                     RegisterChangeListenerReply.class);
248             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
249
250             // Sanity check - verify the shard is not the leader yet.
251             shard.tell(new FindLeader(), getRef());
252             final FindLeaderReply findLeadeReply =
253                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
254             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
255
256             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
257             // with the election process.
258             onChangeListenerRegistered.countDown();
259
260             // Wait for the shard to become the leader and notify our listener with the existing
261             // data in the store.
262             listener.waitForChangeEvents(path);
263
264             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
265             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
266         }};
267     }
268
269     @Test
270     public void testRegisterDataTreeChangeListener() throws Exception {
271         new ShardTestKit(getSystem()) {{
272             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
273                     newShardProps(), "testRegisterDataTreeChangeListener");
274
275             waitUntilLeader(shard);
276
277             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
278
279             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
280             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
281                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
282
283             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
284
285             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
286                     RegisterDataTreeChangeListenerReply.class);
287             final String replyPath = reply.getListenerRegistrationPath().toString();
288             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
289                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
290
291             final YangInstanceIdentifier path = TestModel.TEST_PATH;
292             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
293
294             listener.waitForChangeEvents();
295
296             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
297             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
298         }};
299     }
300
301     @SuppressWarnings("serial")
302     @Test
303     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
304         new ShardTestKit(getSystem()) {{
305             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
306             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
307             final Creator<Shard> creator = new Creator<Shard>() {
308                 boolean firstElectionTimeout = true;
309
310                 @Override
311                 public Shard create() throws Exception {
312                     return new Shard(shardID, Collections.<String,String>emptyMap(),
313                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
314                         @Override
315                         public void onReceiveCommand(final Object message) throws Exception {
316                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
317                                 firstElectionTimeout = false;
318                                 final ActorRef self = getSelf();
319                                 new Thread() {
320                                     @Override
321                                     public void run() {
322                                         Uninterruptibles.awaitUninterruptibly(
323                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
324                                         self.tell(message, self);
325                                     }
326                                 }.start();
327
328                                 onFirstElectionTimeout.countDown();
329                             } else {
330                                 super.onReceiveCommand(message);
331                             }
332                         }
333                     };
334                 }
335             };
336
337             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
338             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
339                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
340
341             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
342                     Props.create(new DelegatingShardCreator(creator)),
343                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
344
345             final YangInstanceIdentifier path = TestModel.TEST_PATH;
346             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
347
348             assertEquals("Got first ElectionTimeout", true,
349                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
350
351             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
352             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
353                     RegisterDataTreeChangeListenerReply.class);
354             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
355
356             shard.tell(new FindLeader(), getRef());
357             final FindLeaderReply findLeadeReply =
358                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
359             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
360
361             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
362
363             onChangeListenerRegistered.countDown();
364
365             // TODO: investigate why we do not receive data chage events
366             listener.waitForChangeEvents();
367
368             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
369             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
370         }};
371     }
372
373     @Test
374     public void testCreateTransaction(){
375         new ShardTestKit(getSystem()) {{
376             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
377
378             waitUntilLeader(shard);
379
380             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
381
382             shard.tell(new CreateTransaction("txn-1",
383                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
384
385             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
386                     CreateTransactionReply.class);
387
388             final String path = reply.getTransactionActorPath().toString();
389             assertTrue("Unexpected transaction path " + path,
390                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
391
392             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
393         }};
394     }
395
396     @Test
397     public void testCreateTransactionOnChain(){
398         new ShardTestKit(getSystem()) {{
399             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
400
401             waitUntilLeader(shard);
402
403             shard.tell(new CreateTransaction("txn-1",
404                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
405                     getRef());
406
407             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
408                     CreateTransactionReply.class);
409
410             final String path = reply.getTransactionActorPath().toString();
411             assertTrue("Unexpected transaction path " + path,
412                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
413
414             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
415         }};
416     }
417
418     @SuppressWarnings("serial")
419     @Test
420     public void testPeerAddressResolved() throws Exception {
421         new ShardTestKit(getSystem()) {{
422             final CountDownLatch recoveryComplete = new CountDownLatch(1);
423             class TestShard extends Shard {
424                 TestShard() {
425                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
426                             newDatastoreContext(), SCHEMA_CONTEXT);
427                 }
428
429                 Map<String, String> getPeerAddresses() {
430                     return getRaftActorContext().getPeerAddresses();
431                 }
432
433                 @Override
434                 protected void onRecoveryComplete() {
435                     try {
436                         super.onRecoveryComplete();
437                     } finally {
438                         recoveryComplete.countDown();
439                     }
440                 }
441             }
442
443             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
444                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
445                         @Override
446                         public TestShard create() throws Exception {
447                             return new TestShard();
448                         }
449                     })), "testPeerAddressResolved");
450
451             //waitUntilLeader(shard);
452             assertEquals("Recovery complete", true,
453                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
454
455             final String address = "akka://foobar";
456             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
457
458             assertEquals("getPeerAddresses", address,
459                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
460
461             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
462         }};
463     }
464
465     @Test
466     public void testApplySnapshot() throws Exception {
467
468         ShardTestKit testkit = new ShardTestKit(getSystem());
469
470         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
471                 "testApplySnapshot");
472
473         testkit.waitUntilLeader(shard);
474
475         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
476         store.setSchemaContext(SCHEMA_CONTEXT);
477
478         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
479                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
480                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
481                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
482
483         writeToStore(store, TestModel.TEST_PATH, container);
484
485         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
486         final NormalizedNode<?,?> expected = readStore(store, root);
487
488         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
489                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
490
491         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
492
493         final NormalizedNode<?,?> actual = readStore(shard, root);
494
495         assertEquals("Root node", expected, actual);
496
497         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
498     }
499
500     @Test
501     public void testApplyState() throws Exception {
502
503         ShardTestKit testkit = new ShardTestKit(getSystem());
504
505         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
506
507         testkit.waitUntilLeader(shard);
508
509         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
510
511         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
512                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
513
514         shard.underlyingActor().onReceiveCommand(applyState);
515
516         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
517         assertEquals("Applied state", node, actual);
518
519         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
520     }
521
522     @Test
523     public void testApplyStateWithCandidatePayload() throws Exception {
524
525         ShardTestKit testkit = new ShardTestKit(getSystem());
526
527         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
528
529         testkit.waitUntilLeader(shard);
530
531         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
532         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
533
534         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
535                 DataTreeCandidatePayload.create(candidate)));
536
537         shard.underlyingActor().onReceiveCommand(applyState);
538
539         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
540         assertEquals("Applied state", node, actual);
541
542         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
543     }
544
545     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
546         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
547         testStore.setSchemaContext(SCHEMA_CONTEXT);
548
549         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
550
551         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
552
553         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
554                 SerializationUtils.serializeNormalizedNode(root),
555                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
556         return testStore;
557     }
558
559     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
560         source.validate(mod);
561         final DataTreeCandidate candidate = source.prepare(mod);
562         source.commit(candidate);
563         return DataTreeCandidatePayload.create(candidate);
564     }
565
566     @Test
567     public void testDataTreeCandidateRecovery() throws Exception {
568         // Set up the InMemorySnapshotStore.
569         final DataTree source = setupInMemorySnapshotStore();
570
571         final DataTreeModification writeMod = source.takeSnapshot().newModification();
572         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
573         writeMod.ready();
574         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
575
576         // Set up the InMemoryJournal.
577         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
578
579         final int nListEntries = 16;
580         final Set<Integer> listEntryKeys = new HashSet<>();
581
582         // Add some ModificationPayload entries
583         for (int i = 1; i <= nListEntries; i++) {
584             listEntryKeys.add(Integer.valueOf(i));
585
586             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
587                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
588
589             final DataTreeModification mod = source.takeSnapshot().newModification();
590             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
591             mod.ready();
592             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
593                 payloadForModification(source, mod)));
594         }
595
596         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
597                 new ApplyJournalEntries(nListEntries));
598
599         testRecovery(listEntryKeys);
600     }
601
602     @Test
603     public void testModicationRecovery() throws Exception {
604
605         // Set up the InMemorySnapshotStore.
606         setupInMemorySnapshotStore();
607
608         // Set up the InMemoryJournal.
609
610         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
611
612         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
613                   new WriteModification(TestModel.OUTER_LIST_PATH,
614                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
615
616         final int nListEntries = 16;
617         final Set<Integer> listEntryKeys = new HashSet<>();
618
619         // Add some ModificationPayload entries
620         for(int i = 1; i <= nListEntries; i++) {
621             listEntryKeys.add(Integer.valueOf(i));
622             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
623                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
624             final Modification mod = new MergeModification(path,
625                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
626             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
627                     newModificationPayload(mod)));
628         }
629
630         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
631                 new ApplyJournalEntries(nListEntries));
632
633         testRecovery(listEntryKeys);
634     }
635
636     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
637         final MutableCompositeModification compMod = new MutableCompositeModification();
638         for(final Modification mod: mods) {
639             compMod.addModification(mod);
640         }
641
642         return new ModificationPayload(compMod);
643     }
644
645     @Test
646     public void testConcurrentThreePhaseCommits() throws Throwable {
647         new ShardTestKit(getSystem()) {{
648             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
649                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
650                     "testConcurrentThreePhaseCommits");
651
652             waitUntilLeader(shard);
653
654          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
655
656             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
657
658             final String transactionID1 = "tx1";
659             final MutableCompositeModification modification1 = new MutableCompositeModification();
660             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
661                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
662
663             final String transactionID2 = "tx2";
664             final MutableCompositeModification modification2 = new MutableCompositeModification();
665             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
666                     TestModel.OUTER_LIST_PATH,
667                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
668                     modification2);
669
670             final String transactionID3 = "tx3";
671             final MutableCompositeModification modification3 = new MutableCompositeModification();
672             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
673                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
674                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
675                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
676                     modification3);
677
678             final long timeoutSec = 5;
679             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
680             final Timeout timeout = new Timeout(duration);
681
682             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
683             // by the ShardTransaction.
684
685             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
686                     cohort1, modification1, true, false), getRef());
687             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
688                     expectMsgClass(duration, ReadyTransactionReply.class));
689             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
690
691             // Send the CanCommitTransaction message for the first Tx.
692
693             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
694             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
695                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
696             assertEquals("Can commit", true, canCommitReply.getCanCommit());
697
698             // Send the ForwardedReadyTransaction for the next 2 Tx's.
699
700             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
701                     cohort2, modification2, true, false), getRef());
702             expectMsgClass(duration, ReadyTransactionReply.class);
703
704             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
705                     cohort3, modification3, true, false), getRef());
706             expectMsgClass(duration, ReadyTransactionReply.class);
707
708             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
709             // processed after the first Tx completes.
710
711             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
712                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
713
714             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
715                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
716
717             // Send the CommitTransaction message for the first Tx. After it completes, it should
718             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
719
720             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
721             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
722
723             // Wait for the next 2 Tx's to complete.
724
725             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
726             final CountDownLatch commitLatch = new CountDownLatch(2);
727
728             class OnFutureComplete extends OnComplete<Object> {
729                 private final Class<?> expRespType;
730
731                 OnFutureComplete(final Class<?> expRespType) {
732                     this.expRespType = expRespType;
733                 }
734
735                 @Override
736                 public void onComplete(final Throwable error, final Object resp) {
737                     if(error != null) {
738                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
739                     } else {
740                         try {
741                             assertEquals("Commit response type", expRespType, resp.getClass());
742                             onSuccess(resp);
743                         } catch (final Exception e) {
744                             caughtEx.set(e);
745                         }
746                     }
747                 }
748
749                 void onSuccess(final Object resp) throws Exception {
750                 }
751             }
752
753             class OnCommitFutureComplete extends OnFutureComplete {
754                 OnCommitFutureComplete() {
755                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
756                 }
757
758                 @Override
759                 public void onComplete(final Throwable error, final Object resp) {
760                     super.onComplete(error, resp);
761                     commitLatch.countDown();
762                 }
763             }
764
765             class OnCanCommitFutureComplete extends OnFutureComplete {
766                 private final String transactionID;
767
768                 OnCanCommitFutureComplete(final String transactionID) {
769                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
770                     this.transactionID = transactionID;
771                 }
772
773                 @Override
774                 void onSuccess(final Object resp) throws Exception {
775                     final CanCommitTransactionReply canCommitReply =
776                             CanCommitTransactionReply.fromSerializable(resp);
777                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
778
779                     final Future<Object> commitFuture = Patterns.ask(shard,
780                             new CommitTransaction(transactionID).toSerializable(), timeout);
781                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
782                 }
783             }
784
785             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
786                     getSystem().dispatcher());
787
788             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
789                     getSystem().dispatcher());
790
791             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
792
793             if(caughtEx.get() != null) {
794                 throw caughtEx.get();
795             }
796
797             assertEquals("Commits complete", true, done);
798
799             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
800             inOrder.verify(cohort1).canCommit();
801             inOrder.verify(cohort1).preCommit();
802             inOrder.verify(cohort1).commit();
803             inOrder.verify(cohort2).canCommit();
804             inOrder.verify(cohort2).preCommit();
805             inOrder.verify(cohort2).commit();
806             inOrder.verify(cohort3).canCommit();
807             inOrder.verify(cohort3).preCommit();
808             inOrder.verify(cohort3).commit();
809
810             // Verify data in the data store.
811
812             verifyOuterListEntry(shard, 1);
813
814             verifyLastApplied(shard, 2);
815
816             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
817         }};
818     }
819
820     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
821             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
822         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
823     }
824
825     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
826             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
827             final int messagesSent) {
828         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
829         batched.addModification(new WriteModification(path, data));
830         batched.setReady(ready);
831         batched.setDoCommitOnReady(doCommitOnReady);
832         batched.setTotalMessagesSent(messagesSent);
833         return batched;
834     }
835
836     @Test
837     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
838         new ShardTestKit(getSystem()) {{
839             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
840                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
841                     "testBatchedModificationsWithNoCommitOnReady");
842
843             waitUntilLeader(shard);
844
845             final String transactionID = "tx";
846             final FiniteDuration duration = duration("5 seconds");
847
848             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
849             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
850                 @Override
851                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
852                     if(mockCohort.get() == null) {
853                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
854                     }
855
856                     return mockCohort.get();
857                 }
858             };
859
860             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
861
862             // Send a BatchedModifications to start a transaction.
863
864             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
865                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
866             expectMsgClass(duration, BatchedModificationsReply.class);
867
868             // Send a couple more BatchedModifications.
869
870             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
871                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
872             expectMsgClass(duration, BatchedModificationsReply.class);
873
874             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
875                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
876                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
877             expectMsgClass(duration, ReadyTransactionReply.class);
878
879             // Send the CanCommitTransaction message.
880
881             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
882             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
883                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
884             assertEquals("Can commit", true, canCommitReply.getCanCommit());
885
886             // Send the CanCommitTransaction message.
887
888             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
889             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
890
891             final InOrder inOrder = inOrder(mockCohort.get());
892             inOrder.verify(mockCohort.get()).canCommit();
893             inOrder.verify(mockCohort.get()).preCommit();
894             inOrder.verify(mockCohort.get()).commit();
895
896             // Verify data in the data store.
897
898             verifyOuterListEntry(shard, 1);
899
900             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
901         }};
902     }
903
904     @Test
905     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
906         new ShardTestKit(getSystem()) {{
907             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
908                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
909                     "testBatchedModificationsWithCommitOnReady");
910
911             waitUntilLeader(shard);
912
913             final String transactionID = "tx";
914             final FiniteDuration duration = duration("5 seconds");
915
916             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
917             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
918                 @Override
919                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
920                     if(mockCohort.get() == null) {
921                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
922                     }
923
924                     return mockCohort.get();
925                 }
926             };
927
928             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
929
930             // Send a BatchedModifications to start a transaction.
931
932             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
933                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
934             expectMsgClass(duration, BatchedModificationsReply.class);
935
936             // Send a couple more BatchedModifications.
937
938             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
939                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
940             expectMsgClass(duration, BatchedModificationsReply.class);
941
942             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
943                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
944                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
945
946             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
947
948             final InOrder inOrder = inOrder(mockCohort.get());
949             inOrder.verify(mockCohort.get()).canCommit();
950             inOrder.verify(mockCohort.get()).preCommit();
951             inOrder.verify(mockCohort.get()).commit();
952
953             // Verify data in the data store.
954
955             verifyOuterListEntry(shard, 1);
956
957             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
958         }};
959     }
960
961     @Test(expected=IllegalStateException.class)
962     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
963         new ShardTestKit(getSystem()) {{
964             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
965                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
966                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
967
968             waitUntilLeader(shard);
969
970             final String transactionID = "tx1";
971             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
972             batched.setReady(true);
973             batched.setTotalMessagesSent(2);
974
975             shard.tell(batched, getRef());
976
977             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
978
979             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
980
981             if(failure != null) {
982                 throw failure.cause();
983             }
984         }};
985     }
986
987     @Test
988     public void testBatchedModificationsWithOperationFailure() throws Throwable {
989         new ShardTestKit(getSystem()) {{
990             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
991                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
992                     "testBatchedModificationsWithOperationFailure");
993
994             waitUntilLeader(shard);
995
996             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
997             // write will not validate the children for performance reasons.
998
999             String transactionID = "tx1";
1000
1001             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1002                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
1003                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1004
1005             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
1006             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1007             shard.tell(batched, getRef());
1008             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1009
1010             Throwable cause = failure.cause();
1011
1012             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1013             batched.setReady(true);
1014             batched.setTotalMessagesSent(2);
1015
1016             shard.tell(batched, getRef());
1017
1018             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1019             assertEquals("Failure cause", cause, failure.cause());
1020
1021             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1022         }};
1023     }
1024
1025     @SuppressWarnings("unchecked")
1026     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1027         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1028         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1029         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1030                 outerList.getValue() instanceof Iterable);
1031         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1032         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1033                 entry instanceof MapEntryNode);
1034         final MapEntryNode mapEntry = (MapEntryNode)entry;
1035         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1036                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1037         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1038         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1039     }
1040
1041     @Test
1042     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1043         new ShardTestKit(getSystem()) {{
1044             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1045                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1046                     "testBatchedModificationsOnTransactionChain");
1047
1048             waitUntilLeader(shard);
1049
1050             final String transactionChainID = "txChain";
1051             final String transactionID1 = "tx1";
1052             final String transactionID2 = "tx2";
1053
1054             final FiniteDuration duration = duration("5 seconds");
1055
1056             // Send a BatchedModifications to start a chained write transaction and ready it.
1057
1058             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1059             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1060             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1061                     containerNode, true, false, 1), getRef());
1062             expectMsgClass(duration, ReadyTransactionReply.class);
1063
1064             // Create a read Tx on the same chain.
1065
1066             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1067                     transactionChainID).toSerializable(), getRef());
1068
1069             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1070
1071             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1072             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1073             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1074
1075             // Commit the write transaction.
1076
1077             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1078             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1079                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1080             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1081
1082             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1083             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1084
1085             // Verify data in the data store.
1086
1087             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1088             assertEquals("Stored node", containerNode, actualNode);
1089
1090             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1091         }};
1092     }
1093
1094     @Test
1095     public void testOnBatchedModificationsWhenNotLeader() {
1096         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1097         new ShardTestKit(getSystem()) {{
1098             final Creator<Shard> creator = new Creator<Shard>() {
1099                 private static final long serialVersionUID = 1L;
1100
1101                 @Override
1102                 public Shard create() throws Exception {
1103                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1104                             newDatastoreContext(), SCHEMA_CONTEXT) {
1105                         @Override
1106                         protected boolean isLeader() {
1107                             return overrideLeaderCalls.get() ? false : super.isLeader();
1108                         }
1109
1110                         @Override
1111                         protected ActorSelection getLeader() {
1112                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1113                                 super.getLeader();
1114                         }
1115                     };
1116                 }
1117             };
1118
1119             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1120                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1121
1122             waitUntilLeader(shard);
1123
1124             overrideLeaderCalls.set(true);
1125
1126             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1127
1128             shard.tell(batched, ActorRef.noSender());
1129
1130             expectMsgEquals(batched);
1131
1132             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1133         }};
1134     }
1135
1136     @Test
1137     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1138         new ShardTestKit(getSystem()) {{
1139             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1140                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1141                     "testForwardedReadyTransactionWithImmediateCommit");
1142
1143             waitUntilLeader(shard);
1144
1145             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1146
1147             final String transactionID = "tx1";
1148             final MutableCompositeModification modification = new MutableCompositeModification();
1149             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1150             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1151                     TestModel.TEST_PATH, containerNode, modification);
1152
1153             final FiniteDuration duration = duration("5 seconds");
1154
1155             // Simulate the ForwardedReadyTransaction messages that would be sent
1156             // by the ShardTransaction.
1157
1158             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1159                     cohort, modification, true, true), getRef());
1160
1161             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1162
1163             final InOrder inOrder = inOrder(cohort);
1164             inOrder.verify(cohort).canCommit();
1165             inOrder.verify(cohort).preCommit();
1166             inOrder.verify(cohort).commit();
1167
1168             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1169             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1170
1171             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1172         }};
1173     }
1174
1175     @Test
1176     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1177         new ShardTestKit(getSystem()) {{
1178             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1179                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1180                     "testReadyLocalTransactionWithImmediateCommit");
1181
1182             waitUntilLeader(shard);
1183
1184             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1185
1186             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1187
1188             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1189             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1190             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1191             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1192
1193             final String txId = "tx1";
1194             modification.ready();
1195             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1196
1197             shard.tell(readyMessage, getRef());
1198
1199             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1200
1201             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1202             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1203
1204             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1205         }};
1206     }
1207
1208     @Test
1209     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1210         new ShardTestKit(getSystem()) {{
1211             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1212                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1213                     "testReadyLocalTransactionWithThreePhaseCommit");
1214
1215             waitUntilLeader(shard);
1216
1217             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1218
1219             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1220
1221             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1222             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1223             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1224             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1225
1226             final String txId = "tx1";
1227                 modification.ready();
1228             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1229
1230             shard.tell(readyMessage, getRef());
1231
1232             expectMsgClass(ReadyTransactionReply.class);
1233
1234             // Send the CanCommitTransaction message.
1235
1236             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1237             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1238                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1239             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1240
1241             // Send the CanCommitTransaction message.
1242
1243             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1244             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1245
1246             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1247             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1248
1249             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1250         }};
1251     }
1252
1253     @Test
1254     public void testCommitWithPersistenceDisabled() throws Throwable {
1255         dataStoreContextBuilder.persistent(false);
1256         new ShardTestKit(getSystem()) {{
1257             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1258                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1259                     "testCommitWithPersistenceDisabled");
1260
1261             waitUntilLeader(shard);
1262
1263             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1264
1265             // Setup a simulated transactions with a mock cohort.
1266
1267             final String transactionID = "tx";
1268             final MutableCompositeModification modification = new MutableCompositeModification();
1269             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1270             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1271                     TestModel.TEST_PATH, containerNode, modification);
1272
1273             final FiniteDuration duration = duration("5 seconds");
1274
1275             // Simulate the ForwardedReadyTransaction messages that would be sent
1276             // by the ShardTransaction.
1277
1278             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1279                     cohort, modification, true, false), getRef());
1280             expectMsgClass(duration, ReadyTransactionReply.class);
1281
1282             // Send the CanCommitTransaction message.
1283
1284             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1285             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1286                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1287             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1288
1289             // Send the CanCommitTransaction message.
1290
1291             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1292             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1293
1294             final InOrder inOrder = inOrder(cohort);
1295             inOrder.verify(cohort).canCommit();
1296             inOrder.verify(cohort).preCommit();
1297             inOrder.verify(cohort).commit();
1298
1299             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1300             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1301
1302             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1303         }};
1304     }
1305
1306     private static DataTreeCandidateTip mockCandidate(final String name) {
1307         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1308         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1309         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1310         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1311         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1312         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1313         return mockCandidate;
1314     }
1315
1316     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1317         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1318         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1319         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1320         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1321         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1322         return mockCandidate;
1323     }
1324
1325     @Test
1326     public void testCommitWhenTransactionHasNoModifications(){
1327         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1328         // but here that need not happen
1329         new ShardTestKit(getSystem()) {
1330             {
1331                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1332                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1333                         "testCommitWhenTransactionHasNoModifications");
1334
1335                 waitUntilLeader(shard);
1336
1337                 final String transactionID = "tx1";
1338                 final MutableCompositeModification modification = new MutableCompositeModification();
1339                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1340                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1341                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1342                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1343                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1344
1345                 final FiniteDuration duration = duration("5 seconds");
1346
1347                 // Simulate the ForwardedReadyTransaction messages that would be sent
1348                 // by the ShardTransaction.
1349
1350                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1351                         cohort, modification, true, false), getRef());
1352                 expectMsgClass(duration, ReadyTransactionReply.class);
1353
1354                 // Send the CanCommitTransaction message.
1355
1356                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1357                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1358                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1359                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1360
1361                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1362                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1363
1364                 final InOrder inOrder = inOrder(cohort);
1365                 inOrder.verify(cohort).canCommit();
1366                 inOrder.verify(cohort).preCommit();
1367                 inOrder.verify(cohort).commit();
1368
1369                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1370                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1371
1372                 // Use MBean for verification
1373                 // Committed transaction count should increase as usual
1374                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1375
1376                 // Commit index should not advance because this does not go into the journal
1377                 assertEquals(-1, shardStats.getCommitIndex());
1378
1379                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1380
1381             }
1382         };
1383     }
1384
1385     @Test
1386     public void testCommitWhenTransactionHasModifications(){
1387         new ShardTestKit(getSystem()) {
1388             {
1389                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1390                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1391                         "testCommitWhenTransactionHasModifications");
1392
1393                 waitUntilLeader(shard);
1394
1395                 final String transactionID = "tx1";
1396                 final MutableCompositeModification modification = new MutableCompositeModification();
1397                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1398                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1399                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1400                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1401                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1402                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1403
1404                 final FiniteDuration duration = duration("5 seconds");
1405
1406                 // Simulate the ForwardedReadyTransaction messages that would be sent
1407                 // by the ShardTransaction.
1408
1409                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1410                         cohort, modification, true, false), getRef());
1411                 expectMsgClass(duration, ReadyTransactionReply.class);
1412
1413                 // Send the CanCommitTransaction message.
1414
1415                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1416                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1417                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1418                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1419
1420                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1421                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1422
1423                 final InOrder inOrder = inOrder(cohort);
1424                 inOrder.verify(cohort).canCommit();
1425                 inOrder.verify(cohort).preCommit();
1426                 inOrder.verify(cohort).commit();
1427
1428                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1429                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1430
1431                 // Use MBean for verification
1432                 // Committed transaction count should increase as usual
1433                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1434
1435                 // Commit index should advance as we do not have an empty modification
1436                 assertEquals(0, shardStats.getCommitIndex());
1437
1438                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1439
1440             }
1441         };
1442     }
1443
1444     @Test
1445     public void testCommitPhaseFailure() throws Throwable {
1446         new ShardTestKit(getSystem()) {{
1447             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1448                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1449                     "testCommitPhaseFailure");
1450
1451             waitUntilLeader(shard);
1452
1453             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1454             // commit phase.
1455
1456             final String transactionID1 = "tx1";
1457             final MutableCompositeModification modification1 = new MutableCompositeModification();
1458             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1459             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1460             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1461             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1462             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1463
1464             final String transactionID2 = "tx2";
1465             final MutableCompositeModification modification2 = new MutableCompositeModification();
1466             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1467             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1468
1469             final FiniteDuration duration = duration("5 seconds");
1470             final Timeout timeout = new Timeout(duration);
1471
1472             // Simulate the ForwardedReadyTransaction messages that would be sent
1473             // by the ShardTransaction.
1474
1475             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1476                     cohort1, modification1, true, false), getRef());
1477             expectMsgClass(duration, ReadyTransactionReply.class);
1478
1479             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1480                     cohort2, modification2, true, false), getRef());
1481             expectMsgClass(duration, ReadyTransactionReply.class);
1482
1483             // Send the CanCommitTransaction message for the first Tx.
1484
1485             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1486             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1487                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1488             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1489
1490             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1491             // processed after the first Tx completes.
1492
1493             final Future<Object> canCommitFuture = Patterns.ask(shard,
1494                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1495
1496             // Send the CommitTransaction message for the first Tx. This should send back an error
1497             // and trigger the 2nd Tx to proceed.
1498
1499             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1500             expectMsgClass(duration, akka.actor.Status.Failure.class);
1501
1502             // Wait for the 2nd Tx to complete the canCommit phase.
1503
1504             final CountDownLatch latch = new CountDownLatch(1);
1505             canCommitFuture.onComplete(new OnComplete<Object>() {
1506                 @Override
1507                 public void onComplete(final Throwable t, final Object resp) {
1508                     latch.countDown();
1509                 }
1510             }, getSystem().dispatcher());
1511
1512             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1513
1514             final InOrder inOrder = inOrder(cohort1, cohort2);
1515             inOrder.verify(cohort1).canCommit();
1516             inOrder.verify(cohort1).preCommit();
1517             inOrder.verify(cohort1).commit();
1518             inOrder.verify(cohort2).canCommit();
1519
1520             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1521         }};
1522     }
1523
1524     @Test
1525     public void testPreCommitPhaseFailure() throws Throwable {
1526         new ShardTestKit(getSystem()) {{
1527             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1528                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1529                     "testPreCommitPhaseFailure");
1530
1531             waitUntilLeader(shard);
1532
1533             final String transactionID1 = "tx1";
1534             final MutableCompositeModification modification1 = new MutableCompositeModification();
1535             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1536             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1537             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1538
1539             final String transactionID2 = "tx2";
1540             final MutableCompositeModification modification2 = new MutableCompositeModification();
1541             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1542             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1543
1544             final FiniteDuration duration = duration("5 seconds");
1545             final Timeout timeout = new Timeout(duration);
1546
1547             // Simulate the ForwardedReadyTransaction messages that would be sent
1548             // by the ShardTransaction.
1549
1550             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1551                     cohort1, modification1, true, false), getRef());
1552             expectMsgClass(duration, ReadyTransactionReply.class);
1553
1554             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1555                     cohort2, modification2, true, false), getRef());
1556             expectMsgClass(duration, ReadyTransactionReply.class);
1557
1558             // Send the CanCommitTransaction message for the first Tx.
1559
1560             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1561             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1562                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1563             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1564
1565             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1566             // processed after the first Tx completes.
1567
1568             final Future<Object> canCommitFuture = Patterns.ask(shard,
1569                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1570
1571             // Send the CommitTransaction message for the first Tx. This should send back an error
1572             // and trigger the 2nd Tx to proceed.
1573
1574             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1575             expectMsgClass(duration, akka.actor.Status.Failure.class);
1576
1577             // Wait for the 2nd Tx to complete the canCommit phase.
1578
1579             final CountDownLatch latch = new CountDownLatch(1);
1580             canCommitFuture.onComplete(new OnComplete<Object>() {
1581                 @Override
1582                 public void onComplete(final Throwable t, final Object resp) {
1583                     latch.countDown();
1584                 }
1585             }, getSystem().dispatcher());
1586
1587             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1588
1589             final InOrder inOrder = inOrder(cohort1, cohort2);
1590             inOrder.verify(cohort1).canCommit();
1591             inOrder.verify(cohort1).preCommit();
1592             inOrder.verify(cohort2).canCommit();
1593
1594             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1595         }};
1596     }
1597
1598     @Test
1599     public void testCanCommitPhaseFailure() throws Throwable {
1600         new ShardTestKit(getSystem()) {{
1601             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1602                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1603                     "testCanCommitPhaseFailure");
1604
1605             waitUntilLeader(shard);
1606
1607             final FiniteDuration duration = duration("5 seconds");
1608
1609             final String transactionID1 = "tx1";
1610             final MutableCompositeModification modification = new MutableCompositeModification();
1611             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1612             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1613
1614             // Simulate the ForwardedReadyTransaction messages that would be sent
1615             // by the ShardTransaction.
1616
1617             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1618                     cohort, modification, true, false), getRef());
1619             expectMsgClass(duration, ReadyTransactionReply.class);
1620
1621             // Send the CanCommitTransaction message.
1622
1623             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1624             expectMsgClass(duration, akka.actor.Status.Failure.class);
1625
1626             // Send another can commit to ensure the failed one got cleaned up.
1627
1628             reset(cohort);
1629
1630             final String transactionID2 = "tx2";
1631             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1632
1633             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1634                     cohort, modification, true, false), getRef());
1635             expectMsgClass(duration, ReadyTransactionReply.class);
1636
1637             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1638             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1639                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1640             assertEquals("getCanCommit", true, reply.getCanCommit());
1641
1642             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1643         }};
1644     }
1645
1646     @Test
1647     public void testCanCommitPhaseFalseResponse() throws Throwable {
1648         new ShardTestKit(getSystem()) {{
1649             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1650                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1651                     "testCanCommitPhaseFalseResponse");
1652
1653             waitUntilLeader(shard);
1654
1655             final FiniteDuration duration = duration("5 seconds");
1656
1657             final String transactionID1 = "tx1";
1658             final MutableCompositeModification modification = new MutableCompositeModification();
1659             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1660             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1661
1662             // Simulate the ForwardedReadyTransaction messages that would be sent
1663             // by the ShardTransaction.
1664
1665             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1666                     cohort, modification, true, false), getRef());
1667             expectMsgClass(duration, ReadyTransactionReply.class);
1668
1669             // Send the CanCommitTransaction message.
1670
1671             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1672             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1673                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1674             assertEquals("getCanCommit", false, reply.getCanCommit());
1675
1676             // Send another can commit to ensure the failed one got cleaned up.
1677
1678             reset(cohort);
1679
1680             final String transactionID2 = "tx2";
1681             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1682
1683             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1684                     cohort, modification, true, false), getRef());
1685             expectMsgClass(duration, ReadyTransactionReply.class);
1686
1687             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1688             reply = CanCommitTransactionReply.fromSerializable(
1689                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1690             assertEquals("getCanCommit", true, reply.getCanCommit());
1691
1692             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1693         }};
1694     }
1695
1696     @Test
1697     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1698         new ShardTestKit(getSystem()) {{
1699             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1700                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1701                     "testImmediateCommitWithCanCommitPhaseFailure");
1702
1703             waitUntilLeader(shard);
1704
1705             final FiniteDuration duration = duration("5 seconds");
1706
1707             final String transactionID1 = "tx1";
1708             final MutableCompositeModification modification = new MutableCompositeModification();
1709             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1710             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1711
1712             // Simulate the ForwardedReadyTransaction messages that would be sent
1713             // by the ShardTransaction.
1714
1715             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1716                     cohort, modification, true, true), getRef());
1717
1718             expectMsgClass(duration, akka.actor.Status.Failure.class);
1719
1720             // Send another can commit to ensure the failed one got cleaned up.
1721
1722             reset(cohort);
1723
1724             final String transactionID2 = "tx2";
1725             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1726             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1727             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1728             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1729             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1730             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1731             doReturn(candidateRoot).when(candidate).getRootNode();
1732             doReturn(candidate).when(cohort).getCandidate();
1733
1734             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1735                     cohort, modification, true, true), getRef());
1736
1737             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1738
1739             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1740         }};
1741     }
1742
1743     @Test
1744     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1745         new ShardTestKit(getSystem()) {{
1746             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1747                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1748                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1749
1750             waitUntilLeader(shard);
1751
1752             final FiniteDuration duration = duration("5 seconds");
1753
1754             final String transactionID = "tx1";
1755             final MutableCompositeModification modification = new MutableCompositeModification();
1756             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1757             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1758
1759             // Simulate the ForwardedReadyTransaction messages that would be sent
1760             // by the ShardTransaction.
1761
1762             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1763                     cohort, modification, true, true), getRef());
1764
1765             expectMsgClass(duration, akka.actor.Status.Failure.class);
1766
1767             // Send another can commit to ensure the failed one got cleaned up.
1768
1769             reset(cohort);
1770
1771             final String transactionID2 = "tx2";
1772             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1773             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1774             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1775             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1776             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1777             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1778             doReturn(candidateRoot).when(candidate).getRootNode();
1779             doReturn(candidate).when(cohort).getCandidate();
1780
1781             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1782                     cohort, modification, true, true), getRef());
1783
1784             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1785
1786             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1787         }};
1788     }
1789
1790     @Test
1791     public void testAbortBeforeFinishCommit() throws Throwable {
1792         new ShardTestKit(getSystem()) {{
1793             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1794                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1795                     "testAbortBeforeFinishCommit");
1796
1797             waitUntilLeader(shard);
1798
1799             final FiniteDuration duration = duration("5 seconds");
1800             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1801
1802             final String transactionID = "tx1";
1803             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1804                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1805                 @Override
1806                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1807                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1808
1809                     // Simulate an AbortTransaction message occurring during replication, after
1810                     // persisting and before finishing the commit to the in-memory store.
1811                     // We have no followers so due to optimizations in the RaftActor, it does not
1812                     // attempt replication and thus we can't send an AbortTransaction message b/c
1813                     // it would be processed too late after CommitTransaction completes. So we'll
1814                     // simulate an AbortTransaction message occurring during replication by calling
1815                     // the shard directly.
1816                     //
1817                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1818
1819                     return preCommitFuture;
1820                 }
1821             };
1822
1823             final MutableCompositeModification modification = new MutableCompositeModification();
1824             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1825                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1826                     modification, preCommit);
1827
1828             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1829                     cohort, modification, true, false), getRef());
1830             expectMsgClass(duration, ReadyTransactionReply.class);
1831
1832             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1833             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1834                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1835             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1836
1837             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1838             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1839
1840             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1841
1842             // Since we're simulating an abort occurring during replication and before finish commit,
1843             // the data should still get written to the in-memory store since we've gotten past
1844             // canCommit and preCommit and persisted the data.
1845             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1846
1847             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1848         }};
1849     }
1850
1851     @Test
1852     public void testTransactionCommitTimeout() throws Throwable {
1853         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1854
1855         new ShardTestKit(getSystem()) {{
1856             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1857                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1858                     "testTransactionCommitTimeout");
1859
1860             waitUntilLeader(shard);
1861
1862             final FiniteDuration duration = duration("5 seconds");
1863
1864             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1865
1866             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1867             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1868                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1869
1870             // Create 1st Tx - will timeout
1871
1872             final String transactionID1 = "tx1";
1873             final MutableCompositeModification modification1 = new MutableCompositeModification();
1874             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1875                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1876                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1877                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1878                     modification1);
1879
1880             // Create 2nd Tx
1881
1882             final String transactionID2 = "tx3";
1883             final MutableCompositeModification modification2 = new MutableCompositeModification();
1884             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1885                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1886             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1887                     listNodePath,
1888                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1889                     modification2);
1890
1891             // Ready the Tx's
1892
1893             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1894                     cohort1, modification1, true, false), getRef());
1895             expectMsgClass(duration, ReadyTransactionReply.class);
1896
1897             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1898                     cohort2, modification2, true, false), getRef());
1899             expectMsgClass(duration, ReadyTransactionReply.class);
1900
1901             // canCommit 1st Tx. We don't send the commit so it should timeout.
1902
1903             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1904             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1905
1906             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1907
1908             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1909             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1910
1911             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1912
1913             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1914             expectMsgClass(duration, akka.actor.Status.Failure.class);
1915
1916             // Commit the 2nd Tx.
1917
1918             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1919             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1920
1921             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1922             assertNotNull(listNodePath + " not found", node);
1923
1924             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1925         }};
1926     }
1927
1928     @Test
1929     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1930         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1931
1932         new ShardTestKit(getSystem()) {{
1933             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1934                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1935                     "testTransactionCommitQueueCapacityExceeded");
1936
1937             waitUntilLeader(shard);
1938
1939             final FiniteDuration duration = duration("5 seconds");
1940
1941             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1942
1943             final String transactionID1 = "tx1";
1944             final MutableCompositeModification modification1 = new MutableCompositeModification();
1945             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1946                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1947
1948             final String transactionID2 = "tx2";
1949             final MutableCompositeModification modification2 = new MutableCompositeModification();
1950             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1951                     TestModel.OUTER_LIST_PATH,
1952                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1953                     modification2);
1954
1955             final String transactionID3 = "tx3";
1956             final MutableCompositeModification modification3 = new MutableCompositeModification();
1957             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1958                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1959
1960             // Ready the Tx's
1961
1962             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1963                     cohort1, modification1, true, false), getRef());
1964             expectMsgClass(duration, ReadyTransactionReply.class);
1965
1966             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1967                     cohort2, modification2, true, false), getRef());
1968             expectMsgClass(duration, ReadyTransactionReply.class);
1969
1970             // The 3rd Tx should exceed queue capacity and fail.
1971
1972             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1973                     cohort3, modification3, true, false), getRef());
1974             expectMsgClass(duration, akka.actor.Status.Failure.class);
1975
1976             // canCommit 1st Tx.
1977
1978             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1979             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1980
1981             // canCommit the 2nd Tx - it should get queued.
1982
1983             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1984
1985             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1986
1987             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1988             expectMsgClass(duration, akka.actor.Status.Failure.class);
1989
1990             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1991         }};
1992     }
1993
1994     @Test
1995     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1996         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1997
1998         new ShardTestKit(getSystem()) {{
1999             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2000                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2001                     "testTransactionCommitWithPriorExpiredCohortEntries");
2002
2003             waitUntilLeader(shard);
2004
2005             final FiniteDuration duration = duration("5 seconds");
2006
2007             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2008
2009             final String transactionID1 = "tx1";
2010             final MutableCompositeModification modification1 = new MutableCompositeModification();
2011             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2012                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2013
2014             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2015                     cohort1, modification1, true, false), getRef());
2016             expectMsgClass(duration, ReadyTransactionReply.class);
2017
2018             final String transactionID2 = "tx2";
2019             final MutableCompositeModification modification2 = new MutableCompositeModification();
2020             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2021                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2022
2023             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2024                     cohort2, modification2, true, false), getRef());
2025             expectMsgClass(duration, ReadyTransactionReply.class);
2026
2027             final String transactionID3 = "tx3";
2028             final MutableCompositeModification modification3 = new MutableCompositeModification();
2029             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2030                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2031
2032             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2033                     cohort3, modification3, true, false), getRef());
2034             expectMsgClass(duration, ReadyTransactionReply.class);
2035
2036             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2037             // should expire from the queue and the last one should be processed.
2038
2039             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2040             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2041
2042             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2043         }};
2044     }
2045
2046     @Test
2047     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2048         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2049
2050         new ShardTestKit(getSystem()) {{
2051             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2052                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2053                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2054
2055             waitUntilLeader(shard);
2056
2057             final FiniteDuration duration = duration("5 seconds");
2058
2059             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2060
2061             final String transactionID1 = "tx1";
2062             final MutableCompositeModification modification1 = new MutableCompositeModification();
2063             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2064                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2065
2066             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2067                     cohort1, modification1, true, false), getRef());
2068             expectMsgClass(duration, ReadyTransactionReply.class);
2069
2070             // CanCommit the first one so it's the current in-progress CohortEntry.
2071
2072             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2073             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2074
2075             // Ready the second Tx.
2076
2077             final String transactionID2 = "tx2";
2078             final MutableCompositeModification modification2 = new MutableCompositeModification();
2079             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2080                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2081
2082             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2083                     cohort2, modification2, true, false), getRef());
2084             expectMsgClass(duration, ReadyTransactionReply.class);
2085
2086             // Ready the third Tx.
2087
2088             final String transactionID3 = "tx3";
2089             final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2090             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2091                     .apply(modification3);
2092                 modification3.ready();
2093             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2094
2095             shard.tell(readyMessage, getRef());
2096
2097             // Commit the first Tx. After completing, the second should expire from the queue and the third
2098             // Tx committed.
2099
2100             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2101             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2102
2103             // Expect commit reply from the third Tx.
2104
2105             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2106
2107             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2108             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2109
2110             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2111         }};
2112     }
2113
2114     @Test
2115     public void testCanCommitBeforeReadyFailure() throws Throwable {
2116         new ShardTestKit(getSystem()) {{
2117             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2118                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2119                     "testCanCommitBeforeReadyFailure");
2120
2121             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2122             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2123
2124             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2125         }};
2126     }
2127
2128     @Test
2129     public void testAbortCurrentTransaction() throws Throwable {
2130         new ShardTestKit(getSystem()) {{
2131             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2132                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2133                     "testAbortCurrentTransaction");
2134
2135             waitUntilLeader(shard);
2136
2137             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2138
2139             final String transactionID1 = "tx1";
2140             final MutableCompositeModification modification1 = new MutableCompositeModification();
2141             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2142             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2143             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2144
2145             final String transactionID2 = "tx2";
2146             final MutableCompositeModification modification2 = new MutableCompositeModification();
2147             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2148             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2149
2150             final FiniteDuration duration = duration("5 seconds");
2151             final Timeout timeout = new Timeout(duration);
2152
2153             // Simulate the ForwardedReadyTransaction messages that would be sent
2154             // by the ShardTransaction.
2155
2156             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2157                     cohort1, modification1, true, false), getRef());
2158             expectMsgClass(duration, ReadyTransactionReply.class);
2159
2160             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2161                     cohort2, modification2, true, false), getRef());
2162             expectMsgClass(duration, ReadyTransactionReply.class);
2163
2164             // Send the CanCommitTransaction message for the first Tx.
2165
2166             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2167             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2168                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2169             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2170
2171             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2172             // processed after the first Tx completes.
2173
2174             final Future<Object> canCommitFuture = Patterns.ask(shard,
2175                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2176
2177             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2178             // Tx to proceed.
2179
2180             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2181             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2182
2183             // Wait for the 2nd Tx to complete the canCommit phase.
2184
2185             Await.ready(canCommitFuture, duration);
2186
2187             final InOrder inOrder = inOrder(cohort1, cohort2);
2188             inOrder.verify(cohort1).canCommit();
2189             inOrder.verify(cohort2).canCommit();
2190
2191             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2192         }};
2193     }
2194
2195     @Test
2196     public void testAbortQueuedTransaction() throws Throwable {
2197         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2198         new ShardTestKit(getSystem()) {{
2199             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2200             @SuppressWarnings("serial")
2201             final Creator<Shard> creator = new Creator<Shard>() {
2202                 @Override
2203                 public Shard create() throws Exception {
2204                     return new Shard(shardID, Collections.<String,String>emptyMap(),
2205                             dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
2206                         @Override
2207                         public void onReceiveCommand(final Object message) throws Exception {
2208                             super.onReceiveCommand(message);
2209                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2210                                 if(cleaupCheckLatch.get() != null) {
2211                                     cleaupCheckLatch.get().countDown();
2212                                 }
2213                             }
2214                         }
2215                     };
2216                 }
2217             };
2218
2219             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2220                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2221                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2222
2223             waitUntilLeader(shard);
2224
2225             final String transactionID = "tx1";
2226
2227             final MutableCompositeModification modification = new MutableCompositeModification();
2228             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2229             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2230
2231             final FiniteDuration duration = duration("5 seconds");
2232
2233             // Ready the tx.
2234
2235             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2236                     cohort, modification, true, false), getRef());
2237             expectMsgClass(duration, ReadyTransactionReply.class);
2238
2239             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2240
2241             // Send the AbortTransaction message.
2242
2243             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2244             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2245
2246             verify(cohort).abort();
2247
2248             // Verify the tx cohort is removed from queue at the cleanup check interval.
2249
2250             cleaupCheckLatch.set(new CountDownLatch(1));
2251             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2252                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2253
2254             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2255
2256             // Now send CanCommitTransaction - should fail.
2257
2258             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2259
2260             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2261             assertTrue("Failure type", failure instanceof IllegalStateException);
2262
2263             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2264         }};
2265     }
2266
2267     @Test
2268     public void testCreateSnapshot() throws Exception {
2269         testCreateSnapshot(true, "testCreateSnapshot");
2270     }
2271
2272     @Test
2273     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2274         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2275     }
2276
2277     @SuppressWarnings("serial")
2278     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2279
2280         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2281
2282         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2283         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2284             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2285                 super(delegate);
2286             }
2287
2288             @Override
2289             public void saveSnapshot(final Object o) {
2290                 savedSnapshot.set(o);
2291                 super.saveSnapshot(o);
2292             }
2293         }
2294
2295         dataStoreContextBuilder.persistent(persistent);
2296
2297         new ShardTestKit(getSystem()) {{
2298             class TestShard extends Shard {
2299
2300                 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2301                                     final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2302                     super(name, peerAddresses, datastoreContext, schemaContext);
2303                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2304                 }
2305
2306                 @Override
2307                 public void handleCommand(final Object message) {
2308                     super.handleCommand(message);
2309
2310                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2311                         latch.get().countDown();
2312                     }
2313                 }
2314
2315                 @Override
2316                 public RaftActorContext getRaftActorContext() {
2317                     return super.getRaftActorContext();
2318                 }
2319             }
2320
2321             final Creator<Shard> creator = new Creator<Shard>() {
2322                 @Override
2323                 public Shard create() throws Exception {
2324                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2325                             newDatastoreContext(), SCHEMA_CONTEXT);
2326                 }
2327             };
2328
2329             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2330                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2331
2332             waitUntilLeader(shard);
2333
2334             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2335
2336             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2337
2338             // Trigger creation of a snapshot by ensuring
2339             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2340             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2341
2342             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2343
2344             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2345                     savedSnapshot.get() instanceof Snapshot);
2346
2347             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2348
2349             latch.set(new CountDownLatch(1));
2350             savedSnapshot.set(null);
2351
2352             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2353
2354             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2355
2356             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2357                     savedSnapshot.get() instanceof Snapshot);
2358
2359             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2360
2361             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2362         }
2363
2364         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2365
2366             final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2367             assertEquals("Root node", expectedRoot, actual);
2368
2369         }};
2370     }
2371
2372     /**
2373      * This test simply verifies that the applySnapShot logic will work
2374      * @throws ReadFailedException
2375      * @throws DataValidationFailedException
2376      */
2377     @Test
2378     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2379         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2380         store.setSchemaContext(SCHEMA_CONTEXT);
2381
2382         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2383         putTransaction.write(TestModel.TEST_PATH,
2384             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2385         commitTransaction(store, putTransaction);
2386
2387
2388         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2389
2390         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2391
2392         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2393         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2394
2395         commitTransaction(store, writeTransaction);
2396
2397         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2398
2399         assertEquals(expected, actual);
2400     }
2401
2402     @Test
2403     public void testRecoveryApplicable(){
2404
2405         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2406                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2407
2408         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2409                 persistentContext, SCHEMA_CONTEXT);
2410
2411         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2412                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2413
2414         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2415                 nonPersistentContext, SCHEMA_CONTEXT);
2416
2417         new ShardTestKit(getSystem()) {{
2418             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2419                     persistentProps, "testPersistence1");
2420
2421             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2422
2423             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2424
2425             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2426                     nonPersistentProps, "testPersistence2");
2427
2428             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2429
2430             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2431
2432         }};
2433
2434     }
2435
2436     @Test
2437     public void testOnDatastoreContext() {
2438         new ShardTestKit(getSystem()) {{
2439             dataStoreContextBuilder.persistent(true);
2440
2441             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2442
2443             assertEquals("isRecoveryApplicable", true,
2444                     shard.underlyingActor().persistence().isRecoveryApplicable());
2445
2446             waitUntilLeader(shard);
2447
2448             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2449
2450             assertEquals("isRecoveryApplicable", false,
2451                     shard.underlyingActor().persistence().isRecoveryApplicable());
2452
2453             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2454
2455             assertEquals("isRecoveryApplicable", true,
2456                     shard.underlyingActor().persistence().isRecoveryApplicable());
2457
2458             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2459         }};
2460     }
2461
2462     @Test
2463     public void testRegisterRoleChangeListener() throws Exception {
2464         new ShardTestKit(getSystem()) {
2465             {
2466                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2467                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2468                         "testRegisterRoleChangeListener");
2469
2470                 waitUntilLeader(shard);
2471
2472                 final TestActorRef<MessageCollectorActor> listener =
2473                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2474
2475                 shard.tell(new RegisterRoleChangeListener(), listener);
2476
2477                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2478
2479                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2480                         ShardLeaderStateChanged.class);
2481                 assertEquals("getLocalShardDataTree present", true,
2482                         leaderStateChanged.getLocalShardDataTree().isPresent());
2483                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2484                         leaderStateChanged.getLocalShardDataTree().get());
2485
2486                 MessageCollectorActor.clearMessages(listener);
2487
2488                 // Force a leader change
2489
2490                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2491
2492                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2493                         ShardLeaderStateChanged.class);
2494                 assertEquals("getLocalShardDataTree present", false,
2495                         leaderStateChanged.getLocalShardDataTree().isPresent());
2496
2497                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2498             }
2499         };
2500     }
2501
2502     @Test
2503     public void testFollowerInitialSyncStatus() throws Exception {
2504         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2505                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2506                 "testFollowerInitialSyncStatus");
2507
2508         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2509
2510         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2511
2512         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2513
2514         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2515
2516         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2517     }
2518
2519     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2520         modification.ready();
2521         store.validate(modification);
2522         store.commit(store.prepare(modification));
2523     }
2524 }