Add getPeerIds to RaftActorContext
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Status.Failure;
28 import akka.dispatch.Dispatchers;
29 import akka.dispatch.OnComplete;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.testkit.TestActorRef;
34 import akka.util.Timeout;
35 import com.google.common.base.Function;
36 import com.google.common.base.Optional;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableFuture;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashSet;
43 import java.util.Map;
44 import java.util.Set;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicReference;
49 import org.junit.Test;
50 import org.mockito.InOrder;
51 import org.opendaylight.controller.cluster.DataPersistenceProvider;
52 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
62 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
76 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
77 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
78 import org.opendaylight.controller.cluster.datastore.modification.Modification;
79 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
80 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
81 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
84 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
87 import org.opendaylight.controller.cluster.raft.RaftActorContext;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
89 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
90 import org.opendaylight.controller.cluster.raft.Snapshot;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
92 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
93 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
94 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
95 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
96 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
97 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
98 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
99 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
100 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
101 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
102 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
103 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
104 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
105 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
106 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
107 import org.opendaylight.yangtools.yang.common.QName;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
110 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
123 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
124 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
125 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
126 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
127 import scala.concurrent.Await;
128 import scala.concurrent.Future;
129 import scala.concurrent.duration.FiniteDuration;
130
131 public class ShardTest extends AbstractShardTest {
132     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
133
134     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
135
136     @Test
137     public void testRegisterChangeListener() throws Exception {
138         new ShardTestKit(getSystem()) {{
139             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
140                     newShardProps(),  "testRegisterChangeListener");
141
142             waitUntilLeader(shard);
143
144             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
145
146             final MockDataChangeListener listener = new MockDataChangeListener(1);
147             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
148                     "testRegisterChangeListener-DataChangeListener");
149
150             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
151                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
152
153             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
154                     RegisterChangeListenerReply.class);
155             final String replyPath = reply.getListenerRegistrationPath().toString();
156             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
157                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
158
159             final YangInstanceIdentifier path = TestModel.TEST_PATH;
160             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
161
162             listener.waitForChangeEvents(path);
163
164             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
166         }};
167     }
168
169     @SuppressWarnings("serial")
170     @Test
171     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
172         // This test tests the timing window in which a change listener is registered before the
173         // shard becomes the leader. We verify that the listener is registered and notified of the
174         // existing data when the shard becomes the leader.
175         new ShardTestKit(getSystem()) {{
176             // For this test, we want to send the RegisterChangeListener message after the shard
177             // has recovered from persistence and before it becomes the leader. So we subclass
178             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
179             // we know that the shard has been initialized to a follower and has started the
180             // election process. The following 2 CountDownLatches are used to coordinate the
181             // ElectionTimeout with the sending of the RegisterChangeListener message.
182             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
183             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
184             final Creator<Shard> creator = new Creator<Shard>() {
185                 boolean firstElectionTimeout = true;
186
187                 @Override
188                 public Shard create() throws Exception {
189                     // Use a non persistent provider because this test actually invokes persist on the journal
190                     // this will cause all other messages to not be queued properly after that.
191                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
192                     // it does do a persist)
193                     return new Shard(shardID, Collections.<String,String>emptyMap(),
194                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
195                         @Override
196                         public void onReceiveCommand(final Object message) throws Exception {
197                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
198                                 // Got the first ElectionTimeout. We don't forward it to the
199                                 // base Shard yet until we've sent the RegisterChangeListener
200                                 // message. So we signal the onFirstElectionTimeout latch to tell
201                                 // the main thread to send the RegisterChangeListener message and
202                                 // start a thread to wait on the onChangeListenerRegistered latch,
203                                 // which the main thread signals after it has sent the message.
204                                 // After the onChangeListenerRegistered is triggered, we send the
205                                 // original ElectionTimeout message to proceed with the election.
206                                 firstElectionTimeout = false;
207                                 final ActorRef self = getSelf();
208                                 new Thread() {
209                                     @Override
210                                     public void run() {
211                                         Uninterruptibles.awaitUninterruptibly(
212                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
213                                         self.tell(message, self);
214                                     }
215                                 }.start();
216
217                                 onFirstElectionTimeout.countDown();
218                             } else {
219                                 super.onReceiveCommand(message);
220                             }
221                         }
222                     };
223                 }
224             };
225
226             final MockDataChangeListener listener = new MockDataChangeListener(1);
227             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
228                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
229
230             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
231                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
232                     "testRegisterChangeListenerWhenNotLeaderInitially");
233
234             // Write initial data into the in-memory store.
235             final YangInstanceIdentifier path = TestModel.TEST_PATH;
236             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
237
238             // Wait until the shard receives the first ElectionTimeout message.
239             assertEquals("Got first ElectionTimeout", true,
240                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
241
242             // Now send the RegisterChangeListener and wait for the reply.
243             shard.tell(new RegisterChangeListener(path, dclActor,
244                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
245
246             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
247                     RegisterChangeListenerReply.class);
248             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
249
250             // Sanity check - verify the shard is not the leader yet.
251             shard.tell(new FindLeader(), getRef());
252             final FindLeaderReply findLeadeReply =
253                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
254             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
255
256             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
257             // with the election process.
258             onChangeListenerRegistered.countDown();
259
260             // Wait for the shard to become the leader and notify our listener with the existing
261             // data in the store.
262             listener.waitForChangeEvents(path);
263
264             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
265             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
266         }};
267     }
268
269     @Test
270     public void testRegisterDataTreeChangeListener() throws Exception {
271         new ShardTestKit(getSystem()) {{
272             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
273                     newShardProps(), "testRegisterDataTreeChangeListener");
274
275             waitUntilLeader(shard);
276
277             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
278
279             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
280             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
281                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
282
283             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
284
285             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
286                     RegisterDataTreeChangeListenerReply.class);
287             final String replyPath = reply.getListenerRegistrationPath().toString();
288             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
289                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
290
291             final YangInstanceIdentifier path = TestModel.TEST_PATH;
292             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
293
294             listener.waitForChangeEvents();
295
296             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
297             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
298         }};
299     }
300
301     @SuppressWarnings("serial")
302     @Test
303     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
304         new ShardTestKit(getSystem()) {{
305             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
306             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
307             final Creator<Shard> creator = new Creator<Shard>() {
308                 boolean firstElectionTimeout = true;
309
310                 @Override
311                 public Shard create() throws Exception {
312                     return new Shard(shardID, Collections.<String,String>emptyMap(),
313                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
314                         @Override
315                         public void onReceiveCommand(final Object message) throws Exception {
316                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
317                                 firstElectionTimeout = false;
318                                 final ActorRef self = getSelf();
319                                 new Thread() {
320                                     @Override
321                                     public void run() {
322                                         Uninterruptibles.awaitUninterruptibly(
323                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
324                                         self.tell(message, self);
325                                     }
326                                 }.start();
327
328                                 onFirstElectionTimeout.countDown();
329                             } else {
330                                 super.onReceiveCommand(message);
331                             }
332                         }
333                     };
334                 }
335             };
336
337             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
338             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
339                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
340
341             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
342                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
343                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
344
345             final YangInstanceIdentifier path = TestModel.TEST_PATH;
346             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
347
348             assertEquals("Got first ElectionTimeout", true,
349                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
350
351             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
352             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
353                 RegisterDataTreeChangeListenerReply.class);
354             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
355
356             shard.tell(new FindLeader(), getRef());
357             final FindLeaderReply findLeadeReply =
358                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
359             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
360
361             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
362
363             onChangeListenerRegistered.countDown();
364
365             // TODO: investigate why we do not receive data chage events
366             listener.waitForChangeEvents();
367
368             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
369             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
370         }};
371     }
372
373     @Test
374     public void testCreateTransaction(){
375         new ShardTestKit(getSystem()) {{
376             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
377
378             waitUntilLeader(shard);
379
380             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
381
382             shard.tell(new CreateTransaction("txn-1",
383                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
384
385             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
386                     CreateTransactionReply.class);
387
388             final String path = reply.getTransactionActorPath().toString();
389             assertTrue("Unexpected transaction path " + path,
390                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
391
392             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
393         }};
394     }
395
396     @Test
397     public void testCreateTransactionOnChain(){
398         new ShardTestKit(getSystem()) {{
399             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
400
401             waitUntilLeader(shard);
402
403             shard.tell(new CreateTransaction("txn-1",
404                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
405                     getRef());
406
407             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
408                     CreateTransactionReply.class);
409
410             final String path = reply.getTransactionActorPath().toString();
411             assertTrue("Unexpected transaction path " + path,
412                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
413
414             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
415         }};
416     }
417
418     @SuppressWarnings("serial")
419     @Test
420     public void testPeerAddressResolved() throws Exception {
421         new ShardTestKit(getSystem()) {{
422             final CountDownLatch recoveryComplete = new CountDownLatch(1);
423             class TestShard extends Shard {
424                 TestShard() {
425                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
426                             newDatastoreContext(), SCHEMA_CONTEXT);
427                 }
428
429                 String getPeerAddress(String id) {
430                     return getRaftActorContext().getPeerAddress(id);
431                 }
432
433                 @Override
434                 protected void onRecoveryComplete() {
435                     try {
436                         super.onRecoveryComplete();
437                     } finally {
438                         recoveryComplete.countDown();
439                     }
440                 }
441             }
442
443             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
444                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
445                         @Override
446                         public TestShard create() throws Exception {
447                             return new TestShard();
448                         }
449                     })), "testPeerAddressResolved");
450
451             assertEquals("Recovery complete", true,
452                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
453
454             final String address = "akka://foobar";
455             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
456
457             assertEquals("getPeerAddress", address,
458                 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
459
460             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
461         }};
462     }
463
464     @Test
465     public void testApplySnapshot() throws Exception {
466
467         ShardTestKit testkit = new ShardTestKit(getSystem());
468
469         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
470                 "testApplySnapshot");
471
472         testkit.waitUntilLeader(shard);
473
474         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
475         store.setSchemaContext(SCHEMA_CONTEXT);
476
477         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
478                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
479                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
480                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
481
482         writeToStore(store, TestModel.TEST_PATH, container);
483
484         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
485         final NormalizedNode<?,?> expected = readStore(store, root);
486
487         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
488                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
489
490         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
491
492         final NormalizedNode<?,?> actual = readStore(shard, root);
493
494         assertEquals("Root node", expected, actual);
495
496         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
497     }
498
499     @Test
500     public void testApplyState() throws Exception {
501
502         ShardTestKit testkit = new ShardTestKit(getSystem());
503
504         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
505
506         testkit.waitUntilLeader(shard);
507
508         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
509
510         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
511                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
512
513         shard.underlyingActor().onReceiveCommand(applyState);
514
515         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
516         assertEquals("Applied state", node, actual);
517
518         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
519     }
520
521     @Test
522     public void testApplyStateWithCandidatePayload() throws Exception {
523
524         ShardTestKit testkit = new ShardTestKit(getSystem());
525
526         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
527
528         testkit.waitUntilLeader(shard);
529
530         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
531         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
532
533         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
534                 DataTreeCandidatePayload.create(candidate)));
535
536         shard.underlyingActor().onReceiveCommand(applyState);
537
538         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
539         assertEquals("Applied state", node, actual);
540
541         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
542     }
543
544     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
545         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
546         testStore.setSchemaContext(SCHEMA_CONTEXT);
547
548         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
549
550         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
551
552         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
553             SerializationUtils.serializeNormalizedNode(root),
554             Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
555         return testStore;
556     }
557
558     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
559         source.validate(mod);
560         final DataTreeCandidate candidate = source.prepare(mod);
561         source.commit(candidate);
562         return DataTreeCandidatePayload.create(candidate);
563     }
564
565     @Test
566     public void testDataTreeCandidateRecovery() throws Exception {
567         // Set up the InMemorySnapshotStore.
568         final DataTree source = setupInMemorySnapshotStore();
569
570         final DataTreeModification writeMod = source.takeSnapshot().newModification();
571         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
572         writeMod.ready();
573         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
574
575         // Set up the InMemoryJournal.
576         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
577
578         final int nListEntries = 16;
579         final Set<Integer> listEntryKeys = new HashSet<>();
580
581         // Add some ModificationPayload entries
582         for (int i = 1; i <= nListEntries; i++) {
583             listEntryKeys.add(Integer.valueOf(i));
584
585             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
586                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
587
588             final DataTreeModification mod = source.takeSnapshot().newModification();
589             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
590             mod.ready();
591             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
592                 payloadForModification(source, mod)));
593         }
594
595         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
596             new ApplyJournalEntries(nListEntries));
597
598         testRecovery(listEntryKeys);
599     }
600
601     @Test
602     public void testModicationRecovery() throws Exception {
603
604         // Set up the InMemorySnapshotStore.
605         setupInMemorySnapshotStore();
606
607         // Set up the InMemoryJournal.
608
609         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
610
611         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
612             new WriteModification(TestModel.OUTER_LIST_PATH,
613                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
614
615         final int nListEntries = 16;
616         final Set<Integer> listEntryKeys = new HashSet<>();
617
618         // Add some ModificationPayload entries
619         for(int i = 1; i <= nListEntries; i++) {
620             listEntryKeys.add(Integer.valueOf(i));
621             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
622                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
623             final Modification mod = new MergeModification(path,
624                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
625             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
626                     newModificationPayload(mod)));
627         }
628
629         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
630                 new ApplyJournalEntries(nListEntries));
631
632         testRecovery(listEntryKeys);
633     }
634
635     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
636         final MutableCompositeModification compMod = new MutableCompositeModification();
637         for(final Modification mod: mods) {
638             compMod.addModification(mod);
639         }
640
641         return new ModificationPayload(compMod);
642     }
643
644     @Test
645     public void testConcurrentThreePhaseCommits() throws Throwable {
646         new ShardTestKit(getSystem()) {{
647             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
648                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
649                     "testConcurrentThreePhaseCommits");
650
651             waitUntilLeader(shard);
652
653          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
654
655             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
656
657             final String transactionID1 = "tx1";
658             final MutableCompositeModification modification1 = new MutableCompositeModification();
659             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
660                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
661
662             final String transactionID2 = "tx2";
663             final MutableCompositeModification modification2 = new MutableCompositeModification();
664             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
665                     TestModel.OUTER_LIST_PATH,
666                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
667                     modification2);
668
669             final String transactionID3 = "tx3";
670             final MutableCompositeModification modification3 = new MutableCompositeModification();
671             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
672                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
673                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
674                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
675                     modification3);
676
677             final long timeoutSec = 5;
678             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
679             final Timeout timeout = new Timeout(duration);
680
681             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
682             // by the ShardTransaction.
683
684             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
685                     cohort1, modification1, true, false), getRef());
686             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
687                     expectMsgClass(duration, ReadyTransactionReply.class));
688             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
689
690             // Send the CanCommitTransaction message for the first Tx.
691
692             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
693             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
694                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
695             assertEquals("Can commit", true, canCommitReply.getCanCommit());
696
697             // Send the ForwardedReadyTransaction for the next 2 Tx's.
698
699             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
700                     cohort2, modification2, true, false), getRef());
701             expectMsgClass(duration, ReadyTransactionReply.class);
702
703             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
704                     cohort3, modification3, true, false), getRef());
705             expectMsgClass(duration, ReadyTransactionReply.class);
706
707             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
708             // processed after the first Tx completes.
709
710             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
711                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
712
713             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
714                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
715
716             // Send the CommitTransaction message for the first Tx. After it completes, it should
717             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
718
719             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
720             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
721
722             // Wait for the next 2 Tx's to complete.
723
724             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
725             final CountDownLatch commitLatch = new CountDownLatch(2);
726
727             class OnFutureComplete extends OnComplete<Object> {
728                 private final Class<?> expRespType;
729
730                 OnFutureComplete(final Class<?> expRespType) {
731                     this.expRespType = expRespType;
732                 }
733
734                 @Override
735                 public void onComplete(final Throwable error, final Object resp) {
736                     if(error != null) {
737                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
738                     } else {
739                         try {
740                             assertEquals("Commit response type", expRespType, resp.getClass());
741                             onSuccess(resp);
742                         } catch (final Exception e) {
743                             caughtEx.set(e);
744                         }
745                     }
746                 }
747
748                 void onSuccess(final Object resp) throws Exception {
749                 }
750             }
751
752             class OnCommitFutureComplete extends OnFutureComplete {
753                 OnCommitFutureComplete() {
754                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
755                 }
756
757                 @Override
758                 public void onComplete(final Throwable error, final Object resp) {
759                     super.onComplete(error, resp);
760                     commitLatch.countDown();
761                 }
762             }
763
764             class OnCanCommitFutureComplete extends OnFutureComplete {
765                 private final String transactionID;
766
767                 OnCanCommitFutureComplete(final String transactionID) {
768                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
769                     this.transactionID = transactionID;
770                 }
771
772                 @Override
773                 void onSuccess(final Object resp) throws Exception {
774                     final CanCommitTransactionReply canCommitReply =
775                             CanCommitTransactionReply.fromSerializable(resp);
776                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
777
778                     final Future<Object> commitFuture = Patterns.ask(shard,
779                             new CommitTransaction(transactionID).toSerializable(), timeout);
780                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
781                 }
782             }
783
784             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
785                     getSystem().dispatcher());
786
787             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
788                     getSystem().dispatcher());
789
790             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
791
792             if(caughtEx.get() != null) {
793                 throw caughtEx.get();
794             }
795
796             assertEquals("Commits complete", true, done);
797
798             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
799             inOrder.verify(cohort1).canCommit();
800             inOrder.verify(cohort1).preCommit();
801             inOrder.verify(cohort1).commit();
802             inOrder.verify(cohort2).canCommit();
803             inOrder.verify(cohort2).preCommit();
804             inOrder.verify(cohort2).commit();
805             inOrder.verify(cohort3).canCommit();
806             inOrder.verify(cohort3).preCommit();
807             inOrder.verify(cohort3).commit();
808
809             // Verify data in the data store.
810
811             verifyOuterListEntry(shard, 1);
812
813             verifyLastApplied(shard, 2);
814
815             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
816         }};
817     }
818
819     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
820             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
821         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
822     }
823
824     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
825             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
826             final int messagesSent) {
827         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
828         batched.addModification(new WriteModification(path, data));
829         batched.setReady(ready);
830         batched.setDoCommitOnReady(doCommitOnReady);
831         batched.setTotalMessagesSent(messagesSent);
832         return batched;
833     }
834
835     @Test
836     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
837         new ShardTestKit(getSystem()) {{
838             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
839                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
840                     "testBatchedModificationsWithNoCommitOnReady");
841
842             waitUntilLeader(shard);
843
844             final String transactionID = "tx";
845             final FiniteDuration duration = duration("5 seconds");
846
847             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
848             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
849                 @Override
850                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
851                     if(mockCohort.get() == null) {
852                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
853                     }
854
855                     return mockCohort.get();
856                 }
857             };
858
859             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
860
861             // Send a BatchedModifications to start a transaction.
862
863             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
864                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
865             expectMsgClass(duration, BatchedModificationsReply.class);
866
867             // Send a couple more BatchedModifications.
868
869             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
870                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
871             expectMsgClass(duration, BatchedModificationsReply.class);
872
873             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
874                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
875                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
876             expectMsgClass(duration, ReadyTransactionReply.class);
877
878             // Send the CanCommitTransaction message.
879
880             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
881             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
882                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
883             assertEquals("Can commit", true, canCommitReply.getCanCommit());
884
885             // Send the CanCommitTransaction message.
886
887             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
888             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
889
890             final InOrder inOrder = inOrder(mockCohort.get());
891             inOrder.verify(mockCohort.get()).canCommit();
892             inOrder.verify(mockCohort.get()).preCommit();
893             inOrder.verify(mockCohort.get()).commit();
894
895             // Verify data in the data store.
896
897             verifyOuterListEntry(shard, 1);
898
899             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
900         }};
901     }
902
903     @Test
904     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
905         new ShardTestKit(getSystem()) {{
906             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
907                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
908                     "testBatchedModificationsWithCommitOnReady");
909
910             waitUntilLeader(shard);
911
912             final String transactionID = "tx";
913             final FiniteDuration duration = duration("5 seconds");
914
915             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
916             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
917                 @Override
918                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
919                     if(mockCohort.get() == null) {
920                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
921                     }
922
923                     return mockCohort.get();
924                 }
925             };
926
927             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
928
929             // Send a BatchedModifications to start a transaction.
930
931             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
932                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
933             expectMsgClass(duration, BatchedModificationsReply.class);
934
935             // Send a couple more BatchedModifications.
936
937             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
938                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
939             expectMsgClass(duration, BatchedModificationsReply.class);
940
941             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
942                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
943                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
944
945             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
946
947             final InOrder inOrder = inOrder(mockCohort.get());
948             inOrder.verify(mockCohort.get()).canCommit();
949             inOrder.verify(mockCohort.get()).preCommit();
950             inOrder.verify(mockCohort.get()).commit();
951
952             // Verify data in the data store.
953
954             verifyOuterListEntry(shard, 1);
955
956             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
957         }};
958     }
959
960     @Test(expected=IllegalStateException.class)
961     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
962         new ShardTestKit(getSystem()) {{
963             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
964                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
965                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
966
967             waitUntilLeader(shard);
968
969             final String transactionID = "tx1";
970             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
971             batched.setReady(true);
972             batched.setTotalMessagesSent(2);
973
974             shard.tell(batched, getRef());
975
976             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
977
978             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
979
980             if(failure != null) {
981                 throw failure.cause();
982             }
983         }};
984     }
985
986     @Test
987     public void testBatchedModificationsWithOperationFailure() throws Throwable {
988         new ShardTestKit(getSystem()) {{
989             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
990                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
991                     "testBatchedModificationsWithOperationFailure");
992
993             waitUntilLeader(shard);
994
995             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
996             // write will not validate the children for performance reasons.
997
998             String transactionID = "tx1";
999
1000             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1001                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
1002                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1003
1004             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
1005             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1006             shard.tell(batched, getRef());
1007             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1008
1009             Throwable cause = failure.cause();
1010
1011             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1012             batched.setReady(true);
1013             batched.setTotalMessagesSent(2);
1014
1015             shard.tell(batched, getRef());
1016
1017             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1018             assertEquals("Failure cause", cause, failure.cause());
1019
1020             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1021         }};
1022     }
1023
1024     @SuppressWarnings("unchecked")
1025     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1026         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1027         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1028         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1029                 outerList.getValue() instanceof Iterable);
1030         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1031         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1032                 entry instanceof MapEntryNode);
1033         final MapEntryNode mapEntry = (MapEntryNode)entry;
1034         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1035                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1036         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1037         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1038     }
1039
1040     @Test
1041     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1042         new ShardTestKit(getSystem()) {{
1043             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1044                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1045                     "testBatchedModificationsOnTransactionChain");
1046
1047             waitUntilLeader(shard);
1048
1049             final String transactionChainID = "txChain";
1050             final String transactionID1 = "tx1";
1051             final String transactionID2 = "tx2";
1052
1053             final FiniteDuration duration = duration("5 seconds");
1054
1055             // Send a BatchedModifications to start a chained write transaction and ready it.
1056
1057             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1058             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1059             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1060                     containerNode, true, false, 1), getRef());
1061             expectMsgClass(duration, ReadyTransactionReply.class);
1062
1063             // Create a read Tx on the same chain.
1064
1065             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1066                     transactionChainID).toSerializable(), getRef());
1067
1068             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1069
1070             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1071             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1072             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1073
1074             // Commit the write transaction.
1075
1076             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1077             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1078                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1079             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1080
1081             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1082             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1083
1084             // Verify data in the data store.
1085
1086             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1087             assertEquals("Stored node", containerNode, actualNode);
1088
1089             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1090         }};
1091     }
1092
1093     @Test
1094     public void testOnBatchedModificationsWhenNotLeader() {
1095         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1096         new ShardTestKit(getSystem()) {{
1097             final Creator<Shard> creator = new Creator<Shard>() {
1098                 private static final long serialVersionUID = 1L;
1099
1100                 @Override
1101                 public Shard create() throws Exception {
1102                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1103                             newDatastoreContext(), SCHEMA_CONTEXT) {
1104                         @Override
1105                         protected boolean isLeader() {
1106                             return overrideLeaderCalls.get() ? false : super.isLeader();
1107                         }
1108
1109                         @Override
1110                         protected ActorSelection getLeader() {
1111                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1112                                 super.getLeader();
1113                         }
1114                     };
1115                 }
1116             };
1117
1118             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1119                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1120
1121             waitUntilLeader(shard);
1122
1123             overrideLeaderCalls.set(true);
1124
1125             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1126
1127             shard.tell(batched, ActorRef.noSender());
1128
1129             expectMsgEquals(batched);
1130
1131             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1132         }};
1133     }
1134
1135     @Test
1136     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1137         new ShardTestKit(getSystem()) {{
1138             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1139                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1140                     "testForwardedReadyTransactionWithImmediateCommit");
1141
1142             waitUntilLeader(shard);
1143
1144             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1145
1146             final String transactionID = "tx1";
1147             final MutableCompositeModification modification = new MutableCompositeModification();
1148             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1149             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1150                     TestModel.TEST_PATH, containerNode, modification);
1151
1152             final FiniteDuration duration = duration("5 seconds");
1153
1154             // Simulate the ForwardedReadyTransaction messages that would be sent
1155             // by the ShardTransaction.
1156
1157             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1158                     cohort, modification, true, true), getRef());
1159
1160             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1161
1162             final InOrder inOrder = inOrder(cohort);
1163             inOrder.verify(cohort).canCommit();
1164             inOrder.verify(cohort).preCommit();
1165             inOrder.verify(cohort).commit();
1166
1167             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1168             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1169
1170             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1171         }};
1172     }
1173
1174     @Test
1175     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1176         new ShardTestKit(getSystem()) {{
1177             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1178                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1179                     "testReadyLocalTransactionWithImmediateCommit");
1180
1181             waitUntilLeader(shard);
1182
1183             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1184
1185             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1186
1187             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1188             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1189             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1190             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1191
1192             final String txId = "tx1";
1193             modification.ready();
1194             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1195
1196             shard.tell(readyMessage, getRef());
1197
1198             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1199
1200             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1201             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1202
1203             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1204         }};
1205     }
1206
1207     @Test
1208     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1209         new ShardTestKit(getSystem()) {{
1210             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1211                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1212                     "testReadyLocalTransactionWithThreePhaseCommit");
1213
1214             waitUntilLeader(shard);
1215
1216             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1217
1218             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1219
1220             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1221             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1222             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1223             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1224
1225             final String txId = "tx1";
1226                 modification.ready();
1227             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1228
1229             shard.tell(readyMessage, getRef());
1230
1231             expectMsgClass(ReadyTransactionReply.class);
1232
1233             // Send the CanCommitTransaction message.
1234
1235             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1236             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1237                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1238             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1239
1240             // Send the CanCommitTransaction message.
1241
1242             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1243             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1244
1245             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1246             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1247
1248             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1249         }};
1250     }
1251
1252     @Test
1253     public void testCommitWithPersistenceDisabled() throws Throwable {
1254         dataStoreContextBuilder.persistent(false);
1255         new ShardTestKit(getSystem()) {{
1256             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1257                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1258                     "testCommitWithPersistenceDisabled");
1259
1260             waitUntilLeader(shard);
1261
1262             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1263
1264             // Setup a simulated transactions with a mock cohort.
1265
1266             final String transactionID = "tx";
1267             final MutableCompositeModification modification = new MutableCompositeModification();
1268             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1269             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1270                 TestModel.TEST_PATH, containerNode, modification);
1271
1272             final FiniteDuration duration = duration("5 seconds");
1273
1274             // Simulate the ForwardedReadyTransaction messages that would be sent
1275             // by the ShardTransaction.
1276
1277             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1278                 cohort, modification, true, false), getRef());
1279             expectMsgClass(duration, ReadyTransactionReply.class);
1280
1281             // Send the CanCommitTransaction message.
1282
1283             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1284             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1285                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1286             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1287
1288             // Send the CanCommitTransaction message.
1289
1290             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1291             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1292
1293             final InOrder inOrder = inOrder(cohort);
1294             inOrder.verify(cohort).canCommit();
1295             inOrder.verify(cohort).preCommit();
1296             inOrder.verify(cohort).commit();
1297
1298             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1299             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1300
1301             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1302         }};
1303     }
1304
1305     private static DataTreeCandidateTip mockCandidate(final String name) {
1306         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1307         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1308         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1309         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1310         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1311         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1312         return mockCandidate;
1313     }
1314
1315     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1316         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1317         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1318         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1319         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1320         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1321         return mockCandidate;
1322     }
1323
1324     @Test
1325     public void testCommitWhenTransactionHasNoModifications(){
1326         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1327         // but here that need not happen
1328         new ShardTestKit(getSystem()) {
1329             {
1330                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1331                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1332                         "testCommitWhenTransactionHasNoModifications");
1333
1334                 waitUntilLeader(shard);
1335
1336                 final String transactionID = "tx1";
1337                 final MutableCompositeModification modification = new MutableCompositeModification();
1338                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1339                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1340                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1341                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1342                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1343
1344                 final FiniteDuration duration = duration("5 seconds");
1345
1346                 // Simulate the ForwardedReadyTransaction messages that would be sent
1347                 // by the ShardTransaction.
1348
1349                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1350                         cohort, modification, true, false), getRef());
1351                 expectMsgClass(duration, ReadyTransactionReply.class);
1352
1353                 // Send the CanCommitTransaction message.
1354
1355                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1356                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1357                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1358                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1359
1360                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1361                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1362
1363                 final InOrder inOrder = inOrder(cohort);
1364                 inOrder.verify(cohort).canCommit();
1365                 inOrder.verify(cohort).preCommit();
1366                 inOrder.verify(cohort).commit();
1367
1368                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1369                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1370
1371                 // Use MBean for verification
1372                 // Committed transaction count should increase as usual
1373                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1374
1375                 // Commit index should not advance because this does not go into the journal
1376                 assertEquals(-1, shardStats.getCommitIndex());
1377
1378                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1379
1380             }
1381         };
1382     }
1383
1384     @Test
1385     public void testCommitWhenTransactionHasModifications(){
1386         new ShardTestKit(getSystem()) {
1387             {
1388                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1389                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1390                         "testCommitWhenTransactionHasModifications");
1391
1392                 waitUntilLeader(shard);
1393
1394                 final String transactionID = "tx1";
1395                 final MutableCompositeModification modification = new MutableCompositeModification();
1396                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1397                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1398                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1399                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1400                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1401                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1402
1403                 final FiniteDuration duration = duration("5 seconds");
1404
1405                 // Simulate the ForwardedReadyTransaction messages that would be sent
1406                 // by the ShardTransaction.
1407
1408                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1409                         cohort, modification, true, false), getRef());
1410                 expectMsgClass(duration, ReadyTransactionReply.class);
1411
1412                 // Send the CanCommitTransaction message.
1413
1414                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1415                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1416                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1417                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1418
1419                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1420                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1421
1422                 final InOrder inOrder = inOrder(cohort);
1423                 inOrder.verify(cohort).canCommit();
1424                 inOrder.verify(cohort).preCommit();
1425                 inOrder.verify(cohort).commit();
1426
1427                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1428                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1429
1430                 // Use MBean for verification
1431                 // Committed transaction count should increase as usual
1432                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1433
1434                 // Commit index should advance as we do not have an empty modification
1435                 assertEquals(0, shardStats.getCommitIndex());
1436
1437                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1438
1439             }
1440         };
1441     }
1442
1443     @Test
1444     public void testCommitPhaseFailure() throws Throwable {
1445         new ShardTestKit(getSystem()) {{
1446             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1447                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1448                     "testCommitPhaseFailure");
1449
1450             waitUntilLeader(shard);
1451
1452             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1453             // commit phase.
1454
1455             final String transactionID1 = "tx1";
1456             final MutableCompositeModification modification1 = new MutableCompositeModification();
1457             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1458             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1459             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1460             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1461             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1462
1463             final String transactionID2 = "tx2";
1464             final MutableCompositeModification modification2 = new MutableCompositeModification();
1465             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1466             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1467
1468             final FiniteDuration duration = duration("5 seconds");
1469             final Timeout timeout = new Timeout(duration);
1470
1471             // Simulate the ForwardedReadyTransaction messages that would be sent
1472             // by the ShardTransaction.
1473
1474             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1475                     cohort1, modification1, true, false), getRef());
1476             expectMsgClass(duration, ReadyTransactionReply.class);
1477
1478             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1479                     cohort2, modification2, true, false), getRef());
1480             expectMsgClass(duration, ReadyTransactionReply.class);
1481
1482             // Send the CanCommitTransaction message for the first Tx.
1483
1484             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1485             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1486                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1487             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1488
1489             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1490             // processed after the first Tx completes.
1491
1492             final Future<Object> canCommitFuture = Patterns.ask(shard,
1493                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1494
1495             // Send the CommitTransaction message for the first Tx. This should send back an error
1496             // and trigger the 2nd Tx to proceed.
1497
1498             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1499             expectMsgClass(duration, akka.actor.Status.Failure.class);
1500
1501             // Wait for the 2nd Tx to complete the canCommit phase.
1502
1503             final CountDownLatch latch = new CountDownLatch(1);
1504             canCommitFuture.onComplete(new OnComplete<Object>() {
1505                 @Override
1506                 public void onComplete(final Throwable t, final Object resp) {
1507                     latch.countDown();
1508                 }
1509             }, getSystem().dispatcher());
1510
1511             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1512
1513             final InOrder inOrder = inOrder(cohort1, cohort2);
1514             inOrder.verify(cohort1).canCommit();
1515             inOrder.verify(cohort1).preCommit();
1516             inOrder.verify(cohort1).commit();
1517             inOrder.verify(cohort2).canCommit();
1518
1519             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1520         }};
1521     }
1522
1523     @Test
1524     public void testPreCommitPhaseFailure() throws Throwable {
1525         new ShardTestKit(getSystem()) {{
1526             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1527                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1528                     "testPreCommitPhaseFailure");
1529
1530             waitUntilLeader(shard);
1531
1532             final String transactionID1 = "tx1";
1533             final MutableCompositeModification modification1 = new MutableCompositeModification();
1534             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1535             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1536             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1537
1538             final String transactionID2 = "tx2";
1539             final MutableCompositeModification modification2 = new MutableCompositeModification();
1540             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1541             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1542
1543             final FiniteDuration duration = duration("5 seconds");
1544             final Timeout timeout = new Timeout(duration);
1545
1546             // Simulate the ForwardedReadyTransaction messages that would be sent
1547             // by the ShardTransaction.
1548
1549             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1550                     cohort1, modification1, true, false), getRef());
1551             expectMsgClass(duration, ReadyTransactionReply.class);
1552
1553             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1554                     cohort2, modification2, true, false), getRef());
1555             expectMsgClass(duration, ReadyTransactionReply.class);
1556
1557             // Send the CanCommitTransaction message for the first Tx.
1558
1559             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1560             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1561                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1562             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1563
1564             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1565             // processed after the first Tx completes.
1566
1567             final Future<Object> canCommitFuture = Patterns.ask(shard,
1568                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1569
1570             // Send the CommitTransaction message for the first Tx. This should send back an error
1571             // and trigger the 2nd Tx to proceed.
1572
1573             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1574             expectMsgClass(duration, akka.actor.Status.Failure.class);
1575
1576             // Wait for the 2nd Tx to complete the canCommit phase.
1577
1578             final CountDownLatch latch = new CountDownLatch(1);
1579             canCommitFuture.onComplete(new OnComplete<Object>() {
1580                 @Override
1581                 public void onComplete(final Throwable t, final Object resp) {
1582                     latch.countDown();
1583                 }
1584             }, getSystem().dispatcher());
1585
1586             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1587
1588             final InOrder inOrder = inOrder(cohort1, cohort2);
1589             inOrder.verify(cohort1).canCommit();
1590             inOrder.verify(cohort1).preCommit();
1591             inOrder.verify(cohort2).canCommit();
1592
1593             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1594         }};
1595     }
1596
1597     @Test
1598     public void testCanCommitPhaseFailure() throws Throwable {
1599         new ShardTestKit(getSystem()) {{
1600             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1601                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1602                     "testCanCommitPhaseFailure");
1603
1604             waitUntilLeader(shard);
1605
1606             final FiniteDuration duration = duration("5 seconds");
1607
1608             final String transactionID1 = "tx1";
1609             final MutableCompositeModification modification = new MutableCompositeModification();
1610             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1611             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1612
1613             // Simulate the ForwardedReadyTransaction messages that would be sent
1614             // by the ShardTransaction.
1615
1616             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1617                     cohort, modification, true, false), getRef());
1618             expectMsgClass(duration, ReadyTransactionReply.class);
1619
1620             // Send the CanCommitTransaction message.
1621
1622             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1623             expectMsgClass(duration, akka.actor.Status.Failure.class);
1624
1625             // Send another can commit to ensure the failed one got cleaned up.
1626
1627             reset(cohort);
1628
1629             final String transactionID2 = "tx2";
1630             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1631
1632             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1633                     cohort, modification, true, false), getRef());
1634             expectMsgClass(duration, ReadyTransactionReply.class);
1635
1636             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1637             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1638                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1639             assertEquals("getCanCommit", true, reply.getCanCommit());
1640
1641             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1642         }};
1643     }
1644
1645     @Test
1646     public void testCanCommitPhaseFalseResponse() throws Throwable {
1647         new ShardTestKit(getSystem()) {{
1648             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1649                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1650                     "testCanCommitPhaseFalseResponse");
1651
1652             waitUntilLeader(shard);
1653
1654             final FiniteDuration duration = duration("5 seconds");
1655
1656             final String transactionID1 = "tx1";
1657             final MutableCompositeModification modification = new MutableCompositeModification();
1658             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1659             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1660
1661             // Simulate the ForwardedReadyTransaction messages that would be sent
1662             // by the ShardTransaction.
1663
1664             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1665                     cohort, modification, true, false), getRef());
1666             expectMsgClass(duration, ReadyTransactionReply.class);
1667
1668             // Send the CanCommitTransaction message.
1669
1670             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1671             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1672                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1673             assertEquals("getCanCommit", false, reply.getCanCommit());
1674
1675             // Send another can commit to ensure the failed one got cleaned up.
1676
1677             reset(cohort);
1678
1679             final String transactionID2 = "tx2";
1680             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1681
1682             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1683                 cohort, modification, true, false), getRef());
1684             expectMsgClass(duration, ReadyTransactionReply.class);
1685
1686             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1687             reply = CanCommitTransactionReply.fromSerializable(
1688                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1689             assertEquals("getCanCommit", true, reply.getCanCommit());
1690
1691             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1692         }};
1693     }
1694
1695     @Test
1696     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1697         new ShardTestKit(getSystem()) {{
1698             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1699                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1700                     "testImmediateCommitWithCanCommitPhaseFailure");
1701
1702             waitUntilLeader(shard);
1703
1704             final FiniteDuration duration = duration("5 seconds");
1705
1706             final String transactionID1 = "tx1";
1707             final MutableCompositeModification modification = new MutableCompositeModification();
1708             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1709             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1710
1711             // Simulate the ForwardedReadyTransaction messages that would be sent
1712             // by the ShardTransaction.
1713
1714             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1715                     cohort, modification, true, true), getRef());
1716
1717             expectMsgClass(duration, akka.actor.Status.Failure.class);
1718
1719             // Send another can commit to ensure the failed one got cleaned up.
1720
1721             reset(cohort);
1722
1723             final String transactionID2 = "tx2";
1724             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1725             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1726             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1727             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1728             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1729             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1730             doReturn(candidateRoot).when(candidate).getRootNode();
1731             doReturn(candidate).when(cohort).getCandidate();
1732
1733             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1734                     cohort, modification, true, true), getRef());
1735
1736             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1737
1738             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1739         }};
1740     }
1741
1742     @Test
1743     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1744         new ShardTestKit(getSystem()) {{
1745             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1746                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1747                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1748
1749             waitUntilLeader(shard);
1750
1751             final FiniteDuration duration = duration("5 seconds");
1752
1753             final String transactionID = "tx1";
1754             final MutableCompositeModification modification = new MutableCompositeModification();
1755             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1756             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1757
1758             // Simulate the ForwardedReadyTransaction messages that would be sent
1759             // by the ShardTransaction.
1760
1761             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1762                     cohort, modification, true, true), getRef());
1763
1764             expectMsgClass(duration, akka.actor.Status.Failure.class);
1765
1766             // Send another can commit to ensure the failed one got cleaned up.
1767
1768             reset(cohort);
1769
1770             final String transactionID2 = "tx2";
1771             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1772             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1773             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1774             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1775             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1776             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1777             doReturn(candidateRoot).when(candidate).getRootNode();
1778             doReturn(candidate).when(cohort).getCandidate();
1779
1780             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1781                     cohort, modification, true, true), getRef());
1782
1783             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1784
1785             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1786         }};
1787     }
1788
1789     @Test
1790     public void testAbortBeforeFinishCommit() throws Throwable {
1791         new ShardTestKit(getSystem()) {{
1792             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1793                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1794                     "testAbortBeforeFinishCommit");
1795
1796             waitUntilLeader(shard);
1797
1798             final FiniteDuration duration = duration("5 seconds");
1799             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1800
1801             final String transactionID = "tx1";
1802             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1803                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1804                 @Override
1805                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1806                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1807
1808                     // Simulate an AbortTransaction message occurring during replication, after
1809                     // persisting and before finishing the commit to the in-memory store.
1810                     // We have no followers so due to optimizations in the RaftActor, it does not
1811                     // attempt replication and thus we can't send an AbortTransaction message b/c
1812                     // it would be processed too late after CommitTransaction completes. So we'll
1813                     // simulate an AbortTransaction message occurring during replication by calling
1814                     // the shard directly.
1815                     //
1816                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1817
1818                     return preCommitFuture;
1819                 }
1820             };
1821
1822             final MutableCompositeModification modification = new MutableCompositeModification();
1823             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1824                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1825                     modification, preCommit);
1826
1827             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1828                 cohort, modification, true, false), getRef());
1829             expectMsgClass(duration, ReadyTransactionReply.class);
1830
1831             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1832             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1833                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1834             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1835
1836             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1837             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1838
1839             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1840
1841             // Since we're simulating an abort occurring during replication and before finish commit,
1842             // the data should still get written to the in-memory store since we've gotten past
1843             // canCommit and preCommit and persisted the data.
1844             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1845
1846             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1847         }};
1848     }
1849
1850     @Test
1851     public void testTransactionCommitTimeout() throws Throwable {
1852         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1853
1854         new ShardTestKit(getSystem()) {{
1855             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1856                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1857                     "testTransactionCommitTimeout");
1858
1859             waitUntilLeader(shard);
1860
1861             final FiniteDuration duration = duration("5 seconds");
1862
1863             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1864
1865             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1866             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1867                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1868
1869             // Create 1st Tx - will timeout
1870
1871             final String transactionID1 = "tx1";
1872             final MutableCompositeModification modification1 = new MutableCompositeModification();
1873             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1874                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1875                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1876                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1877                     modification1);
1878
1879             // Create 2nd Tx
1880
1881             final String transactionID2 = "tx3";
1882             final MutableCompositeModification modification2 = new MutableCompositeModification();
1883             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1884                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1885             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1886                     listNodePath,
1887                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1888                     modification2);
1889
1890             // Ready the Tx's
1891
1892             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1893                     cohort1, modification1, true, false), getRef());
1894             expectMsgClass(duration, ReadyTransactionReply.class);
1895
1896             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1897                     cohort2, modification2, true, false), getRef());
1898             expectMsgClass(duration, ReadyTransactionReply.class);
1899
1900             // canCommit 1st Tx. We don't send the commit so it should timeout.
1901
1902             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1903             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1904
1905             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1906
1907             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1908             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1909
1910             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1911
1912             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1913             expectMsgClass(duration, akka.actor.Status.Failure.class);
1914
1915             // Commit the 2nd Tx.
1916
1917             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1918             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1919
1920             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1921             assertNotNull(listNodePath + " not found", node);
1922
1923             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1924         }};
1925     }
1926
1927     @Test
1928     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1929         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1930
1931         new ShardTestKit(getSystem()) {{
1932             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1933                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1934                     "testTransactionCommitQueueCapacityExceeded");
1935
1936             waitUntilLeader(shard);
1937
1938             final FiniteDuration duration = duration("5 seconds");
1939
1940             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1941
1942             final String transactionID1 = "tx1";
1943             final MutableCompositeModification modification1 = new MutableCompositeModification();
1944             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1945                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1946
1947             final String transactionID2 = "tx2";
1948             final MutableCompositeModification modification2 = new MutableCompositeModification();
1949             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1950                     TestModel.OUTER_LIST_PATH,
1951                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1952                     modification2);
1953
1954             final String transactionID3 = "tx3";
1955             final MutableCompositeModification modification3 = new MutableCompositeModification();
1956             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1957                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1958
1959             // Ready the Tx's
1960
1961             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1962                     cohort1, modification1, true, false), getRef());
1963             expectMsgClass(duration, ReadyTransactionReply.class);
1964
1965             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1966                     cohort2, modification2, true, false), getRef());
1967             expectMsgClass(duration, ReadyTransactionReply.class);
1968
1969             // The 3rd Tx should exceed queue capacity and fail.
1970
1971             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1972                     cohort3, modification3, true, false), getRef());
1973             expectMsgClass(duration, akka.actor.Status.Failure.class);
1974
1975             // canCommit 1st Tx.
1976
1977             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1978             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1979
1980             // canCommit the 2nd Tx - it should get queued.
1981
1982             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1983
1984             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1985
1986             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1987             expectMsgClass(duration, akka.actor.Status.Failure.class);
1988
1989             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1990         }};
1991     }
1992
1993     @Test
1994     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1995         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1996
1997         new ShardTestKit(getSystem()) {{
1998             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1999                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2000                     "testTransactionCommitWithPriorExpiredCohortEntries");
2001
2002             waitUntilLeader(shard);
2003
2004             final FiniteDuration duration = duration("5 seconds");
2005
2006             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2007
2008             final String transactionID1 = "tx1";
2009             final MutableCompositeModification modification1 = new MutableCompositeModification();
2010             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2011                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2012
2013             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2014                 cohort1, modification1, true, false), getRef());
2015             expectMsgClass(duration, ReadyTransactionReply.class);
2016
2017             final String transactionID2 = "tx2";
2018             final MutableCompositeModification modification2 = new MutableCompositeModification();
2019             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2020                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2021
2022             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2023                     cohort2, modification2, true, false), getRef());
2024             expectMsgClass(duration, ReadyTransactionReply.class);
2025
2026             final String transactionID3 = "tx3";
2027             final MutableCompositeModification modification3 = new MutableCompositeModification();
2028             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2029                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2030
2031             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2032                 cohort3, modification3, true, false), getRef());
2033             expectMsgClass(duration, ReadyTransactionReply.class);
2034
2035             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2036             // should expire from the queue and the last one should be processed.
2037
2038             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2039             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2040
2041             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2042         }};
2043     }
2044
2045     @Test
2046     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2047         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2048
2049         new ShardTestKit(getSystem()) {{
2050             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2051                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2052                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2053
2054             waitUntilLeader(shard);
2055
2056             final FiniteDuration duration = duration("5 seconds");
2057
2058             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2059
2060             final String transactionID1 = "tx1";
2061             final MutableCompositeModification modification1 = new MutableCompositeModification();
2062             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2063                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2064
2065             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2066                     cohort1, modification1, true, false), getRef());
2067             expectMsgClass(duration, ReadyTransactionReply.class);
2068
2069             // CanCommit the first one so it's the current in-progress CohortEntry.
2070
2071             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2072             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2073
2074             // Ready the second Tx.
2075
2076             final String transactionID2 = "tx2";
2077             final MutableCompositeModification modification2 = new MutableCompositeModification();
2078             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2079                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2080
2081             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2082                     cohort2, modification2, true, false), getRef());
2083             expectMsgClass(duration, ReadyTransactionReply.class);
2084
2085             // Ready the third Tx.
2086
2087             final String transactionID3 = "tx3";
2088             final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2089             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2090                     .apply(modification3);
2091                 modification3.ready();
2092             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2093
2094             shard.tell(readyMessage, getRef());
2095
2096             // Commit the first Tx. After completing, the second should expire from the queue and the third
2097             // Tx committed.
2098
2099             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2100             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2101
2102             // Expect commit reply from the third Tx.
2103
2104             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2105
2106             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2107             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2108
2109             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2110         }};
2111     }
2112
2113     @Test
2114     public void testCanCommitBeforeReadyFailure() throws Throwable {
2115         new ShardTestKit(getSystem()) {{
2116             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2117                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2118                     "testCanCommitBeforeReadyFailure");
2119
2120             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2121             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2122
2123             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2124         }};
2125     }
2126
2127     @Test
2128     public void testAbortCurrentTransaction() throws Throwable {
2129         new ShardTestKit(getSystem()) {{
2130             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2131                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2132                     "testAbortCurrentTransaction");
2133
2134             waitUntilLeader(shard);
2135
2136             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2137
2138             final String transactionID1 = "tx1";
2139             final MutableCompositeModification modification1 = new MutableCompositeModification();
2140             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2141             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2142             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2143
2144             final String transactionID2 = "tx2";
2145             final MutableCompositeModification modification2 = new MutableCompositeModification();
2146             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2147             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2148
2149             final FiniteDuration duration = duration("5 seconds");
2150             final Timeout timeout = new Timeout(duration);
2151
2152             // Simulate the ForwardedReadyTransaction messages that would be sent
2153             // by the ShardTransaction.
2154
2155             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2156                     cohort1, modification1, true, false), getRef());
2157             expectMsgClass(duration, ReadyTransactionReply.class);
2158
2159             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2160                     cohort2, modification2, true, false), getRef());
2161             expectMsgClass(duration, ReadyTransactionReply.class);
2162
2163             // Send the CanCommitTransaction message for the first Tx.
2164
2165             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2166             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2167                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2168             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2169
2170             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2171             // processed after the first Tx completes.
2172
2173             final Future<Object> canCommitFuture = Patterns.ask(shard,
2174                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2175
2176             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2177             // Tx to proceed.
2178
2179             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2180             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2181
2182             // Wait for the 2nd Tx to complete the canCommit phase.
2183
2184             Await.ready(canCommitFuture, duration);
2185
2186             final InOrder inOrder = inOrder(cohort1, cohort2);
2187             inOrder.verify(cohort1).canCommit();
2188             inOrder.verify(cohort2).canCommit();
2189
2190             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2191         }};
2192     }
2193
2194     @Test
2195     public void testAbortQueuedTransaction() throws Throwable {
2196         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2197         new ShardTestKit(getSystem()) {{
2198             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2199             @SuppressWarnings("serial")
2200             final Creator<Shard> creator = new Creator<Shard>() {
2201                 @Override
2202                 public Shard create() throws Exception {
2203                     return new Shard(shardID, Collections.<String,String>emptyMap(),
2204                             dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
2205                         @Override
2206                         public void onReceiveCommand(final Object message) throws Exception {
2207                             super.onReceiveCommand(message);
2208                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2209                                 if(cleaupCheckLatch.get() != null) {
2210                                     cleaupCheckLatch.get().countDown();
2211                                 }
2212                             }
2213                         }
2214                     };
2215                 }
2216             };
2217
2218             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2219                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2220                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2221
2222             waitUntilLeader(shard);
2223
2224             final String transactionID = "tx1";
2225
2226             final MutableCompositeModification modification = new MutableCompositeModification();
2227             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2228             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2229
2230             final FiniteDuration duration = duration("5 seconds");
2231
2232             // Ready the tx.
2233
2234             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2235                     cohort, modification, true, false), getRef());
2236             expectMsgClass(duration, ReadyTransactionReply.class);
2237
2238             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2239
2240             // Send the AbortTransaction message.
2241
2242             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2243             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2244
2245             verify(cohort).abort();
2246
2247             // Verify the tx cohort is removed from queue at the cleanup check interval.
2248
2249             cleaupCheckLatch.set(new CountDownLatch(1));
2250             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2251                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2252
2253             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2254
2255             // Now send CanCommitTransaction - should fail.
2256
2257             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2258
2259             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2260             assertTrue("Failure type", failure instanceof IllegalStateException);
2261
2262             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2263         }};
2264     }
2265
2266     @Test
2267     public void testCreateSnapshot() throws Exception {
2268         testCreateSnapshot(true, "testCreateSnapshot");
2269     }
2270
2271     @Test
2272     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2273         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2274     }
2275
2276     @SuppressWarnings("serial")
2277     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2278
2279         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2280
2281         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2282         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2283             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2284                 super(delegate);
2285             }
2286
2287             @Override
2288             public void saveSnapshot(final Object o) {
2289                 savedSnapshot.set(o);
2290                 super.saveSnapshot(o);
2291             }
2292         }
2293
2294         dataStoreContextBuilder.persistent(persistent);
2295
2296         new ShardTestKit(getSystem()) {{
2297             class TestShard extends Shard {
2298
2299                 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2300                                     final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2301                     super(name, peerAddresses, datastoreContext, schemaContext);
2302                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2303                 }
2304
2305                 @Override
2306                 public void handleCommand(final Object message) {
2307                     super.handleCommand(message);
2308
2309                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2310                         latch.get().countDown();
2311                     }
2312                 }
2313
2314                 @Override
2315                 public RaftActorContext getRaftActorContext() {
2316                     return super.getRaftActorContext();
2317                 }
2318             }
2319
2320             final Creator<Shard> creator = new Creator<Shard>() {
2321                 @Override
2322                 public Shard create() throws Exception {
2323                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2324                             newDatastoreContext(), SCHEMA_CONTEXT);
2325                 }
2326             };
2327
2328             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2329                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2330
2331             waitUntilLeader(shard);
2332
2333             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2334
2335             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2336
2337             // Trigger creation of a snapshot by ensuring
2338             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2339             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2340
2341             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2342
2343             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2344                     savedSnapshot.get() instanceof Snapshot);
2345
2346             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2347
2348             latch.set(new CountDownLatch(1));
2349             savedSnapshot.set(null);
2350
2351             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2352
2353             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2354
2355             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2356                     savedSnapshot.get() instanceof Snapshot);
2357
2358             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2359
2360             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2361         }
2362
2363         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2364
2365             final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2366             assertEquals("Root node", expectedRoot, actual);
2367
2368         }};
2369     }
2370
2371     /**
2372      * This test simply verifies that the applySnapShot logic will work
2373      * @throws ReadFailedException
2374      * @throws DataValidationFailedException
2375      */
2376     @Test
2377     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2378         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2379         store.setSchemaContext(SCHEMA_CONTEXT);
2380
2381         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2382         putTransaction.write(TestModel.TEST_PATH,
2383             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2384         commitTransaction(store, putTransaction);
2385
2386
2387         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2388
2389         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2390
2391         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2392         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2393
2394         commitTransaction(store, writeTransaction);
2395
2396         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2397
2398         assertEquals(expected, actual);
2399     }
2400
2401     @Test
2402     public void testRecoveryApplicable(){
2403
2404         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2405                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2406
2407         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2408                 persistentContext, SCHEMA_CONTEXT);
2409
2410         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2411                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2412
2413         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2414             nonPersistentContext, SCHEMA_CONTEXT);
2415
2416         new ShardTestKit(getSystem()) {{
2417             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2418                     persistentProps, "testPersistence1");
2419
2420             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2421
2422             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2423
2424             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2425                     nonPersistentProps, "testPersistence2");
2426
2427             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2428
2429             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2430
2431         }};
2432
2433     }
2434
2435     @Test
2436     public void testOnDatastoreContext() {
2437         new ShardTestKit(getSystem()) {{
2438             dataStoreContextBuilder.persistent(true);
2439
2440             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2441
2442             assertEquals("isRecoveryApplicable", true,
2443                     shard.underlyingActor().persistence().isRecoveryApplicable());
2444
2445             waitUntilLeader(shard);
2446
2447             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2448
2449             assertEquals("isRecoveryApplicable", false,
2450                 shard.underlyingActor().persistence().isRecoveryApplicable());
2451
2452             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2453
2454             assertEquals("isRecoveryApplicable", true,
2455                 shard.underlyingActor().persistence().isRecoveryApplicable());
2456
2457             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2458         }};
2459     }
2460
2461     @Test
2462     public void testRegisterRoleChangeListener() throws Exception {
2463         new ShardTestKit(getSystem()) {
2464             {
2465                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2466                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2467                         "testRegisterRoleChangeListener");
2468
2469                 waitUntilLeader(shard);
2470
2471                 final TestActorRef<MessageCollectorActor> listener =
2472                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2473
2474                 shard.tell(new RegisterRoleChangeListener(), listener);
2475
2476                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2477
2478                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2479                     ShardLeaderStateChanged.class);
2480                 assertEquals("getLocalShardDataTree present", true,
2481                         leaderStateChanged.getLocalShardDataTree().isPresent());
2482                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2483                     leaderStateChanged.getLocalShardDataTree().get());
2484
2485                 MessageCollectorActor.clearMessages(listener);
2486
2487                 // Force a leader change
2488
2489                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2490
2491                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2492                         ShardLeaderStateChanged.class);
2493                 assertEquals("getLocalShardDataTree present", false,
2494                         leaderStateChanged.getLocalShardDataTree().isPresent());
2495
2496                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2497             }
2498         };
2499     }
2500
2501     @Test
2502     public void testFollowerInitialSyncStatus() throws Exception {
2503         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2504                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2505                 "testFollowerInitialSyncStatus");
2506
2507         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2508
2509         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2510
2511         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2512
2513         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2514
2515         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2516     }
2517
2518     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2519         modification.ready();
2520         store.validate(modification);
2521         store.commit(store.prepare(modification));
2522     }
2523
2524     @Test
2525     public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
2526         new ShardTestKit(getSystem()) {{
2527             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
2528             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
2529             final Creator<Shard> creator = new Creator<Shard>() {
2530                 boolean firstElectionTimeout = true;
2531
2532                 @Override
2533                 public Shard create() throws Exception {
2534                     return new Shard(shardID, Collections.<String,String>emptyMap(),
2535                         dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
2536                         @Override
2537                         public void onReceiveCommand(final Object message) throws Exception {
2538                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
2539                                 firstElectionTimeout = false;
2540                                 final ActorRef self = getSelf();
2541                                 new Thread() {
2542                                     @Override
2543                                     public void run() {
2544                                         Uninterruptibles.awaitUninterruptibly(
2545                                             onChangeListenerRegistered, 5, TimeUnit.SECONDS);
2546                                         self.tell(message, self);
2547                                     }
2548                                 }.start();
2549
2550                                 onFirstElectionTimeout.countDown();
2551                             } else {
2552                                 super.onReceiveCommand(message);
2553                             }
2554                         }
2555                     };
2556                 }
2557             };
2558
2559             final MockDataChangeListener listener = new MockDataChangeListener(1);
2560             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2561                 "testDataChangeListenerOnFollower-DataChangeListener");
2562
2563             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2564                 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
2565                     withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
2566
2567             assertEquals("Got first ElectionTimeout", true,
2568                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
2569
2570             shard.tell(new FindLeader(), getRef());
2571             final FindLeaderReply findLeadeReply =
2572                 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2573             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
2574
2575             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2576
2577             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2578             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2579                 RegisterChangeListenerReply.class);
2580             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2581
2582             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2583
2584             onChangeListenerRegistered.countDown();
2585
2586             listener.waitForChangeEvents();
2587
2588             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2589             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2590         }};
2591     }
2592
2593     @Test
2594     public void testClusteredDataChangeListernerRegistration() throws Exception {
2595         new ShardTestKit(getSystem()) {{
2596             final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
2597                 .shardName("inventory").type("config").build();
2598
2599             final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
2600                 .shardName("inventory").type("config").build();
2601             final Creator<Shard> followerShardCreator = new Creator<Shard>() {
2602
2603                 @Override
2604                 public Shard create() throws Exception {
2605                     return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
2606                         "akka://test/user/" + member2ShardID.toString()),
2607                         dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
2608                         @Override
2609                         public void onReceiveCommand(final Object message) throws Exception {
2610
2611                             if(!(message instanceof ElectionTimeout)) {
2612                                 super.onReceiveCommand(message);
2613                             }
2614                         }
2615                     };
2616                 }
2617             };
2618
2619             final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
2620
2621                 @Override
2622                 public Shard create() throws Exception {
2623                     return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
2624                         "akka://test/user/" + member1ShardID.toString()),
2625                         dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
2626                 }
2627             };
2628
2629
2630             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2631                 Props.create(new DelegatingShardCreator(followerShardCreator)),
2632                 member1ShardID.toString());
2633
2634             final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
2635                 Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
2636                 member2ShardID.toString());
2637             // Sleep to let election happen
2638             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
2639
2640             shard.tell(new FindLeader(), getRef());
2641             final FindLeaderReply findLeaderReply =
2642                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2643             assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
2644
2645             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2646             final MockDataChangeListener listener = new MockDataChangeListener(1);
2647             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2648                 "testDataChangeListenerOnFollower-DataChangeListener");
2649
2650             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2651             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2652                 RegisterChangeListenerReply.class);
2653             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2654
2655             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2656
2657             listener.waitForChangeEvents();
2658
2659             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2660             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2661         }};
2662     }
2663 }