Enabling Data Change Notifications for all nodes in cluster.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23
24 import akka.actor.ActorRef;
25 import akka.actor.ActorSelection;
26 import akka.actor.PoisonPill;
27 import akka.actor.Props;
28 import akka.actor.Status.Failure;
29 import akka.dispatch.Dispatchers;
30 import akka.dispatch.OnComplete;
31 import akka.japi.Creator;
32 import akka.pattern.Patterns;
33 import akka.persistence.SaveSnapshotSuccess;
34 import akka.testkit.TestActorRef;
35 import akka.util.Timeout;
36 import com.google.common.base.Function;
37 import com.google.common.base.Optional;
38 import com.google.common.util.concurrent.Futures;
39 import com.google.common.util.concurrent.ListenableFuture;
40 import com.google.common.util.concurrent.Uninterruptibles;
41 import java.io.IOException;
42 import java.util.Collections;
43 import java.util.HashSet;
44 import java.util.Map;
45 import java.util.Set;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.atomic.AtomicBoolean;
49 import java.util.concurrent.atomic.AtomicReference;
50 import org.junit.Test;
51 import org.mockito.InOrder;
52 import org.opendaylight.controller.cluster.DataPersistenceProvider;
53 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
63 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
64 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
75 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
78 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
79 import org.opendaylight.controller.cluster.datastore.modification.Modification;
80 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
81 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
82 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
83 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
84 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
85 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
87 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
88 import org.opendaylight.controller.cluster.raft.RaftActorContext;
89 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
90 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
91 import org.opendaylight.controller.cluster.raft.Snapshot;
92 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
93 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
94 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
95 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
96 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
97 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
98 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
99 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
100 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
101 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
102 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
103 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
104 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
105 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
106 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
107 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
108 import org.opendaylight.yangtools.yang.common.QName;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
113 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
115 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
123 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
124 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
125 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
126 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
127 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
128 import scala.concurrent.Await;
129 import scala.concurrent.Future;
130 import scala.concurrent.duration.FiniteDuration;
131
132 public class ShardTest extends AbstractShardTest {
133     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
134
135     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
136
137     @Test
138     public void testRegisterChangeListener() throws Exception {
139         new ShardTestKit(getSystem()) {{
140             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
141                     newShardProps(),  "testRegisterChangeListener");
142
143             waitUntilLeader(shard);
144
145             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
146
147             final MockDataChangeListener listener = new MockDataChangeListener(1);
148             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
149                     "testRegisterChangeListener-DataChangeListener");
150
151             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
152                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
153
154             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
155                     RegisterChangeListenerReply.class);
156             final String replyPath = reply.getListenerRegistrationPath().toString();
157             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
158                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
159
160             final YangInstanceIdentifier path = TestModel.TEST_PATH;
161             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
162
163             listener.waitForChangeEvents(path);
164
165             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
166             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
167         }};
168     }
169
170     @SuppressWarnings("serial")
171     @Test
172     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
173         // This test tests the timing window in which a change listener is registered before the
174         // shard becomes the leader. We verify that the listener is registered and notified of the
175         // existing data when the shard becomes the leader.
176         new ShardTestKit(getSystem()) {{
177             // For this test, we want to send the RegisterChangeListener message after the shard
178             // has recovered from persistence and before it becomes the leader. So we subclass
179             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
180             // we know that the shard has been initialized to a follower and has started the
181             // election process. The following 2 CountDownLatches are used to coordinate the
182             // ElectionTimeout with the sending of the RegisterChangeListener message.
183             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
184             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
185             final Creator<Shard> creator = new Creator<Shard>() {
186                 boolean firstElectionTimeout = true;
187
188                 @Override
189                 public Shard create() throws Exception {
190                     // Use a non persistent provider because this test actually invokes persist on the journal
191                     // this will cause all other messages to not be queued properly after that.
192                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
193                     // it does do a persist)
194                     return new Shard(shardID, Collections.<String,String>emptyMap(),
195                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
196                         @Override
197                         public void onReceiveCommand(final Object message) throws Exception {
198                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
199                                 // Got the first ElectionTimeout. We don't forward it to the
200                                 // base Shard yet until we've sent the RegisterChangeListener
201                                 // message. So we signal the onFirstElectionTimeout latch to tell
202                                 // the main thread to send the RegisterChangeListener message and
203                                 // start a thread to wait on the onChangeListenerRegistered latch,
204                                 // which the main thread signals after it has sent the message.
205                                 // After the onChangeListenerRegistered is triggered, we send the
206                                 // original ElectionTimeout message to proceed with the election.
207                                 firstElectionTimeout = false;
208                                 final ActorRef self = getSelf();
209                                 new Thread() {
210                                     @Override
211                                     public void run() {
212                                         Uninterruptibles.awaitUninterruptibly(
213                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
214                                         self.tell(message, self);
215                                     }
216                                 }.start();
217
218                                 onFirstElectionTimeout.countDown();
219                             } else {
220                                 super.onReceiveCommand(message);
221                             }
222                         }
223                     };
224                 }
225             };
226
227             final MockDataChangeListener listener = new MockDataChangeListener(1);
228             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
229                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
230
231             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
232                     Props.create(new DelegatingShardCreator(creator)),
233                     "testRegisterChangeListenerWhenNotLeaderInitially");
234
235             // Write initial data into the in-memory store.
236             final YangInstanceIdentifier path = TestModel.TEST_PATH;
237             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
238
239             // Wait until the shard receives the first ElectionTimeout message.
240             assertEquals("Got first ElectionTimeout", true,
241                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
242
243             // Now send the RegisterChangeListener and wait for the reply.
244             shard.tell(new RegisterChangeListener(path, dclActor,
245                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
246
247             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
248                     RegisterChangeListenerReply.class);
249             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
250
251             // Sanity check - verify the shard is not the leader yet.
252             shard.tell(new FindLeader(), getRef());
253             final FindLeaderReply findLeadeReply =
254                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
255             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
256
257             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
258             // with the election process.
259             onChangeListenerRegistered.countDown();
260
261             // Wait for the shard to become the leader and notify our listener with the existing
262             // data in the store.
263             listener.waitForChangeEvents(path);
264
265             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
266             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
267         }};
268     }
269
270     @Test
271     public void testRegisterDataTreeChangeListener() throws Exception {
272         new ShardTestKit(getSystem()) {{
273             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
274                     newShardProps(), "testRegisterDataTreeChangeListener");
275
276             waitUntilLeader(shard);
277
278             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
279
280             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
281             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
282                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
283
284             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
285
286             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
287                     RegisterDataTreeChangeListenerReply.class);
288             final String replyPath = reply.getListenerRegistrationPath().toString();
289             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
290                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
291
292             final YangInstanceIdentifier path = TestModel.TEST_PATH;
293             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
294
295             listener.waitForChangeEvents();
296
297             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
298             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
299         }};
300     }
301
302     @SuppressWarnings("serial")
303     @Test
304     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
305         new ShardTestKit(getSystem()) {{
306             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
307             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
308             final Creator<Shard> creator = new Creator<Shard>() {
309                 boolean firstElectionTimeout = true;
310
311                 @Override
312                 public Shard create() throws Exception {
313                     return new Shard(shardID, Collections.<String,String>emptyMap(),
314                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
315                         @Override
316                         public void onReceiveCommand(final Object message) throws Exception {
317                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
318                                 firstElectionTimeout = false;
319                                 final ActorRef self = getSelf();
320                                 new Thread() {
321                                     @Override
322                                     public void run() {
323                                         Uninterruptibles.awaitUninterruptibly(
324                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
325                                         self.tell(message, self);
326                                     }
327                                 }.start();
328
329                                 onFirstElectionTimeout.countDown();
330                             } else {
331                                 super.onReceiveCommand(message);
332                             }
333                         }
334                     };
335                 }
336             };
337
338             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
339             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
340                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
341
342             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
343                     Props.create(new DelegatingShardCreator(creator)),
344                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
345
346             final YangInstanceIdentifier path = TestModel.TEST_PATH;
347             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
348
349             assertEquals("Got first ElectionTimeout", true,
350                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
351
352             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
353             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
354                 RegisterDataTreeChangeListenerReply.class);
355             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
356
357             shard.tell(new FindLeader(), getRef());
358             final FindLeaderReply findLeadeReply =
359                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
360             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
361
362             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
363
364             onChangeListenerRegistered.countDown();
365
366             // TODO: investigate why we do not receive data chage events
367             listener.waitForChangeEvents();
368
369             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
370             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
371         }};
372     }
373
374     @Test
375     public void testCreateTransaction(){
376         new ShardTestKit(getSystem()) {{
377             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
378
379             waitUntilLeader(shard);
380
381             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
382
383             shard.tell(new CreateTransaction("txn-1",
384                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
385
386             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
387                     CreateTransactionReply.class);
388
389             final String path = reply.getTransactionActorPath().toString();
390             assertTrue("Unexpected transaction path " + path,
391                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
392
393             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
394         }};
395     }
396
397     @Test
398     public void testCreateTransactionOnChain(){
399         new ShardTestKit(getSystem()) {{
400             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
401
402             waitUntilLeader(shard);
403
404             shard.tell(new CreateTransaction("txn-1",
405                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
406                     getRef());
407
408             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
409                     CreateTransactionReply.class);
410
411             final String path = reply.getTransactionActorPath().toString();
412             assertTrue("Unexpected transaction path " + path,
413                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
414
415             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
416         }};
417     }
418
419     @SuppressWarnings("serial")
420     @Test
421     public void testPeerAddressResolved() throws Exception {
422         new ShardTestKit(getSystem()) {{
423             final CountDownLatch recoveryComplete = new CountDownLatch(1);
424             class TestShard extends Shard {
425                 TestShard() {
426                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
427                             newDatastoreContext(), SCHEMA_CONTEXT);
428                 }
429
430                 Map<String, String> getPeerAddresses() {
431                     return getRaftActorContext().getPeerAddresses();
432                 }
433
434                 @Override
435                 protected void onRecoveryComplete() {
436                     try {
437                         super.onRecoveryComplete();
438                     } finally {
439                         recoveryComplete.countDown();
440                     }
441                 }
442             }
443
444             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
445                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
446                         @Override
447                         public TestShard create() throws Exception {
448                             return new TestShard();
449                         }
450                     })), "testPeerAddressResolved");
451
452             //waitUntilLeader(shard);
453             assertEquals("Recovery complete", true,
454                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
455
456             final String address = "akka://foobar";
457             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
458
459             assertEquals("getPeerAddresses", address,
460                 ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
461
462             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
463         }};
464     }
465
466     @Test
467     public void testApplySnapshot() throws Exception {
468
469         ShardTestKit testkit = new ShardTestKit(getSystem());
470
471         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
472                 "testApplySnapshot");
473
474         testkit.waitUntilLeader(shard);
475
476         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
477         store.setSchemaContext(SCHEMA_CONTEXT);
478
479         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
480                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
481                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
482                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
483
484         writeToStore(store, TestModel.TEST_PATH, container);
485
486         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
487         final NormalizedNode<?,?> expected = readStore(store, root);
488
489         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
490                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
491
492         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
493
494         final NormalizedNode<?,?> actual = readStore(shard, root);
495
496         assertEquals("Root node", expected, actual);
497
498         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
499     }
500
501     @Test
502     public void testApplyState() throws Exception {
503
504         ShardTestKit testkit = new ShardTestKit(getSystem());
505
506         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
507
508         testkit.waitUntilLeader(shard);
509
510         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
511
512         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
513                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
514
515         shard.underlyingActor().onReceiveCommand(applyState);
516
517         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
518         assertEquals("Applied state", node, actual);
519
520         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
521     }
522
523     @Test
524     public void testApplyStateWithCandidatePayload() throws Exception {
525
526         ShardTestKit testkit = new ShardTestKit(getSystem());
527
528         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
529
530         testkit.waitUntilLeader(shard);
531
532         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
533         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
534
535         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
536                 DataTreeCandidatePayload.create(candidate)));
537
538         shard.underlyingActor().onReceiveCommand(applyState);
539
540         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
541         assertEquals("Applied state", node, actual);
542
543         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
544     }
545
546     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
547         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
548         testStore.setSchemaContext(SCHEMA_CONTEXT);
549
550         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
551
552         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
553
554         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
555             SerializationUtils.serializeNormalizedNode(root),
556             Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
557         return testStore;
558     }
559
560     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
561         source.validate(mod);
562         final DataTreeCandidate candidate = source.prepare(mod);
563         source.commit(candidate);
564         return DataTreeCandidatePayload.create(candidate);
565     }
566
567     @Test
568     public void testDataTreeCandidateRecovery() throws Exception {
569         // Set up the InMemorySnapshotStore.
570         final DataTree source = setupInMemorySnapshotStore();
571
572         final DataTreeModification writeMod = source.takeSnapshot().newModification();
573         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
574         writeMod.ready();
575         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
576
577         // Set up the InMemoryJournal.
578         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
579
580         final int nListEntries = 16;
581         final Set<Integer> listEntryKeys = new HashSet<>();
582
583         // Add some ModificationPayload entries
584         for (int i = 1; i <= nListEntries; i++) {
585             listEntryKeys.add(Integer.valueOf(i));
586
587             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
588                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
589
590             final DataTreeModification mod = source.takeSnapshot().newModification();
591             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
592             mod.ready();
593             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
594                 payloadForModification(source, mod)));
595         }
596
597         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
598             new ApplyJournalEntries(nListEntries));
599
600         testRecovery(listEntryKeys);
601     }
602
603     @Test
604     public void testModicationRecovery() throws Exception {
605
606         // Set up the InMemorySnapshotStore.
607         setupInMemorySnapshotStore();
608
609         // Set up the InMemoryJournal.
610
611         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
612
613         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
614             new WriteModification(TestModel.OUTER_LIST_PATH,
615                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
616
617         final int nListEntries = 16;
618         final Set<Integer> listEntryKeys = new HashSet<>();
619
620         // Add some ModificationPayload entries
621         for(int i = 1; i <= nListEntries; i++) {
622             listEntryKeys.add(Integer.valueOf(i));
623             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
624                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
625             final Modification mod = new MergeModification(path,
626                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
627             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
628                     newModificationPayload(mod)));
629         }
630
631         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
632                 new ApplyJournalEntries(nListEntries));
633
634         testRecovery(listEntryKeys);
635     }
636
637     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
638         final MutableCompositeModification compMod = new MutableCompositeModification();
639         for(final Modification mod: mods) {
640             compMod.addModification(mod);
641         }
642
643         return new ModificationPayload(compMod);
644     }
645
646     @Test
647     public void testConcurrentThreePhaseCommits() throws Throwable {
648         new ShardTestKit(getSystem()) {{
649             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
650                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
651                     "testConcurrentThreePhaseCommits");
652
653             waitUntilLeader(shard);
654
655          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
656
657             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
658
659             final String transactionID1 = "tx1";
660             final MutableCompositeModification modification1 = new MutableCompositeModification();
661             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
662                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
663
664             final String transactionID2 = "tx2";
665             final MutableCompositeModification modification2 = new MutableCompositeModification();
666             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
667                     TestModel.OUTER_LIST_PATH,
668                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
669                     modification2);
670
671             final String transactionID3 = "tx3";
672             final MutableCompositeModification modification3 = new MutableCompositeModification();
673             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
674                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
675                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
676                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
677                     modification3);
678
679             final long timeoutSec = 5;
680             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
681             final Timeout timeout = new Timeout(duration);
682
683             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
684             // by the ShardTransaction.
685
686             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
687                     cohort1, modification1, true, false), getRef());
688             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
689                     expectMsgClass(duration, ReadyTransactionReply.class));
690             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
691
692             // Send the CanCommitTransaction message for the first Tx.
693
694             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
695             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
696                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
697             assertEquals("Can commit", true, canCommitReply.getCanCommit());
698
699             // Send the ForwardedReadyTransaction for the next 2 Tx's.
700
701             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
702                     cohort2, modification2, true, false), getRef());
703             expectMsgClass(duration, ReadyTransactionReply.class);
704
705             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
706                     cohort3, modification3, true, false), getRef());
707             expectMsgClass(duration, ReadyTransactionReply.class);
708
709             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
710             // processed after the first Tx completes.
711
712             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
713                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
714
715             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
716                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
717
718             // Send the CommitTransaction message for the first Tx. After it completes, it should
719             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
720
721             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
722             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
723
724             // Wait for the next 2 Tx's to complete.
725
726             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
727             final CountDownLatch commitLatch = new CountDownLatch(2);
728
729             class OnFutureComplete extends OnComplete<Object> {
730                 private final Class<?> expRespType;
731
732                 OnFutureComplete(final Class<?> expRespType) {
733                     this.expRespType = expRespType;
734                 }
735
736                 @Override
737                 public void onComplete(final Throwable error, final Object resp) {
738                     if(error != null) {
739                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
740                     } else {
741                         try {
742                             assertEquals("Commit response type", expRespType, resp.getClass());
743                             onSuccess(resp);
744                         } catch (final Exception e) {
745                             caughtEx.set(e);
746                         }
747                     }
748                 }
749
750                 void onSuccess(final Object resp) throws Exception {
751                 }
752             }
753
754             class OnCommitFutureComplete extends OnFutureComplete {
755                 OnCommitFutureComplete() {
756                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
757                 }
758
759                 @Override
760                 public void onComplete(final Throwable error, final Object resp) {
761                     super.onComplete(error, resp);
762                     commitLatch.countDown();
763                 }
764             }
765
766             class OnCanCommitFutureComplete extends OnFutureComplete {
767                 private final String transactionID;
768
769                 OnCanCommitFutureComplete(final String transactionID) {
770                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
771                     this.transactionID = transactionID;
772                 }
773
774                 @Override
775                 void onSuccess(final Object resp) throws Exception {
776                     final CanCommitTransactionReply canCommitReply =
777                             CanCommitTransactionReply.fromSerializable(resp);
778                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
779
780                     final Future<Object> commitFuture = Patterns.ask(shard,
781                             new CommitTransaction(transactionID).toSerializable(), timeout);
782                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
783                 }
784             }
785
786             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
787                     getSystem().dispatcher());
788
789             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
790                     getSystem().dispatcher());
791
792             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
793
794             if(caughtEx.get() != null) {
795                 throw caughtEx.get();
796             }
797
798             assertEquals("Commits complete", true, done);
799
800             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
801             inOrder.verify(cohort1).canCommit();
802             inOrder.verify(cohort1).preCommit();
803             inOrder.verify(cohort1).commit();
804             inOrder.verify(cohort2).canCommit();
805             inOrder.verify(cohort2).preCommit();
806             inOrder.verify(cohort2).commit();
807             inOrder.verify(cohort3).canCommit();
808             inOrder.verify(cohort3).preCommit();
809             inOrder.verify(cohort3).commit();
810
811             // Verify data in the data store.
812
813             verifyOuterListEntry(shard, 1);
814
815             verifyLastApplied(shard, 2);
816
817             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
818         }};
819     }
820
821     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
822             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
823         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
824     }
825
826     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
827             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
828             final int messagesSent) {
829         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
830         batched.addModification(new WriteModification(path, data));
831         batched.setReady(ready);
832         batched.setDoCommitOnReady(doCommitOnReady);
833         batched.setTotalMessagesSent(messagesSent);
834         return batched;
835     }
836
837     @Test
838     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
839         new ShardTestKit(getSystem()) {{
840             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
841                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
842                     "testBatchedModificationsWithNoCommitOnReady");
843
844             waitUntilLeader(shard);
845
846             final String transactionID = "tx";
847             final FiniteDuration duration = duration("5 seconds");
848
849             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
850             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
851                 @Override
852                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
853                     if(mockCohort.get() == null) {
854                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
855                     }
856
857                     return mockCohort.get();
858                 }
859             };
860
861             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
862
863             // Send a BatchedModifications to start a transaction.
864
865             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
866                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
867             expectMsgClass(duration, BatchedModificationsReply.class);
868
869             // Send a couple more BatchedModifications.
870
871             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
872                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
873             expectMsgClass(duration, BatchedModificationsReply.class);
874
875             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
876                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
877                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
878             expectMsgClass(duration, ReadyTransactionReply.class);
879
880             // Send the CanCommitTransaction message.
881
882             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
883             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
884                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
885             assertEquals("Can commit", true, canCommitReply.getCanCommit());
886
887             // Send the CanCommitTransaction message.
888
889             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
890             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
891
892             final InOrder inOrder = inOrder(mockCohort.get());
893             inOrder.verify(mockCohort.get()).canCommit();
894             inOrder.verify(mockCohort.get()).preCommit();
895             inOrder.verify(mockCohort.get()).commit();
896
897             // Verify data in the data store.
898
899             verifyOuterListEntry(shard, 1);
900
901             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
902         }};
903     }
904
905     @Test
906     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
907         new ShardTestKit(getSystem()) {{
908             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
909                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
910                     "testBatchedModificationsWithCommitOnReady");
911
912             waitUntilLeader(shard);
913
914             final String transactionID = "tx";
915             final FiniteDuration duration = duration("5 seconds");
916
917             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
918             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
919                 @Override
920                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
921                     if(mockCohort.get() == null) {
922                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
923                     }
924
925                     return mockCohort.get();
926                 }
927             };
928
929             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
930
931             // Send a BatchedModifications to start a transaction.
932
933             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
934                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
935             expectMsgClass(duration, BatchedModificationsReply.class);
936
937             // Send a couple more BatchedModifications.
938
939             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
940                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
941             expectMsgClass(duration, BatchedModificationsReply.class);
942
943             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
944                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
945                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
946
947             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
948
949             final InOrder inOrder = inOrder(mockCohort.get());
950             inOrder.verify(mockCohort.get()).canCommit();
951             inOrder.verify(mockCohort.get()).preCommit();
952             inOrder.verify(mockCohort.get()).commit();
953
954             // Verify data in the data store.
955
956             verifyOuterListEntry(shard, 1);
957
958             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
959         }};
960     }
961
962     @Test(expected=IllegalStateException.class)
963     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
964         new ShardTestKit(getSystem()) {{
965             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
966                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
967                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
968
969             waitUntilLeader(shard);
970
971             final String transactionID = "tx1";
972             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
973             batched.setReady(true);
974             batched.setTotalMessagesSent(2);
975
976             shard.tell(batched, getRef());
977
978             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
979
980             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
981
982             if(failure != null) {
983                 throw failure.cause();
984             }
985         }};
986     }
987
988     @Test
989     public void testBatchedModificationsWithOperationFailure() throws Throwable {
990         new ShardTestKit(getSystem()) {{
991             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
992                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
993                     "testBatchedModificationsWithOperationFailure");
994
995             waitUntilLeader(shard);
996
997             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
998             // write will not validate the children for performance reasons.
999
1000             String transactionID = "tx1";
1001
1002             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1003                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
1004                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1005
1006             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
1007             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1008             shard.tell(batched, getRef());
1009             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1010
1011             Throwable cause = failure.cause();
1012
1013             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1014             batched.setReady(true);
1015             batched.setTotalMessagesSent(2);
1016
1017             shard.tell(batched, getRef());
1018
1019             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1020             assertEquals("Failure cause", cause, failure.cause());
1021
1022             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1023         }};
1024     }
1025
1026     @SuppressWarnings("unchecked")
1027     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1028         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1029         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1030         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1031                 outerList.getValue() instanceof Iterable);
1032         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1033         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1034                 entry instanceof MapEntryNode);
1035         final MapEntryNode mapEntry = (MapEntryNode)entry;
1036         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1037                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1038         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1039         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1040     }
1041
1042     @Test
1043     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1044         new ShardTestKit(getSystem()) {{
1045             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1046                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1047                     "testBatchedModificationsOnTransactionChain");
1048
1049             waitUntilLeader(shard);
1050
1051             final String transactionChainID = "txChain";
1052             final String transactionID1 = "tx1";
1053             final String transactionID2 = "tx2";
1054
1055             final FiniteDuration duration = duration("5 seconds");
1056
1057             // Send a BatchedModifications to start a chained write transaction and ready it.
1058
1059             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1060             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1061             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1062                     containerNode, true, false, 1), getRef());
1063             expectMsgClass(duration, ReadyTransactionReply.class);
1064
1065             // Create a read Tx on the same chain.
1066
1067             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1068                     transactionChainID).toSerializable(), getRef());
1069
1070             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1071
1072             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1073             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1074             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1075
1076             // Commit the write transaction.
1077
1078             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1079             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1080                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1081             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1082
1083             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1084             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1085
1086             // Verify data in the data store.
1087
1088             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1089             assertEquals("Stored node", containerNode, actualNode);
1090
1091             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1092         }};
1093     }
1094
1095     @Test
1096     public void testOnBatchedModificationsWhenNotLeader() {
1097         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1098         new ShardTestKit(getSystem()) {{
1099             final Creator<Shard> creator = new Creator<Shard>() {
1100                 private static final long serialVersionUID = 1L;
1101
1102                 @Override
1103                 public Shard create() throws Exception {
1104                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1105                             newDatastoreContext(), SCHEMA_CONTEXT) {
1106                         @Override
1107                         protected boolean isLeader() {
1108                             return overrideLeaderCalls.get() ? false : super.isLeader();
1109                         }
1110
1111                         @Override
1112                         protected ActorSelection getLeader() {
1113                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1114                                 super.getLeader();
1115                         }
1116                     };
1117                 }
1118             };
1119
1120             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1121                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1122
1123             waitUntilLeader(shard);
1124
1125             overrideLeaderCalls.set(true);
1126
1127             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1128
1129             shard.tell(batched, ActorRef.noSender());
1130
1131             expectMsgEquals(batched);
1132
1133             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1134         }};
1135     }
1136
1137     @Test
1138     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1139         new ShardTestKit(getSystem()) {{
1140             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1141                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1142                     "testForwardedReadyTransactionWithImmediateCommit");
1143
1144             waitUntilLeader(shard);
1145
1146             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1147
1148             final String transactionID = "tx1";
1149             final MutableCompositeModification modification = new MutableCompositeModification();
1150             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1151             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1152                     TestModel.TEST_PATH, containerNode, modification);
1153
1154             final FiniteDuration duration = duration("5 seconds");
1155
1156             // Simulate the ForwardedReadyTransaction messages that would be sent
1157             // by the ShardTransaction.
1158
1159             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1160                     cohort, modification, true, true), getRef());
1161
1162             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1163
1164             final InOrder inOrder = inOrder(cohort);
1165             inOrder.verify(cohort).canCommit();
1166             inOrder.verify(cohort).preCommit();
1167             inOrder.verify(cohort).commit();
1168
1169             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1170             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1171
1172             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1173         }};
1174     }
1175
1176     @Test
1177     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1178         new ShardTestKit(getSystem()) {{
1179             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1180                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1181                     "testReadyLocalTransactionWithImmediateCommit");
1182
1183             waitUntilLeader(shard);
1184
1185             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1186
1187             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1188
1189             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1190             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1191             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1192             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1193
1194             final String txId = "tx1";
1195             modification.ready();
1196             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1197
1198             shard.tell(readyMessage, getRef());
1199
1200             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1201
1202             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1203             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1204
1205             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1206         }};
1207     }
1208
1209     @Test
1210     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1211         new ShardTestKit(getSystem()) {{
1212             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1213                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1214                     "testReadyLocalTransactionWithThreePhaseCommit");
1215
1216             waitUntilLeader(shard);
1217
1218             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1219
1220             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1221
1222             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1223             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1224             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1225             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1226
1227             final String txId = "tx1";
1228                 modification.ready();
1229             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1230
1231             shard.tell(readyMessage, getRef());
1232
1233             expectMsgClass(ReadyTransactionReply.class);
1234
1235             // Send the CanCommitTransaction message.
1236
1237             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1238             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1239                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1240             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1241
1242             // Send the CanCommitTransaction message.
1243
1244             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1245             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1246
1247             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1248             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1249
1250             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1251         }};
1252     }
1253
1254     @Test
1255     public void testCommitWithPersistenceDisabled() throws Throwable {
1256         dataStoreContextBuilder.persistent(false);
1257         new ShardTestKit(getSystem()) {{
1258             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1259                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1260                     "testCommitWithPersistenceDisabled");
1261
1262             waitUntilLeader(shard);
1263
1264             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1265
1266             // Setup a simulated transactions with a mock cohort.
1267
1268             final String transactionID = "tx";
1269             final MutableCompositeModification modification = new MutableCompositeModification();
1270             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1271             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1272                 TestModel.TEST_PATH, containerNode, modification);
1273
1274             final FiniteDuration duration = duration("5 seconds");
1275
1276             // Simulate the ForwardedReadyTransaction messages that would be sent
1277             // by the ShardTransaction.
1278
1279             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1280                 cohort, modification, true, false), getRef());
1281             expectMsgClass(duration, ReadyTransactionReply.class);
1282
1283             // Send the CanCommitTransaction message.
1284
1285             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1286             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1287                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1288             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1289
1290             // Send the CanCommitTransaction message.
1291
1292             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1293             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1294
1295             final InOrder inOrder = inOrder(cohort);
1296             inOrder.verify(cohort).canCommit();
1297             inOrder.verify(cohort).preCommit();
1298             inOrder.verify(cohort).commit();
1299
1300             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1301             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1302
1303             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1304         }};
1305     }
1306
1307     private static DataTreeCandidateTip mockCandidate(final String name) {
1308         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1309         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1310         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1311         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1312         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1313         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1314         return mockCandidate;
1315     }
1316
1317     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1318         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1319         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1320         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1321         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1322         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1323         return mockCandidate;
1324     }
1325
1326     @Test
1327     public void testCommitWhenTransactionHasNoModifications(){
1328         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1329         // but here that need not happen
1330         new ShardTestKit(getSystem()) {
1331             {
1332                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1333                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1334                         "testCommitWhenTransactionHasNoModifications");
1335
1336                 waitUntilLeader(shard);
1337
1338                 final String transactionID = "tx1";
1339                 final MutableCompositeModification modification = new MutableCompositeModification();
1340                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1341                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1342                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1343                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1344                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1345
1346                 final FiniteDuration duration = duration("5 seconds");
1347
1348                 // Simulate the ForwardedReadyTransaction messages that would be sent
1349                 // by the ShardTransaction.
1350
1351                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1352                         cohort, modification, true, false), getRef());
1353                 expectMsgClass(duration, ReadyTransactionReply.class);
1354
1355                 // Send the CanCommitTransaction message.
1356
1357                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1358                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1359                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1360                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1361
1362                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1363                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1364
1365                 final InOrder inOrder = inOrder(cohort);
1366                 inOrder.verify(cohort).canCommit();
1367                 inOrder.verify(cohort).preCommit();
1368                 inOrder.verify(cohort).commit();
1369
1370                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1371                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1372
1373                 // Use MBean for verification
1374                 // Committed transaction count should increase as usual
1375                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1376
1377                 // Commit index should not advance because this does not go into the journal
1378                 assertEquals(-1, shardStats.getCommitIndex());
1379
1380                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1381
1382             }
1383         };
1384     }
1385
1386     @Test
1387     public void testCommitWhenTransactionHasModifications(){
1388         new ShardTestKit(getSystem()) {
1389             {
1390                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1391                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1392                         "testCommitWhenTransactionHasModifications");
1393
1394                 waitUntilLeader(shard);
1395
1396                 final String transactionID = "tx1";
1397                 final MutableCompositeModification modification = new MutableCompositeModification();
1398                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1399                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1400                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1401                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1402                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1403                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1404
1405                 final FiniteDuration duration = duration("5 seconds");
1406
1407                 // Simulate the ForwardedReadyTransaction messages that would be sent
1408                 // by the ShardTransaction.
1409
1410                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1411                         cohort, modification, true, false), getRef());
1412                 expectMsgClass(duration, ReadyTransactionReply.class);
1413
1414                 // Send the CanCommitTransaction message.
1415
1416                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1417                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1418                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1419                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1420
1421                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1422                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1423
1424                 final InOrder inOrder = inOrder(cohort);
1425                 inOrder.verify(cohort).canCommit();
1426                 inOrder.verify(cohort).preCommit();
1427                 inOrder.verify(cohort).commit();
1428
1429                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1430                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1431
1432                 // Use MBean for verification
1433                 // Committed transaction count should increase as usual
1434                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1435
1436                 // Commit index should advance as we do not have an empty modification
1437                 assertEquals(0, shardStats.getCommitIndex());
1438
1439                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1440
1441             }
1442         };
1443     }
1444
1445     @Test
1446     public void testCommitPhaseFailure() throws Throwable {
1447         new ShardTestKit(getSystem()) {{
1448             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1449                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1450                     "testCommitPhaseFailure");
1451
1452             waitUntilLeader(shard);
1453
1454             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1455             // commit phase.
1456
1457             final String transactionID1 = "tx1";
1458             final MutableCompositeModification modification1 = new MutableCompositeModification();
1459             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1460             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1461             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1462             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1463             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1464
1465             final String transactionID2 = "tx2";
1466             final MutableCompositeModification modification2 = new MutableCompositeModification();
1467             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1468             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1469
1470             final FiniteDuration duration = duration("5 seconds");
1471             final Timeout timeout = new Timeout(duration);
1472
1473             // Simulate the ForwardedReadyTransaction messages that would be sent
1474             // by the ShardTransaction.
1475
1476             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1477                     cohort1, modification1, true, false), getRef());
1478             expectMsgClass(duration, ReadyTransactionReply.class);
1479
1480             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1481                     cohort2, modification2, true, false), getRef());
1482             expectMsgClass(duration, ReadyTransactionReply.class);
1483
1484             // Send the CanCommitTransaction message for the first Tx.
1485
1486             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1487             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1488                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1489             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1490
1491             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1492             // processed after the first Tx completes.
1493
1494             final Future<Object> canCommitFuture = Patterns.ask(shard,
1495                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1496
1497             // Send the CommitTransaction message for the first Tx. This should send back an error
1498             // and trigger the 2nd Tx to proceed.
1499
1500             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1501             expectMsgClass(duration, akka.actor.Status.Failure.class);
1502
1503             // Wait for the 2nd Tx to complete the canCommit phase.
1504
1505             final CountDownLatch latch = new CountDownLatch(1);
1506             canCommitFuture.onComplete(new OnComplete<Object>() {
1507                 @Override
1508                 public void onComplete(final Throwable t, final Object resp) {
1509                     latch.countDown();
1510                 }
1511             }, getSystem().dispatcher());
1512
1513             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1514
1515             final InOrder inOrder = inOrder(cohort1, cohort2);
1516             inOrder.verify(cohort1).canCommit();
1517             inOrder.verify(cohort1).preCommit();
1518             inOrder.verify(cohort1).commit();
1519             inOrder.verify(cohort2).canCommit();
1520
1521             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1522         }};
1523     }
1524
1525     @Test
1526     public void testPreCommitPhaseFailure() throws Throwable {
1527         new ShardTestKit(getSystem()) {{
1528             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1529                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1530                     "testPreCommitPhaseFailure");
1531
1532             waitUntilLeader(shard);
1533
1534             final String transactionID1 = "tx1";
1535             final MutableCompositeModification modification1 = new MutableCompositeModification();
1536             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1537             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1538             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1539
1540             final String transactionID2 = "tx2";
1541             final MutableCompositeModification modification2 = new MutableCompositeModification();
1542             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1543             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1544
1545             final FiniteDuration duration = duration("5 seconds");
1546             final Timeout timeout = new Timeout(duration);
1547
1548             // Simulate the ForwardedReadyTransaction messages that would be sent
1549             // by the ShardTransaction.
1550
1551             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1552                     cohort1, modification1, true, false), getRef());
1553             expectMsgClass(duration, ReadyTransactionReply.class);
1554
1555             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1556                     cohort2, modification2, true, false), getRef());
1557             expectMsgClass(duration, ReadyTransactionReply.class);
1558
1559             // Send the CanCommitTransaction message for the first Tx.
1560
1561             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1562             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1563                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1564             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1565
1566             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1567             // processed after the first Tx completes.
1568
1569             final Future<Object> canCommitFuture = Patterns.ask(shard,
1570                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1571
1572             // Send the CommitTransaction message for the first Tx. This should send back an error
1573             // and trigger the 2nd Tx to proceed.
1574
1575             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1576             expectMsgClass(duration, akka.actor.Status.Failure.class);
1577
1578             // Wait for the 2nd Tx to complete the canCommit phase.
1579
1580             final CountDownLatch latch = new CountDownLatch(1);
1581             canCommitFuture.onComplete(new OnComplete<Object>() {
1582                 @Override
1583                 public void onComplete(final Throwable t, final Object resp) {
1584                     latch.countDown();
1585                 }
1586             }, getSystem().dispatcher());
1587
1588             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1589
1590             final InOrder inOrder = inOrder(cohort1, cohort2);
1591             inOrder.verify(cohort1).canCommit();
1592             inOrder.verify(cohort1).preCommit();
1593             inOrder.verify(cohort2).canCommit();
1594
1595             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1596         }};
1597     }
1598
1599     @Test
1600     public void testCanCommitPhaseFailure() throws Throwable {
1601         new ShardTestKit(getSystem()) {{
1602             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1603                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1604                     "testCanCommitPhaseFailure");
1605
1606             waitUntilLeader(shard);
1607
1608             final FiniteDuration duration = duration("5 seconds");
1609
1610             final String transactionID1 = "tx1";
1611             final MutableCompositeModification modification = new MutableCompositeModification();
1612             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1613             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1614
1615             // Simulate the ForwardedReadyTransaction messages that would be sent
1616             // by the ShardTransaction.
1617
1618             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1619                     cohort, modification, true, false), getRef());
1620             expectMsgClass(duration, ReadyTransactionReply.class);
1621
1622             // Send the CanCommitTransaction message.
1623
1624             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1625             expectMsgClass(duration, akka.actor.Status.Failure.class);
1626
1627             // Send another can commit to ensure the failed one got cleaned up.
1628
1629             reset(cohort);
1630
1631             final String transactionID2 = "tx2";
1632             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1633
1634             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1635                     cohort, modification, true, false), getRef());
1636             expectMsgClass(duration, ReadyTransactionReply.class);
1637
1638             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1639             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1640                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1641             assertEquals("getCanCommit", true, reply.getCanCommit());
1642
1643             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1644         }};
1645     }
1646
1647     @Test
1648     public void testCanCommitPhaseFalseResponse() throws Throwable {
1649         new ShardTestKit(getSystem()) {{
1650             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1651                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1652                     "testCanCommitPhaseFalseResponse");
1653
1654             waitUntilLeader(shard);
1655
1656             final FiniteDuration duration = duration("5 seconds");
1657
1658             final String transactionID1 = "tx1";
1659             final MutableCompositeModification modification = new MutableCompositeModification();
1660             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1661             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1662
1663             // Simulate the ForwardedReadyTransaction messages that would be sent
1664             // by the ShardTransaction.
1665
1666             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1667                     cohort, modification, true, false), getRef());
1668             expectMsgClass(duration, ReadyTransactionReply.class);
1669
1670             // Send the CanCommitTransaction message.
1671
1672             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1673             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1674                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1675             assertEquals("getCanCommit", false, reply.getCanCommit());
1676
1677             // Send another can commit to ensure the failed one got cleaned up.
1678
1679             reset(cohort);
1680
1681             final String transactionID2 = "tx2";
1682             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1683
1684             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1685                 cohort, modification, true, false), getRef());
1686             expectMsgClass(duration, ReadyTransactionReply.class);
1687
1688             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1689             reply = CanCommitTransactionReply.fromSerializable(
1690                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1691             assertEquals("getCanCommit", true, reply.getCanCommit());
1692
1693             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1694         }};
1695     }
1696
1697     @Test
1698     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1699         new ShardTestKit(getSystem()) {{
1700             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1701                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1702                     "testImmediateCommitWithCanCommitPhaseFailure");
1703
1704             waitUntilLeader(shard);
1705
1706             final FiniteDuration duration = duration("5 seconds");
1707
1708             final String transactionID1 = "tx1";
1709             final MutableCompositeModification modification = new MutableCompositeModification();
1710             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1711             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1712
1713             // Simulate the ForwardedReadyTransaction messages that would be sent
1714             // by the ShardTransaction.
1715
1716             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1717                     cohort, modification, true, true), getRef());
1718
1719             expectMsgClass(duration, akka.actor.Status.Failure.class);
1720
1721             // Send another can commit to ensure the failed one got cleaned up.
1722
1723             reset(cohort);
1724
1725             final String transactionID2 = "tx2";
1726             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1727             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1728             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1729             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1730             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1731             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1732             doReturn(candidateRoot).when(candidate).getRootNode();
1733             doReturn(candidate).when(cohort).getCandidate();
1734
1735             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1736                     cohort, modification, true, true), getRef());
1737
1738             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1739
1740             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1741         }};
1742     }
1743
1744     @Test
1745     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1746         new ShardTestKit(getSystem()) {{
1747             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1748                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1749                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1750
1751             waitUntilLeader(shard);
1752
1753             final FiniteDuration duration = duration("5 seconds");
1754
1755             final String transactionID = "tx1";
1756             final MutableCompositeModification modification = new MutableCompositeModification();
1757             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1758             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1759
1760             // Simulate the ForwardedReadyTransaction messages that would be sent
1761             // by the ShardTransaction.
1762
1763             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1764                     cohort, modification, true, true), getRef());
1765
1766             expectMsgClass(duration, akka.actor.Status.Failure.class);
1767
1768             // Send another can commit to ensure the failed one got cleaned up.
1769
1770             reset(cohort);
1771
1772             final String transactionID2 = "tx2";
1773             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1774             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1775             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1776             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1777             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1778             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1779             doReturn(candidateRoot).when(candidate).getRootNode();
1780             doReturn(candidate).when(cohort).getCandidate();
1781
1782             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1783                     cohort, modification, true, true), getRef());
1784
1785             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1786
1787             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1788         }};
1789     }
1790
1791     @Test
1792     public void testAbortBeforeFinishCommit() throws Throwable {
1793         new ShardTestKit(getSystem()) {{
1794             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1795                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1796                     "testAbortBeforeFinishCommit");
1797
1798             waitUntilLeader(shard);
1799
1800             final FiniteDuration duration = duration("5 seconds");
1801             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1802
1803             final String transactionID = "tx1";
1804             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1805                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1806                 @Override
1807                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1808                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1809
1810                     // Simulate an AbortTransaction message occurring during replication, after
1811                     // persisting and before finishing the commit to the in-memory store.
1812                     // We have no followers so due to optimizations in the RaftActor, it does not
1813                     // attempt replication and thus we can't send an AbortTransaction message b/c
1814                     // it would be processed too late after CommitTransaction completes. So we'll
1815                     // simulate an AbortTransaction message occurring during replication by calling
1816                     // the shard directly.
1817                     //
1818                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1819
1820                     return preCommitFuture;
1821                 }
1822             };
1823
1824             final MutableCompositeModification modification = new MutableCompositeModification();
1825             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1826                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1827                     modification, preCommit);
1828
1829             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1830                 cohort, modification, true, false), getRef());
1831             expectMsgClass(duration, ReadyTransactionReply.class);
1832
1833             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1834             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1835                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1836             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1837
1838             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1839             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1840
1841             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1842
1843             // Since we're simulating an abort occurring during replication and before finish commit,
1844             // the data should still get written to the in-memory store since we've gotten past
1845             // canCommit and preCommit and persisted the data.
1846             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1847
1848             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1849         }};
1850     }
1851
1852     @Test
1853     public void testTransactionCommitTimeout() throws Throwable {
1854         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1855
1856         new ShardTestKit(getSystem()) {{
1857             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1858                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1859                     "testTransactionCommitTimeout");
1860
1861             waitUntilLeader(shard);
1862
1863             final FiniteDuration duration = duration("5 seconds");
1864
1865             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1866
1867             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1868             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1869                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1870
1871             // Create 1st Tx - will timeout
1872
1873             final String transactionID1 = "tx1";
1874             final MutableCompositeModification modification1 = new MutableCompositeModification();
1875             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1876                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1877                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1878                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1879                     modification1);
1880
1881             // Create 2nd Tx
1882
1883             final String transactionID2 = "tx3";
1884             final MutableCompositeModification modification2 = new MutableCompositeModification();
1885             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1886                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1887             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1888                     listNodePath,
1889                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1890                     modification2);
1891
1892             // Ready the Tx's
1893
1894             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1895                     cohort1, modification1, true, false), getRef());
1896             expectMsgClass(duration, ReadyTransactionReply.class);
1897
1898             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1899                     cohort2, modification2, true, false), getRef());
1900             expectMsgClass(duration, ReadyTransactionReply.class);
1901
1902             // canCommit 1st Tx. We don't send the commit so it should timeout.
1903
1904             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1905             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1906
1907             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1908
1909             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1910             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1911
1912             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1913
1914             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1915             expectMsgClass(duration, akka.actor.Status.Failure.class);
1916
1917             // Commit the 2nd Tx.
1918
1919             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1920             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1921
1922             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1923             assertNotNull(listNodePath + " not found", node);
1924
1925             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1926         }};
1927     }
1928
1929     @Test
1930     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1931         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1932
1933         new ShardTestKit(getSystem()) {{
1934             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1935                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1936                     "testTransactionCommitQueueCapacityExceeded");
1937
1938             waitUntilLeader(shard);
1939
1940             final FiniteDuration duration = duration("5 seconds");
1941
1942             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1943
1944             final String transactionID1 = "tx1";
1945             final MutableCompositeModification modification1 = new MutableCompositeModification();
1946             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1947                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1948
1949             final String transactionID2 = "tx2";
1950             final MutableCompositeModification modification2 = new MutableCompositeModification();
1951             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1952                     TestModel.OUTER_LIST_PATH,
1953                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1954                     modification2);
1955
1956             final String transactionID3 = "tx3";
1957             final MutableCompositeModification modification3 = new MutableCompositeModification();
1958             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1959                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1960
1961             // Ready the Tx's
1962
1963             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1964                     cohort1, modification1, true, false), getRef());
1965             expectMsgClass(duration, ReadyTransactionReply.class);
1966
1967             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1968                     cohort2, modification2, true, false), getRef());
1969             expectMsgClass(duration, ReadyTransactionReply.class);
1970
1971             // The 3rd Tx should exceed queue capacity and fail.
1972
1973             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1974                     cohort3, modification3, true, false), getRef());
1975             expectMsgClass(duration, akka.actor.Status.Failure.class);
1976
1977             // canCommit 1st Tx.
1978
1979             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1980             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1981
1982             // canCommit the 2nd Tx - it should get queued.
1983
1984             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1985
1986             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1987
1988             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1989             expectMsgClass(duration, akka.actor.Status.Failure.class);
1990
1991             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1992         }};
1993     }
1994
1995     @Test
1996     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1997         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1998
1999         new ShardTestKit(getSystem()) {{
2000             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2001                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2002                     "testTransactionCommitWithPriorExpiredCohortEntries");
2003
2004             waitUntilLeader(shard);
2005
2006             final FiniteDuration duration = duration("5 seconds");
2007
2008             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2009
2010             final String transactionID1 = "tx1";
2011             final MutableCompositeModification modification1 = new MutableCompositeModification();
2012             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2013                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2014
2015             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2016                 cohort1, modification1, true, false), getRef());
2017             expectMsgClass(duration, ReadyTransactionReply.class);
2018
2019             final String transactionID2 = "tx2";
2020             final MutableCompositeModification modification2 = new MutableCompositeModification();
2021             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2022                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2023
2024             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2025                     cohort2, modification2, true, false), getRef());
2026             expectMsgClass(duration, ReadyTransactionReply.class);
2027
2028             final String transactionID3 = "tx3";
2029             final MutableCompositeModification modification3 = new MutableCompositeModification();
2030             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2031                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2032
2033             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2034                 cohort3, modification3, true, false), getRef());
2035             expectMsgClass(duration, ReadyTransactionReply.class);
2036
2037             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2038             // should expire from the queue and the last one should be processed.
2039
2040             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2041             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2042
2043             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2044         }};
2045     }
2046
2047     @Test
2048     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2049         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2050
2051         new ShardTestKit(getSystem()) {{
2052             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2053                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2054                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2055
2056             waitUntilLeader(shard);
2057
2058             final FiniteDuration duration = duration("5 seconds");
2059
2060             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2061
2062             final String transactionID1 = "tx1";
2063             final MutableCompositeModification modification1 = new MutableCompositeModification();
2064             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2065                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2066
2067             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2068                     cohort1, modification1, true, false), getRef());
2069             expectMsgClass(duration, ReadyTransactionReply.class);
2070
2071             // CanCommit the first one so it's the current in-progress CohortEntry.
2072
2073             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2074             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2075
2076             // Ready the second Tx.
2077
2078             final String transactionID2 = "tx2";
2079             final MutableCompositeModification modification2 = new MutableCompositeModification();
2080             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2081                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2082
2083             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2084                     cohort2, modification2, true, false), getRef());
2085             expectMsgClass(duration, ReadyTransactionReply.class);
2086
2087             // Ready the third Tx.
2088
2089             final String transactionID3 = "tx3";
2090             final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2091             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2092                     .apply(modification3);
2093                 modification3.ready();
2094             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2095
2096             shard.tell(readyMessage, getRef());
2097
2098             // Commit the first Tx. After completing, the second should expire from the queue and the third
2099             // Tx committed.
2100
2101             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2102             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2103
2104             // Expect commit reply from the third Tx.
2105
2106             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2107
2108             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2109             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2110
2111             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2112         }};
2113     }
2114
2115     @Test
2116     public void testCanCommitBeforeReadyFailure() throws Throwable {
2117         new ShardTestKit(getSystem()) {{
2118             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2119                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2120                     "testCanCommitBeforeReadyFailure");
2121
2122             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2123             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2124
2125             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2126         }};
2127     }
2128
2129     @Test
2130     public void testAbortCurrentTransaction() throws Throwable {
2131         new ShardTestKit(getSystem()) {{
2132             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2133                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2134                     "testAbortCurrentTransaction");
2135
2136             waitUntilLeader(shard);
2137
2138             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2139
2140             final String transactionID1 = "tx1";
2141             final MutableCompositeModification modification1 = new MutableCompositeModification();
2142             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2143             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2144             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2145
2146             final String transactionID2 = "tx2";
2147             final MutableCompositeModification modification2 = new MutableCompositeModification();
2148             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2149             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2150
2151             final FiniteDuration duration = duration("5 seconds");
2152             final Timeout timeout = new Timeout(duration);
2153
2154             // Simulate the ForwardedReadyTransaction messages that would be sent
2155             // by the ShardTransaction.
2156
2157             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2158                     cohort1, modification1, true, false), getRef());
2159             expectMsgClass(duration, ReadyTransactionReply.class);
2160
2161             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2162                     cohort2, modification2, true, false), getRef());
2163             expectMsgClass(duration, ReadyTransactionReply.class);
2164
2165             // Send the CanCommitTransaction message for the first Tx.
2166
2167             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2168             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2169                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2170             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2171
2172             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2173             // processed after the first Tx completes.
2174
2175             final Future<Object> canCommitFuture = Patterns.ask(shard,
2176                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2177
2178             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2179             // Tx to proceed.
2180
2181             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2182             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2183
2184             // Wait for the 2nd Tx to complete the canCommit phase.
2185
2186             Await.ready(canCommitFuture, duration);
2187
2188             final InOrder inOrder = inOrder(cohort1, cohort2);
2189             inOrder.verify(cohort1).canCommit();
2190             inOrder.verify(cohort2).canCommit();
2191
2192             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2193         }};
2194     }
2195
2196     @Test
2197     public void testAbortQueuedTransaction() throws Throwable {
2198         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2199         new ShardTestKit(getSystem()) {{
2200             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2201             @SuppressWarnings("serial")
2202             final Creator<Shard> creator = new Creator<Shard>() {
2203                 @Override
2204                 public Shard create() throws Exception {
2205                     return new Shard(shardID, Collections.<String,String>emptyMap(),
2206                             dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
2207                         @Override
2208                         public void onReceiveCommand(final Object message) throws Exception {
2209                             super.onReceiveCommand(message);
2210                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2211                                 if(cleaupCheckLatch.get() != null) {
2212                                     cleaupCheckLatch.get().countDown();
2213                                 }
2214                             }
2215                         }
2216                     };
2217                 }
2218             };
2219
2220             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2221                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2222                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2223
2224             waitUntilLeader(shard);
2225
2226             final String transactionID = "tx1";
2227
2228             final MutableCompositeModification modification = new MutableCompositeModification();
2229             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2230             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2231
2232             final FiniteDuration duration = duration("5 seconds");
2233
2234             // Ready the tx.
2235
2236             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2237                     cohort, modification, true, false), getRef());
2238             expectMsgClass(duration, ReadyTransactionReply.class);
2239
2240             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2241
2242             // Send the AbortTransaction message.
2243
2244             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2245             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2246
2247             verify(cohort).abort();
2248
2249             // Verify the tx cohort is removed from queue at the cleanup check interval.
2250
2251             cleaupCheckLatch.set(new CountDownLatch(1));
2252             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2253                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2254
2255             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2256
2257             // Now send CanCommitTransaction - should fail.
2258
2259             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2260
2261             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2262             assertTrue("Failure type", failure instanceof IllegalStateException);
2263
2264             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2265         }};
2266     }
2267
2268     @Test
2269     public void testCreateSnapshot() throws Exception {
2270         testCreateSnapshot(true, "testCreateSnapshot");
2271     }
2272
2273     @Test
2274     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2275         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2276     }
2277
2278     @SuppressWarnings("serial")
2279     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2280
2281         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2282
2283         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2284         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2285             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2286                 super(delegate);
2287             }
2288
2289             @Override
2290             public void saveSnapshot(final Object o) {
2291                 savedSnapshot.set(o);
2292                 super.saveSnapshot(o);
2293             }
2294         }
2295
2296         dataStoreContextBuilder.persistent(persistent);
2297
2298         new ShardTestKit(getSystem()) {{
2299             class TestShard extends Shard {
2300
2301                 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2302                                     final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2303                     super(name, peerAddresses, datastoreContext, schemaContext);
2304                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2305                 }
2306
2307                 @Override
2308                 public void handleCommand(final Object message) {
2309                     super.handleCommand(message);
2310
2311                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2312                         latch.get().countDown();
2313                     }
2314                 }
2315
2316                 @Override
2317                 public RaftActorContext getRaftActorContext() {
2318                     return super.getRaftActorContext();
2319                 }
2320             }
2321
2322             final Creator<Shard> creator = new Creator<Shard>() {
2323                 @Override
2324                 public Shard create() throws Exception {
2325                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2326                             newDatastoreContext(), SCHEMA_CONTEXT);
2327                 }
2328             };
2329
2330             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2331                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2332
2333             waitUntilLeader(shard);
2334
2335             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2336
2337             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2338
2339             // Trigger creation of a snapshot by ensuring
2340             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2341             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2342
2343             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2344
2345             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2346                     savedSnapshot.get() instanceof Snapshot);
2347
2348             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2349
2350             latch.set(new CountDownLatch(1));
2351             savedSnapshot.set(null);
2352
2353             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2354
2355             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2356
2357             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2358                     savedSnapshot.get() instanceof Snapshot);
2359
2360             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2361
2362             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2363         }
2364
2365         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2366
2367             final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2368             assertEquals("Root node", expectedRoot, actual);
2369
2370         }};
2371     }
2372
2373     /**
2374      * This test simply verifies that the applySnapShot logic will work
2375      * @throws ReadFailedException
2376      * @throws DataValidationFailedException
2377      */
2378     @Test
2379     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2380         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2381         store.setSchemaContext(SCHEMA_CONTEXT);
2382
2383         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2384         putTransaction.write(TestModel.TEST_PATH,
2385             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2386         commitTransaction(store, putTransaction);
2387
2388
2389         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2390
2391         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2392
2393         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2394         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2395
2396         commitTransaction(store, writeTransaction);
2397
2398         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2399
2400         assertEquals(expected, actual);
2401     }
2402
2403     @Test
2404     public void testRecoveryApplicable(){
2405
2406         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2407                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2408
2409         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2410                 persistentContext, SCHEMA_CONTEXT);
2411
2412         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2413                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2414
2415         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2416             nonPersistentContext, SCHEMA_CONTEXT);
2417
2418         new ShardTestKit(getSystem()) {{
2419             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2420                     persistentProps, "testPersistence1");
2421
2422             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2423
2424             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2425
2426             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2427                     nonPersistentProps, "testPersistence2");
2428
2429             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2430
2431             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2432
2433         }};
2434
2435     }
2436
2437     @Test
2438     public void testOnDatastoreContext() {
2439         new ShardTestKit(getSystem()) {{
2440             dataStoreContextBuilder.persistent(true);
2441
2442             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2443
2444             assertEquals("isRecoveryApplicable", true,
2445                     shard.underlyingActor().persistence().isRecoveryApplicable());
2446
2447             waitUntilLeader(shard);
2448
2449             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2450
2451             assertEquals("isRecoveryApplicable", false,
2452                 shard.underlyingActor().persistence().isRecoveryApplicable());
2453
2454             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2455
2456             assertEquals("isRecoveryApplicable", true,
2457                 shard.underlyingActor().persistence().isRecoveryApplicable());
2458
2459             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2460         }};
2461     }
2462
2463     @Test
2464     public void testRegisterRoleChangeListener() throws Exception {
2465         new ShardTestKit(getSystem()) {
2466             {
2467                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2468                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2469                         "testRegisterRoleChangeListener");
2470
2471                 waitUntilLeader(shard);
2472
2473                 final TestActorRef<MessageCollectorActor> listener =
2474                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2475
2476                 shard.tell(new RegisterRoleChangeListener(), listener);
2477
2478                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2479
2480                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2481                     ShardLeaderStateChanged.class);
2482                 assertEquals("getLocalShardDataTree present", true,
2483                         leaderStateChanged.getLocalShardDataTree().isPresent());
2484                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2485                     leaderStateChanged.getLocalShardDataTree().get());
2486
2487                 MessageCollectorActor.clearMessages(listener);
2488
2489                 // Force a leader change
2490
2491                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2492
2493                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2494                         ShardLeaderStateChanged.class);
2495                 assertEquals("getLocalShardDataTree present", false,
2496                         leaderStateChanged.getLocalShardDataTree().isPresent());
2497
2498                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2499             }
2500         };
2501     }
2502
2503     @Test
2504     public void testFollowerInitialSyncStatus() throws Exception {
2505         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2506                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2507                 "testFollowerInitialSyncStatus");
2508
2509         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2510
2511         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2512
2513         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2514
2515         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2516
2517         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2518     }
2519
2520     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2521         modification.ready();
2522         store.validate(modification);
2523         store.commit(store.prepare(modification));
2524     }
2525
2526     @Test
2527     public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
2528         new ShardTestKit(getSystem()) {{
2529             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
2530             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
2531             final Creator<Shard> creator = new Creator<Shard>() {
2532                 boolean firstElectionTimeout = true;
2533
2534                 @Override
2535                 public Shard create() throws Exception {
2536                     return new Shard(shardID, Collections.<String,String>emptyMap(),
2537                         dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
2538                         @Override
2539                         public void onReceiveCommand(final Object message) throws Exception {
2540                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
2541                                 firstElectionTimeout = false;
2542                                 final ActorRef self = getSelf();
2543                                 new Thread() {
2544                                     @Override
2545                                     public void run() {
2546                                         Uninterruptibles.awaitUninterruptibly(
2547                                             onChangeListenerRegistered, 5, TimeUnit.SECONDS);
2548                                         self.tell(message, self);
2549                                     }
2550                                 }.start();
2551
2552                                 onFirstElectionTimeout.countDown();
2553                             } else {
2554                                 super.onReceiveCommand(message);
2555                             }
2556                         }
2557                     };
2558                 }
2559             };
2560
2561             final MockDataChangeListener listener = new MockDataChangeListener(1);
2562             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2563                 "testDataChangeListenerOnFollower-DataChangeListener");
2564
2565             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2566                 Props.create(new DelegatingShardCreator(creator)),
2567                 "testDataChangeListenerOnFollower");
2568
2569             assertEquals("Got first ElectionTimeout", true,
2570                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
2571
2572             shard.tell(new FindLeader(), getRef());
2573             final FindLeaderReply findLeadeReply =
2574                 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2575             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
2576
2577             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2578
2579             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2580             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2581                 RegisterChangeListenerReply.class);
2582             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2583
2584             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2585
2586             onChangeListenerRegistered.countDown();
2587
2588             listener.waitForChangeEvents();
2589
2590             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2591             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2592         }};
2593     }
2594
2595     @Test
2596     public void testClusteredDataChangeListernerRegistration() throws Exception {
2597         new ShardTestKit(getSystem()) {{
2598             final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
2599                 .shardName("inventory").type("config").build();
2600
2601             final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
2602                 .shardName("inventory").type("config").build();
2603             final Creator<Shard> followerShardCreator = new Creator<Shard>() {
2604
2605                 @Override
2606                 public Shard create() throws Exception {
2607                     return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
2608                         "akka://test/user/" + member2ShardID.toString()),
2609                         dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
2610                         @Override
2611                         public void onReceiveCommand(final Object message) throws Exception {
2612
2613                             if(!(message instanceof ElectionTimeout)) {
2614                                 super.onReceiveCommand(message);
2615                             }
2616                         }
2617                     };
2618                 }
2619             };
2620
2621             final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
2622
2623                 @Override
2624                 public Shard create() throws Exception {
2625                     return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
2626                         "akka://test/user/" + member1ShardID.toString()),
2627                         dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
2628                 }
2629             };
2630
2631
2632             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2633                 Props.create(new DelegatingShardCreator(followerShardCreator)),
2634                 member1ShardID.toString());
2635
2636             final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
2637                 Props.create(new DelegatingShardCreator(leaderShardCreator)),
2638                 member2ShardID.toString());
2639             // Sleep to let election happen
2640             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
2641
2642             shard.tell(new FindLeader(), getRef());
2643             final FindLeaderReply findLeaderReply =
2644                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2645             assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
2646
2647             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2648             final MockDataChangeListener listener = new MockDataChangeListener(1);
2649             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2650                 "testDataChangeListenerOnFollower-DataChangeListener");
2651
2652             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2653             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2654                 RegisterChangeListenerReply.class);
2655             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2656
2657             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2658
2659             listener.waitForChangeEvents();
2660
2661             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2662             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2663         }};
2664     }
2665 }